diff options
author | Jakob Odersky <jakob@odersky.com> | 2016-02-01 22:46:48 -0800 |
---|---|---|
committer | Jakob Odersky <jakob@odersky.com> | 2016-02-01 22:46:48 -0800 |
commit | 50bc6c80aa201c0543db07a62bc7c726b813c168 (patch) | |
tree | 0ed1c753f9d9fe29d1e51a0d03172e5659c4c339 /mavigator-uav | |
parent | 4f4c799a6d9ccf333a3e609a2464e2f317875af7 (diff) | |
download | mavigator-50bc6c80aa201c0543db07a62bc7c726b813c168.tar.gz mavigator-50bc6c80aa201c0543db07a62bc7c726b813c168.tar.bz2 mavigator-50bc6c80aa201c0543db07a62bc7c726b813c168.zip |
Use Akka streams and temporarily disable frontend
Diffstat (limited to 'mavigator-uav')
5 files changed, 132 insertions, 2 deletions
diff --git a/mavigator-uav/build.sbt b/mavigator-uav/build.sbt index 13a8b5a..abe935b 100644 --- a/mavigator-uav/build.sbt +++ b/mavigator-uav/build.sbt @@ -4,6 +4,8 @@ MavigatorBuild.defaultSettings libraryDependencies ++= Seq( Dependencies.akkaActor, + Dependencies.akkaStream, Dependencies.flow, - Dependencies.flowNative + Dependencies.flowNative, + Dependencies.reactiveStreams ) diff --git a/mavigator-uav/src/main/scala/mavigator/uav/Multiplexer.scala b/mavigator-uav/src/main/scala/mavigator/uav/Multiplexer.scala new file mode 100644 index 0000000..5e48ea1 --- /dev/null +++ b/mavigator-uav/src/main/scala/mavigator/uav/Multiplexer.scala @@ -0,0 +1,20 @@ +package mavigator +package uav + +import akka.stream.scaladsl._ +import akka.stream._ +import org.reactivestreams._ + +private[uav] class Multiplexer[In, Out](service: Flow[In, Out, _])(implicit materializer: Materializer) { + + private val endpoint: Flow[Out, In, (Publisher[Out], Subscriber[In])] = Flow.fromSinkAndSourceMat( + Sink.asPublisher[Out](fanout = true), + Source.asSubscriber[In])((pub, sub) => (pub, sub)) + + private lazy val (publisher, subscriber) = (service.joinMat(endpoint)(Keep.right)).run() + + def connect(client: Flow[Out, In, _]) = { + Source.fromPublisher(publisher).via(client).to(Sink.ignore).run() + } + +} diff --git a/mavigator-uav/src/main/scala/mavigator/uav/Uav.scala b/mavigator-uav/src/main/scala/mavigator/uav/Uav.scala new file mode 100644 index 0000000..a1ef333 --- /dev/null +++ b/mavigator-uav/src/main/scala/mavigator/uav/Uav.scala @@ -0,0 +1,31 @@ +package mavigator +package uav + +import akka._ +import akka.actor._ +import akka.util._ +import akka.stream.scaladsl._ + +class Uav(system: ExtendedActorSystem) extends Extension { + + private lazy val config = system.settings.config.getConfig("mavigator.uav") + + def connect(): Flow[ByteString, ByteString, NotUsed] = { + val t = scala.concurrent.duration.FiniteDuration(100, "ms") + Flow.fromSinkAndSource( + Sink.ignore, + Source.tick(t,t, ByteString("hello world")) + ) + } + +} + +object Uav extends ExtensionId[Uav] with ExtensionIdProvider { + + override def lookup = Uav + + override def createExtension(system: ExtendedActorSystem) = new Uav(system) + + def apply()(implicit system: ActorSystem) = super.apply(system) + +} diff --git a/mavigator-uav/src/main/scala/mavigator/uav/mock/MockConnection.scala b/mavigator-uav/src/main/scala/mavigator/uav/mock/MockConnection.scala new file mode 100644 index 0000000..5949a3a --- /dev/null +++ b/mavigator-uav/src/main/scala/mavigator/uav/mock/MockConnection.scala @@ -0,0 +1,77 @@ +package mavigator.uav + +import java.util.concurrent.TimeUnit.MILLISECONDS +import scala.concurrent.duration.FiniteDuration +import scala.util.Random +import org.mavlink.Packet +import org.mavlink.enums.MavAutopilot +import org.mavlink.enums.MavModeFlag +import org.mavlink.enums.MavState +import org.mavlink.enums.MavType +import org.mavlink.messages.Heartbeat +import Connection.Received +import akka.actor.Actor +import akka.actor.ActorLogging +import akka.actor.Props +import akka.util.ByteString +import scala.concurrent.duration._ +import org.mavlink.messages.Message +import mock.RandomFlightPlan + +class MockConnection( + localSystemId: Byte, + localComponentId: Byte, + remoteSystemId: Byte, + prescaler: Int +) + extends Actor with ActorLogging with Connection with MavlinkUtil { + + import context._ + + override val systemId = remoteSystemId + override val componentId = remoteSystemId + + val plan = new RandomFlightPlan + + def scheduleMessage(delay: FiniteDuration)(fct: => Message) = system.scheduler.schedule(delay, delay) { + sendAll(Received(assemble(fct))) + } + def scheduleBytes(delay: FiniteDuration)(fct: => Array[Byte]) = system.scheduler.schedule(delay, delay) { + sendAll(Received(ByteString(fct))) + } + + override def preStart() = { + //increment state + system.scheduler.schedule(0.01.seconds * prescaler, 0.01.seconds * prescaler) { plan.tick(0.01) } + + //send messages + scheduleMessage(0.1.seconds * prescaler)(plan.position) + scheduleMessage(0.05.seconds * prescaler)(plan.attitude) + scheduleMessage(0.05.seconds * prescaler)(plan.motors) + scheduleMessage(0.1.seconds * prescaler)(plan.distance) + scheduleMessage(1.seconds)(plan.heartbeat) + + //simulate noisy line + scheduleBytes(0.3.seconds * prescaler)(MockPackets.invalidCrc) + scheduleBytes(1.5.seconds * prescaler)(MockPackets.invalidOverflow) + } + + override def receive = handleRegistration + +} + +object MockConnection { + def apply(systemId: Byte, componentId: Byte, remoteSystemId: Byte, prescaler: Int = 1) = + Props(classOf[MockConnection], systemId, componentId, remoteSystemId, prescaler) +} + +object MockPackets { + val invalidCrc = Array(254, 1, 123, 13, 13).map(_.toByte) + val invalidOverflow = { + val data = Array.fill[Byte](Packet.MaxPayloadLength + 100)(42) + data(0) = -2 + data(1) = 2 + data(1) = -1 + data + } +} diff --git a/mavigator-uav/src/main/scala/mavigator/uav/mock/RandomFlightPlan.scala b/mavigator-uav/src/main/scala/mavigator/uav/mock/RandomFlightPlan.scala index 1e5431e..4a6520f 100644 --- a/mavigator-uav/src/main/scala/mavigator/uav/mock/RandomFlightPlan.scala +++ b/mavigator-uav/src/main/scala/mavigator/uav/mock/RandomFlightPlan.scala @@ -9,7 +9,7 @@ import org.mavlink.messages._ class RandomFlightPlan { - private var time: Double = 0 + private var time: Double = 0 //current time in seconds private def millis = (time * 1000).toInt private def micros = (time * 1E6).toInt |