aboutsummaryrefslogtreecommitdiff
path: root/mavigator-uav/src/main/scala/mavigator/uav/Core.scala
blob: 02a8779f987d69b935e9be58265372be74b36493 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
package mavigator
package uav

import akka.NotUsed
import akka.actor.{ActorLogging, ActorRef, ActorSystem, Props}
import akka.stream.Materializer
import akka.stream.actor.{ActorPublisher, ActorPublisherMessage}
import akka.stream.scaladsl.{ Flow, RunnableGraph, Sink, Source }
import akka.util.ByteString
import org.mavlink.Assembler
import org.mavlink.messages._
import org.reactivestreams.{Publisher, Subscriber}
import scala.concurrent.duration._

/** Dynamic stream multiplexer between one backend and any number of clients.
  *
  * A backend connection is attached with [[setBackend]] and clients are
  * attached with [[connect]]. Everything inside is lazy: the backend publisher
  * actor is created the first time it is needed, and the internal graph is
  * materialized the first time a client source is built. */
class Core(implicit val system: ActorSystem, val materializer: Materializer) {
  import Core._

  /** Actor that receives the bytes sent by the backend and re-publishes
    * them into this core. */
  private lazy val backendPublisherActor: ActorRef =
    system.actorOf(BackendPublisher())

  /** Reactive-streams publisher backed by `backendPublisherActor`. */
  private lazy val backendPublisher: Publisher[ByteString] =
    ActorPublisher[ByteString](backendPublisherActor)

  /** Backend side of the core: discards whatever flows towards the backend
    * and emits whatever the backend publisher produces. */
  private lazy val backend = {
    //FIXME: this will need to be changed for controlling the uav from the browser
    val towardsBackend = Sink.ignore
    val fromBackendSource = Source.fromPublisher(backendPublisher)
    Flow.fromSinkAndSourceMat(towardsBackend, fromBackendSource) {
      (sinkMat, sourceMat) => (sinkMat, sourceMat)
    }
  }

  /** Client side of the core: a fan-out publisher clients read from, paired
    * with a subscriber for data travelling in the other direction. */
  private lazy val clients = {
    val towardsClients = Sink.asPublisher[ByteString](true)
    val fromClients = Source.asSubscriber[ByteString]
    Flow.fromSinkAndSourceMat(towardsClients, fromClients) {
      (publisher, subscriber) => (publisher, subscriber)
    }
  }

  /** Blueprint of the whole core. Running it yields the publisher from which
    * client sources are built. */
  private lazy val runnable: RunnableGraph[Publisher[ByteString]] = {
    // every two seconds, emit one barrel roll as assembled packets
    val generator: Source[ByteString, _] =
      Source.tick(2.seconds, 2.seconds, ()).flatMapConcat { _ =>
        Util.barrelRoll().via(Util.assembler)
      }

    // generated packets take precedence over data coming from the backend
    val merged: Flow[ByteString, ByteString, _] = Util.merge(generator, backend)

    // only the client-facing publisher is kept as materialized value
    merged.joinMat(clients) { (_, clientEnds) => clientEnds._1 }
  }

  /** Publisher shared by all clients; forcing this value starts the core. */
  private lazy val clientPublisher: Publisher[ByteString] = {
    system.log.info("Core started.")
    runnable.run()
  }

  /** Creates the flow a backend connection is plugged into.
    *
    * Incoming bytes are forwarded to the backend publisher actor, followed by
    * `BackendComplete` once the incoming stream completes. Nothing is ever
    * emitted back towards the backend. */
  def setBackend(): Flow[ByteString, ByteString, NotUsed] = {
    val toPublisherActor =
      Sink.actorRef[ByteString](backendPublisherActor, BackendPublisher.BackendComplete)
    //FIXME: this will need to be changed for controlling the uav from the browser
    val nothingForBackend = Source.empty[ByteString]
    Flow.fromSinkAndSource(toPublisherActor, nothingForBackend)
  }

  /** Creates the flow a client is plugged into.
    *
    * Data sent by the client is discarded; the client receives whatever the
    * shared client publisher emits. */
  def connect(): Flow[ByteString, ByteString, NotUsed] = {
    val fromCore = Source.fromPublisher(clientPublisher)
    Flow.fromSinkAndSource(Sink.ignore, fromCore)
  }

}

/** Stream-building helpers: a preferred merge, a message-to-bytes assembler
  * flow and a demo message generator. */
object Util {
  import akka.stream.scaladsl._
  import akka.stream.FlowShape


