aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJohn St John <johnthesaintjohn@gmail.com>2017-10-04 10:39:21 -0700
committerJohn St John <johnthesaintjohn@gmail.com>2017-10-04 10:39:21 -0700
commitcacf57dc4af5b0104b28b6417715c660aef613f3 (patch)
tree111f9973b559cd88fcf910c780b2821268add6af
parent16d2b4c6b4d230681cae45d2c2557a4cd46439a2 (diff)
downloaddriver-core-cacf57dc4af5b0104b28b6417715c660aef613f3.tar.gz
driver-core-cacf57dc4af5b0104b28b6417715c660aef613f3.tar.bz2
driver-core-cacf57dc4af5b0104b28b6417715c660aef613f3.zip
Follow google example to implement a scheduled flusing trace consumer by default
-rw-r--r--build.sbt6
-rw-r--r--src/main/scala/xyz/driver/core/app.scala2
-rw-r--r--src/main/scala/xyz/driver/core/trace/GoogleStackdriverTrace.scala11
-rw-r--r--src/main/scala/xyz/driver/core/trace/GoogleStackdriverTraceWithConsumer.scala20
-rw-r--r--src/main/scala/xyz/driver/core/trace/LoggingTrace.scala5
5 files changed, 33 insertions, 11 deletions
diff --git a/build.sbt b/build.sbt
index 175f674..f6473ae 100644
--- a/build.sbt
+++ b/build.sbt
@@ -12,17 +12,17 @@ lazy val core = (project in file("."))
"com.typesafe.akka" %% "akka-http-spray-json" % akkaHttpV,
"com.typesafe.akka" %% "akka-http-testkit" % akkaHttpV,
"com.pauldijou" %% "jwt-core" % "0.14.0",
- "org.scalatest" %% "scalatest" % "3.0.1" % "test",
+ "org.scalatest" %% "scalatest" % "3.0.2" % "test",
"org.scalacheck" %% "scalacheck" % "1.13.4" % "test",
"org.mockito" % "mockito-core" % "1.9.5" % "test",
- "com.github.swagger-akka-http" %% "swagger-akka-http" % "0.9.1",
+ "com.github.swagger-akka-http" %% "swagger-akka-http" % "0.9.2",
"com.amazonaws" % "aws-java-sdk-s3" % "1.11.26",
"com.google.cloud" % "google-cloud-pubsub" % "0.25.0-beta",
"com.google.cloud" % "google-cloud-storage" % "1.7.0",
"com.typesafe.slick" %% "slick" % "3.2.1",
"com.typesafe" % "config" % "1.2.1",
"com.typesafe.scala-logging" %% "scala-logging" % "3.5.0",
- "ch.qos.logback" % "logback-classic" % "1.1.3",
+ "ch.qos.logback" % "logback-classic" % "1.1.11",
"com.google.cloud.trace" % "core" % googleTraceV,
"com.google.cloud.trace" % "logging-service" % googleTraceV,
"com.google.cloud.trace" % "trace-grpc-api-service" % googleTraceV,
diff --git a/src/main/scala/xyz/driver/core/app.scala b/src/main/scala/xyz/driver/core/app.scala
index a7f58e3..a7ba7aa 100644
--- a/src/main/scala/xyz/driver/core/app.scala
+++ b/src/main/scala/xyz/driver/core/app.scala
@@ -54,7 +54,7 @@ object app {
private lazy val http = Http()(actorSystem)
val appEnvironment = config.getString("application.environment")
val serviceTracer =
- tracer.getOrElse(new LoggingTrace(appName, config.getString("application.environment"), log, 10))
+ tracer.getOrElse(new LoggingTrace(appName, config.getString("application.environment"), log, 1024, 15))
def run(): Unit = {
activateServices(modules)
scheduleServicesDeactivation(modules)
diff --git a/src/main/scala/xyz/driver/core/trace/GoogleStackdriverTrace.scala b/src/main/scala/xyz/driver/core/trace/GoogleStackdriverTrace.scala
index ce84f9d..fe4bb5c 100644
--- a/src/main/scala/xyz/driver/core/trace/GoogleStackdriverTrace.scala
+++ b/src/main/scala/xyz/driver/core/trace/GoogleStackdriverTrace.scala
@@ -15,7 +15,8 @@ final class GoogleStackdriverTrace(projectId: String,
appName: String,
appEnvironment: String,
log: Logger,
- bufferSize: Int = 10)
+ bufferSize: Int = 1024,
+ scheduledDelay: Int = 15)
extends GoogleServiceTracer {
// initialize our various tracking storage systems
@@ -38,7 +39,13 @@ final class GoogleStackdriverTrace(projectId: String,
}
private val googleServiceTracer =
- new GoogleStackdriverTraceWithConsumer(projectId, appName, appEnvironment, traceConsumer, log, bufferSize)
+ new GoogleStackdriverTraceWithConsumer(projectId,
+ appName,
+ appEnvironment,
+ traceConsumer,
+ log,
+ bufferSize,
+ scheduledDelay)
override def startSpan(httpRequest: HttpRequest): GoogleStackdriverTraceSpan =
googleServiceTracer.startSpan(httpRequest)
diff --git a/src/main/scala/xyz/driver/core/trace/GoogleStackdriverTraceWithConsumer.scala b/src/main/scala/xyz/driver/core/trace/GoogleStackdriverTraceWithConsumer.scala
index cd9170a..ca1eab2 100644
--- a/src/main/scala/xyz/driver/core/trace/GoogleStackdriverTraceWithConsumer.scala
+++ b/src/main/scala/xyz/driver/core/trace/GoogleStackdriverTraceWithConsumer.scala
@@ -4,10 +4,12 @@ import akka.http.scaladsl.model.HttpRequest
import com.google.cloud.trace.core._
import com.google.cloud.trace.sink.TraceSink
import com.google.cloud.trace.v1.TraceSinkV1
-import com.google.cloud.trace.v1.consumer.{SizedBufferingTraceConsumer, TraceConsumer}
+import com.google.cloud.trace.v1.consumer.{ScheduledBufferingTraceConsumer, TraceConsumer}
import com.google.cloud.trace.v1.producer.TraceProducer
import com.google.cloud.trace.{SpanContextHandler, SpanContextHandlerTracer, Tracer}
import com.typesafe.scalalogging.Logger
+import java.util.concurrent.ScheduledThreadPoolExecutor
+import java.util.concurrent.TimeUnit
import scala.compat.java8.OptionConverters._
@@ -16,13 +18,25 @@ final class GoogleStackdriverTraceWithConsumer(projectId: String,
appEnvironment: String,
traceConsumer: TraceConsumer,
log: Logger,
- bufferSize: Int)
+ bufferSize: Int,
+ scheduledDelay: Int)
extends GoogleServiceTracer {
private val traceProducer: TraceProducer = new TraceProducer()
// use a UnitTraceSizer so the interpretation of bufferSize is # of spans to hold in memory prior to flushing
+
+ // now set up the scheduled executor for time-based flushing of our tracing
+ // see https://goo.gl/HrPLuC
+ private val scheduledThreadPoolExecutor = new ScheduledThreadPoolExecutor(1) // this is just used for it's timing execution
+ scheduledThreadPoolExecutor.setKeepAliveTime(scheduledDelay.toLong, TimeUnit.SECONDS)
+ scheduledThreadPoolExecutor.allowCoreThreadTimeOut(true)
+ private val executorService = scheduledThreadPoolExecutor
private val threadSafeBufferingTraceConsumer = new ExceptionLoggingFlushableTraceConsumer(
- new SizedBufferingTraceConsumer(traceConsumer, new UnitTraceSizer(), bufferSize),
+ new ScheduledBufferingTraceConsumer(traceConsumer,
+ new UnitTraceSizer(),
+ bufferSize,
+ scheduledDelay,
+ executorService),
log
)
diff --git a/src/main/scala/xyz/driver/core/trace/LoggingTrace.scala b/src/main/scala/xyz/driver/core/trace/LoggingTrace.scala
index cd920f0..6a13708 100644
--- a/src/main/scala/xyz/driver/core/trace/LoggingTrace.scala
+++ b/src/main/scala/xyz/driver/core/trace/LoggingTrace.scala
@@ -4,7 +4,7 @@ import akka.http.scaladsl.model.HttpRequest
import com.google.cloud.trace.v1.consumer.TraceConsumer
import com.typesafe.scalalogging.Logger
-final class LoggingTrace(appName: String, appEnvironment: String, log: Logger, bufferSize: Int)
+final class LoggingTrace(appName: String, appEnvironment: String, log: Logger, bufferSize: Int, scheduledDelay: Int)
extends GoogleServiceTracer {
private val traceConsumer: TraceConsumer = new LoggingTraceConsumer(log)
@@ -14,7 +14,8 @@ final class LoggingTrace(appName: String, appEnvironment: String, log: Logger, b
appEnvironment,
traceConsumer,
log,
- bufferSize
+ bufferSize,
+ scheduledDelay
)
override def startSpan(httpRequest: HttpRequest): GoogleStackdriverTraceSpan =