path: root/kamon-core/src/main/scala/kamon/metric/instrument/RefreshScheduler.scala
blob: adb08713c6896f609237f5e002cfe213c1ff0eea (plain) (tree)

package kamon.metric.instrument

import akka.actor.{ Scheduler, Cancellable }
import org.HdrHistogram.WriterReaderPhaser

import scala.collection.concurrent.TrieMap
import scala.concurrent.ExecutionContext
import scala.concurrent.duration.FiniteDuration

trait RefreshScheduler {
  def schedule(interval: FiniteDuration, refresh: ()  Unit): Cancellable

 *  Default implementation of RefreshScheduler that simply uses an [[akka.actor.Scheduler]] to schedule tasks to be run
 *  in the provided ExecutionContext.
class DefaultRefreshScheduler(scheduler: Scheduler, dispatcher: ExecutionContext) extends RefreshScheduler {
  def schedule(interval: FiniteDuration, refresh: ()  Unit): Cancellable =
    scheduler.schedule(interval, interval)(refresh.apply())(dispatcher)

object DefaultRefreshScheduler {
  def apply(scheduler: Scheduler, dispatcher: ExecutionContext): RefreshScheduler =
    new DefaultRefreshScheduler(scheduler, dispatcher)

  def create(scheduler: Scheduler, dispatcher: ExecutionContext): RefreshScheduler =
    apply(scheduler, dispatcher)

 *  RefreshScheduler implementation that accumulates all the scheduled actions until it is pointed to another refresh
 *  scheduler. Once it is pointed, all subsequent calls to `schedule` will immediately be scheduled in the pointed
 *  scheduler.
class LazyRefreshScheduler extends RefreshScheduler {
  private val _schedulerPhaser = new WriterReaderPhaser
  private val _backlog = new TrieMap[(FiniteDuration, ()  Unit), RepointableCancellable]()
  @volatile private var _target: Option[RefreshScheduler] = None

  def schedule(interval: FiniteDuration, refresh: ()  Unit): Cancellable = {
    val criticalEnter = _schedulerPhaser.writerCriticalSectionEnter()
    try {
      _target.map { scheduler 
        scheduler.schedule(interval, refresh)

      } getOrElse {
        val entry = (interval, refresh)
        val cancellable = new RepointableCancellable(entry)

        _backlog.put(entry, cancellable)

    } finally {

  def point(target: RefreshScheduler): Unit = try {

    if (_target.isEmpty) {
      _target = Some(target)
      _backlog.dropWhile {
        case ((interval, refresh), repointableCancellable) 
          repointableCancellable.point(target.schedule(interval, refresh))
    } else sys.error("A LazyRefreshScheduler cannot be pointed more than once.")
  } finally { _schedulerPhaser.readerUnlock() }

  class RepointableCancellable(entry: (FiniteDuration, ()  Unit)) extends Cancellable {
    private var _isCancelled = false
    private var _cancellable: Option[Cancellable] = None

    def isCancelled: Boolean = synchronized {

    def cancel(): Boolean = synchronized {
      _isCancelled = true

    def point(cancellable: Cancellable): Unit = synchronized {
      if (_cancellable.isEmpty) {
        _cancellable = Some(cancellable)

        if (_isCancelled)

      } else sys.error("A RepointableCancellable cannot be pointed more than once.")
