aboutsummaryrefslogtreecommitdiff
path: root/src/main/scala/xyz/driver/tracing/google/GoogleTracer.scala
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/scala/xyz/driver/tracing/google/GoogleTracer.scala')
-rw-r--r--src/main/scala/xyz/driver/tracing/google/GoogleTracer.scala22
1 files changed, 14 insertions, 8 deletions
diff --git a/src/main/scala/xyz/driver/tracing/google/GoogleTracer.scala b/src/main/scala/xyz/driver/tracing/google/GoogleTracer.scala
index fb8d9e6..6a4d4fa 100644
--- a/src/main/scala/xyz/driver/tracing/google/GoogleTracer.scala
+++ b/src/main/scala/xyz/driver/tracing/google/GoogleTracer.scala
@@ -5,16 +5,20 @@ import akka.stream._
import akka.stream.scaladsl._
import akka.actor.ActorSystem
import akka.http.scaladsl._
+import akka.http.scaladsl.marshalling._
+import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._
import akka.http.scaladsl.model._
import akka.http.scaladsl.server._
import scala.util.control._
import scala.concurrent.duration._
+import spray.json.DefaultJsonProtocol._
import java.util.UUID
-class GoogleTracer(projectId: String, authToken: String, bufferSize: Int = 1000)(
+class GoogleTracer(projectId: String, authToken: String, bufferSize: Int = 1000, concurrentConnections: Int = 1)(
implicit system: ActorSystem,
materializer: Materializer)
extends Tracer {
+ import system.dispatcher
lazy val connectionPool = Http().superPool[Unit]()
@@ -40,14 +44,16 @@ class GoogleTracer(projectId: String, authToken: String, bufferSize: Int = 1000)
Source
.queue[Span](bufferSize, OverflowStrategy.dropNew)
.viaMat(batchingPipeline)(Keep.left)
- .map { traces =>
- val entity = HttpEntity
- val req = HttpRequest(
- HttpMethods.PATCH,
- s"https://cloudtrace.googleapis.com/v1/projects/${projectId}/traces"
- )
- (req, ())
+ .mapAsync(concurrentConnections) { (traces: Traces) =>
+ Marshal(traces).to[RequestEntity].map{ entity =>
+ HttpRequest(
+ HttpMethods.PATCH,
+ s"https://cloudtrace.googleapis.com/v1/projects/${projectId}/traces",
+ entity = entity
+ )
+ }
}
+ .map(req => (req, ()))
.viaMat(connectionPool)(Keep.left)
.mapError {
case NonFatal(e) =>