diff --git a/app/models/solid_queue/job_batch.rb b/app/models/solid_queue/job_batch.rb index 3d0de10d..1bac9139 100644 --- a/app/models/solid_queue/job_batch.rb +++ b/app/models/solid_queue/job_batch.rb @@ -7,10 +7,12 @@ class JobBatch < Record serialize :on_finish_active_job, coder: JSON serialize :on_success_active_job, coder: JSON + serialize :on_failure_active_job, coder: JSON scope :incomplete, -> { where(finished_at: nil).where("changed_at IS NOT NULL OR last_changed_at < ?", 1.hour.ago) } + scope :finished, -> { where.not(finished_at: nil) } class << self def current_batch_id @@ -45,6 +47,7 @@ def dispatch_finished_batches def batch_attributes(attributes) on_finish_klass = attributes.delete(:on_finish) on_success_klass = attributes.delete(:on_success) + on_failure_klass = attributes.delete(:on_failure) if on_finish_klass.present? attributes[:on_finish_active_job] = as_active_job(on_finish_klass).serialize @@ -54,6 +57,10 @@ def batch_attributes(attributes) attributes[:on_success_active_job] = as_active_job(on_success_klass).serialize end + if on_failure_klass.present? + attributes[:on_failure_active_job] = as_active_job(on_failure_klass).serialize + end + attributes end @@ -69,22 +76,29 @@ def finished? def finish return if finished? reset_changed_at - jobs.find_each do |next_job| - # FIXME: If it's failed but is going to retry, how do we know? - # Because we need to know if we will determine what the failed execution means - # FIXME: use "success" vs "finish" vs "discard" `completion_type` to determine - # how to analyze each job - return unless next_job.finished? - end + all_jobs_succeeded = true attrs = {} + jobs.find_each do |next_job| + # SolidQueue does treats `discard_on` differently than failures. The job will report as being :finished, + # and there is no record of the failure. + # GoodJob would report a discard as an error. It's possible we should do that in the future? + if fire_failure_job?(next_job) + perform_completion_job(:on_failure_active_job, attrs) + update!(attrs) + end + + status = next_job.status + all_jobs_succeeded = all_jobs_succeeded && status != :failed + return unless status.in?([ :finished, :failed ]) + end if on_finish_active_job.present? - active_job = ActiveJob::Base.deserialize(on_finish_active_job) - active_job.send(:deserialize_arguments_if_needed) - active_job.arguments = [ self ] + Array.wrap(active_job.arguments) - ActiveJob.perform_all_later([ active_job ]) - attrs[:job] = Job.find_by(active_job_id: active_job.job_id) + perform_completion_job(:on_finish_active_job, attrs) + end + + if on_success_active_job.present? && all_jobs_succeeded + perform_completion_job(:on_success_active_job, attrs) end update!({ finished_at: Time.zone.now }.merge(attrs)) @@ -92,6 +106,21 @@ def finish private + def fire_failure_job?(job) + return false if on_failure_active_job.blank? || job.failed_execution.blank? + job = ActiveJob::Base.deserialize(on_failure_active_job) + job.provider_job_id.blank? + end + + def perform_completion_job(job_field, attrs) + active_job = ActiveJob::Base.deserialize(send(job_field)) + active_job.send(:deserialize_arguments_if_needed) + active_job.arguments = [ self ] + Array.wrap(active_job.arguments) + ActiveJob.perform_all_later([ active_job ]) + active_job.provider_job_id = Job.find_by(active_job_id: active_job.job_id).id + attrs[job_field] = active_job.serialize + end + def reset_changed_at if changed_at.blank? && last_changed_at.present? update_columns(last_changed_at: Time.zone.now) # wait another hour before we check again diff --git a/db/migrate/20240131013203_create_solid_queue_batch_table.rb b/db/migrate/20240131013203_create_solid_queue_batch_table.rb index 8e9e79af..f97faee5 100644 --- a/db/migrate/20240131013203_create_solid_queue_batch_table.rb +++ b/db/migrate/20240131013203_create_solid_queue_batch_table.rb @@ -1,9 +1,9 @@ class CreateSolidQueueBatchTable < ActiveRecord::Migration[7.1] def change create_table :solid_queue_job_batches do |t| - t.references :job, index: { unique: true } t.text :on_finish_active_job t.text :on_success_active_job + t.text :on_failure_active_job t.datetime :finished_at t.datetime :changed_at t.datetime :last_changed_at @@ -16,6 +16,5 @@ def change add_reference :solid_queue_jobs, :batch, index: true add_foreign_key :solid_queue_jobs, :solid_queue_job_batches, column: :batch_id, on_delete: :cascade - add_foreign_key :solid_queue_job_batches, :solid_queue_jobs, column: :job_id end end diff --git a/test/dummy/db/schema.rb b/test/dummy/db/schema.rb index 31f7837b..294c5096 100644 --- a/test/dummy/db/schema.rb +++ b/test/dummy/db/schema.rb @@ -47,9 +47,9 @@ end create_table "solid_queue_job_batches", charset: "utf8mb4", collation: "utf8mb4_0900_ai_ci", force: :cascade do |t| - t.bigint "job_id" t.text "on_finish_active_job" t.text "on_success_active_job" + t.text "on_failure_active_job" t.datetime "finished_at" t.datetime "changed_at" t.datetime "last_changed_at" @@ -57,7 +57,6 @@ t.datetime "updated_at", null: false t.index ["changed_at"], name: "index_solid_queue_job_batches_on_changed_at" t.index ["finished_at"], name: "index_solid_queue_job_batches_on_finished_at" - t.index ["job_id"], name: "index_solid_queue_job_batches_on_job_id", unique: true t.index ["last_changed_at"], name: "index_solid_queue_job_batches_on_last_changed_at" end @@ -160,7 +159,6 @@ add_foreign_key "solid_queue_blocked_executions", "solid_queue_jobs", column: "job_id", on_delete: :cascade add_foreign_key "solid_queue_claimed_executions", "solid_queue_jobs", column: "job_id", on_delete: :cascade add_foreign_key "solid_queue_failed_executions", "solid_queue_jobs", column: "job_id", on_delete: :cascade - add_foreign_key "solid_queue_job_batches", "solid_queue_jobs", column: "job_id" add_foreign_key "solid_queue_jobs", "solid_queue_job_batches", column: "batch_id", on_delete: :cascade add_foreign_key "solid_queue_ready_executions", "solid_queue_jobs", column: "job_id", on_delete: :cascade add_foreign_key "solid_queue_recurring_executions", "solid_queue_jobs", column: "job_id", on_delete: :cascade diff --git a/test/test_helpers/jobs_test_helper.rb b/test/test_helpers/jobs_test_helper.rb index d0833fcf..f73458f0 100644 --- a/test/test_helpers/jobs_test_helper.rb +++ b/test/test_helpers/jobs_test_helper.rb @@ -9,6 +9,14 @@ def wait_for_jobs_to_finish_for(timeout = 1.second, except: []) end end + def wait_for_job_batches_to_finish_for(timeout = 1.second) + wait_while_with_timeout(timeout) do + skip_active_record_query_cache do + SolidQueue::JobBatch.where(finished_at: nil).any? + end + end + end + def assert_no_unfinished_jobs skip_active_record_query_cache do assert SolidQueue::Job.where(finished_at: nil).none?