diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml index 21b023bc..32dd5acf 100644 --- a/.github/workflows/main.yml +++ b/.github/workflows/main.yml @@ -24,18 +24,17 @@ jobs: fail-fast: false matrix: ruby-version: - - 3.1.6 - - 3.2.0 - - 3.2.4 - - 3.3.0 - - 3.3.1 - - 3.3.2 - - 3.3.4 - - 3.3.5 - - 3.3.6 - - 3.4.0 - - 3.4.1 + - 3.1 + - 3.2 + - 3.3 + - 3.4 database: [ mysql, postgres, sqlite ] + gemfile: [ rails_7_1, rails_7_2, rails_8_0, rails_main ] + exclude: + - ruby-version: "3.1" + gemfile: rails_8_0 + - ruby-version: "3.1" + gemfile: rails_main services: mysql: image: mysql:8.0.31 @@ -52,6 +51,7 @@ jobs: - 55432:5432 env: TARGET_DB: ${{ matrix.database }} + BUNDLE_GEMFILE: ${{ github.workspace }}/gemfiles/${{ matrix.gemfile }}.gemfile steps: - name: Checkout code uses: actions/checkout@v4 @@ -60,6 +60,9 @@ jobs: with: ruby-version: ${{ matrix.ruby-version }} bundler-cache: true + - name: Update to latest Rails + run: | + bundle update railties - name: Setup test database run: | bin/rails db:setup diff --git a/.gitignore b/.gitignore index 3cbf5eae..86382d5a 100644 --- a/.gitignore +++ b/.gitignore @@ -1,5 +1,6 @@ /.bundle/ /doc/ +/gemfiles/*.lock /log/*.log /pkg/ /tmp/ diff --git a/Appraisals b/Appraisals new file mode 100644 index 00000000..24860528 --- /dev/null +++ b/Appraisals @@ -0,0 +1,20 @@ +# frozen_string_literal: true + +appraise "rails-7-1" do + # rdoc 6.14 is not compatible with Ruby 3.1 + gem 'rdoc', '6.13' + gem "railties", "~> 7.1.0" +end + +appraise "rails-7-2" do + gem 'rdoc', '6.13' + gem "railties", "~> 7.2.0" +end + +appraise "rails-8-0" do + gem "railties", "~> 8.0.0" +end + +appraise "rails-main" do + gem "railties", github: "rails/rails", branch: "main" +end diff --git a/Gemfile.lock b/Gemfile.lock index 42b81dcf..c97dae35 100644 --- a/Gemfile.lock +++ b/Gemfile.lock @@ -1,7 +1,7 @@ PATH remote: . specs: - solid_queue (1.1.5) + solid_queue (1.2.0) activejob (>= 7.1) activerecord (>= 7.1) concurrent-ruby (>= 1.3.1) @@ -50,6 +50,10 @@ GEM mutex_m securerandom (>= 0.3) tzinfo (~> 2.0) + appraisal (2.5.0) + bundler + rake + thor (>= 0.14.0) ast (2.4.2) base64 (0.2.0) benchmark (0.4.0) @@ -189,6 +193,7 @@ PLATFORMS x86_64-linux DEPENDENCIES + appraisal debug (~> 1.9) logger mocha diff --git a/README.md b/README.md index c7a2ef2c..0af80429 100644 --- a/README.md +++ b/README.md @@ -11,6 +11,7 @@ Solid Queue can be used with SQL databases such as MySQL, PostgreSQL or SQLite, - [Installation](#installation) - [Usage in development and other non-production environments](#usage-in-development-and-other-non-production-environments) - [Single database configuration](#single-database-configuration) + - [Dashboard UI Setup](#dashboard-ui-setup) - [Incremental adoption](#incremental-adoption) - [High performance requirements](#high-performance-requirements) - [Configuration](#configuration) @@ -23,6 +24,7 @@ Solid Queue can be used with SQL databases such as MySQL, PostgreSQL or SQLite, - [Lifecycle hooks](#lifecycle-hooks) - [Errors when enqueuing](#errors-when-enqueuing) - [Concurrency controls](#concurrency-controls) + - [Performance considerations](#performance-considerations) - [Failed jobs and retries](#failed-jobs-and-retries) - [Error reporting on jobs](#error-reporting-on-jobs) - [Puma plugin](#puma-plugin) @@ -156,6 +158,10 @@ Running Solid Queue in a separate database is recommended, but it's also possibl You won't have multiple databases, so `database.yml` doesn't need to have primary and queue database. 
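As an illustration only (the adapter and database name below are assumptions, not something this change prescribes), a single-database setup keeps the usual one-entry `database.yml`:

```yml
# Minimal sketch of a single-database setup: one shared connection for the app
# and Solid Queue, with no primary/queue split.
production:
  adapter: postgresql
  database: my_app_production
```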
+### Dashboard UI Setup + +For viewing information about your jobs via a UI, we recommend taking a look at [mission_control-jobs](https://github.com/rails/mission_control-jobs), a dashboard where, among other things, you can examine and retry/discard failed jobs. + ### Incremental adoption If you're planning to adopt Solid Queue incrementally by switching one job at the time, you can do so by leaving the `config.active_job.queue_adapter` set to your old backend, and then set the `queue_adapter` directly in the jobs you're moving: @@ -192,6 +198,8 @@ By default, Solid Queue will try to find your configuration under `config/queue. bin/jobs -c config/calendar.yml ``` +You can also skip all recurring tasks by setting the environment variable `SOLID_QUEUE_SKIP_RECURRING=true`. This is useful for environments like staging, review apps, or development where you don't want any recurring jobs to run. It's equivalent to using the `--skip-recurring` option with `bin/jobs`. + This is what this configuration looks like: ```yml @@ -365,7 +373,7 @@ There are several settings that control how Solid Queue works that you can set a - `silence_polling`: whether to silence Active Record logs emitted when polling for both workers and dispatchers—defaults to `true`. - `supervisor_pidfile`: path to a pidfile that the supervisor will create when booting to prevent running more than one supervisor in the same host, or in case you want to use it for a health check. It's `nil` by default. - `preserve_finished_jobs`: whether to keep finished jobs in the `solid_queue_jobs` table—defaults to `true`. -- `clear_finished_jobs_after`: period to keep finished jobs around, in case `preserve_finished_jobs` is true—defaults to 1 day. **Note:** Right now, there's no automatic cleanup of finished jobs. You'd need to do this by periodically invoking `SolidQueue::Job.clear_finished_in_batches`, which can be configured as [a recurring task](#recurring-tasks). +- `clear_finished_jobs_after`: period to keep finished jobs around, in case `preserve_finished_jobs` is `true`—defaults to 1 day. When installing Solid Queue, [a recurring job](#recurring-tasks) is automatically configured to clear finished jobs in batches every hour, at minute 12. You can edit the `recurring.yml` configuration to change this as you see fit. - `default_concurrency_control_period`: the value to be used as the default for the `duration` parameter in [concurrency controls](#concurrency-controls). It defaults to 3 minutes. @@ -425,11 +433,11 @@ In the case of recurring tasks, if such error is raised when enqueuing the job c ## Concurrency controls -Solid Queue extends Active Job with concurrency controls, that allows you to limit how many jobs of a certain type or with certain arguments can run at the same time. When limited in this way, jobs will be blocked from running, and they'll stay blocked until another job finishes and unblocks them, or after the set expiry time (concurrency limit's _duration_) elapses. Jobs are never discarded or lost, only blocked. +Solid Queue extends Active Job with concurrency controls that allow you to limit how many jobs of a certain type or with certain arguments can run at the same time. When limited in this way, jobs will be blocked from running by default, and they'll stay blocked until another job finishes and unblocks them, or until the set expiry time (the concurrency limit's _duration_) elapses. Alternatively, jobs can be configured to be discarded instead of blocked.
This means that if a job with certain arguments has already been enqueued, other jobs with the same characteristics (in the same concurrency _class_) won't be enqueued. ```ruby class MyJob < ApplicationJob - limits_concurrency to: max_concurrent_executions, key: ->(arg1, arg2, **) { ... }, duration: max_interval_to_guarantee_concurrency_limit, group: concurrency_group + limits_concurrency to: max_concurrent_executions, key: ->(arg1, arg2, **) { ... }, duration: max_interval_to_guarantee_concurrency_limit, group: concurrency_group, on_conflict: on_conflict_behaviour # ... ``` @@ -437,10 +445,19 @@ class MyJob < ApplicationJob - `to` is `1` by default. - `duration` is set to `SolidQueue.default_concurrency_control_period` by default, which itself defaults to `3 minutes`, but that you can configure as well. - `group` is used to control the concurrency of different job classes together. It defaults to the job class name. +- `on_conflict` controls the behaviour when enqueuing a job that conflicts with the configured concurrency limits. It can be set to one of the following: + - `:block` (default): the job is blocked and is dispatched when another job completes and unblocks it, or when the `duration` expires. + - `:discard`: the job is discarded. When you choose this option, bear in mind that if a job runs and fails to remove the concurrency lock (or _semaphore_, read below to know more about this), all jobs conflicting with it will be discarded until the interval defined by `duration` has elapsed. When a job includes these controls, we'll ensure that, at most, the number of jobs (indicated as `to`) that yield the same `key` will be performed concurrently, and this guarantee will last for `duration` for each job enqueued. Note that there's no guarantee about _the order of execution_, only about jobs being performed at the same time (overlapping). -The concurrency limits use the concept of semaphores when enqueuing, and work as follows: when a job is enqueued, we check if it specifies concurrency controls. If it does, we check the semaphore for the computed concurrency key. If the semaphore is open, we claim it and we set the job as _ready_. Ready means it can be picked up by workers for execution. When the job finishes executing (be it successfully or unsuccessfully, resulting in a failed execution), we signal the semaphore and try to unblock the next job with the same key, if any. Unblocking the next job doesn't mean running that job right away, but moving it from _blocked_ to _ready_. Since something can happen that prevents the first job from releasing the semaphore and unblocking the next job (for example, someone pulling a plug in the machine where the worker is running), we have the `duration` as a failsafe. Jobs that have been blocked for more than duration are candidates to be released, but only as many of them as the concurrency rules allow, as each one would need to go through the semaphore dance check. This means that the `duration` is not really about the job that's enqueued or being run, it's about the jobs that are blocked waiting. It's important to note that after one or more candidate jobs are unblocked (either because a job finishes or because `duration` expires and a semaphore is released), the `duration` timer for the still blocked jobs is reset. This happens indirectly via the expiration time of the semaphore, which is updated.
+The concurrency limits use the concept of semaphores when enqueuing, and work as follows: when a job is enqueued, we check if it specifies concurrency controls. If it does, we check the semaphore for the computed concurrency key. If the semaphore is open, we claim it and we set the job as _ready_. Ready means it can be picked up by workers for execution. When the job finishes executing (be it successfully or unsuccessfully, resulting in a failed execution), we signal the semaphore and try to unblock the next job with the same key, if any. Unblocking the next job doesn't mean running that job right away, but moving it from _blocked_ to _ready_. If you're using the `discard` behaviour for `on_conflict`, jobs enqueued while the semaphore is closed will be discarded. + +Since something can happen that prevents the first job from releasing the semaphore and unblocking the next job (for example, someone pulling a plug in the machine where the worker is running), we have the `duration` as a failsafe. Jobs that have been blocked for more than `duration` are candidates to be released, but only as many of them as the concurrency rules allow, as each one would need to go through the semaphore dance check. This means that the `duration` is not really about the job that's enqueued or being run, it's about the jobs that are blocked waiting, or about the jobs that would get discarded while the semaphore is closed. + +It's important to note that after one or more candidate jobs are unblocked (either because a job finishes or because `duration` expires and a semaphore is released), the `duration` timer for the still blocked jobs is reset. This happens indirectly via the expiration time of the semaphore, which is updated. + +When using `discard` as the behaviour to handle conflicts, you might have jobs discarded for up to the `duration` interval if something happens and a running job fails to release the semaphore. For example: @@ -475,10 +492,38 @@ In this case, if we have a `Box::MovePostingsByContactToDesignatedBoxJob` job en Note that the `duration` setting depends indirectly on the value for `concurrency_maintenance_interval` that you set for your dispatcher(s), as that'd be the frequency with which blocked jobs are checked and unblocked (at which point, only one job per concurrency key, at most, is unblocked). In general, you should set `duration` in a way that all your jobs would finish well under that duration and think of the concurrency maintenance task as a failsafe in case something goes wrong. -Jobs are unblocked in order of priority but queue order is not taken into account for unblocking jobs. That means that if you have a group of jobs that share a concurrency group but are in different queues, or jobs of the same class that you enqueue in different queues, the queue order you set for a worker is not taken into account when unblocking blocked ones. The reason is that a job that runs unblocks the next one, and the job itself doesn't know about a particular worker's queue order (you could even have different workers with different queue orders), it can only know about priority. Once blocked jobs are unblocked and available for polling, they'll be picked up by a worker following its queue order. +Jobs are unblocked in order of priority but **queue order is not taken into account for unblocking jobs**. 
That means that if you have a group of jobs that share a concurrency group but are in different queues, or jobs of the same class that you enqueue in different queues, the queue order you set for a worker is not taken into account when unblocking blocked ones. The reason is that a job that runs unblocks the next one, and the job itself doesn't know about a particular worker's queue order (you could even have different workers with different queue orders), it can only know about priority. Once blocked jobs are unblocked and available for polling, they'll be picked up by a worker following its queue order. Finally, failed jobs that are automatically or manually retried work in the same way as new jobs that get enqueued: they get in the queue for getting an open semaphore, and whenever they get it, they'll be run. It doesn't matter if they had already gotten an open semaphore in the past. +### Performance considerations + +Concurrency controls introduce significant overhead (blocked executions need to be created and promoted to ready, and semaphores need to be created and updated), so you should consider carefully whether you need them. For throttling purposes, where you plan to set the limit (`to`) significantly larger than 1, we'd encourage relying on a limited number of workers per queue instead. For example: + +```ruby +class ThrottledJob < ApplicationJob + queue_as :throttled +end +``` + +```yml +production: + workers: + - queues: throttled + threads: 1 + polling_interval: 1 + - queues: default + threads: 5 + polling_interval: 0.1 + processes: 3 +``` + +Or something similar to that depending on your setup. You can also assign a different queue to a job at the moment of enqueuing, so you can decide whether to enqueue a job in the throttled queue or another queue depending on its arguments, or pass a block to `queue_as` as explained [here](https://guides.rubyonrails.org/active_job_basics.html#queues). + +In addition, mixing concurrency controls with **bulk enqueuing** (Active Job's `perform_all_later`) is not a good idea, because concurrency-controlled jobs need to be enqueued one by one to ensure concurrency limits are respected, so you lose all the benefits of bulk enqueuing. + +When jobs that have concurrency controls and `on_conflict: :discard` are enqueued in bulk, the ones that fail to be enqueued and are discarded will have `successfully_enqueued` set to `false`. The count of enqueued jobs returned by `perform_all_later` will exclude these jobs, as expected. + ## Failed jobs and retries Solid Queue doesn't include any automatic retry mechanism, it [relies on Active Job for this](https://edgeguides.rubyonrails.org/active_job_basics.html#retrying-or-discarding-failed-jobs). Jobs that fail will be kept in the system, and a _failed execution_ (a record in the `solid_queue_failed_executions` table) will be created for these. The job will stay there until manually discarded or re-enqueued. You can do this in a console as: @@ -490,8 +535,6 @@ failed_execution.retry # This will re-enqueue the job as if it was enqueued for failed_execution.discard # This will delete the job from the system ```
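If you need to do this for many failed jobs at once, a rough console sketch like the one below can help. It only relies on the `retry` method shown above plus the failed execution's `job` association and its `class_name` column; filtering by `MyJob` is a hypothetical example, not part of this change:

```ruby
# Retry every failed execution whose job is a MyJob (hypothetical class name).
SolidQueue::FailedExecution.includes(:job).find_each do |failed_execution|
  failed_execution.retry if failed_execution.job.class_name == "MyJob"
end
```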
- ### Error reporting on jobs Some error tracking services that integrate with Rails, such as Sentry or Rollbar, hook into [Active Job](https://guides.rubyonrails.org/active_job_basics.html#exceptions) and automatically report not handled errors that happen during job execution. However, if your error tracking system doesn't, or if you need some custom reporting, you can hook into Active Job yourself. A possible way of doing this would be: @@ -533,6 +576,7 @@ plugin :solid_queue if ENV["SOLID_QUEUE_IN_PUMA"] ``` that you set in production only. This is what Rails 8's default Puma config looks like. Otherwise, if you're using Puma in development but not Solid Queue, starting Puma would start also Solid Queue supervisor and it'll most likely fail because it won't be properly configured. +**Note**: phased restarts are not supported currently because the plugin requires [app preloading](https://github.com/puma/puma?tab=readme-ov-file#cluster-mode) to work. ## Jobs and transactional integrity :warning: Having your jobs in the same ACID-compliant database as your application data enables a powerful yet sharp tool: taking advantage of transactional integrity to ensure some action in your app is not committed unless your job is also committed and vice versa, and ensuring that your job won't be enqueued until the transaction within which you're enqueuing it is committed. This can be very powerful and useful, but it can also backfire if you base some of your logic on this behaviour, and in the future, you move to another active job backend, or if you simply move Solid Queue to its own database, and suddenly the behaviour changes under you. Because this can be quite tricky and many people shouldn't need to worry about it, by default Solid Queue is configured in a different database as the main app. @@ -571,6 +615,8 @@ Solid Queue supports defining recurring tasks that run at specific times in the bin/jobs --recurring_schedule_file=config/schedule.yml ``` +You can completely disable recurring tasks by setting the environment variable `SOLID_QUEUE_SKIP_RECURRING=true` or by using the `--skip-recurring` option with `bin/jobs`. + The configuration itself looks like this: ```yml diff --git a/app/models/solid_queue/job.rb b/app/models/solid_queue/job.rb index 8574c1ec..234e5eb4 100644 --- a/app/models/solid_queue/job.rb +++ b/app/models/solid_queue/job.rb @@ -29,7 +29,7 @@ def enqueue(active_job, scheduled_at: Time.current) active_job.scheduled_at = scheduled_at create_from_active_job(active_job).tap do |enqueued_job| - active_job.provider_job_id = enqueued_job.id + active_job.provider_job_id = enqueued_job.id if enqueued_job.persisted? end end @@ -49,7 +49,7 @@ def create_from_active_job(active_job) def create_all_from_active_jobs(active_jobs) job_rows = active_jobs.map { |job| attributes_from_active_job(job) } insert_all(job_rows) - where(active_job_id: active_jobs.map(&:job_id)) + where(active_job_id: active_jobs.map(&:job_id)).order(id: :asc) end def attributes_from_active_job(active_job) diff --git a/app/models/solid_queue/job/concurrency_controls.rb b/app/models/solid_queue/job/concurrency_controls.rb index 6ae12e28..b7410b08 100644 --- a/app/models/solid_queue/job/concurrency_controls.rb +++ b/app/models/solid_queue/job/concurrency_controls.rb @@ -34,6 +34,10 @@ def blocked? end private + def concurrency_on_conflict + job_class.concurrency_on_conflict.to_s.inquiry + end + def acquire_concurrency_lock return true unless concurrency_limited? 
@@ -46,6 +50,14 @@ def release_concurrency_lock Semaphore.signal(self) end + def handle_concurrency_conflict + if concurrency_on_conflict.discard? + destroy + else + block + end + end + def block BlockedExecution.create_or_find_by!(job_id: id) end diff --git a/app/models/solid_queue/job/executable.rb b/app/models/solid_queue/job/executable.rb index e2146a67..b0a4cb93 100644 --- a/app/models/solid_queue/job/executable.rb +++ b/app/models/solid_queue/job/executable.rb @@ -67,7 +67,7 @@ def prepare_for_execution def dispatch if acquire_concurrency_lock then ready else - block + handle_concurrency_conflict end end diff --git a/gemfiles/rails_7_1.gemfile b/gemfiles/rails_7_1.gemfile new file mode 100644 index 00000000..fcb9a654 --- /dev/null +++ b/gemfiles/rails_7_1.gemfile @@ -0,0 +1,8 @@ +# This file was generated by Appraisal + +source "https://rubygems.org" + +gem "rdoc", "6.13" +gem "railties", "~> 7.1.0" + +gemspec path: "../" diff --git a/gemfiles/rails_7_2.gemfile b/gemfiles/rails_7_2.gemfile new file mode 100644 index 00000000..bfd04992 --- /dev/null +++ b/gemfiles/rails_7_2.gemfile @@ -0,0 +1,8 @@ +# This file was generated by Appraisal + +source "https://rubygems.org" + +gem "rdoc", "6.13" +gem "railties", "~> 7.2.0" + +gemspec path: "../" diff --git a/gemfiles/rails_8_0.gemfile b/gemfiles/rails_8_0.gemfile new file mode 100644 index 00000000..28bd9f00 --- /dev/null +++ b/gemfiles/rails_8_0.gemfile @@ -0,0 +1,7 @@ +# This file was generated by Appraisal + +source "https://rubygems.org" + +gem "railties", "~> 8.0.0" + +gemspec path: "../" diff --git a/gemfiles/rails_main.gemfile b/gemfiles/rails_main.gemfile new file mode 100644 index 00000000..53b80cd9 --- /dev/null +++ b/gemfiles/rails_main.gemfile @@ -0,0 +1,7 @@ +# This file was generated by Appraisal + +source "https://rubygems.org" + +gem "railties", branch: "main", git: "https://github.com/rails/rails.git" + +gemspec path: "../" diff --git a/lib/active_job/concurrency_controls.rb b/lib/active_job/concurrency_controls.rb index 0ea290f6..47b0f5ee 100644 --- a/lib/active_job/concurrency_controls.rb +++ b/lib/active_job/concurrency_controls.rb @@ -5,6 +5,7 @@ module ConcurrencyControls extend ActiveSupport::Concern DEFAULT_CONCURRENCY_GROUP = ->(*) { self.class.name } + CONCURRENCY_ON_CONFLICT_BEHAVIOUR = %i[ block discard ] included do class_attribute :concurrency_key, instance_accessor: false @@ -12,14 +13,16 @@ module ConcurrencyControls class_attribute :concurrency_limit class_attribute :concurrency_duration, default: SolidQueue.default_concurrency_control_period + class_attribute :concurrency_on_conflict, default: :block end class_methods do - def limits_concurrency(key:, to: 1, group: DEFAULT_CONCURRENCY_GROUP, duration: SolidQueue.default_concurrency_control_period) + def limits_concurrency(key:, to: 1, group: DEFAULT_CONCURRENCY_GROUP, duration: SolidQueue.default_concurrency_control_period, on_conflict: :block) self.concurrency_key = key self.concurrency_limit = to self.concurrency_group = group self.concurrency_duration = duration + self.concurrency_on_conflict = on_conflict.presence_in(CONCURRENCY_ON_CONFLICT_BEHAVIOUR) || :block end end diff --git a/lib/active_job/queue_adapters/solid_queue_adapter.rb b/lib/active_job/queue_adapters/solid_queue_adapter.rb index d3042194..fe556042 100644 --- a/lib/active_job/queue_adapters/solid_queue_adapter.rb +++ b/lib/active_job/queue_adapters/solid_queue_adapter.rb @@ -7,7 +7,10 @@ module QueueAdapters # To use it set the queue_adapter config to +:solid_queue+. 
# # Rails.application.config.active_job.queue_adapter = :solid_queue - class SolidQueueAdapter + class SolidQueueAdapter < (Rails::VERSION::MAJOR == 7 && Rails::VERSION::MINOR == 1 ? Object : AbstractAdapter) + class_attribute :stopping, default: false, instance_writer: false + SolidQueue.on_worker_stop { self.stopping = true } + def enqueue_after_transaction_commit? true end diff --git a/lib/generators/solid_queue/install/templates/config/recurring.yml b/lib/generators/solid_queue/install/templates/config/recurring.yml index d045b191..b4207f9b 100644 --- a/lib/generators/solid_queue/install/templates/config/recurring.yml +++ b/lib/generators/solid_queue/install/templates/config/recurring.yml @@ -1,10 +1,15 @@ -# production: +# examples: # periodic_cleanup: # class: CleanSoftDeletedRecordsJob # queue: background # args: [ 1000, { batch_size: 500 } ] # schedule: every hour -# periodic_command: +# periodic_cleanup_with_command: # command: "SoftDeletedRecord.due.delete_all" # priority: 2 # schedule: at 5am every day + +production: + clear_solid_queue_finished_jobs: + command: "SolidQueue::Job.clear_finished_in_batches(sleep_between_batches: 0.3)" + schedule: every hour at minute 12 diff --git a/lib/solid_queue/cli.rb b/lib/solid_queue/cli.rb index 33d2588b..404c7e72 100644 --- a/lib/solid_queue/cli.rb +++ b/lib/solid_queue/cli.rb @@ -13,7 +13,8 @@ class Cli < Thor banner: "SOLID_QUEUE_RECURRING_SCHEDULE" class_option :skip_recurring, type: :boolean, default: false, - desc: "Whether to skip recurring tasks scheduling" + desc: "Whether to skip recurring tasks scheduling", + banner: "SOLID_QUEUE_SKIP_RECURRING" def self.exit_on_failure? true diff --git a/lib/solid_queue/configuration.rb b/lib/solid_queue/configuration.rb index ba13f0f4..a002b41d 100644 --- a/lib/solid_queue/configuration.rb +++ b/lib/solid_queue/configuration.rb @@ -88,7 +88,7 @@ def default_options recurring_schedule_file: Rails.root.join(ENV["SOLID_QUEUE_RECURRING_SCHEDULE"] || DEFAULT_RECURRING_SCHEDULE_FILE_PATH), only_work: false, only_dispatch: false, - skip_recurring: false + skip_recurring: ActiveModel::Type::Boolean.new.cast(ENV["SOLID_QUEUE_SKIP_RECURRING"]) } end diff --git a/lib/solid_queue/processes/base.rb b/lib/solid_queue/processes/base.rb index 6069a90d..8eec3ceb 100644 --- a/lib/solid_queue/processes/base.rb +++ b/lib/solid_queue/processes/base.rb @@ -4,7 +4,8 @@ module SolidQueue module Processes class Base include Callbacks # Defines callbacks needed by other concerns - include AppExecutor, Registrable, Interruptible, Procline + include AppExecutor, Registrable, Procline + prepend Interruptible attr_reader :name diff --git a/lib/solid_queue/processes/interruptible.rb b/lib/solid_queue/processes/interruptible.rb index 67173aeb..35acdc5b 100644 --- a/lib/solid_queue/processes/interruptible.rb +++ b/lib/solid_queue/processes/interruptible.rb @@ -2,6 +2,11 @@ module SolidQueue::Processes module Interruptible + def initialize(...) 
+ super + @self_pipe = create_self_pipe + end + def wake_up interrupt end @@ -9,6 +14,8 @@ def wake_up private SELF_PIPE_BLOCK_SIZE = 11 + attr_reader :self_pipe + def interrupt self_pipe[:writer].write_nonblock(".") rescue Errno::EAGAIN, Errno::EINTR @@ -21,14 +28,10 @@ def interruptible_sleep(time) if time > 0 && self_pipe[:reader].wait_readable(time) loop { self_pipe[:reader].read_nonblock(SELF_PIPE_BLOCK_SIZE) } end - rescue Errno::EAGAIN, Errno::EINTR + rescue Errno::EAGAIN, Errno::EINTR, IO::EWOULDBLOCKWaitReadable end # Self-pipe for signal-handling (http://cr.yp.to/docs/selfpipe.html) - def self_pipe - @self_pipe ||= create_self_pipe - end - def create_self_pipe reader, writer = IO.pipe { reader: reader, writer: writer } diff --git a/lib/solid_queue/processes/registrable.rb b/lib/solid_queue/processes/registrable.rb index 58cabfa8..2cc9036d 100644 --- a/lib/solid_queue/processes/registrable.rb +++ b/lib/solid_queue/processes/registrable.rb @@ -7,8 +7,7 @@ module Registrable included do after_boot :register, :launch_heartbeat - before_shutdown :stop_heartbeat - after_shutdown :deregister + after_shutdown :stop_heartbeat, :deregister end def process_id diff --git a/lib/solid_queue/supervisor.rb b/lib/solid_queue/supervisor.rb index f2207691..7d010593 100644 --- a/lib/solid_queue/supervisor.rb +++ b/lib/solid_queue/supervisor.rb @@ -172,8 +172,11 @@ def replace_fork(pid, status) end end + # When a supervised fork crashes or exits we need to mark all the + # executions it had claimed as failed so that they can be retried + # by some other worker. def handle_claimed_jobs_by(terminated_fork, status) - if registered_process = process.supervisees.find_by(name: terminated_fork.name) + if registered_process = SolidQueue::Process.find_by(name: terminated_fork.name) error = Processes::ProcessExitError.new(status) registered_process.fail_all_claimed_executions_with(error) end diff --git a/lib/solid_queue/version.rb b/lib/solid_queue/version.rb index 56f8034a..3de06d85 100644 --- a/lib/solid_queue/version.rb +++ b/lib/solid_queue/version.rb @@ -1,3 +1,3 @@ module SolidQueue - VERSION = "1.1.5" + VERSION = "1.2.0" end diff --git a/solid_queue.gemspec b/solid_queue.gemspec index 17242ff9..5ffd1239 100644 --- a/solid_queue.gemspec +++ b/solid_queue.gemspec @@ -32,6 +32,7 @@ Gem::Specification.new do |spec| spec.add_dependency "fugit", "~> 1.11.0" spec.add_dependency "thor", "~> 1.3.1" + spec.add_development_dependency "appraisal" spec.add_development_dependency "debug", "~> 1.9" spec.add_development_dependency "mocha" spec.add_development_dependency "puma" diff --git a/test/dummy/app/jobs/continuable_job.rb b/test/dummy/app/jobs/continuable_job.rb new file mode 100644 index 00000000..00fdfdd2 --- /dev/null +++ b/test/dummy/app/jobs/continuable_job.rb @@ -0,0 +1,22 @@ +begin + require "active_job/continuation" +rescue LoadError + # Zeitwerk requires that we define the constant + class ContinuableJob; end + return +end + +class ContinuableJob < ApplicationJob + include ActiveJob::Continuable + + def perform(result, pause: 0) + step :step_one do + sleep pause if pause > 0 + result.update!(queue_name: queue_name, status: "stepped", value: "step_one") + end + step :step_two do + sleep pause if pause > 0 + result.update!(queue_name: queue_name, status: "stepped", value: "step_two") + end + end +end diff --git a/test/dummy/app/jobs/discardable_update_result_job.rb b/test/dummy/app/jobs/discardable_update_result_job.rb new file mode 100644 index 00000000..74f46fbd --- /dev/null +++ 
b/test/dummy/app/jobs/discardable_update_result_job.rb @@ -0,0 +1,3 @@ +class DiscardableUpdateResultJob < UpdateResultJob + limits_concurrency key: ->(job_result, **) { job_result }, on_conflict: :discard +end diff --git a/test/integration/concurrency_controls_test.rb b/test/integration/concurrency_controls_test.rb index dbce706d..f0984078 100644 --- a/test/integration/concurrency_controls_test.rb +++ b/test/integration/concurrency_controls_test.rb @@ -180,6 +180,9 @@ class ConcurrencyControlsTest < ActiveSupport::TestCase end test "verify transactions remain valid after Job creation conflicts via limits_concurrency" do + # Doesn't work with enqueue_after_transaction_commit? true on SolidQueueAdapter, but only Rails 7.2 uses this + skip if Rails::VERSION::MAJOR == 7 && Rails::VERSION::MINOR == 2 + ActiveRecord::Base.transaction do SequentialUpdateResultJob.perform_later(@result, name: "A", pause: 0.2.seconds) SequentialUpdateResultJob.perform_later(@result, name: "B") @@ -193,6 +196,59 @@ class ConcurrencyControlsTest < ActiveSupport::TestCase end end + test "discard jobs when concurrency limit is reached with on_conflict: :discard" do + job1 = DiscardableUpdateResultJob.perform_later(@result, name: "1", pause: 3) + # should be discarded due to concurrency limit + job2 = DiscardableUpdateResultJob.perform_later(@result, name: "2") + # should also be discarded + job3 = DiscardableUpdateResultJob.perform_later(@result, name: "3") + + wait_for_jobs_to_finish_for(5.seconds) + assert_no_unfinished_jobs + + # Only the first job did something + assert_stored_sequence(@result, [ "1" ]) + + # All jobs have finished and have no blocked executions + jobs = SolidQueue::Job.where(active_job_id: [ job1, job2, job3 ].map(&:job_id)) + assert_equal 1, jobs.count + + assert_equal job1.provider_job_id, jobs.first.id + assert_nil job2.provider_job_id + assert_nil job3.provider_job_id + end + + test "discard on conflict across different concurrency keys" do + another_result = JobResult.create!(queue_name: "default", status: "seq: ") + DiscardableUpdateResultJob.perform_later(@result, name: "1", pause: 0.2) + DiscardableUpdateResultJob.perform_later(another_result, name: "2", pause: 0.2) + DiscardableUpdateResultJob.perform_later(@result, name: "3") # Should be discarded + DiscardableUpdateResultJob.perform_later(another_result, name: "4") # Should be discarded + + wait_for_jobs_to_finish_for(5.seconds) + assert_no_unfinished_jobs + + # Only the first 2 jobs did something + assert_stored_sequence(@result, [ "1" ]) + assert_stored_sequence(another_result, [ "2" ]) + end + + test "discard on conflict and release semaphore" do + DiscardableUpdateResultJob.perform_later(@result, name: "1", pause: 0.1) + # will be discarded + DiscardableUpdateResultJob.perform_later(@result, name: "2") + + wait_for_jobs_to_finish_for(5.seconds) + assert_no_unfinished_jobs + + # Enqueue another job that shouldn't be discarded or blocked + DiscardableUpdateResultJob.perform_later(@result, name: "3") + wait_for_jobs_to_finish_for(5.seconds) + assert_no_unfinished_jobs + + assert_stored_sequence(@result, [ "1", "3" ]) + end + private def assert_stored_sequence(result, *sequences) expected = sequences.map { |sequence| "seq: " + sequence.map { |name| "s#{name}c#{name}" }.join } diff --git a/test/integration/continuation_test.rb b/test/integration/continuation_test.rb new file mode 100644 index 00000000..8d5d863e --- /dev/null +++ b/test/integration/continuation_test.rb @@ -0,0 +1,65 @@ +# frozen_string_literal: true + +require 
"test_helper" + +begin + require "active_job/continuation" +rescue LoadError + return +end + +class ContinuationTest < ActiveSupport::TestCase + self.use_transactional_tests = false + + def setup + start_processes + @result = JobResult.create! + end + + teardown do + terminate_process(@pid) if process_exists?(@pid) + end + + test "continuable job completes" do + ContinuableJob.perform_later(@result) + + wait_for_jobs_to_finish_for(5.seconds) + + assert_no_unfinished_jobs + assert_last_step :step_two + end + + test "continuable job can be interrupted and resumed" do + job = ContinuableJob.perform_later(@result, pause: 0.5.seconds) + + sleep 0.2.seconds + signal_process(@pid, :TERM) + + wait_for_jobs_to_be_released_for(2.seconds) + + assert_no_claimed_jobs + assert_unfinished_jobs job + assert_last_step :step_one + + ActiveJob::QueueAdapters::SolidQueueAdapter.stopping = false + start_processes + wait_for_jobs_to_finish_for(10.seconds) + + assert_no_unfinished_jobs + assert_last_step :step_two + end + + private + def assert_last_step(step) + @result.reload + assert_equal "stepped", @result.status + assert_equal step.to_s, @result.value + end + + def start_processes + default_worker = { queues: "default", polling_interval: 0.1, processes: 3, threads: 2 } + dispatcher = { polling_interval: 0.1, batch_size: 200, concurrency_maintenance_interval: 1 } + @pid = run_supervisor_as_fork(workers: [ default_worker ], dispatchers: [ dispatcher ]) + wait_for_registered_processes(5, timeout: 5.second) # 3 workers working the default queue + dispatcher + supervisor + end +end diff --git a/test/integration/lifecycle_hooks_test.rb b/test/integration/lifecycle_hooks_test.rb index b2fd50da..da7feedc 100644 --- a/test/integration/lifecycle_hooks_test.rb +++ b/test/integration/lifecycle_hooks_test.rb @@ -6,121 +6,140 @@ class LifecycleHooksTest < ActiveSupport::TestCase self.use_transactional_tests = false test "run lifecycle hooks" do - SolidQueue.on_start do |s| - name = s.class.name.demodulize.downcase - JobResult.create!(status: :hook_called, value: "#{name}_start") - end - - SolidQueue.on_stop do |s| - name = s.class.name.demodulize.downcase - JobResult.create!(status: :hook_called, value: "#{name}_stop") - end - - SolidQueue.on_exit do |s| - name = s.class.name.demodulize.downcase - JobResult.create!(status: :hook_called, value: "#{name}_exit") - end - - SolidQueue.on_worker_start do |w| - name = w.class.name.demodulize.downcase - queues = w.queues.join("_") - JobResult.create!(status: :hook_called, value: "#{name}_#{queues}_start") - end - - SolidQueue.on_worker_stop do |w| - name = w.class.name.demodulize.downcase - queues = w.queues.join("_") - JobResult.create!(status: :hook_called, value: "#{name}_#{queues}_stop") - end - - SolidQueue.on_worker_exit do |w| - name = w.class.name.demodulize.downcase - queues = w.queues.join("_") - JobResult.create!(status: :hook_called, value: "#{name}_#{queues}_exit") + resetting_hooks do + SolidQueue.on_start do |s| + name = s.class.name.demodulize.downcase + JobResult.create!(status: :hook_called, value: "#{name}_start") + end + + SolidQueue.on_stop do |s| + name = s.class.name.demodulize.downcase + JobResult.create!(status: :hook_called, value: "#{name}_stop") + end + + SolidQueue.on_exit do |s| + name = s.class.name.demodulize.downcase + JobResult.create!(status: :hook_called, value: "#{name}_exit") + end + + SolidQueue.on_worker_start do |w| + name = w.class.name.demodulize.downcase + queues = w.queues.join("_") + JobResult.create!(status: :hook_called, value: 
"#{name}_#{queues}_start") + end + + SolidQueue.on_worker_stop do |w| + name = w.class.name.demodulize.downcase + queues = w.queues.join("_") + JobResult.create!(status: :hook_called, value: "#{name}_#{queues}_stop") + end + + SolidQueue.on_worker_exit do |w| + name = w.class.name.demodulize.downcase + queues = w.queues.join("_") + JobResult.create!(status: :hook_called, value: "#{name}_#{queues}_exit") + end + + SolidQueue.on_dispatcher_start do |d| + name = d.class.name.demodulize.downcase + JobResult.create!(status: :hook_called, value: "#{name}_#{d.batch_size}_start") + end + + SolidQueue.on_dispatcher_stop do |d| + name = d.class.name.demodulize.downcase + JobResult.create!(status: :hook_called, value: "#{name}_#{d.batch_size}_stop") + end + + SolidQueue.on_dispatcher_exit do |d| + name = d.class.name.demodulize.downcase + JobResult.create!(status: :hook_called, value: "#{name}_#{d.batch_size}_exit") + end + + SolidQueue.on_scheduler_start do |s| + name = s.class.name.demodulize.downcase + JobResult.create!(status: :hook_called, value: "#{name}_start") + end + + SolidQueue.on_scheduler_stop do |s| + name = s.class.name.demodulize.downcase + JobResult.create!(status: :hook_called, value: "#{name}_stop") + end + + SolidQueue.on_scheduler_exit do |s| + name = s.class.name.demodulize.downcase + JobResult.create!(status: :hook_called, value: "#{name}_exit") + end + + pid = run_supervisor_as_fork( + workers: [ { queues: "first_queue" }, { queues: "second_queue", processes: 1 } ], + dispatchers: [ { batch_size: 100 } ], + skip_recurring: false + ) + + wait_for_registered_processes(5) + + terminate_process(pid) + wait_for_registered_processes(0) + + + results = skip_active_record_query_cache do + JobResult.where(status: :hook_called) + end + + assert_equal %w[ + supervisor_start supervisor_stop supervisor_exit + worker_first_queue_start worker_first_queue_stop worker_first_queue_exit + worker_second_queue_start worker_second_queue_stop worker_second_queue_exit + dispatcher_100_start dispatcher_100_stop dispatcher_100_exit + scheduler_start scheduler_stop scheduler_exit + ].sort, results.map(&:value).sort + + assert_equal({ "hook_called" => 15 }, results.map(&:status).tally) end + end - SolidQueue.on_dispatcher_start do |d| - name = d.class.name.demodulize.downcase - JobResult.create!(status: :hook_called, value: "#{name}_#{d.batch_size}_start") - end + test "handle errors on lifecycle hooks" do + resetting_hooks do + previous_on_thread_error, SolidQueue.on_thread_error = SolidQueue.on_thread_error, ->(error) { JobResult.create!(status: :error, value: error.message) } + SolidQueue.on_start { raise RuntimeError, "everything is broken" } - SolidQueue.on_dispatcher_stop do |d| - name = d.class.name.demodulize.downcase - JobResult.create!(status: :hook_called, value: "#{name}_#{d.batch_size}_stop") - end + pid = run_supervisor_as_fork + wait_for_registered_processes(4) - SolidQueue.on_dispatcher_exit do |d| - name = d.class.name.demodulize.downcase - JobResult.create!(status: :hook_called, value: "#{name}_#{d.batch_size}_exit") - end + terminate_process(pid) + wait_for_registered_processes(0) - SolidQueue.on_scheduler_start do |s| - name = s.class.name.demodulize.downcase - JobResult.create!(status: :hook_called, value: "#{name}_start") - end + result = skip_active_record_query_cache { JobResult.last } - SolidQueue.on_scheduler_stop do |s| - name = s.class.name.demodulize.downcase - JobResult.create!(status: :hook_called, value: "#{name}_stop") + assert_equal "error", result.status + assert_equal 
"everything is broken", result.value end + end - SolidQueue.on_scheduler_exit do |s| - name = s.class.name.demodulize.downcase - JobResult.create!(status: :hook_called, value: "#{name}_exit") + private + def resetting_hooks + reset_hooks(SolidQueue::Supervisor) do + reset_hooks(SolidQueue::Worker) do + reset_hooks(SolidQueue::Dispatcher) do + reset_hooks(SolidQueue::Scheduler) do + yield + end + end + end + end end - pid = run_supervisor_as_fork( - workers: [ { queues: "first_queue" }, { queues: "second_queue", processes: 1 } ], - dispatchers: [ { batch_size: 100 } ], - skip_recurring: false - ) - - wait_for_registered_processes(5) - - terminate_process(pid) - wait_for_registered_processes(0) - - - results = skip_active_record_query_cache do - job_results = JobResult.where(status: :hook_called) - assert_equal 15, job_results.count - job_results + def reset_hooks(process) + exit_hooks = process.lifecycle_hooks[:exit] + start_hooks = process.lifecycle_hooks[:start] + stop_hooks = process.lifecycle_hooks[:stop] + process.lifecycle_hooks[:exit] = [] + process.lifecycle_hooks[:start] = [] + process.lifecycle_hooks[:stop] = [] + yield + ensure + process.lifecycle_hooks[:exit] = exit_hooks + process.lifecycle_hooks[:start] = start_hooks + process.lifecycle_hooks[:stop] = stop_hooks end - - assert_equal({ "hook_called" => 15 }, results.map(&:status).tally) - assert_equal %w[ - supervisor_start supervisor_stop supervisor_exit - worker_first_queue_start worker_first_queue_stop worker_first_queue_exit - worker_second_queue_start worker_second_queue_stop worker_second_queue_exit - dispatcher_100_start dispatcher_100_stop dispatcher_100_exit - scheduler_start scheduler_stop scheduler_exit - ].sort, results.map(&:value).sort - ensure - SolidQueue::Supervisor.clear_hooks - SolidQueue::Worker.clear_hooks - SolidQueue::Dispatcher.clear_hooks - SolidQueue::Scheduler.clear_hooks - end - - test "handle errors on lifecycle hooks" do - previous_on_thread_error, SolidQueue.on_thread_error = SolidQueue.on_thread_error, ->(error) { JobResult.create!(status: :error, value: error.message) } - SolidQueue.on_start { raise RuntimeError, "everything is broken" } - - pid = run_supervisor_as_fork - wait_for_registered_processes(4) - - terminate_process(pid) - wait_for_registered_processes(0) - - result = skip_active_record_query_cache { JobResult.last } - - assert_equal "error", result.status - assert_equal "everything is broken", result.value - ensure - SolidQueue.on_thread_error = previous_on_thread_error - SolidQueue::Supervisor.clear_hooks - SolidQueue::Worker.clear_hooks - SolidQueue::Dispatcher.clear_hooks - SolidQueue::Scheduler.clear_hooks - end end diff --git a/test/models/solid_queue/job_test.rb b/test/models/solid_queue/job_test.rb index 17a658d7..013e8511 100644 --- a/test/models/solid_queue/job_test.rb +++ b/test/models/solid_queue/job_test.rb @@ -10,6 +10,14 @@ def perform(job_result) end end + class DiscardableNonOverlappingJob < NonOverlappingJob + limits_concurrency key: ->(job_result, **) { job_result }, on_conflict: :discard + end + + class DiscardableThrottledJob < NonOverlappingJob + limits_concurrency to: 2, key: ->(job_result, **) { job_result }, on_conflict: :discard + end + class NonOverlappingGroupedJob1 < NonOverlappingJob limits_concurrency key: ->(job_result, **) { job_result }, group: "MyGroup" end @@ -18,8 +26,19 @@ class NonOverlappingGroupedJob2 < NonOverlappingJob limits_concurrency key: ->(job_result, **) { job_result }, group: "MyGroup" end + class DiscardableNonOverlappingGroupedJob1 < 
NonOverlappingJob + limits_concurrency key: ->(job_result, **) { job_result }, group: "DiscardingGroup", on_conflict: :discard + end + + class DiscardableNonOverlappingGroupedJob2 < NonOverlappingJob + limits_concurrency key: ->(job_result, **) { job_result }, group: "DiscardingGroup", on_conflict: :discard + end + setup do @result = JobResult.create!(queue_name: "default") + @discarded_concurrent_error = SolidQueue::Job::EnqueueError.new( + "Dispatched job discarded due to concurrent configuration." + ) end test "enqueue active job to be executed right away" do @@ -98,7 +117,27 @@ class NonOverlappingGroupedJob2 < NonOverlappingJob assert_equal active_job.concurrency_key, job.concurrency_key end - test "enqueue jobs with concurrency controls in the same concurrency group" do + test "block jobs when concurrency limits are reached" do + assert_ready do + NonOverlappingJob.perform_later(@result, name: "A") + end + + assert_blocked do + NonOverlappingJob.perform_later(@result, name: "B") + end + + blocked_execution = SolidQueue::BlockedExecution.last + assert blocked_execution.expires_at <= SolidQueue.default_concurrency_control_period.from_now + end + + test "skips jobs with on_conflict set to discard when concurrency limits are reached" do + assert_job_counts(ready: 1) do + DiscardableNonOverlappingJob.perform_later(@result, name: "A") + DiscardableNonOverlappingJob.perform_later(@result, name: "B") + end + end + + test "block jobs in the same concurrency group when concurrency limits are reached" do assert_ready do active_job = NonOverlappingGroupedJob1.perform_later(@result, name: "A") assert_equal 1, active_job.concurrency_limit @@ -112,7 +151,38 @@ class NonOverlappingGroupedJob2 < NonOverlappingJob end end - test "enqueue multiple jobs" do + test "skips jobs with on_conflict set to discard in the same concurrency group when concurrency limits are reached" do + assert_job_counts(ready: 1) do + active_job = DiscardableNonOverlappingGroupedJob1.perform_later(@result, name: "A") + assert_equal 1, active_job.concurrency_limit + assert_equal "DiscardingGroup/JobResult/#{@result.id}", active_job.concurrency_key + + active_job = DiscardableNonOverlappingGroupedJob2.perform_later(@result, name: "B") + assert_equal 1, active_job.concurrency_limit + assert_equal "DiscardingGroup/JobResult/#{@result.id}", active_job.concurrency_key + end + end + + test "enqueue scheduled job with concurrency controls and on_conflict set to discard" do + assert_ready do + DiscardableNonOverlappingJob.perform_later(@result, name: "A") + end + + assert_scheduled do + DiscardableNonOverlappingJob.set(wait: 5.minutes).perform_later(@result, name: "B") + end + + scheduled_job = SolidQueue::Job.last + + travel_to 10.minutes.from_now + + # The scheduled job is not enqueued because it conflicts with + # the first one and is discarded + assert_equal 0, SolidQueue::ScheduledExecution.dispatch_next_batch(10) + assert_nil SolidQueue::Job.find_by(id: scheduled_job.id) + end + + test "enqueue jobs in bulk" do active_jobs = [ AddToBufferJob.new(2), AddToBufferJob.new(6).set(wait: 2.minutes), @@ -134,17 +204,27 @@ class NonOverlappingGroupedJob2 < NonOverlappingJob assert active_jobs.all?(&:successfully_enqueued?) 
end - test "block jobs when concurrency limits are reached" do - assert_ready do - NonOverlappingJob.perform_later(@result, name: "A") - end + test "enqueues jobs in bulk with concurrency controls and some set to discard" do + active_jobs = [ + AddToBufferJob.new(2), + DiscardableNonOverlappingJob.new(@result), + NonOverlappingJob.new(@result), + AddToBufferJob.new(6).set(wait: 2.minutes), + NonOverlappingJob.new(@result), + DiscardableNonOverlappingJob.new(@result) # this one won't be enqueued + ] + not_enqueued = active_jobs.last - assert_blocked do - NonOverlappingJob.perform_later(@result, name: "B") + assert_job_counts(ready: 3, scheduled: 1, blocked: 1) do + ActiveJob.perform_all_later(active_jobs) end - blocked_execution = SolidQueue::BlockedExecution.last - assert blocked_execution.expires_at <= SolidQueue.default_concurrency_control_period.from_now + jobs = SolidQueue::Job.last(5) + assert_equal active_jobs.without(not_enqueued).map(&:provider_job_id).sort, jobs.pluck(:id).sort + assert active_jobs.without(not_enqueued).all?(&:successfully_enqueued?) + + assert_nil not_enqueued.provider_job_id + assert_not not_enqueued.successfully_enqueued? end test "discard ready job" do @@ -249,8 +329,8 @@ class NonOverlappingGroupedJob2 < NonOverlappingJob test "raise EnqueueError when there's an ActiveRecordError" do SolidQueue::Job.stubs(:create!).raises(ActiveRecord::Deadlocked) - active_job = AddToBufferJob.new(1).set(priority: 8, queue: "test") assert_raises SolidQueue::Job::EnqueueError do + active_job = AddToBufferJob.new(1).set(priority: 8, queue: "test") SolidQueue::Job.enqueue(active_job) end @@ -260,6 +340,8 @@ class NonOverlappingGroupedJob2 < NonOverlappingJob end test "enqueue successfully inside a rolled-back transaction in the app DB" do + # Doesn't work with enqueue_after_transaction_commit? true on SolidQueueAdapter, but only Rails 7.2 uses this + skip if Rails::VERSION::MAJOR == 7 && Rails::VERSION::MINOR == 2 assert_difference -> { SolidQueue::Job.count } do assert_no_difference -> { JobResult.count } do JobResult.transaction do @@ -282,6 +364,7 @@ def assert_ready(&block) def assert_scheduled(&block) assert_job_counts(scheduled: 1, &block) + assert SolidQueue::Job.last.scheduled? end def assert_blocked(&block) diff --git a/test/test_helper.rb b/test/test_helper.rb index f54b73f2..7c1c8792 100644 --- a/test/test_helper.rb +++ b/test/test_helper.rb @@ -33,6 +33,7 @@ class ActiveSupport::TestCase setup do @_on_thread_error = SolidQueue.on_thread_error SolidQueue.on_thread_error = silent_on_thread_error_for(ExpectedTestError, @_on_thread_error) + ActiveJob::QueueAdapters::SolidQueueAdapter.stopping = false end teardown do @@ -97,4 +98,16 @@ def silent_on_thread_error_for(exceptions, on_thread_error) end end end + + # Waits until the given block returns truthy or the timeout is reached. + # Similar to other helper methods in this file but waits *for* a condition + # instead of *while* it is true. 
+ def wait_for(timeout: 1.second, interval: 0.05) + Timeout.timeout(timeout) do + loop do + break if skip_active_record_query_cache { yield } + sleep interval + end + end + end end diff --git a/test/test_helpers/configuration_test_helper.rb b/test/test_helpers/configuration_test_helper.rb index 24b95e6b..fa3a402a 100644 --- a/test/test_helpers/configuration_test_helper.rb +++ b/test/test_helpers/configuration_test_helper.rb @@ -4,4 +4,18 @@ module ConfigurationTestHelper def config_file_path(name) Rails.root.join("config/#{name}.yml") end + + def with_env(env_vars) + original_values = {} + env_vars.each do |key, value| + original_values[key] = ENV[key] + ENV[key] = value + end + + yield + ensure + original_values.each do |key, value| + ENV[key] = value + end + end end diff --git a/test/test_helpers/jobs_test_helper.rb b/test/test_helpers/jobs_test_helper.rb index d0833fcf..8b71e7f6 100644 --- a/test/test_helpers/jobs_test_helper.rb +++ b/test/test_helpers/jobs_test_helper.rb @@ -9,6 +9,20 @@ def wait_for_jobs_to_finish_for(timeout = 1.second, except: []) end end + def wait_for_jobs_to_be_released_for(timeout = 1.second) + wait_while_with_timeout(timeout) do + skip_active_record_query_cache do + SolidQueue::ClaimedExecution.count > 0 + end + end + end + + def assert_unfinished_jobs(*jobs) + skip_active_record_query_cache do + assert_equal jobs.map(&:job_id).sort, SolidQueue::Job.where(finished_at: nil).map(&:active_job_id).sort + end + end + def assert_no_unfinished_jobs skip_active_record_query_cache do assert SolidQueue::Job.where(finished_at: nil).none? diff --git a/test/test_helpers/processes_test_helper.rb b/test/test_helpers/processes_test_helper.rb index 9a6d0f65..927ddfde 100644 --- a/test/test_helpers/processes_test_helper.rb +++ b/test/test_helpers/processes_test_helper.rb @@ -62,14 +62,19 @@ def find_processes_registered_as(kind) def terminate_process(pid, timeout: 10, signal: :TERM) signal_process(pid, signal) - wait_for_process_termination_with_timeout(pid, timeout: timeout) + wait_for_process_termination_with_timeout(pid, timeout: timeout, signaled: signal) end - def wait_for_process_termination_with_timeout(pid, timeout: 10, exitstatus: 0) + def wait_for_process_termination_with_timeout(pid, timeout: 10, exitstatus: 0, signaled: nil) Timeout.timeout(timeout) do if process_exists?(pid) - Process.waitpid(pid) - assert exitstatus, $?.exitstatus + begin + status = Process.waitpid2(pid).last + assert_equal exitstatus, status.exitstatus, "Expected pid #{pid} to exit with status #{exitstatus}" if status.exitstatus + assert_equal signaled, Signal.list.key(status.termsig).to_sym, "Expected pid #{pid} to be terminated with signal #{signaled}" if status.termsig + rescue Errno::ECHILD + # Child pid already reaped + end end end rescue Timeout::Error diff --git a/test/unit/configuration_test.rb b/test/unit/configuration_test.rb index 68a693e3..2ccaa728 100644 --- a/test/unit/configuration_test.rb +++ b/test/unit/configuration_test.rb @@ -85,6 +85,30 @@ class ConfigurationTest < ActiveSupport::TestCase assert_processes configuration, :dispatcher, 1, polling_interval: 0.1, recurring_tasks: nil end + test "skip recurring tasks when SOLID_QUEUE_SKIP_RECURRING environment variable is set" do + with_env("SOLID_QUEUE_SKIP_RECURRING" => "true") do + configuration = SolidQueue::Configuration.new(dispatchers: [ { polling_interval: 0.1 } ]) + assert_processes configuration, :dispatcher, 1, polling_interval: 0.1 + assert_processes configuration, :scheduler, 0 + end + end + + test "include recurring 
tasks when SOLID_QUEUE_SKIP_RECURRING environment variable is false" do + with_env("SOLID_QUEUE_SKIP_RECURRING" => "false") do + configuration = SolidQueue::Configuration.new(dispatchers: [ { polling_interval: 0.1 } ]) + assert_processes configuration, :dispatcher, 1, polling_interval: 0.1 + assert_processes configuration, :scheduler, 1 + end + end + + test "include recurring tasks when SOLID_QUEUE_SKIP_RECURRING environment variable is not set" do + with_env("SOLID_QUEUE_SKIP_RECURRING" => nil) do + configuration = SolidQueue::Configuration.new(dispatchers: [ { polling_interval: 0.1 } ]) + assert_processes configuration, :dispatcher, 1, polling_interval: 0.1 + assert_processes configuration, :scheduler, 1 + end + end + test "validate configuration" do # Valid and invalid recurring tasks configuration = SolidQueue::Configuration.new(recurring_schedule_file: config_file_path(:recurring_with_invalid)) diff --git a/test/unit/process_recovery_test.rb b/test/unit/process_recovery_test.rb new file mode 100644 index 00000000..ec777fa7 --- /dev/null +++ b/test/unit/process_recovery_test.rb @@ -0,0 +1,84 @@ +# frozen_string_literal: true + +require "test_helper" + +class ProcessRecoveryTest < ActiveSupport::TestCase + self.use_transactional_tests = false + + setup do + @pid = nil + JobResult.delete_all + end + + teardown do + terminate_process(@pid) if @pid + JobResult.delete_all + end + + test "supervisor handles missing process record and fails claimed executions properly" do + # Start a supervisor with one worker + @pid = run_supervisor_as_fork(workers: [ { queues: "*", polling_interval: 0.1, processes: 1 } ]) + wait_for_registered_processes(2, timeout: 1.second) # Supervisor + 1 worker + + supervisor_process = SolidQueue::Process.find_by(kind: "Supervisor", pid: @pid) + assert supervisor_process + + worker_process = SolidQueue::Process.find_by(kind: "Worker") + assert worker_process + + # Enqueue a job and manually claim it for the worker to avoid timing races + job = enqueue_store_result_job(42) + claimed_execution = SolidQueue::ReadyExecution.claim("*", 5, worker_process.id).first + assert claimed_execution.present? + assert_equal worker_process.id, claimed_execution.process_id + + # Simulate supervisor process record disappearing + supervisor_process.delete + assert_nil SolidQueue::Process.find_by(id: supervisor_process.id) + + # Terminate the worker process + worker_pid = worker_process.pid + terminate_process(worker_pid, signal: :KILL) + + + # Wait for the supervisor to reap the worker and fail the job + wait_for_failed_executions(1, timeout: 5.seconds) + + # Assert the execution is failed + failed_execution = SolidQueue::FailedExecution.last + assert failed_execution.present? 
+ assert_equal "SolidQueue::Processes::ProcessExitError", failed_execution.exception_class + + # Ensure supervisor replaces the worker (even though its own record was missing) + wait_for_registered_processes(2, timeout: 5.seconds) + assert_operator SolidQueue::Process.where(kind: "Worker").count, :>=, 1 + end + + private + def assert_registered_workers_for(*queues, supervisor_pid: nil) + workers = find_processes_registered_as("Worker") + registered_queues = workers.map { |process| process.metadata["queues"] }.compact + assert_equal queues.map(&:to_s).sort, registered_queues.sort + if supervisor_pid + assert_equal [ supervisor_pid ], workers.map { |process| process.supervisor.pid }.uniq + end + end + + def enqueue_store_result_job(value, queue_name = :default, **options) + StoreResultJob.set(queue: queue_name).perform_later(value, **options) + end + + def assert_no_claimed_jobs + skip_active_record_query_cache do + assert_empty SolidQueue::ClaimedExecution.all + end + end + + def wait_for_claimed_executions(count, timeout: 1.second) + wait_for(timeout: timeout) { SolidQueue::ClaimedExecution.count == count } + end + + def wait_for_failed_executions(count, timeout: 1.second) + wait_for(timeout: timeout) { SolidQueue::FailedExecution.count == count } + end +end diff --git a/test/unit/supervisor_test.rb b/test/unit/supervisor_test.rb index 108ebb6f..7a531ad2 100644 --- a/test/unit/supervisor_test.rb +++ b/test/unit/supervisor_test.rb @@ -185,6 +185,33 @@ class SupervisorTest < ActiveSupport::TestCase end end + # Regression test for supervisor failing to handle claimed jobs when its own + # process record has been pruned (NoMethodError in #handle_claimed_jobs_by). + test "handle_claimed_jobs_by fails claimed executions even if supervisor record is missing" do + worker_name = "worker-test-#{SecureRandom.hex(4)}" + + worker_process = SolidQueue::Process.register(kind: "Worker", pid: 999_999, name: worker_name) + + job = StoreResultJob.perform_later(42) + claimed_execution = SolidQueue::ReadyExecution.claim("*", 1, worker_process.id).first + + terminated_fork = Struct.new(:name).new(worker_name) + + DummyStatus = Struct.new(:pid, :exitstatus) do + def signaled? = false + def termsig = nil + end + status = DummyStatus.new(worker_process.pid, 1) + + supervisor = SolidQueue::Supervisor.allocate + + supervisor.send(:handle_claimed_jobs_by, terminated_fork, status) + + failed = SolidQueue::FailedExecution.find_by(job_id: claimed_execution.job_id) + assert failed.present? + assert_equal "SolidQueue::Processes::ProcessExitError", failed.exception_class + end + private def assert_registered_workers(supervisor_pid: nil, count: 1) assert_registered_processes(kind: "Worker", count: count, supervisor_pid: supervisor_pid)