Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,7 @@ rust-s3 = { version = "0.37.2", default-features = false, features = ["tokio-rus
rustls = { version = "0.23.40", features = ["ring"] }
rustls-pemfile = "2.2.0"
scopeguard = "1.2.0"
sd-notify = "0.5"
secrecy = { version = "0.10", features = ["serde"] }
send_wrapper = "0.6.0"
serde = { version = "1.0.228", features = ["derive", "rc"] }
Expand Down
5 changes: 5 additions & 0 deletions core/ai/mcp/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@ keywords = ["iggy", "messaging", "streaming", "mcp"]
readme = "README.md"
publish = false

[features]
systemd = ["dep:sd-notify", "dep:tokio-util"]

[dependencies]
axum = { workspace = true }
axum-server = { workspace = true }
Expand All @@ -47,12 +50,14 @@ rmcp = { workspace = true, features = [
"transport-io",
"transport-streamable-http-server",
] }
sd-notify = { workspace = true, optional = true }
serde = { workspace = true }
serde_json = { workspace = true }
socket2 = { workspace = true }
strum = { workspace = true }
thiserror = { workspace = true }
tokio = { workspace = true }
tokio-util = { workspace = true, optional = true }
tower-http = { workspace = true }
tracing = { workspace = true }
tracing-opentelemetry = { workspace = true }
Expand Down
11 changes: 11 additions & 0 deletions core/ai/mcp/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,17 @@ Here's the example configuration to be used with Claude Desktop:

![MCP](../../../assets/iggy_mcp_server.png)

## Systemd integration

Build with the `systemd` feature to enable systemd readiness and watchdog notifications:

```sh
cargo build --bin iggy-mcp --release --features iggy-mcp/systemd
```

The MCP server behaves the same way the Iggy server does under systemd. See
[Systemd integration](../../server/README.md#systemd-integration) for details.

## Telemetry

The MCP server supports OpenTelemetry for logs and traces. To enable telemetry, add the following configuration:
Expand Down
21 changes: 21 additions & 0 deletions core/ai/mcp/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ mod error;
mod log;
mod service;
mod stream;
#[cfg(feature = "systemd")]
mod systemd;

const VERSION: &str = env!("CARGO_PKG_VERSION");

Expand Down Expand Up @@ -89,6 +91,9 @@ async fn main() -> Result<(), McpRuntimeError> {
delete: config.permissions.delete,
};

#[cfg(feature = "systemd")]
let watchdog_cancel = tokio_util::sync::CancellationToken::new();

if transport == McpTransport::Stdio {
let Ok(service) = IggyService::new(iggy_client, iggy_consumer, permissions)
.serve(stdio())
Expand All @@ -101,11 +106,21 @@ async fn main() -> Result<(), McpRuntimeError> {
return Err(McpRuntimeError::FailedToCreateService);
};

#[cfg(feature = "systemd")]
systemd::notify_ready();
#[cfg(feature = "systemd")]
systemd::spawn_watchdog(watchdog_cancel.clone());

if let Err(error) = service.waiting().await {
error!("Waiting for service error. {error}");
}
} else {
api::init(config.http, iggy_client, iggy_consumer, permissions).await?;

#[cfg(feature = "systemd")]
systemd::notify_ready();
#[cfg(feature = "systemd")]
systemd::spawn_watchdog(watchdog_cancel.clone());
}

#[cfg(unix)]
Expand All @@ -127,6 +142,12 @@ async fn main() -> Result<(), McpRuntimeError> {
}
}

#[cfg(feature = "systemd")]
{
watchdog_cancel.cancel();
systemd::notify_stopping();
}

client_to_shutdown.shutdown().await?;
info!("Iggy MCP Server stopped successfully");
Ok(())
Expand Down
57 changes: 57 additions & 0 deletions core/ai/mcp/src/systemd.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use tokio_util::sync::CancellationToken;
use tracing::{info, warn};

pub fn notify_ready() {
if let Err(e) = sd_notify::notify(&[sd_notify::NotifyState::Ready]) {
warn!("Failed to send systemd READY=1 notification: {e}");
}
}

pub fn notify_stopping() {
let _ = sd_notify::notify(&[sd_notify::NotifyState::Stopping]);
}

/// Spawn the watchdog keep-alive task. It stops cooperatively when `cancel`
/// fires (driven by the SIGINT/SIGTERM handler in `main`).
pub fn spawn_watchdog(cancel: CancellationToken) {
let Some(timeout) = sd_notify::watchdog_enabled() else {
return;
};

let interval = timeout / 2;
info!(
"Systemd watchdog enabled, pinging every {}s (timeout: {}s).",
interval.as_secs(),
timeout.as_secs()
);

tokio::spawn(async move {
Comment thread
hubcio marked this conversation as resolved.
loop {
tokio::select! {
_ = cancel.cancelled() => break,
_ = tokio::time::sleep(interval) => {
if let Err(e) = sd_notify::notify(&[sd_notify::NotifyState::Watchdog]) {
warn!("Failed to send systemd watchdog ping: {e}");
}
}
}
}
});
}
2 changes: 2 additions & 0 deletions core/server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ default = ["mimalloc", "iggy-web"]
disable-mimalloc = []
mimalloc = ["dep:mimalloc"]
iggy-web = ["dep:rust-embed", "dep:mime_guess"]
systemd = ["dep:sd-notify"]

[dependencies]
ahash = { workspace = true }
Expand Down Expand Up @@ -86,6 +87,7 @@ rolling-file = { workspace = true }
rust-embed = { workspace = true, optional = true }
rustls = { workspace = true }
rustls-pemfile = { workspace = true }
sd-notify = { workspace = true, optional = true }
secrecy = { workspace = true }
send_wrapper = { workspace = true }
serde = { workspace = true }
Expand Down
11 changes: 11 additions & 0 deletions core/server/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,17 @@ This is the core server component of Apache Iggy. You can run it directly with `

The configuration file is located at [core/server/config.toml](https://github.com/apache/iggy/blob/master/core/server/config.toml). You can customize the server settings by modifying this file or by using environment variables e.g. `IGGY_TCP_ADDRESS=0.0.0.0:8090`.

## Systemd integration

Build with the `systemd` feature to enable systemd readiness and watchdog notifications:

```sh
cargo build --bin iggy-server --release --features systemd
```

The server will notify systemd when it is ready and then periodically send
watchdog messages at half the configured `WatchdogSec` interval for the unit.

![Server](../../assets/server.png)

![Architecture](../../assets/iggy_architecture.png)
21 changes: 20 additions & 1 deletion core/server/src/shard/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,9 @@ pub mod task_registry;
pub mod tasks;
pub mod transmission;

#[cfg(feature = "systemd")]
pub mod systemd;

mod communication;

pub use communication::calculate_shard_assignment;
Expand Down Expand Up @@ -175,6 +178,11 @@ impl IggyShard {
if !self.config.system.logging.sysinfo_print_interval.is_zero() && self.id == 0 {
periodic::spawn_sysinfo_printer(self.clone());
}

#[cfg(feature = "systemd")]
if self.id == 0 {
periodic::spawn_systemd_watchdog(self.clone());
}
}

pub async fn run(self: &Rc<Self>) -> Result<(), IggyError> {
Expand All @@ -194,7 +202,18 @@ impl IggyShard {
// Spawn shutdown handler
compio::runtime::spawn(async move {
let _ = stop_receiver.recv().await;
shard_for_shutdown.trigger_shutdown().await;
#[cfg(feature = "systemd")]
if shard_for_shutdown.id == 0 {
systemd::notify_stopping();
}
let drained = shard_for_shutdown.trigger_shutdown().await;
#[cfg(feature = "systemd")]
if shard_for_shutdown.id == 0 && !drained {
warn!("Graceful shutdown timed out; some tasks did not drain in time");
systemd::notify_status("graceful shutdown timed out");
}
#[cfg(not(feature = "systemd"))]
let _ = drained;
let _ = shutdown_complete_tx.send(()).await;
})
.detach();
Expand Down
47 changes: 47 additions & 0 deletions core/server/src/shard/systemd.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

//! Thin wrappers around `sd_notify` so every systemd interaction on the server
//! side lives in one place (mirrors `core/ai/mcp/src/systemd.rs`).

use tracing::warn;

/// Tell systemd the service has finished start-up (`READY=1`).
pub fn notify_ready() {
if let Err(e) = sd_notify::notify(&[sd_notify::NotifyState::Ready]) {
warn!("Failed to send systemd READY=1 notification: {e}");
}
}

/// Tell systemd the service has begun shutting down (`STOPPING=1`).
pub fn notify_stopping() {
let _ = sd_notify::notify(&[sd_notify::NotifyState::Stopping]);
}

/// Surface a non-fatal shutdown problem in `systemctl status` / journald.
pub fn notify_status(status: &str) {
let _ = sd_notify::notify(&[sd_notify::NotifyState::Status(status)]);
}

/// Send a single watchdog keep-alive ping (`WATCHDOG=1`).
pub fn ping_watchdog() {
if let Err(e) = sd_notify::notify(&[sd_notify::NotifyState::Watchdog]) {
warn!("Failed to send systemd watchdog ping: {e}");
}
}
3 changes: 3 additions & 0 deletions core/server/src/shard/tasks/oneshot/config_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,9 @@ async fn write_config(
}
}

#[cfg(feature = "systemd")]
crate::shard::systemd::notify_ready();

let mut current_config = shard_clone.config.clone();

let tcp_addr = shard_clone.tcp_bound_address.get();
Expand Down
4 changes: 4 additions & 0 deletions core/server/src/shard/tasks/periodic/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ mod message_saver;
mod personal_access_token_cleaner;
mod revocation_timeout;
mod sysinfo_printer;
#[cfg(feature = "systemd")]
mod systemd_watchdog;

pub use heartbeat_verifier::spawn_heartbeat_verifier;
pub use jwt_token_cleaner::spawn_jwt_token_cleaner;
Expand All @@ -31,3 +33,5 @@ pub use message_saver::spawn_message_saver;
pub use personal_access_token_cleaner::spawn_personal_access_token_cleaner;
pub use revocation_timeout::spawn_revocation_timeout_checker;
pub use sysinfo_printer::spawn_sysinfo_printer;
#[cfg(feature = "systemd")]
pub use systemd_watchdog::spawn_systemd_watchdog;
47 changes: 47 additions & 0 deletions core/server/src/shard/tasks/periodic/systemd_watchdog.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use crate::shard::IggyShard;
use crate::shard::systemd;
use iggy_common::IggyError;
use std::rc::Rc;
use tracing::info;

pub fn spawn_systemd_watchdog(shard: Rc<IggyShard>) {
let Some(timeout) = sd_notify::watchdog_enabled() else {
return;
};

let interval = timeout / 2;
info!(
"Systemd watchdog enabled, pinging every {}s (timeout: {}s).",
interval.as_secs(),
timeout.as_secs()
);

shard
.task_registry
.periodic("systemd_watchdog")
.every(interval)
.tick(move |_shutdown| ping_watchdog())
.spawn();
}

async fn ping_watchdog() -> Result<(), IggyError> {
systemd::ping_watchdog();
Ok(())
}
Loading