aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorvlad <vlad@driver.xyz>2017-09-19 20:47:28 -0700
committervlad <vlad@driver.xyz>2017-09-19 20:47:28 -0700
commit07c3303f9c6b8d46763d270f47b730d5519eb204 (patch)
tree777b0c52b88090b54e615f6ad797ee04eedc8014
parent83e7be6269bc6efc74fc5b954d801b5907404aca (diff)
downloaddriver-core-07c3303f9c6b8d46763d270f47b730d5519eb204.tar.gz
driver-core-07c3303f9c6b8d46763d270f47b730d5519eb204.tar.bz2
driver-core-07c3303f9c6b8d46763d270f47b730d5519eb204.zip
Utils from pds-common
-rw-r--r--src/main/scala/xyz/driver/core/cache.scala109
-rw-r--r--src/main/scala/xyz/driver/core/future.scala87
2 files changed, 196 insertions, 0 deletions
diff --git a/src/main/scala/xyz/driver/core/cache.scala b/src/main/scala/xyz/driver/core/cache.scala
new file mode 100644
index 0000000..79ba2d7
--- /dev/null
+++ b/src/main/scala/xyz/driver/core/cache.scala
@@ -0,0 +1,109 @@
+package xyz.driver.core
+
+import java.util.concurrent.{Callable, TimeUnit}
+
+import com.google.common.cache.{CacheBuilder, Cache => GuavaCache}
+import com.typesafe.scalalogging.Logger
+
+import scala.concurrent.duration.{Duration, _}
+import scala.concurrent.{ExecutionContext, Future}
+
+object cache {
+
+ /**
+ * FutureCache is used to represent an in-memory, in-process, asynchronous cache.
+ *
+ * Every cache operation is atomic.
+ *
+ * This implementation evicts failed results,
+ * and doesn't interrupt the underlying request that has been fired off.
+ */
+ class AsyncCache[K, V](name: String, cache: GuavaCache[K, Future[V]])(implicit executionContext: ExecutionContext) {
+
+ private[this] val log = Logger(s"AsyncCache.$name")
+ private[this] val underlying = cache.asMap()
+
+ private[this] def evictOnFailure(key: K, f: Future[V]): Future[V] = {
+ f onFailure {
+ case ex: Throwable =>
+ log.debug(s"Evict key $key due to exception $ex")
+ evict(key, f)
+ }
+ f // we return the original future to make evict(k, f) easier to work with.
+ }
+
+ /**
+ * Equivalent to getOrElseUpdate
+ */
+ def apply(key: K)(value: => Future[V]): Future[V] = getOrElseUpdate(key)(value)
+
+ /**
+ * Gets the cached Future.
+ *
+ * @return None if a value hasn't been specified for that key yet
+ * Some(ksync computation) if the value has been specified. Just
+ * because this returns Some(..) doesn't mean that it has been
+ * satisfied, but if it hasn't been satisfied, it's probably
+ * in-flight.
+ */
+ def get(key: K): Option[Future[V]] = Option(underlying.get(key))
+
+ /**
+ * Gets the cached Future, or if it hasn't been returned yet, computes it and
+ * returns that value.
+ */
+ def getOrElseUpdate(key: K)(compute: => Future[V]): Future[V] = {
+ log.debug(s"Try to retrieve key $key from cache")
+ evictOnFailure(key, cache.get(key, new Callable[Future[V]] {
+ def call(): Future[V] = {
+ log.debug(s"Cache miss, load the key: $key")
+ compute
+ }
+ }))
+ }
+
+ /**
+ * Unconditionally sets a value for a given key
+ */
+ def set(key: K, value: Future[V]): Unit = {
+ cache.put(key, value)
+ evictOnFailure(key, value)
+ }
+
+ /**
+ * Evicts the contents of a `key` if the old value is `value`.
+ *
+ * Since `scala.concurrent.Future` uses reference equality, you must use the
+ * same object reference to evict a value.
+ *
+ * @return true if the key was evicted
+ * false if the key was not evicted
+ */
+ def evict(key: K, value: Future[V]): Boolean = underlying.remove(key, value)
+
+ /**
+ * @return the number of results that have been computed successfully or are in flight.
+ */
+ def size: Int = cache.size.toInt
+ }
+
+ object AsyncCache {
+ val DEFAULT_CAPACITY: Long = 10000L
+ val DEFAULT_READ_EXPIRATION: Duration = 10 minutes
+ val DEFAULT_WRITE_EXPIRATION: Duration = 1 hour
+
+ def apply[K <: AnyRef, V <: AnyRef](name: String,
+ capacity: Long = DEFAULT_CAPACITY,
+ readExpiration: Duration = DEFAULT_READ_EXPIRATION,
+ writeExpiration: Duration = DEFAULT_WRITE_EXPIRATION)(
+ implicit executionContext: ExecutionContext): AsyncCache[K, V] = {
+ val guavaCache = CacheBuilder
+ .newBuilder()
+ .maximumSize(capacity)
+ .expireAfterAccess(readExpiration.toSeconds, TimeUnit.SECONDS)
+ .expireAfterWrite(writeExpiration.toSeconds, TimeUnit.SECONDS)
+ .build[K, Future[V]]()
+ new AsyncCache(name, guavaCache)
+ }
+ }
+}
diff --git a/src/main/scala/xyz/driver/core/future.scala b/src/main/scala/xyz/driver/core/future.scala
new file mode 100644
index 0000000..1ee3576
--- /dev/null
+++ b/src/main/scala/xyz/driver/core/future.scala
@@ -0,0 +1,87 @@
+package xyz.driver.core
+
+import com.typesafe.scalalogging.Logger
+
+import scala.concurrent.{ExecutionContext, Future, Promise}
+import scala.util.{Failure, Success, Try}
+
+object future {
+ val log = Logger("Driver.Future")
+
+ implicit class RichFuture[T](f: Future[T]) {
+ def mapAll[U](pf: PartialFunction[Try[T], U])(implicit executionContext: ExecutionContext): Future[U] = {
+ val p = Promise[U]()
+ f.onComplete(r => p.complete(Try(pf(r))))
+ p.future
+ }
+
+ def failFastZip[U](that: Future[U])(implicit executionContext: ExecutionContext): Future[(T, U)] = {
+ future.failFastZip(f, that)
+ }
+ }
+
+ def failFastSequence[T](t: Iterable[Future[T]])(implicit ec: ExecutionContext): Future[Seq[T]] = {
+ t.foldLeft(Future.successful(Nil: List[T])) { (f, i) =>
+ failFastZip(f, i).map { case (tail, h) => h :: tail }
+ }
+ .map(_.reverse)
+ }
+
+ /**
+ * Standard scala zip waits forever on the left side, even if the right side fails
+ */
+ def failFastZip[T, U](ft: Future[T], fu: Future[U])(implicit ec: ExecutionContext): Future[(T, U)] = {
+ type State = Either[(T, Promise[U]), (U, Promise[T])]
+ val middleState = Promise[State]()
+
+ ft.onComplete {
+ case f @ Failure(err) =>
+ if (!middleState.tryFailure(err)) {
+ // the right has already succeeded
+ middleState.future.foreach {
+ case Right((_, pt)) => pt.complete(f)
+ case Left((t1, _)) => // This should never happen
+ log.error(s"Logic error: tried to set Failure($err) but Left($t1) already set")
+ }
+ }
+ case Success(t) =>
+ // Create the next promise:
+ val pu = Promise[U]()
+ if (!middleState.trySuccess(Left((t, pu)))) {
+ // we can't set, so the other promise beat us here.
+ middleState.future.foreach {
+ case Right((_, pt)) => pt.success(t)
+ case Left((t1, _)) => // This should never happen
+ log.error(s"Logic error: tried to set Left($t) but Left($t1) already set")
+ }
+ }
+ }
+ fu.onComplete {
+ case f @ Failure(err) =>
+ if (!middleState.tryFailure(err)) {
+ // we can't set, so the other promise beat us here.
+ middleState.future.foreach {
+ case Left((_, pu)) => pu.complete(f)
+ case Right((u1, _)) => // This should never happen
+ log.error(s"Logic error: tried to set Failure($err) but Right($u1) already set")
+ }
+ }
+ case Success(u) =>
+ // Create the next promise:
+ val pt = Promise[T]()
+ if (!middleState.trySuccess(Right((u, pt)))) {
+ // we can't set, so the other promise beat us here.
+ middleState.future.foreach {
+ case Left((_, pu)) => pu.success(u)
+ case Right((u1, _)) => // This should never happen
+ log.error(s"Logic error: tried to set Right($u) but Right($u1) already set")
+ }
+ }
+ }
+
+ middleState.future.flatMap {
+ case Left((t, pu)) => pu.future.map((t, _))
+ case Right((u, pt)) => pt.future.map((_, u))
+ }
+ }
+}