From 897d93be349a14f7154f4324b574def9a0fd26a6 Mon Sep 17 00:00:00 2001 From: Johan Euphrosine Date: Thu, 22 Feb 2018 23:23:47 +0900 Subject: [PATCH 1/7] google-assistant-sdk/pushtotalk: fix conversation_stream handling - close stream on stop_recording - restart stream on start_playback - flush stream on stop_playback The previous behaviour (keeping the stream open the whole time) exposed a race condition where we tried to flush (write to) a stream that was still recording (read-only mode). Change-Id: I629bee9267744d5080bcdaff61c6da241ba8fe7f --- .../assistant/grpc/audio_helpers.py | 11 +++++---- .../tests/test_audio_helpers.py | 23 +++++++++++++++++-- 2 files changed, 28 insertions(+), 6 deletions(-) diff --git a/google-assistant-sdk/googlesamples/assistant/grpc/audio_helpers.py b/google-assistant-sdk/googlesamples/assistant/grpc/audio_helpers.py index cd6d5de..6204abf 100644 --- a/google-assistant-sdk/googlesamples/assistant/grpc/audio_helpers.py +++ b/google-assistant-sdk/googlesamples/assistant/grpc/audio_helpers.py @@ -165,6 +165,9 @@ def start(self): def stop(self): pass + def flush(self): + pass + class SoundDeviceStream(object): """Audio stream based on an underlying sound device. @@ -212,13 +215,12 @@ def flush(self): def start(self): """Start the underlying stream.""" - if not self._audio_stream.active: + if self._audio_stream.active and not self._audio_stream.active: self._audio_stream.start() def stop(self): """Stop the underlying stream.""" if self._audio_stream.active: - self.flush() self._audio_stream.stop() def close(self): @@ -272,20 +274,21 @@ def start_recording(self): """Start recording from the audio source.""" self._stop_recording.clear() self._source.start() - self._sink.start() def stop_recording(self): """Stop recording from the audio source.""" self._stop_recording.set() + self._source.stop() def start_playback(self): """Start playback to the audio sink.""" self._start_playback.set() + self._sink.start() def stop_playback(self): """Stop playback from the audio sink.""" self._start_playback.clear() - self._source.stop() + self._sink.flush() self._sink.stop() @property diff --git a/google-assistant-sdk/tests/test_audio_helpers.py b/google-assistant-sdk/tests/test_audio_helpers.py index 6fefa9d..4b722fa 100644 --- a/google-assistant-sdk/tests/test_audio_helpers.py +++ b/google-assistant-sdk/tests/test_audio_helpers.py @@ -88,11 +88,18 @@ def test_write_header(self): class DummyStream(BytesIO): + started = False + stopped = False + flushed = False + def start(self): - pass + self.started = True def stop(self): - pass + self.stopped = True + + def flush(self): + self.flushed = True class ConversationStreamTest(unittest.TestCase): @@ -125,6 +132,18 @@ def start_playback(): self.assertEqual(True, self.playback_started) self.assertEqual(b'foo\0', self.sink.getvalue()) + def test_sink_source_state(self): + self.stream.start_recording() + self.assertEquals(True, self.source.started) + self.stream.stop_recording() + self.assertEquals(True, self.source.stopped) + + self.assertEquals(False, self.sink.started) + self.stream.start_playback() + self.assertEquals(True, self.sink.started) + self.stream.stop_playback() + self.assertEquals(True, self.sink.stopped) + def test_oneshot_conversation(self): self.assertEqual(b'audio', self.stream.read(5)) self.stream.stop_recording() From 3162daa4365695d3170a08fc001fbc88254a6c5b Mon Sep 17 00:00:00 2001 From: Johan Euphrosine Date: Fri, 23 Feb 2018 08:54:50 +0900 Subject: [PATCH 2/7] google-assistant-sdk/audio_helpers: fix typo Change-Id: Iaff6e3f25c57e82f10073812fcbea7b165ef044a --- .../googlesamples/assistant/grpc/audio_helpers.py | 4 ++-- google-assistant-sdk/tests/test_audio_helpers.py | 1 + 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/google-assistant-sdk/googlesamples/assistant/grpc/audio_helpers.py b/google-assistant-sdk/googlesamples/assistant/grpc/audio_helpers.py index 6204abf..ed0052c 100644 --- a/google-assistant-sdk/googlesamples/assistant/grpc/audio_helpers.py +++ b/google-assistant-sdk/googlesamples/assistant/grpc/audio_helpers.py @@ -210,12 +210,12 @@ def write(self, buf): return len(buf) def flush(self): - if self._flush_size > 0: + if self._audio_stream.active and self._flush_size > 0: self._audio_stream.write(b'\x00' * self._flush_size) def start(self): """Start the underlying stream.""" - if self._audio_stream.active and not self._audio_stream.active: + if not self._audio_stream.active: self._audio_stream.start() def stop(self): diff --git a/google-assistant-sdk/tests/test_audio_helpers.py b/google-assistant-sdk/tests/test_audio_helpers.py index 4b722fa..1796d58 100644 --- a/google-assistant-sdk/tests/test_audio_helpers.py +++ b/google-assistant-sdk/tests/test_audio_helpers.py @@ -133,6 +133,7 @@ def start_playback(): self.assertEqual(b'foo\0', self.sink.getvalue()) def test_sink_source_state(self): + self.assertEquals(False, self.source.started) self.stream.start_recording() self.assertEquals(True, self.source.started) self.stream.stop_recording() From d0eb6a36d1d175d47235fa472a1487e1a06b203a Mon Sep 17 00:00:00 2001 From: PizlaTheDeveloper Date: Sun, 18 Mar 2018 11:02:21 +0000 Subject: [PATCH 3/7] fix-pushtotalk --- .../googlesamples/assistant/grpc/pushtotalk.py | 13 ++++--------- 1 file changed, 4 insertions(+), 9 deletions(-) diff --git a/google-assistant-sdk/googlesamples/assistant/grpc/pushtotalk.py b/google-assistant-sdk/googlesamples/assistant/grpc/pushtotalk.py index 5043553..c79f5d2 100644 --- a/google-assistant-sdk/googlesamples/assistant/grpc/pushtotalk.py +++ b/google-assistant-sdk/googlesamples/assistant/grpc/pushtotalk.py @@ -123,8 +123,7 @@ def assist(self): def iter_assist_requests(): for c in self.gen_assist_requests(): assistant_helpers.log_assist_request_without_audio(c) - yield c - self.conversation_stream.start_playback() + yield c # This generator yields AssistResponse proto messages # received from the gRPC Google Assistant API. @@ -138,6 +137,7 @@ def iter_assist_requests(): logging.info('Transcript of user request: "%s".', ' '.join(r.transcript for r in resp.speech_results)) + self.conversation_stream.start_playback() logging.info('Playing assistant response.') if len(resp.audio_out.audio_data) > 0: self.conversation_stream.write(resp.audio_out.audio_data) @@ -317,7 +317,6 @@ def main(api_endpoint, credentials, project_id, logging.info('Connecting to %s', api_endpoint) # Configure audio source and sink. - audio_device = None if input_audio_file: audio_source = audio_helpers.WaveSource( open(input_audio_file, 'rb'), @@ -325,13 +324,11 @@ def main(api_endpoint, credentials, project_id, sample_width=audio_sample_width ) else: - audio_source = audio_device = ( - audio_device or audio_helpers.SoundDeviceStream( + audio_source = audio_helpers.SoundDeviceStream( sample_rate=audio_sample_rate, sample_width=audio_sample_width, block_size=audio_block_size, flush_size=audio_flush_size - ) ) if output_audio_file: audio_sink = audio_helpers.WaveSink( @@ -340,13 +337,11 @@ def main(api_endpoint, credentials, project_id, sample_width=audio_sample_width ) else: - audio_sink = audio_device = ( - audio_device or audio_helpers.SoundDeviceStream( + audio_sink = audio_helpers.SoundDeviceStream( sample_rate=audio_sample_rate, sample_width=audio_sample_width, block_size=audio_block_size, flush_size=audio_flush_size - ) ) # Create conversation stream with the given audio source and sink. conversation_stream = audio_helpers.ConversationStream( From 2c63ebc1072266ecfbf118810bc54ad6e0279796 Mon Sep 17 00:00:00 2001 From: Johan Euphrosine Date: Mon, 19 Mar 2018 15:17:18 +0900 Subject: [PATCH 4/7] google-assistant-sdk: remove conversation stream locking - remove conversation stream threading.Event based locks - call stop_recording from the thread reading from the audio stream - fix tests According to: https://app.assembla.com/spaces/portaudio/wiki/Tips_Threading """ In general, calls to ReadStream and WriteStream on different streams in different threads are probably safe. That means that if you are doing Blocking I/O, one thread may read and write to one stream, and another thread may read and write to another. """ Change-Id: I1355986600453e9d1d932ebfc3e094359407b60d --- .../assistant/grpc/audio_helpers.py | 14 ---------- .../assistant/grpc/pushtotalk.py | 4 +-- .../tests/test_audio_helpers.py | 28 +++++++++++-------- 3 files changed, 18 insertions(+), 28 deletions(-) diff --git a/google-assistant-sdk/googlesamples/assistant/grpc/audio_helpers.py b/google-assistant-sdk/googlesamples/assistant/grpc/audio_helpers.py index ed0052c..0ecc8a0 100644 --- a/google-assistant-sdk/googlesamples/assistant/grpc/audio_helpers.py +++ b/google-assistant-sdk/googlesamples/assistant/grpc/audio_helpers.py @@ -15,7 +15,6 @@ """Helper functions for audio streams.""" import logging -import threading import time import wave import math @@ -266,28 +265,22 @@ def __init__(self, source, sink, iter_size, sample_width): self._sink = sink self._iter_size = iter_size self._sample_width = sample_width - self._stop_recording = threading.Event() - self._start_playback = threading.Event() self._volume_percentage = 50 def start_recording(self): """Start recording from the audio source.""" - self._stop_recording.clear() self._source.start() def stop_recording(self): """Stop recording from the audio source.""" - self._stop_recording.set() self._source.stop() def start_playback(self): """Start playback to the audio sink.""" - self._start_playback.set() self._sink.start() def stop_playback(self): """Stop playback from the audio sink.""" - self._start_playback.clear() self._sink.flush() self._sink.stop() @@ -302,19 +295,12 @@ def volume_percentage(self, new_volume_percentage): def read(self, size): """Read bytes from the source (if currently recording). - - Will returns an empty byte string, if stop_recording() was called. """ - if self._stop_recording.is_set(): - return b'' return self._source.read(size) def write(self, buf): """Write bytes to the sink (if currently playing). - - Will block until start_playback() is called. """ - self._start_playback.wait() buf = align_buf(buf, self._sample_width) buf = normalize_audio_buffer(buf, self.volume_percentage) return self._sink.write(buf) diff --git a/google-assistant-sdk/googlesamples/assistant/grpc/pushtotalk.py b/google-assistant-sdk/googlesamples/assistant/grpc/pushtotalk.py index c79f5d2..46a9b4c 100644 --- a/google-assistant-sdk/googlesamples/assistant/grpc/pushtotalk.py +++ b/google-assistant-sdk/googlesamples/assistant/grpc/pushtotalk.py @@ -123,7 +123,8 @@ def assist(self): def iter_assist_requests(): for c in self.gen_assist_requests(): assistant_helpers.log_assist_request_without_audio(c) - yield c + yield c + self.conversation_stream.stop_recording() # This generator yields AssistResponse proto messages # received from the gRPC Google Assistant API. @@ -132,7 +133,6 @@ def iter_assist_requests(): assistant_helpers.log_assist_response_without_audio(resp) if resp.event_type == END_OF_UTTERANCE: logging.info('End of audio request detected') - self.conversation_stream.stop_recording() if resp.speech_results: logging.info('Transcript of user request: "%s".', ' '.join(r.transcript diff --git a/google-assistant-sdk/tests/test_audio_helpers.py b/google-assistant-sdk/tests/test_audio_helpers.py index 1796d58..90a57ff 100644 --- a/google-assistant-sdk/tests/test_audio_helpers.py +++ b/google-assistant-sdk/tests/test_audio_helpers.py @@ -16,7 +16,6 @@ import unittest import time -import threading import wave from googlesamples.assistant.grpc import audio_helpers @@ -87,7 +86,7 @@ def test_write_header(self): self.assertEqual(b'RIFF', self.stream.getvalue()[:4]) -class DummyStream(BytesIO): +class DummyStream(BytesIO, object): started = False stopped = False flushed = False @@ -98,6 +97,16 @@ def start(self): def stop(self): self.stopped = True + def read(self, *args): + if self.stopped: + return b'' + return super(DummyStream, self).read(*args) + + def write(self, *args): + if not self.started: + return + return super(DummyStream, self).write(*args) + def flush(self): self.flushed = True @@ -121,15 +130,10 @@ def test_stop_recording(self): def test_start_playback(self): self.playback_started = False - - def start_playback(): - self.playback_started = True - self.stream.start_playback() - t = threading.Timer(0.1, start_playback) - t.start() - # write will block until start_playback is called. self.stream.write(b'foo') - self.assertEqual(True, self.playback_started) + self.assertEqual(b'', self.sink.getvalue()) + self.stream.start_playback() + self.stream.write(b'foo') self.assertEqual(b'foo\0', self.sink.getvalue()) def test_sink_source_state(self): @@ -138,13 +142,13 @@ def test_sink_source_state(self): self.assertEquals(True, self.source.started) self.stream.stop_recording() self.assertEquals(True, self.source.stopped) - + self.assertEquals(False, self.sink.started) self.stream.start_playback() self.assertEquals(True, self.sink.started) self.stream.stop_playback() self.assertEquals(True, self.sink.stopped) - + def test_oneshot_conversation(self): self.assertEqual(b'audio', self.stream.read(5)) self.stream.stop_recording() From 187b485afd5c5df8288a135108065d963a201a84 Mon Sep 17 00:00:00 2001 From: Johan Euphrosine Date: Tue, 20 Mar 2018 12:14:11 +0900 Subject: [PATCH 5/7] google-assistant-sdk/pushtotalk: add explicit end_of_utterance Change-Id: I14440b02745aad6ac67494bd4f7c7833d9dce1d6 --- .../googlesamples/assistant/grpc/audio_helpers.py | 15 ++++++++++++--- .../googlesamples/assistant/grpc/pushtotalk.py | 2 ++ 2 files changed, 14 insertions(+), 3 deletions(-) diff --git a/google-assistant-sdk/googlesamples/assistant/grpc/audio_helpers.py b/google-assistant-sdk/googlesamples/assistant/grpc/audio_helpers.py index 0ecc8a0..4f93540 100644 --- a/google-assistant-sdk/googlesamples/assistant/grpc/audio_helpers.py +++ b/google-assistant-sdk/googlesamples/assistant/grpc/audio_helpers.py @@ -14,11 +14,12 @@ """Helper functions for audio streams.""" +import array import logging +import math import time +import threading import wave -import math -import array import click import sounddevice as sd @@ -266,9 +267,11 @@ def __init__(self, source, sink, iter_size, sample_width): self._iter_size = iter_size self._sample_width = sample_width self._volume_percentage = 50 + self._end_of_utterance = threading.Event() def start_recording(self): """Start recording from the audio source.""" + self._end_of_utterance.clear() self._source.start() def stop_recording(self): @@ -284,6 +287,9 @@ def stop_playback(self): self._sink.flush() self._sink.stop() + def end_of_utterance(self): + self._end_of_utterance.set() + @property def volume_percentage(self): """The current volume setting as an integer percentage (1-100).""" @@ -312,7 +318,10 @@ def close(self): def __iter__(self): """Returns a generator reading data from the stream.""" - return iter(lambda: self.read(self._iter_size), b'') + while True: + if self._end_of_utterance.is_set(): + raise StopIteration + yield self.read(self._iter_size) @property def sample_rate(self): diff --git a/google-assistant-sdk/googlesamples/assistant/grpc/pushtotalk.py b/google-assistant-sdk/googlesamples/assistant/grpc/pushtotalk.py index 46a9b4c..4df17fc 100644 --- a/google-assistant-sdk/googlesamples/assistant/grpc/pushtotalk.py +++ b/google-assistant-sdk/googlesamples/assistant/grpc/pushtotalk.py @@ -21,6 +21,7 @@ import os.path import pathlib2 as pathlib import sys +import threading import uuid import click @@ -133,6 +134,7 @@ def iter_assist_requests(): assistant_helpers.log_assist_response_without_audio(resp) if resp.event_type == END_OF_UTTERANCE: logging.info('End of audio request detected') + self.conversation_stream.end_of_utterance() if resp.speech_results: logging.info('Transcript of user request: "%s".', ' '.join(r.transcript From 2dfc868ebce1b78f772afe1af174337cc2de4ebc Mon Sep 17 00:00:00 2001 From: Johan Euphrosine Date: Thu, 22 Mar 2018 11:47:10 +0900 Subject: [PATCH 6/7] google-assistant-sdk/pushtotalk: add explicit locking - remove end_of_utterance() - stop recording from main thread with explicit locking Change-Id: I07b24b8193560783945acde9f2b55e5533578ebd --- .../assistant/grpc/audio_helpers.py | 17 +++++++++-------- .../googlesamples/assistant/grpc/pushtotalk.py | 12 ++++++------ 2 files changed, 15 insertions(+), 14 deletions(-) diff --git a/google-assistant-sdk/googlesamples/assistant/grpc/audio_helpers.py b/google-assistant-sdk/googlesamples/assistant/grpc/audio_helpers.py index 4f93540..cad78c0 100644 --- a/google-assistant-sdk/googlesamples/assistant/grpc/audio_helpers.py +++ b/google-assistant-sdk/googlesamples/assistant/grpc/audio_helpers.py @@ -267,16 +267,19 @@ def __init__(self, source, sink, iter_size, sample_width): self._iter_size = iter_size self._sample_width = sample_width self._volume_percentage = 50 - self._end_of_utterance = threading.Event() + self._stop_recording = threading.Event() + self._source_lock = threading.RLock() def start_recording(self): """Start recording from the audio source.""" - self._end_of_utterance.clear() + self._stop_recording.clear() self._source.start() def stop_recording(self): """Stop recording from the audio source.""" - self._source.stop() + self._stop_recording.set() + with self._source_lock: + self._source.stop() def start_playback(self): """Start playback to the audio sink.""" @@ -287,9 +290,6 @@ def stop_playback(self): self._sink.flush() self._sink.stop() - def end_of_utterance(self): - self._end_of_utterance.set() - @property def volume_percentage(self): """The current volume setting as an integer percentage (1-100).""" @@ -302,7 +302,8 @@ def volume_percentage(self, new_volume_percentage): def read(self, size): """Read bytes from the source (if currently recording). """ - return self._source.read(size) + with self._source_lock: + return self._source.read(size) def write(self, buf): """Write bytes to the sink (if currently playing). @@ -319,7 +320,7 @@ def close(self): def __iter__(self): """Returns a generator reading data from the stream.""" while True: - if self._end_of_utterance.is_set(): + if self._stop_recording.is_set(): raise StopIteration yield self.read(self._iter_size) diff --git a/google-assistant-sdk/googlesamples/assistant/grpc/pushtotalk.py b/google-assistant-sdk/googlesamples/assistant/grpc/pushtotalk.py index 4df17fc..01561ee 100644 --- a/google-assistant-sdk/googlesamples/assistant/grpc/pushtotalk.py +++ b/google-assistant-sdk/googlesamples/assistant/grpc/pushtotalk.py @@ -21,7 +21,6 @@ import os.path import pathlib2 as pathlib import sys -import threading import uuid import click @@ -121,20 +120,21 @@ def assist(self): self.conversation_stream.start_recording() logging.info('Recording audio request.') - def iter_assist_requests(): + def iter_log_assist_requests(): for c in self.gen_assist_requests(): assistant_helpers.log_assist_request_without_audio(c) yield c - self.conversation_stream.stop_recording() + logging.debug('Reached end of AssistRequest iteration.') # This generator yields AssistResponse proto messages # received from the gRPC Google Assistant API. - for resp in self.assistant.Assist(iter_assist_requests(), + for resp in self.assistant.Assist(iter_log_assist_requests(), self.deadline): assistant_helpers.log_assist_response_without_audio(resp) if resp.event_type == END_OF_UTTERANCE: - logging.info('End of audio request detected') - self.conversation_stream.end_of_utterance() + logging.info('End of audio request detected.') + logging.info('Stopping recording.') + self.conversation_stream.stop_recording() if resp.speech_results: logging.info('Transcript of user request: "%s".', ' '.join(r.transcript From e7521edaccf742b4d623328dae167bde69c04149 Mon Sep 17 00:00:00 2001 From: Johan Euphrosine Date: Tue, 27 Mar 2018 17:37:25 +0900 Subject: [PATCH 7/7] google-assistant-sdk/pushtotalk: only start recording when we get audio data Change-Id: Icfd2e3a01ddbb35e17e54b53f26091580bf31f0f --- .../googlesamples/assistant/grpc/audio_helpers.py | 14 ++++++++++++++ .../googlesamples/assistant/grpc/pushtotalk.py | 5 +++-- 2 files changed, 17 insertions(+), 2 deletions(-) diff --git a/google-assistant-sdk/googlesamples/assistant/grpc/audio_helpers.py b/google-assistant-sdk/googlesamples/assistant/grpc/audio_helpers.py index cad78c0..6d62fc1 100644 --- a/google-assistant-sdk/googlesamples/assistant/grpc/audio_helpers.py +++ b/google-assistant-sdk/googlesamples/assistant/grpc/audio_helpers.py @@ -269,9 +269,12 @@ def __init__(self, source, sink, iter_size, sample_width): self._volume_percentage = 50 self._stop_recording = threading.Event() self._source_lock = threading.RLock() + self._recording = False + self._playing = False def start_recording(self): """Start recording from the audio source.""" + self._recording = True self._stop_recording.clear() self._source.start() @@ -280,15 +283,26 @@ def stop_recording(self): self._stop_recording.set() with self._source_lock: self._source.stop() + self._recording = False def start_playback(self): """Start playback to the audio sink.""" + self._playing = True self._sink.start() def stop_playback(self): """Stop playback from the audio sink.""" self._sink.flush() self._sink.stop() + self._playing = False + + @property + def recording(self): + return self._recording + + @property + def playing(self): + return self._playing @property def volume_percentage(self): diff --git a/google-assistant-sdk/googlesamples/assistant/grpc/pushtotalk.py b/google-assistant-sdk/googlesamples/assistant/grpc/pushtotalk.py index 01561ee..c508fc3 100644 --- a/google-assistant-sdk/googlesamples/assistant/grpc/pushtotalk.py +++ b/google-assistant-sdk/googlesamples/assistant/grpc/pushtotalk.py @@ -139,9 +139,10 @@ def iter_log_assist_requests(): logging.info('Transcript of user request: "%s".', ' '.join(r.transcript for r in resp.speech_results)) - self.conversation_stream.start_playback() - logging.info('Playing assistant response.') if len(resp.audio_out.audio_data) > 0: + if not self.conversation_stream.playing: + self.conversation_stream.start_playback() + logging.info('Playing assistant response.') self.conversation_stream.write(resp.audio_out.audio_data) if resp.dialog_state_out.conversation_state: conversation_state = resp.dialog_state_out.conversation_state