From 616e62e733dbbd4e6bacc5f563deef534794dc9e Mon Sep 17 00:00:00 2001 From: Jakob Odersky Date: Wed, 12 Sep 2018 16:03:17 -0700 Subject: Move reporting into separate project --- .../driver/core/reporting/GoogleMdcLogger.scala | 14 ++ .../xyz/driver/core/reporting/GoogleReporter.scala | 217 +++++++++++++++++++++ .../xyz/driver/core/reporting/NoReporter.scala | 8 + .../driver/core/reporting/NoTraceReporter.scala | 20 ++ .../scala/xyz/driver/core/reporting/Reporter.scala | 183 +++++++++++++++++ .../driver/core/reporting/ScalaLoggingCompat.scala | 36 ++++ .../xyz/driver/core/reporting/SpanContext.scala | 13 ++ .../src/main/scala/xyz/driver/core/Refresh.scala | 69 +++++++ src/main/scala/xyz/driver/core/Refresh.scala | 69 ------- .../driver/core/reporting/GoogleMdcLogger.scala | 14 -- .../xyz/driver/core/reporting/GoogleReporter.scala | 217 --------------------- .../xyz/driver/core/reporting/NoReporter.scala | 8 - .../driver/core/reporting/NoTraceReporter.scala | 20 -- .../scala/xyz/driver/core/reporting/Reporter.scala | 183 ----------------- .../driver/core/reporting/ScalaLoggingCompat.scala | 36 ---- .../xyz/driver/core/reporting/SpanContext.scala | 13 -- 16 files changed, 560 insertions(+), 560 deletions(-) create mode 100644 core-reporting/src/main/scala/xyz/driver/core/reporting/GoogleMdcLogger.scala create mode 100644 core-reporting/src/main/scala/xyz/driver/core/reporting/GoogleReporter.scala create mode 100644 core-reporting/src/main/scala/xyz/driver/core/reporting/NoReporter.scala create mode 100644 core-reporting/src/main/scala/xyz/driver/core/reporting/NoTraceReporter.scala create mode 100644 core-reporting/src/main/scala/xyz/driver/core/reporting/Reporter.scala create mode 100644 core-reporting/src/main/scala/xyz/driver/core/reporting/ScalaLoggingCompat.scala create mode 100644 core-reporting/src/main/scala/xyz/driver/core/reporting/SpanContext.scala create mode 100644 core-util/src/main/scala/xyz/driver/core/Refresh.scala delete mode 100644 src/main/scala/xyz/driver/core/Refresh.scala delete mode 100644 src/main/scala/xyz/driver/core/reporting/GoogleMdcLogger.scala delete mode 100644 src/main/scala/xyz/driver/core/reporting/GoogleReporter.scala delete mode 100644 src/main/scala/xyz/driver/core/reporting/NoReporter.scala delete mode 100644 src/main/scala/xyz/driver/core/reporting/NoTraceReporter.scala delete mode 100644 src/main/scala/xyz/driver/core/reporting/Reporter.scala delete mode 100644 src/main/scala/xyz/driver/core/reporting/ScalaLoggingCompat.scala delete mode 100644 src/main/scala/xyz/driver/core/reporting/SpanContext.scala diff --git a/core-reporting/src/main/scala/xyz/driver/core/reporting/GoogleMdcLogger.scala b/core-reporting/src/main/scala/xyz/driver/core/reporting/GoogleMdcLogger.scala new file mode 100644 index 0000000..f5c41cf --- /dev/null +++ b/core-reporting/src/main/scala/xyz/driver/core/reporting/GoogleMdcLogger.scala @@ -0,0 +1,14 @@ +package xyz.driver.core +package reporting + +import org.slf4j.MDC + +trait GoogleMdcLogger extends Reporter { self: GoogleReporter => + + abstract override def log(severity: Reporter.Severity, message: String, reason: Option[Throwable])( + implicit ctx: SpanContext): Unit = { + MDC.put("trace", s"projects/${credentials.getProjectId}/traces/${ctx.traceId}") + super.log(severity, message, reason) + } + +} diff --git a/core-reporting/src/main/scala/xyz/driver/core/reporting/GoogleReporter.scala b/core-reporting/src/main/scala/xyz/driver/core/reporting/GoogleReporter.scala new file mode 100644 index 0000000..14c4954 --- /dev/null +++ b/core-reporting/src/main/scala/xyz/driver/core/reporting/GoogleReporter.scala @@ -0,0 +1,217 @@ +package xyz.driver.core +package reporting + +import java.security.Signature +import java.time.Instant +import java.util + +import akka.NotUsed +import akka.stream.scaladsl.{Flow, RestartSink, Sink, Source, SourceQueueWithComplete} +import akka.stream.{Materializer, OverflowStrategy} +import com.google.auth.oauth2.ServiceAccountCredentials +import com.softwaremill.sttp._ +import spray.json.DerivedJsonProtocol._ +import spray.json._ +import xyz.driver.core.reporting.Reporter.CausalRelation + +import scala.async.Async._ +import scala.concurrent.duration._ +import scala.concurrent.{ExecutionContext, Future} +import scala.util.control.NonFatal +import scala.util.{Failure, Random, Success, Try} + +/** A reporter that collects traces and submits them to + * [[https://cloud.google.com/trace/docs/reference/v2/rest/ Google's Stackdriver Trace API]]. + */ +class GoogleReporter( + val credentials: ServiceAccountCredentials, + namespace: String, + buffer: Int = GoogleReporter.DefaultBufferSize, + interval: FiniteDuration = GoogleReporter.DefaultInterval)( + implicit client: SttpBackend[Future, _], + mat: Materializer, + ec: ExecutionContext +) extends Reporter { + import GoogleReporter._ + + private val getToken: () => Future[String] = Refresh.every(55.minutes) { + def jwt = { + val now = Instant.now().getEpochSecond + val base64 = util.Base64.getEncoder + val header = base64.encodeToString("""{"alg":"RS256","typ":"JWT"}""".getBytes("utf-8")) + val body = base64.encodeToString( + s"""|{ + | "iss": "${credentials.getClientEmail}", + | "scope": "https://www.googleapis.com/auth/trace.append", + | "aud": "https://www.googleapis.com/oauth2/v4/token", + | "exp": ${now + 60.minutes.toSeconds}, + | "iat": $now + |}""".stripMargin.getBytes("utf-8") + ) + val signer = Signature.getInstance("SHA256withRSA") + signer.initSign(credentials.getPrivateKey) + signer.update(s"$header.$body".getBytes("utf-8")) + val signature = base64.encodeToString(signer.sign()) + s"$header.$body.$signature" + } + sttp + .post(uri"https://www.googleapis.com/oauth2/v4/token") + .body( + "grant_type" -> "urn:ietf:params:oauth:grant-type:jwt-bearer", + "assertion" -> jwt + ) + .mapResponse(s => s.parseJson.asJsObject.fields("access_token").convertTo[String]) + .send() + .map(_.unsafeBody) + } + + private val sendToGoogle: Sink[Span, NotUsed] = RestartSink.withBackoff( + minBackoff = 3.seconds, + maxBackoff = 30.seconds, + randomFactor = 0.2 // adds 20% "noise" to vary the intervals slightly + ) { () => + Flow[Span] + .groupedWithin(buffer, interval) + .mapAsync(1) { spans => + async { + val token = await(getToken()) + val res = await( + sttp + .post(uri"https://cloudtrace.googleapis.com/v2/projects/${credentials.getProjectId}/traces:batchWrite") + .auth + .bearer(token) + .body( + Spans(spans).toJson.compactPrint + ) + .send() + .map(_.unsafeBody)) + res + } + } + .recover { + case NonFatal(e) => + System.err.println(s"Error submitting trace spans: $e") // scalastyle: ignore + throw e + } + .to(Sink.ignore) + } + private val queue: SourceQueueWithComplete[Span] = Source + .queue[Span](buffer, OverflowStrategy.dropHead) + .to(sendToGoogle) + .run() + + private def submit(span: Span): Unit = queue.offer(span).failed.map { e => + System.err.println(s"Error adding span to submission queue: $e") + } + + private def startSpan( + traceId: String, + spanId: String, + parentSpanId: Option[String], + displayName: String, + attributes: Map[String, String]) = Span( + s"projects/${credentials.getProjectId}/traces/$traceId/spans/$spanId", + spanId, + parentSpanId, + TruncatableString(displayName), + Instant.now(), + Instant.now(), + Attributes(attributes ++ Map("service.namespace" -> namespace)), + Failure(new IllegalStateException("span status not set")) + ) + + def traceWithOptionalParent[A]( + operationName: String, + tags: Map[String, String], + parent: Option[(SpanContext, CausalRelation)])(operation: SpanContext => A): A = { + val child = parent match { + case Some((p, _)) => SpanContext(p.traceId, f"${Random.nextLong()}%016x") + case None => SpanContext.fresh() + } + val span = startSpan(child.traceId, child.spanId, parent.map(_._1.spanId), operationName, tags) + val result = operation(child) + span.endTime = Instant.now() + submit(span) + result + } + + def traceWithOptionalParentAsync[A]( + operationName: String, + tags: Map[String, String], + parent: Option[(SpanContext, CausalRelation)])(operation: SpanContext => Future[A]): Future[A] = { + val child = parent match { + case Some((p, _)) => SpanContext(p.traceId, f"${Random.nextLong()}%016x") + case None => SpanContext.fresh() + } + val span = startSpan(child.traceId, child.spanId, parent.map(_._1.spanId), operationName, tags) + val result = operation(child) + result.onComplete { result => + span.endTime = Instant.now() + span.status = result + submit(span) + } + result + } + + override def log(severity: Reporter.Severity, message: String, reason: Option[Throwable])( + implicit ctx: SpanContext): Unit = + sys.error("Submitting logs directly to GCP is " + + "currently not supported. Messages should go to stdout.") // TODO: attach logs to traces and submit them directly + +} + +object GoogleReporter { + + val DefaultBufferSize: Int = 10000 + val DefaultInterval: FiniteDuration = 5.seconds + + private case class Attributes(attributeMap: Map[String, String]) + private case class TruncatableString(value: String) + private case class Span( + name: String, + spanId: String, + parentSpanId: Option[String], + displayName: TruncatableString, + startTime: Instant, + var endTime: Instant, + var attributes: Attributes, + var status: Try[_] + ) + + private case class Spans(spans: Seq[Span]) + + private implicit val instantFormat: RootJsonFormat[Instant] = new RootJsonFormat[Instant] { + override def write(obj: Instant): JsValue = obj.toString.toJson + override def read(json: JsValue): Instant = Instant.parse(json.convertTo[String]) + } + + private implicit val mapFormat = new RootJsonFormat[Map[String, String]] { + override def read(json: JsValue): Map[String, String] = sys.error("unimplemented") + override def write(obj: Map[String, String]): JsValue = { + val withValueObjects = obj.mapValues(value => JsObject("stringValue" -> JsObject("value" -> value.toJson))) + JsObject(withValueObjects) + } + } + + private implicit val statusFormat = new RootJsonFormat[Try[_]] { + override def read(json: JsValue): Try[_] = sys.error("unimplemented") + override def write(obj: Try[_]) = { + // error codes defined at https://github.com/googleapis/googleapis/blob/master/google/rpc/code.proto + val (code, message) = obj match { + case Success(_) => (0, "success") + case Failure(_) => (2, "failure") + } + JsObject( + "code" -> code.toJson, + "message" -> message.toJson, + "details" -> JsArray() + ) + } + } + + private implicit val attributeFormat: RootJsonFormat[Attributes] = jsonFormat1(Attributes) + private implicit val truncatableStringFormat: RootJsonFormat[TruncatableString] = jsonFormat1(TruncatableString) + private implicit val spanFormat: RootJsonFormat[Span] = jsonFormat8(Span) + private implicit val spansFormat: RootJsonFormat[Spans] = jsonFormat1(Spans) + +} diff --git a/core-reporting/src/main/scala/xyz/driver/core/reporting/NoReporter.scala b/core-reporting/src/main/scala/xyz/driver/core/reporting/NoReporter.scala new file mode 100644 index 0000000..c1c81f4 --- /dev/null +++ b/core-reporting/src/main/scala/xyz/driver/core/reporting/NoReporter.scala @@ -0,0 +1,8 @@ +package xyz.driver.core +package reporting + +trait NoReporter extends NoTraceReporter { + override def log(severity: Reporter.Severity, message: String, reason: Option[Throwable])( + implicit ctx: SpanContext): Unit = () +} +object NoReporter extends NoReporter diff --git a/core-reporting/src/main/scala/xyz/driver/core/reporting/NoTraceReporter.scala b/core-reporting/src/main/scala/xyz/driver/core/reporting/NoTraceReporter.scala new file mode 100644 index 0000000..b49cfda --- /dev/null +++ b/core-reporting/src/main/scala/xyz/driver/core/reporting/NoTraceReporter.scala @@ -0,0 +1,20 @@ +package xyz.driver.core +package reporting + +import scala.concurrent.Future + +/** A reporter mixin that does not emit traces. */ +trait NoTraceReporter extends Reporter { + + override def traceWithOptionalParent[A]( + name: String, + tags: Map[String, String], + parent: Option[(SpanContext, Reporter.CausalRelation)])(op: SpanContext => A): A = op(SpanContext.fresh()) + + override def traceWithOptionalParentAsync[A]( + name: String, + tags: Map[String, String], + parent: Option[(SpanContext, Reporter.CausalRelation)])(op: SpanContext => Future[A]): Future[A] = + op(SpanContext.fresh()) + +} diff --git a/core-reporting/src/main/scala/xyz/driver/core/reporting/Reporter.scala b/core-reporting/src/main/scala/xyz/driver/core/reporting/Reporter.scala new file mode 100644 index 0000000..469084c --- /dev/null +++ b/core-reporting/src/main/scala/xyz/driver/core/reporting/Reporter.scala @@ -0,0 +1,183 @@ +package xyz.driver.core +package reporting + +import scala.concurrent.Future + +/** Context-aware diagnostic utility for distributed systems, combining logging and tracing. + * + * Diagnostic messages (i.e. logs) are a vital tool for monitoring applications. Tying such messages to an + * execution context, such as a stack trace, simplifies debugging greatly by giving insight to the chains of events + * that led to a particular message. In synchronous systems, execution contexts can easily be determined by an + * external observer, and, as such, do not need to be propagated explicitly to sub-components (e.g. a stack trace on + * the JVM shows all relevant information). In asynchronous systems and especially distributed systems however, + * execution contexts are not easily determined by an external observer and hence need to be explicitly passed across + * service boundaries. + * + * This reporter provides tracing and logging utilities that explicitly require references to execution contexts + * (called [[SpanContext]]s here) intended to be passed across service boundaries. It embraces Scala's + * implicit-parameter-as-a-context paradigm. + * + * Tracing is intended to be compatible with the + * [[https://github.com/opentracing/specification/blob/master/specification.md OpenTrace specification]], and hence its + * guidelines on naming and tagging apply to methods provided by this Reporter as well. + * + * Usage example: + * {{{ + * val reporter: Reporter = ??? + * object Repo { + * def getUserQuery(userId: String)(implicit ctx: SpanContext) = reporter.trace("query"){ implicit ctx => + * reporter.debug("Running query") + * // run query + * } + * } + * object Service { + * def getUser(userId: String)(implicit ctx: SpanContext) = reporter.trace("get_user"){ implicit ctx => + * reporter.debug("Getting user") + * Repo.getUserQuery(userId) + * } + * } + * reporter.traceRoot("static_get", Map("user" -> "john")) { implicit ctx => + * Service.getUser("john") + * } + * }}} + * + * '''Note that computing traces may be a more expensive operation than traditional logging frameworks provide (in terms + * of memory and processing). It should be used in interesting and actionable code paths.''' + * + * @define rootWarning Note: the idea of the reporting framework is to pass along references to traces as + * implicit parameters. This method should only be used for top-level traces when no parent + * traces are available. + */ +trait Reporter { + import Reporter._ + + def traceWithOptionalParent[A]( + name: String, + tags: Map[String, String], + parent: Option[(SpanContext, CausalRelation)])(op: SpanContext => A): A + def traceWithOptionalParentAsync[A]( + name: String, + tags: Map[String, String], + parent: Option[(SpanContext, CausalRelation)])(op: SpanContext => Future[A]): Future[A] + + /** Trace the execution of an operation, if no parent trace is available. + * + * $rootWarning + */ + final def traceRoot[A](name: String, tags: Map[String, String] = Map.empty)(op: SpanContext => A): A = + traceWithOptionalParent( + name, + tags, + None + )(op) + + /** Trace the execution of an asynchronous operation, if no parent trace is available. + * + * $rootWarning + * + * @see traceRoot + */ + final def traceRootAsync[A](name: String, tags: Map[String, String] = Map.empty)( + op: SpanContext => Future[A]): Future[A] = + traceWithOptionalParentAsync( + name, + tags, + None + )(op) + + /** Trace the execution of an operation, in relation to a parent context. + * + * @param name The name of the operation. Note that this name should not be too specific. According to the + * OpenTrace RFC: "An operation name, a human-readable string which concisely represents the work done + * by the Span (for example, an RPC method name, a function name, or the name of a subtask or stage + * within a larger computation). The operation name should be the most general string that identifies a + * (statistically) interesting class of Span instances. That is, `"get_user"` is better than + * `"get_user/314159"`". + * @param tags Attributes associated with the traced event. Following the above example, if `"get_user"` is an + * operation name, a good tag would be `("account_id" -> 314159)`. + * @param relation Relation of the operation to its parent context. + * @param op The operation to be traced. The trace will complete once the operation returns. + * @param ctx Context of the parent trace. + * @tparam A Return type of the operation. + * @return The value of the child operation. + */ + final def trace[A]( + name: String, + tags: Map[String, String] = Map.empty, + relation: CausalRelation = CausalRelation.Child)(op: /* implicit (gotta wait for Scala 3) */ SpanContext => A)( + implicit ctx: SpanContext): A = + traceWithOptionalParent( + name, + tags, + Some(ctx -> relation) + )(op) + + /** Trace the operation of an asynchronous operation. + * + * Contrary to the synchronous version of this method, a trace is completed once the child operation completes + * (rather than returns). + * + * @see trace + */ + final def traceAsync[A]( + name: String, + tags: Map[String, String] = Map.empty, + relation: CausalRelation = CausalRelation.Child)( + op: /* implicit (gotta wait for Scala 3) */ SpanContext => Future[A])(implicit ctx: SpanContext): Future[A] = + traceWithOptionalParentAsync( + name, + tags, + Some(ctx -> relation) + )(op) + + /** Log a message. */ + def log(severity: Severity, message: String, reason: Option[Throwable])(implicit ctx: SpanContext): Unit + + /** Log a debug message. */ + final def debug(message: String)(implicit ctx: SpanContext): Unit = log(Severity.Debug, message, None) + final def debug(message: String, reason: Throwable)(implicit ctx: SpanContext): Unit = + log(Severity.Debug, message, Some(reason)) + + /** Log an informational message. */ + final def info(message: String)(implicit ctx: SpanContext): Unit = log(Severity.Informational, message, None) + final def info(message: String, reason: Throwable)(implicit ctx: SpanContext): Unit = + log(Severity.Informational, message, Some(reason)) + + /** Log a warning message. */ + final def warn(message: String)(implicit ctx: SpanContext): Unit = log(Severity.Warning, message, None) + final def warn(message: String, reason: Throwable)(implicit ctx: SpanContext): Unit = + log(Severity.Warning, message, Some(reason)) + + /** Log an error message. */ + final def error(message: String)(implicit ctx: SpanContext): Unit = log(Severity.Error, message, None) + final def error(message: String, reason: Throwable)(implicit ctx: SpanContext): Unit = + log(Severity.Error, message, Some(reason)) + +} + +object Reporter { + + /** A relation in cause. + * + * Corresponds to + * [[https://github.com/opentracing/specification/blob/master/specification.md#references-between-spans OpenTrace references between spans]] + */ + sealed trait CausalRelation + object CausalRelation { + + /** One event is the child of another. The parent completes once the child is complete. */ + case object Child extends CausalRelation + + /** One event follows from another, not necessarily being the parent. */ + case object Follows extends CausalRelation + } + + sealed trait Severity + object Severity { + case object Debug extends Severity + case object Informational extends Severity + case object Warning extends Severity + case object Error extends Severity + } + +} diff --git a/core-reporting/src/main/scala/xyz/driver/core/reporting/ScalaLoggingCompat.scala b/core-reporting/src/main/scala/xyz/driver/core/reporting/ScalaLoggingCompat.scala new file mode 100644 index 0000000..0ff5574 --- /dev/null +++ b/core-reporting/src/main/scala/xyz/driver/core/reporting/ScalaLoggingCompat.scala @@ -0,0 +1,36 @@ +package xyz.driver.core +package reporting + +import com.typesafe.scalalogging.{Logger => ScalaLogger} + +/** Compatibility mixin for reporters, that enables implicit conversions to scala-logging loggers. */ +trait ScalaLoggingCompat extends Reporter { + import Reporter.Severity + + def logger: ScalaLogger + + override def log(severity: Severity, message: String, reason: Option[Throwable])(implicit ctx: SpanContext): Unit = + severity match { + case Severity.Debug => logger.debug(message, reason.orNull) + case Severity.Informational => logger.info(message, reason.orNull) + case Severity.Warning => logger.warn(message, reason.orNull) + case Severity.Error => logger.error(message, reason.orNull) + } + +} + +object ScalaLoggingCompat { + import scala.language.implicitConversions + + def defaultScalaLogger(json: Boolean = false): ScalaLogger = { + if (json) { + System.setProperty("logback.configurationFile", "deployed-logback.xml") + } else { + System.setProperty("logback.configurationFile", "logback.xml") + } + ScalaLogger.apply("application") + } + + implicit def toScalaLogger(logger: ScalaLoggingCompat): ScalaLogger = logger.logger + +} diff --git a/core-reporting/src/main/scala/xyz/driver/core/reporting/SpanContext.scala b/core-reporting/src/main/scala/xyz/driver/core/reporting/SpanContext.scala new file mode 100644 index 0000000..04a822d --- /dev/null +++ b/core-reporting/src/main/scala/xyz/driver/core/reporting/SpanContext.scala @@ -0,0 +1,13 @@ +package xyz.driver.core +package reporting + +import scala.util.Random + +case class SpanContext private[core] (traceId: String, spanId: String) + +object SpanContext { + def fresh(): SpanContext = SpanContext( + f"${Random.nextLong()}%016x${Random.nextLong()}%016x", + f"${Random.nextLong()}%016x" + ) +} diff --git a/core-util/src/main/scala/xyz/driver/core/Refresh.scala b/core-util/src/main/scala/xyz/driver/core/Refresh.scala new file mode 100644 index 0000000..6db9c26 --- /dev/null +++ b/core-util/src/main/scala/xyz/driver/core/Refresh.scala @@ -0,0 +1,69 @@ +package xyz.driver.core + +import java.time.Instant +import java.util.concurrent.atomic.AtomicReference + +import scala.concurrent.{ExecutionContext, Future, Promise} +import scala.concurrent.duration.Duration + +/** A single-value asynchronous cache with TTL. + * + * Slightly adapted from + * [[https://github.com/twitter/util/blob/ae0ab09134414438af9dfaa88a4613cecbff4741/util-cache/src/main/scala/com/twitter/cache/Refresh.scala + * Twitter's "util" library]] + * + * Released under the Apache License 2.0. + */ +object Refresh { + + /** Creates a function that will provide a cached value for a given time-to-live (TTL). + * + * It avoids the "thundering herd" problem if multiple requests arrive + * simultanously and the cached value has expired or is unset. + * + * Usage example: + * {{{ + * def freshToken(): Future[String] = // expensive network call to get an access token + * val getToken: () => Future[String] = Refresh.every(1.hour)(freshToken()) + * + * getToken() // new token is issued + * getToken() // subsequent calls use the cached token + * // wait 1 hour + * getToken() // new token is issued + * }}} + * + * @param ttl Time-To-Live duration to cache a computed value. + * @param compute Call-by-name operation that eventually computes a value to + * be cached. Note that if the computation (i.e. the future) fails, the value + * is not cached. + * @param ec The execution context in which valeu computations will be run. + * @return A zero-arg function that returns the cached value. + */ + def every[A](ttl: Duration)(compute: => Future[A])(implicit ec: ExecutionContext): () => Future[A] = { + val ref = new AtomicReference[(Future[A], Instant)]( + (Future.failed(new NoSuchElementException("Cached value was never computed")), Instant.MIN) + ) + def refresh(): Future[A] = { + val tuple = ref.get + val (cachedValue, lastRetrieved) = tuple + val now = Instant.now + if (now.getEpochSecond < lastRetrieved.getEpochSecond + ttl.toSeconds) { + cachedValue + } else { + val p = Promise[A] + val nextTuple = (p.future, now) + if (ref.compareAndSet(tuple, nextTuple)) { + compute.onComplete { done => + if (done.isFailure) { + ref.set((p.future, lastRetrieved)) // don't update retrieval time in case of failure + } + p.complete(done) + } + } + refresh() + } + } + refresh _ + } + +} diff --git a/src/main/scala/xyz/driver/core/Refresh.scala b/src/main/scala/xyz/driver/core/Refresh.scala deleted file mode 100644 index 6db9c26..0000000 --- a/src/main/scala/xyz/driver/core/Refresh.scala +++ /dev/null @@ -1,69 +0,0 @@ -package xyz.driver.core - -import java.time.Instant -import java.util.concurrent.atomic.AtomicReference - -import scala.concurrent.{ExecutionContext, Future, Promise} -import scala.concurrent.duration.Duration - -/** A single-value asynchronous cache with TTL. - * - * Slightly adapted from - * [[https://github.com/twitter/util/blob/ae0ab09134414438af9dfaa88a4613cecbff4741/util-cache/src/main/scala/com/twitter/cache/Refresh.scala - * Twitter's "util" library]] - * - * Released under the Apache License 2.0. - */ -object Refresh { - - /** Creates a function that will provide a cached value for a given time-to-live (TTL). - * - * It avoids the "thundering herd" problem if multiple requests arrive - * simultanously and the cached value has expired or is unset. - * - * Usage example: - * {{{ - * def freshToken(): Future[String] = // expensive network call to get an access token - * val getToken: () => Future[String] = Refresh.every(1.hour)(freshToken()) - * - * getToken() // new token is issued - * getToken() // subsequent calls use the cached token - * // wait 1 hour - * getToken() // new token is issued - * }}} - * - * @param ttl Time-To-Live duration to cache a computed value. - * @param compute Call-by-name operation that eventually computes a value to - * be cached. Note that if the computation (i.e. the future) fails, the value - * is not cached. - * @param ec The execution context in which valeu computations will be run. - * @return A zero-arg function that returns the cached value. - */ - def every[A](ttl: Duration)(compute: => Future[A])(implicit ec: ExecutionContext): () => Future[A] = { - val ref = new AtomicReference[(Future[A], Instant)]( - (Future.failed(new NoSuchElementException("Cached value was never computed")), Instant.MIN) - ) - def refresh(): Future[A] = { - val tuple = ref.get - val (cachedValue, lastRetrieved) = tuple - val now = Instant.now - if (now.getEpochSecond < lastRetrieved.getEpochSecond + ttl.toSeconds) { - cachedValue - } else { - val p = Promise[A] - val nextTuple = (p.future, now) - if (ref.compareAndSet(tuple, nextTuple)) { - compute.onComplete { done => - if (done.isFailure) { - ref.set((p.future, lastRetrieved)) // don't update retrieval time in case of failure - } - p.complete(done) - } - } - refresh() - } - } - refresh _ - } - -} diff --git a/src/main/scala/xyz/driver/core/reporting/GoogleMdcLogger.scala b/src/main/scala/xyz/driver/core/reporting/GoogleMdcLogger.scala deleted file mode 100644 index f5c41cf..0000000 --- a/src/main/scala/xyz/driver/core/reporting/GoogleMdcLogger.scala +++ /dev/null @@ -1,14 +0,0 @@ -package xyz.driver.core -package reporting - -import org.slf4j.MDC - -trait GoogleMdcLogger extends Reporter { self: GoogleReporter => - - abstract override def log(severity: Reporter.Severity, message: String, reason: Option[Throwable])( - implicit ctx: SpanContext): Unit = { - MDC.put("trace", s"projects/${credentials.getProjectId}/traces/${ctx.traceId}") - super.log(severity, message, reason) - } - -} diff --git a/src/main/scala/xyz/driver/core/reporting/GoogleReporter.scala b/src/main/scala/xyz/driver/core/reporting/GoogleReporter.scala deleted file mode 100644 index 14c4954..0000000 --- a/src/main/scala/xyz/driver/core/reporting/GoogleReporter.scala +++ /dev/null @@ -1,217 +0,0 @@ -package xyz.driver.core -package reporting - -import java.security.Signature -import java.time.Instant -import java.util - -import akka.NotUsed -import akka.stream.scaladsl.{Flow, RestartSink, Sink, Source, SourceQueueWithComplete} -import akka.stream.{Materializer, OverflowStrategy} -import com.google.auth.oauth2.ServiceAccountCredentials -import com.softwaremill.sttp._ -import spray.json.DerivedJsonProtocol._ -import spray.json._ -import xyz.driver.core.reporting.Reporter.CausalRelation - -import scala.async.Async._ -import scala.concurrent.duration._ -import scala.concurrent.{ExecutionContext, Future} -import scala.util.control.NonFatal -import scala.util.{Failure, Random, Success, Try} - -/** A reporter that collects traces and submits them to - * [[https://cloud.google.com/trace/docs/reference/v2/rest/ Google's Stackdriver Trace API]]. - */ -class GoogleReporter( - val credentials: ServiceAccountCredentials, - namespace: String, - buffer: Int = GoogleReporter.DefaultBufferSize, - interval: FiniteDuration = GoogleReporter.DefaultInterval)( - implicit client: SttpBackend[Future, _], - mat: Materializer, - ec: ExecutionContext -) extends Reporter { - import GoogleReporter._ - - private val getToken: () => Future[String] = Refresh.every(55.minutes) { - def jwt = { - val now = Instant.now().getEpochSecond - val base64 = util.Base64.getEncoder - val header = base64.encodeToString("""{"alg":"RS256","typ":"JWT"}""".getBytes("utf-8")) - val body = base64.encodeToString( - s"""|{ - | "iss": "${credentials.getClientEmail}", - | "scope": "https://www.googleapis.com/auth/trace.append", - | "aud": "https://www.googleapis.com/oauth2/v4/token", - | "exp": ${now + 60.minutes.toSeconds}, - | "iat": $now - |}""".stripMargin.getBytes("utf-8") - ) - val signer = Signature.getInstance("SHA256withRSA") - signer.initSign(credentials.getPrivateKey) - signer.update(s"$header.$body".getBytes("utf-8")) - val signature = base64.encodeToString(signer.sign()) - s"$header.$body.$signature" - } - sttp - .post(uri"https://www.googleapis.com/oauth2/v4/token") - .body( - "grant_type" -> "urn:ietf:params:oauth:grant-type:jwt-bearer", - "assertion" -> jwt - ) - .mapResponse(s => s.parseJson.asJsObject.fields("access_token").convertTo[String]) - .send() - .map(_.unsafeBody) - } - - private val sendToGoogle: Sink[Span, NotUsed] = RestartSink.withBackoff( - minBackoff = 3.seconds, - maxBackoff = 30.seconds, - randomFactor = 0.2 // adds 20% "noise" to vary the intervals slightly - ) { () => - Flow[Span] - .groupedWithin(buffer, interval) - .mapAsync(1) { spans => - async { - val token = await(getToken()) - val res = await( - sttp - .post(uri"https://cloudtrace.googleapis.com/v2/projects/${credentials.getProjectId}/traces:batchWrite") - .auth - .bearer(token) - .body( - Spans(spans).toJson.compactPrint - ) - .send() - .map(_.unsafeBody)) - res - } - } - .recover { - case NonFatal(e) => - System.err.println(s"Error submitting trace spans: $e") // scalastyle: ignore - throw e - } - .to(Sink.ignore) - } - private val queue: SourceQueueWithComplete[Span] = Source - .queue[Span](buffer, OverflowStrategy.dropHead) - .to(sendToGoogle) - .run() - - private def submit(span: Span): Unit = queue.offer(span).failed.map { e => - System.err.println(s"Error adding span to submission queue: $e") - } - - private def startSpan( - traceId: String, - spanId: String, - parentSpanId: Option[String], - displayName: String, - attributes: Map[String, String]) = Span( - s"projects/${credentials.getProjectId}/traces/$traceId/spans/$spanId", - spanId, - parentSpanId, - TruncatableString(displayName), - Instant.now(), - Instant.now(), - Attributes(attributes ++ Map("service.namespace" -> namespace)), - Failure(new IllegalStateException("span status not set")) - ) - - def traceWithOptionalParent[A]( - operationName: String, - tags: Map[String, String], - parent: Option[(SpanContext, CausalRelation)])(operation: SpanContext => A): A = { - val child = parent match { - case Some((p, _)) => SpanContext(p.traceId, f"${Random.nextLong()}%016x") - case None => SpanContext.fresh() - } - val span = startSpan(child.traceId, child.spanId, parent.map(_._1.spanId), operationName, tags) - val result = operation(child) - span.endTime = Instant.now() - submit(span) - result - } - - def traceWithOptionalParentAsync[A]( - operationName: String, - tags: Map[String, String], - parent: Option[(SpanContext, CausalRelation)])(operation: SpanContext => Future[A]): Future[A] = { - val child = parent match { - case Some((p, _)) => SpanContext(p.traceId, f"${Random.nextLong()}%016x") - case None => SpanContext.fresh() - } - val span = startSpan(child.traceId, child.spanId, parent.map(_._1.spanId), operationName, tags) - val result = operation(child) - result.onComplete { result => - span.endTime = Instant.now() - span.status = result - submit(span) - } - result - } - - override def log(severity: Reporter.Severity, message: String, reason: Option[Throwable])( - implicit ctx: SpanContext): Unit = - sys.error("Submitting logs directly to GCP is " + - "currently not supported. Messages should go to stdout.") // TODO: attach logs to traces and submit them directly - -} - -object GoogleReporter { - - val DefaultBufferSize: Int = 10000 - val DefaultInterval: FiniteDuration = 5.seconds - - private case class Attributes(attributeMap: Map[String, String]) - private case class TruncatableString(value: String) - private case class Span( - name: String, - spanId: String, - parentSpanId: Option[String], - displayName: TruncatableString, - startTime: Instant, - var endTime: Instant, - var attributes: Attributes, - var status: Try[_] - ) - - private case class Spans(spans: Seq[Span]) - - private implicit val instantFormat: RootJsonFormat[Instant] = new RootJsonFormat[Instant] { - override def write(obj: Instant): JsValue = obj.toString.toJson - override def read(json: JsValue): Instant = Instant.parse(json.convertTo[String]) - } - - private implicit val mapFormat = new RootJsonFormat[Map[String, String]] { - override def read(json: JsValue): Map[String, String] = sys.error("unimplemented") - override def write(obj: Map[String, String]): JsValue = { - val withValueObjects = obj.mapValues(value => JsObject("stringValue" -> JsObject("value" -> value.toJson))) - JsObject(withValueObjects) - } - } - - private implicit val statusFormat = new RootJsonFormat[Try[_]] { - override def read(json: JsValue): Try[_] = sys.error("unimplemented") - override def write(obj: Try[_]) = { - // error codes defined at https://github.com/googleapis/googleapis/blob/master/google/rpc/code.proto - val (code, message) = obj match { - case Success(_) => (0, "success") - case Failure(_) => (2, "failure") - } - JsObject( - "code" -> code.toJson, - "message" -> message.toJson, - "details" -> JsArray() - ) - } - } - - private implicit val attributeFormat: RootJsonFormat[Attributes] = jsonFormat1(Attributes) - private implicit val truncatableStringFormat: RootJsonFormat[TruncatableString] = jsonFormat1(TruncatableString) - private implicit val spanFormat: RootJsonFormat[Span] = jsonFormat8(Span) - private implicit val spansFormat: RootJsonFormat[Spans] = jsonFormat1(Spans) - -} diff --git a/src/main/scala/xyz/driver/core/reporting/NoReporter.scala b/src/main/scala/xyz/driver/core/reporting/NoReporter.scala deleted file mode 100644 index c1c81f4..0000000 --- a/src/main/scala/xyz/driver/core/reporting/NoReporter.scala +++ /dev/null @@ -1,8 +0,0 @@ -package xyz.driver.core -package reporting - -trait NoReporter extends NoTraceReporter { - override def log(severity: Reporter.Severity, message: String, reason: Option[Throwable])( - implicit ctx: SpanContext): Unit = () -} -object NoReporter extends NoReporter diff --git a/src/main/scala/xyz/driver/core/reporting/NoTraceReporter.scala b/src/main/scala/xyz/driver/core/reporting/NoTraceReporter.scala deleted file mode 100644 index b49cfda..0000000 --- a/src/main/scala/xyz/driver/core/reporting/NoTraceReporter.scala +++ /dev/null @@ -1,20 +0,0 @@ -package xyz.driver.core -package reporting - -import scala.concurrent.Future - -/** A reporter mixin that does not emit traces. */ -trait NoTraceReporter extends Reporter { - - override def traceWithOptionalParent[A]( - name: String, - tags: Map[String, String], - parent: Option[(SpanContext, Reporter.CausalRelation)])(op: SpanContext => A): A = op(SpanContext.fresh()) - - override def traceWithOptionalParentAsync[A]( - name: String, - tags: Map[String, String], - parent: Option[(SpanContext, Reporter.CausalRelation)])(op: SpanContext => Future[A]): Future[A] = - op(SpanContext.fresh()) - -} diff --git a/src/main/scala/xyz/driver/core/reporting/Reporter.scala b/src/main/scala/xyz/driver/core/reporting/Reporter.scala deleted file mode 100644 index 469084c..0000000 --- a/src/main/scala/xyz/driver/core/reporting/Reporter.scala +++ /dev/null @@ -1,183 +0,0 @@ -package xyz.driver.core -package reporting - -import scala.concurrent.Future - -/** Context-aware diagnostic utility for distributed systems, combining logging and tracing. - * - * Diagnostic messages (i.e. logs) are a vital tool for monitoring applications. Tying such messages to an - * execution context, such as a stack trace, simplifies debugging greatly by giving insight to the chains of events - * that led to a particular message. In synchronous systems, execution contexts can easily be determined by an - * external observer, and, as such, do not need to be propagated explicitly to sub-components (e.g. a stack trace on - * the JVM shows all relevant information). In asynchronous systems and especially distributed systems however, - * execution contexts are not easily determined by an external observer and hence need to be explicitly passed across - * service boundaries. - * - * This reporter provides tracing and logging utilities that explicitly require references to execution contexts - * (called [[SpanContext]]s here) intended to be passed across service boundaries. It embraces Scala's - * implicit-parameter-as-a-context paradigm. - * - * Tracing is intended to be compatible with the - * [[https://github.com/opentracing/specification/blob/master/specification.md OpenTrace specification]], and hence its - * guidelines on naming and tagging apply to methods provided by this Reporter as well. - * - * Usage example: - * {{{ - * val reporter: Reporter = ??? - * object Repo { - * def getUserQuery(userId: String)(implicit ctx: SpanContext) = reporter.trace("query"){ implicit ctx => - * reporter.debug("Running query") - * // run query - * } - * } - * object Service { - * def getUser(userId: String)(implicit ctx: SpanContext) = reporter.trace("get_user"){ implicit ctx => - * reporter.debug("Getting user") - * Repo.getUserQuery(userId) - * } - * } - * reporter.traceRoot("static_get", Map("user" -> "john")) { implicit ctx => - * Service.getUser("john") - * } - * }}} - * - * '''Note that computing traces may be a more expensive operation than traditional logging frameworks provide (in terms - * of memory and processing). It should be used in interesting and actionable code paths.''' - * - * @define rootWarning Note: the idea of the reporting framework is to pass along references to traces as - * implicit parameters. This method should only be used for top-level traces when no parent - * traces are available. - */ -trait Reporter { - import Reporter._ - - def traceWithOptionalParent[A]( - name: String, - tags: Map[String, String], - parent: Option[(SpanContext, CausalRelation)])(op: SpanContext => A): A - def traceWithOptionalParentAsync[A]( - name: String, - tags: Map[String, String], - parent: Option[(SpanContext, CausalRelation)])(op: SpanContext => Future[A]): Future[A] - - /** Trace the execution of an operation, if no parent trace is available. - * - * $rootWarning - */ - final def traceRoot[A](name: String, tags: Map[String, String] = Map.empty)(op: SpanContext => A): A = - traceWithOptionalParent( - name, - tags, - None - )(op) - - /** Trace the execution of an asynchronous operation, if no parent trace is available. - * - * $rootWarning - * - * @see traceRoot - */ - final def traceRootAsync[A](name: String, tags: Map[String, String] = Map.empty)( - op: SpanContext => Future[A]): Future[A] = - traceWithOptionalParentAsync( - name, - tags, - None - )(op) - - /** Trace the execution of an operation, in relation to a parent context. - * - * @param name The name of the operation. Note that this name should not be too specific. According to the - * OpenTrace RFC: "An operation name, a human-readable string which concisely represents the work done - * by the Span (for example, an RPC method name, a function name, or the name of a subtask or stage - * within a larger computation). The operation name should be the most general string that identifies a - * (statistically) interesting class of Span instances. That is, `"get_user"` is better than - * `"get_user/314159"`". - * @param tags Attributes associated with the traced event. Following the above example, if `"get_user"` is an - * operation name, a good tag would be `("account_id" -> 314159)`. - * @param relation Relation of the operation to its parent context. - * @param op The operation to be traced. The trace will complete once the operation returns. - * @param ctx Context of the parent trace. - * @tparam A Return type of the operation. - * @return The value of the child operation. - */ - final def trace[A]( - name: String, - tags: Map[String, String] = Map.empty, - relation: CausalRelation = CausalRelation.Child)(op: /* implicit (gotta wait for Scala 3) */ SpanContext => A)( - implicit ctx: SpanContext): A = - traceWithOptionalParent( - name, - tags, - Some(ctx -> relation) - )(op) - - /** Trace the operation of an asynchronous operation. - * - * Contrary to the synchronous version of this method, a trace is completed once the child operation completes - * (rather than returns). - * - * @see trace - */ - final def traceAsync[A]( - name: String, - tags: Map[String, String] = Map.empty, - relation: CausalRelation = CausalRelation.Child)( - op: /* implicit (gotta wait for Scala 3) */ SpanContext => Future[A])(implicit ctx: SpanContext): Future[A] = - traceWithOptionalParentAsync( - name, - tags, - Some(ctx -> relation) - )(op) - - /** Log a message. */ - def log(severity: Severity, message: String, reason: Option[Throwable])(implicit ctx: SpanContext): Unit - - /** Log a debug message. */ - final def debug(message: String)(implicit ctx: SpanContext): Unit = log(Severity.Debug, message, None) - final def debug(message: String, reason: Throwable)(implicit ctx: SpanContext): Unit = - log(Severity.Debug, message, Some(reason)) - - /** Log an informational message. */ - final def info(message: String)(implicit ctx: SpanContext): Unit = log(Severity.Informational, message, None) - final def info(message: String, reason: Throwable)(implicit ctx: SpanContext): Unit = - log(Severity.Informational, message, Some(reason)) - - /** Log a warning message. */ - final def warn(message: String)(implicit ctx: SpanContext): Unit = log(Severity.Warning, message, None) - final def warn(message: String, reason: Throwable)(implicit ctx: SpanContext): Unit = - log(Severity.Warning, message, Some(reason)) - - /** Log an error message. */ - final def error(message: String)(implicit ctx: SpanContext): Unit = log(Severity.Error, message, None) - final def error(message: String, reason: Throwable)(implicit ctx: SpanContext): Unit = - log(Severity.Error, message, Some(reason)) - -} - -object Reporter { - - /** A relation in cause. - * - * Corresponds to - * [[https://github.com/opentracing/specification/blob/master/specification.md#references-between-spans OpenTrace references between spans]] - */ - sealed trait CausalRelation - object CausalRelation { - - /** One event is the child of another. The parent completes once the child is complete. */ - case object Child extends CausalRelation - - /** One event follows from another, not necessarily being the parent. */ - case object Follows extends CausalRelation - } - - sealed trait Severity - object Severity { - case object Debug extends Severity - case object Informational extends Severity - case object Warning extends Severity - case object Error extends Severity - } - -} diff --git a/src/main/scala/xyz/driver/core/reporting/ScalaLoggingCompat.scala b/src/main/scala/xyz/driver/core/reporting/ScalaLoggingCompat.scala deleted file mode 100644 index 0ff5574..0000000 --- a/src/main/scala/xyz/driver/core/reporting/ScalaLoggingCompat.scala +++ /dev/null @@ -1,36 +0,0 @@ -package xyz.driver.core -package reporting - -import com.typesafe.scalalogging.{Logger => ScalaLogger} - -/** Compatibility mixin for reporters, that enables implicit conversions to scala-logging loggers. */ -trait ScalaLoggingCompat extends Reporter { - import Reporter.Severity - - def logger: ScalaLogger - - override def log(severity: Severity, message: String, reason: Option[Throwable])(implicit ctx: SpanContext): Unit = - severity match { - case Severity.Debug => logger.debug(message, reason.orNull) - case Severity.Informational => logger.info(message, reason.orNull) - case Severity.Warning => logger.warn(message, reason.orNull) - case Severity.Error => logger.error(message, reason.orNull) - } - -} - -object ScalaLoggingCompat { - import scala.language.implicitConversions - - def defaultScalaLogger(json: Boolean = false): ScalaLogger = { - if (json) { - System.setProperty("logback.configurationFile", "deployed-logback.xml") - } else { - System.setProperty("logback.configurationFile", "logback.xml") - } - ScalaLogger.apply("application") - } - - implicit def toScalaLogger(logger: ScalaLoggingCompat): ScalaLogger = logger.logger - -} diff --git a/src/main/scala/xyz/driver/core/reporting/SpanContext.scala b/src/main/scala/xyz/driver/core/reporting/SpanContext.scala deleted file mode 100644 index 04a822d..0000000 --- a/src/main/scala/xyz/driver/core/reporting/SpanContext.scala +++ /dev/null @@ -1,13 +0,0 @@ -package xyz.driver.core -package reporting - -import scala.util.Random - -case class SpanContext private[core] (traceId: String, spanId: String) - -object SpanContext { - def fresh(): SpanContext = SpanContext( - f"${Random.nextLong()}%016x${Random.nextLong()}%016x", - f"${Random.nextLong()}%016x" - ) -} -- cgit v1.2.3