From bd85221d5ca6d4580efd69d40e37237ade79b20d Mon Sep 17 00:00:00 2001 From: vlad Date: Mon, 1 Aug 2016 16:10:49 -0700 Subject: Service call implementation with crypto and service discovery --- project/DriverConfigurations.scala | 3 +- scalastyle-config.xml | 2 +- src/main/scala/com/drivergrp/core/core.scala | 6 +- src/main/scala/com/drivergrp/core/crypto.scala | 23 +++++++ src/main/scala/com/drivergrp/core/rest.scala | 86 +++++++++++++++++++------- 5 files changed, 94 insertions(+), 26 deletions(-) create mode 100644 src/main/scala/com/drivergrp/core/crypto.scala diff --git a/project/DriverConfigurations.scala b/project/DriverConfigurations.scala index 7bb2ed5..4044f34 100644 --- a/project/DriverConfigurations.scala +++ b/project/DriverConfigurations.scala @@ -16,7 +16,8 @@ object DriverConfigurations { lazy val wartRemoverSettings = Seq( wartremoverErrors in (Compile, compile) ++= Warts.allBut( Wart.AsInstanceOf, Wart.Nothing, Wart.Overloading, Wart.DefaultArguments, Wart.Any, - Wart.Option2Iterable, Wart.ExplicitImplicitTypes, Wart.Throw, Wart.ToString) + Wart.Option2Iterable, Wart.ExplicitImplicitTypes, Wart.Throw, Wart.ToString, Wart.NoNeedForMonad + ) ) lazy val acyclicSettings = Seq( diff --git a/scalastyle-config.xml b/scalastyle-config.xml index 9b74756..92b23a5 100644 --- a/scalastyle-config.xml +++ b/scalastyle-config.xml @@ -77,7 +77,7 @@ - + diff --git a/src/main/scala/com/drivergrp/core/core.scala b/src/main/scala/com/drivergrp/core/core.scala index e9c403a..8f38bc1 100644 --- a/src/main/scala/com/drivergrp/core/core.scala +++ b/src/main/scala/com/drivergrp/core/core.scala @@ -18,8 +18,10 @@ package object core { } } - private[core] trait Tagged[+V, +Tag] - type @@[+V, +Tag] = V with Tagged[V, Tag] + object tagging { + private[core] trait Tagged[+V, +Tag] + } + type @@[+V, +Tag] = V with tagging.Tagged[V, Tag] type Id[+Tag] = Long @@ Tag object Id { diff --git a/src/main/scala/com/drivergrp/core/crypto.scala b/src/main/scala/com/drivergrp/core/crypto.scala new file mode 100644 index 0000000..2910260 --- /dev/null +++ b/src/main/scala/com/drivergrp/core/crypto.scala @@ -0,0 +1,23 @@ +package com.drivergrp.core + +object crypto { + + final case class Macaroon(value: String) + + final case class Base64[T](value: String) + + final case class AuthToken(value: Base64[Macaroon]) + + final case class EncryptionKey(value: String) + + final case class DecryptionKey(value: String) + + trait Crypto { + + def keyForToken(authToken: AuthToken): EncryptionKey + + def encrypt(encryptionKey: EncryptionKey)(message: Array[Byte]): Array[Byte] + + def decrypt(decryptionKey: EncryptionKey)(message: Array[Byte]): Array[Byte] + } +} diff --git a/src/main/scala/com/drivergrp/core/rest.scala b/src/main/scala/com/drivergrp/core/rest.scala index 91e9d3b..4edb466 100644 --- a/src/main/scala/com/drivergrp/core/rest.scala +++ b/src/main/scala/com/drivergrp/core/rest.scala @@ -2,9 +2,14 @@ package com.drivergrp.core import akka.actor.ActorSystem import akka.http.scaladsl.Http -import akka.http.scaladsl.model.{HttpRequest, HttpResponse} +import akka.http.scaladsl.marshalling.{Marshal, Marshaller} +import akka.http.scaladsl.model._ +import akka.http.scaladsl.model.headers.RawHeader +import akka.http.scaladsl.unmarshalling.{Unmarshal, Unmarshaller} import akka.stream.ActorMaterializer -import akka.util.Timeout +import akka.stream.scaladsl.Flow +import akka.util.{ByteString, Timeout} +import com.drivergrp.core.crypto.{AuthToken, Crypto} import com.drivergrp.core.logging.Logger import com.drivergrp.core.stats.Stats import com.drivergrp.core.time.TimeRange @@ -18,48 +23,85 @@ import scala.concurrent.{ExecutionContext, Future} import scala.language.postfixOps import scala.util.{Failure, Success} import scalaz.{Failure => _, Success => _} +import scalaz.Scalaz._ object rest { - trait RestService { - def sendRequest(request: HttpRequest): Future[HttpResponse] + final case class ServiceVersion(majorVersion: Int, minorVersion: Int) { + def isCompatible(otherVersion: ServiceVersion) = + this.majorVersion === otherVersion.majorVersion } - class AkkaHttpRestService(actorSystem: ActorSystem) extends RestService { - protected implicit val materializer = ActorMaterializer()(actorSystem) + trait Service { - def sendRequest(request: HttpRequest): Future[HttpResponse] = - Http()(actorSystem).singleRequest(request)(materializer) + def sendRequest[I,O](authToken: AuthToken)(requestInput: I) + (implicit marshaller: Marshaller[I, RequestEntity], + unmarshaller: Unmarshaller[ResponseEntity, O]): Future[O] } - class ProxyRestService(actorSystem: ActorSystem, log: Logger, stats: Stats, - time: TimeProvider, executionContext: ExecutionContext) - extends AkkaHttpRestService(actorSystem) { + trait ServiceDiscovery { - protected implicit val timeout = Timeout(5 seconds) + def discover(serviceName: Name[Service], version: ServiceVersion): Service + } - override def sendRequest(request: HttpRequest): Future[HttpResponse] = { + class HttpRestService(method: HttpMethod, uri: Uri, version: ServiceVersion, + actorSystem: ActorSystem, executionContext: ExecutionContext, + crypto: Crypto, log: Logger, stats: Stats, time: TimeProvider) extends Service { - log.audit(s"Sending to ${request.uri} request $request") + protected implicit val materializer = ActorMaterializer()(actorSystem) + protected implicit val execution = executionContext + protected implicit val timeout = Timeout(5 seconds) + + def sendRequest[I,O](authToken: AuthToken)(requestInput: I) + (implicit marshaller: Marshaller[I, RequestEntity], + unmarshaller: Unmarshaller[ResponseEntity, O]): Future[O] = { val requestTime = time.currentTime() - val response = super.sendRequest(request) + val encryptionFlow = Flow[ByteString] map { bytes => + ByteString(crypto.encrypt(crypto.keyForToken(authToken))(bytes.toArray)) + } + val decryptionFlow = Flow[ByteString] map { bytes => + ByteString(crypto.decrypt(crypto.keyForToken(authToken))(bytes.toArray)) + } + + val response: Future[O] = for { + requestData: RequestEntity <- Marshal(requestInput).to[RequestEntity](marshaller, executionContext) + encryptedMessage = requestData.transformDataBytes(encryptionFlow) + request: HttpRequest = buildRequest(authToken, requestData) + _ = log.audit(s"Sending to ${request.uri} request $request") + response <- Http()(actorSystem).singleRequest(request)(materializer) + decryptedResponse = requestData.transformDataBytes(decryptionFlow) + responseEntity <- Unmarshal(decryptedResponse).to[O](unmarshaller, executionContext, materializer) + } yield { + responseEntity + } response.onComplete { - case Success(_) => + case Success(r) => val responseTime = time.currentTime() - log.audit(s"Response from ${request.uri} to request $request is successful") - stats.recordStats(Seq("request", request.uri.toString, "success"), TimeRange(requestTime, responseTime), 1) + log.audit(s"Response from $uri to request $requestInput is successful") + stats.recordStats(Seq("request", uri.toString, "success"), TimeRange(requestTime, responseTime), 1) - case Failure(t) => + case Failure(t: Throwable) => val responseTime = time.currentTime() - log.audit(s"Failed to receive response from ${request.uri} to request $request") - log.error(s"Failed to receive response from ${request.uri} to request $request", t) - stats.recordStats(Seq("request", request.uri.toString, "fail"), TimeRange(requestTime, responseTime), 1) + log.audit(s"Failed to receive response from $uri of version $version to request $requestInput") + log.error(s"Failed to receive response from $uri of version $version to request $requestInput", t) + stats.recordStats(Seq("request", uri.toString, "fail"), TimeRange(requestTime, responseTime), 1) } (executionContext) response } + + private def buildRequest(authToken: AuthToken, requestData: RequestEntity): HttpRequest = { + + HttpRequest( + method, uri, + headers = Vector( + RawHeader("WWW-Authenticate", s"Macaroon ${authToken.value.value}"), + RawHeader("Api-Version", version.majorVersion + "." + version.minorVersion) + ), + entity = requestData) + } } import scala.reflect.runtime.universe._ -- cgit v1.2.3