From 5bd947dd08eec1d6c64a9549566f3ce0e91fe74f Mon Sep 17 00:00:00 2001 From: Jakob Odersky Date: Sun, 1 Oct 2017 13:22:50 -0700 Subject: Add entity to google tracer --- build.sbt | 1 + src/main/scala/xyz/driver/tracing/Span.scala | 4 ++-- .../scala/xyz/driver/tracing/TraceContext.scala | 1 - .../xyz/driver/tracing/TracingDirectives.scala | 24 ++++++++-------------- .../xyz/driver/tracing/google/GoogleTracer.scala | 22 ++++++++++++-------- src/main/scala/xyz/driver/tracing/google/api.scala | 2 +- 6 files changed, 27 insertions(+), 27 deletions(-) delete mode 100644 src/main/scala/xyz/driver/tracing/TraceContext.scala diff --git a/build.sbt b/build.sbt index a3faa13..733739e 100644 --- a/build.sbt +++ b/build.sbt @@ -2,5 +2,6 @@ scalaVersion := "2.12.3" libraryDependencies ++= Seq( "com.typesafe.akka" %% "akka-http" % "10.0.10", + "com.typesafe.akka" %% "akka-http-spray-json" % "10.0.10", "io.spray" %% "spray-json" % "1.3.3" ) diff --git a/src/main/scala/xyz/driver/tracing/Span.scala b/src/main/scala/xyz/driver/tracing/Span.scala index 2194e8d..2ae640e 100644 --- a/src/main/scala/xyz/driver/tracing/Span.scala +++ b/src/main/scala/xyz/driver/tracing/Span.scala @@ -13,9 +13,9 @@ case class Span( endTime: Instant = Instant.now ) { - def started(clock: Clock = Clock.systemUTC): Span = + def start(clock: Clock = Clock.systemUTC): Span = this.copy(startTime = clock.instant()) - def ended(clock: Clock = Clock.systemUTC): Span = + def end(clock: Clock = Clock.systemUTC): Span = this.copy(endTime = clock.instant()) def withLabels(extraLabels: (String, String)*) = diff --git a/src/main/scala/xyz/driver/tracing/TraceContext.scala b/src/main/scala/xyz/driver/tracing/TraceContext.scala deleted file mode 100644 index bc77c2f..0000000 --- a/src/main/scala/xyz/driver/tracing/TraceContext.scala +++ /dev/null @@ -1 +0,0 @@ -package xyz.driver.tracing diff --git a/src/main/scala/xyz/driver/tracing/TracingDirectives.scala b/src/main/scala/xyz/driver/tracing/TracingDirectives.scala index 0c333f6..9e0237c 100644 --- a/src/main/scala/xyz/driver/tracing/TracingDirectives.scala +++ b/src/main/scala/xyz/driver/tracing/TracingDirectives.scala @@ -2,8 +2,8 @@ package xyz.driver.tracing import akka.http.scaladsl.model._ import akka.http.scaladsl.model.headers._ +import akka.http.scaladsl.server._ import akka.http.scaladsl.server.Directives._ -import akka.http.scaladsl.server._ //{Directive, Directive0, Directive1} import java.util.UUID import scala.util.Random import java.time._ @@ -44,27 +44,21 @@ trait TracingDirectives { } withTraceContext(TraceContext.fromSpan(span)) & mapRouteResult { res => - tracer.submit(span.ended()) + tracer.submit(span.end()) res } } /* - def span2(name2: String, tracer: Tracer): Directive0 = { - val f: RouteResult ⇒ RouteResult = ??? - Directive { inner ⇒ ctx ⇒ - inner(())(ctx).map(f)(ctx.executionContext) - } - }*/ + def span2(name2: String, tracer: Tracer): Directive0 = { + val f: RouteResult ⇒ RouteResult = ??? + Directive { inner ⇒ ctx ⇒ + inner(())(ctx).map(f)(ctx.executionContext) + } + } + */ } -/* - def span2(name2: String, tracer: Tracer): Directive0 = { - val f: RouteResult ⇒ RouteResult = ??? - Directive { inner ⇒ ctx ⇒ - inner(())(ctx).map(f)(ctx.executionContext) - } - }*/ object TracingDirectives { diff --git a/src/main/scala/xyz/driver/tracing/google/GoogleTracer.scala b/src/main/scala/xyz/driver/tracing/google/GoogleTracer.scala index fb8d9e6..6a4d4fa 100644 --- a/src/main/scala/xyz/driver/tracing/google/GoogleTracer.scala +++ b/src/main/scala/xyz/driver/tracing/google/GoogleTracer.scala @@ -5,16 +5,20 @@ import akka.stream._ import akka.stream.scaladsl._ import akka.actor.ActorSystem import akka.http.scaladsl._ +import akka.http.scaladsl.marshalling._ +import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._ import akka.http.scaladsl.model._ import akka.http.scaladsl.server._ import scala.util.control._ import scala.concurrent.duration._ +import spray.json.DefaultJsonProtocol._ import java.util.UUID -class GoogleTracer(projectId: String, authToken: String, bufferSize: Int = 1000)( +class GoogleTracer(projectId: String, authToken: String, bufferSize: Int = 1000, concurrentConnections: Int = 1)( implicit system: ActorSystem, materializer: Materializer) extends Tracer { + import system.dispatcher lazy val connectionPool = Http().superPool[Unit]() @@ -40,14 +44,16 @@ class GoogleTracer(projectId: String, authToken: String, bufferSize: Int = 1000) Source .queue[Span](bufferSize, OverflowStrategy.dropNew) .viaMat(batchingPipeline)(Keep.left) - .map { traces => - val entity = HttpEntity - val req = HttpRequest( - HttpMethods.PATCH, - s"https://cloudtrace.googleapis.com/v1/projects/${projectId}/traces" - ) - (req, ()) + .mapAsync(concurrentConnections) { (traces: Traces) => + Marshal(traces).to[RequestEntity].map{ entity => + HttpRequest( + HttpMethods.PATCH, + s"https://cloudtrace.googleapis.com/v1/projects/${projectId}/traces", + entity = entity + ) + } } + .map(req => (req, ())) .viaMat(connectionPool)(Keep.left) .mapError { case NonFatal(e) => diff --git a/src/main/scala/xyz/driver/tracing/google/api.scala b/src/main/scala/xyz/driver/tracing/google/api.scala index 2dcac92..356e3da 100644 --- a/src/main/scala/xyz/driver/tracing/google/api.scala +++ b/src/main/scala/xyz/driver/tracing/google/api.scala @@ -107,5 +107,5 @@ object Trace { case class Traces(traces: Seq[Trace]) object Traces { - implicit val format: JsonFormat[Traces] = jsonFormat1(Traces.apply) + implicit val format: RootJsonFormat[Traces] = jsonFormat1(Traces.apply) } -- cgit v1.2.3