Skip to content

Commit

Permalink
When an event fails, mark dependent events as failed
Browse files (browse the repository at this point in the history)
  • Loading branch information
mwiencek committed Nov 28, 2024
1 parent 36afe7f commit e6cc972
Show file tree
Hide file tree
Showing 4 changed files with 97 additions and 15 deletions.
32 changes: 32 additions & 0 deletions indexer.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,38 @@ def handle_event_failure(pg_conn, event, error):
VALUES (%(event_id)s, %(error)s)
'''), {'event_id': event['id'], 'error': str(error)})

# If the event failed, mark any dependent events as failed, too.
pg_conn.execute_with_retry(dedent('''
WITH RECURSIVE descendants AS (
SELECT child.id, child.depends_on
FROM artwork_indexer.event_queue child
JOIN artwork_indexer.event_queue parent
ON (parent.id = any(child.depends_on)
AND parent.id = %(event_id)s
AND parent.state = 'failed')
UNION ALL
SELECT child.id, child.depends_on
FROM artwork_indexer.event_queue child
JOIN descendants parent
ON parent.id = any(child.depends_on)
),
updates AS (
UPDATE artwork_indexer.event_queue
SET state = 'failed'
WHERE id IN (SELECT id FROM descendants)
RETURNING id
)
INSERT INTO artwork_indexer.event_failure_reason
(event, failure_reason)
SELECT id, %(failure_reason)s
FROM updates
'''), {
'event_id': event['id'],
'failure_reason': 'This event was marked as failed because '
f'an event it depended on ({event['id']}) '
'had failed.',
})

try:
sentry_sdk.capture_exception(error)
except BaseException as sentry_exc:
Expand Down
20 changes: 16 additions & 4 deletions tests/test_caa.py
Original file line number Diff line number Diff line change
Expand Up @@ -448,7 +448,7 @@ def test_merging_releases(self):
},
{
'id': 6,
'state': 'queued',
'state': 'failed',
'entity_type': 'release',
'action': 'delete_image',
'message': {
Expand All @@ -459,10 +459,11 @@ def test_merging_releases(self):
'depends_on': [5],
'attempts': 0,
},
release_index_event(RELEASE2_MBID, id=7, depends_on=[6]),
release_index_event(RELEASE2_MBID, id=7, depends_on=[6],
state='failed'),
{
'id': 8,
'state': 'queued',
'state': 'failed',
'entity_type': 'release',
'action': 'deindex',
'message': {'gid': RELEASE1_MBID},
Expand All @@ -480,6 +481,17 @@ def test_merging_releases(self):
'HTTP 400',
)

for event_id in (6, 7, 8):
self.assertEqual(
self.pg_conn.execute(dedent('''
SELECT failure_reason
FROM artwork_indexer.event_failure_reason
WHERE event = %(event_id)s
'''), {'event_id': event_id}).fetchone()['failure_reason'],
'This event was marked as failed because an event it '
'depended on (5) had failed.',
)

self.assertEqual(self.session.last_requests, [
release_image_copy_put(RELEASE1_MBID, RELEASE2_MBID, 1),
])
Expand All @@ -489,7 +501,7 @@ def test_merging_releases(self):
self.pg_conn.execute_and_commit(dedent('''
UPDATE artwork_indexer.event_queue
SET attempts = 0, state = 'queued'
WHERE action = 'copy_image'
WHERE id IN (5, 6, 7, 8)
'''))

xml = RELEASE_XML_TEMPLATE.format(
Expand Down
20 changes: 16 additions & 4 deletions tests/test_eaa.py
Original file line number Diff line number Diff line change
Expand Up @@ -361,7 +361,7 @@ def test_merging_events(self):
},
{
'id': 6,
'state': 'queued',
'state': 'failed',
'entity_type': 'event',
'action': 'delete_image',
'message': {
Expand All @@ -372,10 +372,11 @@ def test_merging_events(self):
'depends_on': [5],
'attempts': 0,
},
event_index_event(EVENT2_MBID, id=7, depends_on=[6]),
event_index_event(EVENT2_MBID, id=7, depends_on=[6],
state='failed'),
{
'id': 8,
'state': 'queued',
'state': 'failed',
'entity_type': 'event',
'action': 'deindex',
'message': {'gid': EVENT1_MBID},
Expand All @@ -393,6 +394,17 @@ def test_merging_events(self):
'HTTP 400',
)

for event_id in (6, 7, 8):
self.assertEqual(
self.pg_conn.execute(dedent('''
SELECT failure_reason
FROM artwork_indexer.event_failure_reason
WHERE event = %(event_id)s
'''), {'event_id': event_id}).fetchone()['failure_reason'],
'This event was marked as failed because an event it '
'depended on (5) had failed.',
)

self.assertEqual(self.session.last_requests, [
event_image_copy_put(EVENT1_MBID, EVENT2_MBID, 1),
])
Expand All @@ -402,7 +414,7 @@ def test_merging_events(self):
self.pg_conn.execute_and_commit(dedent('''
UPDATE artwork_indexer.event_queue
SET attempts = 0, state = 'queued'
WHERE action = 'copy_image'
WHERE id IN (5, 6, 7, 8)
'''))

self.session.last_requests = []
Expand Down
40 changes: 33 additions & 7 deletions tests/test_general.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,8 +98,12 @@ def test_depends_on(self):
def test_failure(self):
self.pg_conn.execute_and_commit(dedent('''
INSERT INTO artwork_indexer.event_queue
(id, entity_type, action, message, created)
VALUES (1, 'release', 'noop', '{"fail": true}',
(id, entity_type, action, message, depends_on, created)
VALUES (1, 'release', 'noop', '{"fail": true}', NULL,
NOW() - interval '1 day'),
(2, 'release', 'noop', '{"id": 2}', '{1}',
NOW() - interval '1 day'),
(3, 'release', 'noop', '{"id": 3}', '{2}',
NOW() - interval '1 day');
'''))

Expand All @@ -116,10 +120,11 @@ def test_failure(self):
http_client_cls=self.http_client_cls)

pg_cur = self.pg_conn.execute(
'SELECT * FROM artwork_indexer.event_queue WHERE id = 1;'
'SELECT * FROM artwork_indexer.event_queue '
'WHERE id IN (1, 2, 3) ORDER BY id'
)
event = pg_cur.fetchone()
attempts = event['attempts']
event1, event2, event3 = pg_cur.fetchall()
attempts = event1['attempts']

self.assertEqual(attempts, index + 1)

Expand All @@ -133,9 +138,30 @@ def test_failure(self):
self.assertEqual(failure_reason_count, index + 1)

if attempts < indexer.MAX_ATTEMPTS:
self.assertEqual(event['state'], 'queued')
self.assertEqual(event1['state'], 'queued')
self.assertEqual(event2['state'], 'queued')
self.assertEqual(event3['state'], 'queued')
else:
self.assertEqual(event['state'], 'failed')
self.assertEqual(event1['state'], 'failed')
self.assertEqual(event2['state'], 'failed')
self.assertEqual(event3['state'], 'failed')
event2_failure_reason, event3_failure_reason = \
self.pg_conn.execute(dedent('''
SELECT failure_reason
FROM artwork_indexer.event_failure_reason
WHERE event IN (2, 3)
ORDER BY event
''')).fetchall()
self.assertEqual(
event2_failure_reason['failure_reason'],
'This event was marked as failed because an event it ' +
'depended on (1) had failed.',
)
self.assertEqual(
event3_failure_reason['failure_reason'],
'This event was marked as failed because an event it ' +
'depended on (1) had failed.',
)

# Should not return the failed event.
self.assertEqual(indexer.get_next_event(self.pg_conn), None)
Expand Down

0 comments on commit e6cc972

Please sign in to comment.