aboutsummaryrefslogtreecommitdiff
path: root/src/main/scala/xyz/driver/pdsuicommon/concurrent/SafeBridgeUploadQueue.scala
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/scala/xyz/driver/pdsuicommon/concurrent/SafeBridgeUploadQueue.scala')
-rw-r--r--src/main/scala/xyz/driver/pdsuicommon/concurrent/SafeBridgeUploadQueue.scala60
1 files changed, 0 insertions, 60 deletions
diff --git a/src/main/scala/xyz/driver/pdsuicommon/concurrent/SafeBridgeUploadQueue.scala b/src/main/scala/xyz/driver/pdsuicommon/concurrent/SafeBridgeUploadQueue.scala
deleted file mode 100644
index 2f7fe6c..0000000
--- a/src/main/scala/xyz/driver/pdsuicommon/concurrent/SafeBridgeUploadQueue.scala
+++ /dev/null
@@ -1,60 +0,0 @@
-package xyz.driver.pdsuicommon.concurrent
-
-import xyz.driver.pdsuicommon.concurrent.BridgeUploadQueue.Dependency
-import xyz.driver.pdsuicommon.concurrent.SafeBridgeUploadQueue.{DependencyResolver, SafeTask, Tag}
-import xyz.driver.pdsuicommon.logging._
-import xyz.driver.pdsuicommon.serialization.Marshaller
-
-import scala.concurrent.{ExecutionContext, Future}
-
-object SafeBridgeUploadQueue {
-
- trait Tag extends Product with Serializable
-
- final case class SafeTask[T <: Tag](tag: T, private[SafeBridgeUploadQueue] val queueItem: BridgeUploadQueue.Item)
-
- object SafeTask {
- implicit def toPhiString[T <: Tag](x: SafeTask[T]): PhiString = {
- import x._
- phi"SafeTask(tag=${Unsafe(tag)}, $queueItem)"
- }
- }
-
- trait DependencyResolver[T <: Tag] {
- def getDependency(tag: T): Option[Dependency]
- }
-
-}
-
-class SafeBridgeUploadQueue[T <: Tag](kind: String, origQueue: BridgeUploadQueue)(
- implicit tagMarshaller: Marshaller[T, String],
- dependencyResolver: DependencyResolver[T],
- executionContext: ExecutionContext) {
-
- type Task = SafeTask[T]
-
- def add(tag: T): Future[BridgeUploadQueue.Item] =
- origQueue.add(
- BridgeUploadQueue.Item(
- kind = kind,
- tag = tagMarshaller.write(tag),
- dependency = dependencyResolver.getDependency(tag)
- ))
-
- def tryRetry(task: Task): Future[Option[Task]] = wrap(origQueue.tryRetry(task.queueItem))
-
- def get: Future[Option[Task]] = wrap(origQueue.get(kind))
-
- def complete(tag: T): Future[Unit] = origQueue.complete(kind, tagMarshaller.write(tag))
-
- private def wrap(x: Future[Option[BridgeUploadQueue.Item]]): Future[Option[Task]] = x.map(_.map(cover))
-
- private def cover(rawTask: BridgeUploadQueue.Item): Task = {
- val tag = tagMarshaller
- .read(rawTask.tag)
- .getOrElse(throw new IllegalArgumentException(s"Can not parse tag '${rawTask.tag}'"))
-
- SafeTask(tag, rawTask)
- }
-
-}