Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1940,6 +1940,11 @@ definitions:
type:
type: string
enum: [ResponseToFileExtractor]
delimiter:
title: Delimiter
description: The delimiter used to separate values in the CSV data. Defaults to comma (',').
type: string
default: ","
$parameters:
type: object
additionalProperties: true
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#
import csv
import logging
import os
import uuid
Expand Down Expand Up @@ -30,9 +31,12 @@ class ResponseToFileExtractor(RecordExtractor):
"""

parameters: InitVar[Mapping[str, Any]]
delimiter: str = ","

def __post_init__(self, parameters: Mapping[str, Any]) -> None:
    """Attach the logger and turn an escaped delimiter into its real character.

    A delimiter that arrives as a textual escape sequence (for example the two
    characters backslash + ``t``) is decoded to the character it denotes.

    Args:
        parameters: Component parameters; not read by this method.
    """
    self.logger = logging.getLogger("airbyte")
    # A lone backslash is a literal delimiter, not the start of an escape
    # sequence; decoding it would raise UnicodeDecodeError.
    if len(self.delimiter) > 1 and self.delimiter.startswith("\\"):
        # `unicode_escape` interprets raw bytes as Latin-1, so encode with
        # Latin-1 and let `backslashreplace` turn anything outside that range
        # into \uXXXX escapes. Non-ASCII characters then survive the round trip
        # instead of being mangled the way UTF-8 bytes would be.
        self.delimiter = self.delimiter.encode("latin-1", "backslashreplace").decode(
            "unicode_escape"
        )

def _get_response_encoding(self, headers: Dict[str, Any]) -> str:
"""
Expand Down Expand Up @@ -137,7 +141,14 @@ def _read_with_chunks(
try:
with open(path, "r", encoding=file_encoding) as data:
chunks = pd.read_csv(
data, chunksize=chunk_size, iterator=True, dialect="unix", dtype=object
data,
chunksize=chunk_size,
iterator=True,
dtype=object,
delimiter=self.delimiter,
quoting=csv.QUOTE_ALL,
doublequote=True,
lineterminator="\n",
)
for chunk in chunks:
chunk = chunk.replace({nan: None}).to_dict(orient="records")
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.

# generated by datamodel-codegen:
# filename: declarative_component_schema.yaml

Expand Down Expand Up @@ -502,6 +500,11 @@ class DpathExtractor(BaseModel):

# Model for the `ResponseToFileExtractor` component. The header of this hunk
# states the surrounding code is generated by datamodel-codegen from
# declarative_component_schema.yaml, so changes presumably belong in that
# schema rather than here (TODO confirm the regeneration workflow).
class ResponseToFileExtractor(BaseModel):
# Discriminator: only the literal component name is accepted.
type: Literal["ResponseToFileExtractor"]
# Separator between values in the CSV data; defaults to "," when omitted.
delimiter: Optional[str] = Field(
",",
description="The delimiter used to separate values in the CSV data. Defaults to comma (',').",
title="Delimiter",
)
# Declared under the "$parameters" alias; defaults to None when not supplied.
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2383,7 +2383,10 @@ def create_response_to_file_extractor(
model: ResponseToFileExtractorModel,
**kwargs: Any,
) -> ResponseToFileExtractor:
return ResponseToFileExtractor(parameters=model.parameters or {})
return ResponseToFileExtractor(
parameters=model.parameters or {},
delimiter=model.delimiter or ",",
)

@staticmethod
def create_exponential_backoff_strategy(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

class ResponseToFileExtractorTest(TestCase):
def setUp(self) -> None:
    """Create the extractor under test and activate the HTTP mocker.

    The extractor is built once, with `parameters` passed by keyword so the
    call does not depend on the order of the extractor's fields.
    """
    self._extractor = ResponseToFileExtractor(parameters={})
    self._http_mocker = requests_mock.Mocker()
    # Enter the mocker manually: it has to stay active for the whole test,
    # not just for the duration of a `with` block inside setUp.
    self._http_mocker.__enter__()

Expand All @@ -39,6 +39,55 @@ def test_text_response_with_null_bytes(self) -> None:

assert extracted_records == [{"FIRST_NAME": "a first name", "LAST_NAME": "a last name"}]

def test_tab_delimited_response(self) -> None:
    """A TSV payload read with a tab delimiter yields one record keyed by the header row."""
    payload = "Date\tApp Name\tApp Apple Identifier\tEvent\n2026-02-28\tBullseye Blast\t1632779218\tImpression\n"
    expected_record = {
        "Date": "2026-02-28",
        "App Name": "Bullseye Blast",
        "App Apple Identifier": "1632779218",
        "Event": "Impression",
    }
    tsv_extractor = ResponseToFileExtractor(parameters={}, delimiter="\t")
    streamed = self._mock_streamed_response(BytesIO(payload.encode("utf-8")))

    records = list(tsv_extractor.extract_records(streamed))

    assert len(records) == 1
    assert records[0] == expected_record

def test_escaped_tab_delimiter(self) -> None:
    """A delimiter given as the escaped text \\t is decoded and splits tab-separated data."""
    payload = "col1\tcol2\tcol3\nval1\tval2\tval3\n"
    expected_record = {"col1": "val1", "col2": "val2", "col3": "val3"}
    escaped_extractor = ResponseToFileExtractor(parameters={}, delimiter="\\t")
    streamed = self._mock_streamed_response(BytesIO(payload.encode("utf-8")))

    records = list(escaped_extractor.extract_records(streamed))

    assert len(records) == 1
    assert records[0] == expected_record

def test_default_comma_delimiter(self) -> None:
    """Without an explicit delimiter, comma-separated data is still parsed into one record."""
    payload = "col1,col2,col3\nval1,val2,val3\n"
    expected_record = {"col1": "val1", "col2": "val2", "col3": "val3"}
    default_extractor = ResponseToFileExtractor(parameters={})
    streamed = self._mock_streamed_response(BytesIO(payload.encode("utf-8")))

    records = list(default_extractor.extract_records(streamed))

    assert len(records) == 1
    assert records[0] == expected_record

def test_tab_delimiter_with_comma_in_values(self) -> None:
    """With a tab delimiter, a comma inside a field stays part of that field's value."""
    payload = "name\taddress\nJohn Doe\t123 Main St, Apt 4\n"
    expected_record = {"name": "John Doe", "address": "123 Main St, Apt 4"}
    tsv_extractor = ResponseToFileExtractor(parameters={}, delimiter="\t")
    streamed = self._mock_streamed_response(BytesIO(payload.encode("utf-8")))

    records = list(tsv_extractor.extract_records(streamed))

    assert len(records) == 1
    assert records[0] == expected_record

def _test_folder_path(self) -> Path:
return Path(__file__).parent.resolve()

Expand Down Expand Up @@ -76,7 +125,7 @@ def large_event_response_fixture():
@pytest.mark.limit_memory("20 MB")
def test_response_to_file_extractor_memory_usage(requests_mock, large_events_response):
lines_in_response, file_path = large_events_response
extractor = ResponseToFileExtractor({})
extractor = ResponseToFileExtractor(parameters={})

url = "https://for-all-mankind.nasa.com/api/v1/users/users1"
requests_mock.get(url, body=open(file_path, "rb"))
Expand Down