Skip to content

Commit c64b30d

Browse files
committed
Add TimerTask.new(interval_type:) option to configure interval calculation
Can be either `:fixed_delay` or `:fixed_rate`, default to `:fixed_delay`
1 parent 9f40827 commit c64b30d

File tree

2 files changed

+125
-4
lines changed

2 files changed

+125
-4
lines changed

lib/concurrent-ruby/concurrent/timer_task.rb

Lines changed: 51 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,17 @@ module Concurrent
3232
# be tested separately then passed to the `TimerTask` for scheduling and
3333
# running.
3434
#
35+
# A `TimerTask` supports two different types of interval calculations.
36+
# A fixed delay will always wait the same amount of time between the
37+
# completion of one task and the start of the next. A fixed rate will
38+
# attempt to maintain a constant rate of execution regardless of the
39+
# duration of the task. For example, if a fixed rate task is scheduled
40+
# to run every 60 seconds but the task itself takes 10 seconds to
41+
# complete, the next task will be scheduled to run 50 seconds after
42+
# the start of the previous task. If the task takes 70 seconds to
43+
# complete, the next task will be start immediately after the previous
44+
# task completes. Tasks will not be executed concurrently.
45+
#
3546
# In some cases it may be necessary for a `TimerTask` to affect its own
3647
# execution cycle. To facilitate this, a reference to the TimerTask instance
3748
# is passed as an argument to the provided block every time the task is
@@ -74,6 +85,12 @@ module Concurrent
7485
#
7586
# #=> 'Boom!'
7687
#
88+
# @example Configuring `:interval_type` with either :fixed_delay or :fixed_rate, default is :fixed_delay
89+
# task = Concurrent::TimerTask.new(execution_interval: 5, interval_type: :fixed_rate) do
90+
# puts 'Boom!'
91+
# end
92+
# task.interval_type #=> :fixed_rate
93+
#
7794
# @example Last `#value` and `Dereferenceable` mixin
7895
# task = Concurrent::TimerTask.new(
7996
# dup_on_deref: true,
@@ -152,8 +169,16 @@ class TimerTask < RubyExecutorService
152169
# Default `:execution_interval` in seconds.
153170
EXECUTION_INTERVAL = 60
154171

155-
# Default `:timeout_interval` in seconds.
156-
TIMEOUT_INTERVAL = 30
172+
# Maintain the interval between the end of one execution and the start of the next execution.
173+
FIXED_DELAY = :fixed_delay
174+
175+
# Maintain the interval between the start of one execution and the start of the next.
176+
# If execution time exceeds the interval, the next execution will start immediately
177+
# after the previous execution finishes. Executions will not run concurrently.
178+
FIXED_RATE = :fixed_rate
179+
180+
# Default `:interval_type`
181+
DEFAULT_INTERVAL_TYPE = FIXED_DELAY
157182

158183
# Create a new TimerTask with the given task and configuration.
159184
#
@@ -164,6 +189,9 @@ class TimerTask < RubyExecutorService
164189
# @option opts [Boolean] :run_now Whether to run the task immediately
165190
# upon instantiation or to wait until the first # execution_interval
166191
# has passed (default: false)
192+
# @options opts [Symbol] :interval_type method to calculate the interval
193+
# between executions, can be either :fixed_rate or :fixed_delay.
194+
# (default: :fixed_delay)
167195
#
168196
# @!macro deref_options
169197
#
@@ -242,6 +270,10 @@ def execution_interval=(value)
242270
end
243271
end
244272

273+
# @!attribute [r] interval_type
274+
# @return [Symbol] method to calculate the interval between executions
275+
attr_reader :interval_type
276+
245277
# @!attribute [rw] timeout_interval
246278
# @return [Fixnum] Number of seconds the task can run before it is
247279
# considered to have failed.
@@ -264,10 +296,15 @@ def ns_initialize(opts, &task)
264296
set_deref_options(opts)
265297

