From a997aa6539d1f0af4ab4fc395ff2033335da312a Mon Sep 17 00:00:00 2001 From: vlad Date: Fri, 30 Jun 2017 12:29:54 -0700 Subject: Latest PDS UI utils --- .../concurrent/InMemoryBridgeUploadQueue.scala | 17 ++++++++--------- 1 file changed, 8 insertions(+), 9 deletions(-) (limited to 'src/main/scala/xyz/driver/pdsuicommon/concurrent/InMemoryBridgeUploadQueue.scala') diff --git a/src/main/scala/xyz/driver/pdsuicommon/concurrent/InMemoryBridgeUploadQueue.scala b/src/main/scala/xyz/driver/pdsuicommon/concurrent/InMemoryBridgeUploadQueue.scala index bff566b..658b5b1 100644 --- a/src/main/scala/xyz/driver/pdsuicommon/concurrent/InMemoryBridgeUploadQueue.scala +++ b/src/main/scala/xyz/driver/pdsuicommon/concurrent/InMemoryBridgeUploadQueue.scala @@ -1,9 +1,9 @@ package xyz.driver.pdsuicommon.concurrent import java.util.concurrent.LinkedBlockingQueue +import java.util.function.Predicate import xyz.driver.pdsuicommon.concurrent.BridgeUploadQueue.Item -import xyz.driver.pdsuicommon.domain.LongId import xyz.driver.pdsuicommon.logging.PhiLogging import scala.collection.JavaConverters._ @@ -16,9 +16,9 @@ class InMemoryBridgeUploadQueue extends BridgeUploadQueue with PhiLogging { private val queue = new LinkedBlockingQueue[Item]() - override def add(item: Item): Future[Unit] = { + override def add(item: Item): Future[Item] = { queue.add(item) - done + Future.successful(item) } override def tryRetry(item: Item): Future[Option[Item]] = Future.successful(Some(item)) @@ -28,11 +28,10 @@ class InMemoryBridgeUploadQueue extends BridgeUploadQueue with PhiLogging { Future.successful(r) } - override def remove(item: LongId[Item]): Future[Unit] = { - queue.remove(item) - done + override def complete(kind: String, tag: String): Future[Unit] = { + queue.removeIf(new Predicate[Item] { + override def test(t: Item): Boolean = t.kind == kind && t.tag == tag + }) + Future.successful(()) } - - private val done = Future.successful(()) - } -- cgit v1.2.3