Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
81 commits
Select commit Hold shift + click to select a range
0eef64e
Add client-side changes to handle new server-side batching in 1.33
tsmith023 Jul 31, 2025
7d13908
Update images in CI
tsmith023 Aug 5, 2025
ef790f0
Update 1.33 image in CI
tsmith023 Aug 5, 2025
3b4d99d
Alter test or lazy shard loading for new potential server behaviour
tsmith023 Aug 5, 2025
6dfd1d7
Change other lazy loading test too
tsmith023 Aug 5, 2025
689665b
Fix CI image for 1.33
tsmith023 Aug 5, 2025
c474309
Update protos, fix setting of batch client in wrapper to avoid races …
tsmith023 Aug 5, 2025
72353ef
Remove debug assert in test
tsmith023 Aug 5, 2025
63bbdd1
Update new batch to use different modes with server, update CI image
tsmith023 Aug 6, 2025
dd575ed
Refactor to changed server batching options
tsmith023 Aug 7, 2025
9f10a70
Throw error if using automatic batching with incompatible server
tsmith023 Aug 7, 2025
f4cd068
Add exponential backoff retry to stream reconnect method
tsmith023 Aug 7, 2025
8912f86
Remove timeout and retries from new grpc methods
tsmith023 Aug 7, 2025
450c010
Only delete key if present in dict
tsmith023 Aug 18, 2025
1ab42cd
Close before re-connecting, reset rec num objs on shutdown
tsmith023 Aug 19, 2025
d214507
Update to use latest protos and behaviour
tsmith023 Aug 20, 2025
e417701
Improve logging using .automatic()
tsmith023 Aug 26, 2025
3cfb194
Update CI image to latest server build
tsmith023 Aug 26, 2025
51359fe
Merge branch 'dev/1.33' of https://github.com/weaviate/weaviate-pytho…
tsmith023 Aug 26, 2025
e93f1ff
Fix testing issues with new versions
tsmith023 Aug 26, 2025
0ca3e00
Attempt fixes for tests again
tsmith023 Aug 26, 2025
d12d35f
Add ability to retry certain server-emitted full errors, e.g. tempora…
tsmith023 Aug 26, 2025
009f551
Attempt fixes of flakes
tsmith023 Aug 26, 2025
d41deda
Update to use latest server impl and CI image
tsmith023 Aug 27, 2025
d77ae10
Update to use latest dev server version
tsmith023 Aug 28, 2025
8f2f5f7
Rename from automatic to experimental, bump CI version to latest RC
tsmith023 Sep 4, 2025
d1a496b
Merge branch 'dev/1.33' of https://github.com/weaviate/weaviate-pytho…
tsmith023 Sep 4, 2025
1944626
Push ongoing changes
tsmith023 Sep 15, 2025
d2094fb
Merge branch 'main' of https://github.com/weaviate/weaviate-python-cl…
tsmith023 Sep 17, 2025
4842c66
Update to use latest server image
tsmith023 Sep 19, 2025
903e97b
Update to use latest server changes
tsmith023 Sep 24, 2025
40cebc8
Undo debug changes to conftest
tsmith023 Sep 24, 2025
f6e8c2e
Update to use latest server image
tsmith023 Oct 2, 2025
89700c4
Make internal send/recv queue size 1 and sleep while shutdown to avoi…
tsmith023 Oct 2, 2025
dbec613
Update to use latest server image
tsmith023 Oct 6, 2025
f81a69c
Fix shutting down message handling
tsmith023 Oct 6, 2025
6828be1
Skip backoff handling if client has closed the stream
tsmith023 Oct 6, 2025
36cce75
Remove unused code
tsmith023 Oct 7, 2025
b4cf0be
Don't print backoff adjustments when shutting down
tsmith023 Oct 7, 2025
c81e4c1
Improve shutting down log
tsmith023 Oct 7, 2025
9bfe3a1
Attempt to catch last req that can be lost during shutdown
tsmith023 Oct 7, 2025
0b01c64
Avoid circular import
tsmith023 Oct 7, 2025
ae34b47
Remove last_req wrapping logic from stream, reduce logging, update im…
tsmith023 Oct 7, 2025
43161c5
Close the client-side of the stream on shutdown, sleep for backoff du…
tsmith023 Oct 7, 2025
c02c73f
Update CI image
tsmith023 Oct 7, 2025
80b9d7d
Only log waiting for stream re-establishment once
tsmith023 Oct 7, 2025
0cf5ffe
Switch from arm to amd in CI
tsmith023 Oct 7, 2025
1dbcc19
Shutdown client-side stream regardless of size of __reqs queue
tsmith023 Oct 8, 2025
43553f2
Increase timeout when waiting for req to send, don't use queue size i…
tsmith023 Oct 8, 2025
2bf8b74
Use sentinel in req put/get to avoid inaccurate block timeouts
tsmith023 Oct 8, 2025
8546b3e
Update CI image
tsmith023 Oct 9, 2025
d098e45
Correctly populate batch.results
tsmith023 Oct 10, 2025
0182f3f
Update CI images
tsmith023 Oct 10, 2025
c2f99a7
Assert indexing status in one of the allowed values rather than a spe…
tsmith023 Oct 10, 2025
8198926
Undo debug changes in tests
tsmith023 Oct 10, 2025
4934b7c
Merge branch 'main' of https://github.com/weaviate/weaviate-python-cl…
tsmith023 Oct 10, 2025
ed38170
Update to match new server impl
tsmith023 Oct 13, 2025
8f5f1b1
Update to use latest server image
tsmith023 Oct 14, 2025
63717f5
Only start threads once to avoid runtime error when handling shutdown
tsmith023 Oct 15, 2025
f1d441b
Update CI images
tsmith023 Oct 15, 2025
92572d4
Hard-code SSB concurrency to 1 for now
tsmith023 Oct 16, 2025
cb8bbab
Fix collection.batch.automatic
dirkkul Oct 16, 2025
a9d8fc2
Correct logic in `_BgThreads.is_alive`
tsmith023 Oct 16, 2025
60cead8
Adjust default batch size to align with server default and avoid over…
tsmith023 Oct 16, 2025
3c0842e
Update CI images and version checks in tests
tsmith023 Oct 16, 2025
f60ba97
Update to use latest server behaviour around backoffs and uuid/err re…
tsmith023 Oct 17, 2025
b027030
Lock once when reading batch results from stream
tsmith023 Oct 20, 2025
76b758d
Interpret context canceled as ungraceful shutdown to be restarted by …
tsmith023 Oct 21, 2025
abb34c6
Use backoff message to adjust batch size
tsmith023 Oct 21, 2025
f400c3f
Start batching with smallest allowed server value
tsmith023 Oct 21, 2025
88ee682
Add extra log in batch send
tsmith023 Oct 21, 2025
0acafb0
Reintroduce timeout when getting from queue
tsmith023 Oct 21, 2025
56cdd2b
Add log to empty queue
tsmith023 Oct 21, 2025
0a2e402
Add log to batch recv restart
tsmith023 Oct 21, 2025
9510b59
Remove timeout when getting from internal queue
tsmith023 Oct 21, 2025
9523e4d
Only update batch size if value has changed
tsmith023 Oct 21, 2025
13ebff6
Track then log total number of objects pushed by client
tsmith023 Oct 21, 2025
fa2ef0d
WIP: receive shutdown as message and not rpc error
tsmith023 Oct 21, 2025
6acf173
Move result writing inside message.results case
tsmith023 Oct 21, 2025
04766fd
Add missing proto changes
tsmith023 Oct 21, 2025
4612966
Update CI image
tsmith023 Oct 22, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 17 additions & 15 deletions .github/workflows/main.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,12 @@ concurrency:
env:
WEAVIATE_127: 1.27.27
WEAVIATE_128: 1.28.16
WEAVIATE_129: 1.29.9
WEAVIATE_130: 1.30.12
WEAVIATE_131: 1.31.5
WEAVIATE_129: 1.29.11
WEAVIATE_130: 1.30.18
WEAVIATE_131: 1.31.16
WEAVIATE_132: 1.32.5
WEAVIATE_133: 1.33.0-rc.1
WEAVIATE_133: 1.33.0
WEAVIATE_134: 1.34.0-rc.0-ba35f50.amd64

