aboutsummaryrefslogtreecommitdiff
path: root/src/main/scala/xyz/driver/pdsuicommon/concurrent
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/scala/xyz/driver/pdsuicommon/concurrent')
-rw-r--r--src/main/scala/xyz/driver/pdsuicommon/concurrent/BridgeUploadQueue.scala11
-rw-r--r--src/main/scala/xyz/driver/pdsuicommon/concurrent/BridgeUploadQueueRepositoryAdapter.scala48
-rw-r--r--src/main/scala/xyz/driver/pdsuicommon/concurrent/InMemoryBridgeUploadQueue.scala17
-rw-r--r--src/main/scala/xyz/driver/pdsuicommon/concurrent/SafeBridgeUploadQueue.scala61
4 files changed, 98 insertions, 39 deletions
diff --git a/src/main/scala/xyz/driver/pdsuicommon/concurrent/BridgeUploadQueue.scala b/src/main/scala/xyz/driver/pdsuicommon/concurrent/BridgeUploadQueue.scala
index feb3774..8213262 100644
--- a/src/main/scala/xyz/driver/pdsuicommon/concurrent/BridgeUploadQueue.scala
+++ b/src/main/scala/xyz/driver/pdsuicommon/concurrent/BridgeUploadQueue.scala
@@ -3,7 +3,6 @@ package xyz.driver.pdsuicommon.concurrent
import java.time.LocalDateTime
import xyz.driver.pdsuicommon.concurrent.BridgeUploadQueue.Item
-import xyz.driver.pdsuicommon.domain.LongId
import xyz.driver.pdsuicommon.logging._
import scala.concurrent.Future
@@ -17,8 +16,7 @@ object BridgeUploadQueue {
* @param created When the task was created
* @param nextAttempt Time of the next attempt
*/
- final case class Item(id: LongId[Item],
- kind: String,
+ final case class Item(kind: String,
tag: String,
created: LocalDateTime,
attempts: Int,
@@ -40,7 +38,7 @@ object BridgeUploadQueue {
implicit def toPhiString(x: Item): PhiString = {
import x._
- phi"BridgeUploadQueue.Item(id=$id, kind=${Unsafe(kind)}, tag=${Unsafe(tag)}, " +
+ phi"BridgeUploadQueue.Item(kind=${Unsafe(kind)}, tag=${Unsafe(tag)}, " +
phi"attempts=${Unsafe(attempts)}, start=$created, nextAttempt=$nextAttempt, completed=$completed, " +
phi"dependency=$dependency)"
}
@@ -49,7 +47,6 @@ object BridgeUploadQueue {
val now = LocalDateTime.now()
Item(
- id = LongId(0),
kind = kind,
tag = tag,
created = now,
@@ -76,11 +73,11 @@ object BridgeUploadQueue {
trait BridgeUploadQueue {
- def add(item: Item): Future[Unit]
+ def add(item: Item): Future[Item]
def get(kind: String): Future[Option[Item]]
- def remove(item: LongId[Item]): Future[Unit]
+ def complete(kind: String, tag: String): Future[Unit]
def tryRetry(item: Item): Future[Option[Item]]
diff --git a/src/main/scala/xyz/driver/pdsuicommon/concurrent/BridgeUploadQueueRepositoryAdapter.scala b/src/main/scala/xyz/driver/pdsuicommon/concurrent/BridgeUploadQueueRepositoryAdapter.scala
index 528be59..48c81c2 100644
--- a/src/main/scala/xyz/driver/pdsuicommon/concurrent/BridgeUploadQueueRepositoryAdapter.scala
+++ b/src/main/scala/xyz/driver/pdsuicommon/concurrent/BridgeUploadQueueRepositoryAdapter.scala
@@ -5,9 +5,8 @@ import java.time.temporal.ChronoUnit
import xyz.driver.pdsuicommon.concurrent.BridgeUploadQueue.Item
import xyz.driver.pdsuicommon.concurrent.BridgeUploadQueueRepositoryAdapter.Strategy
-import xyz.driver.pdsuicommon.db.Transactions
+import xyz.driver.pdsuicommon.db._
import xyz.driver.pdsuicommon.db.repositories.BridgeUploadQueueRepository
-import xyz.driver.pdsuicommon.domain.LongId
import xyz.driver.pdsuicommon.logging._
import scala.concurrent.duration.{Duration, FiniteDuration}
@@ -16,6 +15,9 @@ import scala.util.Try
object BridgeUploadQueueRepositoryAdapter {
+ /**
+ * Defines how we work with queue, when an user attempts to remove/tryRetry an item.
+ */
sealed trait Strategy {
def onComplete: Strategy.OnComplete
@@ -48,9 +50,7 @@ object BridgeUploadQueueRepositoryAdapter {
/**
* Used only in tests.
*/
- case object Ignore extends Strategy {
-
- override val onComplete = OnComplete.Delete
+ final case class Stop(onComplete: OnComplete = OnComplete.Delete) extends Strategy {
override def on(attempt: Int) = OnAttempt.Complete
@@ -85,33 +85,33 @@ object BridgeUploadQueueRepositoryAdapter {
}
}
-class BridgeUploadQueueRepositoryAdapter(strategy: Strategy,
- repository: BridgeUploadQueueRepository,
- transactions: Transactions)(implicit executionContext: ExecutionContext)
+class BridgeUploadQueueRepositoryAdapter(strategy: Strategy, repository: BridgeUploadQueueRepository, dbIo: DbIo)(
+ implicit executionContext: ExecutionContext)
extends BridgeUploadQueue with PhiLogging {
- override def add(item: Item): Future[Unit] = transactions.run { _ =>
- repository.add(item)
- }
+ override def add(item: Item): Future[Item] = dbIo.runAsync(repository.add(item))
- override def get(kind: String): Future[Option[Item]] = {
- repository.getOne(kind)
- }
+ override def get(kind: String): Future[Option[Item]] = dbIo.runAsync(repository.getOne(kind))
- override def remove(item: LongId[Item]): Future[Unit] = transactions.run { _ =>
+ override def complete(kind: String, tag: String): Future[Unit] = {
import Strategy.OnComplete._
strategy.onComplete match {
- case Delete => repository.delete(item)
+ case Delete => dbIo.runAsync(repository.delete(kind, tag))
case Mark =>
- repository.getById(item) match {
- case Some(x) => repository.update(x.copy(completed = true))
- case None => throw new RuntimeException(s"Can not find the $item task")
+ dbIo.runAsyncTx {
+ repository.getById(kind, tag) match {
+ case Some(x) => repository.update(x.copy(completed = true))
+ case None => throw new RuntimeException(s"Can not find the task: kind=$kind, tag=$tag")
+ }
}
}
}
- override def tryRetry(item: Item): Future[Option[Item]] = transactions.run { _ =>
+ /**
+ * Tries to continue the task or complete it
+ */
+ override def tryRetry(item: Item): Future[Option[Item]] = {
import Strategy.OnAttempt._
logger.trace(phi"tryRetry($item)")
@@ -128,11 +128,13 @@ class BridgeUploadQueueRepositoryAdapter(strategy: Strategy,
)
logger.debug(draftItem)
- Some(repository.update(draftItem))
+ dbIo.runAsync {
+ Some(repository.update(draftItem))
+ }
case Complete =>
- repository.delete(item.id)
- None
+ logger.warn(phi"All attempts are out for $item, complete the task")
+ complete(item.kind, item.tag).map(_ => None)
}
}
}
diff --git a/src/main/scala/xyz/driver/pdsuicommon/concurrent/InMemoryBridgeUploadQueue.scala b/src/main/scala/xyz/driver/pdsuicommon/concurrent/InMemoryBridgeUploadQueue.scala
index bff566b..658b5b1 100644
--- a/src/main/scala/xyz/driver/pdsuicommon/concurrent/InMemoryBridgeUploadQueue.scala
+++ b/src/main/scala/xyz/driver/pdsuicommon/concurrent/InMemoryBridgeUploadQueue.scala
@@ -1,9 +1,9 @@
package xyz.driver.pdsuicommon.concurrent
import java.util.concurrent.LinkedBlockingQueue
+import java.util.function.Predicate
import xyz.driver.pdsuicommon.concurrent.BridgeUploadQueue.Item
-import xyz.driver.pdsuicommon.domain.LongId
import xyz.driver.pdsuicommon.logging.PhiLogging
import scala.collection.JavaConverters._
@@ -16,9 +16,9 @@ class InMemoryBridgeUploadQueue extends BridgeUploadQueue with PhiLogging {
private val queue = new LinkedBlockingQueue[Item]()
- override def add(item: Item): Future[Unit] = {
+ override def add(item: Item): Future[Item] = {
queue.add(item)
- done
+ Future.successful(item)
}
override def tryRetry(item: Item): Future[Option[Item]] = Future.successful(Some(item))
@@ -28,11 +28,10 @@ class InMemoryBridgeUploadQueue extends BridgeUploadQueue with PhiLogging {
Future.successful(r)
}
- override def remove(item: LongId[Item]): Future[Unit] = {
- queue.remove(item)
- done
+ override def complete(kind: String, tag: String): Future[Unit] = {
+ queue.removeIf(new Predicate[Item] {
+ override def test(t: Item): Boolean = t.kind == kind && t.tag == tag
+ })
+ Future.successful(())
}
-
- private val done = Future.successful(())
-
}
diff --git a/src/main/scala/xyz/driver/pdsuicommon/concurrent/SafeBridgeUploadQueue.scala b/src/main/scala/xyz/driver/pdsuicommon/concurrent/SafeBridgeUploadQueue.scala
new file mode 100644
index 0000000..bab29d5
--- /dev/null
+++ b/src/main/scala/xyz/driver/pdsuicommon/concurrent/SafeBridgeUploadQueue.scala
@@ -0,0 +1,61 @@
+package xyz.driver.pdsuicommon.concurrent
+
+import xyz.driver.pdsuicommon.concurrent.BridgeUploadQueue.Dependency
+import xyz.driver.pdsuicommon.concurrent.SafeBridgeUploadQueue.{DependencyResolver, SafeTask, Tag}
+import xyz.driver.pdsuicommon.logging._
+import xyz.driver.pdsuicommon.serialization.Marshaller
+
+import scala.concurrent.{ExecutionContext, Future}
+
+object SafeBridgeUploadQueue {
+
+ trait Tag extends Product with Serializable
+
+ case class SafeTask[T <: Tag](tag: T,
+ private[SafeBridgeUploadQueue] val queueItem: BridgeUploadQueue.Item)
+
+ object SafeTask {
+ implicit def toPhiString[T <: Tag](x: SafeTask[T]): PhiString = {
+ import x._
+ phi"SafeTask(tag=${Unsafe(tag)}, $queueItem)"
+ }
+ }
+
+ trait DependencyResolver[T <: Tag] {
+ def getDependency(tag: T): Option[Dependency]
+ }
+
+}
+
+class SafeBridgeUploadQueue[T <: Tag](kind: String,
+ origQueue: BridgeUploadQueue)
+ (implicit
+ tagMarshaller: Marshaller[T, String],
+ dependencyResolver: DependencyResolver[T],
+ executionContext: ExecutionContext) {
+
+ type Task = SafeTask[T]
+
+ def add(tag: T): Future[BridgeUploadQueue.Item] = origQueue.add(BridgeUploadQueue.Item(
+ kind = kind,
+ tag = tagMarshaller.write(tag),
+ dependency = dependencyResolver.getDependency(tag)
+ ))
+
+ def tryRetry(task: Task): Future[Option[Task]] = wrap(origQueue.tryRetry(task.queueItem))
+
+ def get: Future[Option[Task]] = wrap(origQueue.get(kind))
+
+ def complete(tag: T): Future[Unit] = origQueue.complete(kind, tagMarshaller.write(tag))
+
+ private def wrap(x: Future[Option[BridgeUploadQueue.Item]]): Future[Option[Task]] = x.map(_.map(cover))
+
+ private def cover(rawTask: BridgeUploadQueue.Item): Task = {
+ val tag = tagMarshaller
+ .read(rawTask.tag)
+ .getOrElse(throw new IllegalArgumentException(s"Can not parse tag '${rawTask.tag}'"))
+
+ SafeTask(tag, rawTask)
+ }
+
+}