diff options
Diffstat (limited to 'src/main/scala/xyz/driver/core/reporting')
5 files changed, 417 insertions, 0 deletions
// =====================================================================================
// File: src/main/scala/xyz/driver/core/reporting/GoogleReporter.scala (new file)
// =====================================================================================
package xyz.driver.core
package reporting

import java.security.Signature
import java.time.Instant
import java.util

import akka.NotUsed
import akka.stream.scaladsl.{Flow, RestartSink, Sink, Source, SourceQueueWithComplete}
import akka.stream.{Materializer, OverflowStrategy}
import com.google.auth.oauth2.ServiceAccountCredentials
import com.softwaremill.sttp._
import com.typesafe.scalalogging.Logger
import spray.json.DerivedJsonProtocol._
import spray.json._
import xyz.driver.core.reporting.Reporter.CausalRelation

import scala.async.Async._
import scala.concurrent.duration._
import scala.concurrent.{ExecutionContext, Future}
import scala.util.Random
import scala.util.control.NonFatal

/** A reporter that collects traces and submits them to
  * [[https://cloud.google.com/trace/docs/reference/v2/rest/ Google's Stackdriver Trace API]].
  *
  * Spans are pushed onto an in-memory queue and sent in batches. Log messages are delegated to `logger`
  * (see [[ScalaLoggerLike]]).
  *
  * @param credentials Service account used to sign the token request and to determine the project id.
  * @param namespace   Value attached to every span under the `namespace` attribute.
  * @param logger      Logger that receives the diagnostic messages.
  * @param buffer      Size of the submission queue, and maximum number of spans per batch.
  * @param interval    Maximum time to wait before a (possibly partial) batch is sent.
  */
