Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion scylla-server/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
[package]
name = "scylla-server"
version = "0.0.1"
edition = "2021"
edition = "2024"
default-run = "scylla-server"

[dependencies]
Expand Down
10 changes: 6 additions & 4 deletions scylla-server/src/controllers/car_command_controller.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::sync::Arc;

use axum::{extract::Path, Extension};
use axum::{Extension, extract::Path};
use axum_extra::extract::Query;
use protobuf::Message;
use rumqttc::v5::AsyncClient;
Expand All @@ -19,10 +19,12 @@ pub struct ConfigRequest {

/// Sends a configuration to the car over MQTT
/// * `key` - The key of the configuration, as defined in the cangen YAML
/// * `data_query` - The data of the configuration, a URI query list of data=<f32>. If empty or too short, filled with cangen YAMl defaults
/// * `data_query` - The data of the configuration, a URI query list of data=<f32>. If empty or too short, filled with cangen YAML defaults
/// * `client` - The MQTT client to be used to send the data
///
/// More info: This follows the specification of sending a command_data object over siren to topic CALYPSO_BIDIR_CMD_PREFIX/<key>
/// More info: This follows the specification of sending a `command_data` object over siren to topic `CALYPSO_BIDIR_CMD_PREFIX`/<key>
/// # Errors
/// Returns a scyllaError if the DB fails
pub async fn send_config_command(
Path(key): Path<String>,
Query(data_query): Query<ConfigRequest>,
Expand All @@ -48,7 +50,7 @@ pub async fn send_config_command(
// publish the message to the topic that calypso's encoder is subscribed to
if let Err(err) = client
.publish(
format!("{}{}", CALYPSO_BIDIR_CMD_PREFIX, key),
format!("{CALYPSO_BIDIR_CMD_PREFIX}{key}"),
rumqttc::v5::mqttbytes::QoS::ExactlyOnce,
false,
bytes,
Expand Down
8 changes: 5 additions & 3 deletions scylla-server/src/controllers/data_controller.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,16 @@
use axum::{
extract::{Path, State},
Json,
extract::{Path, State},
};

use crate::{
error::ScyllaError, services::data_service, transformers::data_transformer::PublicData,
PoolHandle,
PoolHandle, error::ScyllaError, services::data_service,
transformers::data_transformer::PublicData,
};

/// Get all of the data points of a certain data type name and run ID
/// # Errors
/// Returns a scyllaError if the DB fails
pub async fn get_data(
State(pool): State<PoolHandle>,
Path((data_type_name, run_id)): Path<(String, i32)>,
Expand Down
8 changes: 5 additions & 3 deletions scylla-server/src/controllers/data_type_controller.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
use axum::{extract::State, Json};
use axum::{Json, extract::State};

use crate::{
error::ScyllaError, services::data_type_service,
transformers::data_type_transformer::PublicDataType, PoolHandle,
PoolHandle, error::ScyllaError, services::data_type_service,
transformers::data_type_transformer::PublicDataType,
};

/// Get a list of data types
/// # Errors
/// Returns a scyllaError if the DB fails
pub async fn get_all_data_types(
State(pool): State<PoolHandle>,
) -> Result<Json<Vec<PublicDataType>>, ScyllaError> {
Expand Down
21 changes: 13 additions & 8 deletions scylla-server/src/controllers/file_insertion_controller.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use axum::{
extract::{Multipart, State},
Extension,
extract::{Multipart, State},
};
use axum_macros::debug_handler;
use chrono::DateTime;
Expand All @@ -10,12 +10,18 @@ use tokio::sync::mpsc;
use tracing::{debug, info, trace, warn};

use crate::{
error::ScyllaError, proto::playback_data, services::run_service, ClientData, PoolHandle,
ClientData, PoolHandle, error::ScyllaError, proto::playback_data, services::run_service,
};

/// Inserts a file using http multipart
/// This file is parsed and clientdata values are extracted, the run ID of each variable is inferred, and then data is batch uploaded
/// # Errors
/// Returns a scyllaError if the DB fails
/// # Panics
/// Panics if an impossible timestamp is generated
// super cool: adding this tag tells you what variable is misbehaving in cases of axum Send+Sync Handler fails
#[allow(clippy::cast_possible_wrap)]
#[allow(clippy::cast_sign_loss)]
#[debug_handler]
pub async fn insert_file(
State(pool): State<PoolHandle>,
Expand Down Expand Up @@ -62,19 +68,18 @@ pub async fn insert_file(
match stream.read_message::<playback_data::PlaybackData>() {
Ok(f) => {
trace!("Decoded file msg: {}", f);
let f = match run_rng.get(&f.time_us) {
Some(a) => ClientData {
let f = if let Some(a) = run_rng.get(&f.time_us) {
ClientData {
run_id: *a,
name: f.topic.clone(),
unit: f.unit,
values: f.values,
timestamp: DateTime::from_timestamp_micros(f.time_us as i64)
.unwrap(),
},
None => {
count_bad_run += 1;
continue;
}
} else {
count_bad_run += 1;
continue;
};
insertable_data.push(f);
}
Expand Down
22 changes: 16 additions & 6 deletions scylla-server/src/controllers/run_controller.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,17 @@
use std::sync::atomic::Ordering;

use axum::{
extract::{Path, State},
Json,
extract::{Path, State},
};

use crate::{
error::ScyllaError, services::run_service, transformers::run_transformer::PublicRun, PoolHandle,
PoolHandle, error::ScyllaError, services::run_service, transformers::run_transformer::PublicRun,
};

/// get a list of runs
/// # Errors
/// Returns a scyllaError if the DB fails
pub async fn get_all_runs(
State(pool): State<PoolHandle>,
) -> Result<Json<Vec<PublicRun>>, ScyllaError> {
Expand All @@ -22,6 +24,8 @@ pub async fn get_all_runs(
}

/// get the latest run
/// # Errors
/// Returns a scyllaError if the DB fails
pub async fn get_latest_run(
State(pool): State<PoolHandle>,
) -> Result<Json<PublicRun>, ScyllaError> {
Expand All @@ -34,18 +38,18 @@ pub async fn get_latest_run(
}

/// get a run given its ID
/// # Errors
/// Returns a scyllaError if the DB fails
pub async fn get_run_by_id(
State(pool): State<PoolHandle>,
Path(run_id): Path<i32>,
) -> Result<Json<PublicRun>, ScyllaError> {
let mut db = pool.get().await?;
let run_data = run_service::get_run_by_id(&mut db, run_id).await?;

if run_data.is_none() {
let Some(run_data_safe) = run_data else {
return Err(ScyllaError::EmptyResult);
}

let run_data_safe = run_data.unwrap();
};

let transformed_run_data = PublicRun::from(run_data_safe);

Expand All @@ -54,6 +58,8 @@ pub async fn get_run_by_id(

/// create a new run with an auto-incremented ID
/// note the new run must be updated so the channel passed in notifies the data processor to use the new run
/// # Errors
/// Returns a scyllaError if the DB fails
pub async fn new_run(State(pool): State<PoolHandle>) -> Result<Json<PublicRun>, ScyllaError> {
let mut db = pool.get().await?;
let run_data = run_service::create_run(&mut db, chrono::offset::Utc::now()).await?;
Expand All @@ -68,6 +74,8 @@ pub async fn new_run(State(pool): State<PoolHandle>) -> Result<Json<PublicRun>,
}

/// creates a new run with all associated data (driver, location, notes)
/// # Errors
/// Returns a scyllaError if the DB fails
pub async fn new_run_with_data(
State(pool): State<PoolHandle>,
Path((driver, location, run_notes)): Path<(String, String, String)>,
Expand All @@ -92,6 +100,8 @@ pub async fn new_run_with_data(
}

/// updates a run's notes with a given run id
/// # Errors
/// Returns a scyllaError if the DB fails
pub async fn update_run_with_data(
State(pool): State<PoolHandle>,
Path((run_id, driver, location, run_notes)): Path<(i32, String, String, String)>,
Expand Down
33 changes: 21 additions & 12 deletions scylla-server/src/db_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use tokio::sync::{broadcast, mpsc};
use tokio::time::Duration;

use tokio_util::sync::CancellationToken;
use tracing::{debug, info, instrument, trace, warn, Level};
use tracing::{Level, debug, info, instrument, trace, warn};

use crate::services::{data_service, data_type_service};
use crate::{ClientData, PoolHandle};
Expand All @@ -25,11 +25,12 @@ pub struct DbHandler {
}

/// Chunks a vec into roughly equal vectors all under size `max_chunk_size`
/// This precomputes vec capacity but does however call to_vec(), reallocating the slices
fn chunk_vec<T: Clone>(input: Vec<T>, max_chunk_size: usize) -> Vec<Vec<T>> {
if max_chunk_size == 0 {
panic!("Maximum chunk size must be greater than zero");
}
/// This precomputes vec capacity but does however call `to_vec()`, reallocating the slices
fn chunk_vec<T: Clone>(input: &[T], max_chunk_size: usize) -> Vec<Vec<T>> {
assert!(
max_chunk_size > 0,
"Maximum chunk size must be greater than zero"
);

let len = input.len();
if len == 0 {
Expand All @@ -56,6 +57,7 @@ fn chunk_vec<T: Clone>(input: Vec<T>, max_chunk_size: usize) -> Vec<Vec<T>> {
impl DbHandler {
/// Make a new db handler
/// * `recv` - the broadcast receiver over which client data will be sent
#[must_use]
pub fn new(
receiver: broadcast::Receiver<ClientData>,
pool: PoolHandle,
Expand All @@ -80,7 +82,7 @@ impl DbHandler {
) {
loop {
tokio::select! {
_ = cancel_token.cancelled() => {
() = cancel_token.cancelled() => {
let Ok(mut database) = pool.get().await else {
warn!("Could not get connection for cleanup");
break;
Expand All @@ -94,7 +96,7 @@ impl DbHandler {
continue;
}
let chunk_size = final_msgs.len() / ((final_msgs.len() / 8190) + 1);
let chunks = chunk_vec(final_msgs, chunk_size);
let chunks = chunk_vec(&final_msgs, chunk_size);
debug!("Batch uploading {} chunks in sequence", chunks.len());
for chunk in chunks {
info!(
Expand All @@ -116,7 +118,7 @@ impl DbHandler {
}
let msg_len = msgs.len();
let chunk_size = msg_len / ((msg_len / 8190) + 1);
let chunks = chunk_vec(msgs, chunk_size);
let chunks = chunk_vec(&msgs, chunk_size);
info!("Batch uploading {} chunks in parrallel, {} messages.", chunks.len(), msg_len);
for chunk in chunks {
tokio::spawn(DbHandler::batch_upload(chunk, pool.clone()));
Expand All @@ -138,7 +140,7 @@ impl DbHandler {
) {
loop {
tokio::select! {
_ = cancel_token.cancelled() => {
() = cancel_token.cancelled() => {
warn!("Cancelling fake upload with {} batches left in queue!", batch_queue.len());
break;
},
Expand All @@ -163,9 +165,16 @@ impl DbHandler {

/// A loop which uses self and a sender channel to process data
/// If the data is special, i.e. coordinates, driver, etc. it will store it in its special location of the db immediately
/// For all data points it will add the to the data_channel for batch uploading logic when a certain time has elapsed
/// For all data points it will add them to the `data_channel` for batch uploading logic when a certain time has elapsed
/// Before this time the data is stored in an internal queue.
/// On cancellation, the messages currently in the queue will be sent as a final flush of any remaining messages received before cancellation
/// # Panics
/// Panics if the last data sent to the thread fails to be sent
#[allow(
clippy::cast_sign_loss,
clippy::cast_precision_loss,
clippy::cast_possible_truncation
)]
pub async fn handling_loop(
mut self,
data_channel: mpsc::Sender<Vec<ClientData>>,
Expand All @@ -176,7 +185,7 @@ impl DbHandler {
let mut max_batch_size = 2usize;
loop {
tokio::select! {
_ = cancel_token.cancelled() => {
() = cancel_token.cancelled() => {
debug!("Pushing final messages to queue");
data_channel.send(self.data_queue).await.expect("Could not comm data to db thread, shutdown");
break;
Expand Down
4 changes: 2 additions & 2 deletions scylla-server/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,11 @@ impl IntoResponse for ScyllaError {
let (status, reason) = match self {
ScyllaError::ConnError(error) => (
StatusCode::INTERNAL_SERVER_ERROR,
format!("Could not connect to db: {}", error),
format!("Could not connect to db: {error}"),
),
ScyllaError::DbError(error) => (
StatusCode::BAD_REQUEST,
format!("Misc query error: {}", error),
format!("Misc query error: {error}"),
),
ScyllaError::InvalidEncoding(reason) => (StatusCode::UNPROCESSABLE_ENTITY, reason),
ScyllaError::CommFailure(reason) => (StatusCode::BAD_GATEWAY, reason),
Expand Down
3 changes: 3 additions & 0 deletions scylla-server/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
#![warn(clippy::pedantic)]

use chrono::serde::ts_milliseconds;

pub mod controllers;
Expand All @@ -15,6 +17,7 @@ pub mod models;
#[allow(non_snake_case)]
pub mod schema;

#[allow(clippy::pedantic)]
pub mod proto;

pub mod transformers;
Expand Down
Loading
Loading