From a997aa6539d1f0af4ab4fc395ff2033335da312a Mon Sep 17 00:00:00 2001 From: vlad Date: Fri, 30 Jun 2017 12:29:54 -0700 Subject: Latest PDS UI utils --- .../BridgeUploadQueueRepositoryAdapter.scala | 48 +++++++++++----------- 1 file changed, 25 insertions(+), 23 deletions(-) (limited to 'src/main/scala/xyz/driver/pdsuicommon/concurrent/BridgeUploadQueueRepositoryAdapter.scala') diff --git a/src/main/scala/xyz/driver/pdsuicommon/concurrent/BridgeUploadQueueRepositoryAdapter.scala b/src/main/scala/xyz/driver/pdsuicommon/concurrent/BridgeUploadQueueRepositoryAdapter.scala index 528be59..48c81c2 100644 --- a/src/main/scala/xyz/driver/pdsuicommon/concurrent/BridgeUploadQueueRepositoryAdapter.scala +++ b/src/main/scala/xyz/driver/pdsuicommon/concurrent/BridgeUploadQueueRepositoryAdapter.scala @@ -5,9 +5,8 @@ import java.time.temporal.ChronoUnit import xyz.driver.pdsuicommon.concurrent.BridgeUploadQueue.Item import xyz.driver.pdsuicommon.concurrent.BridgeUploadQueueRepositoryAdapter.Strategy -import xyz.driver.pdsuicommon.db.Transactions +import xyz.driver.pdsuicommon.db._ import xyz.driver.pdsuicommon.db.repositories.BridgeUploadQueueRepository -import xyz.driver.pdsuicommon.domain.LongId import xyz.driver.pdsuicommon.logging._ import scala.concurrent.duration.{Duration, FiniteDuration} @@ -16,6 +15,9 @@ import scala.util.Try object BridgeUploadQueueRepositoryAdapter { + /** + * Defines how we work with queue, when an user attempts to remove/tryRetry an item. + */ sealed trait Strategy { def onComplete: Strategy.OnComplete @@ -48,9 +50,7 @@ object BridgeUploadQueueRepositoryAdapter { /** * Used only in tests. */ - case object Ignore extends Strategy { - - override val onComplete = OnComplete.Delete + final case class Stop(onComplete: OnComplete = OnComplete.Delete) extends Strategy { override def on(attempt: Int) = OnAttempt.Complete @@ -85,33 +85,33 @@ object BridgeUploadQueueRepositoryAdapter { } } -class BridgeUploadQueueRepositoryAdapter(strategy: Strategy, - repository: BridgeUploadQueueRepository, - transactions: Transactions)(implicit executionContext: ExecutionContext) +class BridgeUploadQueueRepositoryAdapter(strategy: Strategy, repository: BridgeUploadQueueRepository, dbIo: DbIo)( + implicit executionContext: ExecutionContext) extends BridgeUploadQueue with PhiLogging { - override def add(item: Item): Future[Unit] = transactions.run { _ => - repository.add(item) - } + override def add(item: Item): Future[Item] = dbIo.runAsync(repository.add(item)) - override def get(kind: String): Future[Option[Item]] = { - repository.getOne(kind) - } + override def get(kind: String): Future[Option[Item]] = dbIo.runAsync(repository.getOne(kind)) - override def remove(item: LongId[Item]): Future[Unit] = transactions.run { _ => + override def complete(kind: String, tag: String): Future[Unit] = { import Strategy.OnComplete._ strategy.onComplete match { - case Delete => repository.delete(item) + case Delete => dbIo.runAsync(repository.delete(kind, tag)) case Mark => - repository.getById(item) match { - case Some(x) => repository.update(x.copy(completed = true)) - case None => throw new RuntimeException(s"Can not find the $item task") + dbIo.runAsyncTx { + repository.getById(kind, tag) match { + case Some(x) => repository.update(x.copy(completed = true)) + case None => throw new RuntimeException(s"Can not find the task: kind=$kind, tag=$tag") + } } } } - override def tryRetry(item: Item): Future[Option[Item]] = transactions.run { _ => + /** + * Tries to continue the task or complete it + */ + override def tryRetry(item: Item): Future[Option[Item]] = { import Strategy.OnAttempt._ logger.trace(phi"tryRetry($item)") @@ -128,11 +128,13 @@ class BridgeUploadQueueRepositoryAdapter(strategy: Strategy, ) logger.debug(draftItem) - Some(repository.update(draftItem)) + dbIo.runAsync { + Some(repository.update(draftItem)) + } case Complete => - repository.delete(item.id) - None + logger.warn(phi"All attempts are out for $item, complete the task") + complete(item.kind, item.tag).map(_ => None) } } } -- cgit v1.2.3