class GoogleReporter(
    credentials: ServiceAccountCredentials,
    namespace: String,
    val logger: Logger,
    buffer: Int = GoogleReporter.DefaultBufferSize,
    interval: FiniteDuration = GoogleReporter.DefaultInterval)(
    implicit client: SttpBackend[Future, _],
    mat: Materializer,
    ec: ExecutionContext
) extends Reporter with ScalaLoggerLike {
  import GoogleReporter._

  /** Provides an OAuth access token, obtained by exchanging a self-signed JWT at Google's token endpoint.
    * The requested token lifetime is 60 minutes; the refresh period passed to `Refresh.every` is 55 minutes.
    */
  private val getToken: () => Future[String] = Refresh.every(55.minutes) {
    def jwt = {
      val now = Instant.now().getEpochSecond
      // JWT segments are base64url-encoded without padding (RFC 7515); the standard encoder would emit
      // '+', '/' and '=' characters.
      val base64 = util.Base64.getUrlEncoder.withoutPadding
      val header = base64.encodeToString("""{"alg":"RS256","typ":"JWT"}""".getBytes("utf-8"))
      val body = base64.encodeToString(
        s"""|{
            |  "iss": "${credentials.getClientEmail}",
            |  "scope": "https://www.googleapis.com/auth/trace.append",
            |  "aud": "https://www.googleapis.com/oauth2/v4/token",
            |  "exp": ${now + 60.minutes.toSeconds},
            |  "iat": $now
            |}""".stripMargin.getBytes("utf-8")
      )
      val signer = Signature.getInstance("SHA256withRSA")
      signer.initSign(credentials.getPrivateKey)
      signer.update(s"$header.$body".getBytes("utf-8"))
      val signature = base64.encodeToString(signer.sign())
      s"$header.$body.$signature"
    }
    sttp
      .post(uri"https://www.googleapis.com/oauth2/v4/token")
      .body(
        "grant_type" -> "urn:ietf:params:oauth:grant-type:jwt-bearer",
        "assertion"  -> jwt
      )
      .mapResponse(s => s.parseJson.asJsObject.fields("access_token").convertTo[String])
      .send()
      .map(_.unsafeBody)
  }

  /** Sink that batches spans and posts each batch to the `traces:batchWrite` endpoint.
    * It is wrapped in a `RestartSink` so that a failed submission restarts the stream after a backoff.
    */
  private val sendToGoogle: Sink[Span, NotUsed] = RestartSink.withBackoff(
    minBackoff = 3.seconds,
    maxBackoff = 30.seconds,
    randomFactor = 0.2 // adds 20% "noise" to vary the intervals slightly
  ) { () =>
    Flow[Span]
      .groupedWithin(buffer, interval)
      .mapAsync(1) { spans =>
        async {
          val token = await(getToken())
          await(
            sttp
              .post(uri"https://cloudtrace.googleapis.com/v2/projects/${credentials.getProjectId}/traces:batchWrite")
              .auth
              .bearer(token)
              .body(Spans(spans).toJson.compactPrint)
              .send()
              .map(_.unsafeBody))
        }
      }
      .recover {
        case NonFatal(e) =>
          System.err.println(s"Error submitting trace spans: $e") // scalastyle: ignore
          throw e // rethrow so that the stream fails and the RestartSink kicks in
      }
      .to(Sink.ignore)
  }

  /** Entry point of the submission stream; when the buffer is full the `dropHead` strategy applies. */
  private val queue: SourceQueueWithComplete[Span] = Source
    .queue[Span](buffer, OverflowStrategy.dropHead)
    .to(sendToGoogle)
    .run()

  /** Enqueue a finished span for submission. Failures are reported on stderr and otherwise ignored. */
  private def submit(span: Span): Unit =
    queue.offer(span).failed.foreach { e =>
      System.err.println(s"Error adding span to submission queue: $e")
    }

  /** Stamp the span with the current time as its end time and enqueue it. */
  private def finish(span: Span): Unit = submit(span.copy(endTime = Instant.now()))

  /** Derive the context of a new span: it joins the parent's trace if there is one, otherwise it starts a
    * fresh trace.
    */
  private def childContext(parent: Option[(SpanContext, CausalRelation)]): SpanContext =
    parent.fold(SpanContext.fresh()) { case (p, _) => SpanContext(p.traceId, newSpanId()) }

  /** Create a span that starts now. Its end time is provisional and replaced in `finish`. */
  private def startSpan(
      traceId: String,
      spanId: String,
      parentSpanId: Option[String],
      displayName: String,
      attributes: Map[String, String]): Span = {
    val now = Instant.now()
    Span(
      // Resource names in the v2 API are rooted at "projects/", not "project/".
      s"projects/${credentials.getProjectId}/traces/$traceId/spans/$spanId",
      spanId,
      parentSpanId,
      TruncatableString(displayName),
      now,
      now,
      Attributes(attributes + ("namespace" -> namespace))
    )
  }

  /** Trace a synchronous operation. The span is submitted once the operation returns or throws. */
  def traceWithOptionalParent[A](
      operationName: String,
      tags: Map[String, String],
      parent: Option[(SpanContext, CausalRelation)])(operation: SpanContext => A): A = {
    val child = childContext(parent)
    val span  = startSpan(child.traceId, child.spanId, parent.map(_._1.spanId), operationName, tags)
    try operation(child)
    finally finish(span) // also record spans of operations that fail
  }

  /** Trace an asynchronous operation. The span is submitted once the returned future completes, or
    * immediately if the operation throws before returning a future.
    */
  def traceWithOptionalParentAsync[A](
      operationName: String,
      tags: Map[String, String],
      parent: Option[(SpanContext, CausalRelation)])(operation: SpanContext => Future[A]): Future[A] = {
    val child = childContext(parent)
    val span  = startSpan(child.traceId, child.spanId, parent.map(_._1.spanId), operationName, tags)
    val result =
      try operation(child)
      catch {
        case NonFatal(e) =>
          finish(span)
          throw e
      }
    result.onComplete(_ => finish(span))
    result
  }
}

object GoogleReporter {

  val DefaultBufferSize: Int          = 10000
  val DefaultInterval: FiniteDuration = 5.seconds

  /** A random span id, rendered as exactly 16 lower-case hexadecimal characters. */
  private def newSpanId(): String = f"${Random.nextLong()}%016x"

  // Wire model of the batchWrite request body.
  private case class Attributes(attributeMap: Map[String, String])
  private case class TruncatableString(value: String)
  private case class Span(
      name: String,
      spanId: String,
      parentSpanId: Option[String],
      displayName: TruncatableString,
      startTime: Instant,
      endTime: Instant,
      attributes: Attributes
  )

