µOS++ IIIe Reference 7.0.0
The third edition of µOS++, a POSIX inspired open source framework, written in C++
Loading...
Searching...
No Matches
os-mqueue.cpp
Go to the documentation of this file.
1/*
2 * This file is part of the µOS++ project (https://micro-os-plus.github.io/).
3 * Copyright (c) 2016-2025 Liviu Ionescu. All rights reserved.
4 *
5 * Permission to use, copy, modify, and/or distribute this software
6 * for any purpose is hereby granted, under the terms of the MIT license.
7 *
8 * If a copy of the license was not distributed with this file, it can
9 * be obtained from https://opensource.org/licenses/mit.
10 */
11
12#if defined(OS_USE_OS_APP_CONFIG_H)
13#include <cmsis-plus/os-app-config.h>
14#endif
15
16#include <cmsis-plus/rtos/os.h>
17
18// ----------------------------------------------------------------------------
19
20#if defined(__clang__)
21#pragma clang diagnostic ignored "-Wc++98-compat"
22#endif
23
24// ----------------------------------------------------------------------------
25
26namespace os
27{
28 namespace rtos
29 {
30 // ------------------------------------------------------------------------
31
// Out-of-class definition of the static `initializer` attributes object,
// i.e. the default message queue attributes.
82 const message_queue::attributes message_queue::initializer;
83
84 // ------------------------------------------------------------------------
85
286 // ------------------------------------------------------------------------
341 // ------------------------------------------------------------------------
346 // Protected internal constructor.
// NOTE(review): the signature line (347) is not present in this listing;
// only the body below is visible. The body does nothing except an
// optional trace of the call, the object address and the object name.
348 {
349#if defined(OS_TRACE_RTOS_MQUEUE)
350 trace::printf ("%s() @%p %s\n", __func__, this, this->name ());
351#endif
352 }
353
// Construct a named message queue object.
// Only the `object_named_system` base is initialised with `name`; this
// body does not configure any queue storage.
354 message_queue::message_queue (const char* name)
355 : object_named_system{ name }
356 {
357#if defined(OS_TRACE_RTOS_MQUEUE)
// Optional trace: function name, object address, object name.
358 trace::printf ("%s() @%p %s\n", __func__, this, this->name ());
359#endif
360 }
361
// Construct an unnamed message queue.
// Delegates to the named constructor, passing a null name and forwarding
// the message count, message size, attributes and allocator unchanged.
395 message_queue::message_queue (std::size_t msgs, std::size_t msg_size_bytes,
396 const attributes& attr,
397 const allocator_type& allocator)
398 : message_queue{ nullptr, msgs, msg_size_bytes, attr, allocator }
399 {
400 }
401
// Construct a named message queue for `msgs` messages of `msg_size_bytes`
// bytes each.
//
// If the attributes provide a storage address (`attr.mq_queue_address`),
// that storage is used and the allocator is ignored. Otherwise the storage
// is obtained from `allocator`, and its address and element count are
// remembered in `allocated_queue_addr_` / `allocated_queue_size_elements_`
// (the destructor body in this file deallocates from these members).
//
// In both cases the actual setup is done by `internal_construct_()`.
431 message_queue::message_queue (const char* name, std::size_t msgs,
432 std::size_t msg_size_bytes,
433 const attributes& attr,
434 const allocator_type& allocator)
435 : object_named_system{ name }
436 {
437#if defined(OS_TRACE_RTOS_MQUEUE)
438 trace::printf ("%s() @%p %s %u %u\n", __func__, this, this->name (),
439 msgs, msg_size_bytes);
440#endif
441
442 if (attr.mq_queue_address != nullptr)
443 {
444 // Do not use any allocator at all; the storage comes from `attr`,
// so no explicit address/size is passed down.
445 internal_construct_ (msgs, msg_size_bytes, attr, nullptr, 0);
446 }
447 else
448 {
// Remember the allocator address for later use.
449 allocator_ = &allocator;
450
451 // If no user storage was provided via attributes,
452 // allocate it dynamically via the allocator.
// The byte size is rounded up to a whole number of allocator
// elements (add `sizeof (value_type) - 1`, then divide).
// NOTE(review): listing line 454, the start of this expression,
// is missing from this listing.
453 allocated_queue_size_elements_
455 typename allocator_type::value_type> (msgs,
456 msg_size_bytes)
457 + sizeof (typename allocator_type::value_type) - 1)
458 / sizeof (typename allocator_type::value_type);
459
// The allocator reference is const; cast it away to call allocate().
460 allocated_queue_addr_
461 = const_cast<allocator_type&> (allocator).allocate (
462 allocated_queue_size_elements_);
463
// Pass the allocated storage explicitly, with its size in bytes.
464 internal_construct_ (
465 msgs, msg_size_bytes, attr, allocated_queue_addr_,
466 allocated_queue_size_elements_
467 * sizeof (typename allocator_type::value_type));
468 }
469 }
470
// NOTE(review): the signature line (485) is not present in this listing;
// judging by the cleanup performed, this is presumably the body of
// `message_queue::~message_queue()` - confirm against the original file.
//
// The body checks (non-port builds, via assert) that no thread is still
// waiting on the queue, returns dynamically allocated storage to the
// allocator, and on port builds asks the port to destroy the queue.
486 {
487#if defined(OS_TRACE_RTOS_MQUEUE)
488 trace::printf ("%s() @%p %s\n", __func__, this, name ());
489#endif
490
491#if !defined(OS_USE_RTOS_PORT_MESSAGE_QUEUE)
492
493 // There must be no threads waiting for this queue.
494 assert (send_list_.empty ());
495 assert (receive_list_.empty ());
496
497#endif
498
// Storage is released only if it was allocated by the constructor;
// user-provided storage leaves `allocated_queue_addr_` null.
499 if (allocated_queue_addr_ != nullptr)
500 {
501 typedef
502 typename std::allocator_traits<allocator_type>::pointer pointer;
503
// `allocator_` is stored as an untyped const pointer; cast it back
// to the allocator type before calling deallocate().
504 static_cast<allocator_type*> (const_cast<void*> (allocator_))
505 ->deallocate (reinterpret_cast<pointer> (allocated_queue_addr_),
506 allocated_queue_size_elements_);
507 }
508
509#if defined(OS_USE_RTOS_PORT_MESSAGE_QUEUE)
510
511 port::message_queue::destroy (this);
512
513#endif
514 }
515
// Common construction code, shared by the constructors.
//
// Parameters:
// - msgs: number of messages; must be non-zero and fit `size_t` (asserted).
// - msg_size_bytes: message size; must be non-zero, fit `msg_size_t` and
//   be at least `sizeof (void*)` (asserted).
// - attr: attributes; supplies the clock and, optionally, the storage.
// - queue_address / queue_size_bytes: explicit storage; when the address
//   is non-null it is used and `attr` must not define storage (asserted).
//
// Storage checks use `os_assert_throw` (EINVAL for a zero or too small
// size; ENOMEM, on non-port builds, for a null address).
//
// On non-port builds the storage is partitioned as: message area, then the
// `prev` index array, then the `next` index array, then the priority array;
// finally `internal_init_()` builds the free list.
//
// NOTE(review): the offset of the `prev` array uses the message size
// rounded up to a multiple of `sizeof (void*)`, while `internal_init_()`,
// `internal_try_send_()` and `internal_try_receive_()` step through the
// message area with the unrounded `msg_size_bytes_`. The two agree only
// when the size is already a multiple of the pointer size - confirm this
// is intended.
520 void
521 message_queue::internal_construct_ (std::size_t msgs,
522 std::size_t msg_size_bytes,
523 const attributes& attr,
524 void* queue_address,
525 std::size_t queue_size_bytes)
526 {
527 // Don't call this from interrupt handlers.
// NOTE(review): listing line 528, presumably the check announced by the
// comment above, is missing from this listing.
529
530#if !defined(OS_USE_RTOS_PORT_MESSAGE_QUEUE)
// Use the clock given in the attributes, or fall back to `sysclock`.
531 clock_ = attr.clock != nullptr ? attr.clock : &sysclock;
532#endif
// Store the size in the narrower member type and assert that the
// conversion did not lose bits.
533 msg_size_bytes_
534 = static_cast<message_queue::msg_size_t> (msg_size_bytes);
535 assert (msg_size_bytes_ == msg_size_bytes);
536 assert (msg_size_bytes_ > 0);
537
538 // In order for the list of free messages to not consume additional
539 // memory, the pointers are stored at the beginning of the messages, thus
540 // messages should be large enough to fit a pointer.
541 assert (msg_size_bytes_ >= sizeof (void*));
542
// Same narrowing check for the number of messages.
543 msgs_ = static_cast<message_queue::size_t> (msgs);
544 assert (msgs_ == msgs);
545 assert (msgs > 0);
546
547 // If the storage is given explicitly, override attributes.
548 if (queue_address != nullptr)
549 {
550 // The attributes should not define any storage in this case.
551 assert (attr.mq_queue_address == nullptr);
552
553 queue_addr_ = queue_address;
554 queue_size_bytes_ = queue_size_bytes;
555 }
556 else
557 {
// Otherwise take whatever the attributes define (possibly null).
558 queue_addr_ = attr.mq_queue_address;
559 queue_size_bytes_ = attr.mq_queue_size_bytes;
560 }
561
562#if defined(OS_TRACE_RTOS_MQUEUE)
563 trace::printf ("%s() @%p %s %u %u %p %u\n", __func__, this, name (),
564 msgs_, msg_size_bytes_, queue_addr_, queue_size_bytes_);
565#endif
566
567#if !defined(OS_USE_RTOS_PORT_MESSAGE_QUEUE)
// Total bytes required for the messages plus the index/priority arrays.
568 std::size_t storage_size
569 = compute_allocated_size_bytes<void*> (msgs, msg_size_bytes);
570#endif
571 if (queue_addr_ != nullptr)
572 {
573 // The queue must be real, and have a non zero size.
574 os_assert_throw (queue_size_bytes_ > 0, EINVAL);
575#if defined(OS_USE_RTOS_PORT_MESSAGE_QUEUE)
// With a port implementation only the raw message area is required.
576 os_assert_throw (queue_size_bytes_
577 >= (std::size_t)(msgs * msg_size_bytes),
578 EINVAL);
579#else
580 // The queue must fit the storage.
581 os_assert_throw (queue_size_bytes_ >= storage_size, EINVAL);
582#endif
583 }
584
585#if defined(OS_USE_RTOS_PORT_MESSAGE_QUEUE)
586
587 count_ = 0;
588 port::message_queue::create (this);
589
590#else
591
// No message enqueued yet.
592 head_ = no_index;
593
594 // The queue storage must have a real address.
595 os_assert_throw (queue_addr_ != nullptr, ENOMEM);
596
597#pragma GCC diagnostic push
598#if defined(__clang__)
599#pragma clang diagnostic ignored "-Wunsafe-buffer-usage"
600#endif
601 // The array of prev indexes follows immediately after the content array.
// The per-message size is rounded up to a multiple of sizeof (void*).
602 prev_array_ = reinterpret_cast<index_t*> (
603 static_cast<char*> (queue_addr_)
604 + msgs
605 * ((msg_size_bytes + (sizeof (void*) - 1))
606 & ~(sizeof (void*) - 1)));
607 // The array of next indexes follows immediately the prev array.
608 next_array_ = reinterpret_cast<index_t*> (
609 reinterpret_cast<char*> (const_cast<index_t*> (prev_array_))
610 + msgs * sizeof (index_t));
611 // The array of priorities follows immediately the next array.
612 prio_array_ = reinterpret_cast<priority_t*> (
613 reinterpret_cast<char*> (const_cast<index_t*> (next_array_))
614 + msgs * sizeof (index_t));
615#pragma GCC diagnostic pop
616
617#if !defined(NDEBUG)
618#pragma GCC diagnostic push
619#if defined(__clang__)
620#pragma clang diagnostic ignored "-Wunsafe-buffer-usage"
621#elif defined(__GNUC__)
622#pragma GCC diagnostic ignored "-Wuseless-cast"
623#endif
// Debug-only check: the end of the priority array must not exceed the
// available storage.
624 char* p = reinterpret_cast<char*> (
625 reinterpret_cast<char*> (const_cast<priority_t*> (prio_array_))
626 + msgs * sizeof (priority_t));
627#pragma GCC diagnostic pop
628
629 assert (p - static_cast<char*> (queue_addr_)
630 <= static_cast<ptrdiff_t> (queue_size_bytes_));
631#endif
632
// Build the free list and reset the counters.
633 internal_init_ ();
634#endif
635 }
636
// Bring the queue to the empty state.
//
// Always clears `count_`. On non-port builds it also rebuilds the single
// linked list of free blocks inside the message area, marks the list of
// enqueued messages as empty (`head_ = no_index`) and resumes all threads
// waiting to send or to receive.
//
// Called from `internal_construct_()` and from the reset code in this file.
637 void
638 message_queue::internal_init_ (void)
639 {
640 count_ = 0;
641
642#if !defined(OS_USE_RTOS_PORT_MESSAGE_QUEUE)
643
644 // Construct a linked list of blocks. Store the pointer at
645 // the beginning of each block. Each block
646 // will hold the address of the next free block,
647 // or `nullptr` at the end.
// NOTE(review): the stride is the unrounded `msg_size_bytes_`, so the
// pointer stores below are pointer-aligned only if that size is a
// multiple of sizeof (void*) and `queue_addr_` is aligned - confirm.
648#pragma GCC diagnostic push
649#if defined(__clang__)
650#pragma clang diagnostic ignored "-Wunsafe-buffer-usage"
651#endif
652 char* p = static_cast<char*> (queue_addr_);
// Starts at 1: the loop links msgs_ - 1 blocks, the last block is
// terminated after the loop.
653 for (std::size_t i = 1; i < msgs_; ++i)
654 {
655 // Compute the address of the next block.
656 char* pn = p + msg_size_bytes_;
657
658 // Make this block point to the next one.
659 *(static_cast<void**> (static_cast<void*> (p))) = pn;
660 // Advance pointer.
661 p = pn;
662 }
663#pragma GCC diagnostic pop
664
665 // Mark end of list.
666 *(static_cast<void**> (static_cast<void*> (p))) = nullptr;
667
668 first_free_ = queue_addr_; // Pointer to first block.
669
// No message is enqueued.
670 head_ = no_index;
671
672 // Need not be inside the critical section,
673 // the lists are protected by inner `resume_one()`.
674
675 // Wake-up all threads, if any.
676 send_list_.resume_all ();
677 receive_list_.resume_all ();
678
679#endif /* !defined(OS_USE_RTOS_PORT_MESSAGE_QUEUE) */
680 }
681
682#if !defined(OS_USE_RTOS_PORT_MESSAGE_QUEUE)
683
684 /*
685 * Internal function.
686 * Should be called from an interrupts critical section.
 *
 * Try to enqueue one message without blocking.
 *
 * Takes the first block from the free list, copies `nbytes` from `msg`
 * into it (zero-filling the rest of the block up to `msg_size_bytes_`),
 * then links the block index into the circular list of enqueued
 * messages according to `mprio`. Finally it increments `count_` and
 * resumes one thread waiting to receive, if any.
 *
 * Returns true if the message was enqueued, false if there is no free
 * block. The callers in this file check `msg` and `nbytes` before calling.
687 */
688 bool
689 message_queue::internal_try_send_ (const void* msg, std::size_t nbytes,
690 priority_t mprio)
691 {
692 if (first_free_ == nullptr)
693 {
694 // No available space to send the message.
695 return false;
696 }
697
698 // The first step is to remove the free block from the list,
699 // so another concurrent call will not get it too.
700
701 // Get the address where the message will be copied.
702 // This is the first free memory block.
703#pragma GCC diagnostic push
704#if defined(__clang__)
705#pragma clang diagnostic ignored "-Wunsafe-buffer-usage"
706#endif
707 char* dest = static_cast<char*> (first_free_);
708#pragma GCC diagnostic pop
709
710 // Update to next free, if any (the last one has nullptr).
711 first_free_ = *(static_cast<void**> (first_free_));
712
713 // The second step is to copy the message from the user buffer.
714 {
715 // ----- Enter uncritical section -------------------------------------
// Note: the uncritical section object is commented out, so the copy
// below runs with the caller's critical section still in effect.
716 // interrupts::uncritical_section iucs;
717
718 // Copy message from user buffer to queue storage.
719 std::memcpy (dest, msg, nbytes);
720 if (nbytes < msg_size_bytes_)
721 {
722 // Fill in the remaining space with 0x00.
723#pragma GCC diagnostic push
724#if defined(__clang__)
725#pragma clang diagnostic ignored "-Wunsafe-buffer-usage"
726#endif
727 std::memset (dest + nbytes, 0x00, msg_size_bytes_ - nbytes);
728#pragma GCC diagnostic pop
729 }
730 // ----- Exit uncritical section --------------------------------------
731 }
732
733 // The third step is to link the buffer to the list.
734
735#pragma GCC diagnostic push
736#if defined(__clang__)
737#pragma clang diagnostic ignored "-Wunsafe-buffer-usage"
738#endif
739 // Using the address, compute the index in the array.
740 std::size_t msg_ix
741 = (static_cast<std::size_t> (dest - static_cast<char*> (queue_addr_))
742 / msg_size_bytes_);
743 prio_array_[msg_ix] = mprio;
744
745 if (head_ == no_index)
746 {
747 // No other message in the queue, enlist this one
748 // as head, with links to itself.
749 head_ = static_cast<index_t> (msg_ix);
750 prev_array_[msg_ix] = static_cast<index_t> (msg_ix);
751 next_array_[msg_ix] = static_cast<index_t> (msg_ix);
752 }
753 else
754 {
755 std::size_t ix;
756 // Arrange to insert between head and tail.
// The list is circular, so the element before the head is the tail.
757 ix = prev_array_[head_];
758 // Check if the priority is higher than the head priority.
759 if (mprio > prio_array_[head_])
760 {
761 // Having the highest priority, the new message
762 // becomes the new head.
763 head_ = static_cast<index_t> (msg_ix);
764 }
765 else
766 {
767 // If not higher than the head, try to insert at the tail,
768 // but advance up until the same priority is found.
// The walk stops at the head at the latest, since in this branch
// the head priority is not lower than `mprio`. Messages of equal
// priority therefore keep their arrival order.
769 while ((mprio > prio_array_[ix]))
770 {
771 ix = prev_array_[ix];
772 }
773 }
// Insert the new index right after `ix`.
774 prev_array_[msg_ix] = static_cast<index_t> (ix);
775 next_array_[msg_ix] = next_array_[ix];
776
777 // Break the chain and insert the new index.
778 std::size_t tmp_ix = next_array_[ix];
779 next_array_[ix] = static_cast<index_t> (msg_ix);
780 prev_array_[tmp_ix] = static_cast<index_t> (msg_ix);
781 }
782#pragma GCC diagnostic pop
783
784 // One more message added to the queue.
785 ++count_;
786
787 // Wake-up one thread, if any.
788 receive_list_.resume_one ();
789
790 return true;
791 }
792
793#endif /* !defined(OS_USE_RTOS_PORT_MESSAGE_QUEUE) */
794
795#if !defined(OS_USE_RTOS_PORT_MESSAGE_QUEUE)
796
797 /*
798 * Internal function.
799 * Should be called from an interrupts critical section.
 *
 * Try to dequeue the message at the head of the list without blocking.
 *
 * Unlinks the head index from the circular list of enqueued messages,
 * decrements `count_`, copies `nbytes` from the message block to `msg`
 * (and the message priority to `*mprio`, when `mprio` is not null),
 * then pushes the block back to the front of the free list and resumes
 * one thread waiting to send, if any.
 *
 * Returns true if a message was received, false if the queue is empty.
 * The callers in this file check `msg` and `nbytes` before calling.
800 */
801 bool
802 message_queue::internal_try_receive_ (void* msg, std::size_t nbytes,
803 priority_t* mprio)
804 {
805
806 if (head_ == no_index)
807 {
// No message is enqueued.
808 return false;
809 }
810
811 // Compute the message source address.
812#pragma GCC diagnostic push
813#if defined(__clang__)
814#pragma clang diagnostic ignored "-Wunsafe-buffer-usage"
815#endif
816 char* src = static_cast<char*> (queue_addr_) + head_ * msg_size_bytes_;
// Read the priority now, before the head index is changed below.
817 priority_t prio = prio_array_[head_];
818#pragma GCC diagnostic pop
819
820#if defined(OS_TRACE_RTOS_MQUEUE_)
821 trace::printf ("%s(%p,%u) @%p %s src %p %p\n", __func__, msg, nbytes,
822 this, name (), src, first_free_);
823#endif
824
825 // Unlink it from the list, so another concurrent call will
826 // not get it too.
827 if (count_ > 1)
828 {
829 // Remove the current element from the list.
830#pragma GCC diagnostic push
831#if defined(__clang__)
832#pragma clang diagnostic ignored "-Wunsafe-buffer-usage"
833#endif
834 prev_array_[next_array_[head_]] = prev_array_[head_];
835 next_array_[prev_array_[head_]] = next_array_[head_];
836
837 // Next becomes the new head.
838 head_ = next_array_[head_];
839#pragma GCC diagnostic pop
840 }
841 else
842 {
843 // If there was only one, the list is empty now.
844 head_ = no_index;
845 }
846
847 --count_;
848
849 // Copy to destination
850 {
851 // ----- Enter uncritical section -------------------------------------
// At this point the block is neither in the message list nor in the
// free list, so the copy is done inside an uncritical section.
852 interrupts::uncritical_section iucs;
853
854 // Copy message from queue to user buffer.
// Qualified with std::, consistent with internal_try_send_().
855 std::memcpy (msg, src, nbytes);
856 if (mprio != nullptr)
857 {
858 *mprio = prio;
859 }
860 // ----- Exit uncritical section --------------------------------------
861 }
862
863 // After the message was copied, the block can be released.
864
865 // Perform a push_front() on the single linked LIFO list,
866 // i.e. add the block to the beginning of the list.
867
868 // Link previous list to this block; may be null, but it does
869 // not matter.
870 *(static_cast<void**> (static_cast<void*> (src))) = first_free_;
871
872 // Now this block is the first one.
873 first_free_ = src;
874
875 // Wake-up one thread, if any.
876 send_list_.resume_one ();
877
878 return true;
879 }
880
881#endif /* !defined(OS_USE_RTOS_PORT_MESSAGE_QUEUE) */
882
// Send a message to the queue, blocking while there is no free block.
//
// Parameters:
// - msg: address of the message; must not be null (EINVAL).
// - nbytes: message length; must not exceed `msg_size_bytes_` (EMSGSIZE).
// - mprio: message priority.
//
// Returns `result::ok` once the message is enqueued, EPERM if the scheduler
// is locked, or EINTR if the waiting thread was interrupted. The argument
// checks use `os_assert_err` (assert or return the error).
//
// NOTE(review): several lines of this listing are missing (928, the return
// type; 937; 952 and 972, inside the blocks marked as critical sections;
// 985, between linking and unlinking the wait node). The comments below
// describe only the visible statements.
929 message_queue::send (const void* msg, std::size_t nbytes, priority_t mprio)
930 {
931#if defined(OS_TRACE_RTOS_MQUEUE)
932 trace::printf ("%s(%p,%d,%d) @%p %s\n", __func__, msg, nbytes, mprio,
933 this, name ());
934#endif
935
936 // Don't call this from interrupt handlers.
938 // Don't call this from critical regions.
939 os_assert_err (!scheduler::locked (), EPERM);
940
941 os_assert_err (msg != nullptr, EINVAL);
942 os_assert_err (nbytes <= msg_size_bytes_, EMSGSIZE);
943
944#if defined(OS_USE_RTOS_PORT_MESSAGE_QUEUE)
945
946 return port::message_queue::send (this, msg, nbytes, mprio);
947
948#else
949
// Fast path: try once before preparing the wait node.
950 {
951 // ----- Enter critical section ---------------------------------------
953
954 if (internal_try_send_ (msg, nbytes, mprio))
955 {
956 return result::ok;
957 }
958 // ----- Exit critical section ----------------------------------------
959 }
960
961 thread& crt_thread = this_thread::thread ();
962
963 // Prepare a list node pointing to the current thread.
964 // Do not worry for being on stack, it is temporarily linked to the
965 // list and guaranteed to be removed before this function returns.
966 internal::waiting_thread_node node{ crt_thread };
967
// Retry until the message is enqueued or the thread is interrupted.
968 for (;;)
969 {
970 {
971 // ----- Enter critical section -----------------------------------
973
974 if (internal_try_send_ (msg, nbytes, mprio))
975 {
976 return result::ok;
977 }
978
979 // Add this thread to the message queue send waiting list.
980 scheduler::internal_link_node (send_list_, node);
981 // state::suspended set in above link().
982 // ----- Exit critical section ------------------------------------
983 }
984
986
987 // Remove the thread from the message queue send waiting list,
988 // if not already removed by receive().
989 scheduler::internal_unlink_node (node);
990
991 if (crt_thread.interrupted ())
992 {
993#if defined(OS_TRACE_RTOS_MQUEUE)
994 trace::printf ("%s(%p,%d,%d) EINTR @%p %s\n", __func__, msg,
995 nbytes, mprio, this, name ());
996#endif
997 return EINTR;
998 }
999 }
1000
1001 /* NOTREACHED */
1002 return ENOTRECOVERABLE;
1003
1004#endif
1005 }
1006
// Try to send a message to the queue, without blocking.
//
// Parameters:
// - msg: address of the message; must not be null (EINVAL).
// - nbytes: message length; must not exceed `msg_size_bytes_` (EMSGSIZE).
// - mprio: message priority.
//
// Returns `result::ok` if the message was enqueued, or EWOULDBLOCK if
// there is no free block. Unlike send(), there is no scheduler lock check
// here; on non-port builds only the interrupt priority is asserted.
//
// NOTE(review): listing line 1063, inside the block marked as critical
// section, is missing from this listing.
1041 result_t
1042 message_queue::try_send (const void* msg, std::size_t nbytes,
1043 priority_t mprio)
1044 {
1045#if defined(OS_TRACE_RTOS_MQUEUE)
1046 trace::printf ("%s(%p,%u,%u) @%p %s\n", __func__, msg, nbytes, mprio,
1047 this, name ());
1048#endif
1049
1050 os_assert_err (msg != nullptr, EINVAL);
1051 os_assert_err (nbytes <= msg_size_bytes_, EMSGSIZE);
1052
1053#if defined(OS_USE_RTOS_PORT_MESSAGE_QUEUE)
1054
1055 return port::message_queue::try_send (this, msg, nbytes, mprio);
1056
1057#else
1058 // Don't call this from high priority interrupts.
1059 assert (port::interrupts::is_priority_valid ());
1060
1061 {
1062 // ----- Enter critical section ---------------------------------------
1064
1065 if (internal_try_send_ (msg, nbytes, mprio))
1066 {
1067 return result::ok;
1068 }
1069 else
1070 {
// No free block available.
1071 return EWOULDBLOCK;
1072 }
1073 // ----- Exit critical section ----------------------------------------
1074 }
1075
1076#endif
1077 }
1078
// Send a message to the queue, blocking for at most `timeout`.
//
// Parameters:
// - msg: address of the message; must not be null (EINVAL).
// - nbytes: message length; must not exceed `msg_size_bytes_` (EMSGSIZE).
// - timeout: duration, added to `clock_->steady_now ()` to compute the
//   deadline.
// - mprio: message priority.
//
// Returns `result::ok` once the message is enqueued, EPERM if the scheduler
// is locked, EINTR if the waiting thread was interrupted, or ETIMEDOUT when
// the steady clock reaches the deadline.
//
// NOTE(review): several lines of this listing are missing (1138; 1156 and
// 1184, inside the blocks marked as critical sections; 1199, between
// linking and unlinking the wait nodes). The comments below describe only
// the visible statements.
1128 result_t
1129 message_queue::timed_send (const void* msg, std::size_t nbytes,
1130 clock::duration_t timeout, priority_t mprio)
1131 {
1132#if defined(OS_TRACE_RTOS_MQUEUE)
1133 trace::printf ("%s(%p,%u,%u,%u) @%p %s\n", __func__, msg, nbytes, mprio,
1134 timeout, this, name ());
1135#endif
1136
1137 // Don't call this from interrupt handlers.
1139 // Don't call this from critical regions.
1140 os_assert_err (!scheduler::locked (), EPERM);
1141
1142 os_assert_err (msg != nullptr, EINVAL);
1143 os_assert_err (nbytes <= msg_size_bytes_, EMSGSIZE);
1144
1145#if defined(OS_USE_RTOS_PORT_MESSAGE_QUEUE)
1146
1147 return port::message_queue::timed_send (this, msg, nbytes, timeout,
1148 mprio);
1149
1150#else
1151
1152 // Extra test before entering the loop, with its inherent weight.
1153 // Trade size for speed.
1154 {
1155 // ----- Enter critical section ---------------------------------------
1157
1158 if (internal_try_send_ (msg, nbytes, mprio))
1159 {
1160 return result::ok;
1161 }
1162 // ----- Exit critical section ----------------------------------------
1163 }
1164
1165 thread& crt_thread = this_thread::thread ();
1166
1167 // Prepare a list node pointing to the current thread.
1168 // Do not worry for being on stack, it is temporarily linked to the
1169 // list and guaranteed to be removed before this function returns.
1170 internal::waiting_thread_node node{ crt_thread };
1171
1172 internal::clock_timestamps_list& clock_list = clock_->steady_list ();
1173
// Absolute deadline, computed once, before entering the loop.
1174 clock::timestamp_t timeout_timestamp = clock_->steady_now () + timeout;
1175
1176 // Prepare a timeout node pointing to the current thread.
1177 internal::timeout_thread_node timeout_node{ timeout_timestamp,
1178 crt_thread };
1179
// Retry until enqueued, interrupted or timed out.
1180 for (;;)
1181 {
1182 {
1183 // ----- Enter critical section -----------------------------------
1185
1186 if (internal_try_send_ (msg, nbytes, mprio))
1187 {
1188 return result::ok;
1189 }
1190
1191 // Add this thread to the message queue send waiting list,
1192 // and the clock timeout list.
1193 scheduler::internal_link_node (send_list_, node, clock_list,
1194 timeout_node);
1195 // state::suspended set in above link().
1196 // ----- Exit critical section ------------------------------------
1197 }
1198
1200
1201 // Remove the thread from the message queue send waiting list,
1202 // if not already removed by receive() and from the clock timeout
1203 // list, if not already removed by the timer.
1204 scheduler::internal_unlink_node (node, timeout_node);
1205
1206 if (crt_thread.interrupted ())
1207 {
1208#if defined(OS_TRACE_RTOS_MQUEUE)
1209 trace::printf ("%s(%p,%u,%u,%u) EINTR @%p %s\n", __func__, msg,
1210 nbytes, mprio, timeout, this, name ());
1211#endif
1212 return EINTR;
1213 }
1214
// Give up once the deadline is reached.
1215 if (clock_->steady_now () >= timeout_timestamp)
1216 {
1217#if defined(OS_TRACE_RTOS_MQUEUE)
1218 trace::printf ("%s(%p,%u,%u,%u) ETIMEDOUT @%p %s\n", __func__,
1219 msg, nbytes, mprio, timeout, this, name ());
1220#endif
1221 return ETIMEDOUT;
1222 }
1223 }
1224
1225 /* NOTREACHED */
1226 return ENOTRECOVERABLE;
1227
1228#endif
1229 }
1230
// Receive a message from the queue, blocking while the queue is empty.
//
// Parameters:
// - msg: address of the destination buffer; must not be null (EINVAL).
// - nbytes: number of bytes to copy; must not exceed `msg_size_bytes_`
//   (EMSGSIZE).
// - mprio: optional address where to store the message priority.
//
// Returns `result::ok` once a message was copied, EPERM if the scheduler
// is locked, or EINTR if the waiting thread was interrupted.
//
// NOTE(review): several lines of this listing are missing (1277; 1294 and
// 1314, inside the blocks marked as critical sections; 1327, between
// linking and unlinking the wait node). The comments below describe only
// the visible statements.
1268 result_t
1269 message_queue::receive (void* msg, std::size_t nbytes, priority_t* mprio)
1270 {
1271#if defined(OS_TRACE_RTOS_MQUEUE)
1272 trace::printf ("%s(%p,%u) @%p %s\n", __func__, msg, nbytes, this,
1273 name ());
1274#endif
1275
1276 // Don't call this from interrupt handlers.
1278 // Don't call this from critical regions.
1279 os_assert_err (!scheduler::locked (), EPERM);
1280
1281 os_assert_err (msg != nullptr, EINVAL);
1282 os_assert_err (nbytes <= msg_size_bytes_, EMSGSIZE);
1283
1284#if defined(OS_USE_RTOS_PORT_MESSAGE_QUEUE)
1285
1286 return port::message_queue::receive (this, msg, nbytes, mprio);
1287
1288#else
1289
1290 // Extra test before entering the loop, with its inherent weight.
1291 // Trade size for speed.
1292 {
1293 // ----- Enter critical section ---------------------------------------
1295
1296 if (internal_try_receive_ (msg, nbytes, mprio))
1297 {
1298 return result::ok;
1299 }
1300 // ----- Exit critical section ----------------------------------------
1301 }
1302
1303 thread& crt_thread = this_thread::thread ();
1304
1305 // Prepare a list node pointing to the current thread.
1306 // Do not worry for being on stack, it is temporarily linked to the
1307 // list and guaranteed to be removed before this function returns.
1308 internal::waiting_thread_node node{ crt_thread };
1309
// Retry until a message is received or the thread is interrupted.
1310 for (;;)
1311 {
1312 {
1313 // ----- Enter critical section -----------------------------------
1315
1316 if (internal_try_receive_ (msg, nbytes, mprio))
1317 {
1318 return result::ok;
1319 }
1320
1321 // Add this thread to the message queue receive waiting list.
1322 scheduler::internal_link_node (receive_list_, node);
1323 // state::suspended set in above link().
1324 // ----- Exit critical section ------------------------------------
1325 }
1326
1328
1329 // Remove the thread from the message queue receive waiting list,
1330 // if not already removed by send().
1331 scheduler::internal_unlink_node (node);
1332
1333 if (crt_thread.interrupted ())
1334 {
1335#if defined(OS_TRACE_RTOS_MQUEUE)
1336 trace::printf ("%s(%p,%u) EINTR @%p %s\n", __func__, msg, nbytes,
1337 this, name ());
1338#endif
1339 return EINTR;
1340 }
1341 }
1342
1343 /* NOTREACHED */
1344 return ENOTRECOVERABLE;
1345
1346#endif
1347 }
1348
// Try to receive a message from the queue, without blocking.
//
// Parameters:
// - msg: address of the destination buffer; must not be null (EINVAL).
// - nbytes: number of bytes to copy; must not exceed `msg_size_bytes_`
//   (EMSGSIZE).
// - mprio: optional address where to store the message priority.
//
// Returns `result::ok` if a message was copied, or EWOULDBLOCK if the
// queue is empty. Unlike receive(), there is no scheduler lock check here;
// on non-port builds only the interrupt priority is asserted.
//
// NOTE(review): listing line 1405, inside the block marked as critical
// section, is missing from this listing.
1382 result_t
1383 message_queue::try_receive (void* msg, std::size_t nbytes,
1384 priority_t* mprio)
1385 {
1386#if defined(OS_TRACE_RTOS_MQUEUE)
1387 trace::printf ("%s(%p,%u) @%p %s\n", __func__, msg, nbytes, this,
1388 name ());
1389#endif
1390
1391 os_assert_err (msg != nullptr, EINVAL);
1392 os_assert_err (nbytes <= msg_size_bytes_, EMSGSIZE);
1393
1394#if defined(OS_USE_RTOS_PORT_MESSAGE_QUEUE)
1395
1396 return port::message_queue::try_receive (this, msg, nbytes, mprio);
1397
1398#else
1399
1400 // Don't call this from high priority interrupts.
1401 assert (port::interrupts::is_priority_valid ());
1402
1403 {
1404 // ----- Enter critical section ---------------------------------------
1406
1407 if (internal_try_receive_ (msg, nbytes, mprio))
1408 {
1409 return result::ok;
1410 }
1411 else
1412 {
// No message available.
1413 return EWOULDBLOCK;
1414 }
1415 // ----- Exit critical section ----------------------------------------
1416 }
1417
1418#endif
1419 }
1420
// Receive a message from the queue, blocking for at most `timeout`.
//
// Parameters:
// - msg: address of the destination buffer; must not be null (EINVAL).
// - nbytes: number of bytes to copy; must not exceed `msg_size_bytes_`
//   (EMSGSIZE).
// - timeout: duration, added to `clock_->steady_now ()` to compute the
//   deadline.
// - mprio: optional address where to store the message priority.
//
// Returns `result::ok` once a message was copied, EPERM if the scheduler
// is locked, EINTR if the waiting thread was interrupted, or ETIMEDOUT when
// the steady clock reaches the deadline.
//
// NOTE(review): several lines of this listing are missing (1493; 1511 and
// 1538, inside the blocks marked as critical sections; 1553, between
// linking and unlinking the wait nodes). The comments below describe only
// the visible statements.
1483 result_t
1484 message_queue::timed_receive (void* msg, std::size_t nbytes,
1485 clock::duration_t timeout, priority_t* mprio)
1486 {
1487#if defined(OS_TRACE_RTOS_MQUEUE)
1488 trace::printf ("%s(%p,%u,%u) @%p %s\n", __func__, msg, nbytes, timeout,
1489 this, name ());
1490#endif
1491
1492 // Don't call this from interrupt handlers.
1494 // Don't call this from critical regions.
1495 os_assert_err (!scheduler::locked (), EPERM);
1496
1497 os_assert_err (msg != nullptr, EINVAL);
1498 os_assert_err (nbytes <= msg_size_bytes_, EMSGSIZE);
1499
1500#if defined(OS_USE_RTOS_PORT_MESSAGE_QUEUE)
1501
1502 return port::message_queue::timed_receive (this, msg, nbytes, timeout,
1503 mprio);
1504
1505#else
1506
1507 // Extra test before entering the loop, with its inherent weight.
1508 // Trade size for speed.
1509 {
1510 // ----- Enter critical section ---------------------------------------
1512
1513 if (internal_try_receive_ (msg, nbytes, mprio))
1514 {
1515 return result::ok;
1516 }
1517 // ----- Exit critical section ----------------------------------------
1518 }
1519
1520 thread& crt_thread = this_thread::thread ();
1521
1522 // Prepare a list node pointing to the current thread.
1523 // Do not worry for being on stack, it is temporarily linked to the
1524 // list and guaranteed to be removed before this function returns.
1525 internal::waiting_thread_node node{ crt_thread };
1526
1527 internal::clock_timestamps_list& clock_list = clock_->steady_list ();
// Absolute deadline, computed once, before entering the loop.
1528 clock::timestamp_t timeout_timestamp = clock_->steady_now () + timeout;
1529
1530 // Prepare a timeout node pointing to the current thread.
1531 internal::timeout_thread_node timeout_node{ timeout_timestamp,
1532 crt_thread };
1533
// Retry until received, interrupted or timed out.
1534 for (;;)
1535 {
1536 {
1537 // ----- Enter critical section -----------------------------------
1539
1540 if (internal_try_receive_ (msg, nbytes, mprio))
1541 {
1542 return result::ok;
1543 }
1544
1545 // Add this thread to the message queue receive waiting list,
1546 // and the clock timeout list.
1547 scheduler::internal_link_node (receive_list_, node, clock_list,
1548 timeout_node);
1549 // state::suspended set in above link().
1550 // ----- Exit critical section ------------------------------------
1551 }
1552
1554
1555 // Remove the thread from the message queue receive waiting list,
1556 // if not already removed by send(), and from the clock
1557 // timeout list, if not already removed by the timer.
1558 scheduler::internal_unlink_node (node, timeout_node);
1559
1560 if (crt_thread.interrupted ())
1561 {
1562#if defined(OS_TRACE_RTOS_MQUEUE)
1563 trace::printf ("%s(%p,%u,%u) EINTR @%p %s\n", __func__, msg,
1564 nbytes, timeout, this, name ());
1565#endif
1566 return EINTR;
1567 }
1568
// Give up once the deadline is reached.
1569 if (clock_->steady_now () >= timeout_timestamp)
1570 {
1571#if defined(OS_TRACE_RTOS_MQUEUE)
1572 trace::printf ("%s(%p,%u,%u) ETIMEDOUT @%p %s\n", __func__, msg,
1573 nbytes, timeout, this, name ());
1574#endif
1575 return ETIMEDOUT;
1576 }
1577 }
1578
1579 /* NOTREACHED */
1580 return ENOTRECOVERABLE;
1581
1582#endif
1583 }
1584
// NOTE(review): the signature line (1596) is not present in this listing;
// given the port call below, this is presumably `message_queue::reset()` -
// confirm against the original file. Lines 1603 and 1613 are missing too.
//
// On port builds the call is forwarded to the port implementation.
// Otherwise `internal_init_()` is called, which empties the queue and
// resumes all waiting threads, and `result::ok` is returned.
1595 result_t
1597 {
1598#if defined(OS_TRACE_RTOS_MQUEUE)
1599 trace::printf ("%s() @%p %s\n", __func__, this, name ());
1600#endif
1601
1602 // Don't call this from interrupt handlers.
1604
1605#if defined(OS_USE_RTOS_PORT_MESSAGE_QUEUE)
1606
1607 return port::message_queue::reset (this);
1608
1609#else
1610
1611 {
1612 // ----- Enter critical section ---------------------------------------
1614
// Rebuild the free list and clear the counters.
1615 internal_init_ ();
1616 return result::ok;
1617 // ----- Exit critical section ----------------------------------------
1618 }
1619
1620#endif
1621 }
1622
1623 // ------------------------------------------------------------------------
1624
1625 } /* namespace rtos */
1626} /* namespace os */
1627
1628// ----------------------------------------------------------------------------
Ordered list of time stamp nodes.
Definition os-lists.h:666
const char * name(void) const
Get object name.
Definition os-decls.h:753
Double linked list node, with time stamp and thread.
Definition os-lists.h:220
Double linked list node, with thread reference.
Definition os-lists.h:59
Interrupts critical section RAII helper.
Definition os-sched.h:502
Standard allocator based on the RTOS system default memory manager.
Definition os-memory.h:538
void deallocate(value_type *addr, std::size_t elements) noexcept
Deallocate the number of memory blocks of type value_type.
T value_type
Type of elements to be allocated.
Definition os-memory.h:543
Message queue attributes.
Definition os-mqueue.h:154
void * mq_queue_address
Address of the user defined storage for the message queue.
Definition os-mqueue.h:198
POSIX compliant message queue, using the default RTOS allocator.
Definition os-mqueue.h:67
result_t receive(void *msg, std::size_t nbytes, priority_t *mprio=nullptr)
Receive a message from the queue.
result_t reset(void)
Reset the message queue.
result_t try_send(const void *msg, std::size_t nbytes, priority_t mprio=default_priority)
Try to send a message to the queue.
result_t try_receive(void *msg, std::size_t nbytes, priority_t *mprio=nullptr)
Try to receive a message from the queue.
result_t send(const void *msg, std::size_t nbytes, priority_t mprio=default_priority)
Send a message to the queue.
virtual ~message_queue()
Destruct the message queue object instance.
result_t timed_receive(void *msg, std::size_t nbytes, clock::duration_t timeout, priority_t *mprio=nullptr)
Receive a message from the queue with timeout.
result_t timed_send(const void *msg, std::size_t nbytes, clock::duration_t timeout, priority_t mprio=default_priority)
Send a message to the queue with timeout.
constexpr std::size_t compute_allocated_size_bytes(std::size_t msgs, std::size_t msg_size_bytes)
Calculator for queue storage requirements.
Definition os-mqueue.h:253
message_queue(std::size_t msgs, std::size_t msg_size_bytes, const attributes &attr=initializer, const allocator_type &allocator=allocator_type())
Construct a message queue object instance.
POSIX compliant thread, using the default RTOS allocator.
Definition os-thread.h:251
bool interrupted(void)
Check if interrupted.
Definition os-thread.h:2373
int printf(const char *format,...)
Write a formatted string to the trace device.
Definition trace.cpp:59
port::clock::duration_t duration_t
Type of variables holding clock durations.
Definition os-clocks.h:78
port::clock::timestamp_t timestamp_t
Type of variables holding clock time stamps.
Definition os-clocks.h:88
clock_systick sysclock
The system clock object instance.
uint8_t priority_t
Type of message priority storage.
Definition os-mqueue.h:125
message_queue::size_t index_t
Type of list index storage.
Definition os-mqueue.h:108
static const attributes initializer
Default message queue initialiser.
Definition os-mqueue.h:217
uint8_t size_t
Type of a queue size storage.
Definition os-mqueue.h:83
uint16_t msg_size_t
Type of message size storage.
Definition os-mqueue.h:96
static constexpr index_t no_index
Index value to represent an illegal index.
Definition os-mqueue.h:114
bool in_handler_mode(void)
Check if the CPU is in handler mode.
Definition os-sched.h:1101
@ ok
Function completed; no errors or events occurred.
Definition os-decls.h:179
bool locked(void)
Check if the scheduler is locked.
Definition os-sched.h:858
thread & thread(void)
Get the current running thread.
uint32_t result_t
Type of values returned by RTOS functions.
Definition os-decls.h:95
System namespace.
#define os_assert_throw(__e, __er)
Assert or throw a system error exception.
Definition os-decls.h:1122
#define os_assert_err(__e, __er)
Assert or return an error.
Definition os-decls.h:1101
Single file µOS++ RTOS definitions.