package xyz.driver.core
package reporting

import java.security.Signature
import java.time.Instant
import java.util

import akka.NotUsed
import akka.stream.scaladsl.{Flow, RestartSink, Sink, Source, SourceQueueWithComplete}
import akka.stream.{Materializer, OverflowStrategy}
import com.google.auth.oauth2.ServiceAccountCredentials
import com.softwaremill.sttp._
import spray.json.DerivedJsonProtocol._
import spray.json._
import xyz.driver.core.reporting.Reporter.CausalRelation

import scala.async.Async._
import scala.concurrent.duration._
import scala.concurrent.{ExecutionContext, Future}
import scala.util.control.NonFatal
import scala.util.{Failure, Random, Success, Try}

/** A reporter that collects traces and submits them to
  * [[https://cloud.google.com/trace/docs/reference/v2/rest/ Google's Stackdriver Trace API]].
  *
  * Spans are placed on an in-memory queue and sent in batches; when the queue is full the
  * oldest pending span is dropped (`OverflowStrategy.dropHead`).
  *
  * @param credentials service account used to sign token requests and to determine the project id
  * @param namespace   value attached to every span as the `service.namespace` attribute
  * @param buffer      size of the submission queue and maximum number of spans per batch
  * @param interval    maximum time to wait before sending a batch that is not yet full
  */
