From e4a55e9fdcb3fb89963416b273654259a9699d03 Mon Sep 17 00:00:00 2001 From: vlad Date: Tue, 19 Sep 2017 22:19:59 -0700 Subject: Utils from pds-common --- src/main/scala/xyz/driver/core/rest.scala | 109 ++++++++++++++++++++++++------ 1 file changed, 89 insertions(+), 20 deletions(-) diff --git a/src/main/scala/xyz/driver/core/rest.scala b/src/main/scala/xyz/driver/core/rest.scala index 2527b75..dde570a 100644 --- a/src/main/scala/xyz/driver/core/rest.scala +++ b/src/main/scala/xyz/driver/core/rest.scala @@ -10,10 +10,11 @@ import akka.http.scaladsl.marshalling.{ToEntityMarshaller, ToResponseMarshallabl import akka.http.scaladsl.model._ import akka.http.scaladsl.model.headers.{HttpChallenges, RawHeader, `User-Agent`} import akka.http.scaladsl.server.AuthenticationFailedRejection.CredentialsRejected +import akka.http.scaladsl.server.Route import akka.http.scaladsl.settings.{ClientConnectionSettings, ConnectionPoolSettings} import akka.http.scaladsl.unmarshalling.{Unmarshal, Unmarshaller} -import akka.stream.scaladsl.Flow -import akka.stream.{ActorMaterializer, Materializer} +import akka.stream.scaladsl.{Flow, Keep, Sink, Source} +import akka.stream._ import akka.util.ByteString import com.github.swagger.akka.model._ import com.github.swagger.akka.{HasActorSystem, SwaggerHttpService} @@ -26,7 +27,8 @@ import xyz.driver.core.auth._ import xyz.driver.core.{Name, generators} import xyz.driver.core.time.provider.TimeProvider -import scala.concurrent.{ExecutionContext, Future} +import scala.concurrent.{ExecutionContext, Future, Promise} +import scala.concurrent.duration._ import scala.util.{Failure, Success} import scalaz.Scalaz.{futureInstance, intInstance, listInstance, mapEqual, mapMonoid, stringInstance} import scalaz.syntax.equal._ @@ -365,10 +367,10 @@ trait RestService extends Service { protected def delete(baseUri: Uri, path: String) = HttpRequest(HttpMethods.DELETE, endpointUri(baseUri, path)) - protected def endpointUri(baseUri: Uri, path: String) = + protected def endpointUri(baseUri: Uri, path: String): Uri = baseUri.withPath(Uri.Path(path)) - protected def endpointUri(baseUri: Uri, path: String, query: Seq[(String, String)]) = + protected def endpointUri(baseUri: Uri, path: String, query: Seq[(String, String)]): Uri = baseUri.withPath(Uri.Path(path)).withQuery(Uri.Query(query: _*)) } @@ -409,17 +411,9 @@ class HttpRestServiceTransport(applicationName: Name[App], time: TimeProvider) extends ServiceTransport { - protected implicit val materializer = ActorMaterializer()(actorSystem) - protected implicit val execution = executionContext + protected implicit val execution: ExecutionContext = executionContext - private val client = Http()(actorSystem) - - private val clientConnectionSettings: ClientConnectionSettings = - ClientConnectionSettings(actorSystem).withUserAgentHeader( - Option(`User-Agent`(applicationName.value + "/" + applicationVersion))) - - private val connectionPoolSettings: ConnectionPoolSettings = ConnectionPoolSettings(actorSystem) - .withConnectionSettings(clientConnectionSettings) + protected val httpClient: HttpClient = new SingleRequestHttpClient(applicationName, applicationVersion, actorSystem) def sendRequestGetResponse(context: ServiceRequestContext)(requestStub: HttpRequest): Future[HttpResponse] = { @@ -439,7 +433,7 @@ class HttpRestServiceTransport(applicationName: Name[App], log.info(s"Sending request to ${request.method} ${request.uri}") - val response = client.singleRequest(request, settings = connectionPoolSettings)(materializer) + val response = httpClient.makeRequest(request) response.onComplete { case Success(r) => @@ -469,6 +463,81 @@ class HttpRestServiceTransport(applicationName: Name[App], } } +trait HttpClient { + def makeRequest(request: HttpRequest): Future[HttpResponse] +} + +class SingleRequestHttpClient(applicationName: Name[App], applicationVersion: String, actorSystem: ActorSystem) + extends HttpClient { + + protected implicit val materializer: ActorMaterializer = ActorMaterializer()(actorSystem) + private val client = Http()(actorSystem) + + private val clientConnectionSettings: ClientConnectionSettings = + ClientConnectionSettings(actorSystem).withUserAgentHeader( + Option(`User-Agent`(applicationName.value + "/" + applicationVersion))) + + private val connectionPoolSettings: ConnectionPoolSettings = ConnectionPoolSettings(actorSystem) + .withConnectionSettings(clientConnectionSettings) + + def makeRequest(request: HttpRequest): Future[HttpResponse] = { + client.singleRequest(request, settings = connectionPoolSettings)(materializer) + } +} + +class PooledHttpClient( + baseUri: Uri, + applicationName: Name[App], + applicationVersion: String, + requestRateLimit: Int = 64, + requestQueueSize: Int = 1024)(implicit actorSystem: ActorSystem, executionContext: ExecutionContext) + extends HttpClient { + + private val host = baseUri.authority.host.toString() + private val port = baseUri.effectivePort + private val scheme = baseUri.scheme + + protected implicit val materializer: ActorMaterializer = ActorMaterializer()(actorSystem) + + private val clientConnectionSettings: ClientConnectionSettings = + ClientConnectionSettings(actorSystem).withUserAgentHeader( + Option(`User-Agent`(applicationName.value + "/" + applicationVersion))) + + private val connectionPoolSettings: ConnectionPoolSettings = ConnectionPoolSettings(actorSystem) + .withConnectionSettings(clientConnectionSettings) + + private val pool = if (scheme.equalsIgnoreCase("https")) { + Http().cachedHostConnectionPoolHttps[Promise[HttpResponse]](host, port, settings = connectionPoolSettings) + } else { + Http().cachedHostConnectionPool[Promise[HttpResponse]](host, port, settings = connectionPoolSettings) + } + + private val queue = Source + .queue[(HttpRequest, Promise[HttpResponse])](requestQueueSize, OverflowStrategy.dropNew) + .via(pool) + .throttle(requestRateLimit, 1.second, maximumBurst = requestRateLimit, ThrottleMode.shaping) + .toMat(Sink.foreach({ + case ((Success(resp), p)) => p.success(resp) + case ((Failure(e), p)) => p.failure(e) + }))(Keep.left) + .run + + def makeRequest(request: HttpRequest): Future[HttpResponse] = { + val responsePromise = Promise[HttpResponse]() + + queue.offer(request -> responsePromise).flatMap { + case QueueOfferResult.Enqueued => + responsePromise.future + case QueueOfferResult.Dropped => + Future.failed(new Exception(s"Request queue to the host $host is overflown")) + case QueueOfferResult.Failure(ex) => + Future.failed(ex) + case QueueOfferResult.QueueClosed => + Future.failed(new Exception("Queue was closed (pool shut down) while running the request")) + } + } +} + import scala.reflect.runtime.universe._ class Swagger(override val host: String, @@ -479,10 +548,10 @@ class Swagger(override val host: String, val config: Config) extends SwaggerHttpService with HasActorSystem { - val materializer = ActorMaterializer()(actorSystem) + val materializer: ActorMaterializer = ActorMaterializer()(actorSystem) - override val basePath = config.getString("swagger.basePath") - override val apiDocsPath = config.getString("swagger.docsPath") + override val basePath: String = config.getString("swagger.basePath") + override val apiDocsPath: String = config.getString("swagger.docsPath") override val info = Info( config.getString("swagger.apiInfo.description"), @@ -503,7 +572,7 @@ class Swagger(override val host: String, vendorExtensions = Map.empty[String, AnyRef] ) - def swaggerUI = get { + def swaggerUI: Route = get { pathPrefix("") { pathEndOrSingleSlash { getFromResource("swagger-ui/index.html") -- cgit v1.2.3