aboutsummaryrefslogtreecommitdiff
path: root/src/main/scala/xyz/driver/core/cache.scala
blob: 9897c16bea0c3b6d6049efee5519fc6985134d34 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
package xyz.driver.core

import java.util.concurrent.{Callable, TimeUnit}

import com.google.common.cache.{CacheBuilder, Cache => GuavaCache}
import com.typesafe.scalalogging.Logger

import scala.concurrent.duration.{Duration, _}
import scala.concurrent.{ExecutionContext, Future}

object cache {

  /**
    * FutureCache is used to represent an in-memory, in-process, asynchronous cache.
    *
    * Every cache operation is atomic.
    *
    * This implementation evicts failed results,
    * and doesn't interrupt the underlying request that has been fired off.
    */
  class AsyncCache[K, V](name: String, cache: GuavaCache[K, Future[V]])(implicit executionContext: ExecutionContext) {

    private[this] val log        = Logger(s"AsyncCache.$name")
    private[this] val underlying = cache.asMap()

    private[this] def evictOnFailure(key: K, f: Future[V]): Future[V] = {
      f.failed foreach {
        case ex: Throwable =>
          log.debug(s"Evict key $key due to exception $ex")
          evict(key, f)
      }
      f // we return the original future to make evict(k, f) easier to work with.
    }

    /**
      * Equivalent to getOrElseUpdate
      */
    def apply(key: K)(value: => Future[V]): Future[V] = getOrElseUpdate(key)(value)

    /**
      * Gets the cached Future.
      *
      * @return None if a value hasn't been specified for that key yet
      *         Some(ksync computation) if the value has been specified.  Just
      *         because this returns Some(..) doesn't mean that it has been
      *         satisfied, but if it hasn't been satisfied, it's probably
      *         in-flight.
      */
    def get(key: K): Option[Future[V]] = Option(underlying.get(key))

    /**
      * Gets the cached Future, or if it hasn't been returned yet, computes it and
      * returns that value.
      */
    def getOrElseUpdate(key: K)(compute: => Future[V]): Future[V] = {
      log.debug(s"Try to retrieve key $key from cache")
      evictOnFailure(key, cache.get(key, new Callable[Future[V]] {
        def call(): Future[V] = {
          log.debug(s"Cache miss, load the key: $key")
          compute
        }
      }))
    }

    /**
      * Unconditionally sets a value for a given key
      */
    def set(key: K, value: Future[V]): Unit = {
      cache.put(key, value)
      evictOnFailure(key, value)
    }

    /**
      * Evicts the contents of a `key` if the old value is `value`.
      *
      * Since `scala.concurrent.Future` uses reference equality, you must use the
      * same object reference to evict a value.
      *
      * @return true if the key was evicted
      *         false if the key was not evicted
      */
    def evict(key: K, value: Future[V]): Boolean = underlying.remove(key, value)

    /**
      * @return the number of results that have been computed successfully or are in flight.
      */
    def size: Int = cache.size.toInt
  }

  object AsyncCache {
    val DEFAULT_CAPACITY: Long             = 10000L
    val DEFAULT_READ_EXPIRATION: Duration  = 10.minutes
    val DEFAULT_WRITE_EXPIRATION: Duration = 1.hour

    def apply[K <: AnyRef, V <: AnyRef](
        name: String,
        capacity: Long = DEFAULT_CAPACITY,
        readExpiration: Duration = DEFAULT_READ_EXPIRATION,
        writeExpiration: Duration = DEFAULT_WRITE_EXPIRATION)(
        implicit executionContext: ExecutionContext): AsyncCache[K, V] = {
      val guavaCache = CacheBuilder
        .newBuilder()
        .maximumSize(capacity)
        .expireAfterAccess(readExpiration.toSeconds, TimeUnit.SECONDS)
        .expireAfterWrite(writeExpiration.toSeconds, TimeUnit.SECONDS)
        .build[K, Future[V]]()
      new AsyncCache(name, guavaCache)
    }
  }
}