Skip to content
6 changes: 3 additions & 3 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,12 @@ jobs:
runs-on: ubuntu-latest
strategy:
matrix:
python-version: ['3.7', '3.8', '3.9', '3.10', '3.11']
python-version: ['3.8', '3.9', '3.10', '3.11', '3.12', '3.13', '3.14']

steps:
- uses: actions/checkout@v2
- uses: actions/checkout@v4
- name: Set up Python ${{ matrix.python-version }}
uses: actions/setup-python@v2
uses: actions/setup-python@v5
with:
python-version: ${{ matrix.python-version }}
- name: Install dependencies
Expand Down
2 changes: 1 addition & 1 deletion .pylintrc
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
[MESSAGES CONTROL]
disable=invalid-name, missing-docstring, locally-disabled, unbalanced-tuple-unpacking,no-else-return,fixme,duplicate-code,cyclic-import,import-outside-toplevel,consider-using-with,consider-using-f-string,unspecified-encoding
disable=invalid-name, missing-docstring, locally-disabled, unbalanced-tuple-unpacking,no-else-return,fixme,duplicate-code,cyclic-import,import-outside-toplevel,consider-using-with,consider-using-f-string,unspecified-encoding,too-many-statements,too-many-branches,too-many-positional-arguments,possibly-used-before-assignment,inconsistent-return-statements,deprecated-class
[SIMILARITIES]
min-similarity-lines=5
3 changes: 1 addition & 2 deletions libagent/age/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@
import os
import sys
import traceback
from importlib import metadata

import bech32
from importlib import metadata
import semver
from cryptography.exceptions import InvalidTag
from cryptography.hazmat.primitives.ciphers.aead import ChaCha20Poly1305
Expand Down Expand Up @@ -154,7 +154,6 @@ def main(device_type):
p = argparse.ArgumentParser()

agent_package = device_type.package_name()
resources_map = {r.key: r for r in pkg_resources.require(agent_package)}
resources = [metadata.distribution(agent_package), metadata.distribution('lib-agent')]
versions = '\n'.join('{}={}'.format(r.metadata['Name'], r.version) for r in resources)
p.add_argument('--version', help='print the version info',
Expand Down
8 changes: 4 additions & 4 deletions libagent/age/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,8 @@ def ecdh(self, identity, peer_pubkey):
pubkey=peer_pubkey, identity=identity)
assert result[:1] == b"\x04"
hkdf = HKDF(
algorithm=hashes.SHA256(),
length=32,
salt=((peer_pubkey + self_pubkey)),
info=b"age-encryption.org/v1/X25519")
algorithm=hashes.SHA256(),
length=32,
salt=((peer_pubkey + self_pubkey)),
info=b"age-encryption.org/v1/X25519")
return hkdf.derive(result[1:])
1 change: 0 additions & 1 deletion libagent/device/ledger.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@

from .. import formats
from . import interface
from .. import formats

log = logging.getLogger(__name__)

Expand Down
47 changes: 25 additions & 22 deletions libagent/device/onlykey.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,17 @@
# pylint: disable=attribute-defined-outside-init
"""OnlyKey-related code (see https://www.onlykey.io/)."""

import logging
import hashlib
import codecs
import hashlib
import logging
import os
from os import path
import re
import time
from os import path

import ecdsa
import nacl.signing
import unidecode
import re

from . import interface

Expand Down Expand Up @@ -48,8 +49,8 @@ def connect(self):
self.okversion = self.okversion[8:]
if self.okversion[0] == 'v':
break
except Exception:
raise interface.NotFoundError('{} not connected: "{}"')
except Exception as exc:
raise interface.NotFoundError('{} not connected: "{}"') from exc

def set_skey(self, skey):
"""Set signing key to use."""
Expand All @@ -73,6 +74,7 @@ def import_pub(self, pubkey):
# self.import_pubkey_bytes = bytes(self.import_pubkey_obj)

