Skip to content

Commit 11388ac

Browse files
committed
Merge pull request #392 from JLLeitschuh/feat/pipelineInOneThread
Pipeline run in own thread
2 parents 46d005f + e642cdc commit 11388ac

File tree

57 files changed

+2121
-515
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

57 files changed

+2121
-515
lines changed

build.gradle

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,8 @@ allprojects {
5757
testCompile group: 'net.jodah', name: 'concurrentunit', version: '0.4.2'
5858
testCompile group: 'org.hamcrest', name: 'hamcrest-all', version: '1.3'
5959
testCompile group: 'junit', name: 'junit', version: '4.12'
60+
testCompile group: 'com.google.truth', name: 'truth', version: '0.28'
61+
testCompile group: 'com.google.guava', name: 'guava-testlib', version: '19.0'
6062
}
6163

6264
version = getVersionName()
@@ -122,6 +124,7 @@ project(":core") {
122124
}
123125

124126
dependencies {
127+
compile group: 'com.google.code.findbugs', name: 'jsr305', version: '3.0.0'
125128
compile group: 'org.bytedeco', name: 'javacv', version: '1.1'
126129
compile group: 'org.bytedeco.javacpp-presets', name: 'opencv', version: '3.0.0-1.1'
127130
compile group: 'org.bytedeco.javacpp-presets', name: 'opencv', version: '3.0.0-1.1', classifier: os

core/src/main/java/edu/wpi/grip/core/Main.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,8 @@ public class Main {
2626
@Inject
2727
private Project project;
2828
@Inject
29+
private PipelineRunner pipelineRunner;
30+
@Inject
2931
private EventBus eventBus;
3032
@Inject
3133
private Logger logger;
@@ -58,6 +60,7 @@ public void start(String[] args) throws IOException, InterruptedException {
5860
// Open a project from a .grip file specified on the command line
5961
project.open(new File(projectPath));
6062

63+
pipelineRunner.startAsync();
6164

6265
// This is done in order to indicate to the user using the deployment UI that this is running
6366
logger.log(Level.INFO, "SUCCESS! The project is running in headless mode!");

core/src/main/java/edu/wpi/grip/core/Operation.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,4 +64,18 @@ default void perform(InputSocket<?>[] inputs, OutputSocket<?>[] outputs, Optiona
6464
default void perform(InputSocket<?>[] inputs, OutputSocket<?>[] outputs) {
6565
throw new UnsupportedOperationException("Perform was not overridden");
6666
}
67+
68+
/**
69+
* Allows the step to clean itself up when removed from the pipeline.
70+
* This should only be called by {@link Step#setRemoved()} to ensure correct synchronization.
71+
*
72+
* @param inputs An array obtained from {@link #createInputSockets(EventBus)}. The caller can set the value of
73+
* each socket to an actual parameter for the operation.
74+
* @param outputs An array obtained from {@link #createOutputSockets(EventBus)}. The outputs of the operation will
75+
* be stored in these sockets.
76+
* @param data Optional data to be passed to the operation
77+
*/
78+
default void cleanUp(InputSocket<?>[] inputs, OutputSocket<?>[] outputs, Optional<?> data) {
79+
/* no-op */
80+
}
6781
}

core/src/main/java/edu/wpi/grip/core/Pipeline.java

Lines changed: 133 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package edu.wpi.grip.core;
22

3+
import com.google.common.collect.ImmutableList;
34
import com.google.common.eventbus.EventBus;
45
import com.google.common.eventbus.Subscribe;
56
import com.google.inject.Singleton;
@@ -11,6 +12,11 @@
1112

1213
import javax.inject.Inject;
1314
import java.util.*;
15+
import java.util.concurrent.locks.Lock;
16+
import java.util.concurrent.locks.ReadWriteLock;
17+
import java.util.concurrent.locks.ReentrantReadWriteLock;
18+
import java.util.function.Consumer;
19+
import java.util.function.Function;
1420
import java.util.stream.Collectors;
1521

1622
import static com.google.common.base.Preconditions.checkArgument;
@@ -35,7 +41,14 @@ public class Pipeline {
3541
@XStreamOmitField
3642
private NTManager ntManager;
3743

44+
/*
45+
* We have separate locks for sources and steps because we don't want to
46+
* block access to both resources when only one is in use.
47+
*/
48+
49+
private transient final ReadWriteLock sourceLock = new ReentrantReadWriteLock();
3850
private final List<Source> sources = new ArrayList<>();
51+
private transient ReadWriteLock stepLock = new ReentrantReadWriteLock();
3952
private final List<Step> steps = new ArrayList<>();
4053
private final Set<Connection> connections = new HashSet<>();
4154
private ProjectSettings settings = new ProjectSettings();
@@ -44,27 +57,108 @@ public class Pipeline {
4457
* Remove everything in the pipeline
4558
*/
4659
public void clear() {
47-
this.steps.stream().collect(Collectors.toList()).forEach(this::removeStep);
48-
60+
getSteps().forEach(this::removeStep);
61+
// We collect the list first because the event modifies the list
4962
this.sources.stream()
5063
.map(SourceRemovedEvent::new)
5164
.collect(Collectors.toList())
5265
.forEach(this.eventBus::post);
5366
}
5467

68+
private final <R> R readSourcesSafely(Function<List<Source>, R> sourceListFunction) {
69+
return accessSafely(sourceLock.readLock(), Collections.unmodifiableList(sources), sourceListFunction);
70+
}
71+
72+
/**
73+
* Returns a snapshot of all of the sources in the pipeline.
74+
*
75+
* @return an Immutable copy of the sources at the current point in the pipeline.
76+
* @see <a href="https://youtu.be/ZeO_J2OcHYM?t=16m35s">Why we use ImmutableList return type</a>
77+
*/
78+
public final ImmutableList<Source> getSources() {
79+
return readSourcesSafely(ImmutableList::copyOf);
80+
}
81+
5582
/**
56-
* @return The unmodifiable list of sources for inputs to the algorithm
57-
* @see Source
83+
* @param stepListFunction The function to read the steps with.
84+
* @param <R> The return type of the function
85+
* @return The value returned by the function.
5886
*/
59-
public List<Source> getSources() {
60-
return Collections.unmodifiableList(this.sources);
87+
private final <R> R readStepsSafely(Function<List<Step>, R> stepListFunction) {
88+
return accessSafely(stepLock.readLock(), Collections.unmodifiableList(steps), stepListFunction);
6189
}
6290

6391
/**
64-
* @return The unmodifiable list of steps in the computer vision algorithm
92+
* Returns a snapshot of all of the steps in the pipeline.
93+
*
94+
* @return an Immutable copy of the steps at the current point in the pipeline.
95+
* @see <a href="https://youtu.be/ZeO_J2OcHYM?t=16m35s">Why we use ImmutableList return type</a>
6596
*/
66-
public List<Step> getSteps() {
67-
return Collections.unmodifiableList(this.steps);
97+
public final ImmutableList<Step> getSteps() {
98+
return readStepsSafely(ImmutableList::copyOf);
99+
}
100+
101+
/*
102+
* These methods should not be made public.
103+
* If you do so you are making a poor design decision and should move whatever you are trying to do into
104+
* this class.
105+
*/
106+
107+
/**
108+
* @param stepListWriterFunction A function that modifies the step list passed to the operation.
109+
* @param <R> The return type of the function
110+
* @return The value returned by the function.
111+
*/
112+
private <R> R writeStepsSafely(Function<List<Step>, R> stepListWriterFunction) {
113+
return accessSafely(stepLock.writeLock(), steps, stepListWriterFunction);
114+
}
115+
116+
/**
117+
* @param stepListWriterConsumer A consumer that can modify the list that is passed to it.
118+
*/
119+
private void writeStepsSafelyConsume(Consumer<List<Step>> stepListWriterConsumer) {
120+
writeStepsSafely(stepList -> {
121+
stepListWriterConsumer.accept(stepList);
122+
return null;
123+
});
124+
}
125+
126+
private <R> R writeSourcesSafely(Function<List<Source>, R> sourceListWriterFunction) {
127+
return accessSafely(sourceLock.writeLock(), sources, sourceListWriterFunction);
128+
}
129+
130+
private void writeSourcesSafelyConsume(Consumer<List<Source>> sourceListWriterFunction) {
131+
writeSourcesSafely(sources -> {
132+
sourceListWriterFunction.accept(sources);
133+
return null;
134+
});
135+
}
136+
137+
/*
138+
* End of methods that should not be made public
139+
*/
140+
141+
/**
142+
* Locks the resource with the specified lock and performs the function.
143+
* When the function is complete then the lock unlocked again.
144+
*
145+
* @param lock The lock for the given resource
146+
* @param list The list that will be accessed while the resource is locked
147+
* @param listFunction The function that either modifies or accesses the list
148+
* @param <T> The type of list
149+
* @param <R> The return value for the function
150+
* @return The value returned by the list function
151+
*/
152+
private static <T, R> R accessSafely(Lock lock, List<T> list, Function<List<T>, R> listFunction) {
153+
final R returnValue;
154+
lock.lock();
155+
try {
156+
returnValue = listFunction.apply(list);
157+
} finally {
158+
// Ensure that no matter what may get thrown while reading the steps we unlock
159+
lock.unlock();
160+
}
161+
return returnValue;
68162
}
69163

70164
/**
@@ -132,43 +226,53 @@ public boolean canConnect(Socket socket1, Socket socket2) {
132226
/**
133227
* @return true if the step1 is before step2 in the pipeline
134228
*/
135-
private synchronized boolean isBefore(Step step1, Step step2) {
136-
return this.steps.indexOf(step1) < this.steps.indexOf(step2);
229+
private boolean isBefore(Step step1, Step step2) {
230+
return readStepsSafely(steps -> steps.indexOf(step1) < steps.indexOf(step2));
137231
}
138232

139233
@Subscribe
140234
public void onSourceAdded(SourceAddedEvent event) {
141-
this.sources.add(event.getSource());
235+
writeSourcesSafelyConsume(sources -> {
236+
sources.add(event.getSource());
237+
});
142238
}
143239

144240
@Subscribe
145241
public void onSourceRemoved(SourceRemovedEvent event) {
146-
this.sources.remove(event.getSource());
242+
writeSourcesSafelyConsume(sources -> {
243+
sources.remove(event.getSource());
244+
});
147245

148246
// Sockets of deleted sources should not be previewed
149247
for (OutputSocket<?> socket : event.getSource().getOutputSockets()) {
150248
socket.setPreviewed(false);
151249
}
152250
}
153251

154-
public synchronized void addStep(int index, Step step) {
252+
public void addStep(int index, Step step) {
155253
checkNotNull(step, "The step can not be null");
156-
this.steps.add(index, step);
254+
checkArgument(!step.removed(), "The step must not have been disabled already");
255+
256+
writeStepsSafelyConsume(steps -> steps.add(index, step));
257+
157258
this.eventBus.register(step);
158259
this.eventBus.post(new StepAddedEvent(step, index));
159260
}
160261

161-
public synchronized void addStep(Step step) {
262+
public void addStep(Step step) {
162263
addStep(this.steps.size(), step);
163264
}
164265

165-
public synchronized void removeStep(Step step) {
266+
public void removeStep(Step step) {
166267
checkNotNull(step, "The step can not be null");
167-
this.steps.remove(step);
268+
269+
writeStepsSafelyConsume(steps -> steps.remove(step));
270+
168271
// Sockets of deleted steps should not be previewed
169272
for (OutputSocket<?> socket : step.getOutputSockets()) {
170273
socket.setPreviewed(false);
171274
}
275+
step.setRemoved();
172276
this.eventBus.unregister(step);
173277
this.eventBus.post(new StepRemovedEvent(step));
174278
}
@@ -177,13 +281,19 @@ public synchronized void moveStep(Step step, int delta) {
177281
checkNotNull(step, "The step can not be null");
178282
checkArgument(this.steps.contains(step), "The step must exist in the pipeline to be moved");
179283

180-
final int oldIndex = this.steps.indexOf(step);
181-
this.steps.remove(oldIndex);
284+
// We are modifying the steps array
285+
writeStepsSafelyConsume(steps -> {
286+
final int oldIndex = this.steps.indexOf(step);
287+
this.steps.remove(oldIndex);
182288

183-
// Compute the new index of the step, clamping to the beginning or end of pipeline if it goes past either end
184-
final int newIndex = Math.min(Math.max(oldIndex + delta, 0), this.steps.size());
185-
this.steps.add(newIndex, step);
289+
// Compute the new index of the step, clamping to the beginning or end of pipeline if it goes past either end
290+
final int newIndex = Math.min(Math.max(oldIndex + delta, 0), this.steps.size());
291+
this.steps.add(newIndex, step);
292+
});
293+
294+
// Do not lock while posting the event
186295
eventBus.post(new StepMovedEvent(step, delta));
296+
187297
}
188298

189299
@Subscribe

0 commit comments

Comments
 (0)