diff --git a/src/main/java/org/jenkinsci/plugins/durabletask/BourneShellScript.java b/src/main/java/org/jenkinsci/plugins/durabletask/BourneShellScript.java index d390e41f..acd12206 100644 --- a/src/main/java/org/jenkinsci/plugins/durabletask/BourneShellScript.java +++ b/src/main/java/org/jenkinsci/plugins/durabletask/BourneShellScript.java @@ -207,9 +207,8 @@ private FilePath pidFile(FilePath ws) throws IOException, InterruptedException { return controlDir(ws).child("pid"); } - // TODO run as one big MasterToSlaveCallable to avoid extra network roundtrips - @Override public Integer exitStatus(FilePath workspace, Launcher launcher, TaskListener listener) throws IOException, InterruptedException { - Integer status = super.exitStatus(workspace, launcher, listener); + @Override protected Integer exitStatus(FilePath workspace, TaskListener listener) throws IOException, InterruptedException { + Integer status = super.exitStatus(workspace, listener); if (status != null) { LOGGER.log(Level.FINE, "found exit code {0} in {1}", new Object[] {status, controlDir}); return status; diff --git a/src/main/java/org/jenkinsci/plugins/durabletask/Controller.java b/src/main/java/org/jenkinsci/plugins/durabletask/Controller.java index 9fbc01ca..88c658cf 100644 --- a/src/main/java/org/jenkinsci/plugins/durabletask/Controller.java +++ b/src/main/java/org/jenkinsci/plugins/durabletask/Controller.java @@ -28,6 +28,7 @@ import hudson.Launcher; import hudson.Util; import hudson.model.TaskListener; +import hudson.remoting.ChannelClosedException; import hudson.util.LogTaskListener; import java.io.IOException; import java.io.OutputStream; @@ -43,6 +44,21 @@ */ public abstract class Controller implements Serializable { + /** + * Begins watching the process asynchronously, so that the master may receive notification when output is available or the process has exited. + * This should be called as soon as the process is launched, and thereafter whenever reconnecting to the agent. + * You should not call {@link #writeLog} or {@link #cleanup} in this case; you do not need to call {@link #exitStatus(FilePath, Launcher)} frequently, + * though it is advisable to still call it occasionally to verify that the process is still running. + * @param workspace the workspace in use + * @param handler a remotable callback + * @param listener a remotable destination for messages + * @throws IOException if initiating the watch fails, for example with a {@link ChannelClosedException} + * @throws UnsupportedOperationException when this mode is not available, so you must fall back to polling {@link #writeLog} and {@link #exitStatus(FilePath, Launcher)} + */ + public void watch(@Nonnull FilePath workspace, @Nonnull Handler handler, @Nonnull TaskListener listener) throws IOException, InterruptedException, UnsupportedOperationException { + throw new UnsupportedOperationException("Asynchronous mode is not implemented in " + getClass().getName()); + } + /** * Obtains any new task log output. * Could use a serializable field to keep track of how much output has been previously written. @@ -57,7 +73,7 @@ public abstract class Controller implements Serializable { /** * Checks whether the task has finished. * @param workspace the workspace in use - * @param launcher a way to start processes + * @param launcher a way to start processes (currently unused) * @param logger a way to report special messages * @return an exit code (zero is successful), or null if the task appears to still be running */ @@ -88,7 +104,7 @@ public abstract class Controller implements Serializable { * Intended for use after {@link #exitStatus(FilePath, Launcher)} has returned a non-null status. * The result is undefined if {@link DurableTask#captureOutput} was not called before launch; generally an {@link IOException} will result. * @param workspace the workspace in use - * @param launcher a way to start processes + * @param launcher a way to start processes (currently unused) * @return the output of the process as raw bytes (may be empty but not null) * @see DurableTask#charset * @see DurableTask#defaultCharset diff --git a/src/main/java/org/jenkinsci/plugins/durabletask/DurableTask.java b/src/main/java/org/jenkinsci/plugins/durabletask/DurableTask.java index 5247447d..73cbbfa8 100644 --- a/src/main/java/org/jenkinsci/plugins/durabletask/DurableTask.java +++ b/src/main/java/org/jenkinsci/plugins/durabletask/DurableTask.java @@ -59,8 +59,9 @@ public abstract class DurableTask extends AbstractDescribableImpl i public abstract Controller launch(EnvVars env, FilePath workspace, Launcher launcher, TaskListener listener) throws IOException, InterruptedException; /** - * Requests that standard output of the task be captured rather than streamed to {@link Controller#writeLog}. - * If so, you may call {@link Controller#getOutput}. + * Requests that standard output of the task be captured rather than streamed. + * If you use {@link Controller#watch}, standard output will not be sent to {@link Handler#output}; it will be included in {@link Handler#exited} instead. + * Otherwise (using polling mode), standard output will not be sent to {@link Controller#writeLog}; call {@link Controller#getOutput} to collect. * Standard error should still be streamed to the log. * Should be called prior to {@link #launch} to take effect. * @throws UnsupportedOperationException if this implementation does not support that mode diff --git a/src/main/java/org/jenkinsci/plugins/durabletask/FileMonitoringTask.java b/src/main/java/org/jenkinsci/plugins/durabletask/FileMonitoringTask.java index 303d1c97..32143a98 100644 --- a/src/main/java/org/jenkinsci/plugins/durabletask/FileMonitoringTask.java +++ b/src/main/java/org/jenkinsci/plugins/durabletask/FileMonitoringTask.java @@ -31,25 +31,36 @@ import hudson.Util; import hudson.model.TaskListener; import hudson.remoting.Channel; +import hudson.remoting.DaemonThreadFactory; +import hudson.remoting.NamingThreadFactory; import hudson.remoting.RemoteOutputStream; import hudson.remoting.VirtualChannel; import hudson.slaves.WorkspaceList; import hudson.util.StreamTaskListener; import java.io.File; import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; import java.io.OutputStream; import java.io.OutputStreamWriter; import java.io.RandomAccessFile; import java.io.StringWriter; import java.nio.ByteBuffer; +import java.nio.channels.Channels; +import java.nio.channels.FileChannel; import java.nio.charset.Charset; import java.nio.charset.CharsetDecoder; import java.nio.charset.CodingErrorAction; import java.nio.charset.StandardCharsets; +import java.nio.file.Paths; +import java.nio.file.StandardOpenOption; import java.util.Collections; import java.util.Map; import java.util.TreeMap; import java.util.UUID; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import java.util.logging.Level; import java.util.logging.Logger; @@ -58,6 +69,8 @@ import jenkins.MasterToSlaveFileCallable; import jenkins.security.MasterToSlaveCallable; import org.apache.commons.io.FileUtils; +import org.apache.commons.io.input.CountingInputStream; +import org.apache.commons.io.input.ReaderInputStream; import org.apache.commons.io.output.CountingOutputStream; import org.apache.commons.io.output.WriterOutputStream; @@ -125,6 +138,10 @@ protected static Map escape(EnvVars envVars) { return m; } + /** + * Tails a log file and watches for an exit status file. + * Must be remotable so that {@link #watch} can transfer the implementation. + */ protected static class FileMonitoringController extends Controller { /** Absolute path of {@link #controlDir(FilePath)}. */ @@ -137,6 +154,7 @@ protected static class FileMonitoringController extends Controller { /** * Byte offset in the file that has been reported thus far. + * Only used if {@link #writeLog(FilePath, OutputStream)} is used; not used for {@link #watch}. */ private long lastLocation; @@ -251,11 +269,25 @@ public Integer invoke(File f, VirtualChannel channel) throws IOException, Interr static final StatusCheck STATUS_CHECK_INSTANCE = new StatusCheck(); @Override public Integer exitStatus(FilePath workspace, Launcher launcher, TaskListener listener) throws IOException, InterruptedException { + return exitStatus(workspace, listener); + } + + /** + * Like {@link #exitStatus(FilePath, Launcher, TaskListener)} but not requesting a {@link Launcher}, which would not be available in {@link #watch} mode anyway. + */ + protected @CheckForNull Integer exitStatus(FilePath workspace, TaskListener listener) throws IOException, InterruptedException { FilePath status = getResultFile(workspace); return status.act(STATUS_CHECK_INSTANCE); } @Override public byte[] getOutput(FilePath workspace, Launcher launcher) throws IOException, InterruptedException { + return getOutput(workspace); + } + + /** + * Like {@link #getOutput(FilePath, Launcher)} but not requesting a {@link Launcher}, which would not be available in {@link #watch} mode anyway. + */ + protected byte[] getOutput(FilePath workspace) throws IOException, InterruptedException { return getOutputFile(workspace).act(new MasterToSlaveFileCallable() { @Override public byte[] invoke(File f, VirtualChannel channel) throws IOException, InterruptedException { byte[] buf = FileUtils.readFileToByteArray(f); @@ -367,7 +399,111 @@ public FilePath getOutputFile(FilePath workspace) throws IOException, Interrupte } } + @Override public void watch(FilePath workspace, Handler handler, TaskListener listener) throws IOException, InterruptedException, ClassCastException { + workspace.actAsync(new StartWatching(this, handler, listener)); + LOGGER.log(Level.FINE, "started asynchronous watch in {0}", controlDir); + } + + /** + * File in which a last-read position is stored if {@link #watch} is used. + */ + public FilePath getLastLocationFile(FilePath workspace) throws IOException, InterruptedException { + return controlDir(workspace).child("last-location.txt"); + } + + private static final long serialVersionUID = 1L; + } + + private static ScheduledExecutorService watchService; + private synchronized static ScheduledExecutorService watchService() { + if (watchService == null) { + // TODO 2.105+ use ClassLoaderSanityThreadFactory + watchService = new /*ErrorLogging*/ScheduledThreadPoolExecutor(5, new NamingThreadFactory(new DaemonThreadFactory(), "FileMonitoringTask watcher")); + } + return watchService; + } + + private static class StartWatching extends MasterToSlaveFileCallable { + private static final long serialVersionUID = 1L; + + private final FileMonitoringController controller; + private final Handler handler; + private final TaskListener listener; + + StartWatching(FileMonitoringController controller, Handler handler, TaskListener listener) { + this.controller = controller; + this.handler = handler; + this.listener = listener; + } + + @Override public Void invoke(File workspace, VirtualChannel channel) throws IOException, InterruptedException { + watchService().submit(new Watcher(controller, new FilePath(workspace), handler, listener)); + return null; + } + + } + + private static class Watcher implements Runnable { + + private final FileMonitoringController controller; + private final FilePath workspace; + private final Handler handler; + private final TaskListener listener; + private final @CheckForNull Charset cs; + + Watcher(FileMonitoringController controller, FilePath workspace, Handler handler, TaskListener listener) { + this.controller = controller; + this.workspace = workspace; + this.handler = handler; + this.listener = listener; + cs = FileMonitoringController.transcodingCharset(controller.charset); + } + + @Override public void run() { + try { + Integer exitStatus = controller.exitStatus(workspace, listener); // check before collecting output, in case the process is just now finishing + long lastLocation = 0; + FilePath lastLocationFile = controller.getLastLocationFile(workspace); + if (lastLocationFile.exists()) { + lastLocation = Long.parseLong(lastLocationFile.readToString()); + } + FilePath logFile = controller.getLogFile(workspace); + long len = logFile.length(); + if (len > lastLocation) { + assert !logFile.isRemote(); + try (FileChannel ch = FileChannel.open(Paths.get(logFile.getRemote()), StandardOpenOption.READ)) { + InputStream locallyEncodedStream = Channels.newInputStream(ch.position(lastLocation)); + CountingInputStream cis = new CountingInputStream(locallyEncodedStream); + InputStream utf8EncodedStream = cs == null ? cis : new ReaderInputStream(new InputStreamReader(cis, cs), StandardCharsets.UTF_8); + handler.output(utf8EncodedStream); + lastLocationFile.write(Long.toString(lastLocation + cis.getByteCount()), null); + } + } + if (exitStatus != null) { + byte[] output; + if (controller.getOutputFile(workspace).exists()) { + output = controller.getOutput(workspace); + } else { + output = null; + } + handler.exited(exitStatus, output); + controller.cleanup(workspace); + } else { + if (!controller.controlDir(workspace).isDirectory()) { + LOGGER.log(Level.WARNING, "giving up on watching nonexistent {0}", controller.controlDir); + return; + } + // Could use an adaptive timeout as in DurableTaskStep.Execution in polling mode, + // though less relevant here since there is no network overhead to the check. + watchService().schedule(this, 100, TimeUnit.MILLISECONDS); + } + } catch (Exception x) { + // note that LOGGER here is going to the agent log, not master log + LOGGER.log(Level.WARNING, "giving up on watching " + controller.controlDir, x); + } + } + } } diff --git a/src/main/java/org/jenkinsci/plugins/durabletask/Handler.java b/src/main/java/org/jenkinsci/plugins/durabletask/Handler.java new file mode 100644 index 00000000..0abf6f89 --- /dev/null +++ b/src/main/java/org/jenkinsci/plugins/durabletask/Handler.java @@ -0,0 +1,67 @@ +/* + * The MIT License + * + * Copyright 2016 CloudBees, Inc. + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + * THE SOFTWARE. + */ + +package org.jenkinsci.plugins.durabletask; + +import hudson.FilePath; +import hudson.Launcher; +import hudson.remoting.VirtualChannel; +import java.io.InputStream; +import java.io.Serializable; +import javax.annotation.Nonnull; +import javax.annotation.Nullable; + +/** + * A remote handler which may be sent to an agent and handle process output and results. + * If it needs to communicate with the master, you may use {@link VirtualChannel#export}. + * @see Controller#watch + */ +public abstract class Handler implements Serializable { + + /** + * Notification that new process output is available. + *

