diff options
Diffstat (limited to 'src/main/scala/xyz/driver/pdsuicommon/concurrent/SafeBridgeUploadQueue.scala')
-rw-r--r-- | src/main/scala/xyz/driver/pdsuicommon/concurrent/SafeBridgeUploadQueue.scala | 60 |
1 files changed, 0 insertions, 60 deletions
diff --git a/src/main/scala/xyz/driver/pdsuicommon/concurrent/SafeBridgeUploadQueue.scala b/src/main/scala/xyz/driver/pdsuicommon/concurrent/SafeBridgeUploadQueue.scala deleted file mode 100644 index 2f7fe6c..0000000 --- a/src/main/scala/xyz/driver/pdsuicommon/concurrent/SafeBridgeUploadQueue.scala +++ /dev/null @@ -1,60 +0,0 @@ -package xyz.driver.pdsuicommon.concurrent - -import xyz.driver.pdsuicommon.concurrent.BridgeUploadQueue.Dependency -import xyz.driver.pdsuicommon.concurrent.SafeBridgeUploadQueue.{DependencyResolver, SafeTask, Tag} -import xyz.driver.pdsuicommon.logging._ -import xyz.driver.pdsuicommon.serialization.Marshaller - -import scala.concurrent.{ExecutionContext, Future} - -object SafeBridgeUploadQueue { - - trait Tag extends Product with Serializable - - final case class SafeTask[T <: Tag](tag: T, private[SafeBridgeUploadQueue] val queueItem: BridgeUploadQueue.Item) - - object SafeTask { - implicit def toPhiString[T <: Tag](x: SafeTask[T]): PhiString = { - import x._ - phi"SafeTask(tag=${Unsafe(tag)}, $queueItem)" - } - } - - trait DependencyResolver[T <: Tag] { - def getDependency(tag: T): Option[Dependency] - } - -} - -class SafeBridgeUploadQueue[T <: Tag](kind: String, origQueue: BridgeUploadQueue)( - implicit tagMarshaller: Marshaller[T, String], - dependencyResolver: DependencyResolver[T], - executionContext: ExecutionContext) { - - type Task = SafeTask[T] - - def add(tag: T): Future[BridgeUploadQueue.Item] = - origQueue.add( - BridgeUploadQueue.Item( - kind = kind, - tag = tagMarshaller.write(tag), - dependency = dependencyResolver.getDependency(tag) - )) - - def tryRetry(task: Task): Future[Option[Task]] = wrap(origQueue.tryRetry(task.queueItem)) - - def get: Future[Option[Task]] = wrap(origQueue.get(kind)) - - def complete(tag: T): Future[Unit] = origQueue.complete(kind, tagMarshaller.write(tag)) - - private def wrap(x: Future[Option[BridgeUploadQueue.Item]]): Future[Option[Task]] = x.map(_.map(cover)) - - private def cover(rawTask: BridgeUploadQueue.Item): Task = { - val tag = tagMarshaller - .read(rawTask.tag) - .getOrElse(throw new IllegalArgumentException(s"Can not parse tag '${rawTask.tag}'")) - - SafeTask(tag, rawTask) - } - -} |