From 15defeb0f1e6ce396562685316a8734fcd867081 Mon Sep 17 00:00:00 2001 From: Julian Oes Date: Thu, 3 Apr 2014 21:15:47 +0200 Subject: mavlink: implemented multicasting between mavlink instances (two options: forwarding: forward received messages from self to other mavlink instances, passing: send out messages received from other mavlink intances over serial --- src/modules/mavlink/mavlink_main.cpp | 197 ++++++++++++++++++++++++++++++++++- 1 file changed, 194 insertions(+), 3 deletions(-) (limited to 'src/modules/mavlink/mavlink_main.cpp') diff --git a/src/modules/mavlink/mavlink_main.cpp b/src/modules/mavlink/mavlink_main.cpp index 18df577fe..c5055939e 100644 --- a/src/modules/mavlink/mavlink_main.cpp +++ b/src/modules/mavlink/mavlink_main.cpp @@ -207,10 +207,15 @@ Mavlink::Mavlink() : _subscriptions(nullptr), _streams(nullptr), _mission_pub(-1), + _verbose(false), + _forwarding_on(false), + _passing_on(false), + _uart_fd(-1), _mavlink_param_queue_index(0), _subscribe_to_stream(nullptr), _subscribe_to_stream_rate(0.0f), _flow_control_enabled(true), + _message_buffer({}), /* performance counters */ _loop_perf(perf_alloc(PC_ELAPSED, "mavlink")) @@ -261,7 +266,6 @@ Mavlink::Mavlink() : errx(1, "instance ID is out of range"); break; } - } Mavlink::~Mavlink() @@ -394,6 +398,20 @@ Mavlink::instance_exists(const char *device_name, Mavlink *self) return false; } +void +Mavlink::forward_message(mavlink_message_t *msg, Mavlink *self) +{ + Mavlink *inst = ::_mavlink_instances; + + while (inst != nullptr) { + /* don't broadcast to itself */ + if (inst != self) { + inst->pass_message(msg); + } + inst = inst->next; + } +} + int Mavlink::get_uart_fd(unsigned index) { @@ -1616,6 +1634,125 @@ Mavlink::configure_stream_threadsafe(const char *stream_name, const float rate) } } +int +Mavlink::message_buffer_init(int size) +{ + _message_buffer.size = size; + _message_buffer.write_ptr = 0; + _message_buffer.read_ptr = 0; + _message_buffer.data = (char*)malloc(_message_buffer.size); + return (_message_buffer.data == 0) ? ERROR : OK; +} + +void +Mavlink::message_buffer_destroy() +{ + _message_buffer.size = 0; + _message_buffer.write_ptr = 0; + _message_buffer.read_ptr = 0; + free(_message_buffer.data); +} + +int +Mavlink::message_buffer_count() +{ + int n = _message_buffer.write_ptr - _message_buffer.read_ptr; + + if (n < 0) { + n += _message_buffer.size; + } + + return n; +} + +int +Mavlink::message_buffer_is_empty() +{ + return _message_buffer.read_ptr == _message_buffer.write_ptr; +} + + +bool +Mavlink::message_buffer_write(void *ptr, int size) +{ + // bytes available to write + int available = _message_buffer.read_ptr - _message_buffer.write_ptr - 1; + + if (available < 0) { + available += _message_buffer.size; + } + + if (size > available) { + // buffer overflow + return false; + } + + char *c = (char *) ptr; + int n = _message_buffer.size - _message_buffer.write_ptr; // bytes to end of the buffer + + if (n < size) { + // message goes over end of the buffer + memcpy(&(_message_buffer.data[_message_buffer.write_ptr]), c, n); + _message_buffer.write_ptr = 0; + + } else { + n = 0; + } + + // now: n = bytes already written + int p = size - n; // number of bytes to write + memcpy(&(_message_buffer.data[_message_buffer.write_ptr]), &(c[n]), p); + _message_buffer.write_ptr = (_message_buffer.write_ptr + p) % _message_buffer.size; + return true; +} + +int +Mavlink::message_buffer_get_ptr(void **ptr, bool *is_part) +{ + // bytes available to read + int available = _message_buffer.write_ptr - _message_buffer.read_ptr; + + if (available == 0) { + return 0; // buffer is empty + } + + int n = 0; + + if (available > 0) { + // read pointer is before write pointer, all available bytes can be read + n = available; + *is_part = false; + + } else { + // read pointer is after write pointer, read bytes from read_ptr to end of the buffer + n = _message_buffer.size - _message_buffer.read_ptr; + *is_part = _message_buffer.write_ptr > 0; + } + + *ptr = &(_message_buffer.data[_message_buffer.read_ptr]); + return n; +} + +void +Mavlink::message_buffer_mark_read(int n) +{ + _message_buffer.read_ptr = (_message_buffer.read_ptr + n) % _message_buffer.size; +} + +void +Mavlink::pass_message(mavlink_message_t *msg) +{ + if (_passing_on) { + /* size is 8 bytes plus variable payload */ + int size = MAVLINK_NUM_NON_PAYLOAD_BYTES + msg->len; + pthread_mutex_lock(&_message_buffer_mutex); + message_buffer_write(msg, size); + pthread_mutex_unlock(&_message_buffer_mutex); + } +} + + + int Mavlink::task_main(int argc, char *argv[]) { @@ -1632,7 +1769,7 @@ Mavlink::task_main(int argc, char *argv[]) * set error flag instead */ bool err_flag = false; - while ((ch = getopt(argc, argv, "b:r:d:m:v")) != EOF) { + while ((ch = getopt(argc, argv, "b:r:d:m:fpv")) != EOF) { switch (ch) { case 'b': _baudrate = strtoul(optarg, NULL, 10); @@ -1672,6 +1809,14 @@ Mavlink::task_main(int argc, char *argv[]) break; + case 'f': + _forwarding_on = true; + break; + + case 'p': + _passing_on = true; + break; + case 'v': _verbose = true; break; @@ -1740,6 +1885,17 @@ Mavlink::task_main(int argc, char *argv[]) /* initialize mavlink text message buffering */ mavlink_logbuffer_init(&_logbuffer, 5); + /* if we are passing on mavlink messages, we need to prepare a buffer for this instance */ + if (_passing_on) { + /* initialize message buffer if multiplexing is on */ + if (OK != message_buffer_init(500)) { + errx(1, "can't allocate message buffer, exiting"); + } + + /* initialize message buffer mutex */ + pthread_mutex_init(&_message_buffer_mutex, NULL); + } + /* create the device node that's used for sending text log messages, etc. */ register_driver(MAVLINK_LOG_DEVICE, &fops, 0666, NULL); @@ -1884,6 +2040,37 @@ Mavlink::task_main(int argc, char *argv[]) } } + /* pass messages from other UARTs */ + if (_passing_on) { + + bool is_part; + void *read_ptr; + + /* guard get ptr by mutex */ + pthread_mutex_lock(&_message_buffer_mutex); + int available = message_buffer_get_ptr(&read_ptr, &is_part); + pthread_mutex_unlock(&_message_buffer_mutex); + + if (available > 0) { + /* write first part of buffer */ + _mavlink_resend_uart(_channel, (const mavlink_message_t*)read_ptr); + message_buffer_mark_read(available); + + /* write second part of buffer if there is some */ + if (is_part) { + /* guard get ptr by mutex */ + pthread_mutex_lock(&_message_buffer_mutex); + available = message_buffer_get_ptr(&read_ptr, &is_part); + pthread_mutex_unlock(&_message_buffer_mutex); + + _mavlink_resend_uart(_channel, (const mavlink_message_t*)read_ptr); + message_buffer_mark_read(available); + } + } + } + + + perf_end(_loop_perf); } @@ -1928,6 +2115,10 @@ Mavlink::task_main(int argc, char *argv[]) /* close mavlink logging device */ close(_mavlink_fd); + if (_passing_on) { + message_buffer_destroy(); + pthread_mutex_destroy(&_message_buffer_mutex); + } /* destroy log buffer */ mavlink_logbuffer_destroy(&_logbuffer); @@ -2067,7 +2258,7 @@ Mavlink::stream(int argc, char *argv[]) static void usage() { - warnx("usage: mavlink {start|stop-all|stream} [-d device] [-b baudrate] [-r rate] [-m mode] [-s stream] [-v]"); + warnx("usage: mavlink {start|stop-all|stream} [-d device] [-b baudrate] [-r rate] [-m mode] [-s stream] [-f] [-p] [-v]"); } int mavlink_main(int argc, char *argv[]) -- cgit v1.2.3