Skip to content

Commit

Permalink
Support new persisted recurring tasks in Solid Queue
Browse files Browse the repository at this point in the history
  • Loading branch information
rosa committed Aug 14, 2024
1 parent 56ddbe1 commit 2362ce9
Show file tree
Hide file tree
Showing 6 changed files with 60 additions and 29 deletions.
14 changes: 7 additions & 7 deletions Gemfile.lock
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ GEM
base64 (0.2.0)
bigdecimal (3.1.7)
builder (3.2.4)
concurrent-ruby (1.2.3)
concurrent-ruby (1.3.3)
connection_pool (2.4.1)
crass (1.0.6)
date (3.3.4)
Expand All @@ -116,8 +116,8 @@ GEM
erubi (1.12.0)
et-orbi (1.2.11)
tzinfo
fugit (1.9.0)
et-orbi (~> 1, >= 1.2.7)
fugit (1.11.0)
et-orbi (~> 1, >= 1.2.11)
raabro (~> 1.4)
globalid (1.2.1)
activesupport (>= 6.1)
Expand Down Expand Up @@ -288,11 +288,11 @@ GEM
rack-protection (= 4.0.0)
rack-session (>= 2.0.0, < 3)
tilt (~> 2.0)
solid_queue (0.3.0)
solid_queue (0.4.1)
activejob (>= 7.1)
activerecord (>= 7.1)
concurrent-ruby (~> 1.2.2)
fugit (~> 1.9.0)
concurrent-ruby (>= 1.3.1)
fugit (~> 1.11.0)
railties (>= 7.1)
sprockets (4.2.1)
concurrent-ruby (~> 1.0)
Expand Down Expand Up @@ -351,7 +351,7 @@ DEPENDENCIES
rubocop-performance
rubocop-rails-omakase
selenium-webdriver
solid_queue
solid_queue (>= 0.4.1)
sprockets-rails
sqlite3

Expand Down
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ This library extends Active Job with a querying interface and the following sett
## Adapter Specifics

- **Resque**: Queue pausing is supported only if you have `resque-pause` installed in your project
- **Solid Queue**: Requires version >= 0.4.1.

## Advanced configuration

Expand Down
33 changes: 13 additions & 20 deletions lib/active_job/queue_adapters/solid_queue_ext/recurring_tasks.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,36 +4,29 @@ def supports_recurring_tasks?
end

def recurring_tasks
tasks = recurring_tasks_from_dispatchers
last_enqueued_at_times = recurring_task_last_enqueued_at(tasks.keys)
tasks = SolidQueue::RecurringTask.all
last_enqueued_at_times = recurring_task_last_enqueued_at(tasks.map(&:key))

recurring_tasks_from_dispatchers.collect do |task_id, task_attrs|
recurring_task_attributes_from_solid_queue_task_attributes(task_attrs).merge \
id: task_id,
last_enqueued_at: last_enqueued_at_times[task_id]
tasks.collect do |task|
recurring_task_attributes_from_solid_queue_recurring_task(task).merge \
last_enqueued_at: last_enqueued_at_times[task.key]
end
end

def find_recurring_task(task_id)
if task_attrs = recurring_tasks_from_dispatchers[task_id]
recurring_task_attributes_from_solid_queue_task_attributes(task_attrs).merge \
id: task_id,
last_enqueued_at: recurring_task_last_enqueued_at(task_id).values&.first
if task = SolidQueue::RecurringTask.find_by(key: task_id)
recurring_task_attributes_from_solid_queue_recurring_task(task).merge \
last_enqueued_at: recurring_task_last_enqueued_at(task.key).values&.first
end
end

private
def recurring_tasks_from_dispatchers
SolidQueue::Process.where(kind: "Dispatcher").flat_map do |process|
process.metadata["recurring_schedule"]
end.compact.reduce({}, &:merge)
end

def recurring_task_attributes_from_solid_queue_task_attributes(task_attributes)
def recurring_task_attributes_from_solid_queue_recurring_task(task)
{
job_class_name: task_attributes["class_name"],
arguments: task_attributes["arguments"],
schedule: task_attributes["schedule"]
id: task.key,
job_class_name: task.class_name,
arguments: task.arguments,
schedule: task.schedule
}
end

Expand Down
2 changes: 1 addition & 1 deletion mission_control-jobs.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ Gem::Specification.new do |spec|
spec.add_dependency "irb", "~> 1.13"

spec.add_development_dependency "resque"
spec.add_development_dependency "solid_queue"
spec.add_development_dependency "solid_queue", ">= 0.4.1"
spec.add_development_dependency "selenium-webdriver"
spec.add_development_dependency "resque-pause"
spec.add_development_dependency "mocha"
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
# This migration comes from solid_queue (originally 20240719134516)
class CreateRecurringTasks < ActiveRecord::Migration[7.1]
def change
create_table :solid_queue_recurring_tasks do |t|
t.string :key, null: false, index: { unique: true }
t.string :schedule, null: false
t.string :command, limit: 2048
t.string :class_name
t.text :arguments

t.string :queue_name
t.integer :priority, default: 0

t.boolean :static, default: true, index: true

t.text :description

t.timestamps
end
end
end
18 changes: 17 additions & 1 deletion test/dummy/db/schema.rb
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
#
# It's strongly recommended that you check this file into your version control system.

ActiveRecord::Schema[7.1].define(version: 2023_09_14_113326) do
ActiveRecord::Schema[7.1].define(version: 2024_08_06_160416) do
create_table "posts", force: :cascade do |t|
t.string "title"
t.text "body"
Expand Down Expand Up @@ -100,6 +100,22 @@
t.index ["task_key", "run_at"], name: "index_solid_queue_recurring_executions_on_task_key_and_run_at", unique: true
end

create_table "solid_queue_recurring_tasks", force: :cascade do |t|
t.string "key", null: false
t.string "schedule", null: false
t.string "command", limit: 2048
t.string "class_name"
t.text "arguments"
t.string "queue_name"
t.integer "priority", default: 0
t.boolean "static", default: true
t.text "description"
t.datetime "created_at", null: false
t.datetime "updated_at", null: false
t.index ["key"], name: "index_solid_queue_recurring_tasks_on_key", unique: true
t.index ["static"], name: "index_solid_queue_recurring_tasks_on_static"
end

create_table "solid_queue_scheduled_executions", force: :cascade do |t|
t.integer "job_id", null: false
t.string "queue_name", null: false
Expand Down

0 comments on commit 2362ce9

Please sign in to comment.