From 3008753cf543caaecb7d0e325c9f4473ad8a0322 Mon Sep 17 00:00:00 2001 From: vlad Date: Tue, 2 Aug 2016 10:54:00 -0700 Subject: Domain model responsible for auth is in core + Akka-http auth directives + More specific REST service API --- src/main/scala/com/drivergrp/core/rest.scala | 79 +++++++++++----------------- 1 file changed, 31 insertions(+), 48 deletions(-) (limited to 'src/main/scala/com/drivergrp/core/rest.scala') diff --git a/src/main/scala/com/drivergrp/core/rest.scala b/src/main/scala/com/drivergrp/core/rest.scala index 4edb466..ebb2640 100644 --- a/src/main/scala/com/drivergrp/core/rest.scala +++ b/src/main/scala/com/drivergrp/core/rest.scala @@ -2,14 +2,14 @@ package com.drivergrp.core import akka.actor.ActorSystem import akka.http.scaladsl.Http -import akka.http.scaladsl.marshalling.{Marshal, Marshaller} import akka.http.scaladsl.model._ import akka.http.scaladsl.model.headers.RawHeader -import akka.http.scaladsl.unmarshalling.{Unmarshal, Unmarshaller} +import akka.http.scaladsl.unmarshalling.Unmarshal import akka.stream.ActorMaterializer import akka.stream.scaladsl.Flow -import akka.util.{ByteString, Timeout} -import com.drivergrp.core.crypto.{AuthToken, Crypto} +import akka.util.ByteString +import com.drivergrp.core.auth.AuthToken +import com.drivergrp.core.crypto.Crypto import com.drivergrp.core.logging.Logger import com.drivergrp.core.stats.Stats import com.drivergrp.core.time.TimeRange @@ -18,12 +18,10 @@ import com.github.swagger.akka.model._ import com.github.swagger.akka.{HasActorSystem, SwaggerHttpService} import com.typesafe.config.Config -import scala.concurrent.duration._ import scala.concurrent.{ExecutionContext, Future} -import scala.language.postfixOps import scala.util.{Failure, Success} -import scalaz.{Failure => _, Success => _} import scalaz.Scalaz._ +import scalaz.{Failure => _, Success => _} object rest { @@ -32,29 +30,25 @@ object rest { this.majorVersion === otherVersion.majorVersion } - trait Service { + type Service = AnyRef - def sendRequest[I,O](authToken: AuthToken)(requestInput: I) - (implicit marshaller: Marshaller[I, RequestEntity], - unmarshaller: Unmarshaller[ResponseEntity, O]): Future[O] + trait ServiceTransport { + + def sendRequest(authToken: AuthToken)(requestStub: HttpRequest): Future[Unmarshal[ResponseEntity]] } - trait ServiceDiscovery { + trait ServiceDiscovery[T <: Service] { - def discover(serviceName: Name[Service], version: ServiceVersion): Service + def discover(serviceName: Name[Service], version: ServiceVersion): T } - class HttpRestService(method: HttpMethod, uri: Uri, version: ServiceVersion, - actorSystem: ActorSystem, executionContext: ExecutionContext, - crypto: Crypto, log: Logger, stats: Stats, time: TimeProvider) extends Service { + class HttpRestServiceTransport(actorSystem: ActorSystem, executionContext: ExecutionContext, + crypto: Crypto, log: Logger, stats: Stats, time: TimeProvider) extends ServiceTransport { protected implicit val materializer = ActorMaterializer()(actorSystem) protected implicit val execution = executionContext - protected implicit val timeout = Timeout(5 seconds) - def sendRequest[I,O](authToken: AuthToken)(requestInput: I) - (implicit marshaller: Marshaller[I, RequestEntity], - unmarshaller: Unmarshaller[ResponseEntity, O]): Future[O] = { + def sendRequest(authToken: AuthToken)(requestStub: HttpRequest): Future[Unmarshal[ResponseEntity]] = { val requestTime = time.currentTime() val encryptionFlow = Flow[ByteString] map { bytes => @@ -64,43 +58,32 @@ object rest { ByteString(crypto.decrypt(crypto.keyForToken(authToken))(bytes.toArray)) } - val response: Future[O] = for { - requestData: RequestEntity <- Marshal(requestInput).to[RequestEntity](marshaller, executionContext) - encryptedMessage = requestData.transformDataBytes(encryptionFlow) - request: HttpRequest = buildRequest(authToken, requestData) - _ = log.audit(s"Sending to ${request.uri} request $request") - response <- Http()(actorSystem).singleRequest(request)(materializer) - decryptedResponse = requestData.transformDataBytes(decryptionFlow) - responseEntity <- Unmarshal(decryptedResponse).to[O](unmarshaller, executionContext, materializer) - } yield { - responseEntity + val request = requestStub + .withEntity(requestStub.entity.transformDataBytes(encryptionFlow)) + .withHeaders( + RawHeader(auth.directives.AuthenticationTokenHeader, s"Macaroon ${authToken.value.value}")) + + log.audit(s"Sending to ${request.uri} request $request") + + val responseEntity = Http()(actorSystem).singleRequest(request)(materializer) map { response => + if(response.status.isFailure()) throw new Exception("Http status is failure " + response.status) + else Unmarshal(response.entity.transformDataBytes(decryptionFlow)) } - response.onComplete { + responseEntity.onComplete { case Success(r) => val responseTime = time.currentTime() - log.audit(s"Response from $uri to request $requestInput is successful") - stats.recordStats(Seq("request", uri.toString, "success"), TimeRange(requestTime, responseTime), 1) + log.audit(s"Response from ${request.uri} to request $requestStub is successful") + stats.recordStats(Seq("request", request.uri.toString, "success"), TimeRange(requestTime, responseTime), 1) case Failure(t: Throwable) => val responseTime = time.currentTime() - log.audit(s"Failed to receive response from $uri of version $version to request $requestInput") - log.error(s"Failed to receive response from $uri of version $version to request $requestInput", t) - stats.recordStats(Seq("request", uri.toString, "fail"), TimeRange(requestTime, responseTime), 1) + log.audit(s"Failed to receive response from ${request.uri} to request $requestStub") + log.error(s"Failed to receive response from ${request.uri} to request $requestStub", t) + stats.recordStats(Seq("request", request.uri.toString, "fail"), TimeRange(requestTime, responseTime), 1) } (executionContext) - response - } - - private def buildRequest(authToken: AuthToken, requestData: RequestEntity): HttpRequest = { - - HttpRequest( - method, uri, - headers = Vector( - RawHeader("WWW-Authenticate", s"Macaroon ${authToken.value.value}"), - RawHeader("Api-Version", version.majorVersion + "." + version.minorVersion) - ), - entity = requestData) + responseEntity } } -- cgit v1.2.3