Skip to content
58 changes: 58 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -710,6 +710,64 @@ client.collections.update("Songs_Alias", "PopSongs");
client.collections.delete("Songs_Alias");
```

### Managing collection backups

> [!CAUTION]
> Weaviate does not support concurrent backups. Await one backup's completion before starting another one.

```java
// Start a backup:
Backup backup = client.backup.create(
"backup_1", "filesystem",
bak -> bak
.includeCollections("Songs", "Artists")
.compressionLevel(CompressionLevel.BEST_COMPRESSION)
.cpuPercentage(30)
);

// By default, the client does not monitor the backup status.
// The above method returns as soon as the server acknowledges
// the request and starts to process it.
//
// Now you can poll backup status to know when it is succeedes (or fails).

Backup status = client.backup.getCreateStatus(backup.id(), backup.backend());
if (status.status() == BackupStatus.SUCCESSFUL) {
System.out.println("Yay!");
System.exit(0);
}

// Backups may take a write to complete. To block the current thread until
// the execution completes, call Backup::waitForCompletion(WeaviateClient).
//
// Notice that, while we use `backup` object we can also call it on the `status`,
// as both will have sufficient information to identify the backup operation.

try {
Backup completed = backup.waitForCompletion(client);
assert completed.errors() == null : "completed with errors";
} catch (TimeoutException e) {
System.out.exit(1);
}

// List exists backups:
List<Backup> allBackups = client.backup.list();

// Restore from the first backup:
var first = allBackups.getFirst();
client.backup.restore(first.id(), first.backend());

// Similarly, wait until the restore is complete using Backup::waitForCompletion.
// It is possible to set a custom timeout and polling interval using a familiar Tucked Builder pattern:

var restoring = client.backup.getRestoreStatus(first.id(), first.backend());
var restored = restoring.waitForCompletion(client, wait -> wait
.timeout(Duration.ofMinutes(30))
.interval(Duration.ofMinutes(5)));

