µOS++ IIIe Reference 7.0.0
The third edition of µOS++, a POSIX inspired open source framework, written in C++
Loading...
Searching...
No Matches
os-mqueue.cpp
Go to the documentation of this file.
1/*
2 * This file is part of the µOS++ distribution.
3 * (https://github.com/micro-os-plus)
4 * Copyright (c) 2016-2023 Liviu Ionescu. All rights reserved.
5 *
6 * Permission to use, copy, modify, and/or distribute this software
7 * for any purpose is hereby granted, under the terms of the MIT license.
8 *
9 * If a copy of the license was not distributed with this file, it can
10 * be obtained from https://opensource.org/licenses/mit/.
11 */
12
13#if defined(OS_USE_OS_APP_CONFIG_H)
14#include <cmsis-plus/os-app-config.h>
15#endif
16
17#include <cmsis-plus/rtos/os.h>
18
19// ----------------------------------------------------------------------------
20
21#if defined(__clang__)
22#pragma clang diagnostic ignored "-Wc++98-compat"
23#endif
24
25// ----------------------------------------------------------------------------
26
27namespace os
28{
29 namespace rtos
30 {
31 // ------------------------------------------------------------------------
32
78 const message_queue::attributes message_queue::initializer;
79
80 // ------------------------------------------------------------------------
81
274 // ------------------------------------------------------------------------
328 // ------------------------------------------------------------------------
333 // Protected internal constructor.
335 {
336#if defined(OS_TRACE_RTOS_MQUEUE)
337 trace::printf ("%s() @%p %s\n", __func__, this, this->name ());
338#endif
339 }
340
341 message_queue::message_queue (const char* name) :
342 object_named_system
343 { name }
344 {
345#if defined(OS_TRACE_RTOS_MQUEUE)
346 trace::printf ("%s() @%p %s\n", __func__, this, this->name ());
347#endif
348 }
349
383 message_queue::message_queue (std::size_t msgs, std::size_t msg_size_bytes,
384 const attributes& attr,
385 const allocator_type& allocator) :
387 { nullptr, msgs, msg_size_bytes, attr, allocator }
388 {
389 }
390
420 message_queue::message_queue (const char* name, std::size_t msgs,
421 std::size_t msg_size_bytes,
422 const attributes& attr,
423 const allocator_type& allocator) :
424 object_named_system
425 { name }
426 {
427#if defined(OS_TRACE_RTOS_MQUEUE)
428 trace::printf ("%s() @%p %s %u %u\n", __func__, this, this->name (), msgs,
429 msg_size_bytes);
430#endif
431
432 if (attr.mq_queue_address != nullptr)
433 {
434 // Do not use any allocator at all.
435 internal_construct_ (msgs, msg_size_bytes, attr, nullptr, 0);
436 }
437 else
438 {
439 allocator_ = &allocator;
440
441 // If no user storage was provided via attributes,
442 // allocate it dynamically via the allocator.
443 allocated_queue_size_elements_ = (compute_allocated_size_bytes<
444 typename allocator_type::value_type> (msgs, msg_size_bytes)
445 + sizeof(typename allocator_type::value_type) - 1)
446 / sizeof(typename allocator_type::value_type);
447
448 allocated_queue_addr_ =
449 const_cast<allocator_type&> (allocator).allocate (
450 allocated_queue_size_elements_);
451
452 internal_construct_ (
453 msgs,
454 msg_size_bytes,
455 attr,
456 allocated_queue_addr_,
457 allocated_queue_size_elements_
458 * sizeof(typename allocator_type::value_type));
459 }
460 }
461
477 {
478#if defined(OS_TRACE_RTOS_MQUEUE)
479 trace::printf ("%s() @%p %s\n", __func__, this, name ());
480#endif
481
482#if !defined(OS_USE_RTOS_PORT_MESSAGE_QUEUE)
483
484 // There must be no threads waiting for this queue.
485 assert(send_list_.empty ());
486 assert(receive_list_.empty ());
487
488#endif
489
490 if (allocated_queue_addr_ != nullptr)
491 {
492 typedef typename std::allocator_traits<allocator_type>::pointer pointer;
493
494 static_cast<allocator_type*> (const_cast<void*> (allocator_))->deallocate (
495 reinterpret_cast<pointer> (allocated_queue_addr_),
496 allocated_queue_size_elements_);
497 }
498
499#if defined(OS_USE_RTOS_PORT_MESSAGE_QUEUE)
500
501 port::message_queue::destroy (this);
502
503#endif
504 }
505
510 void
511 message_queue::internal_construct_ (std::size_t msgs,
512 std::size_t msg_size_bytes,
513 const attributes& attr,
514 void* queue_address,
515 std::size_t queue_size_bytes)
516 {
517 // Don't call this from interrupt handlers.
519
520#if !defined(OS_USE_RTOS_PORT_MESSAGE_QUEUE)
521 clock_ = attr.clock != nullptr ? attr.clock : &sysclock;
522#endif
523 msg_size_bytes_ = static_cast<message_queue::msg_size_t> (msg_size_bytes);
524 assert(msg_size_bytes_ == msg_size_bytes);
525 assert(msg_size_bytes_ > 0);
526
527 // in order for the list of free messages to not consume additional memory,
528 // the pointers are stored at the beginning of the messages, thus messages should be large enough to fit a pointer
529 assert(msg_size_bytes_ >= sizeof(void*));
530
531 msgs_ = static_cast<message_queue::size_t> (msgs);
532 assert(msgs_ == msgs);
533 assert(msgs > 0);
534
535 // If the storage is given explicitly, override attributes.
536 if (queue_address != nullptr)
537 {
538 // The attributes should not define any storage in this case.
539 assert(attr.mq_queue_address == nullptr);
540
541 queue_addr_ = queue_address;
542 queue_size_bytes_ = queue_size_bytes;
543 }
544 else
545 {
546 queue_addr_ = attr.mq_queue_address;
547 queue_size_bytes_ = attr.mq_queue_size_bytes;
548 }
549
550#if defined(OS_TRACE_RTOS_MQUEUE)
551 trace::printf ("%s() @%p %s %u %u %p %u\n", __func__, this, name (),
552 msgs_, msg_size_bytes_, queue_addr_, queue_size_bytes_);
553#endif
554
555#if !defined(OS_USE_RTOS_PORT_MESSAGE_QUEUE)
556 std::size_t storage_size = compute_allocated_size_bytes<void*> (
557 msgs, msg_size_bytes);
558#endif
559 if (queue_addr_ != nullptr)
560 {
561 // The queue must be real, and have a non zero size.
562 os_assert_throw(queue_size_bytes_ > 0, EINVAL);
563#if defined(OS_USE_RTOS_PORT_MESSAGE_QUEUE)
565 queue_size_bytes_ >= (std::size_t) (msgs * msg_size_bytes),
566 EINVAL);
567#else
568 // The queue must fit the storage.
569 os_assert_throw(queue_size_bytes_ >= storage_size, EINVAL);
570#endif
571 }
572
573#if defined(OS_USE_RTOS_PORT_MESSAGE_QUEUE)
574
575 count_ = 0;
576 port::message_queue::create (this);
577
578#else
579
580 head_ = no_index;
581
582 // The queue storage must have a real address.
583 os_assert_throw(queue_addr_ != nullptr, ENOMEM);
584
585 // The array of prev indexes follows immediately after the content array.
586 prev_array_ =
587 reinterpret_cast<index_t*> (static_cast<char*> (queue_addr_)
588 + msgs
589 * ((msg_size_bytes + (sizeof(void*) - 1))
590 & ~(sizeof(void*) - 1)));
591 // The array of next indexes follows immediately after the prev array.
592 next_array_ =
593 reinterpret_cast<index_t*> (reinterpret_cast<char*> (const_cast<index_t*> (prev_array_))
594 + msgs * sizeof(index_t));
595 // The array of priorities follows immediately after the next array.
596 prio_array_ =
597 reinterpret_cast<priority_t*> (reinterpret_cast<char*> (const_cast<index_t*> (next_array_))
598 + msgs * sizeof(index_t));
599
600#if !defined(NDEBUG)
601#pragma GCC diagnostic push
602#if defined(__clang__)
603#elif defined(__GNUC__)
604#pragma GCC diagnostic ignored "-Wuseless-cast"
605#endif
606 char* p =
607 reinterpret_cast<char*> (reinterpret_cast<char*> (const_cast<priority_t*> (prio_array_))
608 + msgs * sizeof(priority_t));
609#pragma GCC diagnostic pop
610
611 assert(
612 p - static_cast<char*> (queue_addr_)
613 <= static_cast<ptrdiff_t> (queue_size_bytes_));
614#endif
615
616 internal_init_ ();
617#endif
618 }
619
620 void
621 message_queue::internal_init_ (void)
622 {
623 count_ = 0;
624
625#if !defined(OS_USE_RTOS_PORT_MESSAGE_QUEUE)
626
627 // Construct a linked list of blocks. Store the pointer at
628 // the beginning of each block. Each block
629 // will hold the address of the next free block,
630 // or `nullptr` at the end.
631 char* p = static_cast<char*> (queue_addr_);
632 for (std::size_t i = 1; i < msgs_; ++i)
633 {
634 // Compute the address of the next block;
635 char* pn = p + msg_size_bytes_;
636
637 // Make this block point to the next one.
638 *(static_cast<void**> (static_cast<void*> (p))) = pn;
639 // Advance pointer
640 p = pn;
641 }
642
643 // Mark end of list.
644 *(static_cast<void**> (static_cast<void*> (p))) = nullptr;
645
646 first_free_ = queue_addr_; // Pointer to first block.
647
648 head_ = no_index;
649
650 // Need not be inside the critical section,
651 // the lists are protected by inner `resume_one()`.
652
653 // Wake-up all threads, if any.
654 send_list_.resume_all ();
655 receive_list_.resume_all ();
656
657#endif /* !defined(OS_USE_RTOS_PORT_MESSAGE_QUEUE) */
658
659 }
660
661#if !defined(OS_USE_RTOS_PORT_MESSAGE_QUEUE)
662
663 /*
664 * Internal function.
665 * Should be called from an interrupts critical section.
666 */
667 bool
668 message_queue::internal_try_send_ (const void* msg, std::size_t nbytes,
669 priority_t mprio)
670 {
671 if (first_free_ == nullptr)
672 {
673 // No available space to send the message.
674 return false;
675 }
676
677 // The first step is to remove the free block from the list,
678 // so another concurrent call will not get it too.
679
680 // Get the address where the message will be copied.
681 // This is the first free memory block.
682 char* dest = static_cast<char*> (first_free_);
683
684 // Update to next free, if any (the last one has nullptr).
685 first_free_ = *(static_cast<void**> (first_free_));
686
687 // The second step is to copy the message from the user buffer.
688 {
689 // ----- Enter uncritical section -----------------------------------
690 // interrupts::uncritical_section iucs;
691
692 // Copy message from user buffer to queue storage.
693 std::memcpy (dest, msg, nbytes);
694 if (nbytes < msg_size_bytes_)
695 {
696 // Fill in the remaining space with 0x00.
697 std::memset (dest + nbytes, 0x00, msg_size_bytes_ - nbytes);
698 }
699 // ----- Exit uncritical section ------------------------------------
700 }
701
702 // The third step is to link the buffer to the list.
703
704 // Using the address, compute the index in the array.
705 std::size_t msg_ix = (static_cast<std::size_t> (dest
706 - static_cast<char*> (queue_addr_)) / msg_size_bytes_);
707 prio_array_[msg_ix] = mprio;
708
709 if (head_ == no_index)
710 {
711 // No other message in the queue, enlist this one
712 // as head, with links to itself.
713 head_ = static_cast<index_t> (msg_ix);
714 prev_array_[msg_ix] = static_cast<index_t> (msg_ix);
715 next_array_[msg_ix] = static_cast<index_t> (msg_ix);
716 }
717 else
718 {
719 std::size_t ix;
720 // Arrange to insert between head and tail.
721 ix = prev_array_[head_];
722 // Check if the priority is higher than the head priority.
723 if (mprio > prio_array_[head_])
724 {
725 // Having the highest priority, the new message
726 // becomes the new head.
727 head_ = static_cast<index_t> (msg_ix);
728 }
729 else
730 {
731 // If not higher than the head, try to insert at the tail,
732 // but advance up until the same priority is found.
733 while ((mprio > prio_array_[ix]))
734 {
735 ix = prev_array_[ix];
736 }
737 }
738 prev_array_[msg_ix] = static_cast<index_t> (ix);
739 next_array_[msg_ix] = next_array_[ix];
740
741 // Break the chain and insert the new index.
742 std::size_t tmp_ix = next_array_[ix];
743 next_array_[ix] = static_cast<index_t> (msg_ix);
744 prev_array_[tmp_ix] = static_cast<index_t> (msg_ix);
745 }
746
747 // One more message added to the queue.
748 ++count_;
749
750 // Wake-up one thread, if any.
751 receive_list_.resume_one ();
752
753 return true;
754 }
755
756#endif /* !defined(OS_USE_RTOS_PORT_MESSAGE_QUEUE) */
757
758#if !defined(OS_USE_RTOS_PORT_MESSAGE_QUEUE)
759
760 /*
761 * Internal function.
762 * Should be called from an interrupts critical section.
763 */
764 bool
765 message_queue::internal_try_receive_ (void* msg, std::size_t nbytes,
766 priority_t* mprio)
767 {
768
769 if (head_ == no_index)
770 {
771 return false;
772 }
773
774 // Compute the message source address.
775 char* src = static_cast<char*> (queue_addr_) + head_ * msg_size_bytes_;
776 priority_t prio = prio_array_[head_];
777
778#if defined(OS_TRACE_RTOS_MQUEUE_)
779 trace::printf ("%s(%p,%u) @%p %s src %p %p\n", __func__, msg, nbytes,
780 this, name (), src, first_free_);
781#endif
782
783 // Unlink it from the list, so another concurrent call will
784 // not get it too.
785 if (count_ > 1)
786 {
787 // Remove the current element from the list.
788 prev_array_[next_array_[head_]] = prev_array_[head_];
789 next_array_[prev_array_[head_]] = next_array_[head_];
790
791 // Next becomes the new head.
792 head_ = next_array_[head_];
793 }
794 else
795 {
796 // If there was only one, the list is empty now.
797 head_ = no_index;
798 }
799
800 --count_;
801
802 // Copy to destination
803 {
804 // ----- Enter uncritical section -----------------------------------
805 interrupts::uncritical_section iucs;
806
807 // Copy message from queue to user buffer.
808 memcpy (msg, src, nbytes);
809 if (mprio != nullptr)
810 {
811 *mprio = prio;
812 }
813 // ----- Exit uncritical section ------------------------------------
814 }
815
816 // After the message was copied, the block can be released.
817
818 // Perform a push_front() on the single linked LIFO list,
819 // i.e. add the block to the beginning of the list.
820
821 // Link previous list to this block; may be null, but it does
822 // not matter.
823 *(static_cast<void**> (static_cast<void*> (src))) = first_free_;
824
825 // Now this block is the first one.
826 first_free_ = src;
827
828 // Wake-up one thread, if any.
829 send_list_.resume_one ();
830
831 return true;
832 }
833
834#endif /* !defined(OS_USE_RTOS_PORT_MESSAGE_QUEUE) */
835
879 message_queue::send (const void* msg, std::size_t nbytes, priority_t mprio)
880 {
881#if defined(OS_TRACE_RTOS_MQUEUE)
882 trace::printf ("%s(%p,%d,%d) @%p %s\n", __func__, msg, nbytes, mprio,
883 this, name ());
884#endif
885
886 // Don't call this from interrupt handlers.
888 // Don't call this from critical regions.
890
891 os_assert_err(msg != nullptr, EINVAL);
892 os_assert_err(nbytes <= msg_size_bytes_, EMSGSIZE);
893
894#if defined(OS_USE_RTOS_PORT_MESSAGE_QUEUE)
895
896 return port::message_queue::send (this, msg, nbytes, mprio);
897
898#else
899
900 {
901 // ----- Enter critical section -------------------------------------
903
904 if (internal_try_send_ (msg, nbytes, mprio))
905 {
906 return result::ok;
907 }
908 // ----- Exit critical section --------------------------------------
909 }
910
911 thread& crt_thread = this_thread::thread ();
912
913 // Prepare a list node pointing to the current thread.
914 // Do not worry for being on stack, it is temporarily linked to the
915 // list and guaranteed to be removed before this function returns.
917 { crt_thread };
918
919 for (;;)
920 {
921 {
922 // ----- Enter critical section ---------------------------------
924
925 if (internal_try_send_ (msg, nbytes, mprio))
926 {
927 return result::ok;
928 }
929
930 // Add this thread to the message queue send waiting list.
931 scheduler::internal_link_node (send_list_, node);
932 // state::suspended set in above link().
933 // ----- Exit critical section ----------------------------------
934 }
935
937
938 // Remove the thread from the message queue send waiting list,
939 // if not already removed by receive().
940 scheduler::internal_unlink_node (node);
941
942 if (crt_thread.interrupted ())
943 {
944#if defined(OS_TRACE_RTOS_MQUEUE)
945 trace::printf ("%s(%p,%d,%d) EINTR @%p %s\n", __func__, msg,
946 nbytes, mprio, this, name ());
947#endif
948 return EINTR;
949 }
950 }
951
952 /* NOTREACHED */
953 return ENOTRECOVERABLE;
954
955#endif
956 }
957
990 message_queue::try_send (const void* msg, std::size_t nbytes,
991 priority_t mprio)
992 {
993#if defined(OS_TRACE_RTOS_MQUEUE)
994 trace::printf ("%s(%p,%u,%u) @%p %s\n", __func__, msg, nbytes, mprio,
995 this, name ());
996#endif
997
998 os_assert_err(msg != nullptr, EINVAL);
999 os_assert_err(nbytes <= msg_size_bytes_, EMSGSIZE);
1000
1001#if defined(OS_USE_RTOS_PORT_MESSAGE_QUEUE)
1002
1003 return port::message_queue::try_send (this, msg, nbytes, mprio);
1004
1005#else
1006 // Don't call this from high priority interrupts.
1007 assert(port::interrupts::is_priority_valid ());
1008
1009 {
1010 // ----- Enter critical section -------------------------------------
1012
1013 if (internal_try_send_ (msg, nbytes, mprio))
1014 {
1015 return result::ok;
1016 }
1017 else
1018 {
1019 return EWOULDBLOCK;
1020 }
1021 // ----- Exit critical section --------------------------------------
1022 }
1023
1024#endif
1025 }
1026
1073 result_t
1074 message_queue::timed_send (const void* msg, std::size_t nbytes,
1075 clock::duration_t timeout, priority_t mprio)
1076 {
1077#if defined(OS_TRACE_RTOS_MQUEUE)
1078 trace::printf ("%s(%p,%u,%u,%u) @%p %s\n", __func__, msg, nbytes, mprio,
1079 timeout, this, name ());
1080#endif
1081
1082 // Don't call this from interrupt handlers.
1084 // Don't call this from critical regions.
1085 os_assert_err(!scheduler::locked (), EPERM);
1086
1087 os_assert_err(msg != nullptr, EINVAL);
1088 os_assert_err(nbytes <= msg_size_bytes_, EMSGSIZE);
1089
1090#if defined(OS_USE_RTOS_PORT_MESSAGE_QUEUE)
1091
1092 return port::message_queue::timed_send (this, msg, nbytes, timeout, mprio);
1093
1094#else
1095
1096 // Extra test before entering the loop, with its inherent weight.
1097 // Trade size for speed.
1098 {
1099 // ----- Enter critical section -------------------------------------
1101
1102 if (internal_try_send_ (msg, nbytes, mprio))
1103 {
1104 return result::ok;
1105 }
1106 // ----- Exit critical section --------------------------------------
1107 }
1108
1109 thread& crt_thread = this_thread::thread ();
1110
1111 // Prepare a list node pointing to the current thread.
1112 // Do not worry for being on stack, it is temporarily linked to the
1113 // list and guaranteed to be removed before this function returns.
1115 { crt_thread };
1116
1117 internal::clock_timestamps_list& clock_list = clock_->steady_list ();
1118
1119 clock::timestamp_t timeout_timestamp = clock_->steady_now () + timeout;
1120
1121 // Prepare a timeout node pointing to the current thread.
1123 { timeout_timestamp, crt_thread };
1124
1125 for (;;)
1126 {
1127 {
1128 // ----- Enter critical section ---------------------------------
1130
1131 if (internal_try_send_ (msg, nbytes, mprio))
1132 {
1133 return result::ok;
1134 }
1135
1136 // Add this thread to the message queue send waiting list,
1137 // and the clock timeout list.
1138 scheduler::internal_link_node (send_list_, node, clock_list,
1139 timeout_node);
1140 // state::suspended set in above link().
1141 // ----- Exit critical section ----------------------------------
1142 }
1143
1145
1146 // Remove the thread from the message queue send waiting list,
1147 // if not already removed by receive() and from the clock timeout list,
1148 // if not already removed by the timer.
1149 scheduler::internal_unlink_node (node, timeout_node);
1150
1151 if (crt_thread.interrupted ())
1152 {
1153#if defined(OS_TRACE_RTOS_MQUEUE)
1154 trace::printf ("%s(%p,%u,%u,%u) EINTR @%p %s\n", __func__, msg,
1155 nbytes, mprio, timeout, this, name ());
1156#endif
1157 return EINTR;
1158 }
1159
1160 if (clock_->steady_now () >= timeout_timestamp)
1161 {
1162#if defined(OS_TRACE_RTOS_MQUEUE)
1163 trace::printf ("%s(%p,%u,%u,%u) ETIMEDOUT @%p %s\n", __func__,
1164 msg, nbytes, mprio, timeout, this, name ());
1165#endif
1166 return ETIMEDOUT;
1167 }
1168 }
1169
1170 /* NOTREACHED */
1171 return ENOTRECOVERABLE;
1172
1173#endif
1174 }
1175
1210 result_t
1211 message_queue::receive (void* msg, std::size_t nbytes, priority_t* mprio)
1212 {
1213#if defined(OS_TRACE_RTOS_MQUEUE)
1214 trace::printf ("%s(%p,%u) @%p %s\n", __func__, msg, nbytes, this,
1215 name ());
1216#endif
1217
1218 // Don't call this from interrupt handlers.
1220 // Don't call this from critical regions.
1221 os_assert_err(!scheduler::locked (), EPERM);
1222
1223 os_assert_err(msg != nullptr, EINVAL);
1224 os_assert_err(nbytes <= msg_size_bytes_, EMSGSIZE);
1225
1226#if defined(OS_USE_RTOS_PORT_MESSAGE_QUEUE)
1227
1228 return port::message_queue::receive (this, msg, nbytes, mprio);
1229
1230#else
1231
1232 // Extra test before entering the loop, with its inherent weight.
1233 // Trade size for speed.
1234 {
1235 // ----- Enter critical section -------------------------------------
1237
1238 if (internal_try_receive_ (msg, nbytes, mprio))
1239 {
1240 return result::ok;
1241 }
1242 // ----- Exit critical section --------------------------------------
1243 }
1244
1245 thread& crt_thread = this_thread::thread ();
1246
1247 // Prepare a list node pointing to the current thread.
1248 // Do not worry for being on stack, it is temporarily linked to the
1249 // list and guaranteed to be removed before this function returns.
1251 { crt_thread };
1252
1253 for (;;)
1254 {
1255 {
1256 // ----- Enter critical section ---------------------------------
1258
1259 if (internal_try_receive_ (msg, nbytes, mprio))
1260 {
1261 return result::ok;
1262 }
1263
1264 // Add this thread to the message queue receive waiting list.
1265 scheduler::internal_link_node (receive_list_, node);
1266 // state::suspended set in above link().
1267 // ----- Exit critical section ----------------------------------
1268 }
1269
1271
1272 // Remove the thread from the message queue receive waiting list,
1273 // if not already removed by send().
1274 scheduler::internal_unlink_node (node);
1275
1276 if (crt_thread.interrupted ())
1277 {
1278#if defined(OS_TRACE_RTOS_MQUEUE)
1279 trace::printf ("%s(%p,%u) EINTR @%p %s\n", __func__, msg, nbytes,
1280 this, name ());
1281#endif
1282 return EINTR;
1283 }
1284 }
1285
1286 /* NOTREACHED */
1287 return ENOTRECOVERABLE;
1288
1289#endif
1290 }
1291
1322 result_t
1323 message_queue::try_receive (void* msg, std::size_t nbytes,
1324 priority_t* mprio)
1325 {
1326#if defined(OS_TRACE_RTOS_MQUEUE)
1327 trace::printf ("%s(%p,%u) @%p %s\n", __func__, msg, nbytes, this,
1328 name ());
1329#endif
1330
1331 os_assert_err(msg != nullptr, EINVAL);
1332 os_assert_err(nbytes <= msg_size_bytes_, EMSGSIZE);
1333
1334#if defined(OS_USE_RTOS_PORT_MESSAGE_QUEUE)
1335
1336 return port::message_queue::try_receive (this, msg, nbytes, mprio);
1337
1338#else
1339
1340 // Don't call this from high priority interrupts.
1341 assert(port::interrupts::is_priority_valid ());
1342
1343 {
1344 // ----- Enter critical section -------------------------------------
1346
1347 if (internal_try_receive_ (msg, nbytes, mprio))
1348 {
1349 return result::ok;
1350 }
1351 else
1352 {
1353 return EWOULDBLOCK;
1354 }
1355 // ----- Exit critical section --------------------------------------
1356 }
1357
1358#endif
1359 }
1360
1420 result_t
1421 message_queue::timed_receive (void* msg, std::size_t nbytes,
1422 clock::duration_t timeout, priority_t* mprio)
1423 {
1424#if defined(OS_TRACE_RTOS_MQUEUE)
1425 trace::printf ("%s(%p,%u,%u) @%p %s\n", __func__, msg, nbytes, timeout,
1426 this, name ());
1427#endif
1428
1429 // Don't call this from interrupt handlers.
1431 // Don't call this from critical regions.
1432 os_assert_err(!scheduler::locked (), EPERM);
1433
1434 os_assert_err(msg != nullptr, EINVAL);
1435 os_assert_err(nbytes <= msg_size_bytes_, EMSGSIZE);
1436
1437#if defined(OS_USE_RTOS_PORT_MESSAGE_QUEUE)
1438
1439 return port::message_queue::timed_receive (this, msg, nbytes,
1440 timeout, mprio);
1441
1442#else
1443
1444 // Extra test before entering the loop, with its inherent weight.
1445 // Trade size for speed.
1446 {
1447 // ----- Enter critical section -------------------------------------
1449
1450 if (internal_try_receive_ (msg, nbytes, mprio))
1451 {
1452 return result::ok;
1453 }
1454 // ----- Exit critical section --------------------------------------
1455 }
1456
1457 thread& crt_thread = this_thread::thread ();
1458
1459 // Prepare a list node pointing to the current thread.
1460 // Do not worry for being on stack, it is temporarily linked to the
1461 // list and guaranteed to be removed before this function returns.
1463 { crt_thread };
1464
1465 internal::clock_timestamps_list& clock_list = clock_->steady_list ();
1466 clock::timestamp_t timeout_timestamp = clock_->steady_now () + timeout;
1467
1468 // Prepare a timeout node pointing to the current thread.
1470 { timeout_timestamp, crt_thread };
1471
1472 for (;;)
1473 {
1474 {
1475 // ----- Enter critical section ---------------------------------
1477
1478 if (internal_try_receive_ (msg, nbytes, mprio))
1479 {
1480 return result::ok;
1481 }
1482
1483 // Add this thread to the message queue receive waiting list,
1484 // and the clock timeout list.
1485 scheduler::internal_link_node (receive_list_, node, clock_list,
1486 timeout_node);
1487 // state::suspended set in above link().
1488 // ----- Exit critical section ----------------------------------
1489 }
1490
1492
1493 // Remove the thread from the message queue receive waiting list,
1494 // if not already removed by send() and from the clock
1495 // timeout list, if not already removed by the timer.
1496 scheduler::internal_unlink_node (node, timeout_node);
1497
1498 if (crt_thread.interrupted ())
1499 {
1500#if defined(OS_TRACE_RTOS_MQUEUE)
1501 trace::printf ("%s(%p,%u,%u) EINTR @%p %s\n", __func__, msg,
1502 nbytes, timeout, this, name ());
1503#endif
1504 return EINTR;
1505 }
1506
1507 if (clock_->steady_now () >= timeout_timestamp)
1508 {
1509#if defined(OS_TRACE_RTOS_MQUEUE)
1510 trace::printf ("%s(%p,%u,%u) ETIMEDOUT @%p %s\n", __func__, msg,
1511 nbytes, timeout, this, name ());
1512#endif
1513 return ETIMEDOUT;
1514 }
1515 }
1516
1517 /* NOTREACHED */
1518 return ENOTRECOVERABLE;
1519
1520#endif
1521 }
1522
1533 result_t
1535 {
1536#if defined(OS_TRACE_RTOS_MQUEUE)
1537 trace::printf ("%s() @%p %s\n", __func__, this, name ());
1538#endif
1539
1540 // Don't call this from interrupt handlers.
1542
1543#if defined(OS_USE_RTOS_PORT_MESSAGE_QUEUE)
1544
1545 return port::message_queue::reset (this);
1546
1547#else
1548
1549 {
1550 // ----- Enter critical section -------------------------------------
1552
1553 internal_init_ ();
1554 return result::ok;
1555 // ----- Exit critical section --------------------------------------
1556 }
1557
1558#endif
1559 }
1560
1561 // --------------------------------------------------------------------------
1562
1563 } /* namespace rtos */
1564} /* namespace os */
1565
1566// ----------------------------------------------------------------------------
Ordered list of time stamp nodes.
Definition os-lists.h:671
const char * name(void) const
Get object name.
Definition os-decls.h:759
Double linked list node, with time stamp and thread.
Definition os-lists.h:223
Double linked list node, with thread reference.
Definition os-lists.h:60
Interrupts critical section RAII helper.
Definition os-sched.h:498
Standard allocator based on the RTOS system default memory manager.
Definition os-memory.h:540
void deallocate(value_type *addr, std::size_t elements) noexcept
Deallocate the number of memory blocks of type value_type.
Message queue attributes.
Definition os-mqueue.h:152
POSIX compliant message queue, using the default RTOS allocator.
Definition os-mqueue.h:68
result_t receive(void *msg, std::size_t nbytes, priority_t *mprio=nullptr)
Receive a message from the queue.
result_t reset(void)
Reset the message queue.
result_t try_send(const void *msg, std::size_t nbytes, priority_t mprio=default_priority)
Try to send a message to the queue.
result_t try_receive(void *msg, std::size_t nbytes, priority_t *mprio=nullptr)
Try to receive a message from the queue.
result_t send(const void *msg, std::size_t nbytes, priority_t mprio=default_priority)
Send a message to the queue.
virtual ~message_queue()
Destruct the message queue object instance.
result_t timed_receive(void *msg, std::size_t nbytes, clock::duration_t timeout, priority_t *mprio=nullptr)
Receive a message from the queue with timeout.
result_t timed_send(const void *msg, std::size_t nbytes, clock::duration_t timeout, priority_t mprio=default_priority)
Send a message to the queue with timeout.
message_queue(std::size_t msgs, std::size_t msg_size_bytes, const attributes &attr=initializer, const allocator_type &allocator=allocator_type())
Construct a message queue object instance.
POSIX compliant thread, using the default RTOS allocator.
Definition os-thread.h:250
bool interrupted(void)
Check if interrupted.
Definition os-thread.h:2367
int printf(const char *format,...)
Write a formatted string to the trace device.
Definition trace.cpp:60
port::clock::duration_t duration_t
Type of variables holding clock durations.
Definition os-clocks.h:76
port::clock::timestamp_t timestamp_t
Type of variables holding clock time stamps.
Definition os-clocks.h:85
clock_systick sysclock
The system clock object instance.
uint8_t priority_t
Type of message priority storage.
Definition os-mqueue.h:125
message_queue::size_t index_t
Type of list index storage.
Definition os-mqueue.h:109
static const attributes initializer
Default message queue initialiser.
Definition os-mqueue.h:216
uint8_t size_t
Type of a queue size storage.
Definition os-mqueue.h:84
uint16_t msg_size_t
Type of message size storage.
Definition os-mqueue.h:97
static constexpr index_t no_index
Index value to represent an illegal index.
Definition os-mqueue.h:115
bool in_handler_mode(void)
Check if the CPU is in handler mode.
Definition os-sched.h:1108
@ ok
Function completed; no errors or events occurred.
Definition os-decls.h:181
bool locked(void)
Check if the scheduler is locked.
Definition os-sched.h:856
thread & thread(void)
Get the current running thread.
uint32_t result_t
Type of values returned by RTOS functions.
Definition os-decls.h:96
System namespace.
#define os_assert_throw(__e, __er)
Assert or throw a system error exception.
Definition os-decls.h:1130
#define os_assert_err(__e, __er)
Assert or return an error.
Definition os-decls.h:1115
Single file µOS++ RTOS definitions.