aboutsummaryrefslogtreecommitdiff
path: root/mavigator-server/src/main/scala
diff options
context:
space:
mode:
Diffstat (limited to 'mavigator-server/src/main/scala')
-rw-r--r--mavigator-server/src/main/scala/mavigator/Main.scala45
-rw-r--r--mavigator-server/src/main/scala/mavigator/MavlinkWebsocket.scala76
-rw-r--r--mavigator-server/src/main/scala/mavigator/Router.scala92
-rw-r--r--mavigator-server/src/main/scala/mavigator/settings.scala74
4 files changed, 287 insertions, 0 deletions
diff --git a/mavigator-server/src/main/scala/mavigator/Main.scala b/mavigator-server/src/main/scala/mavigator/Main.scala
new file mode 100644
index 0000000..ca81fa9
--- /dev/null
+++ b/mavigator-server/src/main/scala/mavigator/Main.scala
@@ -0,0 +1,45 @@
+package mavigator
+
+import akka.actor._
+import akka.http.scaladsl._
+import akka.http.scaladsl.server._
+import akka.stream._
+import scala.concurrent.Await
+import scala.concurrent.duration.Duration
+
+object Main {
+
+ implicit lazy val system = ActorSystem("mavigator")
+ implicit lazy val materializer = ActorMaterializer()
+
+ def main(args: Array[String]): Unit = {
+ import system.dispatcher
+
+ system.log.info("System started.")
+
+ val router = (new Router(system)).route
+ val settings = Mavigator(system)
+
+ system.log.info(s"Starting server on ${settings.interface}:${settings.port}...")
+ val binding = Http(system).bindAndHandle(router, settings.interface, settings.port)
+
+ for (b <- binding) {
+ val addr = b.localAddress.getHostString()
+ val port = b.localAddress.getPort()
+ system.log.info(s"Server is listening on $addr:$port")
+ }
+
+ scala.io.StdIn.readLine()
+
+ binding.flatMap{b =>
+ system.log.info("Shutting down server...")
+ b.unbind()
+ }.onComplete{ _ =>
+ system.log.info("Server shut down")
+ system.terminate()
+ }
+
+ Await.result(system.whenTerminated, Duration.Inf)
+
+ }
+}
diff --git a/mavigator-server/src/main/scala/mavigator/MavlinkWebsocket.scala b/mavigator-server/src/main/scala/mavigator/MavlinkWebsocket.scala
new file mode 100644
index 0000000..9cbfa64
--- /dev/null
+++ b/mavigator-server/src/main/scala/mavigator/MavlinkWebsocket.scala
@@ -0,0 +1,76 @@
+package mavigator
+
+import akka.actor.Terminated
+import akka.actor._
+import akka.http.scaladsl._
+import akka.http.scaladsl.model.ws.Message
+import akka.http.scaladsl.model.ws.TextMessage
+import akka.http.scaladsl.server._
+import akka.stream.OverflowStrategy
+import akka.stream._
+import akka.stream.scaladsl.Flow
+import akka.stream.scaladsl.GraphDSL
+import akka.stream.scaladsl.Sink
+import akka.stream.scaladsl.Source
+import akka.util.ByteString
+import mavigator.uav.Connection
+import scala.concurrent.Await
+import scala.concurrent.duration.Duration
+
+
+/**
+ * Adapted from https://github.com/jrudolph/akka-http-scala-js-websocket-chat
+ */
+class MavlinkWebsocket(system: ActorSystem) {
+
+ /*
+ GraphDSL.create(Source.actorRef[Connection.Event](1, OverflowStrategy.fail)) {implicit builder =>
+ import GraphDSL.Implicits._
+
+ //source: SourceShape[Connection.Event] =>
+ source =>
+
+ val inSink = builder.materializedValue.map { client: ActorRef =>
+ Sink.actorRef[Connection.Command](
+ Mavigator(system).uav,
+ Connection.Unregister(client)
+ )
+ }
+
+ (inSink., source.out)
+
+ //FlowShape(inSink.in, source.out)
+ //???
+ }
+ */
+
+ /** Sink that forwards incomming (from the browser) messages to the uav. */
+ private val inSink = Sink.actorRef[Connection.Command](
+ Mavigator(system).uav,
+ Connection.Send(ByteString("goodbye")) //unregister
+ )
+
+ /** Source that emmits messages comming from the uav. */
+ private val outSource = Source.actorRef[Connection.Event](
+ bufferSize = 1,
+ overflowStrategy = OverflowStrategy.fail
+ ) mapMaterializedValue { client => // a client is spawned for every outSource materialization
+ Mavigator(system).uav.tell(Connection.Register, client)
+ }
+
+ private val flow: Flow[Connection.Command, Connection.Event, _] = Flow.fromSinkAndSource(inSink, outSource)
+
+ @deprecated("WIP", "0.0")
+ val wsflow = Flow[Message].collect{
+ case TextMessage.Strict(msg) => Connection.Send(ByteString(msg))
+ // unpack incoming WS text messages...
+ // This will lose (ignore) messages not received in one chunk (which is
+ // unlikely because chat messages are small) but absolutely possible
+ // FIXME: We need to handle TextMessage.Streamed as well.
+ }.via(flow).map {
+ case msg: Connection.Event =>
+ TextMessage.Strict(msg.toString) // ... pack outgoing messages into WS JSON messages ...
+ }
+
+}
+
diff --git a/mavigator-server/src/main/scala/mavigator/Router.scala b/mavigator-server/src/main/scala/mavigator/Router.scala
new file mode 100644
index 0000000..bd54422
--- /dev/null
+++ b/mavigator-server/src/main/scala/mavigator/Router.scala
@@ -0,0 +1,92 @@
+package mavigator
+
+import akka.actor._
+import akka.stream._
+import akka.stream.scaladsl._
+import akka.http.scaladsl._
+import akka.http.scaladsl.model._
+import akka.http.scaladsl.server._
+
+
+class Router(system: ActorSystem) {
+
+ import Directives._
+
+ val route: Route = (
+ path("info") {
+ get {
+ val f: play.twirl.api.Html = mavigator.views.html.index()
+ complete(f.body)
+ }
+ } ~
+ path("dashboard" / IntNumber) { id =>
+ get {
+ //get dashboard for remote sys id
+ ???
+ }
+ } ~
+ path("mavlink") {
+ get {
+ /*extractUpgradeToWebsocket{ upgrade =>
+ //upgrade.handleMessagesWith(inSink: Sink[Message, _$3], outSource: Source[Message, _$4])
+ ???
+ }*/
+ handleWebsocketMessages((new MavlinkWebsocket(system)).wsflow)
+ }
+ } ~
+ pathPrefix("assets") {
+ get {
+ encodeResponse {
+ getFromResourceDirectory("assets")
+ }
+ }
+ } ~
+ pathEndOrSingleSlash {
+ get {
+ complete("hello world")
+ }
+ }
+ )
+
+}
+
+import akka.http.scaladsl.model.ws._
+import akka.stream.scaladsl._
+
+object SocketService {
+
+ /*
+ val out: Source[OutgoingMessage, ActorRef] = Source.actorRef[OutgoingMessage](0, OverflowStrategy.fail)
+ val in = Sink.actorRef(ref: ActorRef, onCompleteMessage: Any)
+ */
+
+
+ /*
+ Flow[Message, Message, _] { implicit builder =>
+
+ val in: Flow[Message, IncomingMessage, _] = Flow[Message].map {
+ case TextMessage.Strict(txt) => IncomingMessage(s"frpm websocket $txt")
+ case _ => ???
+ }
+
+ val out: Flow[OutgoingMessage, Message, _] = Flow[OutgoingMessage].map {
+ case OutgoingMessage(txt) => TextMessage(s"to websocket $text")
+ }
+
+ val chatActorSink = Sink.actorRef[ChatEvent](chatRoomActor, UserLeft(user))
+
+ }
+ */
+}
+
+/*
+object EchoService {
+
+ val flow: Flow[Message, Message, _] = Flow[Message].map {
+ case TextMessage.Strict(txt) => TextMessage("ECHO: " + txt)
+ case _ => TextMessage("Message type unsupported")
+ }
+
+}
+ */
+
diff --git a/mavigator-server/src/main/scala/mavigator/settings.scala b/mavigator-server/src/main/scala/mavigator/settings.scala
new file mode 100644
index 0000000..9089c2e
--- /dev/null
+++ b/mavigator-server/src/main/scala/mavigator/settings.scala
@@ -0,0 +1,74 @@
+package mavigator
+
+import akka.actor.ActorSystem
+import akka.actor.Extension
+import akka.actor.ExtensionId
+import akka.actor.ExtensionIdProvider
+import akka.actor.ExtendedActorSystem
+
+import scala.concurrent.duration.Duration
+import com.typesafe.config.Config
+import java.util.concurrent.TimeUnit
+
+import akka.actor.ActorRef
+import akka.actor.Props
+import mavigator.uav.MockConnection
+import mavigator.uav.SerialConnection
+
+class MavigatorImpl(system: ExtendedActorSystem) extends Extension {
+
+ private val config = system.settings.config.getConfig("mavigator")
+
+ val interface: String = config.getString("interface")
+
+ val port: Int = config.getInt("port")
+
+ /** Mavlink system ID identifying the base station */
+ val systemId: Byte = config.getInt("system_id").toByte
+
+ val tpe = config.getString("connection.type")
+
+ /** Actor representing a connection channel to UAVs. This actor
+ * implements the protocol defined in [mavigator.uav.Connection] */
+ val uav: ActorRef = {
+ val config = this.config.getConfig("connection")
+ val tpe = config.getString("type")
+ val heartbeat = config.getInt("heartbeat")
+ val compId = config.getString("component_id").toByte
+
+ val props = tpe match {
+ case "mock" =>
+ val remote = config.getInt("mock.remote_system_id").toByte
+ val prescaler = config.getInt("mock.prescaler")
+ MockConnection(systemId, compId, remote, prescaler)
+
+ case "serial" =>
+ val serial = config.getConfig("serial")
+ SerialConnection(
+ systemId,
+ compId,
+ heartbeat,
+ serial.getString("port"),
+ serial.getInt("baud"),
+ serial.getBoolean("two_stop_bits"),
+ serial.getInt("parity")
+ )
+
+ case unknown => throw new IllegalArgumentException("Unsupported connection type '" + unknown + "'")
+
+ }
+
+ system.actorOf(props, name = "uav-connection")
+ }
+
+}
+
+object Mavigator extends ExtensionId[MavigatorImpl] with ExtensionIdProvider {
+
+ override def lookup = Mavigator
+
+ override def createExtension(system: ExtendedActorSystem) =
+ new MavigatorImpl(system)
+
+}
+