  /** Wraps `other` so that its output is merged with the elements of `preferred`.
    *
    * The input of the resulting flow is the input of `other`. Its output is
    * the merge of `other`'s output and `preferred`, where `preferred` is wired
    * to the preferred port of a `MergePreferred` stage.
    *
    * @param preferred source wired to the preferred input of the merge
    * @param other flow whose output is wired to the regular input of the merge
    * @return a flow with `other`'s input and the merged output
    */
  def merge[A](preferred: Source[A, _], other: Flow[A, A, _]): Flow[A, A, _] = {
    val graph = GraphDSL.create(preferred, other)((_, _)) { implicit builder =>
      (pref, flow) =>

      import GraphDSL.Implicits._

      val merge = builder.add(MergePreferred[A](1))

      pref.out ~> merge.preferred
      flow.out ~> merge.in(0)

      FlowShape(flow.in, merge.out)
    }
    Flow.fromGraph(graph)
  }

  /** Creates a flow that packs every message and assembles it into bytes.
    *
    * One `Assembler` is created per call of this method and reused for every
    * message passing through the returned flow, instead of being re-created
    * for each message. Any state the assembler keeps between calls
    * (presumably the packet sequence counter - TODO confirm against
    * `org.mavlink.Assembler`) therefore carries over from one message to the
    * next.
    *
    * NOTE(review): the assembler instance is captured by the returned flow, so
    * a single returned flow should not be materialized several times
    * concurrently. Call this method again to obtain an independent flow.
    *
    * @return a flow turning messages into assembled byte strings
    */
  def assembler: Flow[Message, ByteString, _] = {
    // system id 1, component id 1 (same constructor arguments as before)
    val as = new Assembler(1, 1)
    Flow[Message].map { msg =>
      val (id, payload) = Message.pack(msg)
      val bytes = as.assemble(id, payload).toArray
      ByteString(bytes)
    }
  }

  /** Creates a source emitting the messages of one simulated barrel roll.
    *
    * The source emits `Stability(0)`, then one `Attitude` message every
    * 10 milliseconds whose second field grows in steps of 0.1 for as long as
    * it is below 2*Pi, and finally `Stability(1)`.
    *
    * @return a finite source of messages
    */
  def barrelRoll(): Source[Message, _] = {
    // running sum of 0.1 increments, starting at 0 and cut off at a full turn
    val angle: Source[Float, _] =
      Source.tick(10.millis, 10.millis, 0.1f).scan(0.0f)(_+_).takeWhile(_ < 2 * math.Pi)
    val attitude = angle.map{ a =>
      Attitude(0,a,0,0,0,0,0)
    }

    Source.single(Stability(0)) concat attitude concat Source.single(Stability(1))
  }

}


/** Companion of the `Core` class, holding the actor that bridges the backend
  * connection into a reactive-streams publisher. */
object Core {

  /** Actor publisher that relays byte strings received as actor messages to
    * its stream subscriber.
    *
    * At most one undelivered message is kept. If a new message arrives before
    * the previous one could be delivered, the previous one is replaced. */
  private class BackendPublisher extends ActorPublisher[ByteString] with ActorLogging {

    override def preStart(): Unit = {
      log.info("Starting backend publisher actor...")
    }

    /** The most recent message that has not been delivered yet, if any. */
    private var pending: Option[ByteString] = None

    def receive: Receive = {

      // data from the backend: remember it and try to deliver right away
      case msg: ByteString =>
        pending = Some(msg)
        deliver()

      case BackendPublisher.BackendComplete =>
        sys.error("Backend completed normally, this should not happen.")

      // Sink.actorRef sends Status.Failure when the stream feeding this actor
      // fails; log it instead of letting it vanish as an unhandled message
      case akka.actor.Status.Failure(cause) =>
        log.error(cause, "Backend stream failed.")

      // subscriber requests more
      case ActorPublisherMessage.Request(_) =>
        deliver()

      //subscriber cancels
      case ActorPublisherMessage.Cancel =>
        sys.error("Subscriber cancelled backend stream, this should not happen.")

    }

    /** Pushes the pending message downstream, provided there is one, the
      * subscriber has outstanding demand and the publisher is active.
      * Otherwise the pending message (if any) is left untouched. */
    def deliver(): Unit = if (totalDemand > 0 && isActive) {
      pending.foreach { bytes => onNext(bytes) }
      pending = None
    }

  }

  private object BackendPublisher {
    /** Message sent to the actor when the stream feeding it completes. */
    case object BackendComplete

    /** Props for creating a new backend publisher actor. */
    def apply(): Props = Props(classOf[BackendPublisher])
  }

}