1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
|
package xyz.driver.pdsuicommon.concurrent
import java.io.Closeable
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicBoolean
import java.util.{Timer, TimerTask}
import com.typesafe.scalalogging.StrictLogging
import org.slf4j.MDC
import xyz.driver.pdsuicommon.error.ExceptionFormatter
import xyz.driver.pdsuicommon.utils.RandomUtils
import scala.concurrent.duration.FiniteDuration
import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success, Try}
/**
  * Schedules named periodic jobs on a single daemon `java.util.Timer` thread.
  *
  * The period of each job is looked up by name in `settings.intervals`; names
  * listed in the comma-separated `settings.disable` string are not scheduled.
  *
  * @param settings the disabled-task list and the per-task intervals
  */
class Cron(settings: Cron.Settings) extends Closeable with StrictLogging {

  import Cron._

  // A single daemon timer thread (second argument = isDaemon) drives every registered task.
  private val timer = new Timer("cronTimer", true)

  // Names that went through register() without an exception, including disabled ones.
  private val jobs = ConcurrentHashMap.newKeySet[String]()

  /**
    * Registers a job under `name` and schedules it unless the name is disabled.
    *
    * The first run is scheduled with a zero delay, subsequent runs every
    * configured period. A disabled name is only recorded as known, so that
    * [[verify]] does not report it.
    *
    * @param name the task name, used as a key in the settings
    * @param job  a factory that starts one run of the task
    * @param ec   the execution context used for the completion callbacks of the job
    * @throws IllegalArgumentException if the task is not disabled and `settings.intervals` has no entry for it
    */
  def register(name: String)(job: () => Future[Unit])(implicit ec: ExecutionContext): Unit = {
    logger.trace("register({})", name)

    val disableList = settings.disable.split(",").map(_.trim).toList
    if (disableList.contains(name)) logger.info("The task '{}' is disabled", name)
    else {
      settings.intervals.get(name) match {
        case None =>
          logger.error("Can not find an interval for task '{}', check the settings", name)
          throw new IllegalArgumentException(s"Can not find an interval for task '$name', check the settings")

        case Some(period) =>
          logger.info("register a new task '{}' with a period of {}ms", name, period.toMillis.asInstanceOf[AnyRef])
          timer.schedule(new SingletonTask(name, job), 0, period.toMillis)
      }
    }

    // Reached for both scheduled and disabled tasks; skipped when the lookup above throws.
    jobs.add(name)
  }

  /**
    * Logs a warning for every task that has a configured interval but was never
    * passed to [[register]].
    */
  def verify(): Unit = {
    // Membership is tested directly on the Java set, so no implicit
    // Java-to-Scala collection conversion is needed.
    val unusedJobs = settings.intervals.keySet.filterNot(name => jobs.contains(name))

    unusedJobs.foreach { job =>
      logger.warn(s"The job '$job' is listed, but not registered or ignored")
    }
  }

  /** Cancels the underlying timer, discarding its scheduled tasks. */
  override def close(): Unit = {
    timer.cancel()
  }
}
object Cron {

  /**
    * Configuration of the scheduler.
    *
    * @param disable   comma-separated names of tasks that must not be scheduled
    * @param intervals the period of each task, keyed by task name
    */
  case class Settings(disable: String, intervals: Map[String, FiniteDuration])

  /**
    * A timer task that runs at most one instance of `job` at a time.
    *
    * A tick that arrives while the previous run is still in progress is skipped.
    *
    * @param taskName the name used in log messages
    * @param job      a factory that starts one run of the task
    * @param ec       the execution context for the completion callbacks
    */
  private class SingletonTask(taskName: String, job: () => Future[Unit])(implicit ec: ExecutionContext)
      extends TimerTask
      with StrictLogging {

    // true from the moment a run is claimed until its future completes (or fails to start).
    private val isWorking = new AtomicBoolean(false)

    override def run(): Unit = {
      // compareAndSet claims the run atomically; a losing tick goes to the else branch.
      if (isWorking.compareAndSet(false, true)) {
        MDC.put("userId", "cron")
        MDC.put("requestId", RandomUtils.randomString(15))

        logger.info("Start '{}'", taskName)
        Try {
          job()
            .andThen {
              case Success(_) => logger.info("'{}' is completed", taskName)
              case Failure(e) => logger.error(s"Job '{}' is failed: ${ExceptionFormatter.format(e)}", taskName)
            }
            .onComplete(_ => isWorking.set(false))
        } match {
          case Success(_) =>
          case Failure(e) =>
            // The job threw before its completion callback could take over the
            // flag; release it here, otherwise every later tick would be skipped.
            isWorking.set(false)
            logger.error("Can't start '{}'", taskName, e)
        }
      } else {
        logger.debug("The previous job '{}' is in progress", taskName)
      }
    }
  }
}
|