Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
ccb4b0e
feat: satisfied new tucana interface
raphael-goetz Apr 28, 2026
3bd0436
dependencies: updated tucana and code0-flow
raphael-goetz Apr 28, 2026
c7e88d9
feat: implemented status heartbeat
raphael-goetz Apr 28, 2026
aa23084
feat: satisfied new tucana grpc interface
raphael-goetz Apr 28, 2026
16a7cdb
Merge pull request #189 from code0-tech/#187-status-heartbeat
raphael-goetz Apr 30, 2026
f05a09e
ref: moved error codes into docs
raphael-goetz May 2, 2026
f72a532
docs: started rework
raphael-goetz May 2, 2026
547887c
ref: renamed runtime status update env
raphael-goetz May 2, 2026
7cd101c
feat: added readme
raphael-goetz May 2, 2026
13a38ae
feat: updated env-example
raphael-goetz May 2, 2026
5d87e83
docs: added dev and installation guide
raphael-goetz May 2, 2026
abae681
fix: hybrid => dynamic
raphael-goetz May 2, 2026
d94c7ca
Potential fix for pull request finding
raphael-goetz May 2, 2026
99f8794
Potential fix for pull request finding
raphael-goetz May 2, 2026
9241826
fix: Aquila => Taurus
raphael-goetz May 2, 2026
c7b75e4
Potential fix for pull request finding
raphael-goetz May 2, 2026
e8cbc1b
docs: satisfied markdown linter
raphael-goetz May 2, 2026
bf6ba87
docs: satisfied markdown linter
raphael-goetz May 2, 2026
cd945f0
fix: added title to error page
raphael-goetz May 2, 2026
e76bc01
docs: satisfied markdown linter
raphael-goetz May 2, 2026
dbdb833
docs: satisfied markdown linter
raphael-goetz May 2, 2026
9941d9f
docs: fixed mermaid diagramm
raphael-goetz May 2, 2026
89fc641
Merge pull request #192 from code0-tech/#122-setup-telescopium
raphael-goetz May 2, 2026
58c7fe8
feat: satisfied new tucana interface
raphael-goetz Apr 28, 2026
ed7d87f
dependencies: updated tucana and code0-flow
raphael-goetz Apr 28, 2026
e34f3b8
feat: satisfied new tucana grpc interface
raphael-goetz Apr 28, 2026
39f87ac
Merge branch '#186-new-module-structure' of https://github.com/code0-…
raphael-goetz May 4, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions .env-example
Original file line number Diff line number Diff line change
@@ -1,8 +1,13 @@
ENVIRONMENT='development'
MODE='dynamic'

NATS_URL='nats://localhost:4222'
AQUILA_URL='http://localhost:8080'
AQUILA_URL='http://localhost:50051'
AQUILA_TOKEN='token'

WITH_HEALTH_SERVICE=false
GRPC_HOST='127.0.0.1'
GRPC_PORT=50051
DEFINITIONS='./definitions'

DEFINITIONS='./definitions'
RUNTIME_STATUS_UPDATE_INTERVAL_SECONDS=30
8 changes: 4 additions & 4 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ edition = "2024"

[workspace.dependencies]
async-trait = "0.1.89"
code0-flow = { version = "0.0.32" }
tucana = { version = "0.0.68" }
code0-flow = { version = "0.0.33" }
tucana = { version = "0.0.70" }
Comment thread
raphael-goetz marked this conversation as resolved.
tokio = { version = "1.44.1", features = ["rt-multi-thread", "signal"] }
log = "0.4.27"
futures-lite = "2.6.0"
Expand Down
13 changes: 13 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
# Taurus
The heart of the execution block - the runtime itself

- Executes flows and handles test executions
- Requests single node executions from Actions
- Serves the standard CodeZero library

## Used Technologies

[Rust](https://www.rust-lang.org/) x [Tonic](https://docs.rs/tonic/latest/tonic/)

## Contribute / Setup
Read the [Installation Guide](https://docs.code0.tech/taurus/installation/) if you want to deploy a Taurus instance or see the [Development Guide](https://docs.code0.tech/taurus/dev/) if you want to contribute.
8 changes: 7 additions & 1 deletion crates/taurus-core/src/runtime/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,7 @@ mod tests {
value: Some(NodeValue {
value: Some(node_value::Value::LiteralValue(value)),
}),
cast: None,
}
}

Expand All @@ -189,8 +190,11 @@ mod tests {
database_id,
runtime_parameter_id: runtime_parameter_id.to_string(),
value: Some(NodeValue {
value: Some(node_value::Value::NodeFunctionId(node_id)),
value: Some(node_value::Value::SubFlow(unimplemented!(
"Taurus needs to handle SubFlows (issue nr #184)"
))),
Comment on lines 192 to +195
Copy link

Copilot AI Apr 28, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This test helper constructs a NodeParameter by executing unimplemented!(), which will panic immediately when the test builds its graph (before the engine even runs). Please construct a real node_value::Value::SubFlow representing a thunk/sub-flow (or refactor tests to the new parameter encoding) so the tests can execute.

Copilot uses AI. Check for mistakes.
}),
cast: None,
}
}

