diff options
Diffstat (limited to 'src/main/scala/xyz/driver/tracing/google/GoogleTracer.scala')
-rw-r--r-- | src/main/scala/xyz/driver/tracing/google/GoogleTracer.scala | 22 |
1 files changed, 14 insertions, 8 deletions
diff --git a/src/main/scala/xyz/driver/tracing/google/GoogleTracer.scala b/src/main/scala/xyz/driver/tracing/google/GoogleTracer.scala index fb8d9e6..6a4d4fa 100644 --- a/src/main/scala/xyz/driver/tracing/google/GoogleTracer.scala +++ b/src/main/scala/xyz/driver/tracing/google/GoogleTracer.scala @@ -5,16 +5,20 @@ import akka.stream._ import akka.stream.scaladsl._ import akka.actor.ActorSystem import akka.http.scaladsl._ +import akka.http.scaladsl.marshalling._ +import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._ import akka.http.scaladsl.model._ import akka.http.scaladsl.server._ import scala.util.control._ import scala.concurrent.duration._ +import spray.json.DefaultJsonProtocol._ import java.util.UUID -class GoogleTracer(projectId: String, authToken: String, bufferSize: Int = 1000)( +class GoogleTracer(projectId: String, authToken: String, bufferSize: Int = 1000, concurrentConnections: Int = 1)( implicit system: ActorSystem, materializer: Materializer) extends Tracer { + import system.dispatcher lazy val connectionPool = Http().superPool[Unit]() @@ -40,14 +44,16 @@ class GoogleTracer(projectId: String, authToken: String, bufferSize: Int = 1000) Source .queue[Span](bufferSize, OverflowStrategy.dropNew) .viaMat(batchingPipeline)(Keep.left) - .map { traces => - val entity = HttpEntity - val req = HttpRequest( - HttpMethods.PATCH, - s"https://cloudtrace.googleapis.com/v1/projects/${projectId}/traces" - ) - (req, ()) + .mapAsync(concurrentConnections) { (traces: Traces) => + Marshal(traces).to[RequestEntity].map{ entity => + HttpRequest( + HttpMethods.PATCH, + s"https://cloudtrace.googleapis.com/v1/projects/${projectId}/traces", + entity = entity + ) + } } + .map(req => (req, ())) .viaMat(connectionPool)(Keep.left) .mapError { case NonFatal(e) => |