diff options
author | Jakob Odersky <jakob@driver.xyz> | 2017-10-04 13:48:50 -0700 |
---|---|---|
committer | Jakob Odersky <jakob@driver.xyz> | 2017-10-04 13:49:26 -0700 |
commit | 586ed28262f1609a2338e01236548716e1d2c264 (patch) | |
tree | 994b593c217718b0a07c009bbe24cb122406c686 /src/main | |
parent | df2c41dac7d29dea49950700b0146229947fdf65 (diff) | |
download | tracing-586ed28262f1609a2338e01236548716e1d2c264.tar.gz tracing-586ed28262f1609a2338e01236548716e1d2c264.tar.bz2 tracing-586ed28262f1609a2338e01236548716e1d2c264.zip |
Add closing functionality to tracers
Diffstat (limited to 'src/main')
-rw-r--r-- | src/main/scala/GoogleTracer.scala | 17 | ||||
-rw-r--r-- | src/main/scala/Tracer.scala | 13 | ||||
-rw-r--r-- | src/main/scala/google/OAuth2.scala | 2 |
3 files changed, 27 insertions, 5 deletions
diff --git a/src/main/scala/GoogleTracer.scala b/src/main/scala/GoogleTracer.scala index 144c0ff..f01b070 100644 --- a/src/main/scala/GoogleTracer.scala +++ b/src/main/scala/GoogleTracer.scala @@ -2,6 +2,7 @@ package xyz.driver.tracing import java.nio.file.Path +import akka.Done import akka.actor.ActorSystem import akka.http.scaladsl._ import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._ @@ -11,14 +12,16 @@ import akka.stream._ import akka.stream.scaladsl._ import xyz.driver.tracing.google._ +import scala.concurrent._ import scala.concurrent.duration._ import scala.util.control._ +import scala.util._ class GoogleTracer(projectId: String, serviceAccountFile: Path, bufferSize: Int = 1000, - bufferDelay: FiniteDuration = 2.seconds, - concurrentConnections: Int = 5)(implicit system: ActorSystem, + bufferDelay: FiniteDuration = 15.seconds, + concurrentConnections: Int = 1)(implicit system: ActorSystem, materializer: Materializer) extends Tracer { @@ -44,7 +47,7 @@ class GoogleTracer(projectId: String, Traces(traces) } - lazy val queue: SourceQueueWithComplete[Span] = { + lazy val (queue: SourceQueueWithComplete[Span], completed: Future[Done]) = Source .queue[Span](bufferSize, OverflowStrategy.dropNew) .viaMat(batchingPipeline)(Keep.left) @@ -72,10 +75,14 @@ class GoogleTracer(projectId: String, system.log.error(e, s"Exception encountered while submitting trace") e } - .to(Sink.ignore) + .toMat(Sink.ignore)(Keep.both) .run() - } override def submit(span: Span): Unit = queue.offer(span) + override def close() = { + queue.complete() + completed + } + } diff --git a/src/main/scala/Tracer.scala b/src/main/scala/Tracer.scala index 00a1fbf..3d82489 100644 --- a/src/main/scala/Tracer.scala +++ b/src/main/scala/Tracer.scala @@ -1,5 +1,18 @@ package xyz.driver.tracing +import scala.concurrent.Future + +/** Interface to tracing aggregation backends. */ trait Tracer { + + /** Submit a span to be sent to the aggregation backend. + * + * Note that submission typically happens asynchronously. Exact semantics are + * implementation-specific, however to guarantee of successful submission is + * made when this method returns. */ def submit(span: Span): Unit + + /** Aggregate any potentially queued submissions and perform any cleanup logic. */ + def close(): Future[_] = Future.unit + } diff --git a/src/main/scala/google/OAuth2.scala b/src/main/scala/google/OAuth2.scala index ff010ef..6a25b39 100644 --- a/src/main/scala/google/OAuth2.scala +++ b/src/main/scala/google/OAuth2.scala @@ -17,6 +17,7 @@ import spray.json._ import scala.concurrent._ +/** OAUTH2 utilities. */ object OAuth2 { private case class ServiceAccount(project_id: String, @@ -96,6 +97,7 @@ object OAuth2 { Future.successful((request, expiration, accessToken)) } } + .async .drop(1) // drop initial, empty HttpRequest .map { case (request, _, accessToken) => |