diff options
Diffstat (limited to 'mavigator-server/src/main/scala')
4 files changed, 287 insertions, 0 deletions
diff --git a/mavigator-server/src/main/scala/mavigator/Main.scala b/mavigator-server/src/main/scala/mavigator/Main.scala new file mode 100644 index 0000000..ca81fa9 --- /dev/null +++ b/mavigator-server/src/main/scala/mavigator/Main.scala @@ -0,0 +1,45 @@ +package mavigator + +import akka.actor._ +import akka.http.scaladsl._ +import akka.http.scaladsl.server._ +import akka.stream._ +import scala.concurrent.Await +import scala.concurrent.duration.Duration + +object Main { + + implicit lazy val system = ActorSystem("mavigator") + implicit lazy val materializer = ActorMaterializer() + + def main(args: Array[String]): Unit = { + import system.dispatcher + + system.log.info("System started.") + + val router = (new Router(system)).route + val settings = Mavigator(system) + + system.log.info(s"Starting server on ${settings.interface}:${settings.port}...") + val binding = Http(system).bindAndHandle(router, settings.interface, settings.port) + + for (b <- binding) { + val addr = b.localAddress.getHostString() + val port = b.localAddress.getPort() + system.log.info(s"Server is listening on $addr:$port") + } + + scala.io.StdIn.readLine() + + binding.flatMap{b => + system.log.info("Shutting down server...") + b.unbind() + }.onComplete{ _ => + system.log.info("Server shut down") + system.terminate() + } + + Await.result(system.whenTerminated, Duration.Inf) + + } +} diff --git a/mavigator-server/src/main/scala/mavigator/MavlinkWebsocket.scala b/mavigator-server/src/main/scala/mavigator/MavlinkWebsocket.scala new file mode 100644 index 0000000..9cbfa64 --- /dev/null +++ b/mavigator-server/src/main/scala/mavigator/MavlinkWebsocket.scala @@ -0,0 +1,76 @@ +package mavigator + +import akka.actor.Terminated +import akka.actor._ +import akka.http.scaladsl._ +import akka.http.scaladsl.model.ws.Message +import akka.http.scaladsl.model.ws.TextMessage +import akka.http.scaladsl.server._ +import akka.stream.OverflowStrategy +import akka.stream._ +import akka.stream.scaladsl.Flow +import akka.stream.scaladsl.GraphDSL +import akka.stream.scaladsl.Sink +import akka.stream.scaladsl.Source +import akka.util.ByteString +import mavigator.uav.Connection +import scala.concurrent.Await +import scala.concurrent.duration.Duration + + +/** + * Adapted from https://github.com/jrudolph/akka-http-scala-js-websocket-chat + */ +class MavlinkWebsocket(system: ActorSystem) { + + /* + GraphDSL.create(Source.actorRef[Connection.Event](1, OverflowStrategy.fail)) {implicit builder => + import GraphDSL.Implicits._ + + //source: SourceShape[Connection.Event] => + source => + + val inSink = builder.materializedValue.map { client: ActorRef => + Sink.actorRef[Connection.Command]( + Mavigator(system).uav, + Connection.Unregister(client) + ) + } + + (inSink., source.out) + + //FlowShape(inSink.in, source.out) + //??? + } + */ + + /** Sink that forwards incomming (from the browser) messages to the uav. */ + private val inSink = Sink.actorRef[Connection.Command]( + Mavigator(system).uav, + Connection.Send(ByteString("goodbye")) //unregister + ) + + /** Source that emmits messages comming from the uav. */ + private val outSource = Source.actorRef[Connection.Event]( + bufferSize = 1, + overflowStrategy = OverflowStrategy.fail + ) mapMaterializedValue { client => // a client is spawned for every outSource materialization + Mavigator(system).uav.tell(Connection.Register, client) + } + + private val flow: Flow[Connection.Command, Connection.Event, _] = Flow.fromSinkAndSource(inSink, outSource) + + @deprecated("WIP", "0.0") + val wsflow = Flow[Message].collect{ + case TextMessage.Strict(msg) => Connection.Send(ByteString(msg)) + // unpack incoming WS text messages... + // This will lose (ignore) messages not received in one chunk (which is + // unlikely because chat messages are small) but absolutely possible + // FIXME: We need to handle TextMessage.Streamed as well. + }.via(flow).map { + case msg: Connection.Event => + TextMessage.Strict(msg.toString) // ... pack outgoing messages into WS JSON messages ... + } + +} + diff --git a/mavigator-server/src/main/scala/mavigator/Router.scala b/mavigator-server/src/main/scala/mavigator/Router.scala new file mode 100644 index 0000000..bd54422 --- /dev/null +++ b/mavigator-server/src/main/scala/mavigator/Router.scala @@ -0,0 +1,92 @@ +package mavigator + +import akka.actor._ +import akka.stream._ +import akka.stream.scaladsl._ +import akka.http.scaladsl._ +import akka.http.scaladsl.model._ +import akka.http.scaladsl.server._ + + +class Router(system: ActorSystem) { + + import Directives._ + + val route: Route = ( + path("info") { + get { + val f: play.twirl.api.Html = mavigator.views.html.index() + complete(f.body) + } + } ~ + path("dashboard" / IntNumber) { id => + get { + //get dashboard for remote sys id + ??? + } + } ~ + path("mavlink") { + get { + /*extractUpgradeToWebsocket{ upgrade => + //upgrade.handleMessagesWith(inSink: Sink[Message, _$3], outSource: Source[Message, _$4]) + ??? + }*/ + handleWebsocketMessages((new MavlinkWebsocket(system)).wsflow) + } + } ~ + pathPrefix("assets") { + get { + encodeResponse { + getFromResourceDirectory("assets") + } + } + } ~ + pathEndOrSingleSlash { + get { + complete("hello world") + } + } + ) + +} + +import akka.http.scaladsl.model.ws._ +import akka.stream.scaladsl._ + +object SocketService { + + /* + val out: Source[OutgoingMessage, ActorRef] = Source.actorRef[OutgoingMessage](0, OverflowStrategy.fail) + val in = Sink.actorRef(ref: ActorRef, onCompleteMessage: Any) + */ + + + /* + Flow[Message, Message, _] { implicit builder => + + val in: Flow[Message, IncomingMessage, _] = Flow[Message].map { + case TextMessage.Strict(txt) => IncomingMessage(s"frpm websocket $txt") + case _ => ??? + } + + val out: Flow[OutgoingMessage, Message, _] = Flow[OutgoingMessage].map { + case OutgoingMessage(txt) => TextMessage(s"to websocket $text") + } + + val chatActorSink = Sink.actorRef[ChatEvent](chatRoomActor, UserLeft(user)) + + } + */ +} + +/* +object EchoService { + + val flow: Flow[Message, Message, _] = Flow[Message].map { + case TextMessage.Strict(txt) => TextMessage("ECHO: " + txt) + case _ => TextMessage("Message type unsupported") + } + +} + */ + diff --git a/mavigator-server/src/main/scala/mavigator/settings.scala b/mavigator-server/src/main/scala/mavigator/settings.scala new file mode 100644 index 0000000..9089c2e --- /dev/null +++ b/mavigator-server/src/main/scala/mavigator/settings.scala @@ -0,0 +1,74 @@ +package mavigator + +import akka.actor.ActorSystem +import akka.actor.Extension +import akka.actor.ExtensionId +import akka.actor.ExtensionIdProvider +import akka.actor.ExtendedActorSystem + +import scala.concurrent.duration.Duration +import com.typesafe.config.Config +import java.util.concurrent.TimeUnit + +import akka.actor.ActorRef +import akka.actor.Props +import mavigator.uav.MockConnection +import mavigator.uav.SerialConnection + +class MavigatorImpl(system: ExtendedActorSystem) extends Extension { + + private val config = system.settings.config.getConfig("mavigator") + + val interface: String = config.getString("interface") + + val port: Int = config.getInt("port") + + /** Mavlink system ID identifying the base station */ + val systemId: Byte = config.getInt("system_id").toByte + + val tpe = config.getString("connection.type") + + /** Actor representing a connection channel to UAVs. This actor + * implements the protocol defined in [mavigator.uav.Connection] */ + val uav: ActorRef = { + val config = this.config.getConfig("connection") + val tpe = config.getString("type") + val heartbeat = config.getInt("heartbeat") + val compId = config.getString("component_id").toByte + + val props = tpe match { + case "mock" => + val remote = config.getInt("mock.remote_system_id").toByte + val prescaler = config.getInt("mock.prescaler") + MockConnection(systemId, compId, remote, prescaler) + + case "serial" => + val serial = config.getConfig("serial") + SerialConnection( + systemId, + compId, + heartbeat, + serial.getString("port"), + serial.getInt("baud"), + serial.getBoolean("two_stop_bits"), + serial.getInt("parity") + ) + + case unknown => throw new IllegalArgumentException("Unsupported connection type '" + unknown + "'") + + } + + system.actorOf(props, name = "uav-connection") + } + +} + +object Mavigator extends ExtensionId[MavigatorImpl] with ExtensionIdProvider { + + override def lookup = Mavigator + + override def createExtension(system: ExtendedActorSystem) = + new MavigatorImpl(system) + +} + |