From cacf57dc4af5b0104b28b6417715c660aef613f3 Mon Sep 17 00:00:00 2001 From: John St John Date: Wed, 4 Oct 2017 10:39:21 -0700 Subject: Follow google example to implement a scheduled flusing trace consumer by default --- build.sbt | 6 +++--- src/main/scala/xyz/driver/core/app.scala | 2 +- .../driver/core/trace/GoogleStackdriverTrace.scala | 11 +++++++++-- .../trace/GoogleStackdriverTraceWithConsumer.scala | 20 +++++++++++++++++--- .../scala/xyz/driver/core/trace/LoggingTrace.scala | 5 +++-- 5 files changed, 33 insertions(+), 11 deletions(-) diff --git a/build.sbt b/build.sbt index 175f674..f6473ae 100644 --- a/build.sbt +++ b/build.sbt @@ -12,17 +12,17 @@ lazy val core = (project in file(".")) "com.typesafe.akka" %% "akka-http-spray-json" % akkaHttpV, "com.typesafe.akka" %% "akka-http-testkit" % akkaHttpV, "com.pauldijou" %% "jwt-core" % "0.14.0", - "org.scalatest" %% "scalatest" % "3.0.1" % "test", + "org.scalatest" %% "scalatest" % "3.0.2" % "test", "org.scalacheck" %% "scalacheck" % "1.13.4" % "test", "org.mockito" % "mockito-core" % "1.9.5" % "test", - "com.github.swagger-akka-http" %% "swagger-akka-http" % "0.9.1", + "com.github.swagger-akka-http" %% "swagger-akka-http" % "0.9.2", "com.amazonaws" % "aws-java-sdk-s3" % "1.11.26", "com.google.cloud" % "google-cloud-pubsub" % "0.25.0-beta", "com.google.cloud" % "google-cloud-storage" % "1.7.0", "com.typesafe.slick" %% "slick" % "3.2.1", "com.typesafe" % "config" % "1.2.1", "com.typesafe.scala-logging" %% "scala-logging" % "3.5.0", - "ch.qos.logback" % "logback-classic" % "1.1.3", + "ch.qos.logback" % "logback-classic" % "1.1.11", "com.google.cloud.trace" % "core" % googleTraceV, "com.google.cloud.trace" % "logging-service" % googleTraceV, "com.google.cloud.trace" % "trace-grpc-api-service" % googleTraceV, diff --git a/src/main/scala/xyz/driver/core/app.scala b/src/main/scala/xyz/driver/core/app.scala index a7f58e3..a7ba7aa 100644 --- a/src/main/scala/xyz/driver/core/app.scala +++ b/src/main/scala/xyz/driver/core/app.scala @@ -54,7 +54,7 @@ object app { private lazy val http = Http()(actorSystem) val appEnvironment = config.getString("application.environment") val serviceTracer = - tracer.getOrElse(new LoggingTrace(appName, config.getString("application.environment"), log, 10)) + tracer.getOrElse(new LoggingTrace(appName, config.getString("application.environment"), log, 1024, 15)) def run(): Unit = { activateServices(modules) scheduleServicesDeactivation(modules) diff --git a/src/main/scala/xyz/driver/core/trace/GoogleStackdriverTrace.scala b/src/main/scala/xyz/driver/core/trace/GoogleStackdriverTrace.scala index ce84f9d..fe4bb5c 100644 --- a/src/main/scala/xyz/driver/core/trace/GoogleStackdriverTrace.scala +++ b/src/main/scala/xyz/driver/core/trace/GoogleStackdriverTrace.scala @@ -15,7 +15,8 @@ final class GoogleStackdriverTrace(projectId: String, appName: String, appEnvironment: String, log: Logger, - bufferSize: Int = 10) + bufferSize: Int = 1024, + scheduledDelay: Int = 15) extends GoogleServiceTracer { // initialize our various tracking storage systems @@ -38,7 +39,13 @@ final class GoogleStackdriverTrace(projectId: String, } private val googleServiceTracer = - new GoogleStackdriverTraceWithConsumer(projectId, appName, appEnvironment, traceConsumer, log, bufferSize) + new GoogleStackdriverTraceWithConsumer(projectId, + appName, + appEnvironment, + traceConsumer, + log, + bufferSize, + scheduledDelay) override def startSpan(httpRequest: HttpRequest): GoogleStackdriverTraceSpan = googleServiceTracer.startSpan(httpRequest) diff --git a/src/main/scala/xyz/driver/core/trace/GoogleStackdriverTraceWithConsumer.scala b/src/main/scala/xyz/driver/core/trace/GoogleStackdriverTraceWithConsumer.scala index cd9170a..ca1eab2 100644 --- a/src/main/scala/xyz/driver/core/trace/GoogleStackdriverTraceWithConsumer.scala +++ b/src/main/scala/xyz/driver/core/trace/GoogleStackdriverTraceWithConsumer.scala @@ -4,10 +4,12 @@ import akka.http.scaladsl.model.HttpRequest import com.google.cloud.trace.core._ import com.google.cloud.trace.sink.TraceSink import com.google.cloud.trace.v1.TraceSinkV1 -import com.google.cloud.trace.v1.consumer.{SizedBufferingTraceConsumer, TraceConsumer} +import com.google.cloud.trace.v1.consumer.{ScheduledBufferingTraceConsumer, TraceConsumer} import com.google.cloud.trace.v1.producer.TraceProducer import com.google.cloud.trace.{SpanContextHandler, SpanContextHandlerTracer, Tracer} import com.typesafe.scalalogging.Logger +import java.util.concurrent.ScheduledThreadPoolExecutor +import java.util.concurrent.TimeUnit import scala.compat.java8.OptionConverters._ @@ -16,13 +18,25 @@ final class GoogleStackdriverTraceWithConsumer(projectId: String, appEnvironment: String, traceConsumer: TraceConsumer, log: Logger, - bufferSize: Int) + bufferSize: Int, + scheduledDelay: Int) extends GoogleServiceTracer { private val traceProducer: TraceProducer = new TraceProducer() // use a UnitTraceSizer so the interpretation of bufferSize is # of spans to hold in memory prior to flushing + + // now set up the scheduled executor for time-based flushing of our tracing + // see https://goo.gl/HrPLuC + private val scheduledThreadPoolExecutor = new ScheduledThreadPoolExecutor(1) // this is just used for it's timing execution + scheduledThreadPoolExecutor.setKeepAliveTime(scheduledDelay.toLong, TimeUnit.SECONDS) + scheduledThreadPoolExecutor.allowCoreThreadTimeOut(true) + private val executorService = scheduledThreadPoolExecutor private val threadSafeBufferingTraceConsumer = new ExceptionLoggingFlushableTraceConsumer( - new SizedBufferingTraceConsumer(traceConsumer, new UnitTraceSizer(), bufferSize), + new ScheduledBufferingTraceConsumer(traceConsumer, + new UnitTraceSizer(), + bufferSize, + scheduledDelay, + executorService), log ) diff --git a/src/main/scala/xyz/driver/core/trace/LoggingTrace.scala b/src/main/scala/xyz/driver/core/trace/LoggingTrace.scala index cd920f0..6a13708 100644 --- a/src/main/scala/xyz/driver/core/trace/LoggingTrace.scala +++ b/src/main/scala/xyz/driver/core/trace/LoggingTrace.scala @@ -4,7 +4,7 @@ import akka.http.scaladsl.model.HttpRequest import com.google.cloud.trace.v1.consumer.TraceConsumer import com.typesafe.scalalogging.Logger -final class LoggingTrace(appName: String, appEnvironment: String, log: Logger, bufferSize: Int) +final class LoggingTrace(appName: String, appEnvironment: String, log: Logger, bufferSize: Int, scheduledDelay: Int) extends GoogleServiceTracer { private val traceConsumer: TraceConsumer = new LoggingTraceConsumer(log) @@ -14,7 +14,8 @@ final class LoggingTrace(appName: String, appEnvironment: String, log: Logger, b appEnvironment, traceConsumer, log, - bufferSize + bufferSize, + scheduledDelay ) override def startSpan(httpRequest: HttpRequest): GoogleStackdriverTraceSpan = -- cgit v1.2.3