diff options
Diffstat (limited to 'kamon-core/src/main/scala/kamon/metric/instrument/RefreshScheduler.scala')
-rw-r--r-- | kamon-core/src/main/scala/kamon/metric/instrument/RefreshScheduler.scala | 99 |
1 files changed, 99 insertions, 0 deletions
diff --git a/kamon-core/src/main/scala/kamon/metric/instrument/RefreshScheduler.scala b/kamon-core/src/main/scala/kamon/metric/instrument/RefreshScheduler.scala new file mode 100644 index 00000000..adb08713 --- /dev/null +++ b/kamon-core/src/main/scala/kamon/metric/instrument/RefreshScheduler.scala @@ -0,0 +1,99 @@ +package kamon.metric.instrument + +import akka.actor.{ Scheduler, Cancellable } +import org.HdrHistogram.WriterReaderPhaser + +import scala.collection.concurrent.TrieMap +import scala.concurrent.ExecutionContext +import scala.concurrent.duration.FiniteDuration + +trait RefreshScheduler { + def schedule(interval: FiniteDuration, refresh: () ⇒ Unit): Cancellable +} + +/** + * Default implementation of RefreshScheduler that simply uses an [[akka.actor.Scheduler]] to schedule tasks to be run + * in the provided ExecutionContext. + */ +class DefaultRefreshScheduler(scheduler: Scheduler, dispatcher: ExecutionContext) extends RefreshScheduler { + def schedule(interval: FiniteDuration, refresh: () ⇒ Unit): Cancellable = + scheduler.schedule(interval, interval)(refresh.apply())(dispatcher) +} + +object DefaultRefreshScheduler { + def apply(scheduler: Scheduler, dispatcher: ExecutionContext): RefreshScheduler = + new DefaultRefreshScheduler(scheduler, dispatcher) + + def create(scheduler: Scheduler, dispatcher: ExecutionContext): RefreshScheduler = + apply(scheduler, dispatcher) +} + +/** + * RefreshScheduler implementation that accumulates all the scheduled actions until it is pointed to another refresh + * scheduler. Once it is pointed, all subsequent calls to `schedule` will immediately be scheduled in the pointed + * scheduler. + */ +class LazyRefreshScheduler extends RefreshScheduler { + private val _schedulerPhaser = new WriterReaderPhaser + private val _backlog = new TrieMap[(FiniteDuration, () ⇒ Unit), RepointableCancellable]() + @volatile private var _target: Option[RefreshScheduler] = None + + def schedule(interval: FiniteDuration, refresh: () ⇒ Unit): Cancellable = { + val criticalEnter = _schedulerPhaser.writerCriticalSectionEnter() + try { + _target.map { scheduler ⇒ + scheduler.schedule(interval, refresh) + + } getOrElse { + val entry = (interval, refresh) + val cancellable = new RepointableCancellable(entry) + + _backlog.put(entry, cancellable) + cancellable + } + + } finally { + _schedulerPhaser.writerCriticalSectionExit(criticalEnter) + } + } + + def point(target: RefreshScheduler): Unit = try { + _schedulerPhaser.readerLock() + + if (_target.isEmpty) { + _target = Some(target) + _schedulerPhaser.flipPhase(10000L) + _backlog.dropWhile { + case ((interval, refresh), repointableCancellable) ⇒ + repointableCancellable.point(target.schedule(interval, refresh)) + true + } + } else sys.error("A LazyRefreshScheduler cannot be pointed more than once.") + } finally { _schedulerPhaser.readerUnlock() } + + class RepointableCancellable(entry: (FiniteDuration, () ⇒ Unit)) extends Cancellable { + private var _isCancelled = false + private var _cancellable: Option[Cancellable] = None + + def isCancelled: Boolean = synchronized { + _cancellable.map(_.isCancelled).getOrElse(_isCancelled) + } + + def cancel(): Boolean = synchronized { + _isCancelled = true + _cancellable.map(_.cancel()).getOrElse(_backlog.remove(entry).nonEmpty) + } + + def point(cancellable: Cancellable): Unit = synchronized { + if (_cancellable.isEmpty) { + _cancellable = Some(cancellable) + + if (_isCancelled) + cancellable.cancel() + + } else sys.error("A RepointableCancellable cannot be pointed more than once.") + + } + } +} + |