Skip to content

Commit 58eb445

Browse files
committed
Add lambda job and fixes
1 parent 41ef639 commit 58eb445

File tree

12 files changed

+206
-136
lines changed

12 files changed

+206
-136
lines changed

JobScheduler.Benchmarks/Benchmark.cs

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -128,7 +128,6 @@ public class Benchmark
128128
{
129129
private const int jobCount = 2000;
130130
private const int loopCount = 1000;
131-
private static JobScheduler jobScheduler = new();
132131

133132
private static void CorrectnessTestJob()
134133
{
@@ -137,10 +136,10 @@ private static void CorrectnessTestJob()
137136
{
138137
TestCorrectnessJob.total = 0;
139138
TestCorrectnessJob.acceptsNewEntries = true;
140-
var job = new ParallelJobProducer<TestCorrectnessJob>(0, jobCount, new(), jobScheduler);
139+
var job = new ParallelJobProducer<TestCorrectnessJob>(0, jobCount, new());
141140
job.CheckAndSplit();
142-
jobScheduler.Flush(job.GetHandle());
143-
jobScheduler.Wait(job.GetHandle());
141+
ParallelForJobCommon.GlobalScheduler.Flush(job.GetHandle());
142+
ParallelForJobCommon.GlobalScheduler.Wait(job.GetHandle());
144143
// Thread.Sleep(1);
145144
// Console.WriteLine($"UnfinishedJobs {job.GetHandle().UnfinishedJobs} total {TestCorrectnessJob.total}");
146145
// TestCorrectnessJob.acceptsNewEntries = false;
@@ -182,7 +181,7 @@ private static void BenchC()
182181
var timer = new JobTimer();
183182
for (var sindex = 0; sindex < loopCount; sindex++)
184183
{
185-
var job = new ParallelJobProducer<HeavyCalculationJob>(0, jobCount, new(), jobScheduler);
184+
var job = new ParallelJobProducer<HeavyCalculationJob>(0, jobCount, new());
186185
jobScheduler.Wait(job.GetHandle());
187186
}
188187

@@ -208,21 +207,22 @@ private static long BenchVector(bool useVector)
208207
{
209208
var timer = new JobTimer();
210209
var data = new VectorCalculationJob { a = new float[jobCount], b = new float[jobCount], result = new float[jobCount], Repetitions = 500 };
211-
var parentJob = jobScheduler.Schedule();
210+
var parentJob = ParallelForJobCommon.GlobalScheduler.Schedule();
212211
for (var sindex = 0; sindex < loopCount; sindex++)
213212
{
214-
var job = new ParallelJobProducer<VectorCalculationJob>(0, jobCount, data, jobScheduler, 16, !useVector);
213+
var job = new ParallelJobProducer<VectorCalculationJob>(0, jobCount, data, 16, !useVector);
215214
job.CheckAndSplit();
216215
job.GetHandle().SetParent(parentJob);
217-
jobScheduler.Flush(job.GetHandle());
216+
ParallelForJobCommon.GlobalScheduler.Flush(job.GetHandle());
218217
}
219-
jobScheduler.Flush(parentJob);
220-
jobScheduler.Wait(parentJob);
218+
ParallelForJobCommon.GlobalScheduler.Flush(parentJob);
219+
ParallelForJobCommon.GlobalScheduler.Wait(parentJob);
221220
return timer.End(jobCount * loopCount, $"Use vector: {useVector}");
222221
}
223222

224223
private static void Main(string[] args)
225224
{
225+
ParallelForJobCommon.SetScheduler(new());
226226
// new JobHierarchyTest();
227227
// return;
228228

@@ -246,7 +246,7 @@ private static void Main(string[] args)
246246
var nonVectorized = BenchVector(false);
247247
Console.WriteLine($"Ratio {(double)nonVectorized / vectorized}");
248248
}
249-
jobScheduler.Dispose();
249+
ParallelForJobCommon.DisposeScheduler();
250250

251251
//using var jobScheduler = new JobScheduler();
252252

JobScheduler.Test/ParallelForTests.cs

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,4 +79,29 @@ public void TestParallelForEachMultiple()
7979
}
8080
Thread.Sleep(100);
8181
}
82+
83+
[Test]
84+
public void TestLambdaJob()
85+
{
86+
var autoresetEvent = new AutoResetEvent(false);
87+
LambdaJob.CreateFlushAndWait(() =>
88+
{
89+
autoresetEvent.Set();
90+
});
91+
Assert.That(autoresetEvent.WaitOne(1000), Is.True, "Lambda job did not complete in time.");
92+
}
93+
94+
[Test]
95+
public void NestedJobs()
96+
{
97+
var triggers = 0;
98+
LambdaJob.CreateFlushAndWait(() =>
99+
{
100+
ParallelForJob.CreateFlushAndWait(0, 100, i =>
101+
{
102+
Interlocked.Increment(ref triggers);
103+
});
104+
});
105+
Assert.That(triggers, Is.EqualTo(0));
106+
}
82107
}

