GemaCoreLib
The GeMA Core library
gmAppendBuffer.h
Go to the documentation of this file.
1 /************************************************************************
2 **
3 ** Copyright (C) 2014 by Carlos Augusto Teixera Mendes
4 ** All rights reserved.
5 **
6 ** This file is part of the "GeMA" software. It's use should respect
7 ** the terms in the license agreement that can be found together
8 ** with this source code.
9 ** It is provided AS IS, with NO WARRANTY OF ANY KIND,
10 ** INCLUDING THE WARRANTY OF DESIGN, MERCHANTABILITY AND FITNESS FOR
11 ** A PARTICULAR PURPOSE.
12 **
13 ************************************************************************/
14 
24 #ifndef _GEMA_APPEND_BUFFER_H_
25 #define _GEMA_APPEND_BUFFER_H_
26 
#include "gmTrace.h"
#include <assert.h>
#include <climits>

#include <QAtomicInteger>
#include <QVarLengthArray>

#include "gmSpinLock.h"
#include "gmThreadManager.h"
#include "gmThreadLocalStorage.h"
36 
37 // Sanity checks for the current implementation
38 #ifndef Q_ATOMIC_INT64_IS_SUPPORTED
39 #error No 64 bit support for atomic operations
40 #endif
41 
42 // Assert commented due to QTBUG-82864
43 //#ifndef Q_ATOMIC_INT64_FETCH_AND_ADD_IS_ALWAYS_NATIVE
44 //#error No 64 bit support for Fetch and Add operations
45 //#endif
46 
47 static_assert(sizeof(size_t) == 8, "Unexpected size_t size");
48 
49 
/// A virtual class representing a buffer of T objects that can be appended in a thread-safe way.
///
/// Entries are added, possibly concurrently, through append() or appendFromThread(). Once the
/// appending phase is over, the whole content can be retrieved as a single vector with data().
template <class T> class GmAppendBuffer
{
public:
  /// Virtual destructor: implementations may be destroyed through a base class pointer.
  virtual ~GmAppendBuffer() = default;

  /// Releases the memory used by the buffer, returning it to a recently constructed state.
  virtual void clear() = 0;

  /// Informs the buffer of the expected number of entries that will be filled by
  /// (concurrent) calls to append(), so that implementations can pre allocate memory.
  virtual void reserve(size_t size) = 0;

  /// Appends val to the buffer in a thread safe way. Can grow the buffer if needed.
  virtual void append(const T& val) = 0;

  /// Appends val to the buffer in a thread safe way, using the given tid to access
  /// thread local storage when the implementation relies on it.
  virtual void appendFromThread(int tid, const T& val) = 0;

  /// Returns the number of entries in the buffer. Must be called from the main thread only.
  virtual size_t size() const = 0;

  /// Returns a vector filled with the buffer data. After this call, no calls to append()
  /// can be made until the buffer is cleared. Implementations may return NULL when the
  /// single vector cannot be allocated.
  virtual T* data() = 0;

  /// Returns an estimate of the total memory used by the buffer, in bytes.
  virtual size_t usedMemory() const = 0;
};
114 
115 
152 template <class T, class Base = GmAppendBuffer<T> > class GmPerThreadAppendBuffer : public Base
153 {
154 public:
155 
178  GmPerThreadAppendBuffer(size_t initSize, double resizeFactor = 2.0, int numThreads = -1)
179  {
180  S_TRACE();
181  Q_UNUSED(resizeFactor);
182 
184  assert(initSize >= 0);
185  assert(numThreads >= -1 && numThreads <= GmThreadManager::maxWorkerThreads());
186 
187  // numThreads set to 0 is used to inform us that only the global thread will be used
188  _globalOnly = (numThreads == 0);
189  _nt = GmOmpAdjustNumThreads(numThreads);
190 
191  _dataBuffer = NULL;
192  _dataSize = 0;
193 
194  if(initSize)
195  reserve(initSize);
196  }
197 
200  {
201  S_TRACE();
203 
204  if(_dataBuffer != _data.localData(0).data()) // Can't delete _dataBuffer if it is pointing to a _data internal buffer.
205  delete[] _dataBuffer; // It will be deleted by _data. This only happens if no threads where used.
206  }
207 
208  // See comments on the base class
209  virtual void clear()
210  {
211  S_TRACE();
213 
214  if(_dataBuffer != _data.localData(0).data()) // Can't delete _dataBuffer if it is pointing to a _data internal buffer
215  delete[] _dataBuffer; // It will be cleared by _data. This only happens if no threads where used.
216  _dataBuffer = NULL;
217  _dataSize = 0;
218 
219  for(int i = 0, nb = _data.size(); i < nb; i++)
220  {
221  _data.localData(i).clear();
222  _data.localData(i).squeeze();
223  }
224  }
225 
232  virtual void reserve(size_t bsize)
233  {
234  S_TRACE();
236  assert(!_dataBuffer);
237  assert(size() == 0);
238 
239  if(_globalOnly)
240  _data.localData(0).reserve((int)bsize); // TODO : Use a buffer with a size_t maximum capacity
241  else
242  {
243  // Although it is possible that both the main thread buffer and the
244  // worker thread buffers are used, we are going to reserve space for the
245  // workers only. The mixed option should happend only when a physics
246  // supports threading and another doesn't.
247  assert(_nt > 0 && _nt <= GmThreadManager::maxWorkerThreads());
248 
249  size_t s = (bsize +_nt-1)/_nt; // bsize / _nt rounded up
250 
251  for(int i = 1; i < _nt; i++)
252  _data.localData(i).reserve((int)s);
253  }
254  }
255 
256  // See comments on the base class
257  virtual void append(const T& val)
258  {
259  S_TRACE();
260  assert(!_dataBuffer);
261 
262  _data.localData().append(val);
263  }
264 
265  // See comments on the base class
266  virtual void appendFromThread(int tid, const T& val)
267  {
268  S_TRACE();
269  assert(!_dataBuffer);
270  assert(!_globalOnly || (_globalOnly && tid == 0));
271 
272  _data.localData(tid).append(val);
273  }
274 
275  // See comments on the base class
276  virtual size_t size() const
277  {
278  S_TRACE();
280 
281  // If data() was called, returns the saved size from the previous call
282  if(_dataBuffer)
283  return _dataSize;
284 
285  size_t s = 0;
286  for(int i = 0, nb = _data.size(); i < nb; i++)
287  s += _data.localData(i).size();
288  return s;
289  }
290 
291  // See comments on the base class
292  virtual T* data()
293  {
294  S_TRACE();
296 
297  // If the user has already called data(), returns the same buffer as the previous call
298  if(_dataBuffer)
299  return _dataBuffer;
300 
301  size_t s = size();
302 
303  if(s == _data.localData(0).size()) // Only the local buffer was filled
304  {
305  _dataBuffer = _data.localData(0).data();
306  _dataSize = s;
307  }
308  else
309  {
310  _dataBuffer = new(std::nothrow) T[s];
311  if(_dataBuffer)
312  {
313  size_t o = 0;
314  for(int i = 0, nb = _data.size(); i < nb; i++) // Traverses the main thread and the worker thread buffers
315  {
316  // Copy buffer data
317  QVarLengthArray<T>& buffer = _data.localData(i);
318 
319  size_t n = buffer.size();
320  GmPmemcpy(_dataBuffer + o, buffer.data(), n * sizeof(T), _nt);
321  o += n;
322 
323  // Since we have allocated a new full buffer, lets release the per thread buffers
324  buffer.clear();
325  buffer.squeeze();
326  }
327  assert(o == s);
328  _dataSize = s;
329  }
330  }
331 
332  return _dataBuffer;
333  }
334 
335  // See comments on the base class
336  virtual size_t usedMemory() const
337  {
338  S_TRACE();
340 
341  if(_dataBuffer) // After calling data()
342  {
343  if(_dataBuffer == _data.localData(0).data())
344  return _data.localData(0).capacity() * sizeof(T);
345  else
346  return _dataSize * sizeof(T);
347  }
348 
349  size_t s = 0;
350  for(int i = 0, nb = _data.size(); i < nb; i++)
351  s += _data.localData(i).capacity(); // capacity is the true used memory, and not size...
352  return s * sizeof(T);
353  }
354 
355 protected:
357  bool _globalOnly;
358  int _nt;
360  size_t _dataSize;
361 };
362 
363 
402 template <class T, class Base = GmAppendBuffer<T>> class GmSingleAppendBuffer : public Base
403 {
404 public:
415  GmSingleAppendBuffer(size_t initSize, double resizeFactor = 2.0, int numThreads = -1)
416  : _controll(NULL)
417  {
418  S_TRACE();
420  assert(resizeFactor > 1.0);
421  assert(initSize >= 0);
422  assert(numThreads >= -1 && numThreads <= GmThreadManager::maxWorkerThreads());
423 
424  _nt = GmOmpAdjustNumThreads(numThreads);
425 
426  _resizeFactor = resizeFactor;
427  _nextIndex = 0;
428  _head = NULL;
429  _dataBuffer = NULL;
430 
431  if(initSize)
432  reserve(initSize);
433  }
434 
437  {
438  S_TRACE();
439  clear();
440  }
441 
442  // See comments on the base class
443  virtual void clear()
444  {
445  S_TRACE();
447 
448  clearList();
449  delete[] _dataBuffer;
450  _dataBuffer = NULL;
451 
452  _nextIndex = 0;
453  }
454 
456  virtual void reserve(size_t bsize)
457  {
458  S_TRACE();
460  assert(!_head);
461  assert(!_dataBuffer);
462 
463  _head = new ControllData(bsize, NULL);
464  _controll.store(_head);
465  }
466 
467  // See comments on the base class
468  virtual void append(const T& val)
469  {
470  S_TRACE();
471  assert(_head);
472  assert(!_dataBuffer);
473 
474  // Get our index in the buffer. It might be in an unallocated position!
475  // In that case, no matter who allocates the next buffer, our global
476  // position is fixed in pos.
477  size_t pos = _nextIndex.fetchAndAddAcquire(1);
478 
479  while(1)
480  {
481  // Get the pointer for the current control structure
482  ControllData* p = _controll.loadAcquire();
483  assert(p);
484 
485  // Does 'pos' belongs to the buffer B pointed by p?
486  // There are 3 possible options:
487  // a) pos belongs to B: we can just write it
488  // b) pos references a position that is further away than the last position
489  // in B. We must allocate a new buffer or retry with a buffer allocated
490  // by another thread.
491  // c) pos belongs to a buffer that came before B. This can only happend
492  // in a very unlikelly case where the current buffer fills but before
493  // the current thread allocates a new buffer, another thread does the
494  // allocation, the new buffer is also filled and yet another buffer is
495  // allocated. In this very unlikely case, we just traverse the control
496  // list until we find the correct buffer. Notice that this is safe
497  // since the previous pointer is written when the controll data is
498  // created and never again changed.
499  qint64 index = pos - p->_offset; // MUST be SIGNED!
500 
501  if(index < (qint64)p->_size) // Cases a) or c)
502  {
503  // If we are in case c), lets traverse the controll blocks to find the correct buffer
504  while(index < 0) { assert(p->_prev); p = p->_prev; index += p->_size; }
505 
506  p->_data[index] = val;
507  return;
508  }
509 
510  // No luck, we are in case b). We need to allocate a new buffer, if it was
511  // not already allocated by another thread. To know if we are the one to
512  // create the new buffer, we will try to acquire a spin lock and recheck if p is
513  // still the current buffer. If it isn't (or we couldn't get the lock), we will
514  // restart our append, keeping the same target position, but with a new p.
515  // If it is, we will create a new Controll block with a new buffer and publish
516  // it before releasing the lock.
517  if(!_controllLock.tryLock())
518  continue;
519 
520  if(_controll.load() != p)
521  {
523  continue;
524  }
525 
526  size_t cursize = p->_offset + p->_size;
527  size_t newsize = qMax((size_t)(cursize * _resizeFactor), pos + 1); // qMax makes sure that after growing we can at least fit our position
528 
529  try
530  {
531  ControllData* newc = new ControllData(newsize - cursize, p);
532  p->_next = newc;
533  assert(pos >= newc->_offset);
534  newc->_data[pos - newc->_offset] = val;
535 
536  _controll.storeRelease(newc);
538  }
539  catch(...) // If allocating the new ControllData raises an error (out of memory), we MUST release the lock!
540  {
541  // printf("Erro alocando buffer: %zu %zu %zu %zu %f %zu\n", p->_offset, p->_size, cursize, pos, _resizeFactor, newsize);
543  throw;
544  }
545  return;
546  }
547  }
548 
549  // See comments on the base class
550  virtual void appendFromThread(int tid, const T& val)
551  {
552  S_TRACE();
553  Q_UNUSED(tid);
554  append(val);
555  }
556 
557  // See comments on the base class
558  virtual size_t size() const { assert(GmThreadManager::inMainThread()); return _nextIndex; }
559 
560  // See comments on the base class
561  virtual T* data()
562  {
563  S_TRACE();
565 
566  // If the user has already called data(), returns the same buffer as the previous call
567  if(_dataBuffer)
568  {
569  assert(!_head);
570  return _dataBuffer;
571  }
572 
573  // First call to data
574  assert(_head);
575 
576  // If the buffer stores a single vector, we can simply take and return that vector
577  if(!_head->_next)
578  {
579  assert(_nextIndex <= _head->_size && _head->_offset == 0 && !_head->_prev);
580 
582  _head->_data = NULL; // Prevent _data from being deleted by the ControllData destructor
583  }
584  else
585  {
586  // No such luck. We need to allocate a full vector and fill its contents copying
587  // data from each buffer vector
588  _dataBuffer = new(std::nothrow) T[_nextIndex];
589  if(_dataBuffer)
590  {
591 #ifndef NDEBUG
592  size_t o = 0;
593 #endif
594  ControllData* p = _head;
595  while(p)
596  {
597  size_t n = p->_next ? p->_size : _nextIndex - p->_offset;
598 
599 #ifndef NDEBUG
600  assert(!p->_prev || p->_prev->_next == p);
601  assert(!p->_next || p->_next->_prev == p);
602  assert(p->_offset == o);
603  o += n;
604 #endif
605  GmPmemcpy(_dataBuffer + p->_offset, p->_data, n * sizeof(T), _nt);
606  p = p->_next;
607  }
608 #ifndef NDEBUG
609  assert(o == _nextIndex);
610 #endif
611  }
612  }
613 
614  // Release memory stored in the buffers
615  if(_dataBuffer)
616  clearList();
617 
618  return _dataBuffer;
619  }
620 
621  // See comments on the base class
622  virtual size_t usedMemory() const
623  {
624  S_TRACE();
626 
627  if(_dataBuffer) // After call to data()
628  return _nextIndex * sizeof(T);
629 
630  ControllData* p = _controll.load(); // In the main thread, all tasks have finished so controll MUST point to the last buffer
631  return (p->_offset + p->_size) * sizeof(T);
632  }
633 
634 protected:
635  // Clears the buffer list, releasing buffer memory
636  void clearList()
637  {
638  ControllData* p;
639  while((p = _head) != NULL)
640  {
641  _head = _head->_next;
642  delete p;
643  }
644  _controll.store(NULL);
645  }
646 
649  {
653  ControllData(size_t size, ControllData* prev)
654  {
655  S_TRACE();
656  _data = new T[size];
657  _size = size;
658  _offset = prev ? (prev->_offset + prev->_size) : 0;
659  _prev = prev;
660  _next = NULL;
661  }
662 
664  ~ControllData() { S_TRACE(); delete[] _data; }
665 
666  T* _data;
667  size_t _size;
668  size_t _offset;
669 
672  };
673 
674  ControllData* _head;
675  double _resizeFactor;
680  int _nt;
681 };
682 
683 #endif
T * _data
The data buffer.
Definition: gmAppendBuffer.h:666
T * _dataBuffer
The single buffer after a call to data()
Definition: gmAppendBuffer.h:359
A simple spin lock implementation based on a loop using test and set over an atomic int to change its...
Definition: gmSpinLock.h:36
T fetchAndAddAcquire(T valueToAdd)
void * GmPmemcpy(void *dst, const void *src, size_t n, int nt=0, size_t min=10 *1024 *1024)
Parallel (thread enabled) version of memcpy using OpenMP.
Definition: gmMemory.h:167
bool _globalOnly
Flag set to true if the user passed zero as the number of threads for the constructor.
Definition: gmAppendBuffer.h:357
An implementation of the GmAppendBuffer interface based on a "per thread" growing buffer.
Definition: gmAppendBuffer.h:152
virtual void append(const T &val)=0
Appends val to the buffer in a thread safe way. Can grow the buffer if needed.
Auxiliary control structure storing a buffer.
Definition: gmAppendBuffer.h:648
Declaration of the GmTLS class.
virtual void appendFromThread(int tid, const T &val)=0
Appends val to the buffer in a thread safe way, using the given tid to access TLS storage....
ControllData * _prev
The previous buffer.
Definition: gmAppendBuffer.h:670
GmPerThreadAppendBuffer(size_t initSize, double resizeFactor=2.0, int numThreads=-1)
Buffer constructor. Can optionally pre allocate the buffer with initSize entries. If initSize is zero...
Definition: gmAppendBuffer.h:178
#define S_TRACE()
Macro for run time stack tracking at release build.
Definition: gmTrace.h:44
An implementation of the GmAppendBuffer interface based on synchronized access to a shared buffer.
Definition: gmAppendBuffer.h:402
virtual T * data()=0
Returns a vector filled with the buffer data. After this call, NO calls to append() can be made witho...
A class that works together with GmThreadManager to provide thread local storage.
Definition: gmThreadLocalStorage.h:131
Declaration of the GmSpinLock class.
double _resizeFactor
The resize factor.
Definition: gmAppendBuffer.h:675
A virtual class representing a buffer of T objects that can be appended in a thread-safe way,...
Definition: gmArmadilloSolverMatrix.h:36
virtual void reserve(size_t bsize)
See comments on the base class. Should not be called if the size was given in the constructor.
Definition: gmAppendBuffer.h:456
bool tryLock()
Try to lock. Returns true if the lock was acquired, false otherwise.
Definition: gmSpinLock.h:46
Declaration of the GmThreadManager class.
virtual void reserve(size_t bsize)
Pre-allocates buffer sizes. See the basic description in the base class. Should not be called if the size ...
Definition: gmAppendBuffer.h:232
QAtomicInteger< size_t > _nextIndex
The next free index in the global vector.
Definition: gmAppendBuffer.h:676
~ControllData()
Destructor.
Definition: gmAppendBuffer.h:664
size_t _size
The size of this data buffer.
Definition: gmAppendBuffer.h:667
T & localData(int tid)
Returns the given thread local data as a modifiable reference.
Definition: gmThreadLocalStorage.h:163
T * _dataBuffer
The single buffer after a call to data()
Definition: gmAppendBuffer.h:679
int GmOmpAdjustNumThreads(int nt)
Adjusts the given number of threads. If nt <= 0 or if nt > maximum number of omp threads,...
Definition: gmOmp.h:51
int _nt
Number of threads considered for buffer pre-allocation & parallel memory copy.
Definition: gmAppendBuffer.h:358
GmSingleAppendBuffer(size_t initSize, double resizeFactor=2.0, int numThreads=-1)
Buffer constructor. Can optionally pre allocate the buffer with initSize entries. If initSize is zero...
Definition: gmAppendBuffer.h:415
virtual void reserve(size_t size)=0
Informs the buffer of the expected number of entries that will be filled by (concurrent) calls to app...
virtual void clear()=0
Releases the memory used by the buffer, returning it to a recently constructed state....
virtual size_t usedMemory() const =0
Returns an estimate of the total memory used by the buffer, in bytes.
virtual size_t size() const =0
Returns the number of entries in the buffer. Must be called from the main thread only.
size_t _dataSize
The size in _dataBuffer when _dataBuffer is not NULL.
Definition: gmAppendBuffer.h:360
ControllData * _next
The next buffer.
Definition: gmAppendBuffer.h:671
GmTLS< QVarLengthArray< T >, false > _data
Per thread buffers.
Definition: gmAppendBuffer.h:356
static bool inMainThread()
Is the current thread the main thread? Equivalent to comparing the currentId() with 0.
Definition: gmThreadManager.h:169
int size() const
Returns the number of values stored in the TLS object. Equal to the max number of threads + 1 (for th...
Definition: gmThreadLocalStorage.h:160
int _nt
Number of threads considered for parallel memory copy.
Definition: gmAppendBuffer.h:680
void unlock()
Releases the lock.
Definition: gmSpinLock.h:49
~GmPerThreadAppendBuffer()
Destructor.
Definition: gmAppendBuffer.h:199
~GmSingleAppendBuffer()
Destructor. Releases the allocated memory.
Definition: gmAppendBuffer.h:436
GmSpinLock _controllLock
The lock controlling changes to _controll.
Definition: gmAppendBuffer.h:678
int size() const
virtual ~GmAppendBuffer()
Virtual destructor.
Definition: gmAppendBuffer.h:69
size_t _offset
The offset of the first entry in data in the global buffer reference.
Definition: gmAppendBuffer.h:668
QAtomicPointer< ControllData > _controll
The control block pointing to the current buffer.
Definition: gmAppendBuffer.h:677
Auxiliary configuration file used to enable or disable compiling the GeMA tools with support for usin...
static int maxWorkerThreads()
Returns the maximum number of allowed working threads.
Definition: gmThreadManager.h:153
ControllData * _head
Pointer to the first allocated buffer.
Definition: gmAppendBuffer.h:674
ControllData(size_t size, ControllData *prev)
Constructor: Initializes the buffer with the given size. Allocation errors should be caught by the ca...
Definition: gmAppendBuffer.h:653