Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 39 additions & 7 deletions scylla-server/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 5 additions & 3 deletions scylla-server/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
[package]
name = "scylla-server"
version = "0.0.1"
edition = "2021"
edition = "2024"
default-run = "scylla-server"

[dependencies]
Expand All @@ -15,7 +15,7 @@ axum = { version = "0.8.4", features = ["multipart", "macros"] }
tower = { version = "0.5.2", features = ["timeout"] }
tower-http = { version = "0.6.7", features = ["cors", "trace"] }
socketioxide = { version = "0.18.0", features = ["tracing"] }
rumqttc = { git = "https://github.com/bytebeamio/rumqtt", branch = "main"}
rumqttc = { version = "0.25.1" }
tokio-util = { version= "0.7.16", features = ["full"] }
tracing = "0.1.43"
tracing-subscriber = { version = "0.3.20", features = ["ansi", "env-filter"] }
Expand All @@ -34,6 +34,9 @@ rustc-hash = "2.1.1"
evalexpr = "13.0.0"
serde_with = "3.14.1"
derive_more = { version = "2.0.1", features = ["as_ref", "display"] }
tokio-tungstenite = "0.28.0"
futures-util = "0.3.31"
tungstenite = "0.28.0"
[target.'cfg(not(target_env = "msvc"))'.dependencies]
tikv-jemallocator = "0.6"

Expand All @@ -58,4 +61,3 @@ strip = false
[[bin]]
name = "scylla-server"
path = "src/main.rs"

2 changes: 1 addition & 1 deletion scylla-server/src/controllers/car_command_controller.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
use std::sync::Arc;

use axum::{
Extension,
extract::{Json, Path},
http::StatusCode,
Extension,
};
use axum_extra::extract::Query;
use protobuf::Message;
Expand Down
6 changes: 3 additions & 3 deletions scylla-server/src/controllers/data_controller.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
use axum::{
extract::{Path, Query, State},
Json,
extract::{Path, Query, State},
};
use serde::Deserialize;

use crate::{
error::ScyllaError, services::data_service, transformers::data_transformer::PublicData,
PoolHandle,
PoolHandle, error::ScyllaError, services::data_service,
transformers::data_transformer::PublicData,
};

#[derive(Deserialize)]
Expand Down
6 changes: 3 additions & 3 deletions scylla-server/src/controllers/data_type_controller.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
use axum::{extract::State, Json};
use axum::{Json, extract::State};

use crate::{
error::ScyllaError, services::data_type_service,
transformers::data_type_transformer::PublicDataType, PoolHandle,
PoolHandle, error::ScyllaError, services::data_type_service,
transformers::data_type_transformer::PublicDataType,
};

/// Get a list of data types
Expand Down
4 changes: 2 additions & 2 deletions scylla-server/src/controllers/file_insertion_controller.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
use std::sync::Arc;

use axum::{
extract::{Multipart, State},
Extension, Json,
extract::{Multipart, State},
};
use axum_macros::debug_handler;
use chrono::DateTime;
Expand All @@ -13,10 +13,10 @@ use tokio::{fs, sync::mpsc};
use tracing::{debug, info, trace, warn};

use crate::{
ClientData, PoolHandle,
error::ScyllaError,
proto::{playback_data, serverdata},
services::run_service,
ClientData, PoolHandle,
};

use super::OutputDirectory;
Expand Down
4 changes: 2 additions & 2 deletions scylla-server/src/controllers/rule_controller.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
use std::sync::Arc;

use axum::{debug_handler, extract::Path, Extension, Json};
use axum::{Extension, Json, debug_handler, extract::Path};
use axum_extra::{
headers::{authorization::Basic, Authorization},
TypedHeader,
headers::{Authorization, authorization::Basic},
};
use tokio::sync::RwLock;
use tracing::debug;
Expand Down
4 changes: 2 additions & 2 deletions scylla-server/src/controllers/run_controller.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
use std::sync::atomic::Ordering;

use axum::{
extract::{Path, State},
Json,
extract::{Path, State},
};

use crate::{
error::ScyllaError, services::run_service, transformers::run_transformer::PublicRun, PoolHandle,
PoolHandle, error::ScyllaError, services::run_service, transformers::run_transformer::PublicRun,
};

/// get a list of runs
Expand Down
6 changes: 3 additions & 3 deletions scylla-server/src/controllers/scylla_config_controller.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
use std::sync::atomic::Ordering;

