diff options
author | patacongo <patacongo@42af7a65-404d-4744-a932-0658087f49c3> | 2007-03-29 13:25:18 +0000 |
---|---|---|
committer | patacongo <patacongo@42af7a65-404d-4744-a932-0658087f49c3> | 2007-03-29 13:25:18 +0000 |
commit | 605bd84ced2cf37e267eccf604bdff1a985a65d8 (patch) | |
tree | c16c8b2b4e77974763bc6df44c2c116eac67f454 | |
parent | a300116bf7139941033b6d36dc3814af9d1e67c9 (diff) | |
download | nuttx-605bd84ced2cf37e267eccf604bdff1a985a65d8.tar.gz nuttx-605bd84ced2cf37e267eccf604bdff1a985a65d8.tar.bz2 nuttx-605bd84ced2cf37e267eccf604bdff1a985a65d8.zip |
Added mq_timedsend() and mq_timedreceive()
git-svn-id: svn://svn.code.sf.net/p/nuttx/code/trunk@166 42af7a65-404d-4744-a932-0658087f49c3
-rw-r--r-- | nuttx/ChangeLog | 5 | ||||
-rw-r--r-- | nuttx/Documentation/NuttX.html | 5 | ||||
-rw-r--r-- | nuttx/Documentation/NuttxUserGuide.html | 189 | ||||
-rw-r--r-- | nuttx/include/mqueue.h | 14 | ||||
-rw-r--r-- | nuttx/sched/Makefile | 4 | ||||
-rw-r--r-- | nuttx/sched/mq_internal.h | 76 | ||||
-rw-r--r-- | nuttx/sched/mq_rcvinternal.c | 314 | ||||
-rw-r--r-- | nuttx/sched/mq_receive.c | 167 | ||||
-rw-r--r-- | nuttx/sched/mq_send.c | 418 | ||||
-rw-r--r-- | nuttx/sched/mq_sndinternal.c | 448 | ||||
-rw-r--r-- | nuttx/sched/mq_timedreceive.c | 308 | ||||
-rw-r--r-- | nuttx/sched/mq_timedsend.c | 317 |
12 files changed, 1732 insertions, 533 deletions
diff --git a/nuttx/ChangeLog b/nuttx/ChangeLog index 476f87bbb..181082fc7 100644 --- a/nuttx/ChangeLog +++ b/nuttx/ChangeLog @@ -93,6 +93,11 @@ * mq_receive and mq_send now return errno's appropriately * mq_receive and mq_send are now correctly awakened by signals. + * Fixed an unmatched sched_lock/unlock pair in task_delete(). + * sched_lock must be called in _exit() because operation of + task_delete() can cause pending tasks to be merged and a + context switch to occur. + * Added mq_timedreceive() and mq_timedsend() * Started m68322 diff --git a/nuttx/Documentation/NuttX.html b/nuttx/Documentation/NuttX.html index 087fd48a7..fcdb9b1d1 100644 --- a/nuttx/Documentation/NuttX.html +++ b/nuttx/Documentation/NuttX.html @@ -454,6 +454,11 @@ Other memory: * mq_receive and mq_send now return errno's appropriately * mq_receive and mq_send are now correctly awakened by signals. + * Fixed an unmatched sched_lock/unlock pair in task_delete(). + * sched_lock must be called in _exit() because operation of + task_delete() can cause pending tasks to be merged and a + context switch to occur. + * Added mq_timedreceive() and mq_timedsend() * Started m68322 </pre></ul> diff --git a/nuttx/Documentation/NuttxUserGuide.html b/nuttx/Documentation/NuttxUserGuide.html index d706c60c6..3fd59357c 100644 --- a/nuttx/Documentation/NuttxUserGuide.html +++ b/nuttx/Documentation/NuttxUserGuide.html @@ -1021,10 +1021,12 @@ on this thread of execution. <li><a href="#mqclose">2.4.2 mq_close</a></li> <li><a href="#mqunlink">2.4.3 mq_unlink</a></li> <li><a href="#mqsend">2.4.4 mq_send</a></li> - <li><a href="#mqreceive">2.4.5 mq_receive</a></li> - <li><a href="#mqnotify">2.4.6 mq_notify</a></li> - <li><a href="#mqsetattr">2.4.7 mq_setattr</a></li> - <li><a href="#mqgetattr">2.4.8 mq_getattr</a></li> + <li><a href="#mqtimedsend">2.4.5 mq_timedsend</a></li> + <li><a href="#mqreceive">2.4.6 mq_receive</a></li> + <li><a href="#mqtimedreceive">2.4.7 mq_timedreceive</a></li> + <li><a href="#mqnotify">2.4.8 mq_notify</a></li> + <li><a href="#mqsetattr">2.4.9 mq_setattr</a></li> + <li><a href="#mqgetattr">2.4.10 mq_getattr</a></li> </ul> <H3><a name="mqopen">2.4.1 mq_open</a></H3> @@ -1171,7 +1173,6 @@ closed. interface of the same name. <H3><a name="mqsend">2.4.4 mq_send</a></H3> - <p> <b>Function Prototype:</b> </p> @@ -1246,13 +1247,96 @@ interface of the same name. Comparable to the POSIX interface of the same name. </p> +<h3><a name="mqtimedsend">mq_timedsend</a></h3> +<b>Function Prototype:</b> +</p> +<pre> + #include <mqueue.h> + int mq_timedsend(mqd_t mqdes, const char *msg, size_t msglen, int prio, + const struct timespec *abstime); +</pre> +<p> +<b>Description:</b> + This function adds the specified message, <code>msg</code>, + to the message queue, <code>mqdes</code>. + The <code>msglen</code> parameter specifies the length of the message in bytes pointed to by <code>msg</code>. + This length must not exceed the maximum message length from the <code>mq_getattr()</code>. +</p> +<p> + If the message queue is not full, <code>mq_timedsend()</code> will place the <code>msg</code> + in the message queue at the position indicated by the <code>prio</code> argument. + Messages with higher priority will be inserted before lower priority messages + The value of <code>prio</code> must not exceed <code>MQ_PRIO_MAX</code>. +</p> +<p> + If the specified message queue is full and <code>O_NONBLOCK</code> is not + set in the message queue, then <code>mq_send()</code> will block until space + becomes available to the queue the message or until a timeout occurs. +</p> +<p> + <code>mq_timedsend()</code> behaves just like <code>mq_send()</code>, except + that if the queue is full and the <code>O_NONBLOCK</code> flag is not enabled + for the message queue description, then <code>abstime</code> points to a + structure which specifies a ceiling on the time for which the call will block. + This ceiling is an absolute timeout in seconds and nanoseconds since the + Epoch (midnight on the morning of 1 January 1970). +</p> +<p> + If the message queue is full, and the timeout has already expired by the time + of the call, <code>mq_timedsend()<code> returns immediately. +</p> +<p> + <b>Input Parameters:</b> +</p> +<ul> + <li><code>mqdes</code>. Message queue descriptor.</li> + <li><code>msg</code>. Message to send.</li> + <li><code>msglen</code>. The length of the message in bytes.</li> + <li><code>prio</code>. The priority of the message.</li> +</ul> +<p> + <b>Returned Values:</b> + On success, <code>mq_send()</code> returns 0 (<code>OK</code>); + on error, -1 (<code>ERROR</code>) is returned, with <code>errno</code> set + to indicate the error: +</p> +<ul> + <li> + <code>EAGAIN</code>. + The queue was empty, and the <code>O_NONBLOCK</code> flag was set for the message queue description referred to by <code>mqdes</code>. + </li> + <li> + <code>EINVAL</code>. + Either <code>msg</code> or <code>mqdes</code> is <code>NULL</code> or the value of <code>prio</code> is invalid. + </li> + <li> + <code>EPERM</code>. + Message queue opened not opened for writing. + </li> + <li> + <code>EMSGSIZE</code>. + <code>msglen</code> was greater than the <code>maxmsgsize</code> attribute of the message queue. + </li> + <li> + <code>EINTR</code>. + The call was interrupted by a signal handler. + </li> +</ul> +<p> + <b>Assumptions/Limitations:</b> +</p> +<p> + <b>POSIX Compatibility:</b> + Comparable to the POSIX interface of the same name. +</p> + <h3><a name="mqreceive">2.4.5 mq_receive</a></h3> <p> <b>Function Prototype:</b> </p> <pre> #include <mqueue.h> - int mq_receive(mqd_t mqdes, void *msg, size_t msglen, int *prio); + ssize_t mq_receive(mqd_t mqdes, void *msg, size_t msglen, int *prio); </pre> <p> <b>Description:</b> @@ -1316,7 +1400,92 @@ interface of the same name. Comparable to the POSIX interface of the same name. </p> -<H3><a name="mqnotify">2.4.6 mq_notify</a></H3> +<h3><a name="mqtimedreceive">2.4.6 mq_timedreceive</a></h3> +<p> + <b>Function Prototype:</b> +</p> +<pre> + #include <mqueue.h> + ssize_t mq_timedreceive(mqd_t mqdes, void *msg, size_t msglen, + int *prio, const struct timespec *abstime); +</pre> +<p> + <b>Description:</b> + This function receives the oldest of the highest priority messages from the message + queue specified by <code>mqdes</code>. + If the size of the buffer in bytes, <code>msgLen</code>, is less than the + <code>mq_msgsize</code> attribute of the message queue, <code>mq_timedreceive()</code> will + return an error. + Otherwise, the selected message is removed from the queue and copied to <code>msg</code>. +</p> +<p> + If the message queue is empty and <code>O_NONBLOCK</code> was not set, <code>mq_timedreceive()</code> + will block until a message is added to the message queue (or until a timeout occurs). + If more than one task is waiting to receive a message, only the task with the highest + priority that has waited the longest will be unblocked. +</p> +<p> + <code>mq_timedreceive()</code> behaves just like <code>mq_receive()<code>, except + that if the queue is empty and the <code>O_NONBLOCK<c/ode> flag is not enabled + for the message queue description, then <code>abstime</code> points to a structure + which specifies a ceiling on the time for which the call will block. + This ceiling is an absolute timeout in seconds and nanoseconds since the Epoch + (midnight on the morning of 1 January 1970). +</p> +<p> + If no message is available, and the timeout has already expired by the time of + the call, <code>mq_timedreceive()</code> returns immediately. +</p> +<p> + <b>Input Parameters:</b> +</p> +<ul> + <li><code>mqdes</code>. Message Queue Descriptor.</li> + <li><code>msg</code>. Buffer to receive the message.</li> + <li><code>msglen</code>. Size of the buffer in bytes.</li> + <li><code>prio</code>. If not NULL, the location to store message priority. + <li><code>abstime</code>. The absolute time to wait until a timeout is declared. +</ul> +<p> + <b>Returned Values:</b>. + One success, the length of the selected message in bytes is returned. + On failure, -1 (<code>ERROR</code>) is returned and the <code>errno</code> is set appropriately: +</p> +<ul> + <li> + <code>EAGAIN</code>: + The queue was empty and the <code>O_NONBLOCK</code> flag was set for the message queue description referred to by <code>mqdes</code>. + </li> + <li> + <code>EPERM</code>: + Message queue opened not opened for reading. + </li> + <li> + <code>EMSGSIZE</code>: + <code>msglen</code> was less than the <code>maxmsgsize</code> attribute of the message queue. + </li> + <li> + <code>EINTR</code>: + The call was interrupted by a signal handler. + </li> + <li> + <code>EINVAL</code>: + Invalid <code>msg</code> or <code>mqdes</code> or <code>abstime</code> + </li> + <li> + <code>ETIMEDOUT</code>: + The call timed out before a message could be transferred. + </li> +</ul> +<p> + <b>Assumptions/Limitations:</b> +</p> +<p> + <b>POSIX Compatibility:</b> + Comparable to the POSIX interface of the same name. +</p> + +<h3><a name="mqnotify">2.4.7 mq_notify</a></h3> <p> <b>Function Prototype:</b> @@ -1372,7 +1541,7 @@ appropriate <I>mq_receive()</I> ... The resulting behavior is as if the message queue remains empty, and no notification shall be sent." </ul> -<H3><a name="mqsetattr">2.4.7 mq_setattr</a></H3> +<H3><a name="mqsetattr">2.4.8 mq_setattr</a></H3> <p> <b>Function Prototype:</b> @@ -1411,7 +1580,7 @@ would have been returned by mq_getattr()). <b> POSIX Compatibility:</b> Comparable to the POSIX interface of the same name. -<H3><a name="mqgetattr">2.4.8 mq_getattr</a></H3> +<H3><a name="mqgetattr">2.4.9 mq_getattr</a></H3> <p> <b>Function Prototype:</b> @@ -5641,6 +5810,8 @@ notify a task when a message is available on a queue. <li><a href="#mqreceive">mq_receive</a></li> <li><a href="#mqsend">mq_send</a></li> <li><a href="#mqsetattr">mq_setattr</a></li> + <li><a href="#mqtimedreceive">mq_timedreceive</a></li> + <li><a href="#mqtimedsend">mq_timedsend</a></li> <li><a href="#mqunlink">mq_unlink</a></li> <li><a href="#OS_Interfaces">OS Interfaces</a> <li><a href="#Pthread">Pthread Interfaces</a> diff --git a/nuttx/include/mqueue.h b/nuttx/include/mqueue.h index 22e853a16..d13fe7006 100644 --- a/nuttx/include/mqueue.h +++ b/nuttx/include/mqueue.h @@ -87,12 +87,16 @@ extern "C" { #define EXTERN extern #endif -EXTERN mqd_t mq_open(const char *mq_name, int oflags, ... ); +EXTERN mqd_t mq_open(const char *mq_name, int oflags, ...); EXTERN int mq_close(mqd_t mqdes ); -EXTERN int mq_unlink(const char *mq_name ); -EXTERN int mq_send(mqd_t mqdes, const void *msg, size_t msglen, int prio ); -EXTERN int mq_receive(mqd_t mqdes, void *msg, size_t msglen, int *prio ); -EXTERN int mq_notify(mqd_t mqdes, const struct sigevent *notification ); +EXTERN int mq_unlink(const char *mq_name); +EXTERN int mq_send(mqd_t mqdes, const void *msg, size_t msglen, int prio); +EXTERN int mq_timedsend(mqd_t mqdes, const char *msg, size_t msglen, int prio, + const struct timespec *abstime); +EXTERN ssize_t mq_receive(mqd_t mqdes, void *msg, size_t msglen, int *prio); +EXTERN ssize_t mq_timedreceive(mqd_t mqdes, void *msg, size_t msglen, + int *prio, const struct timespec *abstime); +EXTERN int mq_notify(mqd_t mqdes, const struct sigevent *notification); EXTERN int mq_setattr(mqd_t mqdes, const struct mq_attr *mq_stat, struct mq_attr *oldstat); EXTERN int mq_getattr(mqd_t mqdes, struct mq_attr *mq_stat); diff --git a/nuttx/sched/Makefile b/nuttx/sched/Makefile index 1f6ec2384..50992734a 100644 --- a/nuttx/sched/Makefile +++ b/nuttx/sched/Makefile @@ -69,7 +69,9 @@ SIGNAL_SRCS = sig_initialize.c \ sig_unmaskpendingsignal.c sig_removependingsignal.c \ sig_releasependingsignal.c sig_lowest.c sig_mqnotempty.c \ sig_cleanup.c sig_received.c sig_deliver.c -MQUEUE_SRCS = mq_open.c mq_close.c mq_unlink.c mq_send.c mq_receive.c \ +MQUEUE_SRCS = mq_open.c mq_close.c mq_unlink.c \ + mq_send.c mq_timedsend.c mq_sndinternal.c \ + mq_receive.c mq_timedreceive.c mq_rcvinternal.c \ mq_setattr.c mq_getattr.c mq_initialize.c mq_descreate.c \ mq_findnamed.c mq_msgfree.c mq_msgqfree.c mq_waitirq.c ifneq ($(CONFIG_DISABLE_SIGNALS),y) diff --git a/nuttx/sched/mq_internal.h b/nuttx/sched/mq_internal.h index a4e21d3c7..d7510a5db 100644 --- a/nuttx/sched/mq_internal.h +++ b/nuttx/sched/mq_internal.h @@ -1,4 +1,4 @@ -/************************************************************ +/**************************************************************************** * mq_internal.h * * Copyright (C) 2007 Gregory Nutt. All rights reserved. @@ -31,14 +31,14 @@ * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE * POSSIBILITY OF SUCH DAMAGE. * - ************************************************************/ + ****************************************************************************/ #ifndef __MQ_INTERNAL_H #define __MQ_INTERNAL_H -/************************************************************ +/**************************************************************************** * Included Files - ************************************************************/ + ****************************************************************************/ #include <sys/types.h> #include <limits.h> @@ -47,16 +47,15 @@ #include <signal.h> #include <nuttx/compiler.h> -/************************************************************ +/**************************************************************************** * Compilations Switches - ************************************************************/ + ****************************************************************************/ -/************************************************************ +/**************************************************************************** * Definitions - ************************************************************/ + ****************************************************************************/ #define MQ_MAX_BYTES CONFIG_MQ_MAXMSGSIZE -#define MQ_MAX_HWORDS ((MQ_MAX_BYTES + sizeof(uint16) - 1) / sizeof(uint16)) #define MQ_MAX_MSGS 16 #define MQ_PRIO_MAX _POSIX_MQ_PRIO_MAX @@ -72,9 +71,9 @@ #define NUM_INTERRUPT_MSGS 8 -/************************************************************ +/**************************************************************************** * Global Type Declarations - ************************************************************/ + ****************************************************************************/ enum mqalloc_e { @@ -85,23 +84,18 @@ enum mqalloc_e typedef enum mqalloc_e mqalloc_t; /* This structure describes one buffered POSIX message. */ -/* NOTE: This structure is allocated from the same pool as MQ_type. - * Therefore, (1) it must have a fixed "mail" size, and (2) must - * exactly match MQ_type in size. - */ struct mqmsg { - /* The position of the following two field must exactly match - * MQ_type. - */ - FAR struct mqmsg *next; /* Forward link to next message */ ubyte type; /* (Used to manage allocations) */ - ubyte priority; /* priority of message */ +#if MQ_MAX_BYTES < 256 ubyte msglen; /* Message data length */ - uint16 mail[MQ_MAX_HWORDS]; /* Message data */ +#else + uint16 msglen; /* Message data length */ +#endif + ubyte mail[MQ_MAX_BYTES]; /* Message data */ }; typedef struct mqmsg mqmsg_t; @@ -142,9 +136,9 @@ struct mq_des int oflags; /* Flags set when message queue was opened */ }; -/************************************************************ +/**************************************************************************** * Global Variables - ************************************************************/ + ****************************************************************************/ /* This is a list of all opened message queues */ @@ -170,9 +164,9 @@ extern sq_queue_t g_msgfreeirq; extern sq_queue_t g_desfree; -/************************************************************ +/**************************************************************************** * Global Function Prototypes - ************************************************************/ + ****************************************************************************/ #ifdef __cplusplus #define EXTERN extern "C" @@ -181,19 +175,35 @@ extern "C" { #define EXTERN extern #endif -/* Functions defined in mq_initialize.c ********************/ +/* Functions defined in mq_initialize.c ************************************/ EXTERN void weak_function mq_initialize(void); -EXTERN void mq_desblockalloc(void); +EXTERN void mq_desblockalloc(void); + +EXTERN mqd_t mq_descreate(FAR _TCB* mtcb, FAR msgq_t* msgq, int oflags); +EXTERN FAR msgq_t *mq_findnamed(const char *mq_name); +EXTERN void mq_msgfree(FAR mqmsg_t *mqmsg); +EXTERN void mq_msgqfree(FAR msgq_t *msgq); + +/* mq_waitirq.c ************************************************************/ + +EXTERN void mq_waitirq(FAR _TCB *wtcb); + +/* mq_rcvinternal.c ********************************************************/ -EXTERN mqd_t mq_descreate(FAR _TCB* mtcb, FAR msgq_t* msgq, int oflags); -EXTERN FAR msgq_t *mq_findnamed(const char *mq_name); -EXTERN void mq_msgfree(FAR mqmsg_t *mqmsg); -EXTERN void mq_msgqfree(FAR msgq_t *msgq); +EXTERN int mq_verifyreceive(mqd_t mqdes, void *msg, size_t msglen); +EXTERN FAR mqmsg_t *mq_waitreceive(mqd_t mqdes); +EXTERN ssize_t mq_doreceive(mqd_t mqdes, mqmsg_t *mqmsg, void *ubuffer, + int *prio); -/* mq_waitirq.c ********************************************/ +/* mq_sndinternal.c ********************************************************/ -EXTERN void mq_waitirq(FAR _TCB *wtcb); +EXTERN int mq_verifysend(mqd_t mqdes, const void *msg, size_t msglen, + int prio); +EXTERN FAR mqmsg_t *mq_msgalloc(void); +EXTERN int mq_waitsend(mqd_t mqdes); +EXTERN int mq_dosend(mqd_t mqdes, FAR mqmsg_t *mqmsg, const void *msg, + size_t msglen, int prio); #undef EXTERN #ifdef __cplusplus diff --git a/nuttx/sched/mq_rcvinternal.c b/nuttx/sched/mq_rcvinternal.c new file mode 100644 index 000000000..dcbfc0b31 --- /dev/null +++ b/nuttx/sched/mq_rcvinternal.c @@ -0,0 +1,314 @@ +/************************************************************ + * mq_rcvinternal.c + * + * Copyright (C) 2007 Gregory Nutt. All rights reserved. + * Author: Gregory Nutt <spudmonkey@racsa.co.cr> + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * + * 1. Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in + * the documentation and/or other materials provided with the + * distribution. + * 3. Neither the name Gregory Nutt nor the names of its contributors may be + * used to endorse or promote products derived from this software + * without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS + * FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE + * COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, + * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, + * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS + * OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED + * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT + * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN + * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + * + ************************************************************/ + +/************************************************************ + * Included Files + ************************************************************/ + +#include <sys/types.h> +#include <fcntl.h> /* O_NONBLOCK */ +#include <string.h> +#include <assert.h> +#include <errno.h> +#include <mqueue.h> +#include <sched.h> +#include <debug.h> +#include <nuttx/arch.h> +#include <nuttx/os_external.h> +#include "os_internal.h" +#include "mq_internal.h" + +/************************************************************ + * Definitions + ************************************************************/ + +/************************************************************ + * Private Type Declarations + ************************************************************/ + +/************************************************************ + * Global Variables + ************************************************************/ + +/************************************************************ + * Private Variables + ************************************************************/ + +/************************************************************ + * Private Functions + ************************************************************/ + +/************************************************************ + * Public Functions + ************************************************************/ + +/************************************************************ + * Name: mq_verifyreceive + * + * Description: + * This is internal, common logic shared by both mq_receive + * and mq_timedreceive. This function verifies the + * input parameters that are common to both functions. + * + * Parameters: + * mqdes - Message Queue Descriptor + * msg - Buffer to receive the message + * msglen - Size of the buffer in bytes + * + * Return Value: + * One success, 0 (OK) is returned. On failure, -1 (ERROR) is + * returned and the errno is set appropriately: + * + * EPERM Message queue opened not opened for reading. + * EMSGSIZE 'msglen' was less than the maxmsgsize attribute of the + * message queue. + * EINVAL Invalid 'msg' or 'mqdes' + * + * Assumptions: + * + ************************************************************/ + +int mq_verifyreceive(mqd_t mqdes, void *msg, size_t msglen) +{ + /* Verify the input parameters */ + + if (!msg || !mqdes) + { + *get_errno_ptr() = EINVAL; + return ERROR; + } + + if ((mqdes->oflags & O_RDOK) == 0) + { + *get_errno_ptr() = EPERM; + return ERROR; + } + + if (msglen < (size_t)mqdes->msgq->maxmsgsize) + { + *get_errno_ptr() = EMSGSIZE; + return ERROR; + } + + return OK; +} + +/************************************************************ + * Function: mq_waitreceive + * + * Description: + * This is internal, common logic shared by both mq_receive + * and mq_timedreceive. This function waits for a message to + * be received on the specified message queue, removes the + * message from the queue, and returns it. + * + * Parameters: + * mqdes - Message queue descriptor + * + * Return Value: + * On success, a reference to the received message. If the + * wait was interrupted by a signal or a timeout, then the + * errno will be set appropriately and NULL will be returned. + * + * Assumptions: + * - The caller has provided all validity checking of the + * input parameters using mq_verifyreceive. + * - Interrupts should be disabled throughout this call. This + * is necessary because messages can be sent from interrupt + * level processing. + * - For mq_timedreceive, setting of the timer and this wait + * must be atomic. + * + ************************************************************/ + +FAR mqmsg_t *mq_waitreceive(mqd_t mqdes) +{ + FAR _TCB *rtcb; + FAR msgq_t *msgq; + FAR mqmsg_t *rcvmsg; + + /* Get a pointer to the message queue */ + + msgq = mqdes->msgq; + + /* Get the message from the head of the queue */ + + while ((rcvmsg = (FAR mqmsg_t*)sq_remfirst(&msgq->msglist)) == NULL) + { + /* Should we block until there the above condition has been + * satisfied? + */ + + if (!(mqdes->oflags & O_NONBLOCK)) + { + /* Block and try again */ + + rtcb = (FAR _TCB*)g_readytorun.head; + rtcb->msgwaitq = msgq; + msgq->nwaitnotempty++; + + *get_errno_ptr() = OK; + up_block_task(rtcb, TSTATE_WAIT_MQNOTEMPTY); + + /* When we resume at this point, either (1) the message queue + * is no longer empty, or (2) the wait has been interrupted by + * a signal. We can detect the latter case be examining the + * errno value (should be either EINTR or ETIMEDOUT). + */ + + if (*get_errno_ptr() != OK) + { + break; + } + } + else + { + /* The queue was empty, and the O_NONBLOCK flag was set for the + * message queue description referred to by 'mqdes'. + */ + + *get_errno_ptr() = EAGAIN; + break; + } + } + + /* If we got message, then decrement the number of messages in + * the queue while we are still in the critical section + */ + + if (rcvmsg) + { + msgq->nmsgs--; + } + return rcvmsg; +} + +/************************************************************ + * Function: mq_doreceive + * + * Description: + * This is internal, common logic shared by both mq_receive + * and mq_timedreceive. This function accepts the message + * obtained by mq_waitmsg, provides the message content to + * the user, notifies any threads that were waiting for + * the message queue to become non-full, and disposes of the + * message structure + * + * Parameters: + * mqdes - Message queue descriptor + * mqmsg - The message obtained by mq_waitmsg() + * ubuffer - The address of the user provided buffer to + * receive the message + * prio - The user-provided location to return the + * message priority. + * + * Return Value: + * Returns the length of the received message. This + * function does not fail. + * + * Assumptions: + * - The caller has provided all validity checking of the + * input parameters using mq_verifyreceive. + * - The user buffer, ubuffer, is known to be large enough + * to accept the largest message that an be sent on this + * message queue + * - Pre-emption should be disabled throughout this call. + * + ************************************************************/ + +ssize_t mq_doreceive(mqd_t mqdes, mqmsg_t *mqmsg, void *ubuffer, int *prio) +{ + FAR _TCB *btcb; + irqstate_t saved_state; + FAR msgq_t *msgq; + ssize_t rcvmsglen; + + /* Get the length of the message (also the return value) */ + + rcvmsglen = mqmsg->msglen; + + /* Copy the message into the caller's buffer */ + + memcpy(ubuffer, (const void*)mqmsg->mail, rcvmsglen); + + /* Copy the message priority as well (if a buffer is provided) */ + + if (prio) + { + *prio = mqmsg->priority; + } + + /* We are done with the message. Deallocate it now. */ + + mq_msgfree(mqmsg); + + /* Check if any tasks are waiting for the MQ not full event. */ + + msgq = mqdes->msgq; + if (msgq->nwaitnotfull > 0) + { + /* Find the highest priority task that is waiting for + * this queue to be not-full in g_waitingformqnotfull list. + * This must be performed in a critical section because + * messages can be sent from interrupt handlers. + */ + + saved_state = irqsave(); + for (btcb = (FAR _TCB*)g_waitingformqnotfull.head; + btcb && btcb->msgwaitq != msgq; + btcb = btcb->flink); + + /* If one was found, unblock it. NOTE: There is a race + * condition here: the queue might be full again by the + * time the task is unblocked + */ + + if (!btcb) + { + PANIC(OSERR_MQNOTFULLCOUNT); + } + else + { + btcb->msgwaitq = NULL; + msgq->nwaitnotfull--; + up_unblock_task(btcb); + } + irqrestore(saved_state); + } + + /* Return the length of the message transferred to the user buffer */ + + return rcvmsglen; +} diff --git a/nuttx/sched/mq_receive.c b/nuttx/sched/mq_receive.c index 9d1272ae9..b19567058 100644 --- a/nuttx/sched/mq_receive.c +++ b/nuttx/sched/mq_receive.c @@ -37,21 +37,12 @@ * Included Files ************************************************************/ -#include <sys/types.h> /* uint32, etc. */ -#include <stdarg.h> /* va_list */ -#include <unistd.h> -#include <fcntl.h> /* O_NONBLOCK */ -#include <string.h> -#include <assert.h> +#include <nuttx/config.h> +#include <sys/types.h> #include <errno.h> #include <mqueue.h> -#include <sched.h> #include <debug.h> -#include <nuttx/kmalloc.h> #include <nuttx/arch.h> -#include <nuttx/os_external.h> -#include "os_internal.h" -#include "sig_internal.h" #include "mq_internal.h" /************************************************************ @@ -122,155 +113,53 @@ * ************************************************************/ -int mq_receive(mqd_t mqdes, void *msg, size_t msglen, int *prio) +ssize_t mq_receive(mqd_t mqdes, void *msg, size_t msglen, int *prio) { - FAR _TCB *rtcb; - FAR _TCB *btcb; - FAR msgq_t *msgq; - FAR mqmsg_t *curr; + FAR mqmsg_t *mqmsg; irqstate_t saved_state; - ubyte rcvmsglen; - int ret = ERROR; + ssize_t ret = ERROR; - /* Verify the input parameters */ + DEBUGASSERT(!up_interrupt_context()); - if (!msg || !mqdes) - { - *get_errno_ptr() = EINVAL; - return ERROR; - } - - if ((mqdes->oflags & O_RDOK) == 0) - { - *get_errno_ptr() = EPERM; - return ERROR; - } + /* Verify the input parameters and, in case of an error, set + * errno appropriately. + */ - if (msglen < (size_t)mqdes->msgq->maxmsgsize) + if (mq_verifyreceive(mqdes, msg, msglen) != OK) { - *get_errno_ptr() = EMSGSIZE; return ERROR; } - /* Get a pointer to the message queue */ + /* Get the next mesage from the message queue. We will disable + * pre-emption until we have completed the message received. This + * is not too bad because if the receipt takes a long time, it will + * be because we are blocked waiting for a message and pre-emption + * will be re-enabled while we are blocked + */ sched_lock(); - msgq = mqdes->msgq; - /* Several operations must be performed below: We must determine if - * a message is pending and, if not, wait for the message. Since - * messages can be sent from the interrupt level, there is a race - * condition that can only be eliminated by disabling interrupts! + /* Furthermore, mq_waitreceive() expects to have interrupts disabled + * because messages can be sent from interrupt level. */ saved_state = irqsave(); - /* Get the message from the head of the queue */ - - while ((curr = (FAR mqmsg_t*)sq_remfirst(&msgq->msglist)) == NULL) - { - /* Should we block until there the above condition has been - * satisfied? - */ - - if (!(mqdes->oflags & O_NONBLOCK)) - { - /* Block and try again */ - - rtcb = (FAR _TCB*)g_readytorun.head; - rtcb->msgwaitq = msgq; - msgq->nwaitnotempty++; - - *get_errno_ptr() = OK; - up_block_task(rtcb, TSTATE_WAIT_MQNOTEMPTY); - - /* When we resume at this point, either (1) the message queue - * is no longer empty, or (2) the wait has been interrupted by - * a signal. We can detect the latter case be examining the - * errno value (should be EINTR). - */ - - if (*get_errno_ptr() != OK) - { - break; - } - } - else - { - /* The queue was empty, and the O_NONBLOCK flag was set for the - * message queue description referred to by 'mqdes'. - */ - - *get_errno_ptr() = EAGAIN; - break; - } - } + /* Get the message from the message queue */ - /* If we got message, then decrement the number of messages in - * the queue while we are still in the critical section - */ - - if (curr) - { - msgq->nmsgs--; - } + mqmsg = mq_waitreceive(mqdes); irqrestore(saved_state); - /* Check (again) if we got a message from the message queue*/ + /* Check if we got a message from the message queue. We might + * not have a message if: + * + * - The message queue is empty and O_NONBLOCK is set in the mqdes + * - The wait was interrupted by a signal + */ - if (curr) + if (mqmsg) { - /* Get the length of the message (also the return value) */ - - ret = rcvmsglen = curr->msglen; - - /* Copy the message into the caller's buffer */ - - memcpy(msg, (const void*)curr->mail, rcvmsglen); - - /* Copy the message priority as well (if a buffer is provided) */ - - if (prio) - { - *prio = curr->priority; - } - - /* We are done with the message. Deallocate it now. */ - - mq_msgfree(curr); - - /* Check if any tasks are waiting for the MQ not full event. */ - - if (msgq->nwaitnotfull > 0) - { - /* Find the highest priority task that is waiting for - * this queue to be not-full in g_waitingformqnotfull list. - * This must be performed in a critical section because - * messages can be sent from interrupt handlers. - */ - - saved_state = irqsave(); - for (btcb = (FAR _TCB*)g_waitingformqnotfull.head; - btcb && btcb->msgwaitq != msgq; - btcb = btcb->flink); - - /* If one was found, unblock it. NOTE: There is a race - * condition here: the queue might be full again by the - * time the task is unblocked - */ - - if (!btcb) - { - PANIC(OSERR_MQNOTFULLCOUNT); - } - else - { - btcb->msgwaitq = NULL; - msgq->nwaitnotfull--; - up_unblock_task(btcb); - } - irqrestore(saved_state); - } + ret = mq_doreceive(mqdes, mqmsg, msg, prio); } sched_unlock(); diff --git a/nuttx/sched/mq_send.c b/nuttx/sched/mq_send.c index faa03e720..84e6e5842 100644 --- a/nuttx/sched/mq_send.c +++ b/nuttx/sched/mq_send.c @@ -1,4 +1,4 @@ -/************************************************************ +/**************************************************************************** * mq_send.c * * Copyright (C) 2007 Gregory Nutt. All rights reserved. @@ -31,165 +31,65 @@ * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE * POSSIBILITY OF SUCH DAMAGE. * - ************************************************************/ + ****************************************************************************/ -/************************************************************ +/**************************************************************************** * Included Files - ************************************************************/ + ****************************************************************************/ -#include <nuttx/compiler.h> -#include <nuttx/kmalloc.h> -#include <sys/types.h> /* uint32, etc. */ -#include <fcntl.h> +#include <nuttx/config.h> +#include <sys/types.h> #include <mqueue.h> -#include <string.h> #include <errno.h> -#include <sched.h> #include <debug.h> #include <nuttx/arch.h> #include "os_internal.h" -#ifndef CONFIG_DISABLE_SIGNALS -# include "sig_internal.h" -#endif #include "mq_internal.h" -/************************************************************ +/**************************************************************************** * Definitions - ************************************************************/ + ****************************************************************************/ -/************************************************************ +/**************************************************************************** * Private Type Declarations - ************************************************************/ + ****************************************************************************/ -/************************************************************ +/**************************************************************************** * Global Variables - ************************************************************/ + ****************************************************************************/ -/************************************************************ +/**************************************************************************** * Private Variables - ************************************************************/ + ****************************************************************************/ -/************************************************************ - * Function: mq_msgalloc - * - * Description: - * The mq_msgalloc function will get a free message for use - * by the operating system. The message will be allocated - * from the g_msgfree list. - * - * If the list is empty AND the message is NOT being - * allocated from the interrupt level, then the message - * will be allocated. If a message cannot be obtained, - * the operating system is dead and therefore cannot - * continue. - * - * If the list is empty AND the message IS being allocated - * from the interrupt level. This function will attempt to - * get a message from the g_msgfreeirq list. If this is - * unsuccessful, the calling interrupt handler will be - * notified. - * - * Inputs: - * None - * - * Return Value: - * A reference to the allocated msg structure - * - ************************************************************/ - -FAR mqmsg_t *mq_msgalloc(void) -{ - FAR mqmsg_t *mqmsg; - irqstate_t saved_state; - - /* If we were called from an interrupt handler, then try to - * get the message from generally available list of messages. - * If this fails, then try the list of messages reserved for - * interrupt handlers - */ - - if (up_interrupt_context()) - { - /* Try the general free list */ - - mqmsg = (FAR mqmsg_t*)sq_remfirst(&g_msgfree); - if (!mqmsg) - { - /* Try the free list reserved for interrupt handlers */ - - mqmsg = (FAR mqmsg_t*)sq_remfirst(&g_msgfreeirq); - } - } - - /* We were not called from an interrupt handler. */ - - else - { - /* Try to get the message from the generally available free list. - * Disable interrupts -- we might be called from an interrupt handler. - */ - - saved_state = irqsave(); - mqmsg = (FAR mqmsg_t*)sq_remfirst(&g_msgfree); - irqrestore(saved_state); - - /* If we cannot a message from the free list, then we will have to allocate one. */ - - if (!mqmsg) - { - mqmsg = (FAR mqmsg_t *)kmalloc((sizeof (mqmsg_t))); - - /* Check if we got an allocated message */ - - if (mqmsg) - { - mqmsg->type = MQ_ALLOC_DYN; - } - - /* No? We are dead */ - - else - { - dbg("Out of messages\n"); - PANIC((uint32)OSERR_OUTOFMESSAGES); - } - } - } - - return(mqmsg); -} - -/************************************************************ +/**************************************************************************** * Private Functions - ************************************************************/ + ****************************************************************************/ -/************************************************************ +/**************************************************************************** * Public Functions - ************************************************************/ + ****************************************************************************/ -/************************************************************ +/**************************************************************************** * Function: mq_send * * Description: - * This function adds the specificied message (msg) to the - * message queue (mqdes). The "msglen" parameter specifies - * the length of the message in bytes pointed to by "msg." - * This length must not exceed the maximum message length - * from the mq_getattr(). - * - * If the message queue is not full, mq_send() place the - * message in the message queue at the position indicated - * by the "prio" argrument. Messages with higher priority - * will be inserted before lower priority messages. The - * value of "prio" must not exceed MQ_PRIO_MAX. - * - * If the specified message queue is full and O_NONBLOCK - * is not set in the message queue, then mq_send() will - * block until space becomes available to the queue the - * message. - * - * If the message queue is full and O_NONBLOCK is set, - * the message is not queued and ERROR is returned. + * This function adds the specificied message (msg) to the message queue + * (mqdes). The "msglen" parameter specifies the length of the message + * in bytes pointed to by "msg." This length must not exceed the maximum + * message length from the mq_getattr(). + * + * If the message queue is not full, mq_send() place the message in the + * message queue at the position indicated by the "prio" argrument. + * Messages with higher priority will be inserted before lower priority + * messages. The value of "prio" must not exceed MQ_PRIO_MAX. + * + * If the specified message queue is full and O_NONBLOCK is not set in the + * message queue, then mq_send() will block until space becomes available + * to the queue the message. + * + * If the message queue is full and O_NONBLOCK is set, the message is not + * queued and ERROR is returned. * * Parameters: * mqdes - Message queue descriptor @@ -201,48 +101,31 @@ FAR mqmsg_t *mq_msgalloc(void) * On success, mq_send() returns 0 (OK); on error, -1 (ERROR) * is returned, with errno set to indicate the error: * - * EAGAIN The queue was empty, and the O_NONBLOCK flag was - * set for the message queue description referred to - * by mqdes. - * EINVAL Either msg or mqdes is NULL or the value of prio - * is invalid. + * EAGAIN The queue was empty, and the O_NONBLOCK flag was set for the + * message queue description referred to by mqdes. + * EINVAL Either msg or mqdes is NULL or the value of prio is invalid. * EPERM Message queue opened not opened for writing. - * EMSGSIZE 'msglen' was greater than the maxmsgsize attribute - * of the message queue. + * EMSGSIZE 'msglen' was greater than the maxmsgsize attribute of the + * message queue. * EINTR The call was interrupted by a signal handler. * * Assumptions/restrictions: * - ************************************************************/ + ****************************************************************************/ int mq_send(mqd_t mqdes, const void *msg, size_t msglen, int prio) { - FAR _TCB *rtcb; - FAR _TCB *btcb; FAR msgq_t *msgq; - FAR mqmsg_t *curr; - FAR mqmsg_t *next; - FAR mqmsg_t *prev; + FAR mqmsg_t *mqmsg = NULL; irqstate_t saved_state; int ret = ERROR; - /* Verify the input parameters */ - - if (!msg || !mqdes || prio < 0 || prio > MQ_PRIO_MAX) - { - *get_errno_ptr() = EINVAL; - return ERROR; - } - - if ((mqdes->oflags & O_WROK) == 0) - { - *get_errno_ptr() = EPERM; - return ERROR; - } + /* Verify the input parameters -- setting errno appropriately + * on any failures to verify. + */ - if (msglen < 0 || msglen > (size_t)mqdes->msgq->maxmsgsize) + if (mq_verifysend(mqdes, msg, msglen, prio) != OK) { - *get_errno_ptr() = EMSGSIZE; return ERROR; } @@ -251,205 +134,48 @@ int mq_send(mqd_t mqdes, const void *msg, size_t msglen, int prio) sched_lock(); msgq = mqdes->msgq; - /* If we are sending a message from an interrupt handler, then - * try to get message structure unconditionally. + /* Allocate a message structure: + * - Immediately if we are called from an interrupt handler. + * - Immediately if the message queue is not full, or + * - After successfully waiting for the message queue to become + * non-FULL. This would fail with EAGAIN, EINTR, or ETIMEOUT. */ saved_state = irqsave(); - if (up_interrupt_context()) + if (up_interrupt_context() || /* In an interrupt handler */ + msgq->nmsgs < msgq->maxmsgs || /* OR Message queue not full */ + mq_waitsend(mqdes) == OK) /* OR Successfully waited for mq not full */ { - curr = mq_msgalloc(); - } + /* Allocate the message */ - /* Otherwise, arbitrarily limit the number of messages in the - * queue to the value determined when the message queue was opened. - * This makes us more POSIX-like as well as prohibits one slow - * responding task from consuming all available memory. - */ - - else if (msgq->nmsgs >= msgq->maxmsgs) - { - /* Should we block until there is sufficient space in the - * message queue? - */ - - if ((mqdes->oflags & O_NONBLOCK) != 0) - { - /* No... We will return an error to the caller. */ - - *get_errno_ptr() = EAGAIN; - curr = NULL; - } - - /* Yes... We will not return control until the message queue is - * available. - */ - - else - { - boolean interrupted = FALSE; - - /* Loop until there are fewer than max allowable messages in the - * receiving message queue - */ - - while (msgq->nmsgs >= msgq->maxmsgs) - { - /* Block until the message queue is no longer full. - * When we are unblocked, we will try again - */ - - rtcb = (FAR _TCB*)g_readytorun.head; - rtcb->msgwaitq = msgq; - (msgq->nwaitnotfull)++; - - *get_errno_ptr() = OK; - up_block_task(rtcb, TSTATE_WAIT_MQNOTFULL); - - /* When we resume at this point, either (1) the message queue - * is no longer empty, or (2) the wait has been interrupted by - * a signal. We can detect the latter case be examining the - * errno value (should be EINTR). - */ - - if (*get_errno_ptr() != OK) - { - interrupted = TRUE; - break; - } - } - - /* If we were not interrupted, then it should be okay to add - * a message to the receiving message queue now. - */ - - if (!interrupted) - { - curr = mq_msgalloc(); - } - } + irqrestore(saved_state); + mqmsg = mq_msgalloc(); } - - /* We are not in an interrupt handler and the receiving message queue - * is not full - */ - else { - /* Just allocate a message */ - - curr = mq_msgalloc(); - } - irqrestore(saved_state); - - /* Check if we were able to get a message structure */ - - if (curr) - { - /* Construct the current message header info */ - - curr->priority = (ubyte)prio; - curr->msglen = (ubyte)msglen; - - /* Copy the message data into the message */ - - memcpy((void*)curr->mail, (const void*)msg, msglen); - - /* Insert the new message in the message queue */ - - saved_state = irqsave(); - - /* Search the message list to find the location to insert the new - * message. Each is list is maintained in ascending priority order. + /* We cannot send the message (and didn't even try to allocate it) + * because: + * - We are not in an interrupt handler AND + * - The message queue is full AND + * - When we tried waiting, the wait was unsuccessful. */ - for (prev = NULL, next = (FAR mqmsg_t*)msgq->msglist.head; - next && prio <= next->priority; - prev = next, next = next->next); - - /* Add the message at the right place */ - - if (prev) - { - sq_addafter((FAR sq_entry_t*)prev, (FAR sq_entry_t*)curr, - &msgq->msglist); - } - else - { - sq_addfirst((FAR sq_entry_t*)curr, &msgq->msglist); - } - - /* Increment the count of message in the queue */ - - msgq->nmsgs++; irqrestore(saved_state); + } - /* Check if we need to notify any tasks that are attached to the - * message queue - */ - -#ifndef CONFIG_DISABLE_SIGNALS - if (msgq->ntmqdes) - { - /* Remove the message notification data from the message queue. */ - -#ifdef CONFIG_CAN_PASS_STRUCTS - union sigval value = msgq->ntvalue; -#else - void *sival_ptr = msgq->ntvalue.sival_ptr; -#endif - int signo = msgq->ntsigno; - int pid = msgq->ntpid; - - /* Detach the notification */ - - msgq->ntpid = INVALID_PROCESS_ID; - msgq->ntsigno = 0; - msgq->ntvalue.sival_int = 0; - msgq->ntmqdes = NULL; - - /* Queue the signal -- What if this returns an error? */ - -#ifdef CONFIG_CAN_PASS_STRUCTS - sig_mqnotempty(pid, signo, value); -#else - sig_mqnotempty(pid, signo, sival_ptr); -#endif - } -#endif - /* Check if any tasks are waiting for the MQ not empty event. */ - - saved_state = irqsave(); - if (msgq->nwaitnotempty > 0) - { - /* Find the highest priority task that is waiting for - * this queue to be non-empty in g_waitingformqnotempty - * list. sched_lock() should give us sufficent protection since - * interrupts should never cause a change in this list - */ - - for (btcb = (FAR _TCB*)g_waitingformqnotempty.head; - btcb && btcb->msgwaitq != msgq; - btcb = btcb->flink); + /* Check if we were able to get a message structure -- this can fail + * either because we cannot send the message (and didn't bother trying + * to allocate it) or because the allocation failed. + */ - /* If one was found, unblock it */ + if (mqmsg) + { + /* Yes, peforrm the message send. */ - if (!btcb) - { - PANIC(OSERR_MQNONEMPTYCOUNT); - } - else - { - btcb->msgwaitq = NULL; - msgq->nwaitnotempty--; - up_unblock_task(btcb); - } - } - irqrestore(saved_state); - ret = OK; + ret = mq_dosend(mqdes, mqmsg, msg, msglen, prio); } sched_unlock(); - return(ret); + return ret; } diff --git a/nuttx/sched/mq_sndinternal.c b/nuttx/sched/mq_sndinternal.c new file mode 100644 index 000000000..09d56333b --- /dev/null +++ b/nuttx/sched/mq_sndinternal.c @@ -0,0 +1,448 @@ +/**************************************************************************** + * mq_send.c + * + * Copyright (C) 2007 Gregory Nutt. All rights reserved. + * Author: Gregory Nutt <spudmonkey@racsa.co.cr> + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * + * 1. Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in + * the documentation and/or other materials provided with the + * distribution. + * 3. Neither the name Gregory Nutt nor the names of its contributors may be + * used to endorse or promote products derived from this software + * without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS + * FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE + * COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, + * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, + * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS + * OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED + * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT + * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN + * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + * + ****************************************************************************/ + +/**************************************************************************** + * Included Files + ****************************************************************************/ + +#include <nuttx/config.h> +#include <nuttx/kmalloc.h> +#include <sys/types.h> +#include <fcntl.h> +#include <mqueue.h> +#include <string.h> +#include <errno.h> +#include <sched.h> +#include <debug.h> +#include <nuttx/arch.h> +#include "os_internal.h" +#ifndef CONFIG_DISABLE_SIGNALS +# include "sig_internal.h" +#endif +#include "mq_internal.h" + +/**************************************************************************** + * Definitions + ****************************************************************************/ + +/**************************************************************************** + * Private Type Declarations + ****************************************************************************/ + +/**************************************************************************** + * Global Variables + ****************************************************************************/ + +/**************************************************************************** + * Private Variables + ****************************************************************************/ + +/**************************************************************************** + * Private Functions + ****************************************************************************/ + +/**************************************************************************** + * Public Functions + ****************************************************************************/ + +/**************************************************************************** + * Name: mq_verifysend + * + * Description: + * This is internal, common logic shared by both mq_send and mq_timesend. + * This function verifies the input parameters that are common to both + * functions. + * + * Parameters: + * mqdes - Message queue descriptor + * msg - Message to send + * msglen - The length of the message in bytes + * prio - The priority of the message + * + * Return Value: + * One success, 0 (OK) is returned. On failure, -1 (ERROR) is returned and + * the errno is set appropriately: + * + * EINVAL Either msg or mqdes is NULL or the value of prio is invalid. + * EPERM Message queue opened not opened for writing. + * EMSGSIZE 'msglen' was greater than the maxmsgsize attribute of the + * message queue. + * + * Assumptions: + * + ****************************************************************************/ + +int mq_verifysend(mqd_t mqdes, const void *msg, size_t msglen, int prio) +{ + /* Verify the input parameters */ + + if (!msg || !mqdes || prio < 0 || prio > MQ_PRIO_MAX) + { + *get_errno_ptr() = EINVAL; + return ERROR; + } + + if ((mqdes->oflags & O_WROK) == 0) + { + *get_errno_ptr() = EPERM; + return ERROR; + } + + if (msglen < 0 || msglen > (size_t)mqdes->msgq->maxmsgsize) + { + *get_errno_ptr() = EMSGSIZE; + return ERROR; + } + + return OK; +} + +/**************************************************************************** + * Function: mq_msgalloc + * + * Description: + * The mq_msgalloc function will get a free message for use by the + * operating system. The message will be allocated from the g_msgfree list. + * + * If the list is empty AND the message is NOT being allocated from the + * interrupt level, then the message will be allocated. If a message + * cannot be obtained, the operating system is dead and therefore cannot + * continue. + * + * If the list is empty AND the message IS being allocated from the + * interrupt level. This function will attempt to get a message from + * the g_msgfreeirq list. If this is unsuccessful, the calling interrupt + * handler will be notified. + * + * Inputs: + * None + * + * Return Value: + * A reference to the allocated msg structure. On a failure to allocate, + * this function PANICs. + * + ****************************************************************************/ + +FAR mqmsg_t *mq_msgalloc(void) +{ + FAR mqmsg_t *mqmsg; + irqstate_t saved_state; + + /* If we were called from an interrupt handler, then try to get the message + * from generally available list of messages. If this fails, then try the + * list of messages reserved for interrupt handlers + */ + + if (up_interrupt_context()) + { + /* Try the general free list */ + + mqmsg = (FAR mqmsg_t*)sq_remfirst(&g_msgfree); + if (!mqmsg) + { + /* Try the free list reserved for interrupt handlers */ + + mqmsg = (FAR mqmsg_t*)sq_remfirst(&g_msgfreeirq); + } + } + + /* We were not called from an interrupt handler. */ + + else + { + /* Try to get the message from the generally available free list. + * Disable interrupts -- we might be called from an interrupt handler. + */ + + saved_state = irqsave(); + mqmsg = (FAR mqmsg_t*)sq_remfirst(&g_msgfree); + irqrestore(saved_state); + + /* If we cannot a message from the free list, then we will have to allocate one. */ + + if (!mqmsg) + { + mqmsg = (FAR mqmsg_t *)kmalloc((sizeof (mqmsg_t))); + + /* Check if we got an allocated message */ + + if (mqmsg) + { + mqmsg->type = MQ_ALLOC_DYN; + } + + /* No? We are dead */ + + else + { + dbg("Out of messages\n"); + PANIC((uint32)OSERR_OUTOFMESSAGES); + } + } + } + + return mqmsg; +} + +/**************************************************************************** + * Function: mq_waitsend + * + * Description: + * This is internal, common logic shared by both mq_send and mq_timesend. + * This function waits until the message queue is not full. + * + * Parameters: + * mqdes - Message queue descriptor + * + * Return Value: + * On success, mq_waitmqnotfull() returns 0 (OK); on error, -1 (ERROR) is + * returned, with errno set to indicate the error: + * + * EAGAIN The queue was empty, and the O_NONBLOCK flag was set for the + * message queue description referred to by mqdes. + * EINTR The call was interrupted by a signal handler. + * ETIMEOUT A timeout expired before the message queue became non-full + * (mq_timedsend only). + * + * Assumptions/restrictions: + * - The caller has verified the input parameters using mq_verifysend(). + * - Interrupts are disabled. + * + ****************************************************************************/ + +int mq_waitsend(mqd_t mqdes) +{ + FAR _TCB *rtcb; + FAR msgq_t *msgq; + + /* Get a pointer to the message queue */ + + msgq = mqdes->msgq; + + /* Verify that the queue is indeed full as the caller thinks */ + + if (msgq->nmsgs >= msgq->maxmsgs) + { + /* Should we block until there is sufficient space in the + * message queue? + */ + + if ((mqdes->oflags & O_NONBLOCK) != 0) + { + /* No... We will return an error to the caller. */ + + *get_errno_ptr() = EAGAIN; + return ERROR; + } + + /* Yes... We will not return control until the message queue is + * available or we receive a signal or at timout occurs. + */ + + else + { + /* Loop until there are fewer than max allowable messages in the + * receiving message queue + */ + + while (msgq->nmsgs >= msgq->maxmsgs) + { + /* Block until the message queue is no longer full. + * When we are unblocked, we will try again + */ + + rtcb = (FAR _TCB*)g_readytorun.head; + rtcb->msgwaitq = msgq; + (msgq->nwaitnotfull)++; + + *get_errno_ptr() = OK; + up_block_task(rtcb, TSTATE_WAIT_MQNOTFULL); + + /* When we resume at this point, either (1) the message queue + * is no longer empty, or (2) the wait has been interrupted by + * a signal. We can detect the latter case be examining the + * errno value (should be EINTR or ETIMEOUT). + */ + + if (*get_errno_ptr() != OK) + { + return ERROR; + } + } + } + } + return OK; +} + +/**************************************************************************** + * Function: mq_dosend + * + * Description: + * This is internal, common logic shared by both mq_send and mq_timesend. + * This function adds the specificied message (msg) to the message queue + * (mqdes). Then it notifies any tasks that were waiting for message + * queue notifications setup by mq_notify. And, finally, it awakens any + * tasks that were waiting for the message not empty event. + * + * Parameters: + * mqdes - Message queue descriptor + * msg - Message to send + * msglen - The length of the message in bytes + * prio - The priority of the message + * + * Return Value: + * This function always returns OK. + * + * Assumptions/restrictions: + * + ****************************************************************************/ + +int mq_dosend(mqd_t mqdes, FAR mqmsg_t *mqmsg, const void *msg, size_t msglen, int prio) +{ + FAR _TCB *btcb; + FAR msgq_t *msgq; + FAR mqmsg_t *next; + FAR mqmsg_t *prev; + irqstate_t saved_state; + + /* Get a pointer to the message queue */ + + sched_lock(); + msgq = mqdes->msgq; + + /* Construct the message header info */ + + mqmsg->priority = prio; + mqmsg->msglen = msglen; + + /* Copy the message data into the message */ + + memcpy((void*)mqmsg->mail, (const void*)msg, msglen); + + /* Insert the new message in the message queue */ + + saved_state = irqsave(); + + /* Search the message list to find the location to insert the new + * message. Each is list is maintained in ascending priority order. + */ + + for (prev = NULL, next = (FAR mqmsg_t*)msgq->msglist.head; + next && prio <= next->priority; + prev = next, next = next->next); + + /* Add the message at the right place */ + + if (prev) + { + sq_addafter((FAR sq_entry_t*)prev, (FAR sq_entry_t*)mqmsg, + &msgq->msglist); + } + else + { + sq_addfirst((FAR sq_entry_t*)mqmsg, &msgq->msglist); + } + + /* Increment the count of messages in the queue */ + + msgq->nmsgs++; + irqrestore(saved_state); + + /* Check if we need to notify any tasks that are attached to the + * message queue + */ + +#ifndef CONFIG_DISABLE_SIGNALS + if (msgq->ntmqdes) + { + /* Remove the message notification data from the message queue. */ + +#ifdef CONFIG_CAN_PASS_STRUCTS + union sigval value = msgq->ntvalue; +#else + void *sival_ptr = msgq->ntvalue.sival_ptr; +#endif + int signo = msgq->ntsigno; + int pid = msgq->ntpid; + + /* Detach the notification */ + + msgq->ntpid = INVALID_PROCESS_ID; + msgq->ntsigno = 0; + msgq->ntvalue.sival_int = 0; + msgq->ntmqdes = NULL; + + /* Queue the signal -- What if this returns an error? */ + +#ifdef CONFIG_CAN_PASS_STRUCTS + sig_mqnotempty(pid, signo, value); +#else + sig_mqnotempty(pid, signo, sival_ptr); +#endif + } +#endif + /* Check if any tasks are waiting for the MQ not empty event. */ + + saved_state = irqsave(); + if (msgq->nwaitnotempty > 0) + { + /* Find the highest priority task that is waiting for + * this queue to be non-empty in g_waitingformqnotempty + * list. sched_lock() should give us sufficent protection since + * interrupts should never cause a change in this list + */ + + for (btcb = (FAR _TCB*)g_waitingformqnotempty.head; + btcb && btcb->msgwaitq != msgq; + btcb = btcb->flink); + + /* If one was found, unblock it */ + + if (!btcb) + { + PANIC(OSERR_MQNONEMPTYCOUNT); + } + else + { + btcb->msgwaitq = NULL; + msgq->nwaitnotempty--; + up_unblock_task(btcb); + } + } + irqrestore(saved_state); + sched_unlock(); + return OK; +} + diff --git a/nuttx/sched/mq_timedreceive.c b/nuttx/sched/mq_timedreceive.c new file mode 100644 index 000000000..39fafdc7d --- /dev/null +++ b/nuttx/sched/mq_timedreceive.c @@ -0,0 +1,308 @@ +/**************************************************************************** + * mq_timedreceive.c + * + * Copyright (C) 2007 Gregory Nutt. All rights reserved. + * Author: Gregory Nutt <spudmonkey@racsa.co.cr> + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * + * 1. Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in + * the documentation and/or other materials provided with the + * distribution. + * 3. Neither the name Gregory Nutt nor the names of its contributors may be + * used to endorse or promote products derived from this software + * without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS + * FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE + * COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, + * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, + * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS + * OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED + * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT + * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN + * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + * + ****************************************************************************/ + +/**************************************************************************** + * Included Files + ****************************************************************************/ + +#include <sys/types.h> +#include <unistd.h> +#include <errno.h> +#include <mqueue.h> +#include <wdog.h> +#include <debug.h> +#include <nuttx/arch.h> +#include "os_internal.h" +#include "clock_internal.h" +#include "mq_internal.h" + +/**************************************************************************** + * Definitions + ****************************************************************************/ + +/**************************************************************************** + * Private Type Declarations + ****************************************************************************/ + +/**************************************************************************** + * Global Variables + ****************************************************************************/ + +/**************************************************************************** + * Private Variables + ****************************************************************************/ + +/**************************************************************************** + * Private Functions + ****************************************************************************/ + +/**************************************************************************** + * Function: mq_rcvtimeout + * + * Description: + * This function is called if the timeout elapses before the message queue + * becomes non-empty. + * + * Parameters: + * argc - the number of arguments (should be 1) + * pid - the task ID of the task to wakeup + * + * Return Value: + * None + * + * Assumptions: + * + ****************************************************************************/ + +static void mq_rcvtimeout(int argc, uint32 pid, ...) +{ + FAR _TCB *wtcb; + irqstate_t saved_state; + + /* Disable interrupts. This is necessary because an + * interrupt handler may attempt to send a message while we are + * doing this. + */ + + saved_state = irqsave(); + + /* Get the TCB associated with this pid. It is possible that + * task may no longer be active when this watchdog goes off. + */ + + wtcb = sched_gettcb((pid_t)pid); + + /* It is also possible that an interrupt/context switch beat us to the + * punch and already changed the task's state. + */ + + if (wtcb && wtcb->task_state == TSTATE_WAIT_MQNOTEMPTY) + { + /* Mark the errno value for the thread. */ + + wtcb->errno = ETIMEDOUT; + + /* Restart the the task. */ + + up_unblock_task(wtcb); + } + + /* Interrupts may now be enabled. */ + + irqrestore(saved_state); +} + +/**************************************************************************** + * Public Functions + ****************************************************************************/ + +/**************************************************************************** + * Function: mq_timedreceive + * + * Description: + * This function receives the oldest of the highest + * priority messages from the message queue specified by + * "mqdes." If the size of the buffer in bytes (msglen) is + * less than the "mq_msgsize" attribute of the message + * queue, mq_timedreceive will return an error. Otherwise, the + * selected message is removed from the queue and copied to + * "msg." + * + * If the message queue is empty and O_NONBLOCK was not + * set, mq_timedreceive() will block until a message is added + * to the message queue (or until a timeout occurs). If more + * than one task is waiting to receive a message, only the + * task with the highest priority that has waited the longest + * will be unblocked. + * + * mq_timedreceive() behaves just like mq_receive(), except + * that if the queue is empty and the O_NONBLOCK flag is not + * enabled for the message queue description, then abstime + * points to a structure which specifies a ceiling on the time + * for which the call will block. This ceiling is an absolute + * timeout in seconds and nanoseconds since the Epoch (midnight + * on the morning of 1 January 1970). + * + * If no message is available, and the timeout has already + * expired by the time of the call, mq_timedreceive() returns + * immediately. + * + * Parameters: + * mqdes - Message Queue Descriptor + * msg - Buffer to receive the message + * msglen - Size of the buffer in bytes + * prio - If not NULL, the location to store message priority. + * abstime - the absolute time to wait until a timeout is declared. + * + * Return Value: + * One success, the length of the selected message in bytes.is + * returned. On failure, -1 (ERROR) is returned and the errno + * is set appropriately: + * + * EAGAIN The queue was empty, and the O_NONBLOCK flag was set + * for the message queue description referred to by 'mqdes'. + * EPERM Message queue opened not opened for reading. + * EMSGSIZE 'msglen' was less than the maxmsgsize attribute of the + * message queue. + * EINTR The call was interrupted by a signal handler. + * EINVAL Invalid 'msg' or 'mqdes' or 'abstime' + * ETIMEDOUT The call timed out before a message could be transferred. + * + * Assumptions: + * + ****************************************************************************/ + +ssize_t mq_timedreceive(mqd_t mqdes, void *msg, size_t msglen, + int *prio, const struct timespec *abstime) +{ + WDOG_ID wdog; + FAR mqmsg_t *mqmsg; + irqstate_t saved_state; + int ret = ERROR; + + DEBUGASSERT(!up_interrupt_context()); + + /* Verify the input parameters and, in case of an error, set + * errno appropriately. + */ + + if (mq_verifyreceive(mqdes, msg, msglen) != OK) + { + return ERROR; + } + + if (!abstime || abstime->tv_sec < 0 || abstime->tv_nsec > 1000000000) + { + *get_errno_ptr() = EINVAL; + return ERROR; + } + + /* Create a watchdog. We will not actually need this watchdog + * unless the queue is not empty, but we will reserve it up front + * before we enter the following critical section. + */ + + wdog = wd_create(); + if (!wdog) + { + *get_errno_ptr() = EINVAL; + return ERROR; + } + + /* Get the next mesage from the message queue. We will disable + * pre-emption until we have completed the message received. This + * is not too bad because if the receipt takes a long time, it will + * be because we are blocked waiting for a message and pre-emption + * will be re-enabled while we are blocked + */ + + sched_lock(); + + /* Furthermore, mq_waitreceive() expects to have interrupts disabled + * because messages can be sent from interrupt level. + */ + + saved_state = irqsave(); + + /* Check if the message queue is empty. If it is NOT empty, then we + * will not need to start timer. + */ + + if (mqdes->msgq->msglist.head == NULL) + { + sint32 ticks; + + /* Convert the timespec to clock ticks. We must have interrupts + * disabled here so that this time stays valid until the wait begins. + */ + + ret = clock_abstime2ticks(CLOCK_REALTIME, abstime, &ticks); + + /* If the time has already expired and the message queue is empty, + * return immediately. + */ + + if (ret == OK && ticks <= 0) + { + ret = ETIMEDOUT; + } + + /* Handle any time-related errors */ + + if (ret != OK) + { + *get_errno_ptr() = ret; + irqrestore(saved_state); + sched_unlock(); + wd_delete(wdog); + return ERROR; + } + + /* Start the watchdog */ + + wd_start(wdog, ticks, (wdentry_t)mq_rcvtimeout, 1, getpid()); + } + + /* Get the message from the message queue */ + + mqmsg = mq_waitreceive(mqdes); + + /* Stop the watchdog timer (this is not harmful in the case where + * it was never started) + */ + + wd_cancel(wdog); + + /* We can now restore interrupts */ + + irqrestore(saved_state); + + /* Check if we got a message from the message queue. We might + * not have a message if: + * + * - The message queue is empty and O_NONBLOCK is set in the mqdes + * - The wait was interrupted by a signal + * - The watchdog timeout expired + */ + + if (mqmsg) + { + ret = mq_doreceive(mqdes, mqmsg, msg, prio); + } + + sched_unlock(); + wd_delete(wdog); + return ret; +} diff --git a/nuttx/sched/mq_timedsend.c b/nuttx/sched/mq_timedsend.c new file mode 100644 index 000000000..710df84d3 --- /dev/null +++ b/nuttx/sched/mq_timedsend.c @@ -0,0 +1,317 @@ +/**************************************************************************** + * mq_timedsend.c + * + * Copyright (C) 2007 Gregory Nutt. All rights reserved. + * Author: Gregory Nutt <spudmonkey@racsa.co.cr> + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * + * 1. Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in + * the documentation and/or other materials provided with the + * distribution. + * 3. Neither the name Gregory Nutt nor the names of its contributors may be + * used to endorse or promote products derived from this software + * without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS + * FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE + * COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, + * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, + * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS + * OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED + * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT + * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN + * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + * + ****************************************************************************/ + +/**************************************************************************** + * Included Files + ****************************************************************************/ + +#include <nuttx/config.h> +#include <sys/types.h> +#include <unistd.h> +#include <mqueue.h> +#include <wdog.h> +#include <errno.h> +#include <debug.h> +#include <nuttx/arch.h> +#include "clock_internal.h" +#include "os_internal.h" +#include "mq_internal.h" + +/**************************************************************************** + * Definitions + ****************************************************************************/ + +/**************************************************************************** + * Private Type Declarations + ****************************************************************************/ + +/**************************************************************************** + * Global Variables + ****************************************************************************/ + +/**************************************************************************** + * Private Variables + ****************************************************************************/ + +/**************************************************************************** + * Private Functions + ****************************************************************************/ + +/**************************************************************************** + * Function: mq_sndtimeout + * + * Description: + * This function is called if the timeout elapses before the message queue + * becomes non-full. + * + * Parameters: + * argc - the number of arguments (should be 1) + * pid - the task ID of the task to wakeup + * + * Return Value: + * None + * + * Assumptions: + * + ****************************************************************************/ + +static void mq_sndtimeout(int argc, uint32 pid, ...) +{ + FAR _TCB *wtcb; + irqstate_t saved_state; + + /* Disable interrupts. This is necessary because an + * interrupt handler may attempt to send a message while we are + * doing this. + */ + + saved_state = irqsave(); + + /* Get the TCB associated with this pid. It is possible that + * task may no longer be active when this watchdog goes off. + */ + + wtcb = sched_gettcb((pid_t)pid); + + /* It is also possible that an interrupt/context switch beat us to the + * punch and already changed the task's state. + */ + + if (wtcb && wtcb->task_state == TSTATE_WAIT_MQNOTEMPTY) + { + /* Mark the errno value for the thread. */ + + wtcb->errno = ETIMEDOUT; + + /* Restart the the task. */ + + up_unblock_task(wtcb); + } + + /* Interrupts may now be enabled. */ + + irqrestore(saved_state); +} + +/**************************************************************************** + * Public Functions + ****************************************************************************/ + +/**************************************************************************** + * Function: mq_send + * + * Description: + * This function adds the specificied message (msg) to the message queue + * (mqdes). The "msglen" parameter specifies the length of the message + * in bytes pointed to by "msg." This length must not exceed the maximum + * message length from the mq_getattr(). + * + * If the message queue is not full, mq_timedsend() place the message in the + * message queue at the position indicated by the "prio" argrument. + * Messages with higher priority will be inserted before lower priority + * messages. The value of "prio" must not exceed MQ_PRIO_MAX. + * + * If the specified message queue is full and O_NONBLOCK is not set in the + * message queue, then mq_timedsend() will block until space becomes available + * to the queue the message or a timeout occurs. + * + * mq_timedsend() behaves just like mq_send(), except that if the queue + * is full and the O_NONBLOCK flag is not enabled for the message queue + * description, then abstime points to a structure which specifies a + * ceiling on the time for which the call will block. This ceiling is an + * absolute timeout in seconds and nanoseconds since the Epoch (midnight + * on the morning of 1 January 1970). + * + * If the message queue is full, and the timeout has already expired by + * the time of the call, mq_timedsend() returns immediately. + * + * Parameters: + * mqdes - Message queue descriptor + * msg - Message to send + * msglen - The length of the message in bytes + * prio - The priority of the message + * abstime - the absolute time to wait until a timeout is decleared + * + * Return Value: + * On success, mq_send() returns 0 (OK); on error, -1 (ERROR) + * is returned, with errno set to indicate the error: + * + * EAGAIN The queue was empty, and the O_NONBLOCK flag was set for the + * message queue description referred to by mqdes. + * EINVAL Either msg or mqdes is NULL or the value of prio is invalid. + * EPERM Message queue opened not opened for writing. + * EMSGSIZE 'msglen' was greater than the maxmsgsize attribute of the + * message queue. + * EINTR The call was interrupted by a signal handler. + * + * Assumptions/restrictions: + * + ****************************************************************************/ + +int mq_timedsend(mqd_t mqdes, const char *msg, size_t msglen, int prio, + const struct timespec *abstime) +{ + WDOG_ID wdog; + FAR msgq_t *msgq; + FAR mqmsg_t *mqmsg = NULL; + irqstate_t saved_state; + int ret = ERROR; + + DEBUGASSERT(!up_interrupt_context()); + + /* Verify the input parameters -- setting errno appropriately + * on any failures to verify. + */ + + if (mq_verifysend(mqdes, msg, msglen, prio) != OK) + { + return ERROR; + } + + if (!abstime || abstime->tv_sec < 0 || abstime->tv_nsec > 1000000000) + { + *get_errno_ptr() = EINVAL; + return ERROR; + } + + /* Get a pointer to the message queue */ + + msgq = mqdes->msgq; + + /* Create a watchdog. We will not actually need this watchdog + * unless the queue is full, but we will reserve it up front + * before we enter the following critical section. + */ + + wdog = wd_create(); + if (!wdog) + { + *get_errno_ptr() = EINVAL; + return ERROR; + } + + /* Allocate a message structure: + * - If we are called from an interrupt handler, or + * - If the message queue is not full, or + */ + + sched_lock(); + saved_state = irqsave(); + if (up_interrupt_context() || /* In an interrupt handler */ + msgq->nmsgs < msgq->maxmsgs) /* OR Message queue not full */ + { + /* Allocate the message */ + + irqrestore(saved_state); + mqmsg = mq_msgalloc(); + } + else + { + sint32 ticks; + int result; + + /* We are not in an interupt handler and the message queue is full. + * set up a timed wait for the message queue to become non-full. + * + * Convert the timespec to clock ticks. We must have interrupts + * disabled here so that this time stays valid until the wait begins. + */ + + result = clock_abstime2ticks(CLOCK_REALTIME, abstime, &ticks); + + /* If the time has already expired and the message queue is empty, + * return immediately. + */ + + if (result == OK && ticks <= 0) + { + result = ETIMEDOUT; + } + + /* Handle any time-related errors */ + + if (result == OK) + { + /* Start the watchdog */ + + wd_start(wdog, ticks, (wdentry_t)mq_sndtimeout, 1, getpid()); + + /* And wait for the message queue to be non-empty */ + + result = mq_waitsend(mqdes); + + /* This may return with an error and errno set to either EINTR + * or ETIMEOUT. Cancel the watchdog timer in any event. + */ + + wd_cancel(wdog); + } + + /* That is the end of the atomic operations */ + + irqrestore(saved_state); + + /* If any of the above failed, set the errno. Otherwise, there should + * be space for another message in the message queue. NOW we can allocate + * the message structure. + */ + + if (result != OK) + { + *get_errno_ptr() = result; + } + else + { + mqmsg = mq_msgalloc(); + } + } + + /* Check if we were able to get a message structure -- this can fail + * either because we cannot send the message (and didn't bother trying + * to allocate it) or because the allocation failed. + */ + + if (mqmsg) + { + /* Yes, peforrm the message send. */ + + ret = mq_dosend(mqdes, mqmsg, msg, msglen, prio); + } + + sched_unlock(); + wd_delete(wdog); + return ret; +} + |