Skip to content

Commit a78d091

Browse files
committed
feat: replaced old definition services with new module service
1 parent c4b0ec7 commit a78d091

1 file changed

Lines changed: 40 additions & 190 deletions

File tree

src/flow_service/mod.rs

Lines changed: 40 additions & 190 deletions
Original file line numberDiff line numberDiff line change
@@ -4,26 +4,15 @@ use crate::{
44
};
55
use tonic::{Extensions, Request, transport::Channel};
66
use tucana::{
7-
aquila::{
8-
DataTypeUpdateRequest, FlowTypeUpdateRequest, FunctionDefinitionUpdateRequest,
9-
RuntimeFunctionDefinitionUpdateRequest, data_type_service_client::DataTypeServiceClient,
10-
flow_type_service_client::FlowTypeServiceClient,
11-
function_definition_service_client::FunctionDefinitionServiceClient,
12-
runtime_function_definition_service_client::RuntimeFunctionDefinitionServiceClient,
13-
},
14-
shared::{
15-
DefinitionDataType as DataType, FlowType, FunctionDefinition, RuntimeFunctionDefinition,
16-
},
7+
aquila::{ModuleUpdateRequest, module_service_client::ModuleServiceClient},
8+
shared::Module,
179
};
1810

1911
pub mod auth;
2012
pub mod retry;
2113

