Nebula
Loading...
Searching...
No Matches
jobs2.h
Go to the documentation of this file.
1#pragma once
2#include "threading/thread.h"
3#include "threading/event.h"
4#include "util/stringatom.h"
6
7//------------------------------------------------------------------------------
14//------------------------------------------------------------------------------
15namespace Threading
16{
17class Event;
18
19}
20
21namespace Jobs2
22{
23
// Helper macros for writing job bodies.
// JOB_SIGNATURE expands to the four-argument parameter list that
// Lambda::operator() is invoked with (totalJobs, groupSize, groupIndex, invocationOffset).
24#define JOB_SIGNATURE (SizeT totalJobs, SizeT groupSize, IndexT groupIndex, SizeT invocationOffset)
// JOB_BEGIN_LOOP opens a loop over the groupSize items of the current group.
// For each item it computes __INDEX = i + invocationOffset and returns from the
// enclosing function as soon as __INDEX reaches totalJobs, so a partially filled
// last group does not run past the end. The opened brace must be closed with JOB_END_LOOP.
25#define JOB_BEGIN_LOOP \
26for (IndexT i = 0; i < groupSize; i++)\
27{\
28 IndexT __INDEX = i + invocationOffset;\
29 if (__INDEX >= totalJobs)\
30 return;\
31
// JOB_END_LOOP closes the loop body opened by JOB_BEGIN_LOOP.
32#define JOB_END_LOOP \
33}
34
// JOB_ITEM_INDEX names the per-item index computed inside JOB_BEGIN_LOOP.
35#define JOB_ITEM_INDEX __INDEX
36
37
38class JobThread;
39
40void* JobAlloc(SizeT bytes);
41typedef volatile long CompletionCounter;
42using JobFunc = void(*)(SizeT totalJobs, SizeT groupSize, IndexT groupIndex, SizeT invocationOffset, void* ctx);
43
// Abstract call interface with a single pure virtual invoke(ARGS...).
// Callable below derives from CallableStub<ARGS...> and implements invoke().
// NOTE(review): the line that names this type (listing line 45) is absent from
// this extract, and no virtual destructor is visible here.
44template <typename... ARGS>
46{
47 virtual void invoke(ARGS... args) = 0;
48};
49
50template<typename LAMBDA, typename... ARGS>
51struct Callable : CallableStub<ARGS...>
52{
53 LAMBDA l;
54
55 Callable(LAMBDA l) : l(std::move(l)) {};
56
57 void invoke(ARGS... args) override
58 {
59 l(args...);
60 }
61};
62
// Type-erased wrapper around a callable taking
// (totalJobs, groupSize, groupIndex, invocationOffset).
// NOTE(review): the `callable` member (listing line 65) and the CallableType
// alias used by the constructor (listing line 70) are absent from this extract.
63struct Lambda
64{
66
// Allocates storage for a CallableType through Jobs2::JobAlloc and
// placement-constructs it from `l`. No matching destructor call is visible in
// this struct.
67 template<typename LAMBDA>
68 Lambda(LAMBDA l)
69 {
71 this->callable = (CallableType*)Jobs2::JobAlloc(sizeof(CallableType));
72 new (this->callable) CallableType(l);
73 };
74
// Invokes the wrapped callable. `callable` is dereferenced without a null
// check, although the function-pointer dispatch paths set it to nullptr.
75 void operator()(SizeT totalJobs, SizeT groupSize, IndexT groupIndex, SizeT invocationOffset)
76 {
77 callable->invoke(totalJobs, groupSize, groupIndex, invocationOffset);
78 }
79};
80
95
// Node of the job list. The dispatch functions link nodes through `next` and
// fill in `job`.
// NOTE(review): the `next` and `job` members (listing lines 98-99) are absent
// from this extract; only `sequence` is visible.
96struct JobNode
97{
100 JobNode* sequence; // set to nullptr for ordinary nodes
101};
102
117
118extern Jobs2Context ctx;
119
// Worker thread of the job system, derived from Threading::Thread.
// NOTE(review): listing line 122 is absent from this extract.
120class JobThread : public Threading::Thread
121{
123public:
124
// constructor
126 JobThread();
// destructor
128 virtual ~JobThread();
129
// Signal new work available; the dispatch functions call this on every thread
// after queueing a job.
131 void SignalWorkAvailable();
132
135protected:
136
// override this method if your thread loop needs a wakeup call before stopping
138 virtual void EmitWakeupSignal() override;
// this method runs in the thread context
140 virtual void DoWork() override;
141
142private:
// Event owned by the thread; presumably what SignalWorkAvailable signals (the
// implementation is not part of this header).
143 Threading::Event wakeupEvent;
144};
145
169
// Create a new job port
171void JobSystemInit(const JobSystemInitInfo& info);
// Destroy job port
173void JobSystemUninit();
174
// Allocate memory for `count` objects of type T (typed front end of the byte
// overload below)
176template <typename T> T* JobAlloc(SizeT count);
// Allocate memory
178void* JobAlloc(SizeT bytes);
// Progress to new buffer
180void JobNewFrame();
181
// State of the sequence currently being built: the JobAppendSequence overloads
// attach the first appended job to sequenceNode->sequence and chain later ones
// after sequenceTail.
182extern JobNode* sequenceNode;
183extern JobNode* sequenceTail;
186
189 , Threading::AtomicCounter* doneCounter = nullptr
190 , Threading::Event* signalEvent = nullptr);
191
// Append job to sequence with an automatic dependency on the previous job.
// Function-pointer variant with an explicit group size and a copied context.
193template <typename CTX> void JobAppendSequence(
194 const JobFunc& func
195 , const SizeT numInvocations
196 , const SizeT groupSize
197 , const CTX& context
198);
199
// Function-pointer variant that runs all invocations in one group.
201template <typename CTX> void JobAppendSequence(
202 const JobFunc& func
203 , const SizeT numInvocations
204 , const CTX& context
205);
206
// Callable-object variant with an explicit group size.
208template <typename LAMBDA> void JobAppendSequence(
209 LAMBDA&& func
210 , const SizeT numInvocations
211 , const SizeT groupSize
212);
213
// Callable-object variant that runs all invocations in one group.
214template <typename LAMBDA> void JobAppendSequence(
215 LAMBDA&& func
216 , const SizeT numInvocations
217);
218
// Flush queued jobs
220void JobEndSequence(Threading::Event* signalEvent = nullptr);
221
222//------------------------------------------------------------------------------
// Typed allocation helper: requests count * sizeof(T) bytes from the byte-based
// JobAlloc and casts the result to T*. No constructors are run.
// NOTE(review): the line carrying the function name and parameter list (listing
// line 226) is absent from this extract; the declaration earlier in the header
// reads `template <typename T> T* JobAlloc(SizeT count);`.
225template <typename T> T*
227{
228 return (T*)JobAlloc(count * sizeof(T));
229}
230
231//------------------------------------------------------------------------------
// Dispatch a callable object as a job split into groups of `groupSize` invocations.
//   func           callable stored in the node's Lambda wrapper
//   numInvocations total number of invocations; 0 completes immediately
//   groupSize      invocations per group
//   waitCounters   counters copied into the node as its wait list
//   doneCounter    optional counter; asserted to be > 0 when work is queued
//   signalEvent    optional event stored on the node
// NOTE(review): the line carrying the function name (listing line 235) is absent
// from this extract; the cross-reference index names it JobDispatch.
234template <typename LAMBDA> void
236 LAMBDA&& func
237 , const SizeT numInvocations
238 , const SizeT groupSize
239 , const Util::FixedArray<const Threading::AtomicCounter*, true>& waitCounters = nullptr
240 , Threading::AtomicCounter* doneCounter = nullptr
241 , Threading::Event* signalEvent = nullptr
242)
243{
// Nothing to run: reset the done counter to 0, signal the event, and return
// without allocating or queueing a node.
244 if (numInvocations == 0)
245 {
246 if (doneCounter != nullptr)
247 {
248 Threading::Interlocked::Exchange(doneCounter, 0);
249 }
250 // If we have a signal event and no invocations, just signal the event and return
251 if (signalEvent != nullptr)
252 {
253 signalEvent->Signal();
254 }
255 return;
256 }
257 n_assert(doneCounter != nullptr ? *doneCounter > 0 : true);
258
259 // Calculate the number of actual jobs based on invocations and group size
// NOTE(review): the division goes through float, which cannot represent every
// integer above 2^24, and groupSize == 0 is not guarded against.
260 SizeT numJobs = Math::ceil(numInvocations / float(groupSize));
261
262 // Calculate allocation size which is node + wait counter pointers
263 SizeT dynamicAllocSize = sizeof(JobNode) + waitCounters.Size() * sizeof(const Threading::AtomicCounter*);
264 auto mem = JobAlloc<char>(dynamicAllocSize);
265 auto node = (JobNode*)mem;
266
267 // Copy over wait counters (stored directly behind the node)
268 node->job.waitCounters = nullptr;
269 if (waitCounters.Size() > 0)
270 {
271 node->job.waitCounters = (const Threading::AtomicCounter**)(mem + sizeof(JobNode));
272 memcpy(node->job.waitCounters, waitCounters.Begin(), waitCounters.Size() * sizeof(const Threading::AtomicCounter*));
273 }
274
// NOTE(review): `func` is a forwarding reference, so std::move also moves from
// callables that were passed in as lvalues.
275 node->job.l = std::move(func);
// func == nullptr marks this node as a callable-object job
276 node->job.func = nullptr;
277 node->job.remainingGroups = numJobs;
278 node->job.groupCompletionCounter = numJobs;
279 node->job.numInvocations = numInvocations;
280 node->job.groupSize = groupSize;
281 node->job.numWaitCounters = (SizeT)waitCounters.Size();
282 node->job.doneCounter = doneCounter;
283 node->job.signalEvent = signalEvent;
284 node->sequence = nullptr;
285
286 // Add to end of linked list (ctx.jobLock is held while the list is modified)
287 ctx.jobLock.Enter();
288
289 // First, set head node if nullptr
290 if (ctx.head == nullptr)
291 ctx.head = node;
292
293 // Then add node to end of list
294 node->next = nullptr;
295 if (ctx.tail != nullptr)
296 ctx.tail->next = node;
297 ctx.tail = node;
298
299 ctx.jobLock.Leave();
300
301 // Trigger threads to wake up and compete for jobs
302 for (Ptr<JobThread>& thread : ctx.threads)
303 {
304 thread->SignalWorkAvailable();
305 }
306}
307
308//------------------------------------------------------------------------------
// Convenience form of the callable-object dispatch: forwards to JobDispatch with
// groupSize equal to numInvocations, i.e. all invocations end up in one group.
// NOTE(review): the line carrying the function name (listing line 312) is absent
// from this extract.
311template <typename LAMBDA> void
313 LAMBDA&& func
314 , const SizeT numInvocations
315 , const Util::FixedArray<const Threading::AtomicCounter*, true>& waitCounters = nullptr
316 , Threading::AtomicCounter* doneCounter = nullptr
317 , Threading::Event* signalEvent = nullptr
318)
319{
320 JobDispatch(func, numInvocations, numInvocations, waitCounters, doneCounter, signalEvent);
321}
322
323//------------------------------------------------------------------------------
// Dispatch a function-pointer job with a copied context, split into groups of
// `groupSize` invocations.
//   func           job function stored on the node
//   numInvocations total number of invocations; asserted to be > 0
//   groupSize      invocations per group
//   context        copied into the node's allocation; CTX must be trivially destructible
//   waitCounters   counters copied into the node as its wait list
//   doneCounter    optional counter; asserted to be > 0
//   signalEvent    optional event stored on the node
// NOTE(review): the line carrying the function name (listing line 327) is absent
// from this extract; the single-group overload calls it as JobDispatch.
326template <typename CTX> void
328 const JobFunc& func
329 , const SizeT numInvocations
330 , const SizeT groupSize
331 , const CTX& context
332 , const Util::FixedArray<const Threading::AtomicCounter*, true>& waitCounters = nullptr
333 , Threading::AtomicCounter* doneCounter = nullptr
334 , Threading::Event* signalEvent = nullptr
335)
336{
337 static_assert(std::is_trivially_destructible<CTX>::value, "Job context has to be trivially destructible");
338 n_assert(numInvocations > 0);
339 n_assert(doneCounter != nullptr ? *doneCounter > 0 : true);
340
341 // Calculate the number of actual jobs based on invocations and group size
// NOTE(review): the division goes through float, which cannot represent every
// integer above 2^24, and groupSize == 0 is not guarded against.
342 SizeT numJobs = Math::ceil(numInvocations / float(groupSize));
343
344 // Calculate allocation size which is node + data context + wait counter pointers
345 auto dynamicAllocSize = sizeof(JobNode) + sizeof(CTX) + waitCounters.Size() * sizeof(const Threading::AtomicCounter*);
346 auto mem = JobAlloc<char>(dynamicAllocSize);
347 auto node = (JobNode*)mem;
348
349 // Copy over wait counters (stored behind the node and the context)
// NOTE(review): the pointer array starts at sizeof(JobNode) + sizeof(CTX), which
// is only pointer-aligned when sizeof(CTX) is a multiple of the pointer size.
350 node->job.waitCounters = nullptr;
351 if (waitCounters.Size() > 0)
352 {
353 node->job.waitCounters = (const Threading::AtomicCounter**)(mem + sizeof(JobNode) + sizeof(CTX));
354 memcpy(node->job.waitCounters, waitCounters.Begin(), waitCounters.Size() * sizeof(const Threading::AtomicCounter*));
355 }
356
357 // Copy context into the storage directly behind the node
// NOTE(review): this assigns to storage in which no CTX has been constructed.
358 node->job.data = (void*)(mem + sizeof(JobNode));
359 auto data = reinterpret_cast<CTX*>(node->job.data);
360 *data = context;
361
// callable == nullptr marks this node as a function-pointer job
362 node->job.l.callable = nullptr;
363 node->job.func = func;
364 node->job.remainingGroups = numJobs;
365 node->job.groupCompletionCounter = numJobs;
366 node->job.numInvocations = numInvocations;
367 node->job.groupSize = groupSize;
368 node->job.numWaitCounters = (SizeT)waitCounters.Size();
369 node->job.doneCounter = doneCounter;
370 node->job.signalEvent = signalEvent;
371 node->sequence = nullptr;
372
373 // Add to end of linked list (ctx.jobLock is held while the list is modified)
374 ctx.jobLock.Enter();
375
376 // First, set head node if nullptr
377 if (ctx.head == nullptr)
378 ctx.head = node;
379
380 // Then add node to end of list
381 node->next = nullptr;
382 if (ctx.tail != nullptr)
383 ctx.tail->next = node;
384 ctx.tail = node;
385
386 ctx.jobLock.Leave();
387
388 // Trigger threads to wake up and compete for jobs
389 for (Ptr<JobThread>& thread : ctx.threads)
390 {
391 thread->SignalWorkAvailable();
392 }
393}
394
395//------------------------------------------------------------------------------
// Convenience form of the function-pointer dispatch: forwards to JobDispatch
// with groupSize equal to numInvocations, i.e. all invocations end up in one group.
// NOTE(review): the line carrying the function name (listing line 399) is absent
// from this extract.
398template <typename CTX> void
400 const JobFunc& func
401 , const SizeT numInvocations
402 , const CTX& context
403 , const Util::FixedArray<const Threading::AtomicCounter*, true>& waitCounters = nullptr
404 , Threading::AtomicCounter* doneCounter = nullptr
405 , Threading::Event* signalEvent = nullptr
406)
407{
408 JobDispatch(func, numInvocations, numInvocations, context, waitCounters, doneCounter, signalEvent);
409}
410
411//------------------------------------------------------------------------------
414template<typename CTX> void
415JobAppendSequence(const JobFunc& func, const SizeT numInvocations, const SizeT groupSize, const CTX& context)
416{
417 static_assert(std::is_trivially_destructible<CTX>::value, "Job context has to be trivially destructible");
418 n_assert(numInvocations > 0);
419 n_assert(sequenceThread == Threading::Thread::GetMyThreadId());
420 n_assert(sequenceNode != nullptr);
421
422 // Calculate the number of actual jobs based on invocations and group size
423 SizeT numJobs = Math::ceil(numInvocations / float(groupSize));
424
425 // Calculate allocation size which is node + counters + data context
426 SizeT dynamicAllocSize = sizeof(JobNode) + sizeof(CTX) + sizeof(Threading::AtomicCounter);
427 if (prevDoneCounter != nullptr)
428 dynamicAllocSize += sizeof(Threading::AtomicCounter*);
429 auto mem = JobAlloc<char>(dynamicAllocSize);
430 auto node = (JobNode*)mem;
431
432 if (prevDoneCounter != nullptr)
433 {
434 node->job.numWaitCounters = 1;
435 node->job.waitCounters = (const Threading::AtomicCounter**)(mem + sizeof(JobNode) + sizeof(CTX) + sizeof(Threading::AtomicCounter));
436 memcpy(node->job.waitCounters, &prevDoneCounter, sizeof(Threading::AtomicCounter*));
437 }
438 else
439 {
440 node->job.numWaitCounters = 0;
441 node->job.waitCounters = nullptr;
442 }
443
444 // Move context
445 node->job.data = (void*)(mem + sizeof(JobNode));
446 auto data = reinterpret_cast<CTX*>(node->job.data);
447 *data = context;
448
449 node->job.l.callable = nullptr;
450 node->job.func = func;
451 node->job.remainingGroups = numJobs;
452 node->job.groupCompletionCounter = numJobs;
453 node->job.numInvocations = numInvocations;
454 node->job.groupSize = groupSize;
455
456 node->job.doneCounter = (Threading::AtomicCounter*)(mem + sizeof(JobNode) + sizeof(CTX));
457 *node->job.doneCounter = 1;
458 prevDoneCounter = node->job.doneCounter;
459 node->job.signalEvent = nullptr;
460 node->next = nullptr;
461
462 // The remainingGroups counter for the sequence node is the length of the sequence chain
463 if (sequenceTail == nullptr)
464 sequenceNode->sequence = node;
465 else
466 sequenceTail->next = node;
467
468 sequenceTail = node;
469
470 // Queue job
471 //ctx.queuedJobs.Append(node);
472}
473
474//------------------------------------------------------------------------------
477template<typename CTX> void
478JobAppendSequence(const JobFunc& func, const SizeT numInvocations, const CTX& context)
479{
480 JobAppendSequence(func, numInvocations, numInvocations, context);
481}
482
483//------------------------------------------------------------------------------
// Append a callable-object job to the sequence currently being built.
// The job gets its own done counter (initialised to 1) and, unless it is the
// first job of the sequence, one wait counter pointing at the done counter of
// the previously appended job. The node is only linked into the sequence chain.
//   func           callable stored in the node's Lambda wrapper
//   numInvocations total number of invocations; asserted to be > 0
//   groupSize      invocations per group
// NOTE(review): the line carrying the function name (listing line 487) is absent
// from this extract; the matching declaration is JobAppendSequence(LAMBDA&&, SizeT, SizeT).
486template<typename LAMBDA> void
488 LAMBDA&& func,
489 const SizeT numInvocations,
490 const SizeT groupSize
491)
492{
493 n_assert(numInvocations > 0);
494 n_assert(sequenceThread == Threading::Thread::GetMyThreadId());
495 n_assert(sequenceNode != nullptr);
496
497 // Calculate the number of actual jobs based on invocations and group size
// NOTE(review): the division goes through float, which cannot represent every
// integer above 2^24, and groupSize == 0 is not guarded against.
498 SizeT numJobs = Math::ceil(numInvocations / float(groupSize));
499
500 /*
501 // Calculate allocation size which is node + counters + data context
502 SizeT dynamicAllocSize = sizeof(JobNode) + waitCounters.Size() * sizeof(const Threading::AtomicCounter*);
503 auto mem = JobAlloc<char>(dynamicAllocSize);
504 auto node = (JobNode*)mem;
505
506 // Copy over wait counters
507 node->job.waitCounters = nullptr;
508 if (waitCounters.Size() > 0)
509 {
510 node->job.waitCounters = (const Threading::AtomicCounter**)(mem + sizeof(JobNode));
511 memcpy(node->job.waitCounters, waitCounters.Begin(), waitCounters.Size() * sizeof(const Threading::AtomicCounter*));
512 }
513 */
514
515 // Calculate allocation size which is node + done counter (+ one wait counter pointer)
516 SizeT dynamicAllocSize = sizeof(JobNode) + sizeof(Threading::AtomicCounter);
517 if (prevDoneCounter != nullptr)
518 dynamicAllocSize += sizeof(Threading::AtomicCounter*);
519 auto mem = JobAlloc<char>(dynamicAllocSize);
520 auto node = (JobNode*)mem;
521
// Wait on the done counter of the previously appended job, if there is one
// NOTE(review): the pointer slot sits sizeof(Threading::AtomicCounter) bytes
// behind the node, which is not pointer-aligned where pointers are wider than
// the counter.
522 if (prevDoneCounter != nullptr)
523 {
524 node->job.numWaitCounters = 1;
525 node->job.waitCounters = (const Threading::AtomicCounter**)(mem + sizeof(JobNode) + sizeof(Threading::AtomicCounter));
526 memcpy(node->job.waitCounters, &prevDoneCounter, sizeof(Threading::AtomicCounter*));
527 }
528 else
529 {
530 node->job.numWaitCounters = 0;
531 node->job.waitCounters = nullptr;
532 }
533
534 // Store the callable; func == nullptr marks this node as a callable-object job
535 node->job.l = std::move(func);
536 node->job.func = nullptr;
537 node->job.remainingGroups = numJobs;
538 node->job.groupCompletionCounter = numJobs;
539 node->job.numInvocations = numInvocations;
540 node->job.groupSize = groupSize;
541
// The job owns its done counter; the next appended job will wait on it
542 node->job.doneCounter = (Threading::AtomicCounter*)(mem + sizeof(JobNode));
543 *node->job.doneCounter = 1;
544 prevDoneCounter = node->job.doneCounter;
545 node->job.signalEvent = nullptr;
546 node->next = nullptr;
547
548 // First job hangs off the sequence node, later jobs are chained after the tail
// NOTE(review): node->sequence is not assigned anywhere in this function.
549 if (sequenceTail == nullptr)
550 sequenceNode->sequence = node;
551 else
552 sequenceTail->next = node;
553
554 sequenceTail = node;
555
556 // Queue job
557 //ctx.queuedJobs.Append(node);
558}
559
560//------------------------------------------------------------------------------
563template<typename LAMBDA> void
564JobAppendSequence(LAMBDA&& func, const SizeT numInvocations)
565{
566 JobAppendSequence(func, numInvocations, numInvocations);
567}
568
569} // namespace Jobs2
JobThread()
constructor
Definition jobs2.cc:19
virtual void DoWork() override
this method runs in the thread context
Definition jobs2.cc:58
Threading::Event wakeupEvent
Definition jobs2.h:143
void SignalWorkAvailable()
Signal new work available.
Definition jobs2.cc:40
__DeclareClass(JobThread)
virtual void EmitWakeupSignal() override
override this method if your thread loop needs a wakeup call before stopping
Definition jobs2.cc:49
virtual ~JobThread()
destructor
Definition jobs2.cc:28
bool enableProfiling
Definition jobs2.h:134
bool enableIo
Definition jobs2.h:133
Nebula's smart pointer class which manages the life time of RefCounted objects.
Definition ptr.h:38
Nebula's dynamic array class.
Definition array.h:61
Implements a fixed size one-dimensional array.
Definition fixedarray.h:20
A StringAtom.
Definition stringatom.h:22
#define n_assert(exp)
Definition debug.h:50
Definition jobs2.cc:11
Threading::ThreadId sequenceThread
Definition jobs2.cc:288
Jobs2Context ctx
Definition jobs2.cc:13
Jobs2::JobNode * sequenceNode
Definition jobs2.cc:285
const Threading::AtomicCounter * prevDoneCounter
Definition jobs2.cc:287
Jobs2::JobNode * sequenceTail
Definition jobs2.cc:286
void JobBeginSequence(const Util::FixedArray< const Threading::AtomicCounter *, true > &waitCounters, Threading::AtomicCounter *doneCounter, Threading::Event *signalEvent)
Begin a sequence of jobs.
Definition jobs2.cc:294
void JobEndSequence(Threading::Event *signalEvent)
Flush queued jobs.
Definition jobs2.cc:335
void(*)(SizeT totalJobs, SizeT groupSize, IndexT groupIndex, SizeT invocationOffset, void *ctx) JobFunc
Definition jobs2.h:42
void JobSystemUninit()
Destroy job port.
Definition jobs2.cc:240
void * JobAlloc(SizeT bytes)
Allocate memory.
Definition jobs2.cc:264
void JobSystemInit(const JobSystemInitInfo &info)
Create a new job port.
Definition jobs2.cc:207
volatile long CompletionCounter
Definition jobs2.h:41
void JobNewFrame()
Progress to new buffer.
Definition jobs2.cc:253
void JobDispatch(LAMBDA &&func, const SizeT numInvocations, const SizeT groupSize, const Util::FixedArray< const Threading::AtomicCounter *, true > &waitCounters=nullptr, Threading::AtomicCounter *doneCounter=nullptr, Threading::Event *signalEvent=nullptr)
Definition jobs2.h:235
void JobAppendSequence(const JobFunc &func, const SizeT numInvocations, const SizeT groupSize, const CTX &context)
Append job to sequence with an automatic dependency on the previous job.
Definition jobs2.h:415
__forceinline float ceil(float val)
Floating point ceiling.
Definition scalar.h:542
int Exchange(int volatile *dest, int value)
interlocked exchange
Definition gccinterlocked.cc:94
The Jobs2 system provides a set of threads and a pool of jobs from which threads can pickup work.
Definition jobs2.h:16
pthread_t ThreadId
Definition linuxthreadid.h:15
volatile int AtomicCounter
Definition interlocked.h:19
Definition half.h:491
Definition jobs2.h:52
LAMBDA l
Definition jobs2.h:53
void invoke(ARGS... args) override
Definition jobs2.h:57
Callable(LAMBDA l)
Definition jobs2.h:55
Definition jobs2.h:46
virtual void invoke(ARGS... args)=0
Definition jobs2.h:82
const Threading::AtomicCounter ** waitCounters
Definition jobs2.h:90
void * data
Definition jobs2.h:89
SizeT groupSize
Definition jobs2.h:88
int remainingGroups
Definition jobs2.h:85
SizeT numWaitCounters
Definition jobs2.h:91
Threading::AtomicCounter groupCompletionCounter
Definition jobs2.h:86
Threading::AtomicCounter * doneCounter
Definition jobs2.h:92
SizeT numInvocations
Definition jobs2.h:87
Threading::Event * signalEvent
Definition jobs2.h:93
JobFunc func
Definition jobs2.h:83
Lambda l
Definition jobs2.h:84
Definition jobs2.h:97
JobNode * next
Definition jobs2.h:98
JobContext job
Definition jobs2.h:99
JobNode * sequence
Definition jobs2.h:100
uint priority
Definition jobs2.h:151
bool enableIo
Definition jobs2.h:156
SizeT scratchMemorySize
Definition jobs2.h:153
Util::StringAtom name
Definition jobs2.h:148
uint affinity
Definition jobs2.h:150
JobSystemInitInfo()
Definition jobs2.h:159
SizeT numThreads
Definition jobs2.h:149
bool enableProfiling
Definition jobs2.h:157
SizeT numBuffers
Definition jobs2.h:154
Definition jobs2.h:104
IndexT activeBuffer
Definition jobs2.h:113
JobNode * head
Definition jobs2.h:106
Util::Array< JobNode * > queuedJobs
Definition jobs2.h:109
SizeT scratchMemorySize
Definition jobs2.h:115
IndexT iterator
Definition jobs2.h:112
SizeT numBuffers
Definition jobs2.h:111
Util::FixedArray< byte * > scratchMemory
Definition jobs2.h:114
JobNode * tail
Definition jobs2.h:107
Util::FixedArray< Ptr< JobThread > > threads
Definition jobs2.h:108
Threading::CriticalSection jobLock
Definition jobs2.h:105
Definition jobs2.h:64
Lambda(LAMBDA l)
Definition jobs2.h:68
CallableStub< SizeT, SizeT, IndexT, SizeT > * callable
Definition jobs2.h:65
void operator()(SizeT totalJobs, SizeT groupSize, IndexT groupIndex, SizeT invocationOffset)
Definition jobs2.h:75
int SizeT
Definition types.h:42
unsigned int uint
Definition types.h:33
int IndexT
Definition types.h:41