Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .python-version
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
3.12
4 changes: 2 additions & 2 deletions dingtalk_stream/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@
from .frames import CallbackMessage
from .frames import SystemMessage
from .frames import AckMessage
from .chatbot import ChatbotMessage, RichTextContent, ImageContent, reply_specified_single_chat, \
reply_specified_group_chat
from .chatbot import ChatbotMessage, RichTextContent, ImageContent, FileContent, AudioContent, VideoContent, \
reply_specified_single_chat, reply_specified_group_chat
from .chatbot import TextContent
from .chatbot import AtUser
from .chatbot import ChatbotHandler, AsyncChatbotHandler
Expand Down
137 changes: 137 additions & 0 deletions dingtalk_stream/chatbot.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,99 @@ def to_dict(self):
if self.download_code is not None:
result['downloadCode'] = self.download_code
return result

class FileContent(object):

def __init__(self):
self.download_code = None
self.file_name = None
self.extensions = {}

@classmethod
def from_dict(cls, d):
content = FileContent()
for name, value in d.items():
if name == 'downloadCode':
content.download_code = value
elif name == 'fileName':
content.file_name = value
else:
content.extensions[name] = value
return content

def to_dict(self):
result = self.extensions.copy()
if self.download_code is not None:
result['downloadCode'] = self.download_code
if self.file_name is not None:
result['fileName'] = self.file_name
return result


class AudioContent(object):

def __init__(self):
self.download_code = None
self.recognition = None
self.duration = None
self.extensions = {}

@classmethod
def from_dict(cls, d):
content = AudioContent()
for name, value in d.items():
if name == 'downloadCode':
content.download_code = value
elif name == 'recognition':
content.recognition = value
elif name == 'duration':
content.duration = value
else:
content.extensions[name] = value
return content

def to_dict(self):
result = self.extensions.copy()
if self.download_code is not None:
result['downloadCode'] = self.download_code
if self.recognition is not None:
result['recognition'] = self.recognition
if self.duration is not None:
result['duration'] = self.duration
return result


class VideoContent(object):

def __init__(self):
self.download_code = None
self.video_type = None
self.duration = None
self.extensions = {}

@classmethod
def from_dict(cls, d):
content = VideoContent()
for name, value in d.items():
if name == 'downloadCode':
content.download_code = value
elif name == 'videoType':
content.video_type = value
elif name == 'duration':
content.duration = value
else:
content.extensions[name] = value
return content

def to_dict(self):
result = self.extensions.copy()
if self.download_code is not None:
result['downloadCode'] = self.download_code
if self.video_type is not None:
result['videoType'] = self.video_type
if self.duration is not None:
result['duration'] = self.duration
return result


class RichTextContent(object):
Expand Down Expand Up @@ -181,6 +274,9 @@ def __init__(self):
self.message_type = None
self.image_content = None
self.rich_text_content = None
self.file_content = None
self.audio_content = None
self.video_content = None
self.sender_staff_id = None
self.hosting_context: HostingContext = None
self.conversation_msg_context = None
Expand Down Expand Up @@ -232,6 +328,12 @@ def from_dict(cls, d):
msg.image_content = ImageContent.from_dict(d['content'])
elif value == 'richText':
msg.rich_text_content = RichTextContent.from_dict(d['content'])
elif value == "file":
msg.file_content = FileContent.from_dict(d['content'])
elif value == "audio":
msg.audio_content = AudioContent.from_dict(d['content'])
elif value == "video":
msg.video_content = VideoContent.from_dict(d['content'])
elif name == 'senderStaffId':
msg.sender_staff_id = value
elif name == 'hostingContext':
Expand Down Expand Up @@ -281,6 +383,12 @@ def to_dict(self):
result['content'] = self.image_content.to_dict()
if self.rich_text_content is not None:
result['content'] = self.rich_text_content.to_dict()
if self.file_content is not None:
result['content'] = self.file_content.to_dict()
if self.audio_content is not None:
result['content'] = self.audio_content.to_dict()
if self.video_content is not None:
result['content'] = self.video_content.to_dict()
if self.conversation_type is not None:
result['conversationType'] = self.conversation_type
if self.at_users is not None:
Expand Down Expand Up @@ -323,6 +431,27 @@ def get_image_list(self):

return images

def get_file_list(self):
if self.message_type != 'file':
return None
if self.file_content is None or self.file_content.download_code is None:
return None
return [self.file_content.download_code]

def get_audio_list(self):
if self.message_type != 'audio':
return None
if self.audio_content is None or self.audio_content.download_code is None:
return None
return [self.audio_content.download_code]

def get_video_list(self):
if self.message_type != 'video':
return None
if self.video_content is None or self.video_content.download_code is None:
return None
return [self.video_content.download_code]