class GoogleReporter(
    val credentials: ServiceAccountCredentials,
    namespace: String,
    buffer: Int = GoogleReporter.DefaultBufferSize,
    interval: FiniteDuration = GoogleReporter.DefaultInterval)(
    implicit client: SttpBackend[Future, _],
    mat: Materializer,
    ec: ExecutionContext
) extends Reporter {
  import GoogleReporter._

  /** Obtains an OAuth2 access token by exchanging a self-signed JWT.
    *
    * The JWT is valid for 60 minutes; the request is wrapped in `Refresh.every(55.minutes)`
    * (presumably so that a new token is fetched before the old one expires -- see `Refresh`).
    */
  private val getToken: () => Future[String] = Refresh.every(55.minutes) {
    def jwt = {
      val now = Instant.now().getEpochSecond
      // JWS (RFC 7515) mandates the URL-safe base64 alphabet without padding; the standard
      // encoder emits '+', '/' and '=' which are not valid in a compact JWT.
      val base64 = util.Base64.getUrlEncoder.withoutPadding()
      val header = base64.encodeToString("""{"alg":"RS256","typ":"JWT"}""".getBytes("utf-8"))
      val body = base64.encodeToString(
        s"""|{
            |  "iss": "${credentials.getClientEmail}",
            |  "scope": "https://www.googleapis.com/auth/trace.append",
            |  "aud": "https://www.googleapis.com/oauth2/v4/token",
            |  "exp": ${now + 60.minutes.toSeconds},
            |  "iat": $now
            |}""".stripMargin.getBytes("utf-8")
      )
      val signer = Signature.getInstance("SHA256withRSA")
      signer.initSign(credentials.getPrivateKey)
      signer.update(s"$header.$body".getBytes("utf-8"))
      val signature = base64.encodeToString(signer.sign())
      s"$header.$body.$signature"
    }
    sttp
      .post(uri"https://www.googleapis.com/oauth2/v4/token")
      .body(
        "grant_type" -> "urn:ietf:params:oauth:grant-type:jwt-bearer",
        "assertion"  -> jwt
      )
      .mapResponse(s => s.parseJson.asJsObject.fields("access_token").convertTo[String])
      .send()
      .map(_.unsafeBody)
  }

  /** Sink that batches spans and posts them to the `traces:batchWrite` endpoint.
    *
    * Any failure is printed to stderr and rethrown, which fails the inner stream and lets
    * `RestartSink` re-create it with exponential backoff.
    */
  private val sendToGoogle: Sink[Span, NotUsed] = RestartSink.withBackoff(
    minBackoff = 3.seconds,
    maxBackoff = 30.seconds,
    randomFactor = 0.2 // adds 20% "noise" to vary the intervals slightly
  ) { () =>
    Flow[Span]
      .groupedWithin(buffer, interval)
      .mapAsync(1) { spans =>
        async {
          val token = await(getToken())
          val res = await(
            sttp
              .post(uri"https://cloudtrace.googleapis.com/v2/projects/${credentials.getProjectId}/traces:batchWrite")
              .auth
              .bearer(token)
              .body(
                Spans(spans).toJson.compactPrint
              )
              .send()
              .map(_.unsafeBody))
          res
        }
      }
      .recover {
        case NonFatal(e) =>
          System.err.println(s"Error submitting trace spans: $e") // scalastyle: ignore
          throw e
      }
      .to(Sink.ignore)
  }

  /** Queue feeding [[sendToGoogle]]; drops the oldest element when `buffer` spans are pending. */
  private val queue: SourceQueueWithComplete[Span] = Source
    .queue[Span](buffer, OverflowStrategy.dropHead)
    .to(sendToGoogle)
    .run()

  /** Enqueues a finished span for submission; a failed offer is only logged to stderr. */
  private def submit(span: Span): Unit =
    queue.offer(span).failed.foreach { e =>
      System.err.println(s"Error adding span to submission queue: $e")
    }

  /** Creates a span whose start and end time are both "now" and whose status is a placeholder
    * failure; callers are expected to set the real end time and status before submitting.
    */
  private def startSpan(
      traceId: String,
      spanId: String,
      parentSpanId: Option[String],
      displayName: String,
      attributes: Map[String, String]): Span = {
    val now = Instant.now()
    Span(
      s"projects/${credentials.getProjectId}/traces/$traceId/spans/$spanId",
      spanId,
      parentSpanId,
      TruncatableString(displayName),
      now,
      now,
      Attributes(attributes ++ Map("service.namespace" -> namespace)),
      Failure(new IllegalStateException("span status not set"))
    )
  }

  /** Derives the context of a new span: same trace as the parent with a random 16-hex-digit
    * span id, or a completely fresh context when there is no parent.
    */
  private def childContext(parent: Option[(SpanContext, CausalRelation)]): SpanContext =
    parent match {
      case Some((p, _)) => SpanContext(p.traceId, f"${Random.nextLong()}%016x")
      case None         => SpanContext.fresh()
    }

  /** Stamps the span with the current time and the given outcome, then queues it. */
  private def finish(span: Span, status: Try[_]): Unit =
    submit(span.copy(endTime = Instant.now(), status = status))

  /** Runs `operation` inside a new span and submits the span once the operation returns or throws.
    *
    * @param operationName display name of the span
    * @param tags          attributes attached to the span
    * @param parent        optional parent context; the causal relation is currently not used
    * @param operation     code to trace; receives the context of the newly created span
    * @return the value produced by `operation`; exceptions thrown by it are rethrown unchanged
    */
  def traceWithOptionalParent[A](
      operationName: String,
      tags: Map[String, String],
      parent: Option[(SpanContext, CausalRelation)])(operation: SpanContext => A): A = {
    val child = childContext(parent)
    val span  = startSpan(child.traceId, child.spanId, parent.map(_._1.spanId), operationName, tags)
    // Capture the outcome so that the span carries a real status (previously the placeholder
    // failure was always reported) and is still submitted when the operation throws.
    val outcome = Try(operation(child))
    finish(span, outcome)
    outcome match {
      case Success(value) => value
      case Failure(error) => throw error
    }
  }

  /** Runs an asynchronous `operation` inside a new span and submits the span when the returned
    * future completes.
    *
    * @param operationName display name of the span
    * @param tags          attributes attached to the span
    * @param parent        optional parent context; the causal relation is currently not used
    * @param operation     code to trace; receives the context of the newly created span
    * @return the future returned by `operation`
    */
  def traceWithOptionalParentAsync[A](
      operationName: String,
      tags: Map[String, String],
      parent: Option[(SpanContext, CausalRelation)])(operation: SpanContext => Future[A]): Future[A] = {
    val child = childContext(parent)
    val span  = startSpan(child.traceId, child.spanId, parent.map(_._1.spanId), operationName, tags)
    val result =
      try operation(child)
      catch {
        // The operation may throw before it manages to return a future; record that as a
        // failed span and let the exception propagate to the caller as before.
        case NonFatal(e) =>
          finish(span, Failure(e))
          throw e
      }
    result.onComplete(outcome => finish(span, outcome))
    result
  }

  /** Not supported by this reporter: always throws a `RuntimeException` via `sys.error`. */
  override def log(severity: Reporter.Severity, message: String, reason: Option[Throwable])(
      implicit ctx: SpanContext): Unit =
    sys.error("Submitting logs directly to GCP is " +
      "currently not supported. Messages should go to stdout.") // TODO: attach logs to traces and submit them directly

}

