From 2aca66debe02ae9e25599e9fd9d3e70feb86fffe Mon Sep 17 00:00:00 2001 From: Rosa Gutierrez Date: Thu, 16 Nov 2023 22:55:56 +0100 Subject: [PATCH] Use a concurrent timer task to expire semaphores and unblock executions This task is managed by the scheduler and uses its same polling interval to run. --- app/models/solid_queue/blocked_execution.rb | 2 +- app/models/solid_queue/semaphore.rb | 1 + lib/solid_queue/process_registration.rb | 4 +- lib/solid_queue/scheduler.rb | 70 +++++++++++++------ test/integration/concurrency_controls_test.rb | 18 +++-- 5 files changed, 60 insertions(+), 35 deletions(-) diff --git a/app/models/solid_queue/blocked_execution.rb b/app/models/solid_queue/blocked_execution.rb index 5b3bccef..296d504c 100644 --- a/app/models/solid_queue/blocked_execution.rb +++ b/app/models/solid_queue/blocked_execution.rb @@ -4,7 +4,7 @@ class BlockedExecution < SolidQueue::Execution has_one :semaphore, foreign_key: :concurrency_key, primary_key: :concurrency_key - scope :releasable, -> { left_outer_joins(:execution_semaphore).merge(Semaphore.available.or(Semaphore.where(id: nil))) } + scope :releasable, -> { left_outer_joins(:semaphore).merge(Semaphore.available.or(Semaphore.where(id: nil))) } scope :ordered, -> { order(priority: :asc) } class << self diff --git a/app/models/solid_queue/semaphore.rb b/app/models/solid_queue/semaphore.rb index 461c01d8..e55c67b3 100644 --- a/app/models/solid_queue/semaphore.rb +++ b/app/models/solid_queue/semaphore.rb @@ -1,6 +1,7 @@ class SolidQueue::Semaphore < SolidQueue::Record scope :available, -> { where("value > 0") } scope :locked, -> { where(value: 0) } + scope :expired, -> { where(expires_at: ...Time.current)} class << self def wait_for(concurrency_key, limit, duration) diff --git a/lib/solid_queue/process_registration.rb b/lib/solid_queue/process_registration.rb index e0e9f076..2e7fe0ad 100644 --- a/lib/solid_queue/process_registration.rb +++ b/lib/solid_queue/process_registration.rb @@ -9,7 +9,7 @@ module ProcessRegistration define_callbacks :start, :run, :shutdown set_callback :start, :before, :register - set_callback :start, :before, :start_heartbeat + set_callback :start, :before, :launch_heartbeat set_callback :run, :after, -> { stop unless registered? } @@ -43,7 +43,7 @@ def registered? process.persisted? end - def start_heartbeat + def launch_heartbeat @heartbeat_task = Concurrent::TimerTask.new(execution_interval: SolidQueue.process_heartbeat_interval) { heartbeat } @heartbeat_task.execute end diff --git a/lib/solid_queue/scheduler.rb b/lib/solid_queue/scheduler.rb index e122e3fb..dc6b5f7e 100644 --- a/lib/solid_queue/scheduler.rb +++ b/lib/solid_queue/scheduler.rb @@ -1,34 +1,60 @@ # frozen_string_literal: true -class SolidQueue::Scheduler - include SolidQueue::Runner +module SolidQueue + class Scheduler + include Runner - attr_accessor :batch_size, :polling_interval + attr_accessor :batch_size, :polling_interval - def initialize(**options) - options = options.dup.with_defaults(SolidQueue::Configuration::SCHEDULER_DEFAULTS) + set_callback :start, :before, :launch_concurrency_maintenance + set_callback :shutdown, :before, :stop_concurrency_maintenance - @batch_size = options[:batch_size] - @polling_interval = options[:polling_interval] - end + def initialize(**options) + options = options.dup.with_defaults(SolidQueue::Configuration::SCHEDULER_DEFAULTS) + + @batch_size = options[:batch_size] + @polling_interval = options[:polling_interval] + end - private - def run - with_polling_volume do - batch = SolidQueue::ScheduledExecution.next_batch(batch_size) + private + def run + with_polling_volume do + batch = SolidQueue::ScheduledExecution.next_batch(batch_size).tap(&:load) - if batch.size > 0 - procline "preparing #{batch.size} jobs for execution" + if batch.size > 0 + procline "preparing #{batch.size} jobs for execution" - SolidQueue::ScheduledExecution.prepare_batch(batch) - else - procline "waiting" - interruptible_sleep(polling_interval) + SolidQueue::ScheduledExecution.prepare_batch(batch) + else + procline "waiting" + interruptible_sleep(polling_interval) + end end end - end - def metadata - super.merge(batch_size: batch_size, polling_interval: polling_interval) - end + def launch_concurrency_maintenance + @concurrency_maintenance_task = Concurrent::TimerTask.new(run_now: true, execution_interval: polling_interval) do + expire_semaphores + unblock_blocked_executions + end + + @concurrency_maintenance_task.execute + end + + def stop_concurrency_maintenance + @concurrency_maintenance_task.shutdown + end + + def expire_semaphores + Semaphore.expired.in_batches(of: batch_size, &:delete_all) + end + + def unblock_blocked_executions + BlockedExecution.unblock(batch_size) + end + + def metadata + super.merge(batch_size: batch_size, polling_interval: polling_interval) + end + end end diff --git a/test/integration/concurrency_controls_test.rb b/test/integration/concurrency_controls_test.rb index 0a7f9f5d..ba1a1183 100644 --- a/test/integration/concurrency_controls_test.rb +++ b/test/integration/concurrency_controls_test.rb @@ -10,7 +10,9 @@ class ConcurrencyControlsTest < ActiveSupport::TestCase @result = JobResult.create!(queue_name: "default", status: "seq: ") default_worker = { queues: "default", polling_interval: 1, processes: 3, threads: 2 } - @pid = run_supervisor_as_fork(load_configuration_from: { workers: [ default_worker ] }) + scheduler = { polling_interval: 1, batch_size: 200 } + + @pid = run_supervisor_as_fork(mode: :all, load_configuration_from: { workers: [ default_worker ], scheduler: scheduler }) wait_for_registered_processes(4, timeout: 0.2.second) # 3 workers working the default queue + supervisor end @@ -80,9 +82,7 @@ class ConcurrencyControlsTest < ActiveSupport::TestCase assert_stored_sequence @result, [ "B", "D", "F" ] + ("G".."K").to_a end - test "rely on worker to unblock blocked executions with an available semaphore" do - skip "Moving this task to the supervisor" - + test "rely on scheduler to unblock blocked executions with an available semaphore" do # Simulate a scenario where we got an available semaphore and some stuck jobs job = SequentialUpdateResultJob.perform_later(@result, name: "A") @@ -115,9 +115,7 @@ class ConcurrencyControlsTest < ActiveSupport::TestCase assert_stored_sequence @result, ("A".."K").to_a, [ "A", "C", "B" ] + ("D".."K").to_a end - test "rely on worker to unblock blocked executions with a missing semaphore" do - skip "Moving this task to the supervisor" - + test "rely on scheduler to unblock blocked executions with an expired semaphore" do # Simulate a scenario where we got an available semaphore and some stuck jobs job = SequentialUpdateResultJob.perform_later(@result, name: "A") wait_for_jobs_to_finish_for(2.seconds) @@ -135,10 +133,10 @@ class ConcurrencyControlsTest < ActiveSupport::TestCase end end - # Then delete the semaphore, as if we had cleared it - SolidQueue::Semaphore.find_by(concurrency_key: job.concurrency_key).destroy! + # Simulate semaphore expiration + SolidQueue::Semaphore.find_by(concurrency_key: job.concurrency_key).update(expires_at: 1.hour.ago) - # And wait for workers to release the jobs + # And wait for scheduler to release the jobs wait_for_jobs_to_finish_for(2.seconds) assert_no_pending_jobs