aboutsummaryrefslogblamecommitdiff
path: root/kamon-newrelic/src/main/scala/kamon/newrelic/Agent.scala
blob: 49a4993c92395c32541b9d46f29e92f77a1287de (plain) (tree)
1
2
3
4
5
6
7
8
9


                                                                                            
  

                                                                                            
  
                                               
  






                                                                                             

                      

                                                   

                        
                                 

                                                                                             
                     
                   

                                   
                                     
                                             
                                    
              
                     
                        
                                  

                                               
                                                                                            
                           
 


                                                      

                        










                                                                                                
 
                                                     





                                                                                         
                                                                                                    


                                                                                                   
 



                                  
 





                                                                                                                                           
 




                                                                                                           

                                                                                                                           


                                                             
 




                                                                                                                           
 


                                                                                                                                          

   
                                                   

                                   

                                                                                                
   

                                         

                                                                                                                



                                         
                                                                                        

                                                                                                     
                                                                           

     


              






                                                      
                                                                                            



                                                                                                                    
                                                                                      
 


                                    

                                                                           
                                                           

                                                                                                                                  
                                              

                  
                 


                                           
                                                                     
                                                   
                                                              

                                                                 
   









































                                                                                                                        
 
/*
 * =========================================================================================
 * Copyright © 2013-2014 the kamon project <http://kamon.io/>
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
 * except in compliance with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software distributed under the
 * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
 * either express or implied. See the License for the specific language governing permissions
 * and limitations under the License.
 * =========================================================================================
 */

package kamon.newrelic

import akka.actor.{ ActorRef, ActorLogging, Actor }
import akka.event.LoggingAdapter
import akka.io.IO
import akka.util.Timeout
import com.typesafe.config.Config
import kamon.Kamon
import kamon.metric.{ SegmentMetrics, TraceMetrics, MetricsModule, TickMetricSnapshotBuffer }
import spray.can.Http
import spray.json._
import scala.concurrent.Future
import spray.httpx.SprayJsonSupport
import spray.json.lenses.JsonLenses._
import java.lang.management.ManagementFactory
import kamon.util.ConfigTools.Syntax
import Agent._
import JsonProtocol._
import akka.pattern.pipe
import scala.concurrent.duration._
import scala.concurrent.duration.FiniteDuration

/**
 * Root actor of the New Relic module.
 *
 * On creation it starts the metric reporter child, subscribes it (directly, or through a
 * snapshot buffer) to the Kamon metrics module and sends itself a [[Agent.Connect]] message.
 * The actor then moves between two behaviors:
 *
 *  - `disconnected(attemptsLeft)`: tries to establish a run with the New Relic collector and
 *    retries up to `attemptsLeft` more times before stopping itself.
 *  - `connected`: waits for `Reconnect` or `Shutdown` requests.
 */
class Agent extends Actor with SprayJsonSupport with ActorLogging with MetricsSubscription {
  import context.dispatcher

  private val config = context.system.settings.config

  val agentSettings = AgentSettings.fromConfig(config)

  // Start the reporters
  private val reporter = context.actorOf(MetricReporter.props(agentSettings), "metric-reporter")

  val metricsSubscriber = {
    val tickInterval = Kamon.metrics.settings.tickInterval

    // Metrics are always sent to New Relic in 60-second intervals: when Kamon already ticks every
    // 60 seconds the reporter subscribes directly, otherwise a buffer is placed in front of it.
    if (tickInterval == 60.seconds) reporter
    else context.actorOf(TickMetricSnapshotBuffer.props(1.minute, reporter), "metric-buffer")
  }

  subscribeToMetrics(config, metricsSubscriber, Kamon.metrics)

  // Start the connection to the New Relic collector.
  self ! Connect

  def receive: Receive = disconnected(agentSettings.maxConnectionRetries)

  /**
   * Behavior used while there is no established run with the collector.
   *
   * @param attemptsLeft number of reconnection attempts still allowed after a failure.
   */
  def disconnected(attemptsLeft: Int): Receive = {
    case Connect                                     => pipe(connectToCollector) to self
    case Connected(collector, runID, scheme)         => configureChildren(collector, runID, scheme)
    case ConnectFailed(reason) if (attemptsLeft > 0) => scheduleReconnection(reason, attemptsLeft)
    case ConnectFailed(_)                            => giveUpConnection()
  }

  /** Behavior used once the children have been configured with a collector and run ID. */
  def connected: Receive = {
    case Reconnect => reconnect()
    case Shutdown  => shutdown()
  }

  /** Requests a new connection, resets all children and goes back to the disconnected behavior. */
  def reconnect(): Unit = {
    log.warning("New Relic request the agent to restart the connection, all reporters will be paused until a new connection is available.")
    self ! Connect
    context.children.foreach(_ ! ResetConfiguration)
    context become disconnected(agentSettings.maxConnectionRetries)
  }

  /** Logs the shutdown request and stops this actor. */
  def shutdown(): Unit = {
    log.error("New Relic requested the agent to be stopped, no metrics will be reported after this point.")
    context stop self
  }

