aboutsummaryrefslogblamecommitdiff
path: root/kamon-core/src/main/scala/kamon/instrumentation/ActorRefTellInstrumentation.scala
blob: c543123c804ad327ce344f118e8d540be8373292 (plain) (tree)
1
2
3
4
5
6
7
8
9
                             
 
                                    
                                           
                                                
                                   
                                                  
                                                                              
                                              
                           
                 
 
                                                                                                   

 

                                   
                                  
 
                                                                                                                                                    
                                                                                    
 




                                                                                                  
                                                                                 








                                                                                               
   

 
 
                                         

                                      
                                    
                         
 
                                                                                          
 




                                                                                                                                            


                                                                              
                                                            


                                                                                     
                        
       


   




                                                                            
                                                                    
                                    
                                                       
                    

                                                                   
 
                                                           


                                                                

                           
                         
                                                                                 
                                             
                        


                                                        
                 
       
                           

     
 
package kamon.instrumentation

import org.aspectj.lang.annotation._
import org.aspectj.lang.ProceedingJoinPoint
import akka.actor.{Props, ActorSystem, ActorRef}
import kamon.{Tracer, TraceContext}
import akka.dispatch.{MessageDispatcher, Envelope}
import com.codahale.metrics.{Timer, ExponentiallyDecayingReservoir, Histogram}
import kamon.metric.{MetricDirectory, Metrics}
import com.codahale.metrics
import scala.Some

case class TraceableMessage(traceContext: Option[TraceContext], message: Any, timer: Timer.Context)


@Aspect
class ActorRefTellInstrumentation {
  import ProceedingJoinPointPimp._

  @Pointcut("execution(* (akka.actor.ScalaActorRef+ && !akka.event.Logging$StandardOutLogger).$bang(..)) && target(actor) && args(message, sender)")
  def sendingMessageToActorRef(actor: ActorRef, message: Any, sender: ActorRef) = {}

  @Around("sendingMessageToActorRef(actor, message, sender)")
  def around(pjp: ProceedingJoinPoint, actor: ActorRef, message: Any, sender: ActorRef): Unit  = {

    val actorName = MetricDirectory.nameForActor(actor)
    val t = Metrics.registry.timer(actorName + "LATENCY")
    //println(s"About to proceed with: $actor $message $sender ${Kamon.context}")
    if(!actor.toString().contains("StandardOutLogger")) {
      println("Skipped the actor")
      pjp.proceedWithTarget(actor, TraceableMessage(Tracer.context, message, t.time()), sender)

    }
    else {
      println("Got the standardLogger!!")
      pjp.proceed()
    }
  }
}


@Aspect("perthis(actorCellCreation(..))")
class ActorCellInvokeInstrumentation {

  var processingTimeTimer: Timer = _
  var shouldTrack = false

  // AKKA 2.2 introduces the dispatcher parameter. Maybe we could provide a dual pointcut.

  @Pointcut("execution(akka.actor.ActorCell.new(..)) && args(system, ref, props, dispatcher, parent)")
  def actorCellCreation(system: ActorSystem, ref: ActorRef, props: Props, dispatcher: MessageDispatcher, parent: ActorRef): Unit = {}

  @After("actorCellCreation(system, ref, props, dispatcher, parent)")
  def registerMetricsInRegistry(system: ActorSystem, ref: ActorRef, props: Props, dispatcher: MessageDispatcher, parent: ActorRef): Unit = {
    val actorName = MetricDirectory.nameForActor(ref)
    val histogramName = MetricDirectory.nameForMailbox(system.name, actorName)

    println("=====> Created ActorCell for: "+ref.toString())
    /** TODO: Find a better way to filter the things we don't want to measure. */
    //if(system.name != "kamon" && actorName.startsWith("/user")) {
      processingTimeTimer = Metrics.registry.timer(histogramName + "/PROCESSINGTIME")
      shouldTrack = true
    //}
  }


  @Pointcut("execution(* akka.actor.ActorCell.invoke(*)) && args(envelope)")
  def invokingActorBehaviourAtActorCell(envelope: Envelope) = {}


  @Around("invokingActorBehaviourAtActorCell(envelope)")
  def around(pjp: ProceedingJoinPoint, envelope: Envelope): Unit = {
    import ProceedingJoinPointPimp._
    println("ENVELOPE --------------------->"+envelope)
    envelope match {
      case Envelope(TraceableMessage(ctx, msg, timer), sender) => {
        timer.stop()

        val originalEnvelope = envelope.copy(message = msg)

        //println("PROCESSING TIME TIMER: "+processingTimeTimer)
        val pt = processingTimeTimer.time()
        ctx match {
          case Some(c) => {
            Tracer.set(c)
            println(s"ENVELOPE ORIGINAL: [$c]---------------->"+originalEnvelope)
            pjp.proceedWith(originalEnvelope)
            Tracer.clear
          }
          case None => pjp.proceedWith(originalEnvelope)
        }
        pt.stop()
      }
      case _ => pjp.proceed
    }
  }
}