aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJakob Odersky <jakob@odersky.com>2016-02-01 22:46:48 -0800
committerJakob Odersky <jakob@odersky.com>2016-02-01 22:46:48 -0800
commit50bc6c80aa201c0543db07a62bc7c726b813c168 (patch)
tree0ed1c753f9d9fe29d1e51a0d03172e5659c4c339
parent4f4c799a6d9ccf333a3e609a2464e2f317875af7 (diff)
downloadmavigator-50bc6c80aa201c0543db07a62bc7c726b813c168.tar.gz
mavigator-50bc6c80aa201c0543db07a62bc7c726b813c168.tar.bz2
mavigator-50bc6c80aa201c0543db07a62bc7c726b813c168.zip
Use Akka streams and temporarily disable frontend
-rw-r--r--mavigator-dashboard/src/main/scala/mavigator/util/Application.scala26
-rw-r--r--mavigator-dashboard/src/main/scala/mavigator/util/environment.scala24
-rw-r--r--mavigator-server/build.sbt2
-rw-r--r--mavigator-server/src/main/scala/mavigator/Main.scala7
-rw-r--r--mavigator-server/src/main/scala/mavigator/MavlinkWebsocket.scala76
-rw-r--r--mavigator-server/src/main/scala/mavigator/Router.scala63
-rw-r--r--mavigator-server/src/main/scala/mavigator/settings.scala74
-rw-r--r--mavigator-server/src/main/twirl/mavigator/views/main.scala.html4
-rw-r--r--mavigator-uav/build.sbt4
-rw-r--r--mavigator-uav/src/main/scala/mavigator/uav/Multiplexer.scala20
-rw-r--r--mavigator-uav/src/main/scala/mavigator/uav/Uav.scala31
-rw-r--r--mavigator-uav/src/main/scala/mavigator/uav/mock/MockConnection.scala77
-rw-r--r--mavigator-uav/src/main/scala/mavigator/uav/mock/RandomFlightPlan.scala2
-rw-r--r--project/Dependencies.scala6
-rw-r--r--project/MavigatorBuild.scala2
15 files changed, 211 insertions, 207 deletions
diff --git a/mavigator-dashboard/src/main/scala/mavigator/util/Application.scala b/mavigator-dashboard/src/main/scala/mavigator/util/Application.scala
new file mode 100644
index 0000000..9a57e43
--- /dev/null
+++ b/mavigator-dashboard/src/main/scala/mavigator/util/Application.scala
@@ -0,0 +1,26 @@
+package mavigator
+package util
+
+import scala.scalajs.js.annotation.JSExport
+import scala.scalajs.js
+
+import org.scalajs.dom.html
+
+trait Application {
+
+ def main(env: Environment, args: Map[String, String]): Unit
+
+ @JSExport
+ final def _start(settings: js.Dynamic): Unit = {
+
+ val env = new StaticEnvironment(
+ root = settings.root.asInstanceOf[html.Element],
+ assetsBase = settings.root.asInstanceOf[String]
+ )
+
+ val args = settings.args.asInstanceOf[Map[String, String]]
+
+ main(env, args)
+ }
+
+}
diff --git a/mavigator-dashboard/src/main/scala/mavigator/util/environment.scala b/mavigator-dashboard/src/main/scala/mavigator/util/environment.scala
new file mode 100644
index 0000000..fad1bf3
--- /dev/null
+++ b/mavigator-dashboard/src/main/scala/mavigator/util/environment.scala
@@ -0,0 +1,24 @@
+package mavigator
+package util
+
+import org.scalajs.dom.html
+
+/** Represents an application's environment */
+trait Environment {
+
+ /** The application's root element. */
+ def root: html.Element
+
+ /** Retrieve an asset's URL based on its file location. */
+ def asset(file: String): String
+
+}
+
+class StaticEnvironment(
+ override val root: html.Element,
+ assetsBase: String
+) extends Environment {
+
+ override def asset(file: String): String = assetsBase + "/" + file
+
+}
diff --git a/mavigator-server/build.sbt b/mavigator-server/build.sbt
index 207fad6..f98d369 100644
--- a/mavigator-server/build.sbt
+++ b/mavigator-server/build.sbt
@@ -7,5 +7,5 @@ MavigatorBuild.defaultSettings
libraryDependencies ++= Seq(
Dependencies.akkaHttp,
Dependencies.akkaHttpCore,
- Dependencies.akkaStreams
+ Dependencies.akkaStream
)
diff --git a/mavigator-server/src/main/scala/mavigator/Main.scala b/mavigator-server/src/main/scala/mavigator/Main.scala
index ca81fa9..6ea894e 100644
--- a/mavigator-server/src/main/scala/mavigator/Main.scala
+++ b/mavigator-server/src/main/scala/mavigator/Main.scala
@@ -17,11 +17,10 @@ object Main {
system.log.info("System started.")
- val router = (new Router(system)).route
- val settings = Mavigator(system)
+ val router = Router.route
- system.log.info(s"Starting server on ${settings.interface}:${settings.port}...")
- val binding = Http(system).bindAndHandle(router, settings.interface, settings.port)
+ system.log.info(s"Starting server")
+ val binding = Http(system).bindAndHandle(router, "0.0.0.0", 8080)
for (b <- binding) {
val addr = b.localAddress.getHostString()
diff --git a/mavigator-server/src/main/scala/mavigator/MavlinkWebsocket.scala b/mavigator-server/src/main/scala/mavigator/MavlinkWebsocket.scala
deleted file mode 100644
index 9cbfa64..0000000
--- a/mavigator-server/src/main/scala/mavigator/MavlinkWebsocket.scala
+++ /dev/null
@@ -1,76 +0,0 @@
-package mavigator
-
-import akka.actor.Terminated
-import akka.actor._
-import akka.http.scaladsl._
-import akka.http.scaladsl.model.ws.Message
-import akka.http.scaladsl.model.ws.TextMessage
-import akka.http.scaladsl.server._
-import akka.stream.OverflowStrategy
-import akka.stream._
-import akka.stream.scaladsl.Flow
-import akka.stream.scaladsl.GraphDSL
-import akka.stream.scaladsl.Sink
-import akka.stream.scaladsl.Source
-import akka.util.ByteString
-import mavigator.uav.Connection
-import scala.concurrent.Await
-import scala.concurrent.duration.Duration
-
-
-/**
- * Adapted from https://github.com/jrudolph/akka-http-scala-js-websocket-chat
- */
-class MavlinkWebsocket(system: ActorSystem) {
-
- /*
- GraphDSL.create(Source.actorRef[Connection.Event](1, OverflowStrategy.fail)) {implicit builder =>
- import GraphDSL.Implicits._
-
- //source: SourceShape[Connection.Event] =>
- source =>
-
- val inSink = builder.materializedValue.map { client: ActorRef =>
- Sink.actorRef[Connection.Command](
- Mavigator(system).uav,
- Connection.Unregister(client)
- )
- }
-
- (inSink., source.out)
-
- //FlowShape(inSink.in, source.out)
- //???
- }
- */
-
- /** Sink that forwards incomming (from the browser) messages to the uav. */
- private val inSink = Sink.actorRef[Connection.Command](
- Mavigator(system).uav,
- Connection.Send(ByteString("goodbye")) //unregister
- )
-
- /** Source that emmits messages comming from the uav. */
- private val outSource = Source.actorRef[Connection.Event](
- bufferSize = 1,
- overflowStrategy = OverflowStrategy.fail
- ) mapMaterializedValue { client => // a client is spawned for every outSource materialization
- Mavigator(system).uav.tell(Connection.Register, client)
- }
-
- private val flow: Flow[Connection.Command, Connection.Event, _] = Flow.fromSinkAndSource(inSink, outSource)
-
- @deprecated("WIP", "0.0")
- val wsflow = Flow[Message].collect{
- case TextMessage.Strict(msg) => Connection.Send(ByteString(msg))
- // unpack incoming WS text messages...
- // This will lose (ignore) messages not received in one chunk (which is
- // unlikely because chat messages are small) but absolutely possible
- // FIXME: We need to handle TextMessage.Streamed as well.
- }.via(flow).map {
- case msg: Connection.Event =>
- TextMessage.Strict(msg.toString) // ... pack outgoing messages into WS JSON messages ...
- }
-
-}
-
diff --git a/mavigator-server/src/main/scala/mavigator/Router.scala b/mavigator-server/src/main/scala/mavigator/Router.scala
index bd54422..b74e3a6 100644
--- a/mavigator-server/src/main/scala/mavigator/Router.scala
+++ b/mavigator-server/src/main/scala/mavigator/Router.scala
@@ -5,14 +5,16 @@ import akka.stream._
import akka.stream.scaladsl._
import akka.http.scaladsl._
import akka.http.scaladsl.model._
+import akka.http.scaladsl.model.ws._
import akka.http.scaladsl.server._
+import uav.Uav
+import akka.util._
-
-class Router(system: ActorSystem) {
+object Router {
import Directives._
- val route: Route = (
+ def route(implicit system: ActorSystem): Route = (
path("info") {
get {
val f: play.twirl.api.Html = mavigator.views.html.index()
@@ -31,7 +33,19 @@ class Router(system: ActorSystem) {
//upgrade.handleMessagesWith(inSink: Sink[Message, _$3], outSource: Source[Message, _$4])
???
}*/
- handleWebsocketMessages((new MavlinkWebsocket(system)).wsflow)
+
+ val fromWebSocket = Flow[Message].collect{
+ case BinaryMessage.Strict(data) => data
+ }
+
+ val toWebSocket = Flow[ByteString].map{bytes =>
+ //BinaryMessage(bytes)
+ TextMessage(bytes.decodeString("UTF-8"))
+ }
+
+ val bytes = Uav().connect()
+
+ handleWebSocketMessages(fromWebSocket via bytes via toWebSocket)
}
} ~
pathPrefix("assets") {
@@ -49,44 +63,3 @@ class Router(system: ActorSystem) {
)
}
-
-import akka.http.scaladsl.model.ws._
-import akka.stream.scaladsl._
-
-object SocketService {
-
- /*
- val out: Source[OutgoingMessage, ActorRef] = Source.actorRef[OutgoingMessage](0, OverflowStrategy.fail)
- val in = Sink.actorRef(ref: ActorRef, onCompleteMessage: Any)
- */
-
-
- /*
- Flow[Message, Message, _] { implicit builder =>
-
- val in: Flow[Message, IncomingMessage, _] = Flow[Message].map {
- case TextMessage.Strict(txt) => IncomingMessage(s"frpm websocket $txt")
- case _ => ???
- }
-
- val out: Flow[OutgoingMessage, Message, _] = Flow[OutgoingMessage].map {
- case OutgoingMessage(txt) => TextMessage(s"to websocket $text")
- }
-
- val chatActorSink = Sink.actorRef[ChatEvent](chatRoomActor, UserLeft(user))
-
- }
- */
-}
-
-/*
-object EchoService {
-
- val flow: Flow[Message, Message, _] = Flow[Message].map {
- case TextMessage.Strict(txt) => TextMessage("ECHO: " + txt)
- case _ => TextMessage("Message type unsupported")
- }
-
-}
- */
-
diff --git a/mavigator-server/src/main/scala/mavigator/settings.scala b/mavigator-server/src/main/scala/mavigator/settings.scala
deleted file mode 100644
index 9089c2e..0000000
--- a/mavigator-server/src/main/scala/mavigator/settings.scala
+++ /dev/null
@@ -1,74 +0,0 @@
-package mavigator
-
-import akka.actor.ActorSystem
-import akka.actor.Extension
-import akka.actor.ExtensionId
-import akka.actor.ExtensionIdProvider
-import akka.actor.ExtendedActorSystem
-
-import scala.concurrent.duration.Duration
-import com.typesafe.config.Config
-import java.util.concurrent.TimeUnit
-
-import akka.actor.ActorRef
-import akka.actor.Props
-import mavigator.uav.MockConnection
-import mavigator.uav.SerialConnection
-
-class MavigatorImpl(system: ExtendedActorSystem) extends Extension {
-
- private val config = system.settings.config.getConfig("mavigator")
-
- val interface: String = config.getString("interface")
-
- val port: Int = config.getInt("port")
-
- /** Mavlink system ID identifying the base station */
- val systemId: Byte = config.getInt("system_id").toByte
-
- val tpe = config.getString("connection.type")
-
- /** Actor representing a connection channel to UAVs. This actor
- * implements the protocol defined in [mavigator.uav.Connection] */
- val uav: ActorRef = {
- val config = this.config.getConfig("connection")
- val tpe = config.getString("type")
- val heartbeat = config.getInt("heartbeat")
- val compId = config.getString("component_id").toByte
-
- val props = tpe match {
- case "mock" =>
- val remote = config.getInt("mock.remote_system_id").toByte
- val prescaler = config.getInt("mock.prescaler")
- MockConnection(systemId, compId, remote, prescaler)
-
- case "serial" =>
- val serial = config.getConfig("serial")
- SerialConnection(
- systemId,
- compId,
- heartbeat,
- serial.getString("port"),
- serial.getInt("baud"),
- serial.getBoolean("two_stop_bits"),
- serial.getInt("parity")
- )
-
- case unknown => throw new IllegalArgumentException("Unsupported connection type '" + unknown + "'")
-
- }
-
- system.actorOf(props, name = "uav-connection")
- }
-
-}
-
-object Mavigator extends ExtensionId[MavigatorImpl] with ExtensionIdProvider {
-
- override def lookup = Mavigator
-
- override def createExtension(system: ExtendedActorSystem) =
- new MavigatorImpl(system)
-
-}
-
diff --git a/mavigator-server/src/main/twirl/mavigator/views/main.scala.html b/mavigator-server/src/main/twirl/mavigator/views/main.scala.html
index 10a48dd..9631c4d 100644
--- a/mavigator-server/src/main/twirl/mavigator/views/main.scala.html
+++ b/mavigator-server/src/main/twirl/mavigator/views/main.scala.html
@@ -16,11 +16,11 @@
<link rel="stylesheet" media="screen" href="/assets/stylesheets/main.css">
</head>
<body>
-
+
@content
<script type="text/javascript" src="/assets/lib/jquery/jquery.js"></script>
<script type="text/javascript" src="/assets/lib/bootstrap/js/bootstrap.min.js"></script>
-
+
</body>
</html>
diff --git a/mavigator-uav/build.sbt b/mavigator-uav/build.sbt
index 13a8b5a..abe935b 100644
--- a/mavigator-uav/build.sbt
+++ b/mavigator-uav/build.sbt
@@ -4,6 +4,8 @@ MavigatorBuild.defaultSettings
libraryDependencies ++= Seq(
Dependencies.akkaActor,
+ Dependencies.akkaStream,
Dependencies.flow,
- Dependencies.flowNative
+ Dependencies.flowNative,
+ Dependencies.reactiveStreams
)
diff --git a/mavigator-uav/src/main/scala/mavigator/uav/Multiplexer.scala b/mavigator-uav/src/main/scala/mavigator/uav/Multiplexer.scala
new file mode 100644
index 0000000..5e48ea1
--- /dev/null
+++ b/mavigator-uav/src/main/scala/mavigator/uav/Multiplexer.scala
@@ -0,0 +1,20 @@
+package mavigator
+package uav
+
+import akka.stream.scaladsl._
+import akka.stream._
+import org.reactivestreams._
+
+private[uav] class Multiplexer[In, Out](service: Flow[In, Out, _])(implicit materializer: Materializer) {
+
+ private val endpoint: Flow[Out, In, (Publisher[Out], Subscriber[In])] = Flow.fromSinkAndSourceMat(
+ Sink.asPublisher[Out](fanout = true),
+ Source.asSubscriber[In])((pub, sub) => (pub, sub))
+
+ private lazy val (publisher, subscriber) = (service.joinMat(endpoint)(Keep.right)).run()
+
+ def connect(client: Flow[Out, In, _]) = {
+ Source.fromPublisher(publisher).via(client).to(Sink.ignore).run()
+ }
+
+}
diff --git a/mavigator-uav/src/main/scala/mavigator/uav/Uav.scala b/mavigator-uav/src/main/scala/mavigator/uav/Uav.scala
new file mode 100644
index 0000000..a1ef333
--- /dev/null
+++ b/mavigator-uav/src/main/scala/mavigator/uav/Uav.scala
@@ -0,0 +1,31 @@
+package mavigator
+package uav
+
+import akka._
+import akka.actor._
+import akka.util._
+import akka.stream.scaladsl._
+
+class Uav(system: ExtendedActorSystem) extends Extension {
+
+ private lazy val config = system.settings.config.getConfig("mavigator.uav")
+
+ def connect(): Flow[ByteString, ByteString, NotUsed] = {
+ val t = scala.concurrent.duration.FiniteDuration(100, "ms")
+ Flow.fromSinkAndSource(
+ Sink.ignore,
+ Source.tick(t,t, ByteString("hello world"))
+ )
+ }
+
+}
+
+object Uav extends ExtensionId[Uav] with ExtensionIdProvider {
+
+ override def lookup = Uav
+
+ override def createExtension(system: ExtendedActorSystem) = new Uav(system)
+
+ def apply()(implicit system: ActorSystem) = super.apply(system)
+
+}
diff --git a/mavigator-uav/src/main/scala/mavigator/uav/mock/MockConnection.scala b/mavigator-uav/src/main/scala/mavigator/uav/mock/MockConnection.scala
new file mode 100644
index 0000000..5949a3a
--- /dev/null
+++ b/mavigator-uav/src/main/scala/mavigator/uav/mock/MockConnection.scala
@@ -0,0 +1,77 @@
+package mavigator.uav
+
+import java.util.concurrent.TimeUnit.MILLISECONDS
+import scala.concurrent.duration.FiniteDuration
+import scala.util.Random
+import org.mavlink.Packet
+import org.mavlink.enums.MavAutopilot
+import org.mavlink.enums.MavModeFlag
+import org.mavlink.enums.MavState
+import org.mavlink.enums.MavType
+import org.mavlink.messages.Heartbeat
+import Connection.Received
+import akka.actor.Actor
+import akka.actor.ActorLogging
+import akka.actor.Props
+import akka.util.ByteString
+import scala.concurrent.duration._
+import org.mavlink.messages.Message
+import mock.RandomFlightPlan
+
+class MockConnection(
+ localSystemId: Byte,
+ localComponentId: Byte,
+ remoteSystemId: Byte,
+ prescaler: Int
+)
+ extends Actor with ActorLogging with Connection with MavlinkUtil {
+
+ import context._
+
+ override val systemId = remoteSystemId
+ override val componentId = remoteSystemId
+
+ val plan = new RandomFlightPlan
+
+ def scheduleMessage(delay: FiniteDuration)(fct: => Message) = system.scheduler.schedule(delay, delay) {
+ sendAll(Received(assemble(fct)))
+ }
+ def scheduleBytes(delay: FiniteDuration)(fct: => Array[Byte]) = system.scheduler.schedule(delay, delay) {
+ sendAll(Received(ByteString(fct)))
+ }
+
+ override def preStart() = {
+ //increment state
+ system.scheduler.schedule(0.01.seconds * prescaler, 0.01.seconds * prescaler) { plan.tick(0.01) }
+
+ //send messages
+ scheduleMessage(0.1.seconds * prescaler)(plan.position)
+ scheduleMessage(0.05.seconds * prescaler)(plan.attitude)
+ scheduleMessage(0.05.seconds * prescaler)(plan.motors)
+ scheduleMessage(0.1.seconds * prescaler)(plan.distance)
+ scheduleMessage(1.seconds)(plan.heartbeat)
+
+ //simulate noisy line
+ scheduleBytes(0.3.seconds * prescaler)(MockPackets.invalidCrc)
+ scheduleBytes(1.5.seconds * prescaler)(MockPackets.invalidOverflow)
+ }
+
+ override def receive = handleRegistration
+
+}
+
+object MockConnection {
+ def apply(systemId: Byte, componentId: Byte, remoteSystemId: Byte, prescaler: Int = 1) =
+ Props(classOf[MockConnection], systemId, componentId, remoteSystemId, prescaler)
+}
+
+object MockPackets {
+ val invalidCrc = Array(254, 1, 123, 13, 13).map(_.toByte)
+ val invalidOverflow = {
+ val data = Array.fill[Byte](Packet.MaxPayloadLength + 100)(42)
+ data(0) = -2
+ data(1) = 2
+ data(1) = -1
+ data
+ }
+}
diff --git a/mavigator-uav/src/main/scala/mavigator/uav/mock/RandomFlightPlan.scala b/mavigator-uav/src/main/scala/mavigator/uav/mock/RandomFlightPlan.scala
index 1e5431e..4a6520f 100644
--- a/mavigator-uav/src/main/scala/mavigator/uav/mock/RandomFlightPlan.scala
+++ b/mavigator-uav/src/main/scala/mavigator/uav/mock/RandomFlightPlan.scala
@@ -9,7 +9,7 @@ import org.mavlink.messages._
class RandomFlightPlan {
- private var time: Double = 0
+ private var time: Double = 0 //current time in seconds
private def millis = (time * 1000).toInt
private def micros = (time * 1E6).toInt
diff --git a/project/Dependencies.scala b/project/Dependencies.scala
index 1be5b70..15474c7 100644
--- a/project/Dependencies.scala
+++ b/project/Dependencies.scala
@@ -9,9 +9,11 @@ object Dependencies {
val AkkaVersion = "2.4.2-RC1"
val akkaActor = "com.typesafe.akka" %% "akka-actor" % AkkaVersion
- val akkaHttp = "com.typesafe.akka" %% "akka-http" % AkkaVersion
+ val akkaHttp = "com.typesafe.akka" %% "akka-http-experimental" % AkkaVersion
val akkaHttpCore = "com.typesafe.akka" %% "akka-http-core" % AkkaVersion
- val akkaStreams = "com.typesafe.akka" %% "akka-stream" % AkkaVersion
+ val akkaStream = "com.typesafe.akka" %% "akka-stream" % AkkaVersion
+
+ val reactiveStreams = "org.reactivestreams" % "reactive-streams" % "1.0.0"
val flow = "com.github.jodersky" %% "flow" % "2.4.0"
val flowNative = "com.github.jodersky" % "flow-native" % "2.4.0" % Runtime
diff --git a/project/MavigatorBuild.scala b/project/MavigatorBuild.scala
index 5bc999c..1a3713c 100644
--- a/project/MavigatorBuild.scala
+++ b/project/MavigatorBuild.scala
@@ -16,7 +16,7 @@ object MavigatorBuild extends Build {
lazy val root = Project(
id = "root",
base = file("."),
- aggregate = Seq(bindings, uav)
+ aggregate = Seq(bindings, uav, server)
)
// empty project that uses SbtMavlink to generate protocol bindings