  private case class Spans(spans: Seq[Span])

  /** Instants are written in their `toString` form and read back with `Instant.parse`. */
  private implicit val instantFormat: RootJsonFormat[Instant] = new RootJsonFormat[Instant] {
    override def write(obj: Instant): JsValue  = obj.toString.toJson
    override def read(json: JsValue): Instant = Instant.parse(json.convertTo[String])
  }

  /** Write-only format that renders each value as `{"stringValue": {"value": ...}}`.
    *
    * NOTE(review): the name `mapFormat` is kept on purpose; it presumably shadows the generic map format of
    * the same name coming from the imported JSON protocol -- confirm before renaming.
    */
  private implicit val mapFormat: RootJsonFormat[Map[String, String]] =
    new RootJsonFormat[Map[String, String]] {
      override def read(json: JsValue): Map[String, String] = sys.error("unimplemented")
      override def write(obj: Map[String, String]): JsValue = {
        // A strict `map` instead of `mapValues`, which only yields a lazily re-evaluated view.
        val withValueObjects: Map[String, JsValue] = obj.map {
          case (key, value) => key -> JsObject("stringValue" -> JsObject("value" -> value.toJson))
        }
        JsObject(withValueObjects)
      }
    }

  private implicit val attributeFormat: RootJsonFormat[Attributes]               = jsonFormat1(Attributes)
  private implicit val truncatableStringFormat: RootJsonFormat[TruncatableString] = jsonFormat1(TruncatableString)
  private implicit val spanFormat: RootJsonFormat[Span]                           = jsonFormat7(Span)
  private implicit val spansFormat: RootJsonFormat[Spans]                         = jsonFormat1(Spans)

}

// =====================================================================================
// File: src/main/scala/xyz/driver/core/reporting/NoTraceReporter.scala (new file)
// =====================================================================================
package xyz.driver.core
package reporting

import com.typesafe.scalalogging.Logger

import scala.concurrent.Future

/** A reporter that logs through `logger` but records no traces: every traced operation simply runs with a
  * fresh [[SpanContext]]; name, tags and parent are ignored.
  */
class NoTraceReporter(val logger: Logger) extends Reporter with ScalaLoggerLike {

  override def traceWithOptionalParent[A](
      name: String,
      tags: Map[String, String],
      parent: Option[(SpanContext, Reporter.CausalRelation)])(op: SpanContext => A): A =
    op(SpanContext.fresh())

  override def traceWithOptionalParentAsync[A](
      name: String,
      tags: Map[String, String],
      parent: Option[(SpanContext, Reporter.CausalRelation)])(op: SpanContext => Future[A]): Future[A] =
    op(SpanContext.fresh())

}

// =====================================================================================
// File: src/main/scala/xyz/driver/core/reporting/Reporter.scala (new file)
// =====================================================================================
package xyz.driver.core.reporting

import com.typesafe.scalalogging.Logger
import org.slf4j.helpers.NOPLogger
import xyz.driver.core.reporting.Reporter.CausalRelation

import scala.concurrent.Future

