aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorVlad Uspensky <v.uspenskiy@icloud.com>2017-09-19 22:25:16 -0700
committerGitHub <noreply@github.com>2017-09-19 22:25:16 -0700
commit6ac9f69274df066c64a12bb9541f1f07d827a097 (patch)
treed3e68b2d03a8aad344df17c788d8c2ba5aaf7430
parentb822938ae7057af99cd03dba7e8b81233962fd54 (diff)
parente4a55e9fdcb3fb89963416b273654259a9699d03 (diff)
downloaddriver-core-6ac9f69274df066c64a12bb9541f1f07d827a097.tar.gz
driver-core-6ac9f69274df066c64a12bb9541f1f07d827a097.tar.bz2
driver-core-6ac9f69274df066c64a12bb9541f1f07d827a097.zip
Merge pull request #63 from drivergroup/pds-common-utilsv0.16.9
Pds common utils
-rw-r--r--src/main/scala/xyz/driver/core/cache.scala109
-rw-r--r--src/main/scala/xyz/driver/core/future.scala87
-rw-r--r--src/main/scala/xyz/driver/core/rest.scala109
3 files changed, 285 insertions, 20 deletions
diff --git a/src/main/scala/xyz/driver/core/cache.scala b/src/main/scala/xyz/driver/core/cache.scala
new file mode 100644
index 0000000..79ba2d7
--- /dev/null
+++ b/src/main/scala/xyz/driver/core/cache.scala
@@ -0,0 +1,109 @@
+package xyz.driver.core
+
+import java.util.concurrent.{Callable, TimeUnit}
+
+import com.google.common.cache.{CacheBuilder, Cache => GuavaCache}
+import com.typesafe.scalalogging.Logger
+
+import scala.concurrent.duration.{Duration, _}
+import scala.concurrent.{ExecutionContext, Future}
+
+object cache {
+
+ /**
+ * FutureCache is used to represent an in-memory, in-process, asynchronous cache.
+ *
+ * Every cache operation is atomic.
+ *
+ * This implementation evicts failed results,
+ * and doesn't interrupt the underlying request that has been fired off.
+ */
+ class AsyncCache[K, V](name: String, cache: GuavaCache[K, Future[V]])(implicit executionContext: ExecutionContext) {
+
+ private[this] val log = Logger(s"AsyncCache.$name")
+ private[this] val underlying = cache.asMap()
+
+ private[this] def evictOnFailure(key: K, f: Future[V]): Future[V] = {
+ f onFailure {
+ case ex: Throwable =>
+ log.debug(s"Evict key $key due to exception $ex")
+ evict(key, f)
+ }
+ f // we return the original future to make evict(k, f) easier to work with.
+ }
+
+ /**
+ * Equivalent to getOrElseUpdate
+ */
+ def apply(key: K)(value: => Future[V]): Future[V] = getOrElseUpdate(key)(value)
+
+ /**
+ * Gets the cached Future.
+ *
+ * @return None if a value hasn't been specified for that key yet
+ * Some(ksync computation) if the value has been specified. Just
+ * because this returns Some(..) doesn't mean that it has been
+ * satisfied, but if it hasn't been satisfied, it's probably
+ * in-flight.
+ */
+ def get(key: K): Option[Future[V]] = Option(underlying.get(key))
+
+ /**
+ * Gets the cached Future, or if it hasn't been returned yet, computes it and
+ * returns that value.
+ */
+ def getOrElseUpdate(key: K)(compute: => Future[V]): Future[V] = {
+ log.debug(s"Try to retrieve key $key from cache")
+ evictOnFailure(key, cache.get(key, new Callable[Future[V]] {
+ def call(): Future[V] = {
+ log.debug(s"Cache miss, load the key: $key")
+ compute
+ }
+ }))
+ }
+
+ /**
+ * Unconditionally sets a value for a given key
+ */
+ def set(key: K, value: Future[V]): Unit = {
+ cache.put(key, value)
+ evictOnFailure(key, value)
+ }
+
+ /**
+ * Evicts the contents of a `key` if the old value is `value`.
+ *
+ * Since `scala.concurrent.Future` uses reference equality, you must use the
+ * same object reference to evict a value.
+ *
+ * @return true if the key was evicted
+ * false if the key was not evicted
+ */
+ def evict(key: K, value: Future[V]): Boolean = underlying.remove(key, value)
+
+ /**
+ * @return the number of results that have been computed successfully or are in flight.
+ */
+ def size: Int = cache.size.toInt
+ }
+
+ object AsyncCache {
+ val DEFAULT_CAPACITY: Long = 10000L
+ val DEFAULT_READ_EXPIRATION: Duration = 10 minutes
+ val DEFAULT_WRITE_EXPIRATION: Duration = 1 hour
+
+ def apply[K <: AnyRef, V <: AnyRef](name: String,
+ capacity: Long = DEFAULT_CAPACITY,
+ readExpiration: Duration = DEFAULT_READ_EXPIRATION,
+ writeExpiration: Duration = DEFAULT_WRITE_EXPIRATION)(
+ implicit executionContext: ExecutionContext): AsyncCache[K, V] = {
+ val guavaCache = CacheBuilder
+ .newBuilder()
+ .maximumSize(capacity)
+ .expireAfterAccess(readExpiration.toSeconds, TimeUnit.SECONDS)
+ .expireAfterWrite(writeExpiration.toSeconds, TimeUnit.SECONDS)
+ .build[K, Future[V]]()
+ new AsyncCache(name, guavaCache)
+ }
+ }
+}
diff --git a/src/main/scala/xyz/driver/core/future.scala b/src/main/scala/xyz/driver/core/future.scala
new file mode 100644
index 0000000..1ee3576
--- /dev/null
+++ b/src/main/scala/xyz/driver/core/future.scala
@@ -0,0 +1,87 @@
+package xyz.driver.core
+
+import com.typesafe.scalalogging.Logger
+
+import scala.concurrent.{ExecutionContext, Future, Promise}
+import scala.util.{Failure, Success, Try}
+
+object future {
+ val log = Logger("Driver.Future")
+
+ implicit class RichFuture[T](f: Future[T]) {
+ def mapAll[U](pf: PartialFunction[Try[T], U])(implicit executionContext: ExecutionContext): Future[U] = {
+ val p = Promise[U]()
+ f.onComplete(r => p.complete(Try(pf(r))))
+ p.future
+ }
+
+ def failFastZip[U](that: Future[U])(implicit executionContext: ExecutionContext): Future[(T, U)] = {
+ future.failFastZip(f, that)
+ }
+ }
+
+ def failFastSequence[T](t: Iterable[Future[T]])(implicit ec: ExecutionContext): Future[Seq[T]] = {
+ t.foldLeft(Future.successful(Nil: List[T])) { (f, i) =>
+ failFastZip(f, i).map { case (tail, h) => h :: tail }
+ }
+ .map(_.reverse)
+ }
+
+ /**
+ * Standard scala zip waits forever on the left side, even if the right side fails
+ */
+ def failFastZip[T, U](ft: Future[T], fu: Future[U])(implicit ec: ExecutionContext): Future[(T, U)] = {
+ type State = Either[(T, Promise[U]), (U, Promise[T])]
+ val middleState = Promise[State]()
+
+ ft.onComplete {
+ case f @ Failure(err) =>
+ if (!middleState.tryFailure(err)) {
+ // the right has already succeeded
+ middleState.future.foreach {
+ case Right((_, pt)) => pt.complete(f)
+ case Left((t1, _)) => // This should never happen
+ log.error(s"Logic error: tried to set Failure($err) but Left($t1) already set")
+ }
+ }
+ case Success(t) =>
+ // Create the next promise:
+ val pu = Promise[U]()
+ if (!middleState.trySuccess(Left((t, pu)))) {
+ // we can't set, so the other promise beat us here.
+ middleState.future.foreach {
+ case Right((_, pt)) => pt.success(t)
+ case Left((t1, _)) => // This should never happen
+ log.error(s"Logic error: tried to set Left($t) but Left($t1) already set")
+ }
+ }
+ }
+ fu.onComplete {
+ case f @ Failure(err) =>
+ if (!middleState.tryFailure(err)) {
+ // we can't set, so the other promise beat us here.
+ middleState.future.foreach {
+ case Left((_, pu)) => pu.complete(f)
+ case Right((u1, _)) => // This should never happen
+ log.error(s"Logic error: tried to set Failure($err) but Right($u1) already set")
+ }
+ }
+ case Success(u) =>
+ // Create the next promise:
+ val pt = Promise[T]()
+ if (!middleState.trySuccess(Right((u, pt)))) {
+ // we can't set, so the other promise beat us here.
+ middleState.future.foreach {
+ case Left((_, pu)) => pu.success(u)
+ case Right((u1, _)) => // This should never happen
+ log.error(s"Logic error: tried to set Right($u) but Right($u1) already set")
+ }
+ }
+ }
+
+ middleState.future.flatMap {
+ case Left((t, pu)) => pu.future.map((t, _))
+ case Right((u, pt)) => pt.future.map((_, u))
+ }
+ }
+}
diff --git a/src/main/scala/xyz/driver/core/rest.scala b/src/main/scala/xyz/driver/core/rest.scala
index 2527b75..dde570a 100644
--- a/src/main/scala/xyz/driver/core/rest.scala
+++ b/src/main/scala/xyz/driver/core/rest.scala
@@ -10,10 +10,11 @@ import akka.http.scaladsl.marshalling.{ToEntityMarshaller, ToResponseMarshallabl
import akka.http.scaladsl.model._
import akka.http.scaladsl.model.headers.{HttpChallenges, RawHeader, `User-Agent`}
import akka.http.scaladsl.server.AuthenticationFailedRejection.CredentialsRejected
+import akka.http.scaladsl.server.Route
import akka.http.scaladsl.settings.{ClientConnectionSettings, ConnectionPoolSettings}
import akka.http.scaladsl.unmarshalling.{Unmarshal, Unmarshaller}
-import akka.stream.scaladsl.Flow
-import akka.stream.{ActorMaterializer, Materializer}
+import akka.stream.scaladsl.{Flow, Keep, Sink, Source}
+import akka.stream._
import akka.util.ByteString
import com.github.swagger.akka.model._
import com.github.swagger.akka.{HasActorSystem, SwaggerHttpService}
@@ -26,7 +27,8 @@ import xyz.driver.core.auth._
import xyz.driver.core.{Name, generators}
import xyz.driver.core.time.provider.TimeProvider
-import scala.concurrent.{ExecutionContext, Future}
+import scala.concurrent.{ExecutionContext, Future, Promise}
+import scala.concurrent.duration._
import scala.util.{Failure, Success}
import scalaz.Scalaz.{futureInstance, intInstance, listInstance, mapEqual, mapMonoid, stringInstance}
import scalaz.syntax.equal._
@@ -365,10 +367,10 @@ trait RestService extends Service {
protected def delete(baseUri: Uri, path: String) =
HttpRequest(HttpMethods.DELETE, endpointUri(baseUri, path))
- protected def endpointUri(baseUri: Uri, path: String) =
+ protected def endpointUri(baseUri: Uri, path: String): Uri =
baseUri.withPath(Uri.Path(path))
- protected def endpointUri(baseUri: Uri, path: String, query: Seq[(String, String)]) =
+ protected def endpointUri(baseUri: Uri, path: String, query: Seq[(String, String)]): Uri =
baseUri.withPath(Uri.Path(path)).withQuery(Uri.Query(query: _*))
}
@@ -409,17 +411,9 @@ class HttpRestServiceTransport(applicationName: Name[App],
time: TimeProvider)
extends ServiceTransport {
- protected implicit val materializer = ActorMaterializer()(actorSystem)
- protected implicit val execution = executionContext
+ protected implicit val execution: ExecutionContext = executionContext
- private val client = Http()(actorSystem)
-
- private val clientConnectionSettings: ClientConnectionSettings =
- ClientConnectionSettings(actorSystem).withUserAgentHeader(
- Option(`User-Agent`(applicationName.value + "/" + applicationVersion)))
-
- private val connectionPoolSettings: ConnectionPoolSettings = ConnectionPoolSettings(actorSystem)
- .withConnectionSettings(clientConnectionSettings)
+ protected val httpClient: HttpClient = new SingleRequestHttpClient(applicationName, applicationVersion, actorSystem)
def sendRequestGetResponse(context: ServiceRequestContext)(requestStub: HttpRequest): Future[HttpResponse] = {
@@ -439,7 +433,7 @@ class HttpRestServiceTransport(applicationName: Name[App],
log.info(s"Sending request to ${request.method} ${request.uri}")
- val response = client.singleRequest(request, settings = connectionPoolSettings)(materializer)
+ val response = httpClient.makeRequest(request)
response.onComplete {
case Success(r) =>
@@ -469,6 +463,81 @@ class HttpRestServiceTransport(applicationName: Name[App],
}
}
+trait HttpClient {
+ def makeRequest(request: HttpRequest): Future[HttpResponse]
+}
+
+class SingleRequestHttpClient(applicationName: Name[App], applicationVersion: String, actorSystem: ActorSystem)
+ extends HttpClient {
+
+ protected implicit val materializer: ActorMaterializer = ActorMaterializer()(actorSystem)
+ private val client = Http()(actorSystem)
+
+ private val clientConnectionSettings: ClientConnectionSettings =
+ ClientConnectionSettings(actorSystem).withUserAgentHeader(
+ Option(`User-Agent`(applicationName.value + "/" + applicationVersion)))
+
+ private val connectionPoolSettings: ConnectionPoolSettings = ConnectionPoolSettings(actorSystem)
+ .withConnectionSettings(clientConnectionSettings)
+
+ def makeRequest(request: HttpRequest): Future[HttpResponse] = {
+ client.singleRequest(request, settings = connectionPoolSettings)(materializer)
+ }
+}
+
+class PooledHttpClient(
+ baseUri: Uri,
+ applicationName: Name[App],
+ applicationVersion: String,
+ requestRateLimit: Int = 64,
+ requestQueueSize: Int = 1024)(implicit actorSystem: ActorSystem, executionContext: ExecutionContext)
+ extends HttpClient {
+
+ private val host = baseUri.authority.host.toString()
+ private val port = baseUri.effectivePort
+ private val scheme = baseUri.scheme
+
+ protected implicit val materializer: ActorMaterializer = ActorMaterializer()(actorSystem)
+
+ private val clientConnectionSettings: ClientConnectionSettings =
+ ClientConnectionSettings(actorSystem).withUserAgentHeader(
+ Option(`User-Agent`(applicationName.value + "/" + applicationVersion)))
+
+ private val connectionPoolSettings: ConnectionPoolSettings = ConnectionPoolSettings(actorSystem)
+ .withConnectionSettings(clientConnectionSettings)
+
+ private val pool = if (scheme.equalsIgnoreCase("https")) {
+ Http().cachedHostConnectionPoolHttps[Promise[HttpResponse]](host, port, settings = connectionPoolSettings)
+ } else {
+ Http().cachedHostConnectionPool[Promise[HttpResponse]](host, port, settings = connectionPoolSettings)
+ }
+
+ private val queue = Source
+ .queue[(HttpRequest, Promise[HttpResponse])](requestQueueSize, OverflowStrategy.dropNew)
+ .via(pool)
+ .throttle(requestRateLimit, 1.second, maximumBurst = requestRateLimit, ThrottleMode.shaping)
+ .toMat(Sink.foreach({
+ case ((Success(resp), p)) => p.success(resp)
+ case ((Failure(e), p)) => p.failure(e)
+ }))(Keep.left)
+ .run
+
+ def makeRequest(request: HttpRequest): Future[HttpResponse] = {
+ val responsePromise = Promise[HttpResponse]()
+
+ queue.offer(request -> responsePromise).flatMap {
+ case QueueOfferResult.Enqueued =>
+ responsePromise.future
+ case QueueOfferResult.Dropped =>
+ Future.failed(new Exception(s"Request queue to the host $host is overflown"))
+ case QueueOfferResult.Failure(ex) =>
+ Future.failed(ex)
+ case QueueOfferResult.QueueClosed =>
+ Future.failed(new Exception("Queue was closed (pool shut down) while running the request"))
+ }
+ }
+}
+
import scala.reflect.runtime.universe._
class Swagger(override val host: String,
@@ -479,10 +548,10 @@ class Swagger(override val host: String,
val config: Config)
extends SwaggerHttpService with HasActorSystem {
- val materializer = ActorMaterializer()(actorSystem)
+ val materializer: ActorMaterializer = ActorMaterializer()(actorSystem)
- override val basePath = config.getString("swagger.basePath")
- override val apiDocsPath = config.getString("swagger.docsPath")
+ override val basePath: String = config.getString("swagger.basePath")
+ override val apiDocsPath: String = config.getString("swagger.docsPath")
override val info = Info(
config.getString("swagger.apiInfo.description"),
@@ -503,7 +572,7 @@ class Swagger(override val host: String,
vendorExtensions = Map.empty[String, AnyRef]
)
- def swaggerUI = get {
+ def swaggerUI: Route = get {
pathPrefix("") {
pathEndOrSingleSlash {
getFromResource("swagger-ui/index.html")