Skip to content

Commit 7c49bc0

Browse files
authored
fix: honor server-directed FDv1 Fallback Directive in initializer phase (#423)
1 parent 84b442d commit 7c49bc0

11 files changed

Lines changed: 696 additions & 91 deletions

File tree

.github/workflows/ci.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ jobs:
6969
with:
7070
test_service_port: 9000
7171
token: ${{ secrets.GITHUB_TOKEN }}
72-
version: v3.0.0-alpha.4
72+
version: v3.0.0-alpha.6
7373
enable_persistence_tests: "true"
7474

7575
windows:

contract-tests/client_entity.py

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,6 @@ def __init__(self, tag, config):
6262
sync_configs = datasystem_config.get('synchronizers')
6363
if sync_configs is not None:
6464
sync_builders = []
65-
fallback_builder = None
6665

6766
for sync_config in sync_configs:
6867
streaming = sync_config.get('streaming')
@@ -79,14 +78,18 @@ def __init__(self, tag, config):
7978
_set_optional_time(polling, "pollIntervalMs", builder.poll_interval)
8079
sync_builders.append(builder)
8180

82-
fallback_builder = fdv1_fallback_ds_builder()
83-
_set_optional_value(polling, "baseUri", fallback_builder.base_uri)
84-
_set_optional_time(polling, "pollIntervalMs", fallback_builder.poll_interval)
85-
8681
if sync_builders:
8782
datasystem.synchronizers(*sync_builders)
88-
if fallback_builder is not None:
89-
datasystem.fdv1_compatible_synchronizer(fallback_builder)
83+
84+
# The FDv1 Fallback Synchronizer is engaged only in response to a
85+
# server-directed FDv1 Fallback Directive; it is configured separately
86+
# from the FDv2 Primary/Fallback synchronizer chain.
87+
fdv1_fallback_config = datasystem_config.get('fdv1Fallback')
88+
if fdv1_fallback_config is not None:
89+
fallback_builder = fdv1_fallback_ds_builder()
90+
_set_optional_value(fdv1_fallback_config, "baseUri", fallback_builder.base_uri)
91+
_set_optional_time(fdv1_fallback_config, "pollIntervalMs", fallback_builder.poll_interval)
92+
datasystem.fdv1_compatible_synchronizer(fallback_builder)
9093

9194
if datasystem_config.get("payloadFilter") is not None:
9295
opts["payload_filter_key"] = datasystem_config["payloadFilter"]

contract-tests/service.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,7 @@ def status():
8484
'persistent-data-store-consul',
8585
'flag-change-listeners',
8686
'flag-value-change-listeners',
87+
'fdv1-fallback',
8788
]
8889
}
8990
return json.dumps(body), 200, {'Content-type': 'application/json'}

ldclient/impl/datasourcev2/polling.py

Lines changed: 20 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -139,7 +139,7 @@ def sync(self, ss: SelectorStore) -> Generator[Update, None, None]:
139139
yield Update(
140140
state=DataSourceState.OFF,
141141
error=error_info,
142-
revert_to_fdv1=True,
142+
fallback_to_fdv1=True,
143143
environment_id=envid,
144144
)
145145
break
@@ -168,6 +168,17 @@ def sync(self, ss: SelectorStore) -> Generator[Update, None, None]:
168168
message=result.error,
169169
)
170170

171+
# Even a non-HTTP error (e.g. malformed JSON) can carry the fallback
172+
# header. If so, halt rather than retrying the FDv2 endpoint.
173+
if fallback:
174+
yield Update(
175+
state=DataSourceState.OFF,
176+
error=error_info,
177+
fallback_to_fdv1=True,
178+
environment_id=envid,
179+
)
180+
break
181+
171182
yield Update(
172183
state=DataSourceState.INTERRUPTED,
173184
error=error_info,
@@ -179,7 +190,7 @@ def sync(self, ss: SelectorStore) -> Generator[Update, None, None]:
179190
state=DataSourceState.VALID,
180191
change_set=change_set,
181192
environment_id=headers.get(_LD_ENVID_HEADER),
182-
revert_to_fdv1=headers.get(_LD_FD_FALLBACK_HEADER) == 'true'
193+
fallback_to_fdv1=headers.get(_LD_FD_FALLBACK_HEADER) == 'true'
183194
)
184195