jobs:
lint-and-format:
Expand Down Expand Up @@ -152,11 +153,11 @@ jobs:
fail-fast: false
matrix:
versions: [
{ py: "3.9", weaviate: $WEAVIATE_131, grpc: "1.59.0"},
{ py: "3.10", weaviate: $WEAVIATE_131, grpc: "1.66.0"},
{ py: "3.11", weaviate: $WEAVIATE_131, grpc: "1.70.0"},
{ py: "3.12", weaviate: $WEAVIATE_131, grpc: "1.72.1"},
{ py: "3.13", weaviate: $WEAVIATE_131, grpc: "1.74.0"}
{ py: "3.9", weaviate: $WEAVIATE_132, grpc: "1.59.0"},
{ py: "3.10", weaviate: $WEAVIATE_132, grpc: "1.66.0"},
{ py: "3.11", weaviate: $WEAVIATE_132, grpc: "1.70.0"},
{ py: "3.12", weaviate: $WEAVIATE_132, grpc: "1.72.1"},
{ py: "3.13", weaviate: $WEAVIATE_132, grpc: "1.74.0"}
]
optional_dependencies: [false]
steps:
Expand Down Expand Up @@ -207,11 +208,11 @@ jobs:
fail-fast: false
matrix:
versions: [
{ py: "3.9", weaviate: $WEAVIATE_131},
{ py: "3.10", weaviate: $WEAVIATE_131},
{ py: "3.11", weaviate: $WEAVIATE_131},
{ py: "3.12", weaviate: $WEAVIATE_131},
{ py: "3.13", weaviate: $WEAVIATE_131}
{ py: "3.9", weaviate: $WEAVIATE_132},
{ py: "3.10", weaviate: $WEAVIATE_132},
{ py: "3.11", weaviate: $WEAVIATE_132},
{ py: "3.12", weaviate: $WEAVIATE_132},
{ py: "3.13", weaviate: $WEAVIATE_132}
]
optional_dependencies: [false]
steps:
Expand Down Expand Up @@ -302,7 +303,8 @@ jobs:
$WEAVIATE_130,
$WEAVIATE_131,
$WEAVIATE_132,
$WEAVIATE_133
$WEAVIATE_133,
$WEAVIATE_134
]
steps:
- name: Checkout
Expand Down
70 changes: 55 additions & 15 deletions integration/test_batch_v4.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
import weaviate
import weaviate.classes as wvc
from integration.conftest import _sanitize_collection_name
from weaviate import BatchClient, ClientBatchingContextManager
from weaviate import ClientBatchingContextManager
from weaviate.collections.classes.batch import Shard
from weaviate.collections.classes.config import (
Configure,
Expand Down Expand Up @@ -175,11 +175,11 @@ def test_add_reference(
client_factory: ClientFactory,
from_object_uuid: UUID,
to_object_uuid: UUID,
to_object_collection: Optional[bool],
to_object_collection: bool,
) -> None:
"""Test the `add_reference` method."""
client, name = client_factory()
with client.batch.fixed_size() as batch:
with client.batch.dynamic() as batch:
batch.add_object(
properties={},
collection=name,
Expand All @@ -194,8 +194,16 @@ def test_add_reference(
from_uuid=from_object_uuid,
from_collection=name,
from_property="test",
to=to_object_uuid,
to=ReferenceToMulti(target_collection=name, uuids=to_object_uuid)
if to_object_collection
else to_object_uuid,
)
assert len(client.batch.failed_objects) == 0, [
obj.message for obj in client.batch.failed_objects
]
assert len(client.batch.failed_references) == 0, [
ref.message for ref in client.batch.failed_references
]
objs = (
client.collections.use(name)
.query.fetch_objects(return_references=QueryReference(link_on="test"))
Expand Down Expand Up @@ -357,14 +365,16 @@ def test_add_ref_batch_with_tenant(client_factory: ClientFactory) -> None:
@pytest.mark.parametrize(
"batching_method",
[
lambda client: client.batch.dynamic(),
lambda client: client.batch.fixed_size(),
lambda client: client.batch.rate_limit(9999),
# lambda client: client.batch.dynamic(),
# lambda client: client.batch.fixed_size(),
# lambda client: client.batch.rate_limit(9999),
lambda client: client.batch.experimental(concurrency=1),
],
ids=[
"test_add_ten_thousand_data_objects_dynamic",
"test_add_ten_thousand_data_objects_fixed_size",
"test_add_ten_thousand_data_objects_rate_limit",
# "test_add_ten_thousand_data_objects_dynamic",
# "test_add_ten_thousand_data_objects_fixed_size",
# "test_add_ten_thousand_data_objects_rate_limit",
"test_add_ten_thousand_data_objects_experimental",
],
)
def test_add_ten_thousand_data_objects(
Expand All @@ -374,16 +384,31 @@ def test_add_ten_thousand_data_objects(
) -> None:
"""Test adding ten thousand data objects."""
client, name = client_factory()

nr_objects = 10000
if (
request.node.callspec.id == "test_add_ten_thousand_data_objects_experimental"
and client._connection._weaviate_version.is_lower_than(1, 34, 0)
):
pytest.skip("Server-side batching not supported in Weaviate < 1.34.0")
nr_objects = 100000
import time

start = time.time()
with batching_method(client) as batch:
for i in range(nr_objects):
batch.add_object(
collection=name,
properties={"name": "test" + str(i)},
)
objs = client.collections.use(name).query.fetch_objects(limit=nr_objects).objects
assert len(objs) == nr_objects
end = time.time()
print(f"Time taken to add {nr_objects} objects: {end - start} seconds")
assert len(client.batch.results.objs.errors) == 0
assert len(client.batch.results.objs.all_responses) == nr_objects
assert len(client.batch.results.objs.uuids) == nr_objects
assert len(client.collections.use(name)) == nr_objects
assert client.batch.results.objs.has_errors is False
assert len(client.batch.failed_objects) == 0, [
obj.message for obj in client.batch.failed_objects
]
client.collections.delete(name)


Expand Down Expand Up @@ -551,19 +576,28 @@ def test_add_1000_tenant_objects_with_async_indexing_and_wait_for_only_one(
lambda client: client.batch.dynamic(),
lambda client: client.batch.fixed_size(),
lambda client: client.batch.rate_limit(1000),
lambda client: client.batch.experimental(),
],
ids=[
"test_add_one_hundred_objects_and_references_between_all_dynamic",
"test_add_one_hundred_objects_and_references_between_all_fixed_size",
"test_add_one_hundred_objects_and_references_between_all_rate_limit",
"test_add_one_hundred_objects_and_references_between_all_experimental",
],
)
def test_add_one_object_and_a_self_reference(
client_factory: ClientFactory,
batching_method: Callable[[weaviate.WeaviateClient], ClientBatchingContextManager],
request: SubRequest,
) -> None:
"""Test adding one object and a self reference."""
client, name = client_factory()
if (
request.node.callspec.id
== "test_add_one_hundred_objects_and_references_between_all_experimental"
and client._connection._weaviate_version.is_lower_than(1, 34, 0)
):
pytest.skip("Server-side batching not supported in Weaviate < 1.34.0")
with batching_method(client) as batch:
uuid = batch.add_object(collection=name, properties={})
batch.add_reference(
Expand All @@ -586,7 +620,7 @@ def test_multi_threaded_batching(
nr_objects = 1000
nr_threads = 10

def batch_insert(batch: BatchClient) -> None:
def batch_insert(batch) -> None:
for i in range(nr_objects):
batch.add_object(
collection=name,
Expand Down Expand Up @@ -683,6 +717,12 @@ def test_batching_error_logs(
client_factory: ClientFactory, caplog: pytest.LogCaptureFixture
) -> None:
client, name = client_factory()
if client._connection._weaviate_version.is_at_least(
1, 32, 1
): # TODO: change to 1.33.0 when released
pytest.skip(
"Batching error logs do not get emitted by the new server-side batching functionality."
)
with client.batch.fixed_size() as batch:
for obj in [{"name": i} for i in range(100)]:
batch.add_object(properties=obj, collection=name)
Expand Down
23 changes: 12 additions & 11 deletions integration/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -345,7 +345,12 @@ def test_client_cluster_with_lazy_shard_loading(
assert len(nodes[0].shards) == 1
assert nodes[0].shards[0].collection == collection.name
assert nodes[0].shards[0].object_count == 0
assert nodes[0].shards[0].vector_indexing_status == "READY"
assert nodes[0].shards[0].vector_indexing_status in [
"READONLY",
"INDEXING",
"READY",
"LAZY_LOADING",
]
assert nodes[0].shards[0].vector_queue_length == 0
assert nodes[0].shards[0].compressed is False
assert nodes[0].shards[0].loaded is True
Expand All @@ -358,12 +363,6 @@ def test_client_cluster_without_lazy_shard_loading(
) -> None:
client = client_factory(8090, 50061)

# Lazy-loading behaviour was changed in 1.32.4:
# https://github.com/weaviate/weaviate/pull/8829
#
# We also accept LOADING/READY because it may vary
# based on the machine running the tests.

try:
collection = client.collections.create(
name=request.node.name, vectorizer_config=Configure.Vectorizer.none()
Expand All @@ -374,10 +373,12 @@ def test_client_cluster_without_lazy_shard_loading(
assert len(nodes[0].shards) == 1
assert nodes[0].shards[0].collection == collection.name
assert nodes[0].shards[0].object_count == 0
if collection._connection._weaviate_version.is_lower_than(1, 32, 0):
assert nodes[0].shards[0].vector_indexing_status == "READY"
else:
assert nodes[0].shards[0].vector_indexing_status == "LAZY_LOADING"
assert nodes[0].shards[0].vector_indexing_status in [
"READONLY",
"INDEXING",
"READY",
"LAZY_LOADING",
]
assert nodes[0].shards[0].vector_queue_length == 0
assert nodes[0].shards[0].compressed is False
if collection._connection._weaviate_version.is_lower_than(1, 25, 0):
Expand Down
19 changes: 19 additions & 0 deletions integration/test_rbac.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from integration.conftest import ClientFactory, _sanitize_collection_name
from weaviate.auth import Auth
from weaviate.classes.rbac import Actions, Permissions, RoleScope
from weaviate.connect.helpers import connect_to_local
from weaviate.rbac.models import (
AliasPermissionOutput,
BackupsPermissionOutput,
Expand Down Expand Up @@ -734,3 +735,21 @@ def test_permission_joining(client_factory: ClientFactory) -> None:

finally:
client.roles.delete(role_name)


def test_server_side_batching_with_auth() -> None:
collection_name = "TestSSBAuth"
with connect_to_local(
port=RBAC_PORTS[0], grpc_port=RBAC_PORTS[1], auth_credentials=RBAC_AUTH_CREDS
) as client:
if client._connection._weaviate_version.is_lower_than(1, 34, 0):
pytest.skip("Server-side batching not supported in Weaviate < 1.34.0")
collection = client.collections.create(collection_name)
with client.batch.experimental() as batch:
batch.add_object(collection_name)
batch.add_object(collection_name)
batch.add_object(collection_name)
try:
assert len(collection) == 3
finally:
client.collections.delete(collection_name)
76 changes: 76 additions & 0 deletions profiling/test_shutdown.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
import json
import random
import time
import weaviate
import weaviate.classes.config as wvcc


def setup(client: weaviate.WeaviateClient, collection: str) -> weaviate.collections.Collection:
    """Drop any pre-existing collection of this name and create a fresh one.

    The new collection carries two text properties ("title", "content"),
    replicates across three nodes with async replication enabled, and uses
    self-provided vectors (no server-side vectorizer).
    """
    if client.collections.exists(collection):
        client.collections.delete(collection)

    # Schema pieces named up-front so the create() call stays readable.
    text_properties = [
        wvcc.Property(name="title", data_type=wvcc.DataType.TEXT),
        wvcc.Property(name="content", data_type=wvcc.DataType.TEXT),
    ]
    replication = wvcc.Configure.replication(factor=3, async_enabled=True)

    return client.collections.create(
        name=collection,
        properties=text_properties,
        replication_config=replication,
        vector_config=wvcc.Configure.Vectors.self_provided(),
    )


def import_(client: weaviate.WeaviateClient, collection: str, how_many: int = 1_000_000) -> None:
    """Bulk-import `how_many` synthetic objects using the experimental batch API.

    Each object gets a generated title/content pair and a self-provided random
    vector. The mapping of returned uuid -> insertion index is written to
    ``uuids.json`` so a later run can cross-check what was imported.

    Raises:
        AssertionError: if any object failed to import.
    """
    uuids: dict[str, int] = {}
    # concurrency=1 pins the experimental server-side batching to a single
    # stream (see the hard-coded SSB concurrency elsewhere in this branch).
    with client.batch.experimental(concurrency=1) as batch:
        for i in range(how_many):
            uuid = batch.add_object(
                collection=collection,
                properties={
                    "title": f"Title {i}",
                    "content": f"Content {i}",
                },
                vector=random_vector(),
            )
            uuids[str(uuid)] = i
            # NOTE(review): indentation was lost in this copy; this running
            # error check is assumed to sit inside the loop so failures are
            # reported as they happen — confirm against the original file.
            if batch.number_errors > 0:
                print(f"There are some errors {batch.number_errors}")

    # After the context manager exits, all failures are available on the client.
    for err in client.batch.failed_objects:
        print(err.message)
    assert len(client.batch.failed_objects) == 0, "Expected there to be no errors when importing"
    client.batch.wait_for_vector_indexing()
    with open("uuids.json", "w") as f:
        json.dump(uuids, f)


def verify(client: weaviate.WeaviateClient, collection: str, expected: int = 1_000_000) -> None:
    """Poll the collection's object count until it reaches `expected`.

    Async replication may lag the import, so this re-checks once per second
    for up to 120 attempts before asserting on the final observed count.
    """
    c = client.collections.use(collection)
    actual, attempts = 0, 0
    while actual != expected:
        actual = len(c)
        print(f"Found {actual} objects, waiting for async repl to reach {expected}...")
        time.sleep(1)
        attempts += 1
        if attempts == 120:
            break  # give up after ~2 minutes; the assert below reports the gap
    assert actual == expected, f"Expected {expected} objects, found {actual}"


def random_vector(dim: int = 128) -> list[float]:
    """Return a vector of `dim` pseudo-random floats drawn uniformly from [0, 1].

    Args:
        dim: Number of components. Defaults to 128 to match the importer's
            original hard-coded dimensionality, so existing callers are
            unaffected.
    """
    return [random.uniform(0, 1) for _ in range(dim)]


def test_main() -> None:
    """End-to-end journey: create the collection, bulk import, verify counts."""
    name = "BatchImportShutdownJourney"
    how_many = 500000
    with weaviate.connect_to_local() as client:
        created = setup(client, name)
        import_(client, created.name, how_many)
        verify(client, created.name, how_many)
2 changes: 2 additions & 0 deletions weaviate/classes/data.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
from weaviate.collections.classes.data import DataObject, DataReference
from weaviate.collections.classes.internal import ReferenceToMulti
from weaviate.collections.classes.types import GeoCoordinate, PhoneNumber

__all__ = [
"DataObject",
"DataReference",
"GeoCoordinate",
"PhoneNumber",
"ReferenceToMulti",
]
6 changes: 5 additions & 1 deletion weaviate/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,11 @@ def __init__(
self.alias = _Alias(
self._connection,
)
self.batch = _BatchClientWrapper(self._connection, config=collections)
self.batch = _BatchClientWrapper(
self._connection,
config=collections,
consistency_level=None,
)
self.backup = _Backup(self._connection)
self.cluster = _Cluster(self._connection)
self.collections = collections
Expand Down
3 changes: 2 additions & 1 deletion weaviate/collections/batch/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
__all__ = ["_BatchClient", "_BatchCollection", "_BatchGRPC", "_BatchREST"]

from weaviate.collections.batch.grpc_batch import _BatchGRPC

from .client import _BatchClient
from .collection import _BatchCollection
from .grpc_batch_objects import _BatchGRPC
from .rest import _BatchREST
Loading
Loading