package xyz.driver.core.rest
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.headers.`User-Agent`
import akka.http.scaladsl.model.{HttpRequest, HttpResponse, Uri}
import akka.http.scaladsl.settings.{ClientConnectionSettings, ConnectionPoolSettings}
import akka.stream.scaladsl.{Keep, Sink, Source}
import akka.stream.{ActorMaterializer, OverflowStrategy, QueueOfferResult, ThrottleMode}
import xyz.driver.core.Name
import scala.concurrent.{ExecutionContext, Future, Promise}
import scala.concurrent.duration._
import scala.util.{Failure, Success}
class PooledHttpClient(
baseUri: Uri,
applicationName: Name[App],
applicationVersion: String,
requestRateLimit: Int = 64,
requestQueueSize: Int = 1024)(implicit actorSystem: ActorSystem, executionContext: ExecutionContext)
extends HttpClient {
private val host = baseUri.authority.host.toString()
private val port = baseUri.effectivePort
private val scheme = baseUri.scheme
protected implicit val materializer: ActorMaterializer = ActorMaterializer()(actorSystem)
private val clientConnectionSettings: ClientConnectionSettings =
ClientConnectionSettings(actorSystem).withUserAgentHeader(
Option(`User-Agent`(applicationName.value + "/" + applicationVersion)))
private val connectionPoolSettings: ConnectionPoolSettings = ConnectionPoolSettings(actorSystem)
.withConnectionSettings(clientConnectionSettings)
private val pool = if (scheme.equalsIgnoreCase("https")) {
Http().cachedHostConnectionPoolHttps[Promise[HttpResponse]](host, port, settings = connectionPoolSettings)
} else {
Http().cachedHostConnectionPool[Promise[HttpResponse]](host, port, settings = connectionPoolSettings)
}
private val queue = Source
.queue[(HttpRequest, Promise[HttpResponse])](requestQueueSize, OverflowStrategy.dropNew)
.via(pool)
.throttle(requestRateLimit, 1.second, maximumBurst = requestRateLimit, ThrottleMode.shaping)
.toMat(Sink.foreach({
case ((Success(resp), p)) => p.success(resp)
case ((Failure(e), p)) => p.failure(e)
}))(Keep.left)
.run
def makeRequest(request: HttpRequest): Future[HttpResponse] = {
val responsePromise = Promise[HttpResponse]()
queue.offer(request -> responsePromise).flatMap {
case QueueOfferResult.Enqueued =>
responsePromise.future
case QueueOfferResult.Dropped =>
Future.failed(new Exception(s"Request queue to the host $host is overflown"))
case QueueOfferResult.Failure(ex) =>
Future.failed(ex)
case QueueOfferResult.QueueClosed =>
Future.failed(new Exception("Queue was closed (pool shut down) while running the request"))
}
}
}