Expand All @@ -208,6 +212,7 @@ mod tests {
paths: Vec::new(),
})),
}),
cast: None,
}
}

Expand Down Expand Up @@ -276,6 +281,7 @@ mod tests {
paths: Vec::new(),
})),
}),
cast: None,
}
}

Expand Down
2 changes: 1 addition & 1 deletion crates/taurus-core/src/runtime/engine/compiler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ pub fn compile_flow(
let arg = match value {
node_value::Value::LiteralValue(v) => CompiledArg::Literal(v.clone()),
node_value::Value::ReferenceValue(r) => CompiledArg::Reference(r.clone()),
node_value::Value::NodeFunctionId(id) => CompiledArg::DeferredNode(*id),
node_value::Value::SubFlow(_sub_flow) => unimplemented!("Taurus needs to handle SubFlows (issue nr #184)"),
};
Comment thread
raphael-goetz marked this conversation as resolved.

parameters.push(CompiledParameter {
Expand Down
6 changes: 3 additions & 3 deletions crates/taurus-core/src/runtime/engine/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use std::cell::RefCell;
use std::collections::HashMap;

use futures_lite::future::block_on;
use tucana::aquila::ExecutionRequest;
use tucana::aquila::ActionExecutionRequest;
use tucana::shared::reference_value::Target;
use tucana::shared::value::Kind;
use tucana::shared::{Struct, Value};
Expand Down Expand Up @@ -485,7 +485,7 @@ impl<'a> EngineExecutor<'a> {
&self,
node: &CompiledNode,
values: Vec<Value>,
) -> Result<ExecutionRequest, RuntimeError> {
) -> Result<ActionExecutionRequest, RuntimeError> {
if node.parameters.len() != values.len() {
return Err(RuntimeError::new(
"T-CORE-000005",
Expand All @@ -499,7 +499,7 @@ impl<'a> EngineExecutor<'a> {
fields.insert(parameter.runtime_parameter_id.clone(), value);
}

Ok(ExecutionRequest {
Ok(ActionExecutionRequest {
execution_identifier: Uuid::new_v4().to_string(),
function_identifier: node.handler_id.clone(),
parameters: Some(Struct { fields }),
Expand Down
5 changes: 3 additions & 2 deletions crates/taurus-core/src/runtime/remote/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,16 @@
//! trait without coupling the core engine to a specific transport.

use async_trait::async_trait;
use tucana::{aquila::ExecutionRequest, shared::Value};
use tucana::{aquila::ActionExecutionRequest, shared::Value};

use crate::types::errors::runtime_error::RuntimeError;

pub struct RemoteExecution {
/// Remote service identifier to route the call.
pub target_service: String,
/// Execution request payload expected by the remote runtime.
pub request: ExecutionRequest,
pub request: ActionExecutionRequest,

}

#[async_trait]
Expand Down
26 changes: 4 additions & 22 deletions crates/taurus-provider/src/providers/remote/nats_remote_runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use prost::Message;
use taurus_core::runtime::remote::{RemoteExecution, RemoteRuntime};
use taurus_core::types::errors::runtime_error::RuntimeError;
use tonic::async_trait;
use tucana::aquila::ExecutionResult;
use tucana::aquila::ActionExecutionResponse;
use tucana::shared::Value;

pub struct NATSRemoteRuntime {
Expand Down Expand Up @@ -42,8 +42,8 @@ impl RemoteRuntime for NATSRemoteRuntime {
}
};

let decode_result = ExecutionResult::decode(message.payload);
let execution_result = match decode_result {
let decode_result = ActionExecutionResponse::decode(message.payload);
let _execution_result = match decode_result {
Ok(r) => r,
Comment on lines +45 to 47
Copy link

Copilot AI Apr 28, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

execute_remote decodes an ActionExecutionResponse but then discards it and never produces a Result<Value, RuntimeError> based on the response. This currently makes all remote executions unusable; please map the decoded response to Ok(Value) / Err(RuntimeError) (including handling missing/failed results) instead of ignoring it.

Copilot uses AI. Check for mistakes.
Err(err) => {
log::error!(
Expand All @@ -58,24 +58,6 @@ impl RemoteRuntime for NATSRemoteRuntime {
}
};

match execution_result.result {
Some(result) => match result {
tucana::aquila::execution_result::Result::Success(value) => Ok(value),
tucana::aquila::execution_result::Result::Error(err) => {
let code = err.code.to_string();
let description = match err.description {
Some(string) => string,
None => "Unknown Error".to_string(),
};
let error = RuntimeError::new(code, "RemoteExecutionError", description);
Err(error)
}
},
None => Err(RuntimeError::new(
"T-PROV-000003",
"RemoteRuntimeExeption",
"Result of Remote Response was empty.",
)),
}
unimplemented!("Taurus needs to handle text executions (issue nr #185)")
Copy link

Copilot AI Apr 28, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Leaving unimplemented!() in this runtime path will panic any time a remote node executes. Please replace this with actual response handling (or, at minimum, return a structured RuntimeError with an appropriate stable code/category) so production flows don't crash.

Suggested change
unimplemented!("Taurus needs to handle text executions (issue nr #185)")
Err(RuntimeError::new(
"T-PROV-000003",
"RemoteRuntimeUnsupportedResponse",
"Remote runtime response handling is not implemented for this execution type.",
))

Copilot uses AI. Check for mistakes.
}
}
80 changes: 56 additions & 24 deletions crates/taurus/src/app/mod.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,17 @@
mod worker;

use std::time::Duration;

use code0_flow::flow_config::load_env_file;
use code0_flow::flow_config::mode::Mode::DYNAMIC;
use code0_flow::flow_service::FlowUpdateService;
use std::sync::Arc;
use std::time::Duration;
use taurus_core::runtime::engine::ExecutionEngine;
use taurus_provider::providers::emitter::nats_emitter::NATSRespondEmitter;
use taurus_provider::providers::remote::nats_remote_runtime::NATSRemoteRuntime;
use tokio::signal;
use tokio::task::JoinHandle;
use tokio::time::sleep;
use tonic_health::pb::health_server::HealthServer;
use tucana::shared::{RuntimeFeature, Translation};

use crate::client::runtime_status::TaurusRuntimeStatusService;
use crate::client::runtime_usage::TaurusRuntimeUsageService;
Expand All @@ -27,7 +26,7 @@ pub async fn run() {
let client = connect_nats(&config).await;

let mut health_task = spawn_health_task(&config);
let (runtime_status_service, runtime_usage_service) =
let (runtime_status_service, runtime_usage_service, mut runtime_status_heartbeat_task) =
setup_dynamic_services_if_needed(&config).await;

let nats_remote = NATSRemoteRuntime::new(client.clone());
Expand All @@ -41,6 +40,14 @@ pub async fn run() {
);

wait_for_shutdown(&mut worker_task, &mut health_task).await;
if let Some(handle) = runtime_status_heartbeat_task.take() {
handle.abort();
if let Err(err) = handle.await {
if !err.is_cancelled() {
log::warn!("Runtime status heartbeat task ended unexpectedly: {}", err);
}
}
}
update_stopped_status(runtime_status_service.as_ref()).await;

log::info!("Taurus shutdown complete");
Expand Down Expand Up @@ -95,11 +102,12 @@ fn spawn_health_task(config: &Config) -> Option<JoinHandle<()>> {
async fn setup_dynamic_services_if_needed(
config: &Config,
) -> (
Option<TaurusRuntimeStatusService>,
Option<Arc<TaurusRuntimeStatusService>>,
Option<TaurusRuntimeUsageService>,
Option<JoinHandle<()>>,
) {
if config.mode != DYNAMIC {
return (None, None);
return (None, None, None);
}

push_definitions_until_success(config).await;
Expand All @@ -109,23 +117,60 @@ async fn setup_dynamic_services_if_needed(
.await,
);

let runtime_status_service = Some(
let runtime_status_service = Some(Arc::new(
TaurusRuntimeStatusService::from_url(
config.aquila_url.clone(),
config.aquila_token.clone(),
"taurus".into(),
runtime_features(),
)
.await,
);
));

if let Some(status_service) = runtime_status_service.as_ref() {
status_service
.update_runtime_status(tucana::shared::execution_runtime_status::Status::Running)
.await;
}

(runtime_status_service, runtime_usage_service)
let runtime_status_heartbeat_task = if config.runtime_status_update_interval_seconds > 0 {
let status_service = runtime_status_service
.as_ref()
.expect("runtime status service should exist in dynamic mode")
.clone();
let update_interval_seconds = config.runtime_status_update_interval_seconds;

let handle = tokio::spawn(async move {
let mut interval = tokio::time::interval(Duration::from_secs(update_interval_seconds));
interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);

// First tick is immediate; consume it so heartbeats start after the interval.
interval.tick().await;

loop {
interval.tick().await;
status_service
.update_runtime_status(
tucana::shared::execution_runtime_status::Status::Running,
)
.await;
}
});

log::info!(
"Runtime status heartbeat started (interval={}s)",
update_interval_seconds
);
Some(handle)
} else {
log::info!("Runtime status heartbeat is disabled");
None
};

(
runtime_status_service,
runtime_usage_service,
runtime_status_heartbeat_task,
)
}

async fn push_definitions_until_success(config: &Config) {
Expand All @@ -152,20 +197,7 @@ async fn push_definitions_until_success(config: &Config) {
}
}

fn runtime_features() -> Vec<RuntimeFeature> {
vec![RuntimeFeature {
name: vec![Translation {
code: "en-US".to_string(),
content: "Runtime".to_string(),
}],
description: vec![Translation {
code: "en-US".to_string(),
content: "Will execute incoming flows.".to_string(),
}],
}]
}

async fn update_stopped_status(runtime_status_service: Option<&TaurusRuntimeStatusService>) {
async fn update_stopped_status(runtime_status_service: Option<&Arc<TaurusRuntimeStatusService>>) {
if let Some(status_service) = runtime_status_service {
status_service
.update_runtime_status(tucana::shared::execution_runtime_status::Status::Stopped)
Expand Down
Loading
Loading