diff options
Diffstat (limited to 'mavigator-uav/src/main/scala/mavigator/uav')
-rw-r--r-- | mavigator-uav/src/main/scala/mavigator/uav/Backend.scala | 8 | ||||
-rw-r--r-- | mavigator-uav/src/main/scala/mavigator/uav/Core.scala | 100 | ||||
-rw-r--r-- | mavigator-uav/src/main/scala/mavigator/uav/Multiplexer.scala.disabled | 20 | ||||
-rw-r--r-- | mavigator-uav/src/main/scala/mavigator/uav/Uav.scala | 36 | ||||
-rw-r--r-- | mavigator-uav/src/main/scala/mavigator/uav/mock/MockBackend.scala (renamed from mavigator-uav/src/main/scala/mavigator/uav/mock/MockConnection.scala) | 37 | ||||
-rw-r--r-- | mavigator-uav/src/main/scala/mavigator/uav/serial/SerialBackend.scala | 58 |
6 files changed, 214 insertions, 45 deletions
diff --git a/mavigator-uav/src/main/scala/mavigator/uav/Backend.scala b/mavigator-uav/src/main/scala/mavigator/uav/Backend.scala new file mode 100644 index 0000000..84b9673 --- /dev/null +++ b/mavigator-uav/src/main/scala/mavigator/uav/Backend.scala @@ -0,0 +1,8 @@ +package mavigator +package uav + +trait Backend { + + def init(core: Core): Unit + +} diff --git a/mavigator-uav/src/main/scala/mavigator/uav/Core.scala b/mavigator-uav/src/main/scala/mavigator/uav/Core.scala new file mode 100644 index 0000000..4418dae --- /dev/null +++ b/mavigator-uav/src/main/scala/mavigator/uav/Core.scala @@ -0,0 +1,100 @@ +package mavigator +package uav + +import akka.NotUsed +import akka.actor.{ActorLogging, ActorRef, ActorSystem, Props} +import akka.stream.Materializer +import akka.stream.actor.{ActorPublisher, ActorPublisherMessage} +import akka.stream.scaladsl.{Flow, Sink, Source} +import akka.util.ByteString +import org.reactivestreams.{Publisher, Subscriber} + +/** A core enables dynamic creation and removal of clients and backend connections. + * It is essentially a dynamic stream multiplexer. */ +class Core(implicit val system: ActorSystem, val materializer: Materializer) { + import Core._ + + /** The actor responsible for forwarding messages from the backend + * to this core. */ + private lazy val backendPublisherActor: ActorRef = system.actorOf(BackendPublisher()) + + /** Publisher that forwards messages received from the backend. */ + private lazy val backendPublisher: Publisher[ByteString] = ActorPublisher(backendPublisherActor) + + private lazy val backend = Flow.fromSinkAndSourceMat( + Sink.ignore, //FIXME: this will need to be changed for controlling the uav from the browser + Source.fromPublisher(backendPublisher) + )((toBackend, fromBackend) => (toBackend, fromBackend)) + + private lazy val clients = Flow.fromSinkAndSourceMat( + Sink.asPublisher[ByteString](true), + Source.asSubscriber[ByteString] + )((toClient, fromClient) => (toClient, fromClient)) + + private lazy val runnable = clients.joinMat(backend){ case ((toClient, fromClient), (toBackend, fromBackend)) => + toClient + } + + private lazy val clientPublisher: Publisher[ByteString] = { + system.log.info("Core started.") + runnable.run() + } + + def setBackend(): Flow[ByteString, ByteString, NotUsed] = { + val sink = Sink.actorRef(backendPublisherActor, BackendPublisher.BackendComplete) + val source = Source.empty[ByteString] //FIXME: this will need to be changed for controlling the uav from the browser + Flow.fromSinkAndSource(sink, source) + } + + def connect(): Flow[ByteString, ByteString, NotUsed] = { + Flow.fromSinkAndSource( + Sink.ignore, + Source.fromPublisher(clientPublisher) + ) + } + +} + +object Core { + + private class BackendPublisher extends ActorPublisher[ByteString] with ActorLogging { + import akka.stream.actor.ActorPublisherMessage._ + + override def preStart() = { + log.info("Starting backend publisher actor...") + } + + private var fromBackend: ByteString = null + + def receive = { + + case msg: ByteString => + fromBackend = msg + deliver() + + case BackendPublisher.BackendComplete => + sys.error("Backend completed normally, this should not happen.") + + // subscriber requests more + case ActorPublisherMessage.Request(_) => + deliver() + + //subscriber cancels + case ActorPublisherMessage.Cancel => + sys.error("Subscriber cancelled backend stream, this should not happen.") + + } + + def deliver() = if (fromBackend != null && totalDemand > 0 && isActive) { + onNext(fromBackend) + fromBackend = null + } + + } + + private object BackendPublisher { + case object BackendComplete + def apply(): Props = Props(classOf[BackendPublisher]) + } + +} diff --git a/mavigator-uav/src/main/scala/mavigator/uav/Multiplexer.scala.disabled b/mavigator-uav/src/main/scala/mavigator/uav/Multiplexer.scala.disabled deleted file mode 100644 index 5e48ea1..0000000 --- a/mavigator-uav/src/main/scala/mavigator/uav/Multiplexer.scala.disabled +++ /dev/null @@ -1,20 +0,0 @@ -package mavigator -package uav - -import akka.stream.scaladsl._ -import akka.stream._ -import org.reactivestreams._ - -private[uav] class Multiplexer[In, Out](service: Flow[In, Out, _])(implicit materializer: Materializer) { - - private val endpoint: Flow[Out, In, (Publisher[Out], Subscriber[In])] = Flow.fromSinkAndSourceMat( - Sink.asPublisher[Out](fanout = true), - Source.asSubscriber[In])((pub, sub) => (pub, sub)) - - private lazy val (publisher, subscriber) = (service.joinMat(endpoint)(Keep.right)).run() - - def connect(client: Flow[Out, In, _]) = { - Source.fromPublisher(publisher).via(client).to(Sink.ignore).run() - } - -} diff --git a/mavigator-uav/src/main/scala/mavigator/uav/Uav.scala b/mavigator-uav/src/main/scala/mavigator/uav/Uav.scala index 8f8c083..1387983 100644 --- a/mavigator-uav/src/main/scala/mavigator/uav/Uav.scala +++ b/mavigator-uav/src/main/scala/mavigator/uav/Uav.scala @@ -2,40 +2,38 @@ package mavigator package uav import java.lang.IllegalArgumentException -import mock._ + import akka._ import akka.actor._ -import akka.util._ +import akka.stream.ActorMaterializer import akka.stream.scaladsl._ +import akka.util._ +import mock._ +import serial._ + +//TODO: the whole backend system feels hacky, it probably needs a major redesign class Uav(system: ExtendedActorSystem) extends Extension { + private val materializer = ActorMaterializer()(system) + private lazy val config = system.settings.config.getConfig("mavigator.uav") private lazy val tpe = config.getString("type") - private lazy val componentId = config.getInt("componentId").toByte - private lazy val heartbeat = config.getInt("heartbeat") - private lazy val connection = config.getConfig(tpe) - lazy val source = tpe match { - case "mock" => - new MockConnection( - connection.getInt("remote_system_id").toByte, - componentId, - connection.getDouble("prescaler") - ) - - case "serial" => ??? + private lazy val core = new Core()(system, materializer) + lazy val backend: Backend = tpe match { + case "mock" => MockBackend + case "serial" => SerialBackend case _ => throw new IllegalArgumentException(s"Unsupported connection type: $tpe") } - def connect(): Flow[ByteString, ByteString, NotUsed] = { - Flow.fromSinkAndSource( - Sink.ignore, - (new MockConnection(0,0,1)).data //TODO: use source instead of hardcoded value - ) + def init(): Unit = { + backend.init(core) } + def connect(): Flow[ByteString, ByteString, NotUsed] = core.connect() + } object Uav extends ExtensionId[Uav] with ExtensionIdProvider { diff --git a/mavigator-uav/src/main/scala/mavigator/uav/mock/MockConnection.scala b/mavigator-uav/src/main/scala/mavigator/uav/mock/MockBackend.scala index 58b4977..0b89fbd 100644 --- a/mavigator-uav/src/main/scala/mavigator/uav/mock/MockConnection.scala +++ b/mavigator-uav/src/main/scala/mavigator/uav/mock/MockBackend.scala @@ -10,15 +10,16 @@ import akka.stream.Attributes._ import akka.stream.scaladsl._ import akka.util._ import org.mavlink._ -import org.mavlink.messages.{Heartbeat, Message} +import org.mavlink.messages.Message -class MockConnection( +/** A test connection that produces random MAVLink messages. */ +class MockBackend( remoteSystemId: Byte, remoteComponentId: Byte, prescaler: Double ) { - import MockConnection._ + import MockBackend._ private lazy val assembler = new Assembler(remoteSystemId, remoteComponentId) @@ -35,7 +36,7 @@ class MockConnection( delayed(0.1)(_.distance) ) - val data: Source[ByteString, NotUsed] = messages.map{ message => + private val data: Source[ByteString, NotUsed] = messages.map{ message => val (messageId, payload) = Message.pack(message) val packet = assembler.assemble(messageId, payload) ByteString(packet.toArray) @@ -43,7 +44,7 @@ class MockConnection( } -object MockConnection { +object MockBackend extends Backend { final val ClockTick: FiniteDuration = 0.02.seconds @@ -52,7 +53,7 @@ object MockConnection { Source.fromGraph(GraphDSL.create() { implicit b => val clock = Source.tick(ClockTick, ClockTick, plan) map { plan => - plan.tick(0.01) + plan.tick(ClockTick.toMillis / 1000.0) plan } val bcast = b.add(Broadcast[RandomFlightPlan](messages.length)) @@ -67,4 +68,28 @@ object MockConnection { }) } + + override def init(core: Core) = { + import core.materializer + import core.system + + system.log.info("Initializing mock backend...") + + val config = system.settings.config.getConfig("mavigator.uav.mock") + + val mock = new MockBackend( + config.getInt("remote_system_id").toByte, + config.getInt("remote_component_id").toByte, + config.getDouble("prescaler") + ) + + val mockFlow = Flow.fromSinkAndSource( + Sink.ignore, + mock.data + ) + + (mockFlow join core.setBackend()).run() + + } + } diff --git a/mavigator-uav/src/main/scala/mavigator/uav/serial/SerialBackend.scala b/mavigator-uav/src/main/scala/mavigator/uav/serial/SerialBackend.scala new file mode 100644 index 0000000..a52d238 --- /dev/null +++ b/mavigator-uav/src/main/scala/mavigator/uav/serial/SerialBackend.scala @@ -0,0 +1,58 @@ +package mavigator +package uav +package serial + +import scala.concurrent.Future +import scala.concurrent.duration._ +import scala.util.{Failure, Success} + +import akka.NotUsed +import akka.stream.scaladsl.{Flow, Keep} +import akka.util.ByteString +import com.github.jodersky.flow.{Parity, SerialSettings} +import com.github.jodersky.flow.stream.Serial +import com.github.jodersky.flow.stream.Serial.Connection + +object SerialBackend extends Backend { + + override def init(core: Core): Unit = { + import core.materializer + import core.system + import core.system.dispatcher + + system.log.info("Initializing serial backend...") + + val conf = system.settings.config.getConfig("mavigator.uav.serial") + val port = conf.getString("port") + val serialSettings = SerialSettings( + baud = conf.getInt("baud"), + twoStopBits = conf.getBoolean("two_stop_bits"), + parity = Parity(conf.getInt("parity")) + ) + + val connectionDelay = conf.getInt("connection_delay").millis + + system.log.info("Waiting for serial device on " + port + "...") + Serial().watch(Set(port)).map{ port => + system.log.info("Serial device connected on port " + port) + port + }.delay(connectionDelay).runForeach{ port => + system.log.info("Opening serial port " + port) + + val backend: Flow[ByteString, ByteString, NotUsed] = core.setBackend() + + val uav: Flow[ByteString, ByteString, Future[Connection]] = Serial().open(port, serialSettings) + + val connection = uav.joinMat(backend)(Keep.left).run().onComplete{ + case Success(connection) => + system.log.info("Successfully opened serial port " + connection.port) + case Failure(ex) => + system.log.error(ex, "Error occurred while trying to open " + port) + } + + } + + + } + +} |