Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Delete finished jobs by default, when they're executed #44

Merged
merged 4 commits into from
Nov 20, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion app/models/solid_queue/claimed_execution.rb
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ def execute

def finished
transaction do
job.finished
job.finished!
destroy!
end

Expand Down
8 changes: 2 additions & 6 deletions app/models/solid_queue/execution.rb
Original file line number Diff line number Diff line change
@@ -1,13 +1,9 @@
class SolidQueue::Execution < SolidQueue::Record
include JobAttributes

self.abstract_class = true

belongs_to :job

alias_method :discard, :destroy

private
def assume_attributes_from_job
self.queue_name ||= job&.queue_name
self.priority = job&.priority if job&.priority.to_i > priority
end
end
22 changes: 22 additions & 0 deletions app/models/solid_queue/execution/job_attributes.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
module SolidQueue
class Execution
module JobAttributes
extend ActiveSupport::Concern

ASSUMIBLE_ATTRIBUTES_FROM_JOB = %i[ queue_name priority ]

class_methods do
def assume_attributes_from_job(*attributes)
before_create -> { assume_attributes_from_job(ASSUMIBLE_ATTRIBUTES_FROM_JOB | attributes) }
end
end

private
def assume_attributes_from_job(attributes)
attributes.each do |attribute|
send("#{attribute}=", job.send(attribute))
end
end
end
end
end
14 changes: 12 additions & 2 deletions app/models/solid_queue/job/executable.rb
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ module Job::Executable
has_one :scheduled_execution, dependent: :destroy

after_create :prepare_for_execution

scope :finished, -> { where.not(finished_at: nil) }
end

STATUSES = %w[ ready claimed failed scheduled ]
Expand All @@ -26,8 +28,12 @@ def prepare_for_execution
end
end

def finished
touch(:finished_at)
def finished!
if delete_finished_jobs?
destroy!
else
touch(:finished_at)
end
end

def finished?
Expand All @@ -50,5 +56,9 @@ def retry
def due?
scheduled_at.nil? || scheduled_at <= Time.current
end

def delete_finished_jobs?
SolidQueue.delete_finished_jobs
end
end
end
2 changes: 1 addition & 1 deletion app/models/solid_queue/ready_execution.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ class ReadyExecution < Execution
scope :ordered, -> { order(priority: :asc) }
scope :not_paused, -> { where.not(queue_name: Pause.all_queue_names) }

before_create :assume_attributes_from_job
assume_attributes_from_job

class << self
def claim(queues, limit, process_id)
Expand Down
8 changes: 1 addition & 7 deletions app/models/solid_queue/scheduled_execution.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ class SolidQueue::ScheduledExecution < SolidQueue::Execution
scope :ordered, -> { order(scheduled_at: :asc, priority: :asc) }
scope :next_batch, ->(batch_size) { due.ordered.limit(batch_size) }

before_create :assume_attributes_from_job
assume_attributes_from_job :scheduled_at

class << self
def prepare_batch(batch)
Expand All @@ -27,10 +27,4 @@ def prepare_batch(batch)
def execution_ready_attributes
attributes.slice("job_id", "queue_name", "priority")
end

private
def assume_attributes_from_job
super
self.scheduled_at ||= job&.scheduled_at
end
end
2 changes: 2 additions & 0 deletions lib/solid_queue.rb
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ module SolidQueue
mattr_accessor :supervisor_pidfile
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we set a default here, now it is not done in the initializer?

mattr_accessor :supervisor, default: false

mattr_accessor :delete_finished_jobs, default: true

def self.supervisor?
supervisor
end
Expand Down
8 changes: 2 additions & 6 deletions lib/solid_queue/engine.rb
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,8 @@ class Engine < ::Rails::Engine
config.solid_queue = ActiveSupport::OrderedOptions.new

