Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions charts/events/Chart.lock
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,8 @@ dependencies:
- name: argo-events
repository: https://argoproj.github.io/argo-helm
version: 2.4.9
digest: sha256:5ff9d1d6df66d89986dedf48471f04c95f32ad1ce0175edbc6584635cf5a766f
generated: "2026-04-14T15:17:37.915149836+01:00"
- name: metacontroller-helm
repository: oci://ghcr.io/metacontroller
version: 4.15.0
digest: sha256:cce917621c4901d4b789c0610b11ae33dfb2d5b20483671a755871ed569b3e6d
generated: "2026-04-28T14:20:52.776857429+01:00"
6 changes: 5 additions & 1 deletion charts/events/Chart.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,15 @@ name: events
description: Data Analysis event triggering
type: application

version: 0.1.3
version: 0.2.0

dependencies:
- name: argo-events
alias: argoevents
repository: https://argoproj.github.io/argo-helm
version: 2.4.9
condition: argo-events.enabled

- name: metacontroller-helm
version: 4.15.0
repository: oci://ghcr.io/metacontroller
50 changes: 50 additions & 0 deletions charts/events/crds/trigger-crds.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
apiVersion: apiextensions.k8s.io/v1
kind: CustomResourceDefinition
metadata:
name: triggers.workflows.diamond.ac.uk
spec:
group: workflows.diamond.ac.uk
names:
kind: Trigger
plural: triggers
singular: trigger
scope: Namespaced
versions:
- name: v1alpha1
served: true
storage: true
schema:
openAPIV3Schema:
type: object
properties:
spec:
type: object
properties:
enabled:
type: boolean
default: true
eventName:
type: string
workflow:
type: object
required: [template]
properties:
template:
type: string
parameters:
type: array
default: []
items:
type: object
properties:
name:
type: string
messagePath:
type: string
required:
- workflow
- eventName
required:
- spec
subresources:
status: {}
154 changes: 154 additions & 0 deletions charts/events/hooks/sync.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
from http.server import BaseHTTPRequestHandler, HTTPServer
import json

class Controller(BaseHTTPRequestHandler):
def sync(self, parent: dict, related: dict):

triggers = []
dependencies = []
sourceTriggers: dict = related.get("Trigger.workflows.diamond.ac.uk/v1alpha1", {})
eventSourceName: str | None = parent.get("metadata", {}).get("name")
beamline: str | None = parent.get("metadata", {}).get("labels", {}).get("workflows.diamond.ac.uk/beamline")

for dlsTrigger in sourceTriggers.values():
spec: dict = dlsTrigger.get("spec", {})
name: str = dlsTrigger.get("metadata", {}).get("name")
workflow = spec.get("workflow", {})
template = workflow.get("template")
eventName = spec.get("eventName")
userParameters: list[dict] = workflow.get("parameters", [])

dependencies.append({
"name": name,
"eventSourceName": eventSourceName,
"eventName": eventName
})

sensorParams = [{
"src": {
"dependencyName": name,
"dataKey": "body.user_namespace"
},
"dest": "metadata.namespace"
}]
templateArgs = []

for userParam in userParameters:
sensorParams.append({
"src": {
"dependencyName": name,
"dataKey": userParam.get("messagePath", "")
},
"dest": f"spec.arguments.parameters.{userParameters.index(userParam)}.value"
})

templateArgs.append({"name": userParam.get("name")})

triggers.append({
"template": {
"name": name,
"argoWorkflow": {
"parameters": sensorParams,
"operation": "submit",
"source": {
"resource": {
"apiVersion": "argoproj.io/v1alpha1",
"kind": "Workflow",
"metadata": {
"generateName": f"{template}-event-",
},
"spec": {
"serviceAccountName": "argo-workflow",
"workflowTemplateRef": {
"name": template,
"clusterScope": True
},
"arguments": {
"parameters": templateArgs
},
# "workflowMetadata": {
# "labels": {
# "workflows.diamond.ac.uk/creator-posix-uid": "36055"
# }
# }
},
}
}
}
}
})

sensor = {
"apiVersion": "argoproj.io/v1alpha1",
"kind": "Sensor",
"metadata": {
"name": f"{beamline}-{eventSourceName}-sensor"
},
"spec": {
"template":
{"serviceAccountName": "operate-workflow-sa"},
"dependencies": dependencies,
"triggers": triggers
}
}

desired_children = [sensor]

desired_status = {
"triggers": len(triggers)
}

