aboutsummaryrefslogblamecommitdiff
path: root/mavlink/include/mavlink/v1.0/mavlink_protobuf_manager.hpp
blob: fd3ddd026f709c1acb8c31d7493523892924ab49 (plain) (tree)
























































































































































































































































































































































































                                                                                                                              
#ifndef MAVLINKPROTOBUFMANAGER_HPP
#define MAVLINKPROTOBUFMANAGER_HPP

#include <deque>
#include <google/protobuf/message.h>
#include <iostream>
#include <tr1/memory>

#include <checksum.h>
#include <common/mavlink.h>
#include <mavlink_types.h>
#include <pixhawk/pixhawk.pb.h>

namespace mavlink
{

class ProtobufManager
{
public:
	ProtobufManager()
	 : mRegisteredTypeCount(0)
	 , mStreamID(0)
	 , mVerbose(false)
	 , kExtendedHeaderSize(MAVLINK_EXTENDED_HEADER_LEN)
	 , kExtendedPayloadMaxSize(MAVLINK_MAX_EXTENDED_PAYLOAD_LEN)
	{
		// register GLOverlay
		{
			std::tr1::shared_ptr<px::GLOverlay> msg(new px::GLOverlay);
			registerType(msg);
		}

		// register ObstacleList
		{
			std::tr1::shared_ptr<px::ObstacleList> msg(new px::ObstacleList);
			registerType(msg);
		}

		// register ObstacleMap
		{
			std::tr1::shared_ptr<px::ObstacleMap> msg(new px::ObstacleMap);
			registerType(msg);
		}

		// register Path
		{
			std::tr1::shared_ptr<px::Path> msg(new px::Path);
                        registerType(msg);
		}

		// register PointCloudXYZI
		{
			std::tr1::shared_ptr<px::PointCloudXYZI> msg(new px::PointCloudXYZI);
			registerType(msg);
		}

		// register PointCloudXYZRGB
		{
			std::tr1::shared_ptr<px::PointCloudXYZRGB> msg(new px::PointCloudXYZRGB);
			registerType(msg);
		}

		// register RGBDImage
		{
			std::tr1::shared_ptr<px::RGBDImage> msg(new px::RGBDImage);
			registerType(msg);
		}

		srand(time(NULL));
		mStreamID = rand() + 1;
	}

	bool fragmentMessage(uint8_t system_id, uint8_t component_id,
						 uint8_t target_system, uint8_t target_component,
						 const google::protobuf::Message& protobuf_msg,
						 std::vector<mavlink_extended_message_t>& fragments) const
	{
		TypeMap::const_iterator it = mTypeMap.find(protobuf_msg.GetTypeName());
		if (it == mTypeMap.end())
		{
			std::cout << "# WARNING: Protobuf message with type "
					  << protobuf_msg.GetTypeName() << " is not registered."
					  << std::endl;
			return false;
		}

		uint8_t typecode = it->second;

		std::string data = protobuf_msg.SerializeAsString();

		int fragmentCount = (protobuf_msg.ByteSize() + kExtendedPayloadMaxSize - 1) / kExtendedPayloadMaxSize;
		unsigned int offset = 0;

		for (int i = 0; i < fragmentCount; ++i)
		{
			mavlink_extended_message_t fragment;			

			// write extended header data
			uint8_t* payload = reinterpret_cast<uint8_t*>(fragment.base_msg.payload64);
			unsigned int length = 0;
			uint8_t flags = 0;

			if (i < fragmentCount - 1)
			{
				length = kExtendedPayloadMaxSize;
				flags |= 0x1;
			}
			else
			{
				length = protobuf_msg.ByteSize() - kExtendedPayloadMaxSize * (fragmentCount - 1);
			}

			memcpy(payload, &target_system, 1);
			memcpy(payload + 1, &target_component, 1);
			memcpy(payload + 2, &typecode, 1);
			memcpy(payload + 3, &length, 4);
			memcpy(payload + 7, &mStreamID, 2);
			memcpy(payload + 9, &offset, 4);
			memcpy(payload + 13, &flags, 1);

			fragment.base_msg.msgid = MAVLINK_MSG_ID_EXTENDED_MESSAGE;
			mavlink_finalize_message(&fragment.base_msg, system_id, component_id, kExtendedHeaderSize, 0);
			
			// write extended payload data
			fragment.extended_payload_len = length;
			memcpy(fragment.extended_payload, &data[offset], length);

			fragments.push_back(fragment);
			offset += length;
		}

		if (mVerbose)
		{
			std::cerr << "# INFO: Split extended message with size "
					  << protobuf_msg.ByteSize() << " into "
					  << fragmentCount << " fragments." << std::endl;
		}

		return true;
	}

