aboutsummaryrefslogtreecommitdiff
path: root/src/main/scala/xyz/driver/pdsuicommon/concurrent/InMemoryBridgeUploadQueue.scala
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/scala/xyz/driver/pdsuicommon/concurrent/InMemoryBridgeUploadQueue.scala')
-rw-r--r--src/main/scala/xyz/driver/pdsuicommon/concurrent/InMemoryBridgeUploadQueue.scala17
1 files changed, 8 insertions, 9 deletions
diff --git a/src/main/scala/xyz/driver/pdsuicommon/concurrent/InMemoryBridgeUploadQueue.scala b/src/main/scala/xyz/driver/pdsuicommon/concurrent/InMemoryBridgeUploadQueue.scala
index bff566b..658b5b1 100644
--- a/src/main/scala/xyz/driver/pdsuicommon/concurrent/InMemoryBridgeUploadQueue.scala
+++ b/src/main/scala/xyz/driver/pdsuicommon/concurrent/InMemoryBridgeUploadQueue.scala
@@ -1,9 +1,9 @@
package xyz.driver.pdsuicommon.concurrent
import java.util.concurrent.LinkedBlockingQueue
+import java.util.function.Predicate
import xyz.driver.pdsuicommon.concurrent.BridgeUploadQueue.Item
-import xyz.driver.pdsuicommon.domain.LongId
import xyz.driver.pdsuicommon.logging.PhiLogging
import scala.collection.JavaConverters._
@@ -16,9 +16,9 @@ class InMemoryBridgeUploadQueue extends BridgeUploadQueue with PhiLogging {
private val queue = new LinkedBlockingQueue[Item]()
- override def add(item: Item): Future[Unit] = {
+ override def add(item: Item): Future[Item] = {
queue.add(item)
- done
+ Future.successful(item)
}
override def tryRetry(item: Item): Future[Option[Item]] = Future.successful(Some(item))
@@ -28,11 +28,10 @@ class InMemoryBridgeUploadQueue extends BridgeUploadQueue with PhiLogging {
Future.successful(r)
}
- override def remove(item: LongId[Item]): Future[Unit] = {
- queue.remove(item)
- done
+ override def complete(kind: String, tag: String): Future[Unit] = {
+ queue.removeIf(new Predicate[Item] {
+ override def test(t: Item): Boolean = t.kind == kind && t.tag == tag
+ })
+ Future.successful(())
}
-
- private val done = Future.successful(())
-
}