Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
mcculls committed Dec 28, 2024
1 parent 4709f62 commit fe39745
Show file tree
Hide file tree
Showing 8 changed files with 96 additions and 49 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
import datadog.remoteconfig.DefaultConfigurationPoller;
import datadog.trace.api.Config;
import datadog.trace.util.AgentTaskScheduler;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import okhttp3.HttpUrl;
Expand All @@ -21,12 +23,23 @@
public class SharedCommunicationObjects {
private static final Logger log = LoggerFactory.getLogger(SharedCommunicationObjects.class);

private final List<Runnable> pausedComponents = new ArrayList<>();
private volatile boolean paused;

public OkHttpClient okHttpClient;
public HttpUrl agentUrl;
public Monitoring monitoring;
private DDAgentFeaturesDiscovery featuresDiscovery;
private ConfigurationPoller configurationPoller;

public SharedCommunicationObjects() {
this(false);
}

public SharedCommunicationObjects(boolean paused) {
this.paused = paused;
}

public void createRemaining(Config config) {
if (monitoring == null) {
monitoring = Monitoring.DISABLED;
Expand All @@ -46,6 +59,30 @@ public void createRemaining(Config config) {
}
}

public void whenReady(Runnable callback) {
synchronized (pausedComponents) {
if (paused) {
pausedComponents.add(callback);
} else {
callback.run();
}
}
}

public void resume() {
paused = false;
synchronized (pausedComponents) {
for (Runnable callback : pausedComponents) {
try {
callback.run();
} catch (Throwable e) {
log.warn("Problem resuming remote component {}", callback, e);
}
}
pausedComponents.clear();
}
}

private static HttpUrl parseAgentUrl(Config config) {
String agentUrl = config.getAgentUrl();
if (agentUrl.startsWith("unix:")) {
Expand Down Expand Up @@ -100,11 +137,14 @@ public DDAgentFeaturesDiscovery featuresDiscovery(Config config) {
agentUrl,
config.isTraceAgentV05Enabled(),
config.isTracerMetricsEnabled());
if (AGENT_THREAD_GROUP.equals(Thread.currentThread().getThreadGroup())) {
featuresDiscovery.discover(); // safe to run on same thread
} else {
// avoid performing blocking I/O operation on application thread
AgentTaskScheduler.INSTANCE.execute(featuresDiscovery::discover);

if (!paused) {
if (AGENT_THREAD_GROUP.equals(Thread.currentThread().getThreadGroup())) {
featuresDiscovery.discover(); // safe to run on same thread
} else {
// avoid performing blocking I/O operation on application thread
AgentTaskScheduler.INSTANCE.execute(featuresDiscovery::discover);
}
}
}
return featuresDiscovery;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -347,7 +347,7 @@ public void run() {
* logging facility. Likewise on IBM JDKs OkHttp may indirectly load 'IBMSASL' which in turn loads LogManager.
*/
InstallDatadogTracerCallback installDatadogTracerCallback =
new InstallDatadogTracerCallback(initTelemetry, inst);
new InstallDatadogTracerCallback(initTelemetry, inst, delayOkHttp);
if (delayOkHttp) {
log.debug("Custom logger detected. Delaying Datadog Tracer initialization.");
registerLogManagerCallback(installDatadogTracerCallback);
Expand Down Expand Up @@ -496,26 +496,31 @@ public void execute() {
}

protected static class InstallDatadogTracerCallback extends ClassLoadCallBack {
private final InitializationTelemetry initTelemetry;
private final Instrumentation instrumentation;
private final Object sco;
private final Class<?> scoClass;
private final boolean delayOkHttp;

public InstallDatadogTracerCallback(
InitializationTelemetry initTelemetry, Instrumentation instrumentation) {
this.initTelemetry = initTelemetry;
InitializationTelemetry initTelemetry,
Instrumentation instrumentation,
boolean delayOkHttp) {
this.delayOkHttp = delayOkHttp;
this.instrumentation = instrumentation;
try {
scoClass =
AGENT_CLASSLOADER.loadClass("datadog.communication.ddagent.SharedCommunicationObjects");
sco = scoClass.getConstructor().newInstance();
sco = scoClass.getConstructor(boolean.class).newInstance(delayOkHttp);
} catch (ClassNotFoundException
| NoSuchMethodException
| InstantiationException
| IllegalAccessException
| InvocationTargetException e) {
throw new UndeclaredThrowableException(e);
}

installDatadogTracer(initTelemetry, scoClass, sco);
maybeInstallLogsIntake(scoClass, sco);
}

@Override
Expand All @@ -525,11 +530,13 @@ public AgentThread agentThread() {

@Override
public void execute() {
installDatadogTracer(initTelemetry, scoClass, sco);
if (delayOkHttp) {
resumeRemoteComponents();
}

maybeStartAppSec(scoClass, sco);
maybeStartIast(instrumentation, scoClass, sco);
maybeStartCiVisibility(instrumentation, scoClass, sco);
maybeStartLogsIntake(scoClass, sco);
// start debugger before remote config to subscribe to it before starting to poll
maybeStartDebugger(instrumentation, scoClass, sco);
maybeStartRemoteConfig(scoClass, sco);
Expand All @@ -538,6 +545,16 @@ public void execute() {
startTelemetry(instrumentation, scoClass, sco);
}
}

private void resumeRemoteComponents() {
try {
Thread.sleep(1_000);
scoClass.getMethod("resume").invoke(sco);
} catch (InterruptedException ignore) {
} catch (Exception e) {
log.error("Error resuming remote components", e);
}
}
}

protected static class StartProfilingAgentCallback extends ClassLoadCallBack {
Expand Down Expand Up @@ -864,17 +881,18 @@ private static void maybeStartCiVisibility(Instrumentation inst, Class<?> scoCla
}
}

private static void maybeStartLogsIntake(Class<?> scoClass, Object sco) {
private static void maybeInstallLogsIntake(Class<?> scoClass, Object sco) {
if (agentlessLogSubmissionEnabled) {
StaticEventLogger.begin("Logs Intake");

try {
final Class<?> logsIntakeSystemClass =
AGENT_CLASSLOADER.loadClass("datadog.trace.logging.intake.LogsIntakeSystem");
final Method logsIntakeInstallerMethod = logsIntakeSystemClass.getMethod("start", scoClass);
final Method logsIntakeInstallerMethod =
logsIntakeSystemClass.getMethod("install", scoClass);
logsIntakeInstallerMethod.invoke(null, sco);
} catch (final Throwable e) {
log.warn("Not starting Logs Intake subsystem", e);
log.warn("Not installing Logs Intake subsystem", e);
}

StaticEventLogger.end("Logs Intake");
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package datadog.trace.logging.intake;

import datadog.communication.BackendApi;
import datadog.communication.BackendApiFactory;
import datadog.communication.ddagent.SharedCommunicationObjects;
import datadog.trace.api.Config;
Expand All @@ -12,18 +11,16 @@ public class LogsIntakeSystem {

private static final Logger LOGGER = LoggerFactory.getLogger(LogsIntakeSystem.class);

public static void start(SharedCommunicationObjects sco) {
public static void install(SharedCommunicationObjects sco) {
Config config = Config.get();
if (!config.isAgentlessLogSubmissionEnabled()) {
LOGGER.debug("Agentless logs intake is disabled");
return;
}

BackendApiFactory apiFactory = new BackendApiFactory(config, sco);
BackendApi backendApi = apiFactory.createBackendApi(BackendApiFactory.Intake.LOGS);
LogsDispatcher dispatcher = new LogsDispatcher(backendApi);
LogsWriterImpl writer = new LogsWriterImpl(config, dispatcher);
writer.start();
LogsWriterImpl writer = new LogsWriterImpl(config, apiFactory);
sco.whenReady(writer::start);

LogsIntake.registerWriter(writer);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import static datadog.trace.util.AgentThreadFactory.AGENT_THREAD_GROUP;

import datadog.communication.BackendApiFactory;
import datadog.trace.api.Config;
import datadog.trace.api.logging.intake.LogsWriter;
import datadog.trace.util.AgentThreadFactory;
Expand All @@ -23,12 +24,12 @@ public class LogsWriterImpl implements LogsWriter {
private static final int ENQUEUE_LOG_TIMEOUT_MILLIS = 1_000;

private final Map<String, Object> commonTags;
private final LogsDispatcher logsDispatcher;
private final BackendApiFactory apiFactory;
private final BlockingQueue<Map<String, Object>> messageQueue;
private final Thread messagePollingThread;

public LogsWriterImpl(Config config, LogsDispatcher logsDispatcher) {
this.logsDispatcher = logsDispatcher;
public LogsWriterImpl(Config config, BackendApiFactory apiFactory) {
this.apiFactory = apiFactory;

commonTags = new HashMap<>();
commonTags.put("ddsource", "java");
Expand Down Expand Up @@ -84,6 +85,9 @@ public void log(Map<String, Object> message) {
}

private void logPollingLoop() {
LogsDispatcher logsDispatcher =
new LogsDispatcher(apiFactory.createBackendApi(BackendApiFactory.Intake.LOGS));

while (!Thread.currentThread().isInterrupted()) {
try {
List<Map<String, Object>> batch = new ArrayList<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ class CustomLogManagerTest extends Specification {
, true) == 0
}

def "agent services starts up in premain if configured log manager on system classpath"() {
def "agent services startup is delayed even if configured log manager on system classpath"() {
expect:
IntegrationTestUtils.runOnSeparateJvm(LogManagerSetter.getName()
, [
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ class CustomMBeanServerBuilderTest extends Specification {
, true) == 0
}

def "JMXFetch starts up in premain if configured MBeanServerBuilder on system classpath"() {
def "JMXFetch startup is delayed even if configured MBeanServerBuilder on system classpath"() {
expect:
IntegrationTestUtils.runOnSeparateJvm(MBeanServerBuilderSetter.getName()
, [
Expand Down
29 changes: 8 additions & 21 deletions dd-java-agent/src/test/java/jvmbootstraptest/LogManagerSetter.java
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,10 @@ public static void main(final String... args) throws Exception {
} else if (System.getProperty("java.util.logging.manager") != null) {
System.out.println("java.util.logging.manager != null");

assertTraceInstallationDelayed(
"tracer install must be delayed when log manager system property is present.");
customAssert(
isTracerInstalled(false),
true,
"tracer install is not delayed when log manager system property is present.");
customAssert(
isJmxfetchStarted(false),
false,
Expand All @@ -57,8 +59,6 @@ public static void main(final String... args) throws Exception {
.getClassLoader()
.loadClass(System.getProperty("java.util.logging.manager")),
"Javaagent should not prevent setting a custom log manager");
customAssert(
isTracerInstalled(true), true, "tracer should be installed after loading LogManager.");
customAssert(
isJmxfetchStarted(true), true, "jmxfetch should start after loading LogManager.");
if (isJFRSupported()) {
Expand All @@ -67,8 +67,10 @@ public static void main(final String... args) throws Exception {
}
} else if (System.getenv("JBOSS_HOME") != null) {
System.out.println("JBOSS_HOME != null");
assertTraceInstallationDelayed(
"tracer install must be delayed when JBOSS_HOME property is present.");
customAssert(
isTracerInstalled(false),
true,
"tracer install is not delayed when JBOSS_HOME property is present.");
customAssert(
isJmxfetchStarted(false),
false,
Expand All @@ -85,10 +87,6 @@ public static void main(final String... args) throws Exception {
.getClassLoader()
.loadClass(System.getProperty("java.util.logging.manager")),
"Javaagent should not prevent setting a custom log manager");
customAssert(
isTracerInstalled(true),
true,
"tracer should be installed after loading with JBOSS_HOME set.");
customAssert(
isJmxfetchStarted(true),
true,
Expand Down Expand Up @@ -128,17 +126,6 @@ private static void customAssert(
}
}

private static void assertTraceInstallationDelayed(final String message) {
if (okHttpMayIndirectlyLoadJUL()) {
customAssert(isTracerInstalled(false), false, message);
} else {
customAssert(
isTracerInstalled(false),
true,
"We can safely install tracer on java9+ since it doesn't indirectly trigger logger manager init");
}
}

private static void assertProfilingStartupDelayed(final String message) {
if (okHttpMayIndirectlyLoadJUL()) {
customAssert(isProfilingStarted(false), false, message);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -689,7 +689,7 @@ private CoreTracer(
}
pendingTraceBuffer.start();

this.writer.start();
sharedCommunicationObjects.whenReady(this.writer::start);

metricsAggregator = createMetricsAggregator(config, sharedCommunicationObjects);
// Schedule the metrics aggregator to begin reporting after a random delay of 1 to 10 seconds
Expand All @@ -705,7 +705,8 @@ private CoreTracer(
} else {
this.dataStreamsMonitoring = dataStreamsMonitoring;
}
this.dataStreamsMonitoring.start();

sharedCommunicationObjects.whenReady(this.dataStreamsMonitoring::start);

// Create default extractor from config if not provided and decorate it with DSM extractor
HttpCodec.Extractor builtExtractor =
Expand Down

0 comments on commit fe39745

Please sign in to comment.