aboutsummaryrefslogtreecommitdiff
path: root/src/modules/dataman/dataman.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/modules/dataman/dataman.c')
-rw-r--r--src/modules/dataman/dataman.c178
1 files changed, 108 insertions, 70 deletions
diff --git a/src/modules/dataman/dataman.c b/src/modules/dataman/dataman.c
index dc2d6c312..fa88dfaff 100644
--- a/src/modules/dataman/dataman.c
+++ b/src/modules/dataman/dataman.c
@@ -126,45 +126,46 @@ static const char *k_data_manager_device_path = "/fs/microsd/dataman";
/* The data manager work queues */
typedef struct {
- sq_queue_t q;
- sem_t mutex; /* Mutual exclusion on work queue adds and deletes */
- unsigned size;
- unsigned max_size;
+ sq_queue_t q; /* Nuttx queue */
+ sem_t mutex; /* Mutual exclusion on work queue adds and deletes */
+ unsigned size; /* Current size of queue */
+ unsigned max_size; /* Maximum queue size reached */
} work_q_t;
-static work_q_t g_free_q;
-static work_q_t g_work_q;
+static work_q_t g_free_q; /* queue of free work items. So that we don't always need to call malloc and free*/
+static work_q_t g_work_q; /* pending work items. To be consumed by worker thread */
-sem_t g_work_queued_sema;
+sem_t g_work_queued_sema; /* To notify worker thread a work item has been queued */
sem_t g_init_sema;
static bool g_task_should_exit; /**< if true, dataman task should exit */
-#define DM_SECTOR_HDR_SIZE 4
-static const unsigned k_sector_size = DM_MAX_DATA_SIZE + DM_SECTOR_HDR_SIZE;
+#define DM_SECTOR_HDR_SIZE 4 /* data manager per item header overhead */
+static const unsigned k_sector_size = DM_MAX_DATA_SIZE + DM_SECTOR_HDR_SIZE; /* total item sorage space */
static void init_q(work_q_t *q)
{
- sq_init(&(q->q));
- sem_init(&(q->mutex), 1, 1);
- q->size = q->max_size = 0;
+ sq_init(&(q->q)); /* Initialize the NuttX queue structure */
+ sem_init(&(q->mutex), 1, 1); /* Queue is initially unlocked */
+ q->size = q->max_size = 0; /* Queue is initially empty */
}
-static void destroy_q(work_q_t *q)
+static inline void
+destroy_q(work_q_t *q)
{
- sem_destroy(&(q->mutex));
+ sem_destroy(&(q->mutex)); /* Destroy the queue lock */
}
static inline void
lock_queue(work_q_t *q)
{
- sem_wait(&(q->mutex));
+ sem_wait(&(q->mutex)); /* Acquire the queue lock */
}
static inline void
unlock_queue(work_q_t *q)
{
- sem_post(&(q->mutex));
+ sem_post(&(q->mutex)); /* Release the queue lock */
}
static work_q_item_t *
@@ -172,54 +173,47 @@ create_work_item(void)
{
work_q_item_t *item;
+ /* Try to reuse item from free item queue */
lock_queue(&g_free_q);
if ((item = (work_q_item_t *)sq_remfirst(&(g_free_q.q))))
g_free_q.size--;
unlock_queue(&g_free_q);
+ /* If we there weren't any free items then obtain memory for a new one */
if (item == NULL)
item = (work_q_item_t *)malloc(sizeof(work_q_item_t));
+ /* If we got one then lock the item*/
if (item)
sem_init(&item->wait_sem, 1, 0); /* Caller will wait on this... initially locked */
+ /* return the item pointer, or NULL if all failed */
return item;
}
/* Work queue management functions */
-static void
-enqueue_work_item(work_q_item_t *item)
-{
- /* put the work item on the work queue */
- lock_queue(&g_work_q);
- sq_addlast(&item->link, &(g_work_q.q));
-
- if (++g_work_q.size > g_work_q.max_size)
- g_work_q.max_size = g_work_q.size;
-
- unlock_queue(&g_work_q);
- /* tell the work thread that work is available */
- sem_post(&g_work_queued_sema);
-}
-
-static void
+static inline void
destroy_work_item(work_q_item_t *item)
{
- sem_destroy(&item->wait_sem);
+ sem_destroy(&item->wait_sem); /* Destroy the item lock */
+ /* Return the item to the free item queue for later reuse */
lock_queue(&g_free_q);
sq_addfirst(&item->link, &(g_free_q.q));
+ /* Update the queue size and potentially the maximum queue size */
if (++g_free_q.size > g_free_q.max_size)
g_free_q.max_size = g_free_q.size;
unlock_queue(&g_free_q);
}
-static work_q_item_t *
+static inline work_q_item_t *
dequeue_work_item(void)
{
work_q_item_t *work;
+
+ /* retrieve the 1st item on the work queue */
lock_queue(&g_work_q);
if ((work = (work_q_item_t *)sq_remfirst(&g_work_q.q)))
@@ -229,6 +223,32 @@ dequeue_work_item(void)
return work;
}
+static int
+enqueue_work_item_and_wait_for_result(work_q_item_t *item)
+{
+ /* put the work item at the end of the work queue */
+ lock_queue(&g_work_q);
+ sq_addlast(&item->link, &(g_work_q.q));
+
+ /* Adjust the queue size and potentially the maximum queue size */
+ if (++g_work_q.size > g_work_q.max_size)
+ g_work_q.max_size = g_work_q.size;
+
+ unlock_queue(&g_work_q);
+
+ /* tell the work thread that work is available */
+ sem_post(&g_work_queued_sema);
+
+ /* wait for the result */
+ sem_wait(&item->wait_sem);
+
+ int result = item->result;
+
+ destroy_work_item(item);
+
+ return result;
+}
+
/* Calculate the offset in file of specific item */
static int
calculate_offset(dm_item_t item, unsigned char index)
@@ -250,6 +270,8 @@ calculate_offset(dm_item_t item, unsigned char index)
*
* byte 0: Length of user data item
* byte 1: Persistence of this data item
+ * byte 2: Unused (for future use)
+ * byte 3: Unused (for future use)
* byte DM_SECTOR_HDR_SIZE... : data item value
*
* The total size must not exceed k_sector_size
@@ -266,6 +288,7 @@ _write(dm_item_t item, unsigned char index, dm_persitence_t persistence, const v
/* Get the offset for this item */
offset = calculate_offset(item, index);
+ /* If item type or index out of range, return error */
if (offset < 0)
return -1;
@@ -283,10 +306,12 @@ _write(dm_item_t item, unsigned char index, dm_persitence_t persistence, const v
len = -1;
+ /* Seek to the right spot in the data manager file and write the data item */
if (lseek(g_task_fd, offset, SEEK_SET) == offset)
if ((len = write(g_task_fd, buffer, count)) == count)
- fsync(g_task_fd);
+ fsync(g_task_fd); /* Make sure data is written to physical media */
+ /* Make sure the write succeeded */
if (len != count)
return -1;
@@ -304,6 +329,7 @@ _read(dm_item_t item, unsigned char index, void *buf, size_t count)
/* Get the offset for this item */
offset = calculate_offset(item, index);
+ /* If item type or index out of range, return error */
if (offset < 0)
return -1;
@@ -316,14 +342,17 @@ _read(dm_item_t item, unsigned char index, void *buf, size_t count)
if (lseek(g_task_fd, offset, SEEK_SET) == offset)
len = read(g_task_fd, buffer, count + DM_SECTOR_HDR_SIZE);
- /* Check for length issues */
+ /* Check for read error */
if (len < 0)
return -1;
+ /* A zero length entry is a empty entry */
if (len == 0)
buffer[0] = 0;
+ /* See if we got data */
if (buffer[0] > 0) {
+ /* We got more than requested!!! */
if (buffer[0] > count)
return -1;
@@ -340,11 +369,14 @@ _clear(dm_item_t item)
{
int i, result = 0;
+ /* Get the offset of 1st item of this type */
int offset = calculate_offset(item, 0);
+ /* Check for item type out of range */
if (offset < 0)
return -1;
+ /* Clear all items of this type */
for (i = 0; (unsigned)i < g_per_item_max_index[item]; i++) {
char buf[1];
@@ -353,9 +385,11 @@ _clear(dm_item_t item)
break;
}
+ /* Avoid SD flash wear by only doing writes where necessary */
if (read(g_task_fd, buf, 1) < 1)
break;
+ /* If item has length greater than 0 it needs to be overwritten */
if (buf[0]) {
if (lseek(g_task_fd, offset, SEEK_SET) != offset) {
result = -1;
@@ -373,6 +407,7 @@ _clear(dm_item_t item)
offset += k_sector_size;
}
+ /* Make sure data is actually written to physical media */
fsync(g_task_fd);
return result;
}
@@ -452,12 +487,13 @@ dm_write(dm_item_t item, unsigned char index, dm_persitence_t persistence, const
{
work_q_item_t *work;
+ /* Make sure data manager has been started and is not shutting down */
if ((g_fd < 0) || g_task_should_exit)
return -1;
- /* Will return with queues locked */
+ /* get a work item and queue up a write request */
if ((work = create_work_item()) == NULL)
- return -1; /* queues unlocked on failure */
+ return -1;
work->func = dm_write_func;
work->write_params.item = item;
@@ -465,12 +501,9 @@ dm_write(dm_item_t item, unsigned char index, dm_persitence_t persistence, const
work->write_params.persistence = persistence;
work->write_params.buf = buf;
work->write_params.count = count;
- enqueue_work_item(work);
- sem_wait(&work->wait_sem);
- ssize_t result = work->result;
- destroy_work_item(work);
- return result;
+ /* Enqueue the item on the work queue and wait for the worker thread to complete processing it */
+ return (ssize_t)enqueue_work_item_and_wait_for_result(work);
}
/* Retrieve from the data manager file */
@@ -479,24 +512,22 @@ dm_read(dm_item_t item, unsigned char index, void *buf, size_t count)
{
work_q_item_t *work;
+ /* Make sure data manager has been started and is not shutting down */
if ((g_fd < 0) || g_task_should_exit)
return -1;
- /* Will return with queues locked */
+ /* get a work item and queue up a read request */
if ((work = create_work_item()) == NULL)
- return -1; /* queues unlocked on failure */
+ return -1;
work->func = dm_read_func;
work->read_params.item = item;
work->read_params.index = index;
work->read_params.buf = buf;
work->read_params.count = count;
- enqueue_work_item(work);
- sem_wait(&work->wait_sem);
- ssize_t result = work->result;
- destroy_work_item(work);
- return result;
+ /* Enqueue the item on the work queue and wait for the worker thread to complete processing it */
+ return (ssize_t)enqueue_work_item_and_wait_for_result(work);
}
__EXPORT int
@@ -504,21 +535,19 @@ dm_clear(dm_item_t item)
{
work_q_item_t *work;
+ /* Make sure data manager has been started and is not shutting down */
if ((g_fd < 0) || g_task_should_exit)
return -1;
- /* Will return with queues locked */
+ /* get a work item and queue up a clear request */
if ((work = create_work_item()) == NULL)
- return -1; /* queues unlocked on failure */
+ return -1;
work->func = dm_clear_func;
work->clear_params.item = item;
- enqueue_work_item(work);
- sem_wait(&work->wait_sem);
- int result = work->result;
- destroy_work_item(work);
- return result;
+ /* Enqueue the item on the work queue and wait for the worker thread to complete processing it */
+ return enqueue_work_item_and_wait_for_result(work);
}
/* Tell the data manager about the type of the last reset */
@@ -527,21 +556,19 @@ dm_restart(dm_reset_reason reason)
{
work_q_item_t *work;
+ /* Make sure data manager has been started and is not shutting down */
if ((g_fd < 0) || g_task_should_exit)
return -1;
- /* Will return with queues locked */
+ /* get a work item and queue up a restart request */
if ((work = create_work_item()) == NULL)
- return -1; /* queues unlocked on failure */
+ return -1;
work->func = dm_restart_func;
work->restart_params.reason = reason;
- enqueue_work_item(work);
- sem_wait(&work->wait_sem);
- int result = work->result;
- destroy_work_item(work);
- return result;
+ /* Enqueue the item on the work queue and wait for the worker thread to complete processing it */
+ return enqueue_work_item_and_wait_for_result(work);
}
static int
@@ -570,24 +597,29 @@ task_main(int argc, char *argv[])
sem_init(&g_work_queued_sema, 1, 0);
+ /* Open or create the data manager file */
g_task_fd = open(k_data_manager_device_path, O_RDWR | O_CREAT | O_BINARY);
if (g_task_fd < 0) {
warnx("Could not open data manager file %s", k_data_manager_device_path);
- sem_post(&g_init_sema);
+ sem_post(&g_init_sema); /* Don't want to hang startup */
return -1;
}
if (lseek(g_task_fd, max_offset, SEEK_SET) != max_offset) {
close(g_task_fd);
warnx("Could not seek data manager file %s", k_data_manager_device_path);
- sem_post(&g_init_sema);
+ sem_post(&g_init_sema); /* Don't want to hang startup */
return -1;
}
fsync(g_task_fd);
+ /* We use two file descriptors, one for the caller context and one for the worker thread */
+ /* They are actually the same but we need to some way to reject caller request while the */
+ /* worker thread is shutting down but still processing requests */
g_fd = g_task_fd;
warnx("Initialized, data manager file '%s' size is %d bytes", k_data_manager_device_path, max_offset);
+ /* Tell startup that the worker thread has completed its initialization */
sem_post(&g_init_sema);
/* Start the endless loop, waiting for then processing work requests */
@@ -595,7 +627,7 @@ task_main(int argc, char *argv[])
/* do we need to exit ??? */
if ((g_task_should_exit) && (g_fd >= 0)) {
- /* Close the file handle to stop further queueing */
+ /* Close the file handle to stop further queuing */
g_fd = -1;
}
@@ -607,6 +639,7 @@ task_main(int argc, char *argv[])
/* Empty the work queue */
while ((work = dequeue_work_item())) {
+ /* handle each work item with the appropriate handler */
switch (work->func) {
case dm_write_func:
g_func_counts[dm_write_func]++;
@@ -647,7 +680,7 @@ task_main(int argc, char *argv[])
close(g_task_fd);
g_task_fd = -1;
- /* Empty the work queue */
+ /* The work queue is now empty, empty the free queue */
for (;;) {
if ((work = (work_q_item_t *)sq_remfirst(&(g_free_q.q))) == NULL)
break;
@@ -669,7 +702,7 @@ start(void)
sem_init(&g_init_sema, 1, 0);
- /* start the task */
+ /* start the worker thread */
if ((task = task_spawn_cmd("dataman", SCHED_DEFAULT, SCHED_PRIORITY_MAX - 5, 2048, task_main, NULL)) <= 0) {
warn("task start failed");
return -1;
@@ -704,7 +737,7 @@ stop(void)
static void
usage(void)
{
- errx(1, "usage: dataman {start|stop|status}");
+ errx(1, "usage: dataman {start|stop|status|poweronrestart|inflightrestart}");
}
int
@@ -726,6 +759,7 @@ dataman_main(int argc, char *argv[])
exit(0);
}
+ /* Worker thread should be running for all other commands */
if (g_fd < 0)
errx(1, "not running");
@@ -733,6 +767,10 @@ dataman_main(int argc, char *argv[])
stop();
else if (!strcmp(argv[1], "status"))
status();
+ else if (!strcmp(argv[1], "poweronrestart"))
+ dm_restart(DM_INIT_REASON_POWER_ON);
+ else if (!strcmp(argv[1], "inflightrestart"))
+ dm_restart(DM_INIT_REASON_IN_FLIGHT);
else
usage();