aboutsummaryrefslogtreecommitdiff
path: root/src/main/scala/xyz/driver/pdsuicommon/concurrent/SafeBridgeUploadQueue.scala
blob: 2f7fe6c2234565700a34b89a6a7ed1e3a9d61534 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
package xyz.driver.pdsuicommon.concurrent

import xyz.driver.pdsuicommon.concurrent.BridgeUploadQueue.Dependency
import xyz.driver.pdsuicommon.concurrent.SafeBridgeUploadQueue.{DependencyResolver, SafeTask, Tag}
import xyz.driver.pdsuicommon.logging._
import xyz.driver.pdsuicommon.serialization.Marshaller

import scala.concurrent.{ExecutionContext, Future}

/** Supporting types for the typed wrapper around `BridgeUploadQueue`:
  * the tag marker trait, the typed task, and the dependency resolver type class.
  */
object SafeBridgeUploadQueue {

  /** Marker trait for typed tags; every tag is a `Product` and is `Serializable`. */
  trait Tag extends Product with Serializable

  /** A typed tag paired with the raw queue item it belongs to.
    *
    * @param tag       the typed tag
    * @param queueItem the underlying raw queue item; its accessor is restricted to
    *                  `SafeBridgeUploadQueue`
    * @tparam T the concrete tag type
    */
  final case class SafeTask[T <: Tag](tag: T, private[SafeBridgeUploadQueue] val queueItem: BridgeUploadQueue.Item)

  object SafeTask {

    /** Implicit conversion of a task to a `PhiString`.
      *
      * The tag is wrapped in `Unsafe`; the queue item is interpolated directly.
      */
    implicit def toPhiString[T <: Tag](x: SafeTask[T]): PhiString =
      phi"SafeTask(tag=${Unsafe(x.tag)}, ${x.queueItem})"
  }

  /** Type class that supplies the optional queue dependency for a tag.
    *
    * @tparam T the concrete tag type
    */
  trait DependencyResolver[T <: Tag] {

    /** @return the dependency for `tag`, or `None` if there is none */
    def getDependency(tag: T): Option[Dependency]
  }

}

/** Typed facade over a `BridgeUploadQueue` for items of a single `kind`.
  *
  * Tags are converted to and from their `String` form with `tagMarshaller`, so callers
  * work with `T` instead of raw strings.
  *
  * @param kind               the kind written into every item added and used for `get` / `complete`
  * @param origQueue          the underlying untyped queue all calls are delegated to
  * @param tagMarshaller      converts tags to and from their `String` representation
  * @param dependencyResolver supplies the optional dependency for a tag being added
  * @param executionContext   used to map over the futures returned by `origQueue`
  * @tparam T the concrete tag type
  */
class SafeBridgeUploadQueue[T <: Tag](kind: String, origQueue: BridgeUploadQueue)(
        implicit tagMarshaller: Marshaller[T, String],
        dependencyResolver: DependencyResolver[T],
        executionContext: ExecutionContext) {

  type Task = SafeTask[T]

  /** Builds a raw item from `tag` (marshalled tag plus resolved dependency) and adds it
    * to the underlying queue.
    */
  def add(tag: T): Future[BridgeUploadQueue.Item] = {
    val rawItem = BridgeUploadQueue.Item(
      kind = kind,
      tag = tagMarshaller.write(tag),
      dependency = dependencyResolver.getDependency(tag)
    )
    origQueue.add(rawItem)
  }

  /** Delegates to `origQueue.tryRetry` with the task's raw item and wraps the result as a typed task. */
  def tryRetry(task: Task): Future[Option[Task]] = {
    val retried = origQueue.tryRetry(task.queueItem)
    wrap(retried)
  }

  /** Delegates to `origQueue.get` for this queue's `kind` and wraps the result as a typed task. */
  def get: Future[Option[Task]] = {
    val fetched = origQueue.get(kind)
    wrap(fetched)
  }

  /** Delegates to `origQueue.complete` with this queue's `kind` and the marshalled `tag`. */
  def complete(tag: T): Future[Unit] = {
    val marshalledTag = tagMarshaller.write(tag)
    origQueue.complete(kind, marshalledTag)
  }

  /** Converts an optional raw item inside a future into an optional typed task.
    *
    * `cover` runs inside `Future.map`, so a tag that cannot be parsed yields a failed future.
    */
  private def wrap(x: Future[Option[BridgeUploadQueue.Item]]): Future[Option[Task]] =
    x.map(maybeItem => maybeItem.map(item => cover(item)))

  /** Parses the raw item's tag and pairs it with the item.
    *
    * Throws `IllegalArgumentException` when the marshaller cannot read the tag.
    */
  private def cover(rawTask: BridgeUploadQueue.Item): Task = {
    val parsedTag = tagMarshaller.read(rawTask.tag).getOrElse {
      throw new IllegalArgumentException(s"Can not parse tag '${rawTask.tag}'")
    }

    SafeTask(parsedTag, rawTask)
  }

}