1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
|
package mavigator
package uav
import akka.NotUsed
import akka.actor.{ActorLogging, ActorRef, ActorSystem, Props}
import akka.stream.Materializer
import akka.stream.actor.{ActorPublisher, ActorPublisherMessage}
import akka.stream.scaladsl.{ Flow, RunnableGraph, Sink, Source }
import akka.util.ByteString
import org.mavlink.Assembler
import org.mavlink.messages._
import org.reactivestreams.{Publisher, Subscriber}
import scala.concurrent.duration._
/** A core enables dynamic creation and removal of clients and backend connections.
* It is essentially a dynamic stream multiplexer. */
class Core(implicit val system: ActorSystem, val materializer: Materializer) {
import Core._
/** The actor responsible for forwarding messages from the backend
* to this core. */
private lazy val backendPublisherActor: ActorRef = system.actorOf(BackendPublisher())
/** Publisher that forwards messages received from the backend. */
private lazy val backendPublisher: Publisher[ByteString] = ActorPublisher(backendPublisherActor)
private lazy val backend = Flow.fromSinkAndSourceMat(
Sink.ignore, //FIXME: this will need to be changed for controlling the uav from the browser
Source.fromPublisher(backendPublisher)
)((toBackend, fromBackend) => (toBackend, fromBackend))
private lazy val clients = Flow.fromSinkAndSourceMat(
Sink.asPublisher[ByteString](true),
Source.asSubscriber[ByteString]
)((toClient, fromClient) => (toClient, fromClient))
private lazy val runnable: RunnableGraph[Publisher[ByteString]] = {
val timer = Source.tick(2.seconds, 2.seconds, ())
val generator: Source[ByteString, _] = timer.flatMapConcat{ _ =>
Util.barrelRoll via Util.assembler
}
val merged: Flow[ByteString, ByteString, _] = Util.merge(generator, backend)
merged.joinMat(clients){case (_, (toClient, _)) =>
toClient
}
}
private lazy val clientPublisher: Publisher[ByteString] = {
system.log.info("Core started.")
runnable.run()
}
def setBackend(): Flow[ByteString, ByteString, NotUsed] = {
val sink = Sink.actorRef(backendPublisherActor, BackendPublisher.BackendComplete)
val source = Source.empty[ByteString] //FIXME: this will need to be changed for controlling the uav from the browser
Flow.fromSinkAndSource(sink, source)
}
def connect(): Flow[ByteString, ByteString, NotUsed] = {
Flow.fromSinkAndSource(
Sink.ignore,
Source.fromPublisher(clientPublisher)
)
}
}
object Util {
import akka.stream.scaladsl._
import akka.stream.FlowShape
def merge[A](preferred: Source[A, _], other: Flow[A, A, _]): Flow[A, A, _] = {
val graph = GraphDSL.create(preferred, other)((_, _)) { implicit builder =>
(pref, flow) =>
import GraphDSL.Implicits._
val merge = builder.add(MergePreferred[A](1))
pref.out ~> merge.preferred
flow.out ~> merge.in(0)
FlowShape(flow.in, merge.out)
}
Flow.fromGraph(graph)
}
def assembler: Flow[Message, ByteString, _] = Flow[Message].map{ msg =>
val as = new Assembler(1, 1)
val (id, payload) = Message.pack(msg)
val bytes = as.assemble(id, payload).toArray
ByteString(bytes)
}
def barrelRoll(): Source[Message, _] = {
val angle: Source[Float, _] =
Source.tick(10.millis, 10.millis, 0.1f).scan(0.0f)(_+_).takeWhile(_ < 2 * math.Pi)
val attitude = angle.map{ a =>
Attitude(0,a,0,0,0,0,0)
}
Source.single(Stability(0)) concat attitude concat Source.single(Stability(1))
}
}
object Core {
private class BackendPublisher extends ActorPublisher[ByteString] with ActorLogging {
import akka.stream.actor.ActorPublisherMessage._
override def preStart() = {
log.info("Starting backend publisher actor...")
}
private var fromBackend: ByteString = null
def receive = {
case msg: ByteString =>
fromBackend = msg
deliver()
case BackendPublisher.BackendComplete =>
sys.error("Backend completed normally, this should not happen.")
// subscriber requests more
case ActorPublisherMessage.Request(_) =>
deliver()
//subscriber cancels
case ActorPublisherMessage.Cancel =>
sys.error("Subscriber cancelled backend stream, this should not happen.")
}
def deliver() = if (fromBackend != null && totalDemand > 0 && isActive) {
onNext(fromBackend)
fromBackend = null
}
}
private object BackendPublisher {
case object BackendComplete
def apply(): Props = Props(classOf[BackendPublisher])
}
}
|