From 8bd31b6848edb939a6907e0768916e703f584766 Mon Sep 17 00:00:00 2001 From: Huba Nagy <12huba@gmail.com> Date: Mon, 23 Apr 2018 17:01:20 +1200 Subject: [PATCH 1/2] For reading objects from the stream. --- lib/async/redis/protocol/resp.rb | 82 ++++++++++++++++++++++++++------ 1 file changed, 67 insertions(+), 15 deletions(-) diff --git a/lib/async/redis/protocol/resp.rb b/lib/async/redis/protocol/resp.rb index 2aa8f94..79a48ac 100644 --- a/lib/async/redis/protocol/resp.rb +++ b/lib/async/redis/protocol/resp.rb @@ -29,6 +29,12 @@ module Protocol class RESP < Async::IO::Protocol::Line CRLF = "\r\n".freeze + SIMPLE_STRING = "+".freeze + ERROR = "-".freeze + INTEGER = ":".freeze + BULK_STRING = "$".freeze + ARRAY = "*".freeze + class << self alias client new end @@ -80,31 +86,77 @@ def read_object # puts "token: #{token}" case token - when '$' - length = read_line.to_i + when SIMPLE_STRING + string = read_line + + return string + when ERROR + raise NotImplementedError("Implementation for token #{token} missing") + when INTEGER + integer = read_line.to_i + return integer + when BULK_STRING + buffer = read_line + length = buffer.to_i if length == -1 return nil else buffer = @stream.read(length) - read_line # Eat trailing whitespace? + read_line # Eat trailing whitespace because length does not include the CRLF return buffer end - when '*' - count = read_line.to_i - array = Array.new(count) {read_object} + when ARRAY + array = [] # the actual main array + array_stack = [array] # a stack of references to the sub arrays + length_stack = [read_line.to_i] # a stack of lengths + + return nil if length_stack.last == -1 + + while length_stack.length > 0 + if length_stack.last == 0 + length_stack.pop() + array_stack.pop() + end + + length_stack.last -= 1 + + sub_token = @stream.read(1) + + case sub_token + when SIMPLE_STRING + array_stack.last << read_line + when ERROR + raise NotImplementedError("Implementation for token #{sub_token} missing") + when INTEGER + array_stack.last << read_line.to_i + when BULK_STRING + buffer = read_line + length = buffer.to_i + if length == -1 + array_stack.last << nil + else + buffer = @stream.read(length) + read_line # Eat trailing whitespace because length does not include the CRLF + + array_stack.last << buffer + end + when ARRAY + new_length = read_line + if new_length == -1 + array_stack.last << nil + else + length_stack << new_length + array_stack.last << [] + array_stack << array_stack.last.last + end + else + raise NotImplementedError("Implementation for token #{sub_token} missing") + end + end return array - when ':' - return read_line.to_i - - when '-' - raise Error.new(read_line) - - when '+' - return read_line - else raise NotImplementedError, "Implementation for token #{token} missing" end From 7ac2734fc258f7b4ae8920edd075336a1939ce3f Mon Sep 17 00:00:00 2001 From: Huba Nagy <12huba@gmail.com> Date: Mon, 23 Apr 2018 17:57:52 +1200 Subject: [PATCH 2/2] It now works. --- lib/async/redis/protocol/resp.rb | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/lib/async/redis/protocol/resp.rb b/lib/async/redis/protocol/resp.rb index 79a48ac..22bd31e 100644 --- a/lib/async/redis/protocol/resp.rb +++ b/lib/async/redis/protocol/resp.rb @@ -91,7 +91,8 @@ def read_object return string when ERROR - raise NotImplementedError("Implementation for token #{token} missing") + error = read_line + return error when INTEGER integer = read_line.to_i @@ -114,13 +115,16 @@ def read_object return nil if length_stack.last == -1 - while length_stack.length > 0 + while !length_stack.empty? + puts "#{length_stack}" if length_stack.last == 0 length_stack.pop() array_stack.pop() + + break if length_stack.empty? end - length_stack.last -= 1 + length_stack << (length_stack.pop - 1) sub_token = @stream.read(1)