ARGoS 3
A parallel, multi-engine simulator for swarm robotics
space_multi_thread_balance_length.cpp
Go to the documentation of this file.
1
8#include <argos3/core/simulator/simulator.h>
9#include <argos3/core/utility/profiler/profiler.h>
10
11namespace argos {
12
13 /****************************************/
14 /****************************************/
15
18 pthread_mutex_t* StartActPhaseMutex;
19 pthread_mutex_t* StartPhysicsPhaseMutex;
20 pthread_mutex_t* StartMediaPhaseMutex;
21 pthread_mutex_t* StartEntityIterPhaseMutex;
22 pthread_mutex_t* FetchTaskMutex;
23 };
24
/*
 * Thread cleanup handler for the worker threads; it is the routine passed to
 * pthread_cleanup_push() in LaunchThreadBalanceLength().
 *
 * p_data must point to an SCleanupThreadData, whose members are pointers to
 * the fetch-task mutex and to the per-phase start mutexes. The handler
 * unlocks every one of those mutexes unconditionally.
 *
 * NOTE(review): the statement inside the IsProfiling() branch is not visible
 * in this listing.
 * NOTE(review): the handler does not know which mutexes the calling thread
 * actually holds; unlocking a default-type mutex that the caller does not own
 * is presumably undefined per POSIX -- TODO confirm this is acceptable here.
 */
