Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
4925446
Refactor hybrid queries to use `alpha_param` and remove `0.7` default…
tsmith023 Mar 13, 2026
1899ffd
Remove mistakenly committed local change to `regen.sh`
tsmith023 Mar 16, 2026
c057270
Update logic to use new proto message
tsmith023 Mar 16, 2026
8e5d54d
Change formatting
tsmith023 Mar 16, 2026
367ce79
Tidy version check code
tsmith023 Mar 16, 2026
81414a1
Parse correct default for BC if server < 1.36
tsmith023 Mar 16, 2026
a13da23
Update CI image
tsmith023 Mar 16, 2026
7abe7f5
Fix wrong version comparison
tsmith023 Mar 16, 2026
b5f5e55
Fix typo in ci
tsmith023 Mar 16, 2026
47841d4
Update CI image
tsmith023 Mar 16, 2026
e305b33
Remove client-side default from aggregate queries
tsmith023 Mar 16, 2026
c4d6e87
Merge pull request #1985 from weaviate/tsmith023/remove-default-hybri…
tsmith023 Mar 17, 2026
8520a1c
Update ver check and CI tags
tsmith023 Mar 20, 2026
ed37375
Remove test of lazy loading shards
tsmith023 Mar 20, 2026
454e0e4
Merge branch 'main' into dev/1.37
tsmith023 Mar 20, 2026
1433985
Refactor client test for new server lazy shard loading
tsmith023 Mar 20, 2026
d5d5117
Merge branch 'dev/1.37' of https://github.com/weaviate/weaviate-pytho…
tsmith023 Mar 20, 2026
dd6c835
Debug failing ci test
tsmith023 Mar 20, 2026
cf96ccb
Remove outdated lazy shard load test
tsmith023 Mar 20, 2026
4b669e1
Update CI images
tsmith023 Mar 20, 2026
890a414
Add per-test timeouts and stack dump on timeout
tsmith023 Mar 20, 2026
2893822
Reduce per-test timeout to 5 mins
tsmith023 Mar 20, 2026
6edd0e1
Fix inc backups test
tsmith023 Mar 20, 2026
e7cb697
Add server version check for incremental backups
tsmith023 Mar 20, 2026
12d064f
Remove comment
tsmith023 Mar 20, 2026
fe5c522
Hard kill the process on timeout detection
tsmith023 Mar 20, 2026
2d961d1
Timeout putting the sentinel to avoid deadlocking
tsmith023 Mar 20, 2026
fadcebb
Handle the timeout of sentinel pushing gracefully
tsmith023 Mar 20, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 8 additions & 9 deletions .github/workflows/main.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,13 @@ env:
WEAVIATE_128: 1.28.16
WEAVIATE_129: 1.29.11
WEAVIATE_130: 1.30.22
WEAVIATE_131: 1.31.20
WEAVIATE_132: 1.32.23
WEAVIATE_133: 1.33.10
WEAVIATE_134: 1.34.5
WEAVIATE_135: 1.35.0
WEAVIATE_136: 1.36.0
WEAVIATE_137: 1.37.0-dev-29d5c87.amd64

WEAVIATE_131: 1.31.22
WEAVIATE_132: 1.32.27
WEAVIATE_133: 1.33.18
WEAVIATE_134: 1.34.19
WEAVIATE_135: 1.35.15
WEAVIATE_136: 1.36.6-21aaadc
WEAVIATE_137: 1.37.0-dev-8fb696b

jobs:
lint-and-format:
Expand Down Expand Up @@ -310,7 +309,7 @@ jobs:
$WEAVIATE_133,
$WEAVIATE_134,
$WEAVIATE_135,
$WEAVIATE_136
$WEAVIATE_136,
$WEAVIATE_137
]
steps:
Expand Down
68 changes: 67 additions & 1 deletion integration/conftest.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
import os
import time
import sys
import threading
import traceback
import pytest
from typing import (
Any,
AsyncGenerator,
Expand All @@ -14,7 +18,6 @@
)
from typing import Callable, TypeVar

import pytest
import pytest_asyncio
from _pytest.fixtures import SubRequest

