Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions scylla-server/src/controllers/rule_controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,15 @@ pub async fn get_all_rules(
Json(rules_manager.get_all_rules().await)
}

#[debug_handler]
pub async fn get_client_subscribed_rules(
Path(client_id): Path<String>,
Extension(rules_manager): Extension<Arc<RuleManager>>,
) -> Json<Vec<Rule>> {
debug!("Fetching subscribed rules for client {}", client_id);
Json(rules_manager.get_client_rules(ClientId(client_id)).await)
}

#[debug_handler]
pub async fn get_all_rules_with_client_info(
Path(client_id): Path<String>,
Expand Down
6 changes: 5 additions & 1 deletion scylla-server/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use scylla_server::{
data_type_controller, file_insertion_controller,
rule_controller::{
add_rule, delete_rule, edit_rule, get_all_rules, get_all_rules_with_client_info,
subscribe_rules,
get_client_subscribed_rules, subscribe_rules,
},
run_controller, scylla_config_controller,
video_streamer_controller::{self},
Expand Down Expand Up @@ -411,6 +411,10 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
.route("/rules/delete/{rule_id}", post(delete_rule))
.route("/rules", get(get_all_rules))
.route("/rules/{client_id}", get(get_all_rules_with_client_info))
.route(
"/rules/subscribed/{client_id}",
get(get_client_subscribed_rules),
)
.route("/rules/edit/{rule_id}", put(edit_rule))
.route("/rules/subscribe", post(subscribe_rules))
//.route("/rules/delete/{rule_id}", post()).route("/rules/poll")
Expand Down
20 changes: 20 additions & 0 deletions scylla-server/src/rule_structs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -513,6 +513,26 @@ impl RuleManager {
.collect()
}

pub async fn get_client_rules(&self, client_id: ClientId) -> Vec<Rule> {
let subscribed_rule_ids = self
.subscriptions
.read()
.await
.get_right(&client_id)
.cloned()
.unwrap_or_default();

if subscribed_rule_ids.is_empty() {
return Vec::new();
}

let rules = self.rules.read().await;
subscribed_rule_ids
.into_iter()
.filter_map(|rule_id| rules.get(&rule_id).cloned())
.collect()
}

pub async fn get_all_rules_with_subscription_status(
&self,
requesting_client_id: ClientId,
Expand Down
67 changes: 67 additions & 0 deletions scylla-server/tests/rule_structs_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,73 @@ async fn test_handle_msg_multiple_clients_same_rule() -> Result<(), RuleManagerE
Ok(())
}

#[tokio::test]
async fn test_get_client_rules_returns_only_subscribed_rules() -> Result<(), RuleManagerError> {
let rule_manager = RuleManager::new();
let owner = ClientId("owner".to_string());
let client = ClientId("client_2".to_string());

let rule_1 = Rule::new(
RuleId("rule_1".to_string()),
Topic("topic/1".to_string()),
core::time::Duration::from_secs(60),
"a > 10".to_owned(),
);
let rule_2 = Rule::new(
RuleId("rule_2".to_string()),
Topic("topic/2".to_string()),
core::time::Duration::from_secs(60),
"a > 20".to_owned(),
);
let rule_3 = Rule::new(
RuleId("rule_3".to_string()),
Topic("topic/3".to_string()),
core::time::Duration::from_secs(60),
"a > 30".to_owned(),
);

rule_manager.add_rule(owner.clone(), rule_1).await?;
rule_manager.add_rule(owner, rule_2).await?;
rule_manager.add_rule(client.clone(), rule_3).await?;

rule_manager
.subscribe_rules(client.clone(), vec![RuleId("rule_1".to_string())])
.await?;

let client_rules = rule_manager.get_client_rules(client).await;
let rule_ids: std::collections::HashSet<String> =
client_rules.into_iter().map(|rule| rule.id.0).collect();

assert_eq!(rule_ids.len(), 2);
assert!(rule_ids.contains("rule_1"));
assert!(rule_ids.contains("rule_3"));
assert!(!rule_ids.contains("rule_2"));

Ok(())
}

#[tokio::test]
async fn test_get_client_rules_empty_for_missing_client() -> Result<(), RuleManagerError> {
let rule_manager = RuleManager::new();
let owner = ClientId("owner".to_string());

let rule = Rule::new(
RuleId("rule_1".to_string()),
Topic("topic/1".to_string()),
core::time::Duration::from_secs(60),
"a > 10".to_owned(),
);

rule_manager.add_rule(owner, rule).await?;

let missing_client_rules = rule_manager
.get_client_rules(ClientId("missing_client".to_string()))
.await;
assert!(missing_client_rules.is_empty());

Ok(())
}

fn check_rules_present(rules: Vec<Rule>, prefix: &str, k: usize) {
assert_eq!(rules.len(), k);
let topics = rules.into_iter().map(|r| r.topic.0).collect::<Vec<_>>();
Expand Down