diff options
author | Jakob Odersky <jakob@driver.xyz> | 2017-10-01 20:24:02 -0700 |
---|---|---|
committer | Jakob Odersky <jakob@driver.xyz> | 2017-10-01 20:24:29 -0700 |
commit | 2c08b51411be5b0cce57f876377fcd52bee99990 (patch) | |
tree | fee56a21e6a5f3d2dd459b51e5afb355db6c7f02 | |
parent | 5bd947dd08eec1d6c64a9549566f3ce0e91fe74f (diff) | |
download | tracing-2c08b51411be5b0cce57f876377fcd52bee99990.tar.gz tracing-2c08b51411be5b0cce57f876377fcd52bee99990.tar.bz2 tracing-2c08b51411be5b0cce57f876377fcd52bee99990.zip |
Flatten file hierarchy and implement OAUTH2 authentication
-rw-r--r-- | build.sbt | 6 | ||||
-rw-r--r-- | src/main/scala/GoogleTracer.scala (renamed from src/main/scala/xyz/driver/tracing/google/GoogleTracer.scala) | 32 | ||||
-rw-r--r-- | src/main/scala/Span.scala (renamed from src/main/scala/xyz/driver/tracing/Span.scala) | 4 | ||||
-rw-r--r-- | src/main/scala/Tracer.scala (renamed from src/main/scala/xyz/driver/tracing/Tracer.scala) | 0 | ||||
-rw-r--r-- | src/main/scala/TracingDirectives.scala (renamed from src/main/scala/xyz/driver/tracing/TracingDirectives.scala) | 7 | ||||
-rw-r--r-- | src/main/scala/google/OAuth2.scala | 108 | ||||
-rw-r--r-- | src/main/scala/google/api.scala (renamed from src/main/scala/xyz/driver/tracing/google/api.scala) | 4 |
7 files changed, 143 insertions, 18 deletions
@@ -3,5 +3,9 @@ scalaVersion := "2.12.3" libraryDependencies ++= Seq( "com.typesafe.akka" %% "akka-http" % "10.0.10", "com.typesafe.akka" %% "akka-http-spray-json" % "10.0.10", - "io.spray" %% "spray-json" % "1.3.3" + "com.pauldijou" %% "jwt-core" % "0.14.0", + "io.spray" %% "spray-json" % "1.3.3", + "org.scalatest" %% "scalatest" % "3.0.2" % "test", ) + +fork in test := true diff --git a/src/main/scala/xyz/driver/tracing/google/GoogleTracer.scala b/src/main/scala/GoogleTracer.scala index 6a4d4fa..5b3c9df 100644 --- a/src/main/scala/xyz/driver/tracing/google/GoogleTracer.scala +++ b/src/main/scala/GoogleTracer.scala @@ -1,6 +1,7 @@ package xyz.driver.tracing -package google +import google._ +import java.nio.file.Path import akka.stream._ import akka.stream.scaladsl._ import akka.actor.ActorSystem @@ -14,10 +15,13 @@ import scala.concurrent.duration._ import spray.json.DefaultJsonProtocol._ import java.util.UUID -class GoogleTracer(projectId: String, authToken: String, bufferSize: Int = 1000, concurrentConnections: Int = 1)( - implicit system: ActorSystem, - materializer: Materializer) +class GoogleTracer(projectId: String, + serviceAccountFile: Path, + bufferSize: Int = 1000, + concurrentConnections: Int = 1)(implicit system: ActorSystem, + materializer: Materializer) extends Tracer { + import system.dispatcher lazy val connectionPool = Http().superPool[Unit]() @@ -43,22 +47,32 @@ class GoogleTracer(projectId: String, authToken: String, bufferSize: Int = 1000, lazy val queue: SourceQueueWithComplete[Span] = { Source .queue[Span](bufferSize, OverflowStrategy.dropNew) + .log("debug") .viaMat(batchingPipeline)(Keep.left) .mapAsync(concurrentConnections) { (traces: Traces) => - Marshal(traces).to[RequestEntity].map{ entity => + println(traces) + Marshal(traces).to[RequestEntity].map { entity => HttpRequest( - HttpMethods.PATCH, - s"https://cloudtrace.googleapis.com/v1/projects/${projectId}/traces", + method = HttpMethods.PATCH, + uri = + s"https://cloudtrace.googleapis.com/v1/projects/${projectId}/traces", entity = entity ) } } + .viaMat( + OAuth2.authenticatedFlow( + Http(), + serviceAccountFile, + Seq( + "https://www.googleapis.com/auth/trace.append" + )))(Keep.left) .map(req => (req, ())) .viaMat(connectionPool)(Keep.left) .mapError { case NonFatal(e) => - system.log.warning( - s"Exception encountered while submitting trace: $e") + system.log.error(s"Exception encountered while submitting trace", e) + e.printStackTrace e } .to(Sink.ignore) diff --git a/src/main/scala/xyz/driver/tracing/Span.scala b/src/main/scala/Span.scala index 2ae640e..fcd52b6 100644 --- a/src/main/scala/xyz/driver/tracing/Span.scala +++ b/src/main/scala/Span.scala @@ -4,9 +4,9 @@ import java.util.UUID import java.time._ case class Span( - traceId: UUID, - spanId: UUID, name: String, + traceId: UUID = UUID.randomUUID(), + spanId: UUID = UUID.randomUUID(), parentSpanId: Option[UUID] = None, labels: Map[String, String] = Map.empty, startTime: Instant = Instant.now, diff --git a/src/main/scala/xyz/driver/tracing/Tracer.scala b/src/main/scala/Tracer.scala index 00a1fbf..00a1fbf 100644 --- a/src/main/scala/xyz/driver/tracing/Tracer.scala +++ b/src/main/scala/Tracer.scala diff --git a/src/main/scala/xyz/driver/tracing/TracingDirectives.scala b/src/main/scala/TracingDirectives.scala index 9e0237c..c55ac40 100644 --- a/src/main/scala/xyz/driver/tracing/TracingDirectives.scala +++ b/src/main/scala/TracingDirectives.scala @@ -28,17 +28,14 @@ trait TracingDirectives { val span: Span = parent match { case None => // no parent span, create new trace Span( - traceId = UUID.randomUUID, - spanId = UUID.randomUUID, name = name, labels = labels ) case Some(TraceContext(traceId, parentSpanId)) => Span( + name = name, traceId = traceId, - spanId = UUID.randomUUID, parentSpanId = parentSpanId, - name = name, labels = labels ) } @@ -56,7 +53,7 @@ trait TracingDirectives { inner(())(ctx).map(f)(ctx.executionContext) } } - */ + */ } diff --git a/src/main/scala/google/OAuth2.scala b/src/main/scala/google/OAuth2.scala new file mode 100644 index 0000000..43811c4 --- /dev/null +++ b/src/main/scala/google/OAuth2.scala @@ -0,0 +1,108 @@ +package xyz.driver.tracing +package google + +import akka.stream.scaladsl._ +import akka.stream._ +import akka.stream.stage._ +import akka.http.scaladsl._ +import akka.http.scaladsl.unmarshalling._ +import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._ +import akka.http.scaladsl.model._ +import akka.http.scaladsl.model.headers._ +import akka.util.ByteString +import java.time._ +import java.nio.file._ +import spray.json._ +import spray.json.DefaultJsonProtocol._ +import pdi.jwt._ +import scala.concurrent._ +import scala.concurrent.duration._ + +object OAuth2 { + + private case class ServiceAccount(project_id: String, + private_key: String, + client_email: String) + private implicit val serviceAccountFormat = jsonFormat3(ServiceAccount) + + private case class GrantResponse(access_token: String, expires_in: Int) + private implicit val grantResponseFormat = jsonFormat2(GrantResponse) + + /** Request a new access token for the given scopes. + * + * Implements the OAUTH2 workflow as descried here + * https://developers.google.com/identity/protocols/OAuth2ServiceAccount + */ + def requestAccessToken( + http: HttpExt, + serviceAccountFile: Path, + scopes: Seq[String] + )(implicit ec: ExecutionContext, + mat: Materializer): Future[(Instant, String)] = + Future { + val now = Instant.now.toEpochMilli / 1000 + val credentials = + (new String(Files.readAllBytes(serviceAccountFile), "utf-8")).parseJson + .convertTo[ServiceAccount] + + val claim = JwtClaim( + issuer = Some(credentials.client_email), + expiration = Some(now + 60 * 60), + issuedAt = Some(now) + ) + + ("aud", "https://www.googleapis.com/oauth2/v4/token") + + ("scope", scopes.mkString(" ")) + + Jwt.encode(claim, credentials.private_key, JwtAlgorithm.RS256) + } flatMap { assertion => + http.singleRequest( + HttpRequest( + method = HttpMethods.POST, + uri = "https://www.googleapis.com/oauth2/v4/token" + ).withEntity( + FormData( + "grant_type" -> "urn:ietf:params:oauth:grant-type:jwt-bearer", + "assertion" -> assertion + ).toEntity)) + } flatMap { response => + Unmarshal(response).to[GrantResponse] + } map { grant => + (Instant.now.plusSeconds(grant.expires_in), grant.access_token) + } + + /** Flow that injects access tokens into a stream of HTTP requests. + * + * Re-authentication happens transparently when access tokens expire. Note: + * in case an access token gets revoked, this flow needs to be restarted in + * order to re-authenticate + */ + def authenticatedFlow(http: HttpExt, + serviceAccountFile: Path, + scopes: Seq[String], + graceSeconds: Int = 300)( + implicit ec: ExecutionContext, + mat: Materializer): Flow[HttpRequest, HttpRequest, _] = + Flow[HttpRequest] + .scanAsync[(HttpRequest, Instant, String)]( + (HttpRequest(), Instant.now, "")) { + case ((_, expiration, accessToken), request) => + if (Instant.now isAfter expiration.minusSeconds(graceSeconds)) { + http.system.log.info("tracing access token expired, refreshing") + requestAccessToken(http, serviceAccountFile, scopes).map { + case (newExpiration, newToken) => + http.system.log.debug("new tracing access token otained") + (request, newExpiration, newToken) + } + } else { + Future.successful((request, expiration, accessToken)) + } + } + .drop(1) // drop initial element + .map { + case (request, _, accessToken) => + request.withHeaders( + RawHeader("Authorization", "Bearer " + accessToken) + ) + } + +} diff --git a/src/main/scala/xyz/driver/tracing/google/api.scala b/src/main/scala/google/api.scala index 356e3da..122b695 100644 --- a/src/main/scala/xyz/driver/tracing/google/api.scala +++ b/src/main/scala/google/api.scala @@ -46,7 +46,9 @@ object TraceSpan { } implicit val instantFormat = new JsonFormat[Instant] { - val formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ssXXXZ") + val formatter = DateTimeFormatter + .ofPattern("yyyy-MM-dd'T'HH:mm:ssXXXZ") + .withZone(ZoneId.of("UTC")) override def write(x: Instant): JsValue = JsString(formatter.format(x)) override def read(x: JsValue): Instant = x match { case JsString(x) => Instant.parse(x) |