/** Context-aware diagnostic utility for distributed systems, combining logging and tracing.
  *
  * Diagnostic messages (i.e. logs) are a vital tool for monitoring applications. Tying such messages to an
  * execution context, such as a stack trace, simplifies debugging greatly by giving insight to the chains of events
  * that led to a particular message. In synchronous systems, execution contexts can easily be determined by an
  * external observer, and, as such, do not need to be propagated explicitly to sub-components (e.g. a stack trace on
  * the JVM shows all relevant information). In asynchronous systems and especially distributed systems however,
  * execution contexts are not easily determined by an external observer and hence need to be explicitly passed across
  * service boundaries.
  *
  * This reporter provides tracing and logging utilities that explicitly require references to execution contexts
  * (called [[SpanContext]]s here) intended to be passed across service boundaries. It embraces Scala's
  * implicit-parameter-as-a-context paradigm.
  *
  * Tracing is intended to be compatible with the
  * [[https://github.com/opentracing/specification/blob/master/specification.md OpenTracing specification]], and hence
  * its guidelines on naming and tagging apply to methods provided by this Reporter as well.
  *
  * Usage example:
  * {{{
  *  val reporter: Reporter = ???
  *  object Repo {
  *    def getUserQuery(userId: String)(implicit ctx: SpanContext) = reporter.trace("query"){ implicit ctx =>
  *      reporter.debug("Running query")
  *      // run query
  *    }
  *  }
  *  object Service {
  *    def getUser(userId: String)(implicit ctx: SpanContext) = reporter.trace("get_user"){ implicit ctx =>
  *      reporter.debug("Getting user")
  *      Repo.getUserQuery(userId)
  *    }
  *  }
  *  reporter.traceRoot("static_get", Map("user" -> "john")) { implicit ctx =>
  *    Service.getUser("john")
  *  }
  * }}}
  *
  * Note that computing traces may be a more expensive operation than traditional logging frameworks provide (in terms
  * of memory and processing). It should be used in interesting and actionable code paths.
  *
  * @define rootWarning Note: the idea of the reporting framework is to pass along references to traces as
  *                     implicit parameters. This method should only be used for top-level traces when no parent
  *                     traces are available.
  */
trait Reporter {

  /** Trace the execution of an operation, optionally in relation to a parent context.
    *
    * This is the primitive on which [[traceRoot]] and [[trace]] are built.
    */
  def traceWithOptionalParent[A](
      name: String,
      tags: Map[String, String],
      parent: Option[(SpanContext, CausalRelation)])(op: SpanContext => A): A

  /** Trace the execution of an asynchronous operation, optionally in relation to a parent context.
    *
    * This is the primitive on which [[traceRootAsync]] and [[traceAsync]] are built.
    */
  def traceWithOptionalParentAsync[A](
      name: String,
      tags: Map[String, String],
      parent: Option[(SpanContext, CausalRelation)])(op: SpanContext => Future[A]): Future[A]

  /** Trace the execution of an operation, if no parent trace is available.
    *
    * $rootWarning
    */
  def traceRoot[A](name: String, tags: Map[String, String] = Map.empty)(op: SpanContext => A): A =
    traceWithOptionalParent(
      name,
      tags,
      None
    )(op)

  /** Trace the execution of an asynchronous operation, if no parent trace is available.
    *
    * $rootWarning
    *
    * @see traceRoot
    */
  def traceRootAsync[A](name: String, tags: Map[String, String] = Map.empty)(
      op: SpanContext => Future[A]): Future[A] =
    traceWithOptionalParentAsync(
      name,
      tags,
      None
    )(op)

  /** Trace the execution of an operation, in relation to a parent context.
    *
    * @param name The name of the operation. Note that this name should not be too specific. According to the
    *             OpenTracing specification: "An operation name, a human-readable string which concisely represents
    *             the work done by the Span (for example, an RPC method name, a function name, or the name of a
    *             subtask or stage within a larger computation). The operation name should be the most general
    *             string that identifies a (statistically) interesting class of Span instances. That is,
    *             `"get_user"` is better than `"get_user/314159"`".
    * @param tags Attributes associated with the traced event. Following the above example, if `"get_user"` is an
    *             operation name, a good tag would be `("account_id" -> 314159)`. Defaults to no tags.
    * @param relation Relation of the operation to its parent context.
    * @param op The operation to be traced. The trace will complete once the operation returns.
    * @param ctx Context of the parent trace.
    * @tparam A Return type of the operation.
    * @return The value of the child operation.
    */
  def trace[A](
      name: String,
      tags: Map[String, String] = Map.empty,
      relation: CausalRelation = CausalRelation.Child)(
      op: /* implicit (gotta wait for Scala 3) */ SpanContext => A)(implicit ctx: SpanContext): A =
    traceWithOptionalParent(
      name,
      tags,
      Some(ctx -> relation)
    )(op)

