Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 37 additions & 7 deletions dingtalk_stream/stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ def __init__(self, credential: Credential, logger: logging.Logger = None):
self._pre_started = False
self._is_event_required = False
self._access_token = {}
self._running_tasks = set()

def register_all_event_handler(self, handler: EventHandler):
handler.dingtalk_client = self
Expand All @@ -59,37 +60,50 @@ def pre_start(self):

async def start(self):
self.pre_start()
self._running_tasks = set()

while True:
keepalive_task = None
try:
connection = self.open_connection()

if not connection:
self.logger.error('open connection failed')
await asyncio.sleep(10)
await asyncio.sleep(5)
continue
self.logger.info('endpoint is %s', connection)

uri = f'{connection["endpoint"]}?ticket={quote_plus(connection["ticket"])}'
async with websockets.connect(uri) as websocket:
self.websocket = websocket
asyncio.create_task(self.keepalive(websocket))
keepalive_task = asyncio.create_task(self.keepalive(websocket))
async for raw_message in websocket:
json_message = json.loads(raw_message)
asyncio.create_task(self.background_task(json_message))
try:
json_message = json.loads(raw_message)
except json.JSONDecodeError as e:
self.logger.error('invalid json message, error=%s, raw=%s', e, raw_message[:200])
continue
task = asyncio.create_task(self.background_task(json_message))
self._running_tasks.add(task)
task.add_done_callback(self._task_done_callback)
except KeyboardInterrupt as e:
break
except (asyncio.exceptions.CancelledError,
websockets.exceptions.ConnectionClosedError) as e:
self.logger.error('[start] network exception, error=%s', e)
await asyncio.sleep(10)
await asyncio.sleep(3)
continue
except Exception as e:
await asyncio.sleep(3)
self.logger.exception('unknown exception', e)
continue
finally:
pass
if keepalive_task:
keepalive_task.cancel()
try:
await keepalive_task
except asyncio.CancelledError:
pass

async def keepalive(self, ws, ping_interval=60):
while True:
Expand All @@ -100,12 +114,28 @@ async def keepalive(self, ws, ping_interval=60):
break

async def background_task(self, json_message):
message_id = json_message.get('headers', {}).get('messageId', 'unknown')
try:
route_result = await self.route_message(json_message)
if route_result == DingTalkStreamClient.TAG_DISCONNECT:
await self.websocket.close()
except Exception as e:
self.logger.error(f"error processing message: {e}")
self.logger.error(
"error processing message, messageId=%s, type=%s, topic=%s, error=%s",
message_id,
json_message.get('type'),
json_message.get('headers', {}).get('topic'),
e,
exc_info=True,
)

def _task_done_callback(self, task):
self._running_tasks.discard(task)
if task.cancelled():
return
error = task.exception()
if error:
self.logger.error('background task failed, error=%s', error, exc_info=error)

async def route_message(self, json_message):
result = ''
Expand Down