Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .git-blame-ignore-revs
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
# Run this command to always ignore formatting commits in `git blame`
# git config blame.ignoreRevsFile .git-blame-ignore-revs

# Formatting commit
3d2c9960d987b6ff0be180cadc9f9a895ee70f37
2 changes: 1 addition & 1 deletion .github/workflows/documentation.yml
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ jobs:
- name: Generate the documentation
run: (cd doc && make html)
- name: Upload documentation bundle
uses: actions/upload-artifact@v3
uses: actions/upload-artifact@v4
with:
name: documentation
path: doc/build/html/
Expand Down
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,6 @@ build/
dist/
ledgerblue.egg-info/
__pycache__
.python-version

__version__.py
19 changes: 19 additions & 0 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
# See https://pre-commit.com for more information
# See https://pre-commit.com/hooks.html for more hooks
repos:
- repo: https://github.com/pre-commit/pre-commit-hooks
rev: v3.2.0
hooks:
- id: trailing-whitespace
- id: end-of-file-fixer
- id: check-yaml
- id: check-added-large-files
- repo: https://github.com/astral-sh/ruff-pre-commit
# Ruff version.
rev: v0.3.7
hooks:
# Run the linter.
- id: ruff
args: [ --fix ]
# Run the formatter.
- id: ruff-format
22 changes: 12 additions & 10 deletions doc/source/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,35 +7,37 @@

from ledgerblue.__version__ import __version__


def setup(app):
app.add_css_file('theme_overrides.css') # Override wide tables in RTD theme
app.add_css_file("theme_overrides.css") # Override wide tables in RTD theme


# General Configuration
# =====================

extensions = []

source_suffix = ['.rst']
source_suffix = [".rst"]

master_doc = 'index'
master_doc = "index"

project = u'BOLOS Python Loader'
copyright = u'2017, Ledger Team'
author = u'Ledger Team'
project = "BOLOS Python Loader"
copyright = "2017, Ledger Team"
author = "Ledger Team"

version = __version__
release = __version__

pygments_style = 'sphinx'
pygments_style = "sphinx"

# Options for HTML Output
# =======================

html_theme = 'sphinx_rtd_theme'
html_theme = "sphinx_rtd_theme"

html_static_path = ['_static']
html_static_path = ["_static"]

# sphinxarg
# =========

extensions += ['sphinxarg.ext']
extensions += ["sphinxarg.ext"]
85 changes: 60 additions & 25 deletions ledgerblue/BleComm.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,21 +12,34 @@

TAG_ID = b"\x05"


def get_argparser():
parser = argparse.ArgumentParser(description="Manage ledger ble devices.")
parser.add_argument("--show", help="Show currently selected ledger device.", action='store_true')
parser.add_argument("--demo", help="Get version demo (connect to ble device, send get version, print response and disconnect).", action='store_true')
parser.add_argument(
"--show", help="Show currently selected ledger device.", action="store_true"
)
parser.add_argument(
"--demo",
help="Get version demo (connect to ble device, send get version, print response and disconnect).",
action="store_true",
)
return parser


class NoLedgerDeviceDetected(Exception):
pass


class BleScanner(object):
def __init__(self):
self.devices = []

def __scan_callback(self, device: BLEDevice, advertisement_data: AdvertisementData):
if LEDGER_SERVICE_UUID_STAX in advertisement_data.service_uuids or LEDGER_SERVICE_UUID_NANOX in advertisement_data.service_uuids or LEDGER_SERVICE_UUID_EUROPA in advertisement_data.service_uuids:
if (
LEDGER_SERVICE_UUID_STAX in advertisement_data.service_uuids
or LEDGER_SERVICE_UUID_NANOX in advertisement_data.service_uuids
or LEDGER_SERVICE_UUID_EUROPA in advertisement_data.service_uuids
):
device_is_in_list = False
for dev in self.devices:
if device.address == dev[0]:
Expand All @@ -45,12 +58,15 @@ async def scan(self):
counter += 1
await scanner.stop()


queue: asyncio.Queue = asyncio.Queue()


def callback(sender, data):
response = bytes(data)
queue.put_nowait(response)


async def _get_client(address: str) -> BleakClient:
# Connect to client
client = BleakClient(address)
Expand All @@ -60,7 +76,11 @@ async def _get_client(address: str) -> BleakClient:
characteristic_write_with_rsp = None
characteristic_write_cmd = None
for service in client.services:
if service.uuid in [LEDGER_SERVICE_UUID_NANOX, LEDGER_SERVICE_UUID_STAX, LEDGER_SERVICE_UUID_EUROPA]:
if service.uuid in [
LEDGER_SERVICE_UUID_NANOX,
LEDGER_SERVICE_UUID_STAX,
LEDGER_SERVICE_UUID_EUROPA,
]:
for char in service.characteristics:
if "0001" in char.uuid:
characteristic_notify = char
Expand All @@ -86,44 +106,45 @@ async def _get_client(address: str) -> BleakClient:
# Get MTU value
await client.write_gatt_char(characteristic_write.uuid, bytes.fromhex("0800000000"))
response = await queue.get()
mtu = int.from_bytes(response[5:6], 'big')
mtu = int.from_bytes(response[5:6], "big")

