diff options
author | vlad <vlad@driver.xyz> | 2017-06-13 16:12:20 -0700 |
---|---|---|
committer | vlad <vlad@driver.xyz> | 2017-06-13 16:12:20 -0700 |
commit | cd1b635b2ae90d9ac2d8b1779183a1fbd8c5fd5c (patch) | |
tree | 062e8dad1a1513e26b0fd08b1742d6ff2ee874f7 /src/main/scala/xyz/driver/common/concurrent | |
parent | 0000a65ab4479a2a40e2d6468036438e9705b4aa (diff) | |
download | rest-query-cd1b635b2ae90d9ac2d8b1779183a1fbd8c5fd5c.tar.gz rest-query-cd1b635b2ae90d9ac2d8b1779183a1fbd8c5fd5c.tar.bz2 rest-query-cd1b635b2ae90d9ac2d8b1779183a1fbd8c5fd5c.zip |
Adding domain entitiesv0.1.0
Diffstat (limited to 'src/main/scala/xyz/driver/common/concurrent')
6 files changed, 0 insertions, 427 deletions
diff --git a/src/main/scala/xyz/driver/common/concurrent/BridgeUploadQueue.scala b/src/main/scala/xyz/driver/common/concurrent/BridgeUploadQueue.scala deleted file mode 100644 index 6ecb299..0000000 --- a/src/main/scala/xyz/driver/common/concurrent/BridgeUploadQueue.scala +++ /dev/null @@ -1,88 +0,0 @@ -package xyz.driver.common.concurrent - -import java.time.LocalDateTime - -import xyz.driver.common.concurrent.BridgeUploadQueue.Item -import xyz.driver.common.domain.LongId -import xyz.driver.common.logging._ - -import scala.concurrent.Future - -object BridgeUploadQueue { - - /** - * @param kind For example documents - * @param tag For example, a patient's id: 1 - * @param attempts Which attempt - * @param created When the task was created - * @param nextAttempt Time of the next attempt - */ - final case class Item(id: LongId[Item], - kind: String, - tag: String, - created: LocalDateTime, - attempts: Int, - nextAttempt: LocalDateTime, - completed: Boolean, - dependencyKind: Option[String], - dependencyTag: Option[String]) { - - def dependency: Option[Dependency] = { - dependencyKind.zip(dependencyTag) - .headOption - .map(Function.tupled(Dependency.apply)) - } - - } - - object Item { - - implicit def toPhiString(x: Item): PhiString = { - import x._ - phi"BridgeUploadQueue.Item(id=$id, kind=${Unsafe(kind)}, tag=${Unsafe(tag)}, " + - phi"attempts=${Unsafe(attempts)}, start=$created, nextAttempt=$nextAttempt, completed=$completed, " + - phi"dependency=$dependency)" - } - - def apply(kind: String, tag: String, dependency: Option[Dependency] = None): Item = { - val now = LocalDateTime.now() - - Item( - id = LongId(0), - kind = kind, - tag = tag, - created = now, - attempts = 0, - nextAttempt = now, - completed = false, - dependencyKind = dependency.map(_.kind), - dependencyTag = dependency.map(_.tag) - ) - } - - } - - final case class Dependency(kind: String, tag: String) - - object Dependency { - - implicit def toPhiString(x: Dependency): PhiString = { - import x._ - phi"Dependency(kind=${Unsafe(kind)}, tag=${Unsafe(tag)})" - } - - } - -} - -trait BridgeUploadQueue { - - def add(item: Item): Future[Unit] - - def get(kind: String): Future[Option[Item]] - - def remove(item: LongId[Item]): Future[Unit] - - def tryRetry(item: Item): Future[Option[Item]] - -} diff --git a/src/main/scala/xyz/driver/common/concurrent/BridgeUploadQueueRepositoryAdapter.scala b/src/main/scala/xyz/driver/common/concurrent/BridgeUploadQueueRepositoryAdapter.scala deleted file mode 100644 index c6a2144..0000000 --- a/src/main/scala/xyz/driver/common/concurrent/BridgeUploadQueueRepositoryAdapter.scala +++ /dev/null @@ -1,136 +0,0 @@ -package xyz.driver.common.concurrent - -import java.time.LocalDateTime -import java.time.temporal.ChronoUnit - -import xyz.driver.common.concurrent.BridgeUploadQueue.Item -import xyz.driver.common.concurrent.BridgeUploadQueueRepositoryAdapter.Strategy -import xyz.driver.common.db.Transactions -import xyz.driver.common.db.repositories.BridgeUploadQueueRepository -import xyz.driver.common.domain.LongId -import xyz.driver.common.logging._ - -import scala.concurrent.duration.{Duration, FiniteDuration} -import scala.concurrent.{ExecutionContext, Future} - -object BridgeUploadQueueRepositoryAdapter { - - sealed trait Strategy { - - def onComplete: Strategy.OnComplete - - def on(attempt: Int): Strategy.OnAttempt - - } - - object Strategy { - - /** - * Works forever, but has a limit for intervals. - */ - final case class LimitExponential(startInterval: FiniteDuration, - intervalFactor: Double, - maxInterval: FiniteDuration, - onComplete: OnComplete) extends Strategy { - - override def on(attempt: Int): OnAttempt = { - OnAttempt.Continue(intervalFor(attempt).min(maxInterval)) - } - - private def intervalFor(attempt: Int): Duration = { - startInterval * Math.pow(intervalFactor, attempt.toDouble) - } - } - - /** - * Used only in tests. - */ - case object Ignore extends Strategy { - - override val onComplete = OnComplete.Delete - - override def on(attempt: Int) = OnAttempt.Complete - - } - - /** - * Used only in tests. - */ - final case class Constant(interval: FiniteDuration) extends Strategy { - - override val onComplete = OnComplete.Delete - - override def on(attempt: Int) = OnAttempt.Continue(interval) - - } - - sealed trait OnComplete - object OnComplete { - case object Delete extends OnComplete - case object Mark extends OnComplete - - implicit def toPhiString(x: OnAttempt): PhiString = Unsafe(x.toString) - } - - sealed trait OnAttempt - object OnAttempt { - case object Complete extends OnAttempt - case class Continue(interval: Duration) extends OnAttempt - - implicit def toPhiString(x: OnAttempt): PhiString = Unsafe(x.toString) - } - } -} - -class BridgeUploadQueueRepositoryAdapter(strategy: Strategy, - repository: BridgeUploadQueueRepository, - transactions: Transactions) - (implicit executionContext: ExecutionContext) - extends BridgeUploadQueue with PhiLogging { - - override def add(item: Item): Future[Unit] = transactions.run { _ => - repository.add(item) - } - - override def get(kind: String): Future[Option[Item]] = { - repository.getOne(kind) - } - - override def remove(item: LongId[Item]): Future[Unit] = transactions.run { _ => - import Strategy.OnComplete._ - - strategy.onComplete match { - case Delete => repository.delete(item) - case Mark => - repository.getById(item) match { - case Some(x) => repository.update(x.copy(completed = true)) - case None => throw new RuntimeException(s"Can not find the $item task") - } - } - } - - override def tryRetry(item: Item): Future[Option[Item]] = transactions.run { _ => - import Strategy.OnAttempt._ - - logger.trace(phi"tryRetry($item)") - - val newAttempts = item.attempts + 1 - val action = strategy.on(newAttempts) - logger.debug(phi"Action for ${Unsafe(newAttempts)}: $action") - - action match { - case Continue(newInterval) => - val draftItem = item.copy( - attempts = newAttempts, - nextAttempt = LocalDateTime.now().plus(newInterval.toMillis, ChronoUnit.MILLIS) - ) - - logger.debug(draftItem) - Some(repository.update(draftItem)) - - case Complete => - repository.delete(item.id) - None - } - } -} diff --git a/src/main/scala/xyz/driver/common/concurrent/Cron.scala b/src/main/scala/xyz/driver/common/concurrent/Cron.scala deleted file mode 100644 index 9dd3155..0000000 --- a/src/main/scala/xyz/driver/common/concurrent/Cron.scala +++ /dev/null @@ -1,97 +0,0 @@ -package xyz.driver.common.concurrent - -import java.io.Closeable -import java.util.concurrent.ConcurrentHashMap -import java.util.concurrent.atomic.AtomicBoolean -import java.util.{Timer, TimerTask} - -import com.typesafe.scalalogging.StrictLogging -import org.slf4j.MDC -import xyz.driver.common.error.ExceptionFormatter -import xyz.driver.common.utils.RandomUtils - -import scala.concurrent.duration.FiniteDuration -import scala.concurrent.{ExecutionContext, Future} -import scala.util.{Failure, Success, Try} - -class Cron(settings: Cron.Settings) extends Closeable with StrictLogging { - - import Cron._ - - private val timer = new Timer("cronTimer", true) - - private val jobs = ConcurrentHashMap.newKeySet[String]() - - def register(name: String)(job: () => Future[Unit])(implicit ec: ExecutionContext): Unit = { - logger.trace("register({})", name) - val disableList = settings.disable.split(",").map(_.trim).toList - if (disableList.contains(name)) logger.info("The task '{}' is disabled", name) - else { - settings.intervals.get(name) match { - case None => - logger.error("Can not find an interval for task '{}', check the settings", name) - throw new IllegalArgumentException(s"Can not find an interval for task '$name', check the settings") - - case Some(period) => - logger.info("register a new task '{}' with a period of {}ms", name, period.toMillis.asInstanceOf[AnyRef]) - timer.schedule(new SingletonTask(name, job), 0, period.toMillis) - } - } - - jobs.add(name) - } - - /** - * Checks unused jobs - */ - def verify(): Unit = { - import scala.collection.JavaConversions.asScalaSet - - val unusedJobs = settings.intervals.keySet -- jobs.toSet - unusedJobs.foreach { job => - logger.warn(s"The job '$job' is listed, but not registered or ignored") - } - } - - override def close(): Unit = { - timer.cancel() - } - -} - -object Cron { - - case class Settings(disable: String, intervals: Map[String, FiniteDuration]) - - private class SingletonTask(taskName: String, - job: () => Future[Unit]) - (implicit ec: ExecutionContext) - extends TimerTask with StrictLogging { - - private val isWorking = new AtomicBoolean(false) - - override def run(): Unit = { - if (isWorking.compareAndSet(false, true)) { - MDC.put("userId", "cron") - MDC.put("requestId", RandomUtils.randomString(15)) - - logger.info("Start '{}'", taskName) - Try { - job() - .andThen { - case Success(_) => logger.info("'{}' is completed", taskName) - case Failure(e) => logger.error(s"Job '{}' is failed: ${ExceptionFormatter.format(e)}", taskName) - } - .onComplete(_ => isWorking.set(false)) - } match { - case Success(_) => - case Failure(e) => - logger.error("Can't start '{}'", taskName, e) - } - } else { - logger.debug("The previous job '{}' is in progress", taskName) - } - } - } - -} diff --git a/src/main/scala/xyz/driver/common/concurrent/InMemoryBridgeUploadQueue.scala b/src/main/scala/xyz/driver/common/concurrent/InMemoryBridgeUploadQueue.scala deleted file mode 100644 index b19be42..0000000 --- a/src/main/scala/xyz/driver/common/concurrent/InMemoryBridgeUploadQueue.scala +++ /dev/null @@ -1,38 +0,0 @@ -package xyz.driver.common.concurrent - -import java.util.concurrent.LinkedBlockingQueue - -import xyz.driver.common.concurrent.BridgeUploadQueue.Item -import xyz.driver.common.domain.LongId -import xyz.driver.common.logging.PhiLogging - -import scala.collection.JavaConverters._ -import scala.concurrent.Future - -/** - * Use it only for tests - */ -class InMemoryBridgeUploadQueue extends BridgeUploadQueue with PhiLogging { - - private val queue = new LinkedBlockingQueue[Item]() - - override def add(item: Item): Future[Unit] = { - queue.add(item) - done - } - - override def tryRetry(item: Item): Future[Option[Item]] = Future.successful(Some(item)) - - override def get(kind: String): Future[Option[Item]] = { - val r = queue.iterator().asScala.find(_.kind == kind) - Future.successful(r) - } - - override def remove(item: LongId[Item]): Future[Unit] = { - queue.remove(item) - done - } - - private val done = Future.successful(()) - -} diff --git a/src/main/scala/xyz/driver/common/concurrent/MdcExecutionContext.scala b/src/main/scala/xyz/driver/common/concurrent/MdcExecutionContext.scala deleted file mode 100644 index cd2b394..0000000 --- a/src/main/scala/xyz/driver/common/concurrent/MdcExecutionContext.scala +++ /dev/null @@ -1,35 +0,0 @@ -package xyz.driver.common.concurrent - -import org.slf4j.MDC - -import scala.concurrent.{ExecutionContext, ExecutionContextExecutor} - -object MdcExecutionContext { - def from(orig: ExecutionContext): ExecutionContext = new MdcExecutionContext(orig) -} - -class MdcExecutionContext(orig: ExecutionContext) extends ExecutionContextExecutor { - - def execute(runnable: Runnable): Unit = { - val parentMdcContext = MDC.getCopyOfContextMap - - orig.execute(new Runnable { - def run(): Unit = { - val saveMdcContext = MDC.getCopyOfContextMap - setContextMap(parentMdcContext) - - try { - runnable.run() - } finally { - setContextMap(saveMdcContext) - } - } - }) - } - - private[this] def setContextMap(context: java.util.Map[String, String]): Unit = - Option(context).fold(MDC.clear())(MDC.setContextMap) - - def reportFailure(t: Throwable): Unit = orig.reportFailure(t) - -} diff --git a/src/main/scala/xyz/driver/common/concurrent/MdcThreadFactory.scala b/src/main/scala/xyz/driver/common/concurrent/MdcThreadFactory.scala deleted file mode 100644 index 9e59a64..0000000 --- a/src/main/scala/xyz/driver/common/concurrent/MdcThreadFactory.scala +++ /dev/null @@ -1,33 +0,0 @@ -package xyz.driver.common.concurrent - -import java.util.concurrent.ThreadFactory - -import org.slf4j.MDC - -object MdcThreadFactory { - def from(orig: ThreadFactory): ThreadFactory = new MdcThreadFactory(orig) -} - -class MdcThreadFactory(orig: ThreadFactory) extends ThreadFactory { - - override def newThread(runnable: Runnable): Thread = { - val parentMdcContext = MDC.getCopyOfContextMap - - orig.newThread(new Runnable { - def run(): Unit = { - val saveMdcContext = MDC.getCopyOfContextMap - setContextMap(parentMdcContext) - - try { - runnable.run() - } finally { - setContextMap(saveMdcContext) - } - } - }) - } - - private[this] def setContextMap(context: java.util.Map[String, String]): Unit = - Option(context).fold(MDC.clear())(MDC.setContextMap) - -} |