aboutsummaryrefslogtreecommitdiff
path: root/src/main/scala/xyz/driver/pdsuicommon/concurrent/SafeBridgeUploadQueue.scala
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/scala/xyz/driver/pdsuicommon/concurrent/SafeBridgeUploadQueue.scala')
-rw-r--r--src/main/scala/xyz/driver/pdsuicommon/concurrent/SafeBridgeUploadQueue.scala61
1 files changed, 61 insertions, 0 deletions
diff --git a/src/main/scala/xyz/driver/pdsuicommon/concurrent/SafeBridgeUploadQueue.scala b/src/main/scala/xyz/driver/pdsuicommon/concurrent/SafeBridgeUploadQueue.scala
new file mode 100644
index 0000000..bab29d5
--- /dev/null
+++ b/src/main/scala/xyz/driver/pdsuicommon/concurrent/SafeBridgeUploadQueue.scala
@@ -0,0 +1,61 @@
+package xyz.driver.pdsuicommon.concurrent
+
+import xyz.driver.pdsuicommon.concurrent.BridgeUploadQueue.Dependency
+import xyz.driver.pdsuicommon.concurrent.SafeBridgeUploadQueue.{DependencyResolver, SafeTask, Tag}
+import xyz.driver.pdsuicommon.logging._
+import xyz.driver.pdsuicommon.serialization.Marshaller
+
+import scala.concurrent.{ExecutionContext, Future}
+
+object SafeBridgeUploadQueue {
+
+ trait Tag extends Product with Serializable
+
+ case class SafeTask[T <: Tag](tag: T,
+ private[SafeBridgeUploadQueue] val queueItem: BridgeUploadQueue.Item)
+
+ object SafeTask {
+ implicit def toPhiString[T <: Tag](x: SafeTask[T]): PhiString = {
+ import x._
+ phi"SafeTask(tag=${Unsafe(tag)}, $queueItem)"
+ }
+ }
+
+ trait DependencyResolver[T <: Tag] {
+ def getDependency(tag: T): Option[Dependency]
+ }
+
+}
+
+class SafeBridgeUploadQueue[T <: Tag](kind: String,
+ origQueue: BridgeUploadQueue)
+ (implicit
+ tagMarshaller: Marshaller[T, String],
+ dependencyResolver: DependencyResolver[T],
+ executionContext: ExecutionContext) {
+
+ type Task = SafeTask[T]
+
+ def add(tag: T): Future[BridgeUploadQueue.Item] = origQueue.add(BridgeUploadQueue.Item(
+ kind = kind,
+ tag = tagMarshaller.write(tag),
+ dependency = dependencyResolver.getDependency(tag)
+ ))
+
+ def tryRetry(task: Task): Future[Option[Task]] = wrap(origQueue.tryRetry(task.queueItem))
+
+ def get: Future[Option[Task]] = wrap(origQueue.get(kind))
+
+ def complete(tag: T): Future[Unit] = origQueue.complete(kind, tagMarshaller.write(tag))
+
+ private def wrap(x: Future[Option[BridgeUploadQueue.Item]]): Future[Option[Task]] = x.map(_.map(cover))
+
+ private def cover(rawTask: BridgeUploadQueue.Item): Task = {
+ val tag = tagMarshaller
+ .read(rawTask.tag)
+ .getOrElse(throw new IllegalArgumentException(s"Can not parse tag '${rawTask.tag}'"))
+
+ SafeTask(tag, rawTask)
+ }
+
+}