Skip to content

Commit 041ce5a

Browse files
Merge pull request #23 from CommonWorkflowScheduler/fixes
Improve Thread Safety
2 parents ed361ab + d1900cf commit 041ce5a

File tree

7 files changed

+102
-59
lines changed

7 files changed

+102
-59
lines changed

src/main/java/cws/k8s/scheduler/model/DateParser.java

Lines changed: 0 additions & 32 deletions
This file was deleted.
Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
package cws.k8s.scheduler.scheduler;
2+
3+
import lombok.RequiredArgsConstructor;
4+
5+
import java.util.LinkedList;
6+
import java.util.Queue;
7+
import java.util.function.Consumer;
8+
9+
/**
 * Background worker that passes queued items, in insertion order, to a finalizer callback.
 *
 * <p>Producers call {@link #addItem(Object)} from any thread; the callback is always invoked
 * on this thread and never while the internal queue lock is held, so a slow callback does not
 * block producers. The thread runs until it is interrupted.
 *
 * @param <T> type of the items handed to the finalizer callback
 */
public class FinalizerThread<T> extends Thread {

    /**
     * Pending items. This private queue doubles as the monitor for all queue access, so no
     * outside code can contend for (or send stray notifications on) the lock used here.
     */
    private final Queue<T> items = new LinkedList<>();
    private final Consumer<? super T> finalizeItem;

    /**
     * Creates the worker; call {@link #start()} to begin processing.
     *
     * @param finalizeItem callback invoked once per queued item on this thread
     * @throws NullPointerException if {@code finalizeItem} is {@code null}
     */
    public FinalizerThread( Consumer<? super T> finalizeItem ) {
        super( "FinalizerThread" );
        // Fail fast here instead of failing later on the worker thread.
        this.finalizeItem = java.util.Objects.requireNonNull( finalizeItem, "finalizeItem" );
    }

    /**
     * Repeatedly waits for items, drains the queue and finalizes the drained batch.
     * An exception thrown by the callback is reported and does not stop the worker.
     * Returns once the thread has been interrupted.
     */
    @Override
    public void run() {
        while ( !Thread.currentThread().isInterrupted() ) {
            final Queue<T> toProcess;
            synchronized ( items ) {
                // Loop around wait() so a spurious wakeup never yields an empty batch.
                while ( items.isEmpty() ) {
                    try {
                        items.wait();
                    } catch ( InterruptedException e ) {
                        Thread.currentThread().interrupt();
                        System.err.println( "Finalizer thread interrupted: " + e.getMessage() );
                        return;
                    }
                }
                // Take a snapshot and release the lock before running the callback.
                toProcess = new LinkedList<>( items );
                items.clear();
            }
            for ( T item : toProcess ) {
                try {
                    finalizeItem.accept( item );
                } catch ( Exception e ) {
                    // Print the exception itself: getMessage() is null for many exception types.
                    System.err.println( "Error finalizing item: " + e );
                }
            }
        }
    }

    /**
     * Enqueues an item for finalization and wakes the worker if it is waiting.
     *
     * @param item item to pass to the finalizer callback
     */
    public void addItem( T item ) {
        synchronized ( items ) {
            items.add( item );
            items.notifyAll();
        }
    }

}

