diff options
Diffstat (limited to 'src/main/scala/xyz/driver/pdsuicommon/concurrent/InMemoryBridgeUploadQueue.scala')
-rw-r--r-- | src/main/scala/xyz/driver/pdsuicommon/concurrent/InMemoryBridgeUploadQueue.scala | 17 |
1 files changed, 8 insertions, 9 deletions
diff --git a/src/main/scala/xyz/driver/pdsuicommon/concurrent/InMemoryBridgeUploadQueue.scala b/src/main/scala/xyz/driver/pdsuicommon/concurrent/InMemoryBridgeUploadQueue.scala index bff566b..658b5b1 100644 --- a/src/main/scala/xyz/driver/pdsuicommon/concurrent/InMemoryBridgeUploadQueue.scala +++ b/src/main/scala/xyz/driver/pdsuicommon/concurrent/InMemoryBridgeUploadQueue.scala @@ -1,9 +1,9 @@ package xyz.driver.pdsuicommon.concurrent import java.util.concurrent.LinkedBlockingQueue +import java.util.function.Predicate import xyz.driver.pdsuicommon.concurrent.BridgeUploadQueue.Item -import xyz.driver.pdsuicommon.domain.LongId import xyz.driver.pdsuicommon.logging.PhiLogging import scala.collection.JavaConverters._ @@ -16,9 +16,9 @@ class InMemoryBridgeUploadQueue extends BridgeUploadQueue with PhiLogging { private val queue = new LinkedBlockingQueue[Item]() - override def add(item: Item): Future[Unit] = { + override def add(item: Item): Future[Item] = { queue.add(item) - done + Future.successful(item) } override def tryRetry(item: Item): Future[Option[Item]] = Future.successful(Some(item)) @@ -28,11 +28,10 @@ class InMemoryBridgeUploadQueue extends BridgeUploadQueue with PhiLogging { Future.successful(r) } - override def remove(item: LongId[Item]): Future[Unit] = { - queue.remove(item) - done + override def complete(kind: String, tag: String): Future[Unit] = { + queue.removeIf(new Predicate[Item] { + override def test(t: Item): Boolean = t.kind == kind && t.tag == tag + }) + Future.successful(()) } - - private val done = Future.successful(()) - } |