@@ -26,6 +26,7 @@ pub struct FlowUpdateService {
2626 flow_types : Vec < FlowType > ,
2727 channel : Channel ,
2828 aquila_token : String ,
29+ definition_source : Option < String > ,
2930}
3031
3132impl FlowUpdateService {
@@ -64,9 +65,15 @@ impl FlowUpdateService {
6465 flow_types,
6566 channel,
6667 aquila_token,
68+ definition_source : None ,
6769 }
6870 }
6971
72+ pub fn with_definition_source ( mut self , source : String ) -> Self {
73+ self . definition_source = Some ( source) ;
74+ self
75+ }
76+
7077 pub fn with_flow_types ( mut self , flow_types : Vec < FlowType > ) -> Self {
7178 self . flow_types = flow_types;
7279 self
@@ -90,24 +97,30 @@ impl FlowUpdateService {
9097 self
9198 }
9299
93- pub async fn send ( & self ) {
100+ pub async fn send ( & mut self ) {
94101 let _ = self . send_with_status ( ) . await ;
95102 }
96103
97- pub async fn send_with_status ( & self ) -> bool {
104+ pub async fn send_with_status ( & mut self ) -> bool {
98105 let data_types_success = self . update_data_types ( ) . await ;
99106 let runtime_functions_success = self . update_runtime_functions ( ) . await ;
100107 let functions_success = self . update_functions ( ) . await ;
101108 let flow_types_success = self . update_flow_types ( ) . await ;
102109 data_types_success && runtime_functions_success && functions_success && flow_types_success
103110 }
104111
105- async fn update_data_types ( & self ) -> bool {
112+ async fn update_data_types ( & mut self ) -> bool {
106113 if self . data_types . is_empty ( ) {
107114 log:: info!( "No DataTypes present." ) ;
108115 return true ;
109116 }
110117
118+ if let Some ( source) = & self . definition_source {
119+ for data_type in self . data_types . iter_mut ( ) {
120+ data_type. definition_source = source. to_string ( ) ;
121+ }
122+ }
123+
111124 log:: info!( "Updating {} DataTypes." , self . data_types. len( ) ) ;
112125 let mut client = DataTypeServiceClient :: new ( self . channel . clone ( ) ) ;
113126 let request = Request :: from_parts (
@@ -135,12 +148,18 @@ impl FlowUpdateService {
135148 }
136149 }
137150
138- async fn update_functions ( & self ) -> bool {
151+ async fn update_functions ( & mut self ) -> bool {
139152 if self . functions . is_empty ( ) {
140153 log:: info!( "No FunctionDefinitions present." ) ;
141154 return true ;
142155 }
143156
157+ if let Some ( source) = & self . definition_source {
158+ for function in self . functions . iter_mut ( ) {
159+ function. definition_source = source. to_string ( ) ;
160+ }
161+ } ;
162+
144163 log:: info!( "Updating {} FunctionDefinitions." , self . functions. len( ) ) ;
145164 let mut client = FunctionDefinitionServiceClient :: new ( self . channel . clone ( ) ) ;
146165 let request = Request :: from_parts (
@@ -167,12 +186,18 @@ impl FlowUpdateService {
167186 }
168187 }
169188
170- async fn update_runtime_functions ( & self ) -> bool {
189+ async fn update_runtime_functions ( & mut self ) -> bool {
171190 if self . runtime_functions . is_empty ( ) {
172191 log:: info!( "No RuntimeFunctionDefinitions present." ) ;
173192 return true ;
174193 }
175194
195+ if let Some ( source) = & self . definition_source {
196+ for runtime_function in self . runtime_functions . iter_mut ( ) {
197+ runtime_function. definition_source = source. to_string ( ) ;
198+ }
199+ }
200+
176201 log:: info!(
177202 "Updating {} RuntimeFunctionDefinitions." ,
178203 self . runtime_functions. len( )
@@ -202,12 +227,18 @@ impl FlowUpdateService {
202227 }
203228 }
204229
205- async fn update_flow_types ( & self ) -> bool {
230+ async fn update_flow_types ( & mut self ) -> bool {
206231 if self . flow_types . is_empty ( ) {
207232 log:: info!( "No FlowTypes present." ) ;
208233 return true ;
209234 }
210235
236+ if let Some ( source) = & self . definition_source {
237+ for flow_type in self . flow_types . iter_mut ( ) {
238+ flow_type. definition_source = Some ( source. to_string ( ) ) ;
239+ }
240+ }
241+
211242 log:: info!( "Updating {} FlowTypes." , self . flow_types. len( ) ) ;
212243 let mut client = FlowTypeServiceClient :: new ( self . channel . clone ( ) ) ;
213244 let request = Request :: from_parts (
0 commit comments