Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 7 additions & 21 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ pip install pyoverkiz
import asyncio
import time

from pyoverkiz.auth.credentials import UsernamePasswordCredentials
from pyoverkiz.const import SUPPORTED_SERVERS
from pyoverkiz.client import OverkizClient
from pyoverkiz.enums import Server
Expand All @@ -47,7 +48,8 @@ PASSWORD = ""

async def main() -> None:
async with OverkizClient(
USERNAME, PASSWORD, server=SUPPORTED_SERVERS[Server.SOMFY_EUROPE]
server=Server.SOMFY_EUROPE,
credentials=UsernamePasswordCredentials(USERNAME, PASSWORD),
) as client:
try:
await client.login()
Expand Down Expand Up @@ -76,38 +78,22 @@ asyncio.run(main())
```python
import asyncio
import time
import aiohttp

from pyoverkiz.auth.credentials import LocalTokenCredentials
from pyoverkiz.client import OverkizClient
from pyoverkiz.const import SUPPORTED_SERVERS, OverkizServer
from pyoverkiz.enums import Server
from pyoverkiz.utils import create_local_server_config

USERNAME = ""
PASSWORD = ""
LOCAL_GATEWAY = "gateway-xxxx-xxxx-xxxx.local" # or use the IP address of your gateway
VERIFY_SSL = True # set verify_ssl to False if you don't use the .local hostname


async def main() -> None:
token = "" # generate your token via the Somfy app and include it here

# Local Connection
session = aiohttp.ClientSession(
connector=aiohttp.TCPConnector(verify_ssl=VERIFY_SSL)
)

async with OverkizClient(
username="",
password="",
token=token,
session=session,
server=create_local_server_config(host=LOCAL_GATEWAY),
credentials=LocalTokenCredentials(token),
verify_ssl=VERIFY_SSL,
server=OverkizServer(
name="Somfy TaHoma (local)",
endpoint=f"https://{LOCAL_GATEWAY}:8443/enduser-mobile-web/1/enduserAPI/",
manufacturer="Somfy",
configuration_url=None,
),
) as client:
await client.login()

Expand Down
24 changes: 24 additions & 0 deletions pyoverkiz/auth/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
"""Authentication module for pyoverkiz."""

from __future__ import annotations

from pyoverkiz.auth.base import AuthContext, AuthStrategy
from pyoverkiz.auth.credentials import (
Credentials,
LocalTokenCredentials,
RexelOAuthCodeCredentials,
TokenCredentials,
UsernamePasswordCredentials,
)
from pyoverkiz.auth.factory import build_auth_strategy

__all__ = [
"AuthContext",
"AuthStrategy",
"Credentials",
"LocalTokenCredentials",
"RexelOAuthCodeCredentials",
"TokenCredentials",
"UsernamePasswordCredentials",
"build_auth_strategy",
]
42 changes: 42 additions & 0 deletions pyoverkiz/auth/base.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
"""Base classes for authentication strategies."""

from __future__ import annotations

import datetime
from collections.abc import Mapping
from dataclasses import dataclass
from typing import Protocol


@dataclass(slots=True)
class AuthContext:
"""Authentication context holding tokens and expiration."""

access_token: str | None = None
refresh_token: str | None = None
expires_at: datetime.datetime | None = None

def is_expired(self, *, skew_seconds: int = 5) -> bool:
"""Check if the access token is expired, considering a skew time."""
if not self.expires_at:
return False

return datetime.datetime.now() >= self.expires_at - datetime.timedelta(
seconds=skew_seconds
)


class AuthStrategy(Protocol):
"""Protocol for authentication strategies."""

async def login(self) -> None:
"""Perform login to obtain tokens."""

async def refresh_if_needed(self) -> bool:
"""Refresh tokens if they are expired. Return True if refreshed."""

def auth_headers(self, path: str | None = None) -> Mapping[str, str]:
"""Generate authentication headers for requests."""

async def close(self) -> None:
"""Clean up any resources held by the strategy."""
37 changes: 37 additions & 0 deletions pyoverkiz/auth/credentials.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
"""Credentials for authentication strategies."""

from __future__ import annotations

from dataclasses import dataclass


class Credentials:
"""Marker base class for auth credentials."""


@dataclass(slots=True)
class UsernamePasswordCredentials(Credentials):
"""Credentials using username and password."""

username: str
password: str


@dataclass(slots=True)
class TokenCredentials(Credentials):
"""Credentials using an (API) token."""

token: str


@dataclass(slots=True)
class LocalTokenCredentials(TokenCredentials):
"""Credentials using a local API token."""


@dataclass(slots=True)
class RexelOAuthCodeCredentials(Credentials):
"""Credentials using Rexel OAuth2 authorization code."""

code: str
redirect_uri: str
127 changes: 127 additions & 0 deletions pyoverkiz/auth/factory.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
"""Factory to build authentication strategies based on server and credentials."""

from __future__ import annotations

import ssl

from aiohttp import ClientSession

from pyoverkiz.auth.credentials import (
Credentials,
LocalTokenCredentials,
RexelOAuthCodeCredentials,
TokenCredentials,
UsernamePasswordCredentials,
)
from pyoverkiz.auth.strategies import (
AuthStrategy,
BearerTokenAuthStrategy,
CozytouchAuthStrategy,
LocalTokenAuthStrategy,
NexityAuthStrategy,
RexelAuthStrategy,
SessionLoginStrategy,
SomfyAuthStrategy,
)
from pyoverkiz.enums import APIType, Server
from pyoverkiz.models import ServerConfig


def build_auth_strategy(
*,
server_config: ServerConfig,
credentials: Credentials,
session: ClientSession,
ssl_context: ssl.SSLContext | bool,
) -> AuthStrategy:
"""Build the correct auth strategy for the given server and credentials."""
server: Server | None = server_config.server

if server == Server.SOMFY_EUROPE:
return SomfyAuthStrategy(
_ensure_username_password(credentials),
session,
server_config,
ssl_context,
server_config.type,
)

if server in {
Server.ATLANTIC_COZYTOUCH,
Server.THERMOR_COZYTOUCH,
Server.SAUTER_COZYTOUCH,
}:
return CozytouchAuthStrategy(
_ensure_username_password(credentials),
session,
server_config,
ssl_context,
server_config.type,
)

if server == Server.NEXITY:
return NexityAuthStrategy(
_ensure_username_password(credentials),
session,
server_config,
ssl_context,
server_config.type,
)

if server == Server.REXEL:
return RexelAuthStrategy(
_ensure_rexel(credentials),
session,
server_config,
ssl_context,
server_config.type,
)

if server_config.type == APIType.LOCAL:
if isinstance(credentials, LocalTokenCredentials):
return LocalTokenAuthStrategy(
credentials, session, server_config, ssl_context, server_config.type
)
return BearerTokenAuthStrategy(
_ensure_token(credentials),
session,
server_config,
ssl_context,
server_config.type,
)

if isinstance(credentials, TokenCredentials) and not isinstance(
credentials, LocalTokenCredentials
):
return BearerTokenAuthStrategy(
credentials, session, server_config, ssl_context, server_config.type
)

return SessionLoginStrategy(
_ensure_username_password(credentials),
session,
server_config,
ssl_context,
server_config.type,
)


def _ensure_username_password(credentials: Credentials) -> UsernamePasswordCredentials:
"""Validate that credentials are username/password based."""
if not isinstance(credentials, UsernamePasswordCredentials):
raise TypeError("UsernamePasswordCredentials are required for this server.")
return credentials


def _ensure_token(credentials: Credentials) -> TokenCredentials:
"""Validate that credentials carry a bearer token."""
if not isinstance(credentials, TokenCredentials):
raise TypeError("TokenCredentials are required for this server.")
return credentials


def _ensure_rexel(credentials: Credentials) -> RexelOAuthCodeCredentials:
"""Validate that credentials are of Rexel OAuth code type."""
if not isinstance(credentials, RexelOAuthCodeCredentials):
raise TypeError("RexelOAuthCodeCredentials are required for this server.")
return credentials
Loading