From e6552f3b31b55396c652c196c5c3a9c3a6cfed71 Mon Sep 17 00:00:00 2001 From: Jakob Odersky Date: Tue, 31 Jul 2018 12:13:47 -0700 Subject: Add message bus and topic abstractions (#181) --- src/main/scala/xyz/driver/core/Refresh.scala | 56 ++++++++++++++++++++++++++++ 1 file changed, 56 insertions(+) create mode 100644 src/main/scala/xyz/driver/core/Refresh.scala (limited to 'src/main/scala/xyz/driver/core/Refresh.scala') diff --git a/src/main/scala/xyz/driver/core/Refresh.scala b/src/main/scala/xyz/driver/core/Refresh.scala new file mode 100644 index 0000000..ca28da7 --- /dev/null +++ b/src/main/scala/xyz/driver/core/Refresh.scala @@ -0,0 +1,56 @@ +package xyz.driver.core + +import java.time.Instant +import java.util.concurrent.atomic.AtomicReference + +import scala.concurrent.{ExecutionContext, Future, Promise} +import scala.concurrent.duration.Duration + +/** A single-value asynchronous cache with TTL. + * + * Slightly adapted from Twitter's "util" library + * https://github.com/twitter/util/blob/ae0ab09134414438af9dfaa88a4613cecbff4741/util-cache/src/main/scala/com/twitter/cache/Refresh.scala + * + * Released under the Apache License 2.0. + */ +object Refresh { + + /** Creates a function that will provide a cached value for a given time-to-live (TTL). + * + * {{{ + * def freshToken(): Future[String] = // expensive network call to get an access token + * val getToken: Future[String] = Refresh.every(1.hour)(freshToken()) + * + * getToken() // new token is issued + * getToken() // subsequent calls use the cached token + * // wait 1 hour + * getToken() // new token is issued + * }}} */ + def every[A](ttl: Duration)(compute: => Future[A])(implicit ec: ExecutionContext): () => Future[A] = { + val ref = new AtomicReference[(Future[A], Instant)]( + (Future.failed(new NoSuchElementException("Cached value was never computed")), Instant.MIN) + ) + def refresh(): Future[A] = { + val tuple = ref.get + val (cachedValue, lastRetrieved) = tuple + val now = Instant.now + if (now.getEpochSecond < lastRetrieved.getEpochSecond + ttl.toSeconds) { + cachedValue + } else { + val p = Promise[A] + val nextTuple = (p.future, now) + if (ref.compareAndSet(tuple, nextTuple)) { + compute.onComplete { done => + if (done.isFailure) { + ref.set((p.future, lastRetrieved)) // don't update retrieval time in case of failure + } + p.complete(done) + } + } + refresh() + } + } + refresh _ + } + +} -- cgit v1.2.3