Skip to content

Commit ae46e8b

Browse files
committed
Update to support source jobs
1 parent 58eb445 commit ae46e8b

File tree

10 files changed

+158
-38
lines changed

10 files changed

+158
-38
lines changed

JobScheduler.Benchmarks/Benchmark.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -210,7 +210,7 @@ private static long BenchVector(bool useVector)
210210
var parentJob = ParallelForJobCommon.GlobalScheduler.Schedule();
211211
for (var sindex = 0; sindex < loopCount; sindex++)
212212
{
213-
var job = new ParallelJobProducer<VectorCalculationJob>(0, jobCount, data, 16, !useVector);
213+
var job = new ParallelJobProducer<VectorCalculationJob>(0, jobCount, data, loopSize: 16, onlySingle: !useVector);
214214
job.CheckAndSplit();
215215
job.GetHandle().SetParent(parentJob);
216216
ParallelForJobCommon.GlobalScheduler.Flush(job.GetHandle());

JobScheduler.Test/JobSchedulerTests.cs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -216,4 +216,14 @@ public void ManyMoreDependencyThanNodeLimitSize()
216216
jobScheduler.Wait(targetJob);
217217
});
218218
}
219+
220+
[Test]
221+
public void TryToEditAJobThatIsReadyToExecute()
222+
{
223+
using var jobScheduler = new JobScheduler();
224+
var newJob = jobScheduler.Schedule();
225+
newJob.UnfinishedJobs--;
226+
Throws<InvalidOperationException>(() => newJob.SetParent(new()));
227+
Throws<InvalidOperationException>(() => newJob.SetDependsOn(new()));
228+
}
219229
}

JobScheduler.Test/ParallelForTests.cs

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,35 @@ public void NestedJobs()
102102
Interlocked.Increment(ref triggers);
103103
});
104104
});
105+
Assert.That(triggers, Is.EqualTo(100));
106+
}
107+
108+
[Test]
109+
public void UsingSourceJobButNotStarted()
110+
{
111+
var triggers = 0;
112+
var sourceJob = ParallelForJobCommon.GlobalScheduler.Schedule();
113+
var handle = ParallelForJob.Create(0, 100, i =>
114+
{
115+
Interlocked.Increment(ref triggers);
116+
}, source: sourceJob);
117+
ParallelForJobCommon.GlobalScheduler.Flush(handle);
118+
Thread.Sleep(10);
105119
Assert.That(triggers, Is.EqualTo(0));
106120
}
121+
122+
[Test]
123+
public void UsingSourceJobAndStarted()
124+
{
125+
var triggers = 0;
126+
var sourceJob = ParallelForJobCommon.GlobalScheduler.Schedule();
127+
var handle = ParallelForJob.Create(0, 100, i =>
128+
{
129+
Interlocked.Increment(ref triggers);
130+
}, source: sourceJob);
131+
ParallelForJobCommon.GlobalScheduler.Flush(handle);
132+
ParallelForJobCommon.GlobalScheduler.Flush(sourceJob);
133+
Thread.Sleep(10);
134+
Assert.That(triggers, Is.EqualTo(100));
135+
}
107136
}

JobScheduler/JobHandle.cs

Lines changed: 37 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -155,6 +155,7 @@ public struct JobHandle
155155
internal ushort Generation;
156156

157157
private readonly bool _isInitialized;
158+
158159
/// <summary>
159160
/// Indicates whether this <see cref="JobHandle"/> is null.
160161
/// Default or Just JobHandle() is null, meaning it has not been initialized yet.
@@ -215,17 +216,39 @@ public ref int UnfinishedJobs
215216
get => ref Pool.UnfinishedJobs[Index];
216217
}
217218

