blob: 9cbfa64fe4a1a8fb4d5a88a72e88aba3168a6b79 (
plain) (
blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
|
package mavigator
import akka.actor.Terminated
import akka.actor._
import akka.http.scaladsl._
import akka.http.scaladsl.model.ws.Message
import akka.http.scaladsl.model.ws.TextMessage
import akka.http.scaladsl.server._
import akka.stream.OverflowStrategy
import akka.stream._
import akka.stream.scaladsl.Flow
import akka.stream.scaladsl.GraphDSL
import akka.stream.scaladsl.Sink
import akka.stream.scaladsl.Source
import akka.util.ByteString
import mavigator.uav.Connection
import scala.concurrent.Await
import scala.concurrent.duration.Duration
/**
* Adapted from https://github.com/jrudolph/akka-http-scala-js-websocket-chat
*/
class MavlinkWebsocket(system: ActorSystem) {
/*
GraphDSL.create(Source.actorRef[Connection.Event](1, OverflowStrategy.fail)) {implicit builder =>
import GraphDSL.Implicits._
//source: SourceShape[Connection.Event] =>
source =>
val inSink = builder.materializedValue.map { client: ActorRef =>
Sink.actorRef[Connection.Command](
Mavigator(system).uav,
Connection.Unregister(client)
)
}
(inSink., source.out)
//FlowShape(inSink.in, source.out)
//???
}
*/
/** Sink that forwards incomming (from the browser) messages to the uav. */
private val inSink = Sink.actorRef[Connection.Command](
Mavigator(system).uav,
Connection.Send(ByteString("goodbye")) //unregister
)
/** Source that emmits messages comming from the uav. */
private val outSource = Source.actorRef[Connection.Event](
bufferSize = 1,
overflowStrategy = OverflowStrategy.fail
) mapMaterializedValue { client => // a client is spawned for every outSource materialization
Mavigator(system).uav.tell(Connection.Register, client)
}
private val flow: Flow[Connection.Command, Connection.Event, _] = Flow.fromSinkAndSource(inSink, outSource)
@deprecated("WIP", "0.0")
val wsflow = Flow[Message].collect{
case TextMessage.Strict(msg) => Connection.Send(ByteString(msg))
// unpack incoming WS text messages...
// This will lose (ignore) messages not received in one chunk (which is
// unlikely because chat messages are small) but absolutely possible
// FIXME: We need to handle TextMessage.Streamed as well.
}.via(flow).map {
case msg: Connection.Event =>
TextMessage.Strict(msg.toString) // ... pack outgoing messages into WS JSON messages ...
}
}
|