aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJakob Odersky <jakob@driver.xyz>2017-10-01 13:22:50 -0700
committerJakob Odersky <jakob@driver.xyz>2017-10-01 13:22:50 -0700
commit5bd947dd08eec1d6c64a9549566f3ce0e91fe74f (patch)
tree4c75ca85ef1df5442f4c8187ca0b66522f1adc3b
parent03503f6be54cbde22261c56919f5ef45ebdcb21d (diff)
downloadtracing-5bd947dd08eec1d6c64a9549566f3ce0e91fe74f.tar.gz
tracing-5bd947dd08eec1d6c64a9549566f3ce0e91fe74f.tar.bz2
tracing-5bd947dd08eec1d6c64a9549566f3ce0e91fe74f.zip
Add entity to google tracer
-rw-r--r--build.sbt1
-rw-r--r--src/main/scala/xyz/driver/tracing/Span.scala4
-rw-r--r--src/main/scala/xyz/driver/tracing/TraceContext.scala1
-rw-r--r--src/main/scala/xyz/driver/tracing/TracingDirectives.scala24
-rw-r--r--src/main/scala/xyz/driver/tracing/google/GoogleTracer.scala22
-rw-r--r--src/main/scala/xyz/driver/tracing/google/api.scala2
6 files changed, 27 insertions, 27 deletions
diff --git a/build.sbt b/build.sbt
index a3faa13..733739e 100644
--- a/build.sbt
+++ b/build.sbt
@@ -2,5 +2,6 @@ scalaVersion := "2.12.3"
libraryDependencies ++= Seq(
"com.typesafe.akka" %% "akka-http" % "10.0.10",
+ "com.typesafe.akka" %% "akka-http-spray-json" % "10.0.10",
"io.spray" %% "spray-json" % "1.3.3"
)
diff --git a/src/main/scala/xyz/driver/tracing/Span.scala b/src/main/scala/xyz/driver/tracing/Span.scala
index 2194e8d..2ae640e 100644
--- a/src/main/scala/xyz/driver/tracing/Span.scala
+++ b/src/main/scala/xyz/driver/tracing/Span.scala
@@ -13,9 +13,9 @@ case class Span(
endTime: Instant = Instant.now
) {
- def started(clock: Clock = Clock.systemUTC): Span =
+ def start(clock: Clock = Clock.systemUTC): Span =
this.copy(startTime = clock.instant())
- def ended(clock: Clock = Clock.systemUTC): Span =
+ def end(clock: Clock = Clock.systemUTC): Span =
this.copy(endTime = clock.instant())
def withLabels(extraLabels: (String, String)*) =
diff --git a/src/main/scala/xyz/driver/tracing/TraceContext.scala b/src/main/scala/xyz/driver/tracing/TraceContext.scala
deleted file mode 100644
index bc77c2f..0000000
--- a/src/main/scala/xyz/driver/tracing/TraceContext.scala
+++ /dev/null
@@ -1 +0,0 @@
-package xyz.driver.tracing
diff --git a/src/main/scala/xyz/driver/tracing/TracingDirectives.scala b/src/main/scala/xyz/driver/tracing/TracingDirectives.scala
index 0c333f6..9e0237c 100644
--- a/src/main/scala/xyz/driver/tracing/TracingDirectives.scala
+++ b/src/main/scala/xyz/driver/tracing/TracingDirectives.scala
@@ -2,8 +2,8 @@ package xyz.driver.tracing
import akka.http.scaladsl.model._
import akka.http.scaladsl.model.headers._
+import akka.http.scaladsl.server._
import akka.http.scaladsl.server.Directives._
-import akka.http.scaladsl.server._ //{Directive, Directive0, Directive1}
import java.util.UUID
import scala.util.Random
import java.time._
@@ -44,27 +44,21 @@ trait TracingDirectives {
}
withTraceContext(TraceContext.fromSpan(span)) & mapRouteResult { res =>
- tracer.submit(span.ended())
+ tracer.submit(span.end())
res
}
}
/*
- def span2(name2: String, tracer: Tracer): Directive0 = {
- val f: RouteResult ⇒ RouteResult = ???
- Directive { inner ⇒ ctx ⇒
- inner(())(ctx).map(f)(ctx.executionContext)
- }
- }*/
+ def span2(name2: String, tracer: Tracer): Directive0 = {
+ val f: RouteResult ⇒ RouteResult = ???
+ Directive { inner ⇒ ctx ⇒
+ inner(())(ctx).map(f)(ctx.executionContext)
+ }
+ }
+ */
}
-/*
- def span2(name2: String, tracer: Tracer): Directive0 = {
- val f: RouteResult ⇒ RouteResult = ???
- Directive { inner ⇒ ctx ⇒
- inner(())(ctx).map(f)(ctx.executionContext)
- }
- }*/
object TracingDirectives {
diff --git a/src/main/scala/xyz/driver/tracing/google/GoogleTracer.scala b/src/main/scala/xyz/driver/tracing/google/GoogleTracer.scala
index fb8d9e6..6a4d4fa 100644
--- a/src/main/scala/xyz/driver/tracing/google/GoogleTracer.scala
+++ b/src/main/scala/xyz/driver/tracing/google/GoogleTracer.scala
@@ -5,16 +5,20 @@ import akka.stream._
import akka.stream.scaladsl._
import akka.actor.ActorSystem
import akka.http.scaladsl._
+import akka.http.scaladsl.marshalling._
+import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._
import akka.http.scaladsl.model._
import akka.http.scaladsl.server._
import scala.util.control._
import scala.concurrent.duration._
+import spray.json.DefaultJsonProtocol._
import java.util.UUID
-class GoogleTracer(projectId: String, authToken: String, bufferSize: Int = 1000)(
+class GoogleTracer(projectId: String, authToken: String, bufferSize: Int = 1000, concurrentConnections: Int = 1)(
implicit system: ActorSystem,
materializer: Materializer)
extends Tracer {
+ import system.dispatcher
lazy val connectionPool = Http().superPool[Unit]()
@@ -40,14 +44,16 @@ class GoogleTracer(projectId: String, authToken: String, bufferSize: Int = 1000)
Source
.queue[Span](bufferSize, OverflowStrategy.dropNew)
.viaMat(batchingPipeline)(Keep.left)
- .map { traces =>
- val entity = HttpEntity
- val req = HttpRequest(
- HttpMethods.PATCH,
- s"https://cloudtrace.googleapis.com/v1/projects/${projectId}/traces"
- )
- (req, ())
+ .mapAsync(concurrentConnections) { (traces: Traces) =>
+ Marshal(traces).to[RequestEntity].map{ entity =>
+ HttpRequest(
+ HttpMethods.PATCH,
+ s"https://cloudtrace.googleapis.com/v1/projects/${projectId}/traces",
+ entity = entity
+ )
+ }
}
+ .map(req => (req, ()))
.viaMat(connectionPool)(Keep.left)
.mapError {
case NonFatal(e) =>
diff --git a/src/main/scala/xyz/driver/tracing/google/api.scala b/src/main/scala/xyz/driver/tracing/google/api.scala
index 2dcac92..356e3da 100644
--- a/src/main/scala/xyz/driver/tracing/google/api.scala
+++ b/src/main/scala/xyz/driver/tracing/google/api.scala
@@ -107,5 +107,5 @@ object Trace {
case class Traces(traces: Seq[Trace])
object Traces {
- implicit val format: JsonFormat[Traces] = jsonFormat1(Traces.apply)
+ implicit val format: RootJsonFormat[Traces] = jsonFormat1(Traces.apply)
}