summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorpatacongo <patacongo@42af7a65-404d-4744-a932-0658087f49c3>2007-03-29 13:25:18 +0000
committerpatacongo <patacongo@42af7a65-404d-4744-a932-0658087f49c3>2007-03-29 13:25:18 +0000
commit605bd84ced2cf37e267eccf604bdff1a985a65d8 (patch)
treec16c8b2b4e77974763bc6df44c2c116eac67f454
parenta300116bf7139941033b6d36dc3814af9d1e67c9 (diff)
downloadnuttx-605bd84ced2cf37e267eccf604bdff1a985a65d8.tar.gz
nuttx-605bd84ced2cf37e267eccf604bdff1a985a65d8.tar.bz2
nuttx-605bd84ced2cf37e267eccf604bdff1a985a65d8.zip
Added mq_timedsend() and mq_timedreceive()
git-svn-id: svn://svn.code.sf.net/p/nuttx/code/trunk@166 42af7a65-404d-4744-a932-0658087f49c3
-rw-r--r--nuttx/ChangeLog5
-rw-r--r--nuttx/Documentation/NuttX.html5
-rw-r--r--nuttx/Documentation/NuttxUserGuide.html189
-rw-r--r--nuttx/include/mqueue.h14
-rw-r--r--nuttx/sched/Makefile4
-rw-r--r--nuttx/sched/mq_internal.h76
-rw-r--r--nuttx/sched/mq_rcvinternal.c314
-rw-r--r--nuttx/sched/mq_receive.c167
-rw-r--r--nuttx/sched/mq_send.c418
-rw-r--r--nuttx/sched/mq_sndinternal.c448
-rw-r--r--nuttx/sched/mq_timedreceive.c308
-rw-r--r--nuttx/sched/mq_timedsend.c317
12 files changed, 1732 insertions, 533 deletions
diff --git a/nuttx/ChangeLog b/nuttx/ChangeLog
index 476f87bbb..181082fc7 100644
--- a/nuttx/ChangeLog
+++ b/nuttx/ChangeLog
@@ -93,6 +93,11 @@
* mq_receive and mq_send now return errno's appropriately
* mq_receive and mq_send are now correctly awakened by signals.
+ * Fixed an unmatched sched_lock/unlock pair in task_delete().
+ * sched_lock must be called in _exit() because operation of
+ task_delete() can cause pending tasks to be merged and a
+ context switch to occur.
+ * Added mq_timedreceive() and mq_timedsend()
* Started m68322
diff --git a/nuttx/Documentation/NuttX.html b/nuttx/Documentation/NuttX.html
index 087fd48a7..fcdb9b1d1 100644
--- a/nuttx/Documentation/NuttX.html
+++ b/nuttx/Documentation/NuttX.html
@@ -454,6 +454,11 @@ Other memory:
* mq_receive and mq_send now return errno's appropriately
* mq_receive and mq_send are now correctly awakened by signals.
+ * Fixed an unmatched sched_lock/unlock pair in task_delete().
+ * sched_lock must be called in _exit() because operation of
+ task_delete() can cause pending tasks to be merged and a
+ context switch to occur.
+ * Added mq_timedreceive() and mq_timedsend()
* Started m68322
</pre></ul>
diff --git a/nuttx/Documentation/NuttxUserGuide.html b/nuttx/Documentation/NuttxUserGuide.html
index d706c60c6..3fd59357c 100644
--- a/nuttx/Documentation/NuttxUserGuide.html
+++ b/nuttx/Documentation/NuttxUserGuide.html
@@ -1021,10 +1021,12 @@ on this thread of execution.
<li><a href="#mqclose">2.4.2 mq_close</a></li>
<li><a href="#mqunlink">2.4.3 mq_unlink</a></li>
<li><a href="#mqsend">2.4.4 mq_send</a></li>
- <li><a href="#mqreceive">2.4.5 mq_receive</a></li>
- <li><a href="#mqnotify">2.4.6 mq_notify</a></li>
- <li><a href="#mqsetattr">2.4.7 mq_setattr</a></li>
- <li><a href="#mqgetattr">2.4.8 mq_getattr</a></li>
+ <li><a href="#mqtimedsend">2.4.5 mq_timedsend</a></li>
+ <li><a href="#mqreceive">2.4.6 mq_receive</a></li>
+ <li><a href="#mqtimedreceive">2.4.7 mq_timedreceive</a></li>
+ <li><a href="#mqnotify">2.4.8 mq_notify</a></li>
+ <li><a href="#mqsetattr">2.4.9 mq_setattr</a></li>
+ <li><a href="#mqgetattr">2.4.10 mq_getattr</a></li>
</ul>
<H3><a name="mqopen">2.4.1 mq_open</a></H3>
@@ -1171,7 +1173,6 @@ closed.
interface of the same name.
<H3><a name="mqsend">2.4.4 mq_send</a></H3>
-
<p>
<b>Function Prototype:</b>
</p>
@@ -1246,13 +1247,96 @@ interface of the same name.
Comparable to the POSIX interface of the same name.
</p>
+<h3><a name="mqtimedsend">mq_timedsend</a></h3>
+<b>Function Prototype:</b>
+</p>
+<pre>
+ #include &lt;mqueue.h&gt;
+ int mq_timedsend(mqd_t mqdes, const char *msg, size_t msglen, int prio,
+ const struct timespec *abstime);
+</pre>
+<p>
+<b>Description:</b>
+ This function adds the specified message, <code>msg</code>,
+ to the message queue, <code>mqdes</code>.
+ The <code>msglen</code> parameter specifies the length of the message in bytes pointed to by <code>msg</code>.
+ This length must not exceed the maximum message length from the <code>mq_getattr()</code>.
+</p>
+<p>
+ If the message queue is not full, <code>mq_timedsend()</code> will place the <code>msg</code>
+ in the message queue at the position indicated by the <code>prio</code> argument.
+ Messages with higher priority will be inserted before lower priority messages
+ The value of <code>prio</code> must not exceed <code>MQ_PRIO_MAX</code>.
+</p>
+<p>
+ If the specified message queue is full and <code>O_NONBLOCK</code> is not
+ set in the message queue, then <code>mq_send()</code> will block until space
+ becomes available to the queue the message or until a timeout occurs.
+</p>
+<p>
+ <code>mq_timedsend()</code> behaves just like <code>mq_send()</code>, except
+ that if the queue is full and the <code>O_NONBLOCK</code> flag is not enabled
+ for the message queue description, then <code>abstime</code> points to a
+ structure which specifies a ceiling on the time for which the call will block.
+ This ceiling is an absolute timeout in seconds and nanoseconds since the
+ Epoch (midnight on the morning of 1 January 1970).
+</p>
+<p>
+ If the message queue is full, and the timeout has already expired by the time
+ of the call, <code>mq_timedsend()<code> returns immediately.
+</p>
+<p>
+ <b>Input Parameters:</b>
+</p>
+<ul>
+ <li><code>mqdes</code>. Message queue descriptor.</li>
+ <li><code>msg</code>. Message to send.</li>
+ <li><code>msglen</code>. The length of the message in bytes.</li>
+ <li><code>prio</code>. The priority of the message.</li>
+</ul>
+<p>
+ <b>Returned Values:</b>
+ On success, <code>mq_send()</code> returns 0 (<code>OK</code>);
+ on error, -1 (<code>ERROR</code>) is returned, with <code>errno</code> set
+ to indicate the error:
+</p>
+<ul>
+ <li>
+ <code>EAGAIN</code>.
+ The queue was empty, and the <code>O_NONBLOCK</code> flag was set for the message queue description referred to by <code>mqdes</code>.
+ </li>
+ <li>
+ <code>EINVAL</code>.
+ Either <code>msg</code> or <code>mqdes</code> is <code>NULL</code> or the value of <code>prio</code> is invalid.
+ </li>
+ <li>
+ <code>EPERM</code>.
+ Message queue opened not opened for writing.
+ </li>
+ <li>
+ <code>EMSGSIZE</code>.
+ <code>msglen</code> was greater than the <code>maxmsgsize</code> attribute of the message queue.
+ </li>
+ <li>
+ <code>EINTR</code>.
+ The call was interrupted by a signal handler.
+ </li>
+</ul>
+<p>
+ <b>Assumptions/Limitations:</b>
+</p>
+<p>
+ <b>POSIX Compatibility:</b>
+ Comparable to the POSIX interface of the same name.
+</p>
+
<h3><a name="mqreceive">2.4.5 mq_receive</a></h3>
<p>
<b>Function Prototype:</b>
</p>
<pre>
#include &lt;mqueue.h&gt;
- int mq_receive(mqd_t mqdes, void *msg, size_t msglen, int *prio);
+ ssize_t mq_receive(mqd_t mqdes, void *msg, size_t msglen, int *prio);
</pre>
<p>
<b>Description:</b>
@@ -1316,7 +1400,92 @@ interface of the same name.
Comparable to the POSIX interface of the same name.
</p>
-<H3><a name="mqnotify">2.4.6 mq_notify</a></H3>
+<h3><a name="mqtimedreceive">2.4.6 mq_timedreceive</a></h3>
+<p>
+ <b>Function Prototype:</b>
+</p>
+<pre>
+ #include &lt;mqueue.h&gt;
+ ssize_t mq_timedreceive(mqd_t mqdes, void *msg, size_t msglen,
+ int *prio, const struct timespec *abstime);
+</pre>
+<p>
+ <b>Description:</b>
+ This function receives the oldest of the highest priority messages from the message
+ queue specified by <code>mqdes</code>.
+ If the size of the buffer in bytes, <code>msgLen</code>, is less than the
+ <code>mq_msgsize</code> attribute of the message queue, <code>mq_timedreceive()</code> will
+ return an error.
+ Otherwise, the selected message is removed from the queue and copied to <code>msg</code>.
+</p>
+<p>
+ If the message queue is empty and <code>O_NONBLOCK</code> was not set, <code>mq_timedreceive()</code>
+ will block until a message is added to the message queue (or until a timeout occurs).
+ If more than one task is waiting to receive a message, only the task with the highest
+ priority that has waited the longest will be unblocked.
+</p>
+<p>
+ <code>mq_timedreceive()</code> behaves just like <code>mq_receive()<code>, except
+ that if the queue is empty and the <code>O_NONBLOCK<c/ode> flag is not enabled
+ for the message queue description, then <code>abstime</code> points to a structure
+ which specifies a ceiling on the time for which the call will block.
+ This ceiling is an absolute timeout in seconds and nanoseconds since the Epoch
+ (midnight on the morning of 1 January 1970).
+</p>
+<p>
+ If no message is available, and the timeout has already expired by the time of
+ the call, <code>mq_timedreceive()</code> returns immediately.
+</p>
+<p>
+ <b>Input Parameters:</b>
+</p>
+<ul>
+ <li><code>mqdes</code>. Message Queue Descriptor.</li>
+ <li><code>msg</code>. Buffer to receive the message.</li>
+ <li><code>msglen</code>. Size of the buffer in bytes.</li>
+ <li><code>prio</code>. If not NULL, the location to store message priority.
+ <li><code>abstime</code>. The absolute time to wait until a timeout is declared.
+</ul>
+<p>
+ <b>Returned Values:</b>.
+ One success, the length of the selected message in bytes is returned.
+ On failure, -1 (<code>ERROR</code>) is returned and the <code>errno</code> is set appropriately:
+</p>
+<ul>
+ <li>
+ <code>EAGAIN</code>:
+ The queue was empty and the <code>O_NONBLOCK</code> flag was set for the message queue description referred to by <code>mqdes</code>.
+ </li>
+ <li>
+ <code>EPERM</code>:
+ Message queue opened not opened for reading.
+ </li>
+ <li>
+ <code>EMSGSIZE</code>:
+ <code>msglen</code> was less than the <code>maxmsgsize</code> attribute of the message queue.
+ </li>
+ <li>
+ <code>EINTR</code>:
+ The call was interrupted by a signal handler.
+ </li>
+ <li>
+ <code>EINVAL</code>:
+ Invalid <code>msg</code> or <code>mqdes</code> or <code>abstime</code>
+ </li>
+ <li>
+ <code>ETIMEDOUT</code>:
+ The call timed out before a message could be transferred.
+ </li>
+</ul>
+<p>
+ <b>Assumptions/Limitations:</b>
+</p>
+<p>
+ <b>POSIX Compatibility:</b>
+ Comparable to the POSIX interface of the same name.
+</p>
+
+<h3><a name="mqnotify">2.4.7 mq_notify</a></h3>
<p>
<b>Function Prototype:</b>
@@ -1372,7 +1541,7 @@ appropriate <I>mq_receive()</I> ... The resulting behavior is as if the
message queue remains empty, and no notification shall be sent.&quot;
</ul>
-<H3><a name="mqsetattr">2.4.7 mq_setattr</a></H3>
+<H3><a name="mqsetattr">2.4.8 mq_setattr</a></H3>
<p>
<b>Function Prototype:</b>
@@ -1411,7 +1580,7 @@ would have been returned by mq_getattr()).
<b> POSIX Compatibility:</b> Comparable to the POSIX
interface of the same name.
-<H3><a name="mqgetattr">2.4.8 mq_getattr</a></H3>
+<H3><a name="mqgetattr">2.4.9 mq_getattr</a></H3>
<p>
<b>Function Prototype:</b>
@@ -5641,6 +5810,8 @@ notify a task when a message is available on a queue.
<li><a href="#mqreceive">mq_receive</a></li>
<li><a href="#mqsend">mq_send</a></li>
<li><a href="#mqsetattr">mq_setattr</a></li>
+ <li><a href="#mqtimedreceive">mq_timedreceive</a></li>
+ <li><a href="#mqtimedsend">mq_timedsend</a></li>
<li><a href="#mqunlink">mq_unlink</a></li>
<li><a href="#OS_Interfaces">OS Interfaces</a>
<li><a href="#Pthread">Pthread Interfaces</a>
diff --git a/nuttx/include/mqueue.h b/nuttx/include/mqueue.h
index 22e853a16..d13fe7006 100644
--- a/nuttx/include/mqueue.h
+++ b/nuttx/include/mqueue.h
@@ -87,12 +87,16 @@ extern "C" {
#define EXTERN extern
#endif
-EXTERN mqd_t mq_open(const char *mq_name, int oflags, ... );
+EXTERN mqd_t mq_open(const char *mq_name, int oflags, ...);
EXTERN int mq_close(mqd_t mqdes );
-EXTERN int mq_unlink(const char *mq_name );
-EXTERN int mq_send(mqd_t mqdes, const void *msg, size_t msglen, int prio );
-EXTERN int mq_receive(mqd_t mqdes, void *msg, size_t msglen, int *prio );
-EXTERN int mq_notify(mqd_t mqdes, const struct sigevent *notification );
+EXTERN int mq_unlink(const char *mq_name);
+EXTERN int mq_send(mqd_t mqdes, const void *msg, size_t msglen, int prio);
+EXTERN int mq_timedsend(mqd_t mqdes, const char *msg, size_t msglen, int prio,
+ const struct timespec *abstime);
+EXTERN ssize_t mq_receive(mqd_t mqdes, void *msg, size_t msglen, int *prio);
+EXTERN ssize_t mq_timedreceive(mqd_t mqdes, void *msg, size_t msglen,
+ int *prio, const struct timespec *abstime);
+EXTERN int mq_notify(mqd_t mqdes, const struct sigevent *notification);
EXTERN int mq_setattr(mqd_t mqdes, const struct mq_attr *mq_stat,
struct mq_attr *oldstat);
EXTERN int mq_getattr(mqd_t mqdes, struct mq_attr *mq_stat);
diff --git a/nuttx/sched/Makefile b/nuttx/sched/Makefile
index 1f6ec2384..50992734a 100644
--- a/nuttx/sched/Makefile
+++ b/nuttx/sched/Makefile
@@ -69,7 +69,9 @@ SIGNAL_SRCS = sig_initialize.c \
sig_unmaskpendingsignal.c sig_removependingsignal.c \
sig_releasependingsignal.c sig_lowest.c sig_mqnotempty.c \
sig_cleanup.c sig_received.c sig_deliver.c
-MQUEUE_SRCS = mq_open.c mq_close.c mq_unlink.c mq_send.c mq_receive.c \
+MQUEUE_SRCS = mq_open.c mq_close.c mq_unlink.c \
+ mq_send.c mq_timedsend.c mq_sndinternal.c \
+ mq_receive.c mq_timedreceive.c mq_rcvinternal.c \
mq_setattr.c mq_getattr.c mq_initialize.c mq_descreate.c \
mq_findnamed.c mq_msgfree.c mq_msgqfree.c mq_waitirq.c
ifneq ($(CONFIG_DISABLE_SIGNALS),y)
diff --git a/nuttx/sched/mq_internal.h b/nuttx/sched/mq_internal.h
index a4e21d3c7..d7510a5db 100644
--- a/nuttx/sched/mq_internal.h
+++ b/nuttx/sched/mq_internal.h
@@ -1,4 +1,4 @@
-/************************************************************
+/****************************************************************************
* mq_internal.h
*
* Copyright (C) 2007 Gregory Nutt. All rights reserved.
@@ -31,14 +31,14 @@
* ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*
- ************************************************************/
+ ****************************************************************************/
#ifndef __MQ_INTERNAL_H
#define __MQ_INTERNAL_H
-/************************************************************
+/****************************************************************************
* Included Files
- ************************************************************/
+ ****************************************************************************/
#include <sys/types.h>
#include <limits.h>
@@ -47,16 +47,15 @@
#include <signal.h>
#include <nuttx/compiler.h>
-/************************************************************
+/****************************************************************************
* Compilations Switches
- ************************************************************/
+ ****************************************************************************/
-/************************************************************
+/****************************************************************************
* Definitions
- ************************************************************/
+ ****************************************************************************/
#define MQ_MAX_BYTES CONFIG_MQ_MAXMSGSIZE
-#define MQ_MAX_HWORDS ((MQ_MAX_BYTES + sizeof(uint16) - 1) / sizeof(uint16))
#define MQ_MAX_MSGS 16
#define MQ_PRIO_MAX _POSIX_MQ_PRIO_MAX
@@ -72,9 +71,9 @@
#define NUM_INTERRUPT_MSGS 8
-/************************************************************
+/****************************************************************************
* Global Type Declarations
- ************************************************************/
+ ****************************************************************************/
enum mqalloc_e
{
@@ -85,23 +84,18 @@ enum mqalloc_e
typedef enum mqalloc_e mqalloc_t;
/* This structure describes one buffered POSIX message. */
-/* NOTE: This structure is allocated from the same pool as MQ_type.
- * Therefore, (1) it must have a fixed "mail" size, and (2) must
- * exactly match MQ_type in size.
- */
struct mqmsg
{
- /* The position of the following two field must exactly match
- * MQ_type.
- */
-
FAR struct mqmsg *next; /* Forward link to next message */
ubyte type; /* (Used to manage allocations) */
-
ubyte priority; /* priority of message */
+#if MQ_MAX_BYTES < 256
ubyte msglen; /* Message data length */
- uint16 mail[MQ_MAX_HWORDS]; /* Message data */
+#else
+ uint16 msglen; /* Message data length */
+#endif
+ ubyte mail[MQ_MAX_BYTES]; /* Message data */
};
typedef struct mqmsg mqmsg_t;
@@ -142,9 +136,9 @@ struct mq_des
int oflags; /* Flags set when message queue was opened */
};
-/************************************************************
+/****************************************************************************
* Global Variables
- ************************************************************/
+ ****************************************************************************/
/* This is a list of all opened message queues */
@@ -170,9 +164,9 @@ extern sq_queue_t g_msgfreeirq;
extern sq_queue_t g_desfree;
-/************************************************************
+/****************************************************************************
* Global Function Prototypes
- ************************************************************/
+ ****************************************************************************/
#ifdef __cplusplus
#define EXTERN extern "C"
@@ -181,19 +175,35 @@ extern "C" {
#define EXTERN extern
#endif
-/* Functions defined in mq_initialize.c ********************/
+/* Functions defined in mq_initialize.c ************************************/
EXTERN void weak_function mq_initialize(void);
-EXTERN void mq_desblockalloc(void);
+EXTERN void mq_desblockalloc(void);
+
+EXTERN mqd_t mq_descreate(FAR _TCB* mtcb, FAR msgq_t* msgq, int oflags);
+EXTERN FAR msgq_t *mq_findnamed(const char *mq_name);
+EXTERN void mq_msgfree(FAR mqmsg_t *mqmsg);
+EXTERN void mq_msgqfree(FAR msgq_t *msgq);
+
+/* mq_waitirq.c ************************************************************/
+
+EXTERN void mq_waitirq(FAR _TCB *wtcb);
+
+/* mq_rcvinternal.c ********************************************************/
-EXTERN mqd_t mq_descreate(FAR _TCB* mtcb, FAR msgq_t* msgq, int oflags);
-EXTERN FAR msgq_t *mq_findnamed(const char *mq_name);
-EXTERN void mq_msgfree(FAR mqmsg_t *mqmsg);
-EXTERN void mq_msgqfree(FAR msgq_t *msgq);
+EXTERN int mq_verifyreceive(mqd_t mqdes, void *msg, size_t msglen);
+EXTERN FAR mqmsg_t *mq_waitreceive(mqd_t mqdes);
+EXTERN ssize_t mq_doreceive(mqd_t mqdes, mqmsg_t *mqmsg, void *ubuffer,
+ int *prio);
-/* mq_waitirq.c ********************************************/
+/* mq_sndinternal.c ********************************************************/
-EXTERN void mq_waitirq(FAR _TCB *wtcb);
+EXTERN int mq_verifysend(mqd_t mqdes, const void *msg, size_t msglen,
+ int prio);
+EXTERN FAR mqmsg_t *mq_msgalloc(void);
+EXTERN int mq_waitsend(mqd_t mqdes);
+EXTERN int mq_dosend(mqd_t mqdes, FAR mqmsg_t *mqmsg, const void *msg,
+ size_t msglen, int prio);
#undef EXTERN
#ifdef __cplusplus
diff --git a/nuttx/sched/mq_rcvinternal.c b/nuttx/sched/mq_rcvinternal.c
new file mode 100644
index 000000000..dcbfc0b31
--- /dev/null
+++ b/nuttx/sched/mq_rcvinternal.c
@@ -0,0 +1,314 @@
+/************************************************************
+ * mq_rcvinternal.c
+ *
+ * Copyright (C) 2007 Gregory Nutt. All rights reserved.
+ * Author: Gregory Nutt <spudmonkey@racsa.co.cr>
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ *
+ * 1. Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in
+ * the documentation and/or other materials provided with the
+ * distribution.
+ * 3. Neither the name Gregory Nutt nor the names of its contributors may be
+ * used to endorse or promote products derived from this software
+ * without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
+ * FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
+ * COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
+ * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
+ * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS
+ * OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
+ * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
+ * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
+ * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ *
+ ************************************************************/
+
+/************************************************************
+ * Included Files
+ ************************************************************/
+
+#include <sys/types.h>
+#include <fcntl.h> /* O_NONBLOCK */
+#include <string.h>
+#include <assert.h>
+#include <errno.h>
+#include <mqueue.h>
+#include <sched.h>
+#include <debug.h>
+#include <nuttx/arch.h>
+#include <nuttx/os_external.h>
+#include "os_internal.h"
+#include "mq_internal.h"
+
+/************************************************************
+ * Definitions
+ ************************************************************/
+
+/************************************************************
+ * Private Type Declarations
+ ************************************************************/
+
+/************************************************************
+ * Global Variables
+ ************************************************************/
+
+/************************************************************
+ * Private Variables
+ ************************************************************/
+
+/************************************************************
+ * Private Functions
+ ************************************************************/
+
+/************************************************************
+ * Public Functions
+ ************************************************************/
+
+/************************************************************
+ * Name: mq_verifyreceive
+ *
+ * Description:
+ * This is internal, common logic shared by both mq_receive
+ * and mq_timedreceive. This function verifies the
+ * input parameters that are common to both functions.
+ *
+ * Parameters:
+ * mqdes - Message Queue Descriptor
+ * msg - Buffer to receive the message
+ * msglen - Size of the buffer in bytes
+ *
+ * Return Value:
+ * One success, 0 (OK) is returned. On failure, -1 (ERROR) is
+ * returned and the errno is set appropriately:
+ *
+ * EPERM Message queue opened not opened for reading.
+ * EMSGSIZE 'msglen' was less than the maxmsgsize attribute of the
+ * message queue.
+ * EINVAL Invalid 'msg' or 'mqdes'
+ *
+ * Assumptions:
+ *
+ ************************************************************/
+
+int mq_verifyreceive(mqd_t mqdes, void *msg, size_t msglen)
+{
+ /* Verify the input parameters */
+
+ if (!msg || !mqdes)
+ {
+ *get_errno_ptr() = EINVAL;
+ return ERROR;
+ }
+
+ if ((mqdes->oflags & O_RDOK) == 0)
+ {
+ *get_errno_ptr() = EPERM;
+ return ERROR;
+ }
+
+ if (msglen < (size_t)mqdes->msgq->maxmsgsize)
+ {
+ *get_errno_ptr() = EMSGSIZE;
+ return ERROR;
+ }
+
+ return OK;
+}
+
+/************************************************************
+ * Function: mq_waitreceive
+ *
+ * Description:
+ * This is internal, common logic shared by both mq_receive
+ * and mq_timedreceive. This function waits for a message to
+ * be received on the specified message queue, removes the
+ * message from the queue, and returns it.
+ *
+ * Parameters:
+ * mqdes - Message queue descriptor
+ *
+ * Return Value:
+ * On success, a reference to the received message. If the
+ * wait was interrupted by a signal or a timeout, then the
+ * errno will be set appropriately and NULL will be returned.
+ *
+ * Assumptions:
+ * - The caller has provided all validity checking of the
+ * input parameters using mq_verifyreceive.
+ * - Interrupts should be disabled throughout this call. This
+ * is necessary because messages can be sent from interrupt
+ * level processing.
+ * - For mq_timedreceive, setting of the timer and this wait
+ * must be atomic.
+ *
+ ************************************************************/
+
+FAR mqmsg_t *mq_waitreceive(mqd_t mqdes)
+{
+ FAR _TCB *rtcb;
+ FAR msgq_t *msgq;
+ FAR mqmsg_t *rcvmsg;
+
+ /* Get a pointer to the message queue */
+
+ msgq = mqdes->msgq;
+
+ /* Get the message from the head of the queue */
+
+ while ((rcvmsg = (FAR mqmsg_t*)sq_remfirst(&msgq->msglist)) == NULL)
+ {
+ /* Should we block until there the above condition has been
+ * satisfied?
+ */
+
+ if (!(mqdes->oflags & O_NONBLOCK))
+ {
+ /* Block and try again */
+
+ rtcb = (FAR _TCB*)g_readytorun.head;
+ rtcb->msgwaitq = msgq;
+ msgq->nwaitnotempty++;
+
+ *get_errno_ptr() = OK;
+ up_block_task(rtcb, TSTATE_WAIT_MQNOTEMPTY);
+
+ /* When we resume at this point, either (1) the message queue
+ * is no longer empty, or (2) the wait has been interrupted by
+ * a signal. We can detect the latter case be examining the
+ * errno value (should be either EINTR or ETIMEDOUT).
+ */
+
+ if (*get_errno_ptr() != OK)
+ {
+ break;
+ }
+ }
+ else
+ {
+ /* The queue was empty, and the O_NONBLOCK flag was set for the
+ * message queue description referred to by 'mqdes'.
+ */
+
+ *get_errno_ptr() = EAGAIN;
+ break;
+ }
+ }
+
+ /* If we got message, then decrement the number of messages in
+ * the queue while we are still in the critical section
+ */
+
+ if (rcvmsg)
+ {
+ msgq->nmsgs--;
+ }
+ return rcvmsg;
+}
+
+/************************************************************
+ * Function: mq_doreceive
+ *
+ * Description:
+ * This is internal, common logic shared by both mq_receive
+ * and mq_timedreceive. This function accepts the message
+ * obtained by mq_waitmsg, provides the message content to
+ * the user, notifies any threads that were waiting for
+ * the message queue to become non-full, and disposes of the
+ * message structure
+ *
+ * Parameters:
+ * mqdes - Message queue descriptor
+ * mqmsg - The message obtained by mq_waitmsg()
+ * ubuffer - The address of the user provided buffer to
+ * receive the message
+ * prio - The user-provided location to return the
+ * message priority.
+ *
+ * Return Value:
+ * Returns the length of the received message. This
+ * function does not fail.
+ *
+ * Assumptions:
+ * - The caller has provided all validity checking of the
+ * input parameters using mq_verifyreceive.
+ * - The user buffer, ubuffer, is known to be large enough
+ * to accept the largest message that an be sent on this
+ * message queue
+ * - Pre-emption should be disabled throughout this call.
+ *
+ ************************************************************/
+
+ssize_t mq_doreceive(mqd_t mqdes, mqmsg_t *mqmsg, void *ubuffer, int *prio)
+{
+ FAR _TCB *btcb;
+ irqstate_t saved_state;
+ FAR msgq_t *msgq;
+ ssize_t rcvmsglen;
+
+ /* Get the length of the message (also the return value) */
+
+ rcvmsglen = mqmsg->msglen;
+
+ /* Copy the message into the caller's buffer */
+
+ memcpy(ubuffer, (const void*)mqmsg->mail, rcvmsglen);
+
+ /* Copy the message priority as well (if a buffer is provided) */
+
+ if (prio)
+ {
+ *prio = mqmsg->priority;
+ }
+
+ /* We are done with the message. Deallocate it now. */
+
+ mq_msgfree(mqmsg);
+
+ /* Check if any tasks are waiting for the MQ not full event. */
+
+ msgq = mqdes->msgq;
+ if (msgq->nwaitnotfull > 0)
+ {
+ /* Find the highest priority task that is waiting for
+ * this queue to be not-full in g_waitingformqnotfull list.
+ * This must be performed in a critical section because
+ * messages can be sent from interrupt handlers.
+ */
+
+ saved_state = irqsave();
+ for (btcb = (FAR _TCB*)g_waitingformqnotfull.head;
+ btcb && btcb->msgwaitq != msgq;
+ btcb = btcb->flink);
+
+ /* If one was found, unblock it. NOTE: There is a race
+ * condition here: the queue might be full again by the
+ * time the task is unblocked
+ */
+
+ if (!btcb)
+ {
+ PANIC(OSERR_MQNOTFULLCOUNT);
+ }
+ else
+ {
+ btcb->msgwaitq = NULL;
+ msgq->nwaitnotfull--;
+ up_unblock_task(btcb);
+ }
+ irqrestore(saved_state);
+ }
+
+ /* Return the length of the message transferred to the user buffer */
+
+ return rcvmsglen;
+}
diff --git a/nuttx/sched/mq_receive.c b/nuttx/sched/mq_receive.c
index 9d1272ae9..b19567058 100644
--- a/nuttx/sched/mq_receive.c
+++ b/nuttx/sched/mq_receive.c
@@ -37,21 +37,12 @@
* Included Files
************************************************************/
-#include <sys/types.h> /* uint32, etc. */
-#include <stdarg.h> /* va_list */
-#include <unistd.h>
-#include <fcntl.h> /* O_NONBLOCK */
-#include <string.h>
-#include <assert.h>
+#include <nuttx/config.h>
+#include <sys/types.h>
#include <errno.h>
#include <mqueue.h>
-#include <sched.h>
#include <debug.h>
-#include <nuttx/kmalloc.h>
#include <nuttx/arch.h>
-#include <nuttx/os_external.h>
-#include "os_internal.h"
-#include "sig_internal.h"
#include "mq_internal.h"
/************************************************************
@@ -122,155 +113,53 @@
*
************************************************************/
-int mq_receive(mqd_t mqdes, void *msg, size_t msglen, int *prio)
+ssize_t mq_receive(mqd_t mqdes, void *msg, size_t msglen, int *prio)
{
- FAR _TCB *rtcb;
- FAR _TCB *btcb;
- FAR msgq_t *msgq;
- FAR mqmsg_t *curr;
+ FAR mqmsg_t *mqmsg;
irqstate_t saved_state;
- ubyte rcvmsglen;
- int ret = ERROR;
+ ssize_t ret = ERROR;
- /* Verify the input parameters */
+ DEBUGASSERT(!up_interrupt_context());
- if (!msg || !mqdes)
- {
- *get_errno_ptr() = EINVAL;
- return ERROR;
- }
-
- if ((mqdes->oflags & O_RDOK) == 0)
- {
- *get_errno_ptr() = EPERM;
- return ERROR;
- }
+ /* Verify the input parameters and, in case of an error, set
+ * errno appropriately.
+ */
- if (msglen < (size_t)mqdes->msgq->maxmsgsize)
+ if (mq_verifyreceive(mqdes, msg, msglen) != OK)
{
- *get_errno_ptr() = EMSGSIZE;
return ERROR;
}
- /* Get a pointer to the message queue */
+ /* Get the next mesage from the message queue. We will disable
+ * pre-emption until we have completed the message received. This
+ * is not too bad because if the receipt takes a long time, it will
+ * be because we are blocked waiting for a message and pre-emption
+ * will be re-enabled while we are blocked
+ */
sched_lock();
- msgq = mqdes->msgq;
- /* Several operations must be performed below: We must determine if
- * a message is pending and, if not, wait for the message. Since
- * messages can be sent from the interrupt level, there is a race
- * condition that can only be eliminated by disabling interrupts!
+ /* Furthermore, mq_waitreceive() expects to have interrupts disabled
+ * because messages can be sent from interrupt level.
*/
saved_state = irqsave();
- /* Get the message from the head of the queue */
-
- while ((curr = (FAR mqmsg_t*)sq_remfirst(&msgq->msglist)) == NULL)
- {
- /* Should we block until there the above condition has been
- * satisfied?
- */
-
- if (!(mqdes->oflags & O_NONBLOCK))
- {
- /* Block and try again */
-
- rtcb = (FAR _TCB*)g_readytorun.head;
- rtcb->msgwaitq = msgq;
- msgq->nwaitnotempty++;
-
- *get_errno_ptr() = OK;
- up_block_task(rtcb, TSTATE_WAIT_MQNOTEMPTY);
-
- /* When we resume at this point, either (1) the message queue
- * is no longer empty, or (2) the wait has been interrupted by
- * a signal. We can detect the latter case be examining the
- * errno value (should be EINTR).
- */
-
- if (*get_errno_ptr() != OK)
- {
- break;
- }
- }
- else
- {
- /* The queue was empty, and the O_NONBLOCK flag was set for the
- * message queue description referred to by 'mqdes'.
- */
-
- *get_errno_ptr() = EAGAIN;
- break;
- }
- }
+ /* Get the message from the message queue */
- /* If we got message, then decrement the number of messages in
- * the queue while we are still in the critical section
- */
-
- if (curr)
- {
- msgq->nmsgs--;
- }
+ mqmsg = mq_waitreceive(mqdes);
irqrestore(saved_state);
- /* Check (again) if we got a message from the message queue*/
+ /* Check if we got a message from the message queue. We might
+ * not have a message if:
+ *
+ * - The message queue is empty and O_NONBLOCK is set in the mqdes
+ * - The wait was interrupted by a signal
+ */
- if (curr)
+ if (mqmsg)
{
- /* Get the length of the message (also the return value) */
-
- ret = rcvmsglen = curr->msglen;
-
- /* Copy the message into the caller's buffer */
-
- memcpy(msg, (const void*)curr->mail, rcvmsglen);
-
- /* Copy the message priority as well (if a buffer is provided) */
-
- if (prio)
- {
- *prio = curr->priority;
- }
-
- /* We are done with the message. Deallocate it now. */
-
- mq_msgfree(curr);
-
- /* Check if any tasks are waiting for the MQ not full event. */
-
- if (msgq->nwaitnotfull > 0)
- {
- /* Find the highest priority task that is waiting for
- * this queue to be not-full in g_waitingformqnotfull list.
- * This must be performed in a critical section because
- * messages can be sent from interrupt handlers.
- */
-
- saved_state = irqsave();
- for (btcb = (FAR _TCB*)g_waitingformqnotfull.head;
- btcb && btcb->msgwaitq != msgq;
- btcb = btcb->flink);
-
- /* If one was found, unblock it. NOTE: There is a race
- * condition here: the queue might be full again by the
- * time the task is unblocked
- */
-
- if (!btcb)
- {
- PANIC(OSERR_MQNOTFULLCOUNT);
- }
- else
- {
- btcb->msgwaitq = NULL;
- msgq->nwaitnotfull--;
- up_unblock_task(btcb);
- }
- irqrestore(saved_state);
- }
+ ret = mq_doreceive(mqdes, mqmsg, msg, prio);
}
sched_unlock();
diff --git a/nuttx/sched/mq_send.c b/nuttx/sched/mq_send.c
index faa03e720..84e6e5842 100644
--- a/nuttx/sched/mq_send.c
+++ b/nuttx/sched/mq_send.c
@@ -1,4 +1,4 @@
-/************************************************************
+/****************************************************************************
* mq_send.c
*
* Copyright (C) 2007 Gregory Nutt. All rights reserved.
@@ -31,165 +31,65 @@
* ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*
- ************************************************************/
+ ****************************************************************************/
-/************************************************************
+/****************************************************************************
* Included Files
- ************************************************************/
+ ****************************************************************************/
-#include <nuttx/compiler.h>
-#include <nuttx/kmalloc.h>
-#include <sys/types.h> /* uint32, etc. */
-#include <fcntl.h>
+#include <nuttx/config.h>
+#include <sys/types.h>
#include <mqueue.h>
-#include <string.h>
#include <errno.h>
-#include <sched.h>
#include <debug.h>
#include <nuttx/arch.h>
#include "os_internal.h"
-#ifndef CONFIG_DISABLE_SIGNALS
-# include "sig_internal.h"
-#endif
#include "mq_internal.h"
-/************************************************************
+/****************************************************************************
* Definitions
- ************************************************************/
+ ****************************************************************************/
-/************************************************************
+/****************************************************************************
* Private Type Declarations
- ************************************************************/
+ ****************************************************************************/
-/************************************************************
+/****************************************************************************
* Global Variables
- ************************************************************/
+ ****************************************************************************/
-/************************************************************
+/****************************************************************************
* Private Variables
- ************************************************************/
+ ****************************************************************************/
-/************************************************************
- * Function: mq_msgalloc
- *
- * Description:
- * The mq_msgalloc function will get a free message for use
- * by the operating system. The message will be allocated
- * from the g_msgfree list.
- *
- * If the list is empty AND the message is NOT being
- * allocated from the interrupt level, then the message
- * will be allocated. If a message cannot be obtained,
- * the operating system is dead and therefore cannot
- * continue.
- *
- * If the list is empty AND the message IS being allocated
- * from the interrupt level. This function will attempt to
- * get a message from the g_msgfreeirq list. If this is
- * unsuccessful, the calling interrupt handler will be
- * notified.
- *
- * Inputs:
- * None
- *
- * Return Value:
- * A reference to the allocated msg structure
- *
- ************************************************************/
-
-FAR mqmsg_t *mq_msgalloc(void)
-{
- FAR mqmsg_t *mqmsg;
- irqstate_t saved_state;
-
- /* If we were called from an interrupt handler, then try to
- * get the message from generally available list of messages.
- * If this fails, then try the list of messages reserved for
- * interrupt handlers
- */
-
- if (up_interrupt_context())
- {
- /* Try the general free list */
-
- mqmsg = (FAR mqmsg_t*)sq_remfirst(&g_msgfree);
- if (!mqmsg)
- {
- /* Try the free list reserved for interrupt handlers */
-
- mqmsg = (FAR mqmsg_t*)sq_remfirst(&g_msgfreeirq);
- }
- }
-
- /* We were not called from an interrupt handler. */
-
- else
- {
- /* Try to get the message from the generally available free list.
- * Disable interrupts -- we might be called from an interrupt handler.
- */
-
- saved_state = irqsave();
- mqmsg = (FAR mqmsg_t*)sq_remfirst(&g_msgfree);
- irqrestore(saved_state);
-
- /* If we cannot a message from the free list, then we will have to allocate one. */
-
- if (!mqmsg)
- {
- mqmsg = (FAR mqmsg_t *)kmalloc((sizeof (mqmsg_t)));
-
- /* Check if we got an allocated message */
-
- if (mqmsg)
- {
- mqmsg->type = MQ_ALLOC_DYN;
- }
-
- /* No? We are dead */
-
- else
- {
- dbg("Out of messages\n");
- PANIC((uint32)OSERR_OUTOFMESSAGES);
- }
- }
- }
-
- return(mqmsg);
-}
-
-/************************************************************
+/****************************************************************************
* Private Functions
- ************************************************************/
+ ****************************************************************************/
-/************************************************************
+/****************************************************************************
* Public Functions
- ************************************************************/
+ ****************************************************************************/
-/************************************************************
+/****************************************************************************
* Function: mq_send
*
* Description:
- * This function adds the specificied message (msg) to the
- * message queue (mqdes). The "msglen" parameter specifies
- * the length of the message in bytes pointed to by "msg."
- * This length must not exceed the maximum message length
- * from the mq_getattr().
- *
- * If the message queue is not full, mq_send() place the
- * message in the message queue at the position indicated
- * by the "prio" argrument. Messages with higher priority
- * will be inserted before lower priority messages. The
- * value of "prio" must not exceed MQ_PRIO_MAX.
- *
- * If the specified message queue is full and O_NONBLOCK
- * is not set in the message queue, then mq_send() will
- * block until space becomes available to the queue the
- * message.
- *
- * If the message queue is full and O_NONBLOCK is set,
- * the message is not queued and ERROR is returned.
+ * This function adds the specificied message (msg) to the message queue
+ * (mqdes). The "msglen" parameter specifies the length of the message
+ * in bytes pointed to by "msg." This length must not exceed the maximum
+ * message length from the mq_getattr().
+ *
+ * If the message queue is not full, mq_send() place the message in the
+ * message queue at the position indicated by the "prio" argrument.
+ * Messages with higher priority will be inserted before lower priority
+ * messages. The value of "prio" must not exceed MQ_PRIO_MAX.
+ *
+ * If the specified message queue is full and O_NONBLOCK is not set in the
+ * message queue, then mq_send() will block until space becomes available
+ * to the queue the message.
+ *
+ * If the message queue is full and O_NONBLOCK is set, the message is not
+ * queued and ERROR is returned.
*
* Parameters:
* mqdes - Message queue descriptor
@@ -201,48 +101,31 @@ FAR mqmsg_t *mq_msgalloc(void)
* On success, mq_send() returns 0 (OK); on error, -1 (ERROR)
* is returned, with errno set to indicate the error:
*
- * EAGAIN The queue was empty, and the O_NONBLOCK flag was
- * set for the message queue description referred to
- * by mqdes.
- * EINVAL Either msg or mqdes is NULL or the value of prio
- * is invalid.
+ * EAGAIN The queue was empty, and the O_NONBLOCK flag was set for the
+ * message queue description referred to by mqdes.
+ * EINVAL Either msg or mqdes is NULL or the value of prio is invalid.
* EPERM Message queue opened not opened for writing.
- * EMSGSIZE 'msglen' was greater than the maxmsgsize attribute
- * of the message queue.
+ * EMSGSIZE 'msglen' was greater than the maxmsgsize attribute of the
+ * message queue.
* EINTR The call was interrupted by a signal handler.
*
* Assumptions/restrictions:
*
- ************************************************************/
+ ****************************************************************************/
int mq_send(mqd_t mqdes, const void *msg, size_t msglen, int prio)
{
- FAR _TCB *rtcb;
- FAR _TCB *btcb;
FAR msgq_t *msgq;
- FAR mqmsg_t *curr;
- FAR mqmsg_t *next;
- FAR mqmsg_t *prev;
+ FAR mqmsg_t *mqmsg = NULL;
irqstate_t saved_state;
int ret = ERROR;
- /* Verify the input parameters */
-
- if (!msg || !mqdes || prio < 0 || prio > MQ_PRIO_MAX)
- {
- *get_errno_ptr() = EINVAL;
- return ERROR;
- }
-
- if ((mqdes->oflags & O_WROK) == 0)
- {
- *get_errno_ptr() = EPERM;
- return ERROR;
- }
+ /* Verify the input parameters -- setting errno appropriately
+ * on any failures to verify.
+ */
- if (msglen < 0 || msglen > (size_t)mqdes->msgq->maxmsgsize)
+ if (mq_verifysend(mqdes, msg, msglen, prio) != OK)
{
- *get_errno_ptr() = EMSGSIZE;
return ERROR;
}
@@ -251,205 +134,48 @@ int mq_send(mqd_t mqdes, const void *msg, size_t msglen, int prio)
sched_lock();
msgq = mqdes->msgq;
- /* If we are sending a message from an interrupt handler, then
- * try to get message structure unconditionally.
+ /* Allocate a message structure:
+ * - Immediately if we are called from an interrupt handler.
+ * - Immediately if the message queue is not full, or
+ * - After successfully waiting for the message queue to become
+ * non-FULL. This would fail with EAGAIN, EINTR, or ETIMEOUT.
*/
saved_state = irqsave();
- if (up_interrupt_context())
+ if (up_interrupt_context() || /* In an interrupt handler */
+ msgq->nmsgs < msgq->maxmsgs || /* OR Message queue not full */
+ mq_waitsend(mqdes) == OK) /* OR Successfully waited for mq not full */
{
- curr = mq_msgalloc();
- }
+ /* Allocate the message */
- /* Otherwise, arbitrarily limit the number of messages in the
- * queue to the value determined when the message queue was opened.
- * This makes us more POSIX-like as well as prohibits one slow
- * responding task from consuming all available memory.
- */
-
- else if (msgq->nmsgs >= msgq->maxmsgs)
- {
- /* Should we block until there is sufficient space in the
- * message queue?
- */
-
- if ((mqdes->oflags & O_NONBLOCK) != 0)
- {
- /* No... We will return an error to the caller. */
-
- *get_errno_ptr() = EAGAIN;
- curr = NULL;
- }
-
- /* Yes... We will not return control until the message queue is
- * available.
- */
-
- else
- {
- boolean interrupted = FALSE;
-
- /* Loop until there are fewer than max allowable messages in the
- * receiving message queue
- */
-
- while (msgq->nmsgs >= msgq->maxmsgs)
- {
- /* Block until the message queue is no longer full.
- * When we are unblocked, we will try again
- */
-
- rtcb = (FAR _TCB*)g_readytorun.head;
- rtcb->msgwaitq = msgq;
- (msgq->nwaitnotfull)++;
-
- *get_errno_ptr() = OK;
- up_block_task(rtcb, TSTATE_WAIT_MQNOTFULL);
-
- /* When we resume at this point, either (1) the message queue
- * is no longer empty, or (2) the wait has been interrupted by
- * a signal. We can detect the latter case be examining the
- * errno value (should be EINTR).
- */
-
- if (*get_errno_ptr() != OK)
- {
- interrupted = TRUE;
- break;
- }
- }
-
- /* If we were not interrupted, then it should be okay to add
- * a message to the receiving message queue now.
- */
-
- if (!interrupted)
- {
- curr = mq_msgalloc();
- }
- }
+ irqrestore(saved_state);
+ mqmsg = mq_msgalloc();
}
-
- /* We are not in an interrupt handler and the receiving message queue
- * is not full
- */
-
else
{
- /* Just allocate a message */
-
- curr = mq_msgalloc();
- }
- irqrestore(saved_state);
-
- /* Check if we were able to get a message structure */
-
- if (curr)
- {
- /* Construct the current message header info */
-
- curr->priority = (ubyte)prio;
- curr->msglen = (ubyte)msglen;
-
- /* Copy the message data into the message */
-
- memcpy((void*)curr->mail, (const void*)msg, msglen);
-
- /* Insert the new message in the message queue */
-
- saved_state = irqsave();
-
- /* Search the message list to find the location to insert the new
- * message. Each is list is maintained in ascending priority order.
+ /* We cannot send the message (and didn't even try to allocate it)
+ * because:
+ * - We are not in an interrupt handler AND
+ * - The message queue is full AND
+ * - When we tried waiting, the wait was unsuccessful.
*/
- for (prev = NULL, next = (FAR mqmsg_t*)msgq->msglist.head;
- next && prio <= next->priority;
- prev = next, next = next->next);
-
- /* Add the message at the right place */
-
- if (prev)
- {
- sq_addafter((FAR sq_entry_t*)prev, (FAR sq_entry_t*)curr,
- &msgq->msglist);
- }
- else
- {
- sq_addfirst((FAR sq_entry_t*)curr, &msgq->msglist);
- }
-
- /* Increment the count of message in the queue */
-
- msgq->nmsgs++;
irqrestore(saved_state);
+ }
- /* Check if we need to notify any tasks that are attached to the
- * message queue
- */
-
-#ifndef CONFIG_DISABLE_SIGNALS
- if (msgq->ntmqdes)
- {
- /* Remove the message notification data from the message queue. */
-
-#ifdef CONFIG_CAN_PASS_STRUCTS
- union sigval value = msgq->ntvalue;
-#else
- void *sival_ptr = msgq->ntvalue.sival_ptr;
-#endif
- int signo = msgq->ntsigno;
- int pid = msgq->ntpid;
-
- /* Detach the notification */
-
- msgq->ntpid = INVALID_PROCESS_ID;
- msgq->ntsigno = 0;
- msgq->ntvalue.sival_int = 0;
- msgq->ntmqdes = NULL;
-
- /* Queue the signal -- What if this returns an error? */
-
-#ifdef CONFIG_CAN_PASS_STRUCTS
- sig_mqnotempty(pid, signo, value);
-#else
- sig_mqnotempty(pid, signo, sival_ptr);
-#endif
- }
-#endif
- /* Check if any tasks are waiting for the MQ not empty event. */
-
- saved_state = irqsave();
- if (msgq->nwaitnotempty > 0)
- {
- /* Find the highest priority task that is waiting for
- * this queue to be non-empty in g_waitingformqnotempty
- * list. sched_lock() should give us sufficent protection since
- * interrupts should never cause a change in this list
- */
-
- for (btcb = (FAR _TCB*)g_waitingformqnotempty.head;
- btcb && btcb->msgwaitq != msgq;
- btcb = btcb->flink);
+ /* Check if we were able to get a message structure -- this can fail
+ * either because we cannot send the message (and didn't bother trying
+ * to allocate it) or because the allocation failed.
+ */
- /* If one was found, unblock it */
+ if (mqmsg)
+ {
+ /* Yes, peforrm the message send. */
- if (!btcb)
- {
- PANIC(OSERR_MQNONEMPTYCOUNT);
- }
- else
- {
- btcb->msgwaitq = NULL;
- msgq->nwaitnotempty--;
- up_unblock_task(btcb);
- }
- }
- irqrestore(saved_state);
- ret = OK;
+ ret = mq_dosend(mqdes, mqmsg, msg, msglen, prio);
}
sched_unlock();
- return(ret);
+ return ret;
}
diff --git a/nuttx/sched/mq_sndinternal.c b/nuttx/sched/mq_sndinternal.c
new file mode 100644
index 000000000..09d56333b
--- /dev/null
+++ b/nuttx/sched/mq_sndinternal.c
@@ -0,0 +1,448 @@
+/****************************************************************************
+ * mq_send.c
+ *
+ * Copyright (C) 2007 Gregory Nutt. All rights reserved.
+ * Author: Gregory Nutt <spudmonkey@racsa.co.cr>
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ *
+ * 1. Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in
+ * the documentation and/or other materials provided with the
+ * distribution.
+ * 3. Neither the name Gregory Nutt nor the names of its contributors may be
+ * used to endorse or promote products derived from this software
+ * without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
+ * FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
+ * COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
+ * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
+ * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS
+ * OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
+ * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
+ * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
+ * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ *
+ ****************************************************************************/
+
+/****************************************************************************
+ * Included Files
+ ****************************************************************************/
+
+#include <nuttx/config.h>
+#include <nuttx/kmalloc.h>
+#include <sys/types.h>
+#include <fcntl.h>
+#include <mqueue.h>
+#include <string.h>
+#include <errno.h>
+#include <sched.h>
+#include <debug.h>
+#include <nuttx/arch.h>
+#include "os_internal.h"
+#ifndef CONFIG_DISABLE_SIGNALS
+# include "sig_internal.h"
+#endif
+#include "mq_internal.h"
+
+/****************************************************************************
+ * Definitions
+ ****************************************************************************/
+
+/****************************************************************************
+ * Private Type Declarations
+ ****************************************************************************/
+
+/****************************************************************************
+ * Global Variables
+ ****************************************************************************/
+
+/****************************************************************************
+ * Private Variables
+ ****************************************************************************/
+
+/****************************************************************************
+ * Private Functions
+ ****************************************************************************/
+
+/****************************************************************************
+ * Public Functions
+ ****************************************************************************/
+
+/****************************************************************************
+ * Name: mq_verifysend
+ *
+ * Description:
+ * This is internal, common logic shared by both mq_send and mq_timesend.
+ * This function verifies the input parameters that are common to both
+ * functions.
+ *
+ * Parameters:
+ * mqdes - Message queue descriptor
+ * msg - Message to send
+ * msglen - The length of the message in bytes
+ * prio - The priority of the message
+ *
+ * Return Value:
+ * One success, 0 (OK) is returned. On failure, -1 (ERROR) is returned and
+ * the errno is set appropriately:
+ *
+ * EINVAL Either msg or mqdes is NULL or the value of prio is invalid.
+ * EPERM Message queue opened not opened for writing.
+ * EMSGSIZE 'msglen' was greater than the maxmsgsize attribute of the
+ * message queue.
+ *
+ * Assumptions:
+ *
+ ****************************************************************************/
+
+int mq_verifysend(mqd_t mqdes, const void *msg, size_t msglen, int prio)
+{
+ /* Verify the input parameters */
+
+ if (!msg || !mqdes || prio < 0 || prio > MQ_PRIO_MAX)
+ {
+ *get_errno_ptr() = EINVAL;
+ return ERROR;
+ }
+
+ if ((mqdes->oflags & O_WROK) == 0)
+ {
+ *get_errno_ptr() = EPERM;
+ return ERROR;
+ }
+
+ if (msglen < 0 || msglen > (size_t)mqdes->msgq->maxmsgsize)
+ {
+ *get_errno_ptr() = EMSGSIZE;
+ return ERROR;
+ }
+
+ return OK;
+}
+
+/****************************************************************************
+ * Function: mq_msgalloc
+ *
+ * Description:
+ * The mq_msgalloc function will get a free message for use by the
+ * operating system. The message will be allocated from the g_msgfree list.
+ *
+ * If the list is empty AND the message is NOT being allocated from the
+ * interrupt level, then the message will be allocated. If a message
+ * cannot be obtained, the operating system is dead and therefore cannot
+ * continue.
+ *
+ * If the list is empty AND the message IS being allocated from the
+ * interrupt level. This function will attempt to get a message from
+ * the g_msgfreeirq list. If this is unsuccessful, the calling interrupt
+ * handler will be notified.
+ *
+ * Inputs:
+ * None
+ *
+ * Return Value:
+ * A reference to the allocated msg structure. On a failure to allocate,
+ * this function PANICs.
+ *
+ ****************************************************************************/
+
+FAR mqmsg_t *mq_msgalloc(void)
+{
+ FAR mqmsg_t *mqmsg;
+ irqstate_t saved_state;
+
+ /* If we were called from an interrupt handler, then try to get the message
+ * from generally available list of messages. If this fails, then try the
+ * list of messages reserved for interrupt handlers
+ */
+
+ if (up_interrupt_context())
+ {
+ /* Try the general free list */
+
+ mqmsg = (FAR mqmsg_t*)sq_remfirst(&g_msgfree);
+ if (!mqmsg)
+ {
+ /* Try the free list reserved for interrupt handlers */
+
+ mqmsg = (FAR mqmsg_t*)sq_remfirst(&g_msgfreeirq);
+ }
+ }
+
+ /* We were not called from an interrupt handler. */
+
+ else
+ {
+ /* Try to get the message from the generally available free list.
+ * Disable interrupts -- we might be called from an interrupt handler.
+ */
+
+ saved_state = irqsave();
+ mqmsg = (FAR mqmsg_t*)sq_remfirst(&g_msgfree);
+ irqrestore(saved_state);
+
+ /* If we cannot a message from the free list, then we will have to allocate one. */
+
+ if (!mqmsg)
+ {
+ mqmsg = (FAR mqmsg_t *)kmalloc((sizeof (mqmsg_t)));
+
+ /* Check if we got an allocated message */
+
+ if (mqmsg)
+ {
+ mqmsg->type = MQ_ALLOC_DYN;
+ }
+
+ /* No? We are dead */
+
+ else
+ {
+ dbg("Out of messages\n");
+ PANIC((uint32)OSERR_OUTOFMESSAGES);
+ }
+ }
+ }
+
+ return mqmsg;
+}
+
+/****************************************************************************
+ * Function: mq_waitsend
+ *
+ * Description:
+ * This is internal, common logic shared by both mq_send and mq_timesend.
+ * This function waits until the message queue is not full.
+ *
+ * Parameters:
+ * mqdes - Message queue descriptor
+ *
+ * Return Value:
+ * On success, mq_waitmqnotfull() returns 0 (OK); on error, -1 (ERROR) is
+ * returned, with errno set to indicate the error:
+ *
+ * EAGAIN The queue was empty, and the O_NONBLOCK flag was set for the
+ * message queue description referred to by mqdes.
+ * EINTR The call was interrupted by a signal handler.
+ * ETIMEOUT A timeout expired before the message queue became non-full
+ * (mq_timedsend only).
+ *
+ * Assumptions/restrictions:
+ * - The caller has verified the input parameters using mq_verifysend().
+ * - Interrupts are disabled.
+ *
+ ****************************************************************************/
+
+int mq_waitsend(mqd_t mqdes)
+{
+ FAR _TCB *rtcb;
+ FAR msgq_t *msgq;
+
+ /* Get a pointer to the message queue */
+
+ msgq = mqdes->msgq;
+
+ /* Verify that the queue is indeed full as the caller thinks */
+
+ if (msgq->nmsgs >= msgq->maxmsgs)
+ {
+ /* Should we block until there is sufficient space in the
+ * message queue?
+ */
+
+ if ((mqdes->oflags & O_NONBLOCK) != 0)
+ {
+ /* No... We will return an error to the caller. */
+
+ *get_errno_ptr() = EAGAIN;
+ return ERROR;
+ }
+
+ /* Yes... We will not return control until the message queue is
+ * available or we receive a signal or at timout occurs.
+ */
+
+ else
+ {
+ /* Loop until there are fewer than max allowable messages in the
+ * receiving message queue
+ */
+
+ while (msgq->nmsgs >= msgq->maxmsgs)
+ {
+ /* Block until the message queue is no longer full.
+ * When we are unblocked, we will try again
+ */
+
+ rtcb = (FAR _TCB*)g_readytorun.head;
+ rtcb->msgwaitq = msgq;
+ (msgq->nwaitnotfull)++;
+
+ *get_errno_ptr() = OK;
+ up_block_task(rtcb, TSTATE_WAIT_MQNOTFULL);
+
+ /* When we resume at this point, either (1) the message queue
+ * is no longer empty, or (2) the wait has been interrupted by
+ * a signal. We can detect the latter case be examining the
+ * errno value (should be EINTR or ETIMEOUT).
+ */
+
+ if (*get_errno_ptr() != OK)
+ {
+ return ERROR;
+ }
+ }
+ }
+ }
+ return OK;
+}
+
+/****************************************************************************
+ * Function: mq_dosend
+ *
+ * Description:
+ * This is internal, common logic shared by both mq_send and mq_timesend.
+ * This function adds the specificied message (msg) to the message queue
+ * (mqdes). Then it notifies any tasks that were waiting for message
+ * queue notifications setup by mq_notify. And, finally, it awakens any
+ * tasks that were waiting for the message not empty event.
+ *
+ * Parameters:
+ * mqdes - Message queue descriptor
+ * msg - Message to send
+ * msglen - The length of the message in bytes
+ * prio - The priority of the message
+ *
+ * Return Value:
+ * This function always returns OK.
+ *
+ * Assumptions/restrictions:
+ *
+ ****************************************************************************/
+
+int mq_dosend(mqd_t mqdes, FAR mqmsg_t *mqmsg, const void *msg, size_t msglen, int prio)
+{
+ FAR _TCB *btcb;
+ FAR msgq_t *msgq;
+ FAR mqmsg_t *next;
+ FAR mqmsg_t *prev;
+ irqstate_t saved_state;
+
+ /* Get a pointer to the message queue */
+
+ sched_lock();
+ msgq = mqdes->msgq;
+
+ /* Construct the message header info */
+
+ mqmsg->priority = prio;
+ mqmsg->msglen = msglen;
+
+ /* Copy the message data into the message */
+
+ memcpy((void*)mqmsg->mail, (const void*)msg, msglen);
+
+ /* Insert the new message in the message queue */
+
+ saved_state = irqsave();
+
+ /* Search the message list to find the location to insert the new
+ * message. Each is list is maintained in ascending priority order.
+ */
+
+ for (prev = NULL, next = (FAR mqmsg_t*)msgq->msglist.head;
+ next && prio <= next->priority;
+ prev = next, next = next->next);
+
+ /* Add the message at the right place */
+
+ if (prev)
+ {
+ sq_addafter((FAR sq_entry_t*)prev, (FAR sq_entry_t*)mqmsg,
+ &msgq->msglist);
+ }
+ else
+ {
+ sq_addfirst((FAR sq_entry_t*)mqmsg, &msgq->msglist);
+ }
+
+ /* Increment the count of messages in the queue */
+
+ msgq->nmsgs++;
+ irqrestore(saved_state);
+
+ /* Check if we need to notify any tasks that are attached to the
+ * message queue
+ */
+
+#ifndef CONFIG_DISABLE_SIGNALS
+ if (msgq->ntmqdes)
+ {
+ /* Remove the message notification data from the message queue. */
+
+#ifdef CONFIG_CAN_PASS_STRUCTS
+ union sigval value = msgq->ntvalue;
+#else
+ void *sival_ptr = msgq->ntvalue.sival_ptr;
+#endif
+ int signo = msgq->ntsigno;
+ int pid = msgq->ntpid;
+
+ /* Detach the notification */
+
+ msgq->ntpid = INVALID_PROCESS_ID;
+ msgq->ntsigno = 0;
+ msgq->ntvalue.sival_int = 0;
+ msgq->ntmqdes = NULL;
+
+ /* Queue the signal -- What if this returns an error? */
+
+#ifdef CONFIG_CAN_PASS_STRUCTS
+ sig_mqnotempty(pid, signo, value);
+#else
+ sig_mqnotempty(pid, signo, sival_ptr);
+#endif
+ }
+#endif
+ /* Check if any tasks are waiting for the MQ not empty event. */
+
+ saved_state = irqsave();
+ if (msgq->nwaitnotempty > 0)
+ {
+ /* Find the highest priority task that is waiting for
+ * this queue to be non-empty in g_waitingformqnotempty
+ * list. sched_lock() should give us sufficent protection since
+ * interrupts should never cause a change in this list
+ */
+
+ for (btcb = (FAR _TCB*)g_waitingformqnotempty.head;
+ btcb && btcb->msgwaitq != msgq;
+ btcb = btcb->flink);
+
+ /* If one was found, unblock it */
+
+ if (!btcb)
+ {
+ PANIC(OSERR_MQNONEMPTYCOUNT);
+ }
+ else
+ {
+ btcb->msgwaitq = NULL;
+ msgq->nwaitnotempty--;
+ up_unblock_task(btcb);
+ }
+ }
+ irqrestore(saved_state);
+ sched_unlock();
+ return OK;
+}
+
diff --git a/nuttx/sched/mq_timedreceive.c b/nuttx/sched/mq_timedreceive.c
new file mode 100644
index 000000000..39fafdc7d
--- /dev/null
+++ b/nuttx/sched/mq_timedreceive.c
@@ -0,0 +1,308 @@
+/****************************************************************************
+ * mq_timedreceive.c
+ *
+ * Copyright (C) 2007 Gregory Nutt. All rights reserved.
+ * Author: Gregory Nutt <spudmonkey@racsa.co.cr>
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ *
+ * 1. Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in
+ * the documentation and/or other materials provided with the
+ * distribution.
+ * 3. Neither the name Gregory Nutt nor the names of its contributors may be
+ * used to endorse or promote products derived from this software
+ * without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
+ * FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
+ * COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
+ * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
+ * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS
+ * OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
+ * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
+ * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
+ * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ *
+ ****************************************************************************/
+
+/****************************************************************************
+ * Included Files
+ ****************************************************************************/
+
+#include <sys/types.h>
+#include <unistd.h>
+#include <errno.h>
+#include <mqueue.h>
+#include <wdog.h>
+#include <debug.h>
+#include <nuttx/arch.h>
+#include "os_internal.h"
+#include "clock_internal.h"
+#include "mq_internal.h"
+
+/****************************************************************************
+ * Definitions
+ ****************************************************************************/
+
+/****************************************************************************
+ * Private Type Declarations
+ ****************************************************************************/
+
+/****************************************************************************
+ * Global Variables
+ ****************************************************************************/
+
+/****************************************************************************
+ * Private Variables
+ ****************************************************************************/
+
+/****************************************************************************
+ * Private Functions
+ ****************************************************************************/
+
+/****************************************************************************
+ * Function: mq_rcvtimeout
+ *
+ * Description:
+ * This function is called if the timeout elapses before the message queue
+ * becomes non-empty.
+ *
+ * Parameters:
+ * argc - the number of arguments (should be 1)
+ * pid - the task ID of the task to wakeup
+ *
+ * Return Value:
+ * None
+ *
+ * Assumptions:
+ *
+ ****************************************************************************/
+
+static void mq_rcvtimeout(int argc, uint32 pid, ...)
+{
+ FAR _TCB *wtcb;
+ irqstate_t saved_state;
+
+ /* Disable interrupts. This is necessary because an
+ * interrupt handler may attempt to send a message while we are
+ * doing this.
+ */
+
+ saved_state = irqsave();
+
+ /* Get the TCB associated with this pid. It is possible that
+ * task may no longer be active when this watchdog goes off.
+ */
+
+ wtcb = sched_gettcb((pid_t)pid);
+
+ /* It is also possible that an interrupt/context switch beat us to the
+ * punch and already changed the task's state.
+ */
+
+ if (wtcb && wtcb->task_state == TSTATE_WAIT_MQNOTEMPTY)
+ {
+ /* Mark the errno value for the thread. */
+
+ wtcb->errno = ETIMEDOUT;
+
+ /* Restart the the task. */
+
+ up_unblock_task(wtcb);
+ }
+
+ /* Interrupts may now be enabled. */
+
+ irqrestore(saved_state);
+}
+
+/****************************************************************************
+ * Public Functions
+ ****************************************************************************/
+
+/****************************************************************************
+ * Function: mq_timedreceive
+ *
+ * Description:
+ * This function receives the oldest of the highest
+ * priority messages from the message queue specified by
+ * "mqdes." If the size of the buffer in bytes (msglen) is
+ * less than the "mq_msgsize" attribute of the message
+ * queue, mq_timedreceive will return an error. Otherwise, the
+ * selected message is removed from the queue and copied to
+ * "msg."
+ *
+ * If the message queue is empty and O_NONBLOCK was not
+ * set, mq_timedreceive() will block until a message is added
+ * to the message queue (or until a timeout occurs). If more
+ * than one task is waiting to receive a message, only the
+ * task with the highest priority that has waited the longest
+ * will be unblocked.
+ *
+ * mq_timedreceive() behaves just like mq_receive(), except
+ * that if the queue is empty and the O_NONBLOCK flag is not
+ * enabled for the message queue description, then abstime
+ * points to a structure which specifies a ceiling on the time
+ * for which the call will block. This ceiling is an absolute
+ * timeout in seconds and nanoseconds since the Epoch (midnight
+ * on the morning of 1 January 1970).
+ *
+ * If no message is available, and the timeout has already
+ * expired by the time of the call, mq_timedreceive() returns
+ * immediately.
+ *
+ * Parameters:
+ * mqdes - Message Queue Descriptor
+ * msg - Buffer to receive the message
+ * msglen - Size of the buffer in bytes
+ * prio - If not NULL, the location to store message priority.
+ * abstime - the absolute time to wait until a timeout is declared.
+ *
+ * Return Value:
+ * One success, the length of the selected message in bytes.is
+ * returned. On failure, -1 (ERROR) is returned and the errno
+ * is set appropriately:
+ *
+ * EAGAIN The queue was empty, and the O_NONBLOCK flag was set
+ * for the message queue description referred to by 'mqdes'.
+ * EPERM Message queue opened not opened for reading.
+ * EMSGSIZE 'msglen' was less than the maxmsgsize attribute of the
+ * message queue.
+ * EINTR The call was interrupted by a signal handler.
+ * EINVAL Invalid 'msg' or 'mqdes' or 'abstime'
+ * ETIMEDOUT The call timed out before a message could be transferred.
+ *
+ * Assumptions:
+ *
+ ****************************************************************************/
+
+ssize_t mq_timedreceive(mqd_t mqdes, void *msg, size_t msglen,
+ int *prio, const struct timespec *abstime)
+{
+ WDOG_ID wdog;
+ FAR mqmsg_t *mqmsg;
+ irqstate_t saved_state;
+ int ret = ERROR;
+
+ DEBUGASSERT(!up_interrupt_context());
+
+ /* Verify the input parameters and, in case of an error, set
+ * errno appropriately.
+ */
+
+ if (mq_verifyreceive(mqdes, msg, msglen) != OK)
+ {
+ return ERROR;
+ }
+
+ if (!abstime || abstime->tv_sec < 0 || abstime->tv_nsec > 1000000000)
+ {
+ *get_errno_ptr() = EINVAL;
+ return ERROR;
+ }
+
+ /* Create a watchdog. We will not actually need this watchdog
+ * unless the queue is not empty, but we will reserve it up front
+ * before we enter the following critical section.
+ */
+
+ wdog = wd_create();
+ if (!wdog)
+ {
+ *get_errno_ptr() = EINVAL;
+ return ERROR;
+ }
+
+ /* Get the next mesage from the message queue. We will disable
+ * pre-emption until we have completed the message received. This
+ * is not too bad because if the receipt takes a long time, it will
+ * be because we are blocked waiting for a message and pre-emption
+ * will be re-enabled while we are blocked
+ */
+
+ sched_lock();
+
+ /* Furthermore, mq_waitreceive() expects to have interrupts disabled
+ * because messages can be sent from interrupt level.
+ */
+
+ saved_state = irqsave();
+
+ /* Check if the message queue is empty. If it is NOT empty, then we
+ * will not need to start timer.
+ */
+
+ if (mqdes->msgq->msglist.head == NULL)
+ {
+ sint32 ticks;
+
+ /* Convert the timespec to clock ticks. We must have interrupts
+ * disabled here so that this time stays valid until the wait begins.
+ */
+
+ ret = clock_abstime2ticks(CLOCK_REALTIME, abstime, &ticks);
+
+ /* If the time has already expired and the message queue is empty,
+ * return immediately.
+ */
+
+ if (ret == OK && ticks <= 0)
+ {
+ ret = ETIMEDOUT;
+ }
+
+ /* Handle any time-related errors */
+
+ if (ret != OK)
+ {
+ *get_errno_ptr() = ret;
+ irqrestore(saved_state);
+ sched_unlock();
+ wd_delete(wdog);
+ return ERROR;
+ }
+
+ /* Start the watchdog */
+
+ wd_start(wdog, ticks, (wdentry_t)mq_rcvtimeout, 1, getpid());
+ }
+
+ /* Get the message from the message queue */
+
+ mqmsg = mq_waitreceive(mqdes);
+
+ /* Stop the watchdog timer (this is not harmful in the case where
+ * it was never started)
+ */
+
+ wd_cancel(wdog);
+
+ /* We can now restore interrupts */
+
+ irqrestore(saved_state);
+
+ /* Check if we got a message from the message queue. We might
+ * not have a message if:
+ *
+ * - The message queue is empty and O_NONBLOCK is set in the mqdes
+ * - The wait was interrupted by a signal
+ * - The watchdog timeout expired
+ */
+
+ if (mqmsg)
+ {
+ ret = mq_doreceive(mqdes, mqmsg, msg, prio);
+ }
+
+ sched_unlock();
+ wd_delete(wdog);
+ return ret;
+}
diff --git a/nuttx/sched/mq_timedsend.c b/nuttx/sched/mq_timedsend.c
new file mode 100644
index 000000000..710df84d3
--- /dev/null
+++ b/nuttx/sched/mq_timedsend.c
@@ -0,0 +1,317 @@
+/****************************************************************************
+ * mq_timedsend.c
+ *
+ * Copyright (C) 2007 Gregory Nutt. All rights reserved.
+ * Author: Gregory Nutt <spudmonkey@racsa.co.cr>
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ *
+ * 1. Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in
+ * the documentation and/or other materials provided with the
+ * distribution.
+ * 3. Neither the name Gregory Nutt nor the names of its contributors may be
+ * used to endorse or promote products derived from this software
+ * without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
+ * FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
+ * COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
+ * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
+ * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS
+ * OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
+ * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
+ * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
+ * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ *
+ ****************************************************************************/
+
+/****************************************************************************
+ * Included Files
+ ****************************************************************************/
+
+#include <nuttx/config.h>
+#include <sys/types.h>
+#include <unistd.h>
+#include <mqueue.h>
+#include <wdog.h>
+#include <errno.h>
+#include <debug.h>
+#include <nuttx/arch.h>
+#include "clock_internal.h"
+#include "os_internal.h"
+#include "mq_internal.h"
+
+/****************************************************************************
+ * Definitions
+ ****************************************************************************/
+
+/****************************************************************************
+ * Private Type Declarations
+ ****************************************************************************/
+
+/****************************************************************************
+ * Global Variables
+ ****************************************************************************/
+
+/****************************************************************************
+ * Private Variables
+ ****************************************************************************/
+
+/****************************************************************************
+ * Private Functions
+ ****************************************************************************/
+
+/****************************************************************************
+ * Function: mq_sndtimeout
+ *
+ * Description:
+ * This function is called if the timeout elapses before the message queue
+ * becomes non-full.
+ *
+ * Parameters:
+ * argc - the number of arguments (should be 1)
+ * pid - the task ID of the task to wakeup
+ *
+ * Return Value:
+ * None
+ *
+ * Assumptions:
+ *
+ ****************************************************************************/
+
+static void mq_sndtimeout(int argc, uint32 pid, ...)
+{
+ FAR _TCB *wtcb;
+ irqstate_t saved_state;
+
+ /* Disable interrupts. This is necessary because an
+ * interrupt handler may attempt to send a message while we are
+ * doing this.
+ */
+
+ saved_state = irqsave();
+
+ /* Get the TCB associated with this pid. It is possible that
+ * task may no longer be active when this watchdog goes off.
+ */
+
+ wtcb = sched_gettcb((pid_t)pid);
+
+ /* It is also possible that an interrupt/context switch beat us to the
+ * punch and already changed the task's state.
+ */
+
+ if (wtcb && wtcb->task_state == TSTATE_WAIT_MQNOTEMPTY)
+ {
+ /* Mark the errno value for the thread. */
+
+ wtcb->errno = ETIMEDOUT;
+
+ /* Restart the the task. */
+
+ up_unblock_task(wtcb);
+ }
+
+ /* Interrupts may now be enabled. */
+
+ irqrestore(saved_state);
+}
+
+/****************************************************************************
+ * Public Functions
+ ****************************************************************************/
+
+/****************************************************************************
+ * Function: mq_send
+ *
+ * Description:
+ * This function adds the specificied message (msg) to the message queue
+ * (mqdes). The "msglen" parameter specifies the length of the message
+ * in bytes pointed to by "msg." This length must not exceed the maximum
+ * message length from the mq_getattr().
+ *
+ * If the message queue is not full, mq_timedsend() place the message in the
+ * message queue at the position indicated by the "prio" argrument.
+ * Messages with higher priority will be inserted before lower priority
+ * messages. The value of "prio" must not exceed MQ_PRIO_MAX.
+ *
+ * If the specified message queue is full and O_NONBLOCK is not set in the
+ * message queue, then mq_timedsend() will block until space becomes available
+ * to the queue the message or a timeout occurs.
+ *
+ * mq_timedsend() behaves just like mq_send(), except that if the queue
+ * is full and the O_NONBLOCK flag is not enabled for the message queue
+ * description, then abstime points to a structure which specifies a
+ * ceiling on the time for which the call will block. This ceiling is an
+ * absolute timeout in seconds and nanoseconds since the Epoch (midnight
+ * on the morning of 1 January 1970).
+ *
+ * If the message queue is full, and the timeout has already expired by
+ * the time of the call, mq_timedsend() returns immediately.
+ *
+ * Parameters:
+ * mqdes - Message queue descriptor
+ * msg - Message to send
+ * msglen - The length of the message in bytes
+ * prio - The priority of the message
+ * abstime - the absolute time to wait until a timeout is decleared
+ *
+ * Return Value:
+ * On success, mq_send() returns 0 (OK); on error, -1 (ERROR)
+ * is returned, with errno set to indicate the error:
+ *
+ * EAGAIN The queue was empty, and the O_NONBLOCK flag was set for the
+ * message queue description referred to by mqdes.
+ * EINVAL Either msg or mqdes is NULL or the value of prio is invalid.
+ * EPERM Message queue opened not opened for writing.
+ * EMSGSIZE 'msglen' was greater than the maxmsgsize attribute of the
+ * message queue.
+ * EINTR The call was interrupted by a signal handler.
+ *
+ * Assumptions/restrictions:
+ *
+ ****************************************************************************/
+
+int mq_timedsend(mqd_t mqdes, const char *msg, size_t msglen, int prio,
+ const struct timespec *abstime)
+{
+ WDOG_ID wdog;
+ FAR msgq_t *msgq;
+ FAR mqmsg_t *mqmsg = NULL;
+ irqstate_t saved_state;
+ int ret = ERROR;
+
+ DEBUGASSERT(!up_interrupt_context());
+
+ /* Verify the input parameters -- setting errno appropriately
+ * on any failures to verify.
+ */
+
+ if (mq_verifysend(mqdes, msg, msglen, prio) != OK)
+ {
+ return ERROR;
+ }
+
+ if (!abstime || abstime->tv_sec < 0 || abstime->tv_nsec > 1000000000)
+ {
+ *get_errno_ptr() = EINVAL;
+ return ERROR;
+ }
+
+ /* Get a pointer to the message queue */
+
+ msgq = mqdes->msgq;
+
+ /* Create a watchdog. We will not actually need this watchdog
+ * unless the queue is full, but we will reserve it up front
+ * before we enter the following critical section.
+ */
+
+ wdog = wd_create();
+ if (!wdog)
+ {
+ *get_errno_ptr() = EINVAL;
+ return ERROR;
+ }
+
+ /* Allocate a message structure:
+ * - If we are called from an interrupt handler, or
+ * - If the message queue is not full, or
+ */
+
+ sched_lock();
+ saved_state = irqsave();
+ if (up_interrupt_context() || /* In an interrupt handler */
+ msgq->nmsgs < msgq->maxmsgs) /* OR Message queue not full */
+ {
+ /* Allocate the message */
+
+ irqrestore(saved_state);
+ mqmsg = mq_msgalloc();
+ }
+ else
+ {
+ sint32 ticks;
+ int result;
+
+ /* We are not in an interupt handler and the message queue is full.
+ * set up a timed wait for the message queue to become non-full.
+ *
+ * Convert the timespec to clock ticks. We must have interrupts
+ * disabled here so that this time stays valid until the wait begins.
+ */
+
+ result = clock_abstime2ticks(CLOCK_REALTIME, abstime, &ticks);
+
+ /* If the time has already expired and the message queue is empty,
+ * return immediately.
+ */
+
+ if (result == OK && ticks <= 0)
+ {
+ result = ETIMEDOUT;
+ }
+
+ /* Handle any time-related errors */
+
+ if (result == OK)
+ {
+ /* Start the watchdog */
+
+ wd_start(wdog, ticks, (wdentry_t)mq_sndtimeout, 1, getpid());
+
+ /* And wait for the message queue to be non-empty */
+
+ result = mq_waitsend(mqdes);
+
+ /* This may return with an error and errno set to either EINTR
+ * or ETIMEOUT. Cancel the watchdog timer in any event.
+ */
+
+ wd_cancel(wdog);
+ }
+
+ /* That is the end of the atomic operations */
+
+ irqrestore(saved_state);
+
+ /* If any of the above failed, set the errno. Otherwise, there should
+ * be space for another message in the message queue. NOW we can allocate
+ * the message structure.
+ */
+
+ if (result != OK)
+ {
+ *get_errno_ptr() = result;
+ }
+ else
+ {
+ mqmsg = mq_msgalloc();
+ }
+ }
+
+ /* Check if we were able to get a message structure -- this can fail
+ * either because we cannot send the message (and didn't bother trying
+ * to allocate it) or because the allocation failed.
+ */
+
+ if (mqmsg)
+ {
+ /* Yes, peforrm the message send. */
+
+ ret = mq_dosend(mqdes, mqmsg, msg, msglen, prio);
+ }
+
+ sched_unlock();
+ wd_delete(wdog);
+ return ret;
+}
+