ARGoS 3
A parallel, multi-engine simulator for swarm robotics
space_multi_thread_balance_quantity.cpp
Go to the documentation of this file.
1
7#include <unistd.h>
8#include <cstring>
9#include <argos3/core/simulator/simulator.h>
10#include <argos3/core/utility/profiler/profiler.h>
12
13namespace argos {
14
15 /****************************************/
16 /****************************************/
17
20 pthread_mutex_t* ActConditionalMutex;
21 pthread_mutex_t* PhysicsConditionalMutex;
22 pthread_mutex_t* MediaConditionalMutex;
24 };
25
26 static void CleanupUpdateThread(void* p_data) {
27 CSimulator& cSimulator = CSimulator::GetInstance();
28 if(cSimulator.IsProfiling()) {
30 }
31 SCleanupUpdateThreadData& sData =
32 *reinterpret_cast<SCleanupUpdateThreadData*>(p_data);
33 pthread_mutex_unlock(sData.SenseControlStepConditionalMutex);
34 pthread_mutex_unlock(sData.ActConditionalMutex);
35 pthread_mutex_unlock(sData.PhysicsConditionalMutex);
36 pthread_mutex_unlock(sData.MediaConditionalMutex);
37 pthread_mutex_unlock(sData.EntityIterConditionalMutex);
38 }
39
41 LOG.AddThreadSafeBuffer();
42 LOGERR.AddThreadSafeBuffer();
43 auto* psData = reinterpret_cast<CSpaceMultiThreadBalanceQuantity::SUpdateThreadData*>(p_data);
44 psData->Space->UpdateThread(psData->ThreadId);
45 return nullptr;
46 }
47
48 /****************************************/
49 /****************************************/
50
52 m_psUpdateThreadData(nullptr),
53 m_ptUpdateThreads(nullptr),
54 m_bIsControllableEntityAssignmentRecalculationNeeded(true) {}
55
56 /****************************************/
57 /****************************************/
58
60 /* Initialize the space */
61 CSpace::Init(t_tree);
62 /* Initialize thread related structures */
63 int nErrors;
64 /* First the counters */
65 m_unSenseControlStepPhaseDoneCounter = CSimulator::GetInstance().GetNumThreads();
66 m_unActPhaseDoneCounter = CSimulator::GetInstance().GetNumThreads();
67 m_unPhysicsPhaseDoneCounter = CSimulator::GetInstance().GetNumThreads();
68 m_unMediaPhaseDoneCounter = CSimulator::GetInstance().GetNumThreads();
69 m_unEntityIterPhaseDoneCounter = CSimulator::GetInstance().GetNumThreads();
70
71 /* Then the mutexes */
72 if((nErrors = pthread_mutex_init(&m_tSenseControlStepConditionalMutex, nullptr)) ||
73 (nErrors = pthread_mutex_init(&m_tActConditionalMutex, nullptr)) ||
74 (nErrors = pthread_mutex_init(&m_tPhysicsConditionalMutex, nullptr)) ||
75 (nErrors = pthread_mutex_init(&m_tMediaConditionalMutex, nullptr)) ||
76 (nErrors = pthread_mutex_init(&m_tEntityIterConditionalMutex, nullptr))) {
77 THROW_ARGOSEXCEPTION("Error creating thread mutexes " << ::strerror(nErrors));
78 }
79 /* Finally the conditionals */
80 if((nErrors = pthread_cond_init(&m_tSenseControlStepConditional, nullptr)) ||
81 (nErrors = pthread_cond_init(&m_tActConditional, nullptr)) ||
82 (nErrors = pthread_cond_init(&m_tPhysicsConditional, nullptr)) ||
83 (nErrors = pthread_cond_init(&m_tMediaConditional, nullptr)) ||
84 (nErrors = pthread_cond_init(&m_tEntityIterConditional, nullptr))) {
85 THROW_ARGOSEXCEPTION("Error creating thread conditionals " << ::strerror(nErrors));
86 }
87 /* Start threads */
88 StartThreads();
89 }
90
91 /****************************************/
92 /****************************************/
93
94 void CSpaceMultiThreadBalanceQuantity::StartThreads() {
95 int nErrors;
96 /* Create the threads to update the controllable entities */
97 m_ptUpdateThreads = new pthread_t[CSimulator::GetInstance().GetNumThreads()];
98 m_psUpdateThreadData = new SUpdateThreadData*[CSimulator::GetInstance().GetNumThreads()];
99 for(UInt32 i = 0; i < CSimulator::GetInstance().GetNumThreads(); ++i) {
100 /* Create the struct with the info to launch the thread */
101 m_psUpdateThreadData[i] = new SUpdateThreadData(i, this);
102 /* Create the thread */
103 if((nErrors = pthread_create(m_ptUpdateThreads + i,
104 nullptr,
106 reinterpret_cast<void*>(m_psUpdateThreadData[i])))) {
107 THROW_ARGOSEXCEPTION("Error creating thread: " << ::strerror(nErrors));
108 }
109 }
110 }
111
112 /****************************************/
113 /****************************************/
114
116 /* Destroy the threads to update the controllable entities */
117 int nErrors;
118 if(m_ptUpdateThreads != nullptr) {
119 for(UInt32 i = 0; i < CSimulator::GetInstance().GetNumThreads(); ++i) {
120 if((nErrors = pthread_cancel(m_ptUpdateThreads[i]))) {
121 THROW_ARGOSEXCEPTION("Error canceling controllable entities update threads " << ::strerror(nErrors));
122 }
123 }
124 auto** ppJoinResult = new void*[CSimulator::GetInstance().GetNumThreads()];
125 for(UInt32 i = 0; i < CSimulator::GetInstance().GetNumThreads(); ++i) {
126 if((nErrors = pthread_join(m_ptUpdateThreads[i], ppJoinResult + i))) {
127 THROW_ARGOSEXCEPTION("Error joining controllable entities update threads " << ::strerror(nErrors));
128 }
129 if(ppJoinResult[i] != PTHREAD_CANCELED) {
130 LOGERR << "[WARNING] Controllable entities update thread #" << i<< " not canceled" << std::endl;
131 }
132 }
133 delete[] ppJoinResult;
134 }
135 delete[] m_ptUpdateThreads;
136 /* Destroy the thread launch info */
137 if(m_psUpdateThreadData != nullptr) {
138 for(UInt32 i = 0; i < CSimulator::GetInstance().GetNumThreads(); ++i) {
139 delete m_psUpdateThreadData[i];
140 }
141 }
142 delete[] m_psUpdateThreadData;
143 pthread_mutex_destroy(&m_tSenseControlStepConditionalMutex);
144 pthread_mutex_destroy(&m_tActConditionalMutex);
145 pthread_mutex_destroy(&m_tPhysicsConditionalMutex);
146 pthread_mutex_destroy(&m_tMediaConditionalMutex);
147 pthread_mutex_destroy(&m_tEntityIterConditionalMutex);
148
149 pthread_cond_destroy(&m_tSenseControlStepConditional);
150 pthread_cond_destroy(&m_tActConditional);
151 pthread_cond_destroy(&m_tPhysicsConditional);
152 pthread_cond_destroy(&m_tMediaConditional);
153 pthread_cond_destroy(&m_tEntityIterConditional);
154
155 /* Destroy the base space */
157 }
158
159 /****************************************/
160 /****************************************/
161
163 m_bIsControllableEntityAssignmentRecalculationNeeded = true;
165 }
166
167 /****************************************/
168 /****************************************/
169
171 m_bIsControllableEntityAssignmentRecalculationNeeded = true;
173 }
174
175 /****************************************/
176 /****************************************/
177
178#define MAIN_SEND_GO_FOR_PHASE(PHASE) \
179 LOG.Flush(); \
180 LOGERR.Flush(); \
181 pthread_mutex_lock(&m_t ## PHASE ## ConditionalMutex); \
182 m_un ## PHASE ## PhaseDoneCounter = 0; \
183 pthread_cond_broadcast(&m_t ## PHASE ## Conditional); \
184 pthread_mutex_unlock(&m_t ## PHASE ## ConditionalMutex);
185
186#define MAIN_WAIT_FOR_PHASE_END(PHASE) \
187 pthread_mutex_lock(&m_t ## PHASE ## ConditionalMutex); \
188 while(m_un ## PHASE ## PhaseDoneCounter < CSimulator::GetInstance().GetNumThreads()) { \
189 pthread_cond_wait(&m_t ## PHASE ## Conditional, &m_t ## PHASE ## ConditionalMutex); \
190 } \
191 pthread_mutex_unlock(&m_t ## PHASE ## ConditionalMutex);
192
196 /* Avoid recalculation at the next time step */
197 m_bIsControllableEntityAssignmentRecalculationNeeded = false;
198 }
199
200 /****************************************/
201 /****************************************/
202
204 /* Update the physics engines */
205 MAIN_SEND_GO_FOR_PHASE(Physics);
207 /* Perform entity transfer from engine to engine, if needed */
208 for(size_t i = 0; i < m_ptPhysicsEngines->size(); ++i) {
209 if((*m_ptPhysicsEngines)[i]->IsEntityTransferNeeded()) {
210 (*m_ptPhysicsEngines)[i]->TransferEntities();
211 }
212 }
213 }
214
215 /****************************************/
216 /****************************************/
217
219 /* Update the media */
222 }
223
224 /****************************************/
225 /****************************************/
226
228 const TControllableEntityIterCBType &c_cb) {
230
231 /* Iterate over all robots in the swarm */
232 MAIN_SEND_GO_FOR_PHASE(EntityIter);
233 MAIN_WAIT_FOR_PHASE_END(EntityIter);
234 } /* IterateOverControllableEntities() */
235
236 /****************************************/
237 /****************************************/
238
239 void CSpaceMultiThreadBalanceQuantity::ControllableEntityIterationWaitAbort(void) {
241 } /* ControllableEntitiesIterationWaitAbort() */
242
243
244 /****************************************/
245 /****************************************/
246
248 MAIN_SEND_GO_FOR_PHASE(SenseControlStep);
249 MAIN_WAIT_FOR_PHASE_END(SenseControlStep);
250 /* Avoid recalculation at the next time step */
251 m_bIsControllableEntityAssignmentRecalculationNeeded = false;
252 }
253
254 /****************************************/
255 /****************************************/
256
257#define THREAD_WAIT_FOR_GO_SIGNAL(PHASE) \
258 pthread_mutex_lock(&m_t ## PHASE ## ConditionalMutex); \
259 while(m_un ## PHASE ## PhaseDoneCounter == CSimulator::GetInstance().GetNumThreads()) { \
260 pthread_cond_wait(&m_t ## PHASE ## Conditional, &m_t ## PHASE ## ConditionalMutex); \
261 } \
262 pthread_mutex_unlock(&m_t ## PHASE ## ConditionalMutex); \
263 pthread_testcancel();
264
265#define THREAD_SIGNAL_PHASE_DONE(PHASE) \
266 pthread_mutex_lock(&m_t ## PHASE ## ConditionalMutex); \
267 ++m_un ## PHASE ## PhaseDoneCounter; \
268 pthread_cond_broadcast(&m_t ## PHASE ## Conditional); \
269 pthread_mutex_unlock(&m_t ## PHASE ## ConditionalMutex); \
270 pthread_testcancel();
271
273 size_t un_tot_plugins) {
274 /* This is the minimum number of plugins assigned to a thread */
275 size_t unMinPortion = un_tot_plugins / CSimulator::GetInstance().GetNumThreads();
276 /* If the division has a remainder, the extra plugins must be assigned too */
277 size_t unExtraPortion = un_tot_plugins % CSimulator::GetInstance().GetNumThreads();
278 /* Calculate the range */
279 if(unMinPortion == 0) {
280 /* Not all threads get a plugin */
281 if(un_id < unExtraPortion) {
282 /* This thread does */
283 return CRange<size_t>(un_id, un_id+1);
284 }
285 else {
286 /* This thread does not */
287 return CRange<size_t>();
288 }
289 }
290 else {
291 /* For sure this thread will get unMinPortion plugins, does it get an extra too? */
292 if(un_id < unExtraPortion) {
293 /* Yes, it gets an extra */
294 return CRange<size_t>( un_id * (unMinPortion+1),
295 (un_id+1) * (unMinPortion+1));
296 }
297 else {
298 /* No, it doesn't get an extra */
299 return CRange<size_t>(unExtraPortion * (unMinPortion+1) + (un_id-unExtraPortion) * unMinPortion,
300 unExtraPortion * (unMinPortion+1) + (un_id-unExtraPortion+1) * unMinPortion);
301 }
302 }
303 }
304
305 void CSpaceMultiThreadBalanceQuantity::UpdateThread(UInt32 un_id) {
306 /* Copy the id */
307 UInt32 unId = un_id;
308 /* Create cancellation data */
309 SCleanupUpdateThreadData sCancelData;
310 sCancelData.SenseControlStepConditionalMutex = &m_tSenseControlStepConditionalMutex;
311 sCancelData.ActConditionalMutex = &m_tActConditionalMutex;
312 sCancelData.PhysicsConditionalMutex = &m_tPhysicsConditionalMutex;
313 sCancelData.MediaConditionalMutex = &m_tMediaConditionalMutex;
314 sCancelData.EntityIterConditionalMutex = &m_tEntityIterConditionalMutex;
315
316 pthread_cleanup_push(CleanupUpdateThread, &sCancelData);
317
318 /* Id range for the physics engines assigned to this thread */
319 CRange<size_t> cPhysicsRange = CalculatePluginRangeForThread(unId,
320 m_ptPhysicsEngines->size());
321 /* Id range for the media assigned to this thread */
322 CRange<size_t> cMediaRange = CalculatePluginRangeForThread(unId,
323 m_ptMedia->size());
324
325 /*
326 * Id range for the entities to update assigned to this thread. Can change
327 * as simulation progressesso periodic re-calculation may be necessary
328 * before *ANY* phase which iterates over the entities, as the
329 * CLoopFunctions::PreStep()/CLoopFunctions::PostStep() may have
330 * added/removed entities.
331 */
332 CRange<size_t> cEntityRange;
333 while (1) {
334 /* Actuate entities assigned to this thread */
335 UpdateThreadEntityAct(un_id, cEntityRange);
336
337 /* Update physics engines assigned to this thread */
338 UpdateThreadPhysics(cPhysicsRange);
339
340 /* Update media assigned to this thread */
341 UpdateThreadMedia(cMediaRange);
342
343 /* loop functions PreStep() iteration (maybe) */
344 UpdateThreadIterateOverEntities(un_id, cEntityRange);
345
346 /* Update sensor readings/execute control step for entities */
347 UpdateThreadEntitySenseControl(un_id, cEntityRange);
348
349 /* loop functions PostStep() iteration (maybe) */
350 UpdateThreadIterateOverEntities(un_id, cEntityRange);
351 } /* while(1) */
352
353 pthread_cleanup_pop(1);
354 } /* UpdateThread */
355
356 /****************************************/
357 /****************************************/
358
359 void CSpaceMultiThreadBalanceQuantity::UpdateThreadEntityAct(UInt32 un_id,
360 CRange<size_t>& c_range) {
362 /* Calculate the portion of entities to update, if needed */
363 if (m_bIsControllableEntityAssignmentRecalculationNeeded) {
364 c_range = CalculatePluginRangeForThread(un_id,
366 }
367 /* Cope with the fact that there may be less entities than threads */
368 if (c_range.GetSpan() > 0) {
369 /* This thread has entities */
370 /* Actuate control choices */
371 for(size_t i = c_range.GetMin(); i < c_range.GetMax(); ++i) {
372 if(m_vecControllableEntities[i]->IsEnabled())
374 }
375 pthread_testcancel();
377 }
378 else {
379 /* This thread has no entities -> dummy computation */
381 }
382 } /* UpdateThreadEntityAct() */
383
384 /****************************************/
385 /****************************************/
386
387 void CSpaceMultiThreadBalanceQuantity::UpdateThreadPhysics(
388 const CRange<size_t>& c_range) {
389 /* Update physics engines, if this thread has been assigned to them */
391 if (c_range.GetSpan() > 0) {
392 /* This thread has engines, update them */
393 for (size_t i = c_range.GetMin(); i < c_range.GetMax(); ++i) {
394 (*m_ptPhysicsEngines)[i]->Update();
395 }
396 pthread_testcancel();
398 }
399 else {
400 /* This thread has no engines -> dummy computation */
402 }
403 } /* UpdateThreadPhysics() */
404
405 /****************************************/
406 /****************************************/
407
408 void CSpaceMultiThreadBalanceQuantity::UpdateThreadMedia(
409 const CRange<size_t>& c_range) {
410 /* Update media, if this thread has been assigned to them */
412 if(c_range.GetSpan() > 0) {
413 /* This thread has media, update them */
414 for(size_t i = c_range.GetMin(); i < c_range.GetMax(); ++i) {
415 (*m_ptMedia)[i]->Update();
416 }
417 pthread_testcancel();
419 }
420 else {
421 /* This thread has no media -> dummy computation */
423 }
424 } /* UpdateThreadMedia() */
425
426 /****************************************/
427 /****************************************/
428
429 void CSpaceMultiThreadBalanceQuantity::UpdateThreadIterateOverEntities(UInt32 un_id,
430 CRange<size_t>& c_range) {
431 THREAD_WAIT_FOR_GO_SIGNAL(EntityIter);
432 /* Calculate the portion of entities to update, if needed */
433 if (m_bIsControllableEntityAssignmentRecalculationNeeded) {
434 c_range = CalculatePluginRangeForThread(un_id,
436 }
437 /* Cope with the fact that there may be less entities than threads */
438 if (c_range.GetSpan() > 0 && ControllableEntityIterationEnabled()) {
439 /* This thread has entities */
440 for (size_t i = c_range.GetMin(); i < c_range.GetMax(); ++i) {
442 } /* for(i...) */
443 pthread_testcancel();
444 THREAD_SIGNAL_PHASE_DONE(EntityIter);
445 }
446 else {
447 THREAD_SIGNAL_PHASE_DONE(EntityIter);
448 }
449 } /* UpdateThreadIterateOverEntities() */
450
451 /****************************************/
452 /****************************************/
453
454 void CSpaceMultiThreadBalanceQuantity::UpdateThreadEntitySenseControl(UInt32 un_id,
455 CRange<size_t>& c_range) {
456 /* Update sensor readings and call controllers */
457 THREAD_WAIT_FOR_GO_SIGNAL(SenseControlStep);
458
459 /* Calculate the portion of entities to update, if needed */
460 if (m_bIsControllableEntityAssignmentRecalculationNeeded) {
461 c_range = CalculatePluginRangeForThread(un_id,
463 }
464 /* Cope with the fact that there may be less entities than threads */
465 if (c_range.GetSpan() > 0) {
466 /* This thread has entities */
467 for (size_t i = c_range.GetMin(); i < c_range.GetMax(); ++i) {
468 if (m_vecControllableEntities[i]->IsEnabled()) {
469 m_vecControllableEntities[i]->Sense();
470 m_vecControllableEntities[i]->ControlStep();
471 }
472 }
473 pthread_testcancel();
474 THREAD_SIGNAL_PHASE_DONE(SenseControlStep);
475 }
476 else {
477 /* This thread has no entities -> dummy computation */
478 THREAD_SIGNAL_PHASE_DONE(SenseControlStep);
479 }
480 } /* UpdateThreadEntitySenseControl() */
481
482 /****************************************/
483 /****************************************/
484}
unsigned int UInt32
32-bit unsigned integer.
Definition datatypes.h:97
#define THROW_ARGOSEXCEPTION(message)
This macro throws an ARGoS exception with the passed message.
#define THREAD_WAIT_FOR_GO_SIGNAL(PHASE)
#define MAIN_SEND_GO_FOR_PHASE(PHASE)
#define MAIN_WAIT_FOR_PHASE_END(PHASE)
#define THREAD_SIGNAL_PHASE_DONE(PHASE)
The namespace containing all the ARGoS related code.
Definition ci_actuator.h:12
CARGoSLog LOGERR(std::cerr, SLogColor(ARGOS_LOG_ATTRIBUTE_BRIGHT, ARGOS_LOG_COLOR_RED))
Definition argos_log.h:180
ticpp::Element TConfigurationNode
The ARGoS configuration XML node.
CARGoSLog LOG(std::cout, SLogColor(ARGOS_LOG_ATTRIBUTE_BRIGHT, ARGOS_LOG_COLOR_GREEN))
Definition argos_log.h:179
void * LaunchUpdateThreadBalanceQuantity(void *p_data)
CRange< size_t > CalculatePluginRangeForThread(size_t un_id, size_t un_tot_plugins)
An entity that contains a pointer to the user-defined controller.
The core class of ARGOS.
Definition simulator.h:62
CProfiler & GetProfiler()
Returns a reference to the profiler.
Definition simulator.h:174
static CSimulator & GetInstance()
Returns the instance to the CSimulator class.
Definition simulator.cpp:78
UInt32 GetNumThreads() const
Returns the number of threads used during the experiment.
Definition simulator.h:260
bool IsProfiling() const
Returns true if ARGoS is being profiled.
Definition simulator.h:182
CControllableEntity::TVector m_vecControllableEntities
A vector of controllable entities.
Definition space.h:491
virtual void Init(TConfigurationNode &t_tree)
Initializes the space using the <arena> section of the XML configuration file.
Definition space.cpp:37
virtual void Destroy()
Destroys the space and all its entities.
Definition space.cpp:85
bool ControllableEntityIterationEnabled() const
Definition space.h:449
CPhysicsEngine::TVector * m_ptPhysicsEngines
A pointer to the list of physics engines.
Definition space.h:497
std::function< void(CControllableEntity *)> TControllableEntityIterCBType
The callback type for iteration over controllable entities within the PreStep() and/or PostStep() par...
Definition space.h:90
TControllableEntityIterCBType m_cbControllableEntityIter
Callback for iterating over entities from within the loop functions.
Definition space.h:503
CMedium::TVector * m_ptMedia
A pointer to the list of media.
Definition space.h:500
virtual void AddControllableEntity(CControllableEntity &c_entity)
Definition space.cpp:167
virtual void RemoveControllableEntity(CControllableEntity &c_entity)
Definition space.cpp:174
virtual void AddControllableEntity(CControllableEntity &c_entity)
virtual void Destroy()
Destroys the space and all its entities.
virtual void Init(TConfigurationNode &t_tree)
Initializes the space using the <arena> section of the XML configuration file.
virtual void IterateOverControllableEntities(const TControllableEntityIterCBType &c_cb)
Given a callback specified in the loop functions, iterate over all controllable entities currently pr...
virtual void RemoveControllableEntity(CControllableEntity &c_entity)
void CollectThreadResourceUsage()
Definition profiler.cpp:172