initializer "solid_queue.config" do
config.after_initialize do |app|
SolidQueue.process_heartbeat_interval = app.config.solid_queue.process_heartbeat_interval || 60.seconds
SolidQueue.process_alive_threshold = app.config.solid_queue.process_alive_threshold || 5.minutes
SolidQueue.shutdown_timeout = app.config.solid_queue.shutdown_timeout || 5.seconds
SolidQueue.silence_polling = app.config.solid_queue.silence_polling || false
SolidQueue.supervisor_pidfile = app.config.solid_queue.supervisor_pidfile || app.root.join("tmp", "pids", "solid_queue_supervisor.pid")
config.solid_queue.each do |name, value|
SolidQueue.public_send("#{name}=", value)
end
end

Expand Down
3 changes: 3 additions & 0 deletions test/dummy/config/application.rb
Original file line number Diff line number Diff line change
Expand Up @@ -28,5 +28,8 @@ class Application < Rails::Application
# config.eager_load_paths << Rails.root.join("extras")

config.active_job.queue_adapter = :solid_queue

config.solid_queue.logger = ActiveSupport::Logger.new(nil)
config.solid_queue.delete_finished_jobs = false
end
end
23 changes: 0 additions & 23 deletions test/fixtures/solid_queue/jobs.yml
Original file line number Diff line number Diff line change
@@ -1,23 +0,0 @@
add_to_buffer_job:
queue_name: background
active_job_id: b55784ad-ff9c-4c02-8d6c-016089bb3e09
class_name: AddToBufferJob
arguments: "{\"job_class\":\"AddToBufferJob\", \"job_id\":\"b55784ad-ff9c-4c02-8d6c-016089bb3e09\", \"provider_job_id\":null, \"queue_name\":\"background\", \"priority\":0, \"arguments\":[42], \"executions\":0, \"exception_executions\":{}, \"locale\":\"en\", \"timezone\":\"UTC\", \"enqueued_at\":\"2023-02-23T09:54:28Z\"}"
priority: 0

raising_job:
queue_name: background
class_name: RaisingJob
active_job_id: 60c93269-c3d0-44b1-958e-826cd145c456
arguments: "{\"job_class\":\"RaisingJob\",\"job_id\":\"60c93269-c3d0-44b1-958e-826cd145c456\",\"provider_job_id\":null,\"queue_name\":\"background\",\"priority\":0,\"arguments\":[{\"_aj_serialized\":\"ActiveJob::Serializers::ModuleSerializer\", \"value\":\"RuntimeError\"}, 2],\"executions\":0,\"exception_executions\":{},\"locale\":\"en\",\"timezone\":\"UTC\",\"enqueued_at\":\"2023-02-23T09:36:04Z\"}"
priority: 0

<% 5.times do |i| %>
<% active_job_id = SecureRandom.uuid %>
job_<%= i %>:
queue_name: fixtures
active_job_id: <%= active_job_id %>
class_name: AddToBufferJob
arguments: "{\"job_class\":\"AddToBufferJob\",\"job_id\":\"<%= active_job_id %>\",\"queue_name\":\"fixtures\",\"priority\":<%= i %>,\"arguments\":[],\"executions\":0,\"exception_executions\":{},\"locale\":\"en\",\"timezone\":\"UTC\",\"enqueued_at\":\"2023-02-20T10:33:02Z\"}"
priority: <%= i %>
<% end %>
32 changes: 27 additions & 5 deletions test/integration/jobs_lifecycle_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@

class JobsLifecycleTest < ActiveSupport::TestCase
setup do
@worker = SolidQueue::Worker.new(queues: "background", threads: 3, polling_interval: 1)
@scheduler = SolidQueue::Scheduler.new(batch_size: 10, polling_interval: 1)
@worker = SolidQueue::Worker.new(queues: "background", threads: 3, polling_interval: 0.5)
@scheduler = SolidQueue::Scheduler.new(batch_size: 10, polling_interval: 0.5)
end

teardown do
Expand All @@ -21,9 +21,10 @@ class JobsLifecycleTest < ActiveSupport::TestCase
@scheduler.start(mode: :async)
@worker.start(mode: :async)

wait_for_jobs_to_finish_for(0.5.seconds)
wait_for_jobs_to_finish_for(2.seconds)

