From a523f19fdb7881cf4f8265440b4fcb749bc7b254 Mon Sep 17 00:00:00 2001 From: Rosa Gutierrez Date: Tue, 6 Aug 2024 18:40:09 +0200 Subject: [PATCH] Support new persisted recurring tasks in Solid Queue See https://github.com/rails/solid_queue/pull/272 --- Gemfile.lock | 14 ++++---- README.md | 1 + .../solid_queue_ext/recurring_tasks.rb | 33 ++++++++----------- mission_control-jobs.gemspec | 2 +- ...0416_create_recurring_tasks.solid_queue.rb | 21 ++++++++++++ test/dummy/db/schema.rb | 18 +++++++++- 6 files changed, 60 insertions(+), 29 deletions(-) create mode 100644 test/dummy/db/migrate/20240806160416_create_recurring_tasks.solid_queue.rb diff --git a/Gemfile.lock b/Gemfile.lock index c110bb92..9b1966f2 100644 --- a/Gemfile.lock +++ b/Gemfile.lock @@ -105,7 +105,7 @@ GEM base64 (0.2.0) bigdecimal (3.1.7) builder (3.2.4) - concurrent-ruby (1.2.3) + concurrent-ruby (1.3.3) connection_pool (2.4.1) crass (1.0.6) date (3.3.4) @@ -116,8 +116,8 @@ GEM erubi (1.12.0) et-orbi (1.2.11) tzinfo - fugit (1.9.0) - et-orbi (~> 1, >= 1.2.7) + fugit (1.11.0) + et-orbi (~> 1, >= 1.2.11) raabro (~> 1.4) globalid (1.2.1) activesupport (>= 6.1) @@ -288,11 +288,11 @@ GEM rack-protection (= 4.0.0) rack-session (>= 2.0.0, < 3) tilt (~> 2.0) - solid_queue (0.3.0) + solid_queue (0.5.0) activejob (>= 7.1) activerecord (>= 7.1) - concurrent-ruby (~> 1.2.2) - fugit (~> 1.9.0) + concurrent-ruby (>= 1.3.1) + fugit (~> 1.11.0) railties (>= 7.1) sprockets (4.2.1) concurrent-ruby (~> 1.0) @@ -351,7 +351,7 @@ DEPENDENCIES rubocop-performance rubocop-rails-omakase selenium-webdriver - solid_queue + solid_queue (>= 0.5) sprockets-rails sqlite3 diff --git a/README.md b/README.md index 8761b747..0286948e 100644 --- a/README.md +++ b/README.md @@ -65,6 +65,7 @@ This library extends Active Job with a querying interface and the following sett ## Adapter Specifics - **Resque**: Queue pausing is supported only if you have `resque-pause` installed in your project +- **Solid Queue**: Requires version >= 0.5. ## Advanced configuration diff --git a/lib/active_job/queue_adapters/solid_queue_ext/recurring_tasks.rb b/lib/active_job/queue_adapters/solid_queue_ext/recurring_tasks.rb index 873c89b4..4d40b456 100644 --- a/lib/active_job/queue_adapters/solid_queue_ext/recurring_tasks.rb +++ b/lib/active_job/queue_adapters/solid_queue_ext/recurring_tasks.rb @@ -4,36 +4,29 @@ def supports_recurring_tasks? end def recurring_tasks - tasks = recurring_tasks_from_dispatchers - last_enqueued_at_times = recurring_task_last_enqueued_at(tasks.keys) + tasks = SolidQueue::RecurringTask.all + last_enqueued_at_times = recurring_task_last_enqueued_at(tasks.map(&:key)) - recurring_tasks_from_dispatchers.collect do |task_id, task_attrs| - recurring_task_attributes_from_solid_queue_task_attributes(task_attrs).merge \ - id: task_id, - last_enqueued_at: last_enqueued_at_times[task_id] + tasks.collect do |task| + recurring_task_attributes_from_solid_queue_recurring_task(task).merge \ + last_enqueued_at: last_enqueued_at_times[task.key] end end def find_recurring_task(task_id) - if task_attrs = recurring_tasks_from_dispatchers[task_id] - recurring_task_attributes_from_solid_queue_task_attributes(task_attrs).merge \ - id: task_id, - last_enqueued_at: recurring_task_last_enqueued_at(task_id).values&.first + if task = SolidQueue::RecurringTask.find_by(key: task_id) + recurring_task_attributes_from_solid_queue_recurring_task(task).merge \ + last_enqueued_at: recurring_task_last_enqueued_at(task.key).values&.first end end private - def recurring_tasks_from_dispatchers - SolidQueue::Process.where(kind: "Dispatcher").flat_map do |process| - process.metadata["recurring_schedule"] - end.compact.reduce({}, &:merge) - end - - def recurring_task_attributes_from_solid_queue_task_attributes(task_attributes) + def recurring_task_attributes_from_solid_queue_recurring_task(task) { - job_class_name: task_attributes["class_name"], - arguments: task_attributes["arguments"], - schedule: task_attributes["schedule"] + id: task.key, + job_class_name: task.class_name, + arguments: task.arguments, + schedule: task.schedule } end diff --git a/mission_control-jobs.gemspec b/mission_control-jobs.gemspec index ac61c538..fe5c7dfe 100644 --- a/mission_control-jobs.gemspec +++ b/mission_control-jobs.gemspec @@ -23,7 +23,7 @@ Gem::Specification.new do |spec| spec.add_dependency "irb", "~> 1.13" spec.add_development_dependency "resque" - spec.add_development_dependency "solid_queue" + spec.add_development_dependency "solid_queue", ">= 0.5" spec.add_development_dependency "selenium-webdriver" spec.add_development_dependency "resque-pause" spec.add_development_dependency "mocha" diff --git a/test/dummy/db/migrate/20240806160416_create_recurring_tasks.solid_queue.rb b/test/dummy/db/migrate/20240806160416_create_recurring_tasks.solid_queue.rb new file mode 100644 index 00000000..507c9e72 --- /dev/null +++ b/test/dummy/db/migrate/20240806160416_create_recurring_tasks.solid_queue.rb @@ -0,0 +1,21 @@ +# This migration comes from solid_queue (originally 20240719134516) +class CreateRecurringTasks < ActiveRecord::Migration[7.1] + def change + create_table :solid_queue_recurring_tasks do |t| + t.string :key, null: false, index: { unique: true } + t.string :schedule, null: false + t.string :command, limit: 2048 + t.string :class_name + t.text :arguments + + t.string :queue_name + t.integer :priority, default: 0 + + t.boolean :static, default: true, index: true + + t.text :description + + t.timestamps + end + end +end diff --git a/test/dummy/db/schema.rb b/test/dummy/db/schema.rb index 43f1b56c..84c47c16 100644 --- a/test/dummy/db/schema.rb +++ b/test/dummy/db/schema.rb @@ -10,7 +10,7 @@ # # It's strongly recommended that you check this file into your version control system. -ActiveRecord::Schema[7.1].define(version: 2023_09_14_113326) do +ActiveRecord::Schema[7.1].define(version: 2024_08_06_160416) do create_table "posts", force: :cascade do |t| t.string "title" t.text "body" @@ -100,6 +100,22 @@ t.index ["task_key", "run_at"], name: "index_solid_queue_recurring_executions_on_task_key_and_run_at", unique: true end + create_table "solid_queue_recurring_tasks", force: :cascade do |t| + t.string "key", null: false + t.string "schedule", null: false + t.string "command", limit: 2048 + t.string "class_name" + t.text "arguments" + t.string "queue_name" + t.integer "priority", default: 0 + t.boolean "static", default: true + t.text "description" + t.datetime "created_at", null: false + t.datetime "updated_at", null: false + t.index ["key"], name: "index_solid_queue_recurring_tasks_on_key", unique: true + t.index ["static"], name: "index_solid_queue_recurring_tasks_on_static" + end + create_table "solid_queue_scheduled_executions", force: :cascade do |t| t.integer "job_id", null: false t.string "queue_name", null: false