aboutsummaryrefslogtreecommitdiff
path: root/src/main/scala/xyz/driver/pdsuicommon/concurrent/BridgeUploadQueueRepositoryAdapter.scala
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/scala/xyz/driver/pdsuicommon/concurrent/BridgeUploadQueueRepositoryAdapter.scala')
-rw-r--r--src/main/scala/xyz/driver/pdsuicommon/concurrent/BridgeUploadQueueRepositoryAdapter.scala48
1 files changed, 25 insertions, 23 deletions
diff --git a/src/main/scala/xyz/driver/pdsuicommon/concurrent/BridgeUploadQueueRepositoryAdapter.scala b/src/main/scala/xyz/driver/pdsuicommon/concurrent/BridgeUploadQueueRepositoryAdapter.scala
index 528be59..48c81c2 100644
--- a/src/main/scala/xyz/driver/pdsuicommon/concurrent/BridgeUploadQueueRepositoryAdapter.scala
+++ b/src/main/scala/xyz/driver/pdsuicommon/concurrent/BridgeUploadQueueRepositoryAdapter.scala
@@ -5,9 +5,8 @@ import java.time.temporal.ChronoUnit
import xyz.driver.pdsuicommon.concurrent.BridgeUploadQueue.Item
import xyz.driver.pdsuicommon.concurrent.BridgeUploadQueueRepositoryAdapter.Strategy
-import xyz.driver.pdsuicommon.db.Transactions
+import xyz.driver.pdsuicommon.db._
import xyz.driver.pdsuicommon.db.repositories.BridgeUploadQueueRepository
-import xyz.driver.pdsuicommon.domain.LongId
import xyz.driver.pdsuicommon.logging._
import scala.concurrent.duration.{Duration, FiniteDuration}
@@ -16,6 +15,9 @@ import scala.util.Try
object BridgeUploadQueueRepositoryAdapter {
+ /**
+ * Defines how we work with queue, when an user attempts to remove/tryRetry an item.
+ */
sealed trait Strategy {
def onComplete: Strategy.OnComplete
@@ -48,9 +50,7 @@ object BridgeUploadQueueRepositoryAdapter {
/**
* Used only in tests.
*/
- case object Ignore extends Strategy {
-
- override val onComplete = OnComplete.Delete
+ final case class Stop(onComplete: OnComplete = OnComplete.Delete) extends Strategy {
override def on(attempt: Int) = OnAttempt.Complete
@@ -85,33 +85,33 @@ object BridgeUploadQueueRepositoryAdapter {
}
}
-class BridgeUploadQueueRepositoryAdapter(strategy: Strategy,
- repository: BridgeUploadQueueRepository,
- transactions: Transactions)(implicit executionContext: ExecutionContext)
+class BridgeUploadQueueRepositoryAdapter(strategy: Strategy, repository: BridgeUploadQueueRepository, dbIo: DbIo)(
+ implicit executionContext: ExecutionContext)
extends BridgeUploadQueue with PhiLogging {
- override def add(item: Item): Future[Unit] = transactions.run { _ =>
- repository.add(item)
- }
+ override def add(item: Item): Future[Item] = dbIo.runAsync(repository.add(item))
- override def get(kind: String): Future[Option[Item]] = {
- repository.getOne(kind)
- }
+ override def get(kind: String): Future[Option[Item]] = dbIo.runAsync(repository.getOne(kind))
- override def remove(item: LongId[Item]): Future[Unit] = transactions.run { _ =>
+ override def complete(kind: String, tag: String): Future[Unit] = {
import Strategy.OnComplete._
strategy.onComplete match {
- case Delete => repository.delete(item)
+ case Delete => dbIo.runAsync(repository.delete(kind, tag))
case Mark =>
- repository.getById(item) match {
- case Some(x) => repository.update(x.copy(completed = true))
- case None => throw new RuntimeException(s"Can not find the $item task")
+ dbIo.runAsyncTx {
+ repository.getById(kind, tag) match {
+ case Some(x) => repository.update(x.copy(completed = true))
+ case None => throw new RuntimeException(s"Can not find the task: kind=$kind, tag=$tag")
+ }
}
}
}
- override def tryRetry(item: Item): Future[Option[Item]] = transactions.run { _ =>
+ /**
+ * Tries to continue the task or complete it
+ */
+ override def tryRetry(item: Item): Future[Option[Item]] = {
import Strategy.OnAttempt._
logger.trace(phi"tryRetry($item)")
@@ -128,11 +128,13 @@ class BridgeUploadQueueRepositoryAdapter(strategy: Strategy,
)
logger.debug(draftItem)
- Some(repository.update(draftItem))
+ dbIo.runAsync {
+ Some(repository.update(draftItem))
+ }
case Complete =>
- repository.delete(item.id)
- None
+ logger.warn(phi"All attempts are out for $item, complete the task")
+ complete(item.kind, item.tag).map(_ => None)
}
}
}