aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJakob Odersky <jakob@driver.xyz>2017-10-01 20:24:02 -0700
committerJakob Odersky <jakob@driver.xyz>2017-10-01 20:24:29 -0700
commit2c08b51411be5b0cce57f876377fcd52bee99990 (patch)
treefee56a21e6a5f3d2dd459b51e5afb355db6c7f02
parent5bd947dd08eec1d6c64a9549566f3ce0e91fe74f (diff)
downloadtracing-2c08b51411be5b0cce57f876377fcd52bee99990.tar.gz
tracing-2c08b51411be5b0cce57f876377fcd52bee99990.tar.bz2
tracing-2c08b51411be5b0cce57f876377fcd52bee99990.zip
Flatten file hierarchy and implement OAUTH2 authentication
-rw-r--r--build.sbt6
-rw-r--r--src/main/scala/GoogleTracer.scala (renamed from src/main/scala/xyz/driver/tracing/google/GoogleTracer.scala)32
-rw-r--r--src/main/scala/Span.scala (renamed from src/main/scala/xyz/driver/tracing/Span.scala)4
-rw-r--r--src/main/scala/Tracer.scala (renamed from src/main/scala/xyz/driver/tracing/Tracer.scala)0
-rw-r--r--src/main/scala/TracingDirectives.scala (renamed from src/main/scala/xyz/driver/tracing/TracingDirectives.scala)7
-rw-r--r--src/main/scala/google/OAuth2.scala108
-rw-r--r--src/main/scala/google/api.scala (renamed from src/main/scala/xyz/driver/tracing/google/api.scala)4
7 files changed, 143 insertions, 18 deletions
diff --git a/build.sbt b/build.sbt
index 733739e..2aa0701 100644
--- a/build.sbt
+++ b/build.sbt
@@ -3,5 +3,9 @@ scalaVersion := "2.12.3"
libraryDependencies ++= Seq(
"com.typesafe.akka" %% "akka-http" % "10.0.10",
"com.typesafe.akka" %% "akka-http-spray-json" % "10.0.10",
- "io.spray" %% "spray-json" % "1.3.3"
+ "com.pauldijou" %% "jwt-core" % "0.14.0",
+ "io.spray" %% "spray-json" % "1.3.3",
+ "org.scalatest" %% "scalatest" % "3.0.2" % "test",
)
+
+fork in test := true
diff --git a/src/main/scala/xyz/driver/tracing/google/GoogleTracer.scala b/src/main/scala/GoogleTracer.scala
index 6a4d4fa..5b3c9df 100644
--- a/src/main/scala/xyz/driver/tracing/google/GoogleTracer.scala
+++ b/src/main/scala/GoogleTracer.scala
@@ -1,6 +1,7 @@
package xyz.driver.tracing
-package google
+import google._
+import java.nio.file.Path
import akka.stream._
import akka.stream.scaladsl._
import akka.actor.ActorSystem
@@ -14,10 +15,13 @@ import scala.concurrent.duration._
import spray.json.DefaultJsonProtocol._
import java.util.UUID
-class GoogleTracer(projectId: String, authToken: String, bufferSize: Int = 1000, concurrentConnections: Int = 1)(
- implicit system: ActorSystem,
- materializer: Materializer)
+class GoogleTracer(projectId: String,
+ serviceAccountFile: Path,
+ bufferSize: Int = 1000,
+ concurrentConnections: Int = 1)(implicit system: ActorSystem,
+ materializer: Materializer)
extends Tracer {
+
import system.dispatcher
lazy val connectionPool = Http().superPool[Unit]()
@@ -43,22 +47,32 @@ class GoogleTracer(projectId: String, authToken: String, bufferSize: Int = 1000,
lazy val queue: SourceQueueWithComplete[Span] = {
Source
.queue[Span](bufferSize, OverflowStrategy.dropNew)
+ .log("debug")
.viaMat(batchingPipeline)(Keep.left)
.mapAsync(concurrentConnections) { (traces: Traces) =>
- Marshal(traces).to[RequestEntity].map{ entity =>
+ println(traces)
+ Marshal(traces).to[RequestEntity].map { entity =>
HttpRequest(
- HttpMethods.PATCH,
- s"https://cloudtrace.googleapis.com/v1/projects/${projectId}/traces",
+ method = HttpMethods.PATCH,
+ uri =
+ s"https://cloudtrace.googleapis.com/v1/projects/${projectId}/traces",
entity = entity
)
}
}
+ .viaMat(
+ OAuth2.authenticatedFlow(
+ Http(),
+ serviceAccountFile,
+ Seq(
+ "https://www.googleapis.com/auth/trace.append"
+ )))(Keep.left)
.map(req => (req, ()))
.viaMat(connectionPool)(Keep.left)
.mapError {
case NonFatal(e) =>
- system.log.warning(
- s"Exception encountered while submitting trace: $e")
+ system.log.error(s"Exception encountered while submitting trace", e)
+ e.printStackTrace
e
}
.to(Sink.ignore)
diff --git a/src/main/scala/xyz/driver/tracing/Span.scala b/src/main/scala/Span.scala
index 2ae640e..fcd52b6 100644
--- a/src/main/scala/xyz/driver/tracing/Span.scala
+++ b/src/main/scala/Span.scala
@@ -4,9 +4,9 @@ import java.util.UUID
import java.time._
case class Span(
- traceId: UUID,
- spanId: UUID,
name: String,
+ traceId: UUID = UUID.randomUUID(),
+ spanId: UUID = UUID.randomUUID(),
parentSpanId: Option[UUID] = None,
labels: Map[String, String] = Map.empty,
startTime: Instant = Instant.now,
diff --git a/src/main/scala/xyz/driver/tracing/Tracer.scala b/src/main/scala/Tracer.scala
index 00a1fbf..00a1fbf 100644
--- a/src/main/scala/xyz/driver/tracing/Tracer.scala
+++ b/src/main/scala/Tracer.scala
diff --git a/src/main/scala/xyz/driver/tracing/TracingDirectives.scala b/src/main/scala/TracingDirectives.scala
index 9e0237c..c55ac40 100644
--- a/src/main/scala/xyz/driver/tracing/TracingDirectives.scala
+++ b/src/main/scala/TracingDirectives.scala
@@ -28,17 +28,14 @@ trait TracingDirectives {
val span: Span = parent match {
case None => // no parent span, create new trace
Span(
- traceId = UUID.randomUUID,
- spanId = UUID.randomUUID,
name = name,
labels = labels
)
case Some(TraceContext(traceId, parentSpanId)) =>
Span(
+ name = name,
traceId = traceId,
- spanId = UUID.randomUUID,
parentSpanId = parentSpanId,
- name = name,
labels = labels
)
}
@@ -56,7 +53,7 @@ trait TracingDirectives {
inner(())(ctx).map(f)(ctx.executionContext)
}
}
- */
+ */
}
diff --git a/src/main/scala/google/OAuth2.scala b/src/main/scala/google/OAuth2.scala
new file mode 100644
index 0000000..43811c4
--- /dev/null
+++ b/src/main/scala/google/OAuth2.scala
@@ -0,0 +1,108 @@
+package xyz.driver.tracing
+package google
+
+import akka.stream.scaladsl._
+import akka.stream._
+import akka.stream.stage._
+import akka.http.scaladsl._
+import akka.http.scaladsl.unmarshalling._
+import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._
+import akka.http.scaladsl.model._
+import akka.http.scaladsl.model.headers._
+import akka.util.ByteString
+import java.time._
+import java.nio.file._
+import spray.json._
+import spray.json.DefaultJsonProtocol._
+import pdi.jwt._
+import scala.concurrent._
+import scala.concurrent.duration._
+
+object OAuth2 {
+
+ private case class ServiceAccount(project_id: String,
+ private_key: String,
+ client_email: String)
+ private implicit val serviceAccountFormat = jsonFormat3(ServiceAccount)
+
+ private case class GrantResponse(access_token: String, expires_in: Int)
+ private implicit val grantResponseFormat = jsonFormat2(GrantResponse)
+
+ /** Request a new access token for the given scopes.
+ *
+ * Implements the OAUTH2 workflow as descried here
+ * https://developers.google.com/identity/protocols/OAuth2ServiceAccount
+ */
+ def requestAccessToken(
+ http: HttpExt,
+ serviceAccountFile: Path,
+ scopes: Seq[String]
+ )(implicit ec: ExecutionContext,
+ mat: Materializer): Future[(Instant, String)] =
+ Future {
+ val now = Instant.now.toEpochMilli / 1000
+ val credentials =
+ (new String(Files.readAllBytes(serviceAccountFile), "utf-8")).parseJson
+ .convertTo[ServiceAccount]
+
+ val claim = JwtClaim(
+ issuer = Some(credentials.client_email),
+ expiration = Some(now + 60 * 60),
+ issuedAt = Some(now)
+ ) +
+ ("aud", "https://www.googleapis.com/oauth2/v4/token") +
+ ("scope", scopes.mkString(" "))
+
+ Jwt.encode(claim, credentials.private_key, JwtAlgorithm.RS256)
+ } flatMap { assertion =>
+ http.singleRequest(
+ HttpRequest(
+ method = HttpMethods.POST,
+ uri = "https://www.googleapis.com/oauth2/v4/token"
+ ).withEntity(
+ FormData(
+ "grant_type" -> "urn:ietf:params:oauth:grant-type:jwt-bearer",
+ "assertion" -> assertion
+ ).toEntity))
+ } flatMap { response =>
+ Unmarshal(response).to[GrantResponse]
+ } map { grant =>
+ (Instant.now.plusSeconds(grant.expires_in), grant.access_token)
+ }
+
+ /** Flow that injects access tokens into a stream of HTTP requests.
+ *
+ * Re-authentication happens transparently when access tokens expire. Note:
+ * in case an access token gets revoked, this flow needs to be restarted in
+ * order to re-authenticate
+ */
+ def authenticatedFlow(http: HttpExt,
+ serviceAccountFile: Path,
+ scopes: Seq[String],
+ graceSeconds: Int = 300)(
+ implicit ec: ExecutionContext,
+ mat: Materializer): Flow[HttpRequest, HttpRequest, _] =
+ Flow[HttpRequest]
+ .scanAsync[(HttpRequest, Instant, String)](
+ (HttpRequest(), Instant.now, "")) {
+ case ((_, expiration, accessToken), request) =>
+ if (Instant.now isAfter expiration.minusSeconds(graceSeconds)) {
+ http.system.log.info("tracing access token expired, refreshing")
+ requestAccessToken(http, serviceAccountFile, scopes).map {
+ case (newExpiration, newToken) =>
+ http.system.log.debug("new tracing access token otained")
+ (request, newExpiration, newToken)
+ }
+ } else {
+ Future.successful((request, expiration, accessToken))
+ }
+ }
+ .drop(1) // drop initial element
+ .map {
+ case (request, _, accessToken) =>
+ request.withHeaders(
+ RawHeader("Authorization", "Bearer " + accessToken)
+ )
+ }
+
+}
diff --git a/src/main/scala/xyz/driver/tracing/google/api.scala b/src/main/scala/google/api.scala
index 356e3da..122b695 100644
--- a/src/main/scala/xyz/driver/tracing/google/api.scala
+++ b/src/main/scala/google/api.scala
@@ -46,7 +46,9 @@ object TraceSpan {
}
implicit val instantFormat = new JsonFormat[Instant] {
- val formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ssXXXZ")
+ val formatter = DateTimeFormatter
+ .ofPattern("yyyy-MM-dd'T'HH:mm:ssXXXZ")
+ .withZone(ZoneId.of("UTC"))
override def write(x: Instant): JsValue = JsString(formatter.format(x))
override def read(x: JsValue): Instant = x match {
case JsString(x) => Instant.parse(x)