From 586ed28262f1609a2338e01236548716e1d2c264 Mon Sep 17 00:00:00 2001 From: Jakob Odersky Date: Wed, 4 Oct 2017 13:48:50 -0700 Subject: Add closing functionality to tracers --- src/main/scala/GoogleTracer.scala | 17 ++++++++++++----- 1 file changed, 12 insertions(+), 5 deletions(-) (limited to 'src/main/scala/GoogleTracer.scala') diff --git a/src/main/scala/GoogleTracer.scala b/src/main/scala/GoogleTracer.scala index 144c0ff..f01b070 100644 --- a/src/main/scala/GoogleTracer.scala +++ b/src/main/scala/GoogleTracer.scala @@ -2,6 +2,7 @@ package xyz.driver.tracing import java.nio.file.Path +import akka.Done import akka.actor.ActorSystem import akka.http.scaladsl._ import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._ @@ -11,14 +12,16 @@ import akka.stream._ import akka.stream.scaladsl._ import xyz.driver.tracing.google._ +import scala.concurrent._ import scala.concurrent.duration._ import scala.util.control._ +import scala.util._ class GoogleTracer(projectId: String, serviceAccountFile: Path, bufferSize: Int = 1000, - bufferDelay: FiniteDuration = 2.seconds, - concurrentConnections: Int = 5)(implicit system: ActorSystem, + bufferDelay: FiniteDuration = 15.seconds, + concurrentConnections: Int = 1)(implicit system: ActorSystem, materializer: Materializer) extends Tracer { @@ -44,7 +47,7 @@ class GoogleTracer(projectId: String, Traces(traces) } - lazy val queue: SourceQueueWithComplete[Span] = { + lazy val (queue: SourceQueueWithComplete[Span], completed: Future[Done]) = Source .queue[Span](bufferSize, OverflowStrategy.dropNew) .viaMat(batchingPipeline)(Keep.left) @@ -72,10 +75,14 @@ class GoogleTracer(projectId: String, system.log.error(e, s"Exception encountered while submitting trace") e } - .to(Sink.ignore) + .toMat(Sink.ignore)(Keep.both) .run() - } override def submit(span: Span): Unit = queue.offer(span) + override def close() = { + queue.complete() + completed + } + } -- cgit v1.2.3