Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions JSON.md
Original file line number Diff line number Diff line change
Expand Up @@ -265,14 +265,12 @@ In `cfbs.json`'s `"steps"`, the build step name must be separated from the rest
- Source is relative to module directory and target is relative to `out/masterfiles`.
- In most cases, the build step should be: `input ./input.json def.json`
- `replace <n> <a> <b> <filename>`

- Replace string `<a>` with string `<b>`, exactly `<n>` times, in file `filename`.
- string `<b>` must not contain string `<a>`, as that could lead to confusing / recursive replacement situations.
- The number of occurences is strict: It will error if the string cannot be found, cannot be replaced exactly `<n>` times, or can still be found after replacements are done.
(This is to try to catch mistakes).
- `n` must be an integer, from 1 to 1000, and may optionally have a trailing `+` to signify "or more".
At most 1000 replacements will be performed, regardless of whether you specify `+` or not.

- `replace_version <to_replace> <filename>`
- Replace the string inside the file with the version number of that module.
- The module must have a version and the string must occur exactly once in the file.
Expand Down
12 changes: 6 additions & 6 deletions cfbs/analyze.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
immediate_subdirectories,
mkdir,
read_json,
user_error,
GenericExitError,
)


Expand Down Expand Up @@ -112,11 +112,11 @@ def mpf_vcf_dicts(offline=False):

cfbs_ri_dir = os.path.join(cfbs_dir(), RI_SUBDIRS)
if not os.path.exists(cfbs_ri_dir):
user_error(ERROR_MESSAGE)
raise GenericExitError(ERROR_MESSAGE)

ri_versions = immediate_subdirectories(cfbs_ri_dir)
if len(ri_versions) == 0:
user_error(ERROR_MESSAGE)
raise GenericExitError(ERROR_MESSAGE)

