1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
|
package xyz.driver.pdsuicommon.concurrent
import xyz.driver.pdsuicommon.concurrent.BridgeUploadQueue.Dependency
import xyz.driver.pdsuicommon.concurrent.SafeBridgeUploadQueue.{DependencyResolver, SafeTask, Tag}
import xyz.driver.pdsuicommon.logging._
import xyz.driver.pdsuicommon.serialization.Marshaller
import scala.concurrent.{ExecutionContext, Future}
object SafeBridgeUploadQueue {
trait Tag extends Product with Serializable
case class SafeTask[T <: Tag](tag: T, private[SafeBridgeUploadQueue] val queueItem: BridgeUploadQueue.Item)
object SafeTask {
implicit def toPhiString[T <: Tag](x: SafeTask[T]): PhiString = {
import x._
phi"SafeTask(tag=${Unsafe(tag)}, $queueItem)"
}
}
trait DependencyResolver[T <: Tag] {
def getDependency(tag: T): Option[Dependency]
}
}
class SafeBridgeUploadQueue[T <: Tag](kind: String, origQueue: BridgeUploadQueue)(
implicit tagMarshaller: Marshaller[T, String],
dependencyResolver: DependencyResolver[T],
executionContext: ExecutionContext) {
type Task = SafeTask[T]
def add(tag: T): Future[BridgeUploadQueue.Item] =
origQueue.add(
BridgeUploadQueue.Item(
kind = kind,
tag = tagMarshaller.write(tag),
dependency = dependencyResolver.getDependency(tag)
))
def tryRetry(task: Task): Future[Option[Task]] = wrap(origQueue.tryRetry(task.queueItem))
def get: Future[Option[Task]] = wrap(origQueue.get(kind))
def complete(tag: T): Future[Unit] = origQueue.complete(kind, tagMarshaller.write(tag))
private def wrap(x: Future[Option[BridgeUploadQueue.Item]]): Future[Option[Task]] = x.map(_.map(cover))
private def cover(rawTask: BridgeUploadQueue.Item): Task = {
val tag = tagMarshaller
.read(rawTask.tag)
.getOrElse(throw new IllegalArgumentException(s"Can not parse tag '${rawTask.tag}'"))
SafeTask(tag, rawTask)
}
}
|