24#define JOB_SIGNATURE (SizeT totalJobs, SizeT groupSize, IndexT groupIndex, SizeT invocationOffset)
25#define JOB_BEGIN_LOOP \
26for (IndexT i = 0; i < groupSize; i++)\
28 IndexT __INDEX = i + invocationOffset;\
29 if (__INDEX >= totalJobs)\
35#define JOB_ITEM_INDEX __INDEX
44template <
typename... ARGS>
47 virtual void invoke(ARGS... args) = 0;
50template<
typename LAMBDA,
typename... ARGS>
67 template<
typename LAMBDA>
72 new (this->callable) CallableType(l);
77 callable->invoke(totalJobs, groupSize, groupIndex, invocationOffset);
140 virtual void DoWork()
override;
190 , Threading::Event* signalEvent =
nullptr);
195 ,
const SizeT numInvocations
196 ,
const SizeT groupSize
203 ,
const SizeT numInvocations
210 ,
const SizeT numInvocations
211 ,
const SizeT groupSize
216 ,
const SizeT numInvocations
225template <
typename T> T*
228 return (T*)
JobAlloc(count *
sizeof(T));
234template <
typename LAMBDA>
void
237 ,
const SizeT numInvocations
238 ,
const SizeT groupSize
241 , Threading::Event* signalEvent =
nullptr
244 if (numInvocations == 0)
246 if (doneCounter !=
nullptr)
251 if (signalEvent !=
nullptr)
253 signalEvent->Signal();
257 n_assert(doneCounter !=
nullptr ? *doneCounter > 0 :
true);
268 node->job.waitCounters =
nullptr;
269 if (waitCounters.Size() > 0)
272 memcpy(node->job.waitCounters, waitCounters.Begin(), waitCounters.Size() *
sizeof(
const Threading::AtomicCounter*));
275 node->job.l = std::move(func);
276 node->job.func =
nullptr;
277 node->job.remainingGroups = numJobs;
278 node->job.groupCompletionCounter = numJobs;
279 node->job.numInvocations = numInvocations;
280 node->job.groupSize = groupSize;
281 node->job.numWaitCounters = (
SizeT)waitCounters.Size();
282 node->job.doneCounter = doneCounter;
283 node->job.signalEvent = signalEvent;
284 node->sequence =
nullptr;
290 if (
ctx.head ==
nullptr)
294 node->next =
nullptr;
295 if (
ctx.tail !=
nullptr)
296 ctx.tail->next = node;
304 thread->SignalWorkAvailable();
311template <
typename LAMBDA>
void
314 ,
const SizeT numInvocations
317 , Threading::Event* signalEvent =
nullptr
320 JobDispatch(func, numInvocations, numInvocations, waitCounters, doneCounter, signalEvent);
326template <
typename CTX>
void
329 ,
const SizeT numInvocations
330 ,
const SizeT groupSize
334 , Threading::Event* signalEvent =
nullptr
337 static_assert(std::is_trivially_destructible<CTX>::value,
"Job context has to be trivially destructible");
339 n_assert(doneCounter !=
nullptr ? *doneCounter > 0 :
true);
350 node->job.waitCounters =
nullptr;
351 if (waitCounters.Size() > 0)
354 memcpy(node->job.waitCounters, waitCounters.Begin(), waitCounters.Size() *
sizeof(
const Threading::AtomicCounter*));
358 node->job.data = (
void*)(mem +
sizeof(
JobNode));
359 auto data =
reinterpret_cast<CTX*
>(node->job.data);
362 node->job.l.callable =
nullptr;
363 node->job.func = func;
364 node->job.remainingGroups = numJobs;
365 node->job.groupCompletionCounter = numJobs;
366 node->job.numInvocations = numInvocations;
367 node->job.groupSize = groupSize;
368 node->job.numWaitCounters = (
SizeT)waitCounters.Size();
369 node->job.doneCounter = doneCounter;
370 node->job.signalEvent = signalEvent;
371 node->sequence =
nullptr;
377 if (
ctx.head ==
nullptr)
381 node->next =
nullptr;
382 if (
ctx.tail !=
nullptr)
383 ctx.tail->next = node;
391 thread->SignalWorkAvailable();
398template <
typename CTX>
void
401 ,
const SizeT numInvocations
405 , Threading::Event* signalEvent =
nullptr
408 JobDispatch(func, numInvocations, numInvocations, context, waitCounters, doneCounter, signalEvent);
414template<
typename CTX>
void
417 static_assert(std::is_trivially_destructible<CTX>::value,
"Job context has to be trivially destructible");
434 node->job.numWaitCounters = 1;
440 node->job.numWaitCounters = 0;
441 node->job.waitCounters =
nullptr;
445 node->job.data = (
void*)(mem +
sizeof(
JobNode));
446 auto data =
reinterpret_cast<CTX*
>(node->job.data);
449 node->job.l.callable =
nullptr;
450 node->job.func = func;
451 node->job.remainingGroups = numJobs;
452 node->job.groupCompletionCounter = numJobs;
453 node->job.numInvocations = numInvocations;
454 node->job.groupSize = groupSize;
457 *node->job.doneCounter = 1;
459 node->job.signalEvent =
nullptr;
460 node->next =
nullptr;
477template<
typename CTX>
void
486template<
typename LAMBDA>
void
489 const SizeT numInvocations,
490 const SizeT groupSize
524 node->job.numWaitCounters = 1;
530 node->job.numWaitCounters = 0;
531 node->job.waitCounters =
nullptr;
535 node->job.l = std::move(func);
536 node->job.func =
nullptr;
537 node->job.remainingGroups = numJobs;
538 node->job.groupCompletionCounter = numJobs;
539 node->job.numInvocations = numInvocations;
540 node->job.groupSize = groupSize;
543 *node->job.doneCounter = 1;
545 node->job.signalEvent =
nullptr;
546 node->next =
nullptr;
563template<
typename LAMBDA>
void
JobThread()
constructor
Definition jobs2.cc:19
virtual void DoWork() override
this method runs in the thread context
Definition jobs2.cc:58
Threading::Event wakeupEvent
Definition jobs2.h:143
void SignalWorkAvailable()
Signal new work available.
Definition jobs2.cc:40
__DeclareClass(JobThread)
virtual void EmitWakeupSignal() override
override this method if your thread loop needs a wakeup call before stopping
Definition jobs2.cc:49
virtual ~JobThread()
destructor
Definition jobs2.cc:28
bool enableProfiling
Definition jobs2.h:134
bool enableIo
Definition jobs2.h:133
Nebula's smart pointer class which manages the life time of RefCounted objects.
Definition ptr.h:38
Nebula's dynamic array class.
Definition array.h:61
Implements a fixed size one-dimensional array.
Definition fixedarray.h:20
A StringAtom.
Definition stringatom.h:22
#define n_assert(exp)
Definition debug.h:50
Threading::ThreadId sequenceThread
Definition jobs2.cc:288
Jobs2Context ctx
Definition jobs2.cc:13
Jobs2::JobNode * sequenceNode
Definition jobs2.cc:285
const Threading::AtomicCounter * prevDoneCounter
Definition jobs2.cc:287
Jobs2::JobNode * sequenceTail
Definition jobs2.cc:286
void JobBeginSequence(const Util::FixedArray< const Threading::AtomicCounter *, true > &waitCounters, Threading::AtomicCounter *doneCounter, Threading::Event *signalEvent)
Begin a sequence of jobs.
Definition jobs2.cc:294
void JobEndSequence(Threading::Event *signalEvent)
Flush queued jobs.
Definition jobs2.cc:335
void(*)(SizeT totalJobs, SizeT groupSize, IndexT groupIndex, SizeT invocationOffset, void *ctx) JobFunc
Definition jobs2.h:42
void JobSystemUninit()
Destroy job port.
Definition jobs2.cc:240
void * JobAlloc(SizeT bytes)
Allocate memory.
Definition jobs2.cc:264
void JobSystemInit(const JobSystemInitInfo &info)
Create a new job port.
Definition jobs2.cc:207
volatile long CompletionCounter
Definition jobs2.h:41
void JobNewFrame()
Progress to new buffer.
Definition jobs2.cc:253
void JobDispatch(LAMBDA &&func, const SizeT numInvocations, const SizeT groupSize, const Util::FixedArray< const Threading::AtomicCounter *, true > &waitCounters=nullptr, Threading::AtomicCounter *doneCounter=nullptr, Threading::Event *signalEvent=nullptr)
Definition jobs2.h:235
void JobAppendSequence(const JobFunc &func, const SizeT numInvocations, const SizeT groupSize, const CTX &context)
Append job to sequence with an automatic dependency on the previous job.
Definition jobs2.h:415
__forceinline float ceil(float val)
Floating point ceiling.
Definition scalar.h:542
int Exchange(int volatile *dest, int value)
interlocked exchange
Definition gccinterlocked.cc:94
The Jobs2 system provides a set of threads and a pool of jobs from which threads can pickup work.
Definition jobs2.h:16
pthread_t ThreadId
Definition linuxthreadid.h:15
volatile int AtomicCounter
Definition interlocked.h:19
LAMBDA l
Definition jobs2.h:53
void invoke(ARGS... args) override
Definition jobs2.h:57
Callable(LAMBDA l)
Definition jobs2.h:55
virtual void invoke(ARGS... args)=0
const Threading::AtomicCounter ** waitCounters
Definition jobs2.h:90
void * data
Definition jobs2.h:89
SizeT groupSize
Definition jobs2.h:88
int remainingGroups
Definition jobs2.h:85
SizeT numWaitCounters
Definition jobs2.h:91
Threading::AtomicCounter groupCompletionCounter
Definition jobs2.h:86
Threading::AtomicCounter * doneCounter
Definition jobs2.h:92
SizeT numInvocations
Definition jobs2.h:87
Threading::Event * signalEvent
Definition jobs2.h:93
JobFunc func
Definition jobs2.h:83
Lambda l
Definition jobs2.h:84
JobNode * next
Definition jobs2.h:98
JobContext job
Definition jobs2.h:99
JobNode * sequence
Definition jobs2.h:100
uint priority
Definition jobs2.h:151
bool enableIo
Definition jobs2.h:156
SizeT scratchMemorySize
Definition jobs2.h:153
Util::StringAtom name
Definition jobs2.h:148
uint affinity
Definition jobs2.h:150
JobSystemInitInfo()
Definition jobs2.h:159
SizeT numThreads
Definition jobs2.h:149
bool enableProfiling
Definition jobs2.h:157
SizeT numBuffers
Definition jobs2.h:154
IndexT activeBuffer
Definition jobs2.h:113
JobNode * head
Definition jobs2.h:106
Util::Array< JobNode * > queuedJobs
Definition jobs2.h:109
SizeT scratchMemorySize
Definition jobs2.h:115
IndexT iterator
Definition jobs2.h:112
SizeT numBuffers
Definition jobs2.h:111
Util::FixedArray< byte * > scratchMemory
Definition jobs2.h:114
JobNode * tail
Definition jobs2.h:107
Util::FixedArray< Ptr< JobThread > > threads
Definition jobs2.h:108
Threading::CriticalSection jobLock
Definition jobs2.h:105
Lambda(LAMBDA l)
Definition jobs2.h:68
CallableStub< SizeT, SizeT, IndexT, SizeT > * callable
Definition jobs2.h:65
void operator()(SizeT totalJobs, SizeT groupSize, IndexT groupIndex, SizeT invocationOffset)
Definition jobs2.h:75
int SizeT
Definition types.h:42
unsigned int uint
Definition types.h:33
int IndexT
Definition types.h:41