219+
/// <summary>
220+
/// Sets the job handle to be ready to execute.
221+
/// The job still needs to be added to the worker queue to be executed.
222+
/// It just means that before this point the job cannot be executed.
223+
/// </summary>
224+
// The readonly is not really true, but it silences the warning.
225+
public readonly int SetReadyToExecute()
226+
{
227+
var result = Interlocked.Decrement(ref UnfinishedJobs);
228+
if (result == 0)
229+
{
230+
throw new("JobHandle has no unfinished jobs, cannot set it ready to execute.");
231+
}
232+
233+
return result;
234+
}
235+
218236
/// <summary>
219237
/// Sets a <see cref="JobHandle"/> that is to be the parent.
220238
/// If a null handle is passed it will be ignored.
221239
/// </summary>
222240
/// <param name="parent">The parent job handle</param>
223241
public void SetParent(JobHandle parent)
224242
{
243+
if (IsJobEligibleForExecution())
244+
{
245+
throw new InvalidOperationException("Tried to set a parent on a job that is already eligible for execution. This is not allowed as it can lead to deadlocks or unexpected behavior.");
246+
}
225247
if (parent.IsNull)
226248
{
227-
return;
249+
throw new ArgumentNullException(nameof(parent), "Cannot set a null parent on a job handle.");
228250
}
251+
229252
Interlocked.Increment(ref parent.UnfinishedJobs);
230253
Parent = parent.Index;
231254
}
@@ -273,6 +296,10 @@ public bool IsHandleAlreadyRecycled()
273296
/// <param name="source">The handle that triggers the current handle</param>
274297
public void SetDependsOn(JobHandle source)
275298
{
299+
if (IsJobEligibleForExecution())
300+
{
301+
throw new InvalidOperationException("Tried to set a dependency on a job that is already eligible for execution. This is not allowed as it can lead to deadlocks or unexpected behavior.");
302+
}
276303
source.AddDependent(this);
277304
}
278305

@@ -315,6 +342,10 @@ public void FlushAndWait()
315342
/// <param name="target">What to be added to be executed after</param>
316343
private void AddDependent(JobHandle target)
317344
{
345+
if (IsJobEligibleForExecution())
346+
{
347+
throw new("Tried to add a dependent to a job that is already eligible for execution. This is not allowed as it can lead to deadlocks or unexpected behavior.");
348+
}
318349
Dependents ??= Pool.DependenciesBackingArray[Index];
319350
if (!IsAnySpaceAvailable())
320351
{
@@ -384,4 +415,9 @@ private int GetFreeDependentsIndex()
384415

385416
return -1;
386417
}
418+
419+
public bool IsJobEligibleForExecution()
420+
{
421+
return UnfinishedJobs == 1;
422+
}
387423
}

JobScheduler/JobScheduler.cs

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -111,13 +111,6 @@ public float CalculateThreadUsage()
111111
return total;
112112
}
113113

114-
private void FlushDirect(JobHandle job)
115-
{
116-
while (!Worker.Enqueue(job))
117-
{
118-
}
119-
}
120-
121114
/// <summary>
122115
/// Tries to transfer <see cref="JobHandle"/> to the <see cref="Workers"/> so that it can be executed.
123116
/// If it has unfinished children it will not be flushed, and instead will be left for the children to flush.
@@ -126,18 +119,25 @@ private void FlushDirect(JobHandle job)
126119
public void Flush(JobHandle job)
127120
{
128121
// This is to prevent the job from being flushed multiple times.
129-
var unfinishedJobs = Interlocked.Decrement(ref job.UnfinishedJobs);
130-
if (unfinishedJobs == 0)
122+
var unfinishedJobs = job.SetReadyToExecute();
123+
124+
if (unfinishedJobs != 1)
131125
{
132-
throw new("JobHandle has no unfinished jobs, cannot flush it.");
126+
return;
133127
}
134128

135-
if (unfinishedJobs != 1)
129+
// It is pointless to flush a job that has no work to do.
130+
// This is a job that triggers other jobs,
131+
// so might as well do it now, to lower latency.
132+
if (job.Job == null)
136133
{
134+
Finish(job);
137135
return;
138136
}
139137

140-
FlushDirect(job);
138+
while (!Worker.Enqueue(job))
139+
{
140+
}
141141
}
142142

143143
/// <summary>

JobScheduler/Utils/IParallelJobProducer.cs

Lines changed: 41 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ public struct ParallelJobProducer<T> : IJob where T : struct, IParallelJobProduc
4141
private readonly int _to;
4242
private readonly T _producer;
4343
private readonly JobHandle _selfHandle;
44+
private readonly JobHandle _sourceHandle;
4445
private readonly bool _onlySingle;
4546
private readonly int _loopSize;
4647

