Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
100 changes: 99 additions & 1 deletion openviking/storage/vectordb/collection/local_collection.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
from openviking.storage.vectordb.index.local_index import PersistentIndex, VolatileIndex
from openviking.storage.vectordb.meta.collection_meta import CollectionMeta, create_collection_meta
from openviking.storage.vectordb.meta.index_meta import create_index_meta
from openviking.storage.vectordb.store.bytes_row import STRING_MAX_UINT16_LENGTH
from openviking.storage.vectordb.store.data import CandidateData, DeltaRecord
from openviking.storage.vectordb.store.store import OpType
from openviking.storage.vectordb.store.store_manager import StoreManager, create_store_manager
Expand Down Expand Up @@ -55,6 +56,99 @@

# Use imported constants, no longer defined here
AUTO_ID_KEY = SpecialFields.AUTO_ID.value
# Byte budget for one record's serialized ``fields`` JSON; it reuses the
# bytes-row string length limit imported from the store module.
FIELDS_STORAGE_MAX_BYTES = STRING_MAX_UINT16_LENGTH
# Suffix appended to a string value that was shortened to fit the budget.
TRUNCATION_MARKER = "...[openviking:truncated]"
# Field names that are shortened first, in this order, before any other
# string field is considered for truncation.
PREFERRED_TRUNCATABLE_FIELDS = (
"abstract",
"description",
"summary",
"overview",
"content",
"text",
"metadata",
)


def _utf8_len(value: str) -> int:
return len(value.encode("utf-8"))


def _truncate_utf8(value: str, max_bytes: int) -> str:
    """Shorten ``value`` so its UTF-8 encoding is at most ``max_bytes`` long.

    A value that already fits is returned unchanged. Otherwise the result is
    a prefix of ``value`` followed by ``TRUNCATION_MARKER``; when the budget
    cannot hold more than the marker itself, a prefix of the marker is
    returned instead.

    Args:
        value: Text to shorten.
        max_bytes: Maximum size of the result in UTF-8 bytes.

    Returns:
        ``""`` when ``max_bytes`` is not positive, ``value`` itself when it
        fits, otherwise a truncated string of at most ``max_bytes`` bytes.
    """
    if max_bytes <= 0:
        return ""
    raw = value.encode("utf-8")
    # Check the fit before looking at the marker size: a short value with a
    # small budget must come back intact, not be replaced by marker text.
    if len(raw) <= max_bytes:
        return value
    marker = TRUNCATION_MARKER.encode("utf-8")
    if max_bytes <= len(marker):
        return marker[:max_bytes].decode("utf-8", "ignore")
    # "ignore" drops a multi-byte character that the byte slice cut in half.
    prefix = raw[: max_bytes - len(marker)].decode("utf-8", "ignore")
    return prefix + TRUNCATION_MARKER


def _serialized_fields_len(data: Dict[str, Any]) -> int:
    """Return the UTF-8 byte size of ``data`` serialized as stored JSON."""
    serialized = safe_json_dumps(data, ensure_ascii=False)
    return len(serialized.encode("utf-8"))


def _truncatable_string_fields(data: Dict[str, Any], primary_key: str) -> List[str]:
    """List the field names whose values may be shortened, in priority order.

    Only non-empty string values are eligible, and the field named
    ``primary_key`` is always excluded. Names listed in
    ``PREFERRED_TRUNCATABLE_FIELDS`` come first, in that tuple's order; the
    rest follow, largest UTF-8 value first.
    """
    eligible: List[str] = []
    for name, value in data.items():
        if name == primary_key:
            continue
        if isinstance(value, str) and value:
            eligible.append(name)

    eligible_lookup = set(eligible)
    preferred = [
        name for name in PREFERRED_TRUNCATABLE_FIELDS if name in eligible_lookup
    ]
    preferred_lookup = set(preferred)
    remaining = sorted(
        (name for name in eligible if name not in preferred_lookup),
        key=lambda name: _utf8_len(data[name]),
        reverse=True,
    )
    return preferred + remaining


