Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 65 additions & 0 deletions examples/LLM_Workflows/information_extraction/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->

# Information Extraction

This example shows how to model an LLM information extraction task as a Hamilton dataflow.
It takes customer feedback text, builds a JSON schema from Pydantic models, creates an extraction prompt, sends it to the OpenAI chat API, parses the JSON response, and validates the result.

The dataflow is intentionally small so each part of the extraction pipeline is visible as a Hamilton node:

- `output_schema`: the expected JSON shape
- `extraction_prompt`: prompt built from the schema and input text
- `extraction_response`: OpenAI API response
- `parsed_extraction`: JSON parsing
- `validated_extraction`: Pydantic validation
- `extracted_feedback`: serializable extracted records

## Run

Install the requirements:

```bash
pip install -r requirements.txt
```

Run with a mocked LLM response (no OpenAI API call is made):

```bash
python run.py --mock-response
```

Run with OpenAI:

```bash
export OPENAI_API_KEY=...
python run.py
```

You can pass custom text directly:

```bash
python run.py --text "The app search is great, but checkout keeps failing."
```

Or read text from a file:

```bash
python run.py --text-file feedback.txt
```
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

import json
import os
import re
from typing import Any, Literal

from openai import OpenAI
from pydantic import BaseModel, Field


class ProductFeedback(BaseModel):
    # One piece of feedback about a single product or feature.
    # Documented with comments rather than a docstring: pydantic copies a
    # model docstring into the generated JSON schema, which would change the
    # schema embedded in the extraction prompt.

    # What the customer is talking about.
    product: str = Field(description="Product or feature mentioned by the customer.")
    # Only these four labels pass validation.
    sentiment: Literal["positive", "negative", "neutral", "mixed"]
    # Optional lists; each record gets its own fresh empty list when omitted.
    issues: list[str] = Field(default_factory=list)
    requested_features: list[str] = Field(default_factory=list)
    # Required free-text summary of the feedback.
    summary: str


class FeedbackExtraction(BaseModel):
    # Top-level payload: all feedback records extracted from one input text.
    # Kept as a comment (not a docstring) so the generated JSON schema is
    # unchanged.
    feedback: list[ProductFeedback]


def sample_feedback_text() -> str:
    """Return a sample piece of customer feedback to extract from."""
    sentences = (
        "The mobile app search is fast, but checkout froze twice before I could pay.",
        "I still like the recommendations tab because it found the right headphones.",
        "Please add order status alerts and make saved cards easier to update.",
    )
    # Sentences are separated by exactly one space.
    return " ".join(sentences)


def output_schema() -> dict[str, Any]:
    """Return the JSON schema describing a ``FeedbackExtraction`` payload.

    Uses ``model_json_schema`` when the installed pydantic provides it and
    falls back to the older ``schema`` classmethod otherwise.
    """
    # Resolve the fallback lazily. Passing ``FeedbackExtraction.schema`` as a
    # ``getattr`` default evaluates it even when it is never used, which raises
    # AttributeError on a pydantic release that no longer ships that name.
    if hasattr(FeedbackExtraction, "model_json_schema"):
        return FeedbackExtraction.model_json_schema()
    return FeedbackExtraction.schema()


def extraction_prompt(input_text: str, output_schema: dict[str, Any]) -> str:
    """Build the LLM prompt: instructions, the pretty-printed schema, then the text."""
    schema_json = json.dumps(output_schema, indent=2)
    return (
        "Extract structured product feedback from the text.\n"
        "\n"
        "Return only JSON that matches this schema:\n"
        f"{schema_json}\n"
        "\n"
        "Text:\n"
        f"{input_text}\n"
    )


def llm_client() -> OpenAI:
    """Create an OpenAI client using the ``OPENAI_API_KEY`` environment variable."""
    # ``.get`` yields None when the variable is unset; that value is passed
    # through to the client unchanged.
    api_key = os.environ.get("OPENAI_API_KEY")
    return OpenAI(api_key=api_key)


def extraction_messages(extraction_prompt: str) -> list[dict[str, str]]:
    """Wrap the prompt in a two-message chat: a fixed system message, then the user prompt."""
    system_message = {
        "role": "system",
        "content": "You extract structured data and return valid JSON only.",
    }
    user_message = {"role": "user", "content": extraction_prompt}
    return [system_message, user_message]


def extraction_response(
    llm_client: OpenAI,
    extraction_messages: list[dict[str, str]],
    openai_model: str = "gpt-4o-mini",
) -> str:
    """Send the chat messages to the model and return the first choice's text.

    The request asks for a JSON-object response at temperature 0. An empty or
    missing message content is returned as an empty string.
    """
    request = {
        "model": openai_model,
        "messages": extraction_messages,
        "response_format": {"type": "json_object"},
        "temperature": 0,
    }
    completion = llm_client.chat.completions.create(**request)
    content = completion.choices[0].message.content
    if content:
        return content
    return ""