JobScheduler/JobHandle.cs

Lines changed: 62 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ internal JobHandlePool()
6060
DependenciesBackingArray[i] = new ushort[DependencyCount];
6161
for (var j = 0; j < DependencyCount; j++)
6262
{
63-
DependenciesBackingArray[i][j] = JobHandle.NullHandle;
63+
DependenciesBackingArray[i][j] = JobHandle.NullHandleId;
6464
}
6565
}
6666

@@ -94,12 +94,11 @@ internal JobHandle RentJobHandle(IJob? iJob = null)
9494
Thread.Yield();
9595
}
9696

97-
return new()
97+
return new(index)
9898
{
9999
Generation = ++Generation[index],
100-
Index = index,
101100
Job = iJob,
102-
Parent = JobHandle.NullHandle,
101+
Parent = JobHandle.NullHandleId,
103102
UnfinishedJobs = 2, // 2 unfinished, because 1 is the job itself, and 1 is whether the job can be scheduled at all yet
104103
Dependents = null,
105104
};
@@ -116,7 +115,7 @@ public void ReturnHandle(JobHandle handle)
116115
{
117116
for (var i = 0; i < handle.Dependents!.Length; i++)
118117
{
119-
handle.Dependents[i] = JobHandle.NullHandle;
118+
handle.Dependents[i] = JobHandle.NullHandleId;
120119
}
121120
}
122121

@@ -139,20 +138,48 @@ public struct JobHandle
139138
/// <summary>
140139
/// Invalid handle value, used to indicate that a <see cref="JobHandle"/> is not valid.
141140
/// </summary>
142-
public const ushort NullHandle = ushort.MaxValue;
141+
public const ushort NullHandleId = ushort.MaxValue;
143142

144143
/// <summary>
145144
/// The index of this <see cref="JobHandle"/>, pointing towards its data inside the <see cref="Pool"/>.
146145
/// </summary>
147146
public ushort Index;
148147

148+
/// <summary>
149+
/// The generation of the handle.
150+
/// It is only used to check whether the handle is still valid or has been recycled.
151+
/// Which is only important when waiting for the handle.
152+
/// Internally we can safely store handles without generation, as if there is a reference to it, it means it is still valid and has not been recycled yet.
153+
/// Hence, in that case the generation is irrelevant as it is always the same as the current generation of the pool.
154+
/// </summary>
155+
internal ushort Generation;
156+
157+
private readonly bool _isInitialized;
158+
/// <summary>
159+
/// Indicates whether this <see cref="JobHandle"/> is null.
160+
/// Default or Just JobHandle() is null, meaning it has not been initialized yet.
161+
/// </summary>
162+
public readonly bool IsNull
163+
{
164+
get => !_isInitialized;
165+
}
166+
149167
/// <summary>
150168
/// Creates a new <see cref="JobHandle"/>.
151169
/// </summary>
152170
/// <param name="id">Its id.</param>
153171
public JobHandle(ushort id)
154172
{
155173
Index = id;
174+
_isInitialized = true;
175+
}
176+
177+
/// <summary>
178+
/// Gets a null handle.
179+
/// </summary>
180+
public static JobHandle Null
181+
{
182+
get => default;
156183
}
157184

158185
/// <summary>
@@ -190,10 +217,15 @@ public ref int UnfinishedJobs
190217

191218
/// <summary>
192219
/// Sets a <see cref="JobHandle"/> that is to be the parent.
220+
/// If a null handle is passed it will be ignored.
193221
/// </summary>
194222
/// <param name="parent">The parent job handle</param>
195223
public void SetParent(JobHandle parent)
196224
{
225+
if (parent.IsNull)
226+
{
227+
return;
228+
}
197229
Interlocked.Increment(ref parent.UnfinishedJobs);
198230
Parent = parent.Index;
199231
}
@@ -245,13 +277,32 @@ public void SetDependsOn(JobHandle source)
245277
}
246278

