diff --git a/lib/async/redis/client.rb b/lib/async/redis/client.rb
index 15bccf2..f5af3c1 100755
--- a/lib/async/redis/client.rb
+++ b/lib/async/redis/client.rb
@@ -22,6 +22,7 @@
 require_relative 'pool'
 
 require_relative 'context/multi'
 require_relative 'context/subscribe'
+require_relative 'context/pipeline'
 require_relative 'methods/strings'
 require_relative 'methods/keys'
@@ -113,7 +114,22 @@ def nested(&block)
 			context.close
 		end
 	end
-	
+	
+	# Yield a pipeline context which accumulates commands and sends them to
+	# the server in a single batch; returns the responses, in order.
+	def pipelined(&block)
+		context = Context::Pipeline.new(@pool)
+		
+		return context unless block_given?
+		
+		begin
+			yield context
+			context.run
+		ensure
+			context.close
+		end
+	end
+	
 	def call(*arguments)
 		@pool.acquire do |connection|
 			connection.write_request(arguments)
diff --git a/lib/async/redis/context/pipeline.rb b/lib/async/redis/context/pipeline.rb
new file mode 100644
index 0000000..24eba53
--- /dev/null
+++ b/lib/async/redis/context/pipeline.rb
@@ -0,0 +1,53 @@
+require_relative '../methods/strings'
+require_relative '../methods/keys'
+require_relative '../methods/lists'
+
+module Async
+	module Redis
+		module Context
+			# This context accumulates commands and sends them to the server in a
+			# single batch, instead of writing and flushing them one by one.
+			class Pipeline < Nested
+				include Methods::Strings
+				include Methods::Keys
+				include Methods::Lists
+				
+				def initialize(pool)
+					super(pool)
+					@command_counter = 0
+				end
+				
+				# Buffer the command in the connection without flushing it to the
+				# server; the response is read later, by #run.
+				def call(command, *args)
+					@connection.write_request([command, *args], accumulate: true)
+					@command_counter += 1
+				end
+				
+				# Send all the accumulated commands to the server in a single write,
+				# then read the responses. Returns an array with the result of each
+				# command, in the same order in which they were added with #call.
+				def run
+					@connection.flush
+					
+					responses = @command_counter.times.map { @connection.read_object }
+					@command_counter = 0
+					return responses
+				end
+				
+				alias dispatch run
+				
+				# Flush any outstanding commands before releasing the connection
+				# back to the pool.
+				def close
+					run
+					
+					if @connection
+						@pool.release(@connection)
+						@connection = nil
+					end
+				end
+			end
+		end
+	end
+end
diff --git a/lib/async/redis/protocol/resp.rb b/lib/async/redis/protocol/resp.rb
index 08e3763..e354048 100644
--- a/lib/async/redis/protocol/resp.rb
+++ b/lib/async/redis/protocol/resp.rb
@@ -42,18 +42,18 @@ def closed?
 	end
 	
 	# The redis server doesn't want actual objects (e.g. integers) but only bulk strings. So, we inline it for performance.
-	def write_request(arguments)
+	def write_request(arguments, accumulate: false)
 		write_lines("*#{arguments.count}")
-		
+		
 		arguments.each do |argument|
 			string = argument.to_s
-			
+			
 			write_lines("$#{string.bytesize}", string)
 		end
-		
-		@stream.flush
+		
+		@stream.flush unless accumulate
 	end
-	
+	
 	def write_object(object)
 		case object
 		when String
@@ -118,6 +118,10 @@ def read_object
 	
 	alias read_response read_object
 	
+	def flush
+		@stream.flush
+	end
+	
 	private
 	
 	# Override Async::IO::Protocol::Line#write_line
diff --git a/spec/async/redis/client_spec.rb b/spec/async/redis/client_spec.rb
index 8e7a5a8..1efbbd2 100644
--- a/spec/async/redis/client_spec.rb
+++ b/spec/async/redis/client_spec.rb
@@ -93,4 +93,18 @@
 		
 		client.close
 	end
+	
+	it "can use pipelining" do
+		client.set 'async_redis_test_key_1', 'a'
+		client.set 'async_redis_test_key_2', 'b'
+		
+		responses = client.pipelined do |context|
+			context.get 'async_redis_test_key_1'
+			context.get 'async_redis_test_key_2'
+		end
+		
+		expect(responses).to eq ['a', 'b']
+		
+		client.close
+	end
 end
diff --git a/spec/async/redis/context/pipeline_spec.rb b/spec/async/redis/context/pipeline_spec.rb
new file mode 100644
index 0000000..44c8c56
--- /dev/null
+++ b/spec/async/redis/context/pipeline_spec.rb
@@ -0,0 +1,80 @@
+require 'async/redis/client'
+require 'async/redis/context/pipeline'
+
+RSpec.describe Async::Redis::Context::Pipeline, timeout: 5 do
+	include_context Async::RSpec::Reactor
+	
+	let(:endpoint) {Async::Redis.local_endpoint}
+	let(:client) {Async::Redis::Client.new(endpoint)}
+	let(:pool) {client.instance_variable_get(:@pool)}
+	let(:pipeline) {Async::Redis::Context::Pipeline.new(pool)}
+	
+	let(:small_key_count) { 50 }
+	let(:large_key_count) { 1500 }
+	let(:key_prefix) { 'pipeline_key_' }
+	let(:keys) { large_key_count.times.map { |i| "#{key_prefix}#{i}" } }
+	
+	before do
+		# Remove any leftover keys so the key-count assertions below start from
+		# a clean slate, even on repeated runs against the same server.
+		client.call('DEL', *keys)
+	end
+	
+	it 'accumulates commands without running them prematurely' do
+		small_key_count.times do |i|
+			pipeline.set(keys[i], i)
+			expect(client.keys("#{key_prefix}*").length).to eq 0
+		end
+		
+		pipeline.run
+		expect(client.keys("#{key_prefix}*").length).to eq small_key_count
+		
+		pipeline.close
+		client.close
+	end
+	
+	it 'can read back the responses to each request' do
+		small_key_count.times do |i|
+			pipeline.set(keys[i], i)
+		end
+		
+		# SET replies with the status string "OK", once per accumulated command.
+		responses = pipeline.run
+		expect(responses).to eq ['OK'] * small_key_count
+		
+		pipeline.close
+		client.close
+	end
+	
+	it 'does not send any command more than once' do
+		small_key_count.times do |i|
+			pipeline.set(keys[i], i)
+		end
+		
+		pipeline.run
+		
+		# Increment each key once; a command that had been sent twice would
+		# show up here as an extra increment.
+		small_key_count.times do |i|
+			pipeline.incr(keys[i])
+		end
+		
+		pipeline.close
+		
+		small_key_count.times do |i|
+			expect(client.get(keys[i]).to_i).to eq i + 1
+		end
+		
+		client.close
+	end
+	
+	it 'works even when the accumulated commands fill the write buffer' do
+		large_key_count.times do |i|
+			pipeline.set(keys[i], i)
+		end
+		
+		pipeline.close
+		
+		client.close
+	end
+end
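For reviewers, a minimal usage sketch of the block form added in this diff. Assumptions: a Redis server is listening on the default local endpoint, and `Async::Reactor.run` is used to provide the reactor the client needs; the key names are taken from the client_spec.rb test above.

```ruby
require 'async/redis'

Async::Reactor.run do
	endpoint = Async::Redis.local_endpoint
	client = Async::Redis::Client.new(endpoint)
	
	begin
		client.set 'async_redis_test_key_1', 'a'
		client.set 'async_redis_test_key_2', 'b'
		
		# Both GETs are buffered by the pipeline and flushed as a single batch;
		# the responses come back in submission order when the block returns.
		responses = client.pipelined do |context|
			context.get 'async_redis_test_key_1'
			context.get 'async_redis_test_key_2'
		end
		
		responses # => ["a", "b"]
	ensure
		client.close
	end
end
```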
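Without a block, `#pipelined` just returns the pipeline context, so commands can be accumulated and dispatched manually. A sketch of that path, inside the same reactor as above (the key name is illustrative; SET replies with a status string and INCR with an integer):

```ruby
pipeline = client.pipelined
pipeline.set 'pipeline_key_0', '0'
pipeline.incr 'pipeline_key_0'

# Nothing has been flushed yet; #run sends both commands in one write and
# reads back one reply per command, in order.
responses = pipeline.run # => ["OK", 1]

# #close flushes anything still pending and releases the connection.
pipeline.close
```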