Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 35 additions & 10 deletions mergin/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,10 @@ class TokenError(Exception):
pass


class AuthTokenExpiredError(Exception):
pass


class ServerType(Enum):
OLD = auto() # Server is old and does not support workspaces
CE = auto() # Server is Community Edition
Expand Down Expand Up @@ -80,7 +84,15 @@ class MerginClient:
Currently, only HTTP proxies are supported.
"""

def __init__(self, url=None, auth_token=None, login=None, password=None, plugin_version=None, proxy_config=None):
def __init__(
self,
url=None,
auth_token=None,
login=None,
password=None,
plugin_version=None,
proxy_config=None,
):
self.url = url if url is not None else MerginClient.default_url()
self._auth_params = None
self._auth_session = None
Expand Down Expand Up @@ -134,15 +146,20 @@ def __init__(self, url=None, auth_token=None, login=None, password=None, plugin_
self.opener = urllib.request.build_opener(*handlers, https_handler)
urllib.request.install_opener(self.opener)

if login and not password:
raise ClientError("Unable to log in: no password provided for '{}'".format(login))
if password and not login:
raise ClientError("Unable to log in: password provided but no username/email")
if login or password:
if login and not password:
raise ClientError("Unable to log in: no password provided for '{}'".format(login))
if password and not login:
raise ClientError("Unable to log in: password provided but no username/email")

if login and password:
self._auth_params = {"login": login, "password": password}
if login and password:
self._auth_params = {"login": login, "password": password}
if not self._auth_session:
self.login(login, password)

else:
if not self._auth_session:
self.login(login, password)
raise ClientError("Unable to log in: no auth token provided for login")

def setup_logging(self):
"""Setup Mergin Maps client logging."""
Expand Down Expand Up @@ -190,11 +207,19 @@ def wrapper(self, *args):
delta = self._auth_session["expire"] - datetime.now(timezone.utc)
if delta.total_seconds() < 5:
self.log.info("Token has expired - refreshing...")
self.login(self._auth_params["login"], self._auth_params["password"])
if self._auth_params.get("login", None) and self._auth_params.get("password", None):
self.log.info("Token has expired - refreshing...")
self.login(self._auth_params["login"], self._auth_params["password"])
else:
raise AuthTokenExpiredError("Token has expired - please re-login")
else:
# Create a new authorization token
self.log.info(f"No token - login user: {self._auth_params['login']}")
self.login(self._auth_params["login"], self._auth_params["password"])
if self._auth_params.get("login", None) and self._auth_params.get("password", None):
self.login(self._auth_params["login"], self._auth_params["password"])
else:
raise ClientError("Missing login or password")

return f(self, *args)

return wrapper
Expand Down