@@ -50,28 +51,38 @@ public struct ParallelJobProducer<T> : IJob where T : struct, IParallelJobProduc
5051
/// <param name="from">Where the loop starts</param>
5152
/// <param name="to">Maximum to loop to</param>
5253
/// <param name="producer">The job to call</param>
53-
/// <param name="scheduler">The scheduler where the jobs should be put</param>
54+
/// <param name="source"></param>
5455
/// <param name="loopSize">Size of the loop, useful for when you want to use vectorization</param>
5556
/// <param name="onlySingle">Makes sure you can ignore the end parameter in the IParallelJobProducer, and use start as index</param>
56-
public ParallelJobProducer(int from, int to, T producer, int loopSize = 16, bool onlySingle = false)
57+
/// <param name="scheduler">The scheduler where the jobs should be put</param>
58+
public ParallelJobProducer(int from, int to, T producer, JobHandle source = default, int loopSize = 16, bool onlySingle = false)
5759
{
60+
_sourceHandle = source;
5861
_from = from;
5962
_to = to;
6063
_producer = producer;
6164
_onlySingle = onlySingle;
6265
_loopSize = loopSize;
6366
_selfHandle = ParallelForJobCommon.GlobalScheduler!.Schedule(this);
67+
if (!source.IsNull)
68+
{
69+
_selfHandle.SetDependsOn(source);
70+
}
6471
}
6572

6673
//Only used to spawn sub-jobs
67-
private ParallelJobProducer(T producer, JobHandle parent, int start, int end, int loopSize, bool onlySingle)
74+
private ParallelJobProducer(T producer, JobHandle parent, int start, int end, int loopSize, bool onlySingle, JobHandle source)
6875
{
6976
_producer = producer;
7077
_from = start;
7178
_to = end;
7279
_loopSize = loopSize;
7380
_onlySingle = onlySingle;
7481
_selfHandle = ParallelForJobCommon.GlobalScheduler.Schedule(this, parent);
82+
if (!source.IsNull)
83+
{
84+
_selfHandle.SetDependsOn(source);
85+
}
7586
}
7687

7788
/// <summary>
@@ -108,7 +119,9 @@ public void Execute()
108119
/// <returns>The amount.</returns>
109120
private int CalculateChildrenToSplitInto()
110121
{
111-
const int ChildrenToSplitInto = 128; //This should be equal to the number of threads(or that times 2/3?) but for now it's just a constant
122+
// If you change this also change the bulk queue segment size.
123+
// This should be equal to the number of threads(or that times 2/3?) but for now it's just a constant
124+
const int ChildrenToSplitInto = 128;
112125
var range = _to - _from;
113126
return range < ChildrenToSplitInto ? range : ChildrenToSplitInto;
114127
}
@@ -134,11 +147,20 @@ public bool CheckAndSplit()
134147
/// <exception cref="Exception">An exception occuring when an invalid range was passed.</exception>
135148
private void Split()
136149
{
150+
if (_selfHandle.IsJobEligibleForExecution())
151+
{
152+
throw new InvalidOperationException("Tried to split a job that is already eligible for execution. You should split before flushing the job.");
153+
}
154+
137155
var childrenToSplitInto = CalculateChildrenToSplitInto();
138-
BulkQueue<JobHandle>.Segment segment;
139-
while (!Worker.GetEmptySegment(out segment))
156+
BulkQueue<JobHandle>.Segment segment = default;
157+
if (_sourceHandle.IsNull)
140158
{
159+
while (!Worker.GetEmptySegment(out segment))
160+
{
161+
}
141162
}
163+
142164
_selfHandle.Job = null;
143165
for (var i = 0; i < childrenToSplitInto; i++)
144166
{
@@ -149,15 +171,23 @@ private void Split()
149171
throw new($"Invalid range from {start} to {end}");
150172
}
151173

152-
var pjob = new ParallelJobProducer<T>(_producer, _selfHandle, start, end, _loopSize, _onlySingle);
153-
if (!segment.Enqueue(pjob._selfHandle))
174+
var pjob = new ParallelJobProducer<T>(_producer, _selfHandle, start, end, _loopSize, _onlySingle, _sourceHandle);
175+
if (_sourceHandle.IsNull)
154176
{
155-
throw new InvalidOperationException("Failed to enqueue job in segment. Probably segment size was changed without changing the code using it.");
177+
if (!segment.Enqueue(pjob._selfHandle))
178+
{
179+
throw new InvalidOperationException("Failed to enqueue job in segment. Probably segment size was changed without changing the code using it.");
180+
}
156181
}
182+
157183
//Simulate flushing but without using the scheduler
158-
pjob._selfHandle.UnfinishedJobs--;
184+
pjob._selfHandle.SetReadyToExecute();
185+
}
186+
187+
if (_sourceHandle.IsNull)
188+
{
189+
Worker.EnqueueFullSegment(segment);
159190
}
160-
Worker.EnqueueFullSegment(segment);
161191
}
162192

163193
/// <summary>

