Skip to content
Closed
Show file tree
Hide file tree
Changes from 10 commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
b945cf2
[JENKINS-38381] Controller.watch API.
jglick Feb 9, 2018
27f957f
Reminder to pick up https://github.com/jenkinsci/jenkins/pull/3272.
jglick Feb 13, 2018
cc6029c
Merge branch 'UTF-8-JENKINS-31096' into watch-plus-UTF-8-JENKINS-31096
jglick Feb 13, 2018
5d7ea7f
Integrating transcoding into watches.
jglick Feb 13, 2018
6141407
Merge branch 'UTF-8-JENKINS-31096' into watch-plus-UTF-8-JENKINS-31096
jglick Feb 13, 2018
deb425d
Completing test coverage.
jglick Feb 13, 2018
625da89
Merge branch 'UTF-8-JENKINS-31096' into watch-plus-UTF-8-JENKINS-31096
jglick Feb 19, 2018
814047d
Merge with #66.
jglick Mar 6, 2018
d3e2afb
Merge branch 'watch-JENKINS-38381' into watch-plus-UTF-8-JENKINS-31096
jglick Mar 6, 2018
768bf09
Merge branch 'UTF-8-JENKINS-31096' into watch-plus-UTF-8-JENKINS-31096
jglick Mar 6, 2018
9f6e2ec
Merge branch 'incrementals' into watch-JENKINS-38381
jglick Jun 8, 2018
5ac713d
Suggestions from @oleg-nenashev.
jglick Jun 8, 2018
6974ae2
Decided that unresponsiveness to kill is better handled by having Dur…
jglick Jun 8, 2018
d699ee0
Merge branch 'UTF-8-JENKINS-31096' into watch-plus-UTF-8-JENKINS-31096
jglick Jun 9, 2018
e219d47
Merge branch 'watch-JENKINS-38381' into watch-plus-UTF-8-JENKINS-31096
jglick Jun 9, 2018
c910214
Merge branch 'writeLog-JENKINS-37575' into watch-JENKINS-38381
jglick Jun 25, 2018
129d6b7
Merge branch 'writeLog-JENKINS-37575' into watch-JENKINS-38381
jglick Jun 25, 2018
9b56686
Merge branch 'UTF-8-JENKINS-31096' into watch-plus-UTF-8-JENKINS-31096
jglick Jun 25, 2018
e0fced2
Merge branch 'watch-JENKINS-38381' into watch-plus-UTF-8-JENKINS-31096
jglick Jun 25, 2018
8c7ea34
Merge branch 'writeLog-JENKINS-37575' into watch-JENKINS-38381
jglick Jul 9, 2018
20c9649
Merge branch 'UTF-8-JENKINS-31096' into watch-plus-UTF-8-JENKINS-31096
jglick Jul 9, 2018
a2a3d11
Merge branch 'watch-JENKINS-38381' into watch-plus-UTF-8-JENKINS-31096
jglick Jul 9, 2018
2e9d565
Merge branch 'parent' into watch-JENKINS-38381
jglick Aug 6, 2018
6267445
Merge branch 'watch-JENKINS-38381' into watch-plus-UTF-8-JENKINS-31096
jglick Aug 6, 2018
bd9232f
Merge branch 'UTF-8-JENKINS-31096' into watch-plus-UTF-8-JENKINS-31096
jglick Aug 6, 2018
95f9062
Fixed the lastLocation vs. transcoding bug also for watch mode.
jglick Aug 6, 2018
a330852
Merge branch 'master' into watch-plus-UTF-8-JENKINS-31096
jglick Aug 7, 2018
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -207,9 +207,8 @@ private FilePath pidFile(FilePath ws) throws IOException, InterruptedException {
return controlDir(ws).child("pid");
}

