Skip to content

Commit d1900cf

Browse files
Improve Thread safety
Signed-off-by: Lehmann_Fabian <[email protected]>
1 parent d26928c commit d1900cf

File tree

6 files changed

+53
-29
lines changed

6 files changed

+53
-29
lines changed

src/main/java/cws/k8s/scheduler/scheduler/FinalizerThread.java

Lines changed: 23 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -6,28 +6,44 @@
66
import java.util.Queue;
77
import java.util.function.Consumer;
88

9-
@RequiredArgsConstructor
109
public class FinalizerThread<T> extends Thread {
1110

1211
private final Queue<T> items = new LinkedList<>();
1312
private final Consumer <? super T> finalizeItem;
1413

14+
public FinalizerThread(Consumer <? super T> finalizeItem) {
15+
super( "FinalizerThread" );
16+
this.finalizeItem = finalizeItem;
17+
}
18+
1519
@Override
1620
public void run() {
1721
while ( !Thread.currentThread().isInterrupted() ) {
22+
Queue<T> toProcess = new LinkedList<>();
1823
synchronized ( finalizeItem ) {
1924
while ( !items.isEmpty() ) {
2025
try {
21-
finalizeItem.accept( items.poll() );
26+
toProcess.add( items.poll() );
2227
} catch ( Exception e ) {
2328
System.err.println( "Error finalizing item: " + e.getMessage() );
2429
}
2530
}
31+
}
32+
for ( T item : toProcess ) {
2633
try {
27-
finalizeItem.wait();
28-
} catch ( InterruptedException e ) {
29-
Thread.currentThread().interrupt();
30-
System.err.println( "Finalizer thread interrupted: " + e.getMessage() );
34+
finalizeItem.accept( item );
35+
} catch ( Exception e ) {
36+
System.err.println( "Error finalizing item: " + e.getMessage() );
37+
}
38+
}
39+
synchronized ( finalizeItem ) {
40+
if ( items.isEmpty() ) {
41+
try {
42+
finalizeItem.wait();
43+
} catch ( InterruptedException e ) {
44+
Thread.currentThread().interrupt();
45+
System.err.println( "Finalizer thread interrupted: " + e.getMessage() );
46+
}
3147
}
3248
}
3349
}
@@ -36,7 +52,7 @@ public void run() {
3652
public void addItem( T item ) {
3753
synchronized ( finalizeItem ) {
3854
items.add( item );
39-
finalizeItem.notify();
55+
finalizeItem.notifyAll();
4056
}
4157
}
4258

src/main/java/cws/k8s/scheduler/scheduler/LocationAwareSchedulerV2.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -302,6 +302,7 @@ private void processCopyTaskFinished( CopyTask copyTask, boolean success ) {
302302
handleProblematicCopy( copyTask );
303303
}
304304
}
305+
informResourceChange();
305306
}
306307

307308
private void handleProblematicCopy( CopyTask copyTask ){

src/main/java/cws/k8s/scheduler/scheduler/Scheduler.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -69,10 +69,10 @@ public abstract class Scheduler implements Informable {
6969

7070
PodHandler handler = new PodHandler(this );
7171

72-
schedulingThread = new TaskprocessingThread( unscheduledTasks, this::schedule );
72+
schedulingThread = new TaskprocessingThread( "Scheduling Thread", unscheduledTasks, this::schedule );
7373
schedulingThread.start();
7474

75-
finishThread = new TaskprocessingThread(unfinishedTasks, this::terminateTasks );
75+
finishThread = new TaskprocessingThread( "Finisher Thread", unfinishedTasks, this::terminateTasks );
7676
finishThread.start();
7777

7878
log.info("Start watching: {}", this.namespace );

src/main/java/cws/k8s/scheduler/scheduler/SchedulerWithDaemonSet.java

Lines changed: 21 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -114,32 +114,35 @@ int terminateTasks(List<Task> finishedTasks) {
114114
log.info( "Pod finished with exitCode: {}", exitCode );
115115
//Init failure
116116
final Path workdir = Paths.get(finishedTask.getWorkingDir());
117-
if ( exitCode == 123 && Files.exists( workdir.resolve(".command.init.failure") ) ) {
118-
log.info( "Task {} ({}) had an init failure: won't parse the in- and out files", finishedTask.getConfig().getRunName(), finishedTask.getConfig().getName() );
119-
} else {
120-
final Set<OutputFile> newAndUpdatedFiles = taskResultParser.getNewAndUpdatedFiles(
121-
workdir,
122-
finishedTask.getNode().getNodeLocation(),
123-
!finishedTask.wasSuccessfullyExecuted(),
124-
finishedTask
125-
);
126-
for (OutputFile newAndUpdatedFile : newAndUpdatedFiles) {
127-
if( newAndUpdatedFile instanceof PathLocationWrapperPair pathLocationWrapperPair ) {
128-
hierarchyWrapper.addFile(
129-
newAndUpdatedFile.getPath(),
130-
pathLocationWrapperPair.getLocationWrapper()
131-
);
132-
} else if ( newAndUpdatedFile instanceof SymlinkOutput symlinkOutput ){
133-
hierarchyWrapper.addSymlink( newAndUpdatedFile.getPath(), symlinkOutput.getDst() );
117+
final Set<OutputFile> newAndUpdatedFiles = taskResultParser.getNewAndUpdatedFiles(
118+
workdir,
119+
finishedTask.getNode().getNodeLocation(),
120+
!finishedTask.wasSuccessfullyExecuted(),
121+
finishedTask
122+
);
123+
final Set<PathLocationWrapperPair> outputFiles = new HashSet<>();
124+
for (OutputFile newAndUpdatedFile : newAndUpdatedFiles) {
125+
if( newAndUpdatedFile instanceof PathLocationWrapperPair pathLocationWrapperPair ) {
126+
final LocationWrapper locationWrapper = hierarchyWrapper.addFile(
127+
newAndUpdatedFile.getPath(),
128+
pathLocationWrapperPair.getLocationWrapper()
129+
);
130+
if ( locationWrapper != null ) {
131+
outputFiles.add( pathLocationWrapperPair );
134132
}
133+
} else if ( newAndUpdatedFile instanceof SymlinkOutput symlinkOutput ){
134+
hierarchyWrapper.addSymlink( newAndUpdatedFile.getPath(), symlinkOutput.getDst() );
135135
}
136136
}
137137
}
138138
} catch ( Exception e ){
139139
log.info( "Problem while finishing task: {} ({})", finishedTask.getConfig().getRunName(), finishedTask.getConfig().getName(), e );
140140
}
141-
super.taskWasFinished( finishedTask );
142141
});
142+
// run in a separate thread to avoid blocking the finishedTasks list
143+
for ( Task finishedTask : finishedTasks ) {
144+
super.taskWasFinished( finishedTask );
145+
}
143146
return 0;
144147
}
145148

src/main/java/cws/k8s/scheduler/scheduler/TaskprocessingThread.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,14 +9,19 @@
99
import java.util.function.Function;
1010

1111
@Slf4j
12-
@RequiredArgsConstructor
1312
public class TaskprocessingThread extends Thread {
1413

1514
private final List<Task> unprocessedTasks;
1615
private final Function<List<Task>, Integer> function;
1716

1817
private boolean otherResourceChange = false;
1918

19+
public TaskprocessingThread( String name, List<Task> unprocessedTasks, Function<List<Task>, Integer> function) {
20+
super(name );
21+
this.unprocessedTasks = unprocessedTasks;
22+
this.function = function;
23+
}
24+
2025
public void otherResourceChange() {
2126
otherResourceChange = true;
2227
}

src/main/java/cws/k8s/scheduler/scheduler/la2/copystrategy/LaListener.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,6 @@ public void onClose( int exitCode, String reason ) {
5656
log.error( "Copy task was not finished, but closed. ExitCode: " + exitCode + " Reason: " + reason );
5757
scheduler.copyTaskFinished( copyTask, exitCode == 0 );
5858
}
59-
scheduler.informResourceChange();
6059
}
6160

6261
@Override

0 commit comments

Comments
 (0)