Skip to content

Commit

Permalink
Order both by priority and job_id, to ensure deterministic ordering
Browse files Browse the repository at this point in the history
When all priorities are the same.
  • Loading branch information
rosa committed Nov 22, 2023
1 parent 2b8f732 commit 78ca4b8
Show file tree
Hide file tree
Showing 4 changed files with 11 additions and 10 deletions.
2 changes: 1 addition & 1 deletion app/models/solid_queue/claimed_execution.rb
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ def claiming(job_ids, process_id, &block)
job_data = Array(job_ids).collect { |job_id| { job_id: job_id, process_id: process_id } }

insert_all(job_data)
where(job_id: job_ids).tap do |claimed|
where(job_id: job_ids).load.tap do |claimed|
block.call(claimed)
SolidQueue.logger.info("[SolidQueue] Claimed #{claimed.size} jobs")
end
Expand Down
7 changes: 4 additions & 3 deletions app/models/solid_queue/ready_execution.rb
Original file line number Diff line number Diff line change
@@ -1,22 +1,23 @@
module SolidQueue
class ReadyExecution < Execution
scope :queued_as, ->(queue_name) { where(queue_name: queue_name) }
scope :ordered, -> { order(priority: :asc) }
scope :ordered, -> { order(priority: :asc, job_id: :asc) }

assume_attributes_from_job

class << self
def claim(queue_list, limit, process_id)
QueueSelector.new(queue_list, self).scoped_relations.flat_map do |queue_relation|
select_and_lock(queue_relation, process_id, limit).tap do |locked|
limit -= locked.count
break if limit <= 0
limit -= locked.size
end
end
end

private
def select_and_lock(queue_relation, process_id, limit)
return [] if limit <= 0

transaction do
candidates = select_candidates(queue_relation, limit)
lock(candidates, process_id)
Expand Down
8 changes: 4 additions & 4 deletions lib/solid_queue/worker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ def initialize(**options)
options = options.dup.with_defaults(SolidQueue::Configuration::WORKER_DEFAULTS)

@polling_interval = options[:polling_interval]
@queues = options[:queues]
@queues = Array(options[:queues])
@pool = Pool.new(options[:threads], on_idle: -> { wake_up })
end

Expand All @@ -21,13 +21,13 @@ def run
end

if claimed_executions.size > 0
procline "performing #{claimed_executions.count} jobs in #{queues}"
procline "performing #{claimed_executions.count} jobs"

claimed_executions.each do |execution|
pool.post(execution)
end
else
procline "waiting for jobs in #{queues}"
procline "waiting for jobs in #{queues.join(",")}"
interruptible_sleep(polling_interval)
end
end
Expand All @@ -44,7 +44,7 @@ def all_work_completed?
end

def metadata
super.merge(queues: queues, thread_pool_size: pool.size, idle_threads: pool.idle_threads, polling_interval: polling_interval)
super.merge(queues: queues.join(","), thread_pool_size: pool.size, idle_threads: pool.idle_threads, polling_interval: polling_interval)
end
end
end
4 changes: 2 additions & 2 deletions test/unit/configuration_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ class ConfigurationTest < ActiveSupport::TestCase

assert_equal SolidQueue::Configuration::SCHEDULER_DEFAULTS[:polling_interval], configuration.scheduler.polling_interval
assert_equal 2, configuration.workers.count
assert_equal [ "background" ], configuration.workers.map(&:queues).uniq
assert_equal [ "background" ], configuration.workers.flat_map(&:queues).uniq
assert_equal [ 10 ], configuration.workers.map(&:polling_interval).uniq
end

Expand All @@ -30,7 +30,7 @@ class ConfigurationTest < ActiveSupport::TestCase
configuration = SolidQueue::Configuration.new(mode: :work, load_from: config_as_hash)

assert_equal 3, configuration.workers.count
assert_equal [ "background" ], configuration.workers.map(&:queues).uniq
assert_equal [ "background" ], configuration.workers.flat_map(&:queues).uniq
assert_equal [ 10 ], configuration.workers.map(&:polling_interval).uniq
end
end

0 comments on commit 78ca4b8

Please sign in to comment.