	bool cacheFragment(mavlink_extended_message_t& msg)
	{
		if (!validFragment(msg))
		{
			if (mVerbose)
			{
				std::cerr << "# WARNING: Message is not a valid fragment. "
						  << "Dropping message..." << std::endl;
			}
			return false;
		}

		// read extended header
		uint8_t* payload = reinterpret_cast<uint8_t*>(msg.base_msg.payload64);
		uint8_t typecode = 0;
		unsigned int length = 0;
		unsigned short streamID = 0;
		unsigned int offset = 0;
		uint8_t flags = 0;

		memcpy(&typecode, payload + 2, 1);
		memcpy(&length, payload + 3, 4);
		memcpy(&streamID, payload + 7, 2);
		memcpy(&offset, payload + 9, 4);
		memcpy(&flags, payload + 13, 1);

		if (typecode >= mTypeMap.size())
		{
			std::cout << "# WARNING: Protobuf message with type code "
					  << static_cast<int>(typecode) << " is not registered." << std::endl;
			return false;
		}

		bool reassemble = false;

		FragmentQueue::iterator it = mFragmentQueue.find(streamID);
		if (it == mFragmentQueue.end())
		{
			if (offset == 0)
			{
				mFragmentQueue[streamID].push_back(msg);

				if ((flags & 0x1) != 0x1)
				{
					reassemble = true;
				}

				if (mVerbose)
				{
					std::cerr << "# INFO: Added fragment to new queue."
							  << std::endl;
				}
			}
			else
			{
				if (mVerbose)
				{
					std::cerr << "# WARNING: Message is not a valid fragment. "
							  << "Dropping message..." << std::endl;
				}
			}
		}
		else
		{
			std::deque<mavlink_extended_message_t>& queue = it->second;

			if (queue.empty())
			{
				if (offset == 0)
				{
					queue.push_back(msg);

					if ((flags & 0x1) != 0x1)
					{
						reassemble = true;
					}
				}
				else
				{
					if (mVerbose)
					{
						std::cerr << "# WARNING: Message is not a valid fragment. "
								  << "Dropping message..." << std::endl;
					}
				}
			}
			else
			{
				if (fragmentDataSize(queue.back()) + fragmentOffset(queue.back()) != offset)
				{
					if (mVerbose)
					{
						std::cerr << "# WARNING: Previous fragment(s) have been lost. "
								  << "Dropping message and clearing queue..." << std::endl;
					}
					queue.clear();
				}
				else
				{
					queue.push_back(msg);

					if ((flags & 0x1) != 0x1)
					{
						reassemble = true;
					}
				}
			}
		}

		if (reassemble)
		{
			std::deque<mavlink_extended_message_t>& queue = mFragmentQueue[streamID];

			std::string data;
			for (size_t i = 0; i < queue.size(); ++i)
			{
				mavlink_extended_message_t& mavlink_msg = queue.at(i);

				data.append(reinterpret_cast<char*>(&mavlink_msg.extended_payload[0]),
							static_cast<size_t>(mavlink_msg.extended_payload_len));
			}

			mMessages.at(typecode)->ParseFromString(data);

			mMessageAvailable.at(typecode) = true;

			queue.clear();

			if (mVerbose)
			{
				std::cerr << "# INFO: Reassembled fragments for message with typename "
						  << mMessages.at(typecode)->GetTypeName() << " and size "
						  << mMessages.at(typecode)->ByteSize()
						  << "." << std::endl;
			}
		}

		return true;
	}

