Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,8 @@ async def start_stream(

self._current = controller
logger.info(f"Starting {stream_type.name.title()} stream")
match (result := await self._current.start(options)):
# Type checker can't narrow the union type based on runtime isinstance checks
match (result := await self._current.start(options)): # type: ignore[arg-type]
case Success():
logger.info("Stream started successfully.")
case Failure():
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -301,9 +301,6 @@ class _InternalState(enum.IntFlag):
def ip_address(self) -> str:
"""The IP address of the GoPro device

Raises:
GoProNotOpened: The GoPro IP address is not yet available

Returns:
str: IP address
"""
Expand Down Expand Up @@ -369,7 +366,7 @@ def _build_http_request_args(self, message: HttpMessage) -> dict[str, Any]:
return request_args

@enforce_message_rules
async def _get_json(
async def _get_json( # type: ignore[override]
self,
message: HttpMessage,
*,
Expand Down Expand Up @@ -407,7 +404,7 @@ async def _get_json(
return response

@enforce_message_rules
async def _get_stream(
async def _get_stream( # type: ignore[override]
self,
message: HttpMessage,
*,
Expand All @@ -433,7 +430,7 @@ async def _get_stream(
return GoProResp(protocol=GoProResp.Protocol.HTTP, status=ErrorCode.SUCCESS, data=file, identifier=url)

@enforce_message_rules
async def _put_json(
async def _put_json( # type: ignore[override]
self,
message: HttpMessage,
*,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -848,17 +848,17 @@ def _handle_cohn(self, message: HttpMessage) -> HttpMessage:
except GoProNotOpened:
return message

async def _get_json(self, message: HttpMessage, *args: Any, **kwargs: Any) -> GoProResp:
async def _get_json(self, message: HttpMessage, *args: Any, **kwargs: Any) -> GoProResp: # type: ignore[override]
message = self._handle_cohn(message)
return await super()._get_json(*args, message=message, **kwargs)
return await super()._get_json(*args, message=message, **kwargs) # type: ignore[call-arg]

async def _get_stream(self, message: HttpMessage, *args: Any, **kwargs: Any) -> GoProResp:
async def _get_stream(self, message: HttpMessage, *args: Any, **kwargs: Any) -> GoProResp: # type: ignore[override]
message = self._handle_cohn(message)
return await super()._get_stream(*args, message=message, **kwargs)
return await super()._get_stream(*args, message=message, **kwargs) # type: ignore[call-arg]

async def _put_json(self, message: HttpMessage, *args: Any, **kwargs: Any) -> GoProResp:
async def _put_json(self, message: HttpMessage, *args: Any, **kwargs: Any) -> GoProResp: # type: ignore[override]
message = self._handle_cohn(message)
return await super()._put_json(*args, message=message, **kwargs)
return await super()._put_json(*args, message=message, **kwargs) # type: ignore[call-arg]

@GoProBase._ensure_opened((GoProMessageInterface.BLE,))
async def _open_wifi(self, timeout: int = 30, retries: int = 5) -> None:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,10 @@
class Hexlify(Adapter):
"""Construct adapter for pretty hex representation"""

def _decode(self, obj: bytes, context: Any, path: Any) -> str:
def _decode(self, obj: bytes, context: Any, path: Any) -> str: # pylint: disable=unused-argument
return obj.hex(":")

def _encode(self, obj: str, context: Any, path: Any) -> list[int]:
def _encode(self, obj: str, context: Any, path: Any) -> list[int]: # pylint: disable=unused-argument
return list(map(int, obj.split(":")))


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

import bleak
import pexpect
from bleak.assigned_numbers import CharacteristicPropertyName
from bleak.backends.characteristic import BleakGATTCharacteristic
from bleak.backends.device import BLEDevice as BleakDevice
from bleak.backends.scanner import AdvertisementData
Expand Down Expand Up @@ -344,11 +345,11 @@ async def discover_chars(self, handle: bleak.BleakClient, uuids: type[UUIDs] | N
GattDB: Gatt Database
"""

def bleak_props_adapter(bleak_props: list[str]) -> CharProps:
def bleak_props_adapter(bleak_props: list[CharacteristicPropertyName]) -> CharProps:
"""Convert a list of bleak string properties into a CharProps

Args:
bleak_props (list[str]): bleak strings to convert
bleak_props (list[CharacteristicPropertyName]): bleak strings to convert

Returns:
CharProps: converted Enum
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ class Characteristic:
def __post_init__(self, init_descriptors: Optional[list[Descriptor]]) -> None:
self._descriptors: dict[BleUUID, Descriptor] = {}
# Mypy should eventually support this: see https://github.com/python/mypy/issues/3004
self.descriptors = init_descriptors or [] # type: ignore
self.descriptors = init_descriptors or []
if self.descriptor_handle is None:
self.descriptor_handle = self.handle + 1

Expand Down Expand Up @@ -296,7 +296,7 @@ class Service:
def __post_init__(self, init_characteristics: Optional[list[Characteristic]]) -> None:
self._characteristics: dict[BleUUID, Characteristic] = {}
# Mypy should eventually support this: see https://github.com/python/mypy/issues/3004
self.characteristics = init_characteristics or [] # type: ignore
self.characteristics = init_characteristics or []

def __str__(self) -> str:
return self.name
Expand Down Expand Up @@ -408,7 +408,7 @@ def iter_items():
def __init__(self, init_services: list[Service]) -> None:
self._services: dict[BleUUID, Service] = {}
# Mypy should eventually support this: see https://github.com/python/mypy/issues/3004
self.services = init_services # type: ignore
self.services = init_services
self.characteristics = self.CharacteristicView(self)

@property
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ def __init__(self, interface: str | None = None, password: str | None = None) ->
self._driver = self._detect_driver()

# Attempt to set interface (will raise an exception if not able to auto-detect)
self.interface = interface # type: ignore
self.interface = interface

logger.debug(f"Wifi setup. Using {self}")

Expand Down Expand Up @@ -230,7 +230,7 @@ def interface(self, interface: str | None) -> None:
Args:
interface (str | None): interface (or None)
"""
self._driver.interface = interface # type: ignore
self._driver.interface = interface

@property
def is_on(self) -> bool:
Expand Down Expand Up @@ -727,26 +727,36 @@ def _sync_connect() -> bool:
# Escape single quotes
ssid = ssid.replace(r"'", '''"'"''')

logger.info(f"Scanning for {ssid}...")
start = time.time()
discovered = False
while not discovered and (time.time() - start) <= timeout:
# Scan for network
response = cmd(r"/usr/sbin/system_profiler SPAirPortDataType")
regex = re.compile(
r"\n\s+([\x20-\x7E]{1,32}):\n\s+PHY Mode:"
) # 0x20...0x7E --> ASCII for printable characters
if ssid in sorted(regex.findall(response)):
break
time.sleep(1)
# On macOS 15+, system_profiler redacts SSID names, so we skip explicit scanning
# and rely on networksetup's internal scanning when attempting to connect
version = Version(platform.mac_ver()[0])
if version < Version("15"):
logger.info(f"Scanning for {ssid}...")
start = time.time()
discovered = False
while not discovered and (time.time() - start) <= timeout:
# Scan for network
response = cmd(r"/usr/sbin/system_profiler SPAirPortDataType")
regex = re.compile(
r"\n\s+([\x20-\x7E]{1,32}):\n\s+PHY Mode:"
) # 0x20...0x7E --> ASCII for printable characters
if ssid in sorted(regex.findall(response)):
break
time.sleep(1)
else:
logger.warning("Wifi Scan timed out")
return False
else:
logger.warning("Wifi Scan timed out")
return False
# On macOS 15+, we can't pre-scan due to SSID redaction, so we'll attempt
# connection directly. networksetup will scan internally and return error if not found.
logger.info(f"Attempting to connect to {ssid} without explicitly scanning first...")
# Check version once for use in connection logic

# If we're already connected, return
if self.current()[0] == ssid:
logger.info("Wifi already connected")
return True
# If we're already connected, return (skip on macOS 15+ where current() can hang)
if version < Version("15"):
if self.current()[0] == ssid:
logger.info("Wifi already connected")
return True

# Connect now that we found the ssid
logger.info(f"Connecting to {ssid}...")
Expand All @@ -755,6 +765,24 @@ def _sync_connect() -> bool:
if "not find" in response.lower():
logger.warning("Network was not found.")
return False

# Check if we're on macOS 15+ where SSID verification doesn't work
if version >= Version("15"):
# On macOS 15+, we can't verify the SSID due to redaction, so we just
# check that we're connected to *some* network after a brief wait
logger.debug("macOS 15+: Skipping SSID verification, checking for any connection...")
time.sleep(2) # Give connection time to establish
current, state = self.current()
if state == SsidState.CONNECTED:
logger.info("Connected to network (SSID redacted by macOS)")
# Additional delay for network to be ready
time.sleep(3)
return True
else:
logger.warning("No connection established after networksetup command")
return False

# For macOS < 15, we can verify the actual SSID
# Now wait for network to actually establish
current = self.current()[0]
logger.debug(f"current wifi: {current}")
Expand Down Expand Up @@ -784,31 +812,51 @@ async def disconnect(self) -> bool:
def current(self) -> tuple[str | None, SsidState]:
"""Get the currently connected SSID if there is one.

Note:
On macOS 15+, the SSID is redacted for privacy. In this case, this method
will return (None, SsidState.CONNECTED) when connected to a network, even
though the actual SSID cannot be determined.

Returns:
tuple[str | None, SsidState]: (SSID or None if not connected, SSID state)
tuple[str | None, SsidState]: (SSID or None if not connected/redacted, SSID state)
"""
# attempt to get current network
ssid: str | None = None
is_connected = False

# On MacOS <= 14...
version = Version(platform.mac_ver()[0])
try:
if version <= Version("14"):
if "Current Wi-Fi Network: " in (output := cmd(f"networksetup -getairportnetwork {self.interface}")):
ssid = output.replace("Current Wi-Fi Network: ", "").strip()
is_connected = True
elif version < Version("15.6"):
if match := re.search(r"\n\s+SSID : ([\x20-\x7E]{1,32})", cmd(f"ipconfig getsummary {self.interface}")):
ssid = match.group(1)
except:
is_connected = True
if ssid == "<redacted>":
ssid = None # Redacted, but we know we're connected
except Exception:
# Ignore exceptions here; fallback logic below will attempt to get the SSID using an alternative method.
pass

# For current MacOs versions or if above failed.
# TODO this should be parsed more generally but Apple is probably going to remove this functionality also...so
# I'm not going to bother. Assuming the current ID is only needed to prevent connecting "better" solution is
# try to communicate with the camera using a raw HTTP endpoint to get the camera name
ssid = cmd(
r"system_profiler SPAirPortDataType | sed -n '/Current Network Information:/,/PHY Mode:/ p' | head -2 | tail -1 | sed 's/^[[:space:]]*//' | sed 's/:$//'"
).strip()

return (ssid, SsidState.CONNECTED) if ssid else (None, SsidState.DISCONNECTED)
if not ssid and not is_connected:
output = cmd(
r"system_profiler SPAirPortDataType | sed -n '/Current Network Information:/,/PHY Mode:/ p' | head -2 | tail -1 | sed 's/^[[:space:]]*//' | sed 's/:$//'"
).strip()
if output and output != "":
is_connected = True
if output != "<redacted>":
ssid = output
# else: connected but redacted, ssid remains None

# Determine state based on connection status
if is_connected:
return (ssid, SsidState.CONNECTED)
else:
return (None, SsidState.DISCONNECTED)

def available_interfaces(self) -> list[str]:
"""Return a list of available Wifi Interface strings
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,14 @@ async def close(self) -> None:
def is_connected(self) -> bool:
"""Is the WiFi connection currently established?

Note:
On macOS 15+, the SSID name is redacted for privacy, so this method
only checks the connection state, not the SSID name.

Returns:
bool: True if yes, False if no
"""
(ssid, state) = self._controller.current()
return ssid is not None and state is SsidState.CONNECTED
# On modern macOS (15+), SSID may be None even when connected due to privacy redaction
# So we primarily check the connection state
return state is SsidState.CONNECTED
Original file line number Diff line number Diff line change
Expand Up @@ -61,9 +61,14 @@ async def disconnect(self) -> bool:
def current(self) -> tuple[Optional[str], SsidState]:
"""Return the SSID and state of the current network.

Note:
On macOS 15+, SSID names are redacted for privacy. In this case, SSID may be None
even when state is CONNECTED. Use the state field to determine connection status.

Returns:
tuple[Optional[str], SsidState]: Tuple of SSID str and state. If SSID is None,
there is no current connection.
tuple[Optional[str], SsidState]: Tuple of SSID str and state. SSID may be None if:
- Not connected to any network
- Connected but SSID is redacted (macOS 15+)
"""

@abstractmethod
Expand Down
Loading