aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorvlad <vlad@driver.xyz>2017-09-19 22:19:59 -0700
committervlad <vlad@driver.xyz>2017-09-19 22:19:59 -0700
commite4a55e9fdcb3fb89963416b273654259a9699d03 (patch)
treed3e68b2d03a8aad344df17c788d8c2ba5aaf7430
parent3fc91f50ae799106c99d3aa730d62d12be80b247 (diff)
downloaddriver-core-e4a55e9fdcb3fb89963416b273654259a9699d03.tar.gz
driver-core-e4a55e9fdcb3fb89963416b273654259a9699d03.tar.bz2
driver-core-e4a55e9fdcb3fb89963416b273654259a9699d03.zip
Utils from pds-common
-rw-r--r--src/main/scala/xyz/driver/core/rest.scala109
1 files changed, 89 insertions, 20 deletions
diff --git a/src/main/scala/xyz/driver/core/rest.scala b/src/main/scala/xyz/driver/core/rest.scala
index 2527b75..dde570a 100644
--- a/src/main/scala/xyz/driver/core/rest.scala
+++ b/src/main/scala/xyz/driver/core/rest.scala
@@ -10,10 +10,11 @@ import akka.http.scaladsl.marshalling.{ToEntityMarshaller, ToResponseMarshallabl
import akka.http.scaladsl.model._
import akka.http.scaladsl.model.headers.{HttpChallenges, RawHeader, `User-Agent`}
import akka.http.scaladsl.server.AuthenticationFailedRejection.CredentialsRejected
+import akka.http.scaladsl.server.Route
import akka.http.scaladsl.settings.{ClientConnectionSettings, ConnectionPoolSettings}
import akka.http.scaladsl.unmarshalling.{Unmarshal, Unmarshaller}
-import akka.stream.scaladsl.Flow
-import akka.stream.{ActorMaterializer, Materializer}
+import akka.stream.scaladsl.{Flow, Keep, Sink, Source}
+import akka.stream._
import akka.util.ByteString
import com.github.swagger.akka.model._
import com.github.swagger.akka.{HasActorSystem, SwaggerHttpService}
@@ -26,7 +27,8 @@ import xyz.driver.core.auth._
import xyz.driver.core.{Name, generators}
import xyz.driver.core.time.provider.TimeProvider
-import scala.concurrent.{ExecutionContext, Future}
+import scala.concurrent.{ExecutionContext, Future, Promise}
+import scala.concurrent.duration._
import scala.util.{Failure, Success}
import scalaz.Scalaz.{futureInstance, intInstance, listInstance, mapEqual, mapMonoid, stringInstance}
import scalaz.syntax.equal._
@@ -365,10 +367,10 @@ trait RestService extends Service {
protected def delete(baseUri: Uri, path: String) =
HttpRequest(HttpMethods.DELETE, endpointUri(baseUri, path))
- protected def endpointUri(baseUri: Uri, path: String) =
+ protected def endpointUri(baseUri: Uri, path: String): Uri =
baseUri.withPath(Uri.Path(path))
- protected def endpointUri(baseUri: Uri, path: String, query: Seq[(String, String)]) =
+ protected def endpointUri(baseUri: Uri, path: String, query: Seq[(String, String)]): Uri =
baseUri.withPath(Uri.Path(path)).withQuery(Uri.Query(query: _*))
}
@@ -409,17 +411,9 @@ class HttpRestServiceTransport(applicationName: Name[App],
time: TimeProvider)
extends ServiceTransport {
- protected implicit val materializer = ActorMaterializer()(actorSystem)
- protected implicit val execution = executionContext
+ protected implicit val execution: ExecutionContext = executionContext
- private val client = Http()(actorSystem)
-
- private val clientConnectionSettings: ClientConnectionSettings =
- ClientConnectionSettings(actorSystem).withUserAgentHeader(
- Option(`User-Agent`(applicationName.value + "/" + applicationVersion)))
-
- private val connectionPoolSettings: ConnectionPoolSettings = ConnectionPoolSettings(actorSystem)
- .withConnectionSettings(clientConnectionSettings)
+ protected val httpClient: HttpClient = new SingleRequestHttpClient(applicationName, applicationVersion, actorSystem)
def sendRequestGetResponse(context: ServiceRequestContext)(requestStub: HttpRequest): Future[HttpResponse] = {
@@ -439,7 +433,7 @@ class HttpRestServiceTransport(applicationName: Name[App],
log.info(s"Sending request to ${request.method} ${request.uri}")
- val response = client.singleRequest(request, settings = connectionPoolSettings)(materializer)
+ val response = httpClient.makeRequest(request)
response.onComplete {
case Success(r) =>
@@ -469,6 +463,81 @@ class HttpRestServiceTransport(applicationName: Name[App],
}
}
+trait HttpClient {
+ def makeRequest(request: HttpRequest): Future[HttpResponse]
+}
+
+class SingleRequestHttpClient(applicationName: Name[App], applicationVersion: String, actorSystem: ActorSystem)
+ extends HttpClient {
+
+ protected implicit val materializer: ActorMaterializer = ActorMaterializer()(actorSystem)
+ private val client = Http()(actorSystem)
+
+ private val clientConnectionSettings: ClientConnectionSettings =
+ ClientConnectionSettings(actorSystem).withUserAgentHeader(
+ Option(`User-Agent`(applicationName.value + "/" + applicationVersion)))
+
+ private val connectionPoolSettings: ConnectionPoolSettings = ConnectionPoolSettings(actorSystem)
+ .withConnectionSettings(clientConnectionSettings)
+
+ def makeRequest(request: HttpRequest): Future[HttpResponse] = {
+ client.singleRequest(request, settings = connectionPoolSettings)(materializer)
+ }
+}
+
+class PooledHttpClient(
+ baseUri: Uri,
+ applicationName: Name[App],
+ applicationVersion: String,
+ requestRateLimit: Int = 64,
+ requestQueueSize: Int = 1024)(implicit actorSystem: ActorSystem, executionContext: ExecutionContext)
+ extends HttpClient {
+
+ private val host = baseUri.authority.host.toString()
+ private val port = baseUri.effectivePort
+ private val scheme = baseUri.scheme
+
+ protected implicit val materializer: ActorMaterializer = ActorMaterializer()(actorSystem)
+
+ private val clientConnectionSettings: ClientConnectionSettings =
+ ClientConnectionSettings(actorSystem).withUserAgentHeader(
+ Option(`User-Agent`(applicationName.value + "/" + applicationVersion)))
+
+ private val connectionPoolSettings: ConnectionPoolSettings = ConnectionPoolSettings(actorSystem)
+ .withConnectionSettings(clientConnectionSettings)
+
+ private val pool = if (scheme.equalsIgnoreCase("https")) {
+ Http().cachedHostConnectionPoolHttps[Promise[HttpResponse]](host, port, settings = connectionPoolSettings)
+ } else {
+ Http().cachedHostConnectionPool[Promise[HttpResponse]](host, port, settings = connectionPoolSettings)
+ }
+
+ private val queue = Source
+ .queue[(HttpRequest, Promise[HttpResponse])](requestQueueSize, OverflowStrategy.dropNew)
+ .via(pool)
+ .throttle(requestRateLimit, 1.second, maximumBurst = requestRateLimit, ThrottleMode.shaping)
+ .toMat(Sink.foreach({
+ case ((Success(resp), p)) => p.success(resp)
+ case ((Failure(e), p)) => p.failure(e)
+ }))(Keep.left)
+ .run
+
+ def makeRequest(request: HttpRequest): Future[HttpResponse] = {
+ val responsePromise = Promise[HttpResponse]()
+
+ queue.offer(request -> responsePromise).flatMap {
+ case QueueOfferResult.Enqueued =>
+ responsePromise.future
+ case QueueOfferResult.Dropped =>
+ Future.failed(new Exception(s"Request queue to the host $host is overflown"))
+ case QueueOfferResult.Failure(ex) =>
+ Future.failed(ex)
+ case QueueOfferResult.QueueClosed =>
+ Future.failed(new Exception("Queue was closed (pool shut down) while running the request"))
+ }
+ }
+}
+
import scala.reflect.runtime.universe._
class Swagger(override val host: String,
@@ -479,10 +548,10 @@ class Swagger(override val host: String,
val config: Config)
extends SwaggerHttpService with HasActorSystem {
- val materializer = ActorMaterializer()(actorSystem)
+ val materializer: ActorMaterializer = ActorMaterializer()(actorSystem)
- override val basePath = config.getString("swagger.basePath")
- override val apiDocsPath = config.getString("swagger.docsPath")
+ override val basePath: String = config.getString("swagger.basePath")
+ override val apiDocsPath: String = config.getString("swagger.docsPath")
override val info = Info(
config.getString("swagger.apiInfo.description"),
@@ -503,7 +572,7 @@ class Swagger(override val host: String,
vendorExtensions = Map.empty[String, AnyRef]
)
- def swaggerUI = get {
+ def swaggerUI: Route = get {
pathPrefix("") {
pathEndOrSingleSlash {
getFromResource("swagger-ui/index.html")