assert_equal [ "hey", "ho" ], JobBuffer.values.sort
assert_equal 2, SolidQueue::Job.finished.count
end

test "schedule and run jobs" do
Expand All @@ -37,16 +38,37 @@ class JobsLifecycleTest < ActiveSupport::TestCase

travel_to 2.days.from_now

wait_for_jobs_to_finish_for(5.seconds)
wait_for_jobs_to_finish_for(2.seconds)

assert_equal 1, JobBuffer.size
assert_equal "I'm scheduled", JobBuffer.last_value

travel_to 5.days.from_now

wait_for_jobs_to_finish_for(5.seconds)
wait_for_jobs_to_finish_for(2.seconds)

assert_equal 2, JobBuffer.size
assert_equal "I'm scheduled later", JobBuffer.last_value

assert_equal 2, SolidQueue::Job.finished.count
end

test "delete finished jobs after they run" do
deleting_finished_jobs do
AddToBufferJob.perform_later "hey"
@worker.start(mode: :async)

wait_for_jobs_to_finish_for(2.seconds)
end

assert_equal 0, SolidQueue::Job.count
end

private
def deleting_finished_jobs
previous, SolidQueue.delete_finished_jobs = SolidQueue.delete_finished_jobs, true
yield
ensure
SolidQueue.delete_finished_jobs = previous
end
end
20 changes: 11 additions & 9 deletions test/models/solid_queue/claimed_execution_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,9 @@ class SolidQueue::ClaimedExecutionTest < ActiveSupport::TestCase
end

test "perform job successfully" do
job = solid_queue_jobs(:add_to_buffer_job)
claimed_execution = prepare_and_claim_job(job)
claimed_execution = prepare_and_claim_job AddToBufferJob.perform_later(42)
job = claimed_execution.job
assert_not job.finished?

assert_difference -> { SolidQueue::ClaimedExecution.count }, -1 do
claimed_execution.perform
Expand All @@ -20,8 +21,8 @@ class SolidQueue::ClaimedExecutionTest < ActiveSupport::TestCase
end

test "perform job that fails" do
job = solid_queue_jobs(:raising_job)
claimed_execution = prepare_and_claim_job(job)
claimed_execution = prepare_and_claim_job RaisingJob.perform_later(RuntimeError, 2)
job = claimed_execution.job

assert_difference -> { SolidQueue::ClaimedExecution.count } => -1, -> { SolidQueue::FailedExecution.count } => 1 do
claimed_execution.perform
Expand All @@ -40,8 +41,7 @@ class SolidQueue::ClaimedExecutionTest < ActiveSupport::TestCase
subscriber = ErrorBuffer.new

with_error_subscriber(subscriber) do
job = solid_queue_jobs(:raising_job)
claimed_execution = prepare_and_claim_job(job)
claimed_execution = prepare_and_claim_job RaisingJob.perform_later(RuntimeError, 2)

claimed_execution.perform
end
Expand All @@ -51,8 +51,8 @@ class SolidQueue::ClaimedExecutionTest < ActiveSupport::TestCase
end

test "release" do
job = solid_queue_jobs(:add_to_buffer_job)
claimed_execution = prepare_and_claim_job(job)
claimed_execution = prepare_and_claim_job AddToBufferJob.perform_later(42)
job = claimed_execution.job

assert_difference -> { SolidQueue::ClaimedExecution.count } => -1, -> { SolidQueue::ReadyExecution.count } => 1 do
claimed_execution.release
Expand All @@ -62,7 +62,9 @@ class SolidQueue::ClaimedExecutionTest < ActiveSupport::TestCase
end

private
def prepare_and_claim_job(job)
def prepare_and_claim_job(active_job)
job = SolidQueue::Job.find_by(active_job_id: active_job.job_id)

job.prepare_for_execution
job.reload.ready_execution.claim(@process.id)
job.reload.claimed_execution
Expand Down
49 changes: 25 additions & 24 deletions test/models/solid_queue/ready_execution_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,16 @@

