package xyz.driver.common.concurrent import java.time.LocalDateTime import java.time.temporal.ChronoUnit import xyz.driver.common.concurrent.BridgeUploadQueue.Item import xyz.driver.common.concurrent.BridgeUploadQueueRepositoryAdapter.Strategy import xyz.driver.common.db.Transactions import xyz.driver.common.db.repositories.BridgeUploadQueueRepository import xyz.driver.common.domain.LongId import xyz.driver.common.logging._ import scala.concurrent.duration.{Duration, FiniteDuration} import scala.concurrent.{ExecutionContext, Future} object BridgeUploadQueueRepositoryAdapter { sealed trait Strategy { def onComplete: Strategy.OnComplete def on(attempt: Int): Strategy.OnAttempt } object Strategy { /** * Works forever, but has a limit for intervals. */ final case class LimitExponential(startInterval: FiniteDuration, intervalFactor: Double, maxInterval: FiniteDuration, onComplete: OnComplete) extends Strategy { override def on(attempt: Int): OnAttempt = { OnAttempt.Continue(intervalFor(attempt).min(maxInterval)) } private def intervalFor(attempt: Int): Duration = { startInterval * Math.pow(intervalFactor, attempt.toDouble) } } /** * Used only in tests. */ case object Ignore extends Strategy { override val onComplete = OnComplete.Delete override def on(attempt: Int) = OnAttempt.Complete } /** * Used only in tests. */ final case class Constant(interval: FiniteDuration) extends Strategy { override val onComplete = OnComplete.Delete override def on(attempt: Int) = OnAttempt.Continue(interval) } sealed trait OnComplete object OnComplete { case object Delete extends OnComplete case object Mark extends OnComplete implicit def toPhiString(x: OnAttempt): PhiString = Unsafe(x.toString) } sealed trait OnAttempt object OnAttempt { case object Complete extends OnAttempt case class Continue(interval: Duration) extends OnAttempt implicit def toPhiString(x: OnAttempt): PhiString = Unsafe(x.toString) } } } class BridgeUploadQueueRepositoryAdapter(strategy: Strategy, repository: BridgeUploadQueueRepository, transactions: Transactions) (implicit executionContext: ExecutionContext) extends BridgeUploadQueue with PhiLogging { override def add(item: Item): Future[Unit] = transactions.run { _ => repository.add(item) } override def get(kind: String): Future[Option[Item]] = { repository.getOne(kind) } override def remove(item: LongId[Item]): Future[Unit] = transactions.run { _ => import Strategy.OnComplete._ strategy.onComplete match { case Delete => repository.delete(item) case Mark => repository.getById(item) match { case Some(x) => repository.update(x.copy(completed = true)) case None => throw new RuntimeException(s"Can not find the $item task") } } } override def tryRetry(item: Item): Future[Option[Item]] = transactions.run { _ => import Strategy.OnAttempt._ logger.trace(phi"tryRetry($item)") val newAttempts = item.attempts + 1 val action = strategy.on(newAttempts) logger.debug(phi"Action for ${Unsafe(newAttempts)}: $action") action match { case Continue(newInterval) => val draftItem = item.copy( attempts = newAttempts, nextAttempt = LocalDateTime.now().plus(newInterval.toMillis, ChronoUnit.MILLIS) ) logger.debug(draftItem) Some(repository.update(draftItem)) case Complete => repository.delete(item.id) None } } }