blob: a98d2cdd1481832c3223ac97c95eec3bca580460 (
plain) (
blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
|
package xyz.driver.pdsuicommon.concurrent
import java.time.LocalDateTime
import java.time.temporal.ChronoUnit
import xyz.driver.pdsuicommon.concurrent.BridgeUploadQueue.Item
import xyz.driver.pdsuicommon.concurrent.BridgeUploadQueueRepositoryAdapter.Strategy
import xyz.driver.pdsuicommon.db.Transactions
import xyz.driver.pdsuicommon.db.repositories.BridgeUploadQueueRepository
import xyz.driver.pdsuicommon.domain.LongId
import xyz.driver.pdsuicommon.logging._
import scala.concurrent.duration.{Duration, FiniteDuration}
import scala.concurrent.{ExecutionContext, Future}
object BridgeUploadQueueRepositoryAdapter {

  /**
    * Retry policy for queue items: decides what happens on each attempt and
    * how an item is disposed of when it is removed from the queue.
    */
  sealed trait Strategy {

    /** How a removed item is disposed of (deleted or marked). */
    def onComplete: Strategy.OnComplete

    /**
      * Decision for the given attempt.
      *
      * @param attempt attempt number the decision is requested for
      * @return whether to stop or to continue after some interval
      */
    def on(attempt: Int): Strategy.OnAttempt
  }

  object Strategy {

    /**
      * Works forever, but has a limit for intervals.
      *
      * The interval for an attempt is `startInterval * intervalFactor ^ attempt`,
      * capped at `maxInterval`.
      *
      * @param startInterval  base interval that gets scaled
      * @param intervalFactor base of the exponential growth
      * @param maxInterval    upper bound for the produced interval
      * @param onComplete     how a removed item is disposed of
      */
    final case class LimitExponential(startInterval: FiniteDuration,
                                      intervalFactor: Double,
                                      maxInterval: FiniteDuration,
                                      onComplete: OnComplete)
        extends Strategy {

      /** Always continues; the interval never exceeds `maxInterval`. */
      override def on(attempt: Int): OnAttempt = {
        OnAttempt.Continue(intervalFor(attempt).min(maxInterval))
      }

      /**
        * Exponentially scaled interval for the attempt.
        *
        * The cap is checked in `Double` space first: `FiniteDuration * Double`
        * goes through `Duration.fromNanos(Double)`, which throws
        * `IllegalArgumentException` when the product is finite but does not fit
        * into a `Long` number of nanoseconds. Without the guard a large attempt
        * number would make the strategy throw instead of returning `maxInterval`.
        */
      private def intervalFor(attempt: Int): Duration = {
        val factor      = Math.pow(intervalFactor, attempt.toDouble)
        val scaledNanos = startInterval.toNanos * factor
        if (scaledNanos >= maxInterval.toNanos.toDouble) maxInterval
        else startInterval * factor
      }
    }

    /**
      * Used only in tests.
      *
      * Completes on every attempt and deletes completed items.
      */
    case object Ignore extends Strategy {
      override val onComplete              = OnComplete.Delete
      override def on(attempt: Int) = OnAttempt.Complete
    }

    /**
      * Used only in tests.
      *
      * Continues on every attempt with the same fixed interval.
      */
    final case class Constant(interval: FiniteDuration) extends Strategy {
      override val onComplete              = OnComplete.Delete
      override def on(attempt: Int) = OnAttempt.Continue(interval)
    }

    /** What to do with an item when it is removed from the queue. */
    sealed trait OnComplete

    object OnComplete {

      /** Delete the item. */
      case object Delete extends OnComplete

      /** Keep the item and mark it instead of deleting it. */
      case object Mark extends OnComplete

      // Takes OnComplete (it previously took OnAttempt by mistake), so that
      // OnComplete values can be used in phi-interpolated log messages.
      implicit def toPhiString(x: OnComplete): PhiString = Unsafe(x.toString)
    }

    /** Decision taken for a single attempt. */
    sealed trait OnAttempt

    object OnAttempt {

      /** Stop retrying. */
      case object Complete extends OnAttempt

      /** Try again after the given interval. */
      case class Continue(interval: Duration) extends OnAttempt

      implicit def toPhiString(x: OnAttempt): PhiString = Unsafe(x.toString)
    }
  }
}
/**
  * [[BridgeUploadQueue]] backed by a [[BridgeUploadQueueRepository]].
  *
  * Retry and completion decisions are delegated to the supplied [[Strategy]];
  * every mutating operation is executed through `transactions.run`.
  *
  * @param strategy     policy that decides how to retry and how to dispose of items
  * @param repository   storage the queue items are read from and written to
  * @param transactions runner the mutating operations are wrapped in
  */
class BridgeUploadQueueRepositoryAdapter(strategy: Strategy,
                                         repository: BridgeUploadQueueRepository,
                                         transactions: Transactions)(implicit executionContext: ExecutionContext)
    extends BridgeUploadQueue with PhiLogging {

  /** Adds the item to the repository. */
  override def add(item: Item): Future[Unit] =
    transactions.run(_ => repository.add(item))

  /** Fetches one item of the given kind from the repository, if there is any. */
  override def get(kind: String): Future[Option[Item]] =
    repository.getOne(kind)

  /**
    * Disposes of the item according to `strategy.onComplete`: either deletes it
    * or flags it as completed.
    *
    * A `RuntimeException` is thrown inside the transaction block when the item
    * has to be marked but cannot be found by its id.
    */
  override def remove(item: LongId[Item]): Future[Unit] = transactions.run { _ =>
    strategy.onComplete match {
      case Strategy.OnComplete.Mark =>
        val task = repository.getById(item).getOrElse {
          throw new RuntimeException(s"Can not find the $item task")
        }
        repository.update(task.copy(completed = true))

      case Strategy.OnComplete.Delete =>
        repository.delete(item)
    }
  }

  /**
    * Asks the strategy what to do for the next attempt of the item.
    *
    * @return the updated item when another attempt is scheduled, or `None`
    *         when the strategy completes and the item is deleted
    */
  override def tryRetry(item: Item): Future[Option[Item]] = transactions.run { _ =>
    import Strategy.OnAttempt._

    logger.trace(phi"tryRetry($item)")

    val attemptNumber = item.attempts + 1
    val decision      = strategy.on(attemptNumber)
    logger.debug(phi"Action for ${Unsafe(attemptNumber)}: $decision")

    decision match {
      case Complete =>
        repository.delete(item.id)
        None

      case Continue(delay) =>
        // The next attempt is scheduled relative to the current local time.
        val rescheduled = item.copy(
          attempts = attemptNumber,
          nextAttempt = LocalDateTime.now().plus(delay.toMillis, ChronoUnit.MILLIS)
        )
        logger.debug(rescheduled)
        Some(repository.update(rescheduled))
    }
  }
}
|