diff --git a/README.md b/README.md index 247bbb8c..13301a70 100644 --- a/README.md +++ b/README.md @@ -721,6 +721,64 @@ client.collections.update("Songs_Alias", "PopSongs"); client.collections.delete("Songs_Alias"); ``` +### Managing collection backups + +> [!CAUTION] +> Weaviate does not support concurrent backups. Await one backup's completion before starting another one. + +```java +// Start a backup: +Backup backup = client.backup.create( + "backup_1", "filesystem", + bak -> bak + .includeCollections("Songs", "Artists") + .compressionLevel(CompressionLevel.BEST_COMPRESSION) + .cpuPercentage(30) +); + +// By default, the client does not monitor the backup status. +// The above method returns as soon as the server acknowledges +// the request and starts to process it. +// +// Now you can poll backup status to know when it is succeedes (or fails). + +Backup status = client.backup.getCreateStatus(backup.id(), backup.backend()); +if (status.status() == BackupStatus.SUCCESSFUL) { + System.out.println("Yay!"); + System.exit(0); +} + +// Backups may take a write to complete. To block the current thread until +// the execution completes, call Backup::waitForCompletion(WeaviateClient). +// +// Notice that, while we use `backup` object we can also call it on the `status`, +// as both will have sufficient information to identify the backup operation. + +try { + Backup completed = backup.waitForCompletion(client); + assert completed.errors() == null : "completed with errors"; +} catch (TimeoutException e) { + System.out.exit(1); +} + +// List exists backups: +List allBackups = client.backup.list(); + +// Restore from the first backup: +var first = allBackups.getFirst(); +client.backup.restore(first.id(), first.backend()); + +// Similarly, wait until the restore is complete using Backup::waitForCompletion. +// It is possible to set a custom timeout and polling interval using a familiar Tucked Builder pattern: + +var restoring = client.backup.getRestoreStatus(first.id(), first.backend()); +var restored = restoring.waitForCompletion(client, wait -> wait + .timeout(Duration.ofMinutes(30)) + .interval(Duration.ofMinutes(5))); + +assert restored.errors() == null : "restored with errors"; +``` + ### RBAC #### Roles diff --git a/src/it/java/io/weaviate/containers/Weaviate.java b/src/it/java/io/weaviate/containers/Weaviate.java index cc8a9604..a89714bd 100644 --- a/src/it/java/io/weaviate/containers/Weaviate.java +++ b/src/it/java/io/weaviate/containers/Weaviate.java @@ -133,6 +133,11 @@ public Builder withOffloadS3(String accessKey, String secretKey) { return this; } + public Builder withFilesystemBackup(String fsPath) { + addModules("backup-filesystem"); + environment.put("BACKUP_FILESYSTEM_PATH", fsPath); + return this; + } public Builder withAdminUsers(String... admins) { adminUsers.addAll(Arrays.asList(admins)); return this; diff --git a/src/it/java/io/weaviate/integration/AliasITest.java b/src/it/java/io/weaviate/integration/AliasITest.java index f5b1dd47..0fc40fc6 100644 --- a/src/it/java/io/weaviate/integration/AliasITest.java +++ b/src/it/java/io/weaviate/integration/AliasITest.java @@ -12,7 +12,7 @@ import io.weaviate.containers.Container; public class AliasITest extends ConcurrentTest { - private static WeaviateClient client = Container.WEAVIATE.getClient(); + private static final WeaviateClient client = Container.WEAVIATE.getClient(); @Test public void test_aliasLifecycle() throws IOException { diff --git a/src/it/java/io/weaviate/integration/BackupITest.java b/src/it/java/io/weaviate/integration/BackupITest.java new file mode 100644 index 00000000..67a7bcdf --- /dev/null +++ b/src/it/java/io/weaviate/integration/BackupITest.java @@ -0,0 +1,251 @@ +package io.weaviate.integration; + +import java.io.IOException; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeoutException; +import java.util.stream.IntStream; + +import org.assertj.core.api.Assertions; +import org.assertj.core.api.InstanceOfAssertFactories; +import org.junit.Test; + +import io.weaviate.ConcurrentTest; +import io.weaviate.client6.v1.api.WeaviateClient; +import io.weaviate.client6.v1.api.backup.Backup; +import io.weaviate.client6.v1.api.backup.BackupStatus; +import io.weaviate.client6.v1.api.backup.CompressionLevel; +import io.weaviate.client6.v1.api.collections.Property; +import io.weaviate.containers.Weaviate; + +public class BackupITest extends ConcurrentTest { + private static final WeaviateClient client = Weaviate.custom() + .withFilesystemBackup("/tmp/backups").build() + .getClient(); + + @Test + public void test_lifecycle() throws IOException, TimeoutException { + // Arrange + String nsA = ns("A"), nsB = ns("B"), nsC = ns("C"), nsBig = ns("Big"); + String backup_1 = ns("backup_1").toLowerCase(); + String backend = "filesystem"; + + // Start writing data in the background so it's ready + // by the time we get to backup #3. + var spam = spamData(nsBig); + + client.collections.create(nsA); + client.collections.create(nsB); + client.collections.create(nsC); + + // Insert some data to check restore later + var collectionA = client.collections.use(nsA); + collectionA.data.insert(Map.of()); + + // Act: start backup + var started = client.backup.create(backup_1, backend, + backup -> backup + .includeCollections(nsA, nsB) + .compressionLevel(CompressionLevel.BEST_SPEED)); + + // Assert + Assertions.assertThat(started) + .as("created backup operation") + .returns(backup_1, Backup::id) + .returns(backend, Backup::backend) + .returns(BackupStatus.STARTED, Backup::status) + .returns(null, Backup::error) + .extracting(Backup::includesCollections, InstanceOfAssertFactories.list(String.class)) + .containsOnly(nsA, nsB); + + // Act: await backup competion + var completed = started.waitForCompletion(client); + + // Assert + Assertions.assertThat(completed) + .as("await backup completion") + .returns(backup_1, Backup::id) + .returns(backend, Backup::backend) + .returns(BackupStatus.SUCCESS, Backup::status) + .returns(null, Backup::error); + + // Act: create another backup + String backup_2 = ns("backup_2").toLowerCase(); + client.backup.create(backup_2, backend).waitForCompletion(client); + + // Assert: check the second backup is created successfully + var status_2 = client.backup.getCreateStatus(backup_2, backend); + Assertions.assertThat(status_2).as("backup #2").get() + .returns(BackupStatus.SUCCESS, Backup::status); + + // Act: create and cancel + // Try to throttle this backup by creating a lot of objects, + // limiting CPU resources and requiring high compression ratio. + // This is to avoid flaky tests and make sure we can cancel + // the backup before it completes successfully. + String backup_3 = ns("backup_3").toLowerCase(); + spam.join(); + var cancelMe = client.backup.create(backup_3, backend, + backup -> backup + .includeCollections(nsA, nsB, nsC, nsBig) + .cpuPercentage(1) + .compressionLevel(CompressionLevel.BEST_COMPRESSION)); + cancelMe.cancel(client); + cancelMe.waitForStatus(client, BackupStatus.CANCELED, wait -> wait.interval(500)); + + // Assert: check the backup is cancelled + var status_3 = client.backup.getCreateStatus(backup_3, backend); + Assertions.assertThat(status_3).as("backup #3").get() + .returns(BackupStatus.CANCELED, Backup::status); + + // Assert: all 3 backups are present + var all = client.backup.list(backend); + Assertions.assertThat(all).as("all backups") + .extracting(Backup::id) + .contains(backup_1, backup_2, backup_3); + + // Act: delete data and restore backup #1 + client.collections.delete(nsA); + client.backup.restore(backup_1, backend, restore -> restore.includeCollections(nsA)); + + // Assert: object inserted in the beginning of the test is present + var restore_1 = client.backup.getRestoreStatus(backup_1, backend) + .orElseThrow().waitForCompletion(client); + Assertions.assertThat(restore_1).as("restore backup #1") + .returns(BackupStatus.SUCCESS, Backup::status); + Assertions.assertThat(collectionA.size()).as("after restore backup #1").isEqualTo(1); + } + + @Test + public void test_lifecycle_async() throws ExecutionException, InterruptedException, Exception { + // Arrange + String nsA = ns("A"), nsB = ns("B"), nsC = ns("C"), nsBig = ns("Big"); + String backup_1 = ns("backup_1").toLowerCase(); + String backend = "filesystem"; + + try (final var async = client.async()) { + // Start writing data in the background so it's ready + // by the time we get to backup #3. + var spam = spamData(nsBig); + + CompletableFuture.allOf( + async.collections.create(nsA), + async.collections.create(nsB), + async.collections.create(nsC)) + .join(); + + // Insert some data to check restore later + var collectionA = async.collections.use(nsA); + collectionA.data.insert(Map.of()).join(); + + // Act: start backup + var started = async.backup.create(backup_1, backend, + backup -> backup + .includeCollections(nsA, nsB) + .compressionLevel(CompressionLevel.BEST_SPEED)) + .join(); + + // Assert + Assertions.assertThat(started) + .as("created backup operation") + .returns(backup_1, Backup::id) + .returns(backend, Backup::backend) + .returns(BackupStatus.STARTED, Backup::status) + .returns(null, Backup::error) + .extracting(Backup::includesCollections, InstanceOfAssertFactories.list(String.class)) + .containsOnly(nsA, nsB); + + // Act: await backup competion + var completed = started.waitForCompletion(async).join(); + + // Assert + Assertions.assertThat(completed) + .as("await backup completion") + .returns(backup_1, Backup::id) + .returns(backend, Backup::backend) + .returns(BackupStatus.SUCCESS, Backup::status) + .returns(null, Backup::error); + + // Act: create another backup + String backup_2 = ns("backup_2").toLowerCase(); + async.backup.create(backup_2, backend) + .thenCompose(bak -> bak.waitForCompletion(async)) + .join(); + + // Assert: check the second backup is created successfully + var status_2 = async.backup.getCreateStatus(backup_2, backend).join(); + Assertions.assertThat(status_2).as("backup #2").get() + .returns(BackupStatus.SUCCESS, Backup::status); + + // Act: create and cancel + // Try to throttle this backup by creating a lot of objects, + // limiting CPU resources and requiring high compression ratio. + // This is to avoid flaky tests and make sure we can cancel + // the backup before it completes successfully. + String backup_3 = ns("backup_3").toLowerCase(); + spam.join(); + async.backup.create(backup_3, backend, + backup -> backup + .includeCollections(nsA, nsB, nsC, nsBig) + .cpuPercentage(1) + .compressionLevel(CompressionLevel.BEST_COMPRESSION)) + .thenCompose(cancelMe -> cancelMe.cancel(async) + .thenCompose(__ -> cancelMe.waitForStatus(async, BackupStatus.CANCELED, + wait -> wait.interval(500)))) + .join(); + + // Assert: check the backup is cancelled + var status_3 = async.backup.getCreateStatus(backup_3, backend).join(); + Assertions.assertThat(status_3).as("backup #3").get() + .returns(BackupStatus.CANCELED, Backup::status); + + // Assert: all 3 backups are present + var all = async.backup.list(backend).join(); + Assertions.assertThat(all).as("all backups") + .extracting(Backup::id) + .contains(backup_1, backup_2, backup_3); + + // Act: delete data and restore backup #1 + async.collections.delete(nsA).join(); + async.backup.restore(backup_1, backend, restore -> restore.includeCollections(nsA)).join(); + + // Assert: object inserted in the beginning of the test is present + var restore_1 = async.backup.getRestoreStatus(backup_1, backend) + .thenCompose(bak -> bak.orElseThrow().waitForCompletion(async)) + .join(); + Assertions.assertThat(restore_1).as("restore backup #1") + .returns(BackupStatus.SUCCESS, Backup::status); + Assertions.assertThat(collectionA.size().join()).as("after restore backup #1").isEqualTo(1); + } + } + + @Test(expected = IllegalStateException.class) + public void test_waitForCompletion_unknown() throws IOException, TimeoutException { + var backup = new Backup("#1", "/tmp/bak/#1", "filesystem", List.of("Things"), BackupStatus.STARTED, null, + null); + backup.waitForCompletion(client); + } + + /** Write 10_000 entries with a UUID[10] property. */ + private CompletableFuture spamData(String collectionName) { + return CompletableFuture.supplyAsync(() -> { + try { + client.collections.create(collectionName, + c -> c.properties(Property.uuidArray("uuids"))); + + var spam = client.collections.use(collectionName); + for (int i = 0; i < 10_000; i++) { + var uuids = IntStream.range(0, 10).mapToObj(j -> UUID.randomUUID()).toArray(); + spam.data.insert(Map.of("uuids", uuids)); + } + } catch (IOException e) { + throw new CompletionException(e); + } + return null; + }); + } +} diff --git a/src/it/java/io/weaviate/integration/DataITest.java b/src/it/java/io/weaviate/integration/DataITest.java index c679f7a7..7eb176aa 100644 --- a/src/it/java/io/weaviate/integration/DataITest.java +++ b/src/it/java/io/weaviate/integration/DataITest.java @@ -29,7 +29,7 @@ import io.weaviate.containers.Container; public class DataITest extends ConcurrentTest { - private static WeaviateClient client = Container.WEAVIATE.getClient(); + private static final WeaviateClient client = Container.WEAVIATE.getClient(); private static final String COLLECTION = unique("Artists"); private static final String VECTOR_INDEX = "bring_your_own"; diff --git a/src/it/java/io/weaviate/integration/ORMITest.java b/src/it/java/io/weaviate/integration/ORMITest.java index 16b5b371..d708a25f 100644 --- a/src/it/java/io/weaviate/integration/ORMITest.java +++ b/src/it/java/io/weaviate/integration/ORMITest.java @@ -23,7 +23,7 @@ import io.weaviate.containers.Container; public class ORMITest extends ConcurrentTest { - private static WeaviateClient client = Container.WEAVIATE.getClient(); + private static final WeaviateClient client = Container.WEAVIATE.getClient(); @Collection("ORMITestThings") static record Thing( diff --git a/src/it/java/io/weaviate/integration/PaginationITest.java b/src/it/java/io/weaviate/integration/PaginationITest.java index 3cb3d878..b4b02d29 100644 --- a/src/it/java/io/weaviate/integration/PaginationITest.java +++ b/src/it/java/io/weaviate/integration/PaginationITest.java @@ -25,7 +25,7 @@ import io.weaviate.containers.Container; public class PaginationITest extends ConcurrentTest { - private static WeaviateClient client = Container.WEAVIATE.getClient(); + private static final WeaviateClient client = Container.WEAVIATE.getClient(); @Test public void testIterateAll() throws IOException { diff --git a/src/it/java/io/weaviate/integration/TenantsITest.java b/src/it/java/io/weaviate/integration/TenantsITest.java index 7c17cf54..c765ae08 100644 --- a/src/it/java/io/weaviate/integration/TenantsITest.java +++ b/src/it/java/io/weaviate/integration/TenantsITest.java @@ -18,7 +18,7 @@ public class TenantsITest extends ConcurrentTest { .build(), Container.MINIO); - private static WeaviateClient client = compose.getClient(); + private static final WeaviateClient client = compose.getClient(); @Test public void test_tenantLifecycle() throws Exception { diff --git a/src/main/java/io/weaviate/client6/v1/api/WeaviateClient.java b/src/main/java/io/weaviate/client6/v1/api/WeaviateClient.java index 910016ac..9bf44552 100644 --- a/src/main/java/io/weaviate/client6/v1/api/WeaviateClient.java +++ b/src/main/java/io/weaviate/client6/v1/api/WeaviateClient.java @@ -4,6 +4,7 @@ import java.util.function.Function; import io.weaviate.client6.v1.api.alias.WeaviateAliasClient; +import io.weaviate.client6.v1.api.backup.WeaviateBackupClient; import io.weaviate.client6.v1.api.collections.WeaviateCollectionsClient; import io.weaviate.client6.v1.api.rbac.groups.WeaviateGroupsClient; import io.weaviate.client6.v1.api.rbac.roles.WeaviateRolesClient; @@ -34,6 +35,9 @@ public class WeaviateClient implements AutoCloseable { /** Client for {@code /aliases} endpoints for managing collection aliases. */ public final WeaviateAliasClient alias; + /** Client for {@code /backups} endpoints for managing backups. */ + public final WeaviateBackupClient backup; + /** * Client for {@code /authz/roles} endpoints for managing RBAC roles. */ @@ -99,6 +103,7 @@ public WeaviateClient(Config config) { this.restTransport = _restTransport; this.grpcTransport = new DefaultGrpcTransport(grpcOpt); this.alias = new WeaviateAliasClient(restTransport); + this.backup = new WeaviateBackupClient(restTransport); this.collections = new WeaviateCollectionsClient(restTransport, grpcTransport); this.roles = new WeaviateRolesClient(restTransport); this.groups = new WeaviateGroupsClient(restTransport); diff --git a/src/main/java/io/weaviate/client6/v1/api/WeaviateClientAsync.java b/src/main/java/io/weaviate/client6/v1/api/WeaviateClientAsync.java index 0783c574..8a797f0f 100644 --- a/src/main/java/io/weaviate/client6/v1/api/WeaviateClientAsync.java +++ b/src/main/java/io/weaviate/client6/v1/api/WeaviateClientAsync.java @@ -5,6 +5,7 @@ import java.util.function.Function; import io.weaviate.client6.v1.api.alias.WeaviateAliasClientAsync; +import io.weaviate.client6.v1.api.backup.WeaviateBackupClientAsync; import io.weaviate.client6.v1.api.collections.WeaviateCollectionsClient; import io.weaviate.client6.v1.api.collections.WeaviateCollectionsClientAsync; import io.weaviate.client6.v1.api.rbac.groups.WeaviateGroupsClientAsync; @@ -33,6 +34,9 @@ public class WeaviateClientAsync implements AutoCloseable { /** Client for {@code /aliases} endpoints for managing collection aliases. */ public final WeaviateAliasClientAsync alias; + /** Client for {@code /backups} endpoints for managing backups. */ + public final WeaviateBackupClientAsync backup; + /** * Client for {@code /authz/roles} endpoints for managing RBAC roles. */ @@ -102,6 +106,7 @@ public WeaviateClientAsync(Config config) { this.restTransport = _restTransport; this.grpcTransport = new DefaultGrpcTransport(grpcOpt); this.alias = new WeaviateAliasClientAsync(restTransport); + this.backup = new WeaviateBackupClientAsync(restTransport); this.roles = new WeaviateRolesClientAsync(restTransport); this.groups = new WeaviateGroupsClientAsync(restTransport); this.users = new WeaviateUsersClientAsync(restTransport); diff --git a/src/main/java/io/weaviate/client6/v1/api/backup/Backup.java b/src/main/java/io/weaviate/client6/v1/api/backup/Backup.java new file mode 100644 index 00000000..3f0a7a22 --- /dev/null +++ b/src/main/java/io/weaviate/client6/v1/api/backup/Backup.java @@ -0,0 +1,209 @@ +package io.weaviate.client6.v1.api.backup; + +import java.io.IOException; +import java.util.List; +import java.util.Optional; +import java.util.concurrent.Callable; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeoutException; +import java.util.function.Function; +import java.util.function.Supplier; + +import com.google.gson.annotations.SerializedName; + +import io.weaviate.client6.v1.api.WeaviateClient; +import io.weaviate.client6.v1.api.WeaviateClientAsync; +import io.weaviate.client6.v1.internal.ObjectBuilder; + +public record Backup( + /** Backup ID. */ + @SerializedName("id") String id, + /** Path to backup in the backend storage. */ + @SerializedName("path") String path, + /** Backup storage backend. */ + @SerializedName("backend") String backend, + /** Collections included in the backup. */ + @SerializedName("classes") List includesCollections, + /** Backup creation / restoration status. */ + @SerializedName("status") BackupStatus status, + /** Backup creation / restoration error. */ + @SerializedName("error") String error, + /** + * This value indicates if a backup is being created or restored from. + * For operations like LIST this value is null. + */ + // We set a bogus SerializedName to make sure we do not pick up this + // value from the JSON by accident, but always set it ourselves. + @SerializedName("__operation__") Operation operation) { + + /** Set operation associated with this backup. */ + public Backup withOperation(Operation operation) { + return new Backup(id, path, backend, includesCollections, status, error, operation); + } + + public enum Operation { + CREATE, RESTORE; + } + + /** + * Block until the backup has been created / restored successfully. + * + * @param client Weaviate client. Make sure {@link WeaviateClient#close} + * is NOT called before this method returns. + * @throws IllegalStateException if {@link #operation} is not set (null). + * @throws TimeoutException in case the wait times out without reaching + * BackupStatus.SUCCESS. + * @throws IOException in case the request was not sent successfully + * due to a malformed request, a networking error + * or the server being unavailable. + */ + public Backup waitForCompletion(WeaviateClient client) throws IOException, TimeoutException { + return waitForStatus(client, BackupStatus.SUCCESS); + } + + /** + * Block until the backup has been created / restored successfully. + * + * @param client Weaviate client. Make sure {@link WeaviateClient#close} + * is NOT called before this method returns. + * @param fn Lambda expression for optional parameters. + * @throws IllegalStateException if {@link #operation} is not set (null). + * @throws TimeoutException in case the wait times out without reaching + * BackupStatus.SUCCESS. + * @throws IOException in case the request was not sent successfully + * due to a malformed request, a networking error + * or the server being unavailable. + */ + public Backup waitForCompletion(WeaviateClient client, Function> fn) + throws IOException, TimeoutException { + return waitForStatus(client, BackupStatus.SUCCESS, fn); + } + + /** + * Block until the backup operation reaches a certain status. + * + * @param client Weaviate client. Make sure {@link WeaviateClient#close} + * is NOT called before this method returns. + * @param status Target status. + * @throws IllegalStateException if {@link #operation} is not set (null). + * @throws TimeoutException in case the wait times out without reaching + * the target status. + * @throws IOException in case the request was not sent successfully + * due to a malformed request, a networking error + * or the server being unavailable. + */ + public Backup waitForStatus(WeaviateClient client, BackupStatus status) throws IOException, TimeoutException { + return waitForStatus(client, status, ObjectBuilder.identity()); + } + + /** + * Block until the backup operation reaches a certain status. + * + * @param client Weaviate client. Make sure {@link WeaviateClient#close} + * is NOT called before this method returns. + * @param status Target status. + * @param fn Lambda expression for optional parameters. + * @throws IllegalStateException if {@link #operation} is not set (null). + * @throws TimeoutException in case the wait times out without reaching + * the target status. + * @throws IOException in case the request was not sent successfully + * due to a malformed request, a networking error + * or the server being unavailable. + */ + public Backup waitForStatus(WeaviateClient client, BackupStatus status, + Function> fn) throws IOException, TimeoutException { + if (operation == null) { + throw new IllegalStateException("backup.operation is null"); + } + + final var options = WaitOptions.of(fn); + final Callable> poll = operation == Operation.CREATE + ? () -> client.backup.getCreateStatus(id, backend) + : () -> client.backup.getRestoreStatus(id, backend); + return new Waiter(this, options).waitForStatus(status, poll); + } + + /** + * Cancel backup creation. + * + *