	bool getMessage(std::tr1::shared_ptr<google::protobuf::Message>& msg)
	{
		for (size_t i = 0; i < mMessageAvailable.size(); ++i)
		{
			if (mMessageAvailable.at(i))
			{
				msg = mMessages.at(i);
				mMessageAvailable.at(i) = false;

				return true;
			}
		}

		return false;
	}

private:
	void registerType(const std::tr1::shared_ptr<google::protobuf::Message>& msg)
	{
		mTypeMap[msg->GetTypeName()] = mRegisteredTypeCount;
		++mRegisteredTypeCount;
		mMessages.push_back(msg);
		mMessageAvailable.push_back(false);
	}

	bool validFragment(const mavlink_extended_message_t& msg) const
	{
		if (msg.base_msg.magic != MAVLINK_STX ||
			msg.base_msg.len != kExtendedHeaderSize ||
			msg.base_msg.msgid != MAVLINK_MSG_ID_EXTENDED_MESSAGE)
		{
			return false;
		}

		uint16_t checksum;
		checksum = crc_calculate(reinterpret_cast<const uint8_t*>(&msg.base_msg.len), MAVLINK_CORE_HEADER_LEN);
		crc_accumulate_buffer(&checksum, reinterpret_cast<const char*>(&msg.base_msg.payload64), kExtendedHeaderSize);
#if MAVLINK_CRC_EXTRA
		static const uint8_t mavlink_message_crcs[256] = MAVLINK_MESSAGE_CRCS;
		crc_accumulate(mavlink_message_crcs[msg.base_msg.msgid], &checksum);
#endif

		if (mavlink_ck_a(&(msg.base_msg)) != (uint8_t)(checksum & 0xFF) &&
		    mavlink_ck_b(&(msg.base_msg)) != (uint8_t)(checksum >> 8))
		{
			return false;
		}

		return true;
	}

	unsigned int fragmentDataSize(const mavlink_extended_message_t& msg) const
	{
		const uint8_t* payload = reinterpret_cast<const uint8_t*>(msg.base_msg.payload64);

		return *(reinterpret_cast<const unsigned int*>(payload + 3));
	}

	unsigned int fragmentOffset(const mavlink_extended_message_t& msg) const
	{
		const uint8_t* payload = reinterpret_cast<const uint8_t*>(msg.base_msg.payload64);

		return *(reinterpret_cast<const unsigned int*>(payload + 9));
	}

	int mRegisteredTypeCount;
	unsigned short mStreamID;
	bool mVerbose;

	typedef std::map<std::string, uint8_t> TypeMap;
	TypeMap mTypeMap;
	std::vector< std::tr1::shared_ptr<google::protobuf::Message> > mMessages;
	std::vector<bool> mMessageAvailable;

	typedef std::map<unsigned short, std::deque<mavlink_extended_message_t> > FragmentQueue;
	FragmentQueue mFragmentQueue;

	const int kExtendedHeaderSize;
	/**
	 * Extended header structure
	 * =========================
	 *   byte 0 - target_system
	 *   byte 1 - target_component
	 *   byte 2 - extended message id (type code)
	 *   bytes 3-6 - extended payload size in bytes
	 *   byte 7-8 - stream ID
	 *   byte 9-12 - fragment offset
	 *   byte 13 - fragment flags (bit 0 - 1=more fragments, 0=last fragment)
	 */

	const int kExtendedPayloadMaxSize;
};

}

#endif