11# -*- coding: utf-8 -*-
22# Copyright 2015 OpenMarket Ltd
3+ # Copyright 2017 Adam Beckmeyer
34#
45# Licensed under the Apache License, Version 2.0 (the "License");
56# you may not use this file except in compliance with the License.
1213# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
1314# See the License for the specific language governing permissions and
1415# limitations under the License.
16+ from gevent import monkey ; monkey .patch_all ()
17+
1518from .api import MatrixHttpApi
1619from .errors import MatrixRequestError , MatrixUnexpectedResponse
1720from .room import Room
1821from .user import User
19- from threading import Thread
20- from time import sleep
22+ from .queue import RequestQueue
23+ import gevent
24+ import gevent .pool
25+ from gevent .event import AsyncResult
26+ from functools import partial
2127from uuid import uuid4
2228import logging
2329import sys
@@ -59,8 +65,8 @@ def global_callback(incoming_event):
5965
6066 """
6167
62- def __init__ (self , base_url , token = None , user_id = None ,
63- valid_cert_check = True , sync_filter_limit = 20 ):
68+ def __init__ (self , base_url , token = None , user_id = None , valid_cert_check = True ,
69+ sync_filter_limit = 20 , async = False , num_threads = 10 ):
6470 """ Create a new Matrix Client object.
6571
6672 Args:
@@ -73,13 +79,22 @@ def __init__(self, base_url, token=None, user_id=None,
7379 the token) if supplying a token; otherwise, ignored.
7480 valid_cert_check (bool): Check the homeservers
7581 certificate on connections?
82+ async (bool): Run the client in async mode; if `True`, methods
83+ return `AsyncResult`s instead of blocking on api calls.
84+ num_threads (int): Number of greenlets with which to make
85+ matrix requests. Only evaluated if `async`.
7686
7787 Returns:
7888 MatrixClient
7989
8090 Raises:
8191 MatrixRequestError, ValueError
8292 """
93+ # Set properties that may be overwritten if async
94+ self .queue = None
95+ self .thread_pool = None
96+ self ._call = self ._sync_call
97+
8398 if token is not None and user_id is None :
8499 raise ValueError ("must supply user_id along with token" )
85100
@@ -96,6 +111,14 @@ def __init__(self, base_url, token=None, user_id=None,
96111 self .sync_thread = None
97112 self .should_listen = False
98113
114+ # Create queue and threads that read from it
115+ if async :
116+ self ._call = self ._async_call
117+ self .queue = RequestQueue ()
118+ self .thread_pool = gevent .pool .Pool (size = num_threads )
119+ while not self .thread_pool .full ():
120+ self .thread_pool .add (gevent .spawn (self .queue .call_forever ))
121+
99122 """ Time to wait before attempting a /sync request after failing."""
100123 self .bad_sync_timeout_limit = 60 * 60
101124 self .rooms = {
@@ -116,9 +139,14 @@ def set_user_id(self, user_id):
116139
117140 def register_as_guest (self ):
118141 """ Register a guest account on this HS.
142+
143+ Note: Registration and login methods are always synchronous.
144+
119145 Note: HS must have guest registration enabled.
146+
120147 Returns:
121148 str: Access Token
149+
122150 Raises:
123151 MatrixRequestError
124152 """
@@ -128,6 +156,8 @@ def register_as_guest(self):
128156 def register_with_password (self , username , password ):
129157 """ Register for a new account on this HS.
130158
159+ Note: Registration and login methods are always synchronous.
160+
131161 Args:
132162 username (str): Account username
133163 password (str): Account password
@@ -158,6 +188,8 @@ def _post_registration(self, response):
158188 def login_with_password_no_sync (self , username , password ):
159189 """ Login to the homeserver.
160190
191+ Note: Registration and login methods are always synchronous.
192+
161193 Args:
162194 username (str): Account username
163195 password (str): Account password
@@ -182,6 +214,8 @@ def login_with_password_no_sync(self, username, password):
182214 def login_with_password (self , username , password , limit = 10 ):
183215 """ Login to the homeserver.
184216
217+ Note: Registration and login methods are always synchronous.
218+
185219 Args:
186220 username (str): Account username
187221 password (str): Account password
@@ -203,6 +237,8 @@ def login_with_password(self, username, password, limit=10):
203237
204238 def logout (self ):
205239 """ Logout from the homeserver.
240+
241+ Note: Registration and login methods are synchronous.
206242 """
207243 self .stop_listener_thread ()
208244 self .api .logout ()
@@ -217,12 +253,17 @@ def create_room(self, alias=None, is_public=False, invitees=()):
217253
218254 Returns:
219255 Room
256+ or
257+ AsyncResult(Room)
220258
221259 Raises:
222260 MatrixRequestError
223261 """
224- response = self .api .create_room (alias , is_public , invitees )
225- return self ._mkroom (response ["room_id" ])
262+ out = self ._call (
263+ partial (self .api .create_room , alias , is_public , invitees ),
264+ self ._mkroom
265+ )
266+ return out
226267
227268 def join_room (self , room_id_or_alias ):
228269 """ Join a room.
@@ -232,15 +273,17 @@ def join_room(self, room_id_or_alias):
232273
233274 Returns:
234275 Room
276+ or
277+ AsyncResult(Room)
235278
236279 Raises:
237280 MatrixRequestError
238281 """
239- response = self .api . join_room ( room_id_or_alias )
240- room_id = (
241- response [ "room_id" ] if "room_id" in response else room_id_or_alias
282+ out = self ._call (
283+ partial ( self . api . join_room , room_id_or_alias ),
284+ partial ( self . _mkroom , room_id_or_alias = room_id_or_alias )
242285 )
243- return self . _mkroom ( room_id )
286+ return out
244287
245288 def get_rooms (self ):
246289 """ Return a dict of {room_id: Room objects} that the user has joined.
@@ -360,7 +403,7 @@ def listen_forever(self, timeout_ms=30000, exception_handler=None):
360403 if e .code >= 500 :
361404 logger .warning ("Problem occured serverside. Waiting %i seconds" ,
362405 bad_sync_timeout )
363- sleep (bad_sync_timeout )
406+ gevent . sleep (bad_sync_timeout )
364407 bad_sync_timeout = min (bad_sync_timeout * 2 ,
365408 self .bad_sync_timeout_limit )
366409 else :
@@ -375,6 +418,9 @@ def listen_forever(self, timeout_ms=30000, exception_handler=None):
375418 def start_listener_thread (self , timeout_ms = 30000 , exception_handler = None ):
376419 """ Start a listener thread to listen for events in the background.
377420
421+ Note that as of right now this thread is responsible for calling
422+ listener callbacks as well as for syncing with the homeserver.
423+
378424 Args:
379425 timeout (int): How long to poll the Home Server for before
380426 retrying.
@@ -383,12 +429,10 @@ def start_listener_thread(self, timeout_ms=30000, exception_handler=None):
383429 thread.
384430 """
385431 try :
386- thread = Thread (target = self .listen_forever ,
387- args = (timeout_ms , exception_handler ))
388- thread .daemon = True
432+ thread = gevent .spawn (self .listen_forever ,
433+ timeout_ms , exception_handler )
389434 self .sync_thread = thread
390435 self .should_listen = True
391- thread .start ()
392436 except :
393437 e = sys .exc_info ()[0 ]
394438 logger .error ("Error: unable to start thread. %s" , str (e ))
@@ -413,21 +457,40 @@ def upload(self, content, content_type):
413457 MatrixRequestError: If the upload failed for some reason.
414458 """
415459 try :
416- response = self .api .media_upload (content , content_type )
417- if "content_uri" in response :
418- return response ["content_uri" ]
419- else :
420- raise MatrixUnexpectedResponse (
421- "The upload was successful, but content_uri wasn't found."
422- )
460+ # If not async, exceptions can be handled and logged
461+ return self ._call (
462+ partial (self ._media_upload , content , content_type ),
463+ self ._upload
464+ )
465+ except MatrixRequestError as e :
466+ raise MatrixRequestError (
467+ code = e .code ,
468+ content = "Upload failed: %s" % e
469+ )
470+
471+ def _media_upload (self , content , content_type ):
472+ """Wraps `self.api.media_upload` to allow error handling."""
473+ try :
474+ return self .api .media_upload (content , content_type )
423475 except MatrixRequestError as e :
424476 raise MatrixRequestError (
425477 code = e .code ,
426478 content = "Upload failed: %s" % e
427479 )
428480
429- def _mkroom (self , room_id ):
430- self .rooms [room_id ] = Room (self , room_id )
481+ def _upload (self , response ):
482+ """Helper function to be used as callback by `self.upload`"""
483+ if "content_uri" in response :
484+ return response ["content_uri" ]
485+ else :
486+ raise MatrixUnexpectedResponse (
487+ "The upload was successful, but content_uri wasn't found."
488+ )
489+
490+ def _mkroom (self , response = None , room_id_or_alias = None ):
491+ if response and "room_id" in response :
492+ room_id_or_alias = response ["room_id" ]
493+ self .rooms [room_id_or_alias ] = Room (self , room_id )
431494 return self .rooms [room_id ]
432495
433496 def _process_state_event (self , state_event , current_room ):
@@ -447,11 +510,12 @@ def _process_state_event(self, state_event, current_room):
447510 listener ['event_type' ] is None or
448511 listener ['event_type' ] == state_event ['type' ]
449512 ):
450- listener ['callback' ]( state_event )
513+ gevent . spawn ( listener ['callback' ], state_event )
451514
452515 def _sync (self , timeout_ms = 30000 ):
453516 # TODO: Deal with presence
454517 # TODO: Deal with left rooms
518+ # TODO: Use gevent pool with queue to call listeners
455519 response = self .api .sync (self .sync_token , timeout_ms , filter = self .sync_filter )
456520 self .sync_token = response ["next_batch" ]
457521
@@ -467,7 +531,7 @@ def _sync(self, timeout_ms=30000):
467531
468532 for room_id , sync_room in response ['rooms' ]['join' ].items ():
469533 if room_id not in self .rooms :
470- self ._mkroom (room_id )
534+ self ._mkroom (room_id_or_alias = room_id )
471535 room = self .rooms [room_id ]
472536 room .prev_batch = sync_room ["timeline" ]["prev_batch" ]
473537
@@ -507,8 +571,7 @@ def get_user(self, user_id):
507571 Args:
508572 user_id (str): The matrix user id of a user.
509573 """
510-
511- return User (self .api , user_id )
574+ return User (self .api , user_id , self ._call )
512575
513576 def remove_room_alias (self , room_alias ):
514577 """Remove mapping of an alias
@@ -524,3 +587,15 @@ def remove_room_alias(self, room_alias):
524587 return True
525588 except MatrixRequestError :
526589 return False
590+
591+ def _async_call (self , api_callback , result_callback ):
592+ # First create an AsyncResult for retrieving server response
593+ api_result = AsyncResult ()
594+ self .queue .put ((api_callback , api_result ))
595+ # Then create AsyncResult for the result of the callback on the response
596+ final_result = AsyncResult ()
597+ gevent .spawn (lambda : result_callback (api_result .get ())).link (final_result )
598+ return final_result
599+
600+ def _sync_call (self , api_callback , result_callback ):
601+ return result_callback (api_callback ())
0 commit comments