Skip to content

Commit

Permalink
Handle on_failure and on_success
Browse files Browse the repository at this point in the history
* on_failure fires the first time any of the jobs fail, even once

* on_success only fires if all jobs work (after retries)

* remove unused job_id
  • Loading branch information
jpcamara committed Sep 24, 2024
1 parent 4251685 commit 5ba1c27
Show file tree
Hide file tree
Showing 4 changed files with 51 additions and 17 deletions.
53 changes: 41 additions & 12 deletions app/models/solid_queue/job_batch.rb
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,12 @@ class JobBatch < Record

serialize :on_finish_active_job, coder: JSON
serialize :on_success_active_job, coder: JSON
serialize :on_failure_active_job, coder: JSON

scope :incomplete, -> {
where(finished_at: nil).where("changed_at IS NOT NULL OR last_changed_at < ?", 1.hour.ago)
}
scope :finished, -> { where.not(finished_at: nil) }

class << self
def current_batch_id
Expand Down Expand Up @@ -45,6 +47,7 @@ def dispatch_finished_batches
def batch_attributes(attributes)
on_finish_klass = attributes.delete(:on_finish)
on_success_klass = attributes.delete(:on_success)
on_failure_klass = attributes.delete(:on_failure)

if on_finish_klass.present?
attributes[:on_finish_active_job] = as_active_job(on_finish_klass).serialize
Expand All @@ -54,6 +57,10 @@ def batch_attributes(attributes)
attributes[:on_success_active_job] = as_active_job(on_success_klass).serialize
end

if on_failure_klass.present?
attributes[:on_failure_active_job] = as_active_job(on_failure_klass).serialize
end

attributes
end

Expand All @@ -69,29 +76,51 @@ def finished?
def finish
return if finished?
reset_changed_at
jobs.find_each do |next_job|
# FIXME: If it's failed but is going to retry, how do we know?
# Because we need to know if we will determine what the failed execution means
# FIXME: use "success" vs "finish" vs "discard" `completion_type` to determine
# how to analyze each job
return unless next_job.finished?
end

all_jobs_succeeded = true
attrs = {}
jobs.find_each do |next_job|
# SolidQueue does treats `discard_on` differently than failures. The job will report as being :finished,
# and there is no record of the failure.
# GoodJob would report a discard as an error. It's possible we should do that in the future?
if fire_failure_job?(next_job)
perform_completion_job(:on_failure_active_job, attrs)
update!(attrs)
end

status = next_job.status
all_jobs_succeeded = all_jobs_succeeded && status != :failed
return unless status.in?([ :finished, :failed ])
end

if on_finish_active_job.present?
active_job = ActiveJob::Base.deserialize(on_finish_active_job)
active_job.send(:deserialize_arguments_if_needed)
active_job.arguments = [ self ] + Array.wrap(active_job.arguments)
ActiveJob.perform_all_later([ active_job ])
attrs[:job] = Job.find_by(active_job_id: active_job.job_id)
perform_completion_job(:on_finish_active_job, attrs)
end

if on_success_active_job.present? && all_jobs_succeeded
perform_completion_job(:on_success_active_job, attrs)
end

update!({ finished_at: Time.zone.now }.merge(attrs))
end

private

def fire_failure_job?(job)
return false if on_failure_active_job.blank? || job.failed_execution.blank?
job = ActiveJob::Base.deserialize(on_failure_active_job)
job.provider_job_id.blank?
end

def perform_completion_job(job_field, attrs)
active_job = ActiveJob::Base.deserialize(send(job_field))
active_job.send(:deserialize_arguments_if_needed)
active_job.arguments = [ self ] + Array.wrap(active_job.arguments)
ActiveJob.perform_all_later([ active_job ])
active_job.provider_job_id = Job.find_by(active_job_id: active_job.job_id).id
attrs[job_field] = active_job.serialize
end

def reset_changed_at
if changed_at.blank? && last_changed_at.present?
update_columns(last_changed_at: Time.zone.now) # wait another hour before we check again
Expand Down
3 changes: 1 addition & 2 deletions db/migrate/20240131013203_create_solid_queue_batch_table.rb
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
class CreateSolidQueueBatchTable < ActiveRecord::Migration[7.1]
def change
create_table :solid_queue_job_batches do |t|
t.references :job, index: { unique: true }
t.text :on_finish_active_job
t.text :on_success_active_job
t.text :on_failure_active_job
t.datetime :finished_at
t.datetime :changed_at
t.datetime :last_changed_at
Expand All @@ -16,6 +16,5 @@ def change

add_reference :solid_queue_jobs, :batch, index: true
add_foreign_key :solid_queue_jobs, :solid_queue_job_batches, column: :batch_id, on_delete: :cascade
add_foreign_key :solid_queue_job_batches, :solid_queue_jobs, column: :job_id
end
end
4 changes: 1 addition & 3 deletions test/dummy/db/schema.rb
Original file line number Diff line number Diff line change
Expand Up @@ -47,17 +47,16 @@
end

create_table "solid_queue_job_batches", charset: "utf8mb4", collation: "utf8mb4_0900_ai_ci", force: :cascade do |t|
t.bigint "job_id"
t.text "on_finish_active_job"
t.text "on_success_active_job"
t.text "on_failure_active_job"
t.datetime "finished_at"
t.datetime "changed_at"
t.datetime "last_changed_at"
t.datetime "created_at", null: false
t.datetime "updated_at", null: false
t.index ["changed_at"], name: "index_solid_queue_job_batches_on_changed_at"
t.index ["finished_at"], name: "index_solid_queue_job_batches_on_finished_at"
t.index ["job_id"], name: "index_solid_queue_job_batches_on_job_id", unique: true
t.index ["last_changed_at"], name: "index_solid_queue_job_batches_on_last_changed_at"
end

Expand Down Expand Up @@ -160,7 +159,6 @@
add_foreign_key "solid_queue_blocked_executions", "solid_queue_jobs", column: "job_id", on_delete: :cascade
add_foreign_key "solid_queue_claimed_executions", "solid_queue_jobs", column: "job_id", on_delete: :cascade
add_foreign_key "solid_queue_failed_executions", "solid_queue_jobs", column: "job_id", on_delete: :cascade
add_foreign_key "solid_queue_job_batches", "solid_queue_jobs", column: "job_id"
add_foreign_key "solid_queue_jobs", "solid_queue_job_batches", column: "batch_id", on_delete: :cascade
add_foreign_key "solid_queue_ready_executions", "solid_queue_jobs", column: "job_id", on_delete: :cascade
add_foreign_key "solid_queue_recurring_executions", "solid_queue_jobs", column: "job_id", on_delete: :cascade
Expand Down
8 changes: 8 additions & 0 deletions test/test_helpers/jobs_test_helper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,14 @@ def wait_for_jobs_to_finish_for(timeout = 1.second, except: [])
end
end

def wait_for_job_batches_to_finish_for(timeout = 1.second)
wait_while_with_timeout(timeout) do
skip_active_record_query_cache do
SolidQueue::JobBatch.where(finished_at: nil).any?
end
end
end

def assert_no_unfinished_jobs
skip_active_record_query_cache do
assert SolidQueue::Job.where(finished_at: nil).none?
Expand Down

0 comments on commit 5ba1c27

Please sign in to comment.