use axum::{extract::Path, Json};
use axum::{Json, extract::Path};
use serde::Serialize;

use crate::{
error::ScyllaError, BATCH_UPSERT_TIME, DATA_UPLOAD_DISABLE, RATE_LIMIT_MODE,
SOCKET_DISCARD_PERCENT, STATIC_RATE_LIMIT_VALUE,
BATCH_UPSERT_TIME, DATA_UPLOAD_DISABLE, RATE_LIMIT_MODE, SOCKET_DISCARD_PERCENT,
STATIC_RATE_LIMIT_VALUE, error::ScyllaError,
};

/// holding all of scylla's settings
Expand Down
4 changes: 2 additions & 2 deletions scylla-server/src/controllers/video_streamer_controller.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
use std::{fs, io::SeekFrom, sync::Arc, vec};

use axum::{
Extension, Json,
body::{Body, BodyDataStream},
extract::Path,
http::{header, HeaderMap, Response, StatusCode},
Extension, Json,
http::{HeaderMap, Response, StatusCode, header},
};
use rumqttc::v5::AsyncClient;
use tokio::{
Expand Down
4 changes: 2 additions & 2 deletions scylla-server/src/db_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,10 @@ use tokio::sync::{broadcast, mpsc};
use tokio::time::Duration;

use tokio_util::sync::CancellationToken;
use tracing::{debug, info, instrument, trace, warn, Level};
use tracing::{Level, debug, info, instrument, trace, warn};

use crate::services::{data_service, data_type_service};
use crate::{ClientData, PoolHandle, BATCH_UPSERT_TIME, DATA_UPLOAD_DISABLE};
use crate::{BATCH_UPSERT_TIME, ClientData, DATA_UPLOAD_DISABLE, PoolHandle};

/// A few threads to manage the processing and inserting of special types,
/// upserting of metadata for data, and batch uploading the database
Expand Down
45 changes: 45 additions & 0 deletions scylla-server/src/grafana_handler.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
use crate::ClientData;
use futures_util::SinkExt;
use tokio::sync::broadcast;
use tokio_tungstenite::connect_async;
use tokio_util::sync::CancellationToken;
use tracing::{debug, trace, warn};

pub async fn grafana_live_handler(
cancel_token: CancellationToken,
mut data_channel: broadcast::Receiver<ClientData>,
gf_url: String,
gf_token: String,
gf_socket: String,
_gf_measurement: String,
) {
let req = tungstenite::client::ClientRequestBuilder::new(
(format!("{}/api/live/push/{}", gf_url, gf_socket))
.parse()
.expect("Invalid Grafana URL"),
)
.with_header("Authorization", format!("Bearer {gf_token}"));
let (mut ws_st, _) = match connect_async(req).await {
Ok(res) => res,
Err(res) => {
warn!("Could not connect to Grafana! {}", res);
warn!("Bailing out of Grafana handler!");
return;
}
};

loop {
tokio::select! {
_ = cancel_token.cancelled() => {
debug!("Shutting down Grafana handler!");
break;
},
Ok(data) = data_channel.recv() => {
match ws_st.send(tungstenite::Message::Text(data.to_influx_lp().into())).await {
Ok(_) => trace!("Pushed message to Grafana"),
Err(_) => debug!("Could not push Grafana message!"),
}
}
}
}
}
37 changes: 37 additions & 0 deletions scylla-server/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ pub mod services;
pub mod db_handler;
pub mod mqtt_processor;

pub mod grafana_handler;
pub mod metadata_structs;
pub mod rule_structs;
pub mod socket_handler;
Expand Down Expand Up @@ -100,3 +101,39 @@ impl From<ClientData> for models::DataInsert {
}
}
}

impl ClientData {
/// Converts ClientData to Influx Line Protocol
/// Skips tags, handles multi-value points by sending as name_idex
fn to_influx_lp(&self) -> String {
if self.values.len() == 1 {
format!(
"{} {}={} {}",
self.name,
self.name,
self.values.first().unwrap_or(&-1.0),
self.timestamp.timestamp_nanos_opt().unwrap_or(0)
)
} else {
let mut res = format!("{} ", self.name);
for i in 0..self.values.len() {
res.push_str(&format!(
"{}_{}={}",
self.name,
i,
self.values
.get(i)
.expect("Impossible OOB on Grafana multi-point sender")
));
}
res.push_str(
&self
.timestamp
.timestamp_nanos_opt()
.unwrap_or(0)
.to_string(),
);
res
}
}
}
Loading