185196
if self._interrupt_event.wait(self._poll_interval):
@@ -204,13 +215,18 @@ def _poll(self, ss: SelectorStore) -> BasisResult:
204215
if is_http_error_recoverable(status_code):
205216
log.warning(http_error_message_result)
206217

218+
# Forward any response headers so callers (e.g. FDv2 datasystem)
219+
# can read the X-LD-FD-Fallback directive even on error.
207220
return _Fail(
208-
error=http_error_message_result, exception=result.exception
221+
error=http_error_message_result,
222+
exception=result.exception,
223+
headers=result.headers,
209224
)
210225

211226
return _Fail(
212227
error=result.error or "Failed to request payload",
213228
exception=result.exception,
229+
headers=result.headers,
214230
)
215231

216232
(change_set, headers) = result.value
@@ -223,6 +239,7 @@ def _poll(self, ss: SelectorStore) -> BasisResult:
223239
change_set=change_set,
224240
persist=change_set.selector.is_defined(),
225241
environment_id=env_id,
242+
fallback_to_fdv1=headers.get(_LD_FD_FALLBACK_HEADER) == 'true',
226243
)
227244

228245
return _Success(value=basis)

ldclient/impl/datasourcev2/streaming.py

Lines changed: 102 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
from typing import Callable, Generator, Optional, Tuple
99
from urllib import parse
1010