247279
/// <summary>
248-
/// A shortcut to <see cref="JobScheduler.Flush(JobHandle)"/>.
280+
/// A shortcut to <see cref="JobScheduler.Wait(JobHandle)"/>.
249281
/// Calls the global scheduler wait the job.
250282
/// In case you want to use other scheduler then use <see cref="JobScheduler.Wait(JobHandle)"/> instead.
251283
/// </summary>
252-
public void Wait()
284+
public JobHandle Wait()
253285
{
254286
ParallelForJobCommon.GlobalScheduler!.Wait(this);
287+
return this;
288+
}
289+
290+
/// <summary>
291+
/// Flushes the job handle.
292+
/// </summary>
293+
public JobHandle Flush()
294+
{
295+
ParallelForJobCommon.GlobalScheduler!.Flush(this);
296+
return this;
297+
}
298+
299+
/// <summary>
300+
/// Combines <see cref="Wait"/> and <see cref="Flush"/>.
301+
/// </summary>
302+
public void FlushAndWait()
303+
{
304+
Flush();
305+
Wait();
255306
}
256307

257308
/// <summary>
@@ -296,7 +347,7 @@ private bool NextNodeExist()
296347
throw new InvalidOperationException("Dependents are not initialized. Please set them before using this method.");
297348
}
298349

299-
return Dependents[^1] != NullHandle;
350+
return Dependents[^1] != NullHandleId;
300351
}
301352

302353
/// <summary>
@@ -310,7 +361,7 @@ private bool IsAnySpaceAvailable()
310361
throw new InvalidOperationException("Dependents are not initialized. Please set them before using this method.");
311362
}
312363

313-
return Dependents[^2] == NullHandle;
364+
return Dependents[^2] == NullHandleId;
314365
}
315366

316367
/// <summary>
@@ -325,21 +376,12 @@ private int GetFreeDependentsIndex()
325376

326377
for (var i = 0; i < Dependents.Length; i++)
327378
{
328-
if (Dependents[i] == NullHandle)
379+
if (Dependents[i] == NullHandleId)
329380
{
330381
return i;
331382
}
332383
}
333384

334385
return -1;
335386
}
336-
337-
/// <summary>
338-
/// The generation of the handle.
339-
/// It is only used to check whether the handle is still valid or has been recycled.
340-
/// Which is only important when waiting for the handle.
341-
/// Internally we can safely store handles without generation, as if there is a reference to it, it means it is still valid and has not been recycled yet.
342-
/// Hence, in that case the generation is irrelevant as it is always the same as the current generation of the pool.
343-
/// </summary>
344-
internal ushort Generation;
345387
}

JobScheduler/JobScheduler.cs

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,17 @@ public JobHandle Schedule(IJob? iJob, JobHandle parent)
7171
return job;
7272
}
7373

74+
/// <summary>
75+
/// Schedule a new <see cref="JobHandle"/> with a parent <see cref="JobHandle"/>.
76+
/// Simple a helper function to avoid writing (null, parent) every time.
77+
/// </summary>
78+
public JobHandle Schedule(JobHandle parent)
79+
{
80+
var job = JobHandle.Pool.RentJobHandle();
81+
job.SetParent(parent);
82+
return job;
83+
}
84+
7485
/// <summary>
7586
/// Creates a dependency between two <see cref="JobHandle"/>s.
7687
/// The dependent <see cref="JobHandle"/>s is executed after the target <see cref="JobHandle"/> has finished.
@@ -143,11 +154,11 @@ public void Wait(JobHandle job)
143154
{
144155
for (var i = 0; i < Workers.Count; i++)
145156
{
157+
nextJob = Workers[i].Queue.TrySteal(out stolenJob);
146158
if (nextJob)
147159
{
148160
break;
149161
}
150-
nextJob = Workers[i].Queue.TrySteal(out stolenJob);
151162
}
152163
}
153164

@@ -195,7 +206,7 @@ internal void Finish(JobHandle handle)
195206
throw new($"Finish called on a job with {handle.UnfinishedJobs} != 1 unfinished jobs, id:{handle.Index} job: {handle.Job == null}");
196207
}
197208

