From 0000a65ab4479a2a40e2d6468036438e9705b4aa Mon Sep 17 00:00:00 2001 From: vlad Date: Tue, 13 Jun 2017 10:25:55 -0700 Subject: Initial extraction of Driver non-specific utilities --- .../concurrent/InMemoryBridgeUploadQueue.scala | 38 ++++++++++++++++++++++ 1 file changed, 38 insertions(+) create mode 100644 src/main/scala/xyz/driver/common/concurrent/InMemoryBridgeUploadQueue.scala (limited to 'src/main/scala/xyz/driver/common/concurrent/InMemoryBridgeUploadQueue.scala') diff --git a/src/main/scala/xyz/driver/common/concurrent/InMemoryBridgeUploadQueue.scala b/src/main/scala/xyz/driver/common/concurrent/InMemoryBridgeUploadQueue.scala new file mode 100644 index 0000000..b19be42 --- /dev/null +++ b/src/main/scala/xyz/driver/common/concurrent/InMemoryBridgeUploadQueue.scala @@ -0,0 +1,38 @@ +package xyz.driver.common.concurrent + +import java.util.concurrent.LinkedBlockingQueue + +import xyz.driver.common.concurrent.BridgeUploadQueue.Item +import xyz.driver.common.domain.LongId +import xyz.driver.common.logging.PhiLogging + +import scala.collection.JavaConverters._ +import scala.concurrent.Future + +/** + * Use it only for tests + */ +class InMemoryBridgeUploadQueue extends BridgeUploadQueue with PhiLogging { + + private val queue = new LinkedBlockingQueue[Item]() + + override def add(item: Item): Future[Unit] = { + queue.add(item) + done + } + + override def tryRetry(item: Item): Future[Option[Item]] = Future.successful(Some(item)) + + override def get(kind: String): Future[Option[Item]] = { + val r = queue.iterator().asScala.find(_.kind == kind) + Future.successful(r) + } + + override def remove(item: LongId[Item]): Future[Unit] = { + queue.remove(item) + done + } + + private val done = Future.successful(()) + +} -- cgit v1.2.3