13 changes: 13 additions & 0 deletions Dockerfile
@@ -41,6 +41,19 @@ ENV CONDUCTOR_AUTH_SECRET ${CONDUCTOR_AUTH_SECRET}
ENV CONDUCTOR_SERVER_URL ${CONDUCTOR_SERVER_URL}
RUN python3 ./tests/integration/main.py

FROM python_test_base AS harness-build
COPY /harness /package/harness

FROM python:3.12-alpine AS harness
RUN adduser -D -u 65532 nonroot
WORKDIR /app
COPY --from=harness-build /usr/local/lib/python3.12/site-packages /usr/local/lib/python3.12/site-packages
COPY --from=harness-build /package/src /app/src
COPY --from=harness-build /package/harness /app/harness
ENV PYTHONPATH=/app/src
USER nonroot
ENTRYPOINT ["python", "-u", "harness/main.py"]

FROM python:3.12-alpine AS publish
RUN apk add --no-cache tk curl
WORKDIR /package
81 changes: 81 additions & 0 deletions harness/README.md
@@ -0,0 +1,81 @@
# Python SDK Docker Harness

The root `Dockerfile` provides two Docker targets: an **SDK build** and a **long-running worker harness**.

## Worker Harness

A self-feeding worker that runs indefinitely. On startup it registers five simulated tasks (`python_worker_0` through `python_worker_4`) and the `python_simulated_tasks_workflow`, then runs two background services:

- **WorkflowGovernor** -- starts a configurable number of `python_simulated_tasks_workflow` instances per second (default 2), indefinitely.
- **SimulatedTaskWorkers** -- five task handlers, each with a codename and a default sleep duration. Each worker supports configurable delay types, failure simulation, and output generation via task input parameters. The workflow chains them in sequence: quickpulse (1s) → whisperlink (2s) → shadowfetch (3s) → ironforge (4s) → deepcrawl (5s).
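
The governor's pacing logic is not shown in this diff. As a rough sketch of the fixed-rate approach (hypothetical class and names, not the actual `WorkflowGovernor`), a starter loop that launches one workflow per time slot could look like:

```python
import threading
import time


class RateLimitedStarter:
    """Hypothetical sketch of a fixed-rate starter loop (not the real WorkflowGovernor)."""

    def __init__(self, start_fn, per_sec: float):
        self.start_fn = start_fn        # callable that starts one workflow
        self.interval = 1.0 / per_sec   # target gap between starts, in seconds
        self._stop = threading.Event()

    def run(self) -> None:
        # Start one workflow, then wait out the remainder of the time slot.
        while not self._stop.is_set():
            began = time.monotonic()
            self.start_fn()
            elapsed = time.monotonic() - began
            self._stop.wait(max(0.0, self.interval - elapsed))

    def stop(self) -> None:
        self._stop.set()
```

The actual `WorkflowGovernor` module (not shown in this excerpt) is the source of truth for how starts are paced and how failures are handled.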

### Building Locally

```bash
docker build --target harness -t python-sdk-harness .
```

### Multiplatform Build and Push

To build for both `linux/amd64` and `linux/arm64` and push to GHCR:

```bash
# One-time: create a buildx builder if you don't have one
docker buildx create --name multiarch --use --bootstrap

# Build and push
docker buildx build \
  --platform linux/amd64,linux/arm64 \
  --target harness \
  -t ghcr.io/conductor-oss/python-sdk/harness-worker:latest \
  --push .
```

> **Note:** Multi-platform builds require `docker buildx` and a builder that supports cross-compilation. On macOS this works out of the box with Docker Desktop. On Linux you may need to install QEMU user-space emulators:
>
> ```bash
> docker run --privileged --rm tonistiigi/binfmt --install all
> ```

### Running

```bash
docker run -d \
  -e CONDUCTOR_SERVER_URL=https://your-cluster.example.com/api \
  -e CONDUCTOR_AUTH_KEY=$CONDUCTOR_AUTH_KEY \
  -e CONDUCTOR_AUTH_SECRET=$CONDUCTOR_AUTH_SECRET \
  -e HARNESS_WORKFLOWS_PER_SEC=4 \
  python-sdk-harness
```

You can also run the harness locally without Docker (from the repo root):

```bash
# Install the SDK in development mode (one-time)
pip3 install -e .

export CONDUCTOR_SERVER_URL=https://your-cluster.example.com/api
export CONDUCTOR_AUTH_KEY=$CONDUCTOR_AUTH_KEY
export CONDUCTOR_AUTH_SECRET=$CONDUCTOR_AUTH_SECRET

python3 harness/main.py
```

Override defaults with environment variables as needed:

```bash
HARNESS_WORKFLOWS_PER_SEC=4 HARNESS_BATCH_SIZE=10 python3 harness/main.py
```

All resource names use a `python_` prefix so multiple SDK harnesses (C#, JS, Go, Java, etc.) can coexist on the same cluster.

### Environment Variables

| Variable | Required | Default | Description |
|---|---|---|---|
| `CONDUCTOR_SERVER_URL` | yes | -- | Conductor API base URL |
| `CONDUCTOR_AUTH_KEY` | no | -- | Orkes auth key |
| `CONDUCTOR_AUTH_SECRET` | no | -- | Orkes auth secret |
| `HARNESS_WORKFLOWS_PER_SEC` | no | 2 | Workflows to start per second |
| `HARNESS_BATCH_SIZE` | no | 20 | Number of tasks each worker polls per batch |
| `HARNESS_POLL_INTERVAL_MS` | no | 100 | Milliseconds between poll cycles |
113 changes: 113 additions & 0 deletions harness/main.py
@@ -0,0 +1,113 @@
from __future__ import annotations

import os
import signal
import sys

from conductor.client.automator.task_handler import TaskHandler
from conductor.client.configuration.configuration import Configuration
from conductor.client.http.models.task_def import TaskDef
from conductor.client.orkes_clients import OrkesClients
from conductor.client.workflow.conductor_workflow import ConductorWorkflow
from conductor.client.workflow.task.simple_task import SimpleTask

from simulated_task_worker import SimulatedTaskWorker
from workflow_governor import WorkflowGovernor

WORKFLOW_NAME = "python_simulated_tasks_workflow"

SIMULATED_WORKERS = [
    ("python_worker_0", "quickpulse", 1),
    ("python_worker_1", "whisperlink", 2),
    ("python_worker_2", "shadowfetch", 3),
    ("python_worker_3", "ironforge", 4),
    ("python_worker_4", "deepcrawl", 5),
]


def env_int_or_default(key: str, default: int) -> int:
    s = os.environ.get(key, "")
    if not s:
        return default
    try:
        return int(s)
    except ValueError:
        return default


def register_metadata(clients: OrkesClients) -> None:
    metadata_client = clients.get_metadata_client()
    workflow_executor = clients.get_workflow_executor()

    for task_name, codename, sleep_seconds in SIMULATED_WORKERS:
        task_def = TaskDef(
            name=task_name,
            description=f"Python SDK harness simulated task ({codename}, default delay {sleep_seconds}s)",
            retry_count=1,
            timeout_seconds=300,
            response_timeout_seconds=300,
        )
        metadata_client.register_task_def(task_def)

    wf = ConductorWorkflow(
        executor=workflow_executor,
        name=WORKFLOW_NAME,
        version=1,
        description="Python SDK harness simulated task workflow",
    )
    wf.owner_email("python-sdk-harness@conductor.io")

    for task_name, codename, _ in SIMULATED_WORKERS:
        wf.add(SimpleTask(task_def_name=task_name, task_reference_name=codename))

    wf.register(overwrite=True)
    print(f"Registered workflow {WORKFLOW_NAME} with {len(SIMULATED_WORKERS)} tasks")


def main() -> None:
    configuration = Configuration()
    clients = OrkesClients(configuration)

    register_metadata(clients)

    workflows_per_sec = env_int_or_default("HARNESS_WORKFLOWS_PER_SEC", 2)
    batch_size = env_int_or_default("HARNESS_BATCH_SIZE", 20)
    poll_interval_ms = env_int_or_default("HARNESS_POLL_INTERVAL_MS", 100)

    workers = []
    for task_name, codename, sleep_seconds in SIMULATED_WORKERS:
        worker = SimulatedTaskWorker(task_name, codename, sleep_seconds, batch_size, poll_interval_ms)
        workers.append(worker)

    task_handler = TaskHandler(
        workers=workers,
        configuration=configuration,
        scan_for_annotated_workers=False,
    )

    workflow_executor = clients.get_workflow_executor()
    governor = WorkflowGovernor(workflow_executor, WORKFLOW_NAME, workflows_per_sec)
    governor.start()

    main_pid = os.getpid()
    shutting_down = False

    def shutdown(signum, frame):
        nonlocal shutting_down
        if os.getpid() != main_pid or shutting_down:
            return
        shutting_down = True
        print("Shutting down...")
        governor.stop()
        task_handler.stop_processes()
        sys.exit(0)

    signal.signal(signal.SIGINT, shutdown)
    signal.signal(signal.SIGTERM, shutdown)

    task_handler.start_processes()
    task_handler.join_processes()


if __name__ == "__main__":
    main()
132 changes: 132 additions & 0 deletions harness/manifests/README.md
@@ -0,0 +1,132 @@
# Kubernetes Manifests

This directory contains Kubernetes manifests for deploying the Python SDK harness worker to the certification clusters.

## Prerequisites

**Set your namespace environment variable:**
```bash
export NS=your-namespace-here
```

All kubectl commands below use `-n $NS` to specify the namespace. The manifests intentionally do not include hardcoded namespaces.

**Note:** The harness worker images are published as public packages on GHCR and do not require authentication to pull. No image pull secrets are needed.

## Files

| File | Description |
|---|---|
| `deployment.yaml` | Deployment (single file, works on all clusters) |
| `configmap-aws.yaml` | Conductor URL + auth key for certification-aws |
| `configmap-azure.yaml` | Conductor URL + auth key for certification-az |
| `configmap-gcp.yaml` | Conductor URL + auth key for certification-gcp |
| `secret-conductor.yaml` | Conductor auth secret (placeholder template) |

## Quick Start

### 1. Create the Conductor Auth Secret

The `CONDUCTOR_AUTH_SECRET` must be created as a Kubernetes secret before deploying.

```bash
kubectl create secret generic conductor-credentials \
  --from-literal=auth-secret=YOUR_AUTH_SECRET \
  -n $NS
```

If the `conductor-credentials` secret already exists in the namespace (e.g. from the e2e-testrunner-worker), it can be reused as-is.

See `secret-conductor.yaml` for more details.
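
For reference, a secret created with the command above is equivalent to applying a manifest shaped roughly like this (a sketch; `secret-conductor.yaml` in this directory is the actual template):

```yaml
---
apiVersion: v1
kind: Secret
metadata:
  name: conductor-credentials
type: Opaque
stringData:
  auth-secret: YOUR_AUTH_SECRET  # placeholder, never commit real values
```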

### 2. Apply the ConfigMap for Your Cluster

```bash
# AWS
kubectl apply -f manifests/configmap-aws.yaml -n $NS

# Azure
kubectl apply -f manifests/configmap-azure.yaml -n $NS

# GCP
kubectl apply -f manifests/configmap-gcp.yaml -n $NS
```

### 3. Deploy

```bash
kubectl apply -f manifests/deployment.yaml -n $NS
```

### 4. Verify

```bash
# Check pod status
kubectl get pods -n $NS -l app=python-sdk-harness-worker

# Watch logs
kubectl logs -n $NS -l app=python-sdk-harness-worker -f
```

## Building and Pushing the Image

From the repository root:

```bash
# Build the harness target and push to GHCR
docker buildx build \
  --platform linux/amd64,linux/arm64 \
  --target harness \
  -t ghcr.io/conductor-oss/python-sdk/harness-worker:latest \
  --push .
```

After pushing a new image with the same tag, restart the deployment to pull it:

```bash
kubectl rollout restart deployment/python-sdk-harness-worker -n $NS
kubectl rollout status deployment/python-sdk-harness-worker -n $NS
```

## Tuning

The harness worker accepts these optional environment variables (set in `deployment.yaml`):

| Variable | Default | Description |
|---|---|---|
| `HARNESS_WORKFLOWS_PER_SEC` | 2 | Workflows to start per second |
| `HARNESS_BATCH_SIZE` | 20 | Tasks each worker polls per batch |
| `HARNESS_POLL_INTERVAL_MS` | 100 | Milliseconds between poll cycles |

Edit `deployment.yaml` to change these, then re-apply:

```bash
kubectl apply -f manifests/deployment.yaml -n $NS
```
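
Inside `deployment.yaml`, these settings would typically live in the container's `env` list, alongside the values drawn from the ConfigMap and Secret (a sketch of the relevant fragment, not the full manifest):

```yaml
# Container env entries in deployment.yaml (sketch)
env:
  - name: HARNESS_WORKFLOWS_PER_SEC
    value: "4"
  - name: HARNESS_BATCH_SIZE
    value: "20"
  - name: HARNESS_POLL_INTERVAL_MS
    value: "100"
```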

## Troubleshooting

### Pod not starting

```bash
kubectl describe pod -n $NS -l app=python-sdk-harness-worker
kubectl logs -n $NS -l app=python-sdk-harness-worker --tail=100
```

### Secret not found

```bash
kubectl get secret conductor-credentials -n $NS
```

## Resource Limits

Default resource allocation:
- **Memory**: 256Mi (request) / 512Mi (limit)
- **CPU**: 100m (request) / 500m (limit)

Adjust in `deployment.yaml` based on workload. Higher `HARNESS_WORKFLOWS_PER_SEC` values may need more CPU/memory.
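
Expressed as the usual Kubernetes `resources` stanza, the defaults above correspond to (sketch):

```yaml
resources:
  requests:
    memory: "256Mi"
    cpu: "100m"
  limits:
    memory: "512Mi"
    cpu: "500m"
```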

## Service

The harness worker does **not** need a Service or Ingress: it reaches Conductor through outbound HTTP polling only, so no inbound traffic is required.
13 changes: 13 additions & 0 deletions harness/manifests/configmap-aws.yaml
@@ -0,0 +1,13 @@
---
# ConfigMap for certification-aws cluster
# Contains Conductor connection details specific to this cluster
apiVersion: v1
kind: ConfigMap
metadata:
  name: python-sdk-harness-config
  # namespace: xxxxx  # supply this in kubectl command
  labels:
    app: python-sdk-harness-worker
data:
  CONDUCTOR_SERVER_URL: "https://certification-aws.orkesconductor.io/api"
  CONDUCTOR_AUTH_KEY: "7ba9d0ec-247b-11f1-8d42-ea3efeda41b2"
13 changes: 13 additions & 0 deletions harness/manifests/configmap-azure.yaml
@@ -0,0 +1,13 @@
---
# ConfigMap for certification-az cluster
# Contains Conductor connection details specific to this cluster
apiVersion: v1
kind: ConfigMap
metadata:
  name: python-sdk-harness-config
  # namespace: xxxxx  # supply this in kubectl command
  labels:
    app: python-sdk-harness-worker
data:
  CONDUCTOR_SERVER_URL: "https://certification-az.orkesconductor.io/api"
  CONDUCTOR_AUTH_KEY: "bf170d61-2797-11f1-833e-4ae04d100a03"
13 changes: 13 additions & 0 deletions harness/manifests/configmap-gcp.yaml
@@ -0,0 +1,13 @@
---
# ConfigMap for certification-gcp cluster
# Contains Conductor connection details specific to this cluster
apiVersion: v1
kind: ConfigMap
metadata:
  name: python-sdk-harness-config
  # namespace: xxxxx  # supply this in kubectl command
  labels:
    app: python-sdk-harness-worker
data:
  CONDUCTOR_SERVER_URL: "https://certification-gcp.orkesconductor.com/api"
  CONDUCTOR_AUTH_KEY: "e6c1ac61-286b-11f1-be01-c682b5750c3a"