def get_key_by_keygrip(self, keygrip):
"""Return key-slot info for the given keygrip."""
if keygrip is None:
return None
keygriplong = keygrip
Expand All @@ -92,19 +94,21 @@ def get_key_by_keygrip(self, keygrip):
raise KeyError('keygrip %s not found' % keygriplong)
return None


def get_sk_dk(self):
"""Get signing key and decryption key slots from config."""
fpath = os.path.join(os.environ.get('AGENTHOMEDIR', os.environ.get('GNUPGHOME')), 'run-agent.sh')
fpath = os.path.join(os.environ.get(
'AGENTHOMEDIR', os.environ.get('GNUPGHOME')), 'run-agent.sh')
log.debug('Path to run-agent.sh = %s', fpath)
if path.exists(fpath):
with open(fpath) as f:
s = f.read()
if '--skey-slot=ECC' in s:
if s[s.find('--skey-slot=')+16:s.find('--skey-slot=')+17] == ' ':
self.set_skey(int(s[s.find('--skey-slot=')+15:s.find('--skey-slot=')+16])+100)
self.set_skey(
int(s[s.find('--skey-slot=')+15:s.find('--skey-slot=')+16])+100)
else:
self.set_skey(int(s[s.find('--skey-slot=')+15:s.find('--skey-slot=')+17])+100)
self.set_skey(
int(s[s.find('--skey-slot=')+15:s.find('--skey-slot=')+17])+100)
elif '--skey-slot=RSA' in s:
self.set_skey(int(s[s.find('--skey-slot=')+15:s.find('--skey-slot=')+16]))
elif '--skey-slot=' in s:
Expand All @@ -114,9 +118,11 @@ def get_sk_dk(self):
self.set_skey(int(s[s.find('--skey-slot=')+12:s.find('--skey-slot=')+15]))
if '--dkey-slot=ECC' in s:
if s[s.find('--dkey-slot=')+16:s.find('--dkey-slot=')+17] == ' ':
self.set_dkey(int(s[s.find('--dkey-slot=')+15:s.find('--dkey-slot=')+16])+100)
self.set_dkey(
int(s[s.find('--dkey-slot=')+15:s.find('--dkey-slot=')+16])+100)
else:
self.set_dkey(int(s[s.find('--dkey-slot=')+15:s.find('--dkey-slot=')+17])+100)
self.set_dkey(
int(s[s.find('--dkey-slot=')+15:s.find('--dkey-slot=')+17])+100)
elif '--dkey-slot=RSA' in s:
self.set_dkey(int(s[s.find('--dkey-slot=')+15:s.find('--dkey-slot=')+16]))
elif '--dkey-slot=' in s:
Expand Down Expand Up @@ -253,18 +259,16 @@ def pubkey(self, identity, ecdh=False):
if identity.identity_dict['proto'] == 'ssh':
# https://security.stackexchange.com/questions/42268/how-do-i-get-the-rsa-bit-length-with-the-pubkey-and-openssl
ok_pubkey = b'\x00\x00\x00\x07' + b'\x73\x73\x68\x2d\x72\x73\x61' + \
b'\x00\x00\x00\x03' + b'\x01\x00\x01' + \
b'\x00\x00\x01\x01' + b'\x00' + bytes(ok_pubkey)
# ok_pubkey = b'\x00\x00\x00\x07' + b'\x72\x73\x61\x2d\x73\x68\x61\x32\x2d\x32\x35\x
# 36' + b'\x00\x00\x00\x03' + b'\x01\x00\x01' + b'\x00\x00\x01\x01' + b'\x00' + byte
# s(ok_pubkey)
b'\x00\x00\x00\x03' + b'\x01\x00\x01' + \
b'\x00\x00\x01\x01' + b'\x00' + bytes(ok_pubkey)
# (legacy commented-out raw SSH RSA pubkey assembly omitted)
else:
ok_pubkey = bytes(ok_pubkey)
elif len(ok_pubkey) == 512:
if identity.identity_dict['proto'] == 'ssh':
ok_pubkey = b'\x00\x00\x00\x07' + b'\x73\x73\x68\x2d\x72\x73\x61' + \
b'\x00\x00\x00\x03' + b'\x01\x00\x01' + \
b'\x00\x00\x02\x01' + b'\x00' + bytes(ok_pubkey)
b'\x00\x00\x00\x03' + b'\x01\x00\x01' + \
b'\x00\x00\x02\x01' + b'\x00' + bytes(ok_pubkey)
else:
ok_pubkey = bytes(ok_pubkey)
else:
Expand Down Expand Up @@ -404,8 +408,6 @@ def ecdh_with_pubkey(self, identity, pubkey):
self_pubkey = self.pubkey(ecdh=False, identity=identity)
log.info('Using self_pubkey= %s', self_pubkey)
session_key = self.ecdh(identity, pubkey)
if self_pubkey:
self_pubkey = self_pubkey
return session_key, self_pubkey

