diff --git a/setup.py b/setup.py index b93275584..c0963c4e3 100644 --- a/setup.py +++ b/setup.py @@ -50,7 +50,7 @@ def find_version(*file_paths): keywords='slack slack-web slack-rtm chat chatbots bots chatops', packages=find_packages(exclude=['docs', 'docs-src', 'tests']), install_requires=[ - 'websocket-client >=0.35, <1.0a0', + 'websockets >=3.4', 'requests >=2.11, <3.0a0', 'six >=1.10, <2.0a0', ]) diff --git a/slackclient/client.py b/slackclient/client.py index 34f3aec3d..5b9d80e6e 100644 --- a/slackclient/client.py +++ b/slackclient/client.py @@ -105,7 +105,7 @@ def api_call(self, method, timeout=None, **kwargs): ) return result - def rtm_read(self): + def rtm_read(self, blocking=False): ''' Reads from the RTM Websocket stream then calls `self.process_changes(item)` for each line in the returned data. @@ -127,7 +127,7 @@ def rtm_read(self): # in the future, this should handle some events internally i.e. channel # creation if self.server: - json_data = self.server.websocket_safe_read() + json_data = self.server.websocket_safe_read(blocking=blocking) data = [] if json_data != '': for d in json_data.split('\n'): diff --git a/slackclient/server.py b/slackclient/server.py index 3c1f814f5..6ec1f0074 100644 --- a/slackclient/server.py +++ b/slackclient/server.py @@ -1,3 +1,4 @@ +import asyncio from .slackrequest import SlackRequest from requests.packages.urllib3.util.url import parse_url from .channel import Channel @@ -5,7 +6,7 @@ from .util import SearchList, SearchDict from ssl import SSLError -from websocket import create_connection +from websockets import client as wsclient import json @@ -96,6 +97,9 @@ def parse_slack_login_data(self, login_data, use_rtm_start): self.parse_channel_data(login_data["ims"]) def connect_slack_websocket(self, ws_url): + asyncio.get_event_loop().run_until_complete(self._connect_slack_websocket(ws_url)) + + async def _connect_slack_websocket(self, ws_url): """Uses http proxy if available""" if self.proxies and 'http' in self.proxies: parts = parse_url(self.proxies['http']) @@ -106,11 +110,14 @@ def connect_slack_websocket(self, ws_url): proxy_auth, proxy_port, proxy_host = None, None, None try: - self.websocket = create_connection(ws_url, - http_proxy_host=proxy_host, - http_proxy_port=proxy_port, - http_proxy_auth=proxy_auth) - self.websocket.sock.setblocking(0) + self.websocket = await wsclient.connect( + ws_url, + extra_headers={ + "http_proxy_host":proxy_host, + "http_proxy_port":proxy_port, + "http_proxy_auth":proxy_auth + } + ) except Exception as e: raise SlackConnectionError(str(e)) @@ -175,26 +182,40 @@ def rtm_send_message(self, channel, message, thread=None, reply_broadcast=None): def ping(self): return self.send_to_websocket({"type": "ping"}) - def websocket_safe_read(self): + + def websocket_safe_read(self, blocking=False): + readop = asyncio.Future() + readco = self._websocket_safe_read(readop, blocking=blocking) + asyncio.get_event_loop().run_until_complete(readco) + return readop.result() + + async def _websocket_safe_read(self, readop, blocking=False): """ Returns data if available, otherwise ''. Newlines indicate multiple messages """ data = "" - while True: - try: - data += "{0}\n".format(self.websocket.recv()) - except SSLError as e: - if e.errno == 2: - # errno 2 occurs when trying to read or write data, but more - # data needs to be received on the underlying TCP transport - # before the request can be fulfilled. - # - # Python 2.7.9+ and Python 3.3+ give this its own exception, - # SSLWantReadError - return '' - raise - return data.rstrip() + if not blocking: + while True: + try: + #todo make this non-blocking + data += "{0}\n".format(await self.websocket.recv()) + except SSLError as e: + if e.errno == 2: + # errno 2 occurs when trying to read or write data, but more + # data needs to be received on the underlying TCP transport + # before the request can be fulfilled. + # + # Python 2.7.9+ and Python 3.3+ give this its own exception, + # SSLWantReadError + return '' + raise + readop.set_result(data.rstrip()) + else: + rawdata = await self.websocket.recv() + data += "{0}\n".format(rawdata) + readop.set_result(data.rstrip()) + def attach_user(self, name, user_id, real_name, tz): self.users.update({user_id: User(self, name, user_id, real_name, tz)})