aboutsummaryrefslogtreecommitdiff
path: root/src/main/scala/xyz/driver/pdsuicommon/concurrent/Cron.scala
blob: 66590889dedc990f650ced07b324968fa5cc1a08 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
package xyz.driver.pdsuicommon.concurrent

import java.io.Closeable
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicBoolean
import java.util.{Timer, TimerTask}

import com.typesafe.scalalogging.StrictLogging
import org.slf4j.MDC
import xyz.driver.pdsuicommon.error.ExceptionFormatter
import xyz.driver.pdsuicommon.utils.RandomUtils

import scala.concurrent.duration.FiniteDuration
import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success, Try}

class Cron(settings: Cron.Settings) extends Closeable with StrictLogging {

  import Cron._

  private val timer = new Timer("cronTimer", true)

  private val jobs = ConcurrentHashMap.newKeySet[String]()

  def register(name: String)(job: () => Future[Unit])(implicit ec: ExecutionContext): Unit = {
    logger.trace("register({})", name)
    val disableList = settings.disable.split(",").map(_.trim).toList
    if (disableList.contains(name)) logger.info("The task '{}' is disabled", name)
    else {
      settings.intervals.get(name) match {
        case None =>
          logger.error("Can not find an interval for task '{}', check the settings", name)
          throw new IllegalArgumentException(s"Can not find an interval for task '$name', check the settings")

        case Some(period) =>
          logger.info("register a new task '{}' with a period of {}ms", name, period.toMillis.asInstanceOf[AnyRef])
          timer.schedule(new SingletonTask(name, job), 0, period.toMillis)
      }
    }

    jobs.add(name)
  }

  /**
    * Checks unused jobs
    */
  def verify(): Unit = {
    import scala.collection.JavaConverters._

    val unusedJobs = settings.intervals.keySet -- jobs.asScala.toSet
    unusedJobs.foreach { job =>
      logger.warn(s"The job '$job' is listed, but not registered or ignored")
    }
  }

  override def close(): Unit = {
    timer.cancel()
  }
}

object Cron {

  final case class Settings(disable: String, intervals: Map[String, FiniteDuration])

  private class SingletonTask(taskName: String, job: () => Future[Unit])(implicit ec: ExecutionContext)
      extends TimerTask with StrictLogging {

    private val isWorking = new AtomicBoolean(false)

    override def run(): Unit = {
      if (isWorking.compareAndSet(false, true)) {
        MDC.put("userId", "cron")
        MDC.put("requestId", RandomUtils.randomString(15))

        logger.info("Start '{}'", taskName)
        Try {
          job()
            .andThen {
              case Success(_) => logger.info("'{}' is completed", taskName)
              case Failure(e) => logger.error(s"Job '{}' is failed: ${ExceptionFormatter.format(e)}", taskName)
            }
            .onComplete(_ => isWorking.set(false))
        } match {
          case Success(_) =>
          case Failure(e) =>
            logger.error("Can't start '{}'", taskName, e)
        }
      } else {
        logger.debug("The previous job '{}' is in progress", taskName)
      }
    }
  }
}