aboutsummaryrefslogtreecommitdiff
path: root/kamon-core/src/main/scala/kamon/util
diff options
context:
space:
mode:
authorIvan Topolnjak <ivantopo@gmail.com>2015-02-12 11:30:06 +0100
committerIvan Topolnjak <ivantopo@gmail.com>2015-02-13 05:15:30 +0100
commitc6bb65535bcc3cc1ff3834a91473ee8dfa6145e8 (patch)
treed7dbe6a1007b168998f167ac74a98744542c6fa8 /kamon-core/src/main/scala/kamon/util
parent6729c9632245328a007332cdcce7d362584d735a (diff)
downloadKamon-c6bb65535bcc3cc1ff3834a91473ee8dfa6145e8.tar.gz
Kamon-c6bb65535bcc3cc1ff3834a91473ee8dfa6145e8.tar.bz2
Kamon-c6bb65535bcc3cc1ff3834a91473ee8dfa6145e8.zip
! all: Kamon now works as a single instance in a companion object.
Diffstat (limited to 'kamon-core/src/main/scala/kamon/util')
-rw-r--r--kamon-core/src/main/scala/kamon/util/LazyActorRef.scala53
1 files changed, 53 insertions, 0 deletions
diff --git a/kamon-core/src/main/scala/kamon/util/LazyActorRef.scala b/kamon-core/src/main/scala/kamon/util/LazyActorRef.scala
new file mode 100644
index 00000000..855bf1fc
--- /dev/null
+++ b/kamon-core/src/main/scala/kamon/util/LazyActorRef.scala
@@ -0,0 +1,53 @@
+package kamon.util
+
+import java.util
+import java.util.concurrent.ConcurrentLinkedQueue
+
+import akka.actor.{ Actor, ActorRef }
+import org.HdrHistogram.WriterReaderPhaser
+
+import scala.annotation.tailrec
+
+/**
+ * A LazyActorRef accumulates messages sent to an actor that doesn't exist yet. Once the actor is created and
+ * the LazyActorRef is pointed to it, all the accumulated messages are flushed and any new message sent to the
+ * LazyActorRef will immediately be sent to the pointed ActorRef.
+ *
+ * This is intended to be used during Kamon's initialization where some components need to use ActorRefs to work
+ * (like subscriptions and the trace incubator) but our internal ActorSystem is not yet ready to create the
+ * required actors.
+ */
+class LazyActorRef {
+ private val _refPhaser = new WriterReaderPhaser
+ private val _backlog = new ConcurrentLinkedQueue[(Any, ActorRef)]()
+ @volatile private var _target: Option[ActorRef] = None
+
+ def tell(message: Any)(implicit sender: ActorRef = Actor.noSender): Unit = {
+ val criticalEnter = _refPhaser.writerCriticalSectionEnter()
+ try {
+ _target.map(_.tell(message, sender)) getOrElse {
+ _backlog.add((message, sender))
+ }
+
+ } finally { _refPhaser.writerCriticalSectionExit(criticalEnter) }
+ }
+
+ def point(target: ActorRef): Unit = {
+ @tailrec def drain(q: util.Queue[(Any, ActorRef)]): Unit = if (!q.isEmpty) {
+ val (msg, sender) = q.poll()
+ target.tell(msg, sender)
+ drain(q)
+ }
+
+ try {
+ _refPhaser.readerLock()
+
+ if (_target.isEmpty) {
+ _target = Some(target)
+ _refPhaser.flipPhase(1000L)
+ drain(_backlog)
+
+ } else sys.error("A LazyActorRef cannot be pointed more than once.")
+ } finally { _refPhaser.readerUnlock() }
+ }
+}