Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ wheels/
*.egg-info/
.installed.cfg
*.egg

data/*
# Virtual Environment
venv/
env/
Expand Down
139 changes: 139 additions & 0 deletions data_organizer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
import os
import re
import random
from collections import defaultdict
from pathlib import Path
from typing import Dict, List, Tuple, Optional

class TableDataOrganizer:
def __init__(self, data_root: str):
"""
Initialize the organizer with the root data directory.
Args:
data_root: Path to the 'data' directory.
"""
self.data_root = Path(data_root)
self.grouped_data: Dict[str, List[str]] = defaultdict(list)
self._organize_data()

def _organize_data(self):
"""Scans the data directory and groups images by table ID."""
# Regex to parse filenames: P_origin_{group}_{table}_{index}.png or P_origin_{group}_{table}.png
# We want to group by "group_table"

# Pattern covers: group, table, index (optional)
# e.g., P_origin_1_11_0.png -> group=1, table=11, index=0
# e.g., P_origin_1_2.png -> group=1, table=2, index=-1 (conceptually)
pattern = re.compile(r"P_origin_(\d+)_(\d+)(?:_(\d+))?\.png")

if not self.data_root.exists():
print(f"Warning: Directory {self.data_root} does not exist.")
return

for root, _, files in os.walk(self.data_root):
for file in files:
if not file.endswith(".png"):
continue

match = pattern.match(file)
if match:
group_id = match.group(1)
table_id = match.group(2)
index = match.group(3)

# If index is missing (e.g. single file per table), treat as 0 or handle logically
# For sorting purposes, we can treat None as -1 so it comes first, or just 0
idx_val = int(index) if index is not None else -1

# Create a unique key for grouping: "group_{g}_table_{t}"
key = f"P_origin_{group_id}_{table_id}"

abs_path = str(Path(root) / file)
self.grouped_data[key].append((idx_val, abs_path))

# Sort each group by index
for key in self.grouped_data:
# Sort by index (tuple first element)
self.grouped_data[key].sort(key=lambda x: x[0])
# Keep only paths
self.grouped_data[key] = [item[1] for item in self.grouped_data[key]]

def get_batches(self,
sampling: bool = False,
min_k: int = 2,
max_k: int = 3,
num_samples: int = 1) -> Dict[str, List[List[str]]]:
"""
Generates batches of images for each table.

Args:
sampling: If True, randomly samples images. If False, returns all images as one batch.
min_k: Minimum number of images to sample (inclusive, used if sampling=True).
max_k: Maximum number of images to sample (inclusive, used if sampling=True).
num_samples: Number of random batches to generate per table (used if sampling=True).

Returns:
A dictionary where keys are table identifiers and values are LISTS of image lists (batches).
e.g. {
"P_origin_1_11": [ ["path/to/img0", "path/to/img2"] ]
}
"""
results = {}

for key, images in self.grouped_data.items():
if not sampling:
# Return all images as a single batch
results[key] = [images]
else:
table_batches = []
n_images = len(images)

# If there are fewer images than min_k, we can't really "sample" between min_k and max_k
# strictly unless we allow duplicates or just take what we have.
# Logic: if n_images < min_k, just use all images once (effectively no sampling choice).
effective_min = min(n_images, min_k)
effective_max = min(n_images, max_k)

if n_images == 0:
results[key] = []
continue

for _ in range(num_samples):
# Randomly choose k size
# If effective_min == effective_max, then k is fixed
k = random.randint(effective_min, effective_max) if effective_min <= effective_max else n_images

# Sample k images
# Note: random.sample throws error if k > population
# We guarded with min(), so k <= n_images
if k > 0:
batch = sorted(random.sample(images, k))
table_batches.append(batch)
else:
# Should not happen typically unless file list is empty
table_batches.append([])

results[key] = table_batches

return results

if __name__ == "__main__":
# Test existing directory
organizer = TableDataOrganizer("data")

print("=== Default Mode (All Images) ===")
batches_default = organizer.get_batches(sampling=False)
# Print first 2 keys
for k in list(batches_default.keys())[:2]:
print(f"Table: {k}")
for batch in batches_default[k]:
print(f" Batch size: {len(batch)}")
# print(batch) # Uncomment to see paths

print("\n=== Sampling Mode (2-3 images) ===")
batches_sampled = organizer.get_batches(sampling=True, min_k=2, max_k=3, num_samples=2)
for k in list(batches_sampled.keys())[:2]:
print(f"Table: {k}")
for i, batch in enumerate(batches_sampled[k]):
print(f" Sample {i+1}: size {len(batch)}")
# print(batch) # Uncomment to see paths
74 changes: 74 additions & 0 deletions fix1.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
# Fix 1: 데이터 구조화 및 랜덤 샘플링 (Data Organization & Random Sampling)

## 개요 (Overview)
테이블 식별자를 기준으로 이미지를 그룹화하고, QA 생성을 위해 이미지의 일부를 무작위로 추출(샘플링)하는 로직을 구현했습니다. 이를 통해 동일한 테이블에 속한 이미지들의 다양한 조합을 사용하여 다채로운 QA 쌍을 생성할 수 있습니다.

## 새로운 파일: `data_organizer.py`
이 스크립트는 `data` 디렉토리를 스캔하여 `P_origin_{group}_{table}_{index}.png` 명명 규칙에 따라 이미지를 그룹화합니다.

```python
import os
import re
import random
from collections import defaultdict
from pathlib import Path
from typing import Dict, List, Optional

class TableDataOrganizer:
def __init__(self, data_root: str):
self.data_root = Path(data_root)
self.grouped_data: Dict[str, List[str]] = defaultdict(list)
self._organize_data()

def _organize_data(self):
pattern = re.compile(r"P_origin_(\d+)_(\d+)(?:_(\d+))?\.png")
if not self.data_root.exists():
return

for root, _, files in os.walk(self.data_root):
for file in files:
if not file.endswith(".png"): continue
match = pattern.match(file)
if match:
group_id, table_id, index = match.group(1), match.group(2), match.group(3)
idx_val = int(index) if index is not None else -1
key = f"P_origin_{group_id}_{table_id}"
abs_path = str(Path(root) / file)
self.grouped_data[key].append((idx_val, abs_path))

for key in self.grouped_data:
self.grouped_data[key].sort(key=lambda x: x[0])
self.grouped_data[key] = [item[1] for item in self.grouped_data[key]]

def get_batches(self, sampling: bool = False, min_k: int = 2, max_k: int = 3, num_samples: int = 1) -> Dict[str, List[List[str]]]:
results = {}
for key, images in self.grouped_data.items():
if not sampling:
results[key] = [images]
else:
table_batches = []
n_images = len(images)
effective_min = min(n_images, min_k)
effective_max = min(n_images, max_k)

if n_images == 0:
results[key] = []
continue

for _ in range(num_samples):
k = random.randint(effective_min, effective_max) if effective_min <= effective_max else n_images
if k > 0:
batch = sorted(random.sample(images, k))
table_batches.append(batch)
results[key] = table_batches
return results
```

## `generate_synthetic_table/runner.py` 변경 사항
- CLI 인자 추가: `--sampling`, `--min-k`, `--max-k`, `--num-samples`.
- `TableDataOrganizer`를 통합하여 이미지 배치(batch)를 준비하도록 수정.
- 단일 파일 대신 준비된 배치를 순회하며 실행하도록 루프 수정.

## `generate_synthetic_table/flow.py` 변경 사항
- `TableState` 업데이트: `image_paths: List[str]` 필드 추가.
- `generate_qa_from_image_node` 업데이트: 다중 이미지를 입력받아 LLM에 전달하도록 로직 수정.
39 changes: 39 additions & 0 deletions fix2.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
# Fix 2: 도메인 맞춤형 프롬프트 (YAML 기반)

## 개요 (Overview)
도메인별 최적화된 프롬프트 관리를 용이하게 하기 위해 기존 텍스트 파일 기반 시스템을 YAML 기반 시스템으로 교체했습니다. 이를 통해 입력 파일의 키나 CLI 인자를 기반으로 특정 도메인(예: 공공/정부 데이터)에 맞는 프롬프트를 자동으로 로드할 수 있습니다.

## 프롬프트 파일 (Prompt Files)
- **`generate_synthetic_table/prompts/default.yaml`**: 파이프라인의 12개 이상의 단계에서 사용되는 기본 프롬프트들을 모두 포함합니다.
- **`generate_synthetic_table/prompts/public.yaml`**: 도메인별 오버라이드 내용을 포함합니다.
- 예시: `generate_qa_from_image` 프롬프트가 공공 부문 용어에 맞춰 커스터마이징되어 있습니다.

## `generate_synthetic_table/flow.py` 변경 사항
- `_load_yaml_prompts` 함수 구현: YAML 파일 로드 및 캐싱 기능.
- `_load_prompt(name, domain)` 함수 업데이트: `{domain}.yaml`을 먼저 확인하고, 없으면 `default.yaml`을 사용하도록 로직 변경 (Fallback).
- 모든 노드(Node) 함수 업데이트:
1. `TableState`에서 `domain` 정보를 읽어옴.
2. 노드 실행 시 `_load_prompt(..., domain)`을 동적으로 호출하여 프롬프트 결정.

```python
# flow.py 내 동적 로딩 예시
def generate_qa_node(llm: ChatOpenAI) -> Callable[[TableState], TableState]:
def _node(state: TableState) -> TableState:
# state의 도메인 정보에 따라 프롬프트 로드
prompt_template = _load_prompt("generate_qa", state.get("domain"))
# ... 나머지 로직
return _node
```

## `generate_synthetic_table/runner.py` 변경 사항
- CLI 인자 추가: `--domain`.
- **자동 감지 로직 (Auto-Detection Logic)** 구현:
- 입력 파일/폴더 이름이 `P_`로 시작하는 경우, 자동으로 `domain="public"`으로 설정.
- 이 `domain` 값을 `run_synthetic_table_flow`로 전달하여 `TableState`에 반영.

## 사용 방법 (Usage)
`data/P_origin_1`과 같은 폴더를 처리할 때 시스템은 자동으로 'public' 도메인 프롬프트를 적용합니다.
수동으로 지정할 수도 있습니다:
```bash
uv run python main.py data/MyTable --domain public
```
Loading