diff options
Diffstat (limited to 'src/main/scala/xyz/driver/pdsuicommon/concurrent')
4 files changed, 98 insertions, 39 deletions
diff --git a/src/main/scala/xyz/driver/pdsuicommon/concurrent/BridgeUploadQueue.scala b/src/main/scala/xyz/driver/pdsuicommon/concurrent/BridgeUploadQueue.scala index feb3774..8213262 100644 --- a/src/main/scala/xyz/driver/pdsuicommon/concurrent/BridgeUploadQueue.scala +++ b/src/main/scala/xyz/driver/pdsuicommon/concurrent/BridgeUploadQueue.scala @@ -3,7 +3,6 @@ package xyz.driver.pdsuicommon.concurrent import java.time.LocalDateTime import xyz.driver.pdsuicommon.concurrent.BridgeUploadQueue.Item -import xyz.driver.pdsuicommon.domain.LongId import xyz.driver.pdsuicommon.logging._ import scala.concurrent.Future @@ -17,8 +16,7 @@ object BridgeUploadQueue { * @param created When the task was created * @param nextAttempt Time of the next attempt */ - final case class Item(id: LongId[Item], - kind: String, + final case class Item(kind: String, tag: String, created: LocalDateTime, attempts: Int, @@ -40,7 +38,7 @@ object BridgeUploadQueue { implicit def toPhiString(x: Item): PhiString = { import x._ - phi"BridgeUploadQueue.Item(id=$id, kind=${Unsafe(kind)}, tag=${Unsafe(tag)}, " + + phi"BridgeUploadQueue.Item(kind=${Unsafe(kind)}, tag=${Unsafe(tag)}, " + phi"attempts=${Unsafe(attempts)}, start=$created, nextAttempt=$nextAttempt, completed=$completed, " + phi"dependency=$dependency)" } @@ -49,7 +47,6 @@ object BridgeUploadQueue { val now = LocalDateTime.now() Item( - id = LongId(0), kind = kind, tag = tag, created = now, @@ -76,11 +73,11 @@ object BridgeUploadQueue { trait BridgeUploadQueue { - def add(item: Item): Future[Unit] + def add(item: Item): Future[Item] def get(kind: String): Future[Option[Item]] - def remove(item: LongId[Item]): Future[Unit] + def complete(kind: String, tag: String): Future[Unit] def tryRetry(item: Item): Future[Option[Item]] diff --git a/src/main/scala/xyz/driver/pdsuicommon/concurrent/BridgeUploadQueueRepositoryAdapter.scala b/src/main/scala/xyz/driver/pdsuicommon/concurrent/BridgeUploadQueueRepositoryAdapter.scala index 528be59..48c81c2 100644 --- a/src/main/scala/xyz/driver/pdsuicommon/concurrent/BridgeUploadQueueRepositoryAdapter.scala +++ b/src/main/scala/xyz/driver/pdsuicommon/concurrent/BridgeUploadQueueRepositoryAdapter.scala @@ -5,9 +5,8 @@ import java.time.temporal.ChronoUnit import xyz.driver.pdsuicommon.concurrent.BridgeUploadQueue.Item import xyz.driver.pdsuicommon.concurrent.BridgeUploadQueueRepositoryAdapter.Strategy -import xyz.driver.pdsuicommon.db.Transactions +import xyz.driver.pdsuicommon.db._ import xyz.driver.pdsuicommon.db.repositories.BridgeUploadQueueRepository -import xyz.driver.pdsuicommon.domain.LongId import xyz.driver.pdsuicommon.logging._ import scala.concurrent.duration.{Duration, FiniteDuration} @@ -16,6 +15,9 @@ import scala.util.Try object BridgeUploadQueueRepositoryAdapter { + /** + * Defines how we work with queue, when an user attempts to remove/tryRetry an item. + */ sealed trait Strategy { def onComplete: Strategy.OnComplete @@ -48,9 +50,7 @@ object BridgeUploadQueueRepositoryAdapter { /** * Used only in tests. */ - case object Ignore extends Strategy { - - override val onComplete = OnComplete.Delete + final case class Stop(onComplete: OnComplete = OnComplete.Delete) extends Strategy { override def on(attempt: Int) = OnAttempt.Complete @@ -85,33 +85,33 @@ object BridgeUploadQueueRepositoryAdapter { } } -class BridgeUploadQueueRepositoryAdapter(strategy: Strategy, - repository: BridgeUploadQueueRepository, - transactions: Transactions)(implicit executionContext: ExecutionContext) +class BridgeUploadQueueRepositoryAdapter(strategy: Strategy, repository: BridgeUploadQueueRepository, dbIo: DbIo)( + implicit executionContext: ExecutionContext) extends BridgeUploadQueue with PhiLogging { - override def add(item: Item): Future[Unit] = transactions.run { _ => - repository.add(item) - } + override def add(item: Item): Future[Item] = dbIo.runAsync(repository.add(item)) - override def get(kind: String): Future[Option[Item]] = { - repository.getOne(kind) - } + override def get(kind: String): Future[Option[Item]] = dbIo.runAsync(repository.getOne(kind)) - override def remove(item: LongId[Item]): Future[Unit] = transactions.run { _ => + override def complete(kind: String, tag: String): Future[Unit] = { import Strategy.OnComplete._ strategy.onComplete match { - case Delete => repository.delete(item) + case Delete => dbIo.runAsync(repository.delete(kind, tag)) case Mark => - repository.getById(item) match { - case Some(x) => repository.update(x.copy(completed = true)) - case None => throw new RuntimeException(s"Can not find the $item task") + dbIo.runAsyncTx { + repository.getById(kind, tag) match { + case Some(x) => repository.update(x.copy(completed = true)) + case None => throw new RuntimeException(s"Can not find the task: kind=$kind, tag=$tag") + } } } } - override def tryRetry(item: Item): Future[Option[Item]] = transactions.run { _ => + /** + * Tries to continue the task or complete it + */ + override def tryRetry(item: Item): Future[Option[Item]] = { import Strategy.OnAttempt._ logger.trace(phi"tryRetry($item)") @@ -128,11 +128,13 @@ class BridgeUploadQueueRepositoryAdapter(strategy: Strategy, ) logger.debug(draftItem) - Some(repository.update(draftItem)) + dbIo.runAsync { + Some(repository.update(draftItem)) + } case Complete => - repository.delete(item.id) - None + logger.warn(phi"All attempts are out for $item, complete the task") + complete(item.kind, item.tag).map(_ => None) } } } diff --git a/src/main/scala/xyz/driver/pdsuicommon/concurrent/InMemoryBridgeUploadQueue.scala b/src/main/scala/xyz/driver/pdsuicommon/concurrent/InMemoryBridgeUploadQueue.scala index bff566b..658b5b1 100644 --- a/src/main/scala/xyz/driver/pdsuicommon/concurrent/InMemoryBridgeUploadQueue.scala +++ b/src/main/scala/xyz/driver/pdsuicommon/concurrent/InMemoryBridgeUploadQueue.scala @@ -1,9 +1,9 @@ package xyz.driver.pdsuicommon.concurrent import java.util.concurrent.LinkedBlockingQueue +import java.util.function.Predicate import xyz.driver.pdsuicommon.concurrent.BridgeUploadQueue.Item -import xyz.driver.pdsuicommon.domain.LongId import xyz.driver.pdsuicommon.logging.PhiLogging import scala.collection.JavaConverters._ @@ -16,9 +16,9 @@ class InMemoryBridgeUploadQueue extends BridgeUploadQueue with PhiLogging { private val queue = new LinkedBlockingQueue[Item]() - override def add(item: Item): Future[Unit] = { + override def add(item: Item): Future[Item] = { queue.add(item) - done + Future.successful(item) } override def tryRetry(item: Item): Future[Option[Item]] = Future.successful(Some(item)) @@ -28,11 +28,10 @@ class InMemoryBridgeUploadQueue extends BridgeUploadQueue with PhiLogging { Future.successful(r) } - override def remove(item: LongId[Item]): Future[Unit] = { - queue.remove(item) - done + override def complete(kind: String, tag: String): Future[Unit] = { + queue.removeIf(new Predicate[Item] { + override def test(t: Item): Boolean = t.kind == kind && t.tag == tag + }) + Future.successful(()) } - - private val done = Future.successful(()) - } diff --git a/src/main/scala/xyz/driver/pdsuicommon/concurrent/SafeBridgeUploadQueue.scala b/src/main/scala/xyz/driver/pdsuicommon/concurrent/SafeBridgeUploadQueue.scala new file mode 100644 index 0000000..bab29d5 --- /dev/null +++ b/src/main/scala/xyz/driver/pdsuicommon/concurrent/SafeBridgeUploadQueue.scala @@ -0,0 +1,61 @@ +package xyz.driver.pdsuicommon.concurrent + +import xyz.driver.pdsuicommon.concurrent.BridgeUploadQueue.Dependency +import xyz.driver.pdsuicommon.concurrent.SafeBridgeUploadQueue.{DependencyResolver, SafeTask, Tag} +import xyz.driver.pdsuicommon.logging._ +import xyz.driver.pdsuicommon.serialization.Marshaller + +import scala.concurrent.{ExecutionContext, Future} + +object SafeBridgeUploadQueue { + + trait Tag extends Product with Serializable + + case class SafeTask[T <: Tag](tag: T, + private[SafeBridgeUploadQueue] val queueItem: BridgeUploadQueue.Item) + + object SafeTask { + implicit def toPhiString[T <: Tag](x: SafeTask[T]): PhiString = { + import x._ + phi"SafeTask(tag=${Unsafe(tag)}, $queueItem)" + } + } + + trait DependencyResolver[T <: Tag] { + def getDependency(tag: T): Option[Dependency] + } + +} + +class SafeBridgeUploadQueue[T <: Tag](kind: String, + origQueue: BridgeUploadQueue) + (implicit + tagMarshaller: Marshaller[T, String], + dependencyResolver: DependencyResolver[T], + executionContext: ExecutionContext) { + + type Task = SafeTask[T] + + def add(tag: T): Future[BridgeUploadQueue.Item] = origQueue.add(BridgeUploadQueue.Item( + kind = kind, + tag = tagMarshaller.write(tag), + dependency = dependencyResolver.getDependency(tag) + )) + + def tryRetry(task: Task): Future[Option[Task]] = wrap(origQueue.tryRetry(task.queueItem)) + + def get: Future[Option[Task]] = wrap(origQueue.get(kind)) + + def complete(tag: T): Future[Unit] = origQueue.complete(kind, tagMarshaller.write(tag)) + + private def wrap(x: Future[Option[BridgeUploadQueue.Item]]): Future[Option[Task]] = x.map(_.map(cover)) + + private def cover(rawTask: BridgeUploadQueue.Item): Task = { + val tag = tagMarshaller + .read(rawTask.tag) + .getOrElse(throw new IllegalArgumentException(s"Can not parse tag '${rawTask.tag}'")) + + SafeTask(tag, rawTask) + } + +} |