assert restored.errors() == null : "restored with errors";
```

## Useful resources

- [Documentation](https://weaviate.io/developers/weaviate/current/client-libraries/java.html).
Expand Down
6 changes: 6 additions & 0 deletions src/it/java/io/weaviate/containers/Weaviate.java
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,12 @@ public Builder withOffloadS3(String accessKey, String secretKey) {
return this;
}

/**
 * Registers the {@code backup-filesystem} module and sets the
 * {@code BACKUP_FILESYSTEM_PATH} environment variable to the given path.
 *
 * @param fsPath value stored under {@code BACKUP_FILESYSTEM_PATH}
 * @return this builder, to allow call chaining
 */
public Builder withFilesystemBackup(String fsPath) {
  final String backupModule = "backup-filesystem";
  final String pathVariable = "BACKUP_FILESYSTEM_PATH";

  addModules(backupModule);
  environment.put(pathVariable, fsPath);
  return this;
}

public Builder enableTelemetry(boolean enable) {
environment.put("DISABLE_TELEMETRY", Boolean.toString(!enable));
return this;
Expand Down
2 changes: 1 addition & 1 deletion src/it/java/io/weaviate/integration/AliasITest.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
import io.weaviate.containers.Container;

public class AliasITest extends ConcurrentTest {
private static WeaviateClient client = Container.WEAVIATE.getClient();
private static final WeaviateClient client = Container.WEAVIATE.getClient();

@Test
public void test_aliasLifecycle() throws IOException {
Expand Down
251 changes: 251 additions & 0 deletions src/it/java/io/weaviate/integration/BackupITest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,251 @@
package io.weaviate.integration;

import java.io.IOException;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
import java.util.stream.IntStream;

import org.assertj.core.api.Assertions;
import org.assertj.core.api.InstanceOfAssertFactories;
import org.junit.Test;

import io.weaviate.ConcurrentTest;
import io.weaviate.client6.v1.api.WeaviateClient;
import io.weaviate.client6.v1.api.backup.Backup;
import io.weaviate.client6.v1.api.backup.BackupStatus;
import io.weaviate.client6.v1.api.backup.CompressionLevel;
import io.weaviate.client6.v1.api.collections.Property;
import io.weaviate.containers.Weaviate;

public class BackupITest extends ConcurrentTest {
private static final WeaviateClient client = Weaviate.custom()
.withFilesystemBackup("/tmp/backups").build()
.getClient();

@Test
public void test_lifecycle() throws IOException, TimeoutException {
// Arrange
String nsA = ns("A"), nsB = ns("B"), nsC = ns("C"), nsBig = ns("Big");
String backup_1 = ns("backup_1").toLowerCase();
String backend = "filesystem";

// Start writing data in the background so it's ready
// by the time we get to backup #3.
var spam = spamData(nsBig);

client.collections.create(nsA);
client.collections.create(nsB);
client.collections.create(nsC);

// Insert some data to check restore later
var collectionA = client.collections.use(nsA);
collectionA.data.insert(Map.of());

// Act: start backup
var started = client.backup.create(backup_1, backend,
backup -> backup
.includeCollections(nsA, nsB)
.compressionLevel(CompressionLevel.BEST_SPEED));

// Assert
Assertions.assertThat(started)
.as("created backup operation")
.returns(backup_1, Backup::id)
.returns(backend, Backup::backend)
.returns(BackupStatus.STARTED, Backup::status)
.returns(null, Backup::error)
.extracting(Backup::includesCollections, InstanceOfAssertFactories.list(String.class))
.containsOnly(nsA, nsB);

// Act: await backup competion
var completed = started.waitForCompletion(client);

// Assert
Assertions.assertThat(completed)
.as("await backup completion")
.returns(backup_1, Backup::id)
.returns(backend, Backup::backend)
.returns(BackupStatus.SUCCESS, Backup::status)
.returns(null, Backup::error);

// Act: create another backup
String backup_2 = ns("backup_2").toLowerCase();
client.backup.create(backup_2, backend).waitForCompletion(client);

// Assert: check the second backup is created successfully
var status_2 = client.backup.getCreateStatus(backup_2, backend);
Assertions.assertThat(status_2).as("backup #2").get()
.returns(BackupStatus.SUCCESS, Backup::status);

// Act: create and cancel
// Try to throttle this backup by creating a lot of objects,
// limiting CPU resources and requiring high compression ratio.
// This is to avoid flaky tests and make sure we can cancel
// the backup before it completes successfully.
String backup_3 = ns("backup_3").toLowerCase();
spam.join();
var cancelMe = client.backup.create(backup_3, backend,
backup -> backup
.includeCollections(nsA, nsB, nsC, nsBig)
.cpuPercentage(1)
.compressionLevel(CompressionLevel.BEST_COMPRESSION));
cancelMe.cancel(client);
cancelMe.waitForStatus(client, BackupStatus.CANCELED, wait -> wait.interval(500));

// Assert: check the backup is cancelled
var status_3 = client.backup.getCreateStatus(backup_3, backend);
Assertions.assertThat(status_3).as("backup #3").get()
.returns(BackupStatus.CANCELED, Backup::status);

// Assert: all 3 backups are present
var all = client.backup.list(backend);
Assertions.assertThat(all).as("all backups")
.extracting(Backup::id)
.contains(backup_1, backup_2, backup_3);

// Act: delete data and restore backup #1
client.collections.delete(nsA);
client.backup.restore(backup_1, backend, restore -> restore.includeCollections(nsA));

// Assert: object inserted in the beginning of the test is present
var restore_1 = client.backup.getRestoreStatus(backup_1, backend)
.orElseThrow().waitForCompletion(client);
Assertions.assertThat(restore_1).as("restore backup #1")
.returns(BackupStatus.SUCCESS, Backup::status);
Assertions.assertThat(collectionA.size()).as("after restore backup #1").isEqualTo(1);
}

@Test
public void test_lifecycle_async() throws ExecutionException, InterruptedException, Exception {
// Arrange
String nsA = ns("A"), nsB = ns("B"), nsC = ns("C"), nsBig = ns("Big");
String backup_1 = ns("backup_1").toLowerCase();
String backend = "filesystem";

try (final var async = client.async()) {
// Start writing data in the background so it's ready
// by the time we get to backup #3.
var spam = spamData(nsBig);

CompletableFuture.allOf(
async.collections.create(nsA),
async.collections.create(nsB),
async.collections.create(nsC))
.join();

// Insert some data to check restore later
var collectionA = async.collections.use(nsA);
collectionA.data.insert(Map.of()).join();

// Act: start backup
var started = async.backup.create(backup_1, backend,
backup -> backup
.includeCollections(nsA, nsB)
.compressionLevel(CompressionLevel.BEST_SPEED))
.join();

// Assert
Assertions.assertThat(started)
.as("created backup operation")
.returns(backup_1, Backup::id)
.returns(backend, Backup::backend)
.returns(BackupStatus.STARTED, Backup::status)
.returns(null, Backup::error)
.extracting(Backup::includesCollections, InstanceOfAssertFactories.list(String.class))
.containsOnly(nsA, nsB);

// Act: await backup competion
var completed = started.waitForCompletion(async).join();

// Assert
Assertions.assertThat(completed)
.as("await backup completion")
.returns(backup_1, Backup::id)
.returns(backend, Backup::backend)
.returns(BackupStatus.SUCCESS, Backup::status)
.returns(null, Backup::error);

// Act: create another backup
String backup_2 = ns("backup_2").toLowerCase();
async.backup.create(backup_2, backend)
.thenCompose(bak -> bak.waitForCompletion(async))
.join();

// Assert: check the second backup is created successfully
var status_2 = async.backup.getCreateStatus(backup_2, backend).join();
Assertions.assertThat(status_2).as("backup #2").get()
.returns(BackupStatus.SUCCESS, Backup::status);

// Act: create and cancel
// Try to throttle this backup by creating a lot of objects,
// limiting CPU resources and requiring high compression ratio.
// This is to avoid flaky tests and make sure we can cancel
// the backup before it completes successfully.
String backup_3 = ns("backup_3").toLowerCase();
spam.join();
async.backup.create(backup_3, backend,
backup -> backup
.includeCollections(nsA, nsB, nsC, nsBig)
.cpuPercentage(1)
.compressionLevel(CompressionLevel.BEST_COMPRESSION))
.thenCompose(cancelMe -> cancelMe.cancel(async)
.thenCompose(__ -> cancelMe.waitForStatus(async, BackupStatus.CANCELED,
wait -> wait.interval(500))))
.join();

// Assert: check the backup is cancelled
var status_3 = async.backup.getCreateStatus(backup_3, backend).join();
Assertions.assertThat(status_3).as("backup #3").get()
.returns(BackupStatus.CANCELED, Backup::status);

// Assert: all 3 backups are present
var all = async.backup.list(backend).join();
Assertions.assertThat(all).as("all backups")
.extracting(Backup::id)
.contains(backup_1, backup_2, backup_3);

// Act: delete data and restore backup #1
async.collections.delete(nsA).join();
async.backup.restore(backup_1, backend, restore -> restore.includeCollections(nsA)).join();

// Assert: object inserted in the beginning of the test is present
var restore_1 = async.backup.getRestoreStatus(backup_1, backend)
.thenCompose(bak -> bak.orElseThrow().waitForCompletion(async))
.join();
Assertions.assertThat(restore_1).as("restore backup #1")
.returns(BackupStatus.SUCCESS, Backup::status);
Assertions.assertThat(collectionA.size().join()).as("after restore backup #1").isEqualTo(1);
}
}

@Test(expected = IllegalStateException.class)
public void test_waitForCompletion_unknown() throws IOException, TimeoutException {
var backup = new Backup("#1", "/tmp/bak/#1", "filesystem", List.of("Things"), BackupStatus.STARTED, null,
null);
backup.waitForCompletion(client);
}

/** Write 10_000 entries with a UUID[10] property. */
private CompletableFuture<Void> spamData(String collectionName) {
return CompletableFuture.supplyAsync(() -> {
try {
client.collections.create(collectionName,
c -> c.properties(Property.uuidArray("uuids")));

var spam = client.collections.use(collectionName);
for (int i = 0; i < 10_000; i++) {
var uuids = IntStream.range(0, 10).mapToObj(j -> UUID.randomUUID()).toArray();
spam.data.insert(Map.of("uuids", uuids));
}
} catch (IOException e) {
throw new CompletionException(e);
}
return null;
});
}
}
2 changes: 1 addition & 1 deletion src/it/java/io/weaviate/integration/DataITest.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
import io.weaviate.containers.Container;

public class DataITest extends ConcurrentTest {
private static WeaviateClient client = Container.WEAVIATE.getClient();
private static final WeaviateClient client = Container.WEAVIATE.getClient();
private static final String COLLECTION = unique("Artists");
private static final String VECTOR_INDEX = "bring_your_own";

Expand Down
2 changes: 1 addition & 1 deletion src/it/java/io/weaviate/integration/ORMITest.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
import io.weaviate.containers.Container;

public class ORMITest extends ConcurrentTest {
private static WeaviateClient client = Container.WEAVIATE.getClient();
private static final WeaviateClient client = Container.WEAVIATE.getClient();

@Collection("ORMITestThings")
static record Thing(
Expand Down
2 changes: 1 addition & 1 deletion src/it/java/io/weaviate/integration/PaginationITest.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
import io.weaviate.containers.Container;

public class PaginationITest extends ConcurrentTest {
private static WeaviateClient client = Container.WEAVIATE.getClient();
private static final WeaviateClient client = Container.WEAVIATE.getClient();

@Test
public void testIterateAll() throws IOException {
Expand Down
2 changes: 1 addition & 1 deletion src/it/java/io/weaviate/integration/TenantsITest.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ public class TenantsITest extends ConcurrentTest {
.build(),
Container.MINIO);

private static WeaviateClient client = compose.getClient();
private static final WeaviateClient client = compose.getClient();

@Test
public void test_tenantLifecycle() throws Exception {
Expand Down
Loading