diff options
Diffstat (limited to 'src/main/scala/xyz/driver/pdsuicommon/concurrent')
6 files changed, 0 insertions, 342 deletions
diff --git a/src/main/scala/xyz/driver/pdsuicommon/concurrent/BridgeUploadQueue.scala b/src/main/scala/xyz/driver/pdsuicommon/concurrent/BridgeUploadQueue.scala deleted file mode 100644 index 8213262..0000000 --- a/src/main/scala/xyz/driver/pdsuicommon/concurrent/BridgeUploadQueue.scala +++ /dev/null @@ -1,84 +0,0 @@ -package xyz.driver.pdsuicommon.concurrent - -import java.time.LocalDateTime - -import xyz.driver.pdsuicommon.concurrent.BridgeUploadQueue.Item -import xyz.driver.pdsuicommon.logging._ - -import scala.concurrent.Future - -object BridgeUploadQueue { - - /** - * @param kind For example documents - * @param tag For example, a patient's id: 1 - * @param attempts Which attempt - * @param created When the task was created - * @param nextAttempt Time of the next attempt - */ - final case class Item(kind: String, - tag: String, - created: LocalDateTime, - attempts: Int, - nextAttempt: LocalDateTime, - completed: Boolean, - dependencyKind: Option[String], - dependencyTag: Option[String]) { - - def dependency: Option[Dependency] = { - dependencyKind - .zip(dependencyTag) - .headOption - .map(Function.tupled(Dependency.apply)) - } - - } - - object Item { - - implicit def toPhiString(x: Item): PhiString = { - import x._ - phi"BridgeUploadQueue.Item(kind=${Unsafe(kind)}, tag=${Unsafe(tag)}, " + - phi"attempts=${Unsafe(attempts)}, start=$created, nextAttempt=$nextAttempt, completed=$completed, " + - phi"dependency=$dependency)" - } - - def apply(kind: String, tag: String, dependency: Option[Dependency] = None): Item = { - val now = LocalDateTime.now() - - Item( - kind = kind, - tag = tag, - created = now, - attempts = 0, - nextAttempt = now, - completed = false, - dependencyKind = dependency.map(_.kind), - dependencyTag = dependency.map(_.tag) - ) - } - - } - - final case class Dependency(kind: String, tag: String) - - object Dependency { - - implicit def toPhiString(x: Dependency): PhiString = { - import x._ - phi"Dependency(kind=${Unsafe(kind)}, tag=${Unsafe(tag)})" - } - } -} - -trait BridgeUploadQueue { - - def add(item: Item): Future[Item] - - def get(kind: String): Future[Option[Item]] - - def complete(kind: String, tag: String): Future[Unit] - - def tryRetry(item: Item): Future[Option[Item]] - -} diff --git a/src/main/scala/xyz/driver/pdsuicommon/concurrent/Cron.scala b/src/main/scala/xyz/driver/pdsuicommon/concurrent/Cron.scala deleted file mode 100644 index 6659088..0000000 --- a/src/main/scala/xyz/driver/pdsuicommon/concurrent/Cron.scala +++ /dev/null @@ -1,93 +0,0 @@ -package xyz.driver.pdsuicommon.concurrent - -import java.io.Closeable -import java.util.concurrent.ConcurrentHashMap -import java.util.concurrent.atomic.AtomicBoolean -import java.util.{Timer, TimerTask} - -import com.typesafe.scalalogging.StrictLogging -import org.slf4j.MDC -import xyz.driver.pdsuicommon.error.ExceptionFormatter -import xyz.driver.pdsuicommon.utils.RandomUtils - -import scala.concurrent.duration.FiniteDuration -import scala.concurrent.{ExecutionContext, Future} -import scala.util.{Failure, Success, Try} - -class Cron(settings: Cron.Settings) extends Closeable with StrictLogging { - - import Cron._ - - private val timer = new Timer("cronTimer", true) - - private val jobs = ConcurrentHashMap.newKeySet[String]() - - def register(name: String)(job: () => Future[Unit])(implicit ec: ExecutionContext): Unit = { - logger.trace("register({})", name) - val disableList = settings.disable.split(",").map(_.trim).toList - if (disableList.contains(name)) logger.info("The task '{}' is disabled", name) - else { - settings.intervals.get(name) match { - case None => - logger.error("Can not find an interval for task '{}', check the settings", name) - throw new IllegalArgumentException(s"Can not find an interval for task '$name', check the settings") - - case Some(period) => - logger.info("register a new task '{}' with a period of {}ms", name, period.toMillis.asInstanceOf[AnyRef]) - timer.schedule(new SingletonTask(name, job), 0, period.toMillis) - } - } - - jobs.add(name) - } - - /** - * Checks unused jobs - */ - def verify(): Unit = { - import scala.collection.JavaConverters._ - - val unusedJobs = settings.intervals.keySet -- jobs.asScala.toSet - unusedJobs.foreach { job => - logger.warn(s"The job '$job' is listed, but not registered or ignored") - } - } - - override def close(): Unit = { - timer.cancel() - } -} - -object Cron { - - final case class Settings(disable: String, intervals: Map[String, FiniteDuration]) - - private class SingletonTask(taskName: String, job: () => Future[Unit])(implicit ec: ExecutionContext) - extends TimerTask with StrictLogging { - - private val isWorking = new AtomicBoolean(false) - - override def run(): Unit = { - if (isWorking.compareAndSet(false, true)) { - MDC.put("userId", "cron") - MDC.put("requestId", RandomUtils.randomString(15)) - - logger.info("Start '{}'", taskName) - Try { - job() - .andThen { - case Success(_) => logger.info("'{}' is completed", taskName) - case Failure(e) => logger.error(s"Job '{}' is failed: ${ExceptionFormatter.format(e)}", taskName) - } - .onComplete(_ => isWorking.set(false)) - } match { - case Success(_) => - case Failure(e) => - logger.error("Can't start '{}'", taskName, e) - } - } else { - logger.debug("The previous job '{}' is in progress", taskName) - } - } - } -} diff --git a/src/main/scala/xyz/driver/pdsuicommon/concurrent/InMemoryBridgeUploadQueue.scala b/src/main/scala/xyz/driver/pdsuicommon/concurrent/InMemoryBridgeUploadQueue.scala deleted file mode 100644 index 658b5b1..0000000 --- a/src/main/scala/xyz/driver/pdsuicommon/concurrent/InMemoryBridgeUploadQueue.scala +++ /dev/null @@ -1,37 +0,0 @@ -package xyz.driver.pdsuicommon.concurrent - -import java.util.concurrent.LinkedBlockingQueue -import java.util.function.Predicate - -import xyz.driver.pdsuicommon.concurrent.BridgeUploadQueue.Item -import xyz.driver.pdsuicommon.logging.PhiLogging - -import scala.collection.JavaConverters._ -import scala.concurrent.Future - -/** - * Use it only for tests - */ -class InMemoryBridgeUploadQueue extends BridgeUploadQueue with PhiLogging { - - private val queue = new LinkedBlockingQueue[Item]() - - override def add(item: Item): Future[Item] = { - queue.add(item) - Future.successful(item) - } - - override def tryRetry(item: Item): Future[Option[Item]] = Future.successful(Some(item)) - - override def get(kind: String): Future[Option[Item]] = { - val r = queue.iterator().asScala.find(_.kind == kind) - Future.successful(r) - } - - override def complete(kind: String, tag: String): Future[Unit] = { - queue.removeIf(new Predicate[Item] { - override def test(t: Item): Boolean = t.kind == kind && t.tag == tag - }) - Future.successful(()) - } -} diff --git a/src/main/scala/xyz/driver/pdsuicommon/concurrent/MdcExecutionContext.scala b/src/main/scala/xyz/driver/pdsuicommon/concurrent/MdcExecutionContext.scala deleted file mode 100644 index 3dee8ea..0000000 --- a/src/main/scala/xyz/driver/pdsuicommon/concurrent/MdcExecutionContext.scala +++ /dev/null @@ -1,35 +0,0 @@ -package xyz.driver.pdsuicommon.concurrent - -import org.slf4j.MDC - -import scala.concurrent.{ExecutionContext, ExecutionContextExecutor} - -object MdcExecutionContext { - def from(orig: ExecutionContext): ExecutionContext = new MdcExecutionContext(orig) -} - -class MdcExecutionContext(orig: ExecutionContext) extends ExecutionContextExecutor { - - def execute(runnable: Runnable): Unit = { - val parentMdcContext = MDC.getCopyOfContextMap - - orig.execute(new Runnable { - def run(): Unit = { - val saveMdcContext = MDC.getCopyOfContextMap - setContextMap(parentMdcContext) - - try { - runnable.run() - } finally { - setContextMap(saveMdcContext) - } - } - }) - } - - private[this] def setContextMap(context: java.util.Map[String, String]): Unit = - Option(context).fold(MDC.clear())(MDC.setContextMap) - - def reportFailure(t: Throwable): Unit = orig.reportFailure(t) - -} diff --git a/src/main/scala/xyz/driver/pdsuicommon/concurrent/MdcThreadFactory.scala b/src/main/scala/xyz/driver/pdsuicommon/concurrent/MdcThreadFactory.scala deleted file mode 100644 index d1dc3ae..0000000 --- a/src/main/scala/xyz/driver/pdsuicommon/concurrent/MdcThreadFactory.scala +++ /dev/null @@ -1,33 +0,0 @@ -package xyz.driver.pdsuicommon.concurrent - -import java.util.concurrent.ThreadFactory - -import org.slf4j.MDC - -object MdcThreadFactory { - def from(orig: ThreadFactory): ThreadFactory = new MdcThreadFactory(orig) -} - -class MdcThreadFactory(orig: ThreadFactory) extends ThreadFactory { - - override def newThread(runnable: Runnable): Thread = { - val parentMdcContext = MDC.getCopyOfContextMap - - orig.newThread(new Runnable { - def run(): Unit = { - val saveMdcContext = MDC.getCopyOfContextMap - setContextMap(parentMdcContext) - - try { - runnable.run() - } finally { - setContextMap(saveMdcContext) - } - } - }) - } - - private[this] def setContextMap(context: java.util.Map[String, String]): Unit = - Option(context).fold(MDC.clear())(MDC.setContextMap) - -} diff --git a/src/main/scala/xyz/driver/pdsuicommon/concurrent/SafeBridgeUploadQueue.scala b/src/main/scala/xyz/driver/pdsuicommon/concurrent/SafeBridgeUploadQueue.scala deleted file mode 100644 index 2f7fe6c..0000000 --- a/src/main/scala/xyz/driver/pdsuicommon/concurrent/SafeBridgeUploadQueue.scala +++ /dev/null @@ -1,60 +0,0 @@ -package xyz.driver.pdsuicommon.concurrent - -import xyz.driver.pdsuicommon.concurrent.BridgeUploadQueue.Dependency -import xyz.driver.pdsuicommon.concurrent.SafeBridgeUploadQueue.{DependencyResolver, SafeTask, Tag} -import xyz.driver.pdsuicommon.logging._ -import xyz.driver.pdsuicommon.serialization.Marshaller - -import scala.concurrent.{ExecutionContext, Future} - -object SafeBridgeUploadQueue { - - trait Tag extends Product with Serializable - - final case class SafeTask[T <: Tag](tag: T, private[SafeBridgeUploadQueue] val queueItem: BridgeUploadQueue.Item) - - object SafeTask { - implicit def toPhiString[T <: Tag](x: SafeTask[T]): PhiString = { - import x._ - phi"SafeTask(tag=${Unsafe(tag)}, $queueItem)" - } - } - - trait DependencyResolver[T <: Tag] { - def getDependency(tag: T): Option[Dependency] - } - -} - -class SafeBridgeUploadQueue[T <: Tag](kind: String, origQueue: BridgeUploadQueue)( - implicit tagMarshaller: Marshaller[T, String], - dependencyResolver: DependencyResolver[T], - executionContext: ExecutionContext) { - - type Task = SafeTask[T] - - def add(tag: T): Future[BridgeUploadQueue.Item] = - origQueue.add( - BridgeUploadQueue.Item( - kind = kind, - tag = tagMarshaller.write(tag), - dependency = dependencyResolver.getDependency(tag) - )) - - def tryRetry(task: Task): Future[Option[Task]] = wrap(origQueue.tryRetry(task.queueItem)) - - def get: Future[Option[Task]] = wrap(origQueue.get(kind)) - - def complete(tag: T): Future[Unit] = origQueue.complete(kind, tagMarshaller.write(tag)) - - private def wrap(x: Future[Option[BridgeUploadQueue.Item]]): Future[Option[Task]] = x.map(_.map(cover)) - - private def cover(rawTask: BridgeUploadQueue.Item): Task = { - val tag = tagMarshaller - .read(rawTask.tag) - .getOrElse(throw new IllegalArgumentException(s"Can not parse tag '${rawTask.tag}'")) - - SafeTask(tag, rawTask) - } - -} |