aboutsummaryrefslogtreecommitdiff
path: root/core-reporting
diff options
context:
space:
mode:
authorJakob Odersky <jakob@driver.xyz>2018-09-12 16:03:17 -0700
committerJakob Odersky <jakob@odersky.com>2018-10-09 16:19:39 -0700
commit616e62e733dbbd4e6bacc5f563deef534794dc9e (patch)
tree069d5cbb261793540700a1a7d04e4474806eb294 /core-reporting
parent7a793ffa068fda8f2146f84fa785328d928dba03 (diff)
downloaddriver-core-616e62e733dbbd4e6bacc5f563deef534794dc9e.tar.gz
driver-core-616e62e733dbbd4e6bacc5f563deef534794dc9e.tar.bz2
driver-core-616e62e733dbbd4e6bacc5f563deef534794dc9e.zip
Move reporting into separate project
Diffstat (limited to 'core-reporting')
-rw-r--r--core-reporting/src/main/scala/xyz/driver/core/reporting/GoogleMdcLogger.scala14
-rw-r--r--core-reporting/src/main/scala/xyz/driver/core/reporting/GoogleReporter.scala217
-rw-r--r--core-reporting/src/main/scala/xyz/driver/core/reporting/NoReporter.scala8
-rw-r--r--core-reporting/src/main/scala/xyz/driver/core/reporting/NoTraceReporter.scala20
-rw-r--r--core-reporting/src/main/scala/xyz/driver/core/reporting/Reporter.scala183
-rw-r--r--core-reporting/src/main/scala/xyz/driver/core/reporting/ScalaLoggingCompat.scala36
-rw-r--r--core-reporting/src/main/scala/xyz/driver/core/reporting/SpanContext.scala13
7 files changed, 491 insertions, 0 deletions
diff --git a/core-reporting/src/main/scala/xyz/driver/core/reporting/GoogleMdcLogger.scala b/core-reporting/src/main/scala/xyz/driver/core/reporting/GoogleMdcLogger.scala
new file mode 100644
index 0000000..f5c41cf
--- /dev/null
+++ b/core-reporting/src/main/scala/xyz/driver/core/reporting/GoogleMdcLogger.scala
@@ -0,0 +1,14 @@
+package xyz.driver.core
+package reporting
+
+import org.slf4j.MDC
+
+/** Stackable logging mixin for [[GoogleReporter]] that tags each log call with the trace it belongs to.
+  *
+  * Before delegating to the next `log` implementation in the linearization, the fully qualified trace name
+  * (`projects/<project id>/traces/<trace id>`) is stored in the SLF4J MDC under the key `trace`.
+  */
+trait GoogleMdcLogger extends Reporter { self: GoogleReporter =>
+
+  /** Logs `message` with the `trace` MDC entry set for the duration of the call.
+    *
+    * The entry is restored to its previous state afterwards, so that the trace id does not stay attached to the
+    * calling thread and end up on unrelated log statements issued later from the same thread.
+    *
+    * @param severity Severity of the message.
+    * @param message  The message to log.
+    * @param reason   Optional exception associated with the message.
+    * @param ctx      Context whose trace id is published in the MDC.
+    */
+  abstract override def log(severity: Reporter.Severity, message: String, reason: Option[Throwable])(
+      implicit ctx: SpanContext): Unit = {
+    val key      = "trace"
+    val previous = Option(MDC.get(key)) // MDC.get returns null when the key is unset
+    MDC.put(key, s"projects/${credentials.getProjectId}/traces/${ctx.traceId}")
+    try super.log(severity, message, reason)
+    finally previous match {
+      case Some(value) => MDC.put(key, value)
+      case None        => MDC.remove(key)
+    }
+  }
+
+}
diff --git a/core-reporting/src/main/scala/xyz/driver/core/reporting/GoogleReporter.scala b/core-reporting/src/main/scala/xyz/driver/core/reporting/GoogleReporter.scala
new file mode 100644
index 0000000..14c4954
--- /dev/null
+++ b/core-reporting/src/main/scala/xyz/driver/core/reporting/GoogleReporter.scala
@@ -0,0 +1,217 @@
+package xyz.driver.core
+package reporting
+
+import java.security.Signature
+import java.time.Instant
+import java.util
+
+import akka.NotUsed
+import akka.stream.scaladsl.{Flow, RestartSink, Sink, Source, SourceQueueWithComplete}
+import akka.stream.{Materializer, OverflowStrategy}
+import com.google.auth.oauth2.ServiceAccountCredentials
+import com.softwaremill.sttp._
+import spray.json.DerivedJsonProtocol._
+import spray.json._
+import xyz.driver.core.reporting.Reporter.CausalRelation
+
+import scala.async.Async._
+import scala.concurrent.duration._
+import scala.concurrent.{ExecutionContext, Future}
+import scala.util.control.NonFatal
+import scala.util.{Failure, Random, Success, Try}
+
+/** A reporter that collects traces and submits them to
+  * [[https://cloud.google.com/trace/docs/reference/v2/rest/ Google's Stackdriver Trace API]].
+  *
+  * Finished spans are offered to an in-memory queue of size `buffer` (`OverflowStrategy.dropHead`) and written to
+  * the `traces:batchWrite` endpoint in groups of at most `buffer` spans or whatever accumulated within `interval`.
+  *
+  * @param credentials Service account used to sign the token request and to determine the project id.
+  * @param namespace   Value attached to every span as the `service.namespace` attribute.
+  * @param buffer      Size of the submission queue and maximum number of spans per batch.
+  * @param interval    Maximum time to wait before a non-full batch is submitted.
+  */
+class GoogleReporter(
+    val credentials: ServiceAccountCredentials,
+    namespace: String,
+    buffer: Int = GoogleReporter.DefaultBufferSize,
+    interval: FiniteDuration = GoogleReporter.DefaultInterval)(
+    implicit client: SttpBackend[Future, _],
+    mat: Materializer,
+    ec: ExecutionContext
+) extends Reporter {
+  import GoogleReporter._
+
+  /** Exchanges a self-signed JWT for an OAuth2 access token with the `trace.append` scope.
+    *
+    * NOTE(review): `Refresh.every` is defined elsewhere; presumably it caches the token and re-runs this block once
+    * the cached value is older than 55 minutes, i.e. before the 60 minute expiry requested below. Confirm.
+    */
+  private val getToken: () => Future[String] = Refresh.every(55.minutes) {
+    def jwt = {
+      val now = Instant.now().getEpochSecond
+      // JWS compact serialization (RFC 7515) mandates base64url *without* padding; the standard encoder may emit
+      // '+', '/' and '=' which are not part of that alphabet.
+      val base64 = util.Base64.getUrlEncoder.withoutPadding()
+      val header = base64.encodeToString("""{"alg":"RS256","typ":"JWT"}""".getBytes("utf-8"))
+      val body = base64.encodeToString(
+        s"""|{
+            |  "iss": "${credentials.getClientEmail}",
+            |  "scope": "https://www.googleapis.com/auth/trace.append",
+            |  "aud": "https://www.googleapis.com/oauth2/v4/token",
+            |  "exp": ${now + 60.minutes.toSeconds},
+            |  "iat": $now
+            |}""".stripMargin.getBytes("utf-8")
+      )
+      val signer = Signature.getInstance("SHA256withRSA")
+      signer.initSign(credentials.getPrivateKey)
+      signer.update(s"$header.$body".getBytes("utf-8"))
+      val signature = base64.encodeToString(signer.sign())
+      s"$header.$body.$signature"
+    }
+    sttp
+      .post(uri"https://www.googleapis.com/oauth2/v4/token")
+      .body(
+        "grant_type" -> "urn:ietf:params:oauth:grant-type:jwt-bearer",
+        "assertion"  -> jwt
+      )
+      .mapResponse(s => s.parseJson.asJsObject.fields("access_token").convertTo[String])
+      .send()
+      .map(_.unsafeBody)
+  }
+
+  /** Sink that batches spans and posts them to the Trace API. A failed submission is printed to stderr and then
+    * rethrown, which fails the inner stream so that `RestartSink` recreates it with backoff (3s up to 30s).
+    */
+  private val sendToGoogle: Sink[Span, NotUsed] = RestartSink.withBackoff(
+    minBackoff = 3.seconds,
+    maxBackoff = 30.seconds,
+    randomFactor = 0.2 // adds 20% "noise" to vary the intervals slightly
+  ) { () =>
+    Flow[Span]
+      .groupedWithin(buffer, interval)
+      .mapAsync(1) { spans =>
+        async {
+          val token = await(getToken())
+          val res = await(
+            sttp
+              .post(uri"https://cloudtrace.googleapis.com/v2/projects/${credentials.getProjectId}/traces:batchWrite")
+              .auth
+              .bearer(token)
+              .body(
+                Spans(spans).toJson.compactPrint
+              )
+              .send()
+              .map(_.unsafeBody))
+          res
+        }
+      }
+      .recover {
+        case NonFatal(e) =>
+          System.err.println(s"Error submitting trace spans: $e") // scalastyle: ignore
+          throw e
+      }
+      .to(Sink.ignore)
+  }
+
+  /** Entry point of the submission stream; when the buffer is full the oldest queued span is dropped. */
+  private val queue: SourceQueueWithComplete[Span] = Source
+    .queue[Span](buffer, OverflowStrategy.dropHead)
+    .to(sendToGoogle)
+    .run()
+
+  /** Hands a finished span to the submission queue without blocking the caller.
+    *
+    * Both a failed offer and an offer that completed with anything other than `Enqueued` (e.g. the queue was closed
+    * or the stream failed) are reported on stderr; previously only the former was noticed.
+    */
+  private def submit(span: Span): Unit = queue.offer(span).onComplete {
+    case Success(akka.stream.QueueOfferResult.Enqueued) => ()
+    case Success(other) =>
+      System.err.println(s"Span was not added to submission queue: $other") // scalastyle: ignore
+    case Failure(e) =>
+      System.err.println(s"Error adding span to submission queue: $e") // scalastyle: ignore
+  }
+
+  /** Creates a span that starts now. End time and status are placeholders that must be set before submission. */
+  private def startSpan(
+      traceId: String,
+      spanId: String,
+      parentSpanId: Option[String],
+      displayName: String,
+      attributes: Map[String, String]) = Span(
+    s"projects/${credentials.getProjectId}/traces/$traceId/spans/$spanId",
+    spanId,
+    parentSpanId,
+    TruncatableString(displayName),
+    Instant.now(),
+    Instant.now(),
+    Attributes(attributes ++ Map("service.namespace" -> namespace)),
+    Failure(new IllegalStateException("span status not set"))
+  )
+
+  /** Derives the context of a new span: same trace as the parent with a random span id, or a fresh trace. */
+  private def childContext(parent: Option[(SpanContext, CausalRelation)]): SpanContext = parent match {
+    case Some((p, _)) => SpanContext(p.traceId, f"${Random.nextLong()}%016x")
+    case None         => SpanContext.fresh()
+  }
+
+  /** Stamps the span with its end time and outcome and queues it for submission. */
+  private def finish(span: Span, outcome: Try[_]): Unit = {
+    span.endTime = Instant.now()
+    span.status = outcome
+    submit(span)
+  }
+
+  /** Traces a synchronous operation.
+    *
+    * The span is completed and submitted when `operation` returns or throws a non-fatal exception; its status
+    * reflects which of the two happened. An exception thrown by `operation` is rethrown to the caller.
+    *
+    * @param operationName Display name of the span.
+    * @param tags          Attributes attached to the span.
+    * @param parent        Parent context, if any; the causal relation is currently not reported.
+    * @param operation     The operation to trace, given the context of the new span.
+    * @return The value returned by `operation`.
+    */
+  def traceWithOptionalParent[A](
+      operationName: String,
+      tags: Map[String, String],
+      parent: Option[(SpanContext, CausalRelation)])(operation: SpanContext => A): A = {
+    val child   = childContext(parent)
+    val span    = startSpan(child.traceId, child.spanId, parent.map(_._1.spanId), operationName, tags)
+    val outcome = Try(operation(child))
+    finish(span, outcome)
+    outcome match {
+      case Success(value) => value
+      case Failure(e)     => throw e
+    }
+  }
+
+  /** Traces an asynchronous operation.
+    *
+    * The span is completed and submitted once the future returned by `operation` completes. Should `operation`
+    * throw a non-fatal exception instead of returning a future, the span is submitted as failed and the exception
+    * is rethrown.
+    *
+    * @param operationName Display name of the span.
+    * @param tags          Attributes attached to the span.
+    * @param parent        Parent context, if any; the causal relation is currently not reported.
+    * @param operation     The operation to trace, given the context of the new span.
+    * @return The future returned by `operation`.
+    */
+  def traceWithOptionalParentAsync[A](
+      operationName: String,
+      tags: Map[String, String],
+      parent: Option[(SpanContext, CausalRelation)])(operation: SpanContext => Future[A]): Future[A] = {
+    val child = childContext(parent)
+    val span  = startSpan(child.traceId, child.spanId, parent.map(_._1.spanId), operationName, tags)
+    val result = Try(operation(child)) match {
+      case Success(future) => future
+      case Failure(e) =>
+        finish(span, Failure(e))
+        throw e
+    }
+    result.onComplete(outcome => finish(span, outcome))
+    result
+  }
+
+  /** Always fails: direct log submission is not implemented. Mix in a logging trait that overrides this method. */
+  override def log(severity: Reporter.Severity, message: String, reason: Option[Throwable])(
+      implicit ctx: SpanContext): Unit =
+    sys.error("Submitting logs directly to GCP is " +
+      "currently not supported. Messages should go to stdout.") // TODO: attach logs to traces and submit them directly
+
+}
+
+/** Defaults and the private JSON model used by [[GoogleReporter]] to talk to the Trace API. */
+object GoogleReporter {
+
+  /** Default size of the submission queue and of a batch. */
+  val DefaultBufferSize: Int = 10000
+
+  /** Default maximum delay before a non-full batch is submitted. */
+  val DefaultInterval: FiniteDuration = 5.seconds
+
+  private case class Attributes(attributeMap: Map[String, String])
+  private case class TruncatableString(value: String)
+
+  /** JSON model of a single span. `endTime` and `status` are mutable because they are only known once the traced
+    * operation has finished.
+    */
+  private case class Span(
+      name: String,
+      spanId: String,
+      parentSpanId: Option[String],
+      displayName: TruncatableString,
+      startTime: Instant,
+      var endTime: Instant,
+      var attributes: Attributes,
+      var status: Try[_]
+  )
+
+  private case class Spans(spans: Seq[Span])
+
+  /** Instants are written in their ISO-8601 `toString` form. */
+  private implicit val instantFormat: RootJsonFormat[Instant] = new RootJsonFormat[Instant] {
+    override def write(obj: Instant): JsValue = obj.toString.toJson
+    override def read(json: JsValue): Instant = Instant.parse(json.convertTo[String])
+  }
+
+  /** Write-only format that wraps every value as `{"stringValue": {"value": ...}}`. */
+  private implicit val mapFormat: RootJsonFormat[Map[String, String]] = new RootJsonFormat[Map[String, String]] {
+    override def read(json: JsValue): Map[String, String] = sys.error("unimplemented")
+    override def write(obj: Map[String, String]): JsValue = {
+      // `map` builds a strict Map; `mapValues` would return a lazy view that re-applies the function on every
+      // access (and is no longer a `Map` on newer Scala versions).
+      val withValueObjects = obj.map {
+        case (key, value) => key -> JsObject("stringValue" -> JsObject("value" -> value.toJson))
+      }
+      JsObject(withValueObjects)
+    }
+  }
+
+  /** Write-only format that maps the outcome of an operation to a status object with a `google.rpc.Code`. */
+  private implicit val statusFormat: RootJsonFormat[Try[_]] = new RootJsonFormat[Try[_]] {
+    override def read(json: JsValue): Try[_] = sys.error("unimplemented")
+    override def write(obj: Try[_]): JsValue = {
+      // error codes defined at https://github.com/googleapis/googleapis/blob/master/google/rpc/code.proto
+      val (code, message) = obj match {
+        case Success(_) => (0, "success")
+        case Failure(_) => (2, "failure")
+      }
+      JsObject(
+        "code"    -> code.toJson,
+        "message" -> message.toJson,
+        "details" -> JsArray()
+      )
+    }
+  }
+
+  private implicit val attributeFormat: RootJsonFormat[Attributes]                 = jsonFormat1(Attributes)
+  private implicit val truncatableStringFormat: RootJsonFormat[TruncatableString] = jsonFormat1(TruncatableString)
+  private implicit val spanFormat: RootJsonFormat[Span]                            = jsonFormat8(Span)
+  private implicit val spansFormat: RootJsonFormat[Spans]                          = jsonFormat1(Spans)
+
+}
diff --git a/core-reporting/src/main/scala/xyz/driver/core/reporting/NoReporter.scala b/core-reporting/src/main/scala/xyz/driver/core/reporting/NoReporter.scala
new file mode 100644
index 0000000..c1c81f4
--- /dev/null
+++ b/core-reporting/src/main/scala/xyz/driver/core/reporting/NoReporter.scala
@@ -0,0 +1,8 @@
+package xyz.driver.core
+package reporting
+
+/** A reporter that does nothing: tracing behaves as defined by [[NoTraceReporter]] and `log` discards every
+  * message.
+  */
+trait NoReporter extends NoTraceReporter {
+ /** Ignores the message. */
+ override def log(severity: Reporter.Severity, message: String, reason: Option[Throwable])(
+ implicit ctx: SpanContext): Unit = ()
+}
+/** Ready-made instance of [[NoReporter]]. */
+object NoReporter extends NoReporter
diff --git a/core-reporting/src/main/scala/xyz/driver/core/reporting/NoTraceReporter.scala b/core-reporting/src/main/scala/xyz/driver/core/reporting/NoTraceReporter.scala
new file mode 100644
index 0000000..b49cfda
--- /dev/null
+++ b/core-reporting/src/main/scala/xyz/driver/core/reporting/NoTraceReporter.scala
@@ -0,0 +1,20 @@
+package xyz.driver.core
+package reporting
+
+import scala.concurrent.Future
+
+/** A reporter mixin that does not emit traces.
+  *
+  * Both tracing methods simply run the given operation with a newly generated [[SpanContext]]; the name, tags and
+  * parent passed in are not used.
+  */
+trait NoTraceReporter extends Reporter {
+
+  /** Runs `op` with a fresh context and returns its result. */
+  override def traceWithOptionalParent[A](
+      name: String,
+      tags: Map[String, String],
+      parent: Option[(SpanContext, Reporter.CausalRelation)])(op: SpanContext => A): A = {
+    val freshContext = SpanContext.fresh()
+    op(freshContext)
+  }
+
+  /** Runs `op` with a fresh context and returns the future it produces. */
+  override def traceWithOptionalParentAsync[A](
+      name: String,
+      tags: Map[String, String],
+      parent: Option[(SpanContext, Reporter.CausalRelation)])(op: SpanContext => Future[A]): Future[A] = {
+    val freshContext = SpanContext.fresh()
+    op(freshContext)
+  }
+
+}
diff --git a/core-reporting/src/main/scala/xyz/driver/core/reporting/Reporter.scala b/core-reporting/src/main/scala/xyz/driver/core/reporting/Reporter.scala
new file mode 100644
index 0000000..469084c
--- /dev/null
+++ b/core-reporting/src/main/scala/xyz/driver/core/reporting/Reporter.scala
@@ -0,0 +1,183 @@
+package xyz.driver.core
+package reporting
+
+import scala.concurrent.Future
+
+/** Context-aware diagnostic utility for distributed systems, combining logging and tracing.
+ *
+ * Diagnostic messages (i.e. logs) are a vital tool for monitoring applications. Tying such messages to an
+ * execution context, such as a stack trace, simplifies debugging greatly by giving insight to the chains of events
+ * that led to a particular message. In synchronous systems, execution contexts can easily be determined by an
+ * external observer, and, as such, do not need to be propagated explicitly to sub-components (e.g. a stack trace on
+ * the JVM shows all relevant information). In asynchronous systems and especially distributed systems however,
+ * execution contexts are not easily determined by an external observer and hence need to be explicitly passed across
+ * service boundaries.
+ *
+ * This reporter provides tracing and logging utilities that explicitly require references to execution contexts
+ * (called [[SpanContext]]s here) intended to be passed across service boundaries. It embraces Scala's
+ * implicit-parameter-as-a-context paradigm.
+ *
+ * Tracing is intended to be compatible with the
+ * [[https://github.com/opentracing/specification/blob/master/specification.md OpenTracing specification]], and hence
+ * its guidelines on naming and tagging apply to the methods provided by this Reporter as well.
+ *
+ * Usage example:
+ * {{{
+ * val reporter: Reporter = ???
+ * object Repo {
+ * def getUserQuery(userId: String)(implicit ctx: SpanContext) = reporter.trace("query"){ implicit ctx =>
+ * reporter.debug("Running query")
+ * // run query
+ * }
+ * }
+ * object Service {
+ * def getUser(userId: String)(implicit ctx: SpanContext) = reporter.trace("get_user"){ implicit ctx =>
+ * reporter.debug("Getting user")
+ * Repo.getUserQuery(userId)
+ * }
+ * }
+ * reporter.traceRoot("static_get", Map("user" -> "john")) { implicit ctx =>
+ * Service.getUser("john")
+ * }
+ * }}}
+ *
+ * '''Note that computing traces may be more expensive (in terms of memory and processing) than logging with a
+ * traditional logging framework. Tracing should be reserved for interesting and actionable code paths.'''
+ *
+ * @define rootWarning Note: the idea of the reporting framework is to pass along references to traces as
+ * implicit parameters. This method should only be used for top-level traces when no parent
+ * traces are available.
+ */
+trait Reporter {
+  import Reporter._
+
+  /** Primitive on which all synchronous tracing methods are built: run `op` inside a new span that is either a
+    * root span (`parent` is `None`) or related to the given parent context.
+    */
+  def traceWithOptionalParent[A](
+      name: String,
+      tags: Map[String, String],
+      parent: Option[(SpanContext, CausalRelation)])(op: SpanContext => A): A
+
+  /** Asynchronous counterpart of [[traceWithOptionalParent]]. */
+  def traceWithOptionalParentAsync[A](
+      name: String,
+      tags: Map[String, String],
+      parent: Option[(SpanContext, CausalRelation)])(op: SpanContext => Future[A]): Future[A]
+
+  /** Trace the execution of an operation, if no parent trace is available.
+    *
+    * $rootWarning
+    */
+  final def traceRoot[A](name: String, tags: Map[String, String] = Map.empty)(op: SpanContext => A): A =
+    traceWithOptionalParent(name, tags, None)(op)
+
+  /** Trace the execution of an asynchronous operation, if no parent trace is available.
+    *
+    * $rootWarning
+    *
+    * @see traceRoot
+    */
+  final def traceRootAsync[A](name: String, tags: Map[String, String] = Map.empty)(
+      op: SpanContext => Future[A]): Future[A] =
+    traceWithOptionalParentAsync(name, tags, None)(op)
+
+  /** Trace the execution of an operation, in relation to a parent context.
+    *
+    * @param name     The name of the operation. It should not be too specific; the OpenTracing specification
+    *                 describes it as "a human-readable string which concisely represents the work done by the Span
+    *                 (for example, an RPC method name, a function name, or the name of a subtask or stage within a
+    *                 larger computation). The operation name should be the most general string that identifies a
+    *                 (statistically) interesting class of Span instances. That is, `"get_user"` is better than
+    *                 `"get_user/314159"`".
+    * @param tags     Attributes associated with the traced event. Following the above example, if `"get_user"` is
+    *                 an operation name, a good tag would be `("account_id" -> 314159)`.
+    * @param relation Relation of the operation to its parent context.
+    * @param op       The operation to be traced. The trace will complete once the operation returns.
+    * @param ctx      Context of the parent trace.
+    * @tparam A Return type of the operation.
+    * @return The value of the child operation.
+    */
+  final def trace[A](
+      name: String,
+      tags: Map[String, String] = Map.empty,
+      relation: CausalRelation = CausalRelation.Child)(op: /* implicit (gotta wait for Scala 3) */ SpanContext => A)(
+      implicit ctx: SpanContext): A = {
+    val parent = Some((ctx, relation))
+    traceWithOptionalParent(name, tags, parent)(op)
+  }
+
+  /** Trace the execution of an asynchronous operation, in relation to a parent context.
+    *
+    * Contrary to the synchronous version of this method, a trace is completed once the child operation completes
+    * (rather than returns).
+    *
+    * @see trace
+    */
+  final def traceAsync[A](
+      name: String,
+      tags: Map[String, String] = Map.empty,
+      relation: CausalRelation = CausalRelation.Child)(
+      op: /* implicit (gotta wait for Scala 3) */ SpanContext => Future[A])(implicit ctx: SpanContext): Future[A] = {
+    val parent = Some((ctx, relation))
+    traceWithOptionalParentAsync(name, tags, parent)(op)
+  }
+
+  /** Log a message with the given severity and an optional cause, in the given context. */
+  def log(severity: Severity, message: String, reason: Option[Throwable])(implicit ctx: SpanContext): Unit
+
+  /** Log a debug message. */
+  final def debug(message: String)(implicit ctx: SpanContext): Unit =
+    log(Severity.Debug, message, None)
+
+  /** Log a debug message together with its cause. */
+  final def debug(message: String, reason: Throwable)(implicit ctx: SpanContext): Unit =
+    log(Severity.Debug, message, Some(reason))
+
+  /** Log an informational message. */
+  final def info(message: String)(implicit ctx: SpanContext): Unit =
+    log(Severity.Informational, message, None)
+
+  /** Log an informational message together with its cause. */
+  final def info(message: String, reason: Throwable)(implicit ctx: SpanContext): Unit =
+    log(Severity.Informational, message, Some(reason))
+
+  /** Log a warning message. */
+  final def warn(message: String)(implicit ctx: SpanContext): Unit =
+    log(Severity.Warning, message, None)
+
+  /** Log a warning message together with its cause. */
+  final def warn(message: String, reason: Throwable)(implicit ctx: SpanContext): Unit =
+    log(Severity.Warning, message, Some(reason))
+
+  /** Log an error message. */
+  final def error(message: String)(implicit ctx: SpanContext): Unit =
+    log(Severity.Error, message, None)
+
+  /** Log an error message together with its cause. */
+  final def error(message: String, reason: Throwable)(implicit ctx: SpanContext): Unit =
+    log(Severity.Error, message, Some(reason))
+
+}
+
+/** Companion holding the enumerations used by [[Reporter]]: causal relations between spans and log severities. */
+object Reporter {
+
+ /** A causal relation between an operation and its parent context.
+ *
+ * Corresponds to
+ * [[https://github.com/opentracing/specification/blob/master/specification.md#references-between-spans OpenTracing references between spans]]
+ */
+ sealed trait CausalRelation
+ object CausalRelation {
+
+ /** One event is the child of another. The parent completes once the child is complete. */
+ case object Child extends CausalRelation
+
+ /** One event follows from another, not necessarily being the parent. */
+ case object Follows extends CausalRelation
+ }
+
+ /** Severity of a log message, as passed to `Reporter.log`. */
+ sealed trait Severity
+ object Severity {
+ // Used by Reporter.debug
+ case object Debug extends Severity
+ // Used by Reporter.info
+ case object Informational extends Severity
+ // Used by Reporter.warn
+ case object Warning extends Severity
+ // Used by Reporter.error
+ case object Error extends Severity
+ }
+
+}
diff --git a/core-reporting/src/main/scala/xyz/driver/core/reporting/ScalaLoggingCompat.scala b/core-reporting/src/main/scala/xyz/driver/core/reporting/ScalaLoggingCompat.scala
new file mode 100644
index 0000000..0ff5574
--- /dev/null
+++ b/core-reporting/src/main/scala/xyz/driver/core/reporting/ScalaLoggingCompat.scala
@@ -0,0 +1,36 @@
+package xyz.driver.core
+package reporting
+
+import com.typesafe.scalalogging.{Logger => ScalaLogger}
+
+/** Compatibility mixin for reporters that routes log messages to a scala-logging logger and, together with the
+  * companion object, enables implicit conversion of such a reporter to that logger.
+  */
+trait ScalaLoggingCompat extends Reporter {
+  import Reporter.Severity
+
+  /** The scala-logging logger that receives all messages. */
+  def logger: ScalaLogger
+
+  /** Forwards the message to the logger method matching `severity`; a missing reason is passed on as `null`. */
+  override def log(severity: Severity, message: String, reason: Option[Throwable])(implicit ctx: SpanContext): Unit = {
+    val cause: Throwable = reason.orNull
+    severity match {
+      case Severity.Debug         => logger.debug(message, cause)
+      case Severity.Informational => logger.info(message, cause)
+      case Severity.Warning       => logger.warn(message, cause)
+      case Severity.Error         => logger.error(message, cause)
+    }
+  }
+
+}
+
+object ScalaLoggingCompat {
+  import scala.language.implicitConversions
+
+  /** Creates the logger named `application`.
+    *
+    * As a side effect, the `logback.configurationFile` system property is set to `deployed-logback.xml` when
+    * `json` is true and to `logback.xml` otherwise.
+    *
+    * @param json Selects which of the two configuration files is set in the system property.
+    */
+  def defaultScalaLogger(json: Boolean = false): ScalaLogger = {
+    val configurationFile = if (json) "deployed-logback.xml" else "logback.xml"
+    System.setProperty("logback.configurationFile", configurationFile)
+    ScalaLogger("application")
+  }
+
+  /** Lets a [[ScalaLoggingCompat]] reporter be used wherever a scala-logging logger is expected. */
+  implicit def toScalaLogger(logger: ScalaLoggingCompat): ScalaLogger = logger.logger
+
+}
diff --git a/core-reporting/src/main/scala/xyz/driver/core/reporting/SpanContext.scala b/core-reporting/src/main/scala/xyz/driver/core/reporting/SpanContext.scala
new file mode 100644
index 0000000..04a822d
--- /dev/null
+++ b/core-reporting/src/main/scala/xyz/driver/core/reporting/SpanContext.scala
@@ -0,0 +1,13 @@
+package xyz.driver.core
+package reporting
+
+import scala.util.Random
+
+/** Reference to a span within a trace, meant to be passed along implicitly.
+  *
+  * @param traceId Identifier of the trace the span belongs to.
+  * @param spanId  Identifier of the span itself.
+  */
+case class SpanContext private[core] (traceId: String, spanId: String)
+
+object SpanContext {
+
+  /** Creates a context with random identifiers: a trace id of 32 hex digits and a span id of 16 hex digits. */
+  def fresh(): SpanContext = {
+    // one random 64-bit value rendered as 16 zero-padded, lowercase hex digits
+    def hex64(): String = f"${Random.nextLong()}%016x"
+
+    val traceId = hex64() + hex64()
+    val spanId  = hex64()
+    SpanContext(traceId, spanId)
+  }
+}