aboutsummaryrefslogblamecommitdiff
path: root/kamon-newrelic/src/main/scala/kamon/newrelic/Agent.scala
blob: 25fbc9dbca9a993b340ac8143677cc8fdc6b87f9 (plain) (tree)
1
2
3
4
5
6
7
8
9


                                                                                            
  

                                                                                            
  
                                               
  






                                                                                             

                      

                                                                      
                                         

                                
                   
                                                    
                                                                                


                                     

                                             
                                      

                                                       
                                  
 
                                                                                                                    
 

                           


                   







                                                                           

                                                                                                  
 
                                                                                              

   




                                          
















                                                                                                                      

   
                                                            
                                                                                

   



                                                     

                            
 
                                                                                               
                                                                                              
 
                                                                                  

                                         


                                                                                                                   
                              

                                         
                    




                                                                          




                                                                                                    
                              

                               
                    



                                                       


                                                                                                       
 




                                                                          

     


              

                                                        

                                                     
                                                                                                                                  
                                                                        




























                                                                                                                                                                             
 
/*
 * =========================================================================================
 * Copyright © 2013-2014 the kamon project <http://kamon.io/>
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
 * except in compliance with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software distributed under the
 * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
 * either express or implied. See the License for the specific language governing permissions
 * and limitations under the License.
 * =========================================================================================
 */

package kamon.newrelic

import java.util.concurrent.TimeUnit.{ MILLISECONDS  milliseconds }

import akka.actor.{ ActorLogging, Actor }
import akka.event.LoggingAdapter
import org.slf4j.LoggerFactory
import spray.json._
import scala.concurrent.{ ExecutionContext, Future }
import spray.httpx.{ SprayJsonSupport, RequestBuilding, ResponseTransformation }
import spray.httpx.encoding.Deflate
import spray.http._
import spray.json.lenses.JsonLenses._
import java.lang.management.ManagementFactory
import spray.client.pipelining._
import scala.util.{ Failure, Success }
import spray.http.Uri.Query
import kamon.newrelic.MetricTranslator.TimeSliceMetrics
import scala.concurrent.duration._

class Agent extends Actor with RequestBuilding with ResponseTransformation with SprayJsonSupport with ActorLogging {

  import context.dispatcher
  import Agent._
  import Retry._

  self ! Initialize

  val agentInfo = {
    val config = context.system.settings.config.getConfig("kamon.newrelic")
    val appName = config.getString("app-name")
    val licenseKey = config.getString("license-key")

    // Name has the format of pid@host
    val runtimeName = ManagementFactory.getRuntimeMXBean.getName.split('@')
    val retryDelay = FiniteDuration(config.getDuration("retry-delay", milliseconds), milliseconds)
    val maxRetry = config.getInt("max-retry")

    AgentInfo(licenseKey, appName, runtimeName(1), runtimeName(0).toInt, maxRetry, retryDelay)
  }

  val baseQuery = Query(
    "license_key" -> agentInfo.licenseKey,
    "marshal_format" -> "json",
    "protocol_version" -> "12")

  def receive: Receive = uninitialized

  def uninitialized: Receive = {
    case Initialize  {
      connectToCollector onComplete {
        case Success(agent)  {
          log.info("Agent initialized with runID: [{}] and collector: [{}]", agent.runId, agent.collector)
          context become reporting(agent.runId, agent.collector)
        }
        case Failure(reason)  self ! InitializationFailed(reason)
      }
    }
    case InitializationFailed(reason)  {
      log.info("Initialization failed: {}, retrying in {} seconds", reason.getMessage, agentInfo.retryDelay.toSeconds)
      context.system.scheduler.scheduleOnce(agentInfo.retryDelay, self, Initialize)
    }
    case everythingElse  //ignore
  }

  def reporting(runId: Long, collector: String): Receive = {
    case metrics: TimeSliceMetrics  sendMetricData(runId, collector, metrics)
  }

  def connectToCollector: Future[Initialized] = for {
    collector  selectCollector
    runId  connect(collector, agentInfo)
  } yield Initialized(runId, collector)

  import AgentJsonProtocol._

  val compressedPipeline: HttpRequest  Future[HttpResponse] = encode(Deflate) ~> sendReceive
  val compressedToJsonPipeline: HttpRequest  Future[JsValue] = compressedPipeline ~> toJson

  def toJson(response: HttpResponse): JsValue = response.entity.asString.parseJson

  def selectCollector: Future[String] = {
    val query = ("method" -> "get_redirect_host") +: baseQuery
    val getRedirectHostUri = Uri("http://collector.newrelic.com/agent_listener/invoke_raw_method").withQuery(query)

    compressedToJsonPipeline {
      Post(getRedirectHostUri, JsArray())

    } map { json 
      json.extract[String]('return_value)
    }
  }

  def connect(collectorHost: String, connect: AgentInfo): Future[Long] = {
    log.debug("Connecting to NewRelic Collector [{}]", collectorHost)

    val query = ("method" -> "connect") +: baseQuery
    val connectUri = Uri(s"http://$collectorHost/agent_listener/invoke_raw_method").withQuery(query)

    compressedToJsonPipeline {
      Post(connectUri, connect)

    } map { json 
      json.extract[Long]('return_value / 'agent_run_id)
    }
  }

  def sendMetricData(runId: Long, collector: String, metrics: TimeSliceMetrics) = {
    val query = ("method" -> "metric_data") +: ("run_id" -> runId.toString) +: baseQuery
    val sendMetricDataUri = Uri(s"http://$collector/agent_listener/invoke_raw_method").withQuery(query)

    withMaxAttempts(agentInfo.maxRetry, metrics, log) { currentMetrics 
      compressedPipeline {
        log.info("Sending metrics to NewRelic collector")
        Post(sendMetricDataUri, MetricData(runId, currentMetrics))
      }
    }
  }
}

object Agent {
  case class Initialize()
  case class Initialized(runId: Long, collector: String)
  case class InitializationFailed(reason: Throwable)
  case class CollectorSelection(return_value: String)
  case class AgentInfo(licenseKey: String, appName: String, host: String, pid: Int, maxRetry: Int = 0, retryDelay: FiniteDuration)
  case class MetricData(runId: Long, timeSliceMetrics: TimeSliceMetrics)
}

object Retry {

  @volatile private var attempts: Int = 0
  @volatile private var pendingMetrics: Option[TimeSliceMetrics] = None

  def withMaxAttempts[T](maxRetry: Int, metrics: TimeSliceMetrics, log: LoggingAdapter)(block: TimeSliceMetrics  Future[T])(implicit executor: ExecutionContext): Unit = {

    val currentMetrics = metrics.merge(pendingMetrics)

    if (currentMetrics.metrics.nonEmpty) {
      block(currentMetrics) onComplete {
        case Success(_) 
          pendingMetrics = None
          attempts = 0
        case Failure(_) 
          attempts += 1
          if (maxRetry > attempts) {
            log.info("Trying to send metrics to NewRelic collector, attempt [{}] of [{}]", attempts, maxRetry)
            pendingMetrics = Some(currentMetrics)
          } else {
            log.info("Max attempts achieved, proceeding to remove all pending metrics")
            pendingMetrics = None
            attempts = 0
          }
      }
    }
  }
}