aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorJakob Odersky <jakob@driver.xyz>2018-09-12 16:03:17 -0700
committerJakob Odersky <jakob@odersky.com>2018-10-09 16:19:39 -0700
commit616e62e733dbbd4e6bacc5f563deef534794dc9e (patch)
tree069d5cbb261793540700a1a7d04e4474806eb294 /src
parent7a793ffa068fda8f2146f84fa785328d928dba03 (diff)
downloaddriver-core-616e62e733dbbd4e6bacc5f563deef534794dc9e.tar.gz
driver-core-616e62e733dbbd4e6bacc5f563deef534794dc9e.tar.bz2
driver-core-616e62e733dbbd4e6bacc5f563deef534794dc9e.zip
Move reporting into separate project
Diffstat (limited to 'src')
-rw-r--r--src/main/scala/xyz/driver/core/Refresh.scala69
-rw-r--r--src/main/scala/xyz/driver/core/reporting/GoogleMdcLogger.scala14
-rw-r--r--src/main/scala/xyz/driver/core/reporting/GoogleReporter.scala217
-rw-r--r--src/main/scala/xyz/driver/core/reporting/NoReporter.scala8
-rw-r--r--src/main/scala/xyz/driver/core/reporting/NoTraceReporter.scala20
-rw-r--r--src/main/scala/xyz/driver/core/reporting/Reporter.scala183
-rw-r--r--src/main/scala/xyz/driver/core/reporting/ScalaLoggingCompat.scala36
-rw-r--r--src/main/scala/xyz/driver/core/reporting/SpanContext.scala13
8 files changed, 0 insertions, 560 deletions
diff --git a/src/main/scala/xyz/driver/core/Refresh.scala b/src/main/scala/xyz/driver/core/Refresh.scala
deleted file mode 100644
index 6db9c26..0000000
--- a/src/main/scala/xyz/driver/core/Refresh.scala
+++ /dev/null
@@ -1,69 +0,0 @@
-package xyz.driver.core
-
-import java.time.Instant
-import java.util.concurrent.atomic.AtomicReference
-
-import scala.concurrent.{ExecutionContext, Future, Promise}
-import scala.concurrent.duration.Duration
-
-/** A single-value asynchronous cache with TTL.
- *
- * Slightly adapted from
- * [[https://github.com/twitter/util/blob/ae0ab09134414438af9dfaa88a4613cecbff4741/util-cache/src/main/scala/com/twitter/cache/Refresh.scala
- * Twitter's "util" library]]
- *
- * Released under the Apache License 2.0.
- */
-object Refresh {
-
- /** Creates a function that will provide a cached value for a given time-to-live (TTL).
- *
- * It avoids the "thundering herd" problem if multiple requests arrive
- * simultanously and the cached value has expired or is unset.
- *
- * Usage example:
- * {{{
- * def freshToken(): Future[String] = // expensive network call to get an access token
- * val getToken: () => Future[String] = Refresh.every(1.hour)(freshToken())
- *
- * getToken() // new token is issued
- * getToken() // subsequent calls use the cached token
- * // wait 1 hour
- * getToken() // new token is issued
- * }}}
- *
- * @param ttl Time-To-Live duration to cache a computed value.
- * @param compute Call-by-name operation that eventually computes a value to
- * be cached. Note that if the computation (i.e. the future) fails, the value
- * is not cached.
- * @param ec The execution context in which valeu computations will be run.
- * @return A zero-arg function that returns the cached value.
- */
- def every[A](ttl: Duration)(compute: => Future[A])(implicit ec: ExecutionContext): () => Future[A] = {
- val ref = new AtomicReference[(Future[A], Instant)](
- (Future.failed(new NoSuchElementException("Cached value was never computed")), Instant.MIN)
- )
- def refresh(): Future[A] = {
- val tuple = ref.get
- val (cachedValue, lastRetrieved) = tuple
- val now = Instant.now
- if (now.getEpochSecond < lastRetrieved.getEpochSecond + ttl.toSeconds) {
- cachedValue
- } else {
- val p = Promise[A]
- val nextTuple = (p.future, now)
- if (ref.compareAndSet(tuple, nextTuple)) {
- compute.onComplete { done =>
- if (done.isFailure) {
- ref.set((p.future, lastRetrieved)) // don't update retrieval time in case of failure
- }
- p.complete(done)
- }
- }
- refresh()
- }
- }
- refresh _
- }
-
-}
diff --git a/src/main/scala/xyz/driver/core/reporting/GoogleMdcLogger.scala b/src/main/scala/xyz/driver/core/reporting/GoogleMdcLogger.scala
deleted file mode 100644
index f5c41cf..0000000
--- a/src/main/scala/xyz/driver/core/reporting/GoogleMdcLogger.scala
+++ /dev/null
@@ -1,14 +0,0 @@
-package xyz.driver.core
-package reporting
-
-import org.slf4j.MDC
-
-trait GoogleMdcLogger extends Reporter { self: GoogleReporter =>
-
- abstract override def log(severity: Reporter.Severity, message: String, reason: Option[Throwable])(
- implicit ctx: SpanContext): Unit = {
- MDC.put("trace", s"projects/${credentials.getProjectId}/traces/${ctx.traceId}")
- super.log(severity, message, reason)
- }
-
-}
diff --git a/src/main/scala/xyz/driver/core/reporting/GoogleReporter.scala b/src/main/scala/xyz/driver/core/reporting/GoogleReporter.scala
deleted file mode 100644
index 14c4954..0000000
--- a/src/main/scala/xyz/driver/core/reporting/GoogleReporter.scala
+++ /dev/null
@@ -1,217 +0,0 @@
-package xyz.driver.core
-package reporting
-
-import java.security.Signature
-import java.time.Instant
-import java.util
-
-import akka.NotUsed
-import akka.stream.scaladsl.{Flow, RestartSink, Sink, Source, SourceQueueWithComplete}
-import akka.stream.{Materializer, OverflowStrategy}
-import com.google.auth.oauth2.ServiceAccountCredentials
-import com.softwaremill.sttp._
-import spray.json.DerivedJsonProtocol._
-import spray.json._
-import xyz.driver.core.reporting.Reporter.CausalRelation
-
-import scala.async.Async._
-import scala.concurrent.duration._
-import scala.concurrent.{ExecutionContext, Future}
-import scala.util.control.NonFatal
-import scala.util.{Failure, Random, Success, Try}
-
-/** A reporter that collects traces and submits them to
- * [[https://cloud.google.com/trace/docs/reference/v2/rest/ Google's Stackdriver Trace API]].
- */
-class GoogleReporter(
- val credentials: ServiceAccountCredentials,
- namespace: String,
- buffer: Int = GoogleReporter.DefaultBufferSize,
- interval: FiniteDuration = GoogleReporter.DefaultInterval)(
- implicit client: SttpBackend[Future, _],
- mat: Materializer,
- ec: ExecutionContext
-) extends Reporter {
- import GoogleReporter._
-
- private val getToken: () => Future[String] = Refresh.every(55.minutes) {
- def jwt = {
- val now = Instant.now().getEpochSecond
- val base64 = util.Base64.getEncoder
- val header = base64.encodeToString("""{"alg":"RS256","typ":"JWT"}""".getBytes("utf-8"))
- val body = base64.encodeToString(
- s"""|{
- | "iss": "${credentials.getClientEmail}",
- | "scope": "https://www.googleapis.com/auth/trace.append",
- | "aud": "https://www.googleapis.com/oauth2/v4/token",
- | "exp": ${now + 60.minutes.toSeconds},
- | "iat": $now
- |}""".stripMargin.getBytes("utf-8")
- )
- val signer = Signature.getInstance("SHA256withRSA")
- signer.initSign(credentials.getPrivateKey)
- signer.update(s"$header.$body".getBytes("utf-8"))
- val signature = base64.encodeToString(signer.sign())
- s"$header.$body.$signature"
- }
- sttp
- .post(uri"https://www.googleapis.com/oauth2/v4/token")
- .body(
- "grant_type" -> "urn:ietf:params:oauth:grant-type:jwt-bearer",
- "assertion" -> jwt
- )
- .mapResponse(s => s.parseJson.asJsObject.fields("access_token").convertTo[String])
- .send()
- .map(_.unsafeBody)
- }
-
- private val sendToGoogle: Sink[Span, NotUsed] = RestartSink.withBackoff(
- minBackoff = 3.seconds,
- maxBackoff = 30.seconds,
- randomFactor = 0.2 // adds 20% "noise" to vary the intervals slightly
- ) { () =>
- Flow[Span]
- .groupedWithin(buffer, interval)
- .mapAsync(1) { spans =>
- async {
- val token = await(getToken())
- val res = await(
- sttp
- .post(uri"https://cloudtrace.googleapis.com/v2/projects/${credentials.getProjectId}/traces:batchWrite")
- .auth
- .bearer(token)
- .body(
- Spans(spans).toJson.compactPrint
- )
- .send()
- .map(_.unsafeBody))
- res
- }
- }
- .recover {
- case NonFatal(e) =>
- System.err.println(s"Error submitting trace spans: $e") // scalastyle: ignore
- throw e
- }
- .to(Sink.ignore)
- }
- private val queue: SourceQueueWithComplete[Span] = Source
- .queue[Span](buffer, OverflowStrategy.dropHead)
- .to(sendToGoogle)
- .run()
-
- private def submit(span: Span): Unit = queue.offer(span).failed.map { e =>
- System.err.println(s"Error adding span to submission queue: $e")
- }
-
- private def startSpan(
- traceId: String,
- spanId: String,
- parentSpanId: Option[String],
- displayName: String,
- attributes: Map[String, String]) = Span(
- s"projects/${credentials.getProjectId}/traces/$traceId/spans/$spanId",
- spanId,
- parentSpanId,
- TruncatableString(displayName),
- Instant.now(),
- Instant.now(),
- Attributes(attributes ++ Map("service.namespace" -> namespace)),
- Failure(new IllegalStateException("span status not set"))
- )
-
- def traceWithOptionalParent[A](
- operationName: String,
- tags: Map[String, String],
- parent: Option[(SpanContext, CausalRelation)])(operation: SpanContext => A): A = {
- val child = parent match {
- case Some((p, _)) => SpanContext(p.traceId, f"${Random.nextLong()}%016x")
- case None => SpanContext.fresh()
- }
- val span = startSpan(child.traceId, child.spanId, parent.map(_._1.spanId), operationName, tags)
- val result = operation(child)
- span.endTime = Instant.now()
- submit(span)
- result
- }
-
- def traceWithOptionalParentAsync[A](
- operationName: String,
- tags: Map[String, String],
- parent: Option[(SpanContext, CausalRelation)])(operation: SpanContext => Future[A]): Future[A] = {
- val child = parent match {
- case Some((p, _)) => SpanContext(p.traceId, f"${Random.nextLong()}%016x")
- case None => SpanContext.fresh()
- }
- val span = startSpan(child.traceId, child.spanId, parent.map(_._1.spanId), operationName, tags)
- val result = operation(child)
- result.onComplete { result =>
- span.endTime = Instant.now()
- span.status = result
- submit(span)
- }
- result
- }
-
- override def log(severity: Reporter.Severity, message: String, reason: Option[Throwable])(
- implicit ctx: SpanContext): Unit =
- sys.error("Submitting logs directly to GCP is " +
- "currently not supported. Messages should go to stdout.") // TODO: attach logs to traces and submit them directly
-
-}
-
-object GoogleReporter {
-
- val DefaultBufferSize: Int = 10000
- val DefaultInterval: FiniteDuration = 5.seconds
-
- private case class Attributes(attributeMap: Map[String, String])
- private case class TruncatableString(value: String)
- private case class Span(
- name: String,
- spanId: String,
- parentSpanId: Option[String],
- displayName: TruncatableString,
- startTime: Instant,
- var endTime: Instant,
- var attributes: Attributes,
- var status: Try[_]
- )
-
- private case class Spans(spans: Seq[Span])
-
- private implicit val instantFormat: RootJsonFormat[Instant] = new RootJsonFormat[Instant] {
- override def write(obj: Instant): JsValue = obj.toString.toJson
- override def read(json: JsValue): Instant = Instant.parse(json.convertTo[String])
- }
-
- private implicit val mapFormat = new RootJsonFormat[Map[String, String]] {
- override def read(json: JsValue): Map[String, String] = sys.error("unimplemented")
- override def write(obj: Map[String, String]): JsValue = {
- val withValueObjects = obj.mapValues(value => JsObject("stringValue" -> JsObject("value" -> value.toJson)))
- JsObject(withValueObjects)
- }
- }
-
- private implicit val statusFormat = new RootJsonFormat[Try[_]] {
- override def read(json: JsValue): Try[_] = sys.error("unimplemented")
- override def write(obj: Try[_]) = {
- // error codes defined at https://github.com/googleapis/googleapis/blob/master/google/rpc/code.proto
- val (code, message) = obj match {
- case Success(_) => (0, "success")
- case Failure(_) => (2, "failure")
- }
- JsObject(
- "code" -> code.toJson,
- "message" -> message.toJson,
- "details" -> JsArray()
- )
- }
- }
-
- private implicit val attributeFormat: RootJsonFormat[Attributes] = jsonFormat1(Attributes)
- private implicit val truncatableStringFormat: RootJsonFormat[TruncatableString] = jsonFormat1(TruncatableString)
- private implicit val spanFormat: RootJsonFormat[Span] = jsonFormat8(Span)
- private implicit val spansFormat: RootJsonFormat[Spans] = jsonFormat1(Spans)
-
-}
diff --git a/src/main/scala/xyz/driver/core/reporting/NoReporter.scala b/src/main/scala/xyz/driver/core/reporting/NoReporter.scala
deleted file mode 100644
index c1c81f4..0000000
--- a/src/main/scala/xyz/driver/core/reporting/NoReporter.scala
+++ /dev/null
@@ -1,8 +0,0 @@
-package xyz.driver.core
-package reporting
-
-trait NoReporter extends NoTraceReporter {
- override def log(severity: Reporter.Severity, message: String, reason: Option[Throwable])(
- implicit ctx: SpanContext): Unit = ()
-}
-object NoReporter extends NoReporter
diff --git a/src/main/scala/xyz/driver/core/reporting/NoTraceReporter.scala b/src/main/scala/xyz/driver/core/reporting/NoTraceReporter.scala
deleted file mode 100644
index b49cfda..0000000
--- a/src/main/scala/xyz/driver/core/reporting/NoTraceReporter.scala
+++ /dev/null
@@ -1,20 +0,0 @@
-package xyz.driver.core
-package reporting
-
-import scala.concurrent.Future
-
-/** A reporter mixin that does not emit traces. */
-trait NoTraceReporter extends Reporter {
-
- override def traceWithOptionalParent[A](
- name: String,
- tags: Map[String, String],
- parent: Option[(SpanContext, Reporter.CausalRelation)])(op: SpanContext => A): A = op(SpanContext.fresh())
-
- override def traceWithOptionalParentAsync[A](
- name: String,
- tags: Map[String, String],
- parent: Option[(SpanContext, Reporter.CausalRelation)])(op: SpanContext => Future[A]): Future[A] =
- op(SpanContext.fresh())
-
-}
diff --git a/src/main/scala/xyz/driver/core/reporting/Reporter.scala b/src/main/scala/xyz/driver/core/reporting/Reporter.scala
deleted file mode 100644
index 469084c..0000000
--- a/src/main/scala/xyz/driver/core/reporting/Reporter.scala
+++ /dev/null
@@ -1,183 +0,0 @@
-package xyz.driver.core
-package reporting
-
-import scala.concurrent.Future
-
-/** Context-aware diagnostic utility for distributed systems, combining logging and tracing.
- *
- * Diagnostic messages (i.e. logs) are a vital tool for monitoring applications. Tying such messages to an
- * execution context, such as a stack trace, simplifies debugging greatly by giving insight to the chains of events
- * that led to a particular message. In synchronous systems, execution contexts can easily be determined by an
- * external observer, and, as such, do not need to be propagated explicitly to sub-components (e.g. a stack trace on
- * the JVM shows all relevant information). In asynchronous systems and especially distributed systems however,
- * execution contexts are not easily determined by an external observer and hence need to be explicitly passed across
- * service boundaries.
- *
- * This reporter provides tracing and logging utilities that explicitly require references to execution contexts
- * (called [[SpanContext]]s here) intended to be passed across service boundaries. It embraces Scala's
- * implicit-parameter-as-a-context paradigm.
- *
- * Tracing is intended to be compatible with the
- * [[https://github.com/opentracing/specification/blob/master/specification.md OpenTrace specification]], and hence its
- * guidelines on naming and tagging apply to methods provided by this Reporter as well.
- *
- * Usage example:
- * {{{
- * val reporter: Reporter = ???
- * object Repo {
- * def getUserQuery(userId: String)(implicit ctx: SpanContext) = reporter.trace("query"){ implicit ctx =>
- * reporter.debug("Running query")
- * // run query
- * }
- * }
- * object Service {
- * def getUser(userId: String)(implicit ctx: SpanContext) = reporter.trace("get_user"){ implicit ctx =>
- * reporter.debug("Getting user")
- * Repo.getUserQuery(userId)
- * }
- * }
- * reporter.traceRoot("static_get", Map("user" -> "john")) { implicit ctx =>
- * Service.getUser("john")
- * }
- * }}}
- *
- * '''Note that computing traces may be a more expensive operation than traditional logging frameworks provide (in terms
- * of memory and processing). It should be used in interesting and actionable code paths.'''
- *
- * @define rootWarning Note: the idea of the reporting framework is to pass along references to traces as
- * implicit parameters. This method should only be used for top-level traces when no parent
- * traces are available.
- */
-trait Reporter {
- import Reporter._
-
- def traceWithOptionalParent[A](
- name: String,
- tags: Map[String, String],
- parent: Option[(SpanContext, CausalRelation)])(op: SpanContext => A): A
- def traceWithOptionalParentAsync[A](
- name: String,
- tags: Map[String, String],
- parent: Option[(SpanContext, CausalRelation)])(op: SpanContext => Future[A]): Future[A]
-
- /** Trace the execution of an operation, if no parent trace is available.
- *
- * $rootWarning
- */
- final def traceRoot[A](name: String, tags: Map[String, String] = Map.empty)(op: SpanContext => A): A =
- traceWithOptionalParent(
- name,
- tags,
- None
- )(op)
-
- /** Trace the execution of an asynchronous operation, if no parent trace is available.
- *
- * $rootWarning
- *
- * @see traceRoot
- */
- final def traceRootAsync[A](name: String, tags: Map[String, String] = Map.empty)(
- op: SpanContext => Future[A]): Future[A] =
- traceWithOptionalParentAsync(
- name,
- tags,
- None
- )(op)
-
- /** Trace the execution of an operation, in relation to a parent context.
- *
- * @param name The name of the operation. Note that this name should not be too specific. According to the
- * OpenTrace RFC: "An operation name, a human-readable string which concisely represents the work done
- * by the Span (for example, an RPC method name, a function name, or the name of a subtask or stage
- * within a larger computation). The operation name should be the most general string that identifies a
- * (statistically) interesting class of Span instances. That is, `"get_user"` is better than
- * `"get_user/314159"`".
- * @param tags Attributes associated with the traced event. Following the above example, if `"get_user"` is an
- * operation name, a good tag would be `("account_id" -> 314159)`.
- * @param relation Relation of the operation to its parent context.
- * @param op The operation to be traced. The trace will complete once the operation returns.
- * @param ctx Context of the parent trace.
- * @tparam A Return type of the operation.
- * @return The value of the child operation.
- */
- final def trace[A](
- name: String,
- tags: Map[String, String] = Map.empty,
- relation: CausalRelation = CausalRelation.Child)(op: /* implicit (gotta wait for Scala 3) */ SpanContext => A)(
- implicit ctx: SpanContext): A =
- traceWithOptionalParent(
- name,
- tags,
- Some(ctx -> relation)
- )(op)
-
- /** Trace the operation of an asynchronous operation.
- *
- * Contrary to the synchronous version of this method, a trace is completed once the child operation completes
- * (rather than returns).
- *
- * @see trace
- */
- final def traceAsync[A](
- name: String,
- tags: Map[String, String] = Map.empty,
- relation: CausalRelation = CausalRelation.Child)(
- op: /* implicit (gotta wait for Scala 3) */ SpanContext => Future[A])(implicit ctx: SpanContext): Future[A] =
- traceWithOptionalParentAsync(
- name,
- tags,
- Some(ctx -> relation)
- )(op)
-
- /** Log a message. */
- def log(severity: Severity, message: String, reason: Option[Throwable])(implicit ctx: SpanContext): Unit
-
- /** Log a debug message. */
- final def debug(message: String)(implicit ctx: SpanContext): Unit = log(Severity.Debug, message, None)
- final def debug(message: String, reason: Throwable)(implicit ctx: SpanContext): Unit =
- log(Severity.Debug, message, Some(reason))
-
- /** Log an informational message. */
- final def info(message: String)(implicit ctx: SpanContext): Unit = log(Severity.Informational, message, None)
- final def info(message: String, reason: Throwable)(implicit ctx: SpanContext): Unit =
- log(Severity.Informational, message, Some(reason))
-
- /** Log a warning message. */
- final def warn(message: String)(implicit ctx: SpanContext): Unit = log(Severity.Warning, message, None)
- final def warn(message: String, reason: Throwable)(implicit ctx: SpanContext): Unit =
- log(Severity.Warning, message, Some(reason))
-
- /** Log an error message. */
- final def error(message: String)(implicit ctx: SpanContext): Unit = log(Severity.Error, message, None)
- final def error(message: String, reason: Throwable)(implicit ctx: SpanContext): Unit =
- log(Severity.Error, message, Some(reason))
-
-}
-
-object Reporter {
-
- /** A relation in cause.
- *
- * Corresponds to
- * [[https://github.com/opentracing/specification/blob/master/specification.md#references-between-spans OpenTrace references between spans]]
- */
- sealed trait CausalRelation
- object CausalRelation {
-
- /** One event is the child of another. The parent completes once the child is complete. */
- case object Child extends CausalRelation
-
- /** One event follows from another, not necessarily being the parent. */
- case object Follows extends CausalRelation
- }
-
- sealed trait Severity
- object Severity {
- case object Debug extends Severity
- case object Informational extends Severity
- case object Warning extends Severity
- case object Error extends Severity
- }
-
-}
diff --git a/src/main/scala/xyz/driver/core/reporting/ScalaLoggingCompat.scala b/src/main/scala/xyz/driver/core/reporting/ScalaLoggingCompat.scala
deleted file mode 100644
index 0ff5574..0000000
--- a/src/main/scala/xyz/driver/core/reporting/ScalaLoggingCompat.scala
+++ /dev/null
@@ -1,36 +0,0 @@
-package xyz.driver.core
-package reporting
-
-import com.typesafe.scalalogging.{Logger => ScalaLogger}
-
-/** Compatibility mixin for reporters, that enables implicit conversions to scala-logging loggers. */
-trait ScalaLoggingCompat extends Reporter {
- import Reporter.Severity
-
- def logger: ScalaLogger
-
- override def log(severity: Severity, message: String, reason: Option[Throwable])(implicit ctx: SpanContext): Unit =
- severity match {
- case Severity.Debug => logger.debug(message, reason.orNull)
- case Severity.Informational => logger.info(message, reason.orNull)
- case Severity.Warning => logger.warn(message, reason.orNull)
- case Severity.Error => logger.error(message, reason.orNull)
- }
-
-}
-
-object ScalaLoggingCompat {
- import scala.language.implicitConversions
-
- def defaultScalaLogger(json: Boolean = false): ScalaLogger = {
- if (json) {
- System.setProperty("logback.configurationFile", "deployed-logback.xml")
- } else {
- System.setProperty("logback.configurationFile", "logback.xml")
- }
- ScalaLogger.apply("application")
- }
-
- implicit def toScalaLogger(logger: ScalaLoggingCompat): ScalaLogger = logger.logger
-
-}
diff --git a/src/main/scala/xyz/driver/core/reporting/SpanContext.scala b/src/main/scala/xyz/driver/core/reporting/SpanContext.scala
deleted file mode 100644
index 04a822d..0000000
--- a/src/main/scala/xyz/driver/core/reporting/SpanContext.scala
+++ /dev/null
@@ -1,13 +0,0 @@
-package xyz.driver.core
-package reporting
-
-import scala.util.Random
-
-case class SpanContext private[core] (traceId: String, spanId: String)
-
-object SpanContext {
- def fresh(): SpanContext = SpanContext(
- f"${Random.nextLong()}%016x${Random.nextLong()}%016x",
- f"${Random.nextLong()}%016x"
- )
-}