From 5e0b032bfef9509e64af2960452aed44b6e6cb22 Mon Sep 17 00:00:00 2001 From: Diego Parra Date: Tue, 26 Nov 2013 15:54:10 -0300 Subject: added scalariform --- .../src/main/scala/kamon/AkkaExtensionSwap.scala | 2 +- .../instrumentation/ExecutorServiceMetrics.scala | 108 +-------------------- .../instrumentation/MessageQueueMetrics.scala | 29 ++---- .../metric/ExecutorServiceMetricCollector.scala | 34 +++---- .../main/scala/kamon/metric/GaugeGenerator.scala | 2 +- .../src/main/scala/kamon/metric/Metrics.scala | 39 +------- .../scala/kamon/dashboard/DashboardExtension.scala | 10 +- .../scala/kamon/dashboard/DashboardService.scala | 9 +- .../dashboard/protocol/DashboardProtocols.scala | 14 +-- kamon-metrics/src/main/scala/kamon/Metrics.scala | 6 +- .../src/main/scala/kamon/newrelic/Agent.scala | 42 ++++---- .../scala/kamon/newrelic/AgentJsonProtocol.scala | 23 ++--- .../src/main/scala/kamon/newrelic/Apdex.scala | 36 +++---- .../src/main/scala/kamon/newrelic/NewRelic.scala | 26 +++-- .../scala/kamon/newrelic/NewRelicErrorLogger.scala | 10 +- .../scala/kamon/newrelic/NewRelicReporting.scala | 26 +++-- .../main/scala/test/SimpleRequestProcessor.scala | 71 +++++++------- .../SprayServerInstrumentation.scala | 8 +- .../src/main/scala/kamon/spray/UowDirectives.scala | 2 +- .../spray/can/server/ServerRequestTracing.scala | 14 ++- .../scala/akka/pattern/AskPatternTracing.scala | 4 +- kamon-trace/src/main/scala/kamon/trace/Trace.scala | 16 ++- .../src/main/scala/kamon/trace/TraceContext.scala | 4 +- .../src/main/scala/kamon/trace/UowTracing.scala | 15 ++- .../instrumentation/ActorLoggingTracing.scala | 12 +-- .../ActorMessagePassingTracing.scala | 7 +- .../trace/instrumentation/FutureTracing.scala | 4 +- project/Build.scala | 8 ++ project/Settings.scala | 17 +++- project/plugins.sbt | 3 + 30 files changed, 213 insertions(+), 388 deletions(-) diff --git a/kamon-core/src/main/scala/kamon/AkkaExtensionSwap.scala b/kamon-core/src/main/scala/kamon/AkkaExtensionSwap.scala index 153f9335..c0994f2c 100644 --- a/kamon-core/src/main/scala/kamon/AkkaExtensionSwap.scala +++ b/kamon-core/src/main/scala/kamon/AkkaExtensionSwap.scala @@ -15,7 +15,7 @@ * ========================================================== */ package kamon -import akka.actor.{Extension, ActorSystem, ExtensionId} +import akka.actor.{ Extension, ActorSystem, ExtensionId } import java.util.concurrent.ConcurrentHashMap object AkkaExtensionSwap { diff --git a/kamon-core/src/main/scala/kamon/instrumentation/ExecutorServiceMetrics.scala b/kamon-core/src/main/scala/kamon/instrumentation/ExecutorServiceMetrics.scala index 995b8e7f..a3da76f7 100644 --- a/kamon-core/src/main/scala/kamon/instrumentation/ExecutorServiceMetrics.scala +++ b/kamon-core/src/main/scala/kamon/instrumentation/ExecutorServiceMetrics.scala @@ -19,14 +19,13 @@ import org.aspectj.lang.annotation._ import java.util.concurrent._ import org.aspectj.lang.ProceedingJoinPoint import java.util -import kamon.metric.{DispatcherMetricCollector, Histogram, MetricDirectory, ExecutorServiceMetricCollector} -import akka.dispatch.{MonitorableThreadFactory, ExecutorServiceFactory} +import kamon.metric.{ DispatcherMetricCollector, Histogram, MetricDirectory, ExecutorServiceMetricCollector } +import akka.dispatch.{ MonitorableThreadFactory, ExecutorServiceFactory } import com.typesafe.config.Config import kamon.Kamon import scala.concurrent.forkjoin.ForkJoinPool import akka.dispatch.ForkJoinExecutorConfigurator.AkkaForkJoinPool - @Aspect class ActorSystemInstrumentation { @@ -64,12 +63,9 @@ class ForkJoinPoolInstrumentation { } def splitName(threadFactoryName: String, knownActorSystems: List[String]): (String, String) = { - knownActorSystems.find(threadFactoryName.startsWith(_)).map(asName => (asName, threadFactoryName.substring(asName.length+1))).getOrElse(("Unkown", "Unkown")) + knownActorSystems.find(threadFactoryName.startsWith(_)).map(asName ⇒ (asName, threadFactoryName.substring(asName.length + 1))).getOrElse(("Unkown", "Unkown")) } - - - @Pointcut("execution(* scala.concurrent.forkjoin.ForkJoinPool.scan(..)) && this(fjp)") def forkJoinScan(fjp: AkkaForkJoinPool): Unit = {} @@ -79,39 +75,8 @@ class ForkJoinPoolInstrumentation { poolSizeHistogram.update(fjp.getPoolSize) } - - } - - - - - - - - - - - - - - - - - - - - - - - - - - - - - /** * ExecutorService monitoring base: */ @@ -125,19 +90,6 @@ trait WatchedExecutorService { def collector: ExecutorServiceCollector } - - - - - - - - - - - - - trait ExecutorServiceMonitoring { def dispatcherMetrics: DispatcherMetricCollector } @@ -146,21 +98,6 @@ class ExecutorServiceMonitoringImpl extends ExecutorServiceMonitoring { @volatile var dispatcherMetrics: DispatcherMetricCollector = _ } - - - - - - - - - - - - - - - case class NamedExecutorServiceFactoryDelegate(actorSystemName: String, dispatcherName: String, delegate: ExecutorServiceFactory) extends ExecutorServiceFactory { def createExecutorService: ExecutorService = delegate.createExecutorService } @@ -178,8 +115,8 @@ class ExecutorServiceFactoryProviderInstrumentation { val delegate = pjp.proceed().asInstanceOf[ExecutorServiceFactory] // Safe Cast val actorSystemName = threadFactory match { - case m: MonitorableThreadFactory => m.name - case _ => "Unknown" // Find an alternative way to find the actor system name in case we start seeing "Unknown" as the AS name. + case m: MonitorableThreadFactory ⇒ m.name + case _ ⇒ "Unknown" // Find an alternative way to find the actor system name in case we start seeing "Unknown" as the AS name. } new NamedExecutorServiceFactoryDelegate(actorSystemName, dispatcherName, delegate) @@ -187,7 +124,6 @@ class ExecutorServiceFactoryProviderInstrumentation { } - @Aspect class NamedExecutorServiceFactoryDelegateInstrumentation { @@ -224,37 +160,3 @@ case class NamedExecutorServiceDelegate(fullName: String, delegate: ExecutorServ def execute(command: Runnable) = delegate.execute(command) } - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - diff --git a/kamon-core/src/main/scala/kamon/instrumentation/MessageQueueMetrics.scala b/kamon-core/src/main/scala/kamon/instrumentation/MessageQueueMetrics.scala index 65539beb..da797fa1 100644 --- a/kamon-core/src/main/scala/kamon/instrumentation/MessageQueueMetrics.scala +++ b/kamon-core/src/main/scala/kamon/instrumentation/MessageQueueMetrics.scala @@ -15,20 +15,18 @@ * ========================================================== */ package kamon.instrumentation -import com.codahale.metrics.{ExponentiallyDecayingReservoir, Histogram} -import akka.dispatch.{UnboundedMessageQueueSemantics, Envelope, MessageQueue} -import org.aspectj.lang.annotation.{Around, Pointcut, DeclareMixin, Aspect} -import akka.actor.{ActorSystem, ActorRef} -import kamon.metric.{Metrics, MetricDirectory} +import com.codahale.metrics.{ ExponentiallyDecayingReservoir, Histogram } +import akka.dispatch.{ UnboundedMessageQueueSemantics, Envelope, MessageQueue } +import org.aspectj.lang.annotation.{ Around, Pointcut, DeclareMixin, Aspect } +import akka.actor.{ ActorSystem, ActorRef } +import kamon.metric.{ Metrics, MetricDirectory } import org.aspectj.lang.ProceedingJoinPoint - /** * For Mailboxes we would like to track the queue size and message latency. Currently the latency * will be gathered from the ActorCellMetrics. */ - @Aspect class MessageQueueInstrumentation { @@ -40,7 +38,7 @@ class MessageQueueInstrumentation { val delegate = pjp.proceed.asInstanceOf[MessageQueue] // We are not interested in monitoring mailboxes if we don't know where they belong to. - val monitoredMailbox = for(own <- owner; sys <- system) yield { + val monitoredMailbox = for (own ← owner; sys ← system) yield { val systemName = sys.name val ownerName = MetricDirectory.nameForActor(own) val mailBoxName = MetricDirectory.nameForMailbox(systemName, ownerName) @@ -52,14 +50,13 @@ class MessageQueueInstrumentation { } monitoredMailbox match { - case None => delegate - case Some(mmb) => mmb + case None ⇒ delegate + case Some(mmb) ⇒ mmb } } } - -class MonitoredMessageQueue(val delegate: MessageQueue, val queueSizeHistogram: Histogram) extends MessageQueue with UnboundedMessageQueueSemantics{ +class MonitoredMessageQueue(val delegate: MessageQueue, val queueSizeHistogram: Histogram) extends MessageQueue with UnboundedMessageQueueSemantics { def enqueue(receiver: ActorRef, handle: Envelope) = { delegate.enqueue(receiver, handle) @@ -78,11 +75,3 @@ class MonitoredMessageQueue(val delegate: MessageQueue, val queueSizeHistogram: def cleanUp(owner: ActorRef, deadLetters: MessageQueue) = delegate.cleanUp(owner, deadLetters) } - - - - - - - - diff --git a/kamon-core/src/main/scala/kamon/metric/ExecutorServiceMetricCollector.scala b/kamon-core/src/main/scala/kamon/metric/ExecutorServiceMetricCollector.scala index a99deb5b..4c4b93e9 100644 --- a/kamon-core/src/main/scala/kamon/metric/ExecutorServiceMetricCollector.scala +++ b/kamon-core/src/main/scala/kamon/metric/ExecutorServiceMetricCollector.scala @@ -15,16 +15,16 @@ * ========================================================== */ package kamon.metric -import java.util.concurrent.{ThreadPoolExecutor, ExecutorService} +import java.util.concurrent.{ ThreadPoolExecutor, ExecutorService } import scala.concurrent.forkjoin.ForkJoinPool -import com.codahale.metrics.{Metric, MetricFilter} +import com.codahale.metrics.{ Metric, MetricFilter } object ExecutorServiceMetricCollector extends ForkJoinPoolMetricCollector with ThreadPoolExecutorMetricCollector { def register(fullName: String, executorService: ExecutorService) = executorService match { - case fjp: ForkJoinPool => registerForkJoinPool(fullName, fjp) - case tpe: ThreadPoolExecutor => registerThreadPoolExecutor(fullName, tpe) - case _ => // If it is a unknown Executor then just do nothing. + case fjp: ForkJoinPool ⇒ registerForkJoinPool(fullName, fjp) + case tpe: ThreadPoolExecutor ⇒ registerThreadPoolExecutor(fullName, tpe) + case _ ⇒ // If it is a unknown Executor then just do nothing. } def deregister(fullName: String) = { @@ -34,22 +34,19 @@ object ExecutorServiceMetricCollector extends ForkJoinPoolMetricCollector with T } } - trait ForkJoinPoolMetricCollector { import GaugeGenerator._ import BasicExecutorMetricNames._ - def registerForkJoinPool(fullName: String, fjp: ForkJoinPool) = { val forkJoinPoolGauge = newNumericGaugeFor(fjp) _ val allMetrics = Map( - fullName + queueSize -> forkJoinPoolGauge(_.getQueuedTaskCount.toInt), - fullName + poolSize -> forkJoinPoolGauge(_.getPoolSize), - fullName + activeThreads -> forkJoinPoolGauge(_.getActiveThreadCount) - ) + fullName + queueSize -> forkJoinPoolGauge(_.getQueuedTaskCount.toInt), + fullName + poolSize -> forkJoinPoolGauge(_.getPoolSize), + fullName + activeThreads -> forkJoinPoolGauge(_.getActiveThreadCount)) - allMetrics.foreach { case (name, metric) => Metrics.registry.register(name, metric) } + allMetrics.foreach { case (name, metric) ⇒ Metrics.registry.register(name, metric) } } } @@ -61,22 +58,17 @@ trait ThreadPoolExecutorMetricCollector { val tpeGauge = newNumericGaugeFor(tpe) _ val allMetrics = Map( - fullName + queueSize -> tpeGauge(_.getQueue.size()), - fullName + poolSize -> tpeGauge(_.getPoolSize), - fullName + activeThreads -> tpeGauge(_.getActiveCount) - ) + fullName + queueSize -> tpeGauge(_.getQueue.size()), + fullName + poolSize -> tpeGauge(_.getPoolSize), + fullName + activeThreads -> tpeGauge(_.getActiveCount)) - allMetrics.foreach { case (name, metric) => Metrics.registry.register(name, metric) } + allMetrics.foreach { case (name, metric) ⇒ Metrics.registry.register(name, metric) } } } - object BasicExecutorMetricNames { val queueSize = "queueSize" val poolSize = "threads/poolSize" val activeThreads = "threads/activeThreads" } - - - diff --git a/kamon-core/src/main/scala/kamon/metric/GaugeGenerator.scala b/kamon-core/src/main/scala/kamon/metric/GaugeGenerator.scala index 1d50974d..9eff2739 100644 --- a/kamon-core/src/main/scala/kamon/metric/GaugeGenerator.scala +++ b/kamon-core/src/main/scala/kamon/metric/GaugeGenerator.scala @@ -19,7 +19,7 @@ import com.codahale.metrics.Gauge trait GaugeGenerator { - def newNumericGaugeFor[T, V >: AnyVal](target: T)(generator: T => V) = new Gauge[V] { + def newNumericGaugeFor[T, V >: AnyVal](target: T)(generator: T ⇒ V) = new Gauge[V] { def getValue: V = generator(target) } } diff --git a/kamon-core/src/main/scala/kamon/metric/Metrics.scala b/kamon-core/src/main/scala/kamon/metric/Metrics.scala index dcd54bc7..b904ec56 100644 --- a/kamon-core/src/main/scala/kamon/metric/Metrics.scala +++ b/kamon-core/src/main/scala/kamon/metric/Metrics.scala @@ -18,10 +18,9 @@ package kamon.metric import java.util.concurrent.TimeUnit import akka.actor.ActorRef import com.codahale.metrics -import com.codahale.metrics.{MetricFilter, Metric, ConsoleReporter, MetricRegistry} +import com.codahale.metrics.{ MetricFilter, Metric, ConsoleReporter, MetricRegistry } import scala.collection.concurrent.TrieMap - object Metrics { val registry: MetricRegistry = new MetricRegistry @@ -41,8 +40,6 @@ object Metrics { }) } - - def deregister(fullName: String) = { registry.removeMatching(new MetricFilter { def matches(name: String, metric: Metric): Boolean = name.startsWith(fullName) @@ -64,30 +61,14 @@ object MetricDirectory { def shouldInstrument(actorSystem: String): Boolean = !actorSystem.startsWith("kamon") - def shouldInstrumentActor(actorPath: String): Boolean = { !(actorPath.isEmpty || actorPath.startsWith("system")) } - } - - - - - - - - - - - case class DispatcherMetricCollector(activeThreadCount: Histogram, poolSize: Histogram, queueSize: Histogram) - - - trait Histogram { def update(value: Long): Unit def snapshot: HistogramSnapshot @@ -99,7 +80,6 @@ trait HistogramSnapshot { def min: Double } - case class ActorSystemMetrics(actorSystemName: String) { val dispatchers = TrieMap.empty[String, DispatcherMetricCollector] @@ -113,7 +93,6 @@ case class ActorSystemMetrics(actorSystemName: String) { } - case class CodahaleHistogram() extends Histogram { private[this] val histogram = new com.codahale.metrics.Histogram(new metrics.ExponentiallyDecayingReservoir()) @@ -127,31 +106,23 @@ case class CodahaleHistogram() extends Histogram { case class CodahaleHistogramSnapshot(median: Double, max: Double, min: Double) extends HistogramSnapshot - - - - - - /** * Dispatcher Metrics that we care about currently with a histogram-like nature: * - Work Queue Size * - Total/Active Thread Count */ - - import annotation.tailrec import java.util.concurrent.atomic.AtomicReference object Atomic { - def apply[T]( obj : T) = new Atomic(new AtomicReference(obj)) - implicit def toAtomic[T]( ref : AtomicReference[T]) : Atomic[T] = new Atomic(ref) + def apply[T](obj: T) = new Atomic(new AtomicReference(obj)) + implicit def toAtomic[T](ref: AtomicReference[T]): Atomic[T] = new Atomic(ref) } -class Atomic[T](val atomic : AtomicReference[T]) { +class Atomic[T](val atomic: AtomicReference[T]) { @tailrec - final def update(f: T => T) : T = { + final def update(f: T ⇒ T): T = { val oldValue = atomic.get() val newValue = f(oldValue) if (atomic.compareAndSet(oldValue, newValue)) newValue else update(f) diff --git a/kamon-dashboard/src/main/scala/kamon/dashboard/DashboardExtension.scala b/kamon-dashboard/src/main/scala/kamon/dashboard/DashboardExtension.scala index a2b9080c..97716a3f 100644 --- a/kamon-dashboard/src/main/scala/kamon/dashboard/DashboardExtension.scala +++ b/kamon-dashboard/src/main/scala/kamon/dashboard/DashboardExtension.scala @@ -25,15 +25,15 @@ object DashboardExtension extends ExtensionId[DashboardExtensionImpl] with Exten } class DashboardExtensionImpl(system: ExtendedActorSystem) extends Extension { - if("kamon".equalsIgnoreCase(system.name)) { + if ("kamon".equalsIgnoreCase(system.name)) { val enabled = system.settings.config getBoolean "dashboard.enabled" val interface = system.settings.config getString "dashboard.interface" - val port = system.settings.config getInt "dashboard.port" + val port = system.settings.config getInt "dashboard.port" - if(enabled){ - val service = system.actorOf(Props[DashboardServiceActor], "kamon-dashboard-service") - IO(Http)(system) ! Http.Bind(service, interface, port) + if (enabled) { + val service = system.actorOf(Props[DashboardServiceActor], "kamon-dashboard-service") + IO(Http)(system) ! Http.Bind(service, interface, port) } } } \ No newline at end of file diff --git a/kamon-dashboard/src/main/scala/kamon/dashboard/DashboardService.scala b/kamon-dashboard/src/main/scala/kamon/dashboard/DashboardService.scala index 59ca4339..d092a947 100644 --- a/kamon-dashboard/src/main/scala/kamon/dashboard/DashboardService.scala +++ b/kamon-dashboard/src/main/scala/kamon/dashboard/DashboardService.scala @@ -24,15 +24,14 @@ import spray.httpx.SprayJsonSupport import kamon.Kamon import spray.http.HttpRequest import akka.actor.OneForOneStrategy -import com.codahale.metrics.{Metric, MetricFilter} - +import com.codahale.metrics.{ Metric, MetricFilter } class DashboardServiceActor extends Actor with DashboardService { def actorRefFactory = context def receive = runRoute(DashboardRoute) - override def supervisorStrategy: SupervisorStrategy = OneForOneStrategy() { case _ => SupervisorStrategy.Stop } + override def supervisorStrategy: SupervisorStrategy = OneForOneStrategy() { case _ ⇒ SupervisorStrategy.Stop } } trait DashboardService extends HttpService with StaticResources with DashboardPages with DashboardMetricsApi { @@ -47,7 +46,7 @@ trait DashboardService extends HttpService with StaticResources with DashboardPa trait StaticResources extends HttpService { - val staticResources = get { getFromResourceDirectory("web")} + val staticResources = get { getFromResourceDirectory("web") } } trait DashboardPages extends HttpService { @@ -60,7 +59,7 @@ trait DashboardPages extends HttpService { } } -trait DashboardMetricsApi extends HttpService with SprayJsonSupport{ +trait DashboardMetricsApi extends HttpService with SprayJsonSupport { /*import scala.collection.JavaConverters._ import kamon.metric.Metrics._ diff --git a/kamon-dashboard/src/main/scala/kamon/dashboard/protocol/DashboardProtocols.scala b/kamon-dashboard/src/main/scala/kamon/dashboard/protocol/DashboardProtocols.scala index 2e523ca2..a27e23bf 100644 --- a/kamon-dashboard/src/main/scala/kamon/dashboard/protocol/DashboardProtocols.scala +++ b/kamon-dashboard/src/main/scala/kamon/dashboard/protocol/DashboardProtocols.scala @@ -15,15 +15,15 @@ * ========================================================== */ package kamon.dashboard.protocol -import spray.json.{RootJsonFormat, DefaultJsonProtocol} +import spray.json.{ RootJsonFormat, DefaultJsonProtocol } object DashboardProtocols { - case class TimerDataHolder(name:String, count:Double, percentile99:Double) - case class TotalMessages(messages:Double, actors:Long, data:Seq[TimerDataHolder]) - case class DispatcherMetricCollectorHolder(name:String, activeThreadCount: Double, poolSize: Double, queueSize:Double) - case class ActorSystemMetricsHolder(actorSystem:String, dispatchers:Map[String, DispatcherMetricCollectorHolder]) - case class ActorTree(name:String, children:List[ActorTree] = Nil) + case class TimerDataHolder(name: String, count: Double, percentile99: Double) + case class TotalMessages(messages: Double, actors: Long, data: Seq[TimerDataHolder]) + case class DispatcherMetricCollectorHolder(name: String, activeThreadCount: Double, poolSize: Double, queueSize: Double) + case class ActorSystemMetricsHolder(actorSystem: String, dispatchers: Map[String, DispatcherMetricCollectorHolder]) + case class ActorTree(name: String, children: List[ActorTree] = Nil) object TimerDataHolder extends DefaultJsonProtocol { implicit val TimerDataHolderJsonProtocol = jsonFormat3(apply) @@ -42,6 +42,6 @@ object DashboardProtocols { } object ActorTree extends DefaultJsonProtocol { - implicit val ActorTreeJsonProtocol:RootJsonFormat[ActorTree] = rootFormat(lazyFormat(jsonFormat(apply, "name", "children"))) + implicit val ActorTreeJsonProtocol: RootJsonFormat[ActorTree] = rootFormat(lazyFormat(jsonFormat(apply, "name", "children"))) } } diff --git a/kamon-metrics/src/main/scala/kamon/Metrics.scala b/kamon-metrics/src/main/scala/kamon/Metrics.scala index 20528eb6..c7d57f13 100644 --- a/kamon-metrics/src/main/scala/kamon/Metrics.scala +++ b/kamon-metrics/src/main/scala/kamon/Metrics.scala @@ -17,7 +17,7 @@ package kamon import com.codahale.metrics.MetricRegistry import com.typesafe.config.ConfigFactory -import akka.actor.{ActorRef, ExtendedActorSystem, ExtensionIdProvider, ExtensionId} +import akka.actor.{ ActorRef, ExtendedActorSystem, ExtensionIdProvider, ExtensionId } import kamon.Kamon.Extension import akka.actor @@ -27,11 +27,7 @@ object Metrics extends ExtensionId[MetricsExtension] with ExtensionIdProvider { } - class MetricsExtension(system: ExtendedActorSystem) extends Kamon.Extension { def manager: ActorRef = ??? } - - - diff --git a/kamon-newrelic/src/main/scala/kamon/newrelic/Agent.scala b/kamon-newrelic/src/main/scala/kamon/newrelic/Agent.scala index 7c2b34ea..4082458c 100644 --- a/kamon-newrelic/src/main/scala/kamon/newrelic/Agent.scala +++ b/kamon-newrelic/src/main/scala/kamon/newrelic/Agent.scala @@ -15,10 +15,10 @@ * ========================================================== */ package kamon.newrelic -import akka.actor.{ActorLogging, Actor} +import akka.actor.{ ActorLogging, Actor } import spray.json._ import scala.concurrent.Future -import spray.httpx.{SprayJsonSupport, RequestBuilding, ResponseTransformation} +import spray.httpx.{ SprayJsonSupport, RequestBuilding, ResponseTransformation } import spray.httpx.encoding.Deflate import spray.http._ import spray.json.lenses.JsonLenses._ @@ -26,7 +26,7 @@ import akka.pattern.pipe import java.lang.management.ManagementFactory import spray.client.pipelining._ import scala.util.control.NonFatal -import kamon.newrelic.NewRelicMetric.{Data, ID, MetricBatch} +import kamon.newrelic.NewRelicMetric.{ Data, ID, MetricBatch } class Agent extends Actor with RequestBuilding with ResponseTransformation with SprayJsonSupport with ActorLogging { import context.dispatcher @@ -43,15 +43,12 @@ class Agent extends Actor with RequestBuilding with ResponseTransformation with AgentInfo(licenseKey, appName, runtimeName(1), runtimeName(0).toInt) } - - def receive = { - case Initialize(runId, collector) => context become reporting(runId, collector) + case Initialize(runId, collector) ⇒ context become reporting(runId, collector) } - def reporting(runId: Long, collector: String): Receive = { - case batch: MetricBatch => sendMetricData(runId, collector, batch.metrics) + case batch: MetricBatch ⇒ sendMetricData(runId, collector, batch.metrics) } override def preStart(): Unit = { @@ -60,26 +57,26 @@ class Agent extends Actor with RequestBuilding with ResponseTransformation with } def initialize: Unit = { - pipe ({ - for( - collector <- selectCollector; - runId <- connect(collector, agentInfo) - ) yield Initialize(runId, collector) - } recover { - case NonFatal(ex) => InitializationFailed(ex) - }) to self + pipe({ + for ( + collector ← selectCollector; + runId ← connect(collector, agentInfo) + ) yield Initialize(runId, collector) + } recover { + case NonFatal(ex) ⇒ InitializationFailed(ex) + }) to self } import AgentJsonProtocol._ - val compressedPipeline: HttpRequest => Future[HttpResponse] = encode(Deflate) ~> sendReceive - val compressedToJsonPipeline: HttpRequest => Future[JsValue] = compressedPipeline ~> toJson + val compressedPipeline: HttpRequest ⇒ Future[HttpResponse] = encode(Deflate) ~> sendReceive + val compressedToJsonPipeline: HttpRequest ⇒ Future[JsValue] = compressedPipeline ~> toJson def toJson(response: HttpResponse): JsValue = response.entity.asString.asJson def selectCollector: Future[String] = { compressedToJsonPipeline { Post(s"http://collector.newrelic.com/agent_listener/invoke_raw_method?method=get_redirect_host&license_key=${agentInfo.licenseKey}&marshal_format=json&protocol_version=12", JsArray()) - } map { json => + } map { json ⇒ json.extract[String]('return_value) } } @@ -87,12 +84,11 @@ class Agent extends Actor with RequestBuilding with ResponseTransformation with def connect(collectorHost: String, connect: AgentInfo): Future[Long] = { compressedToJsonPipeline { Post(s"http://$collectorHost/agent_listener/invoke_raw_method?method=connect&license_key=${agentInfo.licenseKey}&marshal_format=json&protocol_version=12", connect) - } map { json => + } map { json ⇒ json.extract[Long]('return_value / 'agent_run_id) } } - def sendMetricData(runId: Long, collector: String, metrics: List[(ID, Data)]) = { log.info("Reporting this to NewRelic: " + metrics.mkString("\n")) @@ -103,14 +99,10 @@ class Agent extends Actor with RequestBuilding with ResponseTransformation with } } - - } object Agent { - - case class Initialize(runId: Long, collector: String) case class InitializationFailed(reason: Throwable) case class CollectorSelection(return_value: String) diff --git a/kamon-newrelic/src/main/scala/kamon/newrelic/AgentJsonProtocol.scala b/kamon-newrelic/src/main/scala/kamon/newrelic/AgentJsonProtocol.scala index b133aee0..da8199ab 100644 --- a/kamon-newrelic/src/main/scala/kamon/newrelic/AgentJsonProtocol.scala +++ b/kamon-newrelic/src/main/scala/kamon/newrelic/AgentJsonProtocol.scala @@ -25,18 +25,16 @@ object AgentJsonProtocol extends DefaultJsonProtocol { JsArray( JsObject( "agent_version" -> JsString("3.1.0"), - "app_name" -> JsArray(JsString(obj.appName)), - "host" -> JsString(obj.host), - "identifier" -> JsString(s"java:${obj.appName}"), - "language" -> JsString("java"), - "pid" -> JsNumber(obj.pid) - ) - ) + "app_name" -> JsArray(JsString(obj.appName)), + "host" -> JsString(obj.host), + "identifier" -> JsString(s"java:${obj.appName}"), + "language" -> JsString("java"), + "pid" -> JsNumber(obj.pid))) } import NewRelicMetric._ - implicit def listWriter[T : JsonWriter] = new JsonWriter[List[T]] { + implicit def listWriter[T: JsonWriter] = new JsonWriter[List[T]] { def write(list: List[T]) = JsArray(list.map(_.toJson)) } @@ -46,16 +44,14 @@ object AgentJsonProtocol extends DefaultJsonProtocol { JsArray( JsObject( "name" -> JsString(id.name) // TODO Include scope - ), + ), JsArray( JsNumber(data.callCount), JsNumber(data.total), JsNumber(data.totalExclusive), JsNumber(data.min), JsNumber(data.max), - JsNumber(data.sumOfSquares) - ) - ) + JsNumber(data.sumOfSquares))) } } @@ -65,7 +61,6 @@ object AgentJsonProtocol extends DefaultJsonProtocol { JsNumber(obj.runId), JsNumber(obj.start), JsNumber(obj.end), - obj.metrics.toJson - ) + obj.metrics.toJson) } } diff --git a/kamon-newrelic/src/main/scala/kamon/newrelic/Apdex.scala b/kamon-newrelic/src/main/scala/kamon/newrelic/Apdex.scala index d0a71c08..53240c89 100644 --- a/kamon-newrelic/src/main/scala/kamon/newrelic/Apdex.scala +++ b/kamon-newrelic/src/main/scala/kamon/newrelic/Apdex.scala @@ -17,7 +17,7 @@ package kamon.newrelic import akka.actor.Actor import kamon.trace.UowTrace -import com.newrelic.api.agent.{NewRelic => NRAgent} +import com.newrelic.api.agent.{ NewRelic ⇒ NRAgent } import kamon.trace.UowTracing.WebExternal class Apdex extends Actor { @@ -28,7 +28,7 @@ class Apdex extends Actor { var frustrated: Int = 0 def receive = { - case trace: UowTrace => recordTransaction(trace) + case trace: UowTrace ⇒ recordTransaction(trace) } @@ -41,26 +41,25 @@ class Apdex extends Actor { def total: Int = satisfied + tolerating + frustrated def updateStats(sampleTime: Double): Unit = { - if(sampleTime < t) + if (sampleTime < t) satisfied += 1 + else if (sampleTime >= t && sampleTime <= 4 * t) + tolerating += 1 else - if(sampleTime >= t && sampleTime <= 4*t) - tolerating += 1 - else - frustrated += 1 + frustrated += 1 } def recordTransaction(uowTrace: UowTrace): Unit = { - val time = ((uowTrace.segments.last.timestamp - uowTrace.segments.head.timestamp)/1E9) + val time = ((uowTrace.segments.last.timestamp - uowTrace.segments.head.timestamp) / 1E9) updateStats(time) - NRAgent.recordMetric("WebTransaction/Custom" + uowTrace.name, time.toFloat ) + NRAgent.recordMetric("WebTransaction/Custom" + uowTrace.name, time.toFloat) NRAgent.recordMetric("WebTransaction", time.toFloat) NRAgent.recordMetric("HttpDispatcher", time.toFloat) - uowTrace.segments.collect { case we: WebExternal => we }.foreach { webExternalTrace => - val external = ((webExternalTrace.finish - webExternalTrace.start)/1E9).toFloat + uowTrace.segments.collect { case we: WebExternal ⇒ we }.foreach { webExternalTrace ⇒ + val external = ((webExternalTrace.finish - webExternalTrace.start) / 1E9).toFloat println("Web External: " + webExternalTrace) NRAgent.recordMetric(s"External/${webExternalTrace.host}/http", external) @@ -68,22 +67,19 @@ class Apdex extends Actor { NRAgent.recordMetric(s"External/${webExternalTrace.host}/http/WebTransaction/Custom" + uowTrace.name, external) } - - val allExternals = uowTrace.segments.collect { case we: WebExternal => we } sortBy(_.timestamp) - + val allExternals = uowTrace.segments.collect { case we: WebExternal ⇒ we } sortBy (_.timestamp) def measureExternal(accum: Long, lastEnd: Long, segments: Seq[WebExternal]): Long = segments match { - case Nil => accum - case head :: tail => - if(head.start > lastEnd) - measureExternal(accum + (head.finish-head.start), head.finish, tail) + case Nil ⇒ accum + case head :: tail ⇒ + if (head.start > lastEnd) + measureExternal(accum + (head.finish - head.start), head.finish, tail) else - measureExternal(accum + (head.finish-lastEnd), head.finish, tail) + measureExternal(accum + (head.finish - lastEnd), head.finish, tail) } val external = measureExternal(0, 0, allExternals) / 1E9 - NRAgent.recordMetric(s"External/all", external.toFloat) NRAgent.recordMetric(s"External/allWeb", external.toFloat) diff --git a/kamon-newrelic/src/main/scala/kamon/newrelic/NewRelic.scala b/kamon-newrelic/src/main/scala/kamon/newrelic/NewRelic.scala index f9b8b916..e76c9bde 100644 --- a/kamon-newrelic/src/main/scala/kamon/newrelic/NewRelic.scala +++ b/kamon-newrelic/src/main/scala/kamon/newrelic/NewRelic.scala @@ -18,8 +18,8 @@ package kamon.newrelic import akka.actor._ import scala.collection.mutable import kamon.Kamon -import kamon.trace.{UowTrace, Trace} -import kamon.newrelic.NewRelicMetric.{MetricBatch, FlushMetrics} +import kamon.trace.{ UowTrace, Trace } +import kamon.newrelic.NewRelicMetric.{ MetricBatch, FlushMetrics } import scala.concurrent.duration._ class NewRelic extends ExtensionId[NewRelicExtension] { @@ -35,8 +35,6 @@ class NewRelicManager extends Actor with ActorLogging { Kamon(Trace)(context.system) ! Trace.Register - - val webTransactionMetrics = context.actorOf(Props[WebTransactionMetrics], "web-transaction-metrics") val agent = context.actorOf(Props[Agent], "agent") @@ -46,7 +44,7 @@ class NewRelicManager extends Actor with ActorLogging { } def receive = { - case trace: UowTrace => webTransactionMetrics ! trace + case trace: UowTrace ⇒ webTransactionMetrics ! trace } } @@ -54,8 +52,8 @@ object NewRelicMetric { case class ID(name: String, scope: Option[String]) case class Data(var callCount: Long, var total: Double, var totalExclusive: Double, var min: Double, var max: Double, var sumOfSquares: Double) { def record(value: Double): Unit = { - if(value > max) max = value - if(value < min) min = value + if (value > max) max = value + if (value < min) min = value total += value totalExclusive += value @@ -72,15 +70,14 @@ object NewRelicMetric { case class MetricBatch(metrics: List[(ID, Data)]) } - class WebTransactionMetrics extends Actor with ActorLogging { val apdexT = 0.5D var metrics = mutable.Map.empty[NewRelicMetric.ID, NewRelicMetric.Data] var apdex = NewRelicMetric.Data(0, 0, 0, apdexT, apdexT, 0) def receive = { - case trace: UowTrace => updateStats(trace) - case FlushMetrics => flush + case trace: UowTrace ⇒ updateStats(trace) + case FlushMetrics ⇒ flush } def flush: Unit = { @@ -94,13 +91,12 @@ class WebTransactionMetrics extends Actor with ActorLogging { } def recordApdex(time: Double): Unit = { - if(time <= apdexT) + if (time <= apdexT) apdex.callCount += 1 + else if (time > apdexT && time <= (4 * apdexT)) + apdex.total += 1 else - if(time > apdexT && time <= (4 * apdexT)) - apdex.total += 1 - else - apdex.totalExclusive += 1 + apdex.totalExclusive += 1 } diff --git a/kamon-newrelic/src/main/scala/kamon/newrelic/NewRelicErrorLogger.scala b/kamon-newrelic/src/main/scala/kamon/newrelic/NewRelicErrorLogger.scala index 844f18ea..c6d87769 100644 --- a/kamon-newrelic/src/main/scala/kamon/newrelic/NewRelicErrorLogger.scala +++ b/kamon-newrelic/src/main/scala/kamon/newrelic/NewRelicErrorLogger.scala @@ -17,22 +17,22 @@ package kamon.newrelic import akka.actor.Actor import akka.event.Logging.Error -import akka.event.Logging.{LoggerInitialized, InitializeLogger} +import akka.event.Logging.{ LoggerInitialized, InitializeLogger } import com.newrelic.api.agent.NewRelic import kamon.trace.ContextAware class NewRelicErrorLogger extends Actor { def receive = { - case InitializeLogger(_) => sender ! LoggerInitialized - case error @ Error(cause, logSource, logClass, message) => notifyError(error) - case anythingElse => + case InitializeLogger(_) ⇒ sender ! LoggerInitialized + case error @ Error(cause, logSource, logClass, message) ⇒ notifyError(error) + case anythingElse ⇒ } def notifyError(error: Error): Unit = { val params = new java.util.HashMap[String, String]() val ctx = error.asInstanceOf[ContextAware].traceContext - for(c <- ctx) { + for (c ← ctx) { params.put("UOW", c.uow) } diff --git a/kamon-newrelic/src/main/scala/kamon/newrelic/NewRelicReporting.scala b/kamon-newrelic/src/main/scala/kamon/newrelic/NewRelicReporting.scala index 2a2d4442..260fc71e 100644 --- a/kamon-newrelic/src/main/scala/kamon/newrelic/NewRelicReporting.scala +++ b/kamon-newrelic/src/main/scala/kamon/newrelic/NewRelicReporting.scala @@ -20,21 +20,20 @@ import kamon.trace.UowTrace import com.newrelic.api.agent.NewRelic import kamon.trace.UowTracing.WebExternal - class NewRelicReporting extends Actor { def receive = { - case trace: UowTrace => recordTransaction(trace) + case trace: UowTrace ⇒ recordTransaction(trace) } def recordTransaction(uowTrace: UowTrace): Unit = { - val time = ((uowTrace.segments.last.timestamp - uowTrace.segments.head.timestamp)/1E9) + val time = ((uowTrace.segments.last.timestamp - uowTrace.segments.head.timestamp) / 1E9) - NewRelic.recordMetric("WebTransaction/Custom" + uowTrace.name, time.toFloat ) + NewRelic.recordMetric("WebTransaction/Custom" + uowTrace.name, time.toFloat) NewRelic.recordMetric("WebTransaction", time.toFloat) NewRelic.recordMetric("HttpDispatcher", time.toFloat) - uowTrace.segments.collect { case we: WebExternal => we }.foreach { webExternalTrace => - val external = ((webExternalTrace.finish - webExternalTrace.start)/1E9).toFloat + uowTrace.segments.collect { case we: WebExternal ⇒ we }.foreach { webExternalTrace ⇒ + val external = ((webExternalTrace.finish - webExternalTrace.start) / 1E9).toFloat println("Web External: " + webExternalTrace) NewRelic.recordMetric(s"External/${webExternalTrace.host}/http", external) @@ -42,22 +41,19 @@ class NewRelicReporting extends Actor { NewRelic.recordMetric(s"External/${webExternalTrace.host}/http/WebTransaction/Custom" + uowTrace.name, external) } - - val allExternals = uowTrace.segments.collect { case we: WebExternal => we } sortBy(_.timestamp) - + val allExternals = uowTrace.segments.collect { case we: WebExternal ⇒ we } sortBy (_.timestamp) def measureExternal(accum: Long, lastEnd: Long, segments: Seq[WebExternal]): Long = segments match { - case Nil => accum - case head :: tail => - if(head.start > lastEnd) - measureExternal(accum + (head.finish-head.start), head.finish, tail) + case Nil ⇒ accum + case head :: tail ⇒ + if (head.start > lastEnd) + measureExternal(accum + (head.finish - head.start), head.finish, tail) else - measureExternal(accum + (head.finish-lastEnd), head.finish, tail) + measureExternal(accum + (head.finish - lastEnd), head.finish, tail) } val external = measureExternal(0, 0, allExternals) / 1E9 - NewRelic.recordMetric(s"External/all", external.toFloat) NewRelic.recordMetric(s"External/allWeb", external.toFloat) diff --git a/kamon-playground/src/main/scala/test/SimpleRequestProcessor.scala b/kamon-playground/src/main/scala/test/SimpleRequestProcessor.scala index 7cdfae73..1c1dba4f 100644 --- a/kamon-playground/src/main/scala/test/SimpleRequestProcessor.scala +++ b/kamon-playground/src/main/scala/test/SimpleRequestProcessor.scala @@ -19,7 +19,7 @@ import akka.actor._ import spray.routing.SimpleRoutingApp import akka.util.Timeout import spray.httpx.RequestBuilding -import scala.concurrent.{Await, Future} +import scala.concurrent.{ Await, Future } import kamon.spray.UowDirectives import kamon.trace.Trace import kamon.Kamon @@ -34,7 +34,7 @@ object SimpleRequestProcessor extends App with SimpleRoutingApp with RequestBuil import system.dispatcher val act = system.actorOf(Props(new Actor { - def receive: Actor.Receive = { case any => sender ! any } + def receive: Actor.Receive = { case any ⇒ sender ! any } }), "com") implicit val timeout = Timeout(30 seconds) @@ -44,42 +44,42 @@ object SimpleRequestProcessor extends App with SimpleRoutingApp with RequestBuil val random = new Random() startServer(interface = "localhost", port = 9090) { get { - path("test"){ + path("test") { uow { complete { - val futures = pipeline(Get("http://10.254.209.14:8000/")).map(r => "Ok") :: pipeline(Get("http://10.254.209.14:8000/")).map(r => "Ok") :: Nil + val futures = pipeline(Get("http://10.254.209.14:8000/")).map(r ⇒ "Ok") :: pipeline(Get("http://10.254.209.14:8000/")).map(r ⇒ "Ok") :: Nil - Future.sequence(futures).map(l => "Ok") + Future.sequence(futures).map(l ⇒ "Ok") } } } ~ - path("reply" / Segment) { reqID => - uow { + path("reply" / Segment) { reqID ⇒ + uow { + complete { + if (Trace.context().isEmpty) + println("ROUTE NO CONTEXT") + + (replier ? reqID).mapTo[String] + } + } + } ~ + path("ok") { complete { - if (Trace.context().isEmpty) - println("ROUTE NO CONTEXT") - - (replier ? reqID).mapTo[String] + //Thread.sleep(random.nextInt(1) + random.nextInt(5) + random.nextInt(2)) + "ok" + } + } ~ + path("future") { + dynamic { + complete(Future { "OK" }) + } + } ~ + path("error") { + complete { + throw new NullPointerException + "okk" } } - } ~ - path("ok") { - complete{ - //Thread.sleep(random.nextInt(1) + random.nextInt(5) + random.nextInt(2)) - "ok" - } - } ~ - path("future") { - dynamic { - complete(Future { "OK" }) - } - } ~ - path("error") { - complete { - throw new NullPointerException - "okk" - } - } } } @@ -98,21 +98,18 @@ object Verifier extends App { val pipeline = sendReceive - val futures = Future.sequence(for(i <- 1 to 500) yield { - pipeline(Get("http://127.0.0.1:9090/reply/"+i)).map(r => r.entity.asString == i.toString) + val futures = Future.sequence(for (i ← 1 to 500) yield { + pipeline(Get("http://127.0.0.1:9090/reply/" + i)).map(r ⇒ r.entity.asString == i.toString) }) - println("Everything is: "+ Await.result(futures, 10 seconds).forall(a => a == true)) + println("Everything is: " + Await.result(futures, 10 seconds).forall(a ⇒ a == true)) } - - - } class Replier extends Actor with ActorLogging { def receive = { - case anything => - if(Trace.context.isEmpty) + case anything ⇒ + if (Trace.context.isEmpty) log.warning("PROCESSING A MESSAGE WITHOUT CONTEXT") log.info("Processing at the Replier") diff --git a/kamon-spray/src/main/scala/kamon/instrumentation/SprayServerInstrumentation.scala b/kamon-spray/src/main/scala/kamon/instrumentation/SprayServerInstrumentation.scala index e0837d01..c47dbc67 100644 --- a/kamon-spray/src/main/scala/kamon/instrumentation/SprayServerInstrumentation.scala +++ b/kamon-spray/src/main/scala/kamon/instrumentation/SprayServerInstrumentation.scala @@ -19,12 +19,10 @@ import org.aspectj.lang.annotation._ import org.aspectj.lang.ProceedingJoinPoint import spray.http.HttpRequest import spray.http.HttpHeaders.Host -import kamon.trace.{TraceContext, Trace, ContextAware, TimedContextAware} +import kamon.trace.{ TraceContext, Trace, ContextAware, TimedContextAware } //import spray.can.client.HttpHostConnector.RequestContext - - @Aspect class SprayOpenRequestContextTracing { @@ -38,7 +36,6 @@ class SprayOpenRequestContextTracing { @Aspect class SprayServerInstrumentation { - @Pointcut("execution(spray.can.client.HttpHostConnector.RequestContext.new(..)) && this(ctx) && args(request, *, *, *)") def requestRecordInit(ctx: TimedContextAware, request: HttpRequest): Unit = {} @@ -51,8 +48,6 @@ class SprayServerInstrumentation { } tctx.tracer ! WebExternalStart(ctx.timestamp, host.host)*/ } - - @Pointcut("execution(* spray.can.client.HttpHostConnectionSlot.dispatchToCommander(..)) && args(requestContext, message)") def dispatchToCommander(requestContext: TimedContextAware, message: Any): Unit = {} @@ -69,7 +64,6 @@ class SprayServerInstrumentation { } - @Pointcut("execution(* spray.can.client.HttpHostConnector.RequestContext.copy(..)) && this(old)") def copyingRequestContext(old: TimedContextAware): Unit = {} diff --git a/kamon-spray/src/main/scala/kamon/spray/UowDirectives.scala b/kamon-spray/src/main/scala/kamon/spray/UowDirectives.scala index 775d93dd..b4ff1877 100644 --- a/kamon-spray/src/main/scala/kamon/spray/UowDirectives.scala +++ b/kamon-spray/src/main/scala/kamon/spray/UowDirectives.scala @@ -23,7 +23,7 @@ import java.net.InetAddress import kamon.trace.Trace trait UowDirectives extends BasicDirectives { - def uow: Directive0 = mapRequest { request => + def uow: Directive0 = mapRequest { request ⇒ val uowHeader = request.headers.find(_.name == "X-UOW") val generatedUow = uowHeader.map(_.value).getOrElse(UowDirectives.newUow) diff --git a/kamon-spray/src/main/scala/spray/can/server/ServerRequestTracing.scala b/kamon-spray/src/main/scala/spray/can/server/ServerRequestTracing.scala index 044ba5d8..c43022d9 100644 --- a/kamon-spray/src/main/scala/spray/can/server/ServerRequestTracing.scala +++ b/kamon-spray/src/main/scala/spray/can/server/ServerRequestTracing.scala @@ -15,20 +15,18 @@ * ========================================================== */ package spray.can.server -import org.aspectj.lang.annotation.{After, Pointcut, DeclareMixin, Aspect} -import kamon.trace.{Trace, ContextAware} +import org.aspectj.lang.annotation.{ After, Pointcut, DeclareMixin, Aspect } +import kamon.trace.{ Trace, ContextAware } import spray.http.HttpRequest import akka.actor.ActorSystem import akka.event.Logging.Warning - @Aspect class ServerRequestTracing { @DeclareMixin("spray.can.server.OpenRequestComponent.DefaultOpenRequest") def mixinContextAwareToOpenRequest: ContextAware = ContextAware.default - @Pointcut("execution(spray.can.server.OpenRequestComponent$DefaultOpenRequest.new(..)) && this(openRequest) && args(*, request, *, *)") def openRequestInit(openRequest: ContextAware, request: HttpRequest): Unit = {} @@ -51,14 +49,14 @@ class ServerRequestTracing { val storedContext = openRequest.traceContext val incomingContext = Trace.finish() - for(original <- storedContext) { + for (original ← storedContext) { incomingContext match { - case Some(incoming) if original.id != incoming.id => + case Some(incoming) if original.id != incoming.id ⇒ publishWarning(s"Different ids when trying to close a Trace, original: [$original] - incoming: [$incoming]") - case Some(_) => // nothing to do here. + case Some(_) ⇒ // nothing to do here. - case None => + case None ⇒ original.finish publishWarning(s"Trace context not present while closing the Trace: [$original]") } diff --git a/kamon-trace/src/main/scala/akka/pattern/AskPatternTracing.scala b/kamon-trace/src/main/scala/akka/pattern/AskPatternTracing.scala index c01c0c85..970a4a51 100644 --- a/kamon-trace/src/main/scala/akka/pattern/AskPatternTracing.scala +++ b/kamon-trace/src/main/scala/akka/pattern/AskPatternTracing.scala @@ -15,7 +15,7 @@ * ========================================================== */ package akka.pattern -import org.aspectj.lang.annotation.{AfterReturning, Pointcut, Aspect} +import org.aspectj.lang.annotation.{ AfterReturning, Pointcut, Aspect } import akka.event.Logging.Warning import scala.compat.Platform.EOL import akka.actor.ActorRefProvider @@ -38,7 +38,7 @@ class AskPatternTracing { val stack = new StackTraceCaptureException future onFailure { - case timeout: AskTimeoutException => + case timeout: AskTimeoutException ⇒ val stackString = stack.getStackTrace.drop(3).mkString("", EOL, EOL) system.eventStream.publish(Warning("AskPatternTracing", classOf[AskPatternTracing], diff --git a/kamon-trace/src/main/scala/kamon/trace/Trace.scala b/kamon-trace/src/main/scala/kamon/trace/Trace.scala index d733c5de..05519067 100644 --- a/kamon-trace/src/main/scala/kamon/trace/Trace.scala +++ b/kamon-trace/src/main/scala/kamon/trace/Trace.scala @@ -27,17 +27,13 @@ object Trace extends ExtensionId[TraceExtension] with ExtensionIdProvider { def lookup(): ExtensionId[_ <: Extension] = Trace def createExtension(system: ExtendedActorSystem): TraceExtension = new TraceExtension(system) - /*** Protocol */ case object Register - - /** User API */ private[trace] val traceContext = new DynamicVariable[Option[TraceContext]](None) private[trace] val tranid = new AtomicLong() - def context() = traceContext.value def set(ctx: TraceContext) = traceContext.value = Some(ctx) @@ -50,9 +46,9 @@ object Trace extends ExtensionId[TraceExtension] with ExtensionIdProvider { ctx } - def withContext[T](ctx: Option[TraceContext])(thunk: => T): T = traceContext.withValue(ctx)(thunk) + def withContext[T](ctx: Option[TraceContext])(thunk: ⇒ T): T = traceContext.withValue(ctx)(thunk) - def transformContext(f: TraceContext => TraceContext): Unit = { + def transformContext(f: TraceContext ⇒ TraceContext): Unit = { context.map(f).foreach(set(_)) } @@ -74,16 +70,16 @@ class TraceManager extends Actor with ActorLogging { var listeners: Seq[ActorRef] = Seq.empty def receive = { - case Register => + case Register ⇒ listeners = sender +: listeners log.info("Registered [{}] as listener for Kamon traces", sender) - case segment: UowSegment => + case segment: UowSegment ⇒ val tracerName = segment.id.toString context.child(tracerName).getOrElse(newTracer(tracerName)) ! segment - case trace: UowTrace => - listeners foreach(_ ! trace) + case trace: UowTrace ⇒ + listeners foreach (_ ! trace) } def newTracer(name: String): ActorRef = { diff --git a/kamon-trace/src/main/scala/kamon/trace/TraceContext.scala b/kamon-trace/src/main/scala/kamon/trace/TraceContext.scala index c1efd847..eee3e0b0 100644 --- a/kamon-trace/src/main/scala/kamon/trace/TraceContext.scala +++ b/kamon-trace/src/main/scala/kamon/trace/TraceContext.scala @@ -20,7 +20,7 @@ import akka.actor._ import java.util.concurrent.atomic.AtomicLong import scala.concurrent.duration._ import kamon.Kamon -import kamon.trace.UowTracing.{Finish, Start} +import kamon.trace.UowTracing.{ Finish, Start } // TODO: Decide if we need or not an ID, generating it takes time and it doesn't seem necessary. protected[kamon] case class TraceContext(private val collector: ActorRef, id: Long, uow: String = "", userContext: Option[Any] = None) { @@ -31,10 +31,8 @@ protected[kamon] case class TraceContext(private val collector: ActorRef, id: Lo collector ! Finish(id) } - } - trait ContextAware { def traceContext: Option[TraceContext] } diff --git a/kamon-trace/src/main/scala/kamon/trace/UowTracing.scala b/kamon-trace/src/main/scala/kamon/trace/UowTracing.scala index a9603585..7858e31f 100644 --- a/kamon-trace/src/main/scala/kamon/trace/UowTracing.scala +++ b/kamon-trace/src/main/scala/kamon/trace/UowTracing.scala @@ -41,7 +41,6 @@ case class UowTrace(name: String, uow: String, start: Long, end: Long, segments: def elapsed: Long = end - start } - class UowTraceAggregator(reporting: ActorRef, aggregationTimeout: Duration) extends Actor with ActorLogging { context.setReceiveTimeout(aggregationTimeout) @@ -54,20 +53,20 @@ class UowTraceAggregator(reporting: ActorRef, aggregationTimeout: Duration) exte var end = 0L def receive = { - case start: Start => + case start: Start ⇒ this.start = start.timestamp segments = segments :+ start; name = start.name - case finish: Finish => + case finish: Finish ⇒ end = finish.timestamp segments = segments :+ finish; finishTracing() - case wes: WebExternalStart => pendingExternal = pendingExternal :+ wes - case finish @ WebExternalFinish(id) => pendingExternal.find(_.id == id).map(start => { + case wes: WebExternalStart ⇒ pendingExternal = pendingExternal :+ wes + case finish @ WebExternalFinish(id) ⇒ pendingExternal.find(_.id == id).map(start ⇒ { segments = segments :+ WebExternal(finish.id, start.timestamp, finish.timestamp, start.host) }) - case Rename(id, newName) => name = newName - case segment: UowSegment => segments = segments :+ segment - case ReceiveTimeout => + case Rename(id, newName) ⇒ name = newName + case segment: UowSegment ⇒ segments = segments :+ segment + case ReceiveTimeout ⇒ log.warning("Transaction {} did not complete properly, the recorded segments are: {}", name, segments) context.stop(self) } diff --git a/kamon-trace/src/main/scala/kamon/trace/instrumentation/ActorLoggingTracing.scala b/kamon-trace/src/main/scala/kamon/trace/instrumentation/ActorLoggingTracing.scala index c1a0d228..783fd6e1 100644 --- a/kamon-trace/src/main/scala/kamon/trace/instrumentation/ActorLoggingTracing.scala +++ b/kamon-trace/src/main/scala/kamon/trace/instrumentation/ActorLoggingTracing.scala @@ -15,10 +15,10 @@ * ========================================================== */ package kamon.trace.instrumentation -import org.aspectj.lang.annotation.{Around, Pointcut, DeclareMixin, Aspect} +import org.aspectj.lang.annotation.{ Around, Pointcut, DeclareMixin, Aspect } import org.aspectj.lang.ProceedingJoinPoint import org.slf4j.MDC -import kamon.trace.{TraceContext, ContextAware, Trace} +import kamon.trace.{ TraceContext, ContextAware, Trace } @Aspect class ActorLoggingTracing { @@ -27,17 +27,17 @@ class ActorLoggingTracing { def mixin: ContextAware = ContextAware.default @Pointcut("execution(* akka.event.slf4j.Slf4jLogger.withMdc(..)) && args(logSource, logEvent, logStatement)") - def withMdcInvocation(logSource: String, logEvent: ContextAware, logStatement: () => _): Unit = {} + def withMdcInvocation(logSource: String, logEvent: ContextAware, logStatement: () ⇒ _): Unit = {} @Around("withMdcInvocation(logSource, logEvent, logStatement)") - def aroundWithMdcInvocation(pjp: ProceedingJoinPoint, logSource: String, logEvent: ContextAware, logStatement: () => _): Unit = { + def aroundWithMdcInvocation(pjp: ProceedingJoinPoint, logSource: String, logEvent: ContextAware, logStatement: () ⇒ _): Unit = { logEvent.traceContext match { - case Some(ctx) => + case Some(ctx) ⇒ MDC.put("uow", ctx.uow) pjp.proceed() MDC.remove("uow") - case None => pjp.proceed() + case None ⇒ pjp.proceed() } } } diff --git a/kamon-trace/src/main/scala/kamon/trace/instrumentation/ActorMessagePassingTracing.scala b/kamon-trace/src/main/scala/kamon/trace/instrumentation/ActorMessagePassingTracing.scala index 4b48f8f2..399ddf61 100644 --- a/kamon-trace/src/main/scala/kamon/trace/instrumentation/ActorMessagePassingTracing.scala +++ b/kamon-trace/src/main/scala/kamon/trace/instrumentation/ActorMessagePassingTracing.scala @@ -17,11 +17,10 @@ package kamon.trace.instrumentation import org.aspectj.lang.annotation._ import org.aspectj.lang.ProceedingJoinPoint -import akka.actor.{Props, ActorSystem, ActorRef} -import akka.dispatch.{Envelope, MessageDispatcher} +import akka.actor.{ Props, ActorSystem, ActorRef } +import akka.dispatch.{ Envelope, MessageDispatcher } import com.codahale.metrics.Timer -import kamon.trace.{ContextAware, TraceContext, Trace} - +import kamon.trace.{ ContextAware, TraceContext, Trace } @Aspect class BehaviourInvokeTracing { diff --git a/kamon-trace/src/main/scala/kamon/trace/instrumentation/FutureTracing.scala b/kamon-trace/src/main/scala/kamon/trace/instrumentation/FutureTracing.scala index fc4abfe3..844f1d61 100644 --- a/kamon-trace/src/main/scala/kamon/trace/instrumentation/FutureTracing.scala +++ b/kamon-trace/src/main/scala/kamon/trace/instrumentation/FutureTracing.scala @@ -17,7 +17,7 @@ package kamon.trace.instrumentation import org.aspectj.lang.annotation._ import org.aspectj.lang.ProceedingJoinPoint -import kamon.trace.{ContextAware, TraceContext, Trace} +import kamon.trace.{ ContextAware, TraceContext, Trace } @Aspect class FutureTracing { @@ -25,7 +25,6 @@ class FutureTracing { @DeclareMixin("scala.concurrent.impl.CallbackRunnable || scala.concurrent.impl.Future.PromiseCompletingRunnable") def mixin: ContextAware = ContextAware.default - @Pointcut("execution((scala.concurrent.impl.CallbackRunnable || scala.concurrent.impl.Future.PromiseCompletingRunnable).new(..)) && this(runnable)") def futureRelatedRunnableCreation(runnable: ContextAware): Unit = {} @@ -35,7 +34,6 @@ class FutureTracing { runnable.traceContext } - @Pointcut("execution(* (scala.concurrent.impl.CallbackRunnable || scala.concurrent.impl.Future.PromiseCompletingRunnable).run()) && this(runnable)") def futureRelatedRunnableExecution(runnable: ContextAware) = {} diff --git a/project/Build.scala b/project/Build.scala index 08ee3033..514d39af 100644 --- a/project/Build.scala +++ b/project/Build.scala @@ -11,11 +11,13 @@ object Build extends Build { lazy val root = Project("root", file(".")) .aggregate(kamonCore, kamonTrace, kamonMetrics, kamonSpray, kamonNewrelic, kamonPlayground, kamonDashboard) .settings(basicSettings: _*) + .settings(formatSettings: _*) .settings(noPublishing: _*) lazy val kamonCore = Project("kamon-core", file("kamon-core")) .settings(basicSettings: _*) + .settings(formatSettings: _*) .settings(aspectJSettings: _*) .settings( libraryDependencies ++= @@ -25,6 +27,7 @@ object Build extends Build { lazy val kamonTrace = Project("kamon-trace", file("kamon-trace")) .settings(basicSettings: _*) + .settings(formatSettings: _*) .settings(aspectJSettings: _*) .settings( libraryDependencies ++= @@ -35,6 +38,7 @@ object Build extends Build { lazy val kamonMetrics = Project("kamon-metrics", file("kamon-metrics")) .settings(basicSettings: _*) + .settings(formatSettings: _*) .settings(aspectJSettings: _*) .settings( libraryDependencies ++= @@ -45,6 +49,7 @@ object Build extends Build { lazy val kamonSpray = Project("kamon-spray", file("kamon-spray")) .settings(basicSettings: _*) + .settings(formatSettings: _*) .settings(aspectJSettings: _*) .settings( libraryDependencies ++= @@ -55,6 +60,7 @@ object Build extends Build { lazy val kamonNewrelic = Project("kamon-newrelic", file("kamon-newrelic")) .settings(basicSettings: _*) + .settings(formatSettings: _*) .settings(aspectJSettings: _*) .settings( libraryDependencies ++= @@ -65,6 +71,7 @@ object Build extends Build { lazy val kamonPlayground = Project("kamon-playground", file("kamon-playground")) .settings(basicSettings: _*) + .settings(formatSettings: _*) .settings(revolverSettings: _*) .settings(newrelicSettings: _*) .settings(noPublishing: _*) @@ -76,6 +83,7 @@ object Build extends Build { lazy val kamonDashboard = Project("kamon-dashboard", file("kamon-dashboard")) .settings(basicSettings: _*) + .settings(formatSettings: _*) .settings(libraryDependencies ++= compile(akkaActor, akkaSlf4j, sprayRouting, sprayCan, sprayJson)) .dependsOn(kamonCore) diff --git a/project/Settings.scala b/project/Settings.scala index 62a48223..d5cdd3b6 100644 --- a/project/Settings.scala +++ b/project/Settings.scala @@ -1,6 +1,8 @@ import sbt._ import Keys._ import spray.revolver.RevolverPlugin.Revolver +import com.typesafe.sbt.SbtScalariform +import com.typesafe.sbt.SbtScalariform.ScalariformKeys object Settings { val VERSION = "0.0.11" @@ -28,10 +30,19 @@ object Settings { import spray.revolver.RevolverPlugin.Revolver._ - lazy val revolverSettings = Revolver.settings ++ seq( - reJRebelJar := "~/.jrebel/jrebel.jar" + lazy val revolverSettings = Revolver.settings ++ seq(reJRebelJar := "~/.jrebel/jrebel.jar") + + lazy val formatSettings = SbtScalariform.scalariformSettings ++ Seq( + ScalariformKeys.preferences in Compile := formattingPreferences, + ScalariformKeys.preferences in Test := formattingPreferences ) - + import scalariform.formatter.preferences._ + def formattingPreferences = + FormattingPreferences() + .setPreference(RewriteArrowSymbols, true) + .setPreference(AlignParameters, true) + .setPreference(AlignSingleLineCaseStatements, true) + .setPreference(DoubleIndentClassDeclaration, true) } diff --git a/project/plugins.sbt b/project/plugins.sbt index d037d72f..a260b165 100644 --- a/project/plugins.sbt +++ b/project/plugins.sbt @@ -13,3 +13,6 @@ addSbtPlugin("com.ivantopo.sbt" % "sbt-newrelic" % "0.0.1") addSbtPlugin("com.github.gseitz" % "sbt-release" % "0.8") addSbtPlugin("com.typesafe.sbt" % "sbt-site" % "0.7.1") + +addSbtPlugin("com.typesafe.sbt" % "sbt-scalariform" % "1.0.1") + -- cgit v1.2.3