2214
pub struct FlowUpdateService {
23-
data_types: Vec<DataType>,
24-
runtime_functions: Vec<RuntimeFunctionDefinition>,
25-
functions: Vec<FunctionDefinition>,
26-
flow_types: Vec<FlowType>,
15+
modules: Vec<Module>,
2716
channel: Channel,
2817
aquila_token: String,
2918
definition_source: Option<String>,
@@ -34,35 +23,19 @@ impl FlowUpdateService {
3423
///
3524
/// This will read the definition files from the given path and initialize the service with the data types, runtime function definitions, function definitions, and flow types.
3625
pub async fn from_url(aquila_url: String, definition_path: &str, aquila_token: String) -> Self {
37-
let mut data_types = Vec::new();
38-
let mut runtime_functions = Vec::new();
39-
let mut functions = Vec::new();
40-
let mut flow_types = Vec::new();
41-
4226
let reader = Reader::configure(definition_path.to_string(), true, vec![], None);
43-
44-
let features = match reader.read_features() {
45-
Ok(features) => features,
27+
let modules = match reader.read_modules() {
28+
Ok(modules) => modules,
4629
Err(error) => {
4730
log::error!("Error occurred while reading definitions: {:?}", error);
4831
panic!("Error occurred while reading definitions")
4932
}
5033
};
5134

52-
for feature in features {
53-
data_types.append(&mut feature.data_types.clone());
54-
flow_types.append(&mut feature.flow_types.clone());
55-
runtime_functions.append(&mut feature.runtime_functions.clone());
56-
functions.append(&mut feature.functions.clone());
57-
}
58-
5935
let channel = create_channel_with_retry("Aquila", aquila_url).await;
6036

6137
Self {
62-
data_types,
63-
runtime_functions,
64-
functions,
65-
flow_types,
38+
modules,
6639
channel,
6740
aquila_token,
6841
definition_source: None,
@@ -74,194 +47,71 @@ impl FlowUpdateService {
7447
self
7548
}
7649

77-
pub fn with_flow_types(mut self, flow_types: Vec<FlowType>) -> Self {
78-
self.flow_types = flow_types;
79-
self
80-
}
81-
82-
pub fn with_data_types(mut self, data_types: Vec<DataType>) -> Self {
83-
self.data_types = data_types;
84-
self
85-
}
86-
87-
pub fn with_runtime_functions(
88-
mut self,
89-
runtime_functions: Vec<RuntimeFunctionDefinition>,
90-
) -> Self {
91-
self.runtime_functions = runtime_functions;
92-
self
93-
}
94-
95-
pub fn with_functions(mut self, functions: Vec<FunctionDefinition>) -> Self {
96-
self.functions = functions;
97-
self
98-
}
99-
10050
pub async fn send(&mut self) {
10151
let _ = self.send_with_status().await;
10252
}
10353

10454
pub async fn send_with_status(&mut self) -> bool {
105-
let data_types_success = self.update_data_types().await;
106-
let runtime_functions_success = self.update_runtime_functions().await;
107-
let functions_success = self.update_functions().await;
108-
let flow_types_success = self.update_flow_types().await;
109-
data_types_success && runtime_functions_success && functions_success && flow_types_success
55+
self.update().await
11056
}
11157

112-
async fn update_data_types(&mut self) -> bool {
113-
if self.data_types.is_empty() {
114-
log::info!("No DataTypes present.");
58+
async fn update(&mut self) -> bool {
59+
if self.modules.is_empty() {
60+
log::info!("No Modules are present, aboarting update.");
11561
return true;
11662
}
11763

64+
let mut modules = self.modules.clone();
11865
if let Some(source) = &self.definition_source {
119-
for data_type in self.data_types.iter_mut() {
120-
data_type.definition_source = source.to_string();
121-
}
66+
modules = modules
67+
.into_iter()
68+
.map(|module| apply_definition_source_to_module(module, source.clone()))
69+
.collect::<Vec<_>>();
12270
}
12371

124-
log::info!("Updating {} DataTypes.", self.data_types.len());
125-
let mut client = DataTypeServiceClient::new(self.channel.clone());
72+
log::info!("Updating {} Modules.", self.modules.len());
73+
let mut client = ModuleServiceClient::new(self.channel.clone());
12674
let request = Request::from_parts(
12775
get_authorization_metadata(&self.aquila_token),
12876
Extensions::new(),
129-
DataTypeUpdateRequest {
130-
data_types: self.data_types.clone(),
131-
},
77+
ModuleUpdateRequest { modules },
13278
);
13379

13480
match client.update(request).await {
13581
Ok(response) => {
13682
let res = response.into_inner();
137-
log::info!(
138-
"Was the update of the DataTypes accepted by Sagittarius? {}",
139-
res.success
140-
);
141-
142-
res.success
143-
}
144-
Err(err) => {
145-
log::error!("Failed to update data types: {:?}", err);
146-
false
147-
}
148-
}
149-
}
150-
151-
async fn update_functions(&mut self) -> bool {
152-
if self.functions.is_empty() {
153-
log::info!("No FunctionDefinitions present.");
154-
return true;
155-
}
15683

157-
if let Some(source) = &self.definition_source {
158-
for function in self.functions.iter_mut() {
159-
function.definition_source = source.to_string();
160-
}
161-
};
162-
163-
log::info!("Updating {} FunctionDefinitions.", self.functions.len());
164-
let mut client = FunctionDefinitionServiceClient::new(self.channel.clone());
165-
let request = Request::from_parts(
166-
get_authorization_metadata(&self.aquila_token),
167-
Extensions::new(),
168-
FunctionDefinitionUpdateRequest {
169-
functions: self.functions.clone(),
170-
},
171-
);
84+
match res.success {
85+
true => log::info!("Module definition update has been successful"),
86+
false => log::warn!("Module definition update has been unsuccessful"),
87+
};
17288

173-
match client.update(request).await {
174-
Ok(response) => {
175-
let res = response.into_inner();
176-
log::info!(
177-
"Was the update of the FunctionDefinitions accepted by Sagittarius? {}",
178-
res.success
179-
);
18089
res.success
18190
}
18291
Err(err) => {
183-
log::error!("Failed to update function definitions: {:?}", err);
92+
log::error!("Module definition update failed. Reason: {:?}", err);
18493
false
18594
}
18695
}
18796
}
97+
}
18898

189-
async fn update_runtime_functions(&mut self) -> bool {
190-
if self.runtime_functions.is_empty() {
191-
log::info!("No RuntimeFunctionDefinitions present.");
192-
return true;
193-
}
194-
195-
if let Some(source) = &self.definition_source {
196-
for runtime_function in self.runtime_functions.iter_mut() {
197-
runtime_function.definition_source = source.to_string();
198-
}
199-
}
200-
201-
log::info!(
202-
"Updating {} RuntimeFunctionDefinitions.",
203-
self.runtime_functions.len()
204-
);
205-
let mut client = RuntimeFunctionDefinitionServiceClient::new(self.channel.clone());
206-
let request = Request::from_parts(
207-
get_authorization_metadata(&self.aquila_token),
208-
Extensions::new(),
209-
RuntimeFunctionDefinitionUpdateRequest {
210-
runtime_functions: self.runtime_functions.clone(),
211-
},
212-
);
213-
214-
match client.update(request).await {
215-
Ok(response) => {
216-
let res = response.into_inner();
217-
log::info!(
218-
"Was the update of the RuntimeFunctionDefinitions accepted by Sagittarius? {}",
219-
res.success
220-
);
221-
res.success
222-
}
223-
Err(err) => {
224-
log::error!("Failed to update runtime function definitions: {:?}", err);
225-
false
226-
}
227-
}
99+
fn apply_definition_source_to_module(mut module: Module, source: String) -> Module {
100+
for data_type in &mut module.definition_data_types {
101+
data_type.definition_source = source.clone();
228102
}
229-
230-
async fn update_flow_types(&mut self) -> bool {
231-
if self.flow_types.is_empty() {
232-
log::info!("No FlowTypes present.");
233-
return true;
234-
}
235-
236-
if let Some(source) = &self.definition_source {
237-
for flow_type in self.flow_types.iter_mut() {
238-
flow_type.definition_source = Some(source.to_string());
239-
}
240-
}
241-
242-
log::info!("Updating {} FlowTypes.", self.flow_types.len());
243-
let mut client = FlowTypeServiceClient::new(self.channel.clone());
244-
let request = Request::from_parts(
245-
get_authorization_metadata(&self.aquila_token),
246-
Extensions::new(),
247-
FlowTypeUpdateRequest {
248-
flow_types: self.flow_types.clone(),
249-
},
250-
);
251-
252-
match client.update(request).await {
253-
Ok(response) => {
254-
let res = response.into_inner();
255-
log::info!(
256-
"Was the update of the FlowTypes accepted by Sagittarius? {}",
257-
res.success
258-
);
259-
res.success
260-
}
261-
Err(err) => {
262-
log::error!("Failed to update flow types: {:?}", err);
263-
false
264-
}
265-
}
103+
for flow_type in &mut module.flow_types {
104+
flow_type.definition_source = Some(source.clone());
105+
}
106+
for runtime_flow_type in &mut module.runtime_flow_types {
107+
runtime_flow_type.definition_source = Some(source.clone());
108+
}
109+
for function in &mut module.function_definitions {
110+
function.definition_source = source.clone();
266111
}
112+
for runtime_function in &mut module.runtime_function_definitions {
113+
runtime_function.definition_source = source.clone();
114+
}
115+
116+
module
267117
}

0 commit comments

Comments
 (0)