aboutsummaryrefslogtreecommitdiff
path: root/src/main/scala/xyz/driver/common/concurrent
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/scala/xyz/driver/common/concurrent')
-rw-r--r--src/main/scala/xyz/driver/common/concurrent/BridgeUploadQueue.scala88
-rw-r--r--src/main/scala/xyz/driver/common/concurrent/BridgeUploadQueueRepositoryAdapter.scala136
-rw-r--r--src/main/scala/xyz/driver/common/concurrent/Cron.scala97
-rw-r--r--src/main/scala/xyz/driver/common/concurrent/InMemoryBridgeUploadQueue.scala38
-rw-r--r--src/main/scala/xyz/driver/common/concurrent/MdcExecutionContext.scala35
-rw-r--r--src/main/scala/xyz/driver/common/concurrent/MdcThreadFactory.scala33
6 files changed, 0 insertions, 427 deletions
diff --git a/src/main/scala/xyz/driver/common/concurrent/BridgeUploadQueue.scala b/src/main/scala/xyz/driver/common/concurrent/BridgeUploadQueue.scala
deleted file mode 100644
index 6ecb299..0000000
--- a/src/main/scala/xyz/driver/common/concurrent/BridgeUploadQueue.scala
+++ /dev/null
@@ -1,88 +0,0 @@
-package xyz.driver.common.concurrent
-
-import java.time.LocalDateTime
-
-import xyz.driver.common.concurrent.BridgeUploadQueue.Item
-import xyz.driver.common.domain.LongId
-import xyz.driver.common.logging._
-
-import scala.concurrent.Future
-
-object BridgeUploadQueue {
-
- /**
- * @param kind For example documents
- * @param tag For example, a patient's id: 1
- * @param attempts Which attempt
- * @param created When the task was created
- * @param nextAttempt Time of the next attempt
- */
- final case class Item(id: LongId[Item],
- kind: String,
- tag: String,
- created: LocalDateTime,
- attempts: Int,
- nextAttempt: LocalDateTime,
- completed: Boolean,
- dependencyKind: Option[String],
- dependencyTag: Option[String]) {
-
- def dependency: Option[Dependency] = {
- dependencyKind.zip(dependencyTag)
- .headOption
- .map(Function.tupled(Dependency.apply))
- }
-
- }
-
- object Item {
-
- implicit def toPhiString(x: Item): PhiString = {
- import x._
- phi"BridgeUploadQueue.Item(id=$id, kind=${Unsafe(kind)}, tag=${Unsafe(tag)}, " +
- phi"attempts=${Unsafe(attempts)}, start=$created, nextAttempt=$nextAttempt, completed=$completed, " +
- phi"dependency=$dependency)"
- }
-
- def apply(kind: String, tag: String, dependency: Option[Dependency] = None): Item = {
- val now = LocalDateTime.now()
-
- Item(
- id = LongId(0),
- kind = kind,
- tag = tag,
- created = now,
- attempts = 0,
- nextAttempt = now,
- completed = false,
- dependencyKind = dependency.map(_.kind),
- dependencyTag = dependency.map(_.tag)
- )
- }
-
- }
-
- final case class Dependency(kind: String, tag: String)
-
- object Dependency {
-
- implicit def toPhiString(x: Dependency): PhiString = {
- import x._
- phi"Dependency(kind=${Unsafe(kind)}, tag=${Unsafe(tag)})"
- }
-
- }
-
-}
-
-trait BridgeUploadQueue {
-
- def add(item: Item): Future[Unit]
-
- def get(kind: String): Future[Option[Item]]
-
- def remove(item: LongId[Item]): Future[Unit]
-
- def tryRetry(item: Item): Future[Option[Item]]
-
-}
diff --git a/src/main/scala/xyz/driver/common/concurrent/BridgeUploadQueueRepositoryAdapter.scala b/src/main/scala/xyz/driver/common/concurrent/BridgeUploadQueueRepositoryAdapter.scala
deleted file mode 100644
index c6a2144..0000000
--- a/src/main/scala/xyz/driver/common/concurrent/BridgeUploadQueueRepositoryAdapter.scala
+++ /dev/null
@@ -1,136 +0,0 @@
-package xyz.driver.common.concurrent
-
-import java.time.LocalDateTime
-import java.time.temporal.ChronoUnit
-
-import xyz.driver.common.concurrent.BridgeUploadQueue.Item
-import xyz.driver.common.concurrent.BridgeUploadQueueRepositoryAdapter.Strategy
-import xyz.driver.common.db.Transactions
-import xyz.driver.common.db.repositories.BridgeUploadQueueRepository
-import xyz.driver.common.domain.LongId
-import xyz.driver.common.logging._
-
-import scala.concurrent.duration.{Duration, FiniteDuration}
-import scala.concurrent.{ExecutionContext, Future}
-
-object BridgeUploadQueueRepositoryAdapter {
-
- sealed trait Strategy {
-
- def onComplete: Strategy.OnComplete
-
- def on(attempt: Int): Strategy.OnAttempt
-
- }
-
- object Strategy {
-
- /**
- * Works forever, but has a limit for intervals.
- */
- final case class LimitExponential(startInterval: FiniteDuration,
- intervalFactor: Double,
- maxInterval: FiniteDuration,
- onComplete: OnComplete) extends Strategy {
-
- override def on(attempt: Int): OnAttempt = {
- OnAttempt.Continue(intervalFor(attempt).min(maxInterval))
- }
-
- private def intervalFor(attempt: Int): Duration = {
- startInterval * Math.pow(intervalFactor, attempt.toDouble)
- }
- }
-
- /**
- * Used only in tests.
- */
- case object Ignore extends Strategy {
-
- override val onComplete = OnComplete.Delete
-
- override def on(attempt: Int) = OnAttempt.Complete
-
- }
-
- /**
- * Used only in tests.
- */
- final case class Constant(interval: FiniteDuration) extends Strategy {
-
- override val onComplete = OnComplete.Delete
-
- override def on(attempt: Int) = OnAttempt.Continue(interval)
-
- }
-
- sealed trait OnComplete
- object OnComplete {
- case object Delete extends OnComplete
- case object Mark extends OnComplete
-
- implicit def toPhiString(x: OnAttempt): PhiString = Unsafe(x.toString)
- }
-
- sealed trait OnAttempt
- object OnAttempt {
- case object Complete extends OnAttempt
- case class Continue(interval: Duration) extends OnAttempt
-
- implicit def toPhiString(x: OnAttempt): PhiString = Unsafe(x.toString)
- }
- }
-}
-
-class BridgeUploadQueueRepositoryAdapter(strategy: Strategy,
- repository: BridgeUploadQueueRepository,
- transactions: Transactions)
- (implicit executionContext: ExecutionContext)
- extends BridgeUploadQueue with PhiLogging {
-
- override def add(item: Item): Future[Unit] = transactions.run { _ =>
- repository.add(item)
- }
-
- override def get(kind: String): Future[Option[Item]] = {
- repository.getOne(kind)
- }
-
- override def remove(item: LongId[Item]): Future[Unit] = transactions.run { _ =>
- import Strategy.OnComplete._
-
- strategy.onComplete match {
- case Delete => repository.delete(item)
- case Mark =>
- repository.getById(item) match {
- case Some(x) => repository.update(x.copy(completed = true))
- case None => throw new RuntimeException(s"Can not find the $item task")
- }
- }
- }
-
- override def tryRetry(item: Item): Future[Option[Item]] = transactions.run { _ =>
- import Strategy.OnAttempt._
-
- logger.trace(phi"tryRetry($item)")
-
- val newAttempts = item.attempts + 1
- val action = strategy.on(newAttempts)
- logger.debug(phi"Action for ${Unsafe(newAttempts)}: $action")
-
- action match {
- case Continue(newInterval) =>
- val draftItem = item.copy(
- attempts = newAttempts,
- nextAttempt = LocalDateTime.now().plus(newInterval.toMillis, ChronoUnit.MILLIS)
- )
-
- logger.debug(draftItem)
- Some(repository.update(draftItem))
-
- case Complete =>
- repository.delete(item.id)
- None
- }
- }
-}
diff --git a/src/main/scala/xyz/driver/common/concurrent/Cron.scala b/src/main/scala/xyz/driver/common/concurrent/Cron.scala
deleted file mode 100644
index 9dd3155..0000000
--- a/src/main/scala/xyz/driver/common/concurrent/Cron.scala
+++ /dev/null
@@ -1,97 +0,0 @@
-package xyz.driver.common.concurrent
-
-import java.io.Closeable
-import java.util.concurrent.ConcurrentHashMap
-import java.util.concurrent.atomic.AtomicBoolean
-import java.util.{Timer, TimerTask}
-
-import com.typesafe.scalalogging.StrictLogging
-import org.slf4j.MDC
-import xyz.driver.common.error.ExceptionFormatter
-import xyz.driver.common.utils.RandomUtils
-
-import scala.concurrent.duration.FiniteDuration
-import scala.concurrent.{ExecutionContext, Future}
-import scala.util.{Failure, Success, Try}
-
-class Cron(settings: Cron.Settings) extends Closeable with StrictLogging {
-
- import Cron._
-
- private val timer = new Timer("cronTimer", true)
-
- private val jobs = ConcurrentHashMap.newKeySet[String]()
-
- def register(name: String)(job: () => Future[Unit])(implicit ec: ExecutionContext): Unit = {
- logger.trace("register({})", name)
- val disableList = settings.disable.split(",").map(_.trim).toList
- if (disableList.contains(name)) logger.info("The task '{}' is disabled", name)
- else {
- settings.intervals.get(name) match {
- case None =>
- logger.error("Can not find an interval for task '{}', check the settings", name)
- throw new IllegalArgumentException(s"Can not find an interval for task '$name', check the settings")
-
- case Some(period) =>
- logger.info("register a new task '{}' with a period of {}ms", name, period.toMillis.asInstanceOf[AnyRef])
- timer.schedule(new SingletonTask(name, job), 0, period.toMillis)
- }
- }
-
- jobs.add(name)
- }
-
- /**
- * Checks unused jobs
- */
- def verify(): Unit = {
- import scala.collection.JavaConversions.asScalaSet
-
- val unusedJobs = settings.intervals.keySet -- jobs.toSet
- unusedJobs.foreach { job =>
- logger.warn(s"The job '$job' is listed, but not registered or ignored")
- }
- }
-
- override def close(): Unit = {
- timer.cancel()
- }
-
-}
-
-object Cron {
-
- case class Settings(disable: String, intervals: Map[String, FiniteDuration])
-
- private class SingletonTask(taskName: String,
- job: () => Future[Unit])
- (implicit ec: ExecutionContext)
- extends TimerTask with StrictLogging {
-
- private val isWorking = new AtomicBoolean(false)
-
- override def run(): Unit = {
- if (isWorking.compareAndSet(false, true)) {
- MDC.put("userId", "cron")
- MDC.put("requestId", RandomUtils.randomString(15))
-
- logger.info("Start '{}'", taskName)
- Try {
- job()
- .andThen {
- case Success(_) => logger.info("'{}' is completed", taskName)
- case Failure(e) => logger.error(s"Job '{}' is failed: ${ExceptionFormatter.format(e)}", taskName)
- }
- .onComplete(_ => isWorking.set(false))
- } match {
- case Success(_) =>
- case Failure(e) =>
- logger.error("Can't start '{}'", taskName, e)
- }
- } else {
- logger.debug("The previous job '{}' is in progress", taskName)
- }
- }
- }
-
-}
diff --git a/src/main/scala/xyz/driver/common/concurrent/InMemoryBridgeUploadQueue.scala b/src/main/scala/xyz/driver/common/concurrent/InMemoryBridgeUploadQueue.scala
deleted file mode 100644
index b19be42..0000000
--- a/src/main/scala/xyz/driver/common/concurrent/InMemoryBridgeUploadQueue.scala
+++ /dev/null
@@ -1,38 +0,0 @@
-package xyz.driver.common.concurrent
-
-import java.util.concurrent.LinkedBlockingQueue
-
-import xyz.driver.common.concurrent.BridgeUploadQueue.Item
-import xyz.driver.common.domain.LongId
-import xyz.driver.common.logging.PhiLogging
-
-import scala.collection.JavaConverters._
-import scala.concurrent.Future
-
-/**
- * Use it only for tests
- */
-class InMemoryBridgeUploadQueue extends BridgeUploadQueue with PhiLogging {
-
- private val queue = new LinkedBlockingQueue[Item]()
-
- override def add(item: Item): Future[Unit] = {
- queue.add(item)
- done
- }
-
- override def tryRetry(item: Item): Future[Option[Item]] = Future.successful(Some(item))
-
- override def get(kind: String): Future[Option[Item]] = {
- val r = queue.iterator().asScala.find(_.kind == kind)
- Future.successful(r)
- }
-
- override def remove(item: LongId[Item]): Future[Unit] = {
- queue.remove(item)
- done
- }
-
- private val done = Future.successful(())
-
-}
diff --git a/src/main/scala/xyz/driver/common/concurrent/MdcExecutionContext.scala b/src/main/scala/xyz/driver/common/concurrent/MdcExecutionContext.scala
deleted file mode 100644
index cd2b394..0000000
--- a/src/main/scala/xyz/driver/common/concurrent/MdcExecutionContext.scala
+++ /dev/null
@@ -1,35 +0,0 @@
-package xyz.driver.common.concurrent
-
-import org.slf4j.MDC
-
-import scala.concurrent.{ExecutionContext, ExecutionContextExecutor}
-
-object MdcExecutionContext {
- def from(orig: ExecutionContext): ExecutionContext = new MdcExecutionContext(orig)
-}
-
-class MdcExecutionContext(orig: ExecutionContext) extends ExecutionContextExecutor {
-
- def execute(runnable: Runnable): Unit = {
- val parentMdcContext = MDC.getCopyOfContextMap
-
- orig.execute(new Runnable {
- def run(): Unit = {
- val saveMdcContext = MDC.getCopyOfContextMap
- setContextMap(parentMdcContext)
-
- try {
- runnable.run()
- } finally {
- setContextMap(saveMdcContext)
- }
- }
- })
- }
-
- private[this] def setContextMap(context: java.util.Map[String, String]): Unit =
- Option(context).fold(MDC.clear())(MDC.setContextMap)
-
- def reportFailure(t: Throwable): Unit = orig.reportFailure(t)
-
-}
diff --git a/src/main/scala/xyz/driver/common/concurrent/MdcThreadFactory.scala b/src/main/scala/xyz/driver/common/concurrent/MdcThreadFactory.scala
deleted file mode 100644
index 9e59a64..0000000
--- a/src/main/scala/xyz/driver/common/concurrent/MdcThreadFactory.scala
+++ /dev/null
@@ -1,33 +0,0 @@
-package xyz.driver.common.concurrent
-
-import java.util.concurrent.ThreadFactory
-
-import org.slf4j.MDC
-
-object MdcThreadFactory {
- def from(orig: ThreadFactory): ThreadFactory = new MdcThreadFactory(orig)
-}
-
-class MdcThreadFactory(orig: ThreadFactory) extends ThreadFactory {
-
- override def newThread(runnable: Runnable): Thread = {
- val parentMdcContext = MDC.getCopyOfContextMap
-
- orig.newThread(new Runnable {
- def run(): Unit = {
- val saveMdcContext = MDC.getCopyOfContextMap
- setContextMap(parentMdcContext)
-
- try {
- runnable.run()
- } finally {
- setContextMap(saveMdcContext)
- }
- }
- })
- }
-
- private[this] def setContextMap(context: java.util.Map[String, String]): Unit =
- Option(context).fold(MDC.clear())(MDC.setContextMap)
-
-}