config.py: 4 additions & 1 deletion
@@ -58,4 +58,7 @@ class Config:
    DOCSTAR_ORG_ID = os.getenv('DOCSTAR_ORG_ID')
    DOCSTAR_COLLECTION_ID = os.getenv('DOCSTAR_COLLECTION_ID')
    AI_ML_APIKEY = os.getenv('AI_ML_APIKEY')
    AI_MIDDLEWARE_PAUTH_KEY = os.getenv('AI_MIDDLEWARE_PAUTH_KEY')

    HIPPOCAMPUS_API_KEY = os.getenv('HIPPOCAMPUS_API_KEY')
    HIPPOCAMPUS_API_URL = os.getenv('HIPPOCAMPUS_API_URL')
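The two new settings are read with `os.getenv`, so a missing variable silently becomes `None` and only surfaces later, at the first Hippocampus call. A minimal fail-fast sketch (the helper below is hypothetical and not part of this diff; only the variable names come from config.py):

```python
import os

# Hypothetical startup check; the variable names match the new Config entries.
REQUIRED_VARS = ("HIPPOCAMPUS_API_KEY", "HIPPOCAMPUS_API_URL")

def assert_hippocampus_config():
    missing = [name for name in REQUIRED_VARS if not os.getenv(name)]
    if missing:
        raise RuntimeError(f"Missing environment variables: {', '.join(missing)}")
```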
src/controllers/rag_controller.py: 122 additions & 74 deletions
@@ -14,6 +14,7 @@
from ..services.utils.rag_utils import extract_pdf_text, extract_csv_text, extract_docx_text
import traceback
from ..services.utils.rag_utils import get_csv_query_type
from ...utils.apiservice import fetch

rag_model = db["rag_datas"]
rag_parent_model = db["rag_parent_datas"]
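`fetch` is imported from `utils.apiservice` but its implementation is not shown in this diff. From its call site further down, it appears to be an async helper that returns a `(body, headers)` tuple; a rough sketch of that assumed contract (aiohttp-based, every implementation detail here is a guess):

```python
import aiohttp

# Assumed shape of utils/apiservice.fetch, inferred only from its call site;
# the real helper may add retries, timeouts, or different error handling.
async def fetch(url, method="GET", headers=None, json_body=None):
    async with aiohttp.ClientSession() as session:
        async with session.request(method, url, headers=headers, json=json_body) as resp:
            resp.raise_for_status()
            return await resp.json(), dict(resp.headers)
```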
@@ -251,97 +252,144 @@ async def delete_doc(request):
        raise HTTPException(status_code=500, detail=error)


# async def get_text_from_vectorsQuery(args, Flag = True, score = 0.1):
#     try:
#         doc_id = args.get('Document_id')
#         query = args.get('query')
#         org_id = args.get('org_id')
#         top_k = args.get('top_k', 3)
#         additional_query = {}

#         if query is None:
#             raise HTTPException(status_code=400, detail="Query is required.")

#         doc_data = await rag_parent_model.find_one({
#             '_id': ObjectId(doc_id)
#         })

#         if not doc_data:
#             raise Exception("Invalid document id provided.")

#         if doc_data['source']['fileFormat'] == 'csv':
#             to_search_for = await get_csv_query_type(doc_data, query)
#             additional_query['chunkType'] = to_search_for

#         embedding = OpenAIEmbeddings(api_key=Config.OPENAI_API_KEY, model="text-embedding-3-small").embed_documents([query])

#         # Query Pinecone with timing
#         start_time = time.time()
#         query_response = index.query(
#             vector=embedding[0] if isinstance(embedding, list) and len(embedding) == 1 else list(map(float, embedding)),
#             namespace=org_id,
#             filter={"docId": doc_id, **additional_query},
#             top_k=top_k  # Adjust the number of results as needed
#         )

#         # Filter results based on score threshold - only include results with score >= threshold
#         # Skip filtering if Flag is True
#         if Flag:
#             filtered_matches = query_response['matches']
#         else:
#             filtered_matches = []
#             for result in query_response['matches']:
#                 if result['score'] >= score:
#                     filtered_matches.append(result)

#         query_response_ids = [result['id'] for result in filtered_matches]

#         # Create a dictionary to map id to score and position
#         id_to_score = {result['id']: result['score'] for result in filtered_matches}
#         id_to_position = {result['id']: pos for pos, result in enumerate(filtered_matches)}

#         # Query MongoDB using query_response_ids
#         mongo_query = {"_id": {"$in": [ObjectId(id) for id in query_response_ids]}}
#         cursor = rag_model.find(mongo_query)

#         mongo_results = await cursor.to_list(length=None)
#         mongo_results.sort(key=lambda x: id_to_position.get(str(x.get('_id')), float('inf')))

#         text = ""
#         results_with_scores = []
#         for result in mongo_results:
#             text += result.get('data', '')
#             result_id = str(result.get('_id'))
#             result_data = {
#                 'id': result_id,
#                 'data': result.get('data', '')
#             }
#             # Only add score if Flag is False
#             if not Flag:
#                 result_data['score'] = id_to_score.get(result_id, 0.0)
#             results_with_scores.append(result_data)

#         # Build response metadata based on Flag
#         metadata = {"type": "RAG"}
#         if not Flag:
#             metadata["results_with_scores"] = results_with_scores
#             metadata["similarity_scores"] = [{'id': result['id'], 'score': result['score']} for result in filtered_matches]
#         else:
#             metadata["results"] = [{'id': item['id'], 'data': item['data']} for item in results_with_scores]

#         return {
#             'response': text,
#             'metadata': metadata,
#             'status': 1
#         }

#     except Exception as error:
#         return {
#             'response': str(error),
#             'metadata': {
#                 "type": "RAG"
#             },
#             'status': 0  # 0 indicates error/failure
#         }





async def get_text_from_vectorsQuery(args):
    try:
        query = args.get("query")
        collection_id = args.get("collectionId")
        resource_id = args.get("resourceId")
        owner_id = args.get("ownerId")
        is_review = args.get("isReview", True)

        if not query:
            raise ValueError("Query is required")

        payload = {
            "query": query,
            "collectionId": collection_id,
            "resourceId": resource_id,
            "ownerId": owner_id,
            "isReview": is_review
        }

        headers = {
            "Content-Type": "application/json",
            "x-api-key": Config.HIPPOCAMPUS_API_KEY
        }

        # HIPPOCAMPUS_API_URL matches the setting added to config.py above
        response_data, response_headers = await fetch(
            url=f"{Config.HIPPOCAMPUS_API_URL}/search",
            method="POST",
            headers=headers,
            json_body=payload
        )

        return {
            "success": True,
            "response": response_data,
            "headers": response_headers
        }

    except Exception as error:
        return {
            "success": False,
            "error": str(error)
        }
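A usage sketch of the new function (the IDs below are made-up placeholders; only the key names mirror what the function reads from `args`):

```python
import asyncio

async def demo():
    # Illustrative arguments only; "isReview" defaults to True when omitted.
    result = await get_text_from_vectorsQuery({
        "query": "What is the refund policy?",
        "collectionId": "col_123",
        "resourceId": "res_456",
        "ownerId": "owner_789",
    })
    if result["success"]:
        print(result["response"])
    else:
        print("search failed:", result["error"])

# asyncio.run(demo())
```

On success the caller receives the raw Hippocampus response plus the response headers. Note the error shape changed from the old `{'status': 0, 'metadata': {...}}` contract to `{"success": False, "error": ...}`, so existing callers of this function need updating.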