aboutsummaryrefslogtreecommitdiff
path: root/src/main/scala/xyz/driver/pdsuicommon/concurrent
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/scala/xyz/driver/pdsuicommon/concurrent')
-rw-r--r--src/main/scala/xyz/driver/pdsuicommon/concurrent/BridgeUploadQueue.scala84
-rw-r--r--src/main/scala/xyz/driver/pdsuicommon/concurrent/Cron.scala93
-rw-r--r--src/main/scala/xyz/driver/pdsuicommon/concurrent/InMemoryBridgeUploadQueue.scala37
-rw-r--r--src/main/scala/xyz/driver/pdsuicommon/concurrent/MdcExecutionContext.scala35
-rw-r--r--src/main/scala/xyz/driver/pdsuicommon/concurrent/MdcThreadFactory.scala33
-rw-r--r--src/main/scala/xyz/driver/pdsuicommon/concurrent/SafeBridgeUploadQueue.scala60
6 files changed, 0 insertions, 342 deletions
diff --git a/src/main/scala/xyz/driver/pdsuicommon/concurrent/BridgeUploadQueue.scala b/src/main/scala/xyz/driver/pdsuicommon/concurrent/BridgeUploadQueue.scala
deleted file mode 100644
index 8213262..0000000
--- a/src/main/scala/xyz/driver/pdsuicommon/concurrent/BridgeUploadQueue.scala
+++ /dev/null
@@ -1,84 +0,0 @@
-package xyz.driver.pdsuicommon.concurrent
-
-import java.time.LocalDateTime
-
-import xyz.driver.pdsuicommon.concurrent.BridgeUploadQueue.Item
-import xyz.driver.pdsuicommon.logging._
-
-import scala.concurrent.Future
-
-object BridgeUploadQueue {
-
- /**
- * @param kind For example documents
- * @param tag For example, a patient's id: 1
- * @param attempts Which attempt
- * @param created When the task was created
- * @param nextAttempt Time of the next attempt
- */
- final case class Item(kind: String,
- tag: String,
- created: LocalDateTime,
- attempts: Int,
- nextAttempt: LocalDateTime,
- completed: Boolean,
- dependencyKind: Option[String],
- dependencyTag: Option[String]) {
-
- def dependency: Option[Dependency] = {
- dependencyKind
- .zip(dependencyTag)
- .headOption
- .map(Function.tupled(Dependency.apply))
- }
-
- }
-
- object Item {
-
- implicit def toPhiString(x: Item): PhiString = {
- import x._
- phi"BridgeUploadQueue.Item(kind=${Unsafe(kind)}, tag=${Unsafe(tag)}, " +
- phi"attempts=${Unsafe(attempts)}, start=$created, nextAttempt=$nextAttempt, completed=$completed, " +
- phi"dependency=$dependency)"
- }
-
- def apply(kind: String, tag: String, dependency: Option[Dependency] = None): Item = {
- val now = LocalDateTime.now()
-
- Item(
- kind = kind,
- tag = tag,
- created = now,
- attempts = 0,
- nextAttempt = now,
- completed = false,
- dependencyKind = dependency.map(_.kind),
- dependencyTag = dependency.map(_.tag)
- )
- }
-
- }
-
- final case class Dependency(kind: String, tag: String)
-
- object Dependency {
-
- implicit def toPhiString(x: Dependency): PhiString = {
- import x._
- phi"Dependency(kind=${Unsafe(kind)}, tag=${Unsafe(tag)})"
- }
- }
-}
-
-trait BridgeUploadQueue {
-
- def add(item: Item): Future[Item]
-
- def get(kind: String): Future[Option[Item]]
-
- def complete(kind: String, tag: String): Future[Unit]
-
- def tryRetry(item: Item): Future[Option[Item]]
-
-}
diff --git a/src/main/scala/xyz/driver/pdsuicommon/concurrent/Cron.scala b/src/main/scala/xyz/driver/pdsuicommon/concurrent/Cron.scala
deleted file mode 100644
index 6659088..0000000
--- a/src/main/scala/xyz/driver/pdsuicommon/concurrent/Cron.scala
+++ /dev/null
@@ -1,93 +0,0 @@
-package xyz.driver.pdsuicommon.concurrent
-
-import java.io.Closeable
-import java.util.concurrent.ConcurrentHashMap
-import java.util.concurrent.atomic.AtomicBoolean
-import java.util.{Timer, TimerTask}
-
-import com.typesafe.scalalogging.StrictLogging
-import org.slf4j.MDC
-import xyz.driver.pdsuicommon.error.ExceptionFormatter
-import xyz.driver.pdsuicommon.utils.RandomUtils
-
-import scala.concurrent.duration.FiniteDuration
-import scala.concurrent.{ExecutionContext, Future}
-import scala.util.{Failure, Success, Try}
-
-class Cron(settings: Cron.Settings) extends Closeable with StrictLogging {
-
- import Cron._
-
- private val timer = new Timer("cronTimer", true)
-
- private val jobs = ConcurrentHashMap.newKeySet[String]()
-
- def register(name: String)(job: () => Future[Unit])(implicit ec: ExecutionContext): Unit = {
- logger.trace("register({})", name)
- val disableList = settings.disable.split(",").map(_.trim).toList
- if (disableList.contains(name)) logger.info("The task '{}' is disabled", name)
- else {
- settings.intervals.get(name) match {
- case None =>
- logger.error("Can not find an interval for task '{}', check the settings", name)
- throw new IllegalArgumentException(s"Can not find an interval for task '$name', check the settings")
-
- case Some(period) =>
- logger.info("register a new task '{}' with a period of {}ms", name, period.toMillis.asInstanceOf[AnyRef])
- timer.schedule(new SingletonTask(name, job), 0, period.toMillis)
- }
- }
-
- jobs.add(name)
- }
-
- /**
- * Checks unused jobs
- */
- def verify(): Unit = {
- import scala.collection.JavaConverters._
-
- val unusedJobs = settings.intervals.keySet -- jobs.asScala.toSet
- unusedJobs.foreach { job =>
- logger.warn(s"The job '$job' is listed, but not registered or ignored")
- }
- }
-
- override def close(): Unit = {
- timer.cancel()
- }
-}
-
-object Cron {
-
- final case class Settings(disable: String, intervals: Map[String, FiniteDuration])
-
- private class SingletonTask(taskName: String, job: () => Future[Unit])(implicit ec: ExecutionContext)
- extends TimerTask with StrictLogging {
-
- private val isWorking = new AtomicBoolean(false)
-
- override def run(): Unit = {
- if (isWorking.compareAndSet(false, true)) {
- MDC.put("userId", "cron")
- MDC.put("requestId", RandomUtils.randomString(15))
-
- logger.info("Start '{}'", taskName)
- Try {
- job()
- .andThen {
- case Success(_) => logger.info("'{}' is completed", taskName)
- case Failure(e) => logger.error(s"Job '{}' is failed: ${ExceptionFormatter.format(e)}", taskName)
- }
- .onComplete(_ => isWorking.set(false))
- } match {
- case Success(_) =>
- case Failure(e) =>
- logger.error("Can't start '{}'", taskName, e)
- }
- } else {
- logger.debug("The previous job '{}' is in progress", taskName)
- }
- }
- }
-}
diff --git a/src/main/scala/xyz/driver/pdsuicommon/concurrent/InMemoryBridgeUploadQueue.scala b/src/main/scala/xyz/driver/pdsuicommon/concurrent/InMemoryBridgeUploadQueue.scala
deleted file mode 100644
index 658b5b1..0000000
--- a/src/main/scala/xyz/driver/pdsuicommon/concurrent/InMemoryBridgeUploadQueue.scala
+++ /dev/null
@@ -1,37 +0,0 @@
-package xyz.driver.pdsuicommon.concurrent
-
-import java.util.concurrent.LinkedBlockingQueue
-import java.util.function.Predicate
-
-import xyz.driver.pdsuicommon.concurrent.BridgeUploadQueue.Item
-import xyz.driver.pdsuicommon.logging.PhiLogging
-
-import scala.collection.JavaConverters._
-import scala.concurrent.Future
-
-/**
- * Use it only for tests
- */
-class InMemoryBridgeUploadQueue extends BridgeUploadQueue with PhiLogging {
-
- private val queue = new LinkedBlockingQueue[Item]()
-
- override def add(item: Item): Future[Item] = {
- queue.add(item)
- Future.successful(item)
- }
-
- override def tryRetry(item: Item): Future[Option[Item]] = Future.successful(Some(item))
-
- override def get(kind: String): Future[Option[Item]] = {
- val r = queue.iterator().asScala.find(_.kind == kind)
- Future.successful(r)
- }
-
- override def complete(kind: String, tag: String): Future[Unit] = {
- queue.removeIf(new Predicate[Item] {
- override def test(t: Item): Boolean = t.kind == kind && t.tag == tag
- })
- Future.successful(())
- }
-}
diff --git a/src/main/scala/xyz/driver/pdsuicommon/concurrent/MdcExecutionContext.scala b/src/main/scala/xyz/driver/pdsuicommon/concurrent/MdcExecutionContext.scala
deleted file mode 100644
index 3dee8ea..0000000
--- a/src/main/scala/xyz/driver/pdsuicommon/concurrent/MdcExecutionContext.scala
+++ /dev/null
@@ -1,35 +0,0 @@
-package xyz.driver.pdsuicommon.concurrent
-
-import org.slf4j.MDC
-
-import scala.concurrent.{ExecutionContext, ExecutionContextExecutor}
-
-object MdcExecutionContext {
- def from(orig: ExecutionContext): ExecutionContext = new MdcExecutionContext(orig)
-}
-
-class MdcExecutionContext(orig: ExecutionContext) extends ExecutionContextExecutor {
-
- def execute(runnable: Runnable): Unit = {
- val parentMdcContext = MDC.getCopyOfContextMap
-
- orig.execute(new Runnable {
- def run(): Unit = {
- val saveMdcContext = MDC.getCopyOfContextMap
- setContextMap(parentMdcContext)
-
- try {
- runnable.run()
- } finally {
- setContextMap(saveMdcContext)
- }
- }
- })
- }
-
- private[this] def setContextMap(context: java.util.Map[String, String]): Unit =
- Option(context).fold(MDC.clear())(MDC.setContextMap)
-
- def reportFailure(t: Throwable): Unit = orig.reportFailure(t)
-
-}
diff --git a/src/main/scala/xyz/driver/pdsuicommon/concurrent/MdcThreadFactory.scala b/src/main/scala/xyz/driver/pdsuicommon/concurrent/MdcThreadFactory.scala
deleted file mode 100644
index d1dc3ae..0000000
--- a/src/main/scala/xyz/driver/pdsuicommon/concurrent/MdcThreadFactory.scala
+++ /dev/null
@@ -1,33 +0,0 @@
-package xyz.driver.pdsuicommon.concurrent
-
-import java.util.concurrent.ThreadFactory
-
-import org.slf4j.MDC
-
-object MdcThreadFactory {
- def from(orig: ThreadFactory): ThreadFactory = new MdcThreadFactory(orig)
-}
-
-class MdcThreadFactory(orig: ThreadFactory) extends ThreadFactory {
-
- override def newThread(runnable: Runnable): Thread = {
- val parentMdcContext = MDC.getCopyOfContextMap
-
- orig.newThread(new Runnable {
- def run(): Unit = {
- val saveMdcContext = MDC.getCopyOfContextMap
- setContextMap(parentMdcContext)
-
- try {
- runnable.run()
- } finally {
- setContextMap(saveMdcContext)
- }
- }
- })
- }
-
- private[this] def setContextMap(context: java.util.Map[String, String]): Unit =
- Option(context).fold(MDC.clear())(MDC.setContextMap)
-
-}
diff --git a/src/main/scala/xyz/driver/pdsuicommon/concurrent/SafeBridgeUploadQueue.scala b/src/main/scala/xyz/driver/pdsuicommon/concurrent/SafeBridgeUploadQueue.scala
deleted file mode 100644
index 2f7fe6c..0000000
--- a/src/main/scala/xyz/driver/pdsuicommon/concurrent/SafeBridgeUploadQueue.scala
+++ /dev/null
@@ -1,60 +0,0 @@
-package xyz.driver.pdsuicommon.concurrent
-
-import xyz.driver.pdsuicommon.concurrent.BridgeUploadQueue.Dependency
-import xyz.driver.pdsuicommon.concurrent.SafeBridgeUploadQueue.{DependencyResolver, SafeTask, Tag}
-import xyz.driver.pdsuicommon.logging._
-import xyz.driver.pdsuicommon.serialization.Marshaller
-
-import scala.concurrent.{ExecutionContext, Future}
-
-object SafeBridgeUploadQueue {
-
- trait Tag extends Product with Serializable
-
- final case class SafeTask[T <: Tag](tag: T, private[SafeBridgeUploadQueue] val queueItem: BridgeUploadQueue.Item)
-
- object SafeTask {
- implicit def toPhiString[T <: Tag](x: SafeTask[T]): PhiString = {
- import x._
- phi"SafeTask(tag=${Unsafe(tag)}, $queueItem)"
- }
- }
-
- trait DependencyResolver[T <: Tag] {
- def getDependency(tag: T): Option[Dependency]
- }
-
-}
-
-class SafeBridgeUploadQueue[T <: Tag](kind: String, origQueue: BridgeUploadQueue)(
- implicit tagMarshaller: Marshaller[T, String],
- dependencyResolver: DependencyResolver[T],
- executionContext: ExecutionContext) {
-
- type Task = SafeTask[T]
-
- def add(tag: T): Future[BridgeUploadQueue.Item] =
- origQueue.add(
- BridgeUploadQueue.Item(
- kind = kind,
- tag = tagMarshaller.write(tag),
- dependency = dependencyResolver.getDependency(tag)
- ))
-
- def tryRetry(task: Task): Future[Option[Task]] = wrap(origQueue.tryRetry(task.queueItem))
-
- def get: Future[Option[Task]] = wrap(origQueue.get(kind))
-
- def complete(tag: T): Future[Unit] = origQueue.complete(kind, tagMarshaller.write(tag))
-
- private def wrap(x: Future[Option[BridgeUploadQueue.Item]]): Future[Option[Task]] = x.map(_.map(cover))
-
- private def cover(rawTask: BridgeUploadQueue.Item): Task = {
- val tag = tagMarshaller
- .read(rawTask.tag)
- .getOrElse(throw new IllegalArgumentException(s"Can not parse tag '${rawTask.tag}'"))
-
- SafeTask(tag, rawTask)
- }
-
-}