diff options
Diffstat (limited to 'kamon-akka/src/main/scala/kamon/akka/instrumentation/RouterMonitor.scala')
-rw-r--r-- | kamon-akka/src/main/scala/kamon/akka/instrumentation/RouterMonitor.scala | 61 |
1 files changed, 61 insertions, 0 deletions
diff --git a/kamon-akka/src/main/scala/kamon/akka/instrumentation/RouterMonitor.scala b/kamon-akka/src/main/scala/kamon/akka/instrumentation/RouterMonitor.scala new file mode 100644 index 00000000..5c4c7aa3 --- /dev/null +++ b/kamon-akka/src/main/scala/kamon/akka/instrumentation/RouterMonitor.scala @@ -0,0 +1,61 @@ +package akka.kamon.instrumentation + +import akka.actor.{ Cell, Props, ActorRef, ActorSystem } +import akka.dispatch.{ Envelope, MessageDispatcher } +import akka.routing.RoutedActorCell +import kamon.Kamon +import kamon.akka.RouterMetrics +import kamon.metric.Entity +import kamon.util.RelativeNanoTimestamp +import org.aspectj.lang.ProceedingJoinPoint +import org.aspectj.lang.annotation._ + +trait RouterMonitor { + def processMessage(pjp: ProceedingJoinPoint): AnyRef + def processFailure(failure: Throwable): Unit + def cleanup(): Unit + + def routeeAdded(): Unit + def routeeRemoved(): Unit +} + +object RouterMonitor { + + def createRouterInstrumentation(cell: Cell): RouterMonitor = { + val cellInfo = CellInfo.cellInfoFor(cell, cell.system, cell.self, cell.parent) + def routerMetrics = Kamon.metrics.entity(RouterMetrics, cellInfo.entity) + + if (cellInfo.isTracked) + new MetricsOnlyRouterMonitor(cellInfo.entity, routerMetrics) + else NoOpRouterMonitor + } +} + +object NoOpRouterMonitor extends RouterMonitor { + def processMessage(pjp: ProceedingJoinPoint): AnyRef = pjp.proceed() + def processFailure(failure: Throwable): Unit = {} + def routeeAdded(): Unit = {} + def routeeRemoved(): Unit = {} + def cleanup(): Unit = {} +} + +class MetricsOnlyRouterMonitor(entity: Entity, routerMetrics: RouterMetrics) extends RouterMonitor { + + def processMessage(pjp: ProceedingJoinPoint): AnyRef = { + val timestampBeforeProcessing = RelativeNanoTimestamp.now + + try { + pjp.proceed() + } finally { + val timestampAfterProcessing = RelativeNanoTimestamp.now + val routingTime = timestampAfterProcessing - timestampBeforeProcessing + + routerMetrics.routingTime.record(routingTime.nanos) + } + } + + def processFailure(failure: Throwable): Unit = {} + def routeeAdded(): Unit = {} + def routeeRemoved(): Unit = {} + def cleanup(): Unit = Kamon.metrics.removeEntity(entity) +}
\ No newline at end of file |