aboutsummaryrefslogtreecommitdiff
path: root/src/main/scala/xyz/driver/pdsuicommon/concurrent
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/scala/xyz/driver/pdsuicommon/concurrent')
-rw-r--r--src/main/scala/xyz/driver/pdsuicommon/concurrent/BridgeUploadQueue.scala86
-rw-r--r--src/main/scala/xyz/driver/pdsuicommon/concurrent/BridgeUploadQueueRepositoryAdapter.scala136
-rw-r--r--src/main/scala/xyz/driver/pdsuicommon/concurrent/Cron.scala95
-rw-r--r--src/main/scala/xyz/driver/pdsuicommon/concurrent/InMemoryBridgeUploadQueue.scala38
-rw-r--r--src/main/scala/xyz/driver/pdsuicommon/concurrent/MdcExecutionContext.scala35
-rw-r--r--src/main/scala/xyz/driver/pdsuicommon/concurrent/MdcThreadFactory.scala33
6 files changed, 423 insertions, 0 deletions
diff --git a/src/main/scala/xyz/driver/pdsuicommon/concurrent/BridgeUploadQueue.scala b/src/main/scala/xyz/driver/pdsuicommon/concurrent/BridgeUploadQueue.scala
new file mode 100644
index 0000000..320666d
--- /dev/null
+++ b/src/main/scala/xyz/driver/pdsuicommon/concurrent/BridgeUploadQueue.scala
@@ -0,0 +1,86 @@
+package xyz.driver.pdsuicommon.concurrent
+
+import java.time.LocalDateTime
+
+import xyz.driver.pdsuicommon.concurrent.BridgeUploadQueue.Item
+import xyz.driver.pdsuicommon.domain.LongId
+import xyz.driver.pdsuicommon.logging._
+
+import scala.concurrent.Future
+
+object BridgeUploadQueue {
+
+ /**
+ * @param kind For example documents
+ * @param tag For example, a patient's id: 1
+ * @param attempts Which attempt
+ * @param created When the task was created
+ * @param nextAttempt Time of the next attempt
+ */
+ final case class Item(id: LongId[Item],
+ kind: String,
+ tag: String,
+ created: LocalDateTime,
+ attempts: Int,
+ nextAttempt: LocalDateTime,
+ completed: Boolean,
+ dependencyKind: Option[String],
+ dependencyTag: Option[String]) {
+
+ def dependency: Option[Dependency] = {
+ dependencyKind.zip(dependencyTag)
+ .headOption
+ .map(Function.tupled(Dependency.apply))
+ }
+
+ }
+
+ object Item {
+
+ implicit def toPhiString(x: Item): PhiString = {
+ import x._
+ phi"BridgeUploadQueue.Item(id=$id, kind=${Unsafe(kind)}, tag=${Unsafe(tag)}, " +
+ phi"attempts=${Unsafe(attempts)}, start=$created, nextAttempt=$nextAttempt, completed=$completed, " +
+ phi"dependency=$dependency)"
+ }
+
+ def apply(kind: String, tag: String, dependency: Option[Dependency] = None): Item = {
+ val now = LocalDateTime.now()
+
+ Item(
+ id = LongId(0),
+ kind = kind,
+ tag = tag,
+ created = now,
+ attempts = 0,
+ nextAttempt = now,
+ completed = false,
+ dependencyKind = dependency.map(_.kind),
+ dependencyTag = dependency.map(_.tag)
+ )
+ }
+
+ }
+
+ final case class Dependency(kind: String, tag: String)
+
+ object Dependency {
+
+ implicit def toPhiString(x: Dependency): PhiString = {
+ import x._
+ phi"Dependency(kind=${Unsafe(kind)}, tag=${Unsafe(tag)})"
+ }
+ }
+}
+
+trait BridgeUploadQueue {
+
+ def add(item: Item): Future[Unit]
+
+ def get(kind: String): Future[Option[Item]]
+
+ def remove(item: LongId[Item]): Future[Unit]
+
+ def tryRetry(item: Item): Future[Option[Item]]
+
+}
diff --git a/src/main/scala/xyz/driver/pdsuicommon/concurrent/BridgeUploadQueueRepositoryAdapter.scala b/src/main/scala/xyz/driver/pdsuicommon/concurrent/BridgeUploadQueueRepositoryAdapter.scala
new file mode 100644
index 0000000..8c87b60
--- /dev/null
+++ b/src/main/scala/xyz/driver/pdsuicommon/concurrent/BridgeUploadQueueRepositoryAdapter.scala
@@ -0,0 +1,136 @@
+package xyz.driver.pdsuicommon.concurrent
+
+import java.time.LocalDateTime
+import java.time.temporal.ChronoUnit
+
+import xyz.driver.pdsuicommon.concurrent.BridgeUploadQueue.Item
+import xyz.driver.pdsuicommon.concurrent.BridgeUploadQueueRepositoryAdapter.Strategy
+import xyz.driver.pdsuicommon.db.Transactions
+import xyz.driver.pdsuicommon.db.repositories.BridgeUploadQueueRepository
+import xyz.driver.pdsuicommon.domain.LongId
+import xyz.driver.pdsuicommon.logging._
+
+import scala.concurrent.duration.{Duration, FiniteDuration}
+import scala.concurrent.{ExecutionContext, Future}
+
+object BridgeUploadQueueRepositoryAdapter {
+
+ sealed trait Strategy {
+
+ def onComplete: Strategy.OnComplete
+
+ def on(attempt: Int): Strategy.OnAttempt
+
+ }
+
+ object Strategy {
+
+ /**
+ * Works forever, but has a limit for intervals.
+ */
+ final case class LimitExponential(startInterval: FiniteDuration,
+ intervalFactor: Double,
+ maxInterval: FiniteDuration,
+ onComplete: OnComplete) extends Strategy {
+
+ override def on(attempt: Int): OnAttempt = {
+ OnAttempt.Continue(intervalFor(attempt).min(maxInterval))
+ }
+
+ private def intervalFor(attempt: Int): Duration = {
+ startInterval * Math.pow(intervalFactor, attempt.toDouble)
+ }
+ }
+
+ /**
+ * Used only in tests.
+ */
+ case object Ignore extends Strategy {
+
+ override val onComplete = OnComplete.Delete
+
+ override def on(attempt: Int) = OnAttempt.Complete
+
+ }
+
+ /**
+ * Used only in tests.
+ */
+ final case class Constant(interval: FiniteDuration) extends Strategy {
+
+ override val onComplete = OnComplete.Delete
+
+ override def on(attempt: Int) = OnAttempt.Continue(interval)
+
+ }
+
+ sealed trait OnComplete
+ object OnComplete {
+ case object Delete extends OnComplete
+ case object Mark extends OnComplete
+
+ implicit def toPhiString(x: OnAttempt): PhiString = Unsafe(x.toString)
+ }
+
+ sealed trait OnAttempt
+ object OnAttempt {
+ case object Complete extends OnAttempt
+ case class Continue(interval: Duration) extends OnAttempt
+
+ implicit def toPhiString(x: OnAttempt): PhiString = Unsafe(x.toString)
+ }
+ }
+}
+
+class BridgeUploadQueueRepositoryAdapter(strategy: Strategy,
+ repository: BridgeUploadQueueRepository,
+ transactions: Transactions)
+ (implicit executionContext: ExecutionContext)
+ extends BridgeUploadQueue with PhiLogging {
+
+ override def add(item: Item): Future[Unit] = transactions.run { _ =>
+ repository.add(item)
+ }
+
+ override def get(kind: String): Future[Option[Item]] = {
+ repository.getOne(kind)
+ }
+
+ override def remove(item: LongId[Item]): Future[Unit] = transactions.run { _ =>
+ import Strategy.OnComplete._
+
+ strategy.onComplete match {
+ case Delete => repository.delete(item)
+ case Mark =>
+ repository.getById(item) match {
+ case Some(x) => repository.update(x.copy(completed = true))
+ case None => throw new RuntimeException(s"Can not find the $item task")
+ }
+ }
+ }
+
+ override def tryRetry(item: Item): Future[Option[Item]] = transactions.run { _ =>
+ import Strategy.OnAttempt._
+
+ logger.trace(phi"tryRetry($item)")
+
+ val newAttempts = item.attempts + 1
+ val action = strategy.on(newAttempts)
+ logger.debug(phi"Action for ${Unsafe(newAttempts)}: $action")
+
+ action match {
+ case Continue(newInterval) =>
+ val draftItem = item.copy(
+ attempts = newAttempts,
+ nextAttempt = LocalDateTime.now().plus(newInterval.toMillis, ChronoUnit.MILLIS)
+ )
+
+ logger.debug(draftItem)
+ Some(repository.update(draftItem))
+
+ case Complete =>
+ repository.delete(item.id)
+ None
+ }
+ }
+}
diff --git a/src/main/scala/xyz/driver/pdsuicommon/concurrent/Cron.scala b/src/main/scala/xyz/driver/pdsuicommon/concurrent/Cron.scala
new file mode 100644
index 0000000..a6e9d2c
--- /dev/null
+++ b/src/main/scala/xyz/driver/pdsuicommon/concurrent/Cron.scala
@@ -0,0 +1,95 @@
+package xyz.driver.pdsuicommon.concurrent
+
+import java.io.Closeable
+import java.util.concurrent.ConcurrentHashMap
+import java.util.concurrent.atomic.AtomicBoolean
+import java.util.{Timer, TimerTask}
+
+import com.typesafe.scalalogging.StrictLogging
+import org.slf4j.MDC
+import xyz.driver.pdsuicommon.error.ExceptionFormatter
+import xyz.driver.pdsuicommon.utils.RandomUtils
+
+import scala.concurrent.duration.FiniteDuration
+import scala.concurrent.{ExecutionContext, Future}
+import scala.util.{Failure, Success, Try}
+
+class Cron(settings: Cron.Settings) extends Closeable with StrictLogging {
+
+ import Cron._
+
+ private val timer = new Timer("cronTimer", true)
+
+ private val jobs = ConcurrentHashMap.newKeySet[String]()
+
+ def register(name: String)(job: () => Future[Unit])(implicit ec: ExecutionContext): Unit = {
+ logger.trace("register({})", name)
+ val disableList = settings.disable.split(",").map(_.trim).toList
+ if (disableList.contains(name)) logger.info("The task '{}' is disabled", name)
+ else {
+ settings.intervals.get(name) match {
+ case None =>
+ logger.error("Can not find an interval for task '{}', check the settings", name)
+ throw new IllegalArgumentException(s"Can not find an interval for task '$name', check the settings")
+
+ case Some(period) =>
+ logger.info("register a new task '{}' with a period of {}ms", name, period.toMillis.asInstanceOf[AnyRef])
+ timer.schedule(new SingletonTask(name, job), 0, period.toMillis)
+ }
+ }
+
+ jobs.add(name)
+ }
+
+ /**
+ * Checks unused jobs
+ */
+ def verify(): Unit = {
+ import scala.collection.JavaConversions.asScalaSet
+
+ val unusedJobs = settings.intervals.keySet -- jobs.toSet
+ unusedJobs.foreach { job =>
+ logger.warn(s"The job '$job' is listed, but not registered or ignored")
+ }
+ }
+
+ override def close(): Unit = {
+ timer.cancel()
+ }
+}
+
+object Cron {
+
+ case class Settings(disable: String, intervals: Map[String, FiniteDuration])
+
+ private class SingletonTask(taskName: String,
+ job: () => Future[Unit])
+ (implicit ec: ExecutionContext)
+ extends TimerTask with StrictLogging {
+
+ private val isWorking = new AtomicBoolean(false)
+
+ override def run(): Unit = {
+ if (isWorking.compareAndSet(false, true)) {
+ MDC.put("userId", "cron")
+ MDC.put("requestId", RandomUtils.randomString(15))
+
+ logger.info("Start '{}'", taskName)
+ Try {
+ job()
+ .andThen {
+ case Success(_) => logger.info("'{}' is completed", taskName)
+ case Failure(e) => logger.error(s"Job '{}' is failed: ${ExceptionFormatter.format(e)}", taskName)
+ }
+ .onComplete(_ => isWorking.set(false))
+ } match {
+ case Success(_) =>
+ case Failure(e) =>
+ logger.error("Can't start '{}'", taskName, e)
+ }
+ } else {
+ logger.debug("The previous job '{}' is in progress", taskName)
+ }
+ }
+ }
+}
diff --git a/src/main/scala/xyz/driver/pdsuicommon/concurrent/InMemoryBridgeUploadQueue.scala b/src/main/scala/xyz/driver/pdsuicommon/concurrent/InMemoryBridgeUploadQueue.scala
new file mode 100644
index 0000000..bff566b
--- /dev/null
+++ b/src/main/scala/xyz/driver/pdsuicommon/concurrent/InMemoryBridgeUploadQueue.scala
@@ -0,0 +1,38 @@
+package xyz.driver.pdsuicommon.concurrent
+
+import java.util.concurrent.LinkedBlockingQueue
+
+import xyz.driver.pdsuicommon.concurrent.BridgeUploadQueue.Item
+import xyz.driver.pdsuicommon.domain.LongId
+import xyz.driver.pdsuicommon.logging.PhiLogging
+
+import scala.collection.JavaConverters._
+import scala.concurrent.Future
+
+/**
+ * Use it only for tests
+ */
+class InMemoryBridgeUploadQueue extends BridgeUploadQueue with PhiLogging {
+
+ private val queue = new LinkedBlockingQueue[Item]()
+
+ override def add(item: Item): Future[Unit] = {
+ queue.add(item)
+ done
+ }
+
+ override def tryRetry(item: Item): Future[Option[Item]] = Future.successful(Some(item))
+
+ override def get(kind: String): Future[Option[Item]] = {
+ val r = queue.iterator().asScala.find(_.kind == kind)
+ Future.successful(r)
+ }
+
+ override def remove(item: LongId[Item]): Future[Unit] = {
+ queue.remove(item)
+ done
+ }
+
+ private val done = Future.successful(())
+
+}
diff --git a/src/main/scala/xyz/driver/pdsuicommon/concurrent/MdcExecutionContext.scala b/src/main/scala/xyz/driver/pdsuicommon/concurrent/MdcExecutionContext.scala
new file mode 100644
index 0000000..3dee8ea
--- /dev/null
+++ b/src/main/scala/xyz/driver/pdsuicommon/concurrent/MdcExecutionContext.scala
@@ -0,0 +1,35 @@
+package xyz.driver.pdsuicommon.concurrent
+
+import org.slf4j.MDC
+
+import scala.concurrent.{ExecutionContext, ExecutionContextExecutor}
+
+object MdcExecutionContext {
+ def from(orig: ExecutionContext): ExecutionContext = new MdcExecutionContext(orig)
+}
+
+class MdcExecutionContext(orig: ExecutionContext) extends ExecutionContextExecutor {
+
+ def execute(runnable: Runnable): Unit = {
+ val parentMdcContext = MDC.getCopyOfContextMap
+
+ orig.execute(new Runnable {
+ def run(): Unit = {
+ val saveMdcContext = MDC.getCopyOfContextMap
+ setContextMap(parentMdcContext)
+
+ try {
+ runnable.run()
+ } finally {
+ setContextMap(saveMdcContext)
+ }
+ }
+ })
+ }
+
+ private[this] def setContextMap(context: java.util.Map[String, String]): Unit =
+ Option(context).fold(MDC.clear())(MDC.setContextMap)
+
+ def reportFailure(t: Throwable): Unit = orig.reportFailure(t)
+
+}
diff --git a/src/main/scala/xyz/driver/pdsuicommon/concurrent/MdcThreadFactory.scala b/src/main/scala/xyz/driver/pdsuicommon/concurrent/MdcThreadFactory.scala
new file mode 100644
index 0000000..d1dc3ae
--- /dev/null
+++ b/src/main/scala/xyz/driver/pdsuicommon/concurrent/MdcThreadFactory.scala
@@ -0,0 +1,33 @@
+package xyz.driver.pdsuicommon.concurrent
+
+import java.util.concurrent.ThreadFactory
+
+import org.slf4j.MDC
+
+object MdcThreadFactory {
+ def from(orig: ThreadFactory): ThreadFactory = new MdcThreadFactory(orig)
+}
+
+class MdcThreadFactory(orig: ThreadFactory) extends ThreadFactory {
+
+ override def newThread(runnable: Runnable): Thread = {
+ val parentMdcContext = MDC.getCopyOfContextMap
+
+ orig.newThread(new Runnable {
+ def run(): Unit = {
+ val saveMdcContext = MDC.getCopyOfContextMap
+ setContextMap(parentMdcContext)
+
+ try {
+ runnable.run()
+ } finally {
+ setContextMap(saveMdcContext)
+ }
+ }
+ })
+ }
+
+ private[this] def setContextMap(context: java.util.Map[String, String]): Unit =
+ Option(context).fold(MDC.clear())(MDC.setContextMap)
+
+}