11+
import urllib3
1112
from ld_eventsource import SSEClient
1213
from ld_eventsource.actions import Event, Fault, Start
1314
from ld_eventsource.config import (
@@ -57,7 +58,10 @@
5758

5859
STREAMING_ENDPOINT = "/sdk/stream"
5960

60-
SseClientBuilder = Callable[[str, HTTPConfig, float, Config, SelectorStore], SSEClient]
61+
SseClientBuilder = Callable[
62+
[str, HTTPConfig, float, Config, SelectorStore],
63+
Tuple[SSEClient, Optional[urllib3.PoolManager]],
64+
]
6165

6266

6367
def create_sse_client(
@@ -66,10 +70,17 @@ def create_sse_client(
6670
initial_reconnect_delay: float,
6771
config: Config,
6872
ss: SelectorStore
69-
) -> SSEClient:
73+
) -> Tuple[SSEClient, Optional[urllib3.PoolManager]]:
7074
""" "
7175
create_sse_client creates an SSEClient instance configured to connect
72-
to the LaunchDarkly streaming endpoint.
76+
to the LaunchDarkly streaming endpoint, along with the urllib3 PoolManager
77+
backing it. The pool is returned alongside the client so the caller can
78+
force-close any pooled connections on shutdown -- ``SSEClient.close()``
79+
only releases the connection back to the pool via ``urllib3.HTTPResponse
80+
.shutdown()`` (which performs a half-close on the local read side) plus
81+
``release_conn()``, neither of which actually closes the underlying TCP
82+
socket on Python 3.10. Closing the pool ensures the server observes the
83+
client's disconnect when the FDv1 Fallback Directive engages.
7384
"""
7485
uri = base_uri + STREAMING_ENDPOINT
7586
if config.payload_filter_key is not None:
@@ -87,11 +98,12 @@ def query_params() -> dict[str, str]:
8798
selector = ss.selector()
8899
return {"basis": selector.state} if selector.is_defined() else {}
89100

90-
return SSEClient(
101+
pool = stream_http_factory.create_pool_manager(1, uri)
102+
sse_client = SSEClient(
91103
connect=ConnectStrategy.http(
92104
url=uri,
93105
headers=base_headers,
94-
pool=stream_http_factory.create_pool_manager(1, uri),
106+
pool=pool,
95107
urllib3_request_options={"timeout": stream_http_factory.timeout},
96108
query_params=query_params
97109
),
@@ -106,6 +118,31 @@ def query_params() -> dict[str, str]:
106118
retry_delay_reset_threshold=BACKOFF_RESET_INTERVAL,
107119
logger=log,
108120
)
121+
return sse_client, pool
122+
123+
124+
def _close_pool_manager(pool: Optional[urllib3.PoolManager]) -> None:
125+
"""Close every pooled connection in ``pool`` so the underlying TCP sockets
126+
are torn down. ``HTTPConnectionPool.close()`` drains its queue and calls
127+
``conn.close()`` on each connection, which sends the FIN that the server
128+
is waiting on. ``PoolManager.clear()`` alone doesn't do this -- it just
129+
drops the dict of pools without closing the connections inside them."""
130+
if pool is None:
131+
return
132+
try:
133+
# ``RecentlyUsedContainer`` deliberately disallows iteration; ``keys()``
134+
# returns a thread-safe snapshot. We look each one up to close its
135+
# underlying ``HTTPConnectionPool``.
136+
for key in list(pool.pools.keys()):
137+
try:
138+
connection_pool = pool.pools.get(key)
139+
if connection_pool is not None:
140+
connection_pool.close()
141+
except Exception: # pylint: disable=broad-except
142+
log.debug("Error closing streaming connection pool", exc_info=True)
143+
pool.clear()
144+
except Exception: # pylint: disable=broad-except
145+
log.debug("Error closing streaming pool manager", exc_info=True)
109146

110147

111148
class StreamingDataSource(Synchronizer, DiagnosticSource):
@@ -126,9 +163,10 @@ def __init__(self,
126163
self.__http_options = http_options
127164
self.__initial_reconnect_delay = initial_reconnect_delay
128165

129-
self._sse_client_builder = create_sse_client
166+
self._sse_client_builder: SseClientBuilder = create_sse_client
130167
self._config = config
131168
self._sse: Optional[SSEClient] = None
169+
self._sse_pool: Optional[urllib3.PoolManager] = None
132170
self._running = False
133171
self._diagnostic_accumulator: Optional[DiagnosticAccumulator] = None
134172
self._connection_attempt_start_time: Optional[float] = None
@@ -149,7 +187,7 @@ def sync(self, ss: SelectorStore) -> Generator[Update, None, None]:
149187
Update objects until the connection is closed or an unrecoverable error
150188
occurs.
151189
"""
152-
self._sse = self._sse_client_builder(
190+
self._sse, self._sse_pool = self._sse_client_builder(
153191
self.__uri,
154192
self.__http_options,
155193
self.__initial_reconnect_delay,
@@ -166,6 +204,28 @@ def sync(self, ss: SelectorStore) -> Generator[Update, None, None]:
166204
self._connection_attempt_start_time = time()
167205

168206
envid = None
207+
# fallback_requested is set when a Start action carries
208+
# X-LD-FD-Fallback: true. We finish applying the current payload
209+
# before halting, so consumers can serve the server-provided data
210+
# while FDv1 takes over. The latch is one-way and terminal: once
211+
# set, any subsequent payload-completing event or error must carry
212+
# the signal forward and halt the stream, even if the failure path
213+
# itself doesn't see the directive header.
214+
fallback_requested = False
215+
216+
def _with_fallback_signal(update: Update) -> Update:
217+
"""Return ``update`` decorated with ``fallback_to_fdv1=True`` when
218+
the directive has been latched. Idempotent if already set."""
219+
if not fallback_requested or update.fallback_to_fdv1:
220+
return update
221+
return Update(
222+
state=update.state,
223+
change_set=update.change_set,
224+
error=update.error,
225+
fallback_to_fdv1=True,
226+
environment_id=update.environment_id,
227+
)
228+
169229
for action in self._sse.all:
170230
if isinstance(action, Fault):
171231
# If the SSE client detects the stream has closed, then it will
@@ -179,24 +239,19 @@ def sync(self, ss: SelectorStore) -> Generator[Update, None, None]:
179239

180240
(update, should_continue) = self._handle_error(action.error, envid)
181241
if update is not None:
182-
yield update
242+
yield _with_fallback_signal(update)
183243

184-
if not should_continue:
244+
# The FDv1 Fallback Directive is one-way and terminal: if it
245+
# was latched on a prior Start, we must not keep retrying the
246+
# FDv2 endpoint even when the failure itself looks recoverable.
247+
if fallback_requested or not should_continue:
185248
break
186249
continue
187250

188251
if isinstance(action, Start) and action.headers is not None:
189-
fallback = action.headers.get(_LD_FD_FALLBACK_HEADER) == 'true'
190252
envid = action.headers.get(_LD_ENVID_HEADER, envid)
191-
192-
if fallback:
193-
self._record_stream_init(True)
194-
yield Update(
195-
state=DataSourceState.OFF,
196-
revert_to_fdv1=True,
197-
environment_id=envid,
198-
)
199-
break
253+
if action.headers.get(_LD_FD_FALLBACK_HEADER) == 'true':
254+
fallback_requested = True
200255

201256
if not isinstance(action, Event):
202257
continue
@@ -206,6 +261,12 @@ def sync(self, ss: SelectorStore) -> Generator[Update, None, None]:
206261
if update is not None:
207262
self._record_stream_init(False)
208263
self._connection_attempt_start_time = None
264+
if fallback_requested:
265+
# The completed update is the natural moment to honor
266+
# the latched directive: yield once with the signal,
267+
# then halt — the consumer will switch to FDv1.
268+
yield _with_fallback_signal(update)
269+
break
209270
yield update
210271
except json.decoder.JSONDecodeError as e:
211272
log.info(
@@ -215,25 +276,34 @@ def sync(self, ss: SelectorStore) -> Generator[Update, None, None]:
215276

216277
(update, should_continue) = self._handle_error(e, envid)
217278
if update is not None:
218-
yield update
219-
if not should_continue:
279+
yield _with_fallback_signal(update)
280+
if fallback_requested or not should_continue:
220281
break
221282
except Exception as e: # pylint: disable=broad-except
222283
log.info(
223284
"Error while handling stream event; will restart stream: %s", e
224285
)
225286
self._sse.interrupt()
226287

227-
yield Update(
288+
yield _with_fallback_signal(Update(
228289
state=DataSourceState.INTERRUPTED,
229290
error=DataSourceErrorInfo(
230291
DataSourceErrorKind.UNKNOWN, 0, time(), str(e)
231292
),
232-
revert_to_fdv1=False,
293+
fallback_to_fdv1=False,
233294
environment_id=envid,
234-
)
295+
))
296+
if fallback_requested:
297+
break
235298

236299
self._sse.close()
300+
# Force-close the underlying urllib3 pool. SSEClient.close() only does a
301+
# half-close on the local read side and releases the connection back to
302+
# the pool, which on Python 3.10 leaves the TCP socket open until the
303+
# response object is garbage-collected. The FDv1 Fallback Directive
304+
# requires the Primary Synchronizer to be terminated promptly, so we
305+
# tear down the pool here to send the FIN the server is waiting on.
306+
_close_pool_manager(self._sse_pool)
237307

238308
def stop(self):
239309
"""
@@ -243,6 +313,10 @@ def stop(self):
243313
self._running = False
244314
if self._sse:
245315
self._sse.close()
316+
# See _close_pool_manager docstring: this is what actually severs the
317+
# TCP connection. ``stop()`` may be called from a different thread than
318+
# the one running ``sync()``; close() is idempotent on the pool.
319+
_close_pool_manager(self._sse_pool)
246320

247321
def _record_stream_init(self, failed: bool):
248322
if self._diagnostic_accumulator and self._connection_attempt_start_time:
@@ -348,7 +422,7 @@ def _handle_error(self, error: Exception, envid: Optional[str]) -> Tuple[Optiona
348422
error=DataSourceErrorInfo(
349423
DataSourceErrorKind.INVALID_DATA, 0, time(), str(error)
350424
),
351-
revert_to_fdv1=False,
425+
fallback_to_fdv1=False,
352426
environment_id=envid,
353427
)
354428
return (update, True)
@@ -372,7 +446,7 @@ def _handle_error(self, error: Exception, envid: Optional[str]) -> Tuple[Optiona
372446
update = Update(
373447
state=DataSourceState.OFF,
374448
error=error_info,
375-
revert_to_fdv1=True,
449+
fallback_to_fdv1=True,
376450
environment_id=envid,
377451
)
378452
self.stop()
@@ -389,7 +463,7 @@ def _handle_error(self, error: Exception, envid: Optional[str]) -> Tuple[Optiona
389463
else DataSourceState.OFF
390464
),
391465
error=error_info,
392-
revert_to_fdv1=False,
466+
fallback_to_fdv1=False,
393467
environment_id=envid,
394468
)
395469

@@ -411,7 +485,7 @@ def _handle_error(self, error: Exception, envid: Optional[str]) -> Tuple[Optiona
411485
error=DataSourceErrorInfo(
412486
DataSourceErrorKind.UNKNOWN, 0, time(), str(error)
413487
),
414-
revert_to_fdv1=False,
488+
fallback_to_fdv1=False,
415489
environment_id=envid,
416490
)
417491
# no stacktrace here because, for a typical connection error, it'll

0 commit comments

Comments
 (0)