diff --git a/app/models/solid_queue/queue.rb b/app/models/solid_queue/queue.rb index 4cf05bfb..3b68d5ed 100644 --- a/app/models/solid_queue/queue.rb +++ b/app/models/solid_queue/queue.rb @@ -6,14 +6,63 @@ class Queue class << self def all - Job.select(:queue_name).distinct.collect do |job| - new(job.queue_name) + queue_names.collect do |queue_name| + new(queue_name) end end def find_by_name(name) new(name) end + + private + + def queue_names + # PostgreSQL doesn't perform well with SELECT DISTINCT + # => Use recursive common table expressions if possible for better performance (https://wiki.postgresql.org/wiki/Loose_indexscan) + if SolidQueue::Record.connection.adapter_name.downcase == "postgresql" && SolidQueue::Record.connection.supports_common_table_expressions? + Job.connection.execute(queue_names_recursive_cte_sql).to_a.map { |row| row["queue_name"] } + else + Job.select(:queue_name).distinct.map(&:queue_name) + end + end + + def queue_names_recursive_cte_sql + # This relies on the fact that queue_name in solid_queue_jobs is NOT NULL + # The sql looks something like below: + # WITH RECURSIVE t AS ( + # (SELECT queue_name FROM solid_queue_jobs ORDER BY queue_name LIMIT 1) -- parentheses required + # UNION ALL + # SELECT (SELECT queue_name FROM solid_queue_jobs WHERE queue_name > t.queue_name ORDER BY queue_name LIMIT 1) + # FROM t + # WHERE t.queue_name IS NOT NULL + # ) + # SELECT queue_name FROM t WHERE queue_name IS NOT NULL; + + cte_table = Arel::Table.new(:t) + jobs_table = Job.arel_table + + cte_base_case = jobs_table.project(jobs_table[:queue_name]).order(jobs_table[:queue_name]).take(1) + + subquery = jobs_table + .project(jobs_table[:queue_name]) + .where(jobs_table[:queue_name].gt(cte_table[:queue_name])) + .order(jobs_table[:queue_name]) + .take(1) + cte_recursive_case = cte_table.project(subquery) + .where(cte_table[:queue_name].not_eq(nil)) + + cte_definition = Arel::Nodes::Cte.new( + Arel.sql("t"), + Arel::Nodes::UnionAll.new(cte_base_case, cte_recursive_case), + ) + + cte_table + .project(cte_table[:queue_name]) + .where(cte_table[:queue_name].not_eq(nil)) + .with(:recursive, cte_definition) + .to_sql + end end def initialize(name) diff --git a/test/models/solid_queue/queue_test.rb b/test/models/solid_queue/queue_test.rb new file mode 100644 index 00000000..2deacd44 --- /dev/null +++ b/test/models/solid_queue/queue_test.rb @@ -0,0 +1,14 @@ +require "test_helper" + +class SolidQueue::QueueTest < ActiveSupport::TestCase + test "list all queues" do + queue_names = [ "test", "test2", "the_queue", "backend" ] + queue_names.each do |queue_name| + (SecureRandom.random_number(5) + 1).times do |i| + AddToBufferJob.set(queue: queue_name).perform_later(i) + end + end + + assert_equal queue_names.sort, SolidQueue::Queue.all.map(&:name).sort + end +end