diff --git a/src/main/java/org/jenkinsci/plugins/workflow/log/BufferedBuildListener.java b/src/main/java/org/jenkinsci/plugins/workflow/log/BufferedBuildListener.java index 85316ff8..34a61c0f 100644 --- a/src/main/java/org/jenkinsci/plugins/workflow/log/BufferedBuildListener.java +++ b/src/main/java/org/jenkinsci/plugins/workflow/log/BufferedBuildListener.java @@ -24,40 +24,40 @@ package org.jenkinsci.plugins.workflow.log; -import edu.umd.cs.findbugs.annotations.NonNull; -import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import hudson.CloseProofOutputStream; import hudson.model.BuildListener; +import hudson.remoting.Channel; +import hudson.remoting.ChannelClosedException; import hudson.remoting.RemoteOutputStream; import hudson.util.StreamTaskListener; import java.io.Closeable; +import java.io.FilterOutputStream; import java.io.IOException; import java.io.OutputStream; import java.io.PrintStream; +import java.util.logging.Logger; import org.jenkinsci.remoting.SerializableOnlyOverRemoting; /** * Unlike {@link StreamTaskListener} this does not set {@code autoflush} on the reconstructed {@link PrintStream}. * It also wraps on the remote side in {@link DelayBufferedOutputStream}. */ -final class BufferedBuildListener implements BuildListener, Closeable, SerializableOnlyOverRemoting { +final class BufferedBuildListener extends OutputStreamTaskListener.Default implements BuildListener, Closeable, SerializableOnlyOverRemoting { + + private static final Logger LOGGER = Logger.getLogger(BufferedBuildListener.class.getName()); private final OutputStream out; - @SuppressFBWarnings(value = "SE_BAD_FIELD", justification = "using Replacement anyway, fields here are irrelevant") - private final PrintStream ps; - BufferedBuildListener(OutputStream out) throws IOException { + BufferedBuildListener(OutputStream out) { this.out = out; - ps = new PrintStream(out, false, "UTF-8"); } - @NonNull - @Override public PrintStream getLogger() { - return ps; + @Override public OutputStream getOutputStream() { + return out; } - + @Override public void close() throws IOException { - ps.close(); + getLogger().close(); } private Object writeReplace() { @@ -75,8 +75,63 @@ private static final class Replacement implements SerializableOnlyOverRemoting { this.ros = new RemoteOutputStream(new CloseProofOutputStream(cbl.out)); } - private Object readResolve() throws IOException { - return new BufferedBuildListener(new GCFlushedOutputStream(new DelayBufferedOutputStream(ros, tuning))); + private Object readResolve() { + var cos = new CloseableOutputStream(new GCFlushedOutputStream(new DelayBufferedOutputStream(ros, tuning))); + Channel.currentOrFail().addListener(new Channel.Listener() { + @Override public void onClosed(Channel channel, IOException cause) { + LOGGER.fine(() -> "closing " + channel.getName()); + cos.close(channel, cause); + } + }); + return new BufferedBuildListener(cos); + } + + } + + /** + * Output stream which throws {@link ChannelClosedException} when appropriate. + * Otherwise callers could continue trying to write to {@link DelayBufferedOutputStream} + * long after {@link Channel#isClosingOrClosed} without errors. + * In the case of {@code org.jenkinsci.plugins.durabletask.Handler.output}, + * this is actively harmful since it would mean that writes apparently succeed + * and {@code last-location.txt} would move forward even though output was lost. + */ + private static final class CloseableOutputStream extends FilterOutputStream { + + /** non-null if closed */ + private Channel channel; + /** optional close cause */ + private IOException cause; + + CloseableOutputStream(OutputStream delegate) { + super(delegate); + } + + void close(Channel channel, IOException cause) { + this.channel = channel; + this.cause = cause; + // Do not call close(): ProxyOutputStream.doClose would just throw ChannelClosedException: …: channel is already closed + } + + private void checkClosed() throws IOException { + if (channel != null) { + throw new ChannelClosedException(channel, cause); + } + LOGGER.finer("not closed yet"); + } + + @Override public void write(int b) throws IOException { + checkClosed(); + out.write(b); + } + + @Override public void write(byte[] b, int off, int len) throws IOException { + checkClosed(); + out.write(b, off, len); + } + + @Override public String toString() { + return "CloseableOutputStream[" + out + "]"; } } diff --git a/src/main/java/org/jenkinsci/plugins/workflow/log/GCFlushedOutputStream.java b/src/main/java/org/jenkinsci/plugins/workflow/log/GCFlushedOutputStream.java index 7e47b7ab..0123213c 100644 --- a/src/main/java/org/jenkinsci/plugins/workflow/log/GCFlushedOutputStream.java +++ b/src/main/java/org/jenkinsci/plugins/workflow/log/GCFlushedOutputStream.java @@ -35,6 +35,10 @@ import java.util.logging.Logger; import edu.umd.cs.findbugs.annotations.NonNull; +import hudson.remoting.ChannelClosedException; +import java.io.EOFException; +import java.nio.channels.ClosedChannelException; +import java.util.stream.Stream; import jenkins.util.Timer; /** @@ -58,6 +62,21 @@ final class GCFlushedOutputStream extends FilterOutputStream { return "GCFlushedOutputStream[" + out + "]"; } + // TODO https://github.com/jenkinsci/remoting/pull/657 + private static boolean isClosedChannelException(Throwable t) { + if (t instanceof ClosedChannelException) { + return true; + } else if (t instanceof ChannelClosedException) { + return true; + } else if (t instanceof EOFException) { + return true; + } else if (t == null) { + return false; + } else { + return isClosedChannelException(t.getCause()) || Stream.of(t.getSuppressed()).anyMatch(GCFlushedOutputStream::isClosedChannelException); + } + } + /** * Flushes streams prior to garbage collection. * ({@link BufferedOutputStream} does not do this automatically.) @@ -78,7 +97,7 @@ private static final class FlushRef extends PhantomReference "Unexpected PrintStream subclass " + ps.getClass().getName() + " which might override write(…); error handling is degraded unless OutputStreamTaskListener is used: " + listener.getClass().getName()); + return ps; + } + if (Runtime.version().compareToIgnoreOptional(Runtime.Version.parse("17")) >= 0) { + Logger.getLogger(OutputStreamTaskListener.class.getName()).warning(() -> "On Java 17+ error handling is degraded unless OutputStreamTaskListener is used: " + listener.getClass().getName()); + return ps; + } + Field printStreamDelegate; + try { + printStreamDelegate = FilterOutputStream.class.getDeclaredField("out"); + } catch (NoSuchFieldException x) { + Logger.getLogger(OutputStreamTaskListener.class.getName()).log(Level.WARNING, "PrintStream.out defined in Java Platform and protected, so should not happen.", x); + return ps; + } + try { + printStreamDelegate.setAccessible(true); + } catch (InaccessibleObjectException x) { + Logger.getLogger(OutputStreamTaskListener.class.getName()).warning(() -> "Using --illegal-access=deny? Error handling is degraded unless OutputStreamTaskListener is used: " + listener.getClass().getName()); + return ps; + } + OutputStream os; + try { + os = (OutputStream) printStreamDelegate.get(ps); + } catch (IllegalAccessException x) { + Logger.getLogger(OutputStreamTaskListener.class.getName()).log(Level.WARNING, "Unexpected failure to access PrintStream.out", x); + return ps; + } + if (os == null) { + // like PrintStream.ensureOpen + return ClosedOutputStream.CLOSED_OUTPUT_STREAM; + } + return os; + } + + /** + * Convenience implementation handling {@link #getLogger}. + */ + abstract class Default implements OutputStreamTaskListener { + + private transient PrintStream ps; + + @Override public synchronized PrintStream getLogger() { + if (ps == null) { + ps = new PrintStream(getOutputStream(), false, StandardCharsets.UTF_8); + } + return ps; + } + + } + +} diff --git a/src/main/java/org/jenkinsci/plugins/workflow/log/TaskListenerDecorator.java b/src/main/java/org/jenkinsci/plugins/workflow/log/TaskListenerDecorator.java index bc9c664d..2d2d2c16 100644 --- a/src/main/java/org/jenkinsci/plugins/workflow/log/TaskListenerDecorator.java +++ b/src/main/java/org/jenkinsci/plugins/workflow/log/TaskListenerDecorator.java @@ -37,7 +37,6 @@ import java.io.OutputStream; import java.io.PrintStream; import java.io.Serializable; -import java.io.UnsupportedEncodingException; import java.util.ArrayList; import java.util.Collections; import java.util.List; @@ -237,7 +236,7 @@ private static OutputStream decorateAll(OutputStream base, List decorators; - private transient PrintStream logger; + private transient OutputStream out; DecoratedTaskListener(@NonNull TaskListener delegate, @NonNull List decorators) { this.delegate = delegate; @@ -263,15 +262,11 @@ private static final class DecoratedTaskListener implements BuildListener { } @NonNull - @Override public PrintStream getLogger() { - if (logger == null) { - try { - logger = new PrintStream(decorateAll(delegate.getLogger(), decorators), false, "UTF-8"); - } catch (UnsupportedEncodingException x) { - throw new AssertionError(x); - } + @Override public OutputStream getOutputStream() { + if (out == null) { + out = decorateAll(OutputStreamTaskListener.getOutputStream(delegate), decorators); } - return logger; + return out; } @Override public String toString() { @@ -280,7 +275,7 @@ private static final class DecoratedTaskListener implements BuildListener { } - private static final class CloseableTaskListener implements BuildListener, AutoCloseable { + private static final class CloseableTaskListener implements BuildListener, AutoCloseable, OutputStreamTaskListener { static BuildListener of(BuildListener mainDelegate, TaskListener closeDelegate) { if (closeDelegate instanceof AutoCloseable) { @@ -301,6 +296,12 @@ private CloseableTaskListener(@NonNull TaskListener mainDelegate, @NonNull TaskL assert closeDelegate instanceof AutoCloseable; } + @NonNull + @Override + public OutputStream getOutputStream() { + return OutputStreamTaskListener.getOutputStream(mainDelegate); + } + @NonNull @Override public PrintStream getLogger() { return mainDelegate.getLogger();