Diffstat (limited to 'src/modules/dataman/dataman.c')
-rw-r--r--  src/modules/dataman/dataman.c  | 872
1 file changed, 872 insertions(+), 0 deletions(-)
diff --git a/src/modules/dataman/dataman.c b/src/modules/dataman/dataman.c
new file mode 100644
index 000000000..ca1fe9bbb
--- /dev/null
+++ b/src/modules/dataman/dataman.c
@@ -0,0 +1,872 @@
+/****************************************************************************
+ *
+ * Copyright (c) 2013, 2014 PX4 Development Team. All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ *
+ * 1. Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in
+ * the documentation and/or other materials provided with the
+ * distribution.
+ * 3. Neither the name PX4 nor the names of its contributors may be
+ * used to endorse or promote products derived from this software
+ * without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
+ * FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
+ * COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
+ * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
+ * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS
+ * OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
+ * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
+ * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
+ * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ *
+ ****************************************************************************/
+/**
+ * @file dataman.c
+ * DATAMANAGER driver.
+ *
+ * @author Jean Cyr
+ * @author Lorenz Meier
+ * @author Julian Oes
+ * @author Thomas Gubler
+ */
+
+#include <nuttx/config.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <fcntl.h>
+#include <systemlib/systemlib.h>
+#include <systemlib/err.h>
+#include <queue.h>
+#include <string.h>
+
+#include "dataman.h"
+#include <systemlib/param/param.h>
+
+/**
+ * data manager app start / stop handling function
+ *
+ * @ingroup apps
+ */
+
+__EXPORT int dataman_main(int argc, char *argv[]);
+__EXPORT ssize_t dm_read(dm_item_t item, unsigned char index, void *buffer, size_t buflen);
+__EXPORT ssize_t dm_write(dm_item_t item, unsigned char index, dm_persitence_t persistence, const void *buffer, size_t buflen);
+__EXPORT int dm_clear(dm_item_t item);
+__EXPORT void dm_lock(dm_item_t item);
+__EXPORT void dm_unlock(dm_item_t item);
+__EXPORT int dm_restart(dm_reset_reason restart_type);
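+
+/*
+ * Usage sketch (illustrative only, not compiled): how a client might store and
+ * retrieve an item with the exported API above. The struct and the chosen
+ * item/index values are hypothetical; only the dm_*() calls and the constants
+ * they use come from this module and dataman.h.
+ */
+#if 0
+static void
+dataman_usage_example(void)
+{
+	struct my_item_s { int a; float b; } item = { 1, 2.0f };
+
+	/* Store the item at index 0, keeping it across a power-on reset */
+	if (dm_write(DM_KEY_SAFE_POINTS, 0, DM_PERSIST_POWER_ON_RESET, &item, sizeof(item)) != sizeof(item))
+		warnx("dm_write failed");
+
+	/* Read it back; dm_read returns the number of user data bytes stored */
+	if (dm_read(DM_KEY_SAFE_POINTS, 0, &item, sizeof(item)) != sizeof(item))
+		warnx("dm_read failed");
+
+	/* Remove all items of this type */
+	dm_clear(DM_KEY_SAFE_POINTS);
+}
+#endif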
+
+/** Types of function calls supported by the worker task */
+typedef enum {
+ dm_write_func = 0,
+ dm_read_func,
+ dm_clear_func,
+ dm_restart_func,
+ dm_number_of_funcs
+} dm_function_t;
+
+/** Work task work item */
+typedef struct {
+	sq_entry_t link;	/**< list linkage */
+	sem_t wait_sem;		/**< caller blocks on this until the worker has processed the item */
+	unsigned char first;	/**< 1 if this item is the head of a malloc'd allocation chunk */
+	unsigned char func;	/**< the dm_function_t to perform */
+	ssize_t result;		/**< result of the operation, returned to the caller */
+ union {
+ struct {
+ dm_item_t item;
+ unsigned char index;
+ dm_persitence_t persistence;
+ const void *buf;
+ size_t count;
+ } write_params;
+ struct {
+ dm_item_t item;
+ unsigned char index;
+ void *buf;
+ size_t count;
+ } read_params;
+ struct {
+ dm_item_t item;
+ } clear_params;
+ struct {
+ dm_reset_reason reason;
+ } restart_params;
+ };
+} work_q_item_t;
+
+const size_t k_work_item_allocation_chunk_size = 8;
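+
+/*
+ * Work items are malloc'd in chunks of k_work_item_allocation_chunk_size to
+ * avoid a heap allocation per request; only the head of each chunk has
+ * first == 1, and only that head is passed to free() at shutdown (the other
+ * items of the chunk live inside the same allocation).
+ */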
+
+/* Usage statistics */
+static unsigned g_func_counts[dm_number_of_funcs];
+
+/* table of maximum number of instances for each item type */
+static const unsigned g_per_item_max_index[DM_KEY_NUM_KEYS] = {
+ DM_KEY_SAFE_POINTS_MAX,
+ DM_KEY_FENCE_POINTS_MAX,
+ DM_KEY_WAYPOINTS_OFFBOARD_0_MAX,
+ DM_KEY_WAYPOINTS_OFFBOARD_1_MAX,
+ DM_KEY_WAYPOINTS_ONBOARD_MAX,
+ DM_KEY_MISSION_STATE_MAX
+};
+
+/* Table of the byte offset of index 0 for each item type; filled in by task_main() as the running sum of g_per_item_max_index[i] * k_sector_size */
+static unsigned int g_key_offsets[DM_KEY_NUM_KEYS];
+
+/* Item type lock mutexes */
+static sem_t *g_item_locks[DM_KEY_NUM_KEYS];
+static sem_t g_sys_state_mutex;
+
+/* The data manager store file handle and file name */
+static int g_fd = -1, g_task_fd = -1;
+static const char *k_data_manager_device_path = "/fs/microsd/dataman";
+
+/* The data manager work queues */
+
+typedef struct {
+ sq_queue_t q; /* Nuttx queue */
+ sem_t mutex; /* Mutual exclusion on work queue adds and deletes */
+ unsigned size; /* Current size of queue */
+ unsigned max_size; /* Maximum queue size reached */
+} work_q_t;
+
+static work_q_t g_free_q;	/* queue of free work items, kept so we don't need to malloc and free one per request */
+static work_q_t g_work_q;	/* pending work items, to be consumed by the worker thread */
+
+sem_t g_work_queued_sema;	/* notifies the worker thread that a work item has been queued */
+sem_t g_init_sema;		/* signals startup that the worker thread has finished initializing */
+
+static bool g_task_should_exit; /**< if true, dataman task should exit */
+
+#define DM_SECTOR_HDR_SIZE 4 /* data manager per item header overhead */
+static const unsigned k_sector_size = DM_MAX_DATA_SIZE + DM_SECTOR_HDR_SIZE; /* total item storage space */
+
+static void init_q(work_q_t *q)
+{
+ sq_init(&(q->q)); /* Initialize the NuttX queue structure */
+ sem_init(&(q->mutex), 1, 1); /* Queue is initially unlocked */
+ q->size = q->max_size = 0; /* Queue is initially empty */
+}
+
+static inline void
+destroy_q(work_q_t *q)
+{
+ sem_destroy(&(q->mutex)); /* Destroy the queue lock */
+}
+
+static inline void
+lock_queue(work_q_t *q)
+{
+ sem_wait(&(q->mutex)); /* Acquire the queue lock */
+}
+
+static inline void
+unlock_queue(work_q_t *q)
+{
+ sem_post(&(q->mutex)); /* Release the queue lock */
+}
+
+static work_q_item_t *
+create_work_item(void)
+{
+ work_q_item_t *item;
+
+ /* Try to reuse item from free item queue */
+ lock_queue(&g_free_q);
+
+ if ((item = (work_q_item_t *)sq_remfirst(&(g_free_q.q))))
+ g_free_q.size--;
+
+ unlock_queue(&g_free_q);
+
+	/* If there weren't any free items then obtain memory for new ones */
+ if (item == NULL) {
+ item = (work_q_item_t *)malloc(k_work_item_allocation_chunk_size * sizeof(work_q_item_t));
+ if (item) {
+ item->first = 1;
+ lock_queue(&g_free_q);
+ for (size_t i = 1; i < k_work_item_allocation_chunk_size; i++) {
+ (item + i)->first = 0;
+ sq_addfirst(&(item + i)->link, &(g_free_q.q));
+ }
+ /* Update the queue size and potentially the maximum queue size */
+ g_free_q.size += k_work_item_allocation_chunk_size - 1;
+ if (g_free_q.size > g_free_q.max_size)
+ g_free_q.max_size = g_free_q.size;
+ unlock_queue(&g_free_q);
+ }
+ }
+
+	/* If we got one then lock the item */
+	if (item)
+		sem_init(&item->wait_sem, 1, 0); /* Caller will wait on this... initially locked */
+
+	/* return the item pointer, or NULL if the allocation failed */
+ return item;
+}
+
+/* Work queue management functions */
+
+static inline void
+destroy_work_item(work_q_item_t *item)
+{
+ sem_destroy(&item->wait_sem); /* Destroy the item lock */
+ /* Return the item to the free item queue for later reuse */
+ lock_queue(&g_free_q);
+ sq_addfirst(&item->link, &(g_free_q.q));
+
+ /* Update the queue size and potentially the maximum queue size */
+ if (++g_free_q.size > g_free_q.max_size)
+ g_free_q.max_size = g_free_q.size;
+
+ unlock_queue(&g_free_q);
+}
+
+static inline work_q_item_t *
+dequeue_work_item(void)
+{
+ work_q_item_t *work;
+
+ /* retrieve the 1st item on the work queue */
+ lock_queue(&g_work_q);
+
+ if ((work = (work_q_item_t *)sq_remfirst(&g_work_q.q)))
+ g_work_q.size--;
+
+ unlock_queue(&g_work_q);
+ return work;
+}
+
+static int
+enqueue_work_item_and_wait_for_result(work_q_item_t *item)
+{
+ /* put the work item at the end of the work queue */
+ lock_queue(&g_work_q);
+ sq_addlast(&item->link, &(g_work_q.q));
+
+ /* Adjust the queue size and potentially the maximum queue size */
+ if (++g_work_q.size > g_work_q.max_size)
+ g_work_q.max_size = g_work_q.size;
+
+ unlock_queue(&g_work_q);
+
+ /* tell the work thread that work is available */
+ sem_post(&g_work_queued_sema);
+
+ /* wait for the result */
+ sem_wait(&item->wait_sem);
+
+ int result = item->result;
+
+ destroy_work_item(item);
+
+ return result;
+}
+
+/* Calculate the offset in the file of a specific item */
+static int
+calculate_offset(dm_item_t item, unsigned char index)
+{
+
+ /* Make sure the item type is valid */
+ if (item >= DM_KEY_NUM_KEYS)
+ return -1;
+
+ /* Make sure the index for this item type is valid */
+ if (index >= g_per_item_max_index[item])
+ return -1;
+
+	/* Calculate and return the item offset based on type and index */
+ return g_key_offsets[item] + (index * k_sector_size);
+}
+
+/* Each data item is stored as follows
+ *
+ * byte 0: Length of user data item
+ * byte 1: Persistence of this data item
+ * byte 2: Unused (for future use)
+ * byte 3: Unused (for future use)
+ * byte DM_SECTOR_HDR_SIZE... : data item value
+ *
+ * The total size must not exceed k_sector_size
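+ *
+ * For example (hypothetical values): a 6 byte user item written with
+ * DM_PERSIST_POWER_ON_RESET persistence occupies one k_sector_size slot as
+ *
+ *   byte 0            0x06 (user data length)
+ *   byte 1            DM_PERSIST_POWER_ON_RESET
+ *   bytes 2..3        0x00 0x00 (reserved)
+ *   bytes 4..9        the six user data bytes
+ *   remaining bytes   unused, up to k_sector_size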
+ */
+
+/* write to the data manager file */
+static ssize_t
+_write(dm_item_t item, unsigned char index, dm_persitence_t persistence, const void *buf, size_t count)
+{
+ unsigned char buffer[k_sector_size];
+ size_t len;
+ int offset;
+
+ /* Get the offset for this item */
+ offset = calculate_offset(item, index);
+
+ /* If item type or index out of range, return error */
+ if (offset < 0)
+ return -1;
+
+ /* Make sure caller has not given us more data than we can handle */
+ if (count > DM_MAX_DATA_SIZE)
+ return -1;
+
+ /* Write out the data, prefixed with length and persistence level */
+ buffer[0] = count;
+ buffer[1] = persistence;
+ buffer[2] = 0;
+ buffer[3] = 0;
+ if (count > 0) {
+ memcpy(buffer + DM_SECTOR_HDR_SIZE, buf, count);
+ }
+ count += DM_SECTOR_HDR_SIZE;
+
+ len = -1;
+
+ /* Seek to the right spot in the data manager file and write the data item */
+ if (lseek(g_task_fd, offset, SEEK_SET) == offset)
+ if ((len = write(g_task_fd, buffer, count)) == count)
+ fsync(g_task_fd); /* Make sure data is written to physical media */
+
+ /* Make sure the write succeeded */
+ if (len != count)
+ return -1;
+
+	/* All is well... return the number of user data bytes written */
+ return count - DM_SECTOR_HDR_SIZE;
+}
+
+/* Retrieve from the data manager file */
+static ssize_t
+_read(dm_item_t item, unsigned char index, void *buf, size_t count)
+{
+ unsigned char buffer[k_sector_size];
+ int len, offset;
+
+ /* Get the offset for this item */
+ offset = calculate_offset(item, index);
+
+ /* If item type or index out of range, return error */
+ if (offset < 0)
+ return -1;
+
+ /* Make sure the caller hasn't asked for more data than we can handle */
+ if (count > DM_MAX_DATA_SIZE)
+ return -1;
+
+ /* Read the prefix and data */
+ len = -1;
+
+ if (lseek(g_task_fd, offset, SEEK_SET) == offset)
+ len = read(g_task_fd, buffer, count + DM_SECTOR_HDR_SIZE);
+
+ /* Check for read error */
+ if (len < 0)
+ return -1;
+
+	/* A zero length entry is an empty entry */
+ if (len == 0)
+ buffer[0] = 0;
+
+ /* See if we got data */
+ if (buffer[0] > 0) {
+		/* The stored item is larger than the caller's buffer */
+ if (buffer[0] > count)
+ return -1;
+
+ /* Looks good, copy it to the caller's buffer */
+ memcpy(buf, buffer + DM_SECTOR_HDR_SIZE, buffer[0]);
+ }
+
+ /* Return the number of bytes of caller data read */
+ return buffer[0];
+}
+
+static int
+_clear(dm_item_t item)
+{
+ int i, result = 0;
+
+ /* Get the offset of 1st item of this type */
+ int offset = calculate_offset(item, 0);
+
+ /* Check for item type out of range */
+ if (offset < 0)
+ return -1;
+
+ /* Clear all items of this type */
+ for (i = 0; (unsigned)i < g_per_item_max_index[item]; i++) {
+ char buf[1];
+
+ if (lseek(g_task_fd, offset, SEEK_SET) != offset) {
+ result = -1;
+ break;
+ }
+
+ /* Avoid SD flash wear by only doing writes where necessary */
+ if (read(g_task_fd, buf, 1) < 1)
+ break;
+
+ /* If item has length greater than 0 it needs to be overwritten */
+ if (buf[0]) {
+ if (lseek(g_task_fd, offset, SEEK_SET) != offset) {
+ result = -1;
+ break;
+ }
+
+ buf[0] = 0;
+
+ if (write(g_task_fd, buf, 1) != 1) {
+ result = -1;
+ break;
+ }
+ }
+
+ offset += k_sector_size;
+ }
+
+ /* Make sure data is actually written to physical media */
+ fsync(g_task_fd);
+ return result;
+}
+
+/** Tell the data manager about the type of the last reset */
+static int
+_restart(dm_reset_reason reason)
+{
+ unsigned char buffer[2];
+ int offset = 0, result = 0;
+
+	/* We need to scan the entire file and invalidate any data that should not persist after the last reset */
+
+ /* Loop through all of the data segments and delete those that are not persistent */
+ while (1) {
+ size_t len;
+
+ /* Get data segment at current offset */
+ if (lseek(g_task_fd, offset, SEEK_SET) != offset) {
+ /* must be at eof */
+ break;
+ }
+
+ len = read(g_task_fd, buffer, sizeof(buffer));
+
+ if (len != sizeof(buffer)) {
+ /* must be at eof */
+ break;
+ }
+
+ /* check if segment contains data */
+ if (buffer[0]) {
+ int clear_entry = 0;
+
+ /* Whether data gets deleted depends on reset type and data segment's persistence setting */
+ if (reason == DM_INIT_REASON_POWER_ON) {
+ if (buffer[1] > DM_PERSIST_POWER_ON_RESET) {
+ clear_entry = 1;
+ }
+
+ } else {
+ if (buffer[1] > DM_PERSIST_IN_FLIGHT_RESET) {
+ clear_entry = 1;
+ }
+ }
+
+ /* Set segment to unused if data does not persist */
+ if (clear_entry) {
+ if (lseek(g_task_fd, offset, SEEK_SET) != offset) {
+ result = -1;
+ break;
+ }
+
+ buffer[0] = 0;
+
+ len = write(g_task_fd, buffer, 1);
+
+ if (len != 1) {
+ result = -1;
+ break;
+ }
+ }
+ }
+
+ offset += k_sector_size;
+ }
+
+ fsync(g_task_fd);
+
+ /* tell the caller how it went */
+ return result;
+}
+
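+/*
+ * The exported dm_write/dm_read/dm_clear/dm_restart entry points below all
+ * follow the same pattern: the calling thread fills in a work item, queues it,
+ * wakes the worker task via g_work_queued_sema and then blocks on the item's
+ * wait_sem until the worker has processed the request and stored its result.
+ */
+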
+/** Write to the data manager file */
+__EXPORT ssize_t
+dm_write(dm_item_t item, unsigned char index, dm_persitence_t persistence, const void *buf, size_t count)
+{
+ work_q_item_t *work;
+
+ /* Make sure data manager has been started and is not shutting down */
+ if ((g_fd < 0) || g_task_should_exit)
+ return -1;
+
+ /* get a work item and queue up a write request */
+ if ((work = create_work_item()) == NULL)
+ return -1;
+
+ work->func = dm_write_func;
+ work->write_params.item = item;
+ work->write_params.index = index;
+ work->write_params.persistence = persistence;
+ work->write_params.buf = buf;
+ work->write_params.count = count;
+
+ /* Enqueue the item on the work queue and wait for the worker thread to complete processing it */
+ return (ssize_t)enqueue_work_item_and_wait_for_result(work);
+}
+
+/** Retrieve from the data manager file */
+__EXPORT ssize_t
+dm_read(dm_item_t item, unsigned char index, void *buf, size_t count)
+{
+ work_q_item_t *work;
+
+ /* Make sure data manager has been started and is not shutting down */
+ if ((g_fd < 0) || g_task_should_exit)
+ return -1;
+
+ /* get a work item and queue up a read request */
+ if ((work = create_work_item()) == NULL)
+ return -1;
+
+ work->func = dm_read_func;
+ work->read_params.item = item;
+ work->read_params.index = index;
+ work->read_params.buf = buf;
+ work->read_params.count = count;
+
+ /* Enqueue the item on the work queue and wait for the worker thread to complete processing it */
+ return (ssize_t)enqueue_work_item_and_wait_for_result(work);
+}
+
+__EXPORT int
+dm_clear(dm_item_t item)
+{
+ work_q_item_t *work;
+
+ /* Make sure data manager has been started and is not shutting down */
+ if ((g_fd < 0) || g_task_should_exit)
+ return -1;
+
+ /* get a work item and queue up a clear request */
+ if ((work = create_work_item()) == NULL)
+ return -1;
+
+ work->func = dm_clear_func;
+ work->clear_params.item = item;
+
+ /* Enqueue the item on the work queue and wait for the worker thread to complete processing it */
+ return enqueue_work_item_and_wait_for_result(work);
+}
+
+__EXPORT void
+dm_lock(dm_item_t item)
+{
+ /* Make sure data manager has been started and is not shutting down */
+ if ((g_fd < 0) || g_task_should_exit)
+ return;
+ if (item >= DM_KEY_NUM_KEYS)
+ return;
+ if (g_item_locks[item]) {
+ sem_wait(g_item_locks[item]);
+ }
+}
+
+__EXPORT void
+dm_unlock(dm_item_t item)
+{
+ /* Make sure data manager has been started and is not shutting down */
+ if ((g_fd < 0) || g_task_should_exit)
+ return;
+ if (item >= DM_KEY_NUM_KEYS)
+ return;
+ if (g_item_locks[item]) {
+ sem_post(g_item_locks[item]);
+ }
+}
+
+/* Tell the data manager about the type of the last reset */
+__EXPORT int
+dm_restart(dm_reset_reason reason)
+{
+ work_q_item_t *work;
+
+ /* Make sure data manager has been started and is not shutting down */
+ if ((g_fd < 0) || g_task_should_exit)
+ return -1;
+
+ /* get a work item and queue up a restart request */
+ if ((work = create_work_item()) == NULL)
+ return -1;
+
+ work->func = dm_restart_func;
+ work->restart_params.reason = reason;
+
+ /* Enqueue the item on the work queue and wait for the worker thread to complete processing it */
+ return enqueue_work_item_and_wait_for_result(work);
+}
+
+static int
+task_main(int argc, char *argv[])
+{
+ work_q_item_t *work;
+
+ /* inform about start */
+ warnx("Initializing..");
+
+ /* Initialize global variables */
+ g_key_offsets[0] = 0;
+
+ for (unsigned i = 0; i < (DM_KEY_NUM_KEYS - 1); i++)
+ g_key_offsets[i + 1] = g_key_offsets[i] + (g_per_item_max_index[i] * k_sector_size);
+
+ unsigned max_offset = g_key_offsets[DM_KEY_NUM_KEYS - 1] + (g_per_item_max_index[DM_KEY_NUM_KEYS - 1] * k_sector_size);
+
+ for (unsigned i = 0; i < dm_number_of_funcs; i++)
+ g_func_counts[i] = 0;
+
+ /* Initialize the item type locks, for now only DM_KEY_MISSION_STATE supports locking */
+ sem_init(&g_sys_state_mutex, 1, 1); /* Initially unlocked */
+ for (unsigned i = 0; i < DM_KEY_NUM_KEYS; i++)
+ g_item_locks[i] = NULL;
+ g_item_locks[DM_KEY_MISSION_STATE] = &g_sys_state_mutex;
+
+ g_task_should_exit = false;
+
+ init_q(&g_work_q);
+ init_q(&g_free_q);
+
+ sem_init(&g_work_queued_sema, 1, 0);
+
+	/* See if the data manager file exists and whether its size is a multiple of the sector size */
+ g_task_fd = open(k_data_manager_device_path, O_RDONLY | O_BINARY);
+ if (g_task_fd >= 0) {
+ /* File exists, check its size */
+ int file_size = lseek(g_task_fd, 0, SEEK_END);
+ if ((file_size % k_sector_size) != 0) {
+ warnx("Incompatible data manager file %s, resetting it", k_data_manager_device_path);
+ close(g_task_fd);
+ unlink(k_data_manager_device_path);
+ }
+ else
+ close(g_task_fd);
+ }
+
+ /* Open or create the data manager file */
+ g_task_fd = open(k_data_manager_device_path, O_RDWR | O_CREAT | O_BINARY);
+
+ if (g_task_fd < 0) {
+ warnx("Could not open data manager file %s", k_data_manager_device_path);
+ sem_post(&g_init_sema); /* Don't want to hang startup */
+ return -1;
+ }
+
+ if ((unsigned)lseek(g_task_fd, max_offset, SEEK_SET) != max_offset) {
+ close(g_task_fd);
+ warnx("Could not seek data manager file %s", k_data_manager_device_path);
+ sem_post(&g_init_sema); /* Don't want to hang startup */
+ return -1;
+ }
+
+ fsync(g_task_fd);
+
+ /* see if we need to erase any items based on restart type */
+ int sys_restart_val;
+ if (param_get(param_find("SYS_RESTART_TYPE"), &sys_restart_val) == OK) {
+ if (sys_restart_val == DM_INIT_REASON_POWER_ON) {
+ warnx("Power on restart");
+ _restart(DM_INIT_REASON_POWER_ON);
+ }
+ else if (sys_restart_val == DM_INIT_REASON_IN_FLIGHT) {
+ warnx("In flight restart");
+ _restart(DM_INIT_REASON_IN_FLIGHT);
+ }
+ else
+ warnx("Unknown restart");
+ }
+ else
+ warnx("Unknown restart");
+
+	/* We use two file descriptor variables, one for the caller context and one for the worker thread. */
+	/* They actually refer to the same open file, but we need some way to reject caller requests while */
+	/* the worker thread is shutting down but still processing queued requests. */
+ g_fd = g_task_fd;
+
+ warnx("Initialized, data manager file '%s' size is %d bytes", k_data_manager_device_path, max_offset);
+
+ /* Tell startup that the worker thread has completed its initialization */
+ sem_post(&g_init_sema);
+
+	/* Start the endless loop, waiting for and then processing work requests */
+ while (true) {
+
+ /* do we need to exit ??? */
+ if ((g_task_should_exit) && (g_fd >= 0)) {
+ /* Close the file handle to stop further queuing */
+ g_fd = -1;
+ }
+
+ if (!g_task_should_exit) {
+ /* wait for work */
+ sem_wait(&g_work_queued_sema);
+ }
+
+ /* Empty the work queue */
+ while ((work = dequeue_work_item())) {
+
+ /* handle each work item with the appropriate handler */
+ switch (work->func) {
+ case dm_write_func:
+ g_func_counts[dm_write_func]++;
+ work->result =
+ _write(work->write_params.item, work->write_params.index, work->write_params.persistence, work->write_params.buf, work->write_params.count);
+ break;
+
+ case dm_read_func:
+ g_func_counts[dm_read_func]++;
+ work->result =
+ _read(work->read_params.item, work->read_params.index, work->read_params.buf, work->read_params.count);
+ break;
+
+ case dm_clear_func:
+ g_func_counts[dm_clear_func]++;
+ work->result = _clear(work->clear_params.item);
+ break;
+
+ case dm_restart_func:
+ g_func_counts[dm_restart_func]++;
+ work->result = _restart(work->restart_params.reason);
+ break;
+
+ default: /* should never happen */
+ work->result = -1;
+ break;
+ }
+
+ /* Inform the caller that work is done */
+ sem_post(&work->wait_sem);
+ }
+
+ /* time to go???? */
+ if ((g_task_should_exit) && (g_fd < 0))
+ break;
+ }
+
+ close(g_task_fd);
+ g_task_fd = -1;
+
+	/* The work queue is now empty; free the memory backing the free item queue */
+ for (;;) {
+ if ((work = (work_q_item_t *)sq_remfirst(&(g_free_q.q))) == NULL)
+ break;
+ if (work->first)
+ free(work);
+ }
+
+ destroy_q(&g_work_q);
+ destroy_q(&g_free_q);
+ sem_destroy(&g_work_queued_sema);
+ sem_destroy(&g_sys_state_mutex);
+
+ return 0;
+}
+
+static int
+start(void)
+{
+ int task;
+
+ sem_init(&g_init_sema, 1, 0);
+
+ /* start the worker thread */
+ if ((task = task_spawn_cmd("dataman", SCHED_DEFAULT, SCHED_PRIORITY_MAX - 5, 2000, task_main, NULL)) <= 0) {
+ warn("task start failed");
+ return -1;
+ }
+
+ /* wait for the thread to actually initialize */
+ sem_wait(&g_init_sema);
+ sem_destroy(&g_init_sema);
+
+ return 0;
+}
+
+static void
+status(void)
+{
+ /* display usage statistics */
+ warnx("Writes %d", g_func_counts[dm_write_func]);
+ warnx("Reads %d", g_func_counts[dm_read_func]);
+ warnx("Clears %d", g_func_counts[dm_clear_func]);
+ warnx("Restarts %d", g_func_counts[dm_restart_func]);
+ warnx("Max Q lengths work %d, free %d", g_work_q.max_size, g_free_q.max_size);
+}
+
+static void
+stop(void)
+{
+ /* Tell the worker task to shut down */
+ g_task_should_exit = true;
+ sem_post(&g_work_queued_sema);
+}
+
+static void
+usage(void)
+{
+ errx(1, "usage: dataman {start|stop|status|poweronrestart|inflightrestart}");
+}
+
+int
+dataman_main(int argc, char *argv[])
+{
+ if (argc < 2)
+ usage();
+
+ if (!strcmp(argv[1], "start")) {
+
+ if (g_fd >= 0)
+ errx(1, "already running");
+
+ start();
+
+ if (g_fd < 0)
+ errx(1, "start failed");
+
+ exit(0);
+ }
+
+ /* Worker thread should be running for all other commands */
+ if (g_fd < 0)
+ errx(1, "not running");
+
+ if (!strcmp(argv[1], "stop"))
+ stop();
+ else if (!strcmp(argv[1], "status"))
+ status();
+ else if (!strcmp(argv[1], "poweronrestart"))
+ dm_restart(DM_INIT_REASON_POWER_ON);
+ else if (!strcmp(argv[1], "inflightrestart"))
+ dm_restart(DM_INIT_REASON_IN_FLIGHT);
+ else
+ usage();
+
+	exit(0);
+}