@@ -26,6 +26,7 @@ pub struct FlowUpdateService {
2626 flow_types : Vec < FlowType > ,
2727 channel : Channel ,
2828 aquila_token : String ,
29+ definition_source : Option < String > ,
2930}
3031
3132impl FlowUpdateService {
@@ -64,9 +65,15 @@ impl FlowUpdateService {
6465 flow_types,
6566 channel,
6667 aquila_token,
68+ definition_source : None ,
6769 }
6870 }
6971
72+ pub fn with_definiton_source ( mut self , source : String ) -> Self {
73+ self . definition_source = Some ( source) ;
74+ self
75+ }
76+
7077 pub fn with_flow_types ( mut self , flow_types : Vec < FlowType > ) -> Self {
7178 self . flow_types = flow_types;
7279 self
@@ -90,24 +97,36 @@ impl FlowUpdateService {
9097 self
9198 }
9299
93- pub async fn send ( & self ) {
100+ pub async fn send ( & mut self ) {
94101 let _ = self . send_with_status ( ) . await ;
95102 }
96103
97- pub async fn send_with_status ( & self ) -> bool {
104+ pub async fn send_with_status ( & mut self ) -> bool {
98105 let data_types_success = self . update_data_types ( ) . await ;
99106 let runtime_functions_success = self . update_runtime_functions ( ) . await ;
100107 let functions_success = self . update_functions ( ) . await ;
101108 let flow_types_success = self . update_flow_types ( ) . await ;
102109 data_types_success && runtime_functions_success && functions_success && flow_types_success
103110 }
104111
105- async fn update_data_types ( & self ) -> bool {
112+ async fn update_data_types ( & mut self ) -> bool {
106113 if self . data_types . is_empty ( ) {
107114 log:: info!( "No DataTypes present." ) ;
108115 return true ;
109116 }
110117
118+ if let Some ( source) = & self . definition_source {
119+ self . data_types = self
120+ . data_types
121+ . clone ( )
122+ . into_iter ( )
123+ . map ( |mut x| {
124+ x. definition_source = source. to_string ( ) ;
125+ x
126+ } )
127+ . collect ( )
128+ } ;
129+
111130 log:: info!( "Updating {} DataTypes." , self . data_types. len( ) ) ;
112131 let mut client = DataTypeServiceClient :: new ( self . channel . clone ( ) ) ;
113132 let request = Request :: from_parts (
@@ -135,12 +154,24 @@ impl FlowUpdateService {
135154 }
136155 }
137156
138- async fn update_functions ( & self ) -> bool {
157+ async fn update_functions ( & mut self ) -> bool {
139158 if self . functions . is_empty ( ) {
140159 log:: info!( "No FunctionDefinitions present." ) ;
141160 return true ;
142161 }
143162
163+ if let Some ( source) = & self . definition_source {
164+ self . functions = self
165+ . functions
166+ . clone ( )
167+ . into_iter ( )
168+ . map ( |mut x| {
169+ x. definition_source = source. to_string ( ) ;
170+ x
171+ } )
172+ . collect ( )
173+ } ;
174+
144175 log:: info!( "Updating {} FunctionDefinitions." , self . functions. len( ) ) ;
145176 let mut client = FunctionDefinitionServiceClient :: new ( self . channel . clone ( ) ) ;
146177 let request = Request :: from_parts (
@@ -167,12 +198,24 @@ impl FlowUpdateService {
167198 }
168199 }
169200
170- async fn update_runtime_functions ( & self ) -> bool {
201+ async fn update_runtime_functions ( & mut self ) -> bool {
171202 if self . runtime_functions . is_empty ( ) {
172203 log:: info!( "No RuntimeFunctionDefinitions present." ) ;
173204 return true ;
174205 }
175206
207+ if let Some ( source) = & self . definition_source {
208+ self . runtime_functions = self
209+ . runtime_functions
210+ . clone ( )
211+ . into_iter ( )
212+ . map ( |mut x| {
213+ x. definition_source = source. to_string ( ) ;
214+ x
215+ } )
216+ . collect ( )
217+ } ;
218+
176219 log:: info!(
177220 "Updating {} RuntimeFunctionDefinitions." ,
178221 self . runtime_functions. len( )
@@ -202,12 +245,22 @@ impl FlowUpdateService {
202245 }
203246 }
204247
205- async fn update_flow_types ( & self ) -> bool {
248+ async fn update_flow_types ( & mut self ) -> bool {
206249 if self . flow_types . is_empty ( ) {
207250 log:: info!( "No FlowTypes present." ) ;
208251 return true ;
209252 }
210253
254+ self . flow_types = self
255+ . flow_types
256+ . clone ( )
257+ . into_iter ( )
258+ . map ( |mut x| {
259+ x. definition_source = self . definition_source . clone ( ) ;
260+ x
261+ } )
262+ . collect ( ) ;
263+
211264 log:: info!( "Updating {} FlowTypes." , self . flow_types. len( ) ) ;
212265 let mut client = FlowTypeServiceClient :: new ( self . channel . clone ( ) ) ;
213266 let request = Request :: from_parts (
0 commit comments