25 static void CleanupThread(void* p_data) {
26 CSimulator& cSimulator = CSimulator::GetInstance();
27 if(cSimulator.IsProfiling()) {
29 }
/* Recover the mutex pointers handed over at registration time */
30 SCleanupThreadData& sData =
31 *reinterpret_cast<SCleanupThreadData*>(p_data);
/* Release every mutex the thread could have been holding */
32 pthread_mutex_unlock(sData.FetchTaskMutex);
33 pthread_mutex_unlock(sData.StartSenseControlPhaseMutex);
34 pthread_mutex_unlock(sData.StartActPhaseMutex);
35 pthread_mutex_unlock(sData.StartPhysicsPhaseMutex);
36 pthread_mutex_unlock(sData.StartMediaPhaseMutex);
37 pthread_mutex_unlock(sData.StartEntityIterPhaseMutex);
38 }
39
40 void* LaunchThreadBalanceLength(void* p_data) {
41 /* Set up thread-safe buffers for this new thread */
42 LOG.AddThreadSafeBuffer();
43 LOGERR.AddThreadSafeBuffer();
44 /* Make this thread cancellable */
45 pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, nullptr);
46 pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, nullptr);
47 /* Get a handle to the thread launch data */
48 auto* psData = reinterpret_cast<CSpaceMultiThreadBalanceLength::SThreadLaunchData*>(p_data);
49 /* Create cancellation data */
50 SCleanupThreadData sCancelData;
51 sCancelData.StartSenseControlPhaseMutex = &(psData->Space->m_tStartSenseControlPhaseMutex);
52 sCancelData.StartActPhaseMutex = &(psData->Space->m_tStartActPhaseMutex);
53 sCancelData.StartPhysicsPhaseMutex = &(psData->Space->m_tStartPhysicsPhaseMutex);
54 sCancelData.StartMediaPhaseMutex = &(psData->Space->m_tStartMediaPhaseMutex);
55 sCancelData.StartEntityIterPhaseMutex = &(psData->Space->m_tStartEntityIterPhaseMutex);
56 sCancelData.FetchTaskMutex = &(psData->Space->m_tFetchTaskMutex);
57 pthread_cleanup_push(CleanupThread, &sCancelData);
58 psData->Space->SlaveThread();
59 /* Dispose of cancellation data */
60 pthread_cleanup_pop(1);
61 return nullptr;
62 }
63
64 /****************************************/
65 /****************************************/
66
68 /* Initialize the space */
69 CSpace::Init(t_tree);
70 /* Initialize thread related structures */
71 int nErrors;
72 /* Init mutexes */
73 if((nErrors = pthread_mutex_init(&m_tStartSenseControlPhaseMutex, nullptr)) ||
74 (nErrors = pthread_mutex_init(&m_tStartActPhaseMutex, nullptr)) ||
75 (nErrors = pthread_mutex_init(&m_tStartPhysicsPhaseMutex, nullptr)) ||
76 (nErrors = pthread_mutex_init(&m_tStartMediaPhaseMutex, nullptr)) ||
77 (nErrors = pthread_mutex_init(&m_tStartEntityIterPhaseMutex, nullptr)) ||
78 (nErrors = pthread_mutex_init(&m_tFetchTaskMutex, nullptr))) {
79 THROW_ARGOSEXCEPTION("Error creating thread mutexes " << ::strerror(nErrors));
80 }
81 /* Init conditionals */
82 if((nErrors = pthread_cond_init(&m_tStartSenseControlPhaseCond, nullptr)) ||
83 (nErrors = pthread_cond_init(&m_tStartActPhaseCond, nullptr)) ||
84 (nErrors = pthread_cond_init(&m_tStartPhysicsPhaseCond, nullptr)) ||
85 (nErrors = pthread_cond_init(&m_tStartMediaPhaseCond, nullptr)) ||
86 (nErrors = pthread_cond_init(&m_tStartEntityIterPhaseCond, nullptr)) ||
87 (nErrors = pthread_cond_init(&m_tFetchTaskCond, nullptr))) {
88 THROW_ARGOSEXCEPTION("Error creating thread conditionals " << ::strerror(nErrors));
89 }
90 /* Reset the idle thread count */
91 m_unSenseControlPhaseIdleCounter = CSimulator::GetInstance().GetNumThreads();
92 m_unActPhaseIdleCounter = CSimulator::GetInstance().GetNumThreads();
93 m_unPhysicsPhaseIdleCounter = CSimulator::GetInstance().GetNumThreads();
94 m_unMediaPhaseIdleCounter = CSimulator::GetInstance().GetNumThreads();
95 m_unEntityIterPhaseIdleCounter = CSimulator::GetInstance().GetNumThreads();
96 /* Start threads */
97 StartThreads();
98 }
99
100 /****************************************/
101 /****************************************/
102
104 /* Destroy the threads to update the controllable entities */
105 int nErrors;
106 if(m_ptThreads != nullptr) {
107 for(UInt32 i = 0; i < CSimulator::GetInstance().GetNumThreads(); ++i) {
108 if((nErrors = pthread_cancel(m_ptThreads[i]))) {
109 THROW_ARGOSEXCEPTION("Error canceling threads " << ::strerror(nErrors));
110 }
111 }
112 auto** ppJoinResult = new void*[CSimulator::GetInstance().GetNumThreads()];
113 for(UInt32 i = 0; i < CSimulator::GetInstance().GetNumThreads(); ++i) {
114 if((nErrors = pthread_join(m_ptThreads[i], ppJoinResult + i))) {
115 THROW_ARGOSEXCEPTION("Error joining threads " << ::strerror(nErrors));
116 }
117 if(ppJoinResult[i] != PTHREAD_CANCELED) {
118 LOGERR << "[WARNING] Thread #" << i<< " not canceled" << std::endl;
119 }
120 }
121 delete[] ppJoinResult;
122 }
123 delete[] m_ptThreads;
124 /* Destroy the thread launch info */
125 if(m_psThreadData != nullptr) {
126 for(UInt32 i = 0; i < CSimulator::GetInstance().GetNumThreads(); ++i) {
127 delete m_psThreadData[i];
128 }
129 }
130 delete[] m_psThreadData;
131 pthread_mutex_destroy(&m_tStartSenseControlPhaseMutex);
132 pthread_mutex_destroy(&m_tStartActPhaseMutex);
133 pthread_mutex_destroy(&m_tStartPhysicsPhaseMutex);
134 pthread_mutex_destroy(&m_tStartMediaPhaseMutex);
135 pthread_mutex_destroy(&m_tStartEntityIterPhaseMutex);
136 pthread_mutex_destroy(&m_tFetchTaskMutex);
137
138 pthread_cond_destroy(&m_tStartSenseControlPhaseCond);
139 pthread_cond_destroy(&m_tStartActPhaseCond);
140 pthread_cond_destroy(&m_tStartPhysicsPhaseCond);
141 pthread_cond_destroy(&m_tStartMediaPhaseCond);
142 pthread_cond_destroy(&m_tStartEntityIterPhaseCond);
143 pthread_cond_destroy(&m_tFetchTaskCond);
144
145 /* Destroy the base space */
147 }
148
149 /****************************************/
150 /****************************************/
151
153 /* Reset the idle thread count */
154 m_unSenseControlPhaseIdleCounter = CSimulator::GetInstance().GetNumThreads();
155 m_unActPhaseIdleCounter = CSimulator::GetInstance().GetNumThreads();
156 m_unPhysicsPhaseIdleCounter = CSimulator::GetInstance().GetNumThreads();
157 m_unMediaPhaseIdleCounter = CSimulator::GetInstance().GetNumThreads();
158 m_unEntityIterPhaseIdleCounter = CSimulator::GetInstance().GetNumThreads();
159 /* Update the space */
161 }
162
163 /****************************************/
164 /****************************************/
165
/*
 * Opens phase PHASE; expanded inside the Update and Iterate member functions
 * of the space.
 * While holding m_tStart<PHASE>PhaseMutex it sets the idle counter of the
 * phase to zero, rewinds the shared task index m_unTaskIndex to zero and
 * broadcasts on m_tStart<PHASE>PhaseCond.
 * Expands to several statements and is not wrapped in a single block, so it
 * must not be used as the unbraced body of an if/else/loop.
 */