ri_latest_version = max(ri_versions)
mpf_vcf_path = os.path.join(
Expand All @@ -134,7 +134,7 @@ def mpf_vcf_dicts(offline=False):
try:
latest_release_data = get_json(LATEST_RELEASE_API_URL)
except FetchError as e:
user_error(
raise GenericExitError(
"Downloading CFEngine release information failed - check your Wi-Fi / network settings."
)

Expand All @@ -155,7 +155,7 @@ def mpf_vcf_dicts(offline=False):
try:
fetch_url(ri_checksums_url, archive_checksums_path)
except FetchError as e:
user_error(str(e))
raise GenericExitError(str(e))

with open(archive_checksums_path) as file:
lines = [line.rstrip() for line in file]
Expand Down Expand Up @@ -614,7 +614,7 @@ def analyze_policyset(
+ "\n ".join(possible_policyset_relpaths)
+ "\n"
)
user_error(
raise GenericExitError(
"There doesn't seem to be a valid policy set in the supplied path.\n Usage: cfbs analyze path/to/policy-set\n"
+ extra_error_text
)
Expand Down
8 changes: 4 additions & 4 deletions cfbs/args.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import os

from cfbs import commands
from cfbs.utils import cache, user_error
from cfbs.utils import cache, GenericExitError


def get_args():
Expand All @@ -23,13 +23,13 @@ def get_manual():
with open(file_path, "r", encoding="utf-8") as man_file:
man = man_file.read()
if not man:
user_error("Manual file is empty")
raise GenericExitError("Manual file is empty")
else:
return man
except OSError:
user_error("Error reading manual file " + file_path)
raise GenericExitError("Error reading manual file " + file_path)
else:
user_error("Manual file does not exist")
raise GenericExitError("Manual file does not exist")


@cache
Expand Down
105 changes: 39 additions & 66 deletions cfbs/build.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
sh,
strip_left,
touch,
user_error,
GenericExitError,
write_json,
)
from cfbs.pretty import pretty, pretty_file
Expand All @@ -35,6 +35,7 @@
MAX_REPLACEMENTS,
step_has_valid_arg_count,
split_build_step,
validate_build_step,
)


Expand Down Expand Up @@ -79,35 +80,29 @@ def _generate_augment(module_name, input_data):


def _perform_replace_step(n, a, b, filename):
assert n and a and b and filename
assert a not in b

or_more = False
if n.endswith("+"):
n = n[0:-1]
or_more = True
n = int(n)
if n <= 0:
user_error("replace build step cannot replace something %s times" % (n))
if n > MAX_REPLACEMENTS or n == MAX_REPLACEMENTS and or_more:
user_error(
"replace build step cannot replace something more than %s times"
% (MAX_REPLACEMENTS)
)
if a in b and (n >= 2 or or_more):
user_error(
"'%s' must not contain '%s' (could lead to recursive replacing)" % (a, b)
)
if not os.path.isfile(filename):
user_error("No such file '%s' in replace build step" % (filename,))
try:
with open(filename, "r") as f:
content = f.read()
except FileNotFoundError:
raise GenericExitError("No such file '%s' in replace build step" % (filename,))
except:
user_error("Could not open/read '%s' in replace build step" % (filename,))
raise GenericExitError(
"Could not open/read '%s' in replace build step" % (filename,)
)
new_content = previous_content = content
for i in range(0, n):
previous_content = new_content
new_content = previous_content.replace(a, b, 1)
if new_content == previous_content:
user_error(
raise GenericExitError(
"replace build step could only replace '%s' in '%s' %s times, not %s times (required)"
% (a, filename, i, n)
)
Expand All @@ -119,21 +114,22 @@ def _perform_replace_step(n, a, b, filename):
if new_content == previous_content:
break
if a in new_content:
user_error("too many occurences of '%s' in '%s'" % (a, filename))
raise GenericExitError("too many occurences of '%s' in '%s'" % (a, filename))
try:
with open(filename, "w") as f:
f.write(new_content)
except:
user_error("Failed to write to '%s'" % (filename,))
raise GenericExitError("Failed to write to '%s'" % (filename,))


def _perform_build_step(module, step, max_length):
def _perform_build_step(module, i, step, max_length):
operation, args = split_build_step(step)
name = module["name"]
source = module["_directory"]
counter = module["_counter"]
destination = "out/masterfiles"

prefix = "%03d %s :" % (counter, pad_right(module["name"], max_length))
prefix = "%03d %s :" % (counter, pad_right(name, max_length))

assert operation in AVAILABLE_BUILD_STEPS # Should already be validated
if operation == "copy":
Expand Down Expand Up @@ -164,7 +160,7 @@ def _perform_build_step(module, step, max_length):
dst = ""
print("%s json '%s' 'masterfiles/%s'" % (prefix, src, dst))
if not os.path.isfile(os.path.join(source, src)):
user_error("'%s' is not a file" % src)
raise GenericExitError("'%s' is not a file" % src)
src, dst = os.path.join(source, src), os.path.join(destination, dst)
extras, original = read_json(src), read_json(dst)
if not extras:
Expand Down Expand Up @@ -215,14 +211,14 @@ def _perform_build_step(module, step, max_length):
if dst in [".", "./"]:
dst = ""
print("%s input '%s' 'masterfiles/%s'" % (prefix, src, dst))
if src.startswith(module["name"] + "/"):
if src.startswith(name + "/"):
log.warning(
"Deprecated 'input' build step behavior - it should be: 'input ./input.json def.json'"
)
# We'll translate it to what it should be
# TODO: Consider removing this behavior for cfbs 4?
src = "." + src[len(module["name"]) :]
src = os.path.join(module["name"], src)
src = "." + src[len(name) :]
src = os.path.join(name, src)
dst = os.path.join(destination, dst)
if not os.path.isfile(os.path.join(src)):
log.warning(
Expand All @@ -231,10 +227,10 @@ def _perform_build_step(module, step, max_length):
)
return
extras, original = read_json(src), read_json(dst)
extras = _generate_augment(module["name"], extras)
extras = _generate_augment(name, extras)
log.debug("Generated augment: %s", pretty(extras))
if not extras:
user_error(
raise GenericExitError(
"Input data '%s' is incomplete: Skipping build step."
% os.path.basename(src)
)
Expand All @@ -257,7 +253,7 @@ def _perform_build_step(module, step, max_length):
cf_files = find("out/masterfiles/" + file, extension=".cf")
files += (strip_left(f, "out/masterfiles/") for f in cf_files)
else:
user_error(
raise GenericExitError(
"Unsupported filetype '%s' for build step '%s': "
% (file, operation)
+ "Expected directory (*/) of policy file (*.cf)"
Expand Down Expand Up @@ -293,84 +289,61 @@ def _perform_build_step(module, step, max_length):
elif operation == "replace":
assert len(args) == 4
print("%s replace '%s'" % (prefix, "' '".join(args)))
# New build step so let's be a bit strict about validating it:
validate_build_step(name, module, i, operation, args, strict=True)
n, a, b, file = args
file = os.path.join(destination, file)
_perform_replace_step(n, a, b, file)
elif operation == "replace_version":
assert len(args) == 2
assert len(args) == 3
# New build step so let's be a bit strict about validating it:
validate_build_step(name, module, i, operation, args, strict=True)
print("%s replace_version '%s'" % (prefix, "' '".join(args)))
file = os.path.join(destination, args[1])
if not os.path.isfile(file):
user_error(
"No such file '%s' in replace_version for module '%s"
% (file, module["name"])
)
try:
with open(file, "r") as f:
content = f.read()
except:
user_error(
"Could not open/read '%s' in replace_version for module '%s"
% (file, module["name"])
)
to_replace = args[0]
n = args[0]
to_replace = args[1]
filename = os.path.join(destination, args[2])
version = module["version"]
new_content = content.replace(to_replace, version, 1)
if new_content == content:
user_error(
"replace_version requires that '%s' has exactly 1 occurence of '%s' - 0 found"
% (file, to_replace)
)
if to_replace in new_content:
user_error(
"replace_version requires that '%s' has exactly 1 occurence of '%s' - more than 1 found"
% (file, to_replace)
)
try:
with open(file, "w") as f:
f.write(new_content)
except:
user_error("Failed to write to '%s'" % (file,))
_perform_replace_step(n, to_replace, version, filename)


def perform_build(config) -> int:
if not config.get("build"):
user_error("No 'build' key found in the configuration")
raise GenericExitError("No 'build' key found in the configuration")

# mini-validation
for module in config.get("build", []):
for step in module["steps"]:
operation, args = split_build_step(step)

if step.split() != [operation] + args:
user_error(
raise GenericExitError(
"Incorrect whitespace in the `%s` build step - singular spaces are required"
% step
)

if operation not in AVAILABLE_BUILD_STEPS:
user_error("Unknown build step operation: %s" % operation)
raise GenericExitError("Unknown build step operation: %s" % operation)

expected = AVAILABLE_BUILD_STEPS[operation]
actual = len(args)
if not step_has_valid_arg_count(args, expected):
if type(expected) is int:
user_error(
raise GenericExitError(
"The `%s` build step expects %d arguments, %d were given"
% (step, expected, actual)
)
else:
expected = int(expected[0:-1])
user_error(
raise GenericExitError(
"The `%s` build step expects %d or more arguments, %d were given"
% (step, expected, actual)
)

print("\nSteps:")
module_name_length = config.longest_module_key_length("name")
for module in config.get("build", []):
for step in module["steps"]:
_perform_build_step(module, step, module_name_length)
for i, step in enumerate(module["steps"]):
_perform_build_step(module, i, step, module_name_length)
assert os.path.isdir("./out/masterfiles/")
shutil.copyfile("./cfbs.json", "./out/masterfiles/cfbs.json")
if os.path.isfile("out/masterfiles/def.json"):
Expand Down
Loading