diff --git a/indexer.py b/indexer.py index 9868dbb..bdc49f0 100644 --- a/indexer.py +++ b/indexer.py @@ -98,6 +98,38 @@ def handle_event_failure(pg_conn, event, error): VALUES (%(event_id)s, %(error)s) '''), {'event_id': event['id'], 'error': str(error)}) + # If the event failed, mark any dependent events as failed, too. + pg_conn.execute_with_retry(dedent(''' + WITH RECURSIVE descendants AS ( + SELECT child.id, child.depends_on + FROM artwork_indexer.event_queue child + JOIN artwork_indexer.event_queue parent + ON (parent.id = any(child.depends_on) + AND parent.id = %(event_id)s + AND parent.state = 'failed') + UNION ALL + SELECT child.id, child.depends_on + FROM artwork_indexer.event_queue child + JOIN descendants parent + ON parent.id = any(child.depends_on) + ), + updates AS ( + UPDATE artwork_indexer.event_queue + SET state = 'failed' + WHERE id IN (SELECT id FROM descendants) + RETURNING id + ) + INSERT INTO artwork_indexer.event_failure_reason + (event, failure_reason) + SELECT id, %(failure_reason)s + FROM updates + '''), { + 'event_id': event['id'], + 'failure_reason': 'This event was marked as failed because ' + f'an event it depended on ({event['id']}) ' + 'had failed.', + }) + try: sentry_sdk.capture_exception(error) except BaseException as sentry_exc: diff --git a/tests/test_caa.py b/tests/test_caa.py index b27639f..e259d40 100644 --- a/tests/test_caa.py +++ b/tests/test_caa.py @@ -448,7 +448,7 @@ def test_merging_releases(self): }, { 'id': 6, - 'state': 'queued', + 'state': 'failed', 'entity_type': 'release', 'action': 'delete_image', 'message': { @@ -459,10 +459,11 @@ def test_merging_releases(self): 'depends_on': [5], 'attempts': 0, }, - release_index_event(RELEASE2_MBID, id=7, depends_on=[6]), + release_index_event(RELEASE2_MBID, id=7, depends_on=[6], + state='failed'), { 'id': 8, - 'state': 'queued', + 'state': 'failed', 'entity_type': 'release', 'action': 'deindex', 'message': {'gid': RELEASE1_MBID}, @@ -480,6 +481,17 @@ def test_merging_releases(self): 'HTTP 400', ) + for event_id in (6, 7, 8): + self.assertEqual( + self.pg_conn.execute(dedent(''' + SELECT failure_reason + FROM artwork_indexer.event_failure_reason + WHERE event = %(event_id)s + '''), {'event_id': event_id}).fetchone()['failure_reason'], + 'This event was marked as failed because an event it ' + 'depended on (5) had failed.', + ) + self.assertEqual(self.session.last_requests, [ release_image_copy_put(RELEASE1_MBID, RELEASE2_MBID, 1), ]) @@ -489,7 +501,7 @@ def test_merging_releases(self): self.pg_conn.execute_and_commit(dedent(''' UPDATE artwork_indexer.event_queue SET attempts = 0, state = 'queued' - WHERE action = 'copy_image' + WHERE id IN (5, 6, 7, 8) ''')) xml = RELEASE_XML_TEMPLATE.format( diff --git a/tests/test_eaa.py b/tests/test_eaa.py index 9e27f82..5fa7d45 100644 --- a/tests/test_eaa.py +++ b/tests/test_eaa.py @@ -361,7 +361,7 @@ def test_merging_events(self): }, { 'id': 6, - 'state': 'queued', + 'state': 'failed', 'entity_type': 'event', 'action': 'delete_image', 'message': { @@ -372,10 +372,11 @@ def test_merging_events(self): 'depends_on': [5], 'attempts': 0, }, - event_index_event(EVENT2_MBID, id=7, depends_on=[6]), + event_index_event(EVENT2_MBID, id=7, depends_on=[6], + state='failed'), { 'id': 8, - 'state': 'queued', + 'state': 'failed', 'entity_type': 'event', 'action': 'deindex', 'message': {'gid': EVENT1_MBID}, @@ -393,6 +394,17 @@ def test_merging_events(self): 'HTTP 400', ) + for event_id in (6, 7, 8): + self.assertEqual( + self.pg_conn.execute(dedent(''' + SELECT failure_reason + FROM artwork_indexer.event_failure_reason + WHERE event = %(event_id)s + '''), {'event_id': event_id}).fetchone()['failure_reason'], + 'This event was marked as failed because an event it ' + 'depended on (5) had failed.', + ) + self.assertEqual(self.session.last_requests, [ event_image_copy_put(EVENT1_MBID, EVENT2_MBID, 1), ]) @@ -402,7 +414,7 @@ def test_merging_events(self): self.pg_conn.execute_and_commit(dedent(''' UPDATE artwork_indexer.event_queue SET attempts = 0, state = 'queued' - WHERE action = 'copy_image' + WHERE id IN (5, 6, 7, 8) ''')) self.session.last_requests = [] diff --git a/tests/test_general.py b/tests/test_general.py index 638880f..5696782 100644 --- a/tests/test_general.py +++ b/tests/test_general.py @@ -98,8 +98,12 @@ def test_depends_on(self): def test_failure(self): self.pg_conn.execute_and_commit(dedent(''' INSERT INTO artwork_indexer.event_queue - (id, entity_type, action, message, created) - VALUES (1, 'release', 'noop', '{"fail": true}', + (id, entity_type, action, message, depends_on, created) + VALUES (1, 'release', 'noop', '{"fail": true}', NULL, + NOW() - interval '1 day'), + (2, 'release', 'noop', '{"id": 2}', '{1}', + NOW() - interval '1 day'), + (3, 'release', 'noop', '{"id": 3}', '{2}', NOW() - interval '1 day'); ''')) @@ -116,10 +120,11 @@ def test_failure(self): http_client_cls=self.http_client_cls) pg_cur = self.pg_conn.execute( - 'SELECT * FROM artwork_indexer.event_queue WHERE id = 1;' + 'SELECT * FROM artwork_indexer.event_queue ' + 'WHERE id IN (1, 2, 3) ORDER BY id' ) - event = pg_cur.fetchone() - attempts = event['attempts'] + event1, event2, event3 = pg_cur.fetchall() + attempts = event1['attempts'] self.assertEqual(attempts, index + 1) @@ -133,9 +138,30 @@ def test_failure(self): self.assertEqual(failure_reason_count, index + 1) if attempts < indexer.MAX_ATTEMPTS: - self.assertEqual(event['state'], 'queued') + self.assertEqual(event1['state'], 'queued') + self.assertEqual(event2['state'], 'queued') + self.assertEqual(event3['state'], 'queued') else: - self.assertEqual(event['state'], 'failed') + self.assertEqual(event1['state'], 'failed') + self.assertEqual(event2['state'], 'failed') + self.assertEqual(event3['state'], 'failed') + event2_failure_reason, event3_failure_reason = \ + self.pg_conn.execute(dedent(''' + SELECT failure_reason + FROM artwork_indexer.event_failure_reason + WHERE event IN (2, 3) + ORDER BY event + ''')).fetchall() + self.assertEqual( + event2_failure_reason['failure_reason'], + 'This event was marked as failed because an event it ' + + 'depended on (1) had failed.', + ) + self.assertEqual( + event3_failure_reason['failure_reason'], + 'This event was marked as failed because an event it ' + + 'depended on (1) had failed.', + ) # Should not return the failed event. self.assertEqual(indexer.get_next_event(self.pg_conn), None)