#!/usr/bin/env python ''' parse a MAVLink protocol XML file and generate a python implementation Copyright Andrew Tridgell 2011 Released under GNU GPL version 3 or later ''' import sys, textwrap, os import mavparse, mavtemplate t = mavtemplate.MAVTemplate() def generate_preamble(outf, msgs, args, xml): print("Generating preamble") t.write(outf, """ ''' MAVLink protocol implementation (auto-generated by mavgen.py) Generated from: ${FILELIST} Note: this file has been auto-generated. DO NOT EDIT ''' import struct, array, mavutil, time WIRE_PROTOCOL_VERSION = "${WIRE_PROTOCOL_VERSION}" class MAVLink_header(object): '''MAVLink message header''' def __init__(self, msgId, mlen=0, seq=0, srcSystem=0, srcComponent=0): self.mlen = mlen self.seq = seq self.srcSystem = srcSystem self.srcComponent = srcComponent self.msgId = msgId def pack(self): return struct.pack('BBBBBB', ${PROTOCOL_MARKER}, self.mlen, self.seq, self.srcSystem, self.srcComponent, self.msgId) class MAVLink_message(object): '''base MAVLink message class''' def __init__(self, msgId, name): self._header = MAVLink_header(msgId) self._payload = None self._msgbuf = None self._crc = None self._fieldnames = [] self._type = name def get_msgbuf(self): return self._msgbuf def get_header(self): return self._header def get_payload(self): return self._payload def get_crc(self): return self._crc def get_fieldnames(self): return self._fieldnames def get_type(self): return self._type def get_msgId(self): return self._header.msgId def get_srcSystem(self): return self._header.srcSystem def get_srcComponent(self): return self._header.srcComponent def get_seq(self): return self._header.seq def __str__(self): ret = '%s {' % self._type for a in self._fieldnames: v = getattr(self, a) ret += '%s : %s, ' % (a, v) ret = ret[0:-2] + '}' return ret def pack(self, mav, crc_extra, payload): self._payload = payload self._header = MAVLink_header(self._header.msgId, len(payload), mav.seq, mav.srcSystem, mav.srcComponent) self._msgbuf = self._header.pack() + payload crc = mavutil.x25crc(self._msgbuf[1:]) if ${crc_extra}: # using CRC extra crc.accumulate(chr(crc_extra)) self._crc = crc.crc self._msgbuf += struct.pack('= 1 and self.buf[0] != ${protocol_marker}: magic = self.buf[0] self.buf = self.buf[1:] if self.robust_parsing: m = MAVLink_bad_data(chr(magic), "Bad prefix") if self.callback: self.callback(m, *self.callback_args, **self.callback_kwargs) self.expected_length = 6 self.total_receive_errors += 1 return m if self.have_prefix_error: return None self.have_prefix_error = True self.total_receive_errors += 1 raise MAVError("invalid MAVLink prefix '%s'" % magic) self.have_prefix_error = False if len(self.buf) >= 2: (magic, self.expected_length) = struct.unpack('BB', self.buf[0:2]) self.expected_length += 8 if self.expected_length >= 8 and len(self.buf) >= self.expected_length: mbuf = self.buf[0:self.expected_length] self.buf = self.buf[self.expected_length:] self.expected_length = 6 if self.robust_parsing: try: m = self.decode(mbuf) self.total_packets_received += 1 except MAVError as reason: m = MAVLink_bad_data(mbuf, reason.message) self.total_receive_errors += 1 else: m = self.decode(mbuf) self.total_packets_received += 1 if self.callback: self.callback(m, *self.callback_args, **self.callback_kwargs) return m return None def parse_buffer(self, s): '''input some data bytes, possibly returning a list of new messages''' m = self.parse_char(s) if m is None: return None ret = [m] while True: m = self.parse_char("") if m is None: return ret ret.append(m) return ret def decode(self, msgbuf): '''decode a buffer as a MAVLink message''' # decode the header try: magic, mlen, seq, srcSystem, srcComponent, msgId = struct.unpack('cBBBBB', msgbuf[:6]) except struct.error as emsg: raise MAVError('Unable to unpack MAVLink header: %s' % emsg) if ord(magic) != ${protocol_marker}: raise MAVError("invalid MAVLink prefix '%s'" % magic) if mlen != len(msgbuf)-8: raise MAVError('invalid MAVLink message length. Got %u expected %u, msgId=%u' % (len(msgbuf)-8, mlen, msgId)) if not msgId in mavlink_map: raise MAVError('unknown MAVLink message ID %u' % msgId) # decode the payload (fmt, type, order_map, crc_extra) = mavlink_map[msgId] # decode the checksum try: crc, = struct.unpack('