aboutsummaryrefslogtreecommitdiff
path: root/core-rest/src/main/scala/xyz/driver/core/rest/PooledHttpClient.scala
diff options
context:
space:
mode:
Diffstat (limited to 'core-rest/src/main/scala/xyz/driver/core/rest/PooledHttpClient.scala')
-rw-r--r--core-rest/src/main/scala/xyz/driver/core/rest/PooledHttpClient.scala67
1 files changed, 67 insertions, 0 deletions
diff --git a/core-rest/src/main/scala/xyz/driver/core/rest/PooledHttpClient.scala b/core-rest/src/main/scala/xyz/driver/core/rest/PooledHttpClient.scala
new file mode 100644
index 0000000..2854257
--- /dev/null
+++ b/core-rest/src/main/scala/xyz/driver/core/rest/PooledHttpClient.scala
@@ -0,0 +1,67 @@
+package xyz.driver.core.rest
+
+import akka.actor.ActorSystem
+import akka.http.scaladsl.Http
+import akka.http.scaladsl.model.headers.`User-Agent`
+import akka.http.scaladsl.model.{HttpRequest, HttpResponse, Uri}
+import akka.http.scaladsl.settings.{ClientConnectionSettings, ConnectionPoolSettings}
+import akka.stream.scaladsl.{Keep, Sink, Source}
+import akka.stream.{ActorMaterializer, OverflowStrategy, QueueOfferResult, ThrottleMode}
+import xyz.driver.core.Name
+
+import scala.concurrent.{ExecutionContext, Future, Promise}
+import scala.concurrent.duration._
+import scala.util.{Failure, Success}
+
+class PooledHttpClient(
+ baseUri: Uri,
+ applicationName: Name[App],
+ applicationVersion: String,
+ requestRateLimit: Int = 64,
+ requestQueueSize: Int = 1024)(implicit actorSystem: ActorSystem, executionContext: ExecutionContext)
+ extends HttpClient {
+
+ private val host = baseUri.authority.host.toString()
+ private val port = baseUri.effectivePort
+ private val scheme = baseUri.scheme
+
+ protected implicit val materializer: ActorMaterializer = ActorMaterializer()(actorSystem)
+
+ private val clientConnectionSettings: ClientConnectionSettings =
+ ClientConnectionSettings(actorSystem).withUserAgentHeader(
+ Option(`User-Agent`(applicationName.value + "/" + applicationVersion)))
+
+ private val connectionPoolSettings: ConnectionPoolSettings = ConnectionPoolSettings(actorSystem)
+ .withConnectionSettings(clientConnectionSettings)
+
+ private val pool = if (scheme.equalsIgnoreCase("https")) {
+ Http().cachedHostConnectionPoolHttps[Promise[HttpResponse]](host, port, settings = connectionPoolSettings)
+ } else {
+ Http().cachedHostConnectionPool[Promise[HttpResponse]](host, port, settings = connectionPoolSettings)
+ }
+
+ private val queue = Source
+ .queue[(HttpRequest, Promise[HttpResponse])](requestQueueSize, OverflowStrategy.dropNew)
+ .via(pool)
+ .throttle(requestRateLimit, 1.second, maximumBurst = requestRateLimit, ThrottleMode.shaping)
+ .toMat(Sink.foreach({
+ case ((Success(resp), p)) => p.success(resp)
+ case ((Failure(e), p)) => p.failure(e)
+ }))(Keep.left)
+ .run
+
+ def makeRequest(request: HttpRequest): Future[HttpResponse] = {
+ val responsePromise = Promise[HttpResponse]()
+
+ queue.offer(request -> responsePromise).flatMap {
+ case QueueOfferResult.Enqueued =>
+ responsePromise.future
+ case QueueOfferResult.Dropped =>
+ Future.failed(new Exception(s"Request queue to the host $host is overflown"))
+ case QueueOfferResult.Failure(ex) =>
+ Future.failed(ex)
+ case QueueOfferResult.QueueClosed =>
+ Future.failed(new Exception("Queue was closed (pool shut down) while running the request"))
+ }
+ }
+}