Should only be called when at least one byte is available. + * Whatever bytes are actually read will not be offered on the next call, if there is one; there is no need to close the stream. + *

There is no guarantee that output is offered in the form of complete lines of text, + * though in the typical case of line-oriented output it is likely that it will end in a newline. + *

Buffering is the responsibility of the caller, and {@link InputStream#markSupported} may be false. + * @param stream a way to read process output which has not already been handled + * @throws Exception if anything goes wrong, this watch is deactivated + */ + public abstract void output(@Nonnull InputStream stream) throws Exception; + + /** + * Notification that the process has exited or vanished. + * {@link #output} should have been called with any final uncollected output. + *

Any metadata associated with the process may be deleted after this call completes, rendering subsequent {@link Controller} calls unsatisfiable. + *

Note that unlike {@link Controller#exitStatus(FilePath, Launcher)}, no specialized {@link Launcher} is available on the agent, + * so if there are specialized techniques for determining process liveness they will not be considered here; + * you still need to occasionally poll for an exit status from the master. + * @param code the exit code, if known (0 conventionally represents success); may be negative for anomalous conditions such as a missing process + * @param output standard output captured, if {@link DurableTask#captureOutput} was called; else null + * @throws Exception if anything goes wrong, this watch is deactivated + */ + public abstract void exited(int code, @Nullable byte[] output) throws Exception; + +} diff --git a/src/test/java/org/jenkinsci/plugins/durabletask/BourneShellScriptTest.java b/src/test/java/org/jenkinsci/plugins/durabletask/BourneShellScriptTest.java index ca3ab524..717650a9 100644 --- a/src/test/java/org/jenkinsci/plugins/durabletask/BourneShellScriptTest.java +++ b/src/test/java/org/jenkinsci/plugins/durabletask/BourneShellScriptTest.java @@ -27,19 +27,26 @@ import hudson.EnvVars; import hudson.FilePath; import hudson.Launcher; +import hudson.model.Slave; import hudson.plugins.sshslaves.SSHLauncher; +import hudson.remoting.VirtualChannel; import hudson.slaves.DumbSlave; import hudson.slaves.OfflineCause; import hudson.util.StreamTaskListener; import hudson.util.VersionNumber; import java.io.ByteArrayOutputStream; import java.io.File; +import java.io.InputStream; import java.io.OutputStream; import java.nio.charset.Charset; +import java.nio.charset.StandardCharsets; import java.util.Collections; import java.util.Locale; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; import java.util.logging.Level; import jenkins.security.MasterToSlaveCallable; +import org.apache.commons.io.IOUtils; import org.apache.commons.io.output.TeeOutputStream; import static org.hamcrest.Matchers.*; import org.jenkinsci.test.acceptance.docker.Docker; @@ -172,6 +179,51 @@ public void smokeTest() throws Exception { c.cleanup(ws); } + @Issue("JENKINS-38381") + @Test public void watch() throws Exception { + Slave s = j.createOnlineSlave(); + ws = s.getWorkspaceRoot(); + launcher = s.createLauncher(listener); + DurableTask task = new BourneShellScript("set +x; for x in 1 2 3 4 5; do echo $x; sleep 1; done"); + Controller c = task.launch(new EnvVars(), ws, launcher, listener); + BlockingQueue status = new LinkedBlockingQueue<>(); + BlockingQueue output = new LinkedBlockingQueue<>(); + BlockingQueue lines = new LinkedBlockingQueue<>(); + c.watch(ws, new MockHandler(s.getChannel(), status, output, lines), listener); + assertEquals("+ set +x", lines.take()); + assertEquals(0, status.take().intValue()); + assertEquals("", output.take()); + assertEquals("[1, 2, 3, 4, 5]", lines.toString()); + task = new BourneShellScript("echo result"); + task.captureOutput(); + c = task.launch(new EnvVars(), ws, launcher, listener); + status = new LinkedBlockingQueue<>(); + output = new LinkedBlockingQueue<>(); + lines = new LinkedBlockingQueue<>(); + c.watch(ws, new MockHandler(s.getChannel(), status, output, lines), listener); + assertEquals(0, status.take().intValue()); + assertEquals("result\n", output.take()); + assertEquals("[+ echo result]", lines.toString()); + } + private static class MockHandler extends Handler { + private final BlockingQueue status; + private final BlockingQueue output; + private final BlockingQueue lines; + @SuppressWarnings("unchecked") + MockHandler(VirtualChannel channel, BlockingQueue status, BlockingQueue output, BlockingQueue lines) { + this.status = channel.export(BlockingQueue.class, status); + this.output = channel.export(BlockingQueue.class, output); + this.lines = channel.export(BlockingQueue.class, lines); + } + @Override public void output(InputStream stream) throws Exception { + lines.addAll(IOUtils.readLines(stream, StandardCharsets.UTF_8)); + } + @Override public void exited(int code, byte[] data) throws Exception { + status.add(code); + output.add(data != null ? new String(data, StandardCharsets.UTF_8) : ""); + } + } + @Issue("JENKINS-40734") @Test public void envWithShellChar() throws Exception { Controller c = new BourneShellScript("echo \"value=$MYNEWVAR\"").launch(new EnvVars("MYNEWVAR", "foo$$bar"), ws, launcher, listener); @@ -240,7 +292,7 @@ private void runOnDocker(DumbSlave s) throws Exception { runOnDocker(new DumbSlave("docker", "/home/jenkins/agent", new SimpleCommandLauncher("docker run -i --rm --name agent --init jenkinsci/slave:3.7-1 java -jar /usr/share/jenkins/slave.jar"))); } - @Issue("JENKINS-31096") + @Issue({"JENKINS-31096", "JENKINS-38381"}) @Test public void encoding() throws Exception { JavaContainer container = dockerUbuntu.get(); DumbSlave s = new DumbSlave("docker", "/home/test", new SSHLauncher(container.ipBound(22), container.port(22), "test", "test", "", "-Dfile.encoding=ISO-8859-1")); @@ -252,7 +304,7 @@ private void runOnDocker(DumbSlave s) throws Exception { dockerWS.child("eastern").write("Čau!\n", "ISO-8859-2"); dockerWS.child("mixed").write("¡Čau → there!\n", "UTF-8"); Launcher dockerLauncher = s.createLauncher(listener); - assertEncoding("control: no transcoding", "latin", null, "¡Ole!", "ISO-8859-1", dockerWS, dockerLauncher); + assertEncoding("control: no transcoding", "latin", null, "¡Ole!", "ISO-8859-1", false, dockerWS, dockerLauncher); assertEncoding("test: specify particular charset (UTF-8)", "mixed", "UTF-8", "¡Čau → there!", "UTF-8", dockerWS, dockerLauncher); assertEncoding("test: specify particular charset (unrelated)", "eastern", "ISO-8859-2", "Čau!", "UTF-8", dockerWS, dockerLauncher); assertEncoding("test: specify agent default charset", "latin", "", "¡Ole!", "UTF-8", dockerWS, dockerLauncher); @@ -268,9 +320,13 @@ private void assertEncoding(String description, String file, String charset, Str assertEncoding(description, file, charset, expected, expectedEncoding, false, dockerWS, dockerLauncher); assertEncoding(description, file, charset, expected, expectedEncoding, true, dockerWS, dockerLauncher); } - private void assertEncoding(String description, String file, String charset, String expected, String expectedEncoding, boolean output, FilePath dockerWS, Launcher dockerLauncher) throws Exception { - System.err.println(description + " (output=" + output + ")"); // TODO maybe this should just be moved into a new class and @RunWith(Parameterized.class) for clarity + private void assertEncoding(String description, String file, String charset, String expected, String expectedEncoding, boolean watch, FilePath dockerWS, Launcher dockerLauncher) throws Exception { + assertEncoding(description, file, charset, expected, expectedEncoding, false, watch, dockerWS, dockerLauncher); + assertEncoding(description, file, charset, expected, expectedEncoding, true, watch, dockerWS, dockerLauncher); + } + private void assertEncoding(String description, String file, String charset, String expected, String expectedEncoding, boolean output, boolean watch, FilePath dockerWS, Launcher dockerLauncher) throws Exception { BourneShellScript dt = new BourneShellScript("set +x; cat " + file + "; sleep 1; tr '[a-z]' '[A-Z]' < " + file); + System.err.println(description + " (output=" + output + ", watch=" + watch + ")"); // TODO maybe this should just be moved into a new class and @RunWith(Parameterized.class) for clarity if (charset != null) { if (charset.isEmpty()) { dt.defaultCharset(); @@ -282,22 +338,38 @@ private void assertEncoding(String description, String file, String charset, Str dt.captureOutput(); } Controller c = dt.launch(new EnvVars(), dockerWS, dockerLauncher, listener); - ByteArrayOutputStream baos = new ByteArrayOutputStream(); - OutputStream tee = new TeeOutputStream(baos, System.err); - while (c.exitStatus(dockerWS, dockerLauncher, listener) == null) { - c.writeLog(dockerWS, tee); - Thread.sleep(100); - } - c.writeLog(dockerWS, tee); - assertEquals(description, 0, c.exitStatus(dockerWS, dockerLauncher, listener).intValue()); - String fullExpected = expected + "\n" + expected.toUpperCase(Locale.ENGLISH) + "\n"; - if (output) { - assertEquals(description, fullExpected, new String(c.getOutput(dockerWS, launcher), expectedEncoding)); + String expectedUC = expected.toUpperCase(Locale.ENGLISH); + String fullExpected = expected + "\n" + expectedUC + "\n"; + if (watch) { + BlockingQueue status = new LinkedBlockingQueue<>(); + BlockingQueue stdout = new LinkedBlockingQueue<>(); + BlockingQueue lines = new LinkedBlockingQueue<>(); + c.watch(dockerWS, new MockHandler(dockerWS.getChannel(), status, stdout, lines), listener); + assertEquals(description, "+ set +x", lines.take()); + assertEquals(description, 0, status.take().intValue()); + if (output) { + assertEquals(description, fullExpected, stdout.take()); + assertEquals(description, "[]", lines.toString()); + } else { + assertEquals(description, "", stdout.take()); + assertEquals(description, "[" + expected + ", " + expectedUC + "]", lines.toString()); + } } else { - assertThat(description, baos.toString(expectedEncoding), containsString(fullExpected)); + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + OutputStream tee = new TeeOutputStream(baos, System.err); + while (c.exitStatus(dockerWS, dockerLauncher, listener) == null) { + c.writeLog(dockerWS, tee); + Thread.sleep(100); + } + c.writeLog(dockerWS, tee); + assertEquals(description, 0, c.exitStatus(dockerWS, dockerLauncher, listener).intValue()); + if (output) { + assertEquals(description, fullExpected, new String(c.getOutput(dockerWS, launcher), expectedEncoding)); + } else { + assertThat(description, baos.toString(expectedEncoding), containsString(fullExpected)); + } } c.cleanup(dockerWS); - } }