Skip to content
8 changes: 8 additions & 0 deletions airbyte_cdk/sources/file_based/config/unstructured_format.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,3 +100,11 @@ class Config(OneOfOptionConfig):
discriminator="mode",
type="object",
)

output_format: str = Field(
default="markdown_text",
always_show=True,
title="Output Format",
enum=["markdown_text", "markdown_json"],
description="The output format for parsed document content. `markdown_text` renders the document as flat Markdown text. `markdown_json` outputs a JSON array of structured elements with type, text, and metadata fields, preserving document structure for easier downstream processing.",
)
78 changes: 72 additions & 6 deletions airbyte_cdk/sources/file_based/file_types/unstructured_parser.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#
import json
import logging
import os
import traceback
Expand Down Expand Up @@ -147,10 +148,17 @@ async def infer_schema(
self._get_file_type_error_message(filetype),
)

if format.output_format == "markdown_json":
content_description = "Content of the file as a JSON array of structured elements with type, text, and metadata fields. Might be null if the file could not be parsed"
else:
content_description = (
"Content of the file as markdown. Might be null if the file could not be parsed"
)

return {
"content": {
"type": "string",
"description": "Content of the file as markdown. Might be null if the file could not be parsed",
"description": content_description,
},
"document_key": {
"type": "string",
Expand Down Expand Up @@ -225,23 +233,33 @@ def _read_file(
if filetype in {FileType.MD, FileType.TXT}:
file_content: bytes = file_handle.read()
decoded_content: str = optional_decode(file_content)
if format.output_format == "markdown_json":
return json.dumps(
[{"type": "NarrativeText", "text": decoded_content, "metadata": {}}]
)
return decoded_content
if format.processing.mode == "local":
return self._read_file_locally(
elements = self._read_file_locally_elements(
file_handle,
filetype,
format.strategy,
remote_file,
)
if format.output_format == "markdown_json":
return json.dumps(elements)
return self._render_markdown(elements)
elif format.processing.mode == "api":
try:
result: str = self._read_file_remotely_with_retries(
elements = self._read_file_remotely_elements_with_retries(
file_handle,
format.processing,
filetype,
format.strategy,
remote_file,
)
if format.output_format == "markdown_json":
return json.dumps(elements)
return self._render_markdown(elements)
except Exception as e:
# If a parser error happens during remotely processing the file, this means the file is corrupted. This case is handled by the parse_records method, so just rethrow.
#
Expand All @@ -253,8 +271,6 @@ def _read_file(
e, failure_type=FailureType.config_error
)

return result

def _params_to_dict(
self, params: Optional[List[APIParameterConfigModel]], strategy: str
) -> Dict[str, Union[str, List[str]]]:
Expand Down Expand Up @@ -323,6 +339,24 @@ def _read_file_remotely_with_retries(
"""
return self._read_file_remotely(file_handle, format, filetype, strategy, remote_file)

@backoff.on_exception(
    backoff.expo, requests.exceptions.RequestException, max_tries=5, giveup=user_error
)
def _read_file_remotely_elements_with_retries(
    self,
    file_handle: IOBase,
    format: APIProcessingConfigModel,
    filetype: FileType,
    strategy: str,
    remote_file: RemoteFile,
) -> List[Dict[str, Any]]:
    """
    Fetch the raw JSON elements for a file from the remote API.

    Transient request failures are retried with exponential backoff (up to 5
    attempts); errors classified as user errors are not retried.
    """
    elements = self._read_file_remotely_elements(
        file_handle, format, filetype, strategy, remote_file
    )
    return elements
Comment on lines +342 to +358
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Could we rewind file_handle before each remote attempt, wdyt?

This retry path reuses the same stream across attempts, but requests.post(..., files=...) consumes it. After the first transient failure, later retries can resend EOF/partial data and make recovery impossible.

Suggested fix
 def _read_file_remotely_elements(
     self,
     file_handle: IOBase,
     format: APIProcessingConfigModel,
     filetype: FileType,
     strategy: str,
     remote_file: RemoteFile,
 ) -> List[Dict[str, Any]]:
+    file_handle.seek(0)
     headers = {"accept": "application/json", "unstructured-api-key": format.api_key}

     data = self._params_to_dict(format.parameters, strategy)

     file_data = {"files": ("filename", file_handle, FILETYPE_TO_MIMETYPE[filetype])}

Also applies to: 386-410

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@airbyte_cdk/sources/file_based/file_types/unstructured_parser.py` around
lines 339 - 355, The retry wrapper _read_file_remotely_elements_with_retries
currently reuses the same file_handle across attempts causing requests.post to
resend an EOF/partial stream; before each retry attempt rewind the stream (call
file_handle.seek(0)) or, if the handle is not seekable, buffer the contents
(e.g., into a BytesIO) and use that buffer for each attempt so the uploaded file
is complete on retries; apply the same change to the other retry wrapper that
delegates to the remote read (the similar retry function for the
chunked/alternative remote-read path) so both retry paths reset or rebuffer the
file_handle before each request attempt.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a pre-existing pattern — the existing _read_file_remotely method (line 361) has the same behavior and doesn't add seek(0) before the request either. The new _read_file_remotely_elements method follows the same pattern for consistency. Addressing the retry seek behavior for both methods would be a separate improvement outside the scope of this PR.


def _read_file_remotely(
self,
file_handle: IOBase,
Expand Down Expand Up @@ -352,9 +386,41 @@ def _read_file_remotely(

return self._render_markdown(json_response)

def _read_file_remotely_elements(
    self,
    file_handle: IOBase,
    format: APIProcessingConfigModel,
    filetype: FileType,
    strategy: str,
    remote_file: RemoteFile,
) -> List[Dict[str, Any]]:
    """
    Upload the file to the unstructured API and return the parsed elements as a
    list of JSON dicts (each with type/text/metadata fields).

    A 422 response means the file itself could not be processed and is raised as
    a parse error; other HTTP errors are raised via raise_for_status.
    """
    # Rewind before posting: requests consumes the stream when uploading, so a
    # retry (via the backoff wrapper) would otherwise re-send EOF/partial data.
    if file_handle.seekable():
        file_handle.seek(0)

    headers = {"accept": "application/json", "unstructured-api-key": format.api_key}

    data = self._params_to_dict(format.parameters, strategy)

    file_data = {"files": ("filename", file_handle, FILETYPE_TO_MIMETYPE[filetype])}

    response = requests.post(
        f"{format.api_url}/general/v0/general", headers=headers, data=data, files=file_data
    )

    if response.status_code == 422:
        # 422 means the file is unprocessable (e.g. corrupted); surface the API's detail.
        raise self._create_parse_error(remote_file, response.json())
    else:
        response.raise_for_status()

    json_response = response.json()
    # The schema promises a JSON array of elements. A 200 carrying a non-list
    # payload (e.g. {"detail": ...}) would otherwise be emitted silently.
    if not isinstance(json_response, list):
        raise self._create_parse_error(
            remote_file,
            f"Expected a list of elements from the unstructured API, got: {response.text}",
        )
    return json_response
Comment on lines +412 to +413
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

cd /tmp && find . -name "unstructured_parser.py" 2>/dev/null | head -5

Repository: airbytehq/airbyte-python-cdk

Length of output: 54


🏁 Script executed:

# Let's search for the file in the repository
find . -name "unstructured_parser.py" 2>/dev/null

Repository: airbytehq/airbyte-python-cdk

Length of output: 138


🏁 Script executed:

# Get context around lines 413-414
cat -n airbyte_cdk/sources/file_based/file_types/unstructured_parser.py | sed -n '400,430p'

Repository: airbytehq/airbyte-python-cdk

Length of output: 1487


🏁 Script executed:

# Let's also check if there's validation elsewhere and understand the full method
cat -n airbyte_cdk/sources/file_based/file_types/unstructured_parser.py | sed -n '380,440p'

Repository: airbytehq/airbyte-python-cdk

Length of output: 2964


🏁 Script executed:

# Search for markdown_json function and its usage
rg "markdown_json" --type py -B 5 -A 10

Repository: airbytehq/airbyte-python-cdk

Length of output: 21328


🏁 Script executed:

# Check what happens to the returned value from _read_file_remotely_elements
cat -n airbyte_cdk/sources/file_based/file_types/unstructured_parser.py | sed -n '290,330p'

Repository: airbytehq/airbyte-python-cdk

Length of output: 2039


🏁 Script executed:

# Check the _render_markdown method to see how it handles the data
rg "_render_markdown" -B 2 -A 10 airbyte_cdk/sources/file_based/file_types/unstructured_parser.py

Repository: airbytehq/airbyte-python-cdk

Length of output: 2755


🏁 Script executed:

# Let's see the complete parse_records flow
cat -n airbyte_cdk/sources/file_based/file_types/unstructured_parser.py | sed -n '290,350p'

Repository: airbytehq/airbyte-python-cdk

Length of output: 2956


🏁 Script executed:

# Check if there are other places where response.json() is called without validation
rg "response\.json\(\)" airbyte_cdk/sources/file_based/file_types/unstructured_parser.py -B 3 -A 3

Repository: airbytehq/airbyte-python-cdk

Length of output: 899


🏁 Script executed:

# Let's also check what happens if json.dumps is called on a dict vs a list
python3 << 'EOF'
import json

# Test what happens with dict
dict_response = {"detail": "error"}
list_response = [{"type": "Title", "text": "heading"}]

print("json.dumps on dict:")
print(json.dumps(dict_response))
print("\njson.dumps on list:")
print(json.dumps(list_response))

# What happens when we iterate?
print("\n\nIterating over dict:")
for item in dict_response:
    print(f"  {item}")

print("\nIterating over list:")
for item in list_response:
    print(f"  {item}")
EOF

Repository: airbytehq/airbyte-python-cdk

Length of output: 264


Validate that the API response is actually a list before returning it.

Right now if the API returns a 200 with {"detail": "..."} instead of an array, the code silently emits a JSON object even though the schema promises a JSON array of elements. Since json.dumps() works on both dicts and lists, the error goes undetected downstream. Could we add a check that response.json() returns a list, wdyt?

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@airbyte_cdk/sources/file_based/file_types/unstructured_parser.py` around
lines 413 - 414, The code currently returns response.json() (assigned to
json_response) without verifying its type; update the logic after calling
response.json() in unstructured_parser.py to assert the result is a list (use
isinstance(json_response, list)), and if it's not a list raise a descriptive
exception (e.g., ValueError or RuntimeError) that includes the unexpected
response content (or response.text) so callers see the mismatch instead of
silently receiving a dict; ensure the function no longer returns non-list values
and references the json_response variable and the response.json() call in the
error path.


def _read_file_locally(
    self, file_handle: IOBase, filetype: FileType, strategy: str, remote_file: RemoteFile
) -> str:
    """Parse the file with the local unstructured library and render the result as Markdown."""
    return self._render_markdown(
        self._read_file_locally_elements(file_handle, filetype, strategy, remote_file)
    )

def _read_file_locally_elements(
self, file_handle: IOBase, filetype: FileType, strategy: str, remote_file: RemoteFile
) -> List[Dict[str, Any]]:
_import_unstructured()
if (
(not unstructured_partition_pdf)
Expand Down Expand Up @@ -385,7 +451,7 @@ def _read_file_locally(
except Exception as e:
raise self._create_parse_error(remote_file, str(e))

return self._render_markdown([element.to_dict() for element in elements])
return [element.to_dict() for element in elements]

def _create_parse_error(
self,
Expand Down
163 changes: 163 additions & 0 deletions unit_tests/sources/file_based/file_types/test_unstructured_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#

import asyncio
import json
from datetime import datetime
from unittest import mock
from unittest.mock import MagicMock, call, mock_open, patch
Expand Down Expand Up @@ -674,3 +675,165 @@ def test_parse_records_remotely(
requests_mock.post.assert_has_calls(expected_requests)
else:
requests_mock.post.assert_not_called()


@patch("airbyte_cdk.sources.file_based.file_types.unstructured_parser.detect_filetype")
def test_infer_schema_markdown_json(mock_detect_filetype):
    """Test that infer_schema returns correct description for markdown_json output format."""
    main_loop = asyncio.get_event_loop()
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    try:
        mock_detect_filetype.return_value = FileType.PDF
        config = MagicMock()
        config.format = UnstructuredFormat(
            skip_unprocessable_files=False, output_format="markdown_json"
        )
        schema = loop.run_until_complete(
            UnstructuredParser().infer_schema(config, MagicMock(), MagicMock(), MagicMock())
        )
        assert schema == {
            "content": {
                "type": "string",
                "description": "Content of the file as a JSON array of structured elements with type, text, and metadata fields. Might be null if the file could not be parsed",
            },
            "document_key": {
                "type": "string",
                "description": "Unique identifier of the document, e.g. the file path",
            },
            "_ab_source_file_parse_error": {
                "type": "string",
                "description": "Error message if the file could not be parsed even though the file is supported",
            },
        }
    finally:
        # Always close the temporary loop and restore the original one, even when
        # an assertion fails, so a failure here does not leak into other tests.
        loop.close()
        asyncio.set_event_loop(main_loop)


@patch("unstructured.partition.pdf.partition_pdf")
@patch("unstructured.partition.pptx.partition_pptx")
@patch("unstructured.partition.docx.partition_docx")
@patch("airbyte_cdk.sources.file_based.file_types.unstructured_parser.detect_filetype")
def test_parse_records_markdown_json_local(
    mock_detect_filetype,
    mock_partition_docx,
    mock_partition_pptx,
    mock_partition_pdf,
):
    """Test that parse_records returns JSON elements when output_format is markdown_json for local processing."""
    mock_detect_filetype.return_value = FileType.PDF
    mock_partition_pdf.return_value = [
        Title("heading"),
        Text("This is the text"),
        ListItem("This is a list item"),
        Formula("This is a formula"),
    ]

    stream_reader = MagicMock()
    mock_open(stream_reader.open_file, read_data=b"fake pdf content")
    fake_file = RemoteFile(uri=FILE_URI, last_modified=datetime.now())
    logger = MagicMock()
    config = MagicMock()
    config.format = UnstructuredFormat(
        skip_unprocessable_files=False,
        output_format="markdown_json",
    )

    records = list(
        UnstructuredParser().parse_records(config, fake_file, stream_reader, logger, MagicMock())
    )
    assert len(records) == 1
    record = records[0]
    assert record["document_key"] == FILE_URI
    assert record["_ab_source_file_parse_error"] is None

    # The content column must be a JSON-encoded array mirroring the parsed elements.
    parsed = json.loads(record["content"])
    assert isinstance(parsed, list)
    expected_elements = [
        ("Title", "heading"),
        ("UncategorizedText", "This is the text"),
        ("ListItem", "This is a list item"),
        ("Formula", "This is a formula"),
    ]
    assert len(parsed) == len(expected_elements)
    for element, (expected_type, expected_text) in zip(parsed, expected_elements):
        assert element["type"] == expected_type
        assert element["text"] == expected_text


@patch("airbyte_cdk.sources.file_based.file_types.unstructured_parser.requests")
@patch("airbyte_cdk.sources.file_based.file_types.unstructured_parser.detect_filetype")
@patch("time.sleep", side_effect=lambda _: None)
def test_parse_records_markdown_json_remote(
    time_mock,
    mock_detect_filetype,
    requests_mock,
):
    """Test that parse_records returns JSON elements when output_format is markdown_json for API processing."""
    api_elements = [
        {"type": "Title", "text": "heading", "metadata": {"page_number": 1}},
        {"type": "NarrativeText", "text": "Some text", "metadata": {"page_number": 1}},
    ]

    mock_detect_filetype.return_value = FileType.PDF
    mock_response = MagicMock()
    mock_response.status_code = 200
    mock_response.json.return_value = api_elements
    requests_mock.post.return_value = mock_response
    requests_mock.exceptions.RequestException = requests.exceptions.RequestException

    stream_reader = MagicMock()
    mock_open(stream_reader.open_file, read_data=b"fake pdf content")
    fake_file = RemoteFile(uri=FILE_URI, last_modified=datetime.now())
    logger = MagicMock()
    config = MagicMock()
    config.format = UnstructuredFormat(
        skip_unprocessable_files=False,
        output_format="markdown_json",
        processing=APIProcessingConfigModel(mode="api", api_key="test"),
    )

    records = list(
        UnstructuredParser().parse_records(config, fake_file, stream_reader, logger, MagicMock())
    )
    assert len(records) == 1
    record = records[0]
    assert record["document_key"] == FILE_URI
    assert record["_ab_source_file_parse_error"] is None
    # The record content must round-trip back to the exact element list the API returned.
    assert json.loads(record["content"]) == api_elements


@patch("unstructured.partition.pdf.partition_pdf")
@patch("unstructured.partition.pptx.partition_pptx")
@patch("unstructured.partition.docx.partition_docx")
@patch("airbyte_cdk.sources.file_based.file_types.unstructured_parser.detect_filetype")
def test_parse_records_markdown_json_md_file(
    mock_detect_filetype,
    mock_partition_docx,
    mock_partition_pptx,
    mock_partition_pdf,
):
    """Test that MD/TXT files return a JSON element array when output_format is markdown_json."""
    mock_detect_filetype.return_value = FileType.MD

    stream_reader = MagicMock()
    mock_open(stream_reader.open_file, read_data=b"# Hello World\n\nSome text content")
    fake_file = RemoteFile(uri="path/to/file.md", last_modified=datetime.now())
    logger = MagicMock()
    config = MagicMock()
    config.format = UnstructuredFormat(
        skip_unprocessable_files=False,
        output_format="markdown_json",
    )

    records = list(
        UnstructuredParser().parse_records(config, fake_file, stream_reader, logger, MagicMock())
    )
    assert len(records) == 1
    record = records[0]
    assert record["document_key"] == "path/to/file.md"
    assert record["_ab_source_file_parse_error"] is None

    # Markdown files skip partitioning: the whole body becomes one NarrativeText element.
    elements = json.loads(record["content"])
    assert isinstance(elements, list)
    assert len(elements) == 1
    (element,) = elements
    assert element["type"] == "NarrativeText"
    assert element["text"] == "# Hello World\n\nSome text content"
    assert element["metadata"] == {}
11 changes: 11 additions & 0 deletions unit_tests/sources/file_based/scenarios/csv_scenarios.py
Original file line number Diff line number Diff line change
Expand Up @@ -459,6 +459,17 @@
},
],
},
"output_format": {
"title": "Output Format",
"description": "The output format for parsed document content. `markdown_text` renders the document as flat Markdown text. `markdown_json` outputs a JSON array of structured elements with type, text, and metadata fields, preserving document structure for easier downstream processing.",
"default": "markdown_text",
"always_show": True,
"enum": [
"markdown_text",
"markdown_json",
],
"type": "string",
},
},
"description": "Extract text from document formats (.pdf, .docx, .md, .pptx) and emit as one record per file.",
"required": ["filetype"],
Expand Down
Loading