11import json
22import logging
33import sys
4+ from typing import Any , Callable
45from urllib .parse import urlparse
56
67import requests
@@ -51,11 +52,10 @@ def __init__(self, tag, config):
5152 for init_config in init_configs :
5253 polling = init_config .get ('polling' )
5354 if polling is not None :
54- if polling .get ("baseUri" ) is not None :
55- opts ["base_uri" ] = polling ["baseUri" ]
56- _set_optional_time_prop (polling , "pollIntervalMs" , opts , "poll_interval" )
57- polling = polling_ds_builder ()
58- initializers .append (polling )
55+ polling_builder = polling_ds_builder ()
56+ _set_optional_value (polling , "baseUri" , polling_builder .base_uri )
57+ _set_optional_time (polling , "pollIntervalMs" , polling_builder .poll_interval )
58+ initializers .append (polling_builder )
5959
6060 datasystem .initializers (initializers )
6161 sync_config = datasystem_config .get ('synchronizers' )
@@ -71,33 +71,34 @@ def __init__(self, tag, config):
7171 streaming = primary .get ('streaming' )
7272 if streaming is not None :
7373 primary_builder = streaming_ds_builder ()
74- if streaming .get ("baseUri" ) is not None :
75- opts ["stream_uri" ] = streaming ["baseUri" ]
76- _set_optional_time_prop (streaming , "initialRetryDelayMs" , opts , "initial_reconnect_delay" )
77- primary_builder = streaming_ds_builder ()
74+ _set_optional_value (streaming , "baseUri" , primary_builder .base_uri )
75+ _set_optional_time (streaming , "initialRetryDelayMs" , primary_builder .initial_reconnect_delay )
7876 elif primary .get ('polling' ) is not None :
7977 polling = primary .get ('polling' )
80- if polling .get ("baseUri" ) is not None :
81- opts ["base_uri" ] = polling ["baseUri" ]
82- _set_optional_time_prop (polling , "pollIntervalMs" , opts , "poll_interval" )
78+
8379 primary_builder = polling_ds_builder ()
80+ _set_optional_value (polling , "baseUri" , primary_builder .base_uri )
81+ _set_optional_time (polling , "pollIntervalMs" , primary_builder .poll_interval )
82+
8483 fallback_builder = fdv1_fallback_ds_builder ()
84+ _set_optional_value (polling , "baseUri" , fallback_builder .base_uri )
85+ _set_optional_time (polling , "pollIntervalMs" , fallback_builder .poll_interval )
8586
8687 if secondary is not None :
8788 streaming = secondary .get ('streaming' )
8889 if streaming is not None :
8990 secondary_builder = streaming_ds_builder ()
90- if streaming .get ("baseUri" ) is not None :
91- opts ["stream_uri" ] = streaming ["baseUri" ]
92- _set_optional_time_prop (streaming , "initialRetryDelayMs" , opts , "initial_reconnect_delay" )
93- secondary_builder = streaming_ds_builder ()
91+ _set_optional_value (streaming , "baseUri" , secondary_builder .base_uri )
92+ _set_optional_time (streaming , "initialRetryDelayMs" , secondary_builder .initial_reconnect_delay )
9493 elif secondary .get ('polling' ) is not None :
9594 polling = secondary .get ('polling' )
96- if polling .get ("baseUri" ) is not None :
97- opts ["base_uri" ] = polling ["baseUri" ]
98- _set_optional_time_prop (polling , "pollIntervalMs" , opts , "poll_interval" )
95+
9996 secondary_builder = polling_ds_builder ()
97+ _set_optional_value (polling , "baseUri" , secondary_builder .base_uri )
98+ _set_optional_time (polling , "pollIntervalMs" , secondary_builder .poll_interval )
10099 fallback_builder = fdv1_fallback_ds_builder ()
100+ _set_optional_value (polling , "baseUri" , fallback_builder .base_uri )
101+ _set_optional_time (polling , "pollIntervalMs" , fallback_builder .poll_interval )
101102
102103 if primary_builder is not None :
103104 datasystem .synchronizers (primary_builder , secondary_builder )
@@ -307,7 +308,16 @@ def close(self):
307308def _set_optional_time_prop (params_in : dict , name_in : str , params_out : dict , name_out : str ):
308309 if params_in .get (name_in ) is not None :
309310 params_out [name_out ] = params_in [name_in ] / 1000.0
310- return None
311+
312+
313+ def _set_optional_time (params_in : dict , name_in : str , func : Callable [[float ], Any ]):
314+ if params_in .get (name_in ) is not None :
315+ func (params_in [name_in ] / 1000.0 )
316+
317+
318+ def _set_optional_value (params_in : dict , name_in : str , func : Callable [[Any ], Any ]):
319+ if params_in .get (name_in ) is not None :
320+ func (params_in [name_in ])
311321
312322
313323def _create_persistent_store (persistent_store_config : dict ):
0 commit comments