Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -32,3 +32,4 @@ playwright-report
#local files
.local/
.planning/
**/__pycache__
5 changes: 5 additions & 0 deletions public/component_library.yaml
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
annotations: {}
folders:
- name: Aggregation
components:
- url: /components/aggregator/component.yaml
name: Aggregate Inputs
author: Hargun Singh, Morgan Wowk
- name: Quick start
components:
- url: https://raw.githubusercontent.com/Ark-kun/pipeline_components/57f780b15922061e59833541b71f3d099e710177/components/datasets/Chicago_Taxi_Trips/quick_start_version/component.yaml
Expand Down
155 changes: 155 additions & 0 deletions public/components/aggregator/component.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
name: Aggregate Inputs
description: |
Aggregates multiple inputs into a single output.
Supports JsonObject, JsonArray, and CSV output formats.
Connect upstream components to the input slots and choose an output format.
metadata:
annotations:
author: Hargun Singh, Morgan Wowk
# --- Aggregator-specific annotations (read by frontend) ---
tangleml.com/aggregator_components/enabled: "true"
tangleml.com/aggregator_components/default_output_type: "JsonObject"
tangleml.com/aggregator_components/supported_output_types: "JsonObject,JsonArray,CSV"
# --- Dynamic input mask ---
# The frontend reads this pattern and generates input_1, input_2, ... input_N
# as the user adds inputs. No hardcoded limit.
tangleml.com/aggregator_components/input_mask: "input_{n}"
tangleml.com/aggregator_components/input_mask_type: "String"
tangleml.com/aggregator_components/input_mask_description: "Input {n} to aggregate."
tangleml.com/aggregator_components/input_mask_optional: "true"
tangleml.com/aggregator_components/input_mask_min: "2"
tangleml.com/aggregator_components/input_mask_start: "1"
inputs:
- name: output_type
type: String
description: "Output format: JsonObject (JSON object keyed by input name), JsonArray (JSON array of values), or CSV (union of CSV inputs with matching columns)."
default: "JsonObject"
optional: true
outputs:
- name: aggregated_output
type: String
description: "The aggregated result in the chosen format."
implementation:
container:
image: python:3.12-slim
command:
- sh
- -ec
- |
program_path=$(mktemp)
printf "%s" "$0" > "$program_path"
python3 -u "$program_path" "$@"
- |
import csv
import io
import json
import os
import re
import sys


def _make_parent_dirs_and_return_path(file_path: str) -> str:
    """Ensure the parent directory of *file_path* exists and return the path.

    Args:
        file_path: Path of a file that is about to be written.

    Returns:
        ``file_path`` unchanged, so the call can be used inline where a
        path is expected.
    """
    parent_dir = os.path.dirname(file_path)
    # dirname() is "" for a bare filename, and os.makedirs("") raises
    # FileNotFoundError; in that case there is nothing to create.
    if parent_dir:
        os.makedirs(parent_dir, exist_ok=True)
    return file_path


def aggregate(
    aggregated_output_path: str,
    output_type: str = "JsonObject",
    **input_paths: str,
) -> None:
    """Combine the contents of several input files into one output file.

    Inputs are processed in natural (numeric-aware) order of their names,
    so ``input_2`` comes before ``input_10``.

    Args:
        aggregated_output_path: File the aggregated text is written to.
            Missing parent directories are created.
        output_type: One of ``"JsonObject"`` (JSON object mapping input
            name to file contents), ``"JsonArray"`` (JSON array of file
            contents) or ``"CSV"`` (rows of all CSV inputs under a single
            shared header).
        **input_paths: Input name -> path of the file holding its value.
            Entries whose path is empty or does not exist are skipped.

    Raises:
        ValueError: If no input file could be read, if ``output_type`` is
            not recognised, if CSV inputs have differing header rows, or
            if no CSV input contains any data.
    """

    def _natural_key(item):
        name = item[0]
        # re.split with a capturing group alternates text / digit runs,
        # starting with text, so odd positions are always the digit runs.
        chunks = [
            int(token) if index % 2 else token
            for index, token in enumerate(re.split(r"(\d+)", name))
        ]
        # The raw name breaks ties such as "input_1" vs "input_01".
        return chunks, name

    # Collect all provided inputs, skipping unconnected (missing) ones.
    inputs = {}
    for key, path in sorted(input_paths.items(), key=_natural_key):
        if path and os.path.exists(path):
            with open(path, "r", encoding="utf-8") as f:
                inputs[key] = f.read()

    if not inputs:
        raise ValueError("No inputs provided. At least one input must be connected.")

    if output_type == "JsonObject":
        result = json.dumps(inputs, indent=2)

    elif output_type == "JsonArray":
        result = json.dumps(list(inputs.values()), indent=2)

    elif output_type == "CSV":
        all_rows = []
        header = None

        for csv_text in inputs.values():
            text = csv_text.strip()
            if not text:
                continue
            rows = list(csv.reader(io.StringIO(text)))
            if not rows or rows == [['']]:
                continue
            if header is None:
                # The first non-empty input defines the expected header.
                header = rows[0]
                all_rows.append(header)
            elif rows[0] != header:
                raise ValueError(
                    f"CSV column mismatch. Expected {header}, got {rows[0]}. "
                    "All CSV inputs must have the same columns."
                )
            all_rows.extend(rows[1:])

        if not all_rows:
            raise ValueError("No valid CSV data found in any input.")

        output = io.StringIO()
        csv.writer(output).writerows(all_rows)
        result = output.getvalue()

    else:
        raise ValueError(
            f"Unknown output_type: '{output_type}'. Must be 'JsonObject', 'JsonArray', or 'CSV'."
        )

    parent_dir = os.path.dirname(aggregated_output_path)
    # dirname() is "" for a bare filename; os.makedirs("") would raise.
    if parent_dir:
        os.makedirs(parent_dir, exist_ok=True)
    with open(aggregated_output_path, "w", encoding="utf-8") as f:
        f.write(result)


# --- Command-line handling ---
# Flags are scanned by hand so that any number of numbered --input-N flags
# can be accepted; the frontend derives them from the input_mask annotation.
# Each recognised flag consumes the argument after it as its value; anything
# else (including a recognised flag with no value after it) is skipped.

_argv = sys.argv[1:]
_destination = None
_chosen_format = "JsonObject"
_sources = {}

_pos = 0
while _pos < len(_argv):
    _flag = _argv[_pos]

    # A trailing token has no value to pair with, so it can never be applied.
    if _pos + 1 >= len(_argv):
        _pos += 1
        continue

    _value = _argv[_pos + 1]
    if _flag == "--aggregated-output":
        _destination = _make_parent_dirs_and_return_path(_value)
    elif _flag == "--output-type":
        _chosen_format = _value
    else:
        # Numbered inputs: --input-1, --input-2, ... with no upper bound.
        _numbered = re.match(r"^--input-(\d+)$", _flag)
        if not _numbered:
            # Unrecognised token: step over it alone and keep scanning.
            _pos += 1
            continue
        _slot = _numbered.group(1)
        _sources[f"input_{_slot}"] = _value
    _pos += 2

if not _destination:
    raise ValueError("--aggregated-output is required.")

aggregate(
    aggregated_output_path=_destination,
    output_type=_chosen_format,
    **_sources,
)