Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve DISTINCT query for finding queue names in PostgreSQL #397

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 51 additions & 2 deletions app/models/solid_queue/queue.rb
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,63 @@ class Queue

class << self
def all
Job.select(:queue_name).distinct.collect do |job|
new(job.queue_name)
queue_names.collect do |queue_name|
new(queue_name)
end
end

def find_by_name(name)
new(name)
end

private

def queue_names
# PostgreSQL doesn't perform well with SELECT DISTINCT
# => Use recursive common table expressions if possible for better performance (https://wiki.postgresql.org/wiki/Loose_indexscan)
if SolidQueue::Record.connection.adapter_name.downcase == "postgresql" && SolidQueue::Record.connection.supports_common_table_expressions?
Job.connection.execute(queue_names_recursive_cte_sql).to_a.map { |row| row["queue_name"] }
else
Job.select(:queue_name).distinct.map(&:queue_name)
end
end

def queue_names_recursive_cte_sql
# This relies on the fact that queue_name in solid_queue_jobs is NOT NULL
# The sql looks something like below:
# WITH RECURSIVE t AS (
# (SELECT queue_name FROM solid_queue_jobs ORDER BY queue_name LIMIT 1) -- parentheses required
# UNION ALL
# SELECT (SELECT queue_name FROM solid_queue_jobs WHERE queue_name > t.queue_name ORDER BY queue_name LIMIT 1)
# FROM t
# WHERE t.queue_name IS NOT NULL
# )
# SELECT queue_name FROM t WHERE queue_name IS NOT NULL;

cte_table = Arel::Table.new(:t)
jobs_table = Job.arel_table

cte_base_case = jobs_table.project(jobs_table[:queue_name]).order(jobs_table[:queue_name]).take(1)

subquery = jobs_table
.project(jobs_table[:queue_name])
.where(jobs_table[:queue_name].gt(cte_table[:queue_name]))
.order(jobs_table[:queue_name])
.take(1)
cte_recursive_case = cte_table.project(subquery)
.where(cte_table[:queue_name].not_eq(nil))

cte_definition = Arel::Nodes::Cte.new(
Arel.sql("t"),
Arel::Nodes::UnionAll.new(cte_base_case, cte_recursive_case),
)

cte_table
.project(cte_table[:queue_name])
.where(cte_table[:queue_name].not_eq(nil))
.with(:recursive, cte_definition)
.to_sql
end
end

def initialize(name)
Expand Down
14 changes: 14 additions & 0 deletions test/models/solid_queue/queue_test.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
require "test_helper"

class SolidQueue::QueueTest < ActiveSupport::TestCase
test "list all queues" do
queue_names = [ "test", "test2", "the_queue", "backend" ]
queue_names.each do |queue_name|
(SecureRandom.random_number(5) + 1).times do |i|
AddToBufferJob.set(queue: queue_name).perform_later(i)
end
end

assert_equal queue_names.sort, SolidQueue::Queue.all.map(&:name).sort
end
end