aboutsummaryrefslogtreecommitdiff
path: root/kamon-core/src/main/scala/kamon/metric/instrument
diff options
context:
space:
mode:
authorIvan Topolnjak <ivantopo@gmail.com>2015-02-12 11:30:06 +0100
committerIvan Topolnjak <ivantopo@gmail.com>2015-02-13 05:15:30 +0100
commit07fc83bb01c5873b47aff50d6d3abbb9f11842bd (patch)
tree814c6067a25066978b5cb8bac6541f39d0d4454d /kamon-core/src/main/scala/kamon/metric/instrument
parent82a110b23ca57286e4e3dd0315c20ed99b5e8f88 (diff)
downloadKamon-07fc83bb01c5873b47aff50d6d3abbb9f11842bd.tar.gz
Kamon-07fc83bb01c5873b47aff50d6d3abbb9f11842bd.tar.bz2
Kamon-07fc83bb01c5873b47aff50d6d3abbb9f11842bd.zip
! all: Kamon now works as a single instance in a companion object.
Diffstat (limited to 'kamon-core/src/main/scala/kamon/metric/instrument')
-rw-r--r--kamon-core/src/main/scala/kamon/metric/instrument/Instrument.scala19
-rw-r--r--kamon-core/src/main/scala/kamon/metric/instrument/RefreshScheduler.scala99
2 files changed, 99 insertions, 19 deletions
diff --git a/kamon-core/src/main/scala/kamon/metric/instrument/Instrument.scala b/kamon-core/src/main/scala/kamon/metric/instrument/Instrument.scala
index 68d5c876..59b4b443 100644
--- a/kamon-core/src/main/scala/kamon/metric/instrument/Instrument.scala
+++ b/kamon-core/src/main/scala/kamon/metric/instrument/Instrument.scala
@@ -51,22 +51,3 @@ object CollectionContext {
}
}
-trait RefreshScheduler {
- def schedule(interval: FiniteDuration, refresh: () ⇒ Unit): Cancellable
-}
-
-object RefreshScheduler {
- val NoopScheduler = new RefreshScheduler {
- def schedule(interval: FiniteDuration, refresh: () ⇒ Unit): Cancellable = new Cancellable {
- override def isCancelled: Boolean = true
- override def cancel(): Boolean = true
- }
- }
-
- def apply(scheduler: Scheduler, dispatcher: MessageDispatcher): RefreshScheduler = new RefreshScheduler {
- def schedule(interval: FiniteDuration, refresh: () ⇒ Unit): Cancellable =
- scheduler.schedule(interval, interval)(refresh.apply())(dispatcher)
- }
-
- def create(scheduler: Scheduler, dispatcher: MessageDispatcher): RefreshScheduler = apply(scheduler, dispatcher)
-} \ No newline at end of file
diff --git a/kamon-core/src/main/scala/kamon/metric/instrument/RefreshScheduler.scala b/kamon-core/src/main/scala/kamon/metric/instrument/RefreshScheduler.scala
new file mode 100644
index 00000000..adb08713
--- /dev/null
+++ b/kamon-core/src/main/scala/kamon/metric/instrument/RefreshScheduler.scala
@@ -0,0 +1,99 @@
+package kamon.metric.instrument
+
+import akka.actor.{ Scheduler, Cancellable }
+import org.HdrHistogram.WriterReaderPhaser
+
+import scala.collection.concurrent.TrieMap
+import scala.concurrent.ExecutionContext
+import scala.concurrent.duration.FiniteDuration
+
+trait RefreshScheduler {
+ def schedule(interval: FiniteDuration, refresh: () ⇒ Unit): Cancellable
+}
+
+/**
+ * Default implementation of RefreshScheduler that simply uses an [[akka.actor.Scheduler]] to schedule tasks to be run
+ * in the provided ExecutionContext.
+ */
+class DefaultRefreshScheduler(scheduler: Scheduler, dispatcher: ExecutionContext) extends RefreshScheduler {
+ def schedule(interval: FiniteDuration, refresh: () ⇒ Unit): Cancellable =
+ scheduler.schedule(interval, interval)(refresh.apply())(dispatcher)
+}
+
+object DefaultRefreshScheduler {
+ def apply(scheduler: Scheduler, dispatcher: ExecutionContext): RefreshScheduler =
+ new DefaultRefreshScheduler(scheduler, dispatcher)
+
+ def create(scheduler: Scheduler, dispatcher: ExecutionContext): RefreshScheduler =
+ apply(scheduler, dispatcher)
+}
+
+/**
+ * RefreshScheduler implementation that accumulates all the scheduled actions until it is pointed to another refresh
+ * scheduler. Once it is pointed, all subsequent calls to `schedule` will immediately be scheduled in the pointed
+ * scheduler.
+ */
+class LazyRefreshScheduler extends RefreshScheduler {
+ private val _schedulerPhaser = new WriterReaderPhaser
+ private val _backlog = new TrieMap[(FiniteDuration, () ⇒ Unit), RepointableCancellable]()
+ @volatile private var _target: Option[RefreshScheduler] = None
+
+ def schedule(interval: FiniteDuration, refresh: () ⇒ Unit): Cancellable = {
+ val criticalEnter = _schedulerPhaser.writerCriticalSectionEnter()
+ try {
+ _target.map { scheduler ⇒
+ scheduler.schedule(interval, refresh)
+
+ } getOrElse {
+ val entry = (interval, refresh)
+ val cancellable = new RepointableCancellable(entry)
+
+ _backlog.put(entry, cancellable)
+ cancellable
+ }
+
+ } finally {
+ _schedulerPhaser.writerCriticalSectionExit(criticalEnter)
+ }
+ }
+
+ def point(target: RefreshScheduler): Unit = try {
+ _schedulerPhaser.readerLock()
+
+ if (_target.isEmpty) {
+ _target = Some(target)
+ _schedulerPhaser.flipPhase(10000L)
+ _backlog.dropWhile {
+ case ((interval, refresh), repointableCancellable) ⇒
+ repointableCancellable.point(target.schedule(interval, refresh))
+ true
+ }
+ } else sys.error("A LazyRefreshScheduler cannot be pointed more than once.")
+ } finally { _schedulerPhaser.readerUnlock() }
+
+ class RepointableCancellable(entry: (FiniteDuration, () ⇒ Unit)) extends Cancellable {
+ private var _isCancelled = false
+ private var _cancellable: Option[Cancellable] = None
+
+ def isCancelled: Boolean = synchronized {
+ _cancellable.map(_.isCancelled).getOrElse(_isCancelled)
+ }
+
+ def cancel(): Boolean = synchronized {
+ _isCancelled = true
+ _cancellable.map(_.cancel()).getOrElse(_backlog.remove(entry).nonEmpty)
+ }
+
+ def point(cancellable: Cancellable): Unit = synchronized {
+ if (_cancellable.isEmpty) {
+ _cancellable = Some(cancellable)
+
+ if (_isCancelled)
+ cancellable.cancel()
+
+ } else sys.error("A RepointableCancellable cannot be pointed more than once.")
+
+ }
+ }
+}
+