From c1628abaad79de60c7eafb27a5914dabd3fb75dc Mon Sep 17 00:00:00 2001 From: John St John Date: Tue, 26 Sep 2017 17:03:32 -0700 Subject: only push traces every 10 requests by default --- src/main/scala/xyz/driver/core/app.scala | 6 ++++-- src/main/scala/xyz/driver/core/trace/GoogleStackdriverTrace.scala | 7 +++++-- .../driver/core/trace/GoogleStackdriverTraceWithConsumer.scala | 8 +++++--- src/main/scala/xyz/driver/core/trace/LoggingTrace.scala | 8 ++++++-- src/main/scala/xyz/driver/core/trace/ServiceTracer.scala | 2 ++ 5 files changed, 22 insertions(+), 9 deletions(-) diff --git a/src/main/scala/xyz/driver/core/app.scala b/src/main/scala/xyz/driver/core/app.scala index 763a363..a7f58e3 100644 --- a/src/main/scala/xyz/driver/core/app.scala +++ b/src/main/scala/xyz/driver/core/app.scala @@ -53,7 +53,8 @@ object app { implicit private lazy val materializer = ActorMaterializer()(actorSystem) private lazy val http = Http()(actorSystem) val appEnvironment = config.getString("application.environment") - val serviceTracer = tracer.getOrElse(new LoggingTrace(appName, config.getString("application.environment"), log)) + val serviceTracer = + tracer.getOrElse(new LoggingTrace(appName, config.getString("application.environment"), log, 10)) def run(): Unit = { activateServices(modules) scheduleServicesDeactivation(modules) @@ -63,7 +64,8 @@ object app { def stop(): Unit = { http.shutdownAllConnectionPools().onComplete { _ => - val _ = actorSystem.terminate() + val _ = actorSystem.terminate() + serviceTracer.flush() // flush out any remaining traces from the buffer val terminated = Await.result(actorSystem.whenTerminated, 30.seconds) val addressTerminated = if (terminated.addressTerminated) "is" else "is not" Console.print(s"${this.getClass.getName} App $addressTerminated stopped ") diff --git a/src/main/scala/xyz/driver/core/trace/GoogleStackdriverTrace.scala b/src/main/scala/xyz/driver/core/trace/GoogleStackdriverTrace.scala index fa98ef4..04f2ec6 100644 --- a/src/main/scala/xyz/driver/core/trace/GoogleStackdriverTrace.scala +++ b/src/main/scala/xyz/driver/core/trace/GoogleStackdriverTrace.scala @@ -14,7 +14,8 @@ final class GoogleStackdriverTrace(projectId: String, clientSecretsFile: String, appName: String, appEnvironment: String, - log: Logger) + log: Logger, + bufferSize: Int = 10) extends GoogleServiceTracer { // initialize our various tracking storage systems @@ -38,10 +39,12 @@ final class GoogleStackdriverTrace(projectId: String, } private val googleServiceTracer = - new GoogleStackdriverTraceWithConsumer(projectId, appName, appEnvironment, traceConsumer, log) + new GoogleStackdriverTraceWithConsumer(projectId, appName, appEnvironment, traceConsumer, log, bufferSize) override def startSpan(httpRequest: HttpRequest): GoogleStackdriverTraceSpan = googleServiceTracer.startSpan(httpRequest) override def endSpan(span: GoogleStackdriverTraceSpan): Unit = googleServiceTracer.endSpan(span) + + override def flush(): Unit = googleServiceTracer.flush() } diff --git a/src/main/scala/xyz/driver/core/trace/GoogleStackdriverTraceWithConsumer.scala b/src/main/scala/xyz/driver/core/trace/GoogleStackdriverTraceWithConsumer.scala index 81619d2..13b80bb 100644 --- a/src/main/scala/xyz/driver/core/trace/GoogleStackdriverTraceWithConsumer.scala +++ b/src/main/scala/xyz/driver/core/trace/GoogleStackdriverTraceWithConsumer.scala @@ -16,12 +16,13 @@ final class GoogleStackdriverTraceWithConsumer(projectId: String, appName: String, appEnvironment: String, traceConsumer: TraceConsumer, - log: Logger) + log: Logger, + bufferSize: Int) extends GoogleServiceTracer { private val traceProducer: TraceProducer = new TraceProducer() private val threadSafeBufferingTraceConsumer = new ExceptionLoggingFlushableTraceConsumer( - new SizedBufferingTraceConsumer(traceConsumer, new RoughTraceSizer(), 100), + new SizedBufferingTraceConsumer(traceConsumer, new RoughTraceSizer(), bufferSize), log ) @@ -73,7 +74,8 @@ final class GoogleStackdriverTraceWithConsumer(projectId: String, override def endSpan(span: TracerSpanPayload): Unit = { span.tracer.endSpan(span.context) - threadSafeBufferingTraceConsumer.flush() // flush out the thread safe buffer } + override def flush(): Unit = threadSafeBufferingTraceConsumer.flush() // flush out the thread safe buffer + } diff --git a/src/main/scala/xyz/driver/core/trace/LoggingTrace.scala b/src/main/scala/xyz/driver/core/trace/LoggingTrace.scala index e75de53..cd920f0 100644 --- a/src/main/scala/xyz/driver/core/trace/LoggingTrace.scala +++ b/src/main/scala/xyz/driver/core/trace/LoggingTrace.scala @@ -4,7 +4,8 @@ import akka.http.scaladsl.model.HttpRequest import com.google.cloud.trace.v1.consumer.TraceConsumer import com.typesafe.scalalogging.Logger -final class LoggingTrace(appName: String, appEnvironment: String, log: Logger) extends GoogleServiceTracer { +final class LoggingTrace(appName: String, appEnvironment: String, log: Logger, bufferSize: Int) + extends GoogleServiceTracer { private val traceConsumer: TraceConsumer = new LoggingTraceConsumer(log) private val googleServiceTracer = new GoogleStackdriverTraceWithConsumer( @@ -12,11 +13,14 @@ final class LoggingTrace(appName: String, appEnvironment: String, log: Logger) e appName, appEnvironment, traceConsumer, - log + log, + bufferSize ) override def startSpan(httpRequest: HttpRequest): GoogleStackdriverTraceSpan = googleServiceTracer.startSpan(httpRequest) override def endSpan(span: GoogleStackdriverTraceSpan): Unit = googleServiceTracer.endSpan(span) + + override def flush(): Unit = googleServiceTracer.flush() } diff --git a/src/main/scala/xyz/driver/core/trace/ServiceTracer.scala b/src/main/scala/xyz/driver/core/trace/ServiceTracer.scala index 25562cd..1413b63 100644 --- a/src/main/scala/xyz/driver/core/trace/ServiceTracer.scala +++ b/src/main/scala/xyz/driver/core/trace/ServiceTracer.scala @@ -14,4 +14,6 @@ trait ServiceTracer { def startSpan(httpRequest: HttpRequest): TracerSpanPayload def endSpan(span: TracerSpanPayload): Unit + + def flush(): Unit } -- cgit v1.2.3