Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 20 additions & 1 deletion .github/workflows/pull_request.yml
Original file line number Diff line number Diff line change
Expand Up @@ -56,4 +56,23 @@ jobs:
run: if [ -f requirements.txt ]; then pip3 install -r requirements.txt ; fi

- name: Run backend tests
run: pytest
run: pytest

infrastructure-tests:
runs-on: ubuntu-latest

steps:
- name: Checkout repo
uses: actions/checkout@v3

- name: Setup Python
uses: actions/setup-python@v4
with:
python-version: '3.14'
cache: 'pip'

- name: Install dependencies
run: pip install -r infrastructure/test/requirements.txt

- name: Run infrastructure tests
run: pytest infrastructure/test/ -v
18 changes: 18 additions & 0 deletions DEVELOPMENT.md
Original file line number Diff line number Diff line change
Expand Up @@ -93,3 +93,21 @@ To run frontend tests:
```
npm test
```

### CloudFormation template unit tests

The `infrastructure/test/` directory contains unit tests for the CloudFormation templates using [cloud-radar](https://github.com/DontShaveTheYak/cloud-radar).

#### Prerequisites

Install the test dependencies (from the `infrastructure/` directory):

```bash
pip install -r test/requirements.txt
```

#### Running the tests

```bash
pytest test/ -v
```
158 changes: 158 additions & 0 deletions infrastructure/test/conftest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
"""Pytest fixtures for parallelcluster-ui cloud-radar tests.

Background:
cloud-radar does not yet process the AWS::LanguageExtensions transform
(specifically Fn::ForEach::*). Support is proposed in
https://github.com/DontShaveTheYak/cloud-radar/pull/646 but not yet merged.

To keep the test code self-contained, we ported here the minimal ForEach
expansion logic. Once the support for the ForEach expression will be released
in cloud-radar, then we can remove this logic.
"""

import copy
import re
from pathlib import Path
from typing import Any, List, Tuple

import pytest
import yaml

from cloud_radar.cf.unit import Template


# ---------------------------------------------------------------------------
# Fn::ForEach expansion (ported from cloud-radar PR #646).
# Only the pieces needed to pre-expand a self-contained template are included.
# ---------------------------------------------------------------------------

Replacement = Tuple[str, Any, str]


def _alnum(value: Any) -> str:
return re.sub(r"[^a-zA-Z0-9]", "", str(value))


def _apply_str(value: str, replacement: Replacement) -> str:
ident, val, alnum = replacement
return value.replace(f"${{{ident}}}", str(val)).replace(f"&{{{ident}}}", alnum)


def _substitute(obj: Any, replacements: List[Replacement]) -> Any:
if not replacements:
return obj

repl = replacements[0]
rest = replacements[1:]
ident, val, _ = repl

if isinstance(obj, str):
return _substitute(_apply_str(obj, repl), rest)

if isinstance(obj, dict):
if len(obj) == 1 and "Ref" in obj and obj["Ref"] == ident:
return _substitute(val, rest)

if len(obj) == 1 and "Fn::Sub" in obj:
sub_val = obj["Fn::Sub"]
if isinstance(sub_val, str):
return _substitute({"Fn::Sub": _apply_str(sub_val, repl)}, rest)
if isinstance(sub_val, list) and len(sub_val) == 2:
tmpl_str, variables = sub_val
if isinstance(tmpl_str, str):
tmpl_str = _apply_str(tmpl_str, repl)
if isinstance(variables, dict):
variables = _substitute(variables, [repl])
return _substitute({"Fn::Sub": [tmpl_str, variables]}, rest)

if len(obj) == 1 and "Fn::GetAtt" in obj:
ga = obj["Fn::GetAtt"]
if isinstance(ga, list) and len(ga) == 2:
resource_name = _substitute(ga[0], [repl])
attr = ga[1]
attr = val if attr == ident else _substitute(attr, [repl])
return _substitute({"Fn::GetAtt": [resource_name, attr]}, rest)

result = {}
for k, v in obj.items():
if isinstance(k, str) and k.startswith("Fn::ForEach::"):
if isinstance(v, list) and len(v) == 3:
result[k] = _substitute(v, [repl])
else:
result[k] = v
else:
result[_substitute(k, [repl])] = _substitute(v, [repl])
return _substitute(result, rest)

if isinstance(obj, list):
return _substitute([_substitute(item, [repl]) for item in obj], rest)

return _substitute(obj, rest)


def _expand_entry(key: str, value: Any) -> dict:
if not (isinstance(value, list) and len(value) == 3):
raise ValueError(f"Invalid Fn::ForEach structure for {key}")

ident, collection, output_tmpl = value

if not isinstance(ident, str):
raise TypeError(f"Fn::ForEach identifier must be a String in {key}")

if not isinstance(output_tmpl, dict):
raise TypeError(f"Fn::ForEach output template must be a Dict in {key}")

items = collection if isinstance(collection, list) else list(collection.values())

result = {}
for item in items:
replacement = (ident, item, _alnum(item))
substituted = _substitute(copy.deepcopy(output_tmpl), [replacement])
substituted = _apply_foreach(substituted)
if isinstance(substituted, dict):
result.update(substituted)
return result


def _apply_foreach(data: Any) -> Any:
if isinstance(data, dict):
out = {}
for k, v in data.items():
if isinstance(k, str) and k.startswith("Fn::ForEach::"):
out.update(_expand_entry(k, v))
else:
out[k] = _apply_foreach(v)
return out
if isinstance(data, list):
return [_apply_foreach(item) for item in data]
return data


# ---------------------------------------------------------------------------
# Fixture
# ---------------------------------------------------------------------------

@pytest.fixture
def template():
template_path = Path(__file__).parent.parent / "parallelcluster-ui.yaml"

t = Template.from_yaml(template_path.resolve(), {})

# Pre-expand Fn::ForEach so cloud-radar can render the template.
t.template = _apply_foreach(t.template)

# cloud-radar's default !Ref on a resource returns the logical name string.
# The template does !Select [2|3, !Split ['/', !Ref EcrImage]], which in
# real CFN receives an ImageBuilder image ARN of the form
# "arn:...:image/recipe-name/version/build". Provide a fake value with
# enough slashes via the Cloud-Radar Metadata override.
ecr_image = t.template.get("Resources", {}).get("EcrImage")
if ecr_image is not None:
ecr_image.setdefault("Metadata", {}).setdefault("Cloud-Radar", {})[
"ref"
] = "arn:aws:imagebuilder:us-east-1:123456789012:image/test-recipe/1.0.0/1"

# render() reloads self.template from self.raw, so keep them in sync.
t.raw = yaml.dump(t.template)

return t
6 changes: 6 additions & 0 deletions infrastructure/test/constants.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
"""Constants shared across parallelcluster-ui cloud-radar tests."""

BASE_PARAMS = {
"AdminUserEmail": "firstname@email.com",
"Version": "3.15.0",
}
28 changes: 28 additions & 0 deletions infrastructure/test/helpers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
"""Helper functions for parallelcluster-ui cloud-radar tests."""

from typing import Any, List, Tuple


def policy_statements(policy: Any) -> List[dict]:
"""Extract and validate the Statement list from a policy document."""
assert isinstance(policy, dict), f"Policy should be a dict, got {type(policy)}"
statements = policy.get("Statement")
assert isinstance(statements, list), "Policy.Statement should be a list"
return statements


def iam_roles_with_boundary(stack) -> List[Tuple[str, dict]]:
"""Return (name, resource) for every IAM Role that declares a
PermissionsBoundary property in its Properties.

This intentionally ignores roles that never set the property (e.g.
ApiVersionMapFunctionRole) so we only assert against the roles that the
template is responsible for wiring the boundary into.
"""
results = []
roles = stack.get_resources_of_type("AWS::IAM::Role")
for name, definition in roles.items():
properties = definition.get("Properties", {})
if "PermissionsBoundary" in properties:
results.append((name, definition))
return results
2 changes: 2 additions & 0 deletions infrastructure/test/requirements.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
cloud-radar
pytest
57 changes: 57 additions & 0 deletions infrastructure/test/test_apig.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
"""API Gateway policy tests for parallelcluster-ui.yaml."""

import pytest
from cloud_radar.cf.unit import Template

from constants import BASE_PARAMS
from helpers import policy_statements


@pytest.mark.parametrize(
"vpc_endpoint_id",
[
pytest.param("vpce-0123456789abcdef0", id="private"),
pytest.param(None, id="public"),
],
)
def test_api_gateway_policy(template: Template, vpc_endpoint_id: str | None):
"""ApiGatewayRestApi.Policy must deny non-VPCE traffic when a
VpcEndpointId is specified, and allow all traffic when it is not."""
params = {**BASE_PARAMS}
if vpc_endpoint_id is not None:
params["VpcEndpointId"] = vpc_endpoint_id

stack = template.create_stack(params)

api = stack.get_resource("ApiGatewayRestApi")
policy = api.get_property_value("Policy")
statements = policy_statements(policy)

# Both modes must contain an unconditional Allow.
allow = next((s for s in statements if s.get("Effect") == "Allow"), None)
assert allow is not None, "Expected an Allow statement"
assert allow["Action"] == "execute-api:Invoke"
assert allow["Principal"] == "*"
assert allow["Resource"] == "execute-api:/*"

if vpc_endpoint_id is not None:
# Private mode: Deny + Allow
assert len(statements) == 2, f"Expected 2 statements, got {statements}"

deny = next((s for s in statements if s.get("Effect") == "Deny"), None)
assert deny is not None, "Expected a Deny statement in private mode"
assert deny["Action"] == "execute-api:Invoke"
assert deny["Principal"] == "*"
assert deny["Resource"] == "execute-api:/*"
assert deny["Condition"] == {
"StringNotEquals": {"aws:sourceVpce": vpc_endpoint_id}
}
else:
# Public mode: Allow only, no Deny, no Condition on Allow
assert len(statements) == 1, (
f"Expected 1 statement in public mode, got {statements}"
)
assert "Condition" not in allow, "Public-mode Allow must not carry a Condition"
assert not any(
s.get("Effect") == "Deny" for s in statements
), "Public-mode policy must not contain any Deny statements"
60 changes: 60 additions & 0 deletions infrastructure/test/test_permissions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
"""Permissions boundary tests for parallelcluster-ui.yaml."""

import pytest
from cloud_radar.cf.unit import Template

from constants import BASE_PARAMS
from helpers import iam_roles_with_boundary


@pytest.mark.parametrize(
"boundary_arn",
[
pytest.param(
"arn:aws:iam::123456789012:policy/parallelcluster-ui-permissions-boundary",
id="with-boundary",
),
pytest.param(None, id="without-boundary"),
],
)
def test_permissions_boundary(template: Template, boundary_arn: str | None):
"""PermissionsBoundaryPolicy must be forwarded to the Cognito nested stack
and applied to every IAM Role when specified, and absent when not."""
params = {**BASE_PARAMS}
if boundary_arn is not None:
params["PermissionsBoundaryPolicy"] = boundary_arn

stack = template.create_stack(params)

# Cognito nested stack parameter
cognito = stack.get_resource("Cognito")
cognito_params = cognito.get_property_value("Parameters")

# IAM Roles with a PermissionsBoundary property
roles_with_boundary = iam_roles_with_boundary(stack)

if boundary_arn is not None:
assert cognito_params.get("PermissionsBoundaryPolicy") == boundary_arn, (
"PermissionsBoundaryPolicy was not forwarded to the Cognito nested stack"
)

assert roles_with_boundary, "Expected at least one IAM Role with a boundary"
for name, definition in roles_with_boundary:
boundary = definition["Properties"]["PermissionsBoundary"]
assert boundary == boundary_arn, (
f"Role {name} has PermissionsBoundary {boundary!r}, "
f"expected {boundary_arn!r}"
)
else:
assert cognito_params.get("PermissionsBoundaryPolicy", "") == "", (
"Expected Cognito PermissionsBoundaryPolicy to be empty when unset"
)

for name, definition in roles_with_boundary:
# cloud-radar keeps AWS::NoValue as the empty string, so the
# property may still appear with a falsy value.
boundary = definition["Properties"].get("PermissionsBoundary", "")
assert not boundary, (
f"Role {name} received PermissionsBoundary {boundary!r} "
f"when none was supplied"
)
3 changes: 2 additions & 1 deletion pytest.ini
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
[pytest]
pythonpath = .
pythonpath = .
addopts = --ignore=infrastructure
Loading