198-
if (handle.Parent != JobHandle.NullHandle)
209+
if (handle.Parent != JobHandle.NullHandleId)
199210
{
200211
OnChildFinish(new(handle.Parent));
201212
}
@@ -205,7 +216,7 @@ internal void Finish(JobHandle handle)
205216
var dependencies = handle.GetDependents();
206217
foreach (var id in dependencies)
207218
{
208-
if (id == JobHandle.NullHandle)
219+
if (id == JobHandle.NullHandleId)
209220
{
210221
continue;
211222
}

JobScheduler/Utils/BulkQueue.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ namespace Schedulers.Utils;
66
/// Thread-safe bulk queue implementation.
77
/// Provides around 10-20% performance in jobs compared to ordinary flush.
88
/// </summary>
9-
public class BulkQueue<T>
9+
internal class BulkQueue<T>
1010
{
1111
public BulkQueue(int segmentCount, int segmentSize)
1212
{

JobScheduler/Utils/IParallelJobProducer.cs

Lines changed: 11 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -35,12 +35,11 @@ public void RunSingle(int index)
3535
/// </summary>
3636
/// <typeparam name="T">Type of the </typeparam>
3737
/// <remarks>The reason we pass T instead of just IParallelJobProducer is because the compiler otherwise cannot inline it</remarks>
38-
public class ParallelJobProducer<T> : IJob where T : struct, IParallelJobProducer
38+
public struct ParallelJobProducer<T> : IJob where T : struct, IParallelJobProducer
3939
{
4040
private int _from;
4141
private readonly int _to;
4242
private readonly T _producer;
43-
private readonly JobScheduler _scheduler;
4443
private readonly JobHandle _selfHandle;
4544
private readonly bool _onlySingle;
4645
private readonly int _loopSize;
@@ -54,33 +53,29 @@ public class ParallelJobProducer<T> : IJob where T : struct, IParallelJobProduce
5453
/// <param name="scheduler">The scheduler where the jobs should be put</param>
5554
/// <param name="loopSize">Size of the loop, useful for when you want to use vectorization</param>
5655
/// <param name="onlySingle">Makes sure you can ignore the end parameter in the IParallelJobProducer, and use start as index</param>
57-
public ParallelJobProducer(int from, int to, T producer, JobScheduler scheduler, int loopSize = 16, bool onlySingle = false)
56+
public ParallelJobProducer(int from, int to, T producer, int loopSize = 16, bool onlySingle = false)
5857
{
5958
_from = from;
6059
_to = to;
6160
_producer = producer;
62-
_scheduler = scheduler;
6361
_onlySingle = onlySingle;
6462
_loopSize = loopSize;
65-
_selfHandle = _scheduler.Schedule(this);
63+
_selfHandle = ParallelForJobCommon.GlobalScheduler!.Schedule(this);
6664
}
6765

6866
//Only used to spawn sub-jobs
69-
private ParallelJobProducer(T producer, JobScheduler scheduler, JobHandle parent, int start, int end, int loopSize, bool onlySingle)
67+
private ParallelJobProducer(T producer, JobHandle parent, int start, int end, int loopSize, bool onlySingle)
7068
{
7169
_producer = producer;
72-
_scheduler = scheduler;
7370
_from = start;
7471
_to = end;
7572
_loopSize = loopSize;
7673
_onlySingle = onlySingle;
77-
_selfHandle = _scheduler.Schedule(this, parent);
74+
_selfHandle = ParallelForJobCommon.GlobalScheduler.Schedule(this, parent);
7875
}
7976

8077
/// <summary>
8178
/// Executes the job.
82-
/// If external thread overwrites the <see cref="_shouldSplitWhenAvailable"/> to true, it will split the job into multiple children.
83-
/// The current job will stop executing and the children will be scheduled.
8479
/// </summary>
8580
public void Execute()
8681
{
@@ -154,8 +149,12 @@ private void Split()
154149
throw new($"Invalid range from {start} to {end}");
155150
}
156151

157-
var pjob = new ParallelJobProducer<T>(_producer, _scheduler, _selfHandle, start, end, _loopSize, _onlySingle);
158-
segment.Enqueue(pjob._selfHandle);
152+
var pjob = new ParallelJobProducer<T>(_producer, _selfHandle, start, end, _loopSize, _onlySingle);
153+
if (!segment.Enqueue(pjob._selfHandle))
154+
{
155+
throw new InvalidOperationException("Failed to enqueue job in segment. Probably segment size was changed without changing the code using it.");
156+
}
157+
//Simulate flushing but without using the scheduler
159158
pjob._selfHandle.UnfinishedJobs--;
160159
}
161160
Worker.EnqueueFullSegment(segment);

0 commit comments

Comments
 (0)