diff options
author | vlad <vlad@driver.xyz> | 2017-06-13 10:25:55 -0700 |
---|---|---|
committer | vlad <vlad@driver.xyz> | 2017-06-13 10:25:55 -0700 |
commit | 0000a65ab4479a2a40e2d6468036438e9705b4aa (patch) | |
tree | 60c868828741e7e5367aa7b6d167abbdaf91d5b8 /src/main/scala/xyz/driver/common/concurrent/BridgeUploadQueueRepositoryAdapter.scala | |
download | rest-query-0000a65ab4479a2a40e2d6468036438e9705b4aa.tar.gz rest-query-0000a65ab4479a2a40e2d6468036438e9705b4aa.tar.bz2 rest-query-0000a65ab4479a2a40e2d6468036438e9705b4aa.zip |
Initial extraction of Driver non-specific utilities
Diffstat (limited to 'src/main/scala/xyz/driver/common/concurrent/BridgeUploadQueueRepositoryAdapter.scala')
-rw-r--r-- | src/main/scala/xyz/driver/common/concurrent/BridgeUploadQueueRepositoryAdapter.scala | 136 |
1 files changed, 136 insertions, 0 deletions
diff --git a/src/main/scala/xyz/driver/common/concurrent/BridgeUploadQueueRepositoryAdapter.scala b/src/main/scala/xyz/driver/common/concurrent/BridgeUploadQueueRepositoryAdapter.scala new file mode 100644 index 0000000..c6a2144 --- /dev/null +++ b/src/main/scala/xyz/driver/common/concurrent/BridgeUploadQueueRepositoryAdapter.scala @@ -0,0 +1,136 @@ +package xyz.driver.common.concurrent + +import java.time.LocalDateTime +import java.time.temporal.ChronoUnit + +import xyz.driver.common.concurrent.BridgeUploadQueue.Item +import xyz.driver.common.concurrent.BridgeUploadQueueRepositoryAdapter.Strategy +import xyz.driver.common.db.Transactions +import xyz.driver.common.db.repositories.BridgeUploadQueueRepository +import xyz.driver.common.domain.LongId +import xyz.driver.common.logging._ + +import scala.concurrent.duration.{Duration, FiniteDuration} +import scala.concurrent.{ExecutionContext, Future} + +object BridgeUploadQueueRepositoryAdapter { + + sealed trait Strategy { + + def onComplete: Strategy.OnComplete + + def on(attempt: Int): Strategy.OnAttempt + + } + + object Strategy { + + /** + * Works forever, but has a limit for intervals. + */ + final case class LimitExponential(startInterval: FiniteDuration, + intervalFactor: Double, + maxInterval: FiniteDuration, + onComplete: OnComplete) extends Strategy { + + override def on(attempt: Int): OnAttempt = { + OnAttempt.Continue(intervalFor(attempt).min(maxInterval)) + } + + private def intervalFor(attempt: Int): Duration = { + startInterval * Math.pow(intervalFactor, attempt.toDouble) + } + } + + /** + * Used only in tests. + */ + case object Ignore extends Strategy { + + override val onComplete = OnComplete.Delete + + override def on(attempt: Int) = OnAttempt.Complete + + } + + /** + * Used only in tests. + */ + final case class Constant(interval: FiniteDuration) extends Strategy { + + override val onComplete = OnComplete.Delete + + override def on(attempt: Int) = OnAttempt.Continue(interval) + + } + + sealed trait OnComplete + object OnComplete { + case object Delete extends OnComplete + case object Mark extends OnComplete + + implicit def toPhiString(x: OnAttempt): PhiString = Unsafe(x.toString) + } + + sealed trait OnAttempt + object OnAttempt { + case object Complete extends OnAttempt + case class Continue(interval: Duration) extends OnAttempt + + implicit def toPhiString(x: OnAttempt): PhiString = Unsafe(x.toString) + } + } +} + +class BridgeUploadQueueRepositoryAdapter(strategy: Strategy, + repository: BridgeUploadQueueRepository, + transactions: Transactions) + (implicit executionContext: ExecutionContext) + extends BridgeUploadQueue with PhiLogging { + + override def add(item: Item): Future[Unit] = transactions.run { _ => + repository.add(item) + } + + override def get(kind: String): Future[Option[Item]] = { + repository.getOne(kind) + } + + override def remove(item: LongId[Item]): Future[Unit] = transactions.run { _ => + import Strategy.OnComplete._ + + strategy.onComplete match { + case Delete => repository.delete(item) + case Mark => + repository.getById(item) match { + case Some(x) => repository.update(x.copy(completed = true)) + case None => throw new RuntimeException(s"Can not find the $item task") + } + } + } + + override def tryRetry(item: Item): Future[Option[Item]] = transactions.run { _ => + import Strategy.OnAttempt._ + + logger.trace(phi"tryRetry($item)") + + val newAttempts = item.attempts + 1 + val action = strategy.on(newAttempts) + logger.debug(phi"Action for ${Unsafe(newAttempts)}: $action") + + action match { + case Continue(newInterval) => + val draftItem = item.copy( + attempts = newAttempts, + nextAttempt = LocalDateTime.now().plus(newInterval.toMillis, ChronoUnit.MILLIS) + ) + + logger.debug(draftItem) + Some(repository.update(draftItem)) + + case Complete => + repository.delete(item.id) + None + } + } +} |