diff options
author | px4dev <px4@purgatory.org> | 2012-08-04 15:12:36 -0700 |
---|---|---|
committer | px4dev <px4@purgatory.org> | 2012-08-04 15:12:36 -0700 |
commit | 8a365179eafdf3aea98e60ab9f5882b200d4c759 (patch) | |
tree | 4f38d6d4cd80bd0b6e22e2bb534c3f117ce44e56 /mavlink/share/pyshared/pymavlink/generator/mavgen_python.py | |
download | px4-firmware-8a365179eafdf3aea98e60ab9f5882b200d4c759.tar.gz px4-firmware-8a365179eafdf3aea98e60ab9f5882b200d4c759.tar.bz2 px4-firmware-8a365179eafdf3aea98e60ab9f5882b200d4c759.zip |
Fresh import of the PX4 firmware sources.
Diffstat (limited to 'mavlink/share/pyshared/pymavlink/generator/mavgen_python.py')
-rw-r--r-- | mavlink/share/pyshared/pymavlink/generator/mavgen_python.py | 455 |
1 files changed, 455 insertions, 0 deletions
diff --git a/mavlink/share/pyshared/pymavlink/generator/mavgen_python.py b/mavlink/share/pyshared/pymavlink/generator/mavgen_python.py new file mode 100644 index 000000000..fad366a68 --- /dev/null +++ b/mavlink/share/pyshared/pymavlink/generator/mavgen_python.py @@ -0,0 +1,455 @@ +#!/usr/bin/env python +''' +parse a MAVLink protocol XML file and generate a python implementation + +Copyright Andrew Tridgell 2011 +Released under GNU GPL version 3 or later +''' + +import sys, textwrap, os +import mavparse, mavtemplate + +t = mavtemplate.MAVTemplate() + +def generate_preamble(outf, msgs, args, xml): + print("Generating preamble") + t.write(outf, """ +''' +MAVLink protocol implementation (auto-generated by mavgen.py) + +Generated from: ${FILELIST} + +Note: this file has been auto-generated. DO NOT EDIT +''' + +import struct, array, mavutil, time + +WIRE_PROTOCOL_VERSION = "${WIRE_PROTOCOL_VERSION}" + +class MAVLink_header(object): + '''MAVLink message header''' + def __init__(self, msgId, mlen=0, seq=0, srcSystem=0, srcComponent=0): + self.mlen = mlen + self.seq = seq + self.srcSystem = srcSystem + self.srcComponent = srcComponent + self.msgId = msgId + + def pack(self): + return struct.pack('BBBBBB', ${PROTOCOL_MARKER}, self.mlen, self.seq, + self.srcSystem, self.srcComponent, self.msgId) + +class MAVLink_message(object): + '''base MAVLink message class''' + def __init__(self, msgId, name): + self._header = MAVLink_header(msgId) + self._payload = None + self._msgbuf = None + self._crc = None + self._fieldnames = [] + self._type = name + + def get_msgbuf(self): + return self._msgbuf + + def get_header(self): + return self._header + + def get_payload(self): + return self._payload + + def get_crc(self): + return self._crc + + def get_fieldnames(self): + return self._fieldnames + + def get_type(self): + return self._type + + def get_msgId(self): + return self._header.msgId + + def get_srcSystem(self): + return self._header.srcSystem + + def get_srcComponent(self): + return self._header.srcComponent + + def get_seq(self): + return self._header.seq + + def __str__(self): + ret = '%s {' % self._type + for a in self._fieldnames: + v = getattr(self, a) + ret += '%s : %s, ' % (a, v) + ret = ret[0:-2] + '}' + return ret + + def pack(self, mav, crc_extra, payload): + self._payload = payload + self._header = MAVLink_header(self._header.msgId, len(payload), mav.seq, + mav.srcSystem, mav.srcComponent) + self._msgbuf = self._header.pack() + payload + crc = mavutil.x25crc(self._msgbuf[1:]) + if ${crc_extra}: # using CRC extra + crc.accumulate(chr(crc_extra)) + self._crc = crc.crc + self._msgbuf += struct.pack('<H', self._crc) + return self._msgbuf + +""", {'FILELIST' : ",".join(args), + 'PROTOCOL_MARKER' : xml.protocol_marker, + 'crc_extra' : xml.crc_extra, + 'WIRE_PROTOCOL_VERSION' : xml.wire_protocol_version }) + + +def generate_enums(outf, enums): + print("Generating enums") + outf.write("\n# enums\n") + wrapper = textwrap.TextWrapper(initial_indent="", subsequent_indent=" # ") + for e in enums: + outf.write("\n# %s\n" % e.name) + for entry in e.entry: + outf.write("%s = %u # %s\n" % (entry.name, entry.value, wrapper.fill(entry.description))) + +def generate_message_ids(outf, msgs): + print("Generating message IDs") + outf.write("\n# message IDs\n") + outf.write("MAVLINK_MSG_ID_BAD_DATA = -1\n") + for m in msgs: + outf.write("MAVLINK_MSG_ID_%s = %u\n" % (m.name.upper(), m.id)) + +def generate_classes(outf, msgs): + print("Generating class definitions") + wrapper = textwrap.TextWrapper(initial_indent=" ", subsequent_indent=" ") + for m in msgs: + outf.write(""" +class MAVLink_%s_message(MAVLink_message): + ''' +%s + ''' + def __init__(self""" % (m.name.lower(), wrapper.fill(m.description.strip()))) + if len(m.fields) != 0: + outf.write(", " + ", ".join(m.fieldnames)) + outf.write("):\n") + outf.write(" MAVLink_message.__init__(self, MAVLINK_MSG_ID_%s, '%s')\n" % (m.name.upper(), m.name.upper())) + if len(m.fieldnames) != 0: + outf.write(" self._fieldnames = ['%s']\n" % "', '".join(m.fieldnames)) + for f in m.fields: + outf.write(" self.%s = %s\n" % (f.name, f.name)) + outf.write(""" + def pack(self, mav): + return MAVLink_message.pack(self, mav, %u, struct.pack('%s'""" % (m.crc_extra, m.fmtstr)) + if len(m.fields) != 0: + outf.write(", self." + ", self.".join(m.ordered_fieldnames)) + outf.write("))\n") + + +def mavfmt(field): + '''work out the struct format for a type''' + map = { + 'float' : 'f', + 'double' : 'd', + 'char' : 'c', + 'int8_t' : 'b', + 'uint8_t' : 'B', + 'uint8_t_mavlink_version' : 'B', + 'int16_t' : 'h', + 'uint16_t' : 'H', + 'int32_t' : 'i', + 'uint32_t' : 'I', + 'int64_t' : 'q', + 'uint64_t' : 'Q', + } + + if field.array_length: + if field.type in ['char', 'int8_t', 'uint8_t']: + return str(field.array_length)+'s' + return str(field.array_length)+map[field.type] + return map[field.type] + +def generate_mavlink_class(outf, msgs, xml): + print("Generating MAVLink class") + + outf.write("\n\nmavlink_map = {\n"); + for m in msgs: + outf.write(" MAVLINK_MSG_ID_%s : ( '%s', MAVLink_%s_message, %s, %u ),\n" % ( + m.name.upper(), m.fmtstr, m.name.lower(), m.order_map, m.crc_extra)) + outf.write("}\n\n") + + t.write(outf, """ +class MAVError(Exception): + '''MAVLink error class''' + def __init__(self, msg): + Exception.__init__(self, msg) + self.message = msg + +class MAVString(str): + '''NUL terminated string''' + def __init__(self, s): + str.__init__(self) + def __str__(self): + i = self.find(chr(0)) + if i == -1: + return self[:] + return self[0:i] + +class MAVLink_bad_data(MAVLink_message): + ''' + a piece of bad data in a mavlink stream + ''' + def __init__(self, data, reason): + MAVLink_message.__init__(self, MAVLINK_MSG_ID_BAD_DATA, 'BAD_DATA') + self._fieldnames = ['data', 'reason'] + self.data = data + self.reason = reason + self._msgbuf = data + +class MAVLink(object): + '''MAVLink protocol handling class''' + def __init__(self, file, srcSystem=0, srcComponent=0): + self.seq = 0 + self.file = file + self.srcSystem = srcSystem + self.srcComponent = srcComponent + self.callback = None + self.callback_args = None + self.callback_kwargs = None + self.buf = array.array('B') + self.expected_length = 6 + self.have_prefix_error = False + self.robust_parsing = False + self.protocol_marker = ${protocol_marker} + self.little_endian = ${little_endian} + self.crc_extra = ${crc_extra} + self.sort_fields = ${sort_fields} + self.total_packets_sent = 0 + self.total_bytes_sent = 0 + self.total_packets_received = 0 + self.total_bytes_received = 0 + self.total_receive_errors = 0 + self.startup_time = time.time() + + def set_callback(self, callback, *args, **kwargs): + self.callback = callback + self.callback_args = args + self.callback_kwargs = kwargs + + def send(self, mavmsg): + '''send a MAVLink message''' + buf = mavmsg.pack(self) + self.file.write(buf) + self.seq = (self.seq + 1) % 255 + self.total_packets_sent += 1 + self.total_bytes_sent += len(buf) + + def bytes_needed(self): + '''return number of bytes needed for next parsing stage''' + ret = self.expected_length - len(self.buf) + if ret <= 0: + return 1 + return ret + + def parse_char(self, c): + '''input some data bytes, possibly returning a new message''' + if isinstance(c, str): + self.buf.fromstring(c) + else: + self.buf.extend(c) + self.total_bytes_received += len(c) + if len(self.buf) >= 1 and self.buf[0] != ${protocol_marker}: + magic = self.buf[0] + self.buf = self.buf[1:] + if self.robust_parsing: + m = MAVLink_bad_data(chr(magic), "Bad prefix") + if self.callback: + self.callback(m, *self.callback_args, **self.callback_kwargs) + self.expected_length = 6 + self.total_receive_errors += 1 + return m + if self.have_prefix_error: + return None + self.have_prefix_error = True + self.total_receive_errors += 1 + raise MAVError("invalid MAVLink prefix '%s'" % magic) + self.have_prefix_error = False + if len(self.buf) >= 2: + (magic, self.expected_length) = struct.unpack('BB', self.buf[0:2]) + self.expected_length += 8 + if self.expected_length >= 8 and len(self.buf) >= self.expected_length: + mbuf = self.buf[0:self.expected_length] + self.buf = self.buf[self.expected_length:] + self.expected_length = 6 + if self.robust_parsing: + try: + m = self.decode(mbuf) + self.total_packets_received += 1 + except MAVError as reason: + m = MAVLink_bad_data(mbuf, reason.message) + self.total_receive_errors += 1 + else: + m = self.decode(mbuf) + self.total_packets_received += 1 + if self.callback: + self.callback(m, *self.callback_args, **self.callback_kwargs) + return m + return None + + def parse_buffer(self, s): + '''input some data bytes, possibly returning a list of new messages''' + m = self.parse_char(s) + if m is None: + return None + ret = [m] + while True: + m = self.parse_char("") + if m is None: + return ret + ret.append(m) + return ret + + def decode(self, msgbuf): + '''decode a buffer as a MAVLink message''' + # decode the header + try: + magic, mlen, seq, srcSystem, srcComponent, msgId = struct.unpack('cBBBBB', msgbuf[:6]) + except struct.error as emsg: + raise MAVError('Unable to unpack MAVLink header: %s' % emsg) + if ord(magic) != ${protocol_marker}: + raise MAVError("invalid MAVLink prefix '%s'" % magic) + if mlen != len(msgbuf)-8: + raise MAVError('invalid MAVLink message length. Got %u expected %u, msgId=%u' % (len(msgbuf)-8, mlen, msgId)) + + if not msgId in mavlink_map: + raise MAVError('unknown MAVLink message ID %u' % msgId) + + # decode the payload + (fmt, type, order_map, crc_extra) = mavlink_map[msgId] + + # decode the checksum + try: + crc, = struct.unpack('<H', msgbuf[-2:]) + except struct.error as emsg: + raise MAVError('Unable to unpack MAVLink CRC: %s' % emsg) + crc2 = mavutil.x25crc(msgbuf[1:-2]) + if ${crc_extra}: # using CRC extra + crc2.accumulate(chr(crc_extra)) + if crc != crc2.crc: + raise MAVError('invalid MAVLink CRC in msgID %u 0x%04x should be 0x%04x' % (msgId, crc, crc2.crc)) + + try: + t = struct.unpack(fmt, msgbuf[6:-2]) + except struct.error as emsg: + raise MAVError('Unable to unpack MAVLink payload type=%s fmt=%s payloadLength=%u: %s' % ( + type, fmt, len(msgbuf[6:-2]), emsg)) + + tlist = list(t) + # handle sorted fields + if ${sort_fields}: + t = tlist[:] + for i in range(0, len(tlist)): + tlist[i] = t[order_map[i]] + + # terminate any strings + for i in range(0, len(tlist)): + if isinstance(tlist[i], str): + tlist[i] = MAVString(tlist[i]) + t = tuple(tlist) + # construct the message object + try: + m = type(*t) + except Exception as emsg: + raise MAVError('Unable to instantiate MAVLink message of type %s : %s' % (type, emsg)) + m._msgbuf = msgbuf + m._payload = msgbuf[6:-2] + m._crc = crc + m._header = MAVLink_header(msgId, mlen, seq, srcSystem, srcComponent) + return m +""", xml) + +def generate_methods(outf, msgs): + print("Generating methods") + + def field_descriptions(fields): + ret = "" + for f in fields: + ret += " %-18s : %s (%s)\n" % (f.name, f.description.strip(), f.type) + return ret + + wrapper = textwrap.TextWrapper(initial_indent="", subsequent_indent=" ") + + for m in msgs: + comment = "%s\n\n%s" % (wrapper.fill(m.description.strip()), field_descriptions(m.fields)) + + selffieldnames = 'self, ' + for f in m.fields: + if f.omit_arg: + selffieldnames += '%s=%s, ' % (f.name, f.const_value) + else: + selffieldnames += '%s, ' % f.name + selffieldnames = selffieldnames[:-2] + + sub = {'NAMELOWER' : m.name.lower(), + 'SELFFIELDNAMES' : selffieldnames, + 'COMMENT' : comment, + 'FIELDNAMES' : ", ".join(m.fieldnames)} + + t.write(outf, """ + def ${NAMELOWER}_encode(${SELFFIELDNAMES}): + ''' + ${COMMENT} + ''' + msg = MAVLink_${NAMELOWER}_message(${FIELDNAMES}) + msg.pack(self) + return msg + +""", sub) + + t.write(outf, """ + def ${NAMELOWER}_send(${SELFFIELDNAMES}): + ''' + ${COMMENT} + ''' + return self.send(self.${NAMELOWER}_encode(${FIELDNAMES})) + +""", sub) + + +def generate(basename, xml): + '''generate complete python implemenation''' + if basename.endswith('.py'): + filename = basename + else: + filename = basename + '.py' + + msgs = [] + enums = [] + filelist = [] + for x in xml: + msgs.extend(x.message) + enums.extend(x.enum) + filelist.append(os.path.basename(x.filename)) + + for m in msgs: + if xml[0].little_endian: + m.fmtstr = '<' + else: + m.fmtstr = '>' + for f in m.ordered_fields: + m.fmtstr += mavfmt(f) + m.order_map = [ 0 ] * len(m.fieldnames) + for i in range(0, len(m.fieldnames)): + m.order_map[i] = m.ordered_fieldnames.index(m.fieldnames[i]) + + print("Generating %s" % filename) + outf = open(filename, "w") + generate_preamble(outf, msgs, filelist, xml[0]) + generate_enums(outf, enums) + generate_message_ids(outf, msgs) + generate_classes(outf, msgs) + generate_mavlink_class(outf, msgs, xml[0]) + generate_methods(outf, msgs) + outf.close() + print("Generated %s OK" % filename) |