print("[BLE] MTU {:d}".format(mtu))

return client, mtu, characteristic_write


async def _read(mtu) -> bytes:
response = await queue.get()
assert len(response) >= 5
assert response[0] == TAG_ID[0]
sequence = int.from_bytes(response[1:3], 'big')
sequence = int.from_bytes(response[1:3], "big")
assert sequence == 0
total_size = int.from_bytes(response[3:5], "big")

total_size_bkup = total_size
if total_size >= (mtu-5):
if total_size >= (mtu - 5):
apdu = response[5:mtu]
total_size -= (mtu-5)
total_size -= mtu - 5

response = await queue.get()
assert response[0] == TAG_ID[0]
assert len(response) >= 3
next_sequence = int.from_bytes(response[1:3], 'big')
assert next_sequence == sequence+1
next_sequence = int.from_bytes(response[1:3], "big")
assert next_sequence == sequence + 1
sequence = next_sequence

while total_size >= (mtu-3):
while total_size >= (mtu - 3):
apdu += response[3:mtu]
total_size -= (mtu-3)
total_size -= mtu - 3
response = await queue.get()
assert response[0] == TAG_ID[0]
assert len(response) >= 3
sequence = int.from_bytes(response[1:3], 'big')
assert next_sequence == sequence+1
sequence = int.from_bytes(response[1:3], "big")
assert next_sequence == sequence + 1
sequence = next_sequence

if total_size > 0:
apdu += response[3:3+total_size]
apdu += response[3 : 3 + total_size]
else:
apdu = response[5:]

Expand Down Expand Up @@ -162,7 +183,9 @@ def __init__(self, address):

def open(self):
self.loop = asyncio.new_event_loop()
self.client, self.mtu, self.write_characteristic = self.loop.run_until_complete(_get_client(self.address))
self.client, self.mtu, self.write_characteristic = self.loop.run_until_complete(
_get_client(self.address)
)
self.opened = True

def close(self):
Expand All @@ -172,7 +195,9 @@ def close(self):
self.loop.close()

def __write(self, data: bytes):
self.loop.run_until_complete(_write(self.client, data, self.write_characteristic, self.mtu))
self.loop.run_until_complete(
_write(self.client, data, self.write_characteristic, self.mtu)
)

def __read(self) -> bytes:
return self.loop.run_until_complete(_read(self.mtu))
Expand All @@ -181,18 +206,25 @@ def exchange(self, data: bytes, timeout=1000) -> bytes:
self.__write(data)
return self.__read()


if __name__ == "__main__":
args = get_argparser().parse_args()
try:
if args.show:
print(f"Environment variable LEDGER_BLE_MAC currently set to '{os.environ['LEDGER_BLE_MAC']}'")
print(
f"Environment variable LEDGER_BLE_MAC currently set to '{os.environ['LEDGER_BLE_MAC']}'"
)
elif args.demo:
try:
ble_device = BleDevice(os.environ['LEDGER_BLE_MAC'])
ble_device = BleDevice(os.environ["LEDGER_BLE_MAC"])
except Exception as ex:
print(f"Please run 'python -m ledgerblue.BleComm' to select wich device to connect to")
print(
f"Please run 'python -m ledgerblue.BleComm' to select wich device to connect to"
)
raise ex
print("-----------------------------Get version BLE demo------------------------------")
print(
"-----------------------------Get version BLE demo------------------------------"
)
ble_device.open()
print(f"Connected to {ble_device.address}")
get_version_apdu = bytes.fromhex("e001000000")
Expand All @@ -201,22 +233,25 @@ def exchange(self, data: bytes, timeout=1000) -> bytes:
print(f"[BLE] <= {result.hex()}")
ble_device.close()
print(f"Disconnected from {ble_device.address}")
print(79*"-")
print(79 * "-")
else:
scanner = BleScanner()
asyncio.run(scanner.scan())
devices_str = ""
device_idx = 0
if len(scanner.devices):
for device in scanner.devices:
devices_str += f" -{device_idx+1}- mac=\"{device[0]}\" name=\"{device[1]}\"\n"
devices_str += (
f' -{device_idx + 1}- mac="{device[0]}" name="{device[1]}"\n'
)
device_idx += 1
index = int(input(f"Select device by index\n{devices_str}"))
print(f"Please run 'export LEDGER_BLE_MAC=\"{scanner.devices[index-1][0]}\"' to select which device to connect to")
print(
f"Please run 'export LEDGER_BLE_MAC=\"{scanner.devices[index - 1][0]}\"' to select which device to connect to"
)
else:
raise NoLedgerDeviceDetected
except NoLedgerDeviceDetected as ex:
print(ex)
except Exception as ex:
raise ex

Loading