Nebula
Loading...
Searching...
No Matches
jobs2.h
Go to the documentation of this file.
1#pragma once
2#include "threading/thread.h"
3#include "threading/event.h"
4#include "util/stringatom.h"
6
7//------------------------------------------------------------------------------
14//------------------------------------------------------------------------------
// Forward declaration of Threading::Event (threading/event.h is also included at the top of this file)
namespace Threading { class Event; }
20
21namespace Jobs2
22{
23
// Parameter list for a job function body: total number of invocations, number of
// invocations per group, index of this group and the first invocation index of this group.
#define JOB_SIGNATURE (SizeT totalJobs, SizeT groupSize, IndexT groupIndex, SizeT invocationOffset)

// Opens a loop over the invocations of the current group. The enclosing function
// returns as soon as the absolute invocation index reaches totalJobs.
// Every JOB_BEGIN_LOOP must be closed by a matching JOB_END_LOOP.
#define JOB_BEGIN_LOOP \
for (IndexT i = 0; i < groupSize; ++i)\
{\
    IndexT __INDEX = invocationOffset + i;\
    if (totalJobs <= __INDEX)\
        return;

// Closes the loop opened by JOB_BEGIN_LOOP.
#define JOB_END_LOOP \
}

// Absolute invocation index of the current JOB_BEGIN_LOOP iteration.
#define JOB_ITEM_INDEX __INDEX
36
37
38class JobThread;
39
40void* JobAlloc(SizeT bytes);
41typedef volatile long CompletionCounter;
42using JobFunc = void(*)(SizeT totalJobs, SizeT groupSize, IndexT groupIndex, SizeT invocationOffset, void* ctx);
43
44template <typename... ARGS>
46{
47 virtual void invoke(ARGS... args) = 0;
48};
49
50template<typename LAMBDA, typename... ARGS>
51struct Callable : CallableStub<ARGS...>
52{
53 LAMBDA l;
54
55 Callable(LAMBDA l) : l(std::move(l)) {};
56
57 void invoke(ARGS... args) override
58 {
59 l(args...);
60 }
61};
62
63struct Lambda
64{
66
67 template<typename LAMBDA>
68 Lambda(LAMBDA l)
69 {
71 this->callable = (CallableType*)Jobs2::JobAlloc(sizeof(CallableType));
72 new (this->callable) CallableType(l);
73 };
74
75 void operator()(SizeT totalJobs, SizeT groupSize, IndexT groupIndex, SizeT invocationOffset)
76 {
77 callable->invoke(totalJobs, groupSize, groupIndex, invocationOffset);
78 }
79};
80
95
96struct JobNode
97{
100 JobNode* sequence; // set to nullptr for ordinary nodes
101};
102
117
118extern Jobs2Context ctx;
119
121{
123public:
124
126 JobThread();
128 virtual ~JobThread();
129
131 void SignalWorkAvailable();
132
135protected:
136
138 virtual void EmitWakeupSignal() override;
140 virtual void DoWork() override;
141
142private:
144};
145
169
171void JobSystemInit(const JobSystemInitInfo& info);
173void JobSystemUninit();
174
176template <typename T> T* JobAlloc(SizeT count);
178void* JobAlloc(SizeT bytes);
180void JobNewFrame();
181
182extern JobNode* sequenceNode;
183extern JobNode* sequenceTail;
186
189 , Threading::AtomicCounter* doneCounter = nullptr
190 , Threading::Event* signalEvent = nullptr);
191
193template <typename CTX> void JobAppendSequence(
194 const JobFunc& func
195 , const SizeT numInvocations
196 , const SizeT groupSize
197 , const CTX& context
198);
199
201template <typename CTX> void JobAppendSequence(
202 const JobFunc& func
203 , const SizeT numInvocations
204 , const CTX& context
205);
206
208void JobEndSequence(Threading::Event* signalEvent = nullptr);
209
210//------------------------------------------------------------------------------
213template <typename T> T*
215{
216 return (T*)JobAlloc(count * sizeof(T));
217}
218
219//------------------------------------------------------------------------------
222template <typename LAMBDA> void
224 LAMBDA&& func
225 , const SizeT numInvocations
226 , const SizeT groupSize
227 , const Util::FixedArray<const Threading::AtomicCounter*>& waitCounters = nullptr
228 , Threading::AtomicCounter* doneCounter = nullptr
229 , Threading::Event* signalEvent = nullptr
230)
231{
232 n_assert(numInvocations > 0);
233 n_assert(doneCounter != nullptr ? *doneCounter > 0 : true);
234
235 // Calculate the number of actual jobs based on invocations and group size
236 SizeT numJobs = Math::ceil(numInvocations / float(groupSize));
237
238 // Calculate allocation size which is node + counters + data context
239 SizeT dynamicAllocSize = sizeof(JobNode) + waitCounters.Size() * sizeof(const Threading::AtomicCounter*);
240 auto mem = JobAlloc<char>(dynamicAllocSize);
241 auto node = (JobNode*)mem;
242
243 // Copy over wait counters
244 node->job.waitCounters = nullptr;
245 if (waitCounters.Size() > 0)
246 {
247 node->job.waitCounters = (const Threading::AtomicCounter**)(mem + sizeof(JobNode));
248 memcpy(node->job.waitCounters, waitCounters.Begin(), waitCounters.Size() * sizeof(const Threading::AtomicCounter*));
249 }
250
251 node->job.l = std::move(func);
252 node->job.func = nullptr;
253 node->job.remainingGroups = numJobs;
254 node->job.groupCompletionCounter = numJobs;
255 node->job.numInvocations = numInvocations;
256 node->job.groupSize = groupSize;
257 node->job.numWaitCounters = (SizeT)waitCounters.Size();
258 node->job.doneCounter = doneCounter;
259 node->job.signalEvent = signalEvent;
260 node->sequence = nullptr;
261
262 // Add to end of linked list
263 ctx.jobLock.Enter();
264
265 // First, set head node if nullptr
266 if (ctx.head == nullptr)
267 ctx.head = node;
268
269 // Then add node to end of list
270 node->next = nullptr;
271 if (ctx.tail != nullptr)
272 ctx.tail->next = node;
273 ctx.tail = node;
274
275 ctx.jobLock.Leave();
276
277 // Trigger threads to wake up and compete for jobs
278 for (Ptr<JobThread>& thread : ctx.threads)
279 {
280 thread->SignalWorkAvailable();
281 }
282}
283
284//------------------------------------------------------------------------------
287template <typename LAMBDA> void
289 LAMBDA&& func
290 , const SizeT numInvocations
291 , const Util::FixedArray<const Threading::AtomicCounter*>& waitCounters = nullptr
292 , Threading::AtomicCounter* doneCounter = nullptr
293 , Threading::Event* signalEvent = nullptr
294)
295{
296 JobDispatch(func, numInvocations, numInvocations, waitCounters, doneCounter, signalEvent);
297}
298
299//------------------------------------------------------------------------------
302template <typename CTX> void
304 const JobFunc& func
305 , const SizeT numInvocations
306 , const SizeT groupSize
307 , const CTX& context
308 , const Util::FixedArray<const Threading::AtomicCounter*>& waitCounters = nullptr
309 , Threading::AtomicCounter* doneCounter = nullptr
310 , Threading::Event* signalEvent = nullptr
311)
312{
313 static_assert(std::is_trivially_destructible<CTX>::value, "Job context has to be trivially destructible");
314 n_assert(numInvocations > 0);
315 n_assert(doneCounter != nullptr ? *doneCounter > 0 : true);
316
317 // Calculate the number of actual jobs based on invocations and group size
318 SizeT numJobs = Math::ceil(numInvocations / float(groupSize));
319
320 // Calculate allocation size which is node + counters + data context
321 auto dynamicAllocSize = sizeof(JobNode) + sizeof(CTX) + waitCounters.Size() * sizeof(const Threading::AtomicCounter*);
322 auto mem = JobAlloc<char>(dynamicAllocSize);
323 auto node = (JobNode*)mem;
324
325 // Copy over wait counters
326 node->job.waitCounters = nullptr;
327 if (waitCounters.Size() > 0)
328 {
329 node->job.waitCounters = (const Threading::AtomicCounter**)(mem + sizeof(JobNode) + sizeof(CTX));
330 memcpy(node->job.waitCounters, waitCounters.Begin(), waitCounters.Size() * sizeof(const Threading::AtomicCounter*));
331 }
332
333 // Move context
334 node->job.data = (void*)(mem + sizeof(JobNode));
335 auto data = reinterpret_cast<CTX*>(node->job.data);
336 *data = context;
337
338 node->job.l.callable = nullptr;
339 node->job.func = func;
340 node->job.remainingGroups = numJobs;
341 node->job.groupCompletionCounter = numJobs;
342 node->job.numInvocations = numInvocations;
343 node->job.groupSize = groupSize;
344 node->job.numWaitCounters = (SizeT)waitCounters.Size();
345 node->job.doneCounter = doneCounter;
346 node->job.signalEvent = signalEvent;
347 node->sequence = nullptr;
348
349 // Add to end of linked list
350 ctx.jobLock.Enter();
351
352 // First, set head node if nullptr
353 if (ctx.head == nullptr)
354 ctx.head = node;
355
356 // Then add node to end of list
357 node->next = nullptr;
358 if (ctx.tail != nullptr)
359 ctx.tail->next = node;
360 ctx.tail = node;
361
362 ctx.jobLock.Leave();
363
364 // Trigger threads to wake up and compete for jobs
365 for (Ptr<JobThread>& thread : ctx.threads)
366 {
367 thread->SignalWorkAvailable();
368 }
369}
370
371//------------------------------------------------------------------------------
374template <typename CTX> void
376 const JobFunc& func
377 , const SizeT numInvocations
378 , const CTX& context
379 , const Util::FixedArray<const Threading::AtomicCounter*>& waitCounters = nullptr
380 , Threading::AtomicCounter* doneCounter = nullptr
381 , Threading::Event* signalEvent = nullptr
382)
383{
384 JobDispatch(func, numInvocations, numInvocations, context, waitCounters, doneCounter, signalEvent);
385}
386
387//------------------------------------------------------------------------------
390template<typename CTX> void
391JobAppendSequence(const JobFunc& func, const SizeT numInvocations, const SizeT groupSize, const CTX& context)
392{
393 static_assert(std::is_trivially_destructible<CTX>::value, "Job context has to be trivially destructible");
394 n_assert(numInvocations > 0);
395 n_assert(sequenceThread == Threading::Thread::GetMyThreadId());
396 n_assert(sequenceNode != nullptr);
397
398 // Calculate the number of actual jobs based on invocations and group size
399 SizeT numJobs = Math::ceil(numInvocations / float(groupSize));
400
401 // Calculate allocation size which is node + counters + data context
402 SizeT dynamicAllocSize = sizeof(JobNode) + sizeof(CTX) + sizeof(Threading::AtomicCounter);
403 if (prevDoneCounter != nullptr)
404 dynamicAllocSize += sizeof(Threading::AtomicCounter*);
405 auto mem = JobAlloc<char>(dynamicAllocSize);
406 auto node = (JobNode*)mem;
407
408 if (prevDoneCounter != nullptr)
409 {
410 node->job.numWaitCounters = 1;
411 node->job.waitCounters = (const Threading::AtomicCounter**)(mem + sizeof(JobNode) + sizeof(CTX) + sizeof(Threading::AtomicCounter));
412 memcpy(node->job.waitCounters, &prevDoneCounter, sizeof(Threading::AtomicCounter*));
413 }
414 else
415 {
416 node->job.numWaitCounters = 0;
417 node->job.waitCounters = nullptr;
418 }
419
420 // Move context
421 node->job.data = (void*)(mem + sizeof(JobNode));
422 auto data = reinterpret_cast<CTX*>(node->job.data);
423 *data = context;
424
425 node->job.l.callable = nullptr;
426 node->job.func = func;
427 node->job.remainingGroups = numJobs;
428 node->job.groupCompletionCounter = numJobs;
429 node->job.numInvocations = numInvocations;
430 node->job.groupSize = groupSize;
431
432 node->job.doneCounter = (Threading::AtomicCounter*)(mem + sizeof(JobNode) + sizeof(CTX));
433 *node->job.doneCounter = 1;
434 prevDoneCounter = node->job.doneCounter;
435 node->job.signalEvent = nullptr;
436 node->next = nullptr;
437
438 // The remainingGroups counter for the sequence node is the length of the sequence chain
439 if (sequenceTail == nullptr)
440 sequenceNode->sequence = node;
441 else
442 sequenceTail->next = node;
443
444 sequenceTail = node;
445
446 // Queue job
447 //ctx.queuedJobs.Append(node);
448}
449
450//------------------------------------------------------------------------------
453template<typename CTX> void
454JobAppendSequence(const JobFunc& func, const SizeT numInvocations, const CTX& context)
455{
456 JobAppendSequence(func, numInvocations, numInvocations, context);
457}
458
459} // namespace Jobs2
Definition jobs2.h:121
JobThread()
constructor
Definition jobs2.cc:19
virtual void DoWork() override
this method runs in the thread context
Definition jobs2.cc:58
Threading::Event wakeupEvent
Definition jobs2.h:143
void SignalWorkAvailable()
Signal new work available.
Definition jobs2.cc:40
__DeclareClass(JobThread)
virtual void EmitWakeupSignal() override
override this method if your thread loop needs a wakeup call before stopping
Definition jobs2.cc:49
virtual ~JobThread()
destructor
Definition jobs2.cc:28
bool enableProfiling
Definition jobs2.h:134
bool enableIo
Definition jobs2.h:133
Nebula's smart pointer class which manages the life time of RefCounted objects.
Definition ptr.h:38
Critical section objects are used to protect a portion of code from parallel execution.
Nebula's dynamic array class.
Definition array.h:60
Implements a fixed size one-dimensional array.
Definition fixedarray.h:20
A StringAtom.
Definition stringatom.h:22
#define n_assert(exp)
Definition debug.h:50
Definition jobs2.cc:11
Threading::ThreadId sequenceThread
Definition jobs2.cc:288
Jobs2Context ctx
Definition jobs2.cc:13
Jobs2::JobNode * sequenceNode
Definition jobs2.cc:285
const Threading::AtomicCounter * prevDoneCounter
Definition jobs2.cc:287
Jobs2::JobNode * sequenceTail
Definition jobs2.cc:286
void JobEndSequence(Threading::Event *signalEvent)
Flush queued jobs.
Definition jobs2.cc:334
void(*)(SizeT totalJobs, SizeT groupSize, IndexT groupIndex, SizeT invocationOffset, void *ctx) JobFunc
Definition jobs2.h:42
void JobDispatch(LAMBDA &&func, const SizeT numInvocations, const SizeT groupSize, const Util::FixedArray< const Threading::AtomicCounter * > &waitCounters=nullptr, Threading::AtomicCounter *doneCounter=nullptr, Threading::Event *signalEvent=nullptr)
Definition jobs2.h:223
void JobSystemUninit()
Destroy job port.
Definition jobs2.cc:240
void JobBeginSequence(const Util::FixedArray< const Threading::AtomicCounter * > &waitCounters, Threading::AtomicCounter *doneCounter, Threading::Event *signalEvent)
Begin a sequence of jobs.
Definition jobs2.cc:294
void * JobAlloc(SizeT bytes)
Allocate memory.
Definition jobs2.cc:264
void JobSystemInit(const JobSystemInitInfo &info)
Create a new job port.
Definition jobs2.cc:207
volatile long CompletionCounter
Definition jobs2.h:41
void JobNewFrame()
Progress to new buffer.
Definition jobs2.cc:253
void JobAppendSequence(const JobFunc &func, const SizeT numInvocations, const SizeT groupSize, const CTX &context)
Append job to sequence with an automatic dependency on the previous job.
Definition jobs2.h:391
__forceinline float ceil(float val)
Floating point ceiling.
Definition scalar.h:523
The Jobs2 system provides a set of threads and a pool of jobs from which threads can pick up work.
Definition jobs2.h:16
pthread_t ThreadId
Definition linuxthreadid.h:15
volatile int AtomicCounter
Definition interlocked.h:19
Definition half.h:491
Definition jobs2.h:52
LAMBDA l
Definition jobs2.h:53
void invoke(ARGS... args) override
Definition jobs2.h:57
Callable(LAMBDA l)
Definition jobs2.h:55
Definition jobs2.h:46
virtual void invoke(ARGS... args)=0
Definition jobs2.h:82
const Threading::AtomicCounter ** waitCounters
Definition jobs2.h:90
void * data
Definition jobs2.h:89
SizeT groupSize
Definition jobs2.h:88
int remainingGroups
Definition jobs2.h:85
SizeT numWaitCounters
Definition jobs2.h:91
Threading::AtomicCounter groupCompletionCounter
Definition jobs2.h:86
Threading::AtomicCounter * doneCounter
Definition jobs2.h:92
SizeT numInvocations
Definition jobs2.h:87
Threading::Event * signalEvent
Definition jobs2.h:93
JobFunc func
Definition jobs2.h:83
Lambda l
Definition jobs2.h:84
Definition jobs2.h:97
JobNode * next
Definition jobs2.h:98
JobContext job
Definition jobs2.h:99
JobNode * sequence
Definition jobs2.h:100
Definition jobs2.h:147
uint priority
Definition jobs2.h:151
bool enableIo
Definition jobs2.h:156
SizeT scratchMemorySize
Definition jobs2.h:153
Util::StringAtom name
Definition jobs2.h:148
uint affinity
Definition jobs2.h:150
JobSystemInitInfo()
Definition jobs2.h:159
SizeT numThreads
Definition jobs2.h:149
bool enableProfiling
Definition jobs2.h:157
SizeT numBuffers
Definition jobs2.h:154
Definition jobs2.h:104
IndexT activeBuffer
Definition jobs2.h:113
JobNode * head
Definition jobs2.h:106
Util::Array< JobNode * > queuedJobs
Definition jobs2.h:109
SizeT scratchMemorySize
Definition jobs2.h:115
IndexT iterator
Definition jobs2.h:112
SizeT numBuffers
Definition jobs2.h:111
Util::FixedArray< byte * > scratchMemory
Definition jobs2.h:114
JobNode * tail
Definition jobs2.h:107
Util::FixedArray< Ptr< JobThread > > threads
Definition jobs2.h:108
Threading::CriticalSection jobLock
Definition jobs2.h:105
Definition jobs2.h:64
Lambda(LAMBDA l)
Definition jobs2.h:68
CallableStub< SizeT, SizeT, IndexT, SizeT > * callable
Definition jobs2.h:65
void operator()(SizeT totalJobs, SizeT groupSize, IndexT groupIndex, SizeT invocationOffset)
Definition jobs2.h:75
int SizeT
Definition types.h:49
unsigned int uint
Definition types.h:31
int IndexT
Definition types.h:48