From 50bc6c80aa201c0543db07a62bc7c726b813c168 Mon Sep 17 00:00:00 2001 From: Jakob Odersky Date: Mon, 1 Feb 2016 22:46:48 -0800 Subject: Use Akka streams and temporarily disable frontend --- .../main/scala/mavigator/util/Application.scala | 26 ++++++++ .../main/scala/mavigator/util/environment.scala | 24 +++++++ mavigator-server/build.sbt | 2 +- .../src/main/scala/mavigator/Main.scala | 7 +- .../main/scala/mavigator/MavlinkWebsocket.scala | 76 --------------------- .../src/main/scala/mavigator/Router.scala | 63 +++++------------- .../src/main/scala/mavigator/settings.scala | 74 --------------------- .../src/main/twirl/mavigator/views/main.scala.html | 4 +- mavigator-uav/build.sbt | 4 +- .../src/main/scala/mavigator/uav/Multiplexer.scala | 20 ++++++ .../src/main/scala/mavigator/uav/Uav.scala | 31 +++++++++ .../scala/mavigator/uav/mock/MockConnection.scala | 77 ++++++++++++++++++++++ .../mavigator/uav/mock/RandomFlightPlan.scala | 2 +- project/Dependencies.scala | 6 +- project/MavigatorBuild.scala | 2 +- 15 files changed, 211 insertions(+), 207 deletions(-) create mode 100644 mavigator-dashboard/src/main/scala/mavigator/util/Application.scala create mode 100644 mavigator-dashboard/src/main/scala/mavigator/util/environment.scala delete mode 100644 mavigator-server/src/main/scala/mavigator/MavlinkWebsocket.scala delete mode 100644 mavigator-server/src/main/scala/mavigator/settings.scala create mode 100644 mavigator-uav/src/main/scala/mavigator/uav/Multiplexer.scala create mode 100644 mavigator-uav/src/main/scala/mavigator/uav/Uav.scala create mode 100644 mavigator-uav/src/main/scala/mavigator/uav/mock/MockConnection.scala diff --git a/mavigator-dashboard/src/main/scala/mavigator/util/Application.scala b/mavigator-dashboard/src/main/scala/mavigator/util/Application.scala new file mode 100644 index 0000000..9a57e43 --- /dev/null +++ b/mavigator-dashboard/src/main/scala/mavigator/util/Application.scala @@ -0,0 +1,26 @@ +package mavigator +package util + +import scala.scalajs.js.annotation.JSExport +import scala.scalajs.js + +import org.scalajs.dom.html + +trait Application { + + def main(env: Environment, args: Map[String, String]): Unit + + @JSExport + final def _start(settings: js.Dynamic): Unit = { + + val env = new StaticEnvironment( + root = settings.root.asInstanceOf[html.Element], + assetsBase = settings.root.asInstanceOf[String] + ) + + val args = settings.args.asInstanceOf[Map[String, String]] + + main(env, args) + } + +} diff --git a/mavigator-dashboard/src/main/scala/mavigator/util/environment.scala b/mavigator-dashboard/src/main/scala/mavigator/util/environment.scala new file mode 100644 index 0000000..fad1bf3 --- /dev/null +++ b/mavigator-dashboard/src/main/scala/mavigator/util/environment.scala @@ -0,0 +1,24 @@ +package mavigator +package util + +import org.scalajs.dom.html + +/** Represents an application's environment */ +trait Environment { + + /** The application's root element. */ + def root: html.Element + + /** Retrieve an asset's URL based on its file location. */ + def asset(file: String): String + +} + +class StaticEnvironment( + override val root: html.Element, + assetsBase: String +) extends Environment { + + override def asset(file: String): String = assetsBase + "/" + file + +} diff --git a/mavigator-server/build.sbt b/mavigator-server/build.sbt index 207fad6..f98d369 100644 --- a/mavigator-server/build.sbt +++ b/mavigator-server/build.sbt @@ -7,5 +7,5 @@ MavigatorBuild.defaultSettings libraryDependencies ++= Seq( Dependencies.akkaHttp, Dependencies.akkaHttpCore, - Dependencies.akkaStreams + Dependencies.akkaStream ) diff --git a/mavigator-server/src/main/scala/mavigator/Main.scala b/mavigator-server/src/main/scala/mavigator/Main.scala index ca81fa9..6ea894e 100644 --- a/mavigator-server/src/main/scala/mavigator/Main.scala +++ b/mavigator-server/src/main/scala/mavigator/Main.scala @@ -17,11 +17,10 @@ object Main { system.log.info("System started.") - val router = (new Router(system)).route - val settings = Mavigator(system) + val router = Router.route - system.log.info(s"Starting server on ${settings.interface}:${settings.port}...") - val binding = Http(system).bindAndHandle(router, settings.interface, settings.port) + system.log.info(s"Starting server") + val binding = Http(system).bindAndHandle(router, "0.0.0.0", 8080) for (b <- binding) { val addr = b.localAddress.getHostString() diff --git a/mavigator-server/src/main/scala/mavigator/MavlinkWebsocket.scala b/mavigator-server/src/main/scala/mavigator/MavlinkWebsocket.scala deleted file mode 100644 index 9cbfa64..0000000 --- a/mavigator-server/src/main/scala/mavigator/MavlinkWebsocket.scala +++ /dev/null @@ -1,76 +0,0 @@ -package mavigator - -import akka.actor.Terminated -import akka.actor._ -import akka.http.scaladsl._ -import akka.http.scaladsl.model.ws.Message -import akka.http.scaladsl.model.ws.TextMessage -import akka.http.scaladsl.server._ -import akka.stream.OverflowStrategy -import akka.stream._ -import akka.stream.scaladsl.Flow -import akka.stream.scaladsl.GraphDSL -import akka.stream.scaladsl.Sink -import akka.stream.scaladsl.Source -import akka.util.ByteString -import mavigator.uav.Connection -import scala.concurrent.Await -import scala.concurrent.duration.Duration - - -/** - * Adapted from https://github.com/jrudolph/akka-http-scala-js-websocket-chat - */ -class MavlinkWebsocket(system: ActorSystem) { - - /* - GraphDSL.create(Source.actorRef[Connection.Event](1, OverflowStrategy.fail)) {implicit builder => - import GraphDSL.Implicits._ - - //source: SourceShape[Connection.Event] => - source => - - val inSink = builder.materializedValue.map { client: ActorRef => - Sink.actorRef[Connection.Command]( - Mavigator(system).uav, - Connection.Unregister(client) - ) - } - - (inSink., source.out) - - //FlowShape(inSink.in, source.out) - //??? - } - */ - - /** Sink that forwards incomming (from the browser) messages to the uav. */ - private val inSink = Sink.actorRef[Connection.Command]( - Mavigator(system).uav, - Connection.Send(ByteString("goodbye")) //unregister - ) - - /** Source that emmits messages comming from the uav. */ - private val outSource = Source.actorRef[Connection.Event]( - bufferSize = 1, - overflowStrategy = OverflowStrategy.fail - ) mapMaterializedValue { client => // a client is spawned for every outSource materialization - Mavigator(system).uav.tell(Connection.Register, client) - } - - private val flow: Flow[Connection.Command, Connection.Event, _] = Flow.fromSinkAndSource(inSink, outSource) - - @deprecated("WIP", "0.0") - val wsflow = Flow[Message].collect{ - case TextMessage.Strict(msg) => Connection.Send(ByteString(msg)) - // unpack incoming WS text messages... - // This will lose (ignore) messages not received in one chunk (which is - // unlikely because chat messages are small) but absolutely possible - // FIXME: We need to handle TextMessage.Streamed as well. - }.via(flow).map { - case msg: Connection.Event => - TextMessage.Strict(msg.toString) // ... pack outgoing messages into WS JSON messages ... - } - -} - diff --git a/mavigator-server/src/main/scala/mavigator/Router.scala b/mavigator-server/src/main/scala/mavigator/Router.scala index bd54422..b74e3a6 100644 --- a/mavigator-server/src/main/scala/mavigator/Router.scala +++ b/mavigator-server/src/main/scala/mavigator/Router.scala @@ -5,14 +5,16 @@ import akka.stream._ import akka.stream.scaladsl._ import akka.http.scaladsl._ import akka.http.scaladsl.model._ +import akka.http.scaladsl.model.ws._ import akka.http.scaladsl.server._ +import uav.Uav +import akka.util._ - -class Router(system: ActorSystem) { +object Router { import Directives._ - val route: Route = ( + def route(implicit system: ActorSystem): Route = ( path("info") { get { val f: play.twirl.api.Html = mavigator.views.html.index() @@ -31,7 +33,19 @@ class Router(system: ActorSystem) { //upgrade.handleMessagesWith(inSink: Sink[Message, _$3], outSource: Source[Message, _$4]) ??? }*/ - handleWebsocketMessages((new MavlinkWebsocket(system)).wsflow) + + val fromWebSocket = Flow[Message].collect{ + case BinaryMessage.Strict(data) => data + } + + val toWebSocket = Flow[ByteString].map{bytes => + //BinaryMessage(bytes) + TextMessage(bytes.decodeString("UTF-8")) + } + + val bytes = Uav().connect() + + handleWebSocketMessages(fromWebSocket via bytes via toWebSocket) } } ~ pathPrefix("assets") { @@ -49,44 +63,3 @@ class Router(system: ActorSystem) { ) } - -import akka.http.scaladsl.model.ws._ -import akka.stream.scaladsl._ - -object SocketService { - - /* - val out: Source[OutgoingMessage, ActorRef] = Source.actorRef[OutgoingMessage](0, OverflowStrategy.fail) - val in = Sink.actorRef(ref: ActorRef, onCompleteMessage: Any) - */ - - - /* - Flow[Message, Message, _] { implicit builder => - - val in: Flow[Message, IncomingMessage, _] = Flow[Message].map { - case TextMessage.Strict(txt) => IncomingMessage(s"frpm websocket $txt") - case _ => ??? - } - - val out: Flow[OutgoingMessage, Message, _] = Flow[OutgoingMessage].map { - case OutgoingMessage(txt) => TextMessage(s"to websocket $text") - } - - val chatActorSink = Sink.actorRef[ChatEvent](chatRoomActor, UserLeft(user)) - - } - */ -} - -/* -object EchoService { - - val flow: Flow[Message, Message, _] = Flow[Message].map { - case TextMessage.Strict(txt) => TextMessage("ECHO: " + txt) - case _ => TextMessage("Message type unsupported") - } - -} - */ - diff --git a/mavigator-server/src/main/scala/mavigator/settings.scala b/mavigator-server/src/main/scala/mavigator/settings.scala deleted file mode 100644 index 9089c2e..0000000 --- a/mavigator-server/src/main/scala/mavigator/settings.scala +++ /dev/null @@ -1,74 +0,0 @@ -package mavigator - -import akka.actor.ActorSystem -import akka.actor.Extension -import akka.actor.ExtensionId -import akka.actor.ExtensionIdProvider -import akka.actor.ExtendedActorSystem - -import scala.concurrent.duration.Duration -import com.typesafe.config.Config -import java.util.concurrent.TimeUnit - -import akka.actor.ActorRef -import akka.actor.Props -import mavigator.uav.MockConnection -import mavigator.uav.SerialConnection - -class MavigatorImpl(system: ExtendedActorSystem) extends Extension { - - private val config = system.settings.config.getConfig("mavigator") - - val interface: String = config.getString("interface") - - val port: Int = config.getInt("port") - - /** Mavlink system ID identifying the base station */ - val systemId: Byte = config.getInt("system_id").toByte - - val tpe = config.getString("connection.type") - - /** Actor representing a connection channel to UAVs. This actor - * implements the protocol defined in [mavigator.uav.Connection] */ - val uav: ActorRef = { - val config = this.config.getConfig("connection") - val tpe = config.getString("type") - val heartbeat = config.getInt("heartbeat") - val compId = config.getString("component_id").toByte - - val props = tpe match { - case "mock" => - val remote = config.getInt("mock.remote_system_id").toByte - val prescaler = config.getInt("mock.prescaler") - MockConnection(systemId, compId, remote, prescaler) - - case "serial" => - val serial = config.getConfig("serial") - SerialConnection( - systemId, - compId, - heartbeat, - serial.getString("port"), - serial.getInt("baud"), - serial.getBoolean("two_stop_bits"), - serial.getInt("parity") - ) - - case unknown => throw new IllegalArgumentException("Unsupported connection type '" + unknown + "'") - - } - - system.actorOf(props, name = "uav-connection") - } - -} - -object Mavigator extends ExtensionId[MavigatorImpl] with ExtensionIdProvider { - - override def lookup = Mavigator - - override def createExtension(system: ExtendedActorSystem) = - new MavigatorImpl(system) - -} - diff --git a/mavigator-server/src/main/twirl/mavigator/views/main.scala.html b/mavigator-server/src/main/twirl/mavigator/views/main.scala.html index 10a48dd..9631c4d 100644 --- a/mavigator-server/src/main/twirl/mavigator/views/main.scala.html +++ b/mavigator-server/src/main/twirl/mavigator/views/main.scala.html @@ -16,11 +16,11 @@ - + @content - + diff --git a/mavigator-uav/build.sbt b/mavigator-uav/build.sbt index 13a8b5a..abe935b 100644 --- a/mavigator-uav/build.sbt +++ b/mavigator-uav/build.sbt @@ -4,6 +4,8 @@ MavigatorBuild.defaultSettings libraryDependencies ++= Seq( Dependencies.akkaActor, + Dependencies.akkaStream, Dependencies.flow, - Dependencies.flowNative + Dependencies.flowNative, + Dependencies.reactiveStreams ) diff --git a/mavigator-uav/src/main/scala/mavigator/uav/Multiplexer.scala b/mavigator-uav/src/main/scala/mavigator/uav/Multiplexer.scala new file mode 100644 index 0000000..5e48ea1 --- /dev/null +++ b/mavigator-uav/src/main/scala/mavigator/uav/Multiplexer.scala @@ -0,0 +1,20 @@ +package mavigator +package uav + +import akka.stream.scaladsl._ +import akka.stream._ +import org.reactivestreams._ + +private[uav] class Multiplexer[In, Out](service: Flow[In, Out, _])(implicit materializer: Materializer) { + + private val endpoint: Flow[Out, In, (Publisher[Out], Subscriber[In])] = Flow.fromSinkAndSourceMat( + Sink.asPublisher[Out](fanout = true), + Source.asSubscriber[In])((pub, sub) => (pub, sub)) + + private lazy val (publisher, subscriber) = (service.joinMat(endpoint)(Keep.right)).run() + + def connect(client: Flow[Out, In, _]) = { + Source.fromPublisher(publisher).via(client).to(Sink.ignore).run() + } + +} diff --git a/mavigator-uav/src/main/scala/mavigator/uav/Uav.scala b/mavigator-uav/src/main/scala/mavigator/uav/Uav.scala new file mode 100644 index 0000000..a1ef333 --- /dev/null +++ b/mavigator-uav/src/main/scala/mavigator/uav/Uav.scala @@ -0,0 +1,31 @@ +package mavigator +package uav + +import akka._ +import akka.actor._ +import akka.util._ +import akka.stream.scaladsl._ + +class Uav(system: ExtendedActorSystem) extends Extension { + + private lazy val config = system.settings.config.getConfig("mavigator.uav") + + def connect(): Flow[ByteString, ByteString, NotUsed] = { + val t = scala.concurrent.duration.FiniteDuration(100, "ms") + Flow.fromSinkAndSource( + Sink.ignore, + Source.tick(t,t, ByteString("hello world")) + ) + } + +} + +object Uav extends ExtensionId[Uav] with ExtensionIdProvider { + + override def lookup = Uav + + override def createExtension(system: ExtendedActorSystem) = new Uav(system) + + def apply()(implicit system: ActorSystem) = super.apply(system) + +} diff --git a/mavigator-uav/src/main/scala/mavigator/uav/mock/MockConnection.scala b/mavigator-uav/src/main/scala/mavigator/uav/mock/MockConnection.scala new file mode 100644 index 0000000..5949a3a --- /dev/null +++ b/mavigator-uav/src/main/scala/mavigator/uav/mock/MockConnection.scala @@ -0,0 +1,77 @@ +package mavigator.uav + +import java.util.concurrent.TimeUnit.MILLISECONDS +import scala.concurrent.duration.FiniteDuration +import scala.util.Random +import org.mavlink.Packet +import org.mavlink.enums.MavAutopilot +import org.mavlink.enums.MavModeFlag +import org.mavlink.enums.MavState +import org.mavlink.enums.MavType +import org.mavlink.messages.Heartbeat +import Connection.Received +import akka.actor.Actor +import akka.actor.ActorLogging +import akka.actor.Props +import akka.util.ByteString +import scala.concurrent.duration._ +import org.mavlink.messages.Message +import mock.RandomFlightPlan + +class MockConnection( + localSystemId: Byte, + localComponentId: Byte, + remoteSystemId: Byte, + prescaler: Int +) + extends Actor with ActorLogging with Connection with MavlinkUtil { + + import context._ + + override val systemId = remoteSystemId + override val componentId = remoteSystemId + + val plan = new RandomFlightPlan + + def scheduleMessage(delay: FiniteDuration)(fct: => Message) = system.scheduler.schedule(delay, delay) { + sendAll(Received(assemble(fct))) + } + def scheduleBytes(delay: FiniteDuration)(fct: => Array[Byte]) = system.scheduler.schedule(delay, delay) { + sendAll(Received(ByteString(fct))) + } + + override def preStart() = { + //increment state + system.scheduler.schedule(0.01.seconds * prescaler, 0.01.seconds * prescaler) { plan.tick(0.01) } + + //send messages + scheduleMessage(0.1.seconds * prescaler)(plan.position) + scheduleMessage(0.05.seconds * prescaler)(plan.attitude) + scheduleMessage(0.05.seconds * prescaler)(plan.motors) + scheduleMessage(0.1.seconds * prescaler)(plan.distance) + scheduleMessage(1.seconds)(plan.heartbeat) + + //simulate noisy line + scheduleBytes(0.3.seconds * prescaler)(MockPackets.invalidCrc) + scheduleBytes(1.5.seconds * prescaler)(MockPackets.invalidOverflow) + } + + override def receive = handleRegistration + +} + +object MockConnection { + def apply(systemId: Byte, componentId: Byte, remoteSystemId: Byte, prescaler: Int = 1) = + Props(classOf[MockConnection], systemId, componentId, remoteSystemId, prescaler) +} + +object MockPackets { + val invalidCrc = Array(254, 1, 123, 13, 13).map(_.toByte) + val invalidOverflow = { + val data = Array.fill[Byte](Packet.MaxPayloadLength + 100)(42) + data(0) = -2 + data(1) = 2 + data(1) = -1 + data + } +} diff --git a/mavigator-uav/src/main/scala/mavigator/uav/mock/RandomFlightPlan.scala b/mavigator-uav/src/main/scala/mavigator/uav/mock/RandomFlightPlan.scala index 1e5431e..4a6520f 100644 --- a/mavigator-uav/src/main/scala/mavigator/uav/mock/RandomFlightPlan.scala +++ b/mavigator-uav/src/main/scala/mavigator/uav/mock/RandomFlightPlan.scala @@ -9,7 +9,7 @@ import org.mavlink.messages._ class RandomFlightPlan { - private var time: Double = 0 + private var time: Double = 0 //current time in seconds private def millis = (time * 1000).toInt private def micros = (time * 1E6).toInt diff --git a/project/Dependencies.scala b/project/Dependencies.scala index 1be5b70..15474c7 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -9,9 +9,11 @@ object Dependencies { val AkkaVersion = "2.4.2-RC1" val akkaActor = "com.typesafe.akka" %% "akka-actor" % AkkaVersion - val akkaHttp = "com.typesafe.akka" %% "akka-http" % AkkaVersion + val akkaHttp = "com.typesafe.akka" %% "akka-http-experimental" % AkkaVersion val akkaHttpCore = "com.typesafe.akka" %% "akka-http-core" % AkkaVersion - val akkaStreams = "com.typesafe.akka" %% "akka-stream" % AkkaVersion + val akkaStream = "com.typesafe.akka" %% "akka-stream" % AkkaVersion + + val reactiveStreams = "org.reactivestreams" % "reactive-streams" % "1.0.0" val flow = "com.github.jodersky" %% "flow" % "2.4.0" val flowNative = "com.github.jodersky" % "flow-native" % "2.4.0" % Runtime diff --git a/project/MavigatorBuild.scala b/project/MavigatorBuild.scala index 5bc999c..1a3713c 100644 --- a/project/MavigatorBuild.scala +++ b/project/MavigatorBuild.scala @@ -16,7 +16,7 @@ object MavigatorBuild extends Build { lazy val root = Project( id = "root", base = file("."), - aggregate = Seq(bindings, uav) + aggregate = Seq(bindings, uav, server) ) // empty project that uses SbtMavlink to generate protocol bindings -- cgit v1.2.3