24#define JOB_SIGNATURE (SizeT totalJobs, SizeT groupSize, IndexT groupIndex, SizeT invocationOffset)
25#define JOB_BEGIN_LOOP \
26for (IndexT i = 0; i < groupSize; i++)\
28 IndexT __INDEX = i + invocationOffset;\
29 if (__INDEX >= totalJobs)\
35#define JOB_ITEM_INDEX __INDEX
44template <
typename... ARGS>
47 virtual void invoke(ARGS... args) = 0;
50template<
typename LAMBDA,
typename... ARGS>
67 template<
typename LAMBDA>
72 new (this->
callable) CallableType(l);
77 callable->
invoke(totalJobs, groupSize, groupIndex, invocationOffset);
140 virtual void DoWork()
override;
195 ,
const SizeT numInvocations
196 ,
const SizeT groupSize
203 ,
const SizeT numInvocations
213template <
typename T> T*
216 return (T*)
JobAlloc(count *
sizeof(T));
222template <
typename LAMBDA>
void
225 ,
const SizeT numInvocations
226 ,
const SizeT groupSize
233 n_assert(doneCounter !=
nullptr ? *doneCounter > 0 :
true);
244 node->job.waitCounters =
nullptr;
245 if (waitCounters.Size() > 0)
248 memcpy(node->job.waitCounters, waitCounters.Begin(), waitCounters.Size() *
sizeof(
const Threading::AtomicCounter*));
251 node->job.l = std::move(func);
252 node->job.func =
nullptr;
253 node->job.remainingGroups = numJobs;
254 node->job.groupCompletionCounter = numJobs;
255 node->job.numInvocations = numInvocations;
256 node->job.groupSize = groupSize;
257 node->job.numWaitCounters = (
SizeT)waitCounters.Size();
258 node->job.doneCounter = doneCounter;
259 node->job.signalEvent = signalEvent;
260 node->sequence =
nullptr;
270 node->
next =
nullptr;
280 thread->SignalWorkAvailable();
287template <
typename LAMBDA>
void
290 ,
const SizeT numInvocations
296 JobDispatch(func, numInvocations, numInvocations, waitCounters, doneCounter, signalEvent);
302template <
typename CTX>
void
305 ,
const SizeT numInvocations
306 ,
const SizeT groupSize
313 static_assert(std::is_trivially_destructible<CTX>::value,
"Job context has to be trivially destructible");
315 n_assert(doneCounter !=
nullptr ? *doneCounter > 0 :
true);
326 node->job.waitCounters =
nullptr;
327 if (waitCounters.Size() > 0)
330 memcpy(node->job.waitCounters, waitCounters.Begin(), waitCounters.Size() *
sizeof(
const Threading::AtomicCounter*));
334 node->job.data = (
void*)(mem +
sizeof(
JobNode));
335 auto data =
reinterpret_cast<CTX*
>(node->job.data);
338 node->job.l.callable =
nullptr;
339 node->job.func = func;
340 node->job.remainingGroups = numJobs;
341 node->job.groupCompletionCounter = numJobs;
342 node->job.numInvocations = numInvocations;
343 node->job.groupSize = groupSize;
344 node->job.numWaitCounters = (
SizeT)waitCounters.Size();
345 node->job.doneCounter = doneCounter;
346 node->job.signalEvent = signalEvent;
347 node->sequence =
nullptr;
357 node->
next =
nullptr;
367 thread->SignalWorkAvailable();
374template <
typename CTX>
void
377 ,
const SizeT numInvocations
384 JobDispatch(func, numInvocations, numInvocations, context, waitCounters, doneCounter, signalEvent);
390template<
typename CTX>
void
393 static_assert(std::is_trivially_destructible<CTX>::value,
"Job context has to be trivially destructible");
410 node->job.numWaitCounters = 1;
416 node->job.numWaitCounters = 0;
417 node->job.waitCounters =
nullptr;
421 node->job.data = (
void*)(mem +
sizeof(
JobNode));
422 auto data =
reinterpret_cast<CTX*
>(node->job.data);
425 node->job.l.callable =
nullptr;
426 node->job.func = func;
427 node->job.remainingGroups = numJobs;
428 node->job.groupCompletionCounter = numJobs;
429 node->job.numInvocations = numInvocations;
430 node->job.groupSize = groupSize;
433 *node->job.doneCounter = 1;
435 node->job.signalEvent =
nullptr;
436 node->next =
nullptr;
453template<
typename CTX>
void
JobThread()
constructor
Definition jobs2.cc:19
virtual void DoWork() override
this method runs in the thread context
Definition jobs2.cc:58
Threading::Event wakeupEvent
Definition jobs2.h:143
void SignalWorkAvailable()
Signal new work available.
Definition jobs2.cc:40
__DeclareClass(JobThread)
virtual void EmitWakeupSignal() override
override this method if your thread loop needs a wakeup call before stopping
Definition jobs2.cc:49
virtual ~JobThread()
destructor
Definition jobs2.cc:28
bool enableProfiling
Definition jobs2.h:134
bool enableIo
Definition jobs2.h:133
Nebula's smart pointer class which manages the lifetime of RefCounted objects.
Definition ptr.h:38
Critical section objects are used to protect a portion of code from parallel execution.
Nebula's dynamic array class.
Definition array.h:60
Implements a fixed size one-dimensional array.
Definition fixedarray.h:20
A StringAtom.
Definition stringatom.h:22
#define n_assert(exp)
Definition debug.h:50
Threading::ThreadId sequenceThread
Definition jobs2.cc:288
Jobs2Context ctx
Definition jobs2.cc:13
Jobs2::JobNode * sequenceNode
Definition jobs2.cc:285
const Threading::AtomicCounter * prevDoneCounter
Definition jobs2.cc:287
Jobs2::JobNode * sequenceTail
Definition jobs2.cc:286
void JobBeginSequence(const Util::FixedArray< const Threading::AtomicCounter *, true > &waitCounters, Threading::AtomicCounter *doneCounter, Threading::Event *signalEvent)
Begin a sequence of jobs.
Definition jobs2.cc:294
void JobEndSequence(Threading::Event *signalEvent)
Flush queued jobs.
Definition jobs2.cc:334
void(*)(SizeT totalJobs, SizeT groupSize, IndexT groupIndex, SizeT invocationOffset, void *ctx) JobFunc
Definition jobs2.h:42
void JobSystemUninit()
Destroy job port.
Definition jobs2.cc:240
void * JobAlloc(SizeT bytes)
Allocate memory.
Definition jobs2.cc:264
void JobSystemInit(const JobSystemInitInfo &info)
Create a new job port.
Definition jobs2.cc:207
volatile long CompletionCounter
Definition jobs2.h:41
void JobNewFrame()
Progress to new buffer.
Definition jobs2.cc:253
void JobDispatch(LAMBDA &&func, const SizeT numInvocations, const SizeT groupSize, const Util::FixedArray< const Threading::AtomicCounter *, true > &waitCounters=nullptr, Threading::AtomicCounter *doneCounter=nullptr, Threading::Event *signalEvent=nullptr)
Definition jobs2.h:223
void JobAppendSequence(const JobFunc &func, const SizeT numInvocations, const SizeT groupSize, const CTX &context)
Append job to sequence with an automatic dependency on the previous job.
Definition jobs2.h:391
__forceinline float ceil(float val)
Floating point ceiling.
Definition scalar.h:523
The Jobs2 system provides a set of threads and a pool of jobs from which threads can pick up work.
Definition jobs2.h:16
pthread_t ThreadId
Definition linuxthreadid.h:15
volatile int AtomicCounter
Definition interlocked.h:19
LAMBDA l
Definition jobs2.h:53
void invoke(ARGS... args) override
Definition jobs2.h:57
Callable(LAMBDA l)
Definition jobs2.h:55
virtual void invoke(ARGS... args)=0
const Threading::AtomicCounter ** waitCounters
Definition jobs2.h:90
void * data
Definition jobs2.h:89
SizeT groupSize
Definition jobs2.h:88
int remainingGroups
Definition jobs2.h:85
SizeT numWaitCounters
Definition jobs2.h:91
Threading::AtomicCounter groupCompletionCounter
Definition jobs2.h:86
Threading::AtomicCounter * doneCounter
Definition jobs2.h:92
SizeT numInvocations
Definition jobs2.h:87
Threading::Event * signalEvent
Definition jobs2.h:93
JobFunc func
Definition jobs2.h:83
Lambda l
Definition jobs2.h:84
JobNode * next
Definition jobs2.h:98
JobContext job
Definition jobs2.h:99
JobNode * sequence
Definition jobs2.h:100
uint priority
Definition jobs2.h:151
bool enableIo
Definition jobs2.h:156
SizeT scratchMemorySize
Definition jobs2.h:153
Util::StringAtom name
Definition jobs2.h:148
uint affinity
Definition jobs2.h:150
JobSystemInitInfo()
Definition jobs2.h:159
SizeT numThreads
Definition jobs2.h:149
bool enableProfiling
Definition jobs2.h:157
SizeT numBuffers
Definition jobs2.h:154
IndexT activeBuffer
Definition jobs2.h:113
JobNode * head
Definition jobs2.h:106
Util::Array< JobNode * > queuedJobs
Definition jobs2.h:109
SizeT scratchMemorySize
Definition jobs2.h:115
IndexT iterator
Definition jobs2.h:112
SizeT numBuffers
Definition jobs2.h:111
Util::FixedArray< byte * > scratchMemory
Definition jobs2.h:114
JobNode * tail
Definition jobs2.h:107
Util::FixedArray< Ptr< JobThread > > threads
Definition jobs2.h:108
Threading::CriticalSection jobLock
Definition jobs2.h:105
Lambda(LAMBDA l)
Definition jobs2.h:68
CallableStub< SizeT, SizeT, IndexT, SizeT > * callable
Definition jobs2.h:65
void operator()(SizeT totalJobs, SizeT groupSize, IndexT groupIndex, SizeT invocationOffset)
Definition jobs2.h:75
int SizeT
Definition types.h:49
unsigned int uint
Definition types.h:31
int IndexT
Definition types.h:48