class SolidQueue::ReadyExecutionTest < ActiveSupport::TestCase
setup do
@jobs = SolidQueue::Job.where(queue_name: "fixtures")
@jobs.each(&:prepare_for_execution)
5.times do |i|
AddToBufferJob.set(queue: "backend", priority: 5 - i).perform_later(i)
end

@jobs = SolidQueue::Job.where(queue_name: "backend")
end

test "claim all jobs for existing queue" do
assert_claimed_jobs(@jobs.count) do
SolidQueue::ReadyExecution.claim("fixtures", @jobs.count + 1, 42)
SolidQueue::ReadyExecution.claim("backend", @jobs.count + 1, 42)
end

@jobs.each do |job|
Expand All @@ -25,7 +28,7 @@ class SolidQueue::ReadyExecutionTest < ActiveSupport::TestCase

test "claim some jobs for existing queue" do
assert_claimed_jobs(2) do
SolidQueue::ReadyExecution.claim("fixtures", 2, 42)
SolidQueue::ReadyExecution.claim("backend", 2, 42)
end

@jobs.order(:priority).first(2).each do |job|
Expand All @@ -40,8 +43,8 @@ class SolidQueue::ReadyExecutionTest < ActiveSupport::TestCase
end

test "claim individual job" do
job = solid_queue_jobs(:add_to_buffer_job)
job.prepare_for_execution
AddToBufferJob.perform_later("hey")
job = SolidQueue::Job.last

assert_claimed_jobs(1) do
job.ready_execution.claim(42)
Expand All @@ -52,48 +55,46 @@ class SolidQueue::ReadyExecutionTest < ActiveSupport::TestCase
end

test "claim jobs using a list of queues" do
(SolidQueue::Job.all - @jobs).each(&:prepare_for_execution)
AddToBufferJob.perform_later("hey")

assert_claimed_jobs(SolidQueue::Job.count) do
SolidQueue::ReadyExecution.claim(%w[ fixtures background ], SolidQueue::Job.count + 1, 42)
assert_claimed_jobs(6) do
SolidQueue::ReadyExecution.claim(%w[ backend background ], SolidQueue::Job.count + 1, 42)
end
end

test "claim jobs using a wildcard" do
(SolidQueue::Job.all - @jobs).each(&:prepare_for_execution)
AddToBufferJob.perform_later("hey")

assert_claimed_jobs(SolidQueue::Job.count) do
assert_claimed_jobs(6) do
SolidQueue::ReadyExecution.claim("*", SolidQueue::Job.count + 1, 42)
end
end

test "claim jobs using a wildcard and having paused queues" do
other_jobs = SolidQueue::Job.all - @jobs
other_jobs.each(&:prepare_for_execution)
AddToBufferJob.perform_later("hey")

SolidQueue::Queue.find_by_name("fixtures").pause
SolidQueue::Queue.find_by_name("backend").pause

assert_claimed_jobs(other_jobs.count) do
assert_claimed_jobs(1) do
SolidQueue::ReadyExecution.claim("*", SolidQueue::Job.count + 1, 42)
end
end

test "claim jobs using queue prefixes" do
assert_claimed_jobs(2) do
SolidQueue::ReadyExecution.claim("fix*", 2, 42)
end
AddToBufferJob.perform_later("hey")

@jobs.order(:priority).first(2).each do |job|
assert_not job.reload.ready?
assert job.claimed?
assert_claimed_jobs(1) do
SolidQueue::ReadyExecution.claim("backgr*", SolidQueue::Job.count + 1, 42)
end

assert @jobs.none?(&:claimed?)
end

test "claim jobs using both exact names and prefixes" do
(SolidQueue::Job.all - @jobs).each(&:prepare_for_execution)
AddToBufferJob.perform_later("hey")

assert_claimed_jobs(SolidQueue::Job.count) do
SolidQueue::ReadyExecution.claim(%w[ fix* background ], SolidQueue::Job.count + 1, 42)
assert_claimed_jobs(6) do
SolidQueue::ReadyExecution.claim(%w[ backe* background ], SolidQueue::Job.count + 1, 42)
end
end

Expand Down
Loading