return {"status": desired_status, "children": desired_children}

def customize(self, parent: dict):

beamline: str | None = parent.get("metadata", {}).get("labels", {}).get("workflows.diamond.ac.uk/beamline")
sourceTypes = list(parent.get("spec", {}).keys())

if not sourceTypes or not beamline:
return []

return [{
"apiVersion": "workflows.diamond.ac.uk/v1alpha1",
"resource": "triggers",
"labelSelector": {
"matchExpressions": [{
"key": "workflows.diamond.ac.uk/source",
"operator": "In",
"values": sourceTypes
}],
"matchLabels": {"workflows.diamond.ac.uk/beamline": beamline}
}
}]

def do_POST(self):
if self.path == '/sync':
observed = json.loads(self.rfile.read(int(self.headers.get("content-length", 0))))
desired = self.sync(
observed.get("parent"),
observed.get("related", [])
)
self.send_response(200)
self.send_header("Content-type", "application/json")
self.end_headers()
self.wfile.write(json.dumps(desired).encode())

elif self.path == '/customize':
request: dict = json.loads(self.rfile.read(int(self.headers.get('content-length', 0))))
related = self.customize(request.get("parent", {}))
self.send_response(200)
self.send_header("Content-type", "application/json")
self.end_headers()
self.wfile.write(json.dumps({"relatedResources": related}).encode())

else:
self.send_response(401)
self.send_header("Content-type", "application/json")
self.end_headers()
error_msg = {
"error": "404",
"endpoint": self.path
}
self.wfile.write(json.dumps(error_msg).encode("utf-8"))

HTTPServer(("", 80), Controller).serve_forever()
2 changes: 2 additions & 0 deletions charts/events/templates/event-sources.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ apiVersion: argoproj.io/v1alpha1
kind: EventSource
metadata:
name: {{ $eventSource.name }}
labels:
workflows.diamond.ac.uk/beamline: {{ $eventSource.beamline }}
spec:
service:
ports:
Expand Down
25 changes: 25 additions & 0 deletions charts/events/templates/example-trigger.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
{{- if .Values.metacontroller.enabled -}}
apiVersion: workflows.diamond.ac.uk/v1alpha1
kind: Trigger
metadata:
name: example-trigger
labels:
workflows.diamond.ac.uk/source: webhook
workflows.diamond.ac.uk/beamline: test-beamline
spec:
enabled: true
eventName: example
workflow:
template: example-template
parameters:
- name: jpg
messagePath: body.parameters.jpg
- name: jpeg
messagePath: body.parameters.jpeg
- name: tif
messagePath: body.parameters.tif
- name: tiff
messagePath: body.parameters.tiff
- name: png
messagePath: body.parameters.png
{{- end }}
60 changes: 0 additions & 60 deletions charts/events/templates/sensor.yaml

This file was deleted.

23 changes: 23 additions & 0 deletions charts/events/templates/trigger-controller.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
{{- if .Values.metacontroller.enabled -}}
apiVersion: metacontroller.k8s.io/v1alpha1
kind: CompositeController
metadata:
name: trigger-controller
spec:
generateSelector: true
parentResource:
apiVersion: argoproj.io/v1alpha1
resource: eventsources
childResources:
- apiVersion: argoproj.io/v1alpha1
resource: sensors
updateStrategy:
method: InPlace
hooks:
sync:
webhook:
url: http://workflows-metacontroller.events/sync
customize:
webhook:
url: http://workflows-metacontroller.events/customize
{{- end }}
44 changes: 44 additions & 0 deletions charts/events/templates/webhooks.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
{{- if .Values.metacontroller.enabled -}}
apiVersion: v1
kind: ConfigMap
metadata:
name: trigger-controller-hooks
data:
{{- (.Files.Glob "hooks/sync.py").AsConfig | nindent 2 }}
---
apiVersion: apps/v1
kind: Deployment
metadata:
name: workflows-metacontroller
spec:
replicas: 1
selector:
matchLabels:
app: workflows-metacontroller
template:
metadata:
labels:
app: workflows-metacontroller
spec:
containers:
- name: controller
image: python:3
command: ["python3", "/hooks/sync.py"]
volumeMounts:
- name: hooks
mountPath: /hooks
volumes:
- name: hooks
configMap:
name: trigger-controller-hooks
---
apiVersion: v1
kind: Service
metadata:
name: workflows-metacontroller
spec:
selector:
app: workflows-metacontroller
ports:
- port: 80
{{- end }}
Loading
Loading