-
Notifications
You must be signed in to change notification settings - Fork 40
feat: add markdown_json output format for unstructured document parsing #948
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
1afcc98
a8f4e04
dc9bb1d
e484f05
eecb6c2
25bb2d1
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,6 +1,7 @@ | ||
| # | ||
| # Copyright (c) 2023 Airbyte, Inc., all rights reserved. | ||
| # | ||
| import json | ||
| import logging | ||
| import os | ||
| import traceback | ||
|
|
@@ -147,10 +148,17 @@ async def infer_schema( | |
| self._get_file_type_error_message(filetype), | ||
| ) | ||
|
|
||
| if format.output_format == "markdown_json": | ||
| content_description = "Content of the file as a JSON array of structured elements with type, text, and metadata fields. Might be null if the file could not be parsed" | ||
| else: | ||
| content_description = ( | ||
| "Content of the file as markdown. Might be null if the file could not be parsed" | ||
| ) | ||
|
|
||
| return { | ||
| "content": { | ||
| "type": "string", | ||
| "description": "Content of the file as markdown. Might be null if the file could not be parsed", | ||
| "description": content_description, | ||
| }, | ||
| "document_key": { | ||
| "type": "string", | ||
|
|
@@ -225,23 +233,33 @@ def _read_file( | |
| if filetype in {FileType.MD, FileType.TXT}: | ||
| file_content: bytes = file_handle.read() | ||
| decoded_content: str = optional_decode(file_content) | ||
| if format.output_format == "markdown_json": | ||
| return json.dumps( | ||
| [{"type": "NarrativeText", "text": decoded_content, "metadata": {}}] | ||
| ) | ||
| return decoded_content | ||
| if format.processing.mode == "local": | ||
| return self._read_file_locally( | ||
| elements = self._read_file_locally_elements( | ||
| file_handle, | ||
| filetype, | ||
| format.strategy, | ||
| remote_file, | ||
| ) | ||
| if format.output_format == "markdown_json": | ||
| return json.dumps(elements) | ||
| return self._render_markdown(elements) | ||
| elif format.processing.mode == "api": | ||
| try: | ||
| result: str = self._read_file_remotely_with_retries( | ||
| elements = self._read_file_remotely_elements_with_retries( | ||
| file_handle, | ||
| format.processing, | ||
| filetype, | ||
| format.strategy, | ||
| remote_file, | ||
| ) | ||
| if format.output_format == "markdown_json": | ||
| return json.dumps(elements) | ||
| return self._render_markdown(elements) | ||
| except Exception as e: | ||
| # If a parser error happens during remotely processing the file, this means the file is corrupted. This case is handled by the parse_records method, so just rethrow. | ||
| # | ||
|
|
@@ -253,8 +271,6 @@ def _read_file( | |
| e, failure_type=FailureType.config_error | ||
| ) | ||
|
|
||
| return result | ||
|
|
||
| def _params_to_dict( | ||
| self, params: Optional[List[APIParameterConfigModel]], strategy: str | ||
| ) -> Dict[str, Union[str, List[str]]]: | ||
|
|
@@ -323,6 +339,24 @@ def _read_file_remotely_with_retries( | |
| """ | ||
| return self._read_file_remotely(file_handle, format, filetype, strategy, remote_file) | ||
|
|
||
| @backoff.on_exception( | ||
| backoff.expo, requests.exceptions.RequestException, max_tries=5, giveup=user_error | ||
| ) | ||
| def _read_file_remotely_elements_with_retries( | ||
| self, | ||
| file_handle: IOBase, | ||
| format: APIProcessingConfigModel, | ||
| filetype: FileType, | ||
| strategy: str, | ||
| remote_file: RemoteFile, | ||
| ) -> List[Dict[str, Any]]: | ||
| """ | ||
| Read a file remotely and return the raw JSON elements, retrying up to 5 times if the error is not caused by user error. | ||
| """ | ||
| return self._read_file_remotely_elements( | ||
| file_handle, format, filetype, strategy, remote_file | ||
| ) | ||
|
Comment on lines
+342
to
+358
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Could we rewind This retry path reuses the same stream across attempts, but Suggested fix def _read_file_remotely_elements(
self,
file_handle: IOBase,
format: APIProcessingConfigModel,
filetype: FileType,
strategy: str,
remote_file: RemoteFile,
) -> List[Dict[str, Any]]:
+ file_handle.seek(0)
headers = {"accept": "application/json", "unstructured-api-key": format.api_key}
data = self._params_to_dict(format.parameters, strategy)
file_data = {"files": ("filename", file_handle, FILETYPE_TO_MIMETYPE[filetype])}Also applies to: 386-410 🤖 Prompt for AI Agents
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is a pre-existing pattern — the existing |
||
|
|
||
| def _read_file_remotely( | ||
| self, | ||
| file_handle: IOBase, | ||
|
|
@@ -352,9 +386,41 @@ def _read_file_remotely( | |
|
|
||
| return self._render_markdown(json_response) | ||
|
|
||
| def _read_file_remotely_elements( | ||
| self, | ||
| file_handle: IOBase, | ||
| format: APIProcessingConfigModel, | ||
| filetype: FileType, | ||
| strategy: str, | ||
| remote_file: RemoteFile, | ||
| ) -> List[Dict[str, Any]]: | ||
| headers = {"accept": "application/json", "unstructured-api-key": format.api_key} | ||
|
|
||
| data = self._params_to_dict(format.parameters, strategy) | ||
|
|
||
| file_data = {"files": ("filename", file_handle, FILETYPE_TO_MIMETYPE[filetype])} | ||
|
|
||
| response = requests.post( | ||
| f"{format.api_url}/general/v0/general", headers=headers, data=data, files=file_data | ||
| ) | ||
|
|
||
| if response.status_code == 422: | ||
| raise self._create_parse_error(remote_file, response.json()) | ||
| else: | ||
| response.raise_for_status() | ||
|
|
||
| json_response: List[Dict[str, Any]] = response.json() | ||
| return json_response | ||
|
Comment on lines
+412
to
+413
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🧩 Analysis chain🏁 Script executed: cd /tmp && find . -name "unstructured_parser.py" 2>/dev/null | head -5Repository: airbytehq/airbyte-python-cdk Length of output: 54 🏁 Script executed: # Let's search for the file in the repository
find . -name "unstructured_parser.py" 2>/dev/nullRepository: airbytehq/airbyte-python-cdk Length of output: 138 🏁 Script executed: # Get context around lines 413-414
cat -n airbyte_cdk/sources/file_based/file_types/unstructured_parser.py | sed -n '400,430p'Repository: airbytehq/airbyte-python-cdk Length of output: 1487 🏁 Script executed: # Let's also check if there's validation elsewhere and understand the full method
cat -n airbyte_cdk/sources/file_based/file_types/unstructured_parser.py | sed -n '380,440p'Repository: airbytehq/airbyte-python-cdk Length of output: 2964 🏁 Script executed: # Search for markdown_json function and its usage
rg "markdown_json" --type py -B 5 -A 10Repository: airbytehq/airbyte-python-cdk Length of output: 21328 🏁 Script executed: # Check what happens to the returned value from _read_file_remotely_elements
cat -n airbyte_cdk/sources/file_based/file_types/unstructured_parser.py | sed -n '290,330p'Repository: airbytehq/airbyte-python-cdk Length of output: 2039 🏁 Script executed: # Check the _render_markdown method to see how it handles the data
rg "_render_markdown" -B 2 -A 10 airbyte_cdk/sources/file_based/file_types/unstructured_parser.pyRepository: airbytehq/airbyte-python-cdk Length of output: 2755 🏁 Script executed: # Let's see the complete parse_records flow
cat -n airbyte_cdk/sources/file_based/file_types/unstructured_parser.py | sed -n '290,350p'Repository: airbytehq/airbyte-python-cdk Length of output: 2956 🏁 Script executed: # Check if there are other places where response.json() is called without validation
rg "response\.json\(\)" airbyte_cdk/sources/file_based/file_types/unstructured_parser.py -B 3 -A 3Repository: airbytehq/airbyte-python-cdk Length of output: 899 🏁 Script executed: # Let's also check what happens if json.dumps is called on a dict vs a list
python3 << 'EOF'
import json
# Test what happens with dict
dict_response = {"detail": "error"}
list_response = [{"type": "Title", "text": "heading"}]
print("json.dumps on dict:")
print(json.dumps(dict_response))
print("\njson.dumps on list:")
print(json.dumps(list_response))
# What happens when we iterate?
print("\n\nIterating over dict:")
for item in dict_response:
print(f" {item}")
print("\nIterating over list:")
for item in list_response:
print(f" {item}")
EOFRepository: airbytehq/airbyte-python-cdk Length of output: 264 Validate that the API response is actually a list before returning it. Right now if the API returns a 200 with 🤖 Prompt for AI Agents |
||
|
|
||
| def _read_file_locally( | ||
| self, file_handle: IOBase, filetype: FileType, strategy: str, remote_file: RemoteFile | ||
| ) -> str: | ||
| elements = self._read_file_locally_elements(file_handle, filetype, strategy, remote_file) | ||
| return self._render_markdown(elements) | ||
|
|
||
| def _read_file_locally_elements( | ||
| self, file_handle: IOBase, filetype: FileType, strategy: str, remote_file: RemoteFile | ||
| ) -> List[Dict[str, Any]]: | ||
| _import_unstructured() | ||
| if ( | ||
| (not unstructured_partition_pdf) | ||
|
|
@@ -385,7 +451,7 @@ def _read_file_locally( | |
| except Exception as e: | ||
| raise self._create_parse_error(remote_file, str(e)) | ||
|
|
||
| return self._render_markdown([element.to_dict() for element in elements]) | ||
| return [element.to_dict() for element in elements] | ||
|
|
||
| def _create_parse_error( | ||
| self, | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.