blob: 7d4cec52d8bd94b6f9e7bf1ab07b961e57d8cbc3 (
plain) (
blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
|
package test
import akka.actor._
import kamon.Tracer
import spray.routing.SimpleRoutingApp
import akka.util.Timeout
import spray.httpx.RequestBuilding
import scala.concurrent.{Await, Future}
import kamon.trace.UowDirectives
object SimpleRequestProcessor extends App with SimpleRoutingApp with RequestBuilding with UowDirectives {
import scala.concurrent.duration._
import spray.client.pipelining._
import akka.pattern.ask
implicit val system = ActorSystem("test")
import system.dispatcher
implicit val timeout = Timeout(30 seconds)
val pipeline = sendReceive
val replier = system.actorOf(Props[Replier])
startServer(interface = "localhost", port = 9090) {
get {
path("test"){
complete {
pipeline(Get("http://www.despegar.com.ar")).map(r => "Ok")
}
} ~
path("reply" / Segment) { reqID =>
uow {
complete {
if (Tracer.context().isEmpty)
println("ROUTE NO CONTEXT")
(replier ? reqID).mapTo[String]
}
}
} ~
path("ok") {
complete("ok")
} ~
path("future") {
dynamic {
complete(Future { "OK" })
}
}
}
}
}
object Verifier extends App {
def go: Unit = {
import scala.concurrent.duration._
import spray.client.pipelining._
implicit val system = ActorSystem("test")
import system.dispatcher
implicit val timeout = Timeout(30 seconds)
val pipeline = sendReceive
val futures = Future.sequence(for(i <- 1 to 500) yield {
pipeline(Get("http://127.0.0.1:9090/reply/"+i)).map(r => r.entity.asString == i.toString)
})
println("Everything is: "+ Await.result(futures, 10 seconds).forall(a => a == true))
}
}
class Replier extends Actor with ActorLogging {
def receive = {
case anything =>
if(Tracer.context.isEmpty)
log.warning("PROCESSING A MESSAGE WITHOUT CONTEXT")
log.info("Processing at the Replier")
sender ! anything
}
}
|