def parsed_extraction(extraction_response: str) -> dict[str, Any]:
    """Parse the raw LLM reply into a JSON object.

    Surrounding whitespace, a leading ``` or ```json fence and a trailing ```
    fence are removed before parsing.

    Args:
        extraction_response: Raw text returned by the LLM.

    Returns:
        The decoded JSON object.

    Raises:
        json.JSONDecodeError: If the reply is empty or is not valid JSON.
        ValueError: If the reply is valid JSON but not an object.
    """
    cleaned_response = re.sub(r"^```(?:json)?|```$", "", extraction_response.strip()).strip()
    if not cleaned_response:
        # Same exception type json.loads would raise, but with a message that
        # names the actual problem instead of "Expecting value".
        raise json.JSONDecodeError("LLM returned an empty response", cleaned_response, 0)
    parsed = json.loads(cleaned_response)
    if not isinstance(parsed, dict):
        # Fail here with a clear message rather than letting a list or scalar
        # reach keyword-argument unpacking further down the pipeline.
        raise ValueError(
            f"Expected a JSON object from the LLM, got {type(parsed).__name__}."
        )
    return parsed


def validated_extraction(parsed_extraction: dict[str, Any]) -> FeedbackExtraction:
    """Validate the parsed payload by building a ``FeedbackExtraction`` from its keys."""
    extraction = FeedbackExtraction(**parsed_extraction)
    return extraction


def extracted_feedback(validated_extraction: FeedbackExtraction) -> list[dict[str, Any]]:
    """Return the validated feedback records as plain dictionaries.

    Uses ``model_dump`` when the installed pydantic provides it and falls back
    to the older ``dict`` method otherwise.
    """
    # Resolve the fallback lazily. Passing ``validated_extraction.dict`` as a
    # ``getattr`` default evaluates it even when it is never used, which raises
    # AttributeError on a pydantic release that no longer ships that name.
    if hasattr(validated_extraction, "model_dump"):
        payload = validated_extraction.model_dump()
    else:
        payload = validated_extraction.dict()
    return payload["feedback"]
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
apache-hamilton[visualization]
openai
pydantic
89 changes: 89 additions & 0 deletions examples/LLM_Workflows/information_extraction/run.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

import argparse
import json
from pathlib import Path

import information_extraction
from hamilton import driver


# Canned extraction result with the same shape as the expected LLM output.
# It is serialized and injected in place of the API response when the script
# runs with --mock-response.
MOCK_RESPONSE = {
    "feedback": [
        # Praise for search speed.
        {
            "product": "mobile app search",
            "sentiment": "positive",
            "issues": [],
            "requested_features": [],
            "summary": "The customer says search is fast.",
        },
        # Checkout failure.
        {
            "product": "checkout",
            "sentiment": "negative",
            "issues": ["Checkout froze twice before payment."],
            "requested_features": [],
            "summary": "Checkout reliability blocked payment.",
        },
        # Praise for recommendations.
        {
            "product": "recommendations tab",
            "sentiment": "positive",
            "issues": [],
            "requested_features": [],
            "summary": "Recommendations helped find the right headphones.",
        },
        # Feature requests.
        {
            "product": "orders and saved cards",
            "sentiment": "neutral",
            "issues": [],
            "requested_features": [
                "Order status alerts.",
                "Easier saved card updates.",
            ],
            "summary": "The customer asks for order alerts and card management improvements.",
        },
    ]
}


def _input_text(args: argparse.Namespace) -> str:
    """Choose the text to extract from.

    Precedence: the file named by ``--text-file``, then the ``--text`` value,
    then the module's built-in sample feedback.

    Args:
        args: Parsed command-line arguments with ``text_file`` and ``text``.

    Returns:
        The feedback text to feed into the dataflow.
    """
    if args.text_file:
        # Decode explicitly as UTF-8; without an encoding, read_text() uses the
        # platform locale and can mis-decode or fail on non-ASCII feedback.
        return Path(args.text_file).read_text(encoding="utf-8")
    if args.text:
        return args.text
    return information_extraction.sample_feedback_text()


if __name__ == "__main__":
    # Command-line interface.
    cli = argparse.ArgumentParser(description="Run the information extraction example.")
    cli.add_argument("--text", help="Customer feedback text to extract from.")
    cli.add_argument("--text-file", help="Path to a text file to extract from.")
    cli.add_argument("--model", default="gpt-4o-mini", help="OpenAI chat model.")
    cli.add_argument("--mock-response", action="store_true", help="Run without an API call.")
    args = cli.parse_args()

    # Build the Hamilton dataflow from the extraction module.
    dataflow = driver.Builder().with_modules(information_extraction).build()
    inputs = dict(input_text=_input_text(args), openai_model=args.model)
    # In mock mode the canned JSON is supplied as the value of the
    # ``extraction_response`` node.
    overrides = {"extraction_response": json.dumps(MOCK_RESPONSE)} if args.mock_response else {}

    target = "extracted_feedback"
    outputs = dataflow.execute([target], inputs=inputs, overrides=overrides)
    print(json.dumps(outputs[target], indent=2))