Expand Down Expand Up @@ -500,3 +503,66 @@ def retry_on_http_error(
raise
# This should never be reached, but satisfies the type checker
raise last_exception # type: ignore


# Watchdog budget in seconds (300 s = 5 minutes); passed to DeadlockWatchdog below.
# NOTE(review): pytest.ini also sets `timeout = 300` for pytest-timeout — confirm
# whether the two limits are meant to stay in sync.
TIMEOUT_SECONDS = 300


def dump_all_stacks() -> str:
    """Render the current Python stack of every thread as one printable report.

    Threads known to ``threading`` are listed by name. Frames that belong to
    thread ids which ``threading.enumerate()`` does not report (for example
    threads started outside the ``threading`` module) are appended afterwards
    so they are not silently left out of the dump.

    Returns:
        The report text, framed by a header line and a footer line.
    """
    # Snapshot once so every section below is rendered from the same mapping.
    frames = sys._current_frames()
    lines = ["\n===== DEADLOCK DETECTED — THREAD DUMP =====\n"]
    reported = set()
    for thread in threading.enumerate():
        reported.add(thread.ident)
        frame = frames.get(thread.ident)  # pyright: ignore
        lines.append(f"\n--- Thread: {thread.name} (id={thread.ident}) ---")
        if frame:
            lines.append("".join(traceback.format_stack(frame)))
        else:
            lines.append(" (no frame)\n")
    # Frames whose thread id was not listed above would otherwise be dropped.
    for ident, frame in frames.items():
        if ident in reported:
            continue
        lines.append(f"\n--- Thread: <unregistered> (id={ident}) ---")
        lines.append("".join(traceback.format_stack(frame)))
    lines.append("===========================================\n")
    return "\n".join(lines)


class DeadlockWatchdog:
    """Kill the process when a watched section runs longer than ``timeout`` seconds.

    ``start`` arms a daemon ``threading.Timer``; ``stop`` disarms it. If the
    timer fires, a thread dump is written to stderr and the process exits
    with status 1 via ``os._exit``.
    """

    def __init__(self, timeout):
        self.timeout = timeout
        self._timer = None
        # Set up front so the attribute exists even before the first start().
        self._label = None

    def start(self, label):
        """Arm the watchdog for the section described by ``label``."""
        # Disarm a timer that is still pending; otherwise it would be orphaned
        # and could still fire os._exit(1) for a section that already ended.
        self.stop()
        self._label = label
        timer = threading.Timer(self.timeout, self._on_timeout)
        timer.daemon = True
        timer.start()
        self._timer = timer

    def stop(self):
        """Disarm the watchdog; does nothing when no timer is armed."""
        if self._timer is not None:
            self._timer.cancel()
            self._timer = None

    def _on_timeout(self):
        """Report where the run hung, dump all thread stacks, then hard-exit."""
        sys.stderr.write(f"\n[WATCHDOG] Hung at: '{self._label}' after {self.timeout}s\n")
        sys.stderr.write(dump_all_stacks())
        sys.stderr.flush()
        os._exit(1)  # Hard kill — works reliably in xdist workers


# Module-level watchdog instance used by the pytest hook wrappers in this file.
_watchdog = DeadlockWatchdog(TIMEOUT_SECONDS)


# Arms the watchdog for one item's whole run protocol (setup + call + teardown)
# and disarms it again once the protocol has finished.
@pytest.hookimpl(hookwrapper=True)
def pytest_runtest_protocol(item, nextitem):
    """Wrap a single test item's protocol with the watchdog, labelled by its node id."""
    node_label = item.nodeid
    _watchdog.start(node_label)
    try:
        yield
    finally:
        _watchdog.stop()


# Watches the session-start phase only. Session-scoped fixtures are created
# lazily during the setup of the first test that requests them, so a hang there
# is covered by the per-test watchdog in pytest_runtest_protocol, not by this hook.
@pytest.hookimpl(hookwrapper=True)
def pytest_sessionstart(session):
    """Wrap the session-start hook calls with the watchdog."""
    _watchdog.start("session startup")
    try:
        yield
    finally:
        _watchdog.stop()
2 changes: 1 addition & 1 deletion integration/test_backup_v4.py
Original file line number Diff line number Diff line change
Expand Up @@ -764,7 +764,7 @@ def test_incremental_backup(client: weaviate.WeaviateClient, request: SubRequest
backend=BACKEND,
include_collections=["Article"],
wait_for_completion=True,
incremental_backup_base_id=base_backup_id,
incremental_base_backup_id=base_backup_id,
)

# remove existing class
Expand Down
33 changes: 1 addition & 32 deletions integration/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -334,7 +334,7 @@ def test_collection_name_capitalization(
client.collections.delete(name_big)


def test_client_cluster_with_lazy_shard_loading(
def test_client_cluster_without_lazy_shard_loading(
client: weaviate.WeaviateClient, request: SubRequest
) -> None:
try:
Expand All @@ -360,37 +360,6 @@ def test_client_cluster_with_lazy_shard_loading(
client.collections.delete(request.node.name)


def test_client_cluster_without_lazy_shard_loading(
    client_factory: ClientFactory, request: SubRequest
) -> None:
    """Check the verbose node report for a freshly created collection.

    Uses the client built by ``client_factory(8090, 50061)`` and always deletes
    the collection afterwards.
    """
    client = client_factory(8090, 50061)

    try:
        collection = client.collections.create(
            name=request.node.name, vectorizer_config=Configure.Vectorizer.none()
        )

        node_report = client.cluster.nodes(collection.name, output="verbose")
        assert len(node_report) == 1
        assert len(node_report[0].shards) == 1

        # Exactly one shard on exactly one node: inspect it directly.
        shard = node_report[0].shards[0]
        assert shard.collection == collection.name
        assert shard.object_count == 0
        assert shard.vector_indexing_status in [
            "READONLY",
            "INDEXING",
            "READY",
            "LAZY_LOADING",
        ]
        assert shard.vector_queue_length == 0
        assert shard.compressed is False

        # The expected `loaded` flag depends on the server version.
        server_version = collection._connection._weaviate_version
        if server_version.is_lower_than(1, 25, 0):
            assert shard.loaded is True
        else:
            assert shard.loaded is False
    finally:
        client.collections.delete(request.node.name)


def test_client_cluster_multitenant(client: weaviate.WeaviateClient, request: SubRequest) -> None:
try:
collection = client.collections.create(
Expand Down
4 changes: 3 additions & 1 deletion pytest.ini
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,6 @@
addopts = -m 'not profiling' --benchmark-skip -l
markers =
profiling: marks tests that can be profiled
asyncio_default_fixture_loop_scope = function
asyncio_default_fixture_loop_scope = function
timeout = 300
timeout_method = thread
1 change: 1 addition & 0 deletions requirements-test.txt
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ pytest-cov==6.2.1
pytest-asyncio==1.3.0
pytest-benchmark==5.1.0
pytest-profiling==1.8.1
pytest-timeout==2.4.0
coverage==7.10.7
pytest-xdist==3.7.0
werkzeug==3.1.6
Expand Down
10 changes: 10 additions & 0 deletions weaviate/backup/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,16 @@ def create(
wait_for_completion=wait_for_completion,
)

if (
incremental_base_backup_id is not None
and self._connection._weaviate_version.is_lower_than(1, 37, 0)
):
raise WeaviateUnsupportedFeatureError(
"Incremental backups",
str(self._connection._weaviate_version),
"1.37.0",
)

payload: dict = {
"id": backup_id,
"include": include_collections,
Expand Down
6 changes: 3 additions & 3 deletions weaviate/collections/aggregations/hybrid/async_.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ class _HybridAsync(_HybridExecutor[ConnectionAsync]):
self,
query: Optional[str],
*,
alpha: NUMBER = 0.7,
alpha: Optional[NUMBER] = None,
vector: Optional[List[float]] = None,
query_properties: Optional[List[str]] = None,
object_limit: Optional[int] = None,
Expand All @@ -36,7 +36,7 @@ class _HybridAsync(_HybridExecutor[ConnectionAsync]):
self,
query: Optional[str],
*,
alpha: NUMBER = 0.7,
alpha: Optional[NUMBER] = None,
vector: Optional[List[float]] = None,
query_properties: Optional[List[str]] = None,
object_limit: Optional[int] = None,
Expand All @@ -53,7 +53,7 @@ class _HybridAsync(_HybridExecutor[ConnectionAsync]):
self,
query: Optional[str],
*,
alpha: NUMBER = 0.7,
alpha: Optional[NUMBER] = None,
vector: Optional[List[float]] = None,
query_properties: Optional[List[str]] = None,
object_limit: Optional[int] = None,
Expand Down
8 changes: 4 additions & 4 deletions weaviate/collections/aggregations/hybrid/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ def hybrid(
self,
query: Optional[str],
*,
alpha: NUMBER = 0.7,
alpha: Optional[NUMBER] = None,
vector: Optional[List[float]] = None,
query_properties: Optional[List[str]] = None,
object_limit: Optional[int] = None,
Expand All @@ -40,7 +40,7 @@ def hybrid(
self,
query: Optional[str],
*,
alpha: NUMBER = 0.7,
alpha: Optional[NUMBER] = None,
vector: Optional[List[float]] = None,
query_properties: Optional[List[str]] = None,
object_limit: Optional[int] = None,
Expand All @@ -58,7 +58,7 @@ def hybrid(
self,
query: Optional[str],
*,
alpha: NUMBER = 0.7,
alpha: Optional[NUMBER] = None,
vector: Optional[List[float]] = None,
query_properties: Optional[List[str]] = None,
object_limit: Optional[int] = None,
Expand All @@ -75,7 +75,7 @@ def hybrid(
self,
query: Optional[str],
*,
alpha: NUMBER = 0.7,
alpha: Optional[NUMBER] = None,
vector: Optional[List[float]] = None,
query_properties: Optional[List[str]] = None,
object_limit: Optional[int] = None,
Expand Down
6 changes: 3 additions & 3 deletions weaviate/collections/aggregations/hybrid/sync.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ class _Hybrid(_HybridExecutor[ConnectionSync]):
self,
query: Optional[str],
*,
alpha: NUMBER = 0.7,
alpha: Optional[NUMBER] = None,
vector: Optional[List[float]] = None,
query_properties: Optional[List[str]] = None,
object_limit: Optional[int] = None,
Expand All @@ -36,7 +36,7 @@ class _Hybrid(_HybridExecutor[ConnectionSync]):
self,
query: Optional[str],
*,
alpha: NUMBER = 0.7,
alpha: Optional[NUMBER] = None,
vector: Optional[List[float]] = None,
query_properties: Optional[List[str]] = None,
object_limit: Optional[int] = None,
Expand All @@ -53,7 +53,7 @@ class _Hybrid(_HybridExecutor[ConnectionSync]):
self,
query: Optional[str],
*,
alpha: NUMBER = 0.7,
alpha: Optional[NUMBER] = None,
vector: Optional[List[float]] = None,
query_properties: Optional[List[str]] = None,
object_limit: Optional[int] = None,
Expand Down
9 changes: 8 additions & 1 deletion weaviate/collections/batch/async_.py
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,14 @@ async def __loop(self) -> None:
and not self.__is_shutting_down.is_set()
and not self.__is_oom.is_set()
):
await self.__reqs.put(None)
try:
await asyncio.wait_for(self.__reqs.put(None), timeout=60)
except asyncio.TimeoutError as e:
logger.warning(
"Batch queue is blocked for more than 60 seconds while trying to send shutdown signal. Exiting the loop"
)
self.__bg_exception = e
return
self.__sent_sentinel.set()
await asyncio.sleep(refresh_time)

Expand Down
9 changes: 8 additions & 1 deletion weaviate/collections/batch/sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,14 @@ def __loop(self) -> None:
and not self.__is_shutting_down.is_set()
and not self.__is_oom.is_set()
):
self.__reqs.put(None)
try:
self.__reqs.put(None, timeout=60)
except Full as e:
logger.warning(
"Batch queue is blocked for more than 60 seconds while trying to send shutdown signal. Exiting the loop"
)
self.__bg_exception = e
return
self.__sent_sentinel.set()
time.sleep(refresh_time)

Expand Down
7 changes: 6 additions & 1 deletion weaviate/collections/grpc/shared.py
Original file line number Diff line number Diff line change
Expand Up @@ -686,11 +686,16 @@ def _parse_hybrid(
else:
vector_bytes = vector_bytes_tmp

use_alpha_param = self._weaviate_version.is_at_least(
1, 36, 6
) # TODO: change to 1.36.7 once it's released
return (
base_search_pb2.Hybrid(
properties=properties,
query=query,
alpha=float(alpha) if alpha is not None else None,
alpha=None if use_alpha_param else (alpha if alpha is not None else 0.7),
alpha_param=alpha if use_alpha_param else None,
use_alpha_param=use_alpha_param,
fusion_type=(
cast(
base_search_pb2.Hybrid.FusionType,
Expand Down
Loading
Loading