def _serialize_fields_for_store(
    data: Dict[str, Any],
    collection_name: str,
    primary_key: str,
) -> str:
    """Serialize ``data`` to JSON, shortening string fields to fit the store.

    If the serialized payload is within ``FIELDS_STORAGE_MAX_BYTES`` it is
    returned as is. Otherwise string fields are shortened one at a time, in
    the order given by ``_truncatable_string_fields``, until the payload
    fits. ``data`` itself is never modified; a shallow copy is edited.

    Args:
        data: Field values of one record.
        collection_name: Collection name, used in log and error messages.
        primary_key: Passed to ``_truncatable_string_fields`` as the field
            name to exclude from truncation, and echoed in messages.

    Returns:
        The JSON text to store, at most ``FIELDS_STORAGE_MAX_BYTES`` bytes.

    Raises:
        ValueError: If the payload is still too large after every eligible
            string field has been emptied.
    """
    fields = safe_json_dumps(data, ensure_ascii=False)
    initial_size = _utf8_len(fields)
    if initial_size <= FIELDS_STORAGE_MAX_BYTES:
        return fields

    compacted = dict(data)
    truncated_fields: List[str] = []
    for field in _truncatable_string_fields(compacted, primary_key):
        original = compacted[field]
        low, high = 0, _utf8_len(original)
        best: Optional[str] = None

        # Binary search for the largest byte budget for this field that
        # keeps the whole payload within the limit.
        while low <= high:
            mid = (low + high) // 2
            compacted[field] = _truncate_utf8(original, mid)
            if _serialized_fields_len(compacted) <= FIELDS_STORAGE_MAX_BYTES:
                best = compacted[field]
                low = mid + 1
            else:
                high = mid - 1

        # No budget fits, not even zero: empty the field and move on.
        compacted[field] = best if best is not None else ""
        if compacted[field] != original:
            truncated_fields.append(field)
        if best is not None:
            # The payload fits now. Stop here so the remaining fields are
            # left exactly as the caller supplied them.
            break

    fields = safe_json_dumps(compacted, ensure_ascii=False)
    final_size = _utf8_len(fields)
    if final_size <= FIELDS_STORAGE_MAX_BYTES:
        logger.warning(
            f"Truncated local vector store fields for collection '{collection_name}' "
            f"primary_key='{primary_key}' fields={truncated_fields} "
            f"bytes={initial_size}->{final_size} "
            f"limit={FIELDS_STORAGE_MAX_BYTES}"
        )
        return fields

    raise ValueError(
        f"local vector store fields exceed {FIELDS_STORAGE_MAX_BYTES} bytes "
        f"after truncation: collection={collection_name}, primary_key={primary_key}, "
        f"bytes={initial_size}->{final_size}"
    )


def get_or_create_local_collection(
Expand Down Expand Up @@ -589,7 +683,11 @@ def upsert_data(self, raw_data_list: List[Dict[str, Any]], ttl=0):
if sparse_dict and isinstance(sparse_dict, dict):
cands_list[i].sparse_raw_terms = list(sparse_dict.keys())
cands_list[i].sparse_values = list(sparse_dict.values())
cands_list[i].fields = safe_json_dumps(data, ensure_ascii=False)
cands_list[i].fields = _serialize_fields_for_store(
data,
collection_name=self.meta.collection_name,
primary_key=pk,
)
cands_list[i].expire_ns_ts = time.time_ns() + ttl * 1000000000 if ttl > 0 else 0

if not self.store_mgr:
Expand Down
37 changes: 23 additions & 14 deletions tests/vectordb/test_filter_ops.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
# SPDX-License-Identifier: AGPL-3.0

import gc
import json
import random
import shutil
import time
Expand Down Expand Up @@ -138,22 +139,30 @@ def _create_collection(self):
}
return get_or_create_local_collection(meta_data=collection_meta, path=self.path)

def test_upsert_record_with_oversized_json_field_raises(self):
def test_upsert_record_with_oversized_json_field_is_truncated(self):
large_abstract = 'prefix "quoted" \\\\ path\n' + ("x" * 66000)
uri = "viking://user/memories/large.md"
with self.assertRaisesRegex(
(RuntimeError, ValueError), "fields.*exceeds 65535 bytes"
):
self.collection.upsert_data(
[
{
"id": 1,
"embedding": [1.0, 0, 0, 0],
"uri": uri,
"abstract": large_abstract,
}
]
)
result = self.collection.upsert_data(
[
{
"id": 1,
"embedding": [1.0, 0, 0, 0],
"uri": uri,
"abstract": large_abstract,
}
]
)

self.assertEqual(result.ids, [1])
inner = self.collection._Collection__collection
stored = inner.store_mgr.get_all_cands_data()[0]
self.assertLessEqual(len(stored.fields.encode("utf-8")), 65535)

fields = json.loads(stored.fields)
self.assertEqual(fields["id"], 1)
self.assertEqual(fields["uri"], uri)
self.assertLess(len(fields["abstract"]), len(large_abstract))
self.assertTrue(fields["abstract"].endswith("...[openviking:truncated]"))


class TestFilterOpsComplex(unittest.TestCase):
Expand Down
Loading