Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 23 additions & 1 deletion tests/perf_tests/Makefile
Original file line number Diff line number Diff line change
@@ -1,7 +1,29 @@
USERS ?= 250
SPAWN_RATE ?= 50
RUN_TIME ?= 5m

# Interactive UI — single process, useful for smoke tests
test:
poetry run locust -f src/locustfile.py

# Interactive UI — search only
test-read-only:
poetry run locust -f src/locustfile.py SearchUser

.PHONY: test test-read-only
# Headless distributed — master + N workers, runs for 5 minutes then exits
test-headless:
poetry run locust -f src/locustfile.py \
--headless \
--users $(USERS) \
--spawn-rate $(SPAWN_RATE) \
--run-time $(RUN_TIME)

# Search-only headless distributed
test-read-only-headless:
poetry run locust -f src/locustfile.py SearchUser \
--headless \
--users $(USERS) \
--spawn-rate $(SPAWN_RATE) \
--run-time $(RUN_TIME)

.PHONY: test test-read-only test-headless test-read-only-headless
3 changes: 2 additions & 1 deletion tests/perf_tests/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,5 +5,6 @@ This project contains Locust performance tests for the Immunisation FHIR API.
To run them, ensure you have the
`APIGEE_ENVIRONMENT` : Currently, only the ref environment is supported.
`PERF_CREATE_RPS_PER_USER` : numeric
`PERF_SEARCH_RPS_PER_USER` : numeric

env vars set, and call `make test`.
env vars set, and call `PERF_SEARCH_RPS_PER_USER=1 make test-read-only-headless`.
114 changes: 105 additions & 9 deletions tests/perf_tests/src/locustfile.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,23 @@
import json
import os
import random
import sys
import time
import uuid
from pathlib import Path
from urllib.parse import urlencode

import boto3
import gevent.lock
import pandas as pd
from locust import HttpUser, constant_throughput, task
from botocore.config import Config
from locust import constant_throughput, events, task
from locust.contrib.fasthttp import FastHttpUser
from locust.runners import WorkerRunner

from common.api_clients.authentication import AppRestrictedAuth
from common.clients import get_secrets_manager_client

# from common.clients import get_secrets_manager_client
from common.models.constants import Urls
from objectModels import patient_loader
from objectModels.api_immunization_builder import create_immunization_object
Expand All @@ -21,7 +29,18 @@
if not APIGEE_ENVIRONMENT:
raise ValueError("APIGEE_ENVIRONMENT must be set")

_BOTO_CONFIG = Config(
max_pool_connections=50, # default is 10; needs to exceed max concurrent Locust users
retries={"mode": "standard", "max_attempts": 1},
)
_secrets_client = boto3.client(
"secretsmanager",
region_name=os.getenv("AWS_REGION", "eu-west-2"),
config=_BOTO_CONFIG,
)

PERF_CREATE_TASK_RPS_PER_USER = float(os.getenv("PERF_CREATE_RPS_PER_USER", "1"))
PERF_SEARCH_RPS_PER_USER = float(os.getenv("PERF_SEARCH_RPS_PER_USER", "1"))

