-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathsetup_vector_db.py
More file actions
145 lines (105 loc) · 3.85 KB
/
setup_vector_db.py
File metadata and controls
145 lines (105 loc) · 3.85 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
"""
Script to process health.pdf and upload it to Pinecone vector database.
Run this script ONCE to set up the vector database.
"""
import os
import time
import PyPDF2
from dotenv import load_dotenv
from pinecone import Pinecone, ServerlessSpec
from sentence_transformers import SentenceTransformer
load_dotenv()
PINECONE_API_KEY = os.getenv("PINECONE_API_KEY")
INDEX_NAME = os.getenv("PINECONE_INDEX_NAME", "doctor-assist")
AWS_REGION = os.getenv("PINECONE_ENVIRONMENT", "us-east-1")
if not PINECONE_API_KEY:
raise ValueError("❌ PINECONE_API_KEY not found in .env file")
pc = Pinecone(api_key=PINECONE_API_KEY)
print("🔄 Loading embedding model...")
embedding_model = SentenceTransformer("all-MiniLM-L6-v2")
EMBEDDING_DIMENSION = 384 # all-MiniLM-L6-v2 output size
def extract_text_from_pdf(pdf_path: str) -> str:
    """Extract text from a PDF file.

    Pages that yield no text (e.g. scanned images) are skipped; each
    extracted page is terminated with a newline. Progress is printed
    every 10 pages and on the final page.
    """
    pieces: list[str] = []
    with open(pdf_path, "rb") as handle:
        reader = PyPDF2.PdfReader(handle)
        page_count = len(reader.pages)
        print(f"📄 Processing {page_count} pages...")
        for page_num, page in enumerate(reader.pages, start=1):
            extracted = page.extract_text()
            if extracted:
                pieces.append(extracted + "\n")
            if page_num % 10 == 0 or page_num == page_count:
                print(f" ✓ Processed {page_num}/{page_count} pages")
    # Join once at the end rather than concatenating in the loop.
    return "".join(pieces)
def chunk_text(text: str, chunk_size: int = 500, overlap: int = 100) -> list[str]:
    """Split text into overlapping word chunks.

    Args:
        text: Source text; split on whitespace.
        chunk_size: Maximum number of words per chunk. Must be positive.
        overlap: Number of words shared between consecutive chunks.
            Must be strictly smaller than chunk_size.

    Returns:
        List of non-empty chunk strings; empty list for empty/whitespace text.

    Raises:
        ValueError: If chunk_size <= 0 or overlap >= chunk_size. Previously a
            zero step raised an opaque error from range() and a negative step
            silently produced no chunks at all.
    """
    if chunk_size <= 0:
        raise ValueError("chunk_size must be positive")
    if overlap >= chunk_size:
        raise ValueError("overlap must be smaller than chunk_size")

    words = text.split()
    chunks = []
    step = chunk_size - overlap  # guaranteed > 0 by the checks above
    for i in range(0, len(words), step):
        chunk = " ".join(words[i : i + chunk_size])
        if chunk.strip():
            chunks.append(chunk)
    return chunks
def create_index_if_not_exists() -> None:
    """Create the Pinecone serverless index if it doesn't already exist.

    Uses the module-level INDEX_NAME, AWS_REGION and EMBEDDING_DIMENSION.
    After creation, polls the index status until it reports ready (bounded
    by a timeout) instead of sleeping a fixed 10 seconds, which could be
    either too short or needlessly long.

    Raises:
        RuntimeError: If listing indexes fails (original error chained).
    """
    try:
        existing_indexes = pc.list_indexes().names()
    except Exception as e:
        # Chain the cause so the underlying Pinecone error isn't lost.
        raise RuntimeError(f"❌ Failed to list Pinecone indexes: {e}") from e

    if INDEX_NAME in existing_indexes:
        print(f"✅ Index '{INDEX_NAME}' already exists")
        return

    print(f"🚀 Creating Pinecone index '{INDEX_NAME}' (AWS | {AWS_REGION})")
    pc.create_index(
        name=INDEX_NAME,
        dimension=EMBEDDING_DIMENSION,
        metric="cosine",
        spec=ServerlessSpec(cloud="aws", region=AWS_REGION),
    )

    print("⏳ Waiting for index to be ready...")
    deadline = time.time() + 120  # bounded wait so we never hang forever
    while time.time() < deadline:
        try:
            if pc.describe_index(INDEX_NAME).status["ready"]:
                return
        except Exception:
            # Index may not be describable immediately after creation;
            # keep polling until the deadline.
            pass
        time.sleep(2)
    print("⚠️ Index not confirmed ready within timeout; continuing anyway")
def upload_to_pinecone(chunks: list[str]):
    """Upload text chunks and embeddings to Pinecone.

    Encodes the chunks in batches of 100 with the module-level embedding
    model and upserts each batch, assigning sequential ids "chunk-<n>".
    """
    index = pc.Index(INDEX_NAME)
    print(f"⬆️ Uploading {len(chunks)} chunks to Pinecone...")

    batch_size = 100
    total_batches = (len(chunks) + batch_size - 1) // batch_size

    for batch_num, offset in enumerate(range(0, len(chunks), batch_size), start=1):
        batch = chunks[offset : offset + batch_size]
        embeddings = embedding_model.encode(batch, show_progress_bar=True)

        vectors = []
        for pos, (text_chunk, vec) in enumerate(zip(batch, embeddings)):
            vectors.append(
                {
                    "id": f"chunk-{offset + pos}",
                    "values": vec.tolist(),
                    "metadata": {"text": text_chunk},
                }
            )

        index.upsert(vectors=vectors)
        print(f" ✓ Uploaded batch {batch_num}/{total_batches}")

    print("✅ All vectors uploaded successfully")
def main():
    """Run the one-time vector database setup end to end.

    Pipeline: verify health.pdf exists -> ensure the index exists ->
    extract PDF text -> chunk it -> embed and upload to Pinecone.

    Raises:
        FileNotFoundError: If health.pdf is missing from the working directory.
    """
    pdf_path = "health.pdf"
    if not os.path.exists(pdf_path):
        raise FileNotFoundError(f"❌ '{pdf_path}' not found")

    create_index_if_not_exists()

    print("🔍 Extracting text from PDF...")
    extracted_text = extract_text_from_pdf(pdf_path)

    print("✂️ Chunking text...")
    text_chunks = chunk_text(extracted_text)
    print(f"📦 Created {len(text_chunks)} chunks")

    upload_to_pinecone(text_chunks)
    print("\n🎉 Vector database setup complete!")


if __name__ == "__main__":
    main()