  /** Trace the execution of an asynchronous operation.
    *
    * Contrary to the synchronous version of this method, a trace is completed once the child operation completes
    * (rather than returns).
    *
    * @see trace
    */
  def traceAsync[A](
      name: String,
      tags: Map[String, String] = Map.empty,
      relation: CausalRelation = CausalRelation.Child)(
      op: /* implicit (gotta wait for Scala 3) */ SpanContext => Future[A])(implicit ctx: SpanContext): Future[A] =
    traceWithOptionalParentAsync(
      name,
      tags,
      Some(ctx -> relation)
    )(op)

  /** Log a debug message. */
  def debug(message: String)(implicit ctx: SpanContext): Unit

  /** Log an informational message. */
  def info(message: String)(implicit ctx: SpanContext): Unit

  /** Log a warning message. */
  def warn(message: String)(implicit ctx: SpanContext): Unit

  /** Log an error message. */
  def error(message: String)(implicit ctx: SpanContext): Unit

  /** Log an error message with an associated throwable that caused the error condition. */
  def error(message: String, reason: Throwable)(implicit ctx: SpanContext): Unit

}

object Reporter {

  /** A reporter that records no traces and logs to a no-op logger. */
  val NoReporter: Reporter = new NoTraceReporter(Logger.apply(NOPLogger.NOP_LOGGER))

  /** A relation in cause.
    *
    * Corresponds to
    * [[https://github.com/opentracing/specification/blob/master/specification.md#references-between-spans OpenTracing references between spans]]
    */
  sealed trait CausalRelation
  object CausalRelation {

    /** One event is the child of another. The parent completes once the child is complete. */
    case object Child extends CausalRelation

    /** One event follows from another, not necessarily being the parent. */
    case object Follows extends CausalRelation
  }

}

// =====================================================================================
// File: src/main/scala/xyz/driver/core/reporting/ScalaLoggerLike.scala (new file)
// =====================================================================================
package xyz.driver.core.reporting
import com.typesafe.scalalogging.Logger

/** Implements the logging half of [[Reporter]] by forwarding every message to a scala-logging `Logger`.
  * The implicit span context is accepted but not used by these implementations.
  */
trait ScalaLoggerLike extends Reporter {

  /** Logger that receives all messages. */
  def logger: Logger

  override def debug(message: String)(implicit ctx: SpanContext): Unit = logger.debug(message)
  override def info(message: String)(implicit ctx: SpanContext): Unit  = logger.info(message)
  override def warn(message: String)(implicit ctx: SpanContext): Unit  = logger.warn(message)
  override def error(message: String)(implicit ctx: SpanContext): Unit = logger.error(message)
  override def error(message: String, reason: Throwable)(implicit ctx: SpanContext): Unit =
    logger.error(message, reason)

}

object ScalaLoggerLike {
  import scala.language.implicitConversions

  /** Create the logger named "application".
    *
    * As a side effect this sets the `logback.configurationFile` system property.
    *
    * @param json Selects `deployed-logback.xml` when true, `logback.xml` otherwise.
    */
  def defaultScalaLogger(json: Boolean = false): Logger = {
    val configurationFile = if (json) "deployed-logback.xml" else "logback.xml"
    System.setProperty("logback.configurationFile", configurationFile)
    Logger.apply("application")
  }

  /** Lets a reporter be used wherever a plain `Logger` is expected. */
  implicit def toScalaLogger(reporter: ScalaLoggerLike): Logger = reporter.logger

}

// =====================================================================================
// File: src/main/scala/xyz/driver/core/reporting/SpanContext.scala (new file)
// =====================================================================================
package xyz.driver.core
package reporting
import scala.util.Random

/** Reference to a span within a trace, intended to be passed along as an implicit parameter.
  *
  * @param traceId Identifier of the trace the span belongs to.
  * @param spanId  Identifier of the span itself.
  */
case class SpanContext private[core] (traceId: String, spanId: String)

object SpanContext {

  /** Create a context with random identifiers for the first span of a new trace.
    *
    * Identifiers are zero-padded so that they have a fixed width: 32 hexadecimal characters for the trace id
    * and 16 for the span id.
    */
  def fresh(): SpanContext = SpanContext(
    f"${Random.nextLong()}%016x${Random.nextLong()}%016x",
    f"${Random.nextLong()}%016x"
  )
}