diff options
author | Ivan Topolnjak <ivantopo@gmail.com> | 2015-02-12 11:30:06 +0100 |
---|---|---|
committer | Ivan Topolnjak <ivantopo@gmail.com> | 2015-02-13 05:15:30 +0100 |
commit | 07fc83bb01c5873b47aff50d6d3abbb9f11842bd (patch) | |
tree | 814c6067a25066978b5cb8bac6541f39d0d4454d /kamon-core/src/main/scala/kamon/util | |
parent | 82a110b23ca57286e4e3dd0315c20ed99b5e8f88 (diff) | |
download | Kamon-07fc83bb01c5873b47aff50d6d3abbb9f11842bd.tar.gz Kamon-07fc83bb01c5873b47aff50d6d3abbb9f11842bd.tar.bz2 Kamon-07fc83bb01c5873b47aff50d6d3abbb9f11842bd.zip |
! all: Kamon now works as a single instance in a companion object.
Diffstat (limited to 'kamon-core/src/main/scala/kamon/util')
-rw-r--r-- | kamon-core/src/main/scala/kamon/util/LazyActorRef.scala | 53 |
1 files changed, 53 insertions, 0 deletions
diff --git a/kamon-core/src/main/scala/kamon/util/LazyActorRef.scala b/kamon-core/src/main/scala/kamon/util/LazyActorRef.scala new file mode 100644 index 00000000..855bf1fc --- /dev/null +++ b/kamon-core/src/main/scala/kamon/util/LazyActorRef.scala @@ -0,0 +1,53 @@ +package kamon.util + +import java.util +import java.util.concurrent.ConcurrentLinkedQueue + +import akka.actor.{ Actor, ActorRef } +import org.HdrHistogram.WriterReaderPhaser + +import scala.annotation.tailrec + +/** + * A LazyActorRef accumulates messages sent to an actor that doesn't exist yet. Once the actor is created and + * the LazyActorRef is pointed to it, all the accumulated messages are flushed and any new message sent to the + * LazyActorRef will immediately be sent to the pointed ActorRef. + * + * This is intended to be used during Kamon's initialization where some components need to use ActorRefs to work + * (like subscriptions and the trace incubator) but our internal ActorSystem is not yet ready to create the + * required actors. + */ +class LazyActorRef { + private val _refPhaser = new WriterReaderPhaser + private val _backlog = new ConcurrentLinkedQueue[(Any, ActorRef)]() + @volatile private var _target: Option[ActorRef] = None + + def tell(message: Any)(implicit sender: ActorRef = Actor.noSender): Unit = { + val criticalEnter = _refPhaser.writerCriticalSectionEnter() + try { + _target.map(_.tell(message, sender)) getOrElse { + _backlog.add((message, sender)) + } + + } finally { _refPhaser.writerCriticalSectionExit(criticalEnter) } + } + + def point(target: ActorRef): Unit = { + @tailrec def drain(q: util.Queue[(Any, ActorRef)]): Unit = if (!q.isEmpty) { + val (msg, sender) = q.poll() + target.tell(msg, sender) + drain(q) + } + + try { + _refPhaser.readerLock() + + if (_target.isEmpty) { + _target = Some(target) + _refPhaser.flipPhase(1000L) + drain(_backlog) + + } else sys.error("A LazyActorRef cannot be pointed more than once.") + } finally { _refPhaser.readerUnlock() } + } +} |