1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
|
package mavigator
package uav
import akka.NotUsed
import akka.actor.{ActorLogging, ActorRef, ActorSystem, Props}
import akka.stream.Materializer
import akka.stream.actor.{ActorPublisher, ActorPublisherMessage}
import akka.stream.scaladsl.{Flow, Sink, Source}
import akka.util.ByteString
import org.reactivestreams.{Publisher, Subscriber}
/** A core enables dynamic creation and removal of clients and backend connections.
* It is essentially a dynamic stream multiplexer. */
class Core(implicit val system: ActorSystem, val materializer: Materializer) {
import Core._
/** The actor responsible for forwarding messages from the backend
* to this core. */
private lazy val backendPublisherActor: ActorRef = system.actorOf(BackendPublisher())
/** Publisher that forwards messages received from the backend. */
private lazy val backendPublisher: Publisher[ByteString] = ActorPublisher(backendPublisherActor)
private lazy val backend = Flow.fromSinkAndSourceMat(
Sink.ignore, //FIXME: this will need to be changed for controlling the uav from the browser
Source.fromPublisher(backendPublisher)
)((toBackend, fromBackend) => (toBackend, fromBackend))
private lazy val clients = Flow.fromSinkAndSourceMat(
Sink.asPublisher[ByteString](true),
Source.asSubscriber[ByteString]
)((toClient, fromClient) => (toClient, fromClient))
private lazy val runnable = clients.joinMat(backend){ case ((toClient, fromClient), (toBackend, fromBackend)) =>
toClient
}
private lazy val clientPublisher: Publisher[ByteString] = {
system.log.info("Core started.")
runnable.run()
}
def setBackend(): Flow[ByteString, ByteString, NotUsed] = {
val sink = Sink.actorRef(backendPublisherActor, BackendPublisher.BackendComplete)
val source = Source.empty[ByteString] //FIXME: this will need to be changed for controlling the uav from the browser
Flow.fromSinkAndSource(sink, source)
}
def connect(): Flow[ByteString, ByteString, NotUsed] = {
Flow.fromSinkAndSource(
Sink.ignore,
Source.fromPublisher(clientPublisher)
)
}
}
object Core {
private class BackendPublisher extends ActorPublisher[ByteString] with ActorLogging {
import akka.stream.actor.ActorPublisherMessage._
override def preStart() = {
log.info("Starting backend publisher actor...")
}
private var fromBackend: ByteString = null
def receive = {
case msg: ByteString =>
fromBackend = msg
deliver()
case BackendPublisher.BackendComplete =>
sys.error("Backend completed normally, this should not happen.")
// subscriber requests more
case ActorPublisherMessage.Request(_) =>
deliver()
//subscriber cancels
case ActorPublisherMessage.Cancel =>
sys.error("Subscriber cancelled backend stream, this should not happen.")
}
def deliver() = if (fromBackend != null && totalDemand > 0 && isActive) {
onNext(fromBackend)
fromBackend = null
}
}
private object BackendPublisher {
case object BackendComplete
def apply(): Props = Props(classOf[BackendPublisher])
}
}
|