def ecdh(self, identity, pubkey):
Expand Down Expand Up @@ -502,7 +504,8 @@ def get_button(self, byte):
else:
return byte % 6 + 1

def convert_keyslot (self, s):

def convert_keyslot(self, s): # pylint: disable=unused-argument
"""Return key slot number."""
if 'ECC' in s:
if len(s) == 5:
Expand Down
1 change: 0 additions & 1 deletion libagent/device/trezor.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@

from .. import formats
from . import interface
from .. import formats

log = logging.getLogger(__name__)

Expand Down
1 change: 0 additions & 1 deletion libagent/device/trezor_defs.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
"""TREZOR-related definitions."""

import logging
# pylint: disable=unused-import,import-error,no-name-in-module,no-member
import logging
import os
Expand Down
16 changes: 6 additions & 10 deletions libagent/formats.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,9 @@

import ecdsa
import nacl.signing
import Crypto.Hash
import Crypto.PublicKey
import Crypto.Signature
from Crypto.Signature import pkcs1_15
from Crypto.Hash import SHA256, SHA512
from Crypto.PublicKey import RSA
import nacl.signing
from Crypto.Signature import pkcs1_15

from . import util

Expand All @@ -38,7 +34,8 @@
SSH_NIST256_CERT_TYPE = SSH_NIST256_KEY_TYPE + SSH_NIST256_CERT_POSTFIX
SSH_ED25519_KEY_TYPE = b'ssh-ed25519'
SSH_RSA_KEY_TYPE = b'ssh-rsa'
SUPPORTED_KEY_TYPES = {SSH_NIST256_KEY_TYPE, SSH_NIST256_CERT_TYPE, SSH_ED25519_KEY_TYPE, SSH_RSA_KEY_TYPE}
SUPPORTED_KEY_TYPES = {SSH_NIST256_KEY_TYPE,
SSH_NIST256_CERT_TYPE, SSH_ED25519_KEY_TYPE, SSH_RSA_KEY_TYPE}

hashfunc = hashlib.sha256

Expand Down Expand Up @@ -133,7 +130,7 @@ def ed25519_verify(sig, msg):
def rsa_verify(sig, msg):
pub = bytes(b'ssh-rsa ' + base64.b64encode(blob))
log.debug('RSA pubkey: %s', pub)
vk = Crypto.PublicKey.RSA.importKey(pub)
vk = RSA.importKey(pub)
log.debug('message: %s', msg)
if b'rsa-sha2-512' in msg:
h = SHA512.new(msg)
Expand All @@ -143,7 +140,7 @@ def rsa_verify(sig, msg):
log.debug('rsa-sha2-256')
log.debug('hash: %s', h.hexdigest())
try:
Crypto.Signature.pkcs1_15.new(vk).verify(h, sig)
pkcs1_15.new(vk).verify(h, sig)
log.debug('The RSA signature is valid.')
except (ValueError, TypeError):
log.debug('The RSA signature is not valid.')
Expand Down Expand Up @@ -233,12 +230,11 @@ def serialize_verifying_key(vk):
return key_type, blob

if (len(vk) == 279 or len(vk) == 535):
#RSA 2048 or RSA 4096
# RSA 2048 or RSA 4096
pubkey = vk
key_type = SSH_RSA_KEY_TYPE
return key_type, pubkey


