diff options
7 files changed, 62 insertions, 11 deletions
diff --git a/src/main/scala/xyz/driver/core/app.scala b/src/main/scala/xyz/driver/core/app.scala index 763a363..a7f58e3 100644 --- a/src/main/scala/xyz/driver/core/app.scala +++ b/src/main/scala/xyz/driver/core/app.scala @@ -53,7 +53,8 @@ object app { implicit private lazy val materializer = ActorMaterializer()(actorSystem) private lazy val http = Http()(actorSystem) val appEnvironment = config.getString("application.environment") - val serviceTracer = tracer.getOrElse(new LoggingTrace(appName, config.getString("application.environment"), log)) + val serviceTracer = + tracer.getOrElse(new LoggingTrace(appName, config.getString("application.environment"), log, 10)) def run(): Unit = { activateServices(modules) scheduleServicesDeactivation(modules) @@ -63,7 +64,8 @@ object app { def stop(): Unit = { http.shutdownAllConnectionPools().onComplete { _ => - val _ = actorSystem.terminate() + val _ = actorSystem.terminate() + serviceTracer.flush() // flush out any remaining traces from the buffer val terminated = Await.result(actorSystem.whenTerminated, 30.seconds) val addressTerminated = if (terminated.addressTerminated) "is" else "is not" Console.print(s"${this.getClass.getName} App $addressTerminated stopped ") diff --git a/src/main/scala/xyz/driver/core/trace/ExceptionLoggingSizedBufferingTraceConsumer.scala b/src/main/scala/xyz/driver/core/trace/ExceptionLoggingSizedBufferingTraceConsumer.scala new file mode 100644 index 0000000..64ae15a --- /dev/null +++ b/src/main/scala/xyz/driver/core/trace/ExceptionLoggingSizedBufferingTraceConsumer.scala @@ -0,0 +1,26 @@ +package xyz.driver.core.trace + +import com.google.cloud.trace.v1.consumer.{FlushableTraceConsumer} +import com.google.devtools.cloudtrace.v1.Traces +import com.typesafe.scalalogging.Logger +import scala.util.Try + +/** + * ExceptionLoggingFlushableTraceConsumer simply wraps a flushable trace consumer and catches/logs any exceptions + * @param traceConsumer the flusable trace consumer to wrap + * @param log where to log any exceptions + */ +class ExceptionLoggingFlushableTraceConsumer(traceConsumer: FlushableTraceConsumer, log: Logger) + extends FlushableTraceConsumer { + + private val flushableTraceConsumer = traceConsumer + + private def exceptionLogger(exception: Throwable): Unit = + log.error(s"Encountered exception logging to google $exception") + + override def receive(trace: Traces): Unit = + Try(flushableTraceConsumer.receive(trace)).fold(exceptionLogger, identity) + + override def flush(): Unit = + Try(flushableTraceConsumer.flush()).fold(exceptionLogger, identity) +} diff --git a/src/main/scala/xyz/driver/core/trace/GoogleStackdriverTrace.scala b/src/main/scala/xyz/driver/core/trace/GoogleStackdriverTrace.scala index 1ff8d10..04f2ec6 100644 --- a/src/main/scala/xyz/driver/core/trace/GoogleStackdriverTrace.scala +++ b/src/main/scala/xyz/driver/core/trace/GoogleStackdriverTrace.scala @@ -14,7 +14,8 @@ final class GoogleStackdriverTrace(projectId: String, clientSecretsFile: String, appName: String, appEnvironment: String, - log: Logger) + log: Logger, + bufferSize: Int = 10) extends GoogleServiceTracer { // initialize our various tracking storage systems @@ -38,10 +39,12 @@ final class GoogleStackdriverTrace(projectId: String, } private val googleServiceTracer = - new GoogleStackdriverTraceWithConsumer(projectId, appName, appEnvironment, traceConsumer) + new GoogleStackdriverTraceWithConsumer(projectId, appName, appEnvironment, traceConsumer, log, bufferSize) override def startSpan(httpRequest: HttpRequest): GoogleStackdriverTraceSpan = googleServiceTracer.startSpan(httpRequest) override def endSpan(span: GoogleStackdriverTraceSpan): Unit = googleServiceTracer.endSpan(span) + + override def flush(): Unit = googleServiceTracer.flush() } diff --git a/src/main/scala/xyz/driver/core/trace/GoogleStackdriverTraceWithConsumer.scala b/src/main/scala/xyz/driver/core/trace/GoogleStackdriverTraceWithConsumer.scala index 7fed3c7..cd9170a 100644 --- a/src/main/scala/xyz/driver/core/trace/GoogleStackdriverTraceWithConsumer.scala +++ b/src/main/scala/xyz/driver/core/trace/GoogleStackdriverTraceWithConsumer.scala @@ -6,20 +6,25 @@ import com.google.cloud.trace.sink.TraceSink import com.google.cloud.trace.v1.TraceSinkV1 import com.google.cloud.trace.v1.consumer.{SizedBufferingTraceConsumer, TraceConsumer} import com.google.cloud.trace.v1.producer.TraceProducer -import com.google.cloud.trace.v1.util.RoughTraceSizer import com.google.cloud.trace.{SpanContextHandler, SpanContextHandlerTracer, Tracer} +import com.typesafe.scalalogging.Logger import scala.compat.java8.OptionConverters._ final class GoogleStackdriverTraceWithConsumer(projectId: String, appName: String, appEnvironment: String, - traceConsumer: TraceConsumer) + traceConsumer: TraceConsumer, + log: Logger, + bufferSize: Int) extends GoogleServiceTracer { private val traceProducer: TraceProducer = new TraceProducer() - private val threadSafeBufferingTraceConsumer = - new SizedBufferingTraceConsumer(traceConsumer, new RoughTraceSizer(), 100) + // use a UnitTraceSizer so the interpretation of bufferSize is # of spans to hold in memory prior to flushing + private val threadSafeBufferingTraceConsumer = new ExceptionLoggingFlushableTraceConsumer( + new SizedBufferingTraceConsumer(traceConsumer, new UnitTraceSizer(), bufferSize), + log + ) private val traceSink: TraceSink = new TraceSinkV1(projectId, traceProducer, threadSafeBufferingTraceConsumer) @@ -69,7 +74,8 @@ final class GoogleStackdriverTraceWithConsumer(projectId: String, override def endSpan(span: TracerSpanPayload): Unit = { span.tracer.endSpan(span.context) - threadSafeBufferingTraceConsumer.flush() // flush out the thread safe buffer } + override def flush(): Unit = threadSafeBufferingTraceConsumer.flush() // flush out the thread safe buffer + } diff --git a/src/main/scala/xyz/driver/core/trace/LoggingTrace.scala b/src/main/scala/xyz/driver/core/trace/LoggingTrace.scala index 9db85b7..cd920f0 100644 --- a/src/main/scala/xyz/driver/core/trace/LoggingTrace.scala +++ b/src/main/scala/xyz/driver/core/trace/LoggingTrace.scala @@ -4,18 +4,23 @@ import akka.http.scaladsl.model.HttpRequest import com.google.cloud.trace.v1.consumer.TraceConsumer import com.typesafe.scalalogging.Logger -final class LoggingTrace(appName: String, appEnvironment: String, log: Logger) extends GoogleServiceTracer { +final class LoggingTrace(appName: String, appEnvironment: String, log: Logger, bufferSize: Int) + extends GoogleServiceTracer { private val traceConsumer: TraceConsumer = new LoggingTraceConsumer(log) private val googleServiceTracer = new GoogleStackdriverTraceWithConsumer( "logging-tracer", appName, appEnvironment, - traceConsumer + traceConsumer, + log, + bufferSize ) override def startSpan(httpRequest: HttpRequest): GoogleStackdriverTraceSpan = googleServiceTracer.startSpan(httpRequest) override def endSpan(span: GoogleStackdriverTraceSpan): Unit = googleServiceTracer.endSpan(span) + + override def flush(): Unit = googleServiceTracer.flush() } diff --git a/src/main/scala/xyz/driver/core/trace/ServiceTracer.scala b/src/main/scala/xyz/driver/core/trace/ServiceTracer.scala index 25562cd..1413b63 100644 --- a/src/main/scala/xyz/driver/core/trace/ServiceTracer.scala +++ b/src/main/scala/xyz/driver/core/trace/ServiceTracer.scala @@ -14,4 +14,6 @@ trait ServiceTracer { def startSpan(httpRequest: HttpRequest): TracerSpanPayload def endSpan(span: TracerSpanPayload): Unit + + def flush(): Unit } diff --git a/src/main/scala/xyz/driver/core/trace/UnitTraceSizer.scala b/src/main/scala/xyz/driver/core/trace/UnitTraceSizer.scala new file mode 100644 index 0000000..a7d6032 --- /dev/null +++ b/src/main/scala/xyz/driver/core/trace/UnitTraceSizer.scala @@ -0,0 +1,7 @@ +package xyz.driver.core.trace +import com.google.cloud.trace.v1.util.Sizer +import com.google.devtools.cloudtrace.v1.Trace + +class UnitTraceSizer extends Sizer[Trace] { + override def size(sizeable: Trace) = 1 +} |