Skip to content

Commit

Permalink
[outbox] Rename Repository into Repositories::Mysql57
Browse files Browse the repository at this point in the history
  • Loading branch information
swistak35 committed Dec 15, 2022
1 parent 66e7c9d commit 82e868c
Show file tree
Hide file tree
Showing 13 changed files with 419 additions and 408 deletions.
2 changes: 1 addition & 1 deletion contrib/ruby_event_store-outbox/.mutant.yml
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ matcher:
- RubyEventStore::Outbox::Configuration*
- RubyEventStore::Outbox::Consumer#get_remaining_count
- RubyEventStore::Outbox::CleanupStrategies::None*
- RubyEventStore::Outbox::Repository*
- RubyEventStore::Outbox::Repositories*
- RubyEventStore::Outbox::Runner#initialize
- RubyEventStore::Outbox::Runner#run
- RubyEventStore::Outbox::Runner#prepare_traps
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,11 @@ module Outbox
end

require_relative "outbox/fetch_specification"
require_relative "outbox/repository"
require_relative "outbox/repositories/mysql57"
require_relative "outbox/sidekiq_scheduler"
require_relative "outbox/legacy_sidekiq_scheduler"
require_relative "outbox/version"
require_relative "outbox/tempo"
require_relative "outbox/batch_result"
require_relative "outbox/cleanup_strategies"
require_relative "outbox/repositories"
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
require "logger"
require "redis"
require "active_record"
require_relative "repository"
require_relative "repositories/mysql57"
require_relative "sidekiq5_format"
require_relative "sidekiq_processor"
require_relative "fetch_specification"
Expand All @@ -24,7 +24,7 @@ def initialize(consumer_uuid, configuration, clock: Time, logger:, metrics:)
raise "Unknown format" if configuration.message_format != SIDEKIQ5_FORMAT
@processor = SidekiqProcessor.new(Redis.new(url: configuration.redis_url))

@repository = Repository.build_for_consumer(configuration.database_url)
@repository = Repositories::Mysql57.build_for_consumer(configuration.database_url)
@cleanup_strategy = CleanupStrategies.build(configuration, repository)
end

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
module RubyEventStore
module Outbox
module Repositories
end
end
end
Original file line number Diff line number Diff line change
@@ -0,0 +1,162 @@
# frozen_string_literal: true

require "active_record"
require "active_support/core_ext/numeric/time.rb"

module RubyEventStore
module Outbox
module Repositories
class Mysql57
RECENTLY_LOCKED_DURATION = 10.minutes

class Record < ::ActiveRecord::Base
self.primary_key = :id
self.table_name = "event_store_outbox"

def self.remaining_for(fetch_specification)
where(format: fetch_specification.message_format, split_key: fetch_specification.split_key, enqueued_at: nil)
end

def self.for_fetch_specification(fetch_specification)
where(format: fetch_specification.message_format, split_key: fetch_specification.split_key)
end

def hash_payload
JSON.parse(payload).deep_symbolize_keys
end

def enqueued?
!enqueued_at.nil?
end
end

class Lock < ::ActiveRecord::Base
self.table_name = "event_store_outbox_locks"

def self.obtain(fetch_specification, process_uuid, clock:)
transaction do
l = get_lock_record(fetch_specification)

if l.recently_locked?(clock: clock)
:taken
else
l.update!(locked_by: process_uuid, locked_at: clock.now)
l
end
end
rescue ::ActiveRecord::Deadlocked
:deadlocked
rescue ::ActiveRecord::LockWaitTimeout
:lock_timeout
end

def refresh(clock:)
transaction do
current_process_uuid = locked_by
lock!
if locked_by == current_process_uuid
update!(locked_at: clock.now)
:ok
else
:stolen
end
end
rescue ::ActiveRecord::Deadlocked
:deadlocked
rescue ::ActiveRecord::LockWaitTimeout
:lock_timeout
end

def self.release(fetch_specification, process_uuid)
transaction do
l = get_lock_record(fetch_specification)
if !l.locked_by?(process_uuid)
:not_taken_by_this_process
else
l.update!(locked_by: nil, locked_at: nil)
:ok
end
end
rescue ::ActiveRecord::Deadlocked
:deadlocked
rescue ::ActiveRecord::LockWaitTimeout
:lock_timeout
end

def locked_by?(process_uuid)
locked_by.eql?(process_uuid)
end

def recently_locked?(clock:)
locked_by && locked_at > RECENTLY_LOCKED_DURATION.ago(clock.now)
end

def fetch_specification
FetchSpecification.new(format, split_key)
end

private

def self.lock_for_split_key(fetch_specification)
lock.find_by(format: fetch_specification.message_format, split_key: fetch_specification.split_key)
end

def self.get_lock_record(fetch_specification)
l = lock_for_split_key(fetch_specification)
if l.nil?
begin
l = create!(format: fetch_specification.message_format, split_key: fetch_specification.split_key)
rescue ::ActiveRecord::RecordNotUnique
l = lock_for_split_key(fetch_specification)
end
end
l
end
end

def self.build_for_consumer(database_url)
::ActiveRecord::Base.establish_connection(database_url) unless ::ActiveRecord::Base.connected?
if ::ActiveRecord::Base.connection.adapter_name == "Mysql2"
::ActiveRecord::Base.connection.execute("SET SESSION innodb_lock_wait_timeout = 1;")
end
new
end

def insert_record(format, split_key, payload)
Record.create!(format: format, split_key: split_key, payload: payload)
end

def retrieve_batch(fetch_specification, batch_size)
Record.remaining_for(fetch_specification).order("id ASC").limit(batch_size).to_a
end

def get_remaining_count(fetch_specification)
Record.remaining_for(fetch_specification).count
end

def obtain_lock_for_process(fetch_specification, process_uuid, clock:)
Lock.obtain(fetch_specification, process_uuid, clock: clock)
end

def release_lock_for_process(fetch_specification, process_uuid)
Lock.release(fetch_specification, process_uuid)
end

def mark_as_enqueued(record, now)
record.update_column(:enqueued_at, now)
end

def delete_enqueued_older_than(fetch_specification, duration, limit)
scope = Record.for_fetch_specification(fetch_specification).where("enqueued_at < ?", duration.ago)
scope = scope.limit(limit).order(:id) unless limit == :all
scope.delete_all
:ok
rescue ::ActiveRecord::Deadlocked
:deadlocked
rescue ::ActiveRecord::LockWaitTimeout
:lock_timeout
end
end
end
end
end

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,12 @@

require "sidekiq"
require_relative "sidekiq5_format"
require_relative "repository"
require_relative "repositories/mysql57"

module RubyEventStore
module Outbox
class SidekiqProducer
def initialize(repository = Repository.new)
def initialize(repository = Repositories::Mysql57.new)
@repository = repository
end

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
module RubyEventStore
module Outbox
class SidekiqScheduler
def initialize(repository: Repository.new, serializer: RubyEventStore::Serializers::YAML)
def initialize(repository: Repositories::Mysql57.new, serializer: RubyEventStore::Serializers::YAML)
@serializer = serializer
@sidekiq_producer = SidekiqProducer.new(repository)
end
Expand Down
Loading

0 comments on commit 82e868c

Please sign in to comment.