24#define JOB_SIGNATURE (SizeT totalJobs, SizeT groupSize, IndexT groupIndex, SizeT invocationOffset)
25#define JOB_BEGIN_LOOP \
26for (IndexT i = 0; i < groupSize; i++)\
28 IndexT __INDEX = i + invocationOffset;\
29 if (__INDEX >= totalJobs)\
35#define JOB_ITEM_INDEX __INDEX
44template <
typename... ARGS>
47 virtual void invoke(ARGS... args) = 0;
50template<
typename LAMBDA,
typename... ARGS>
67 template<
typename LAMBDA>
72 new (this->callable) CallableType(l);
77 callable->invoke(totalJobs, groupSize, groupIndex, invocationOffset);
140 virtual void DoWork()
override;
190 , Threading::Event* signalEvent =
nullptr);
195 ,
const SizeT numInvocations
196 ,
const SizeT groupSize
203 ,
const SizeT numInvocations
213template <
typename T> T*
216 return (T*)
JobAlloc(count *
sizeof(T));
222template <
typename LAMBDA>
void
225 ,
const SizeT numInvocations
226 ,
const SizeT groupSize
229 , Threading::Event* signalEvent =
nullptr
232 if (numInvocations == 0)
234 if (doneCounter !=
nullptr)
239 if (signalEvent !=
nullptr)
241 signalEvent->Signal();
245 n_assert(doneCounter !=
nullptr ? *doneCounter > 0 :
true);
256 node->job.waitCounters =
nullptr;
257 if (waitCounters.Size() > 0)
260 memcpy(node->job.waitCounters, waitCounters.Begin(), waitCounters.Size() *
sizeof(
const Threading::AtomicCounter*));
263 node->job.l = std::move(func);
264 node->job.func =
nullptr;
265 node->job.remainingGroups = numJobs;
266 node->job.groupCompletionCounter = numJobs;
267 node->job.numInvocations = numInvocations;
268 node->job.groupSize = groupSize;
269 node->job.numWaitCounters = (
SizeT)waitCounters.Size();
270 node->job.doneCounter = doneCounter;
271 node->job.signalEvent = signalEvent;
272 node->sequence =
nullptr;
278 if (
ctx.head ==
nullptr)
282 node->next =
nullptr;
283 if (
ctx.tail !=
nullptr)
284 ctx.tail->next = node;
292 thread->SignalWorkAvailable();
299template <
typename LAMBDA>
void
302 ,
const SizeT numInvocations
305 , Threading::Event* signalEvent =
nullptr
308 JobDispatch(func, numInvocations, numInvocations, waitCounters, doneCounter, signalEvent);
314template <
typename CTX>
void
317 ,
const SizeT numInvocations
318 ,
const SizeT groupSize
322 , Threading::Event* signalEvent =
nullptr
325 static_assert(std::is_trivially_destructible<CTX>::value,
"Job context has to be trivially destructible");
327 n_assert(doneCounter !=
nullptr ? *doneCounter > 0 :
true);
338 node->job.waitCounters =
nullptr;
339 if (waitCounters.Size() > 0)
342 memcpy(node->job.waitCounters, waitCounters.Begin(), waitCounters.Size() *
sizeof(
const Threading::AtomicCounter*));
346 node->job.data = (
void*)(mem +
sizeof(
JobNode));
347 auto data =
reinterpret_cast<CTX*
>(node->job.data);
350 node->job.l.callable =
nullptr;
351 node->job.func = func;
352 node->job.remainingGroups = numJobs;
353 node->job.groupCompletionCounter = numJobs;
354 node->job.numInvocations = numInvocations;
355 node->job.groupSize = groupSize;
356 node->job.numWaitCounters = (
SizeT)waitCounters.Size();
357 node->job.doneCounter = doneCounter;
358 node->job.signalEvent = signalEvent;
359 node->sequence =
nullptr;
365 if (
ctx.head ==
nullptr)
369 node->next =
nullptr;
370 if (
ctx.tail !=
nullptr)
371 ctx.tail->next = node;
379 thread->SignalWorkAvailable();
386template <
typename CTX>
void
389 ,
const SizeT numInvocations
393 , Threading::Event* signalEvent =
nullptr
396 JobDispatch(func, numInvocations, numInvocations, context, waitCounters, doneCounter, signalEvent);
402template<
typename CTX>
void
405 static_assert(std::is_trivially_destructible<CTX>::value,
"Job context has to be trivially destructible");
422 node->job.numWaitCounters = 1;
428 node->job.numWaitCounters = 0;
429 node->job.waitCounters =
nullptr;
433 node->job.data = (
void*)(mem +
sizeof(
JobNode));
434 auto data =
reinterpret_cast<CTX*
>(node->job.data);
437 node->job.l.callable =
nullptr;
438 node->job.func = func;
439 node->job.remainingGroups = numJobs;
440 node->job.groupCompletionCounter = numJobs;
441 node->job.numInvocations = numInvocations;
442 node->job.groupSize = groupSize;
445 *node->job.doneCounter = 1;
447 node->job.signalEvent =
nullptr;
448 node->next =
nullptr;
465template<
typename CTX>
void
JobThread()
constructor
Definition jobs2.cc:19
virtual void DoWork() override
this method runs in the thread context
Definition jobs2.cc:58
Threading::Event wakeupEvent
Definition jobs2.h:143
void SignalWorkAvailable()
Signal new work available.
Definition jobs2.cc:40
__DeclareClass(JobThread)
virtual void EmitWakeupSignal() override
override this method if your thread loop needs a wakeup call before stopping
Definition jobs2.cc:49
virtual ~JobThread()
destructor
Definition jobs2.cc:28
bool enableProfiling
Definition jobs2.h:134
bool enableIo
Definition jobs2.h:133
Nebula's smart pointer class which manages the life time of RefCounted objects.
Definition ptr.h:38
Nebula's dynamic array class.
Definition array.h:60
Implements a fixed size one-dimensional array.
Definition fixedarray.h:20
A StringAtom.
Definition stringatom.h:22
#define n_assert(exp)
Definition debug.h:50
Threading::ThreadId sequenceThread
Definition jobs2.cc:288
Jobs2Context ctx
Definition jobs2.cc:13
Jobs2::JobNode * sequenceNode
Definition jobs2.cc:285
const Threading::AtomicCounter * prevDoneCounter
Definition jobs2.cc:287
Jobs2::JobNode * sequenceTail
Definition jobs2.cc:286
void JobBeginSequence(const Util::FixedArray< const Threading::AtomicCounter *, true > &waitCounters, Threading::AtomicCounter *doneCounter, Threading::Event *signalEvent)
Begin a sequence of jobs.
Definition jobs2.cc:294
void JobEndSequence(Threading::Event *signalEvent)
Flush queued jobs.
Definition jobs2.cc:334
void(*)(SizeT totalJobs, SizeT groupSize, IndexT groupIndex, SizeT invocationOffset, void *ctx) JobFunc
Definition jobs2.h:42
void JobSystemUninit()
Destroy job port.
Definition jobs2.cc:240
void * JobAlloc(SizeT bytes)
Allocate memory.
Definition jobs2.cc:264
void JobSystemInit(const JobSystemInitInfo &info)
Create a new job port.
Definition jobs2.cc:207
volatile long CompletionCounter
Definition jobs2.h:41
void JobNewFrame()
Progress to new buffer.
Definition jobs2.cc:253
void JobDispatch(LAMBDA &&func, const SizeT numInvocations, const SizeT groupSize, const Util::FixedArray< const Threading::AtomicCounter *, true > &waitCounters=nullptr, Threading::AtomicCounter *doneCounter=nullptr, Threading::Event *signalEvent=nullptr)
Definition jobs2.h:223
void JobAppendSequence(const JobFunc &func, const SizeT numInvocations, const SizeT groupSize, const CTX &context)
Append job to sequence with an automatic dependency on the previous job.
Definition jobs2.h:403
__forceinline float ceil(float val)
Floating point ceiling.
Definition scalar.h:542
int Exchange(int volatile *dest, int value)
interlocked exchange
Definition gccinterlocked.cc:94
The Jobs2 system provides a set of threads and a pool of jobs from which threads can pickup work.
Definition jobs2.h:16
pthread_t ThreadId
Definition linuxthreadid.h:15
volatile int AtomicCounter
Definition interlocked.h:19
LAMBDA l
Definition jobs2.h:53
void invoke(ARGS... args) override
Definition jobs2.h:57
Callable(LAMBDA l)
Definition jobs2.h:55
virtual void invoke(ARGS... args)=0
const Threading::AtomicCounter ** waitCounters
Definition jobs2.h:90
void * data
Definition jobs2.h:89
SizeT groupSize
Definition jobs2.h:88
int remainingGroups
Definition jobs2.h:85
SizeT numWaitCounters
Definition jobs2.h:91
Threading::AtomicCounter groupCompletionCounter
Definition jobs2.h:86
Threading::AtomicCounter * doneCounter
Definition jobs2.h:92
SizeT numInvocations
Definition jobs2.h:87
Threading::Event * signalEvent
Definition jobs2.h:93
JobFunc func
Definition jobs2.h:83
Lambda l
Definition jobs2.h:84
JobNode * next
Definition jobs2.h:98
JobContext job
Definition jobs2.h:99
JobNode * sequence
Definition jobs2.h:100
uint priority
Definition jobs2.h:151
bool enableIo
Definition jobs2.h:156
SizeT scratchMemorySize
Definition jobs2.h:153
Util::StringAtom name
Definition jobs2.h:148
uint affinity
Definition jobs2.h:150
JobSystemInitInfo()
Definition jobs2.h:159
SizeT numThreads
Definition jobs2.h:149
bool enableProfiling
Definition jobs2.h:157
SizeT numBuffers
Definition jobs2.h:154
IndexT activeBuffer
Definition jobs2.h:113
JobNode * head
Definition jobs2.h:106
Util::Array< JobNode * > queuedJobs
Definition jobs2.h:109
SizeT scratchMemorySize
Definition jobs2.h:115
IndexT iterator
Definition jobs2.h:112
SizeT numBuffers
Definition jobs2.h:111
Util::FixedArray< byte * > scratchMemory
Definition jobs2.h:114
JobNode * tail
Definition jobs2.h:107
Util::FixedArray< Ptr< JobThread > > threads
Definition jobs2.h:108
Threading::CriticalSection jobLock
Definition jobs2.h:105
Lambda(LAMBDA l)
Definition jobs2.h:68
CallableStub< SizeT, SizeT, IndexT, SizeT > * callable
Definition jobs2.h:65
void operator()(SizeT totalJobs, SizeT groupSize, IndexT groupIndex, SizeT invocationOffset)
Definition jobs2.h:75
int SizeT
Definition types.h:42
unsigned int uint
Definition types.h:33
int IndexT
Definition types.h:41