diff options
Diffstat (limited to 'src/main/scala/xyz/driver/common/concurrent')
6 files changed, 427 insertions, 0 deletions
diff --git a/src/main/scala/xyz/driver/common/concurrent/BridgeUploadQueue.scala b/src/main/scala/xyz/driver/common/concurrent/BridgeUploadQueue.scala new file mode 100644 index 0000000..6ecb299 --- /dev/null +++ b/src/main/scala/xyz/driver/common/concurrent/BridgeUploadQueue.scala @@ -0,0 +1,88 @@ +package xyz.driver.common.concurrent + +import java.time.LocalDateTime + +import xyz.driver.common.concurrent.BridgeUploadQueue.Item +import xyz.driver.common.domain.LongId +import xyz.driver.common.logging._ + +import scala.concurrent.Future + +object BridgeUploadQueue { + + /** + * @param kind For example documents + * @param tag For example, a patient's id: 1 + * @param attempts Which attempt + * @param created When the task was created + * @param nextAttempt Time of the next attempt + */ + final case class Item(id: LongId[Item], + kind: String, + tag: String, + created: LocalDateTime, + attempts: Int, + nextAttempt: LocalDateTime, + completed: Boolean, + dependencyKind: Option[String], + dependencyTag: Option[String]) { + + def dependency: Option[Dependency] = { + dependencyKind.zip(dependencyTag) + .headOption + .map(Function.tupled(Dependency.apply)) + } + + } + + object Item { + + implicit def toPhiString(x: Item): PhiString = { + import x._ + phi"BridgeUploadQueue.Item(id=$id, kind=${Unsafe(kind)}, tag=${Unsafe(tag)}, " + + phi"attempts=${Unsafe(attempts)}, start=$created, nextAttempt=$nextAttempt, completed=$completed, " + + phi"dependency=$dependency)" + } + + def apply(kind: String, tag: String, dependency: Option[Dependency] = None): Item = { + val now = LocalDateTime.now() + + Item( + id = LongId(0), + kind = kind, + tag = tag, + created = now, + attempts = 0, + nextAttempt = now, + completed = false, + dependencyKind = dependency.map(_.kind), + dependencyTag = dependency.map(_.tag) + ) + } + + } + + final case class Dependency(kind: String, tag: String) + + object Dependency { + + implicit def toPhiString(x: Dependency): PhiString = { + import x._ + phi"Dependency(kind=${Unsafe(kind)}, tag=${Unsafe(tag)})" + } + + } + +} + +trait BridgeUploadQueue { + + def add(item: Item): Future[Unit] + + def get(kind: String): Future[Option[Item]] + + def remove(item: LongId[Item]): Future[Unit] + + def tryRetry(item: Item): Future[Option[Item]] + +} diff --git a/src/main/scala/xyz/driver/common/concurrent/BridgeUploadQueueRepositoryAdapter.scala b/src/main/scala/xyz/driver/common/concurrent/BridgeUploadQueueRepositoryAdapter.scala new file mode 100644 index 0000000..c6a2144 --- /dev/null +++ b/src/main/scala/xyz/driver/common/concurrent/BridgeUploadQueueRepositoryAdapter.scala @@ -0,0 +1,136 @@ +package xyz.driver.common.concurrent + +import java.time.LocalDateTime +import java.time.temporal.ChronoUnit + +import xyz.driver.common.concurrent.BridgeUploadQueue.Item +import xyz.driver.common.concurrent.BridgeUploadQueueRepositoryAdapter.Strategy +import xyz.driver.common.db.Transactions +import xyz.driver.common.db.repositories.BridgeUploadQueueRepository +import xyz.driver.common.domain.LongId +import xyz.driver.common.logging._ + +import scala.concurrent.duration.{Duration, FiniteDuration} +import scala.concurrent.{ExecutionContext, Future} + +object BridgeUploadQueueRepositoryAdapter { + + sealed trait Strategy { + + def onComplete: Strategy.OnComplete + + def on(attempt: Int): Strategy.OnAttempt + + } + + object Strategy { + + /** + * Works forever, but has a limit for intervals. + */ + final case class LimitExponential(startInterval: FiniteDuration, + intervalFactor: Double, + maxInterval: FiniteDuration, + onComplete: OnComplete) extends Strategy { + + override def on(attempt: Int): OnAttempt = { + OnAttempt.Continue(intervalFor(attempt).min(maxInterval)) + } + + private def intervalFor(attempt: Int): Duration = { + startInterval * Math.pow(intervalFactor, attempt.toDouble) + } + } + + /** + * Used only in tests. + */ + case object Ignore extends Strategy { + + override val onComplete = OnComplete.Delete + + override def on(attempt: Int) = OnAttempt.Complete + + } + + /** + * Used only in tests. + */ + final case class Constant(interval: FiniteDuration) extends Strategy { + + override val onComplete = OnComplete.Delete + + override def on(attempt: Int) = OnAttempt.Continue(interval) + + } + + sealed trait OnComplete + object OnComplete { + case object Delete extends OnComplete + case object Mark extends OnComplete + + implicit def toPhiString(x: OnAttempt): PhiString = Unsafe(x.toString) + } + + sealed trait OnAttempt + object OnAttempt { + case object Complete extends OnAttempt + case class Continue(interval: Duration) extends OnAttempt + + implicit def toPhiString(x: OnAttempt): PhiString = Unsafe(x.toString) + } + } +} + +class BridgeUploadQueueRepositoryAdapter(strategy: Strategy, + repository: BridgeUploadQueueRepository, + transactions: Transactions) + (implicit executionContext: ExecutionContext) + extends BridgeUploadQueue with PhiLogging { + + override def add(item: Item): Future[Unit] = transactions.run { _ => + repository.add(item) + } + + override def get(kind: String): Future[Option[Item]] = { + repository.getOne(kind) + } + + override def remove(item: LongId[Item]): Future[Unit] = transactions.run { _ => + import Strategy.OnComplete._ + + strategy.onComplete match { + case Delete => repository.delete(item) + case Mark => + repository.getById(item) match { + case Some(x) => repository.update(x.copy(completed = true)) + case None => throw new RuntimeException(s"Can not find the $item task") + } + } + } + + override def tryRetry(item: Item): Future[Option[Item]] = transactions.run { _ => + import Strategy.OnAttempt._ + + logger.trace(phi"tryRetry($item)") + + val newAttempts = item.attempts + 1 + val action = strategy.on(newAttempts) + logger.debug(phi"Action for ${Unsafe(newAttempts)}: $action") + + action match { + case Continue(newInterval) => + val draftItem = item.copy( + attempts = newAttempts, + nextAttempt = LocalDateTime.now().plus(newInterval.toMillis, ChronoUnit.MILLIS) + ) + + logger.debug(draftItem) + Some(repository.update(draftItem)) + + case Complete => + repository.delete(item.id) + None + } + } +} diff --git a/src/main/scala/xyz/driver/common/concurrent/Cron.scala b/src/main/scala/xyz/driver/common/concurrent/Cron.scala new file mode 100644 index 0000000..9dd3155 --- /dev/null +++ b/src/main/scala/xyz/driver/common/concurrent/Cron.scala @@ -0,0 +1,97 @@ +package xyz.driver.common.concurrent + +import java.io.Closeable +import java.util.concurrent.ConcurrentHashMap +import java.util.concurrent.atomic.AtomicBoolean +import java.util.{Timer, TimerTask} + +import com.typesafe.scalalogging.StrictLogging +import org.slf4j.MDC +import xyz.driver.common.error.ExceptionFormatter +import xyz.driver.common.utils.RandomUtils + +import scala.concurrent.duration.FiniteDuration +import scala.concurrent.{ExecutionContext, Future} +import scala.util.{Failure, Success, Try} + +class Cron(settings: Cron.Settings) extends Closeable with StrictLogging { + + import Cron._ + + private val timer = new Timer("cronTimer", true) + + private val jobs = ConcurrentHashMap.newKeySet[String]() + + def register(name: String)(job: () => Future[Unit])(implicit ec: ExecutionContext): Unit = { + logger.trace("register({})", name) + val disableList = settings.disable.split(",").map(_.trim).toList + if (disableList.contains(name)) logger.info("The task '{}' is disabled", name) + else { + settings.intervals.get(name) match { + case None => + logger.error("Can not find an interval for task '{}', check the settings", name) + throw new IllegalArgumentException(s"Can not find an interval for task '$name', check the settings") + + case Some(period) => + logger.info("register a new task '{}' with a period of {}ms", name, period.toMillis.asInstanceOf[AnyRef]) + timer.schedule(new SingletonTask(name, job), 0, period.toMillis) + } + } + + jobs.add(name) + } + + /** + * Checks unused jobs + */ + def verify(): Unit = { + import scala.collection.JavaConversions.asScalaSet + + val unusedJobs = settings.intervals.keySet -- jobs.toSet + unusedJobs.foreach { job => + logger.warn(s"The job '$job' is listed, but not registered or ignored") + } + } + + override def close(): Unit = { + timer.cancel() + } + +} + +object Cron { + + case class Settings(disable: String, intervals: Map[String, FiniteDuration]) + + private class SingletonTask(taskName: String, + job: () => Future[Unit]) + (implicit ec: ExecutionContext) + extends TimerTask with StrictLogging { + + private val isWorking = new AtomicBoolean(false) + + override def run(): Unit = { + if (isWorking.compareAndSet(false, true)) { + MDC.put("userId", "cron") + MDC.put("requestId", RandomUtils.randomString(15)) + + logger.info("Start '{}'", taskName) + Try { + job() + .andThen { + case Success(_) => logger.info("'{}' is completed", taskName) + case Failure(e) => logger.error(s"Job '{}' is failed: ${ExceptionFormatter.format(e)}", taskName) + } + .onComplete(_ => isWorking.set(false)) + } match { + case Success(_) => + case Failure(e) => + logger.error("Can't start '{}'", taskName, e) + } + } else { + logger.debug("The previous job '{}' is in progress", taskName) + } + } + } + +} diff --git a/src/main/scala/xyz/driver/common/concurrent/InMemoryBridgeUploadQueue.scala b/src/main/scala/xyz/driver/common/concurrent/InMemoryBridgeUploadQueue.scala new file mode 100644 index 0000000..b19be42 --- /dev/null +++ b/src/main/scala/xyz/driver/common/concurrent/InMemoryBridgeUploadQueue.scala @@ -0,0 +1,38 @@ +package xyz.driver.common.concurrent + +import java.util.concurrent.LinkedBlockingQueue + +import xyz.driver.common.concurrent.BridgeUploadQueue.Item +import xyz.driver.common.domain.LongId +import xyz.driver.common.logging.PhiLogging + +import scala.collection.JavaConverters._ +import scala.concurrent.Future + +/** + * Use it only for tests + */ +class InMemoryBridgeUploadQueue extends BridgeUploadQueue with PhiLogging { + + private val queue = new LinkedBlockingQueue[Item]() + + override def add(item: Item): Future[Unit] = { + queue.add(item) + done + } + + override def tryRetry(item: Item): Future[Option[Item]] = Future.successful(Some(item)) + + override def get(kind: String): Future[Option[Item]] = { + val r = queue.iterator().asScala.find(_.kind == kind) + Future.successful(r) + } + + override def remove(item: LongId[Item]): Future[Unit] = { + queue.remove(item) + done + } + + private val done = Future.successful(()) + +} diff --git a/src/main/scala/xyz/driver/common/concurrent/MdcExecutionContext.scala b/src/main/scala/xyz/driver/common/concurrent/MdcExecutionContext.scala new file mode 100644 index 0000000..cd2b394 --- /dev/null +++ b/src/main/scala/xyz/driver/common/concurrent/MdcExecutionContext.scala @@ -0,0 +1,35 @@ +package xyz.driver.common.concurrent + +import org.slf4j.MDC + +import scala.concurrent.{ExecutionContext, ExecutionContextExecutor} + +object MdcExecutionContext { + def from(orig: ExecutionContext): ExecutionContext = new MdcExecutionContext(orig) +} + +class MdcExecutionContext(orig: ExecutionContext) extends ExecutionContextExecutor { + + def execute(runnable: Runnable): Unit = { + val parentMdcContext = MDC.getCopyOfContextMap + + orig.execute(new Runnable { + def run(): Unit = { + val saveMdcContext = MDC.getCopyOfContextMap + setContextMap(parentMdcContext) + + try { + runnable.run() + } finally { + setContextMap(saveMdcContext) + } + } + }) + } + + private[this] def setContextMap(context: java.util.Map[String, String]): Unit = + Option(context).fold(MDC.clear())(MDC.setContextMap) + + def reportFailure(t: Throwable): Unit = orig.reportFailure(t) + +} diff --git a/src/main/scala/xyz/driver/common/concurrent/MdcThreadFactory.scala b/src/main/scala/xyz/driver/common/concurrent/MdcThreadFactory.scala new file mode 100644 index 0000000..9e59a64 --- /dev/null +++ b/src/main/scala/xyz/driver/common/concurrent/MdcThreadFactory.scala @@ -0,0 +1,33 @@ +package xyz.driver.common.concurrent + +import java.util.concurrent.ThreadFactory + +import org.slf4j.MDC + +object MdcThreadFactory { + def from(orig: ThreadFactory): ThreadFactory = new MdcThreadFactory(orig) +} + +class MdcThreadFactory(orig: ThreadFactory) extends ThreadFactory { + + override def newThread(runnable: Runnable): Thread = { + val parentMdcContext = MDC.getCopyOfContextMap + + orig.newThread(new Runnable { + def run(): Unit = { + val saveMdcContext = MDC.getCopyOfContextMap + setContextMap(parentMdcContext) + + try { + runnable.run() + } finally { + setContextMap(saveMdcContext) + } + } + }) + } + + private[this] def setContextMap(context: java.util.Map[String, String]): Unit = + Option(context).fold(MDC.clear())(MDC.setContextMap) + +} |