CppELib 1.7.0
Loading...
Searching...
No Matches
MessageQueue.h
Go to the documentation of this file.
1#ifndef OS_WRAPPER_MESSAGE_QUEUE_H_INCLUDED
2#define OS_WRAPPER_MESSAGE_QUEUE_H_INCLUDED
3
4#include <cstddef>
5#include <new>
6#include "Timeout.h"
7#include "OSWrapperError.h"
8#include "Mutex.h"
9#include "EventFlag.h"
10#include "FixedMemoryPool.h"
11#include "Assertion/Assertion.h"
12
13namespace OSWrapper {
14
21template <typename T>
23public:
29 static MessageQueue* create(std::size_t maxSize)
30 {
31 const std::size_t alignedMQSize =
32 (sizeof(MessageQueue) + (sizeof(double) - 1U)) & ~(sizeof(double) - 1U);
33 const std::size_t rbBufSize = maxSize + 1U;
34 const std::size_t poolBufSize = alignedMQSize + (sizeof(T) * rbBufSize);
35
36 FixedMemoryPool* pool = FixedMemoryPool::create(poolBufSize,
38 if (pool == 0) {
39 return 0;
40 }
41
42 void* p = pool->allocate();
43 if (p == 0) {
45 return 0;
46 }
47
48 MessageQueue* m = new(p) MessageQueue(pool,
49 reinterpret_cast<T*>(static_cast<unsigned char*>(p) + alignedMQSize), rbBufSize);
50 if (!m->createOSObjects()) {
51 destroy(m);
52 return 0;
53 }
54 return m;
55 }
56
63 static void destroy(MessageQueue* m)
64 {
65 if (m == 0) {
66 return;
67 }
68 FixedMemoryPool* pool = m->m_pool;
69 m->~MessageQueue();
70 pool->deallocate(m);
72 }
73
86 Error send(const T& msg)
87 {
88 return timedSend(msg, Timeout::FOREVER);
89 }
90
103 Error trySend(const T& msg)
104 {
105 return timedSend(msg, Timeout::POLLING);
106 }
107
123 Error timedSend(const T& msg, Timeout tmout)
124 {
125 Error err = m_mtxSend->timedLock(tmout);
126 if (err != OK) {
127 return err;
128 }
129 LockGuard lock(m_mtxSend, LockGuard::ADOPT_LOCK);
130
131 if (isFull()) {
132 err = m_event->timedWait(EV_NOT_FULL, EventFlag::OR, 0, tmout);
133 if (err != OK) {
134 return err;
135 }
136 }
137
138 err = OK;
139#ifndef CPPELIB_NO_EXCEPTIONS
140 try {
141#endif
142 push(msg);
143#ifndef CPPELIB_NO_EXCEPTIONS
144 }
145 catch (const Assertion::Failure&) {
146 throw;
147 }
148 catch (...) {
149 err = OtherError;
150 }
151#endif
152 return err;
153 }
154
168 {
169 return timedReceive(msg, Timeout::FOREVER);
170 }
171
185 {
186 return timedReceive(msg, Timeout::POLLING);
187 }
188
205 {
206 Error err = m_mtxRecv->timedLock(tmout);
207 if (err != OK) {
208 return err;
209 }
210 LockGuard lock(m_mtxRecv, LockGuard::ADOPT_LOCK);
211
212 if (isEmpty()) {
213 err = m_event->timedWait(EV_NOT_EMPTY, EventFlag::OR, 0, tmout);
214 if (err != OK) {
215 return err;
216 }
217 }
218
219 err = OK;
220#ifndef CPPELIB_NO_EXCEPTIONS
221 try {
222#endif
223 pop(msg);
224#ifndef CPPELIB_NO_EXCEPTIONS
225 }
226 catch (const Assertion::Failure&) {
227 throw;
228 }
229 catch (...) {
230 err = OtherError;
231 }
232#endif
233 return err;
234 }
235
240 std::size_t getSize() const
241 {
242 LockGuard lock(m_mtxRB);
243 return m_rb.getSize();
244 }
245
250 std::size_t getMaxSize() const
251 {
252 LockGuard lock(m_mtxRB);
253 return m_rb.getMaxSize();
254 }
255
256private:
257 class RingBuf {
258 private:
259 std::size_t m_begin;
260 std::size_t m_end;
261 std::size_t m_bufSize;
262 T* m_buf;
263
264 std::size_t next_idx(std::size_t idx) const
265 {
266 if (idx + 1U < m_bufSize) {
267 return idx + 1U;
268 }
269 return idx + 1U - m_bufSize;
270 }
271
272 void construct(T* p, const T& val) { new(p) T(val); }
273 void destroy(T* p) { p->~T(); }
274
275 RingBuf(const RingBuf&);
276 RingBuf& operator=(const RingBuf&);
277 public:
278 RingBuf(T* buf, std::size_t bufSize) : m_begin(0U), m_end(0U), m_bufSize(bufSize), m_buf(buf) {}
279
280 ~RingBuf() {}
281
282 std::size_t getSize() const
283 {
284 if (m_begin <= m_end) {
285 return m_end - m_begin;
286 }
287 return m_bufSize - m_begin + m_end;
288 }
289
290 std::size_t getMaxSize() const
291 {
292 return m_bufSize - 1U;
293 }
294
295 bool isEmpty() const
296 {
297 return m_begin == m_end;
298 }
299
300 bool isFull() const
301 {
302 return getSize() == getMaxSize();
303 }
304
305 void push(const T& data)
306 {
307 construct(&m_buf[m_end], data);
308 m_end = next_idx(m_end);
309 }
310
311 void pop(T* data)
312 {
313 if (data != 0) {
314 *data = m_buf[m_begin];
315 }
316 destroy(&m_buf[m_begin]);
317 m_begin = next_idx(m_begin);
318 }
319
320 };
321 RingBuf m_rb;
322
323 FixedMemoryPool* m_pool;
324 Mutex* m_mtxRB;
325 Mutex* m_mtxSend;
326 Mutex* m_mtxRecv;
327 EventFlag* m_event;
328
329 static const EventFlag::Pattern EV_NOT_EMPTY;
330 static const EventFlag::Pattern EV_NOT_FULL;
331
332 MessageQueue(FixedMemoryPool* pool, T* rbBuffer, std::size_t rbBufSize)
333 : m_rb(rbBuffer, rbBufSize), m_pool(pool), m_mtxRB(0), m_mtxSend(0), m_mtxRecv(0), m_event(0)
334 {
335 }
336
337 ~MessageQueue()
338 {
339 EventFlag::destroy(m_event);
340 Mutex::destroy(m_mtxRecv);
341 Mutex::destroy(m_mtxSend);
342 Mutex::destroy(m_mtxRB);
343 }
344
345 bool createOSObjects()
346 {
347 m_mtxRB = Mutex::create();
348 if (m_mtxRB == 0) {
349 return false;
350 }
351 m_mtxSend = Mutex::create();
352 if (m_mtxSend == 0) {
353 return false;
354 }
355 m_mtxRecv = Mutex::create();
356 if (m_mtxRecv == 0) {
357 return false;
358 }
359 m_event = EventFlag::create(false);
360 if (m_event == 0) {
361 return false;
362 }
363 return true;
364 }
365
366 bool isEmpty() const
367 {
368 LockGuard lock(m_mtxRB);
369 const bool is_empty = m_rb.isEmpty();
370 if (is_empty) {
371 m_event->reset(EV_NOT_EMPTY);
372 }
373 return is_empty;
374 }
375
376 bool isFull() const
377 {
378 LockGuard lock(m_mtxRB);
379 const bool is_full = m_rb.isFull();
380 if (is_full) {
381 m_event->reset(EV_NOT_FULL);
382 }
383 return is_full;
384 }
385
386 void push(const T& msg)
387 {
388 LockGuard lock(m_mtxRB);
389 m_rb.push(msg);
390 m_event->set(EV_NOT_EMPTY);
391 }
392
393 void pop(T* msg)
394 {
395 LockGuard lock(m_mtxRB);
396 m_rb.pop(msg);
397 m_event->set(EV_NOT_FULL);
398 }
399
400 MessageQueue(const MessageQueue&);
401 MessageQueue& operator=(const MessageQueue&);
402};
403
404template <typename T>
405const EventFlag::Pattern MessageQueue<T>::EV_NOT_EMPTY = 0x0001U;
406
407template <typename T>
408const EventFlag::Pattern MessageQueue<T>::EV_NOT_FULL = 0x0002U;
409
410}
411
412#endif // OS_WRAPPER_MESSAGE_QUEUE_H_INCLUDED
Class used when CHECK_ASSERT() macro fails.
Definition Assertion.h:74
Container::BitPattern< unsigned int > Pattern
Type for bit pattern of EventFlag.
Definition EventFlag.h:40
static EventFlag * create(bool autoReset)
Create an EventFlag object.
Definition EventFlag.cpp:14
@ OR
Definition EventFlag.h:33
static void destroy(EventFlag *e)
Destroy an EventFlag object.
Definition EventFlag.cpp:20
virtual Error timedWait(Pattern bitPattern, Mode waitMode, Pattern *releasedPattern, Timeout tmout)=0
Block the current thread until the condition is satisfied but only within the limited time.
Abstract class that provides the functions of a common RTOS fixed-size memory pool.
Definition FixedMemoryPool.h:23
static FixedMemoryPool * create(std::size_t blockSize, std::size_t memoryPoolSize, void *memoryPoolAddress=0)
Create a FixedMemoryPool object.
Definition FixedMemoryPool.cpp:14
virtual void * allocate()=0
Allocate a block from this FixedMemoryPool.
static std::size_t getRequiredMemorySize(std::size_t blockSize, std::size_t numBlocks)
Get the required total memory size for allocation of (blockSize * numBlocks)
Definition FixedMemoryPool.cpp:27
static void destroy(FixedMemoryPool *p)
Destroy a FixedMemoryPool object.
Definition FixedMemoryPool.cpp:20
virtual void deallocate(void *p)=0
Release the allocated block.
RAII wrapper of Mutex.
Definition Mutex.h:99
static const AdoptLock ADOPT_LOCK
Used as argument of LockGuard's constructor.
Definition Mutex.h:102
Class template of message queue.
Definition MessageQueue.h:22
std::size_t getSize() const
Get the queue size.
Definition MessageQueue.h:240
Error send(const T &msg)
Send the message.
Definition MessageQueue.h:86
Error trySend(const T &msg)
Send the message without blocking.
Definition MessageQueue.h:103
Error timedReceive(T *msg, Timeout tmout)
Receive the message within the limited time.
Definition MessageQueue.h:204
static MessageQueue * create(std::size_t maxSize)
Create a MessageQueue object.
Definition MessageQueue.h:29
std::size_t getMaxSize() const
Get the max queue size.
Definition MessageQueue.h:250
Error timedSend(const T &msg, Timeout tmout)
Send the message within the limited time.
Definition MessageQueue.h:123
Error receive(T *msg)
Receive the message.
Definition MessageQueue.h:167
static void destroy(MessageQueue *m)
Destroy a MessageQueue object.
Definition MessageQueue.h:63
Error tryReceive(T *msg)
Receive the message without blocking.
Definition MessageQueue.h:184
static Mutex * create()
Create a Mutex object.
Definition Mutex.cpp:14
virtual Error timedLock(Timeout tmout)=0
Block the current thread until this mutex is locked, but only within the limited time.
static void destroy(Mutex *m)
Destroy a Mutex object.
Definition Mutex.cpp:26
Value object for the timeout.
Definition Timeout.h:11
static const Timeout FOREVER
Constant value object for waiting forever until condition satisfied.
Definition Timeout.h:42
static const Timeout POLLING
Constant value object for non-blocking.
Definition Timeout.h:37
OSWrapper provides an abstract C++ interface to common RTOS features: thread, mutex, event flag, ...
Definition EventFlag.cpp:5
Error
Kinds of errors of the OS objects.
Definition OSWrapperError.h:9
@ OK
Definition OSWrapperError.h:10
@ OtherError
Definition OSWrapperError.h:15