diff options
Diffstat (limited to 'src/main/scala/xyz/driver/common/concurrent/Cron.scala')
-rw-r--r-- | src/main/scala/xyz/driver/common/concurrent/Cron.scala | 97 |
1 files changed, 97 insertions, 0 deletions
diff --git a/src/main/scala/xyz/driver/common/concurrent/Cron.scala b/src/main/scala/xyz/driver/common/concurrent/Cron.scala new file mode 100644 index 0000000..9dd3155 --- /dev/null +++ b/src/main/scala/xyz/driver/common/concurrent/Cron.scala @@ -0,0 +1,97 @@ +package xyz.driver.common.concurrent + +import java.io.Closeable +import java.util.concurrent.ConcurrentHashMap +import java.util.concurrent.atomic.AtomicBoolean +import java.util.{Timer, TimerTask} + +import com.typesafe.scalalogging.StrictLogging +import org.slf4j.MDC +import xyz.driver.common.error.ExceptionFormatter +import xyz.driver.common.utils.RandomUtils + +import scala.concurrent.duration.FiniteDuration +import scala.concurrent.{ExecutionContext, Future} +import scala.util.{Failure, Success, Try} + +class Cron(settings: Cron.Settings) extends Closeable with StrictLogging { + + import Cron._ + + private val timer = new Timer("cronTimer", true) + + private val jobs = ConcurrentHashMap.newKeySet[String]() + + def register(name: String)(job: () => Future[Unit])(implicit ec: ExecutionContext): Unit = { + logger.trace("register({})", name) + val disableList = settings.disable.split(",").map(_.trim).toList + if (disableList.contains(name)) logger.info("The task '{}' is disabled", name) + else { + settings.intervals.get(name) match { + case None => + logger.error("Can not find an interval for task '{}', check the settings", name) + throw new IllegalArgumentException(s"Can not find an interval for task '$name', check the settings") + + case Some(period) => + logger.info("register a new task '{}' with a period of {}ms", name, period.toMillis.asInstanceOf[AnyRef]) + timer.schedule(new SingletonTask(name, job), 0, period.toMillis) + } + } + + jobs.add(name) + } + + /** + * Checks unused jobs + */ + def verify(): Unit = { + import scala.collection.JavaConversions.asScalaSet + + val unusedJobs = settings.intervals.keySet -- jobs.toSet + unusedJobs.foreach { job => + logger.warn(s"The job '$job' is listed, but not registered or ignored") + } + } + + override def close(): Unit = { + timer.cancel() + } + +} + +object Cron { + + case class Settings(disable: String, intervals: Map[String, FiniteDuration]) + + private class SingletonTask(taskName: String, + job: () => Future[Unit]) + (implicit ec: ExecutionContext) + extends TimerTask with StrictLogging { + + private val isWorking = new AtomicBoolean(false) + + override def run(): Unit = { + if (isWorking.compareAndSet(false, true)) { + MDC.put("userId", "cron") + MDC.put("requestId", RandomUtils.randomString(15)) + + logger.info("Start '{}'", taskName) + Try { + job() + .andThen { + case Success(_) => logger.info("'{}' is completed", taskName) + case Failure(e) => logger.error(s"Job '{}' is failed: ${ExceptionFormatter.format(e)}", taskName) + } + .onComplete(_ => isWorking.set(false)) + } match { + case Success(_) => + case Failure(e) => + logger.error("Can't start '{}'", taskName, e) + } + } else { + logger.debug("The previous job '{}' is in progress", taskName) + } + } + } + +} |