/** Defaults and the JSON model of the Stackdriver Trace v2 `batchWrite` payload. */
object GoogleReporter {

  /** Default queue size and maximum batch size. */
  val DefaultBufferSize: Int = 10000

  /** Default maximum delay before a partially filled batch is sent. */
  val DefaultInterval: FiniteDuration = 5.seconds

  private case class Attributes(attributeMap: Map[String, String])
  private case class TruncatableString(value: String)

  /** A single span as serialized to the Trace API; updated via `copy` rather than mutated. */
  private case class Span(
      name: String,
      spanId: String,
      parentSpanId: Option[String],
      displayName: TruncatableString,
      startTime: Instant,
      endTime: Instant,
      attributes: Attributes,
      status: Try[_]
  )

  private case class Spans(spans: Seq[Span])

  /** Serializes instants using `Instant.toString` (ISO-8601). */
  private implicit val instantFormat: RootJsonFormat[Instant] = new RootJsonFormat[Instant] {
    override def write(obj: Instant): JsValue  = obj.toString.toJson
    override def read(json: JsValue): Instant = Instant.parse(json.convertTo[String])
  }

  /** Write-only format wrapping every value as `{"stringValue": {"value": ...}}`.
    *
    * The name intentionally matches (and thereby shadows) the imported generic `mapFormat`.
    */
  private implicit val mapFormat: RootJsonFormat[Map[String, String]] =
    new RootJsonFormat[Map[String, String]] {
      override def read(json: JsValue): Map[String, String] = sys.error("unimplemented")
      override def write(obj: Map[String, String]): JsValue = {
        // strict `map` rather than `mapValues`, which yields a lazily re-evaluated view
        val withValueObjects = obj.map {
          case (key, value) => key -> JsObject("stringValue" -> JsObject("value" -> value.toJson))
        }
        JsObject(withValueObjects)
      }
    }

  /** Write-only format mapping a `Try` onto a status object with a code and message. */
  private implicit val statusFormat: RootJsonFormat[Try[_]] = new RootJsonFormat[Try[_]] {
    override def read(json: JsValue): Try[_] = sys.error("unimplemented")
    override def write(obj: Try[_]): JsValue = {
      // error codes defined at https://github.com/googleapis/googleapis/blob/master/google/rpc/code.proto
      val (code, message) = obj match {
        case Success(_) => (0, "success")
        case Failure(_) => (2, "failure")
      }
      JsObject(
        "code"    -> code.toJson,
        "message" -> message.toJson,
        "details" -> JsArray()
      )
    }
  }

  private implicit val attributeFormat: RootJsonFormat[Attributes] = jsonFormat1(Attributes)
  private implicit val truncatableStringFormat: RootJsonFormat[TruncatableString] =
    jsonFormat1(TruncatableString)
  private implicit val spanFormat: RootJsonFormat[Span]   = jsonFormat8(Span)
  private implicit val spansFormat: RootJsonFormat[Spans] = jsonFormat1(Spans)

}