aboutsummaryrefslogtreecommitdiff
path: root/src/main/scala/xyz/driver/common/concurrent/BridgeUploadQueueRepositoryAdapter.scala
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/scala/xyz/driver/common/concurrent/BridgeUploadQueueRepositoryAdapter.scala')
-rw-r--r--src/main/scala/xyz/driver/common/concurrent/BridgeUploadQueueRepositoryAdapter.scala136
1 files changed, 0 insertions, 136 deletions
diff --git a/src/main/scala/xyz/driver/common/concurrent/BridgeUploadQueueRepositoryAdapter.scala b/src/main/scala/xyz/driver/common/concurrent/BridgeUploadQueueRepositoryAdapter.scala
deleted file mode 100644
index c6a2144..0000000
--- a/src/main/scala/xyz/driver/common/concurrent/BridgeUploadQueueRepositoryAdapter.scala
+++ /dev/null
@@ -1,136 +0,0 @@
-package xyz.driver.common.concurrent
-
-import java.time.LocalDateTime
-import java.time.temporal.ChronoUnit
-
-import xyz.driver.common.concurrent.BridgeUploadQueue.Item
-import xyz.driver.common.concurrent.BridgeUploadQueueRepositoryAdapter.Strategy
-import xyz.driver.common.db.Transactions
-import xyz.driver.common.db.repositories.BridgeUploadQueueRepository
-import xyz.driver.common.domain.LongId
-import xyz.driver.common.logging._
-
-import scala.concurrent.duration.{Duration, FiniteDuration}
-import scala.concurrent.{ExecutionContext, Future}
-
-object BridgeUploadQueueRepositoryAdapter {
-
- sealed trait Strategy {
-
- def onComplete: Strategy.OnComplete
-
- def on(attempt: Int): Strategy.OnAttempt
-
- }
-
- object Strategy {
-
- /**
- * Works forever, but has a limit for intervals.
- */
- final case class LimitExponential(startInterval: FiniteDuration,
- intervalFactor: Double,
- maxInterval: FiniteDuration,
- onComplete: OnComplete) extends Strategy {
-
- override def on(attempt: Int): OnAttempt = {
- OnAttempt.Continue(intervalFor(attempt).min(maxInterval))
- }
-
- private def intervalFor(attempt: Int): Duration = {
- startInterval * Math.pow(intervalFactor, attempt.toDouble)
- }
- }
-
- /**
- * Used only in tests.
- */
- case object Ignore extends Strategy {
-
- override val onComplete = OnComplete.Delete
-
- override def on(attempt: Int) = OnAttempt.Complete
-
- }
-
- /**
- * Used only in tests.
- */
- final case class Constant(interval: FiniteDuration) extends Strategy {
-
- override val onComplete = OnComplete.Delete
-
- override def on(attempt: Int) = OnAttempt.Continue(interval)
-
- }
-
- sealed trait OnComplete
- object OnComplete {
- case object Delete extends OnComplete
- case object Mark extends OnComplete
-
- implicit def toPhiString(x: OnAttempt): PhiString = Unsafe(x.toString)
- }
-
- sealed trait OnAttempt
- object OnAttempt {
- case object Complete extends OnAttempt
- case class Continue(interval: Duration) extends OnAttempt
-
- implicit def toPhiString(x: OnAttempt): PhiString = Unsafe(x.toString)
- }
- }
-}
-
-class BridgeUploadQueueRepositoryAdapter(strategy: Strategy,
- repository: BridgeUploadQueueRepository,
- transactions: Transactions)
- (implicit executionContext: ExecutionContext)
- extends BridgeUploadQueue with PhiLogging {
-
- override def add(item: Item): Future[Unit] = transactions.run { _ =>
- repository.add(item)
- }
-
- override def get(kind: String): Future[Option[Item]] = {
- repository.getOne(kind)
- }
-
- override def remove(item: LongId[Item]): Future[Unit] = transactions.run { _ =>
- import Strategy.OnComplete._
-
- strategy.onComplete match {
- case Delete => repository.delete(item)
- case Mark =>
- repository.getById(item) match {
- case Some(x) => repository.update(x.copy(completed = true))
- case None => throw new RuntimeException(s"Can not find the $item task")
- }
- }
- }
-
- override def tryRetry(item: Item): Future[Option[Item]] = transactions.run { _ =>
- import Strategy.OnAttempt._
-
- logger.trace(phi"tryRetry($item)")
-
- val newAttempts = item.attempts + 1
- val action = strategy.on(newAttempts)
- logger.debug(phi"Action for ${Unsafe(newAttempts)}: $action")
-
- action match {
- case Continue(newInterval) =>
- val draftItem = item.copy(
- attempts = newAttempts,
- nextAttempt = LocalDateTime.now().plus(newInterval.toMillis, ChronoUnit.MILLIS)
- )
-
- logger.debug(draftItem)
- Some(repository.update(draftItem))
-
- case Complete =>
- repository.delete(item.id)
- None
- }
- }
-}