diff options
Diffstat (limited to 'mavlink/share/pyshared/pymavlink/generator/mavgen_python.py')
-rw-r--r-- | mavlink/share/pyshared/pymavlink/generator/mavgen_python.py | 455 |
1 files changed, 0 insertions, 455 deletions
diff --git a/mavlink/share/pyshared/pymavlink/generator/mavgen_python.py b/mavlink/share/pyshared/pymavlink/generator/mavgen_python.py deleted file mode 100644 index fad366a68..000000000 --- a/mavlink/share/pyshared/pymavlink/generator/mavgen_python.py +++ /dev/null @@ -1,455 +0,0 @@ -#!/usr/bin/env python -''' -parse a MAVLink protocol XML file and generate a python implementation - -Copyright Andrew Tridgell 2011 -Released under GNU GPL version 3 or later -''' - -import sys, textwrap, os -import mavparse, mavtemplate - -t = mavtemplate.MAVTemplate() - -def generate_preamble(outf, msgs, args, xml): - print("Generating preamble") - t.write(outf, """ -''' -MAVLink protocol implementation (auto-generated by mavgen.py) - -Generated from: ${FILELIST} - -Note: this file has been auto-generated. DO NOT EDIT -''' - -import struct, array, mavutil, time - -WIRE_PROTOCOL_VERSION = "${WIRE_PROTOCOL_VERSION}" - -class MAVLink_header(object): - '''MAVLink message header''' - def __init__(self, msgId, mlen=0, seq=0, srcSystem=0, srcComponent=0): - self.mlen = mlen - self.seq = seq - self.srcSystem = srcSystem - self.srcComponent = srcComponent - self.msgId = msgId - - def pack(self): - return struct.pack('BBBBBB', ${PROTOCOL_MARKER}, self.mlen, self.seq, - self.srcSystem, self.srcComponent, self.msgId) - -class MAVLink_message(object): - '''base MAVLink message class''' - def __init__(self, msgId, name): - self._header = MAVLink_header(msgId) - self._payload = None - self._msgbuf = None - self._crc = None - self._fieldnames = [] - self._type = name - - def get_msgbuf(self): - return self._msgbuf - - def get_header(self): - return self._header - - def get_payload(self): - return self._payload - - def get_crc(self): - return self._crc - - def get_fieldnames(self): - return self._fieldnames - - def get_type(self): - return self._type - - def get_msgId(self): - return self._header.msgId - - def get_srcSystem(self): - return self._header.srcSystem - - def get_srcComponent(self): - return self._header.srcComponent - - def get_seq(self): - return self._header.seq - - def __str__(self): - ret = '%s {' % self._type - for a in self._fieldnames: - v = getattr(self, a) - ret += '%s : %s, ' % (a, v) - ret = ret[0:-2] + '}' - return ret - - def pack(self, mav, crc_extra, payload): - self._payload = payload - self._header = MAVLink_header(self._header.msgId, len(payload), mav.seq, - mav.srcSystem, mav.srcComponent) - self._msgbuf = self._header.pack() + payload - crc = mavutil.x25crc(self._msgbuf[1:]) - if ${crc_extra}: # using CRC extra - crc.accumulate(chr(crc_extra)) - self._crc = crc.crc - self._msgbuf += struct.pack('<H', self._crc) - return self._msgbuf - -""", {'FILELIST' : ",".join(args), - 'PROTOCOL_MARKER' : xml.protocol_marker, - 'crc_extra' : xml.crc_extra, - 'WIRE_PROTOCOL_VERSION' : xml.wire_protocol_version }) - - -def generate_enums(outf, enums): - print("Generating enums") - outf.write("\n# enums\n") - wrapper = textwrap.TextWrapper(initial_indent="", subsequent_indent=" # ") - for e in enums: - outf.write("\n# %s\n" % e.name) - for entry in e.entry: - outf.write("%s = %u # %s\n" % (entry.name, entry.value, wrapper.fill(entry.description))) - -def generate_message_ids(outf, msgs): - print("Generating message IDs") - outf.write("\n# message IDs\n") - outf.write("MAVLINK_MSG_ID_BAD_DATA = -1\n") - for m in msgs: - outf.write("MAVLINK_MSG_ID_%s = %u\n" % (m.name.upper(), m.id)) - -def generate_classes(outf, msgs): - print("Generating class definitions") - wrapper = textwrap.TextWrapper(initial_indent=" ", subsequent_indent=" ") - for m in msgs: - outf.write(""" -class MAVLink_%s_message(MAVLink_message): - ''' -%s - ''' - def __init__(self""" % (m.name.lower(), wrapper.fill(m.description.strip()))) - if len(m.fields) != 0: - outf.write(", " + ", ".join(m.fieldnames)) - outf.write("):\n") - outf.write(" MAVLink_message.__init__(self, MAVLINK_MSG_ID_%s, '%s')\n" % (m.name.upper(), m.name.upper())) - if len(m.fieldnames) != 0: - outf.write(" self._fieldnames = ['%s']\n" % "', '".join(m.fieldnames)) - for f in m.fields: - outf.write(" self.%s = %s\n" % (f.name, f.name)) - outf.write(""" - def pack(self, mav): - return MAVLink_message.pack(self, mav, %u, struct.pack('%s'""" % (m.crc_extra, m.fmtstr)) - if len(m.fields) != 0: - outf.write(", self." + ", self.".join(m.ordered_fieldnames)) - outf.write("))\n") - - -def mavfmt(field): - '''work out the struct format for a type''' - map = { - 'float' : 'f', - 'double' : 'd', - 'char' : 'c', - 'int8_t' : 'b', - 'uint8_t' : 'B', - 'uint8_t_mavlink_version' : 'B', - 'int16_t' : 'h', - 'uint16_t' : 'H', - 'int32_t' : 'i', - 'uint32_t' : 'I', - 'int64_t' : 'q', - 'uint64_t' : 'Q', - } - - if field.array_length: - if field.type in ['char', 'int8_t', 'uint8_t']: - return str(field.array_length)+'s' - return str(field.array_length)+map[field.type] - return map[field.type] - -def generate_mavlink_class(outf, msgs, xml): - print("Generating MAVLink class") - - outf.write("\n\nmavlink_map = {\n"); - for m in msgs: - outf.write(" MAVLINK_MSG_ID_%s : ( '%s', MAVLink_%s_message, %s, %u ),\n" % ( - m.name.upper(), m.fmtstr, m.name.lower(), m.order_map, m.crc_extra)) - outf.write("}\n\n") - - t.write(outf, """ -class MAVError(Exception): - '''MAVLink error class''' - def __init__(self, msg): - Exception.__init__(self, msg) - self.message = msg - -class MAVString(str): - '''NUL terminated string''' - def __init__(self, s): - str.__init__(self) - def __str__(self): - i = self.find(chr(0)) - if i == -1: - return self[:] - return self[0:i] - -class MAVLink_bad_data(MAVLink_message): - ''' - a piece of bad data in a mavlink stream - ''' - def __init__(self, data, reason): - MAVLink_message.__init__(self, MAVLINK_MSG_ID_BAD_DATA, 'BAD_DATA') - self._fieldnames = ['data', 'reason'] - self.data = data - self.reason = reason - self._msgbuf = data - -class MAVLink(object): - '''MAVLink protocol handling class''' - def __init__(self, file, srcSystem=0, srcComponent=0): - self.seq = 0 - self.file = file - self.srcSystem = srcSystem - self.srcComponent = srcComponent - self.callback = None - self.callback_args = None - self.callback_kwargs = None - self.buf = array.array('B') - self.expected_length = 6 - self.have_prefix_error = False - self.robust_parsing = False - self.protocol_marker = ${protocol_marker} - self.little_endian = ${little_endian} - self.crc_extra = ${crc_extra} - self.sort_fields = ${sort_fields} - self.total_packets_sent = 0 - self.total_bytes_sent = 0 - self.total_packets_received = 0 - self.total_bytes_received = 0 - self.total_receive_errors = 0 - self.startup_time = time.time() - - def set_callback(self, callback, *args, **kwargs): - self.callback = callback - self.callback_args = args - self.callback_kwargs = kwargs - - def send(self, mavmsg): - '''send a MAVLink message''' - buf = mavmsg.pack(self) - self.file.write(buf) - self.seq = (self.seq + 1) % 255 - self.total_packets_sent += 1 - self.total_bytes_sent += len(buf) - - def bytes_needed(self): - '''return number of bytes needed for next parsing stage''' - ret = self.expected_length - len(self.buf) - if ret <= 0: - return 1 - return ret - - def parse_char(self, c): - '''input some data bytes, possibly returning a new message''' - if isinstance(c, str): - self.buf.fromstring(c) - else: - self.buf.extend(c) - self.total_bytes_received += len(c) - if len(self.buf) >= 1 and self.buf[0] != ${protocol_marker}: - magic = self.buf[0] - self.buf = self.buf[1:] - if self.robust_parsing: - m = MAVLink_bad_data(chr(magic), "Bad prefix") - if self.callback: - self.callback(m, *self.callback_args, **self.callback_kwargs) - self.expected_length = 6 - self.total_receive_errors += 1 - return m - if self.have_prefix_error: - return None - self.have_prefix_error = True - self.total_receive_errors += 1 - raise MAVError("invalid MAVLink prefix '%s'" % magic) - self.have_prefix_error = False - if len(self.buf) >= 2: - (magic, self.expected_length) = struct.unpack('BB', self.buf[0:2]) - self.expected_length += 8 - if self.expected_length >= 8 and len(self.buf) >= self.expected_length: - mbuf = self.buf[0:self.expected_length] - self.buf = self.buf[self.expected_length:] - self.expected_length = 6 - if self.robust_parsing: - try: - m = self.decode(mbuf) - self.total_packets_received += 1 - except MAVError as reason: - m = MAVLink_bad_data(mbuf, reason.message) - self.total_receive_errors += 1 - else: - m = self.decode(mbuf) - self.total_packets_received += 1 - if self.callback: - self.callback(m, *self.callback_args, **self.callback_kwargs) - return m - return None - - def parse_buffer(self, s): - '''input some data bytes, possibly returning a list of new messages''' - m = self.parse_char(s) - if m is None: - return None - ret = [m] - while True: - m = self.parse_char("") - if m is None: - return ret - ret.append(m) - return ret - - def decode(self, msgbuf): - '''decode a buffer as a MAVLink message''' - # decode the header - try: - magic, mlen, seq, srcSystem, srcComponent, msgId = struct.unpack('cBBBBB', msgbuf[:6]) - except struct.error as emsg: - raise MAVError('Unable to unpack MAVLink header: %s' % emsg) - if ord(magic) != ${protocol_marker}: - raise MAVError("invalid MAVLink prefix '%s'" % magic) - if mlen != len(msgbuf)-8: - raise MAVError('invalid MAVLink message length. Got %u expected %u, msgId=%u' % (len(msgbuf)-8, mlen, msgId)) - - if not msgId in mavlink_map: - raise MAVError('unknown MAVLink message ID %u' % msgId) - - # decode the payload - (fmt, type, order_map, crc_extra) = mavlink_map[msgId] - - # decode the checksum - try: - crc, = struct.unpack('<H', msgbuf[-2:]) - except struct.error as emsg: - raise MAVError('Unable to unpack MAVLink CRC: %s' % emsg) - crc2 = mavutil.x25crc(msgbuf[1:-2]) - if ${crc_extra}: # using CRC extra - crc2.accumulate(chr(crc_extra)) - if crc != crc2.crc: - raise MAVError('invalid MAVLink CRC in msgID %u 0x%04x should be 0x%04x' % (msgId, crc, crc2.crc)) - - try: - t = struct.unpack(fmt, msgbuf[6:-2]) - except struct.error as emsg: - raise MAVError('Unable to unpack MAVLink payload type=%s fmt=%s payloadLength=%u: %s' % ( - type, fmt, len(msgbuf[6:-2]), emsg)) - - tlist = list(t) - # handle sorted fields - if ${sort_fields}: - t = tlist[:] - for i in range(0, len(tlist)): - tlist[i] = t[order_map[i]] - - # terminate any strings - for i in range(0, len(tlist)): - if isinstance(tlist[i], str): - tlist[i] = MAVString(tlist[i]) - t = tuple(tlist) - # construct the message object - try: - m = type(*t) - except Exception as emsg: - raise MAVError('Unable to instantiate MAVLink message of type %s : %s' % (type, emsg)) - m._msgbuf = msgbuf - m._payload = msgbuf[6:-2] - m._crc = crc - m._header = MAVLink_header(msgId, mlen, seq, srcSystem, srcComponent) - return m -""", xml) - -def generate_methods(outf, msgs): - print("Generating methods") - - def field_descriptions(fields): - ret = "" - for f in fields: - ret += " %-18s : %s (%s)\n" % (f.name, f.description.strip(), f.type) - return ret - - wrapper = textwrap.TextWrapper(initial_indent="", subsequent_indent=" ") - - for m in msgs: - comment = "%s\n\n%s" % (wrapper.fill(m.description.strip()), field_descriptions(m.fields)) - - selffieldnames = 'self, ' - for f in m.fields: - if f.omit_arg: - selffieldnames += '%s=%s, ' % (f.name, f.const_value) - else: - selffieldnames += '%s, ' % f.name - selffieldnames = selffieldnames[:-2] - - sub = {'NAMELOWER' : m.name.lower(), - 'SELFFIELDNAMES' : selffieldnames, - 'COMMENT' : comment, - 'FIELDNAMES' : ", ".join(m.fieldnames)} - - t.write(outf, """ - def ${NAMELOWER}_encode(${SELFFIELDNAMES}): - ''' - ${COMMENT} - ''' - msg = MAVLink_${NAMELOWER}_message(${FIELDNAMES}) - msg.pack(self) - return msg - -""", sub) - - t.write(outf, """ - def ${NAMELOWER}_send(${SELFFIELDNAMES}): - ''' - ${COMMENT} - ''' - return self.send(self.${NAMELOWER}_encode(${FIELDNAMES})) - -""", sub) - - -def generate(basename, xml): - '''generate complete python implemenation''' - if basename.endswith('.py'): - filename = basename - else: - filename = basename + '.py' - - msgs = [] - enums = [] - filelist = [] - for x in xml: - msgs.extend(x.message) - enums.extend(x.enum) - filelist.append(os.path.basename(x.filename)) - - for m in msgs: - if xml[0].little_endian: - m.fmtstr = '<' - else: - m.fmtstr = '>' - for f in m.ordered_fields: - m.fmtstr += mavfmt(f) - m.order_map = [ 0 ] * len(m.fieldnames) - for i in range(0, len(m.fieldnames)): - m.order_map[i] = m.ordered_fieldnames.index(m.fieldnames[i]) - - print("Generating %s" % filename) - outf = open(filename, "w") - generate_preamble(outf, msgs, filelist, xml[0]) - generate_enums(outf, enums) - generate_message_ids(outf, msgs) - generate_classes(outf, msgs) - generate_mavlink_class(outf, msgs, xml[0]) - generate_methods(outf, msgs) - outf.close() - print("Generated %s OK" % filename) |