From cacf57dc4af5b0104b28b6417715c660aef613f3 Mon Sep 17 00:00:00 2001 From: John St John Date: Wed, 4 Oct 2017 10:39:21 -0700 Subject: Follow google example to implement a scheduled flusing trace consumer by default --- src/main/scala/xyz/driver/core/app.scala | 2 +- .../driver/core/trace/GoogleStackdriverTrace.scala | 11 +++++++++-- .../trace/GoogleStackdriverTraceWithConsumer.scala | 20 +++++++++++++++++--- .../scala/xyz/driver/core/trace/LoggingTrace.scala | 5 +++-- 4 files changed, 30 insertions(+), 8 deletions(-) (limited to 'src/main') diff --git a/src/main/scala/xyz/driver/core/app.scala b/src/main/scala/xyz/driver/core/app.scala index a7f58e3..a7ba7aa 100644 --- a/src/main/scala/xyz/driver/core/app.scala +++ b/src/main/scala/xyz/driver/core/app.scala @@ -54,7 +54,7 @@ object app { private lazy val http = Http()(actorSystem) val appEnvironment = config.getString("application.environment") val serviceTracer = - tracer.getOrElse(new LoggingTrace(appName, config.getString("application.environment"), log, 10)) + tracer.getOrElse(new LoggingTrace(appName, config.getString("application.environment"), log, 1024, 15)) def run(): Unit = { activateServices(modules) scheduleServicesDeactivation(modules) diff --git a/src/main/scala/xyz/driver/core/trace/GoogleStackdriverTrace.scala b/src/main/scala/xyz/driver/core/trace/GoogleStackdriverTrace.scala index ce84f9d..fe4bb5c 100644 --- a/src/main/scala/xyz/driver/core/trace/GoogleStackdriverTrace.scala +++ b/src/main/scala/xyz/driver/core/trace/GoogleStackdriverTrace.scala @@ -15,7 +15,8 @@ final class GoogleStackdriverTrace(projectId: String, appName: String, appEnvironment: String, log: Logger, - bufferSize: Int = 10) + bufferSize: Int = 1024, + scheduledDelay: Int = 15) extends GoogleServiceTracer { // initialize our various tracking storage systems @@ -38,7 +39,13 @@ final class GoogleStackdriverTrace(projectId: String, } private val googleServiceTracer = - new GoogleStackdriverTraceWithConsumer(projectId, appName, appEnvironment, traceConsumer, log, bufferSize) + new GoogleStackdriverTraceWithConsumer(projectId, + appName, + appEnvironment, + traceConsumer, + log, + bufferSize, + scheduledDelay) override def startSpan(httpRequest: HttpRequest): GoogleStackdriverTraceSpan = googleServiceTracer.startSpan(httpRequest) diff --git a/src/main/scala/xyz/driver/core/trace/GoogleStackdriverTraceWithConsumer.scala b/src/main/scala/xyz/driver/core/trace/GoogleStackdriverTraceWithConsumer.scala index cd9170a..ca1eab2 100644 --- a/src/main/scala/xyz/driver/core/trace/GoogleStackdriverTraceWithConsumer.scala +++ b/src/main/scala/xyz/driver/core/trace/GoogleStackdriverTraceWithConsumer.scala @@ -4,10 +4,12 @@ import akka.http.scaladsl.model.HttpRequest import com.google.cloud.trace.core._ import com.google.cloud.trace.sink.TraceSink import com.google.cloud.trace.v1.TraceSinkV1 -import com.google.cloud.trace.v1.consumer.{SizedBufferingTraceConsumer, TraceConsumer} +import com.google.cloud.trace.v1.consumer.{ScheduledBufferingTraceConsumer, TraceConsumer} import com.google.cloud.trace.v1.producer.TraceProducer import com.google.cloud.trace.{SpanContextHandler, SpanContextHandlerTracer, Tracer} import com.typesafe.scalalogging.Logger +import java.util.concurrent.ScheduledThreadPoolExecutor +import java.util.concurrent.TimeUnit import scala.compat.java8.OptionConverters._ @@ -16,13 +18,25 @@ final class GoogleStackdriverTraceWithConsumer(projectId: String, appEnvironment: String, traceConsumer: TraceConsumer, log: Logger, - bufferSize: Int) + bufferSize: Int, + scheduledDelay: Int) extends GoogleServiceTracer { private val traceProducer: TraceProducer = new TraceProducer() // use a UnitTraceSizer so the interpretation of bufferSize is # of spans to hold in memory prior to flushing + + // now set up the scheduled executor for time-based flushing of our tracing + // see https://goo.gl/HrPLuC + private val scheduledThreadPoolExecutor = new ScheduledThreadPoolExecutor(1) // this is just used for it's timing execution + scheduledThreadPoolExecutor.setKeepAliveTime(scheduledDelay.toLong, TimeUnit.SECONDS) + scheduledThreadPoolExecutor.allowCoreThreadTimeOut(true) + private val executorService = scheduledThreadPoolExecutor private val threadSafeBufferingTraceConsumer = new ExceptionLoggingFlushableTraceConsumer( - new SizedBufferingTraceConsumer(traceConsumer, new UnitTraceSizer(), bufferSize), + new ScheduledBufferingTraceConsumer(traceConsumer, + new UnitTraceSizer(), + bufferSize, + scheduledDelay, + executorService), log ) diff --git a/src/main/scala/xyz/driver/core/trace/LoggingTrace.scala b/src/main/scala/xyz/driver/core/trace/LoggingTrace.scala index cd920f0..6a13708 100644 --- a/src/main/scala/xyz/driver/core/trace/LoggingTrace.scala +++ b/src/main/scala/xyz/driver/core/trace/LoggingTrace.scala @@ -4,7 +4,7 @@ import akka.http.scaladsl.model.HttpRequest import com.google.cloud.trace.v1.consumer.TraceConsumer import com.typesafe.scalalogging.Logger -final class LoggingTrace(appName: String, appEnvironment: String, log: Logger, bufferSize: Int) +final class LoggingTrace(appName: String, appEnvironment: String, log: Logger, bufferSize: Int, scheduledDelay: Int) extends GoogleServiceTracer { private val traceConsumer: TraceConsumer = new LoggingTraceConsumer(log) @@ -14,7 +14,8 @@ final class LoggingTrace(appName: String, appEnvironment: String, log: Logger, b appEnvironment, traceConsumer, log, - bufferSize + bufferSize, + scheduledDelay ) override def startSpan(httpRequest: HttpRequest): GoogleStackdriverTraceSpan = -- cgit v1.2.3