  /**
   * Sends a [[Agent.Configure]] message to every child and switches to the connected behavior.
   *
   * @param collector collector host returned by the redirect lookup.
   * @param runID     agent run ID returned by the connect call.
   * @param scheme    scheme reported by the API client; only used for logging here.
   */
  def configureChildren(collector: String, runID: Long, scheme: String): Unit = {
    log.info("Configuring New Relic reporters to use runID: [{}] and collector: [{}] over: [{}]", runID, collector, scheme)
    context.children.foreach(_ ! Configure(collector, runID))
    context become connected
  }

  /**
   * Logs the failure, schedules a new `Connect` message after the configured retry delay and
   * decrements the number of attempts left.
   */
  def scheduleReconnection(connectionFailureReason: Throwable, attemptsLeft: Int): Unit = {
    log.error(connectionFailureReason, "Initialization failed, retrying in {} seconds", agentSettings.retryDelay.toSeconds)
    context.system.scheduler.scheduleOnce(agentSettings.retryDelay, self, Connect)
    context become (disconnected(attemptsLeft - 1))
  }

  /** Logs that no more connection attempts will be made and stops this actor. */
  def giveUpConnection(): Unit = {
    log.error("Giving up while trying to set up a connection with the New Relic collector. The New Relic module is shutting down itself.")
    context.stop(self)
  }

  /**
   * Selects a collector and connects to it. Any failure of the resulting future is turned into
   * a `ConnectFailed` value so that it can be piped back to this actor as a regular message.
   */
  def connectToCollector: Future[ConnectResult] = {
    (for {
      collector <- selectCollector
      (runID, scheme) <- connect(collector, agentSettings)
    } yield Connected(collector, runID, scheme)) recover { case error => ConnectFailed(error) }
  }

  /**
   * Invokes the `GetRedirectHost` method on `collector.newrelic.com` and extracts the
   * `return_value` field of the response as the collector host.
   */
  def selectCollector: Future[String] = {
    val apiClient = new ApiMethodClient("collector.newrelic.com", None, agentSettings, IO(Http)(context.system))
    apiClient.invokeMethod(RawMethods.GetRedirectHost, JsArray()) map { json =>
      json.extract[String]('return_value)
    }
  }

  /**
   * Invokes the `Connect` method on the given collector host, sending `connect` as the payload.
   *
   * @return the `agent_run_id` found under `return_value` in the response, paired with the
   *         scheme reported by the API client.
   */
  def connect(collectorHost: String, connect: AgentSettings): Future[(Long, String)] = {
    val apiClient = new ApiMethodClient(collectorHost, None, agentSettings, IO(Http)(context.system))
    apiClient.invokeMethod(RawMethods.Connect, connect) map { json =>
      (json.extract[Long]('return_value / 'agent_run_id), apiClient.scheme)
    }
  }
}

/** Messages exchanged between the [[Agent]] actor, itself and its children. */
object Agent {
  // Sent by the agent to itself to start (or retry) a connection attempt.
  case object Connect
  // Handled in the connected state: restart the connection with the collector.
  case object Reconnect
  // Handled in the connected state: stop the agent.
  case object Shutdown
  // Sent to every child when the agent starts reconnecting.
  case object ResetConfiguration
  // Sent to every child once a collector host and run ID are available.
  case class Configure(collector: String, runID: Long)

  // Outcome of a connection attempt, piped back to the agent as a message.
  sealed trait ConnectResult
  case class Connected(collector: String, runID: Long, scheme: String) extends ConnectResult
  case class ConnectFailed(reason: Throwable) extends ConnectResult
}

/**
 * Settings used by the New Relic agent and its reporters.
 *
 * @param licenseKey           value of `kamon.newrelic.license-key`.
 * @param appName              value of `kamon.newrelic.app-name`.
 * @param hostname             host name, taken from the JVM runtime name.
 * @param pid                  process id, taken from the JVM runtime name.
 * @param operationTimeout     value of `kamon.newrelic.operation-timeout`.
 * @param maxConnectionRetries value of `kamon.newrelic.max-connect-retries`.
 * @param retryDelay           value of `kamon.newrelic.connect-retry-delay`.
 * @param apdexT               value of `kamon.newrelic.apdexT`, expressed in seconds.
 * @param ssl                  value of `kamon.newrelic.ssl`.
 */
case class AgentSettings(licenseKey: String, appName: String, hostname: String, pid: Int, operationTimeout: Timeout,
  maxConnectionRetries: Int, retryDelay: FiniteDuration, apdexT: Double, ssl: Boolean)

object AgentSettings {

