diff options
Diffstat (limited to 'src/main/scala/xyz/driver/tracing/google')
-rw-r--r-- | src/main/scala/xyz/driver/tracing/google/GoogleTracer.scala | 22 | ||||
-rw-r--r-- | src/main/scala/xyz/driver/tracing/google/api.scala | 2 |
2 files changed, 15 insertions, 9 deletions
diff --git a/src/main/scala/xyz/driver/tracing/google/GoogleTracer.scala b/src/main/scala/xyz/driver/tracing/google/GoogleTracer.scala index fb8d9e6..6a4d4fa 100644 --- a/src/main/scala/xyz/driver/tracing/google/GoogleTracer.scala +++ b/src/main/scala/xyz/driver/tracing/google/GoogleTracer.scala @@ -5,16 +5,20 @@ import akka.stream._ import akka.stream.scaladsl._ import akka.actor.ActorSystem import akka.http.scaladsl._ +import akka.http.scaladsl.marshalling._ +import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._ import akka.http.scaladsl.model._ import akka.http.scaladsl.server._ import scala.util.control._ import scala.concurrent.duration._ +import spray.json.DefaultJsonProtocol._ import java.util.UUID -class GoogleTracer(projectId: String, authToken: String, bufferSize: Int = 1000)( +class GoogleTracer(projectId: String, authToken: String, bufferSize: Int = 1000, concurrentConnections: Int = 1)( implicit system: ActorSystem, materializer: Materializer) extends Tracer { + import system.dispatcher lazy val connectionPool = Http().superPool[Unit]() @@ -40,14 +44,16 @@ class GoogleTracer(projectId: String, authToken: String, bufferSize: Int = 1000) Source .queue[Span](bufferSize, OverflowStrategy.dropNew) .viaMat(batchingPipeline)(Keep.left) - .map { traces => - val entity = HttpEntity - val req = HttpRequest( - HttpMethods.PATCH, - s"https://cloudtrace.googleapis.com/v1/projects/${projectId}/traces" - ) - (req, ()) + .mapAsync(concurrentConnections) { (traces: Traces) => + Marshal(traces).to[RequestEntity].map{ entity => + HttpRequest( + HttpMethods.PATCH, + s"https://cloudtrace.googleapis.com/v1/projects/${projectId}/traces", + entity = entity + ) + } } + .map(req => (req, ())) .viaMat(connectionPool)(Keep.left) .mapError { case NonFatal(e) => diff --git a/src/main/scala/xyz/driver/tracing/google/api.scala b/src/main/scala/xyz/driver/tracing/google/api.scala index 2dcac92..356e3da 100644 --- a/src/main/scala/xyz/driver/tracing/google/api.scala +++ b/src/main/scala/xyz/driver/tracing/google/api.scala @@ -107,5 +107,5 @@ object Trace { case class Traces(traces: Seq[Trace]) object Traces { - implicit val format: JsonFormat[Traces] = jsonFormat1(Traces.apply) + implicit val format: RootJsonFormat[Traces] = jsonFormat1(Traces.apply) } |