raise TypeError('unsupported {!r}'.format(vk))


Expand Down
30 changes: 16 additions & 14 deletions libagent/gpg/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,16 @@
import subprocess
import sys
import time
from importlib import metadata

import daemon
import semver
import Crypto.Hash
import Crypto.PublicKey
import Crypto.Signature
from Crypto.Signature import pkcs1_15
import daemon
import semver
from Crypto.Hash import SHA256, SHA512
from Crypto.PublicKey import RSA
from importlib import metadata
from Crypto.Signature import pkcs1_15

from .. import device, formats, server, util
from . import agent, client, encode, keyring, protocol
Expand All @@ -36,7 +36,7 @@

def export_public_key(device_type, args):
"""Generate a new pubkey for a new/existing GPG identity."""
#log.warning('NOTE: in order to re-generate the exact same GPG key later, '
# log.warning('NOTE: in order to re-generate the exact same GPG key later, '
# 'run this command with "--time=%d" commandline flag (to set '
# 'the timestamp of the GPG key manually).', args.time)
c = client.Client(device=device_type())
Expand All @@ -45,7 +45,7 @@ def export_public_key(device_type, args):
if device_type.package_name() == 'onlykey-agent':
if hasattr(device_type, 'import_pubkey'):
return device_type.import_pubkey

verifying_key = c.pubkey(identity=identity, ecdh=False)
decryption_key = c.pubkey(identity=identity, ecdh=True)
signer_func = functools.partial(c.sign, identity=identity)
Expand Down Expand Up @@ -125,7 +125,7 @@ def write_file(path, data):
def run_init(device_type, args):
"""Initialize hardware-based GnuPG identity."""
util.setup_logging(verbosity=args.verbose)
#log.warning('This GPG tool is still in EXPERIMENTAL mode, '
# log.warning('This GPG tool is still in EXPERIMENTAL mode, '
# 'so please note that the API and features may '
# 'change without backwards compatibility!')

Expand All @@ -137,9 +137,9 @@ def run_init(device_type, args):
homedir = args.homedir
if not homedir:
homedir = os.path.expanduser('~/.gnupg/{}'.format(device_name))

# Save homedir as environment variable
os.environ['AGENTHOMEDIR']=homedir
os.environ['AGENTHOMEDIR'] = homedir

log.info('GPG home directory: %s', homedir)

Expand Down Expand Up @@ -204,7 +204,7 @@ def run_init(device_type, args):
fi
""".format(homedir))
check_call(['chmod', '700', f.name])

# Generate new GPG identity and import into GPG keyring
pubkey = write_file(os.path.join(homedir, 'pubkey.asc'),
export_public_key(device_type, args))
Expand Down Expand Up @@ -281,9 +281,9 @@ def run_agent(device_type):
def run_agent_internal(args, device_type):
"""Actually run the server."""
assert args.homedir

# Save homedir as environment variable
os.environ['AGENTHOMEDIR']=args.homedir
os.environ['AGENTHOMEDIR'] = args.homedir

log_file = os.path.join(args.homedir, 'gpg-agent.log')
util.setup_logging(verbosity=args.verbose, filename=log_file)
Expand Down Expand Up @@ -350,9 +350,11 @@ def main(device_type):
p.add_argument('-dk', '--dkey', type=str, metavar='DECRYPT_KEY',
default='ECC32',
help='specify key to use for decryption')
p.add_argument('-i', '--import-pub', type=argparse.FileType('r'), metavar='IMPORT_PUBLIC_KEY',
p.add_argument('-i', '--import-pub', type=argparse.FileType('r'),
metavar='IMPORT_PUBLIC_KEY',
default=None,
help='import existing OpenPGP public key to use (Load private using OnlyKey App)')
help=('import existing OpenPGP public key to use '
'(Load private using OnlyKey App)'))
p.add_argument('-t', '--time', type=int, default=0)

p.add_argument('--homedir', type=str, default=os.environ.get('GNUPGHOME'),
Expand Down
Loading
Loading