µOS++ IIIe Reference 7.0.0
The third edition of µOS++, a POSIX inspired open source framework, written in C++
Loading...
Searching...
No Matches
os-mqueue.cpp
Go to the documentation of this file.
1/*
2 * This file is part of the µOS++ distribution.
3 * (https://github.com/micro-os-plus)
4 * Copyright (c) 2016 Liviu Ionescu.
5 *
6 * Permission is hereby granted, free of charge, to any person
7 * obtaining a copy of this software and associated documentation
8 * files (the "Software"), to deal in the Software without
9 * restriction, including without limitation the rights to use,
10 * copy, modify, merge, publish, distribute, sublicense, and/or
11 * sell copies of the Software, and to permit persons to whom
12 * the Software is furnished to do so, subject to the following
13 * conditions:
14 *
15 * The above copyright notice and this permission notice shall be
16 * included in all copies or substantial portions of the Software.
17 *
18 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
19 * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
20 * OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
21 * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
22 * HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
23 * WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
24 * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
25 * OTHER DEALINGS IN THE SOFTWARE.
26 */
27
28#include <cmsis-plus/rtos/os.h>
29
30// ----------------------------------------------------------------------------
31
32#if defined(__clang__)
33#pragma clang diagnostic ignored "-Wc++98-compat"
34#endif
35
36// ----------------------------------------------------------------------------
37
38namespace os
39{
40 namespace rtos
41 {
42 // ------------------------------------------------------------------------
43
89 const message_queue::attributes message_queue::initializer;
90
91 // ------------------------------------------------------------------------
92
285 // ------------------------------------------------------------------------
339 // ------------------------------------------------------------------------
344 // Protected internal constructor.
346 {
347#if defined(OS_TRACE_RTOS_MQUEUE)
348 trace::printf ("%s() @%p %s\n", __func__, this, this->name ());
349#endif
350 }
351
352 message_queue::message_queue (const char* name) :
353 object_named_system
354 { name }
355 {
356#if defined(OS_TRACE_RTOS_MQUEUE)
357 trace::printf ("%s() @%p %s\n", __func__, this, this->name ());
358#endif
359 }
360
394 message_queue::message_queue (std::size_t msgs, std::size_t msg_size_bytes,
395 const attributes& attr,
396 const allocator_type& allocator) :
398 { nullptr, msgs, msg_size_bytes, attr, allocator }
399 {
400 ;
401 }
402
432 message_queue::message_queue (const char* name, std::size_t msgs,
433 std::size_t msg_size_bytes,
434 const attributes& attr,
435 const allocator_type& allocator) :
436 object_named_system
437 { name }
438 {
439#if defined(OS_TRACE_RTOS_MQUEUE)
440 trace::printf ("%s() @%p %s %u %u\n", __func__, this, this->name (), msgs,
441 msg_size_bytes);
442#endif
443
444 if (attr.mq_queue_address != nullptr)
445 {
446 // Do not use any allocator at all.
447 internal_construct_ (msgs, msg_size_bytes, attr, nullptr, 0);
448 }
449 else
450 {
451 allocator_ = &allocator;
452
453 // If no user storage was provided via attributes,
454 // allocate it dynamically via the allocator.
455 allocated_queue_size_elements_ = (compute_allocated_size_bytes<
456 typename allocator_type::value_type> (msgs, msg_size_bytes)
457 + sizeof(typename allocator_type::value_type) - 1)
458 / sizeof(typename allocator_type::value_type);
459
460 allocated_queue_addr_ =
461 const_cast<allocator_type&> (allocator).allocate (
462 allocated_queue_size_elements_);
463
464 internal_construct_ (
465 msgs,
466 msg_size_bytes,
467 attr,
468 allocated_queue_addr_,
469 allocated_queue_size_elements_
470 * sizeof(typename allocator_type::value_type));
471 }
472 }
473
489 {
490#if defined(OS_TRACE_RTOS_MQUEUE)
491 trace::printf ("%s() @%p %s\n", __func__, this, name ());
492#endif
493
494#if !defined(OS_USE_RTOS_PORT_MESSAGE_QUEUE)
495
496 // There must be no threads waiting for this queue.
497 assert(send_list_.empty ());
498 assert(receive_list_.empty ());
499
500#endif
501
502 if (allocated_queue_addr_ != nullptr)
503 {
504 typedef typename std::allocator_traits<allocator_type>::pointer pointer;
505
506 static_cast<allocator_type*> (const_cast<void*> (allocator_))->deallocate (
507 reinterpret_cast<pointer> (allocated_queue_addr_),
508 allocated_queue_size_elements_);
509 }
510
511#if defined(OS_USE_RTOS_PORT_MESSAGE_QUEUE)
512
513 port::message_queue::destroy (this);
514
515#endif
516 }
517
522 void
523 message_queue::internal_construct_ (std::size_t msgs,
524 std::size_t msg_size_bytes,
525 const attributes& attr,
526 void* queue_address,
527 std::size_t queue_size_bytes)
528 {
529 // Don't call this from interrupt handlers.
531
532#if !defined(OS_USE_RTOS_PORT_MESSAGE_QUEUE)
533 clock_ = attr.clock != nullptr ? attr.clock : &sysclock;
534#endif
535 msg_size_bytes_ = static_cast<message_queue::msg_size_t> (msg_size_bytes);
536 assert(msg_size_bytes_ == msg_size_bytes);
537 assert(msg_size_bytes_ > 0);
538
539 // in order for the list of free messages to not consume additional memory,
540 // the pointers are stored at the beginning of the messages, thus messages should be large enough to fit a pointer
541 assert(msg_size_bytes_ >= sizeof(void*));
542
543 msgs_ = static_cast<message_queue::size_t> (msgs);
544 assert(msgs_ == msgs);
545 assert(msgs > 0);
546
547 // If the storage is given explicitly, override attributes.
548 if (queue_address != nullptr)
549 {
550 // The attributes should not define any storage in this case.
551 assert(attr.mq_queue_address == nullptr);
552
553 queue_addr_ = queue_address;
554 queue_size_bytes_ = queue_size_bytes;
555 }
556 else
557 {
558 queue_addr_ = attr.mq_queue_address;
559 queue_size_bytes_ = attr.mq_queue_size_bytes;
560 }
561
562#if defined(OS_TRACE_RTOS_MQUEUE)
563 trace::printf ("%s() @%p %s %u %u %p %u\n", __func__, this, name (),
564 msgs_, msg_size_bytes_, queue_addr_, queue_size_bytes_);
565#endif
566
567#if !defined(OS_USE_RTOS_PORT_MESSAGE_QUEUE)
568 std::size_t storage_size = compute_allocated_size_bytes<void*> (
569 msgs, msg_size_bytes);
570#endif
571 if (queue_addr_ != nullptr)
572 {
573 // The queue must be real, and have a non zero size.
574 os_assert_throw(queue_size_bytes_ > 0, EINVAL);
575#if defined(OS_USE_RTOS_PORT_MESSAGE_QUEUE)
577 queue_size_bytes_ >= (std::size_t) (msgs * msg_size_bytes),
578 EINVAL);
579#else
580 // The queue must fit the storage.
581 os_assert_throw(queue_size_bytes_ >= storage_size, EINVAL);
582#endif
583 }
584
585#if defined(OS_USE_RTOS_PORT_MESSAGE_QUEUE)
586
587 count_ = 0;
588 port::message_queue::create (this);
589
590#else
591
592 head_ = no_index;
593
594 // The queue storage must have a real address.
595 os_assert_throw(queue_addr_ != nullptr, ENOMEM);
596
597 // The array of prev indexes follows immediately after the content array.
598 prev_array_ =
599 reinterpret_cast<index_t*> (static_cast<char*> (queue_addr_)
600 + msgs
601 * ((msg_size_bytes + (sizeof(void*) - 1))
602 & ~(sizeof(void*) - 1)));
603 // The array of next indexes follows immediately the prev array.
604 next_array_ =
605 reinterpret_cast<index_t*> (reinterpret_cast<char*> (const_cast<index_t*> (prev_array_))
606 + msgs * sizeof(index_t));
607 // The array of priorities follows immediately the next array.
608 prio_array_ =
609 reinterpret_cast<priority_t*> (reinterpret_cast<char*> (const_cast<index_t*> (next_array_))
610 + msgs * sizeof(index_t));
611
612#if !defined(NDEBUG)
613 char* p =
614 reinterpret_cast<char*> (reinterpret_cast<char*> (const_cast<priority_t*> (prio_array_))
615 + msgs * sizeof(priority_t));
616
617 assert(
618 p - static_cast<char*> (queue_addr_)
619 <= static_cast<ptrdiff_t> (queue_size_bytes_));
620#endif
621
622 internal_init_ ();
623#endif
624 }
625
626 void
627 message_queue::internal_init_ (void)
628 {
629 count_ = 0;
630
631#if !defined(OS_USE_RTOS_PORT_MESSAGE_QUEUE)
632
633 // Construct a linked list of blocks. Store the pointer at
634 // the beginning of each block. Each block
635 // will hold the address of the next free block,
636 // or `nullptr` at the end.
637 char* p = static_cast<char*> (queue_addr_);
638 for (std::size_t i = 1; i < msgs_; ++i)
639 {
640 // Compute the address of the next block;
641 char* pn = p + msg_size_bytes_;
642
643 // Make this block point to the next one.
644 *(static_cast<void**> (static_cast<void*> (p))) = pn;
645 // Advance pointer
646 p = pn;
647 }
648
649 // Mark end of list.
650 *(static_cast<void**> (static_cast<void*> (p))) = nullptr;
651
652 first_free_ = queue_addr_; // Pointer to first block.
653
654 head_ = no_index;
655
656 // Need not be inside the critical section,
657 // the lists are protected by inner `resume_one()`.
658
659 // Wake-up all threads, if any.
660 send_list_.resume_all ();
661 receive_list_.resume_all ();
662
663#endif /* !defined(OS_USE_RTOS_PORT_MESSAGE_QUEUE) */
664
665 }
666
667#if !defined(OS_USE_RTOS_PORT_MESSAGE_QUEUE)
668
669 /*
670 * Internal function.
671 * Should be called from an interrupts critical section.
672 */
673 bool
674 message_queue::internal_try_send_ (const void* msg, std::size_t nbytes,
675 priority_t mprio)
676 {
677 if (first_free_ == nullptr)
678 {
679 // No available space to send the message.
680 return false;
681 }
682
683 // The first step is to remove the free block from the list,
684 // so another concurrent call will not get it too.
685
686 // Get the address where the message will be copied.
687 // This is the first free memory block.
688 char* dest = static_cast<char*> (first_free_);
689
690 // Update to next free, if any (the last one has nullptr).
691 first_free_ = *(static_cast<void**> (first_free_));
692
693 // The second step is to copy the message from the user buffer.
694 {
695 // ----- Enter uncritical section -----------------------------------
696 // interrupts::uncritical_section iucs;
697
698 // Copy message from user buffer to queue storage.
699 std::memcpy (dest, msg, nbytes);
700 if (nbytes < msg_size_bytes_)
701 {
702 // Fill in the remaining space with 0x00.
703 std::memset (dest + nbytes, 0x00, msg_size_bytes_ - nbytes);
704 }
705 // ----- Exit uncritical section ------------------------------------
706 }
707
708 // The third step is to link the buffer to the list.
709
710 // Using the address, compute the index in the array.
711 std::size_t msg_ix = (static_cast<std::size_t> (dest
712 - static_cast<char*> (queue_addr_)) / msg_size_bytes_);
713 prio_array_[msg_ix] = mprio;
714
715 if (head_ == no_index)
716 {
717 // No other message in the queue, enlist this one
718 // as head, with links to itself.
719 head_ = static_cast<index_t> (msg_ix);
720 prev_array_[msg_ix] = static_cast<index_t> (msg_ix);
721 next_array_[msg_ix] = static_cast<index_t> (msg_ix);
722 }
723 else
724 {
725 std::size_t ix;
726 // Arrange to insert between head and tail.
727 ix = prev_array_[head_];
728 // Check if the priority is higher than the head priority.
729 if (mprio > prio_array_[head_])
730 {
731 // Having the highest priority, the new message
732 // becomes the new head.
733 head_ = static_cast<index_t> (msg_ix);
734 }
735 else
736 {
737 // If not higher than the head, try to insert at the tail,
738 // but advance up until the same priority is found.
739 while ((mprio > prio_array_[ix]))
740 {
741 ix = prev_array_[ix];
742 }
743 }
744 prev_array_[msg_ix] = static_cast<index_t> (ix);
745 next_array_[msg_ix] = next_array_[ix];
746
747 // Break the chain and insert the new index.
748 std::size_t tmp_ix = next_array_[ix];
749 next_array_[ix] = static_cast<index_t> (msg_ix);
750 prev_array_[tmp_ix] = static_cast<index_t> (msg_ix);
751 }
752
753 // One more message added to the queue.
754 ++count_;
755
756 // Wake-up one thread, if any.
757 receive_list_.resume_one ();
758
759 return true;
760 }
761
762#endif /* !defined(OS_USE_RTOS_PORT_MESSAGE_QUEUE) */
763
764#if !defined(OS_USE_RTOS_PORT_MESSAGE_QUEUE)
765
766 /*
767 * Internal function.
768 * Should be called from an interrupts critical section.
769 */
770 bool
771 message_queue::internal_try_receive_ (void* msg, std::size_t nbytes,
772 priority_t* mprio)
773 {
774
775 if (head_ == no_index)
776 {
777 return false;
778 }
779
780 // Compute the message source address.
781 char* src = static_cast<char*> (queue_addr_) + head_ * msg_size_bytes_;
782 priority_t prio = prio_array_[head_];
783
784#if defined(OS_TRACE_RTOS_MQUEUE_)
785 trace::printf ("%s(%p,%u) @%p %s src %p %p\n", __func__, msg, nbytes,
786 this, name (), src, first_free_);
787#endif
788
789 // Unlink it from the list, so another concurrent call will
790 // not get it too.
791 if (count_ > 1)
792 {
793 // Remove the current element from the list.
794 prev_array_[next_array_[head_]] = prev_array_[head_];
795 next_array_[prev_array_[head_]] = next_array_[head_];
796
797 // Next becomes the new head.
798 head_ = next_array_[head_];
799 }
800 else
801 {
802 // If there was only one, the list is empty now.
803 head_ = no_index;
804 }
805
806 --count_;
807
808 // Copy to destination
809 {
810 // ----- Enter uncritical section -----------------------------------
811 interrupts::uncritical_section iucs;
812
813 // Copy message from queue to user buffer.
814 memcpy (msg, src, nbytes);
815 if (mprio != nullptr)
816 {
817 *mprio = prio;
818 }
819 // ----- Exit uncritical section ------------------------------------
820 }
821
822 // After the message was copied, the block can be released.
823
824 // Perform a push_front() on the single linked LIFO list,
825 // i.e. add the block to the beginning of the list.
826
827 // Link previous list to this block; may be null, but it does
828 // not matter.
829 *(static_cast<void**> (static_cast<void*> (src))) = first_free_;
830
831 // Now this block is the first one.
832 first_free_ = src;
833
834 // Wake-up one thread, if any.
835 send_list_.resume_one ();
836
837 return true;
838 }
839
840#endif /* !defined(OS_USE_RTOS_PORT_MESSAGE_QUEUE) */
841
885 message_queue::send (const void* msg, std::size_t nbytes, priority_t mprio)
886 {
887#if defined(OS_TRACE_RTOS_MQUEUE)
888 trace::printf ("%s(%p,%d,%d) @%p %s\n", __func__, msg, nbytes, mprio,
889 this, name ());
890#endif
891
892 // Don't call this from interrupt handlers.
894 // Don't call this from critical regions.
896
897 os_assert_err(msg != nullptr, EINVAL);
898 os_assert_err(nbytes <= msg_size_bytes_, EMSGSIZE);
899
900#if defined(OS_USE_RTOS_PORT_MESSAGE_QUEUE)
901
902 return port::message_queue::send (this, msg, nbytes, mprio);
903
904#else
905
906 {
907 // ----- Enter critical section -------------------------------------
909
910 if (internal_try_send_ (msg, nbytes, mprio))
911 {
912 return result::ok;
913 }
914 // ----- Exit critical section --------------------------------------
915 }
916
917 thread& crt_thread = this_thread::thread ();
918
919 // Prepare a list node pointing to the current thread.
920 // Do not worry for being on stack, it is temporarily linked to the
921 // list and guaranteed to be removed before this function returns.
923 { crt_thread };
924
925 for (;;)
926 {
927 {
928 // ----- Enter critical section ---------------------------------
930
931 if (internal_try_send_ (msg, nbytes, mprio))
932 {
933 return result::ok;
934 }
935
936 // Add this thread to the message queue send waiting list.
937 scheduler::internal_link_node (send_list_, node);
938 // state::suspended set in above link().
939 // ----- Exit critical section ----------------------------------
940 }
941
943
944 // Remove the thread from the message queue send waiting list,
945 // if not already removed by receive().
946 scheduler::internal_unlink_node (node);
947
948 if (crt_thread.interrupted ())
949 {
950#if defined(OS_TRACE_RTOS_MQUEUE)
951 trace::printf ("%s(%p,%d,%d) EINTR @%p %s\n", __func__, msg,
952 nbytes, mprio, this, name ());
953#endif
954 return EINTR;
955 }
956 }
957
958 /* NOTREACHED */
959 return ENOTRECOVERABLE;
960
961#endif
962 }
963
996 message_queue::try_send (const void* msg, std::size_t nbytes,
997 priority_t mprio)
998 {
999#if defined(OS_TRACE_RTOS_MQUEUE)
1000 trace::printf ("%s(%p,%u,%u) @%p %s\n", __func__, msg, nbytes, mprio,
1001 this, name ());
1002#endif
1003
1004 os_assert_err(msg != nullptr, EINVAL);
1005 os_assert_err(nbytes <= msg_size_bytes_, EMSGSIZE);
1006
1007#if defined(OS_USE_RTOS_PORT_MESSAGE_QUEUE)
1008
1009 return port::message_queue::try_send (this, msg, nbytes, mprio);
1010
1011#else
1012 // Don't call this from high priority interrupts.
1013 assert(port::interrupts::is_priority_valid ());
1014
1015 {
1016 // ----- Enter critical section -------------------------------------
1018
1019 if (internal_try_send_ (msg, nbytes, mprio))
1020 {
1021 return result::ok;
1022 }
1023 else
1024 {
1025 return EWOULDBLOCK;
1026 }
1027 // ----- Exit critical section --------------------------------------
1028 }
1029
1030#endif
1031 }
1032
1079 result_t
1080 message_queue::timed_send (const void* msg, std::size_t nbytes,
1081 clock::duration_t timeout, priority_t mprio)
1082 {
1083#if defined(OS_TRACE_RTOS_MQUEUE)
1084 trace::printf ("%s(%p,%u,%u,%u) @%p %s\n", __func__, msg, nbytes, mprio,
1085 timeout, this, name ());
1086#endif
1087
1088 // Don't call this from interrupt handlers.
1090 // Don't call this from critical regions.
1091 os_assert_err(!scheduler::locked (), EPERM);
1092
1093 os_assert_err(msg != nullptr, EINVAL);
1094 os_assert_err(nbytes <= msg_size_bytes_, EMSGSIZE);
1095
1096#if defined(OS_USE_RTOS_PORT_MESSAGE_QUEUE)
1097
1098 return port::message_queue::timed_send (this, msg, nbytes, timeout, mprio);
1099
1100#else
1101
1102 // Extra test before entering the loop, with its inherent weight.
1103 // Trade size for speed.
1104 {
1105 // ----- Enter critical section -------------------------------------
1107
1108 if (internal_try_send_ (msg, nbytes, mprio))
1109 {
1110 return result::ok;
1111 }
1112 // ----- Exit critical section --------------------------------------
1113 }
1114
1115 thread& crt_thread = this_thread::thread ();
1116
1117 // Prepare a list node pointing to the current thread.
1118 // Do not worry for being on stack, it is temporarily linked to the
1119 // list and guaranteed to be removed before this function returns.
1121 { crt_thread };
1122
1123 internal::clock_timestamps_list& clock_list = clock_->steady_list ();
1124
1125 clock::timestamp_t timeout_timestamp = clock_->steady_now () + timeout;
1126
1127 // Prepare a timeout node pointing to the current thread.
1129 { timeout_timestamp, crt_thread };
1130
1131 for (;;)
1132 {
1133 {
1134 // ----- Enter critical section ---------------------------------
1136
1137 if (internal_try_send_ (msg, nbytes, mprio))
1138 {
1139 return result::ok;
1140 }
1141
1142 // Add this thread to the message queue send waiting list,
1143 // and the clock timeout list.
1144 scheduler::internal_link_node (send_list_, node, clock_list,
1145 timeout_node);
1146 // state::suspended set in above link().
1147 // ----- Exit critical section ----------------------------------
1148 }
1149
1151
1152 // Remove the thread from the message queue send waiting list,
1153 // if not already removed by receive() and from the clock timeout list,
1154 // if not already removed by the timer.
1155 scheduler::internal_unlink_node (node, timeout_node);
1156
1157 if (crt_thread.interrupted ())
1158 {
1159#if defined(OS_TRACE_RTOS_MQUEUE)
1160 trace::printf ("%s(%p,%u,%u,%u) EINTR @%p %s\n", __func__, msg,
1161 nbytes, mprio, timeout, this, name ());
1162#endif
1163 return EINTR;
1164 }
1165
1166 if (clock_->steady_now () >= timeout_timestamp)
1167 {
1168#if defined(OS_TRACE_RTOS_MQUEUE)
1169 trace::printf ("%s(%p,%u,%u,%u) ETIMEDOUT @%p %s\n", __func__,
1170 msg, nbytes, mprio, timeout, this, name ());
1171#endif
1172 return ETIMEDOUT;
1173 }
1174 }
1175
1176 /* NOTREACHED */
1177 return ENOTRECOVERABLE;
1178
1179#endif
1180 }
1181
1216 result_t
1217 message_queue::receive (void* msg, std::size_t nbytes, priority_t* mprio)
1218 {
1219#if defined(OS_TRACE_RTOS_MQUEUE)
1220 trace::printf ("%s(%p,%u) @%p %s\n", __func__, msg, nbytes, this,
1221 name ());
1222#endif
1223
1224 // Don't call this from interrupt handlers.
1226 // Don't call this from critical regions.
1227 os_assert_err(!scheduler::locked (), EPERM);
1228
1229 os_assert_err(msg != nullptr, EINVAL);
1230 os_assert_err(nbytes <= msg_size_bytes_, EMSGSIZE);
1231
1232#if defined(OS_USE_RTOS_PORT_MESSAGE_QUEUE)
1233
1234 return port::message_queue::receive (this, msg, nbytes, mprio);
1235
1236#else
1237
1238 // Extra test before entering the loop, with its inherent weight.
1239 // Trade size for speed.
1240 {
1241 // ----- Enter critical section -------------------------------------
1243
1244 if (internal_try_receive_ (msg, nbytes, mprio))
1245 {
1246 return result::ok;
1247 }
1248 // ----- Exit critical section --------------------------------------
1249 }
1250
1251 thread& crt_thread = this_thread::thread ();
1252
1253 // Prepare a list node pointing to the current thread.
1254 // Do not worry for being on stack, it is temporarily linked to the
1255 // list and guaranteed to be removed before this function returns.
1257 { crt_thread };
1258
1259 for (;;)
1260 {
1261 {
1262 // ----- Enter critical section ---------------------------------
1264
1265 if (internal_try_receive_ (msg, nbytes, mprio))
1266 {
1267 return result::ok;
1268 }
1269
1270 // Add this thread to the message queue receive waiting list.
1271 scheduler::internal_link_node (receive_list_, node);
1272 // state::suspended set in above link().
1273 // ----- Exit critical section ----------------------------------
1274 }
1275
1277
1278 // Remove the thread from the message queue receive waiting list,
1279 // if not already removed by send().
1280 scheduler::internal_unlink_node (node);
1281
1282 if (crt_thread.interrupted ())
1283 {
1284#if defined(OS_TRACE_RTOS_MQUEUE)
1285 trace::printf ("%s(%p,%u) EINTR @%p %s\n", __func__, msg, nbytes,
1286 this, name ());
1287#endif
1288 return EINTR;
1289 }
1290 }
1291
1292 /* NOTREACHED */
1293 return ENOTRECOVERABLE;
1294
1295#endif
1296 }
1297
1328 result_t
1329 message_queue::try_receive (void* msg, std::size_t nbytes,
1330 priority_t* mprio)
1331 {
1332#if defined(OS_TRACE_RTOS_MQUEUE)
1333 trace::printf ("%s(%p,%u) @%p %s\n", __func__, msg, nbytes, this,
1334 name ());
1335#endif
1336
1337 os_assert_err(msg != nullptr, EINVAL);
1338 os_assert_err(nbytes <= msg_size_bytes_, EMSGSIZE);
1339
1340#if defined(OS_USE_RTOS_PORT_MESSAGE_QUEUE)
1341
1342 return port::message_queue::try_receive (this, msg, nbytes, mprio);
1343
1344#else
1345
1346 // Don't call this from high priority interrupts.
1347 assert(port::interrupts::is_priority_valid ());
1348
1349 {
1350 // ----- Enter critical section -------------------------------------
1352
1353 if (internal_try_receive_ (msg, nbytes, mprio))
1354 {
1355 return result::ok;
1356 }
1357 else
1358 {
1359 return EWOULDBLOCK;
1360 }
1361 // ----- Exit critical section --------------------------------------
1362 }
1363
1364#endif
1365 }
1366
1426 result_t
1427 message_queue::timed_receive (void* msg, std::size_t nbytes,
1428 clock::duration_t timeout, priority_t* mprio)
1429 {
1430#if defined(OS_TRACE_RTOS_MQUEUE)
1431 trace::printf ("%s(%p,%u,%u) @%p %s\n", __func__, msg, nbytes, timeout,
1432 this, name ());
1433#endif
1434
1435 // Don't call this from interrupt handlers.
1437 // Don't call this from critical regions.
1438 os_assert_err(!scheduler::locked (), EPERM);
1439
1440 os_assert_err(msg != nullptr, EINVAL);
1441 os_assert_err(nbytes <= msg_size_bytes_, EMSGSIZE);
1442
1443#if defined(OS_USE_RTOS_PORT_MESSAGE_QUEUE)
1444
1445 return port::message_queue::timed_receive (this, msg, nbytes,
1446 timeout, mprio);
1447
1448#else
1449
1450 // Extra test before entering the loop, with its inherent weight.
1451 // Trade size for speed.
1452 {
1453 // ----- Enter critical section -------------------------------------
1455
1456 if (internal_try_receive_ (msg, nbytes, mprio))
1457 {
1458 return result::ok;
1459 }
1460 // ----- Exit critical section --------------------------------------
1461 }
1462
1463 thread& crt_thread = this_thread::thread ();
1464
1465 // Prepare a list node pointing to the current thread.
1466 // Do not worry for being on stack, it is temporarily linked to the
1467 // list and guaranteed to be removed before this function returns.
1469 { crt_thread };
1470
1471 internal::clock_timestamps_list& clock_list = clock_->steady_list ();
1472 clock::timestamp_t timeout_timestamp = clock_->steady_now () + timeout;
1473
1474 // Prepare a timeout node pointing to the current thread.
1476 { timeout_timestamp, crt_thread };
1477
1478 for (;;)
1479 {
1480 {
1481 // ----- Enter critical section ---------------------------------
1483
1484 if (internal_try_receive_ (msg, nbytes, mprio))
1485 {
1486 return result::ok;
1487 }
1488
1489 // Add this thread to the message queue receive waiting list,
1490 // and the clock timeout list.
1491 scheduler::internal_link_node (receive_list_, node, clock_list,
1492 timeout_node);
1493 // state::suspended set in above link().
1494 // ----- Exit critical section ----------------------------------
1495 }
1496
1498
1499 // Remove the thread from the message queue receive waiting list,
1500 // if not already removed by send(), and from the clock
1501 // timeout list, if not already removed by the timer.
1502 scheduler::internal_unlink_node (node, timeout_node);
1503
1504 if (crt_thread.interrupted ())
1505 {
1506#if defined(OS_TRACE_RTOS_MQUEUE)
1507 trace::printf ("%s(%p,%u,%u) EINTR @%p %s\n", __func__, msg,
1508 nbytes, timeout, this, name ());
1509#endif
1510 return EINTR;
1511 }
1512
1513 if (clock_->steady_now () >= timeout_timestamp)
1514 {
1515#if defined(OS_TRACE_RTOS_MQUEUE)
1516 trace::printf ("%s(%p,%u,%u) ETIMEDOUT @%p %s\n", __func__, msg,
1517 nbytes, timeout, this, name ());
1518#endif
1519 return ETIMEDOUT;
1520 }
1521 }
1522
1523 /* NOTREACHED */
1524 return ENOTRECOVERABLE;
1525
1526#endif
1527 }
1528
1539 result_t
1541 {
1542#if defined(OS_TRACE_RTOS_MQUEUE)
1543 trace::printf ("%s() @%p %s\n", __func__, this, name ());
1544#endif
1545
1546 // Don't call this from interrupt handlers.
1548
1549#if defined(OS_USE_RTOS_PORT_MESSAGE_QUEUE)
1550
1551 return port::message_queue::reset (this);
1552
1553#else
1554
1555 {
1556 // ----- Enter critical section -------------------------------------
1558
1559 internal_init_ ();
1560 return result::ok;
1561 // ----- Exit critical section --------------------------------------
1562 }
1563
1564#endif
1565 }
1566
1567 // --------------------------------------------------------------------------
1568
1569 } /* namespace rtos */
1570} /* namespace os */
1571
1572// ----------------------------------------------------------------------------
Ordered list of time stamp nodes.
Definition os-lists.h:671
const char * name(void) const
Get object name.
Definition os-decls.h:774
Double linked list node, with time stamp and thread.
Definition os-lists.h:227
Double linked list node, with thread reference.
Definition os-lists.h:72
Interrupts critical section RAII helper.
Definition os-sched.h:524
Standard allocator based on the RTOS system default memory manager.
Definition os-memory.h:544
void deallocate(value_type *addr, std::size_t elements) noexcept
Deallocate the number of memory blocks of type value_type.
Message queue attributes.
Definition os-mqueue.h:154
POSIX compliant message queue, using the default RTOS allocator.
Definition os-mqueue.h:70
result_t receive(void *msg, std::size_t nbytes, priority_t *mprio=nullptr)
Receive a message from the queue.
result_t reset(void)
Reset the message queue.
result_t try_send(const void *msg, std::size_t nbytes, priority_t mprio=default_priority)
Try to send a message to the queue.
result_t try_receive(void *msg, std::size_t nbytes, priority_t *mprio=nullptr)
Try to receive a message from the queue.
result_t send(const void *msg, std::size_t nbytes, priority_t mprio=default_priority)
Send a message to the queue.
virtual ~message_queue()
Destruct the message queue object instance.
result_t timed_receive(void *msg, std::size_t nbytes, clock::duration_t timeout, priority_t *mprio=nullptr)
Receive a message from the queue with timeout.
result_t timed_send(const void *msg, std::size_t nbytes, clock::duration_t timeout, priority_t mprio=default_priority)
Send a message to the queue with timeout.
message_queue(std::size_t msgs, std::size_t msg_size_bytes, const attributes &attr=initializer, const allocator_type &allocator=allocator_type())
Construct a message queue object instance.
POSIX compliant thread, using the default RTOS allocator.
Definition os-thread.h:247
bool interrupted(void)
Check if interrupted.
Definition os-thread.h:2361
int printf(const char *format,...)
Write a formatted string to the trace device.
Definition trace.cpp:74
port::clock::duration_t duration_t
Type of variables holding clock durations.
Definition os-clocks.h:83
port::clock::timestamp_t timestamp_t
Type of variables holding clock time stamps.
Definition os-clocks.h:92
clock_systick sysclock
The system clock object instance.
uint8_t priority_t
Type of message priority storage.
Definition os-mqueue.h:127
message_queue::size_t index_t
Type of list index storage.
Definition os-mqueue.h:111
static const attributes initializer
Default message queue initialiser.
Definition os-mqueue.h:218
uint8_t size_t
Type of a queue size storage.
Definition os-mqueue.h:86
uint16_t msg_size_t
Type of message size storage.
Definition os-mqueue.h:99
static constexpr index_t no_index
Index value to represent an illegal index.
Definition os-mqueue.h:117
bool in_handler_mode(void)
Check if the CPU is in handler mode.
Definition os-sched.h:1136
@ ok
Function completed; no errors or events occurred.
Definition os-decls.h:195
bool locked(void)
Check if the scheduler is locked.
Definition os-sched.h:882
thread & thread(void)
Get the current running thread.
uint32_t result_t
Type of values returned by RTOS functions.
Definition os-decls.h:110
System namespace.
#define os_assert_throw(__e, __er)
Assert or throw a system error exception.
Definition os-decls.h:1141
#define os_assert_err(__e, __er)
Assert or return an error.
Definition os-decls.h:1126
Single file µOS++ RTOS definitions.