aboutsummaryrefslogblamecommitdiff
path: root/src/main/scala/xyz/driver/common/concurrent/BridgeUploadQueueRepositoryAdapter.scala
blob: c6a21449993d45da8bb60ffd0c03cd8f000dd28f (plain) (tree)







































































































































                                                                                         
package xyz.driver.common.concurrent

import java.time.LocalDateTime
import java.time.temporal.ChronoUnit

import xyz.driver.common.concurrent.BridgeUploadQueue.Item
import xyz.driver.common.concurrent.BridgeUploadQueueRepositoryAdapter.Strategy
import xyz.driver.common.db.Transactions
import xyz.driver.common.db.repositories.BridgeUploadQueueRepository
import xyz.driver.common.domain.LongId
import xyz.driver.common.logging._

import scala.concurrent.duration.{Duration, FiniteDuration}
import scala.concurrent.{ExecutionContext, Future}

object BridgeUploadQueueRepositoryAdapter {

  sealed trait Strategy {

    def onComplete: Strategy.OnComplete

    def on(attempt: Int): Strategy.OnAttempt

  }

  object Strategy {

    /**
      * Works forever, but has a limit for intervals.
      */
    final case class LimitExponential(startInterval: FiniteDuration,
                                      intervalFactor: Double,
                                      maxInterval: FiniteDuration,
                                      onComplete: OnComplete) extends Strategy {

      override def on(attempt: Int): OnAttempt = {
        OnAttempt.Continue(intervalFor(attempt).min(maxInterval))
      }

      private def intervalFor(attempt: Int): Duration = {
        startInterval * Math.pow(intervalFactor, attempt.toDouble)
      }
    }

    /**
      * Used only in tests.
      */
    case object Ignore extends Strategy {

      override val onComplete = OnComplete.Delete

      override def on(attempt: Int) = OnAttempt.Complete

    }

    /**
      * Used only in tests.
      */
    final case class Constant(interval: FiniteDuration) extends Strategy {

      override val onComplete = OnComplete.Delete

      override def on(attempt: Int) = OnAttempt.Continue(interval)

    }

    sealed trait OnComplete
    object OnComplete {
      case object Delete extends OnComplete
      case object Mark extends OnComplete

      implicit def toPhiString(x: OnAttempt): PhiString = Unsafe(x.toString)
    }

    sealed trait OnAttempt
    object OnAttempt {
      case object Complete extends OnAttempt
      case class Continue(interval: Duration) extends OnAttempt

      implicit def toPhiString(x: OnAttempt): PhiString = Unsafe(x.toString)
    }
  }
}

class BridgeUploadQueueRepositoryAdapter(strategy: Strategy,
                                         repository: BridgeUploadQueueRepository,
                                         transactions: Transactions)
                                        (implicit executionContext: ExecutionContext)
  extends BridgeUploadQueue with PhiLogging {

  override def add(item: Item): Future[Unit] = transactions.run { _ =>
    repository.add(item)
  }

  override def get(kind: String): Future[Option[Item]] = {
    repository.getOne(kind)
  }

  override def remove(item: LongId[Item]): Future[Unit] = transactions.run { _ =>
    import Strategy.OnComplete._

    strategy.onComplete match {
      case Delete => repository.delete(item)
      case Mark =>
        repository.getById(item) match {
          case Some(x) => repository.update(x.copy(completed = true))
          case None => throw new RuntimeException(s"Can not find the $item task")
        }
    }
  }

  override def tryRetry(item: Item): Future[Option[Item]] = transactions.run { _ =>
    import Strategy.OnAttempt._

    logger.trace(phi"tryRetry($item)")

    val newAttempts = item.attempts + 1
    val action = strategy.on(newAttempts)
    logger.debug(phi"Action for ${Unsafe(newAttempts)}: $action")

    action match {
      case Continue(newInterval) =>
        val draftItem = item.copy(
          attempts = newAttempts,
          nextAttempt = LocalDateTime.now().plus(newInterval.toMillis, ChronoUnit.MILLIS)
        )

        logger.debug(draftItem)
        Some(repository.update(draftItem))

      case Complete =>
        repository.delete(item.id)
        None
    }
  }
}