// TODO run as one big MasterToSlaveCallable<Integer> to avoid extra network roundtrips
@Override public Integer exitStatus(FilePath workspace, Launcher launcher, TaskListener listener) throws IOException, InterruptedException {
Integer status = super.exitStatus(workspace, launcher, listener);
@Override protected Integer exitStatus(FilePath workspace, TaskListener listener) throws IOException, InterruptedException {
Integer status = super.exitStatus(workspace, listener);
if (status != null) {
LOGGER.log(Level.FINE, "found exit code {0} in {1}", new Object[] {status, controlDir});
return status;
Expand Down
18 changes: 16 additions & 2 deletions src/main/java/org/jenkinsci/plugins/durabletask/Controller.java
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,20 @@
*/
public abstract class Controller implements Serializable {

/**
* Begins watching the process asynchronously, so that the master may receive notification when output is available or the process has exited.
* This should be called as soon as the process is launched, and thereafter whenever reconnecting to the agent.
* You should not call {@link #writeLog} or {@link #cleanup} in this case; you do not need to call {@link #exitStatus(FilePath, Launcher)} frequently,
* though it is advisable to still call it occasionally to verify that the process is still running.
* @param workspace the workspace in use
* @param handler a remotable callback
* @param listener a remotable destination for messages
* @throws UnsupportedOperationException when this mode is not available, so you must fall back to polling {@link #writeLog} and {@link #exitStatus(FilePath, Launcher)}
*/
public void watch(@Nonnull FilePath workspace, @Nonnull Handler handler, @Nonnull TaskListener listener) throws IOException, InterruptedException, UnsupportedOperationException {
throw new UnsupportedOperationException("Asynchronous mode is not implemented in " + getClass().getName());
}

/**
* Obtains any new task log output.
* Could use a serializable field to keep track of how much output has been previously written.
Expand All @@ -57,7 +71,7 @@ public abstract class Controller implements Serializable {
/**
* Checks whether the task has finished.
* @param workspace the workspace in use
* @param launcher a way to start processes
* @param launcher a way to start processes (currently unused)
* @param logger a way to report special messages
* @return an exit code (zero is successful), or null if the task appears to still be running
*/
Expand Down Expand Up @@ -88,7 +102,7 @@ public abstract class Controller implements Serializable {
* Intended for use after {@link #exitStatus(FilePath, Launcher)} has returned a non-null status.
* The result is undefined if {@link DurableTask#captureOutput} was not called before launch; generally an {@link IOException} will result.
* @param workspace the workspace in use
* @param launcher a way to start processes
* @param launcher a way to start processes (currently unused)
* @return the output of the process as raw bytes (may be empty but not null)
* @see DurableTask#charset
* @see DurableTask#defaultCharset
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,9 @@ public abstract class DurableTask extends AbstractDescribableImpl<DurableTask> i
public abstract Controller launch(EnvVars env, FilePath workspace, Launcher launcher, TaskListener listener) throws IOException, InterruptedException;

/**
* Requests that standard output of the task be captured rather than streamed to {@link Controller#writeLog}.
* If so, you may call {@link Controller#getOutput}.
* Requests that standard output of the task be captured rather than streamed.
* If you use {@link Controller#watch}, standard output will not be sent to {@link Handler#output}; it will be included in {@link Handler#exited} instead.
* Otherwise (using polling mode), standard output will not be sent to {@link Controller#writeLog}; call {@link Controller#getOutput} to collect.
* Standard error should still be streamed to the log.
* Should be called prior to {@link #launch} to take effect.
* @throws UnsupportedOperationException if this implementation does not support that mode
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,31 +31,42 @@
import hudson.Util;
import hudson.model.TaskListener;
import hudson.remoting.Channel;
import hudson.remoting.DaemonThreadFactory;
import hudson.remoting.NamingThreadFactory;
import hudson.remoting.RemoteOutputStream;
import hudson.remoting.VirtualChannel;
import hudson.slaves.WorkspaceList;
import hudson.util.StreamTaskListener;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.RandomAccessFile;
import java.io.StringWriter;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.FileChannel;
import java.nio.charset.Charset;
import java.nio.charset.CodingErrorAction;
import java.nio.charset.StandardCharsets;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import java.util.Collections;
import java.util.Map;
import java.util.TreeMap;
import java.util.UUID;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.annotation.CheckForNull;
import javax.annotation.Nonnull;
import jenkins.MasterToSlaveFileCallable;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils;
import org.apache.commons.io.input.CountingInputStream;
import org.apache.commons.io.input.ReaderInputStream;

/**
* A task which forks some external command and then waits for log and status files to be updated/created.
Expand Down Expand Up @@ -121,6 +132,10 @@ protected static Map<String, String> escape(EnvVars envVars) {
return m;
}

/**
* Tails a log file and watches for an exit status file.
* Must be remotable so that {@link #watch} can transfer the implementation.
*/
protected static class FileMonitoringController extends Controller {

/** Absolute path of {@link #controlDir(FilePath)}. */
Expand All @@ -133,6 +148,7 @@ protected static class FileMonitoringController extends Controller {

/**
* Byte offset in the file that has been reported thus far.
* Only used if {@link #writeLog(FilePath, OutputStream)} is used; not used for {@link #watch}.
*/
private long lastLocation;

Expand Down Expand Up @@ -177,7 +193,6 @@ private static class WriteLog extends MasterToSlaveFileCallable<Long> {
if (toRead > Integer.MAX_VALUE) { // >2Gb of output at once is unlikely
throw new IOException("large reads not yet implemented");
}
// TODO is this efficient for large amounts of output? Would it be better to stream data, or return a byte[] from the callable?
byte[] buf = new byte[(int) toRead];
raf.readFully(buf);
ByteBuffer transcoded = maybeTranscode(buf, charset);
Expand Down Expand Up @@ -225,31 +240,25 @@ public Integer invoke(File f, VirtualChannel channel) throws IOException, Interr
static final StatusCheck STATUS_CHECK_INSTANCE = new StatusCheck();

@Override public Integer exitStatus(FilePath workspace, Launcher launcher, TaskListener listener) throws IOException, InterruptedException {
FilePath status = getResultFile(workspace);
return status.act(STATUS_CHECK_INSTANCE);
return exitStatus(workspace, listener);
}

/**
* Transcode process output to UTF-8 if necessary.
* @param data output presumed to be in local encoding
* @param charset a particular encoding name, or the empty string for the system default encoding, or null to skip transcoding
* @return a buffer of UTF-8 encoded data ({@link CodingErrorAction#REPLACE} is used),
* or null if not performing transcoding because it was not requested or the data was already thought to be in UTF-8
* Like {@link #exitStatus(FilePath, Launcher, TaskListener)} but not requesting a {@link Launcher}, which would not be available in {@link #watch} mode anyway.
*/
private static @CheckForNull ByteBuffer maybeTranscode(@Nonnull byte[] data, @CheckForNull String charset) {
if (charset == null) { // no transcoding requested, do raw copy and YMMV
return null;
} else {
Charset cs = charset.equals(SYSTEM_DEFAULT_CHARSET) ? Charset.defaultCharset() : Charset.forName(charset);
if (cs.equals(StandardCharsets.UTF_8)) { // transcoding unnecessary as output was already UTF-8
return null;
} else { // decode output in specified charset and reëncode in UTF-8
return StandardCharsets.UTF_8.encode(cs.decode(ByteBuffer.wrap(data)));
}
}
protected Integer exitStatus(FilePath workspace, TaskListener listener) throws IOException, InterruptedException {
FilePath status = getResultFile(workspace);
return status.act(STATUS_CHECK_INSTANCE);
}

@Override public byte[] getOutput(FilePath workspace, Launcher launcher) throws IOException, InterruptedException {
return getOutput(workspace);
}

/**
* Like {@link #getOutput(FilePath, Launcher)} but not requesting a {@link Launcher}, which would not be available in {@link #watch} mode anyway.
*/
protected byte[] getOutput(FilePath workspace) throws IOException, InterruptedException {
return getOutputFile(workspace).act(new MasterToSlaveFileCallable<byte[]>() {
@Override public byte[] invoke(File f, VirtualChannel channel) throws IOException, InterruptedException {
byte[] buf = FileUtils.readFileToByteArray(f);
Expand All @@ -265,8 +274,38 @@ public Integer invoke(File f, VirtualChannel channel) throws IOException, Interr
});
}

/**
* Transcode process output to UTF-8 if necessary.
* @param data output presumed to be in local encoding
* @param charset a particular encoding name, or the empty string for the system default encoding, or null to skip transcoding
* @return a buffer of UTF-8 encoded data ({@link CodingErrorAction#REPLACE} is used),
* or null if not performing transcoding because it was not requested or the data was already thought to be in UTF-8
*/
private static @CheckForNull ByteBuffer maybeTranscode(@Nonnull byte[] data, @CheckForNull String charset) {
Charset cs = transcodingCharset(charset);
if (cs == null) {
return null;
} else {
return StandardCharsets.UTF_8.encode(cs.decode(ByteBuffer.wrap(data)));
}
}

private static @CheckForNull Charset transcodingCharset(@CheckForNull String charset) {
if (charset == null) {
return null;
} else {
Charset cs = charset.equals(SYSTEM_DEFAULT_CHARSET) ? Charset.defaultCharset() : Charset.forName(charset);
if (cs.equals(StandardCharsets.UTF_8)) { // transcoding unnecessary as output was already UTF-8
return null;
} else { // decode output in specified charset and reëncode in UTF-8
return cs;
}
}
}

@Override public final void stop(FilePath workspace, Launcher launcher) throws IOException, InterruptedException {
launcher.kill(Collections.singletonMap(COOKIE, cookieFor(workspace)));
// TODO after 10s, if the control dir still exists, write a flag file and have the Watcher shut down (interrupting any ongoing handler.output call if possible)
}

@Override public void cleanup(FilePath workspace) throws IOException, InterruptedException {
Expand Down Expand Up @@ -332,7 +371,107 @@ public FilePath getOutputFile(FilePath workspace) throws IOException, Interrupte
}
}

@Override public void watch(FilePath workspace, Handler handler, TaskListener listener) throws IOException, InterruptedException, ClassCastException {
workspace.actAsync(new StartWatching(this, handler, listener));
LOGGER.log(Level.FINE, "started asynchronous watch in {0}", controlDir);
}

/**
* File in which a last-read position is stored if {@link #watch} is used.
*/
public FilePath getLastLocationFile(FilePath workspace) throws IOException, InterruptedException {
return controlDir(workspace).child("last-location.txt");
}

private static final long serialVersionUID = 1L;
}

private static ScheduledExecutorService watchService;
private synchronized static ScheduledExecutorService watchService() {
if (watchService == null) {
// TODO 2.105+ use ClassLoaderSanityThreadFactory
watchService = new /*ErrorLogging*/ScheduledThreadPoolExecutor(5, new NamingThreadFactory(new DaemonThreadFactory(), "FileMonitoringTask watcher"));
}
return watchService;
}

private static class StartWatching extends MasterToSlaveFileCallable<Void> {

private static final long serialVersionUID = 1L;

private final FileMonitoringController controller;
private final Handler handler;
private final TaskListener listener;

StartWatching(FileMonitoringController controller, Handler handler, TaskListener listener) {
this.controller = controller;
this.handler = handler;
this.listener = listener;
}

@Override public Void invoke(File workspace, VirtualChannel channel) throws IOException, InterruptedException {
watchService().submit(new Watcher(controller, new FilePath(workspace), handler, listener));
return null;
}

}

private static class Watcher implements Runnable {

private final FileMonitoringController controller;
private final FilePath workspace;
private final Handler handler;
private final TaskListener listener;
private final @CheckForNull Charset cs;

Watcher(FileMonitoringController controller, FilePath workspace, Handler handler, TaskListener listener) {
this.controller = controller;
this.workspace = workspace;
this.handler = handler;
this.listener = listener;
cs = FileMonitoringController.transcodingCharset(controller.charset);
}

@Override public void run() {
try {
Integer exitStatus = controller.exitStatus(workspace, listener); // check before collecting output, in case the process is just now finishing
long lastLocation = 0;
FilePath lastLocationFile = controller.getLastLocationFile(workspace);
if (lastLocationFile.exists()) {
lastLocation = Long.parseLong(lastLocationFile.readToString());
}
FilePath logFile = controller.getLogFile(workspace);
long len = logFile.length();
if (len > lastLocation) {
assert !logFile.isRemote();
try (FileChannel ch = FileChannel.open(Paths.get(logFile.getRemote()), StandardOpenOption.READ)) {
InputStream locallyEncodedStream = Channels.newInputStream(ch.position(lastLocation));
InputStream utf8EncodedStream = cs == null ? locallyEncodedStream : new ReaderInputStream(new InputStreamReader(locallyEncodedStream, cs), StandardCharsets.UTF_8);
CountingInputStream cis = new CountingInputStream(utf8EncodedStream);
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This probably does the same miscount when transcoding as #61, to be confirmed.

handler.output(cis);
lastLocationFile.write(Long.toString(lastLocation + cis.getByteCount()), null);
}
}
if (exitStatus != null) {
byte[] output;
if (controller.getOutputFile(workspace).exists()) {
output = controller.getOutput(workspace);
} else {
output = null;
}
handler.exited(exitStatus, output);
controller.cleanup(workspace);
} else {
// Could use an adaptive timeout as in DurableTaskStep.Execution in polling mode,
// though less relevant here since there is no network overhead to the check.
watchService().schedule(this, 100, TimeUnit.MILLISECONDS);
}
} catch (Exception x) {
// note that LOGGER here is going to the agent log, not master log
LOGGER.log(Level.WARNING, "giving up on watching " + controller.controlDir, x);
}
}

}

}
Loading