Thread (lx-thread.hpp)
Contents
- 1 Multithreading
- 1.1 Threading Interfaces
- 1.2 Worklist
- 1.3 Synchronization
- 1.4 Jobs and Groups
- 1.5 Thread Slots
- 1.6 Shared Work
- 1.7 Range Worker
- 1.8 External Thread Initialization
- 1.9 Waterfall
- 1.10 Atomic Operations
Multithreading
This module provides a very simple interface for creating and managing lightweight threads. Although they can be grouped for ease of access, all the threads in an application reside in the same process context and share the same set of thread resources. The express purpose of the threads is to permit dividing the processing task of the application into parallel chunks that can be spread across multiple processors, if available.
Threading Interfaces
The ILxThreadService service provides a simple interface to basic threading constructs and, more importantly, a way to synchronize with the host process and other threads.
(1) SDK: LXu_THREADSERVICE, etc. defines
#define LXu_THREADSERVICE       "0A9D5B42-1DA6-42A4-8FC4-01FCCE939AC4"
#define LXa_THREADSERVICE       "threadservice"
#define LXu_THREADMUTEX         "7624F6B7-83FD-424F-A68E-0EDD089167CB"
#define LXa_THREADMUTEX         "threadmutex"
#define LXu_THREADJOB           "DE892B0B-A791-4FA5-B85D-46E8CACB695B"
#define LXa_THREADJOB           "threadjob"
#define LXu_THREADGROUP         "54A9DD48-3AFC-435F-9F17-2EEB6FB46FBA"
#define LXa_THREADGROUP         "threadgroup"
#define LXu_WORKLIST            "74568CA9-92C9-4C73-9851-E9169934629A"
#define LXa_WORKLIST            "worklist"
#define LXu_THREADSLOT          "365E4616-0FB9-478E-993D-D35282F4C326"
#define LXu_THREADSLOTCLIENT    "D24835B6-518B-4E33-8A70-E53038C29BB7"
#define LXu_SHAREDWORK          "4D414F97-35A4-4B26-84FE-0E740C79722C"
#define LXu_THREADRANGEWORKER   "612153FE-572F-4CD6-8033-B905762C3106"
#define LXa_THREADRANGEWORKER   "threadrangeworker"
Worklist
A worklist is an opaque container for units of work. Typically elements are processed one at a time, and the list can be split into newly spawned lists.
(2) SDK: WorkList::IsEmpty, etc.
LXxMETHOD ( LxResult, IsEmpty) ( LXtObjectID self);
LXxMETHOD ( void *, Next) ( LXtObjectID self);
LXxMETHOD ( LxResult, Split) ( LXtObjectID self, unsigned mode, void **ppvObj);
LXxMETHOD ( void, Clear) ( LXtObjectID self);
(3) SDK: LXiWLSPLIT_NONE, etc. defines
#define LXiWLSPLIT_NONE 0
#define LXiWLSPLIT_ONE  1
#define LXiWLSPLIT_HALF 2
(4) SDK: CLxUser_WorkList::Split method
bool Split ( CLxLoc_WorkList &wlist, unsigned mode = LXiWLSPLIT_NONE)
{
    LXtObjectID obj;

    if (LXx_FAIL (CLxLoc_WorkList::Split (mode, &obj)))
        return false;

    return wlist.take (obj);
}
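The split modes are only named here, not described, so the following is a minimal sketch under stated assumptions: NONE moves nothing, ONE moves a single unit, and HALF moves half of the remaining units into the spawned list. `SketchWorkList` and its `Add` and `Size` helpers are hypothetical stand-ins built on `std::deque`, not the SDK's worklist object.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>

// Split-mode values copied from the defines above.
enum : unsigned { SPLIT_NONE = 0, SPLIT_ONE = 1, SPLIT_HALF = 2 };

// Hypothetical stand-in for a worklist; NOT the SDK's WorkList interface.
class SketchWorkList {
public:
    void Add (void *unit) { units_.push_back (unit); }   // illustration-only helper

    bool IsEmpty () const { return units_.empty (); }

    // Returns the next unit of work, or nullptr when the list is empty.
    void *Next () {
        if (units_.empty ()) return nullptr;
        void *unit = units_.front ();
        units_.pop_front ();
        return unit;
    }

    // ASSUMED semantics: move zero, one, or half of the remaining units
    // from the back of this list into a newly spawned list.
    SketchWorkList Split (unsigned mode) {
        assert (mode <= SPLIT_HALF);
        std::size_t count = 0;
        if (mode == SPLIT_ONE)  count = units_.empty () ? 0 : 1;
        if (mode == SPLIT_HALF) count = units_.size () / 2;

        SketchWorkList spawned;
        for (std::size_t i = 0; i < count; ++i) {
            spawned.units_.push_back (units_.back ());
            units_.pop_back ();
        }
        return spawned;
    }

    std::size_t Size () const { return units_.size (); }  // illustration-only helper
    void Clear () { units_.clear (); }

private:
    std::deque<void *> units_;
};
```

Splitting from the back while `Next` consumes from the front keeps the two ends of the list from interfering, which is one common way to hand work to another thread.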
Synchronization
Mutexes are used for synchronization of threads. Only a single thread may be "in" a mutex at a time. Any other thread calling Enter will block until the owning thread has released the mutex with a Leave call. NOTE: Calling Enter from a thread that already holds the mutex will result in deadlock.
Critical sections are closely related to mutexes with the one (significant) difference that a thread *can* re-enter a critical section it owns. These are usually not necessary and have a greater performance cost than mutexes. Critical sections share the THREADMUTEX interface.
(5) SDK: ThreadMutex::Enter, etc.
LXxMETHOD ( void, Enter) ( LXtObjectID self);
LXxMETHOD ( void, Leave) ( LXtObjectID self);
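The difference between a mutex and a critical section can be illustrated with the C++ standard library. This is an analogy, not the SDK implementation: `std::recursive_mutex` plays the role of a critical section (the owner may re-enter), while a plain `std::mutex` corresponds to the SDK mutex, where re-entry from the owning thread must be avoided. The function name `SumToDepth` is made up for the example.

```cpp
#include <cassert>
#include <mutex>

// Standard-library stand-in for a critical section (re-entrant lock).
static std::recursive_mutex g_critSec;

// Recursive function that takes the same lock again at every level.
// With a non-recursive mutex this pattern would deadlock (or be undefined
// behaviour for std::mutex), which mirrors the NOTE above.
int SumToDepth (int depth) {
    assert (depth >= 0);
    std::lock_guard<std::recursive_mutex> guard (g_critSec);
    if (depth == 0) return 0;
    return depth + SumToDepth (depth - 1);   // re-enters g_critSec safely
}
```

If the recursion were restructured so the lock is taken only once at the outermost call, the cheaper non-recursive mutex would be enough.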
(6) SDK: empty ThreadMutex User Class
The thread service allows you to create mutexes.
(7) SDK: ThreadService::CreateMutex
LXxMETHOD ( LxResult, CreateMutex) ( LXtObjectID self, void **ppvObj);
(8) SDK: CLxUser_ThreadService::NewMutex method
bool NewMutex ( CLxLoc_ThreadMutex &mux)
{
    LXtObjectID obj;

    if (LXx_FAIL (CreateMutex (&obj)))
        return false;

    return mux.take (obj);
}
... and critical sections.
(9) SDK: ThreadService::CreateCS
LXxMETHOD ( LxResult, CreateCS) ( LXtObjectID self, void **ppvObj);
(10) SDK: CLxUser_ThreadService::NewCritSec method
bool NewCritSec ( CLxLoc_ThreadMutex &cs)
{
    LXtObjectID obj;

    if (LXx_FAIL (CreateCS (&obj)))
        return false;

    return cs.take (obj);
}
We also provide a couple of simple classes for self-allocating locks and automatically scoped enter and leave.
(11) User Class: additional
class CLxMutexLock : public CLxUser_ThreadMutex
{
    public:
        CLxMutexLock ()
        {
            CLxUser_ThreadService ts;
            ts.NewMutex (*this);
        }
};

class CLxCritSecLock : public CLxUser_ThreadMutex
{
    public:
        CLxCritSecLock ()
        {
            CLxUser_ThreadService ts;
            ts.NewCritSec (*this);
        }
};

class CLxArmLockedMutex : public CLxArm
{
    public:
        CLxUser_ThreadMutex &lock;

        CLxArmLockedMutex (CLxUser_ThreadMutex &mux) : lock (mux)
        {
            lock.Enter ();
        }

        ~CLxArmLockedMutex ()
        {
            if (armed)
                lock.Leave ();
        }
};
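The scoped enter/leave idea can be sketched without the SDK headers. Everything below is a hypothetical stand-in: `SketchMutex` wraps `std::mutex`, and `SketchArmLocked` imitates the shape of CLxArmLockedMutex. The CLxArm base class is not shown in this section, so the `armed_` flag here is an assumption about what it provides.

```cpp
#include <cassert>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical stand-in for a mutex object with Enter/Leave methods.
struct SketchMutex {
    std::mutex m;
    void Enter () { m.lock (); }
    void Leave () { m.unlock (); }
};

// Enters the lock on construction and leaves it on destruction.
class SketchArmLocked {
public:
    explicit SketchArmLocked (SketchMutex &mux) : lock_ (mux), armed_ (true) { lock_.Enter (); }
    ~SketchArmLocked () { if (armed_) lock_.Leave (); }

    SketchArmLocked (const SketchArmLocked &) = delete;
    SketchArmLocked &operator= (const SketchArmLocked &) = delete;

private:
    SketchMutex &lock_;
    bool         armed_;   // ASSUMPTION: mirrors the 'armed' member of CLxArm
};

// Increment a shared counter from several threads under the scoped lock.
int CountWithLock (int threads, int perThread) {
    assert (threads > 0 && perThread >= 0);
    SketchMutex mux;
    int counter = 0;
    std::vector<std::thread> pool;
    for (int t = 0; t < threads; ++t)
        pool.emplace_back ([&] {
            for (int i = 0; i < perThread; ++i) {
                SketchArmLocked guard (mux);   // Enter here, Leave at scope exit
                ++counter;
            }
        });
    for (auto &th : pool) th.join ();
    return counter;
}
```

Because the Leave call lives in the destructor, the lock is released on every exit path from the scope, including early returns.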
Jobs and Groups
Clients specify the work they wish to perform by creating a Job. The job is a very simple object that has a single method - Execute. Jobs can be added to other ThreadService objects which can in turn control their threaded execution.
(12) SDK: ThreadJob::Execute
LXxMETHOD ( void, Execute) ( LXtObjectID self);
A more useful, higher-level construct than the Job is the thread group. Thread groups allow the client to launch several threads at once. Each thread in the group gets its own ThreadFunc and data. Usually, all the threads in the group will have the same ThreadFunc but different pieces of data. They may run either synchronously or asynchronously.
(13) SDK: ThreadGroup::AddJob, etc.
LXxMETHOD ( void, AddJob) ( LXtObjectID self, LXtObjectID job);
LXxMETHOD ( unsigned, NumJobs) ( LXtObjectID self);
LXxMETHOD ( void, Clear) ( LXtObjectID self);
LXxMETHOD ( void, Execute) ( LXtObjectID self);
LXxMETHOD ( void, Wait) ( LXtObjectID self);
LXxMETHOD ( LxResult, Running) ( LXtObjectID self);
LXxMETHOD ( void, Kill) ( LXtObjectID self);
(14) SDK: empty ThreadGroup User Class
The thread service interface allows new groups to be created.
(15) SDK: ThreadService::CreateGroup
LXxMETHOD ( LxResult, CreateGroup) ( LXtObjectID self, void **ppvObj);
(16) SDK: CLxUser_ThreadService::NewGroup method
bool NewGroup ( CLxLoc_ThreadGroup &tg)
{
    LXtObjectID obj;

    if (LXx_FAIL (CreateGroup (&obj)))
        return false;

    return tg.take (obj);
}
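The job and group pattern, one function applied to different pieces of data, can be sketched with `std::thread`. `SketchThreadGroup` is a hypothetical stand-in, not the SDK's thread group. It assumes that Execute launches the jobs and returns, and that Wait blocks until they finish. The SDK's actual scheduling is not described here.

```cpp
#include <atomic>
#include <cassert>
#include <cstddef>
#include <functional>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical stand-in for a thread group; NOT the SDK's ThreadGroup.
class SketchThreadGroup {
public:
    using Job = std::function<void ()>;

    void AddJob (Job job) { jobs_.push_back (std::move (job)); }
    unsigned NumJobs () const { return static_cast<unsigned> (jobs_.size ()); }

    // ASSUMED: launch one thread per job and return immediately.
    void Execute () {
        for (auto &job : jobs_) threads_.emplace_back (job);
    }

    // Block until every launched job has finished.
    void Wait () {
        for (auto &th : threads_) th.join ();
        threads_.clear ();
    }

    ~SketchThreadGroup () { Wait (); }

private:
    std::vector<Job>         jobs_;
    std::vector<std::thread> threads_;
};

// Same job body, different data per job: each job sums a strided slice.
long SumInParallel (const std::vector<int> &values, unsigned slices) {
    assert (slices > 0);
    std::atomic<long> total (0);
    SketchThreadGroup group;
    for (unsigned s = 0; s < slices; ++s)
        group.AddJob ([&values, &total, s, slices] {
            long local = 0;
            for (std::size_t i = s; i < values.size (); i += slices) local += values[i];
            total += local;   // one atomic update per job
        });
    group.Execute ();
    group.Wait ();
    return total.load ();
}
```

Accumulating into a local variable and publishing once per job keeps contention on the shared total low.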
There are also a couple of simple methods for querying the state of the threading system.
(17) SDK: ThreadService::NumProcs, etc.
LXxMETHOD ( unsigned int, NumProcs) ( LXtObjectID self);
LXxMETHOD ( unsigned int, IsMainThread) ( LXtObjectID self);
Thread Slots
Slots are a way to store data locally for each thread.
(18) SDK: ThreadSlot::Set, etc.
LXxMETHOD ( LxResult, Set) ( LXtObjectID self, void *value);
LXxMETHOD ( LxResult, Get) ( LXtObjectID self, void **value);
LXxMETHOD ( LxResult, Clear) ( LXtObjectID self);
(19) SDK: empty ThreadSlot User Class
Defining a slot can use a client class for allocating and freeing the data.
(20) SDK: ThreadSlotClient::Alloc, etc.
LXxMETHOD ( LxResult, Alloc) ( LXtObjectID self, void **value);
LXxMETHOD ( LxResult, Free) ( LXtObjectID self, void *value);
Finally, this method allocates a new thread slot.
(21) SDK: ThreadService::CreateSlot
LXxMETHOD ( LxResult, CreateSlot) ( LXtObjectID self, size_t size, LXtObjectID client, void **ppvObj);
(22) SDK: CLxUser_ThreadService::NewSlot method
bool NewSlot ( CLxLoc_ThreadSlot &ts, size_t size)
{
    LXtObjectID obj;

    if (LXx_FAIL (CreateSlot (size, 0, &obj)))
        return false;

    return ts.take (obj);
}

bool NewSlot ( CLxLoc_ThreadSlot &ts, ILxUnknownID client)
{
    LXtObjectID obj;

    if (LXx_FAIL (CreateSlot (0, client, &obj)))
        return false;

    return ts.take (obj);
}
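A slot with a client that allocates and frees per-thread data can be modelled roughly as follows. `SketchSlot` is a hypothetical stand-in, not the SDK's thread slot. It assumes that a value is allocated lazily the first time a thread asks for it, which this section does not state.

```cpp
#include <cassert>
#include <functional>
#include <map>
#include <mutex>
#include <thread>
#include <utility>

// Hypothetical stand-in for a thread slot with an alloc/free client.
class SketchSlot {
public:
    SketchSlot (std::function<void *()> alloc, std::function<void (void *)> free)
        : alloc_ (std::move (alloc)), free_ (std::move (free)) {}

    // Release every per-thread value through the client's free callback.
    ~SketchSlot () { for (auto &kv : values_) free_ (kv.second); }

    // Return the calling thread's value, allocating it on first use (ASSUMED).
    void *Get () {
        std::lock_guard<std::mutex> guard (mutex_);
        auto it = values_.find (std::this_thread::get_id ());
        if (it == values_.end ())
            it = values_.emplace (std::this_thread::get_id (), alloc_ ()).first;
        return it->second;
    }

private:
    std::function<void *()>           alloc_;
    std::function<void (void *)>      free_;
    std::mutex                        mutex_;
    std::map<std::thread::id, void *> values_;
};

// Two threads read the same slot and each sees its own private integer.
bool SlotsAreIndependent () {
    SketchSlot slot ([] { return static_cast<void *> (new int (0)); },
                     [] (void *p) { delete static_cast<int *> (p); });

    int *mine = static_cast<int *> (slot.Get ());
    *mine = 1;

    int *theirs = nullptr;
    std::thread worker ([&] {
        theirs = static_cast<int *> (slot.Get ());
        *theirs = 2;
    });
    worker.join ();

    return theirs != mine && *mine == 1 && *theirs == 2;
}
```

In plain C++ the `thread_local` keyword covers the common case. An explicit slot object is useful when the storage has to be created and destroyed at run time.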
Shared Work
Shared work represents a batch of work that can be done in parallel.
Evaluate | Process one unit of work and return a result code. An error will terminate the whole process. If there is no more work left to do return LXe_FALSE. |
Spawn | Create a new shared work object of the same type. The new object should have no work. |
Share | Move work from this object to the other. This is typically only one unit of work, but if the work units are especially small, more than one at a time can be moved to reduce contention. When there is no work left return LXe_FALSE. |
(23) SDK: SharedWork::Evaluate, etc.
LXxMETHOD ( LxResult, Evaluate) ( LXtObjectID self);
LXxMETHOD ( LxResult, Spawn) ( LXtObjectID self, void **ppvObj);
LXxMETHOD ( LxResult, Share) ( LXtObjectID self, LXtObjectID other, unsigned int split);
You start with a single one of these objects that contains all the work. You then call the ProcessShared() method in the service. This will spawn enough of the shared work objects to populate the available computing resources. Each one will then process all the work it has, getting more from the main shared work object when it is empty. When all the work is done the sub-objects are destroyed.
(24) SDK: ThreadService::ProcessShared
LXxMETHOD ( LxResult, ProcessShared) ( LXtObjectID self, LXtObjectID shared);
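The Evaluate/Spawn/Share protocol can be sketched as a small driver. All names below are hypothetical stand-ins, not the SDK interfaces. Boolean results stand in for LXe_OK and LXe_FALSE, error propagation and the `split` argument are omitted, and the locking strategy is an assumption about how a driver might serialize access to the main object.

```cpp
#include <atomic>
#include <cassert>
#include <memory>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical shared work object: each unit is an int added to a sum.
class SketchSharedWork {
public:
    explicit SketchSharedWork (std::atomic<long> &sum) : sum_ (sum) {}

    void Add (int unit) { units_.push_back (unit); }   // illustration-only helper

    // Process one unit; false means no work is left in this object.
    bool Evaluate () {
        if (units_.empty ()) return false;
        sum_ += units_.back ();
        units_.pop_back ();
        return true;
    }

    // New object of the same type, holding no work.
    std::unique_ptr<SketchSharedWork> Spawn () {
        return std::unique_ptr<SketchSharedWork> (new SketchSharedWork (sum_));
    }

    // Move one unit from this object to 'other'; false when none is left.
    bool Share (SketchSharedWork &other) {
        if (units_.empty ()) return false;
        other.units_.push_back (units_.back ());
        units_.pop_back ();
        return true;
    }

private:
    std::atomic<long> &sum_;
    std::vector<int>   units_;
};

// ASSUMED driver: each worker drains its own object, then pulls more work
// from the main object under a lock until the main object is empty.
void SketchProcessShared (SketchSharedWork &mainWork, unsigned workers) {
    assert (workers > 0);
    std::mutex shareLock;
    std::vector<std::thread> pool;
    for (unsigned w = 0; w < workers; ++w)
        pool.emplace_back ([&] {
            std::unique_ptr<SketchSharedWork> mine;
            {
                std::lock_guard<std::mutex> guard (shareLock);
                mine = mainWork.Spawn ();
            }
            for (;;) {
                while (mine->Evaluate ()) {}
                std::lock_guard<std::mutex> guard (shareLock);
                if (!mainWork.Share (*mine)) break;
            }
        });
    for (auto &th : pool) th.join ();
}
```

Moving more than one unit per Share call, as the description of Share suggests for small units, would reduce how often workers contend for the lock.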
Range Worker
A range worker performs processing over a range of indices (such as scanlines in an image).
(25) SDK: ThreadRangeWorker::Execute
LXxMETHOD ( LxResult, Execute) ( LXtObjectID self, int index, void *sharedData);
The thread service can multi-process a range of indices (such as scanlines in an image) by handing them to a range worker.
(26) SDK: ThreadService::ProcessRange
LXxMETHOD ( LxResult, ProcessRange) ( LXtObjectID self, void *data, int startIndex, int endIndex, LXtObjectID rangeWorker);
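One plausible way to distribute a range of indices over threads is an atomic counter that each thread claims indices from. The sketch below is not the SDK implementation. `SketchProcessRange`, the `threads` parameter, and the choice to treat `endIndex` as inclusive are all assumptions made for illustration.

```cpp
#include <atomic>
#include <cassert>
#include <thread>
#include <vector>

// Hypothetical stand-in for the range worker's Execute callback.
using SketchRangeFunc = void (*) (int index, void *sharedData);

// ASSUMED behaviour: every index in [startIndex, endIndex] is visited
// exactly once; threads claim indices from a shared atomic counter.
void SketchProcessRange (void *data, int startIndex, int endIndex,
                         SketchRangeFunc worker, unsigned threads) {
    assert (threads > 0 && worker != nullptr);
    std::atomic<int> next (startIndex);
    std::vector<std::thread> pool;
    for (unsigned t = 0; t < threads; ++t)
        pool.emplace_back ([&] {
            for (;;) {
                int index = next.fetch_add (1);   // claim the next index
                if (index > endIndex) break;
                worker (index, data);
            }
        });
    for (auto &th : pool) th.join ();
}

// Example worker: fill one "scanline" entry with the square of its index.
void SquareScanline (int index, void *sharedData) {
    int *rows = static_cast<int *> (sharedData);
    rows[index] = index * index;
}
```

Each index writes to its own element, so the worker needs no locking. Shared state touched by several indices would need a mutex or atomics.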
External Thread Initialization
If a plugin creates its own threads using an external library (like pthreads or OpenMP), each such thread needs to initialize itself before it can call Nexus functions. This function does that. If the thread has already initialized itself, this function does nothing.
(27) SDK: ThreadService::InitThread
LXxMETHOD ( LxResult, InitThread) ( LXtObjectID self);
Once the thread has finished executing Nexus code, it needs to free the Nexus specific thread-data, so it must call CleanupThread, or there will be a memory leak.
(28) SDK: ThreadService::CleanupThread
LXxMETHOD ( LxResult, CleanupThread) ( LXtObjectID self);
Note that it's actually okay for a single thread to call Init, then Cleanup, then Init, etc. The only requirements are that there is a Cleanup for every Init, and that all calls into Nexus come after an Init and before the subsequent Cleanup. Nested Init/Cleanup calls also work correctly: the calls are reference counted per thread, so only the last Cleanup fires.
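The pairing rule can be modelled with a per-thread counter and a scope guard. This is a hypothetical model of the reference counting described above, not the SDK's internal bookkeeping. `SketchInitThread`, `SketchCleanupThread` and `SketchThreadScope` are made-up names standing in for calls to InitThread and CleanupThread.

```cpp
#include <cassert>

// Hypothetical per-thread bookkeeping.
struct SketchThreadState {
    int  depth     = 0;      // outstanding Init calls on this thread
    bool allocated = false;  // whether thread data currently exists
    int  frees     = 0;      // how many times thread data was released
};

static thread_local SketchThreadState t_state;

void SketchInitThread () {
    if (t_state.depth++ == 0) t_state.allocated = true;   // first Init allocates
}

void SketchCleanupThread () {
    assert (t_state.depth > 0);        // every Cleanup must match an Init
    if (--t_state.depth == 0) {        // only the last Cleanup fires
        t_state.allocated = false;
        ++t_state.frees;
    }
}

// Scope guard so every Init is paired with a Cleanup, even on early return.
struct SketchThreadScope {
    SketchThreadScope ()  { SketchInitThread (); }
    ~SketchThreadScope () { SketchCleanupThread (); }
};

// Nested scopes: thread data is released once, when the outer scope ends.
int NestedFrees () {
    {
        SketchThreadScope outer;
        {
            SketchThreadScope inner;
        }
        if (!t_state.allocated) return -1;   // must survive the inner Cleanup
    }
    return t_state.frees;
}
```

In a real plugin the guard's constructor and destructor would call the service's InitThread and CleanupThread, so a thread function cannot leak thread data by returning early.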
Waterfall
A waterfall is a model of threaded computation that combines parallel and sequential elements. The metaphor is of a waterfall consisting of a set of pools at different levels. Water fills the first set of pools in parallel, but the pools at the next level don't start to fill until the first level is completely full.
Likewise the waterfall object presents a parade of work units in stages. The work at each stage has to be complete before the next stage can begin. The object manages not only the work list but the processing, which it does by allowing multiple copies of itself to work on the same list.
(29) SDK: LXu_WATERFALL, etc. defines
#define LXu_WATERFALL           "2B845B2A-06BE-4C90-8E50-58F7FBEEC25E"
#define LXa_WATERFALL           "waterfall"
#define LXiWFALL_DONE           0
#define LXiWFALL_HASWORK        1
#define LXiWFALL_NEXT_WORK      2
#define LXiWFALL_NEXT_STAGE     3
Spawn() creates a new waterfall object taking the first work in the current stage. If there's no more work in this stage this should fail.
(30) SDK: Waterfall::Spawn
LXxMETHOD ( LxResult, Spawn) ( LXtObjectID self, void **ppvObj);
Query the state of the waterfall object. This returns one of the LXiWFALL state values.
DONE | indicates that the last work in the last stage has been completed. |
HASWORK | means that this waterfall object has work ready to be processed. |
NEXT_WORK, NEXT_STAGE | if the waterfall object has no work it can indicate that there is more work in this stage, or that the current stage is complete but there is more work in the next stage. |
(31) SDK: Waterfall::State
LXxMETHOD ( unsigned, State) ( LXtObjectID self);
If this object has work loaded it can be discharged with this method. This is the only method that will be called concurrently; all other methods will be called in one thread at a time and so don't need to protect themselves.
(32) SDK: Waterfall::ProcessWork
LXxMETHOD ( LxResult, ProcessWork) ( LXtObjectID self);
If there is work available in the current stage this is used to take one unit of work and load it into this object.
(33) SDK: Waterfall::GetWork
LXxMETHOD ( LxResult, GetWork) ( LXtObjectID self);
If there is no work in the current stage this is called to advance the waterfall to the next stage. All the work in the previous stage will have been completely processed.
(34) SDK: Waterfall::Advance
LXxMETHOD ( LxResult, Advance) ( LXtObjectID self);
This method takes a waterfall object and processes all the work it contains. New instances will be spawned to fill out the given number of threads, or one for each processor if the thread count is zero.
(35) SDK: ThreadService::ProcessWaterfall
LXxMETHOD ( LxResult, ProcessWaterfall) ( LXtObjectID self, LXtObjectID waterfall, unsigned threads);
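The state machine implied by State, GetWork, ProcessWork and Advance can be sketched as a single-threaded loop. This deliberately leaves out Spawn and concurrent processing. `SketchWaterfall` and `SketchProcessWaterfall` are hypothetical stand-ins, and the dispatch order is an assumption drawn from the method descriptions above, not the service's actual scheduler.

```cpp
#include <cassert>
#include <cstddef>
#include <utility>
#include <vector>

// State values copied from the defines above.
enum : unsigned { WFALL_DONE = 0, WFALL_HASWORK = 1, WFALL_NEXT_WORK = 2, WFALL_NEXT_STAGE = 3 };

// Hypothetical waterfall whose work units are ints grouped into stages.
class SketchWaterfall {
public:
    explicit SketchWaterfall (std::vector<std::vector<int>> stages)
        : stages_ (std::move (stages)) {}

    unsigned State () const {
        if (hasWork_) return WFALL_HASWORK;
        if (stage_ < stages_.size () && next_ < stages_[stage_].size ()) return WFALL_NEXT_WORK;
        if (stage_ + 1 < stages_.size ()) return WFALL_NEXT_STAGE;
        return WFALL_DONE;
    }

    // Load one unit from the current stage into this object.
    void GetWork () {
        assert (State () == WFALL_NEXT_WORK);
        loaded_  = stages_[stage_][next_++];
        hasWork_ = true;
    }

    // Discharge the loaded unit (here: just record the processing order).
    void ProcessWork () { processed_.push_back (loaded_); hasWork_ = false; }

    // Move to the next stage once the current one is exhausted.
    void Advance () { ++stage_; next_ = 0; }

    const std::vector<int> &Processed () const { return processed_; }

private:
    std::vector<std::vector<int>> stages_;
    std::size_t      stage_ = 0, next_ = 0;
    int              loaded_ = 0;
    bool             hasWork_ = false;
    std::vector<int> processed_;
};

// ASSUMED driver loop: react to State() until the waterfall reports DONE.
void SketchProcessWaterfall (SketchWaterfall &wf) {
    for (;;) {
        switch (wf.State ()) {
            case WFALL_HASWORK:    wf.ProcessWork (); break;
            case WFALL_NEXT_WORK:  wf.GetWork ();     break;
            case WFALL_NEXT_STAGE: wf.Advance ();     break;
            default:               return;            // WFALL_DONE
        }
    }
}
```

In a threaded driver only ProcessWork would run concurrently, on spawned copies, while State, GetWork and Advance would be serialized, matching the note that ProcessWork is the only method called concurrently.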
Atomic Operations
This section defines functions that are guaranteed to be atomic, meaning that there are no possible race conditions if multiple threads call them on the same data at once.
All values need to be aligned to their size (i.e. 32-bit ints need to be 32-bit aligned, and 64-bit ints need to be 64-bit aligned). If they're not aligned, the functions do nothing. All functions return the result of the increment or decrement.
These functions atomically increment and decrement an address-aligned integer.
(36) SDK: ThreadService::AtomicIncrement, etc.
LXxMETHOD ( int, AtomicIncrement) ( LXtObjectID self, volatile int *addr);
LXxMETHOD ( int, AtomicDecrement) ( LXtObjectID self, volatile int *addr);
These functions are the same but they allow you to add and subtract any number. They return the result of the addition or subtraction.
(37) SDK: ThreadService::AtomicIntegerAdd, etc.
LXxMETHOD ( int, AtomicIntegerAdd) ( LXtObjectID self, volatile int *addr, int val);
LXxMETHOD ( int, AtomicIntegerSubtract) ( LXtObjectID self, volatile int *addr, int val);
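The "returns the result of the operation" convention can be illustrated with `std::atomic`. This is a standard-library analogy, not the SDK implementation, and the `Sketch...` names are made up. Note that `fetch_add` returns the old value, so the operand is added once more to produce the new value.

```cpp
#include <atomic>
#include <cassert>
#include <thread>
#include <vector>

// Add 'amount' atomically and return the NEW value (the result of the add).
int SketchAtomicAdd (std::atomic<int> &value, int amount) {
    return value.fetch_add (amount) + amount;   // fetch_add yields the old value
}

int SketchAtomicIncrement (std::atomic<int> &value) { return SketchAtomicAdd (value, 1); }
int SketchAtomicDecrement (std::atomic<int> &value) { return SketchAtomicAdd (value, -1); }

// Several threads bump one counter with no lock and no lost updates.
int CountAtomically (int threads, int perThread) {
    assert (threads > 0 && perThread >= 0);
    std::atomic<int> counter (0);
    std::vector<std::thread> pool;
    for (int t = 0; t < threads; ++t)
        pool.emplace_back ([&] {
            for (int i = 0; i < perThread; ++i) SketchAtomicIncrement (counter);
        });
    for (auto &th : pool) th.join ();
    return counter.load ();
}
```

Returning the new value makes idioms such as "release the object when the decrement returns zero" safe, because exactly one caller observes each result.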