Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion aiogram_tests/handler/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,9 @@
from .handler import MessageHandler
from .handler import TelegramEventObserverHandler

__all__ = ["MessageHandler", "CallbackQueryHandler", "TelegramEventObserverHandler", "RequestHandler"]
__all__ = [
"MessageHandler",
"CallbackQueryHandler",
"TelegramEventObserverHandler",
"RequestHandler",
]
42 changes: 17 additions & 25 deletions aiogram_tests/handler/base.py
Original file line number Diff line number Diff line change
@@ -1,34 +1,26 @@
from typing import Iterable
from typing import Iterable, Any
from typing import List
from typing import Optional
from typing import Type

from aiogram import BaseMiddleware
from aiogram import Bot
from aiogram import Dispatcher
from aiogram.dispatcher.event.telegram import TelegramEventObserver
from aiogram.fsm.storage.memory import MemoryStorage
from aiogram.methods import TelegramMethod
from aiogram.methods.base import Response
from aiogram.methods.base import TelegramType
from aiogram.types import Chat
from aiogram.types import User

from aiogram_tests.mocked_bot import MockedBot
from aiogram_tests.types.dataset import CHAT
from aiogram_tests.types.dataset import USER


class RequestHandler:
def __init__(
self,
dp_middlewares: Iterable[BaseMiddleware] = None,
exclude_observer_methods: Iterable[str] = None,
auto_mock_success: bool = False,
dp: Optional[Dispatcher] = None,
**kwargs,
dp_middlewares: Iterable[BaseMiddleware] | None = None,
exclude_observer_methods: Iterable[str] | None = None,
dp: Dispatcher | None = None,
**kwargs: Any,
):
self.bot = MockedBot(auto_mock_success=auto_mock_success)
self.bot = MockedBot()
if dp is None:
dp = Dispatcher(storage=MemoryStorage())
self.dp = dp
Expand All @@ -40,13 +32,11 @@ def __init__(
exclude_observer_methods = []

dispatcher_methods = self._get_dispatcher_event_observers()
available_methods = tuple(set(dispatcher_methods) - set(exclude_observer_methods))
available_methods = tuple(
set(dispatcher_methods) - set(exclude_observer_methods)
)
self._register_middlewares(available_methods, tuple(dp_middlewares))

Bot.set_current(self.bot)
User.set_current(USER.as_object())
Chat.set_current(CHAT.as_object())

def _get_dispatcher_event_observers(self) -> List[str]:
"""
Returns a names for bot event observers, like message, callback_query etc.
Expand All @@ -59,7 +49,9 @@ def _get_dispatcher_event_observers(self) -> List[str]:

return result

def _register_middlewares(self, event_observer: Iterable, middlewares: Iterable) -> None:
def _register_middlewares(
self, event_observer: Iterable, middlewares: Iterable
) -> None:
for eo_name in event_observer:
for m in middlewares:
eo_obj = getattr(self.dp, eo_name)
Expand All @@ -70,13 +62,13 @@ async def __call__(self, *args, **kwargs):

def add_result_for(
self,
method: Type[TelegramMethod[TelegramType]],
method: TelegramMethod[TelegramType],
ok: bool,
result: TelegramType = None,
description: Optional[str] = None,
result: TelegramType | None = None,
description: str | None = None,
error_code: int = 200,
migrate_to_chat_id: Optional[int] = None,
retry_after: Optional[int] = None,
migrate_to_chat_id: int | None = None,
retry_after: int | None = None,
) -> Response[TelegramType]:
response = self.bot.add_result_for(
method=method,
Expand Down
80 changes: 59 additions & 21 deletions aiogram_tests/handler/handler.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
from typing import Callable
from typing import Dict
from typing import Callable, Any
from typing import Iterable
from typing import List
from typing import Union

from aiogram import types
from aiogram.filters import Filter
Expand All @@ -17,18 +15,18 @@ def __init__(
self,
callback: Callable,
*filters: Filter,
state: Union[State, str, None] = None,
state_data: Dict = None,
dp_middlewares: Iterable = None,
exclude_observer_methods: Iterable = None,
state: State | str | None = None,
state_data: dict[str, Any] | None = None,
dp_middlewares: Iterable | None = None,
exclude_observer_methods: Iterable | None = None,
**kwargs,
):
super().__init__(dp_middlewares, exclude_observer_methods, **kwargs)

self._callback = callback
self._filters: List = list(filters)
self._state: Union[State, str, None] = state
self._state_data: Dict = state_data
self._state: State | str | None = state
self._state_data: dict[str, Any] | None = state_data

if self._state_data is None:
self._state_data = {}
Expand All @@ -46,7 +44,9 @@ async def __call__(self, *args, **kwargs):
self.register_handler()

if self._state:
state = self.dp.fsm.get_context(self.bot, user_id=12345678, chat_id=12345678)
state = self.dp.fsm.get_context(
self.bot, user_id=12345678, chat_id=12345678
)
await state.set_state(self._state)
await state.update_data(**self._state_data)

Expand All @@ -72,10 +72,10 @@ def __init__(
self,
callback: Callable,
*filters: Filter,
state: Union[State, str, None] = None,
state_data: Dict = None,
dp_middlewares: Iterable = None,
exclude_observer_methods: Iterable = None,
state: State | str | None = None,
state_data: dict[str, Any] | None = None,
dp_middlewares: Iterable | None = None,
exclude_observer_methods: Iterable | None = None,
**kwargs,
):
super().__init__(
Expand All @@ -92,18 +92,20 @@ def register_handler(self) -> None:
self.dp.message.register(self._callback, *self._filters)

async def feed_update(self, message: types.Message, *args, **kwargs) -> None:
await self.dp.feed_update(self.bot, types.Update(update_id=12345678, message=message))
await self.dp.feed_update(
self.bot, types.Update(update_id=12345678, message=message)
)


class CallbackQueryHandler(TelegramEventObserverHandler):
def __init__(
self,
callback: Callable,
*filters: Filter,
state: Union[State, str, None] = None,
state_data: Dict = None,
dp_middlewares: Iterable = None,
exclude_observer_methods: Iterable = None,
state: State | str | None = None,
state_data: dict[str, Any] | None = None,
dp_middlewares: Iterable | None = None,
exclude_observer_methods: Iterable | None = None,
**kwargs,
):
super().__init__(
Expand All @@ -119,5 +121,41 @@ def __init__(
def register_handler(self) -> None:
self.dp.callback_query.register(self._callback, *self._filters)

async def feed_update(self, callback_query: types.CallbackQuery, *args, **kwargs) -> None:
await self.dp.feed_update(self.bot, types.Update(update_id=12345678, callback_query=callback_query))
async def feed_update(
self, callback_query: types.CallbackQuery, *args, **kwargs
) -> None:
await self.dp.feed_update(
self.bot, types.Update(update_id=12345678, callback_query=callback_query)
)


class MyChatMemberHandler(TelegramEventObserverHandler):
def __init__(
self,
callback: Callable,
*filters: Filter,
state: State | str | None = None,
state_data: dict[str, Any] | None = None,
dp_middlewares: Iterable | None = None,
exclude_observer_methods: Iterable | None = None,
**kwargs,
):
super().__init__(
callback,
*filters,
state=state,
state_data=state_data,
dp_middlewares=dp_middlewares,
exclude_observer_methods=exclude_observer_methods,
**kwargs,
)

def register_handler(self) -> None:
self.dp.my_chat_member.register(self._callback, *self._filters)

async def feed_update(
self, my_chat_member: types.ChatMemberUpdated, *args, **kwargs
) -> None:
await self.dp.feed_update(
self.bot, types.Update(update_id=12345678, my_chat_member=my_chat_member)
)
72 changes: 42 additions & 30 deletions aiogram_tests/mocked_bot.py
Original file line number Diff line number Diff line change
@@ -1,60 +1,70 @@
from collections import deque
from typing import AsyncGenerator
from typing import AsyncGenerator, Dict, Any, TYPE_CHECKING, cast
from typing import Deque
from typing import Optional
from typing import Type
from typing import Union

from aiogram import Bot
from aiogram.client.session.base import BaseSession
from aiogram.methods import TelegramMethod
from aiogram.methods.base import Request
from aiogram.methods.base import Response
from aiogram.methods.base import TelegramType
from aiogram.types import ResponseParameters
from aiogram.types import UNSET
from aiogram.types import ResponseParameters, UNSET_PARSE_MODE
from aiogram.types import User


class MockedSession(BaseSession):
def __init__(self):
super().__init__()
super(MockedSession, self).__init__()
self.responses: Deque[Response[TelegramType]] = deque()
self.requests: Deque[Request] = deque()
self.requests: Deque[TelegramMethod[TelegramType]] = deque()
self.closed = True

def add_result(self, response: Response[TelegramType]) -> Response[TelegramType]:
self.responses.appendleft(response)
return response

def get_request(self) -> Union[Request, None]:
if self.requests:
return self.requests[-1]

return None
def get_request(self) -> TelegramMethod[TelegramType]:
return self.requests.pop()

async def close(self):
self.closed = True

async def make_request(
self, bot: Bot, method: TelegramMethod[TelegramType], timeout: Optional[int] = UNSET
self,
bot: Bot,
method: TelegramMethod[TelegramType],
timeout: Optional[int] = UNSET_PARSE_MODE,
) -> TelegramType:
self.closed = False
self.requests.append(method.build_request(bot))
self.requests.append(method)
response: Response[TelegramType] = self.responses.pop()
self.check_response(method=method, status_code=response.error_code, content=response.json())
self.check_response(
bot=bot,
method=method,
status_code=cast(int, response.error_code),
content=response.model_dump_json(),
)
return response.result # type: ignore

async def stream_content(
self, url: str, timeout: int, chunk_size: int
self,
url: str,
headers: Optional[Dict[str, Any]] = None,
timeout: int = 30,
chunk_size: int = 65536,
raise_for_status: bool = True,
) -> AsyncGenerator[bytes, None]: # pragma: no cover
yield b""


class MockedBot(Bot):
def __init__(self, auto_mock_success=False, **kwargs):
super().__init__(kwargs.pop("token", "42:TEST"), session=MockedSession(), **kwargs)
self.session = MockedSession()
if TYPE_CHECKING:
session: MockedSession

def __init__(self, **kwargs):
super(MockedBot, self).__init__(
kwargs.pop("token", "42:TEST"), session=MockedSession(), **kwargs
)
self._me = User(
id=self.id,
is_bot=True,
Expand All @@ -63,17 +73,16 @@ def __init__(self, auto_mock_success=False, **kwargs):
username="username",
language_code="ru",
)
self.auto_mock_success = auto_mock_success

def add_result_for(
self,
method: Type[TelegramMethod[TelegramType]],
method: TelegramMethod[TelegramType],
ok: bool,
result: TelegramType = None,
description: Optional[str] = None,
result: TelegramType | None = None,
description: str | None = None,
error_code: int = 200,
migrate_to_chat_id: Optional[int] = None,
retry_after: Optional[int] = None,
migrate_to_chat_id: int | None = None,
retry_after: int | None = None,
) -> Response[TelegramType]:
response = Response[method.__returning__]( # type: ignore
ok=ok,
Expand All @@ -88,10 +97,13 @@ def add_result_for(
self.session.add_result(response)
return response

async def __call__(self, method: Type[TelegramMethod[TelegramType]], request_timeout: Optional[int] = None):
if self.auto_mock_success:
self.add_result_for(method, ok=True)
async def __call__(
self,
method: TelegramMethod[TelegramType],
request_timeout: Optional[int] = None,
):
self.add_result_for(method, ok=True)
return await super().__call__(method, request_timeout)

def get_request(self) -> Request:
def get_request(self) -> TelegramMethod[TelegramType]:
return self.session.get_request()
Loading