+ * This method cannot be called to cancel backup restore. + * + * @param client Weaviate client. Make sure {@link WeaviateClient#close} + * is NOT called before this method returns. + * @throws IOException in case the request was not sent successfully + * due to a malformed request, a networking error + * or the server being unavailable. + */ + public void cancel(WeaviateClient client) throws IOException { + client.backup.cancel(id(), backend()); + } + + /** + * Poll until backup's been created / restored successfully. + * + * @param client Weaviate client. Make sure {@link WeaviateClientAsync#close} + * is NOT called before this method returns. + * @throws IllegalStateException if {@link #operation} is not set (null). + */ + public CompletableFuture waitForCompletion(WeaviateClientAsync client) { + return waitForStatus(client, BackupStatus.SUCCESS); + } + + /** + * Poll until backup's been created / restored successfully. + * + * @param client Weaviate client. Make sure {@link WeaviateClientAsync#close} + * is NOT called before this method returns. + * @param fn Lambda expression for optional parameters. + */ + public CompletableFuture waitForCompletion(WeaviateClientAsync client, + Function> fn) { + return waitForStatus(client, BackupStatus.SUCCESS, fn); + } + + /** + * Poll until backup reaches a certain status or the wait times out. + * + * @param client Weaviate client. Make sure {@link WeaviateClientAsync#close} + * is NOT called before this method returns. + * @param status Target status. + */ + public CompletableFuture waitForStatus(WeaviateClientAsync client, BackupStatus status) { + return waitForStatus(client, status, ObjectBuilder.identity()); + } + + /** + * Poll until backup reaches a certain status or the wait times out. + * + * @param client Weaviate client. Make sure {@link WeaviateClientAsync#close} + * is NOT called before this method returns. + * @param status Target status. + * @param fn Lambda expression for optional parameters. + */ + public CompletableFuture waitForStatus(WeaviateClientAsync client, BackupStatus status, + Function> fn) { + if (operation == null) { + throw new IllegalStateException("backup.operation is null"); + } + + final var options = WaitOptions.of(fn); + final Supplier>> poll = operation == Operation.CREATE + ? () -> client.backup.getCreateStatus(id, backend) + : () -> client.backup.getRestoreStatus(id, backend); + return new Waiter(this, options).waitForStatusAsync(status, poll); + } + + /** + * Cancel backup creation. + * + *

+ * This method cannot be called to cancel backup restore. + * + * @param client Weaviate client. Make sure {@link WeaviateClientAsync#close} + * is NOT called before this method returns. + */ + public CompletableFuture cancel(WeaviateClientAsync client) { + return client.backup.cancel(id(), backend()); + } +} diff --git a/src/main/java/io/weaviate/client6/v1/api/backup/BackupStatus.java b/src/main/java/io/weaviate/client6/v1/api/backup/BackupStatus.java new file mode 100644 index 00000000..74d4ff30 --- /dev/null +++ b/src/main/java/io/weaviate/client6/v1/api/backup/BackupStatus.java @@ -0,0 +1,24 @@ +package io.weaviate.client6.v1.api.backup; + +import com.google.gson.annotations.SerializedName; + +public enum BackupStatus { + /** Backup creation / restoration has begun. */ + @SerializedName("STARTED") + STARTED, + /** Backup in progress, data is being transferred. */ + @SerializedName("TRANSFERRING") + TRANSFERRING, + /** Backup creation / restoration completed successfully. */ + @SerializedName("SUCCESS") + SUCCESS, + /** Backup creation / restoration failed. */ + @SerializedName("FAILED") + FAILED, + /** + * Backup creation canceled. + * This status is never returned for backup restorations. + */ + @SerializedName("CANCELED") + CANCELED; +} diff --git a/src/main/java/io/weaviate/client6/v1/api/backup/CancelBackupRequest.java b/src/main/java/io/weaviate/client6/v1/api/backup/CancelBackupRequest.java new file mode 100644 index 00000000..9c311f9c --- /dev/null +++ b/src/main/java/io/weaviate/client6/v1/api/backup/CancelBackupRequest.java @@ -0,0 +1,14 @@ +package io.weaviate.client6.v1.api.backup; + +import java.util.Collections; + +import io.weaviate.client6.v1.internal.rest.Endpoint; +import io.weaviate.client6.v1.internal.rest.SimpleEndpoint; + +public record CancelBackupRequest(String backupId, String backend) { + + public static Endpoint _ENDPOINT = SimpleEndpoint.sideEffect( + request -> "DELETE", + request -> "/backups/" + request.backend + "/" + request.backupId, + request -> Collections.emptyMap()); +} diff --git a/src/main/java/io/weaviate/client6/v1/api/backup/CompressionLevel.java b/src/main/java/io/weaviate/client6/v1/api/backup/CompressionLevel.java new file mode 100644 index 00000000..1a6cd000 --- /dev/null +++ b/src/main/java/io/weaviate/client6/v1/api/backup/CompressionLevel.java @@ -0,0 +1,15 @@ +package io.weaviate.client6.v1.api.backup; + +import com.google.gson.annotations.SerializedName; + +public enum CompressionLevel { + /** Use default compression algorithm (gzip). */ + @SerializedName("DefaultCompression") + DEFAULT, + /** Use compression algorithm that prioritizes speed. */ + @SerializedName("BestSpeed") + BEST_SPEED, + /** Use compression algorithm that prioritizes compression quality. */ + @SerializedName("BestCompression") + BEST_COMPRESSION; +} diff --git a/src/main/java/io/weaviate/client6/v1/api/backup/CreateBackupRequest.java b/src/main/java/io/weaviate/client6/v1/api/backup/CreateBackupRequest.java new file mode 100644 index 00000000..9db358eb --- /dev/null +++ b/src/main/java/io/weaviate/client6/v1/api/backup/CreateBackupRequest.java @@ -0,0 +1,144 @@ +package io.weaviate.client6.v1.api.backup; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.function.Function; + +import com.google.gson.annotations.SerializedName; + +import io.weaviate.client6.v1.internal.ObjectBuilder; +import io.weaviate.client6.v1.internal.json.JSON; +import io.weaviate.client6.v1.internal.rest.Endpoint; +import io.weaviate.client6.v1.internal.rest.SimpleEndpoint; + +public record CreateBackupRequest(BackupCreate body, String backend) { + + public static Endpoint _ENDPOINT = new SimpleEndpoint<>( + request -> "POST", + request -> "/backups/" + request.backend, + request -> Collections.emptyMap(), + request -> JSON.serialize(request.body), + (statusCode, response) -> JSON.deserialize(response, Backup.class)); + + public static record BackupCreate( + @SerializedName("id") String id, + @SerializedName("include") List includeCollections, + @SerializedName("exclude") List excludeCollections, + @SerializedName("config") Config config) { + + private static record Config( + @SerializedName("CPUPercentage") Integer cpuPercentage, + @SerializedName("ChunkSize") Integer chunkSize, + @SerializedName("CompressionLevel") CompressionLevel compressionLevel, + @SerializedName("Bucket") String bucket, + @SerializedName("Path") String path) { + } + + public static BackupCreate of(String backupId) { + return of(backupId, ObjectBuilder.identity()); + } + + public static BackupCreate of(String backupId, Function> fn) { + return fn.apply(new Builder(backupId)).build(); + } + + public BackupCreate(Builder builder) { + this( + builder.backupId, + builder.includeCollections, + builder.excludeCollections, + new Config( + builder.cpuPercentage, + builder.chunkSize, + builder.compressionLevel, + builder.bucket, + builder.path)); + } + + public static class Builder implements ObjectBuilder { + private final String backupId; + + private Integer cpuPercentage; + private Integer chunkSize; + private CompressionLevel compressionLevel; + private String bucket; + private String path; + private final List includeCollections = new ArrayList<>(); + private final List excludeCollections = new ArrayList<>(); + + public Builder(String backupId) { + this.backupId = backupId; + } + + /** Collection that should be included in the backup. */ + public Builder includeCollections(String... includeCollections) { + return includeCollections(Arrays.asList(includeCollections)); + } + + /** Collection that should be included in the backup. */ + public Builder includeCollections(List includeCollections) { + this.includeCollections.addAll(includeCollections); + return this; + } + + /** Collection that should be excluded from the backup. */ + public Builder excludeCollections(String... excludeCollections) { + return excludeCollections(Arrays.asList(excludeCollections)); + } + + /** Collection that should be excluded from the backup. */ + public Builder excludeCollections(List excludeCollections) { + this.excludeCollections.addAll(excludeCollections); + return this; + } + + /** + * Set the desired CPU core utilization. + * + * @param cpuPercentage Percent value of the target CPU utilization (1% to 80%). + */ + public Builder cpuPercentage(int cpuPercentage) { + this.cpuPercentage = cpuPercentage; + return this; + } + + /** + * Set the desired chunk size. Defaults to 128MB. + * + * @param chunkSize Chunk size in MB (2MB to 512 MB). + */ + public Builder chunkSize(int chunkSize) { + this.chunkSize = chunkSize; + return this; + } + + /** Adjust the parameters of the selected compression algorithm. */ + public Builder compressionLevel(CompressionLevel compressionLevel) { + this.compressionLevel = compressionLevel; + return this; + } + + /** + * Set the bucket where backups are stored. + * Applicable for cloud storage backends. + */ + public Builder bucket(String bucket) { + this.bucket = bucket; + return this; + } + + /** Override default backup location. */ + public Builder path(String path) { + this.path = path; + return this; + } + + @Override + public BackupCreate build() { + return new BackupCreate(this); + } + } + } +} diff --git a/src/main/java/io/weaviate/client6/v1/api/backup/GetCreateStatusRequest.java b/src/main/java/io/weaviate/client6/v1/api/backup/GetCreateStatusRequest.java new file mode 100644 index 00000000..52d23d19 --- /dev/null +++ b/src/main/java/io/weaviate/client6/v1/api/backup/GetCreateStatusRequest.java @@ -0,0 +1,15 @@ +package io.weaviate.client6.v1.api.backup; + +import java.util.Collections; +import java.util.Optional; + +import io.weaviate.client6.v1.internal.rest.Endpoint; +import io.weaviate.client6.v1.internal.rest.OptionalEndpoint; + +public record GetCreateStatusRequest(String backupId, String backend) { + public static final Endpoint> _ENDPOINT = OptionalEndpoint.noBodyOptional( + request -> "GET", + request -> "/backups/" + request.backend + "/" + request.backupId, + request -> Collections.emptyMap(), + Backup.class); +} diff --git a/src/main/java/io/weaviate/client6/v1/api/backup/GetRestoreStatusRequest.java b/src/main/java/io/weaviate/client6/v1/api/backup/GetRestoreStatusRequest.java new file mode 100644 index 00000000..f4430b80 --- /dev/null +++ b/src/main/java/io/weaviate/client6/v1/api/backup/GetRestoreStatusRequest.java @@ -0,0 +1,15 @@ +package io.weaviate.client6.v1.api.backup; + +import java.util.Collections; +import java.util.Optional; + +import io.weaviate.client6.v1.internal.rest.Endpoint; +import io.weaviate.client6.v1.internal.rest.OptionalEndpoint; + +public record GetRestoreStatusRequest(String backupId, String backend) { + public static final Endpoint> _ENDPOINT = OptionalEndpoint.noBodyOptional( + request -> "GET", + request -> "/backups/" + request.backend + "/" + request.backupId + "/restore", + request -> Collections.emptyMap(), + Backup.class); +} diff --git a/src/main/java/io/weaviate/client6/v1/api/backup/ListBackupsRequest.java b/src/main/java/io/weaviate/client6/v1/api/backup/ListBackupsRequest.java new file mode 100644 index 00000000..ea3e497a --- /dev/null +++ b/src/main/java/io/weaviate/client6/v1/api/backup/ListBackupsRequest.java @@ -0,0 +1,21 @@ +package io.weaviate.client6.v1.api.backup; + +import java.util.Collections; +import java.util.List; + +import com.google.gson.reflect.TypeToken; + +import io.weaviate.client6.v1.internal.json.JSON; +import io.weaviate.client6.v1.internal.rest.Endpoint; +import io.weaviate.client6.v1.internal.rest.SimpleEndpoint; + +public record ListBackupsRequest(String backend) { + + @SuppressWarnings("unchecked") + public static Endpoint> _ENDPOINT = SimpleEndpoint.noBody( + request -> "GET", + request -> "/backups/" + request.backend, + request -> Collections.emptyMap(), + (statusCode, response) -> (List) JSON.deserialize( + response, TypeToken.getParameterized(List.class, Backup.class))); +} diff --git a/src/main/java/io/weaviate/client6/v1/api/backup/RbacRestoreOption.java b/src/main/java/io/weaviate/client6/v1/api/backup/RbacRestoreOption.java new file mode 100644 index 00000000..b2ba14d6 --- /dev/null +++ b/src/main/java/io/weaviate/client6/v1/api/backup/RbacRestoreOption.java @@ -0,0 +1,13 @@ +package io.weaviate.client6.v1.api.backup; + +import com.google.gson.annotations.SerializedName; + +/** Controls which RBAC objects (users, roles) get restored. */ +public enum RbacRestoreOption { + /** Do not restore any objects. */ + @SerializedName("noRestore") + NONE, + /** Restore all objects. */ + @SerializedName("all") + ALL; +} diff --git a/src/main/java/io/weaviate/client6/v1/api/backup/RestoreBackupRequest.java b/src/main/java/io/weaviate/client6/v1/api/backup/RestoreBackupRequest.java new file mode 100644 index 00000000..bc017d8e --- /dev/null +++ b/src/main/java/io/weaviate/client6/v1/api/backup/RestoreBackupRequest.java @@ -0,0 +1,145 @@ +package io.weaviate.client6.v1.api.backup; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.function.Function; + +import com.google.gson.annotations.SerializedName; + +import io.weaviate.client6.v1.internal.ObjectBuilder; +import io.weaviate.client6.v1.internal.json.JSON; +import io.weaviate.client6.v1.internal.rest.Endpoint; +import io.weaviate.client6.v1.internal.rest.SimpleEndpoint; + +public record RestoreBackupRequest(String backupId, String backend, BackupRestore body) { + + public static Endpoint _ENDPOINT = new SimpleEndpoint<>( + request -> "POST", + request -> "/backups/" + request.backend + "/" + request.backupId + "/restore", + request -> Collections.emptyMap(), + request -> JSON.serialize(request.body), + (statusCode, response) -> JSON.deserialize(response, Backup.class)); + + public record BackupRestore( + @SerializedName("include") List includeCollections, + @SerializedName("exclude") List excludeCollections, + @SerializedName("overwriteAlias") Boolean overwriteAlias, + @SerializedName("config") Config config) { + + public record Config( + @SerializedName("CPUPercentage") Integer cpuPercentage, + @SerializedName("Bucket") String bucket, + @SerializedName("Path") String path, + @SerializedName("usersOptions") RbacRestoreOption restoreUsers, + @SerializedName("rolesOptions") RbacRestoreOption restoreRoles) { + } + + public static BackupRestore of() { + return of(ObjectBuilder.identity()); + } + + public static BackupRestore of(Function> fn) { + return fn.apply(new Builder()).build(); + } + + public BackupRestore(Builder builder) { + this( + builder.includeCollections, + builder.excludeCollections, + builder.overwriteAlias, + new Config( + builder.cpuPercentage, + builder.bucket, + builder.path, + builder.restoreUsers, + builder.restoreRoles)); + + } + + public static class Builder implements ObjectBuilder { + private Integer cpuPercentage; + private String bucket; + private String path; + private Boolean overwriteAlias; + private RbacRestoreOption restoreUsers; + private RbacRestoreOption restoreRoles; + private final List includeCollections = new ArrayList<>(); + private final List excludeCollections = new ArrayList<>(); + + /** Collection that should be restored. */ + public Builder includeCollections(String... includeCollections) { + return includeCollections(Arrays.asList(includeCollections)); + } + + /** Collection that should be restored. */ + public Builder includeCollections(List includeCollections) { + this.includeCollections.addAll(includeCollections); + return this; + } + + /** Collection that should be not be restored. */ + public Builder excludeCollections(String... excludeCollections) { + return excludeCollections(Arrays.asList(excludeCollections)); + } + + /** Collection that should be not be restored. */ + public Builder excludeCollections(List excludeCollections) { + this.excludeCollections.addAll(excludeCollections); + return this; + } + + /** + * Set the desired CPU core utilization. + * + * @param cpuPercentage Percent value of the target CPU utilization (1% to 80%). + */ + public Builder cpuPercentage(int cpuPercentage) { + this.cpuPercentage = cpuPercentage; + return this; + } + + /** + * Set the bucket where backups are stored. + * Applicable for cloud storage backends. + */ + public Builder bucket(String bucket) { + this.bucket = bucket; + return this; + } + + /** Override default backup location. */ + public Builder path(String path) { + this.path = path; + return this; + } + + /** + * Allow restored collection aliases to overwrite existing ones + * in case of conflict. + */ + public Builder overwriteAlias(boolean overwriteAlias) { + this.overwriteAlias = overwriteAlias; + return this; + } + + /** Control which RBAC users should be restored. */ + public Builder restoreUsers(RbacRestoreOption restoreUsers) { + this.restoreUsers = restoreUsers; + return this; + } + + /** Control which RBAC roles should be restored. */ + public Builder restoreRoles(RbacRestoreOption restoreRoles) { + this.restoreRoles = restoreRoles; + return this; + } + + @Override + public BackupRestore build() { + return new BackupRestore(this); + } + } + } +} diff --git a/src/main/java/io/weaviate/client6/v1/api/backup/WaitOptions.java b/src/main/java/io/weaviate/client6/v1/api/backup/WaitOptions.java new file mode 100644 index 00000000..1044bc6e --- /dev/null +++ b/src/main/java/io/weaviate/client6/v1/api/backup/WaitOptions.java @@ -0,0 +1,64 @@ +package io.weaviate.client6.v1.api.backup; + +import java.time.Duration; +import java.util.function.Function; + +import io.weaviate.client6.v1.internal.ObjectBuilder; + +public record WaitOptions(long interval, long timeout) { + private static final long DEFAULT_INTERVAL_MILLIS = 1_000; + private static final long DEFAULT_TIMEOUT_MILLIS = 3600_000; + + public static WaitOptions of(Function> fn) { + return fn.apply(new Builder()).build(); + } + + public WaitOptions(Builder builder) { + this(builder.interval, builder.timeout); + } + + public static class Builder implements ObjectBuilder { + private long interval = DEFAULT_INTERVAL_MILLIS; + private long timeout = DEFAULT_TIMEOUT_MILLIS; + + /** Set polling interval. Defaults to 1s. */ + public Builder interval(Duration duration) { + return interval(duration.toMillis()); + } + + /** + * Set polling interval. Defaults to 1s. + * + * @param intervalMillis Polling interval in milliseconds. Minimum 1ms. + */ + public Builder interval(long intervalMillis) { + this.interval = Math.max(intervalMillis, 1); + return this; + } + + /** + * Set wait timeout. Defaults to 1s. + * + * @param duration Wait timeout duration. + */ + public Builder timeout(Duration duration) { + return timeout(duration.toMillis()); + } + + /** + * Set wait timeout. Set this to a negative value + * for the wait to expire immediately. + * + * @param timeoutMillis Wait timeout in milliseconds. + */ + public Builder timeout(long timeoutMillis) { + this.timeout = timeoutMillis; + return this; + } + + @Override + public WaitOptions build() { + return new WaitOptions(this); + } + } +} diff --git a/src/main/java/io/weaviate/client6/v1/api/backup/Waiter.java b/src/main/java/io/weaviate/client6/v1/api/backup/Waiter.java new file mode 100644 index 00000000..82528b73 --- /dev/null +++ b/src/main/java/io/weaviate/client6/v1/api/backup/Waiter.java @@ -0,0 +1,104 @@ +package io.weaviate.client6.v1.api.backup; + +import java.io.IOException; +import java.time.Duration; +import java.time.Instant; +import java.util.Optional; +import java.util.concurrent.Callable; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.function.Supplier; + +final class Waiter { + + private final Backup backup; + private final WaitOptions wait; + + Waiter(final Backup backup, WaitOptions wait) { + this.backup = backup; + this.wait = wait; + } + + Backup waitForStatus(final BackupStatus wantStatus, Callable> poll) + throws IOException, TimeoutException { + if (backup.error() != null) { + throw new RuntimeException(backup.error()); + } + + if (backup.status() == wantStatus) { + return backup; + } + + final Instant deadline = Instant.now().plusMillis(wait.timeout()); + Backup latest = backup; + while (!Thread.interrupted()) { + if (Instant.now().isAfter(deadline)) { + throw new TimeoutException("timed out after %s, latest status %s".formatted( + Duration.ofMillis(wait.timeout()).toSeconds(), latest.status())); + } + + try { + var current = poll.call().orElseThrow(); + latest = current; + } catch (Exception e) { + throw new RuntimeException(e); + } + + if (latest.status() == wantStatus) { + return latest; + } else if (isComplete(latest)) { + throw new IllegalStateException("completed with status=%s without reaching %s" + .formatted(latest.status(), wantStatus)); + } + + try { + Thread.sleep(wait.interval()); + } catch (InterruptedException e) { + // TODO: the interrupted state will be cleared on the next while() check + // and then we will simply return the latest state. An absence of an exception + // might be misleading here. What should we do? + Thread.currentThread().interrupt(); + } + } + return latest; + } + + CompletableFuture waitForStatusAsync( + final BackupStatus wantStatus, + Supplier>> poll) { + if (backup.status() == wantStatus) { + return CompletableFuture.completedFuture(backup); + } + final Instant deadline = Instant.now().plusMillis(wait.timeout()); + return poll.get().thenCompose(latest -> _waitForStatusAsync(wantStatus, latest.orElseThrow(), poll, deadline)); + } + + CompletableFuture _waitForStatusAsync( + final BackupStatus wantStatus, + final Backup current, + Supplier>> poll, + final Instant deadline) { + + if (current.status() == wantStatus) { + return CompletableFuture.completedFuture(current); + } + + if (Instant.now().isAfter(deadline)) { + var e = new TimeoutException("timed out after %s, latest status %s".formatted( + Duration.ofMillis(wait.timeout()).toSeconds(), current.status())); + throw new CompletionException(e); + } + + return poll.get().thenComposeAsync( + latest -> _waitForStatusAsync(wantStatus, latest.orElseThrow(), poll, deadline), + CompletableFuture.delayedExecutor(wait.interval(), TimeUnit.MILLISECONDS)); + } + + private boolean isComplete(final Backup backup) { + return backup.status() == BackupStatus.SUCCESS + || backup.status() == BackupStatus.FAILED + || backup.status() == BackupStatus.CANCELED; + } +} diff --git a/src/main/java/io/weaviate/client6/v1/api/backup/WeaviateBackupClient.java b/src/main/java/io/weaviate/client6/v1/api/backup/WeaviateBackupClient.java new file mode 100644 index 00000000..e2235d4c --- /dev/null +++ b/src/main/java/io/weaviate/client6/v1/api/backup/WeaviateBackupClient.java @@ -0,0 +1,180 @@ +package io.weaviate.client6.v1.api.backup; + +import java.io.IOException; +import java.util.List; +import java.util.Optional; +import java.util.function.Function; + +import io.weaviate.client6.v1.api.WeaviateApiException; +import io.weaviate.client6.v1.internal.ObjectBuilder; +import io.weaviate.client6.v1.internal.rest.RestTransport; + +public class WeaviateBackupClient { + private final RestTransport restTransport; + + public WeaviateBackupClient(RestTransport restTransport) { + this.restTransport = restTransport; + } + + /** + * Start a new backup process. + * + * @param backupId Backup ID. Must be unique for the backend. + * @param backend Backup storage backend. + * @throws WeaviateApiException in case the server returned with an + * error status code. + * @throws IOException in case the request was not sent successfully + * due to a malformed request, a networking error + * or the server being unavailable. + */ + public Backup create(String backupId, String backend) throws IOException { + return create(new CreateBackupRequest(CreateBackupRequest.BackupCreate.of(backupId), backend)); + } + + /** + * Start a new backup process. + * + * @param backupId Backup ID. Must be unique for the backend. + * @param backend Backup storage backend. + * @param fn Lambda expression for optional parameters. + * @throws WeaviateApiException in case the server returned with an + * error status code. + * @throws IOException in case the request was not sent successfully + * due to a malformed request, a networking error + * or the server being unavailable. + */ + public Backup create(String backupId, String backend, + Function> fn) + throws IOException { + return create(new CreateBackupRequest(CreateBackupRequest.BackupCreate.of(backupId, fn), backend)); + } + + /** + * Start a new backup process. + * + * @param request Create backup request. + * @throws WeaviateApiException in case the server returned with an + * error status code. + * @throws IOException in case the request was not sent successfully + * due to a malformed request, a networking error + * or the server being unavailable. + */ + public Backup create(CreateBackupRequest request) throws IOException { + return this.restTransport.performRequest(request, CreateBackupRequest._ENDPOINT) + .withOperation(Backup.Operation.CREATE); + } + + /** + * Get backup create status. + * + * @param backupId Backup ID. + * @param backend Backup storage backend. + * @throws WeaviateApiException in case the server returned with an + * error status code. + * @throws IOException in case the request was not sent successfully + * due to a malformed request, a networking error + * or the server being unavailable. + */ + public Optional getCreateStatus(String backupId, String backend) throws IOException { + return this.restTransport.performRequest( + new GetCreateStatusRequest(backupId, backend), GetCreateStatusRequest._ENDPOINT) + .map(b -> b.withOperation(Backup.Operation.CREATE)); + } + + /** + * Start backup restore process. + * + * @param backupId Backup ID. + * @param backend Backup storage backend. + * @throws WeaviateApiException in case the server returned with an + * error status code. + * @throws IOException in case the request was not sent successfully + * due to a malformed request, a networking error + * or the server being unavailable. + */ + public Backup restore(String backupId, String backend) throws IOException { + return restore(new RestoreBackupRequest(backupId, backend, RestoreBackupRequest.BackupRestore.of())); + } + + /** + * Start backup restore process. + * + * @param backupId Backup ID. + * @param backend Backup storage backend. + * @param fn Lambda expression for optional parameters. + * @throws WeaviateApiException in case the server returned with an + * error status code. + * @throws IOException in case the request was not sent successfully + * due to a malformed request, a networking error + * or the server being unavailable. + */ + public Backup restore(String backupId, String backend, + Function> fn) + throws IOException { + return restore(new RestoreBackupRequest(backupId, backend, RestoreBackupRequest.BackupRestore.of(fn))); + } + + /** + * Start backup restore process. + * + * @param request Restore backup request. + * @throws WeaviateApiException in case the server returned with an + * error status code. + * @throws IOException in case the request was not sent successfully + * due to a malformed request, a networking error + * or the server being unavailable. + */ + public Backup restore(RestoreBackupRequest request) throws IOException { + return this.restTransport.performRequest(request, RestoreBackupRequest._ENDPOINT) + .withOperation(Backup.Operation.RESTORE); + } + + /** + * Get backup restore status. + * + * @param backupId Backup ID. + * @param backend Backup storage backend. + * @throws WeaviateApiException in case the server returned with an + * error status code. + * @throws IOException in case the request was not sent successfully + * due to a malformed request, a networking error + * or the server being unavailable. + */ + public Optional getRestoreStatus(String backupId, String backend) throws IOException { + return this.restTransport + .performRequest(new GetRestoreStatusRequest(backupId, backend), GetRestoreStatusRequest._ENDPOINT) + .map(b -> b.withOperation(Backup.Operation.RESTORE)); + } + + /** + * List backups in the backend storage. + * + * @param backend Backup storage backend. + * @throws WeaviateApiException in case the server returned with an + * error status code. + * @throws IOException in case the request was not sent successfully + * due to a malformed request, a networking error + * or the server being unavailable. + */ + public List list(String backend) throws IOException { + return this.restTransport.performRequest(new ListBackupsRequest(backend), ListBackupsRequest._ENDPOINT); + } + + /** + * Cancel in-progress backup. + * + *

+ * This method cannot be called cancel backup restore. + * + * @param backupId Backup ID. + * @param backend Backup storage backend. + * @throws WeaviateApiException in case the server returned with an + * error status code. + * @throws IOException in case the request was not sent successfully + * due to a malformed request, a networking error + * or the server being unavailable. + */ + public void cancel(String backupId, String backend) throws IOException { + this.restTransport.performRequest(new CancelBackupRequest(backupId, backend), CancelBackupRequest._ENDPOINT); + } +} diff --git a/src/main/java/io/weaviate/client6/v1/api/backup/WeaviateBackupClientAsync.java b/src/main/java/io/weaviate/client6/v1/api/backup/WeaviateBackupClientAsync.java new file mode 100644 index 00000000..02e37281 --- /dev/null +++ b/src/main/java/io/weaviate/client6/v1/api/backup/WeaviateBackupClientAsync.java @@ -0,0 +1,128 @@ +package io.weaviate.client6.v1.api.backup; + +import java.util.List; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import java.util.function.Function; + +import io.weaviate.client6.v1.internal.ObjectBuilder; +import io.weaviate.client6.v1.internal.rest.RestTransport; + +public class WeaviateBackupClientAsync { + private final RestTransport restTransport; + + public WeaviateBackupClientAsync(RestTransport restTransport) { + this.restTransport = restTransport; + } + + /** + * Start a new backup process. + * + * @param backupId Backup ID. Must be unique for the backend. + * @param backend Backup storage backend. + */ + public CompletableFuture create(String backupId, String backend) { + return create(new CreateBackupRequest(CreateBackupRequest.BackupCreate.of(backupId), backend)); + } + + /** + * Start a new backup process. + * + * @param backupId Backup ID. Must be unique for the backend. + * @param backend Backup storage backend. + * @param fn Lambda expression for optional parameters. + */ + public CompletableFuture create(String backupId, String backend, + Function> fn) { + return create(new CreateBackupRequest(CreateBackupRequest.BackupCreate.of(backupId, fn), backend)); + } + + /** + * Start a new backup process. + * + * @param request Create backup request. + */ + public CompletableFuture create(CreateBackupRequest request) { + return this.restTransport.performRequestAsync(request, CreateBackupRequest._ENDPOINT) + .thenApply(bak -> bak.withOperation(Backup.Operation.CREATE)); + } + + /** + * Get backup create status. + * + * @param backupId Backup ID. + * @param backend Backup storage backend. + */ + public CompletableFuture> getCreateStatus(String backupId, String backend) { + return this.restTransport.performRequestAsync( + new GetCreateStatusRequest(backupId, backend), GetCreateStatusRequest._ENDPOINT) + .thenApply(bak -> bak.map(_bak -> _bak.withOperation(Backup.Operation.CREATE))); + } + + /** + * Start backup restore process. + * + * @param backupId Backup ID. + * @param backend Backup storage backend. + */ + public CompletableFuture restore(String backupId, String backend) { + return restore(new RestoreBackupRequest(backupId, backend, RestoreBackupRequest.BackupRestore.of())); + } + + /** + * Start backup restore process. + * + * @param backupId Backup ID. + * @param backend Backup storage backend. + * @param fn Lambda expression for optional parameters. + */ + public CompletableFuture restore(String backupId, String backend, + Function> fn) { + return restore(new RestoreBackupRequest(backupId, backend, RestoreBackupRequest.BackupRestore.of(fn))); + } + + /** + * Start backup restore process. + * + * @param request Restore backup request. + */ + public CompletableFuture restore(RestoreBackupRequest request) { + return this.restTransport.performRequestAsync(request, RestoreBackupRequest._ENDPOINT) + .thenApply(bak -> bak.withOperation(Backup.Operation.RESTORE)); + } + + /** + * Get backup restore status. + * + * @param backupId Backup ID. + * @param backend Backup storage backend. + */ + public CompletableFuture> getRestoreStatus(String backupId, String backend) { + return this.restTransport + .performRequestAsync(new GetRestoreStatusRequest(backupId, backend), GetRestoreStatusRequest._ENDPOINT) + .thenApply(bak -> bak.map(_bak -> _bak.withOperation(Backup.Operation.RESTORE))); + } + + /** + * List backups in the backend storage. + * + * @param backend Backup storage backend. + */ + public CompletableFuture> list(String backend) { + return this.restTransport.performRequestAsync(new ListBackupsRequest(backend), ListBackupsRequest._ENDPOINT); + } + + /** + * Cancel in-progress backup. + * + *

+ * This method cannot be called cancel backup restore. + * + * @param backupId Backup ID. + * @param backend Backup storage backend. + */ + public CompletableFuture cancel(String backupId, String backend) { + return this.restTransport.performRequestAsync(new CancelBackupRequest(backupId, backend), + CancelBackupRequest._ENDPOINT); + } +} diff --git a/src/main/java/io/weaviate/client6/v1/internal/MapUtil.java b/src/main/java/io/weaviate/client6/v1/internal/MapUtil.java index 4186572b..05811cf4 100644 --- a/src/main/java/io/weaviate/client6/v1/internal/MapUtil.java +++ b/src/main/java/io/weaviate/client6/v1/internal/MapUtil.java @@ -28,9 +28,9 @@ private MapUtil() { * // Result: {1: 1, 2: 2, 3: null}; * } * - * @param stream Stream of elements {@link T}. - * @param keyFn Transforms element {@link T} to key {@link K}. - * @param keyFn Transforms element {@link T} to value {@link V}. + * @param stream Stream of elements {@link T}. + * @param keyFn Transforms element {@link T} to key {@link K}. + * @param valueFn Transforms element {@link T} to value {@link V}. * @return Map */ public static Map collect(Stream stream, Function keyFn, Function valueFn) { diff --git a/src/main/java/io/weaviate/client6/v1/internal/rest/OptionalEndpoint.java b/src/main/java/io/weaviate/client6/v1/internal/rest/OptionalEndpoint.java index c3863bf9..2c960236 100644 --- a/src/main/java/io/weaviate/client6/v1/internal/rest/OptionalEndpoint.java +++ b/src/main/java/io/weaviate/client6/v1/internal/rest/OptionalEndpoint.java @@ -15,6 +15,14 @@ public static OptionalEndpoint noBody return new OptionalEndpoint<>(method, requestUrl, queryParameters, nullBody(), deserializeResponse); } + public static OptionalEndpoint noBodyOptional( + Function method, + Function requestUrl, + Function> queryParameters, + Class cls) { + return new OptionalEndpoint<>(method, requestUrl, queryParameters, nullBody(), deserializeClass(cls)); + } + public OptionalEndpoint( Function method, Function requestUrl,