Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix log injection smoke test on IBM8 #8126

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
import datadog.remoteconfig.DefaultConfigurationPoller;
import datadog.trace.api.Config;
import datadog.trace.util.AgentTaskScheduler;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import okhttp3.HttpUrl;
Expand All @@ -21,12 +23,23 @@
public class SharedCommunicationObjects {
private static final Logger log = LoggerFactory.getLogger(SharedCommunicationObjects.class);

private final List<Runnable> pausedComponents = new ArrayList<>();
private volatile boolean paused;

public OkHttpClient okHttpClient;
public HttpUrl agentUrl;
public Monitoring monitoring;
private DDAgentFeaturesDiscovery featuresDiscovery;
private ConfigurationPoller configurationPoller;

public SharedCommunicationObjects() {
this(false);
}

public SharedCommunicationObjects(boolean paused) {
this.paused = paused;
}

public void createRemaining(Config config) {
if (monitoring == null) {
monitoring = Monitoring.DISABLED;
Expand All @@ -46,6 +59,30 @@ public void createRemaining(Config config) {
}
}

public void whenReady(Runnable callback) {
synchronized (pausedComponents) {
if (paused) {
pausedComponents.add(callback);
} else {
callback.run();
}
}
}

public void resume() {
paused = false;
synchronized (pausedComponents) {
for (Runnable callback : pausedComponents) {
try {
callback.run();
} catch (Throwable e) {
log.warn("Problem resuming remote component {}", callback, e);
}
}
pausedComponents.clear();
}
}

private static HttpUrl parseAgentUrl(Config config) {
String agentUrl = config.getAgentUrl();
if (agentUrl.startsWith("unix:")) {
Expand Down Expand Up @@ -100,11 +137,14 @@ public DDAgentFeaturesDiscovery featuresDiscovery(Config config) {
agentUrl,
config.isTraceAgentV05Enabled(),
config.isTracerMetricsEnabled());
if (AGENT_THREAD_GROUP.equals(Thread.currentThread().getThreadGroup())) {
featuresDiscovery.discover(); // safe to run on same thread
} else {
// avoid performing blocking I/O operation on application thread
AgentTaskScheduler.INSTANCE.execute(featuresDiscovery::discover);

if (!paused) {
if (AGENT_THREAD_GROUP.equals(Thread.currentThread().getThreadGroup())) {
featuresDiscovery.discover(); // safe to run on same thread
} else {
// avoid performing blocking I/O operation on application thread
AgentTaskScheduler.INSTANCE.execute(featuresDiscovery::discover);
}
}
}
return featuresDiscovery;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
import static datadog.trace.util.AgentThreadFactory.AgentThread.PROFILER_STARTUP;
import static datadog.trace.util.AgentThreadFactory.AgentThread.TRACE_STARTUP;
import static datadog.trace.util.AgentThreadFactory.newAgentThread;
import static datadog.trace.util.Strings.getResourceName;
import static datadog.trace.util.Strings.propertyNameToSystemPropertyName;
import static datadog.trace.util.Strings.toEnvVar;

Expand Down Expand Up @@ -348,7 +347,7 @@ public void run() {
* logging facility. Likewise on IBM JDKs OkHttp may indirectly load 'IBMSASL' which in turn loads LogManager.
*/
InstallDatadogTracerCallback installDatadogTracerCallback =
new InstallDatadogTracerCallback(initTelemetry, inst);
new InstallDatadogTracerCallback(initTelemetry, inst, delayOkHttp);
if (delayOkHttp) {
log.debug("Custom logger detected. Delaying Datadog Tracer initialization.");
registerLogManagerCallback(installDatadogTracerCallback);
Expand Down Expand Up @@ -497,28 +496,21 @@ public void execute() {
}

protected static class InstallDatadogTracerCallback extends ClassLoadCallBack {
private final InitializationTelemetry initTelemetry;
private final Instrumentation instrumentation;
private final Object sco;
private final Class<?> scoClass;
private final boolean delayOkHttp;

public InstallDatadogTracerCallback(
InitializationTelemetry initTelemetry, Instrumentation instrumentation) {
this.initTelemetry = initTelemetry;
InitializationTelemetry initTelemetry,
Instrumentation instrumentation,
boolean delayOkHttp) {
this.delayOkHttp = delayOkHttp;
this.instrumentation = instrumentation;
}

@Override
public AgentThread agentThread() {
return TRACE_STARTUP;
}

@Override
public void execute() {
Object sco;
Class<?> scoClass;
try {
scoClass =
AGENT_CLASSLOADER.loadClass("datadog.communication.ddagent.SharedCommunicationObjects");
sco = scoClass.getConstructor().newInstance();
sco = scoClass.getConstructor(boolean.class).newInstance(delayOkHttp);
} catch (ClassNotFoundException
| NoSuchMethodException
| InstantiationException
Expand All @@ -528,10 +520,23 @@ public void execute() {
}

installDatadogTracer(initTelemetry, scoClass, sco);
maybeInstallLogsIntake(scoClass, sco);
}

@Override
public AgentThread agentThread() {
return TRACE_STARTUP;
}

@Override
public void execute() {
if (delayOkHttp) {
resumeRemoteComponents();
}

maybeStartAppSec(scoClass, sco);
maybeStartIast(instrumentation, scoClass, sco);
maybeStartCiVisibility(instrumentation, scoClass, sco);
maybeStartLogsIntake(scoClass, sco);
// start debugger before remote config to subscribe to it before starting to poll
maybeStartDebugger(instrumentation, scoClass, sco);
maybeStartRemoteConfig(scoClass, sco);
Expand All @@ -540,6 +545,16 @@ public void execute() {
startTelemetry(instrumentation, scoClass, sco);
}
}

private void resumeRemoteComponents() {
try {
Thread.sleep(1_000);
scoClass.getMethod("resume").invoke(sco);
} catch (InterruptedException ignore) {
} catch (Exception e) {
log.error("Error resuming remote components", e);
}
}
}

protected static class StartProfilingAgentCallback extends ClassLoadCallBack {
Expand Down Expand Up @@ -866,17 +881,18 @@ private static void maybeStartCiVisibility(Instrumentation inst, Class<?> scoCla
}
}

private static void maybeStartLogsIntake(Class<?> scoClass, Object sco) {
private static void maybeInstallLogsIntake(Class<?> scoClass, Object sco) {
if (agentlessLogSubmissionEnabled) {
StaticEventLogger.begin("Logs Intake");

try {
final Class<?> logsIntakeSystemClass =
AGENT_CLASSLOADER.loadClass("datadog.trace.logging.intake.LogsIntakeSystem");
final Method logsIntakeInstallerMethod = logsIntakeSystemClass.getMethod("start", scoClass);
final Method logsIntakeInstallerMethod =
logsIntakeSystemClass.getMethod("install", scoClass);
logsIntakeInstallerMethod.invoke(null, sco);
} catch (final Throwable e) {
log.warn("Not starting Logs Intake subsystem", e);
log.warn("Not installing Logs Intake subsystem", e);
}

StaticEventLogger.end("Logs Intake");
Expand Down Expand Up @@ -1267,14 +1283,8 @@ private static boolean isAppUsingCustomLogManager(final EnumSet<Library> librari

final String logManagerProp = System.getProperty("java.util.logging.manager");
if (logManagerProp != null) {
final boolean onSysClasspath =
ClassLoader.getSystemResource(getResourceName(logManagerProp)) != null;
log.debug("Prop - logging.manager: {}", logManagerProp);
log.debug("logging.manager on system classpath: {}", onSysClasspath);
// Some applications set java.util.logging.manager but never actually initialize the logger.
// Check to see if the configured manager is on the system classpath.
// If so, it should be safe to initialize jmxfetch which will setup the log manager.
return !onSysClasspath;
return true;
}

return false;
Expand Down Expand Up @@ -1305,14 +1315,8 @@ private static boolean isAppUsingCustomJMXBuilder(final EnumSet<Library> librari

final String jmxBuilderProp = System.getProperty("javax.management.builder.initial");
if (jmxBuilderProp != null) {
final boolean onSysClasspath =
ClassLoader.getSystemResource(getResourceName(jmxBuilderProp)) != null;
log.debug("Prop - javax.management.builder.initial: {}", jmxBuilderProp);
log.debug("javax.management.builder.initial on system classpath: {}", onSysClasspath);
// Some applications set javax.management.builder.initial but never actually initialize JMX.
// Check to see if the configured JMX builder is on the system classpath.
// If so, it should be safe to initialize jmxfetch which will setup JMX.
return !onSysClasspath;
return true;
}

return false;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package datadog.trace.logging.intake;

import datadog.communication.BackendApi;
import datadog.communication.BackendApiFactory;
import datadog.communication.ddagent.SharedCommunicationObjects;
import datadog.trace.api.Config;
Expand All @@ -12,18 +11,16 @@ public class LogsIntakeSystem {

private static final Logger LOGGER = LoggerFactory.getLogger(LogsIntakeSystem.class);

public static void start(SharedCommunicationObjects sco) {
public static void install(SharedCommunicationObjects sco) {
Config config = Config.get();
if (!config.isAgentlessLogSubmissionEnabled()) {
LOGGER.debug("Agentless logs intake is disabled");
return;
}

BackendApiFactory apiFactory = new BackendApiFactory(config, sco);
BackendApi backendApi = apiFactory.createBackendApi(BackendApiFactory.Intake.LOGS);
LogsDispatcher dispatcher = new LogsDispatcher(backendApi);
LogsWriterImpl writer = new LogsWriterImpl(config, dispatcher);
writer.start();
LogsWriterImpl writer = new LogsWriterImpl(config, apiFactory);
sco.whenReady(writer::start);

LogsIntake.registerWriter(writer);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import static datadog.trace.util.AgentThreadFactory.AGENT_THREAD_GROUP;

import datadog.communication.BackendApiFactory;
import datadog.trace.api.Config;
import datadog.trace.api.logging.intake.LogsWriter;
import datadog.trace.util.AgentThreadFactory;
Expand All @@ -23,12 +24,12 @@ public class LogsWriterImpl implements LogsWriter {
private static final int ENQUEUE_LOG_TIMEOUT_MILLIS = 1_000;

private final Map<String, Object> commonTags;
private final LogsDispatcher logsDispatcher;
private final BackendApiFactory apiFactory;
private final BlockingQueue<Map<String, Object>> messageQueue;
private final Thread messagePollingThread;

public LogsWriterImpl(Config config, LogsDispatcher logsDispatcher) {
this.logsDispatcher = logsDispatcher;
public LogsWriterImpl(Config config, BackendApiFactory apiFactory) {
this.apiFactory = apiFactory;

commonTags = new HashMap<>();
commonTags.put("ddsource", "java");
Expand Down Expand Up @@ -84,6 +85,9 @@ public void log(Map<String, Object> message) {
}

private void logPollingLoop() {
LogsDispatcher logsDispatcher =
new LogsDispatcher(apiFactory.createBackendApi(BackendApiFactory.Intake.LOGS));

while (!Thread.currentThread().isInterrupted()) {
try {
List<Map<String, Object>> batch = new ArrayList<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ class CustomLogManagerTest extends Specification {
, true) == 0
}

def "agent services starts up in premain if configured log manager on system classpath"() {
def "agent services startup is delayed even if configured log manager on system classpath"() {
expect:
IntegrationTestUtils.runOnSeparateJvm(LogManagerSetter.getName()
, [
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ class CustomMBeanServerBuilderTest extends Specification {
, true) == 0
}

def "JMXFetch starts up in premain if configured MBeanServerBuilder on system classpath"() {
def "JMXFetch startup is delayed even if configured MBeanServerBuilder on system classpath"() {
expect:
IntegrationTestUtils.runOnSeparateJvm(MBeanServerBuilderSetter.getName()
, [
Expand Down
Loading
Loading