// cgit navigation: about | summary | refs | log | blame | commit | diff
// path: root/src/main/scala/xyz/driver/pdsuicommon/concurrent/SafeBridgeUploadQueue.scala
// blob: bab29d525be0e7d1428e900e9ac1ec3db764ed1d (plain) (tree)




























































                                                                                                         
package xyz.driver.pdsuicommon.concurrent

import xyz.driver.pdsuicommon.concurrent.BridgeUploadQueue.Dependency
import xyz.driver.pdsuicommon.concurrent.SafeBridgeUploadQueue.{DependencyResolver, SafeTask, Tag}
import xyz.driver.pdsuicommon.logging._
import xyz.driver.pdsuicommon.serialization.Marshaller

import scala.concurrent.{ExecutionContext, Future}

/** Companion holding the typed building blocks used by the `SafeBridgeUploadQueue` class. */
object SafeBridgeUploadQueue {

  /** Base type for task tags; every tag is a `Product` and `Serializable`. */
  trait Tag extends Product with Serializable

  /**
    * A raw queue item paired with its typed tag.
    *
    * @param tag       typed tag of the task
    * @param queueItem underlying raw queue item; visible only inside `SafeBridgeUploadQueue`
    * @tparam T concrete tag type
    */
  case class SafeTask[T <: Tag](tag: T,
                                private[SafeBridgeUploadQueue] val queueItem: BridgeUploadQueue.Item)

  object SafeTask {

    /** Implicit rendering of a task as a `PhiString`; the tag is passed through `Unsafe`. */
    implicit def toPhiString[T <: Tag](x: SafeTask[T]): PhiString =
      phi"SafeTask(tag=${Unsafe(x.tag)}, ${x.queueItem})"
  }

  /**
    * Supplies the optional queue dependency for a tag.
    *
    * @tparam T concrete tag type
    */
  trait DependencyResolver[T <: Tag] {

    /** @return the dependency associated with `tag`, or `None` when there is none */
    def getDependency(tag: T): Option[Dependency]
  }

}

/**
  * Typed wrapper around a [[BridgeUploadQueue]] that always works with a single `kind`.
  *
  * Tags are converted to and from their string form with the implicit `tagMarshaller`;
  * dependencies for new items come from the implicit `dependencyResolver`.
  *
  * @param kind      value passed as the item kind to every call on the underlying queue
  * @param origQueue underlying queue all operations are delegated to
  * @tparam T concrete tag type handled by this queue
  */
class SafeBridgeUploadQueue[T <: Tag](kind: String,
                                      origQueue: BridgeUploadQueue)
                                     (implicit
                                      tagMarshaller: Marshaller[T, String],
                                      dependencyResolver: DependencyResolver[T],
                                      executionContext: ExecutionContext) {

  type Task = SafeTask[T]

  /**
    * Builds a raw item from `tag` (serialized tag plus resolved dependency) and adds it
    * to the underlying queue.
    *
    * @return whatever the underlying queue's `add` returns for the new item
    */
  def add(tag: T): Future[BridgeUploadQueue.Item] = {
    val newItem = BridgeUploadQueue.Item(
      kind = kind,
      tag = tagMarshaller.write(tag),
      dependency = dependencyResolver.getDependency(tag)
    )
    origQueue.add(newItem)
  }

  /** Delegates to the underlying queue's `tryRetry` with the task's raw item and re-wraps the result. */
  def tryRetry(task: Task): Future[Option[Task]] = {
    val retried = origQueue.tryRetry(task.queueItem)
    wrap(retried)
  }

  /** Asks the underlying queue for an item of this queue's `kind` and wraps it as a typed task. */
  def get: Future[Option[Task]] = {
    val fetched = origQueue.get(kind)
    wrap(fetched)
  }

  /** Delegates to the underlying queue's `complete` using this queue's `kind` and the serialized tag. */
  def complete(tag: T): Future[Unit] = {
    val serializedTag = tagMarshaller.write(tag)
    origQueue.complete(kind, serializedTag)
  }

  /** Converts an optional raw item inside a future into an optional typed task. */
  private def wrap(x: Future[Option[BridgeUploadQueue.Item]]): Future[Option[Task]] =
    x.map(maybeItem => maybeItem.map(item => cover(item)))

  /**
    * Parses the raw item's tag and pairs it with the item.
    *
    * Throws `IllegalArgumentException` when the marshaller yields no tag for the stored string.
    */
  private def cover(rawTask: BridgeUploadQueue.Item): Task = {
    def unparsable: Nothing =
      throw new IllegalArgumentException(s"Can not parse tag '${rawTask.tag}'")

    val parsedTag = tagMarshaller.read(rawTask.tag).getOrElse(unparsable)
    SafeTask(parsedTag, rawTask)
  }

}