From 562bebe1d0ce74217f4c355e3ab49c9ae03cdb61 Mon Sep 17 00:00:00 2001 From: Diego Parra Date: Thu, 3 Oct 2013 13:23:31 -0300 Subject: cleanup cleanup cleanup --- kamon-core/src/main/resources/META-INF/aop.xml | 12 +- .../main/scala/kamon/TransactionPublisher.scala | 15 --- .../src/main/scala/kamon/executor/eventbus.scala | 103 ---------------- .../instrumentation/SampleInstrumentation.scala | 49 -------- .../src/main/scala/kamon/metric/MetricsUtils.scala | 51 -------- .../scala/kamon/newrelic/NewRelicErrorLogger.scala | 1 - .../src/main/scala/spraytest/ClientTest.scala | 55 --------- .../src/main/scala/spraytest/FutureTesting.scala | 81 ------------- kamon-core/src/main/scala/test/PingPong.scala | 129 --------------------- .../main/scala/test/SimpleRequestProcessor.scala | 83 +++++++++++++ 10 files changed, 89 insertions(+), 490 deletions(-) delete mode 100644 kamon-core/src/main/scala/kamon/TransactionPublisher.scala delete mode 100644 kamon-core/src/main/scala/kamon/executor/eventbus.scala delete mode 100644 kamon-core/src/main/scala/kamon/instrumentation/SampleInstrumentation.scala delete mode 100644 kamon-core/src/main/scala/kamon/metric/MetricsUtils.scala delete mode 100644 kamon-core/src/main/scala/spraytest/ClientTest.scala delete mode 100644 kamon-core/src/main/scala/spraytest/FutureTesting.scala delete mode 100644 kamon-core/src/main/scala/test/PingPong.scala create mode 100644 kamon-core/src/main/scala/test/SimpleRequestProcessor.scala diff --git a/kamon-core/src/main/resources/META-INF/aop.xml b/kamon-core/src/main/resources/META-INF/aop.xml index efdce792..104d7f78 100644 --- a/kamon-core/src/main/resources/META-INF/aop.xml +++ b/kamon-core/src/main/resources/META-INF/aop.xml @@ -10,18 +10,18 @@ - + + + + - + + - - - - diff --git a/kamon-core/src/main/scala/kamon/TransactionPublisher.scala b/kamon-core/src/main/scala/kamon/TransactionPublisher.scala deleted file mode 100644 index 0626b91d..00000000 --- a/kamon-core/src/main/scala/kamon/TransactionPublisher.scala +++ /dev/null @@ -1,15 +0,0 @@ -package kamon - -import akka.actor.Actor -import java.util.UUID - -class TransactionPublisher extends Actor { - - def receive = { - case FullTransaction(id, entries) => println(s"I got a full tran: $id - $entries") - } - -} - - -case class FullTransaction(id: UUID, entries: List[TraceEntry]) diff --git a/kamon-core/src/main/scala/kamon/executor/eventbus.scala b/kamon-core/src/main/scala/kamon/executor/eventbus.scala deleted file mode 100644 index d51305a8..00000000 --- a/kamon-core/src/main/scala/kamon/executor/eventbus.scala +++ /dev/null @@ -1,103 +0,0 @@ -package kamon.executor - -import akka.event.ActorEventBus -import akka.event.LookupClassification -import akka.actor._ -import java.util.concurrent.TimeUnit - -import kamon.{Tracer, CodeBlockExecutionTime, Kamon, TraceContext} -import akka.util.Timeout -import scala.util.{Random, Success, Failure} -import scala.concurrent.Future - -trait Message - -case class PostMessage(text:String) extends Message - -case class MessageEvent(val channel:String, val message:Message) - -class AppActorEventBus extends ActorEventBus with LookupClassification{ - type Event = MessageEvent - type Classifier=String - protected def mapSize(): Int={ - 10 - } - - protected def classify(event: Event): Classifier={ - event.channel - } - - protected def publish(event: Event, subscriber: Subscriber): Unit={ - subscriber ! event - } -} -case class Ping() -case class Pong() - -class PingActor extends Actor with ActorLogging { - - val pong = context.actorOf(Props[PongActor], "Pong") - val random = new Random() - def receive = { - case Pong() => { - //Thread.sleep(random.nextInt(2000)) - //log.info("Message from Ping") - pong ! Ping() - } - } -} - -class PongActor extends Actor with ActorLogging { - def receive = { - case Ping() => { - sender ! Pong() - } - } -} - - -object TryAkka extends App{ - val system = ActorSystem("MySystem") - val appActorEventBus=new AppActorEventBus - val NEW_POST_CHANNEL="/posts/new" - val subscriber = system.actorOf(Props(new Actor { - def receive = { - case d: MessageEvent => println(d) - } - })) - - Tracer.start - for(i <- 1 to 4) { - val ping = system.actorOf(Props[PingActor], "Ping" + i) - ping ! Pong() - } - - - def threadPrintln(body: String) = println(s"[${Thread.currentThread().getName}] - [${Tracer.context}] : $body") - - /* - val newRelicReporter = new NewRelicReporter(registry) - newRelicReporter.start(1, TimeUnit.SECONDS) - -*/ - import akka.pattern.ask - implicit val timeout = Timeout(10, TimeUnit.SECONDS) - implicit def execContext = system.dispatcher - - - - //Tracer.start - - Tracer.context.get.append(CodeBlockExecutionTime("some-block", System.nanoTime(), System.nanoTime())) - threadPrintln("Before doing it") - val f = Future { threadPrintln("This is happening inside the future body") } - - - - - //Thread.sleep(3000) - //system.shutdown() - -/* appActorEventBus.subscribe(subscriber, NEW_POST_CHANNEL) - appActorEventBus.publish(MessageEvent(NEW_POST_CHANNEL,PostMessage(text="hello world")))*/ -} \ No newline at end of file diff --git a/kamon-core/src/main/scala/kamon/instrumentation/SampleInstrumentation.scala b/kamon-core/src/main/scala/kamon/instrumentation/SampleInstrumentation.scala deleted file mode 100644 index 74261403..00000000 --- a/kamon-core/src/main/scala/kamon/instrumentation/SampleInstrumentation.scala +++ /dev/null @@ -1,49 +0,0 @@ -package kamon.instrumentation - -import com.codahale.metrics.{ExponentiallyDecayingReservoir, Histogram} -import org.aspectj.lang.annotation.{After, Pointcut, DeclareMixin, Aspect} - -class ActorCage(val name: String, val size: Int) { - - def doIt: Unit = println("name") -} - -trait CageMonitoring { - def histogram: Histogram - def count(value: Int): Unit -} - -class CageMonitoringImp extends CageMonitoring{ - final val histogram = new Histogram(new ExponentiallyDecayingReservoir()) - - def count(value: Int) = histogram.update(value) - -} - - -@Aspect -class InceptionAspect { - - @DeclareMixin("kamon.instrumentation.ActorCage") - def mixin: CageMonitoring = new CageMonitoringImp - - - @Pointcut("execution(* kamon.instrumentation.ActorCage.doIt()) && target(actorCage)") - def theActorCageDidIt(actorCage: CageMonitoring) = {} - - @After("theActorCageDidIt(actorCage)") - def afterDoingIt(actorCage: CageMonitoring) = { - actorCage.count(1) - actorCage.histogram.getSnapshot.dump(System.out) - } - - - -} - - -object Runner extends App { - val cage = new ActorCage("ivan", 10) - - cage.doIt -} diff --git a/kamon-core/src/main/scala/kamon/metric/MetricsUtils.scala b/kamon-core/src/main/scala/kamon/metric/MetricsUtils.scala deleted file mode 100644 index 5b4ceaf4..00000000 --- a/kamon-core/src/main/scala/kamon/metric/MetricsUtils.scala +++ /dev/null @@ -1,51 +0,0 @@ -package kamon.metric - -import com.codahale.metrics._ - -object MetricsUtils { - - def markMeter[T](meter:Meter)(f: => T): T = { - meter.mark() - f - } -// -// def incrementCounter(key: String) { -// counters.getOrElseUpdate(key, (metricsGroup.counter(s"${key}-counter"))).count -// } -// -// def markMeter(key: String) { -// meters.getOrElseUpdate(key, metricsGroup.meter(s"${key}-meter", "actor", "actor-message-counter", TimeUnit.SECONDS)).mark() -// } -// -// def trace[T](key: String)(f: => T): T = { -// val timer = timers.getOrElseUpdate(key, (metricsGroup.timer(s"${key}-timer")) ) -// timer.time(f) -// } - -// def markAndCountMeter[T](key: String)(f: => T): T = { -// markMeter(key) -// f -// } -// -// def traceAndCount[T](key: String)(f: => T): T = { -// incrementCounter(key) -// trace(key) { -// f -// } - //} - -// implicit def runnable(f: () => Unit): Runnable = -// new Runnable() { def run() = f() } -// -// -// import java.util.concurrent.Callable -// -// implicit def callable[T](f: () => T): Callable[T] = -// new Callable[T]() { def call() = f() } - -// private val actorCounter:Counter = new Counter -// private val actorTimer:Timer = new Timer -// -// metricsRegistry.register(s"counter-for-${actorName}", actorCounter) -// metricsRegistry.register(s"timer-for-${actorName}", actorTimer) -} \ No newline at end of file diff --git a/kamon-core/src/main/scala/kamon/newrelic/NewRelicErrorLogger.scala b/kamon-core/src/main/scala/kamon/newrelic/NewRelicErrorLogger.scala index 72bcb4e2..4bc49496 100644 --- a/kamon-core/src/main/scala/kamon/newrelic/NewRelicErrorLogger.scala +++ b/kamon-core/src/main/scala/kamon/newrelic/NewRelicErrorLogger.scala @@ -13,7 +13,6 @@ class NewRelicErrorLogger extends Actor { } def notifyError(error: Error): Unit = { - println(error.message) noticeError(error.cause) } } diff --git a/kamon-core/src/main/scala/spraytest/ClientTest.scala b/kamon-core/src/main/scala/spraytest/ClientTest.scala deleted file mode 100644 index 07532d0a..00000000 --- a/kamon-core/src/main/scala/spraytest/ClientTest.scala +++ /dev/null @@ -1,55 +0,0 @@ -package spraytest - -import akka.actor.ActorSystem -import spray.client.pipelining._ -import spray.httpx.SprayJsonSupport -import spray.json._ -import scala.concurrent.Future -import spray.can.Http -import akka.io.IO - -/** - * BEGIN JSON Infrastructure - */ -case class Container(data: List[PointOfInterest]) -case class Geolocation(latitude: Float, longitude: Float) -case class PointOfInterest(ma: Option[String], a: Option[String], c: String, s: Option[String], geolocation: Geolocation) - -object GeoJsonProtocol extends DefaultJsonProtocol { - implicit val geolocationFormat = jsonFormat2(Geolocation) - implicit val pointOfInterestFormat = jsonFormat5(PointOfInterest) - implicit val containerFormat = jsonFormat1(Container) -} -/** END-OF JSON Infrastructure */ - - - - - - -class ClientTest extends App { - implicit val actorSystem = ActorSystem("spray-client-test") - import actorSystem.dispatcher - - - import GeoJsonProtocol._ - import SprayJsonSupport._ - - - val actor = IO(Http) - - val pipeline = sendReceive ~> unmarshal[Container] - - val response = pipeline { - Get("http://geo.despegar.com/geo-services-web/service/Autocomplete/DESAR/1/0/0/10/0/0/Obelisco") - } onSuccess { - case a => { - println(a) - } - } -} - - - - - diff --git a/kamon-core/src/main/scala/spraytest/FutureTesting.scala b/kamon-core/src/main/scala/spraytest/FutureTesting.scala deleted file mode 100644 index b864d6d6..00000000 --- a/kamon-core/src/main/scala/spraytest/FutureTesting.scala +++ /dev/null @@ -1,81 +0,0 @@ -package spraytest -/* -import akka.actor.ActorSystem -import scala.concurrent.{ExecutionContext, Future} -import scala.util.{Try, Success} -import kamon.actor.TransactionContext - -object FutureTesting extends App { - - val actorSystem = ActorSystem("future-testing") - implicit val ec = actorSystem.dispatcher - implicit val tctx = TransactionContext(11, Nil) - - threadPrintln("In the initial Thread") - - - val f = TraceableFuture { - threadPrintln(s"Processing the Future, and the current context is: ${TransactionContext.current.get()}") - } - - f.onComplete({ - case Success(a) => threadPrintln(s"Processing the first callback, and the current context is: ${TransactionContext.current.get()}") - }) - - f.onComplete({ - case Success(a) => threadPrintln(s"Processing the second callback, and the current context is: ${TransactionContext.current.get()}") - }) - - - - - - - - - def threadPrintln(message: String) = println(s"Thread[${Thread.currentThread.getName}] says: [${message}]") - -} - - - - -trait TransactionContextWrapper { - def wrap[In, Out](f: => In => Out, tranContext: TransactionContext) = { - TransactionContext.current.set(tranContext.fork) - println(s"SetContext to: ${tranContext}") - val result = f - - TransactionContext.current.remove() - result - } - -} - -class TraceableFuture[T](val future: Future[T]) extends TransactionContextWrapper { - def onComplete[U](func: Try[T] => U)(implicit transactionContext: TransactionContext, executor: ExecutionContext): Unit = { - future.onComplete(wrap(func, transactionContext)) - } -} - -object TraceableFuture { - - implicit def toRegularFuture[T](tf: TraceableFuture[T]) = tf.future - - def apply[T](body: => T)(implicit transactionContext: TransactionContext, executor: ExecutionContext) = { - val wrappedBody = contextSwitchWrapper(body, TransactionContext(transactionContext.dispatcherName, Nil)) - - new TraceableFuture(Future { wrappedBody }) - } - - - - - def contextSwitchWrapper[T](body: => T, transactionContext: TransactionContext) = { - TransactionContext.current.set(transactionContext) - val result = body - TransactionContext.current.remove() - result - } -}*/ - diff --git a/kamon-core/src/main/scala/test/PingPong.scala b/kamon-core/src/main/scala/test/PingPong.scala deleted file mode 100644 index 8808e4bf..00000000 --- a/kamon-core/src/main/scala/test/PingPong.scala +++ /dev/null @@ -1,129 +0,0 @@ -package test - -import akka.actor._ -import java.util.concurrent.atomic.AtomicLong -import kamon.Tracer -import spray.routing.SimpleRoutingApp -import akka.util.Timeout -import spray.httpx.RequestBuilding -import scala.concurrent.{Await, Future} - -object PingPong extends App { - import scala.concurrent.duration._ - val counter = new AtomicLong - - val as = ActorSystem("ping-pong") - import as.dispatcher - - Tracer.start - - for(i <- 1 to 64) { - val pinger = as.actorOf(Props[Pinger]) - val ponger = as.actorOf(Props[Ponger]) - - for(_ <- 1 to 256) { - pinger.tell(Pong, ponger) - } - } - - as.scheduler.schedule(1 second, 1 second) { - println("Processed: " + counter.getAndSet(0)) - } -} - -case object Ping -case object Pong - -class Pinger extends Actor { - def receive = { - case Pong => { - sender ! Ping - PingPong.counter.incrementAndGet() - } - } -} - -class Ponger extends Actor { - def receive = { - case Ping => { - sender ! Pong; PingPong.counter.incrementAndGet() - } - } -} - - -object SimpleRequestProcessor extends App with SimpleRoutingApp with RequestBuilding { - import scala.concurrent.duration._ - import spray.client.pipelining._ - import akka.pattern.ask - - implicit val system = ActorSystem("test") - import system.dispatcher - - implicit val timeout = Timeout(30 seconds) - - val pipeline = sendReceive - val replier = system.actorOf(Props[Replier]) - - startServer(interface = "localhost", port = 9090) { - get { - path("test"){ - complete { - pipeline(Get("http://www.despegar.com.ar")).map(r => "Ok") - } - } ~ - path("reply" / Segment) { reqID => - complete { - if (Tracer.context().isEmpty) - println("ROUTE NO CONTEXT") - - (replier ? reqID).mapTo[String] - } - } ~ - path("ok") { - complete("ok") - } ~ - path("future") { - dynamic { - complete(Future { "OK" }) - } - } - } - } - -} - -object Verifier extends App { - - def go: Unit = { - import scala.concurrent.duration._ - import spray.client.pipelining._ - import akka.pattern.ask - - implicit val system = ActorSystem("test") - import system.dispatcher - - implicit val timeout = Timeout(30 seconds) - - val pipeline = sendReceive - - val futures = Future.sequence(for(i <- 1 to 500) yield { - pipeline(Get("http://127.0.0.1:9090/reply/"+i)).map(r => r.entity.asString == i.toString) - }) - println("Everything is: "+ Await.result(futures, 10 seconds).forall(a => a == true)) - } - - - - -} - -class Replier extends Actor with ActorLogging { - def receive = { - case anything => - if(Tracer.context.isEmpty) - log.warning("PROCESSING A MESSAGE WITHOUT CONTEXT") - - sender ! anything - } -} diff --git a/kamon-core/src/main/scala/test/SimpleRequestProcessor.scala b/kamon-core/src/main/scala/test/SimpleRequestProcessor.scala new file mode 100644 index 00000000..5b216b39 --- /dev/null +++ b/kamon-core/src/main/scala/test/SimpleRequestProcessor.scala @@ -0,0 +1,83 @@ +package test + +import akka.actor._ +import kamon.Tracer +import spray.routing.SimpleRoutingApp +import akka.util.Timeout +import spray.httpx.RequestBuilding +import scala.concurrent.{Await, Future} + +object SimpleRequestProcessor extends App with SimpleRoutingApp with RequestBuilding { + import scala.concurrent.duration._ + import spray.client.pipelining._ + import akka.pattern.ask + + implicit val system = ActorSystem("test") + import system.dispatcher + + implicit val timeout = Timeout(30 seconds) + + val pipeline = sendReceive + val replier = system.actorOf(Props[Replier]) + + startServer(interface = "localhost", port = 9090) { + get { + path("test"){ + complete { + pipeline(Get("http://www.despegar.com.ar")).map(r => "Ok") + } + } ~ + path("reply" / Segment) { reqID => + complete { + if (Tracer.context().isEmpty) + println("ROUTE NO CONTEXT") + + (replier ? reqID).mapTo[String] + } + } ~ + path("ok") { + complete("ok") + } ~ + path("future") { + dynamic { + complete(Future { "OK" }) + } + } + } + } + +} + +object Verifier extends App { + + def go: Unit = { + import scala.concurrent.duration._ + import spray.client.pipelining._ + + implicit val system = ActorSystem("test") + import system.dispatcher + + implicit val timeout = Timeout(30 seconds) + + val pipeline = sendReceive + + val futures = Future.sequence(for(i <- 1 to 500) yield { + pipeline(Get("http://127.0.0.1:9090/reply/"+i)).map(r => r.entity.asString == i.toString) + }) + println("Everything is: "+ Await.result(futures, 10 seconds).forall(a => a == true)) + } + + + + +} + +class Replier extends Actor with ActorLogging { + def receive = { + case anything => + if(Tracer.context.isEmpty) + log.warning("PROCESSING A MESSAGE WITHOUT CONTEXT") + + sender ! anything + } +} -- cgit v1.2.3