diff options
Diffstat (limited to 'kamon-spm')
6 files changed, 428 insertions, 0 deletions
diff --git a/kamon-spm/src/main/resources/reference.conf b/kamon-spm/src/main/resources/reference.conf new file mode 100644 index 00000000..60147097 --- /dev/null +++ b/kamon-spm/src/main/resources/reference.conf @@ -0,0 +1,25 @@ +kamon { + spm { + receiver-url = "https://spm-receiver.sematext.com/receiver/v1/_bulk" + retry-interval = 5 seconds + send-timeout = 5 seconds + max-queue-size = 100 + # token = "your token" + # hostname-alias = "custom hostname" + + subscriptions { + akka-actor = [ "**" ] + akka-dispatcher = [ "**" ] + akka-router = [ "**" ] + system-metric = [ "**" ] + } + } + + modules { + kamon-spm { + auto-start = yes + requires-aspectj = no + extension-id = "kamon.spm.SPM" + } + } +}
\ No newline at end of file diff --git a/kamon-spm/src/main/scala/kamon/spm/SPM.scala b/kamon-spm/src/main/scala/kamon/spm/SPM.scala new file mode 100644 index 00000000..b8bb4eab --- /dev/null +++ b/kamon-spm/src/main/scala/kamon/spm/SPM.scala @@ -0,0 +1,68 @@ +/* + * ========================================================================================= + * Copyright © 2013-2015 the kamon project <http://kamon.io/> + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + +package kamon.spm + +import java.net.InetAddress +import java.util.concurrent.TimeUnit + +import akka.actor._ +import akka.event.Logging +import akka.io.IO +import akka.util.Timeout +import kamon.Kamon +import kamon.util.ConfigTools.Syntax +import spray.can.Http + +import scala.concurrent.duration._ +import scala.collection.JavaConverters._ + +object SPM extends ExtensionId[SPMExtension] with ExtensionIdProvider { + override def createExtension(system: ExtendedActorSystem): SPMExtension = new SPMExtension(system) + override def lookup(): ExtensionId[_ <: Extension] = SPM +} + +class SPMExtension(system: ExtendedActorSystem) extends Kamon.Extension { + implicit val s: ActorSystem = system + + val log = Logging(system, classOf[SPMExtension]) + + log.info("Starting kamon-spm extension.") + + val config = system.settings.config.getConfig("kamon.spm") + val maxQueueSize = config.getInt("max-queue-size") + val retryInterval: FiniteDuration = config.getDuration("retry-interval", TimeUnit.MILLISECONDS) millis + val sendTimeout: FiniteDuration = config.getDuration("send-timeout", TimeUnit.MILLISECONDS) millis + val url = config.getString("receiver-url") + val token = config.getString("token") + val hostname = if (config.hasPath("hostname-alias")) { + config.getString("hostname-alias") + } else { + InetAddress.getLocalHost.getHostName + } + + val subscriptionsConf = config.getConfig("subscriptions") + val subscriptions = subscriptionsConf.firstLevelKeys.flatMap { category ⇒ + subscriptionsConf.getStringList(category).asScala.map { pattern ⇒ + category -> pattern + } + }.toList + + val sender = system.actorOf(SPMMetricsSender.props(IO(Http), retryInterval, Timeout(sendTimeout), maxQueueSize, url, hostname, token), "spm-metrics-sender") + var subscriber = system.actorOf(SPMMetricsSubscriber.props(sender, 50 seconds, subscriptions), "spm-metrics-subscriber") + + log.info(s"kamon-spm extension started. Hostname = ${hostname}, url = ${url}.") +} diff --git a/kamon-spm/src/main/scala/kamon/spm/SPMMetric.scala b/kamon-spm/src/main/scala/kamon/spm/SPMMetric.scala new file mode 100644 index 00000000..c088c471 --- /dev/null +++ b/kamon-spm/src/main/scala/kamon/spm/SPMMetric.scala @@ -0,0 +1,46 @@ +/* + * ========================================================================================= + * Copyright © 2013-2015 the kamon project <http://kamon.io/> + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + +package kamon.spm + +import kamon.metric.instrument._ +import kamon.util.MilliTimestamp + +case class SPMMetric(ts: MilliTimestamp, category: String, name: String, instrumentName: String, unitOfMeasurement: UnitOfMeasurement, snapshot: InstrumentSnapshot) + +object SPMMetric { + private def convert(unit: UnitOfMeasurement, value: Long): Long = unit match { + case t: Time ⇒ t.scale(Time.Milliseconds)(value).toLong + case m: Memory ⇒ m.scale(Memory.Bytes)(value).toLong + case _ ⇒ value + } + + private def prefix(metric: SPMMetric): String = { + s"${metric.ts.millis}\t${metric.category}-${metric.instrumentName}\t${metric.ts.millis}\t${metric.name}" + } + + def format(metric: SPMMetric): String = metric match { + case SPMMetric(_, _, _, _, unit, histo: Histogram#SnapshotType) ⇒ { + val min = convert(unit, histo.min) + val max = convert(unit, histo.max) + val sum = convert(unit, histo.sum) + s"${prefix(metric)}\t${min}\t${max}\t${sum}\t${histo.numberOfMeasurements}" + } + case SPMMetric(_, _, _, _, unit, counter: Counter#SnapshotType) ⇒ { + s"${prefix(metric)}\t${counter.count}" + } + } +} diff --git a/kamon-spm/src/main/scala/kamon/spm/SPMMetricsSender.scala b/kamon-spm/src/main/scala/kamon/spm/SPMMetricsSender.scala new file mode 100644 index 00000000..80d50c41 --- /dev/null +++ b/kamon-spm/src/main/scala/kamon/spm/SPMMetricsSender.scala @@ -0,0 +1,137 @@ +/* + * ========================================================================================= + * Copyright © 2013-2015 the kamon project <http://kamon.io/> + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + +package kamon.spm + +import akka.actor._ +import akka.pattern.{ ask, pipe } +import akka.util.Timeout +import spray.http.Uri.Query +import spray.http.{ HttpEntity, HttpResponse, Uri } +import spray.httpx.RequestBuilding._ +import spray.json.{ DefaultJsonProtocol, _ } + +import scala.annotation.tailrec +import scala.collection.immutable.{ Map, Queue } +import scala.concurrent.duration._ + +class SPMMetricsSender(io: ActorRef, retryInterval: FiniteDuration, sendTimeout: Timeout, maxQueueSize: Int, url: String, host: String, token: String) extends Actor with ActorLogging { + import context._ + import kamon.spm.SPMMetricsSender._ + + implicit val t = sendTimeout + + private def post(metrics: List[SPMMetric]): Unit = { + val query = Query("host" -> host, "token" -> token) + val entity = HttpEntity(encodeBody(metrics)) + (io ? Post(Uri(url).withQuery(query)).withEntity(entity)).mapTo[HttpResponse].recover { + case t: Throwable ⇒ { + log.error(t, "Can't post metrics.") + ScheduleRetry + } + }.pipeTo(self) + } + + private def fragment(metrics: List[SPMMetric]): List[List[SPMMetric]] = { + @tailrec + def partition(batch: List[SPMMetric], batches: List[List[SPMMetric]]): List[List[SPMMetric]] = { + if (batch.isEmpty) { + batches + } else { + val (head, tail) = batch.splitAt(MaxMetricsPerBulk) + partition(tail, batches :+ head) + } + } + if (metrics.size < MaxMetricsPerBulk) { + metrics :: Nil + } else { + partition(metrics, Nil) + } + } + + def receive = idle + + def idle: Receive = { + case Send(metrics) if metrics.nonEmpty ⇒ { + try { + val batches = fragment(metrics) + post(batches.head) + become(sending(Queue(batches: _*))) + } catch { + case e: Throwable ⇒ { + log.error(e, "Something went wrong.") + } + } + } + case Send(metrics) if metrics.isEmpty ⇒ /* do nothing */ + } + + def sending(queue: Queue[List[SPMMetric]]): Receive = { + case resp: HttpResponse if resp.status.isSuccess ⇒ { + val (_, q) = queue.dequeue + if (q.isEmpty) { + become(idle) + } else { + val (metrics, _) = q.dequeue + post(metrics) + become(sending(q)) + } + } + case Send(metrics) if metrics.nonEmpty && queue.size < maxQueueSize ⇒ { + val batches = fragment(metrics) + become(sending(queue.enqueue(batches))) + } + case _: Send if queue.size >= maxQueueSize ⇒ { + log.warning(s"Send queue is full (${queue.size}). Rejecting metrics.") + } + case Send(metrics) if metrics.isEmpty ⇒ /* do nothing */ + case Retry ⇒ { + val (metrics, _) = queue.dequeue + post(metrics) + } + case resp: HttpResponse if resp.status.isFailure ⇒ { + log.warning("Metrics can't be sent. Response status: ${resp.status}. Scheduling retry.") + context.system.scheduler.scheduleOnce(retryInterval, self, Retry) + } + case ScheduleRetry ⇒ { + log.warning("Metrics can't be sent. Scheduling retry.") + context.system.scheduler.scheduleOnce(retryInterval, self, Retry) + } + } +} + +object SPMMetricsSender { + private case object Retry + private case object ScheduleRetry + + case class Send(metrics: List[SPMMetric]) + + def props(io: ActorRef, retryInterval: FiniteDuration, sendTimeout: Timeout, maxQueueSize: Int, url: String, host: String, token: String) = + Props(classOf[SPMMetricsSender], io, retryInterval, sendTimeout, maxQueueSize, url, host, token) + + private val IndexTypeHeader = Map("index" -> Map("_type" -> "log", "_index" -> "spm-receiver")) + + private val MaxMetricsPerBulk = 100 + + import spray.json.DefaultJsonProtocol._ + + private def encodeBody(metrics: List[SPMMetric]): String = { + val body = metrics.map { metric ⇒ + Map("body" -> SPMMetric.format(metric)).toJson + }.toList + (IndexTypeHeader.toJson :: body).mkString("\n") + } +} diff --git a/kamon-spm/src/main/scala/kamon/spm/SPMMetricsSubscriber.scala b/kamon-spm/src/main/scala/kamon/spm/SPMMetricsSubscriber.scala new file mode 100644 index 00000000..7dcc64ec --- /dev/null +++ b/kamon-spm/src/main/scala/kamon/spm/SPMMetricsSubscriber.scala @@ -0,0 +1,55 @@ +/* + * ========================================================================================= + * Copyright © 2013-2015 the kamon project <http://kamon.io/> + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + +package kamon.spm + +import akka.actor.{ ActorLogging, Props, Actor, ActorRef } +import kamon.Kamon +import kamon.metric.SubscriptionsDispatcher.TickMetricSnapshot +import kamon.metric.TickMetricSnapshotBuffer +import kamon.spm.SPMMetricsSender.Send + +import scala.concurrent.duration.FiniteDuration + +class SPMMetricsSubscriber(sender: ActorRef, flushInterval: FiniteDuration, subscriptions: List[(String, String)]) extends Actor with ActorLogging { + + override def preStart(): Unit = { + val buffer = context.system.actorOf(TickMetricSnapshotBuffer.props(flushInterval, self)) + subscriptions.foreach { + case (category, selection) ⇒ + Kamon.metrics.subscribe(category, selection, buffer) + } + } + + def receive = { + case tick: TickMetricSnapshot ⇒ { + val metrics = tick.metrics.toList.flatMap { + case (entry, snap) ⇒ + snap.metrics.toList.map { + case (key, snap) ⇒ + SPMMetric(tick.to, entry.category, entry.name, key.name, key.unitOfMeasurement, snap) + } + } + + sender ! Send(metrics) + } + } +} + +object SPMMetricsSubscriber { + def props(sender: ActorRef, flushInterval: FiniteDuration, subscriptions: List[(String, String)]) = + Props(classOf[SPMMetricsSubscriber], sender, flushInterval, subscriptions) +} diff --git a/kamon-spm/src/test/scala/kamon/spm/SPMMetricsSenderSpec.scala b/kamon-spm/src/test/scala/kamon/spm/SPMMetricsSenderSpec.scala new file mode 100644 index 00000000..5f86443a --- /dev/null +++ b/kamon-spm/src/test/scala/kamon/spm/SPMMetricsSenderSpec.scala @@ -0,0 +1,97 @@ +/* + * ========================================================================================= + * Copyright © 2013-2015 the kamon project <http://kamon.io/> + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + +package kamon.spm + +import akka.testkit.TestProbe +import akka.util.Timeout +import kamon.Kamon +import kamon.metric.instrument.Time +import kamon.spm.SPMMetricsSender.Send +import kamon.testkit.BaseKamonSpec +import kamon.util.MilliTimestamp +import spray.http.{ HttpRequest, HttpResponse, StatusCodes } + +import scala.concurrent.duration._ + +class SPMMetricsSenderSpec extends BaseKamonSpec("spm-metrics-sender-spec") { + + private def testMetrics(prefix: String = ""): List[SPMMetric] = { + (0 until 2).map { i ⇒ + val histo = Kamon.metrics.histogram(s"histo-$i") + histo.record(1) + SPMMetric(new MilliTimestamp(123L), "histogram", s"${prefix}-entry-$i", s"histo-$i", Time.Milliseconds, histo.collect(collectionContext)) + }.toList + } + + "spm metrics sender" should { + "send metrics to receiver" in { + val io = TestProbe() + + val sender = system.actorOf(SPMMetricsSender.props(io.ref, 5 seconds, Timeout(5 seconds), 100, "http://localhost:1234", "host-1", "1234")) + sender ! Send(testMetrics()) + + val request = io.expectMsgPF(1 second) { + case req: HttpRequest ⇒ req + } + + request.uri.query.get("host") should be(Some("host-1")) + request.uri.query.get("token") should be(Some("1234")) + + val payload = request.entity.asString + + payload.split("\n") should have length 3 + } + + "resend metrics in case of exception or failure response status" in { + val io = TestProbe() + + val sender = system.actorOf(SPMMetricsSender.props(io.ref, 2 seconds, Timeout(5 seconds), 100, "http://localhost:1234", "host-1", "1234")) + sender ! Send(testMetrics()) + + io.expectMsgClass(classOf[HttpRequest]) + + io.sender() ! "Unknown message" /* should trigger classcast exception */ + + io.expectMsgClass(3 seconds, classOf[HttpRequest]) + + io.sender() ! HttpResponse(status = StatusCodes.NotFound) + + io.expectMsgClass(3 seconds, classOf[HttpRequest]) + + io.sender() ! HttpResponse(status = StatusCodes.OK) + + io.expectNoMsg(3 seconds) + } + + "ignore new metrics in case when send queue is full" in { + val io = TestProbe() + + val sender = system.actorOf(SPMMetricsSender.props(io.ref, 2 seconds, Timeout(5 seconds), 5, "http://localhost:1234", "host-1", "1234")) + + (0 until 5).foreach(_ ⇒ sender ! Send(testMetrics())) + + sender ! Send(testMetrics()) + + (0 until 5).foreach { _ ⇒ + io.expectMsgClass(classOf[HttpRequest]) + sender ! HttpResponse(status = StatusCodes.OK) + } + + io.expectNoMsg(3 seconds) + } + } +} |