JobScheduler/Utils/LambdaJob.cs

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,10 +20,15 @@ public void Execute()
2020
/// <param name="action">The function to run</param>
2121
/// <param name="parent">The parent handle</param>
2222
/// <returns></returns>
23-
public static JobHandle Create(Action action, JobHandle parent)
23+
public static JobHandle Create(Action action, JobHandle source = default)
2424
{
2525
var job = new JobProducer(action);
26-
var handle = ParallelForJobCommon.GlobalScheduler.Schedule(job, parent);
26+
var handle = ParallelForJobCommon.GlobalScheduler!.Schedule(job);
27+
if (!source.IsNull)
28+
{
29+
handle.SetDependsOn(source);
30+
}
31+
2732
return handle;
2833
}
2934

@@ -32,7 +37,7 @@ public static JobHandle Create(Action action, JobHandle parent)
3237
/// </summary>
3338
public static void CreateFlushAndWait(Action action)
3439
{
35-
var handle = Create(action, JobHandle.Null);
40+
var handle = Create(action);
3641
handle.FlushAndWait();
3742
}
3843
}

JobScheduler/Utils/ParallelForEachJob.cs

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -22,19 +22,23 @@ public void RunSingle(int index)
2222
/// <summary>
2323
/// Creates a ParallelJobProducer that executes the provided action on every item of the given collection.
2424
/// </summary>
25-
/// <param name="source">The data to run the action on</param>
25+
/// <param name="data">The data to run the scheduler over</param>
2626
/// <param name="action">What to run</param>
27-
/// <param name="scheduler">The scheduler to use</param>
27+
/// <param name="source">The job that will trigger this one</param>
2828
/// <param name="parent"></param>
2929
/// <returns>A struct that wraps the handle</returns>
30-
public static JobHandle Create<T>(IEnumerable<T> source, Action<T> action, JobHandle parent = default)
30+
public static JobHandle Create<T>(IEnumerable<T> data, Action<T> action, JobHandle parent = default, JobHandle source = default)
3131
{
32-
var partitioner = Partitioner.Create(source);
32+
var partitioner = Partitioner.Create(data);
3333
// We use Environment.ProcessorCount * 4 to account for uneven distribution of work and to ensure that we have enough partitions to keep all cores busy.
3434
// The overhead of jobs is very low hence we can afford to have more partitions than cores.
3535
var partitions = partitioner.GetPartitions(Environment.ProcessorCount * 4);
36-
var producer = new ParallelJobProducer<PartitionedJobProducer<T>>(0, partitions.Count, new(partitions, action));
37-
producer.GetHandle().SetParent(parent);
36+
var producer = new ParallelJobProducer<PartitionedJobProducer<T>>(0, partitions.Count, new(partitions, action), source);
37+
if (!parent.IsNull)
38+
{
39+
producer.GetHandle().SetParent(parent);
40+
}
41+
3842
producer.CheckAndSplit();
3943
return producer.GetHandle();
4044
}

JobScheduler/Utils/ParallelForJob.cs

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,12 +19,16 @@ public void RunSingle(int index)
1919
/// <param name="start">Where to start executing</param>
2020
/// <param name="count">Executes to this</param>
2121
/// <param name="action">What to run</param>
22-
/// <param name="scheduler">The scheduler to use</param>
22+
/// <param name="target">The job to trigger after finishing</param>
23+
/// <param name="source">The job that will trigger this one</param>
2324
/// <returns>A struct that wraps the handle</returns>
24-
public static JobHandle Create(int start, int count, Action<int> action, JobHandle parent = default)
25+
public static JobHandle Create(int start, int count, Action<int> action, JobHandle target = default, JobHandle source = default)
2526
{
26-
var producer = new ParallelJobProducer<JobProducer>(start, count, new(action));
27-
producer.GetHandle().SetParent(parent);
27+
var producer = new ParallelJobProducer<JobProducer>(start, count, new(action), source);
28+
if (!target.IsNull)
29+
{
30+
producer.GetHandle().SetParent(target);
31+
}
2832
producer.CheckAndSplit();
2933
return producer.GetHandle();
3034
}

JobScheduler/Worker.cs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@ internal class Worker
2121
private readonly WorkStealingDeque<JobHandle> _queue;
2222
private readonly JobScheduler _jobScheduler;
2323
private volatile CancellationTokenSource _cancellationToken;
24+
// 128 for now to match the IParallelJobProducer size.
25+
// if you change this then change also the split IParallelJobProducer size, to not be larger than this value.
2426
private static readonly BulkQueue<JobHandle> _bulkQueue = new(1000, 128);
2527
public bool IsCurrentlyWorking { get; private set; } = false;
2628

0 commit comments

Comments
 (0)