Nebula
Loading...
Searching...
No Matches
jobs2.h
Go to the documentation of this file.
#pragma once
#include <cstring>
#include <new>
#include <type_traits>
#include <utility>

#include "threading/thread.h"
#include "threading/event.h"
#include "util/stringatom.h"
6
7//------------------------------------------------------------------------------
14//------------------------------------------------------------------------------
namespace Threading
{
// Forward declaration so the job API below can name Threading::Event in its signatures.
class Event;

}
20
21namespace Jobs2
22{
23
/// Parameter list for a job body written with the JOB_* helper macros.
#define JOB_SIGNATURE (SizeT totalJobs, SizeT groupSize, IndexT groupIndex, SizeT invocationOffset)

/// Opens the per-item loop of one job group: iterates groupSize items starting at
/// invocationOffset. Once the item index reaches totalJobs the macro executes a bare
/// `return;`, so it can only be used inside a function (or lambda) returning void.
/// Must be paired with JOB_END_LOOP.
#define JOB_BEGIN_LOOP \
for (IndexT i = 0; i < groupSize; i++)\
{\
    IndexT __INDEX = i + invocationOffset;\
    if (__INDEX >= totalJobs)\
        return;

/// Closes the loop opened by JOB_BEGIN_LOOP.
#define JOB_END_LOOP \
}

/// Index (i + invocationOffset) of the item being processed; only valid between
/// JOB_BEGIN_LOOP and JOB_END_LOOP.
#define JOB_ITEM_INDEX __INDEX
36
37
38class JobThread;
39
40void* JobAlloc(SizeT bytes);
41typedef volatile long CompletionCounter;
42using JobFunc = void(*)(SizeT totalJobs, SizeT groupSize, IndexT groupIndex, SizeT invocationOffset, void* ctx);
43
/// Type-erased call interface: lets a callable of any concrete type be invoked
/// through a single base pointer.
template <typename... ARGS>
struct CallableStub
{
    /// Invoke the wrapped callable with `args`.
    virtual void invoke(ARGS... args) = 0;
};

/// Concrete CallableStub that owns a copy of a callable of type LAMBDA and
/// forwards invoke() to it.
template<typename LAMBDA, typename... ARGS>
struct Callable : CallableStub<ARGS...>
{
    LAMBDA l;

    /// Take the callable by value and move it into the wrapper.
    Callable(LAMBDA l) : l(std::move(l)) {}

    /// Call the stored callable; any return value is discarded.
    void invoke(ARGS... args) override
    {
        l(args...);
    }
};
62
63struct Lambda
64{
66
67 template<typename LAMBDA>
68 Lambda(LAMBDA l)
69 {
71 this->callable = (CallableType*)Jobs2::JobAlloc(sizeof(CallableType));
72 new (this->callable) CallableType(l);
73 };
74
75 void operator()(SizeT totalJobs, SizeT groupSize, IndexT groupIndex, SizeT invocationOffset)
76 {
77 callable->invoke(totalJobs, groupSize, groupIndex, invocationOffset);
78 }
79};
80
95
96struct JobNode
97{
100 JobNode* sequence; // set to nullptr for ordinary nodes
101};
102
117
118extern Jobs2Context ctx;
119
120class JobThread : public Threading::Thread
121{
123public:
124
126 JobThread();
128 virtual ~JobThread();
129
131 void SignalWorkAvailable();
132
135protected:
136
138 virtual void EmitWakeupSignal() override;
140 virtual void DoWork() override;
141
142private:
143 Threading::Event wakeupEvent;
144};
145
169
171void JobSystemInit(const JobSystemInitInfo& info);
173void JobSystemUninit();
174
176template <typename T> T* JobAlloc(SizeT count);
178void* JobAlloc(SizeT bytes);
180void JobNewFrame();
181
182extern JobNode* sequenceNode;
183extern JobNode* sequenceTail;
186
189 , Threading::AtomicCounter* doneCounter = nullptr
190 , Threading::Event* signalEvent = nullptr);
191
193template <typename CTX> void JobAppendSequence(
194 const JobFunc& func
195 , const SizeT numInvocations
196 , const SizeT groupSize
197 , const CTX& context
198);
199
201template <typename CTX> void JobAppendSequence(
202 const JobFunc& func
203 , const SizeT numInvocations
204 , const CTX& context
205);
206
208void JobEndSequence(Threading::Event* signalEvent = nullptr);
209
210//------------------------------------------------------------------------------
213template <typename T> T*
215{
216 return (T*)JobAlloc(count * sizeof(T));
217}
218
219//------------------------------------------------------------------------------
222template <typename LAMBDA> void
224 LAMBDA&& func
225 , const SizeT numInvocations
226 , const SizeT groupSize
227 , const Util::FixedArray<const Threading::AtomicCounter*, true>& waitCounters = nullptr
228 , Threading::AtomicCounter* doneCounter = nullptr
229 , Threading::Event* signalEvent = nullptr
230)
231{
232 if (numInvocations == 0)
233 {
234 if (doneCounter != nullptr)
235 {
236 Threading::Interlocked::Exchange(doneCounter, 0);
237 }
238 // If we have a signal event and no invocations, just signal the event and return
239 if (signalEvent != nullptr)
240 {
241 signalEvent->Signal();
242 }
243 return;
244 }
245 n_assert(doneCounter != nullptr ? *doneCounter > 0 : true);
246
247 // Calculate the number of actual jobs based on invocations and group size
248 SizeT numJobs = Math::ceil(numInvocations / float(groupSize));
249
250 // Calculate allocation size which is node + counters + data context
251 SizeT dynamicAllocSize = sizeof(JobNode) + waitCounters.Size() * sizeof(const Threading::AtomicCounter*);
252 auto mem = JobAlloc<char>(dynamicAllocSize);
253 auto node = (JobNode*)mem;
254
255 // Copy over wait counters
256 node->job.waitCounters = nullptr;
257 if (waitCounters.Size() > 0)
258 {
259 node->job.waitCounters = (const Threading::AtomicCounter**)(mem + sizeof(JobNode));
260 memcpy(node->job.waitCounters, waitCounters.Begin(), waitCounters.Size() * sizeof(const Threading::AtomicCounter*));
261 }
262
263 node->job.l = std::move(func);
264 node->job.func = nullptr;
265 node->job.remainingGroups = numJobs;
266 node->job.groupCompletionCounter = numJobs;
267 node->job.numInvocations = numInvocations;
268 node->job.groupSize = groupSize;
269 node->job.numWaitCounters = (SizeT)waitCounters.Size();
270 node->job.doneCounter = doneCounter;
271 node->job.signalEvent = signalEvent;
272 node->sequence = nullptr;
273
274 // Add to end of linked list
275 ctx.jobLock.Enter();
276
277 // First, set head node if nullptr
278 if (ctx.head == nullptr)
279 ctx.head = node;
280
281 // Then add node to end of list
282 node->next = nullptr;
283 if (ctx.tail != nullptr)
284 ctx.tail->next = node;
285 ctx.tail = node;
286
287 ctx.jobLock.Leave();
288
289 // Trigger threads to wake up and compete for jobs
290 for (Ptr<JobThread>& thread : ctx.threads)
291 {
292 thread->SignalWorkAvailable();
293 }
294}
295
296//------------------------------------------------------------------------------
299template <typename LAMBDA> void
301 LAMBDA&& func
302 , const SizeT numInvocations
303 , const Util::FixedArray<const Threading::AtomicCounter*, true>& waitCounters = nullptr
304 , Threading::AtomicCounter* doneCounter = nullptr
305 , Threading::Event* signalEvent = nullptr
306)
307{
308 JobDispatch(func, numInvocations, numInvocations, waitCounters, doneCounter, signalEvent);
309}
310
311//------------------------------------------------------------------------------
314template <typename CTX> void
316 const JobFunc& func
317 , const SizeT numInvocations
318 , const SizeT groupSize
319 , const CTX& context
320 , const Util::FixedArray<const Threading::AtomicCounter*, true>& waitCounters = nullptr
321 , Threading::AtomicCounter* doneCounter = nullptr
322 , Threading::Event* signalEvent = nullptr
323)
324{
325 static_assert(std::is_trivially_destructible<CTX>::value, "Job context has to be trivially destructible");
326 n_assert(numInvocations > 0);
327 n_assert(doneCounter != nullptr ? *doneCounter > 0 : true);
328
329 // Calculate the number of actual jobs based on invocations and group size
330 SizeT numJobs = Math::ceil(numInvocations / float(groupSize));
331
332 // Calculate allocation size which is node + counters + data context
333 auto dynamicAllocSize = sizeof(JobNode) + sizeof(CTX) + waitCounters.Size() * sizeof(const Threading::AtomicCounter*);
334 auto mem = JobAlloc<char>(dynamicAllocSize);
335 auto node = (JobNode*)mem;
336
337 // Copy over wait counters
338 node->job.waitCounters = nullptr;
339 if (waitCounters.Size() > 0)
340 {
341 node->job.waitCounters = (const Threading::AtomicCounter**)(mem + sizeof(JobNode) + sizeof(CTX));
342 memcpy(node->job.waitCounters, waitCounters.Begin(), waitCounters.Size() * sizeof(const Threading::AtomicCounter*));
343 }
344
345 // Move context
346 node->job.data = (void*)(mem + sizeof(JobNode));
347 auto data = reinterpret_cast<CTX*>(node->job.data);
348 *data = context;
349
350 node->job.l.callable = nullptr;
351 node->job.func = func;
352 node->job.remainingGroups = numJobs;
353 node->job.groupCompletionCounter = numJobs;
354 node->job.numInvocations = numInvocations;
355 node->job.groupSize = groupSize;
356 node->job.numWaitCounters = (SizeT)waitCounters.Size();
357 node->job.doneCounter = doneCounter;
358 node->job.signalEvent = signalEvent;
359 node->sequence = nullptr;
360
361 // Add to end of linked list
362 ctx.jobLock.Enter();
363
364 // First, set head node if nullptr
365 if (ctx.head == nullptr)
366 ctx.head = node;
367
368 // Then add node to end of list
369 node->next = nullptr;
370 if (ctx.tail != nullptr)
371 ctx.tail->next = node;
372 ctx.tail = node;
373
374 ctx.jobLock.Leave();
375
376 // Trigger threads to wake up and compete for jobs
377 for (Ptr<JobThread>& thread : ctx.threads)
378 {
379 thread->SignalWorkAvailable();
380 }
381}
382
383//------------------------------------------------------------------------------
386template <typename CTX> void
388 const JobFunc& func
389 , const SizeT numInvocations
390 , const CTX& context
391 , const Util::FixedArray<const Threading::AtomicCounter*, true>& waitCounters = nullptr
392 , Threading::AtomicCounter* doneCounter = nullptr
393 , Threading::Event* signalEvent = nullptr
394)
395{
396 JobDispatch(func, numInvocations, numInvocations, context, waitCounters, doneCounter, signalEvent);
397}
398
399//------------------------------------------------------------------------------
402template<typename CTX> void
403JobAppendSequence(const JobFunc& func, const SizeT numInvocations, const SizeT groupSize, const CTX& context)
404{
405 static_assert(std::is_trivially_destructible<CTX>::value, "Job context has to be trivially destructible");
406 n_assert(numInvocations > 0);
407 n_assert(sequenceThread == Threading::Thread::GetMyThreadId());
408 n_assert(sequenceNode != nullptr);
409
410 // Calculate the number of actual jobs based on invocations and group size
411 SizeT numJobs = Math::ceil(numInvocations / float(groupSize));
412
413 // Calculate allocation size which is node + counters + data context
414 SizeT dynamicAllocSize = sizeof(JobNode) + sizeof(CTX) + sizeof(Threading::AtomicCounter);
415 if (prevDoneCounter != nullptr)
416 dynamicAllocSize += sizeof(Threading::AtomicCounter*);
417 auto mem = JobAlloc<char>(dynamicAllocSize);
418 auto node = (JobNode*)mem;
419
420 if (prevDoneCounter != nullptr)
421 {
422 node->job.numWaitCounters = 1;
423 node->job.waitCounters = (const Threading::AtomicCounter**)(mem + sizeof(JobNode) + sizeof(CTX) + sizeof(Threading::AtomicCounter));
424 memcpy(node->job.waitCounters, &prevDoneCounter, sizeof(Threading::AtomicCounter*));
425 }
426 else
427 {
428 node->job.numWaitCounters = 0;
429 node->job.waitCounters = nullptr;
430 }
431
432 // Move context
433 node->job.data = (void*)(mem + sizeof(JobNode));
434 auto data = reinterpret_cast<CTX*>(node->job.data);
435 *data = context;
436
437 node->job.l.callable = nullptr;
438 node->job.func = func;
439 node->job.remainingGroups = numJobs;
440 node->job.groupCompletionCounter = numJobs;
441 node->job.numInvocations = numInvocations;
442 node->job.groupSize = groupSize;
443
444 node->job.doneCounter = (Threading::AtomicCounter*)(mem + sizeof(JobNode) + sizeof(CTX));
445 *node->job.doneCounter = 1;
446 prevDoneCounter = node->job.doneCounter;
447 node->job.signalEvent = nullptr;
448 node->next = nullptr;
449
450 // The remainingGroups counter for the sequence node is the length of the sequence chain
451 if (sequenceTail == nullptr)
452 sequenceNode->sequence = node;
453 else
454 sequenceTail->next = node;
455
456 sequenceTail = node;
457
458 // Queue job
459 //ctx.queuedJobs.Append(node);
460}
461
462//------------------------------------------------------------------------------
465template<typename CTX> void
466JobAppendSequence(const JobFunc& func, const SizeT numInvocations, const CTX& context)
467{
468 JobAppendSequence(func, numInvocations, numInvocations, context);
469}
470
471} // namespace Jobs2
JobThread()
constructor
Definition jobs2.cc:19
virtual void DoWork() override
this method runs in the thread context
Definition jobs2.cc:58
Threading::Event wakeupEvent
Definition jobs2.h:143
void SignalWorkAvailable()
Signal new work available.
Definition jobs2.cc:40
__DeclareClass(JobThread)
virtual void EmitWakeupSignal() override
override this method if your thread loop needs a wakeup call before stopping
Definition jobs2.cc:49
virtual ~JobThread()
destructor
Definition jobs2.cc:28
bool enableProfiling
Definition jobs2.h:134
bool enableIo
Definition jobs2.h:133
Nebula's smart pointer class which manages the life time of RefCounted objects.
Definition ptr.h:38
Nebula's dynamic array class.
Definition array.h:60
Implements a fixed size one-dimensional array.
Definition fixedarray.h:20
A StringAtom.
Definition stringatom.h:22
#define n_assert(exp)
Definition debug.h:50
Definition jobs2.cc:11
Threading::ThreadId sequenceThread
Definition jobs2.cc:288
Jobs2Context ctx
Definition jobs2.cc:13
Jobs2::JobNode * sequenceNode
Definition jobs2.cc:285
const Threading::AtomicCounter * prevDoneCounter
Definition jobs2.cc:287
Jobs2::JobNode * sequenceTail
Definition jobs2.cc:286
void JobBeginSequence(const Util::FixedArray< const Threading::AtomicCounter *, true > &waitCounters, Threading::AtomicCounter *doneCounter, Threading::Event *signalEvent)
Begin a sequence of jobs.
Definition jobs2.cc:294
void JobEndSequence(Threading::Event *signalEvent)
Flush queued jobs.
Definition jobs2.cc:334
void(*)(SizeT totalJobs, SizeT groupSize, IndexT groupIndex, SizeT invocationOffset, void *ctx) JobFunc
Definition jobs2.h:42
void JobSystemUninit()
Destroy job port.
Definition jobs2.cc:240
void * JobAlloc(SizeT bytes)
Allocate memory.
Definition jobs2.cc:264
void JobSystemInit(const JobSystemInitInfo &info)
Create a new job port.
Definition jobs2.cc:207
volatile long CompletionCounter
Definition jobs2.h:41
void JobNewFrame()
Progress to new buffer.
Definition jobs2.cc:253
void JobDispatch(LAMBDA &&func, const SizeT numInvocations, const SizeT groupSize, const Util::FixedArray< const Threading::AtomicCounter *, true > &waitCounters=nullptr, Threading::AtomicCounter *doneCounter=nullptr, Threading::Event *signalEvent=nullptr)
Definition jobs2.h:223
void JobAppendSequence(const JobFunc &func, const SizeT numInvocations, const SizeT groupSize, const CTX &context)
Append job to sequence with an automatic dependency on the previous job.
Definition jobs2.h:403
__forceinline float ceil(float val)
Floating point ceiling.
Definition scalar.h:542
int Exchange(int volatile *dest, int value)
interlocked exchange
Definition gccinterlocked.cc:94
The Jobs2 system provides a set of threads and a pool of jobs from which threads can pickup work.
Definition jobs2.h:16
pthread_t ThreadId
Definition linuxthreadid.h:15
volatile int AtomicCounter
Definition interlocked.h:19
Definition half.h:491
Definition jobs2.h:52
LAMBDA l
Definition jobs2.h:53
void invoke(ARGS... args) override
Definition jobs2.h:57
Callable(LAMBDA l)
Definition jobs2.h:55
Definition jobs2.h:46
virtual void invoke(ARGS... args)=0
Definition jobs2.h:82
const Threading::AtomicCounter ** waitCounters
Definition jobs2.h:90
void * data
Definition jobs2.h:89
SizeT groupSize
Definition jobs2.h:88
int remainingGroups
Definition jobs2.h:85
SizeT numWaitCounters
Definition jobs2.h:91
Threading::AtomicCounter groupCompletionCounter
Definition jobs2.h:86
Threading::AtomicCounter * doneCounter
Definition jobs2.h:92
SizeT numInvocations
Definition jobs2.h:87
Threading::Event * signalEvent
Definition jobs2.h:93
JobFunc func
Definition jobs2.h:83
Lambda l
Definition jobs2.h:84
Definition jobs2.h:97
JobNode * next
Definition jobs2.h:98
JobContext job
Definition jobs2.h:99
JobNode * sequence
Definition jobs2.h:100
uint priority
Definition jobs2.h:151
bool enableIo
Definition jobs2.h:156
SizeT scratchMemorySize
Definition jobs2.h:153
Util::StringAtom name
Definition jobs2.h:148
uint affinity
Definition jobs2.h:150
JobSystemInitInfo()
Definition jobs2.h:159
SizeT numThreads
Definition jobs2.h:149
bool enableProfiling
Definition jobs2.h:157
SizeT numBuffers
Definition jobs2.h:154
Definition jobs2.h:104
IndexT activeBuffer
Definition jobs2.h:113
JobNode * head
Definition jobs2.h:106
Util::Array< JobNode * > queuedJobs
Definition jobs2.h:109
SizeT scratchMemorySize
Definition jobs2.h:115
IndexT iterator
Definition jobs2.h:112
SizeT numBuffers
Definition jobs2.h:111
Util::FixedArray< byte * > scratchMemory
Definition jobs2.h:114
JobNode * tail
Definition jobs2.h:107
Util::FixedArray< Ptr< JobThread > > threads
Definition jobs2.h:108
Threading::CriticalSection jobLock
Definition jobs2.h:105
Definition jobs2.h:64
Lambda(LAMBDA l)
Definition jobs2.h:68
CallableStub< SizeT, SizeT, IndexT, SizeT > * callable
Definition jobs2.h:65
void operator()(SizeT totalJobs, SizeT groupSize, IndexT groupIndex, SizeT invocationOffset)
Definition jobs2.h:75
int SizeT
Definition types.h:42
unsigned int uint
Definition types.h:33
int IndexT
Definition types.h:41