IMMUNIZATION_TARGETS = [
"3IN1",
Expand Down Expand Up @@ -53,29 +72,106 @@ def _load_valid_patients():

VALID_PATIENT_IDS = _load_valid_patients()

_TOKEN_LOCK = gevent.lock.Semaphore(1)

class BaseImmunizationUser(HttpUser):
abstract = True

authenticator = AppRestrictedAuth(
get_secrets_manager_client(),
class LocustTokenManager:
"""Serialises token refreshes across all Locust greenlets (double-checked locking pattern)."""

def __init__(self, auth: AppRestrictedAuth):
self._auth = auth

def get_access_token(self) -> str:
now = int(time.time())
# Fast path — no lock needed, reads are safe if the token is already cached
if (
self._auth.cached_access_token
and self._auth.cached_access_token_expiry_time is not None
and self._auth.cached_access_token_expiry_time > now + 30 # ACCESS_TOKEN_MIN_ACCEPTABLE_LIFETIME_SECONDS
):
return self._auth.cached_access_token

# Slow path — exactly one greenlet refreshes; all others wait then hit the fast path
with _TOKEN_LOCK:
now = int(time.time()) # re-read after acquiring the lock
if (
self._auth.cached_access_token
and self._auth.cached_access_token_expiry_time is not None
and self._auth.cached_access_token_expiry_time > now + 30
):
return self._auth.cached_access_token
return self._auth.get_access_token()
Comment on lines +96 to +103
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This check is the first thing that get_access_token() does. Given we have the lock, we can just call that instead.

Suggested change
now = int(time.time()) # re-read after acquiring the lock
if (
self._auth.cached_access_token
and self._auth.cached_access_token_expiry_time is not None
and self._auth.cached_access_token_expiry_time > now + 30
):
return self._auth.cached_access_token
return self._auth.get_access_token()
return self._auth.get_access_token()



# Module-level singleton — pre-warmed before any user spawns
_shared_token_manager = LocustTokenManager(
AppRestrictedAuth(
_secrets_client,
APIGEE_ENVIRONMENT,
f"imms/perf-tests/{APIGEE_ENVIRONMENT}/jwt-secrets",
)
)


@events.init.add_listener
def _pre_warm_auth(environment, **kwargs):
"""Fetch token once before users spawn so all users start with a cached token.
Only runs on master/standalone — workers fetch lazily on first request,
staggered by the on_start jitter, avoiding simultaneous Secrets Manager calls.
"""
if isinstance(environment.runner, WorkerRunner):
return

try:
token = _shared_token_manager.get_access_token()
print(f"[perf] Auth pre-warm complete. Token length: {len(token)}")
except Exception as exc:
error_text = str(exc)
is_credential_error = any(
kw in error_text for kw in ("ForbiddenException", "ExpiredToken", "No access", "TokenExpired")
)
if is_credential_error:
print(
"\n[perf] FATAL: AWS credentials expired or inaccessible.\n"
f" Error: {exc}\n\n"
" Fix: run one of the following, then retry 'make test':\n"
" aws sso login --profile <your-profile-name>\n",
file=sys.stderr,
)
sys.exit(1)
raise


class BaseImmunizationUser(FastHttpUser):
abstract = True

# token_manager = LocustTokenManager(
# AppRestrictedAuth(
# _secrets_client,
# APIGEE_ENVIRONMENT,
# f"imms/perf-tests/{APIGEE_ENVIRONMENT}/jwt-secrets",
# )
# )

token_manager = _shared_token_manager
host = f"https://{APIGEE_ENVIRONMENT}.api.service.nhs.uk/immunisation-fhir-api/FHIR/R4"

def get_headers(self):
return {
"Accept": CONTENT_TYPE_FHIR_JSON,
"Authorization": f"Bearer {self.authenticator.get_access_token()}",
"Authorization": f"Bearer {self.token_manager.get_access_token()}",
"Content-Type": CONTENT_TYPE_FHIR_JSON,
"X-Correlation-ID": str(uuid.uuid4()),
"X-Request-ID": str(uuid.uuid4()),
}

def on_start(self):
# Jitter each user's start by up to 2 s to avoid simultaneous first-request burst
gevent.sleep(random.uniform(0, 2.0))

def _build_create_payload(self):
immunization_target = random.choice(IMMUNIZATION_TARGETS)
patient = load_patient_by_id(random.choice(VALID_PATIENT_IDS))
patient = load_patient_by_id("Valid_NHS")
immunization = create_immunization_object(patient, immunization_target)
return json.loads(immunization.json(exclude_none=True))

Expand All @@ -94,7 +190,7 @@ def _delete_created_immunization(self, immunization_id: str):


class SearchUser(BaseImmunizationUser):
wait_time = constant_throughput(1)
wait_time = constant_throughput(PERF_SEARCH_RPS_PER_USER)

@task
def search_single_vacc_type(self):
Expand Down
Loading