Skip to content
Closed
238 changes: 233 additions & 5 deletions garminconnect/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@
import contextlib
import json
import logging
import random
import re
import time
from pathlib import Path
from typing import Any

Expand Down Expand Up @@ -165,13 +168,29 @@ def login(
"""Log in to Garmin Connect.

Tries multiple login strategies in order:
1. Portal web flow with curl_cffi (desktop browser TLS + UA)
2. Portal web flow with plain requests (desktop browser UA)
3. Mobile SSO with curl_cffi (Android WebView TLS)
4. Mobile SSO with plain requests (last resort)
1. SSO embed widget with curl_cffi (HTML form, no clientId)
2. Portal web flow with curl_cffi (desktop browser TLS + UA)
3. Portal web flow with plain requests (desktop browser UA)
4. Mobile SSO with curl_cffi (Android WebView TLS)
5. Mobile SSO with plain requests (last resort)
"""
# Clear any leftover widget MFA state from a prior abandoned attempt
# so resume_login() does not incorrectly route to the widget path.
for attr in (
"_widget_session",
"_widget_signin_params",
"_widget_last_resp",
):
if hasattr(self, attr):
delattr(self, attr)

strategies: list[tuple[str, Any]] = []

# SSO embed widget — uses /sso/embed + /sso/signin HTML form flow.
# No clientId parameter, so not subject to per-client rate limiting.
if HAS_CFFI:
strategies.append(("widget+cffi", self._widget_login_cffi))
Comment thread
coderabbitai[bot] marked this conversation as resolved.

Comment thread
diegoscarabelli marked this conversation as resolved.
# Portal web login — uses /portal/api/login with desktop Chrome UA.
# This is the endpoint connect.garmin.com itself uses, so Cloudflare
# cannot block it without breaking their own website.
Expand Down Expand Up @@ -217,6 +236,198 @@ def login(
f"All login strategies failed. Last error: {last_err}"
)

# ------------------------------------------------------------------
# SSO embed widget login (HTML form flow, no clientId)
# ------------------------------------------------------------------

_CSRF_RE = re.compile(r'name="_csrf"\s+value="(.+?)"')
_TITLE_RE = re.compile(r"<title>(.+?)</title>")
Comment thread
diegoscarabelli marked this conversation as resolved.

def _widget_login_cffi(
self,
email: str,
password: str,
prompt_mfa: Any = None,
return_on_mfa: bool = False,
) -> tuple[str | None, Any]:
"""Login via the SSO embed widget using curl_cffi TLS impersonation.

This is the classic HTML form-based flow (used by the garth library
for years). It does not use a clientId parameter, so it is not
subject to the per-client rate limiting that affects the portal
and mobile JSON API endpoints.

Requires curl_cffi for TLS fingerprint impersonation to pass
Cloudflare bot detection.
"""
sess: Any = cffi_requests.Session(impersonate="chrome", timeout=30)

sso_base = f"{self._sso}/sso"
sso_embed = f"{sso_base}/embed"
embed_params = {
"id": "gauth-widget",
"embedWidget": "true",
"gauthHost": sso_base,
}
signin_params = {
**embed_params,
"gauthHost": sso_embed,
"service": sso_embed,
"source": sso_embed,
"redirectAfterAccountLoginUrl": sso_embed,
"redirectAfterAccountCreationUrl": sso_embed,
}
Comment thread
diegoscarabelli marked this conversation as resolved.

# Step 1: GET /sso/embed to establish session cookies
r = sess.get(sso_embed, params=embed_params)
if r.status_code == 429:
raise GarminConnectTooManyRequestsError(
"Widget login returned 429 on embed page"
)
if not r.ok:
raise GarminConnectConnectionError(
f"Widget login: embed page returned HTTP {r.status_code}"
Comment thread
diegoscarabelli marked this conversation as resolved.
)

# Step 2: GET /sso/signin to obtain CSRF token
r = sess.get(
f"{sso_base}/signin",
params=signin_params,
headers={"Referer": sso_embed},
)
Comment thread
coderabbitai[bot] marked this conversation as resolved.
if r.status_code == 429:
raise GarminConnectTooManyRequestsError(
"Widget login returned 429 on sign-in page"
)
csrf_match = self._CSRF_RE.search(r.text)
if not csrf_match:
raise GarminConnectConnectionError(
"Widget login: could not find CSRF token in sign-in page"
)

# Step 3: POST credentials via HTML form
r = sess.post(
f"{sso_base}/signin",
params=signin_params,
headers={"Referer": r.url},
data={
"username": email,
"password": password,
"embed": "true",
"_csrf": csrf_match.group(1),
},
timeout=30,
)
Comment thread
coderabbitai[bot] marked this conversation as resolved.

if r.status_code == 429:
raise GarminConnectTooManyRequestsError(
"Widget login returned 429"
)
if not r.ok:
raise GarminConnectConnectionError(
f"Widget login: credential POST returned HTTP {r.status_code}"
)

title_match = self._TITLE_RE.search(r.text)
title = title_match.group(1) if title_match else ""

# Step 4: Handle MFA
if "MFA" in title:
self._widget_session = sess
self._widget_signin_params = signin_params
self._widget_last_resp = r

if return_on_mfa:
return "needs_mfa", sess

if prompt_mfa:
mfa_code = prompt_mfa()
Comment thread
diegoscarabelli marked this conversation as resolved.
ticket = self._complete_mfa_widget(mfa_code)
self._establish_session(
ticket, sess=sess, service_url=sso_embed
Comment thread
diegoscarabelli marked this conversation as resolved.
)
del self._widget_session
del self._widget_signin_params
del self._widget_last_resp
return None, None
Comment thread
coderabbitai[bot] marked this conversation as resolved.
raise GarminConnectAuthenticationError(
"MFA Required but no prompt_mfa mechanism supplied"
)

if title != "Success":
# Detect credential failures to prevent falling through
# to other strategies with bad credentials
title_lower = title.lower()
if any(
hint in title_lower
for hint in ("locked", "invalid", "error", "incorrect")
):
raise GarminConnectAuthenticationError(
f"Widget login: authentication failed ('{title}')"
)
raise GarminConnectConnectionError(
f"Widget login: unexpected title '{title}'"
)
Comment thread
diegoscarabelli marked this conversation as resolved.
Comment thread
coderabbitai[bot] marked this conversation as resolved.
Comment thread
coderabbitai[bot] marked this conversation as resolved.

# Step 5: Extract service ticket from success page
ticket_match = re.search(r'embed\?ticket=([^"]+)"', r.text)
if not ticket_match:
raise GarminConnectConnectionError(
"Widget login: could not find service ticket in response"
)
self._establish_session(
ticket_match.group(1), sess=sess, service_url=sso_embed
)
return None, None

def _complete_mfa_widget(self, mfa_code: str) -> str:
"""Complete MFA for the widget flow, return the service ticket."""
sess = self._widget_session
r = self._widget_last_resp

csrf_match = self._CSRF_RE.search(r.text)
if not csrf_match:
raise GarminConnectAuthenticationError(
"Widget MFA: could not find CSRF token"
)

r = sess.post(
f"{self._sso}/sso/verifyMFA/loginEnterMfaCode",
params=self._widget_signin_params,
headers={"Referer": r.url},
data={
"mfa-code": mfa_code,
"embed": "true",
"_csrf": csrf_match.group(1),
"fromPage": "setupEnterMfaCode",
},
timeout=30,
)
Comment thread
coderabbitai[bot] marked this conversation as resolved.

if r.status_code == 429:
raise GarminConnectTooManyRequestsError(
"Widget MFA returned 429"
)
if not r.ok:
raise GarminConnectConnectionError(
f"Widget MFA: verify endpoint returned HTTP {r.status_code}"
)

title_match = self._TITLE_RE.search(r.text)
title = title_match.group(1) if title_match else ""

if title != "Success":
raise GarminConnectAuthenticationError(
f"Widget MFA verification failed: '{title}'"
)
Comment thread
coderabbitai[bot] marked this conversation as resolved.
Comment thread
coderabbitai[bot] marked this conversation as resolved.

ticket_match = re.search(r'embed\?ticket=([^"]+)"', r.text)
if not ticket_match:
raise GarminConnectAuthenticationError(
"Widget MFA: could not find service ticket"
)
return ticket_match.group(1)

# ------------------------------------------------------------------
# Portal web login (desktop browser flow)
# ------------------------------------------------------------------
Expand Down Expand Up @@ -302,6 +513,14 @@ def _portal_web_login(
timeout=30,
)

# Garmin's Cloudflare WAF rate-limits requests that go directly from
# the SSO page GET to the login POST without intervening activity.
# A random 30-45s delay mimics natural browser behavior and
# consistently avoids the 429 block. (Adapted from upstream PR #346.)
delay_s = random.uniform(30, 45)
_LOGGER.debug("Portal login: sleeping %.1fs before credential POST", delay_s)
time.sleep(delay_s)
Comment on lines +516 to +522
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Only sleep after the sign-in GET has actually succeeded.

Because the GET on Line 506 still discards its response, a 429/5xx sign-in page now incurs the full 30–45s delay and then still sends the credential POST. With both portal strategies in the fallback chain, that can add 60–90s of avoidable latency before mobile auth even starts.

🔧 Proposed fix
-        sess.get(
+        r = sess.get(
             signin_url,
             params={
                 "clientId": PORTAL_SSO_CLIENT_ID,
                 "service": PORTAL_SSO_SERVICE_URL,
             },
             headers=get_headers,
             timeout=30,
         )
+        if r.status_code == 429:
+            raise GarminConnectTooManyRequestsError(
+                "Portal sign-in page returned 429. Cloudflare is blocking this request."
+            )
+        if not r.ok:
+            raise GarminConnectConnectionError(
+                f"Portal sign-in page returned HTTP {r.status_code}"
+            )
 
         # Garmin's Cloudflare WAF rate-limits requests that go directly from
         # the SSO page GET to the login POST without intervening activity.
         delay_s = random.uniform(30, 45)
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@garminconnect/client.py` around lines 516 - 522, Only sleep before the
credential POST when the preceding sign-in GET actually succeeded: capture the
GET response (do not discard it), check that it returned a successful status and
the expected sign-in page content, and only then compute delay_s and call
_LOGGER.debug/time.sleep before issuing the POST; if the GET returned a 429/5xx
or unexpected content, skip the 30–45s sleep and handle or retry the GET error
path instead. Ensure you modify the code around the sign-in GET, delay_s,
_LOGGER.debug and time.sleep usage so the delay is conditional on a successful
GET.


# Step 2: POST credentials to the portal login API
login_url = f"{self._sso}/portal/api/login"
login_params = {
Expand Down Expand Up @@ -984,7 +1203,16 @@ def delete(self, _domain: str, path: str, **kwargs: Any) -> Any:
return resp

def resume_login(self, _client_state: Any, mfa_code: str) -> tuple[str | None, Any]:
if hasattr(self, "_mfa_portal_web_session"):
if hasattr(self, "_widget_session"):
ticket = self._complete_mfa_widget(mfa_code)
sso_embed = f"{self._sso}/sso/embed"
self._establish_session(
ticket, sess=self._widget_session, service_url=sso_embed
)
del self._widget_session
del self._widget_signin_params
del self._widget_last_resp
elif hasattr(self, "_mfa_portal_web_session"):
Comment thread
diegoscarabelli marked this conversation as resolved.
self._complete_mfa_portal_web(mfa_code)
elif hasattr(self, "_mfa_cffi_session"):
self._complete_mfa_portal(mfa_code)
Expand Down