266298
self.execution_interval = opts[:execution] || opts[:execution_interval] || EXECUTION_INTERVAL
299+
if opts[:interval_type] && ![FIXED_DELAY, FIXED_RATE].include?(opts[:interval_type])
300+
raise ArgumentError.new('interval_type must be either :fixed_delay or :fixed_rate')
301+
end
267302
if opts[:timeout] || opts[:timeout_interval]
268303
warn 'TimeTask timeouts are now ignored as these were not able to be implemented correctly'
269304
end
305+
270306
@run_now = opts[:now] || opts[:run_now]
307+
@interval_type = opts[:interval_type] || DEFAULT_INTERVAL_TYPE
271308
@executor = Concurrent::SafeTaskExecutor.new(task)
272309
@running = Concurrent::AtomicBoolean.new(false)
273310
@value = nil
@@ -296,16 +333,27 @@ def schedule_next_task(interval = execution_interval)
296333
# @!visibility private
297334
def execute_task(completion)
298335
return nil unless @running.true?
336+
start_time = Concurrent.monotonic_time
299337
_success, value, reason = @executor.execute(self)
300338
if completion.try?
301339
self.value = value
302-
schedule_next_task
340+
schedule_next_task(calculate_next_interval(start_time))
303341
time = Time.now
304342
observers.notify_observers do
305343
[time, self.value, reason]
306344
end
307345
end
308346
nil
309347
end
348+
349+
# @!visibility private
350+
def calculate_next_interval(start_time)
351+
if @interval_type == FIXED_RATE
352+
run_time = Concurrent.monotonic_time - start_time
353+
[execution_interval - run_time, 0].max
354+
else # FIXED_DELAY
355+
execution_interval
356+
end
357+
end
310358
end
311359
end

spec/concurrent/timer_task_spec.rb

Lines changed: 74 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,22 @@ def trigger_observable(observable)
8282
subject = TimerTask.new(execution_interval: 5) { nil }
8383
expect(subject.execution_interval).to eq 5
8484
end
85+
86+
it 'raises an exception if :interval_type is not a valid value' do
87+
expect {
88+
Concurrent::TimerTask.new(interval_type: :cat) { nil }
89+
}.to raise_error(ArgumentError)
90+
end
91+
92+
it 'uses the default :interval_type when no type is given' do
93+
subject = TimerTask.new { nil }
94+
expect(subject.interval_type).to eq TimerTask::FIXED_DELAY
95+
end
96+
97+
it 'uses the given interval type' do
98+
subject = TimerTask.new(interval_type: TimerTask::FIXED_RATE) { nil }
99+
expect(subject.interval_type).to eq TimerTask::FIXED_RATE
100+
end
85101
end
86102

87103
context '#kill' do
@@ -112,7 +128,6 @@ def trigger_observable(observable)
112128
end
113129

114130
specify '#execution_interval is writeable' do
115-
116131
latch = CountDownLatch.new(1)
117132
subject = TimerTask.new(timeout_interval: 1,
118133
execution_interval: 1,
@@ -132,6 +147,28 @@ def trigger_observable(observable)
132147
subject.kill
133148
end
134149

150+
it 'raises on invalid interval_type' do
151+
expect {
152+
fixed_delay = TimerTask.new(interval_type: TimerTask::FIXED_DELAY,
153+
execution_interval: 0.1,
154+
run_now: true) { nil }
155+
fixed_delay.kill
156+
}.not_to raise_error
157+
158+
expect {
159+
fixed_rate = TimerTask.new(interval_type: TimerTask::FIXED_RATE,
160+
execution_interval: 0.1,
161+
run_now: true) { nil }
162+
fixed_rate.kill
163+
}.not_to raise_error
164+
165+
expect {
166+
TimerTask.new(interval_type: :unknown,
167+
execution_interval: 0.1,
168+
run_now: true) { nil }
169+
}.to raise_error(ArgumentError)
170+
end
171+
135172
specify '#timeout_interval being written produces a warning' do
136173
subject = TimerTask.new(timeout_interval: 1,
137174
execution_interval: 0.1,
@@ -181,6 +218,42 @@ def trigger_observable(observable)
181218
expect(latch.count).to eq(0)
182219
subject.kill
183220
end
221+
222+
it 'uses a fixed delay when set' do
223+
finished = []
224+
latch = CountDownLatch.new(2)
225+
subject = TimerTask.new(interval_type: TimerTask::FIXED_DELAY,
226+
execution_interval: 0.1,
227+
run_now: true) do |task|
228+
sleep(0.2)
229+
finished << Concurrent.monotonic_time
230+
latch.count_down
231+
end
232+
subject.execute
233+
latch.wait(1)
234+
subject.kill
235+
236+
expect(latch.count).to eq(0)
237+
expect(finished[1] - finished[0]).to be >= 0.3
238+
end
239+
240+
it 'uses a fixed rate when set' do
241+
finished = []
242+
latch = CountDownLatch.new(2)
243+
subject = TimerTask.new(interval_type: TimerTask::FIXED_RATE,
244+
execution_interval: 0.1,
245+
run_now: true) do |task|
246+
sleep(0.2)
247+
finished << Concurrent.monotonic_time
248+
latch.count_down
249+
end
250+
subject.execute
251+
latch.wait(1)
252+
subject.kill
253+
254+
expect(latch.count).to eq(0)
255+
expect(finished[1] - finished[0]).to be < 0.3
256+
end
184257
end
185258

186259
context 'observation' do

0 commit comments

Comments
 (0)