Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Do inject trace context when opentelemetry is disabled #290

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -107,13 +107,17 @@ public void onNext(@NotNull StreamsOuterClass.ReadResp readResp) {

try {
ResolvedEvent resolvedEvent = ResolvedEvent.fromWire(readResp.getEvent());
ClientTelemetry.traceSubscribe(
() -> listener.onEvent(this._subscription, resolvedEvent),
_subscription.getSubscriptionId(),
channel,
client.getSettings(),
options.getCredentials(),
resolvedEvent.getEvent());
if (ClientTelemetry.isEnabled()) {
listener.onEvent(this._subscription, resolvedEvent);
} else {
ClientTelemetry.traceSubscribe(
() -> listener.onEvent(this._subscription, resolvedEvent),
_subscription.getSubscriptionId(),
channel,
client.getSettings(),
options.getCredentials(),
resolvedEvent.getEvent());
}
} catch (Exception e) {
onError(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,13 +92,17 @@ public void onNext(Persistent.ReadResp readResp) {

try {
ResolvedEvent resolvedEvent = ResolvedEvent.fromWire(readResp.getEvent());
ClientTelemetry.traceSubscribe(
() -> listener.onEvent(this._subscription, retryCount, resolvedEvent),
_subscription.getSubscriptionId(),
args.getChannel(),
client.getSettings(),
options.getCredentials(),
resolvedEvent.getEvent());
if (ClientTelemetry.isEnabled()) {
listener.onEvent(this._subscription, retryCount, resolvedEvent);
} else {
ClientTelemetry.traceSubscribe(
() -> listener.onEvent(this._subscription, retryCount, resolvedEvent),
_subscription.getSubscriptionId(),
args.getChannel(),
client.getSettings(),
options.getCredentials(),
resolvedEvent.getEvent());
}
} catch (Exception e) {
onError(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,18 @@ public AppendToStream(GrpcClient client, String streamName, Iterator<EventData>
}

public CompletableFuture<WriteResult> execute() {
return this.client.run(channel -> ClientTelemetry.traceAppend(
this::append,
channel,
events,
this.streamName,
this.client.getSettings(),
this.options.getCredentials()));
return this.client.run(channel -> {
if (ClientTelemetry.isEnabled())
return append(channel, events);

return ClientTelemetry.traceAppend(
this::append,
channel,
events,
this.streamName,
this.client.getSettings(),
this.options.getCredentials());
});
}

private CompletableFuture<WriteResult> append(ManagedChannel channel, List<EventData> events) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,11 @@
import com.fasterxml.jackson.databind.node.ObjectNode;
import io.grpc.ManagedChannel;
import io.opentelemetry.api.GlobalOpenTelemetry;
import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.api.trace.*;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
import io.opentelemetry.context.propagation.TextMapPropagator;

import java.util.ArrayList;
import java.util.List;
Expand All @@ -18,6 +20,10 @@
import java.util.function.BiFunction;

class ClientTelemetry {
protected static boolean isEnabled() {
return OpenTelemetry.noop().getPropagators().getTextMapPropagator() == TextMapPropagator.noop();
}

private static final ClientTelemetryTags DEFAULT_ATTRIBUTES = new ClientTelemetryTags() {{
put(ClientTelemetryAttributes.Database.SYSTEM, ClientTelemetryConstants.INSTRUMENTATION_NAME);
}};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,45 +9,64 @@

public interface AppendTests extends ConnectionAware {
@Test
default void testAppendSingleEventNoStream() throws Throwable {
default void testAppendMultipleEvents() throws Throwable {
EventStoreDBClient client = getDatabase().defaultClient();

final String streamName = generateName();
final String eventType = "TestEvent";
final String eventId = "38fffbc2-339e-11ea-8c7b-784f43837872";
final String eventId1 = UUID.randomUUID().toString();
final String eventId2 = UUID.randomUUID().toString();
final byte[] eventMetaData = new byte[]{0xd, 0xe, 0xa, 0xd};
final JsonMapper jsonMapper = new JsonMapper();

EventData event = EventData.builderAsJson(eventType, jsonMapper.writeValueAsBytes(new Foo()))
// Create first event data with metadata bytes
EventData event1 = EventData.builderAsJson(eventType, jsonMapper.writeValueAsBytes(new Foo()))
.metadataAsBytes(eventMetaData)
.eventId(UUID.fromString(eventId))
.eventId(UUID.fromString(eventId1))
.build();

// Create second event data with JSON metadata
EventData event2 = EventData.builderAsJson(eventType, jsonMapper.writeValueAsBytes(new Foo()))
.metadataAsBytes(jsonMapper.writeValueAsBytes(new Foo()))
.eventId(UUID.fromString(eventId2))
.build();

AppendToStreamOptions appendOptions = AppendToStreamOptions.get()
.expectedRevision(ExpectedRevision.noStream());

WriteResult appendResult = client.appendToStream(streamName, appendOptions, event)
// Append both events to stream
WriteResult appendResult = client.appendToStream(streamName, appendOptions, event1, event2)
.get();

Assertions.assertEquals(ExpectedRevision.expectedRevision(0), appendResult.getNextExpectedRevision());
// Validate the append operation
Assertions.assertEquals(ExpectedRevision.expectedRevision(1), appendResult.getNextExpectedRevision());

ReadStreamOptions readStreamOptions = ReadStreamOptions.get()
.fromEnd()
.backwards()
.maxCount(1);
.maxCount(2);

// Ensure appended event is readable
// Ensure both appended events are readable
ReadResult result = client.readStream(streamName, readStreamOptions)
.get();

Assertions.assertEquals(1, result.getEvents().size());
RecordedEvent first = result.getEvents().get(0).getEvent();
Assertions.assertEquals(2, result.getEvents().size());
RecordedEvent first = result.getEvents().get(1).getEvent();
RecordedEvent second = result.getEvents().get(0).getEvent();
JsonMapper mapper = new JsonMapper();

// Verify first event details
Assertions.assertEquals(streamName, first.getStreamId());
Assertions.assertEquals(eventType, first.getEventType());
Assertions.assertEquals(eventId, first.getEventId().toString());
Assertions.assertEquals(eventId1, first.getEventId().toString());
Assertions.assertArrayEquals(eventMetaData, first.getUserMetadata());
Assertions.assertEquals(new Foo(), mapper.readValue(first.getEventData(), Foo.class));

// Verify second event details
Assertions.assertEquals(streamName, second.getStreamId());
Assertions.assertEquals(eventType, second.getEventType());
Assertions.assertEquals(eventId2, second.getEventId().toString());
Assertions.assertEquals(new Foo(), mapper.readValue(second.getEventData(), Foo.class));
}
}

Loading