src/main/java/cws/k8s/scheduler/scheduler/LocationAwareSchedulerV2.java

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,8 @@ public class LocationAwareSchedulerV2 extends SchedulerWithDaemonSet {
6767
protected final int copySameTaskInParallel;
6868
protected final int maxHeldCopyTaskReady;
6969

70+
private final FinalizerThread<Tuple<CopyTask, Boolean>> finalizerThread;
71+
7072
/**
7173
* This value must be between 1 and 100.
7274
* 100 means that the data will be copied with full speed.
@@ -127,6 +129,8 @@ public LocationAwareSchedulerV2(
127129
this.copyInAdvance = new CopyInAdvanceNodeWithMostData( getCurrentlyCopying(), inputAlignment, this.copySameTaskInParallel );
128130
this.maxHeldCopyTaskReady = config.maxHeldCopyTaskReady == null ? 3 : config.maxHeldCopyTaskReady;
129131
this.prioPhaseThree = config.prioPhaseThree == null ? 70 : config.prioPhaseThree;
132+
finalizerThread = new FinalizerThread<>( x -> processCopyTaskFinished( x.getA(), x.getB() ) );
133+
finalizerThread.start();
130134
}
131135

132136
@Override
@@ -283,17 +287,22 @@ private void undoReserveCopyTask( CopyTask copyTask ) {
283287
}
284288

285289
public void copyTaskFinished( CopyTask copyTask, boolean success ) {
290+
finalizerThread.addItem( new Tuple<CopyTask, Boolean>(copyTask, success) );
291+
}
292+
293+
private void processCopyTaskFinished( CopyTask copyTask, boolean success ) {
286294
synchronized ( copyLock ) {
287295
freeLocations( copyTask.getAllLocationWrapper() );
288296
if( success ){
289-
copyTask.getInputFiles().parallelStream().forEach( TaskInputFileLocationWrapper::success );
290-
removeFromCopyingToNode( copyTask.getTask(), copyTask.getNodeLocation(), copyTask.getFilesForCurrentNode() );
291-
copyTask.getTask().preparedOnNode( copyTask.getNodeLocation() );
297+
copyTask.getInputFiles().parallelStream().forEach( TaskInputFileLocationWrapper::success );
298+
removeFromCopyingToNode( copyTask.getTask(), copyTask.getNodeLocation(), copyTask.getFilesForCurrentNode() );
299+
copyTask.getTask().preparedOnNode( copyTask.getNodeLocation() );
292300
} else {
293-
removeFromCopyingToNode( copyTask.getTask(), copyTask.getNodeLocation(), copyTask.getFilesForCurrentNode() );
294-
handleProblematicCopy( copyTask );
301+
removeFromCopyingToNode( copyTask.getTask(), copyTask.getNodeLocation(), copyTask.getFilesForCurrentNode() );
302+
handleProblematicCopy( copyTask );
295303
}
296304
}
305+
informResourceChange();
297306
}
298307

299308
private void handleProblematicCopy( CopyTask copyTask ){

src/main/java/cws/k8s/scheduler/scheduler/Scheduler.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -69,10 +69,10 @@ public abstract class Scheduler implements Informable {
6969

7070
PodHandler handler = new PodHandler(this );
7171

72-
schedulingThread = new TaskprocessingThread( unscheduledTasks, this::schedule );
72+
schedulingThread = new TaskprocessingThread( "Scheduling Thread", unscheduledTasks, this::schedule );
7373
schedulingThread.start();
7474

75-
finishThread = new TaskprocessingThread(unfinishedTasks, this::terminateTasks );
75+
finishThread = new TaskprocessingThread( "Finisher Thread", unfinishedTasks, this::terminateTasks );
7676
finishThread.start();
7777

7878
log.info("Start watching: {}", this.namespace );

src/main/java/cws/k8s/scheduler/scheduler/SchedulerWithDaemonSet.java

Lines changed: 21 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -114,32 +114,35 @@ int terminateTasks(List<Task> finishedTasks) {
114114
log.info( "Pod finished with exitCode: {}", exitCode );
115115
//Init failure
116116
final Path workdir = Paths.get(finishedTask.getWorkingDir());
117-
if ( exitCode == 123 && Files.exists( workdir.resolve(".command.init.failure") ) ) {
118-
log.info( "Task {} ({}) had an init failure: won't parse the in- and out files", finishedTask.getConfig().getRunName(), finishedTask.getConfig().getName() );
119-
} else {
120-
final Set<OutputFile> newAndUpdatedFiles = taskResultParser.getNewAndUpdatedFiles(
121-
workdir,
122-
finishedTask.getNode().getNodeLocation(),
123-
!finishedTask.wasSuccessfullyExecuted(),
124-
finishedTask
125-
);
126-
for (OutputFile newAndUpdatedFile : newAndUpdatedFiles) {
127-
if( newAndUpdatedFile instanceof PathLocationWrapperPair pathLocationWrapperPair ) {
128-
hierarchyWrapper.addFile(
129-
newAndUpdatedFile.getPath(),
130-
pathLocationWrapperPair.getLocationWrapper()
131-
);
132-
} else if ( newAndUpdatedFile instanceof SymlinkOutput symlinkOutput ){
133-
hierarchyWrapper.addSymlink( newAndUpdatedFile.getPath(), symlinkOutput.getDst() );
117+
final Set<OutputFile> newAndUpdatedFiles = taskResultParser.getNewAndUpdatedFiles(
118+
workdir,
119+
finishedTask.getNode().getNodeLocation(),
120+
!finishedTask.wasSuccessfullyExecuted(),
121+
finishedTask
122+
);
123+
final Set<PathLocationWrapperPair> outputFiles = new HashSet<>();
124+
for (OutputFile newAndUpdatedFile : newAndUpdatedFiles) {
125+
if( newAndUpdatedFile instanceof PathLocationWrapperPair pathLocationWrapperPair ) {
126+
final LocationWrapper locationWrapper = hierarchyWrapper.addFile(
127+
newAndUpdatedFile.getPath(),
128+
pathLocationWrapperPair.getLocationWrapper()
129+
);
130+
if ( locationWrapper != null ) {
131+
outputFiles.add( pathLocationWrapperPair );
134132
}
133+
} else if ( newAndUpdatedFile instanceof SymlinkOutput symlinkOutput ){
134+
hierarchyWrapper.addSymlink( newAndUpdatedFile.getPath(), symlinkOutput.getDst() );
135135
}
136136
}
137137
}
138138
} catch ( Exception e ){
139139
log.info( "Problem while finishing task: {} ({})", finishedTask.getConfig().getRunName(), finishedTask.getConfig().getName(), e );
140140
}
141-
super.taskWasFinished( finishedTask );
142141
});
142+
// run in a separate thread to avoid blocking the finishedTasks list
143+
for ( Task finishedTask : finishedTasks ) {
144+
super.taskWasFinished( finishedTask );
145+
}
143146
return 0;
144147
}
145148

src/main/java/cws/k8s/scheduler/scheduler/TaskprocessingThread.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,14 +9,19 @@
99
import java.util.function.Function;
1010

1111
@Slf4j
12-
@RequiredArgsConstructor
1312
public class TaskprocessingThread extends Thread {
1413

1514
private final List<Task> unprocessedTasks;
1615
private final Function<List<Task>, Integer> function;
1716

1817
private boolean otherResourceChange = false;
1918

19+
public TaskprocessingThread( String name, List<Task> unprocessedTasks, Function<List<Task>, Integer> function) {
20+
super(name );
21+
this.unprocessedTasks = unprocessedTasks;
22+
this.function = function;
23+
}
24+
2025
public void otherResourceChange() {
2126
otherResourceChange = true;
2227
}

src/main/java/cws/k8s/scheduler/scheduler/la2/copystrategy/LaListener.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,6 @@ public void onClose( int exitCode, String reason ) {
5656
log.error( "Copy task was not finished, but closed. ExitCode: " + exitCode + " Reason: " + reason );
5757
scheduler.copyTaskFinished( copyTask, exitCode == 0 );
5858
}
59-
scheduler.informResourceChange();
6059
}
6160

6261
@Override

0 commit comments

Comments
 (0)