From 07c3303f9c6b8d46763d270f47b730d5519eb204 Mon Sep 17 00:00:00 2001 From: vlad Date: Tue, 19 Sep 2017 20:47:28 -0700 Subject: Utils from pds-common --- src/main/scala/xyz/driver/core/cache.scala | 109 ++++++++++++++++++++++++++++ src/main/scala/xyz/driver/core/future.scala | 87 ++++++++++++++++++++++ 2 files changed, 196 insertions(+) create mode 100644 src/main/scala/xyz/driver/core/cache.scala create mode 100644 src/main/scala/xyz/driver/core/future.scala diff --git a/src/main/scala/xyz/driver/core/cache.scala b/src/main/scala/xyz/driver/core/cache.scala new file mode 100644 index 0000000..79ba2d7 --- /dev/null +++ b/src/main/scala/xyz/driver/core/cache.scala @@ -0,0 +1,109 @@ +package xyz.driver.core + +import java.util.concurrent.{Callable, TimeUnit} + +import com.google.common.cache.{CacheBuilder, Cache => GuavaCache} +import com.typesafe.scalalogging.Logger + +import scala.concurrent.duration.{Duration, _} +import scala.concurrent.{ExecutionContext, Future} + +object cache { + + /** + * FutureCache is used to represent an in-memory, in-process, asynchronous cache. + * + * Every cache operation is atomic. + * + * This implementation evicts failed results, + * and doesn't interrupt the underlying request that has been fired off. + */ + class AsyncCache[K, V](name: String, cache: GuavaCache[K, Future[V]])(implicit executionContext: ExecutionContext) { + + private[this] val log = Logger(s"AsyncCache.$name") + private[this] val underlying = cache.asMap() + + private[this] def evictOnFailure(key: K, f: Future[V]): Future[V] = { + f onFailure { + case ex: Throwable => + log.debug(s"Evict key $key due to exception $ex") + evict(key, f) + } + f // we return the original future to make evict(k, f) easier to work with. + } + + /** + * Equivalent to getOrElseUpdate + */ + def apply(key: K)(value: => Future[V]): Future[V] = getOrElseUpdate(key)(value) + + /** + * Gets the cached Future. + * + * @return None if a value hasn't been specified for that key yet + * Some(ksync computation) if the value has been specified. Just + * because this returns Some(..) doesn't mean that it has been + * satisfied, but if it hasn't been satisfied, it's probably + * in-flight. + */ + def get(key: K): Option[Future[V]] = Option(underlying.get(key)) + + /** + * Gets the cached Future, or if it hasn't been returned yet, computes it and + * returns that value. + */ + def getOrElseUpdate(key: K)(compute: => Future[V]): Future[V] = { + log.debug(s"Try to retrieve key $key from cache") + evictOnFailure(key, cache.get(key, new Callable[Future[V]] { + def call(): Future[V] = { + log.debug(s"Cache miss, load the key: $key") + compute + } + })) + } + + /** + * Unconditionally sets a value for a given key + */ + def set(key: K, value: Future[V]): Unit = { + cache.put(key, value) + evictOnFailure(key, value) + } + + /** + * Evicts the contents of a `key` if the old value is `value`. + * + * Since `scala.concurrent.Future` uses reference equality, you must use the + * same object reference to evict a value. + * + * @return true if the key was evicted + * false if the key was not evicted + */ + def evict(key: K, value: Future[V]): Boolean = underlying.remove(key, value) + + /** + * @return the number of results that have been computed successfully or are in flight. + */ + def size: Int = cache.size.toInt + } + + object AsyncCache { + val DEFAULT_CAPACITY: Long = 10000L + val DEFAULT_READ_EXPIRATION: Duration = 10 minutes + val DEFAULT_WRITE_EXPIRATION: Duration = 1 hour + + def apply[K <: AnyRef, V <: AnyRef](name: String, + capacity: Long = DEFAULT_CAPACITY, + readExpiration: Duration = DEFAULT_READ_EXPIRATION, + writeExpiration: Duration = DEFAULT_WRITE_EXPIRATION)( + implicit executionContext: ExecutionContext): AsyncCache[K, V] = { + val guavaCache = CacheBuilder + .newBuilder() + .maximumSize(capacity) + .expireAfterAccess(readExpiration.toSeconds, TimeUnit.SECONDS) + .expireAfterWrite(writeExpiration.toSeconds, TimeUnit.SECONDS) + .build[K, Future[V]]() + new AsyncCache(name, guavaCache) + } + } +} diff --git a/src/main/scala/xyz/driver/core/future.scala b/src/main/scala/xyz/driver/core/future.scala new file mode 100644 index 0000000..1ee3576 --- /dev/null +++ b/src/main/scala/xyz/driver/core/future.scala @@ -0,0 +1,87 @@ +package xyz.driver.core + +import com.typesafe.scalalogging.Logger + +import scala.concurrent.{ExecutionContext, Future, Promise} +import scala.util.{Failure, Success, Try} + +object future { + val log = Logger("Driver.Future") + + implicit class RichFuture[T](f: Future[T]) { + def mapAll[U](pf: PartialFunction[Try[T], U])(implicit executionContext: ExecutionContext): Future[U] = { + val p = Promise[U]() + f.onComplete(r => p.complete(Try(pf(r)))) + p.future + } + + def failFastZip[U](that: Future[U])(implicit executionContext: ExecutionContext): Future[(T, U)] = { + future.failFastZip(f, that) + } + } + + def failFastSequence[T](t: Iterable[Future[T]])(implicit ec: ExecutionContext): Future[Seq[T]] = { + t.foldLeft(Future.successful(Nil: List[T])) { (f, i) => + failFastZip(f, i).map { case (tail, h) => h :: tail } + } + .map(_.reverse) + } + + /** + * Standard scala zip waits forever on the left side, even if the right side fails + */ + def failFastZip[T, U](ft: Future[T], fu: Future[U])(implicit ec: ExecutionContext): Future[(T, U)] = { + type State = Either[(T, Promise[U]), (U, Promise[T])] + val middleState = Promise[State]() + + ft.onComplete { + case f @ Failure(err) => + if (!middleState.tryFailure(err)) { + // the right has already succeeded + middleState.future.foreach { + case Right((_, pt)) => pt.complete(f) + case Left((t1, _)) => // This should never happen + log.error(s"Logic error: tried to set Failure($err) but Left($t1) already set") + } + } + case Success(t) => + // Create the next promise: + val pu = Promise[U]() + if (!middleState.trySuccess(Left((t, pu)))) { + // we can't set, so the other promise beat us here. + middleState.future.foreach { + case Right((_, pt)) => pt.success(t) + case Left((t1, _)) => // This should never happen + log.error(s"Logic error: tried to set Left($t) but Left($t1) already set") + } + } + } + fu.onComplete { + case f @ Failure(err) => + if (!middleState.tryFailure(err)) { + // we can't set, so the other promise beat us here. + middleState.future.foreach { + case Left((_, pu)) => pu.complete(f) + case Right((u1, _)) => // This should never happen + log.error(s"Logic error: tried to set Failure($err) but Right($u1) already set") + } + } + case Success(u) => + // Create the next promise: + val pt = Promise[T]() + if (!middleState.trySuccess(Right((u, pt)))) { + // we can't set, so the other promise beat us here. + middleState.future.foreach { + case Left((_, pu)) => pu.success(u) + case Right((u1, _)) => // This should never happen + log.error(s"Logic error: tried to set Right($u) but Right($u1) already set") + } + } + } + + middleState.future.flatMap { + case Left((t, pu)) => pu.future.map((t, _)) + case Right((u, pt)) => pt.future.map((_, u)) + } + } +} -- cgit v1.2.3