Ingest async implementation #430

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open · wants to merge 8 commits into master
18 changes: 16 additions & 2 deletions CHANGELOG.md
@@ -4,22 +4,36 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [UNRELEASED]

### Changed

- [BREAKING] All synchronous queued and streaming ingestion APIs now delegate to their asynchronous counterparts internally and block for results.
Contributor:
Should it have any effects for the user?

I think a big thing is the exception changes. IDK if we mention it anywhere.

Contributor:

Is there any public API that broke and is no longer available?

- [BREAKING] The streaming client no longer checks the blob size or whether the blob exists.
### Added
- The SDK now provides Reactor Core-based asynchronous APIs for all queued and streaming ingestion endpoints,
enabling non-blocking operations.
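To make the shape of the new ingestion surface concrete, here is a minimal sketch of non-blocking usage. The `ingestFromBlobAsync` name and its `Mono` return type are assumptions for illustration; this changelog entry does not spell out the exact public method names, so consult the client interfaces in this PR for the real signatures.

```java
import com.microsoft.azure.kusto.ingest.IngestionProperties;
import com.microsoft.azure.kusto.ingest.QueuedIngestClient;
import com.microsoft.azure.kusto.ingest.source.BlobSourceInfo;

public class AsyncIngestSketch {
    // Hypothetical usage: ingestFromBlobAsync and its Mono return type are assumed shapes,
    // not confirmed signatures from this PR.
    static void ingest(QueuedIngestClient client) {
        BlobSourceInfo blob = new BlobSourceInfo(
                "https://account.blob.core.windows.net/container/data.csv.gz?<sas>");
        IngestionProperties props = new IngestionProperties("MyDatabase", "MyTable");

        client.ingestFromBlobAsync(blob, props)
                .doOnSuccess(result -> System.out.println("Ingestion queued"))
                .doOnError(err -> System.err.println("Ingestion failed: " + err.getMessage()))
                .subscribe(); // non-blocking; the synchronous API now delegates to this and calls block()
    }
}
```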

## [6.0.1]

### Added
- The SDK now provides Reactor Core-based asynchronous APIs for all query, management, streaming query/ingestion (StreamingClient) endpoints,
enabling non-blocking operations. You can read more about Reactor Core and [Mono type here](https://projectreactor.io/docs/core/release/api/).
- `ConnectionStringBuilder` now supports keywords without regards to spaces or case. It now supports `toString()` that prints a canonical connection string, with censored secrets by default.
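A small sketch of the described `ConnectionStringBuilder` behavior. The string-parsing constructor used here is an assumption; the point being illustrated is that keyword spacing and casing are tolerated and that `toString()` censors secrets.

```java
import com.microsoft.azure.kusto.data.auth.ConnectionStringBuilder;

public class ConnectionStringSketch {
    public static void main(String[] args) {
        // Assumption: the builder can be created from a raw connection string; keywords are
        // matched regardless of spaces or case, per the changelog entry above.
        ConnectionStringBuilder csb = new ConnectionStringBuilder(
                "Data Source = https://mycluster.kusto.windows.net ; application client id=app-id ;"
                        + " APPLICATION KEY=secret ; Authority Id=tenant-id");

        // Prints a canonical connection string with secrets censored by default.
        System.out.println(csb);
    }
}
```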

### Changed
- [BREAKING] All synchronous query/management, streaming query/ingestion (StreamingClient) APIs now delegate to their asynchronous counterparts
internally and block for results.
- [BREAKING] * Make ManagedStreamingQueuingPolicy internal, expose just a factor
* Dont allow users to pass raw data size, provide it only if we have it
- [BREAKING] Make ManagedStreamingQueuingPolicy internal, expose just a factor
- [BREAKING] Don't allow users to pass raw data size, provide it only if we have it
Contributor:
Retroactively changing the 6.0.1 changelogs? maybe another PR?


- [BREAKING] Removed the max keep-alive setting from HttpClientPropertiesBuilder.
### Fixed
- Fixed edge cases in query timeouts.
- Long queries would time out after 2 minutes. Removing the keep-alive timeout fixes this.


## [6.0.0-ALPHA-01] - 2024-11-27
### Added
- A new policy heuristic for choosing between queuing and streaming in Managed streaming client. A policy can be configured
@@ -3,6 +3,7 @@
import java.lang.invoke.MethodHandles;
import java.time.Duration;
import java.util.List;
import java.util.function.Predicate;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -12,7 +13,7 @@
import reactor.util.annotation.Nullable;
import reactor.util.retry.Retry;

public class ExponentialRetry<E1 extends Throwable, E2 extends Throwable> {
public class ExponentialRetry {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

private final int maxAttempts;
@@ -37,50 +38,25 @@ public ExponentialRetry(ExponentialRetry other) {
this.maxJitterSecs = other.maxJitterSecs;
}

// Caller should throw only permanent errors, returning null if a retry is needed
public <T> T execute(KustoCheckedFunction<Integer, T, E1, E2> function) throws E1, E2 {
for (int currentAttempt = 0; currentAttempt < maxAttempts; currentAttempt++) {
log.info("execute: Attempt {}", currentAttempt);

try {
T result = function.apply(currentAttempt);
if (result != null) {
return result;
}
} catch (Exception e) {
log.error("execute: Error is permanent, stopping", e);
throw e;
}

double currentSleepSecs = sleepBaseSecs * (float) Math.pow(2, currentAttempt);
double jitterSecs = (float) Math.random() * maxJitterSecs;
double sleepMs = (currentSleepSecs + jitterSecs) * 1000;

log.info("execute: Attempt {} failed, trying again after sleep of {} seconds", currentAttempt, sleepMs / 1000);

try {
Thread.sleep((long) sleepMs);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException("execute: Interrupted while sleeping", e);
}
}

return null;
}

/**
* Creates a retry mechanism with exponential backoff and jitter.
*
* @param retriableErrorClasses A list of error classes that are considered retriable. If null,
* the method does not retry.
* @param filter A custom predicate deciding whether a given throwable should be retried. If null, the default
* behavior of retrying the retriable error classes is used. At most one of {@code retriableErrorClasses}
* and {@code filter} may be provided.
* @return A configured {@link Retry} instance
*/
public Retry retry(@Nullable List<Class<? extends Throwable>> retriableErrorClasses) {
public Retry retry(@Nullable List<Class<? extends Throwable>> retriableErrorClasses,
Contributor:
Nit: the only time we use the first param is in a test. Do we really need it?
Maybe overloads to make sure they can't be used at the same time?

@Nullable Predicate<? super Throwable> filter) {
if (retriableErrorClasses != null && filter != null) {
throw new IllegalArgumentException("Cannot specify both retriableErrorClasses and filter");
}

Predicate<? super Throwable> filterToUse = filter == null ? throwable -> shouldRetry(throwable, retriableErrorClasses) : filter;
return Retry.backoff(maxAttempts, Duration.ofSeconds((long) sleepBaseSecs))
.maxBackoff(Duration.ofSeconds(30))
.jitter(maxJitterSecs)
.filter(throwable -> shouldRetry(throwable, retriableErrorClasses))
.filter(filterToUse)
.doAfterRetry(retrySignal -> {
long currentAttempt = retrySignal.totalRetries() + 1;
log.info("Attempt {} failed.", currentAttempt);
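As context for the new two-argument retry(), a minimal usage sketch with Reactor. Only the int constructor and the signature visible in this diff are relied on; the import of ExponentialRetry is omitted because its package is not shown here.

```java
import java.io.IOException;
import java.time.Duration;

import reactor.core.publisher.Mono;
import reactor.util.retry.Retry;

// Assumes ExponentialRetry is visible (same package or an appropriate import).
public class RetrySketch {
    public static void main(String[] args) {
        // Pass either a list of retriable error classes or a custom filter - never both.
        Retry retrySpec = new ExponentialRetry(3).retry(null, t -> t instanceof IOException);

        String outcome = Mono.<String>error(new IOException("transient failure"))
                .retryWhen(retrySpec) // re-subscribes with exponential backoff and jitter
                .onErrorReturn("gave up after retries")
                .block(Duration.ofMinutes(1));

        System.out.println(outcome);
    }
}
```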
10 changes: 5 additions & 5 deletions data/src/main/java/com/microsoft/azure/kusto/data/Utils.java
@@ -156,29 +156,29 @@ public static Mono<String> processGzipBody(Flux<ByteBuffer> gzipBody) {
// to occur in chunks, making it more memory-efficient for large payloads, as it prevents the entire
// compressed stream from being loaded into memory at once (which for example is required by GZIPInputStream for decompression).

EmbeddedChannel channel = new EmbeddedChannel(ZlibCodecFactory.newZlibDecoder(ZlibWrapper.GZIP));
EmbeddedChannel decoder = new EmbeddedChannel(ZlibCodecFactory.newZlibDecoder(ZlibWrapper.GZIP));

return gzipBody
.reduce(new StringBuilder(), (stringBuilder, byteBuffer) -> {
channel.writeInbound(Unpooled.wrappedBuffer(byteBuffer)); // Write chunk to channel for decompression
decoder.writeInbound(Unpooled.wrappedBuffer(byteBuffer)); // Write chunk to channel for decompression

ByteBuf decompressedByteBuf = channel.readInbound();
ByteBuf decompressedByteBuf = decoder.readInbound();
if (decompressedByteBuf == null) {
return stringBuilder;
}

String string = decompressedByteBuf.toString(StandardCharsets.UTF_8);
decompressedByteBuf.release();

if (!channel.inboundMessages().isEmpty()) {
if (!decoder.inboundMessages().isEmpty()) { // TODO: remove this when we are sure that only one message exists in the channel
throw new IllegalStateException("Expected exactly one message in the channel.");
}

stringBuilder.append(string);
return stringBuilder;
})
.map(StringBuilder::toString)
.doFinally(ignore -> channel.finishAndReleaseAll());
.doFinally(ignore -> decoder.finishAndReleaseAll());
}

private static Mono<String> processNonGzipBody(Flux<ByteBuffer> gzipBody) {
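A test-style sketch of the decompression path above, assuming Utils.processGzipBody stays publicly accessible and reactor-test is available on the test classpath.

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPOutputStream;

import com.microsoft.azure.kusto.data.Utils;

import reactor.core.publisher.Flux;
import reactor.test.StepVerifier;

public class GzipBodyTestSketch {
    public static void main(String[] args) throws Exception {
        // Gzip a known payload and feed it to processGzipBody as a single-chunk Flux.
        ByteArrayOutputStream compressed = new ByteArrayOutputStream();
        try (GZIPOutputStream gzip = new GZIPOutputStream(compressed)) {
            gzip.write("{\"rows\":[]}".getBytes(StandardCharsets.UTF_8));
        }

        Flux<ByteBuffer> body = Flux.just(ByteBuffer.wrap(compressed.toByteArray()));

        StepVerifier.create(Utils.processGzipBody(body))
                .expectNext("{\"rows\":[]}") // decompressed back to the original string
                .verifyComplete();
    }
}
```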
@@ -68,7 +68,7 @@ public class CloudInfo implements TraceableAttributes, Serializable {
private final String kustoServiceResourceId;
private final String firstPartyAuthorityUrl;
private static final int ATTEMPT_COUNT = 3;
private static final Retry RETRY_CONFIG = new ExponentialRetry<>(ATTEMPT_COUNT).retry(null);
private static final Retry RETRY_CONFIG = new ExponentialRetry(ATTEMPT_COUNT).retry(null, null);

public CloudInfo(boolean loginMfaRequired, String loginEndpoint, String kustoClientAppId, String kustoClientRedirectUri, String kustoServiceResourceId,
String firstPartyAuthorityUrl) {
11 changes: 10 additions & 1 deletion ingest/pom.xml
@@ -109,6 +109,9 @@
<dependency>com.microsoft.azure:msal4j:jar</dependency>
<dependency>io.projectreactor:reactor-core:jar</dependency>
<dependency>com.fasterxml.jackson.core:jackson-core:jar</dependency>
<dependency>io.netty:netty-buffer:jar</dependency>
<dependency>io.netty:netty-codec:jar</dependency>
<dependency>io.netty:netty-transport:jar</dependency>
</ignoredUsedUndeclaredDependencies>
<ignoreNonCompile>true</ignoreNonCompile>
<ignoredNonTestScopedDependencies>
@@ -156,7 +159,6 @@
<groupId>com.microsoft.azure.kusto</groupId>
<artifactId>kusto-data</artifactId>
<version>${project.parent.version}</version>
<scope>compile</scope>
</dependency>
<!-- Azure bom libraries -->
<dependency>
@@ -276,5 +278,12 @@
<artifactId>vavr</artifactId>
<version>${io.vavr.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/io.projectreactor/reactor-test -->
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-test</artifactId>
<version>${reactor-test.version}</version>
<scope>test</scope>
</dependency>
</dependencies>
</project>
@@ -4,138 +4,130 @@
package com.microsoft.azure.kusto.ingest;

import com.azure.core.util.BinaryData;
import com.azure.data.tables.TableClient;
import com.azure.data.tables.TableAsyncClient;
import com.azure.data.tables.implementation.models.TableServiceErrorException;
import com.azure.data.tables.models.TableEntity;
import com.azure.storage.blob.BlobClient;
import com.azure.storage.blob.BlobContainerClient;
import com.azure.storage.blob.models.BlobStorageException;
import com.azure.storage.queue.QueueClient;
import com.azure.storage.queue.models.QueueStorageException;
import com.azure.storage.blob.BlobAsyncClient;
import com.azure.storage.blob.BlobContainerAsyncClient;
import com.azure.storage.queue.QueueAsyncClient;
import com.microsoft.azure.kusto.data.Ensure;

import com.microsoft.azure.kusto.ingest.utils.IngestionUtils;
import org.apache.commons.codec.binary.Base64;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.Exceptions;
import reactor.core.publisher.Mono;

import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.lang.invoke.MethodHandles;
import java.net.URISyntaxException;
import java.nio.file.Files;
import java.util.zip.GZIPOutputStream;

public class AzureStorageClient {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private static final int GZIP_BUFFER_SIZE = 16384;
private static final int STREAM_BUFFER_SIZE = 16384;

public AzureStorageClient() {
}

void postMessageToQueue(QueueClient queueClient, String content) throws QueueStorageException {
// Ensure
Ensure.argIsNotNull(queueClient, "queueClient");
Mono<Void> postMessageToQueue(QueueAsyncClient queueAsyncClient, String content) {
Ensure.argIsNotNull(queueAsyncClient, "queueAsyncClient");
Ensure.stringIsNotBlank(content, "content");

byte[] bytesEncoded = Base64.encodeBase64(content.getBytes());
queueClient.sendMessage(BinaryData.fromBytes(bytesEncoded));
return queueAsyncClient.sendMessage(BinaryData.fromBytes(bytesEncoded)).then();
}

public void azureTableInsertEntity(TableClient tableClient, TableEntity tableEntity) throws URISyntaxException, TableServiceErrorException {
Ensure.argIsNotNull(tableClient, "tableClient");
public Mono<Void> azureTableInsertEntity(TableAsyncClient tableAsyncClient, TableEntity tableEntity) throws TableServiceErrorException {
Ensure.argIsNotNull(tableAsyncClient, "tableAsyncClient");
Ensure.argIsNotNull(tableEntity, "tableEntity");

tableClient.createEntity(tableEntity);
return tableAsyncClient.createEntity(tableEntity);
}

void uploadLocalFileToBlob(File file, String blobName, BlobContainerClient container, boolean shouldCompress)
throws IOException, BlobStorageException {
log.debug("uploadLocalFileToBlob: filePath: {}, blobName: {}, storageUri: {}", file.getPath(), blobName, container.getBlobContainerUrl());
Mono<Void> uploadLocalFileToBlob(File file, String blobName, BlobContainerAsyncClient asyncContainer, boolean shouldCompress) throws IOException {
log.debug("uploadLocalFileToBlob: filePath: {}, blobName: {}, storageUri: {}", file.getPath(), blobName, asyncContainer.getBlobContainerUrl());

// Ensure
Ensure.fileExists(file, "sourceFile");
Ensure.stringIsNotBlank(blobName, "blobName");
Ensure.argIsNotNull(container, "container");
Ensure.argIsNotNull(asyncContainer, "asyncContainer");

BlobClient blobClient = container.getBlobClient(blobName);
BlobAsyncClient blobAsyncClient = asyncContainer.getBlobAsyncClient(blobName);
if (shouldCompress) {
compressAndUploadFileToBlob(file, blobClient);
return compressAndUploadFileToBlob(file, blobAsyncClient);
} else {
uploadFileToBlob(file, blobClient);
return uploadFileToBlob(file, blobAsyncClient);
}
}

void compressAndUploadFileToBlob(File sourceFile, BlobClient blob) throws IOException, BlobStorageException {
Mono<Void> compressAndUploadFileToBlob(File sourceFile, BlobAsyncClient blobAsyncClient) throws IOException {
Ensure.fileExists(sourceFile, "sourceFile");
Ensure.argIsNotNull(blob, "blob");

try (InputStream fin = Files.newInputStream(sourceFile.toPath());
GZIPOutputStream gzOut = new GZIPOutputStream(blob.getBlockBlobClient().getBlobOutputStream(true))) {
copyStream(fin, gzOut, GZIP_BUFFER_SIZE);
}
Ensure.argIsNotNull(blobAsyncClient, "blobAsyncClient");

return Mono.defer(() -> {
try {
InputStream inputStream = Files.newInputStream(sourceFile.toPath());
return IngestionUtils.toCompressedByteArray(inputStream, false)
.flatMap(bytes -> blobAsyncClient.getBlockBlobAsyncClient()
.upload(BinaryData.fromBytes(bytes), true));
} catch (IOException e) {
throw Exceptions.propagate(e);
}
}).then();
}

void uploadFileToBlob(File sourceFile, BlobClient blobClient) throws IOException, BlobStorageException {
// Ensure
Ensure.argIsNotNull(blobClient, "blob");
Mono<Void> uploadFileToBlob(File sourceFile, BlobAsyncClient blobAsyncClient) throws IOException {
Ensure.argIsNotNull(blobAsyncClient, "blob");
Ensure.fileExists(sourceFile, "sourceFile");

blobClient.uploadFromFile(sourceFile.getPath());
return blobAsyncClient.uploadFromFile(sourceFile.getPath());
}

int uploadStreamToBlob(InputStream inputStream, String blobName, BlobContainerClient container, boolean shouldCompress)
throws IOException, BlobStorageException {
log.debug("uploadStreamToBlob: blobName: {}, storageUri: {}", blobName, container);
Mono<Integer> uploadStreamToBlob(InputStream inputStream,
String blobName,
BlobContainerAsyncClient asyncContainer,
boolean shouldCompress) {
log.debug("uploadStreamToBlob: blobName: {}, storageUri: {}", blobName, asyncContainer.getBlobContainerUrl());

// Ensure
Ensure.argIsNotNull(inputStream, "inputStream");
Ensure.stringIsNotBlank(blobName, "blobName");
Ensure.argIsNotNull(container, "container");
Ensure.argIsNotNull(asyncContainer, "asyncContainer");

BlobClient blobClient = container.getBlobClient(blobName);
BlobAsyncClient blobAsyncClient = asyncContainer.getBlobAsyncClient(blobName);
if (shouldCompress) {
return compressAndUploadStream(inputStream, blobClient);
return compressAndUploadStream(inputStream, blobAsyncClient);
} else {
return uploadStream(inputStream, blobClient);
return uploadStream(inputStream, blobAsyncClient);
}
}

// Returns original stream size
int uploadStream(InputStream inputStream, BlobClient blob) throws IOException, BlobStorageException {
// Ensure
Mono<Integer> uploadStream(InputStream inputStream, BlobAsyncClient blobAsyncClient) {
Ensure.argIsNotNull(inputStream, "inputStream");
Ensure.argIsNotNull(blob, "blob");
Ensure.argIsNotNull(blobAsyncClient, "blobAsyncClient");

OutputStream blobOutputStream = blob.getBlockBlobClient().getBlobOutputStream(true);
int originalSize = copyStream(inputStream, blobOutputStream, STREAM_BUFFER_SIZE);
blobOutputStream.close();
return originalSize;
IngestionUtils.IntegerHolder size = new IngestionUtils.IntegerHolder();

return IngestionUtils.toByteArray(inputStream)
.flatMap(bytes -> {
size.add(bytes.length);
return blobAsyncClient.getBlockBlobAsyncClient().upload(BinaryData.fromBytes(bytes), true);
})
.map(x -> size.getValue());
}

// Returns original stream size
int compressAndUploadStream(InputStream inputStream, BlobClient blob) throws IOException, BlobStorageException {
// Ensure
Mono<Integer> compressAndUploadStream(InputStream inputStream, BlobAsyncClient blobAsyncClient) {
Ensure.argIsNotNull(inputStream, "inputStream");
Ensure.argIsNotNull(blob, "blob");

try (GZIPOutputStream gzout = new GZIPOutputStream(blob.getBlockBlobClient().getBlobOutputStream(true))) {
return copyStream(inputStream, gzout, GZIP_BUFFER_SIZE);
}
Ensure.argIsNotNull(blobAsyncClient, "blobAsyncClient");

IngestionUtils.IntegerHolder size = new IngestionUtils.IntegerHolder();
return IngestionUtils.toCompressedByteArray(inputStream, false)
.flatMap(bytes -> {
size.add(bytes.length);
return blobAsyncClient.getBlockBlobAsyncClient().upload(BinaryData.fromBytes(bytes), true);
})
.map(x -> size.getValue());
}

// Returns original stream size
private int copyStream(InputStream inputStream, OutputStream outputStream, int bufferSize) throws IOException {
byte[] buffer = new byte[bufferSize];
int length;
int size = 0;
while ((length = inputStream.read(buffer)) > 0) {
size += length;
outputStream.write(buffer, 0, length);
}

return size;
}
}
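To show how the reworked upload methods compose, a minimal sketch that compresses and uploads a local file through the new Mono-returning API. It assumes package-level access (the methods above are package-private) and any valid BlobContainerAsyncClient; the endpoint, SAS token, and file name are placeholders.

```java
package com.microsoft.azure.kusto.ingest; // same package, since the upload methods are package-private

import java.io.File;
import java.io.IOException;

import com.azure.storage.blob.BlobContainerAsyncClient;
import com.azure.storage.blob.BlobServiceClientBuilder;

import reactor.core.publisher.Mono;

public class AzureStorageClientSketch {
    public static void main(String[] args) throws IOException {
        // Placeholder endpoint and SAS; any valid BlobContainerAsyncClient works here.
        BlobContainerAsyncClient container = new BlobServiceClientBuilder()
                .endpoint("https://account.blob.core.windows.net/?<sas>")
                .buildAsyncClient()
                .getBlobContainerAsyncClient("ingest-container");

        AzureStorageClient storageClient = new AzureStorageClient();

        // Compress and upload without blocking; the synchronous ingestion path now block()s on the same Mono.
        Mono<Void> upload = storageClient.uploadLocalFileToBlob(
                new File("data.csv"), "data.csv.gz", container, true);

        upload.block(); // block only at the edge, e.g. in a CLI or a test
    }
}
```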