From 586ed28262f1609a2338e01236548716e1d2c264 Mon Sep 17 00:00:00 2001 From: Jakob Odersky Date: Wed, 4 Oct 2017 13:48:50 -0700 Subject: Add closing functionality to tracers --- src/main/scala/GoogleTracer.scala | 17 ++++++++++++----- src/main/scala/Tracer.scala | 13 +++++++++++++ src/main/scala/google/OAuth2.scala | 2 ++ src/test/scala/TracingDirectivesSpec.scala | 26 +++++++++++--------------- 4 files changed, 38 insertions(+), 20 deletions(-) diff --git a/src/main/scala/GoogleTracer.scala b/src/main/scala/GoogleTracer.scala index 144c0ff..f01b070 100644 --- a/src/main/scala/GoogleTracer.scala +++ b/src/main/scala/GoogleTracer.scala @@ -2,6 +2,7 @@ package xyz.driver.tracing import java.nio.file.Path +import akka.Done import akka.actor.ActorSystem import akka.http.scaladsl._ import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._ @@ -11,14 +12,16 @@ import akka.stream._ import akka.stream.scaladsl._ import xyz.driver.tracing.google._ +import scala.concurrent._ import scala.concurrent.duration._ import scala.util.control._ +import scala.util._ class GoogleTracer(projectId: String, serviceAccountFile: Path, bufferSize: Int = 1000, - bufferDelay: FiniteDuration = 2.seconds, - concurrentConnections: Int = 5)(implicit system: ActorSystem, + bufferDelay: FiniteDuration = 15.seconds, + concurrentConnections: Int = 1)(implicit system: ActorSystem, materializer: Materializer) extends Tracer { @@ -44,7 +47,7 @@ class GoogleTracer(projectId: String, Traces(traces) } - lazy val queue: SourceQueueWithComplete[Span] = { + lazy val (queue: SourceQueueWithComplete[Span], completed: Future[Done]) = Source .queue[Span](bufferSize, OverflowStrategy.dropNew) .viaMat(batchingPipeline)(Keep.left) @@ -72,10 +75,14 @@ class GoogleTracer(projectId: String, system.log.error(e, s"Exception encountered while submitting trace") e } - .to(Sink.ignore) + .toMat(Sink.ignore)(Keep.both) .run() - } override def submit(span: Span): Unit = queue.offer(span) + override def close() = { + queue.complete() + completed + } + } diff --git a/src/main/scala/Tracer.scala b/src/main/scala/Tracer.scala index 00a1fbf..3d82489 100644 --- a/src/main/scala/Tracer.scala +++ b/src/main/scala/Tracer.scala @@ -1,5 +1,18 @@ package xyz.driver.tracing +import scala.concurrent.Future + +/** Interface to tracing aggregation backends. */ trait Tracer { + + /** Submit a span to be sent to the aggregation backend. + * + * Note that submission typically happens asynchronously. Exact semantics are + * implementation-specific, however to guarantee of successful submission is + * made when this method returns. */ def submit(span: Span): Unit + + /** Aggregate any potentially queued submissions and perform any cleanup logic. */ + def close(): Future[_] = Future.unit + } diff --git a/src/main/scala/google/OAuth2.scala b/src/main/scala/google/OAuth2.scala index ff010ef..6a25b39 100644 --- a/src/main/scala/google/OAuth2.scala +++ b/src/main/scala/google/OAuth2.scala @@ -17,6 +17,7 @@ import spray.json._ import scala.concurrent._ +/** OAUTH2 utilities. */ object OAuth2 { private case class ServiceAccount(project_id: String, @@ -96,6 +97,7 @@ object OAuth2 { Future.successful((request, expiration, accessToken)) } } + .async .drop(1) // drop initial, empty HttpRequest .map { case (request, _, accessToken) => diff --git a/src/test/scala/TracingDirectivesSpec.scala b/src/test/scala/TracingDirectivesSpec.scala index ebe7858..783f59b 100644 --- a/src/test/scala/TracingDirectivesSpec.scala +++ b/src/test/scala/TracingDirectivesSpec.scala @@ -16,13 +16,7 @@ class TracingDirectivesSpec with BeforeAndAfterAll with ScalatestRouteTest { - implicit val tracer = new GoogleTracer( - "driverinc-sandbox", - Paths.get( - system.settings.config.getString("tracing.google.service-account-file")) - ) - - val route: Route = trace(tracer, "example.org") { + def route(tracer: Tracer): Route = trace(tracer, "example.org") { pathPrefix("1") { trace(tracer, "test-sub-trace-1") { Thread.sleep(2) @@ -43,18 +37,20 @@ class TracingDirectivesSpec } } - "Tracer" should "submit" in { - for (i <- 0 until 100) { - Get(s"https://example.org/${i % 2 + 1}") ~> route ~> check { + "Google Tracer" should "submit" in { + val tracer = new GoogleTracer( + "driverinc-sandbox", + Paths.get( + system.settings.config.getString("tracing.google.service-account-file")) + ) + + val futures: Seq[Assertion] = for (i <- 0 until 100) yield { + Get(s"https://example.org/${i % 2 + 1}") ~> route(tracer) ~> check { assert(responseAs[String] == "ok") } } - } - override def afterAll() = { - tracer.queue.complete() - Await.ready(tracer.queue.watchCompletion(), Duration.Inf) - super.afterAll() + Await.ready(tracer.close(), 30.seconds) } } -- cgit v1.2.3