166#define MAIN_START_PHASE(PHASE) \
167 pthread_mutex_lock(&m_tStart ## PHASE ## PhaseMutex); \
168 m_un ## PHASE ## PhaseIdleCounter = 0; \
169 m_unTaskIndex = 0; \
170 pthread_cond_broadcast(&m_tStart ## PHASE ## PhaseCond); \
171 pthread_mutex_unlock(&m_tStart ## PHASE ## PhaseMutex);
172
/*
 * Blocks until phase PHASE is over.
 * While holding m_tStart<PHASE>PhaseMutex it waits on m_tStart<PHASE>PhaseCond
 * for as long as the idle counter of the phase is below
 * CSimulator::GetInstance().GetNumThreads(); the predicate is re-checked after
 * every wake-up.
 * Expands to several statements and is not wrapped in a single block, so it
 * must not be used as the unbraced body of an if/else/loop.
 */
173#define MAIN_WAIT_FOR_END_OF(PHASE) \
174 pthread_mutex_lock(&m_tStart ## PHASE ## PhaseMutex); \
175 while(m_un ## PHASE ## PhaseIdleCounter < CSimulator::GetInstance().GetNumThreads()) { \
176 pthread_cond_wait(&m_tStart ## PHASE ## PhaseCond, &m_tStart ## PHASE ## PhaseMutex); \
177 } \
178 pthread_mutex_unlock(&m_tStart ## PHASE ## PhaseMutex);
179
185
186 /****************************************/
187 /****************************************/
188
190 /* Physics phase */
191 MAIN_START_PHASE(Physics);
192 MAIN_WAIT_FOR_END_OF(Physics);
193 /* Perform entity transfer from engine to engine, if needed */
194 for(size_t i = 0; i < m_ptPhysicsEngines->size(); ++i) {
195 if((*m_ptPhysicsEngines)[i]->IsEntityTransferNeeded()) {
196 (*m_ptPhysicsEngines)[i]->TransferEntities();
197 }
198 }
199 }
200
201 /****************************************/
202 /****************************************/
203
205 /* Media phase */
206 MAIN_START_PHASE(Media);
208 }
209
210 /****************************************/
211 /****************************************/
212
214 const TControllableEntityIterCBType &c_cb) {
216 /* Iterate over all robots in the swarm */
217 MAIN_START_PHASE(EntityIter);
218 MAIN_WAIT_FOR_END_OF(EntityIter);
219 } /* IterateOverControllableEntities() */
220
221
222 /****************************************/
223 /****************************************/
224
226 /* Sense/control phase */
227 MAIN_START_PHASE(SenseControl);
228 MAIN_WAIT_FOR_END_OF(SenseControl);
229 }
230
231 /****************************************/
232 /****************************************/
233
/*
 * Allocates the thread handles and the per-thread launch data, then creates
 * one thread per entry.
 *
 * Both arrays have CSimulator::GetInstance().GetNumThreads() elements. Each
 * thread receives its own SThreadLaunchData, built from the thread index and
 * this space, as the start-routine argument.
 *
 * Throws an ARGoS exception carrying the strerror() text if pthread_create()
 * fails.
 *
 * NOTE(review): the start-routine argument of pthread_create() is not visible
 * in this listing.
 * NOTE(review): when creation fails part-way, the remaining m_ptThreads
 * entries are left uninitialised and the exception leaves the arrays
 * allocated -- confirm that the teardown path copes with that.
 */
234 void CSpaceMultiThreadBalanceLength::StartThreads() {
235 int nErrors;
236 /* Create the threads to update the controllable entities */
237 m_ptThreads = new pthread_t[CSimulator::GetInstance().GetNumThreads()];
238 m_psThreadData = new SThreadLaunchData*[CSimulator::GetInstance().GetNumThreads()];
239 for(UInt32 i = 0; i < CSimulator::GetInstance().GetNumThreads(); ++i) {
240 /* Create the struct with the info to launch the thread */
241 m_psThreadData[i] = new SThreadLaunchData(i, this);
242 /* Create the thread */
243 if((nErrors = pthread_create(m_ptThreads + i,
244 nullptr,
246 reinterpret_cast<void*>(m_psThreadData[i])))) {
247 THROW_ARGOSEXCEPTION("Error creating thread: " << ::strerror(nErrors));
248 }
249 }
250 }
251
252 /****************************************/
253 /****************************************/
254
/*
 * Worker-side wait for phase PHASE to be opened.
 * While holding m_tStart<PHASE>PhaseMutex it waits on m_tStart<PHASE>PhaseCond
 * for as long as the idle counter of the phase equals
 * CSimulator::GetInstance().GetNumThreads(), i.e. until MAIN_START_PHASE has
 * reset that counter. After releasing the mutex it calls pthread_testcancel()
 * so that a pending cancellation request can be acted upon.
 * Expands to several statements and is not wrapped in a single block.
 */
255#define THREAD_WAIT_FOR_START_OF(PHASE) \
256 pthread_mutex_lock(&m_tStart ## PHASE ## PhaseMutex); \
257 while(m_un ## PHASE ## PhaseIdleCounter == CSimulator::GetInstance().GetNumThreads()) { \
258 pthread_cond_wait(&m_tStart ## PHASE ## PhaseCond, &m_tStart ## PHASE ## PhaseMutex); \
259 } \
260 pthread_mutex_unlock(&m_tStart ## PHASE ## PhaseMutex); \
261 pthread_testcancel();
262
/*
 * Worker-side task loop for phase PHASE.
 *
 *   PHASE     - phase name, pasted into the mutex/condition/counter names
 *   TASKVEC   - container whose size() bounds the task indices
 *   CONDITION - extra predicate evaluated, with m_tFetchTaskMutex held,
 *               before a task is claimed
 *   SNIPPET   - code run for each claimed task; it reads the claimed index
 *               from unTaskIndex
 *
 * A variable named unTaskIndex must be declared in the scope where the macro
 * is expanded.
 *
 * Each iteration locks m_tFetchTaskMutex. If CONDITION holds and
 * m_unTaskIndex is still below TASKVEC.size(), the current index is copied
 * into unTaskIndex, the shared index is incremented, the mutex is released
 * and SNIPPET runs without any lock held. Otherwise the thread releases the
 * mutex, increments the idle counter of the phase under
 * m_tStart<PHASE>PhaseMutex, broadcasts on m_tStart<PHASE>PhaseCond and
 * leaves the loop.
 * pthread_testcancel() is called only at points where no mutex is held.
 */
263#define THREAD_PERFORM_TASK(PHASE, TASKVEC, CONDITION, SNIPPET) \
264 while(1) { \
265 pthread_mutex_lock(&m_tFetchTaskMutex); \
266 if((CONDITION) && m_unTaskIndex < (TASKVEC).size()) { \
267 unTaskIndex = m_unTaskIndex; \
268 ++m_unTaskIndex; \
269 pthread_mutex_unlock(&m_tFetchTaskMutex); \
270 pthread_testcancel(); \
271 { \
272 SNIPPET; \
273 } \
274 pthread_testcancel(); \
275 } \
276 else { \
277 pthread_mutex_unlock(&m_tFetchTaskMutex); \
278 pthread_testcancel(); \
279 pthread_mutex_lock(&m_tStart ## PHASE ## PhaseMutex); \
280 ++m_un ## PHASE ## PhaseIdleCounter; \
281 pthread_cond_broadcast(&m_tStart ## PHASE ## PhaseCond); \
282 pthread_mutex_unlock(&m_tStart ## PHASE ## PhaseMutex); \
283 pthread_testcancel(); \
284 break; \
285 } \
286 } \
287 pthread_testcancel();
288
/*
 * Main loop of a worker thread.
 *
 * The outer while(1) loop handles, in this order, the Act, Physics, Media,
 * EntityIter, SenseControl and again EntityIter phases. unTaskIndex holds the
 * index of the task most recently claimed by THREAD_PERFORM_TASK and is the
 * index used by the per-task snippets.
 *
 * Per-task work visible here: Act() on enabled controllable entities,
 * Update() on physics engines, Update() on media, and Sense() followed by
 * ControlStep() on enabled controllable entities.
 *
 * NOTE(review): several lines of this function (macro invocation openers,
 * task-vector arguments and the EntityIter task bodies) are not visible in
 * this listing, so the description above covers only what can be seen.
 * NOTE(review): no exit path from the loop is visible; presumably the thread
 * ends only through cancellation -- TODO confirm.
 */
289 void CSpaceMultiThreadBalanceLength::SlaveThread() {
290 /* Task index */
291 size_t unTaskIndex;
292 while(1) {
295 Act,
297 true,
298 if(m_vecControllableEntities[unTaskIndex]->IsEnabled()) m_vecControllableEntities[unTaskIndex]->Act();
299 );
302 Physics,
304 true,
305 (*m_ptPhysicsEngines)[unTaskIndex]->Update();
306 );
309 Media,
310 *m_ptMedia,
311 true,
312 (*m_ptMedia)[unTaskIndex]->Update();
313 );
314 /* loop functions PreStep() */
315 THREAD_WAIT_FOR_START_OF(EntityIter);
317 EntityIter,
321 THREAD_WAIT_FOR_START_OF(SenseControl);
323 SenseControl,
325 true,
326 if(m_vecControllableEntities[unTaskIndex]->IsEnabled()) {
327 m_vecControllableEntities[unTaskIndex]->Sense();
328 m_vecControllableEntities[unTaskIndex]->ControlStep();
329 }
330 );
331 /* loop functions PostStep() */
332 THREAD_WAIT_FOR_START_OF(EntityIter);
334 EntityIter,
338 } /* while(1) */
339 }
340
341 /****************************************/
342 /****************************************/
343
344}
unsigned int UInt32
32-bit unsigned integer.
Definition datatypes.h:97
#define THROW_ARGOSEXCEPTION(message)
This macro throws an ARGoS exception with the passed message.
#define THREAD_WAIT_FOR_START_OF(PHASE)
#define MAIN_WAIT_FOR_END_OF(PHASE)
#define THREAD_PERFORM_TASK(PHASE, TASKVEC, CONDITION, SNIPPET)
#define MAIN_START_PHASE(PHASE)
The namespace containing all the ARGoS related code.
Definition ci_actuator.h:12
CARGoSLog LOGERR(std::cerr, SLogColor(ARGOS_LOG_ATTRIBUTE_BRIGHT, ARGOS_LOG_COLOR_RED))
Definition argos_log.h:180
ticpp::Element TConfigurationNode
The ARGoS configuration XML node.
CARGoSLog LOG(std::cout, SLogColor(ARGOS_LOG_ATTRIBUTE_BRIGHT, ARGOS_LOG_COLOR_GREEN))
Definition argos_log.h:179
void * LaunchThreadBalanceLength(void *p_data)
The core class of ARGOS.
Definition simulator.h:62
CProfiler & GetProfiler()
Returns a reference to the profiler.
Definition simulator.h:174
static CSimulator & GetInstance()
Returns the instance to the CSimulator class.
Definition simulator.cpp:78
UInt32 GetNumThreads() const
Returns the number of threads used during the experiment.
Definition simulator.h:260
bool IsProfiling() const
Returns true if ARGoS is being profiled.
Definition simulator.h:182
CControllableEntity::TVector m_vecControllableEntities
A vector of controllable entities.
Definition space.h:491
virtual void Init(TConfigurationNode &t_tree)
Initializes the space using the <arena> section of the XML configuration file.
Definition space.cpp:37
virtual void Destroy()
Destroys the space and all its entities.
Definition space.cpp:85
bool ControllableEntityIterationEnabled() const
Definition space.h:449
virtual void Update()
Updates the space.
Definition space.cpp:119
CPhysicsEngine::TVector * m_ptPhysicsEngines
A pointer to the list of physics engines.
Definition space.h:497
std::function< void(CControllableEntity *)> TControllableEntityIterCBType
The callback type for iteration over controllable entities within the PreStep() and/or PostStep() par...
Definition space.h:90
TControllableEntityIterCBType m_cbControllableEntityIter
Callback for iterating over entities from within the loop functions.
Definition space.h:503
CMedium::TVector * m_ptMedia
A pointer to the list of media.
Definition space.h:500
virtual void IterateOverControllableEntities(const TControllableEntityIterCBType &c_cb)
Given a callback specified in the loop functions, iterate over all controllable entities currently pr...
virtual void Destroy()
Destroys the space and all its entities.
virtual void Init(TConfigurationNode &t_tree)
Initializes the space using the <arena> section of the XML configuration file.
void CollectThreadResourceUsage()
Definition profiler.cpp:172