Skip to content
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 54 additions & 5 deletions airbyte_cdk/entrypoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,13 @@
import ipaddress
import json
import logging
import os
import os.path
import socket
import sys
import tempfile
import threading
import time
from collections import defaultdict
from functools import wraps
from typing import Any, DefaultDict, Iterable, List, Mapping, Optional
Expand Down Expand Up @@ -374,13 +377,59 @@ def _emit_queued_messages(self, source: Source) -> Iterable[AirbyteMessage]:
def launch(source: Source, args: List[str]) -> None:
source_entrypoint = AirbyteEntrypoint(source)
parsed_args = source_entrypoint.parse_args(args)

# Heartbeat state — shared with the background heartbeat thread.
_HEARTBEAT_INTERVAL_S = 30.0
messages_written = 0
bytes_written = 0
print_blocked = False
print_blocked_since = 0.0
heartbeat_stop = threading.Event()

def _heartbeat() -> None:
"""Emit periodic status to stderr to diagnose stdout pipe blocking.

Writes directly to fd 2 (stderr) which the Kubernetes container
runtime collects independently of the orchestrator reading stdout.
"""
start = time.monotonic()
stderr_fd = 2
while not heartbeat_stop.wait(timeout=_HEARTBEAT_INTERVAL_S):
now = time.monotonic()
elapsed = now - start
blocked_str = "YES" if print_blocked else "NO"
blocked_dur = (
f" blocked_since={now - print_blocked_since:.0f}s" if print_blocked else ""
)
line = (
f"STDOUT_HEARTBEAT: t={elapsed:.0f}s "
f"msgs={messages_written} bytes={bytes_written} "
f"print_blocked={blocked_str}{blocked_dur}\n"
)
try:
os.write(stderr_fd, line.encode())
except OSError:
pass # Best-effort diagnostic — if stderr (fd 2) is broken, silently give up.

heartbeat_thread = threading.Thread(target=_heartbeat, name="stdout-heartbeat", daemon=True)
heartbeat_thread.start()

# temporarily removes the PrintBuffer because we're seeing weird print behavior for concurrent syncs
# Refer to: https://github.com/airbytehq/oncall/issues/6235
with PRINT_BUFFER:
for message in source_entrypoint.run(parsed_args):
# simply printing is creating issues for concurrent CDK as Python uses different two instructions to print: one for the message and
# the other for the break line. Adding `\n` to the message ensure that both are printed at the same time
print(f"{message}\n", end="")
try:
with PRINT_BUFFER:
for message in source_entrypoint.run(parsed_args):
# simply printing is creating issues for concurrent CDK as Python uses different two instructions to print: one for the message and
# the other for the break line. Adding `\n` to the message ensure that both are printed at the same time
data = f"{message}\n"
print_blocked = True
print_blocked_since = time.monotonic()
print(data, end="")
print_blocked = False
messages_written += 1
bytes_written += len(data)
finally:
heartbeat_stop.set()


def _init_internal_request_filter() -> None:
Expand Down
Loading