aboutsummaryrefslogtreecommitdiff
path: root/src/main/scala/akka/Tracer.scala
blob: 60906dae2f1518429c65fbb2f979510abadba5a4 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
package akka

import actor.{Props, Actor, ActorSystemImpl}
import scala.concurrent.forkjoin.ForkJoinPool
import scala.concurrent.duration._
import com.newrelic.api.agent.NewRelic
import akka.dispatch.Mailbox
import scala._

object Tracer {
  protected[this] var mailboxes:List[Mailbox] = List.empty
  protected[this] var tracerActorSystem: ActorSystemImpl = _
  protected[this] var forkJoinPool:ForkJoinPool = _

  def collectPool(pool: ForkJoinPool) = forkJoinPool = pool
  def collectActorSystem(actorSystem: ActorSystemImpl)  = tracerActorSystem = actorSystem
  def collectMailbox(mb: akka.dispatch.Mailbox)  =  mailboxes ::= mb

  def start():Unit ={
    implicit val dispatcher = tracerActorSystem.dispatcher
    val metricsActor = tracerActorSystem.actorOf(Props[MetricsActor], "MetricsActor")

    tracerActorSystem.scheduler.schedule(10 seconds, 6 second, metricsActor, PoolMetrics(forkJoinPool))
    tracerActorSystem.scheduler.schedule(10 seconds, 6 second, metricsActor, MailboxMetrics(mailboxes))
  }
}

case class PoolMetrics(poolName:String, data:Map[String,Int])
case class MailboxMetrics(mailboxes:Map[String,Mailbox])


object PoolMetrics {
  def apply(pool: ForkJoinPool) = new PoolMetrics(pool.getClass.getSimpleName, toMap(pool))

  def toMap(pool: scala.concurrent.forkjoin.ForkJoinPool):Map[String,Int] = Map[String,Int](
    "ActiveThreadCount" -> pool.getActiveThreadCount,
    "Parallelism" -> pool.getParallelism,
    "PoolSize" -> pool.getPoolSize,
    "QueuedSubmissionCount" -> pool.getQueuedSubmissionCount,
    "StealCount" -> pool.getStealCount.toInt,
    "QueuedTaskCount" -> pool.getQueuedTaskCount.toInt,
    "RunningThreadCount" -> pool.getRunningThreadCount
  )
}

object  MailboxMetrics {
  def apply(mailboxes: List[Mailbox]) = {
    new MailboxMetrics(mailboxes.take(mailboxes.length - 1).map{m => (m.actor.self.path.toString -> m)}.toMap) //TODO:reseach why collect an ActorSystemImpl
  }
}

class MetricsActor extends Actor {
    def receive = {

      case poolMetrics:PoolMetrics => {
        println(poolMetrics)
        poolMetrics.data.map{case(k,v) => NewRelic.recordMetric(s"${poolMetrics.poolName}:${k}",v)}
      }
      case mailboxMetrics:MailboxMetrics => {
        mailboxMetrics.mailboxes.map { case(actorName,mb) =>
          println(s"Sending metrics to Newrelic MailBoxMonitor -> ${actorName}")

          NewRelic.recordMetric(s"${actorName}:Mailbox:NumberOfMessages",mb.numberOfMessages)
          NewRelic.recordMetric(s"${actorName}:Mailbox:MailboxDispatcherThroughput",mb.dispatcher.throughput)

          NewRelic.addCustomParameter(s"${actorName}:Mailbox:Status", mb.hasMessages.toString)
          NewRelic.addCustomParameter(s"${actorName}:Mailbox:HasMessages", mb.hasMessages.toString)
        }
      }
    }
}