diff options
author | vlad <vlad@driver.xyz> | 2017-06-13 16:12:20 -0700 |
---|---|---|
committer | vlad <vlad@driver.xyz> | 2017-06-13 16:12:20 -0700 |
commit | cd1b635b2ae90d9ac2d8b1779183a1fbd8c5fd5c (patch) | |
tree | 062e8dad1a1513e26b0fd08b1742d6ff2ee874f7 /src/main/scala/xyz/driver/common/concurrent/BridgeUploadQueueRepositoryAdapter.scala | |
parent | 0000a65ab4479a2a40e2d6468036438e9705b4aa (diff) | |
download | rest-query-cd1b635b2ae90d9ac2d8b1779183a1fbd8c5fd5c.tar.gz rest-query-cd1b635b2ae90d9ac2d8b1779183a1fbd8c5fd5c.tar.bz2 rest-query-cd1b635b2ae90d9ac2d8b1779183a1fbd8c5fd5c.zip |
Adding domain entitiesv0.1.0
Diffstat (limited to 'src/main/scala/xyz/driver/common/concurrent/BridgeUploadQueueRepositoryAdapter.scala')
-rw-r--r-- | src/main/scala/xyz/driver/common/concurrent/BridgeUploadQueueRepositoryAdapter.scala | 136 |
1 files changed, 0 insertions, 136 deletions
diff --git a/src/main/scala/xyz/driver/common/concurrent/BridgeUploadQueueRepositoryAdapter.scala b/src/main/scala/xyz/driver/common/concurrent/BridgeUploadQueueRepositoryAdapter.scala deleted file mode 100644 index c6a2144..0000000 --- a/src/main/scala/xyz/driver/common/concurrent/BridgeUploadQueueRepositoryAdapter.scala +++ /dev/null @@ -1,136 +0,0 @@ -package xyz.driver.common.concurrent - -import java.time.LocalDateTime -import java.time.temporal.ChronoUnit - -import xyz.driver.common.concurrent.BridgeUploadQueue.Item -import xyz.driver.common.concurrent.BridgeUploadQueueRepositoryAdapter.Strategy -import xyz.driver.common.db.Transactions -import xyz.driver.common.db.repositories.BridgeUploadQueueRepository -import xyz.driver.common.domain.LongId -import xyz.driver.common.logging._ - -import scala.concurrent.duration.{Duration, FiniteDuration} -import scala.concurrent.{ExecutionContext, Future} - -object BridgeUploadQueueRepositoryAdapter { - - sealed trait Strategy { - - def onComplete: Strategy.OnComplete - - def on(attempt: Int): Strategy.OnAttempt - - } - - object Strategy { - - /** - * Works forever, but has a limit for intervals. - */ - final case class LimitExponential(startInterval: FiniteDuration, - intervalFactor: Double, - maxInterval: FiniteDuration, - onComplete: OnComplete) extends Strategy { - - override def on(attempt: Int): OnAttempt = { - OnAttempt.Continue(intervalFor(attempt).min(maxInterval)) - } - - private def intervalFor(attempt: Int): Duration = { - startInterval * Math.pow(intervalFactor, attempt.toDouble) - } - } - - /** - * Used only in tests. - */ - case object Ignore extends Strategy { - - override val onComplete = OnComplete.Delete - - override def on(attempt: Int) = OnAttempt.Complete - - } - - /** - * Used only in tests. - */ - final case class Constant(interval: FiniteDuration) extends Strategy { - - override val onComplete = OnComplete.Delete - - override def on(attempt: Int) = OnAttempt.Continue(interval) - - } - - sealed trait OnComplete - object OnComplete { - case object Delete extends OnComplete - case object Mark extends OnComplete - - implicit def toPhiString(x: OnAttempt): PhiString = Unsafe(x.toString) - } - - sealed trait OnAttempt - object OnAttempt { - case object Complete extends OnAttempt - case class Continue(interval: Duration) extends OnAttempt - - implicit def toPhiString(x: OnAttempt): PhiString = Unsafe(x.toString) - } - } -} - -class BridgeUploadQueueRepositoryAdapter(strategy: Strategy, - repository: BridgeUploadQueueRepository, - transactions: Transactions) - (implicit executionContext: ExecutionContext) - extends BridgeUploadQueue with PhiLogging { - - override def add(item: Item): Future[Unit] = transactions.run { _ => - repository.add(item) - } - - override def get(kind: String): Future[Option[Item]] = { - repository.getOne(kind) - } - - override def remove(item: LongId[Item]): Future[Unit] = transactions.run { _ => - import Strategy.OnComplete._ - - strategy.onComplete match { - case Delete => repository.delete(item) - case Mark => - repository.getById(item) match { - case Some(x) => repository.update(x.copy(completed = true)) - case None => throw new RuntimeException(s"Can not find the $item task") - } - } - } - - override def tryRetry(item: Item): Future[Option[Item]] = transactions.run { _ => - import Strategy.OnAttempt._ - - logger.trace(phi"tryRetry($item)") - - val newAttempts = item.attempts + 1 - val action = strategy.on(newAttempts) - logger.debug(phi"Action for ${Unsafe(newAttempts)}: $action") - - action match { - case Continue(newInterval) => - val draftItem = item.copy( - attempts = newAttempts, - nextAttempt = LocalDateTime.now().plus(newInterval.toMillis, ChronoUnit.MILLIS) - ) - - logger.debug(draftItem) - Some(repository.update(draftItem)) - - case Complete => - repository.delete(item.id) - None - } - } -} |