def __str__(self):
return 'ChatbotMessage(message_type=%s, text=%s, sender_nick=%s, conversation_title=%s)' % (
self.message_type,
Expand Down Expand Up @@ -564,6 +693,14 @@ def get_image_download_url(self, download_code: str) -> str:
return ""
return response.json()["downloadUrl"]

def get_file_download_url(self, download_code: str) -> str:
"""
根据downloadCode获取文件下载链接 https://open.dingtalk.com/document/isvapp/download-the-file-content-of-the-robot-receiving-message
:param download_code:
:return:
"""
return self.get_image_download_url(download_code)

def set_off_duty_prompt(self, text: str, title: str = "", logo: str = ""):
"""
设置离线提示词,需要使用OpenAPI,当前仅支持自建应用。
Expand Down
106 changes: 106 additions & 0 deletions examples/filebot/filebot.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
# !/usr/bin/env python

import argparse
import logging
from dingtalk_stream import AckMessage
import dingtalk_stream


def setup_logger():
logger = logging.getLogger()
handler = logging.StreamHandler()
handler.setFormatter(
logging.Formatter('%(asctime)s %(name)-8s %(levelname)-8s %(message)s [%(filename)s:%(lineno)d]'))
logger.addHandler(handler)
logger.setLevel(logging.INFO)
return logger


def define_options():
parser = argparse.ArgumentParser()
parser.add_argument(
'--client_id', dest='client_id', required=True,
help='app_key or suite_key from https://open-dev.digntalk.com'
)
parser.add_argument(
'--client_secret', dest='client_secret', required=True,
help='app_secret or suite_secret from https://open-dev.digntalk.com'
)
options = parser.parse_args()
return options


class FileBotHandler(dingtalk_stream.ChatbotHandler):
def __init__(self, logger: logging.Logger = None):
super(dingtalk_stream.ChatbotHandler, self).__init__()
if logger:
self.logger = logger

async def process(self, callback: dingtalk_stream.CallbackMessage):
incoming_message = dingtalk_stream.ChatbotMessage.from_dict(callback.data)
if incoming_message.message_type not in ('file', 'audio', 'video'):
return AckMessage.STATUS_OK, 'OK'

download_codes = None
if incoming_message.message_type == 'file':
download_codes = incoming_message.get_file_list()
elif incoming_message.message_type == 'audio':
download_codes = incoming_message.get_audio_list()
else:
download_codes = incoming_message.get_video_list()

lines = []
if incoming_message.message_type == 'file':
if incoming_message.file_content is not None and incoming_message.file_content.file_name:
lines.append('File: %s' % incoming_message.file_content.file_name)
else:
lines.append('File received')
elif incoming_message.message_type == 'audio':
lines.append('Audio received')
if incoming_message.audio_content is not None:
if incoming_message.audio_content.duration is not None:
lines.append('Duration: %s' % incoming_message.audio_content.duration)
if incoming_message.audio_content.recognition:
lines.append('Recognition: %s' % incoming_message.audio_content.recognition)
else:
lines.append('Video received')
if incoming_message.video_content is not None:
if incoming_message.video_content.duration is not None:
lines.append('Duration: %s' % incoming_message.video_content.duration)
if incoming_message.video_content.video_type:
lines.append('Video Type: %s' % incoming_message.video_content.video_type)

if download_codes:
download_urls = []
for download_code in download_codes:
download_url = self.get_file_download_url(download_code)
if download_url:
self.logger.info('%s download url: %s', incoming_message.message_type, download_url)
download_urls.append(download_url)

if download_urls:
if len(download_urls) == 1:
lines.append('Download URL: %s' % download_urls[0])
else:
lines.append('Download URLs:')
lines.extend(['%d. %s' % (idx + 1, url) for idx, url in enumerate(download_urls)])

if lines:
response = '\n'.join(lines)
self.reply_text(response, incoming_message)

return AckMessage.STATUS_OK, 'OK'


def main():
logger = setup_logger()
options = define_options()

credential = dingtalk_stream.Credential(options.client_id, options.client_secret)
client = dingtalk_stream.DingTalkStreamClient(credential)
client.register_callback_handler(dingtalk_stream.chatbot.ChatbotMessage.TOPIC, FileBotHandler(logger))
client.start_forever()


if __name__ == '__main__':
main()
11 changes: 11 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
[project]
name = "dingtalk-stream-sdk-python"
version = "0.1.0"
description = "Add your description here"
readme = "README.md"
requires-python = ">=3.12"
dependencies = [
"aiohttp>=3.13.3",
"requests>=2.32.5",
"websockets>=16.0",
]
Loading