  /**
   * Builds the agent settings from the `kamon.newrelic` section of the given configuration,
   * completing them with the process id and host name of the running JVM.
   *
   * @param config configuration containing a `kamon.newrelic` section.
   * @return the settings read from the configuration.
   * @throws AssertionError if the license key still has the `<put-your-key-here>` placeholder value.
   */
  def fromConfig(config: Config): AgentSettings = {
    val (pid, hostname) = pidAndHostname(ManagementFactory.getRuntimeMXBean.getName)
    val newRelicConfig = config.getConfig("kamon.newrelic")
    val licenseKey = newRelicConfig.getString("license-key")
    assert(licenseKey != "<put-your-key-here>", "You forgot to include your New Relic license key in the configuration settings!")
    val ssl = newRelicConfig.getBoolean("ssl")

    AgentSettings(
      licenseKey,
      newRelicConfig.getString("app-name"),
      hostname,
      pid,
      Timeout(newRelicConfig.getFiniteDuration("operation-timeout")),
      newRelicConfig.getInt("max-connect-retries"),
      newRelicConfig.getFiniteDuration("connect-retry-delay"),
      // The setting is read as a duration and converted to (fractional) seconds.
      newRelicConfig.getFiniteDuration("apdexT").toMillis / 1E3D,
      ssl)
  }

  /**
   * Splits a JVM runtime name into process id and host name.
   *
   * The name usually has the format 'pid'@'host', but the JDK only promises an arbitrary
   * string, so every missing or malformed part falls back to a neutral value instead of
   * failing the agent start-up: `0` for the pid and the local host name for the host.
   */
  private def pidAndHostname(runtimeName: String): (Int, String) = {
    val parts = runtimeName.split('@').toList
    val pid = parts.headOption.flatMap(part => scala.util.Try(part.toInt).toOption).getOrElse(0)
    val hostname = parts.drop(1).headOption.filter(_.nonEmpty).getOrElse(localHostname)
    (pid, hostname)
  }

  /** Host name reported by the local network stack, or `localhost` when it cannot be resolved. */
  private def localHostname: String =
    scala.util.Try(java.net.InetAddress.getLocalHost.getHostName).getOrElse("localhost")
}

/**
 * Subscribes a metrics consumer to the Kamon metrics module: once for every pattern listed
 * under `kamon.newrelic.custom-metric-subscriptions` and once, with the default pattern, for
 * each of the trace and segment categories.
 */
trait MetricsSubscription {
  import kamon.util.ConfigTools.Syntax
  import scala.collection.JavaConverters._
  import MetricsSubscription._

  def log: LoggingAdapter

  /** The `kamon.newrelic.custom-metric-subscriptions` section of the given configuration. */
  def subscriptions(config: Config): Config =
    config.getConfig("kamon.newrelic").getConfig("custom-metric-subscriptions")

  /** First level keys of the custom subscriptions, without the trace and segment categories. */
  def subscriptionKeys(config: Config) = subscriptions(config).firstLevelKeys filterNot isTraceOrSegmentEntityName

  /** Registers both the custom and the transaction (trace/segment) subscriptions. */
  def subscribeToMetrics(config: Config, metricsSubscriber: ActorRef, extension: MetricsModule): Unit = {
    subscribeToCustomMetrics(config, metricsSubscriber, extension)
    subscribeToTransactionMetrics(metricsSubscriber, extension)
  }

  /**
   * Subscribes `metricsSubscriber` to every pattern configured for every custom category.
   *
   * @param config            configuration holding the custom subscriptions.
   * @param metricsSubscriber actor that will receive the metrics.
   * @param extension         metrics module on which the subscriptions are registered.
   */
  def subscribeToCustomMetrics(config: Config, metricsSubscriber: ActorRef, extension: MetricsModule): Unit = {
    // Looked up once: the section does not change while iterating over its keys.
    val customSubscriptions = subscriptions(config)

    subscriptionKeys(config) foreach { subscriptionCategory =>
      customSubscriptions.getStringList(subscriptionCategory).asScala foreach { pattern =>
        log.debug("Subscribing NewRelic reporting for custom metric '{}' : {}", subscriptionCategory, pattern)
        extension.subscribe(subscriptionCategory, pattern, metricsSubscriber)
      }
    }
  }

  /** Subscribes `metricsSubscriber` to the trace and segment categories using the default pattern. */
  def subscribeToTransactionMetrics(metricsSubscriber: ActorRef, extension: MetricsModule): Unit =
    traceAndSegmentMetrics foreach { subscriptionCategory =>
      log.debug("Subscribing NewRelic reporting for transaction metric '{}' : {}", subscriptionCategory, defaultPattern)
      extension.subscribe(subscriptionCategory, defaultPattern, metricsSubscriber)
    }

}

/** Values shared by every [[MetricsSubscription]] implementation. */
object MetricsSubscription {

  // Categories that are subscribed with the default pattern rather than a configured one.
  private val traceAndSegmentMetrics = Seq(TraceMetrics.category, SegmentMetrics.category)

  // Pattern used when subscribing to the trace and segment categories.
  private val defaultPattern = "**"

  /** Tells whether `name` is one of the trace or segment categories. */
  def isTraceOrSegmentEntityName(name: String): Boolean =
    traceAndSegmentMetrics.exists(_ == name)

}