Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 8 additions & 8 deletions .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -185,14 +185,14 @@ workflows:
<<: *filter_all_tags
requires:
- init
# - test:
# <<: *filter_all_tags
# requires:
# - init
# - test-macos:
# <<: *filter_all_tags
# requires:
# - init
- test:
<<: *filter_all_tags
requires:
- init
- test-macos:
<<: *filter_all_tags
requires:
- init
- build-package-pypi:
<<: *filter_all_tags
requires:
Expand Down
7 changes: 7 additions & 0 deletions mtls/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,13 @@ def create_certificate(
options.update(organization=organization)
if common_name:
options.update(common_name=common_name)
if output:
if not output.endswith(('pfx', 'p12')):
click.secho("Output path must end in pfx or p12")
sys.exit(1)
if not output.startswith(('.', '/')):
click.secho("Output path must be a valid path")
sys.exit(1)
ctx.obj.get_crl(False)
ctx.obj.set_user_options(options)
ctx.obj.create_cert(output)
Expand Down
34 changes: 22 additions & 12 deletions mtls/mtls.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ def __init__(self, server, options={}):
base=self.BASE_SERVER_PATH, server=self.server
)


def check_revoked(self, cert):
with open(self.crl_file_path, "rb") as f:
crl = x509.load_pem_x509_crl(
Expand All @@ -116,7 +117,7 @@ def create_cert(self, output):
self.override = True
self._create_db()
cert = None
if not self._has_root_cert():
if not self._has_root_cert() or output:
self._get_and_set_root_cert()
if sys.platform == "darwin":
(valid, exists, revoked) = self.check_valid_cert(
Expand Down Expand Up @@ -153,13 +154,14 @@ def create_cert(self, output):
sys.exit(1)
cert = self.convert_to_cert(cert_str)
try:
with open(self.cert_file_path, "w") as cert_file:
click.echo("Writing file to {}".format(self.cert_file_path))
cert_file.write(
cert.public_bytes(serialization.Encoding.PEM).decode(
"utf-8"
if not output:
with open(self.cert_file_path, "w") as cert_file:
click.echo("Writing file to {}".format(self.cert_file_path))
cert_file.write(
cert.public_bytes(serialization.Encoding.PEM).decode(
"utf-8"
)
)
)
except Exception as e:
click.secho(
"Could not write certificate to {}".format(
Expand All @@ -170,7 +172,7 @@ def create_cert(self, output):
if cert is None:
click.echo("Could not convert to certificate")
sys.exit(1)
p12 = OpenSSL.crypto.PKCS12()
p12 = OpenSSL.crypto.PKCS12Type()
pkey = OpenSSL.crypto.PKey.from_cryptography_key(key)
fpbytes = cert.fingerprint(hashes.SHA1())
fp = binascii.hexlify(fpbytes)
Expand All @@ -179,22 +181,27 @@ def create_cert(self, output):
"current_sha", fp.decode("UTF-8"), self.server
)
certificate = OpenSSL.crypto.X509.from_cryptography(cert)
ca_certificate = OpenSSL.crypto.X509.from_cryptography(
self.ca_certificate
)
p12.set_privatekey(pkey)
p12.set_certificate(certificate)
p12.set_ca_certificates(iter([ca_certificate]))
p12.set_friendlyname(bytes(self.friendly_name, "UTF-8"))
pwd = self._genPW()
if output:
self.pfx_path = output
pw = self.encrypt(pwd, self.config.get(self.server, "fingerprint"))
pfx_base = "/".join(self.pfx_path.split("/")[:-1])
pw_file = self.pfx_path.split("/")[-1].split(".")[:-1]
pw_file += ".password.asc"
pfx_file_base = self.pfx_path.split("/")[-1].split(".")[:-1]
pw_file = "{}.password.asc".format(pfx_file_base[0])
pw_file = "".join(pw_file)
pw_file = "{}/{}".format(pfx_base, pw_file)
with open(pw_file, "wb") as pwfile:
click.echo("Writing password to: {}".format(pw_file))
pwfile.write(str(pw).encode("UTF-8"))
with open(self.pfx_path, "wb") as f:
click.echo("Writing pfx to: {}".format(self.pfx_path))
f.write(p12.export(passphrase=bytes(pwd, "UTF-8")))
if not output:
self.update_cert_storage(self.pfx_path, pwd)
Expand Down Expand Up @@ -245,6 +252,7 @@ def _get_and_set_root_cert(self):
# the user and subsequent calls later.
with open(self.ca_cert_file_path, "w") as ca_cert:
ca_cert.write(data["cert"])
self.ca_certificate = self.get_cert_from_file(self.ca_cert_file_path)
self.add_root_ca_to_store(self.ca_cert_file_path)

def add_root_ca_to_store(self, ca_cert_file_path):
Expand Down Expand Up @@ -418,8 +426,10 @@ def check_valid_cert(self, name=None, usage="V", is_root=False):
revoked = self.check_revoked(self.get_cert_from_file())
return is_valid, cert_exists, revoked

def get_cert_from_file(self):
with open(self.cert_file_path, "rb") as cert_file:
def get_cert_from_file(self, cert_file_path=None):
if not cert_file_path:
cert_file_path = self.cert_file_path
with open(cert_file_path, "rb") as cert_file:
return x509.load_pem_x509_certificate(
cert_file.read(), default_backend()
)
Expand Down
8 changes: 5 additions & 3 deletions test/test_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -350,15 +350,17 @@ def test_create_certificate_with_cli_output_option(self):
"certificate",
"create",
"-o",
self.HOME.name + "/me.pfx",
"{}/me.pfx".format(self.HOME.name),
],
)
self.assertEqual(result.exit_code, 0, msg=result.exc_info)
self.assertTrue(
os.path.isfile("{}/{}".format(self.HOME.name, "me.pfx"))
os.path.isfile("{}/{}".format(self.HOME.name, "me.pfx")),
msg="Expected file to exist: {}/{}".format(self.HOME.name, "me.pfx")
)
self.assertTrue(
os.path.isfile("{}/{}".format(self.HOME.name, "me.password.asc"))
os.path.isfile("{}/{}".format(self.HOME.name, "me.password.asc")),
msg="Expected file to exist: {}/{}".format(self.HOME.name, "me.password.asc")
)

def test_revoke_certificate(self):
Expand Down