aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDiego Parra <diegolparra@gmail.com>2016-03-30 21:58:43 -0300
committerDiego Parra <diegolparra@gmail.com>2016-03-30 21:58:43 -0300
commit0d793ffbf2af4b3a8d96de9d9c9edc7bd5b2af97 (patch)
treeb3f42e10a73e8bd8afad445e17e102392675b264
parent37a3f7ead3d6b461a8d8c852866731f6fb3d26dc (diff)
parent273f2da48d9aacad0aafd5e8b7f9d59f84f1a011 (diff)
downloadKamon-0d793ffbf2af4b3a8d96de9d9c9edc7bd5b2af97.tar.gz
Kamon-0d793ffbf2af4b3a8d96de9d9c9edc7bd5b2af97.tar.bz2
Kamon-0d793ffbf2af4b3a8d96de9d9c9edc7bd5b2af97.zip
Merge pull request #309 from jtjeferreira/kamon-elasticsearch
+ kamon-elasticsearch: new integration module
-rw-r--r--kamon-elasticsearch/src/main/resources/META-INF/aop.xml11
-rw-r--r--kamon-elasticsearch/src/main/resources/reference.conf24
-rw-r--r--kamon-elasticsearch/src/main/scala/kamon/elasticsearch/Elasticsearch.scala82
-rw-r--r--kamon-elasticsearch/src/main/scala/kamon/elasticsearch/instrumentation/RequestInstrumentation.scala86
-rw-r--r--kamon-elasticsearch/src/main/scala/kamon/elasticsearch/metric/RequestsMetrics.scala32
-rw-r--r--kamon-elasticsearch/src/test/resources/logback.xml12
-rw-r--r--kamon-elasticsearch/src/test/scala/kamon/elasticsearch/instrumentation/AsyncRequestInstrumentationSpec.scala178
-rw-r--r--kamon-elasticsearch/src/test/scala/kamon/elasticsearch/instrumentation/RequestInstrumentationSpec.scala192
-rw-r--r--project/Dependencies.scala3
-rw-r--r--project/Projects.scala11
10 files changed, 631 insertions, 0 deletions
diff --git a/kamon-elasticsearch/src/main/resources/META-INF/aop.xml b/kamon-elasticsearch/src/main/resources/META-INF/aop.xml
new file mode 100644
index 00000000..f9281f3f
--- /dev/null
+++ b/kamon-elasticsearch/src/main/resources/META-INF/aop.xml
@@ -0,0 +1,11 @@
+<!DOCTYPE aspectj PUBLIC "-//AspectJ//DTD//EN" "http://www.eclipse.org/aspectj/dtd/aspectj.dtd">
+
+<aspectj>
+ <aspects>
+ <aspect name="kamon.elasticsearch.instrumentation.RequestInstrumentation"/>
+ </aspects>
+
+ <weaver>
+ <include within="org.elasticsearch.client..*"/>
+ </weaver>
+</aspectj>
diff --git a/kamon-elasticsearch/src/main/resources/reference.conf b/kamon-elasticsearch/src/main/resources/reference.conf
new file mode 100644
index 00000000..544e2092
--- /dev/null
+++ b/kamon-elasticsearch/src/main/resources/reference.conf
@@ -0,0 +1,24 @@
+# =========================================== #
+# Kamon-elasticsearch Reference Configuration #
+# =========================================== #
+
+kamon {
+ elasticsearch {
+ slow-query-threshold = 2 seconds
+
+ # Fully qualified name of the implementation of kamon.elasticsearch.SlowRequestProcessor.
+ slow-query-processor = kamon.elasticsearch.DefaultSlowRequestProcessor
+
+ # Fully qualified name of the implementation of kamon.elasticsearch.ElasticsearchErrorProcessor.
+ elasticsearch-error-processor = kamon.elasticsearch.DefaultElasticsearchErrorProcessor
+
+ # Fully qualified name of the implementation of kamon.elasticsearch.ElasticsearchNameGenerator that will be used for assigning names to segments.
+ name-generator = kamon.elasticsearch.DefaultElasticsearchNameGenerator
+ }
+
+ modules {
+ kamon-elasticsearch {
+ requires-aspectj = yes
+ }
+ }
+} \ No newline at end of file
diff --git a/kamon-elasticsearch/src/main/scala/kamon/elasticsearch/Elasticsearch.scala b/kamon-elasticsearch/src/main/scala/kamon/elasticsearch/Elasticsearch.scala
new file mode 100644
index 00000000..7c4bfb65
--- /dev/null
+++ b/kamon-elasticsearch/src/main/scala/kamon/elasticsearch/Elasticsearch.scala
@@ -0,0 +1,82 @@
+/* =========================================================================================
+ * Copyright © 2013-2014 the kamon project <http://kamon.io/>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License") you may not use this file
+ * except in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
+ * =========================================================================================
+ */
+
+package kamon.elasticsearch
+
+import org.elasticsearch.action.ActionRequest
+
+import akka.actor.ReflectiveDynamicAccess
+import kamon.Kamon
+import kamon.util.ConfigTools.Syntax
+
+object ElasticsearchExtension {
+ val SegmentLibraryName = "elasticsearch"
+
+ private val config = Kamon.config.getConfig("kamon.elasticsearch")
+
+ private val dynamic = new ReflectiveDynamicAccess(getClass.getClassLoader)
+
+ private val nameGeneratorFQN = config.getString("name-generator")
+ private val nameGenerator: ElasticsearchNameGenerator = dynamic.createInstanceFor[ElasticsearchNameGenerator](nameGeneratorFQN, Nil).get
+
+ private val slowQueryProcessorClass = config.getString("slow-query-processor")
+ private val slowQueryProcessor: SlowRequestProcessor = dynamic.createInstanceFor[SlowRequestProcessor](slowQueryProcessorClass, Nil).get
+
+ private val elasticsearchErrorProcessorClass = config.getString("elasticsearch-error-processor")
+ private val elasticsearchErrorProcessor: ElasticsearchErrorProcessor = dynamic.createInstanceFor[ElasticsearchErrorProcessor](elasticsearchErrorProcessorClass, Nil).get
+
+ val slowQueryThreshold = config.getFiniteDuration("slow-query-threshold").toMillis
+
+ def processSlowQuery(request: ActionRequest[_], executionTime: Long) = slowQueryProcessor.process(request, executionTime, slowQueryThreshold)
+ def processSqlError(request: ActionRequest[_], ex: Throwable) = elasticsearchErrorProcessor.process(request, ex)
+ def generateElasticsearchSegmentName(request: ActionRequest[_]): String = nameGenerator.generateElasticsearchSegmentName(request)
+}
+
+trait SlowRequestProcessor {
+ def process(request: ActionRequest[_], executionTime: Long, queryThreshold: Long): Unit
+}
+
+trait ElasticsearchErrorProcessor {
+ def process(request: ActionRequest[_], ex: Throwable): Unit
+}
+
+trait ElasticsearchNameGenerator {
+ def generateElasticsearchSegmentName(request: ActionRequest[_]): String
+}
+
+class DefaultElasticsearchNameGenerator extends ElasticsearchNameGenerator {
+ def generateElasticsearchSegmentName(request: ActionRequest[_]): String = s"Elasticsearch[${request.getClass.getSimpleName}]"
+}
+
+class DefaultElasticsearchErrorProcessor extends ElasticsearchErrorProcessor {
+
+ import org.slf4j.LoggerFactory
+
+ val log = LoggerFactory.getLogger(classOf[DefaultElasticsearchErrorProcessor])
+
+ override def process(request: ActionRequest[_], cause: Throwable): Unit = {
+ log.error(s"the request [$request] failed with exception [${cause.getMessage}]")
+ }
+}
+
+class DefaultSlowRequestProcessor extends SlowRequestProcessor {
+ import org.slf4j.LoggerFactory
+
+ val log = LoggerFactory.getLogger(classOf[DefaultSlowRequestProcessor])
+
+ override def process(request: ActionRequest[_], executionTimeInMillis: Long, queryThresholdInMillis: Long): Unit = {
+ log.warn(s"The request [$request] took $executionTimeInMillis ms and the slow query threshold is $queryThresholdInMillis ms")
+ }
+}
diff --git a/kamon-elasticsearch/src/main/scala/kamon/elasticsearch/instrumentation/RequestInstrumentation.scala b/kamon-elasticsearch/src/main/scala/kamon/elasticsearch/instrumentation/RequestInstrumentation.scala
new file mode 100644
index 00000000..ba4b1ded
--- /dev/null
+++ b/kamon-elasticsearch/src/main/scala/kamon/elasticsearch/instrumentation/RequestInstrumentation.scala
@@ -0,0 +1,86 @@
+/* =========================================================================================
+ * Copyright © 2013-2014 the kamon project <http://kamon.io/>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
+ * except in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
+ * =========================================================================================
+ */
+
+package kamon.elasticsearch.instrumentation
+
+import java.util.concurrent.TimeUnit.{ NANOSECONDS ⇒ nanos }
+
+import org.aspectj.lang.ProceedingJoinPoint
+import org.aspectj.lang.annotation.Around
+import org.aspectj.lang.annotation.Aspect
+import org.aspectj.lang.annotation.Pointcut
+import org.elasticsearch.action._
+import org.slf4j.LoggerFactory
+
+import kamon.Kamon
+import kamon.elasticsearch.ElasticsearchExtension
+import kamon.elasticsearch.metric.RequestsMetrics
+import kamon.trace.SegmentCategory
+import kamon.trace.TraceContext
+import kamon.trace.Tracer
+
+@Aspect
+class RequestInstrumentation {
+
+ import RequestInstrumentation._
+
+ @Pointcut("execution(* org.elasticsearch.client.ElasticsearchClient.execute*(..)) && args(action, request, listener)")
+ def onExecuteListener[Request <: ActionRequest[Request], Response <: ActionResponse, RequestBuilder <: ActionRequestBuilder[Request, Response, RequestBuilder]](action: Action[Request, Response, RequestBuilder], request: Request, listener: ActionListener[Response]): Unit = {}
+
+ @Around("onExecuteListener(action, request, listener) ")
+ def aroundExecuteListener[Request <: ActionRequest[Request], Response <: ActionResponse, RequestBuilder <: ActionRequestBuilder[Request, Response, RequestBuilder]](pjp: ProceedingJoinPoint, action: Action[Request, Response, RequestBuilder], request: Request, listener: ActionListener[Response]): Unit = {
+ Tracer.currentContext.collect { ctx ⇒
+ implicit val requestRecorder = Kamon.metrics.entity(RequestsMetrics, "elasticsearch-requests")
+ val segment = generateSegment(ctx, request)
+ val start = System.nanoTime()
+
+ pjp.proceed(Array(action, request, new ActionListener[Response] {
+ def onFailure(e: Throwable): Unit = { requestRecorder.errors.increment(); segment.finish(); listener.onFailure(e) }
+ def onResponse(response: Response): Unit = { recordTrace(request, response, start); segment.finish(); listener.onResponse(response) }
+ }))
+
+ }
+ } getOrElse pjp.proceed()
+
+ def recordTrace[Request <: ActionRequest[Request], Response <: ActionResponse](request: Request, response: Response, start: Long)(implicit requestRecorder: RequestsMetrics) {
+ val timeSpent = System.nanoTime() - start
+ request match {
+ case r: get.GetRequest ⇒ requestRecorder.reads.record(timeSpent)
+ case r: search.SearchRequest ⇒ requestRecorder.reads.record(timeSpent)
+ case r: index.IndexRequest ⇒ requestRecorder.writes.record(timeSpent)
+ case r: update.UpdateRequest ⇒ requestRecorder.writes.record(timeSpent)
+ case r: delete.DeleteRequest ⇒ requestRecorder.writes.record(timeSpent)
+ case _ ⇒
+ log.debug(s"Unable to parse request [$request]")
+ }
+
+ val timeSpentInMillis = nanos.toMillis(timeSpent)
+
+ if (timeSpentInMillis >= ElasticsearchExtension.slowQueryThreshold) {
+ requestRecorder.slows.increment()
+ ElasticsearchExtension.processSlowQuery(request, timeSpentInMillis)
+ }
+ }
+
+ def generateSegment(ctx: TraceContext, request: ActionRequest[_]) = {
+ val segmentName = ElasticsearchExtension.generateElasticsearchSegmentName(request)
+ val segment = ctx.startSegment(segmentName, SegmentCategory.Database, ElasticsearchExtension.SegmentLibraryName)
+ segment
+ }
+}
+
+object RequestInstrumentation {
+ val log = LoggerFactory.getLogger(classOf[RequestInstrumentation])
+}
diff --git a/kamon-elasticsearch/src/main/scala/kamon/elasticsearch/metric/RequestsMetrics.scala b/kamon-elasticsearch/src/main/scala/kamon/elasticsearch/metric/RequestsMetrics.scala
new file mode 100644
index 00000000..f2b8d721
--- /dev/null
+++ b/kamon-elasticsearch/src/main/scala/kamon/elasticsearch/metric/RequestsMetrics.scala
@@ -0,0 +1,32 @@
+/*
+ * =========================================================================================
+ * Copyright © 2013-2014 the kamon project <http://kamon.io/>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
+ * except in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
+ * =========================================================================================
+ */
+
+package kamon.elasticsearch.metric
+
+import kamon.metric._
+import kamon.metric.instrument.{ Time, InstrumentFactory }
+
+class RequestsMetrics(instrumentFactory: InstrumentFactory) extends GenericEntityRecorder(instrumentFactory) {
+ val reads = histogram("reads", Time.Nanoseconds)
+ val writes = histogram("writes", Time.Nanoseconds)
+ val slows = counter("slows")
+ val errors = counter("errors")
+}
+
+object RequestsMetrics extends EntityRecorderFactory[RequestsMetrics] {
+ def category: String = "elasticsearch-requests"
+ def createRecorder(instrumentFactory: InstrumentFactory): RequestsMetrics = new RequestsMetrics(instrumentFactory)
+} \ No newline at end of file
diff --git a/kamon-elasticsearch/src/test/resources/logback.xml b/kamon-elasticsearch/src/test/resources/logback.xml
new file mode 100644
index 00000000..c336bbfe
--- /dev/null
+++ b/kamon-elasticsearch/src/test/resources/logback.xml
@@ -0,0 +1,12 @@
+<configuration>
+ <statusListener class="ch.qos.logback.core.status.NopStatusListener"/>
+ <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
+ <encoder>
+ <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern>
+ </encoder>
+ </appender>
+
+ <root level="OFF">
+ <appender-ref ref="STDOUT"/>
+ </root>
+</configuration> \ No newline at end of file
diff --git a/kamon-elasticsearch/src/test/scala/kamon/elasticsearch/instrumentation/AsyncRequestInstrumentationSpec.scala b/kamon-elasticsearch/src/test/scala/kamon/elasticsearch/instrumentation/AsyncRequestInstrumentationSpec.scala
new file mode 100644
index 00000000..db94b108
--- /dev/null
+++ b/kamon-elasticsearch/src/test/scala/kamon/elasticsearch/instrumentation/AsyncRequestInstrumentationSpec.scala
@@ -0,0 +1,178 @@
+/* =========================================================================================
+ * Copyright © 2013-2014 the kamon project <http://kamon.io/>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License") you may not use this file
+ * except in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
+ * =========================================================================================
+ */
+
+package kamon.elasticsearch.instrumentation
+
+import org.elasticsearch.common.settings.Settings
+import org.elasticsearch.indices.InvalidIndexNameException
+import org.elasticsearch.node.NodeBuilder.nodeBuilder
+
+import com.typesafe.config.ConfigFactory
+
+import kamon.elasticsearch.ElasticsearchExtension
+import kamon.testkit.BaseKamonSpec
+import kamon.trace.SegmentCategory
+import kamon.trace.Tracer
+
+class AsyncRequestInstrumentationSpec extends BaseKamonSpec("elasticsearch-spec") {
+ override lazy val config =
+ ConfigFactory.parseString(
+ """
+ |kamon {
+ | elasticsearch {
+ | slow-query-threshold = 100 milliseconds
+ |
+ | # Fully qualified name of the implementation of kamon.elasticsearch.SlowRequestProcessor.
+ | slow-query-processor = kamon.elasticsearch.instrumentation.NoOpSlowRequestProcessor
+ |
+ | # Fully qualified name of the implementation of kamon.elasticsearch.ElasticsearchErrorProcessor.
+ | elasticsearch-error-processor = kamon.elasticsearch.instrumentation.NoOpElasticsearchErrorProcessor
+ |
+ | # Fully qualified name of the implementation of kamon.elasticsearch.ElasticsearchNameGenerator
+ | name-generator = kamon.elasticsearch.instrumentation.NoOpElasticsearchNameGenerator
+ | }
+ |}
+ """.stripMargin)
+
+ val node = nodeBuilder()
+ .local(true)
+ .settings(Settings.builder().put("path.home", System.getProperty("java.io.tmpdir") + "/elasticsearch"))
+ .node()
+ val client = node.client();
+
+ "the RequestInstrumentation" should {
+ "record the execution time of async INDEX operation" in {
+ Tracer.withContext(newContext("elasticsearch-trace-index")) {
+ for (id ← 1 to 100) {
+ client.prepareIndex("twitter", "tweet", id.toString)
+ .setSource("{" +
+ "\"user\":\"kimchy\"," +
+ "\"postDate\":\"2013-01-30\"," +
+ "\"message\":\"trying out Elasticsearch\"" +
+ "}")
+ .execute().actionGet()
+ }
+
+ Tracer.currentContext.finish()
+ }
+
+ val elasticsearchSnapshot = takeSnapshotOf("elasticsearch-requests", "elasticsearch-requests")
+ elasticsearchSnapshot.histogram("writes").get.numberOfMeasurements should be(100)
+
+ val traceSnapshot = takeSnapshotOf("elasticsearch-trace-index", "trace")
+ traceSnapshot.histogram("elapsed-time").get.numberOfMeasurements should be(1)
+
+ val segmentSnapshot = takeSnapshotOf("Elasticsearch[IndexRequest]", "trace-segment",
+ tags = Map(
+ "trace" -> "elasticsearch-trace-index",
+ "category" -> SegmentCategory.Database,
+ "library" -> ElasticsearchExtension.SegmentLibraryName))
+
+ segmentSnapshot.histogram("elapsed-time").get.numberOfMeasurements should be(100)
+ }
+
+ "record the execution time of async GET operation" in {
+ Tracer.withContext(newContext("elasticsearch-trace-get")) {
+ for (id ← 1 to 100) {
+ client.prepareGet("twitter", "tweet", id.toString).execute().actionGet()
+ }
+
+ Tracer.currentContext.finish()
+ }
+
+ val elasticsearchSnapshot = takeSnapshotOf("elasticsearch-requests", "elasticsearch-requests")
+ elasticsearchSnapshot.histogram("reads").get.numberOfMeasurements should be(100)
+
+ val traceSnapshot = takeSnapshotOf("elasticsearch-trace-get", "trace")
+ traceSnapshot.histogram("elapsed-time").get.numberOfMeasurements should be(1)
+
+ val segmentSnapshot = takeSnapshotOf("Elasticsearch[GetRequest]", "trace-segment",
+ tags = Map(
+ "trace" -> "elasticsearch-trace-get",
+ "category" -> SegmentCategory.Database,
+ "library" -> ElasticsearchExtension.SegmentLibraryName))
+
+ segmentSnapshot.histogram("elapsed-time").get.numberOfMeasurements should be(100)
+ }
+
+ "record the execution time of async UPDATE operation" in {
+ Tracer.withContext(newContext("elasticsearch-trace-update")) {
+ for (id ← 1 to 100) {
+ client.prepareUpdate("twitter", "tweet", id.toString)
+ .setDoc("{" +
+ "\"updated\":\"updated\"" +
+ "}")
+ .execute().actionGet()
+ }
+
+ Tracer.currentContext.finish()
+ }
+
+ val elasticsearchSnapshot = takeSnapshotOf("elasticsearch-requests", "elasticsearch-requests")
+ elasticsearchSnapshot.histogram("writes").get.numberOfMeasurements should be(100)
+
+ val traceSnapshot = takeSnapshotOf("elasticsearch-trace-update", "trace")
+ traceSnapshot.histogram("elapsed-time").get.numberOfMeasurements should be(1)
+
+ val segmentSnapshot = takeSnapshotOf("Elasticsearch[UpdateRequest]", "trace-segment",
+ tags = Map(
+ "trace" -> "elasticsearch-trace-update",
+ "category" -> SegmentCategory.Database,
+ "library" -> ElasticsearchExtension.SegmentLibraryName))
+
+ segmentSnapshot.histogram("elapsed-time").get.numberOfMeasurements should be(100)
+ }
+
+ "record the execution time of async DELETE operation" in {
+ Tracer.withContext(newContext("elasticsearch-trace-delete")) {
+ for (id ← 1 to 100) {
+ client.prepareDelete("twitter", "tweet", id.toString).execute().actionGet()
+ }
+
+ Tracer.currentContext.finish()
+ }
+
+ val elasticsearchSnapshot = takeSnapshotOf("elasticsearch-requests", "elasticsearch-requests")
+ elasticsearchSnapshot.histogram("writes").get.numberOfMeasurements should be(100)
+
+ val traceSnapshot = takeSnapshotOf("elasticsearch-trace-delete", "trace")
+ traceSnapshot.histogram("elapsed-time").get.numberOfMeasurements should be(1)
+
+ val segmentSnapshot = takeSnapshotOf("Elasticsearch[DeleteRequest]", "trace-segment",
+ tags = Map(
+ "trace" -> "elasticsearch-trace-delete",
+ "category" -> SegmentCategory.Database,
+ "library" -> ElasticsearchExtension.SegmentLibraryName))
+
+ segmentSnapshot.histogram("elapsed-time").get.numberOfMeasurements should be(100)
+
+ }
+
+ "count all ERRORS async" in {
+ Tracer.withContext(newContext("elasticsearch-trace-errors")) {
+ for (id ← 1 to 10) {
+ intercept[InvalidIndexNameException] {
+ client.prepareDelete("index name with spaces", "tweet", id.toString).execute().actionGet()
+ }
+ }
+
+ Tracer.currentContext.finish()
+ }
+
+ val elasticsearchSnapshot = takeSnapshotOf("elasticsearch-requests", "elasticsearch-requests")
+ elasticsearchSnapshot.counter("errors").get.count should be(10)
+ }
+ }
+} \ No newline at end of file
diff --git a/kamon-elasticsearch/src/test/scala/kamon/elasticsearch/instrumentation/RequestInstrumentationSpec.scala b/kamon-elasticsearch/src/test/scala/kamon/elasticsearch/instrumentation/RequestInstrumentationSpec.scala
new file mode 100644
index 00000000..106b225a
--- /dev/null
+++ b/kamon-elasticsearch/src/test/scala/kamon/elasticsearch/instrumentation/RequestInstrumentationSpec.scala
@@ -0,0 +1,192 @@
+/* =========================================================================================
+ * Copyright © 2013-2014 the kamon project <http://kamon.io/>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License") you may not use this file
+ * except in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
+ * =========================================================================================
+ */
+
+package kamon.elasticsearch.instrumentation
+
+import java.util.concurrent.ExecutionException
+
+import org.elasticsearch.action._
+import org.elasticsearch.common.settings.Settings
+import org.elasticsearch.node.NodeBuilder.nodeBuilder
+
+import com.typesafe.config.ConfigFactory
+
+import kamon.elasticsearch._
+import kamon.testkit.BaseKamonSpec
+import kamon.trace.SegmentCategory
+import kamon.trace.Tracer
+
+class RequestInstrumentationSpec extends BaseKamonSpec("elasticsearch-spec") {
+ override lazy val config =
+ ConfigFactory.parseString(
+ """
+ |kamon {
+ | elasticsearch {
+ | slow-query-threshold = 100 milliseconds
+ |
+ | # Fully qualified name of the implementation of kamon.elasticsearch.SlowRequestProcessor.
+ | slow-query-processor = kamon.elasticsearch.instrumentation.NoOpSlowRequestProcessor
+ |
+ | # Fully qualified name of the implementation of kamon.elasticsearch.ElasticsearchErrorProcessor.
+ | elasticsearch-error-processor = kamon.elasticsearch.instrumentation.NoOpElasticsearchErrorProcessor
+ |
+ | # Fully qualified name of the implementation of kamon.elasticsearch.ElasticsearchNameGenerator
+ | name-generator = kamon.elasticsearch.instrumentation.NoOpElasticsearchNameGenerator
+ | }
+ |}
+ """.stripMargin)
+
+ val node = nodeBuilder()
+ .local(true)
+ .settings(Settings.builder().put("path.home", System.getProperty("java.io.tmpdir") + "/elasticsearch"))
+ .node()
+ val client = node.client();
+
+ "the RequestInstrumentation" should {
+ "record the execution time of INDEX operation" in {
+ Tracer.withContext(newContext("elasticsearch-trace-index")) {
+ for (id ← 1 to 100) {
+ client.index(new index.IndexRequest("twitter", "tweet", id.toString)
+ .source("{" +
+ "\"user\":\"kimchy\"," +
+ "\"postDate\":\"2013-01-30\"," +
+ "\"message\":\"trying out Elasticsearch\"" +
+ "}")).get()
+ }
+
+ Tracer.currentContext.finish()
+ }
+
+ val elasticsearchSnapshot = takeSnapshotOf("elasticsearch-requests", "elasticsearch-requests")
+ elasticsearchSnapshot.histogram("writes").get.numberOfMeasurements should be(100)
+
+ val traceSnapshot = takeSnapshotOf("elasticsearch-trace-index", "trace")
+ traceSnapshot.histogram("elapsed-time").get.numberOfMeasurements should be(1)
+
+ val segmentSnapshot = takeSnapshotOf("Elasticsearch[IndexRequest]", "trace-segment",
+ tags = Map(
+ "trace" -> "elasticsearch-trace-index",
+ "category" -> SegmentCategory.Database,
+ "library" -> ElasticsearchExtension.SegmentLibraryName))
+
+ segmentSnapshot.histogram("elapsed-time").get.numberOfMeasurements should be(100)
+ }
+
+ "record the execution time of GET operation" in {
+ Tracer.withContext(newContext("elasticsearch-trace-get")) {
+ for (id ← 1 to 100) {
+ client.get(new get.GetRequest("twitter", "tweet", id.toString)).get()
+ }
+
+ Tracer.currentContext.finish()
+ }
+
+ val elasticsearchSnapshot = takeSnapshotOf("elasticsearch-requests", "elasticsearch-requests")
+ elasticsearchSnapshot.histogram("reads").get.numberOfMeasurements should be(100)
+
+ val traceSnapshot = takeSnapshotOf("elasticsearch-trace-get", "trace")
+ traceSnapshot.histogram("elapsed-time").get.numberOfMeasurements should be(1)
+
+ val segmentSnapshot = takeSnapshotOf("Elasticsearch[GetRequest]", "trace-segment",
+ tags = Map(
+ "trace" -> "elasticsearch-trace-get",
+ "category" -> SegmentCategory.Database,
+ "library" -> ElasticsearchExtension.SegmentLibraryName))
+
+ segmentSnapshot.histogram("elapsed-time").get.numberOfMeasurements should be(100)
+ }
+
+ "record the execution time of UPDATE operation" in {
+ Tracer.withContext(newContext("elasticsearch-trace-update")) {
+ for (id ← 1 to 100) {
+ client.update(
+ new update.UpdateRequest("twitter", "tweet", id.toString)
+ .doc("{" +
+ "\"updated\":\"updated\"" +
+ "}"))
+ .get()
+ }
+
+ Tracer.currentContext.finish()
+ }
+
+ val elasticsearchSnapshot = takeSnapshotOf("elasticsearch-requests", "elasticsearch-requests")
+ elasticsearchSnapshot.histogram("writes").get.numberOfMeasurements should be(100)
+
+ val traceSnapshot = takeSnapshotOf("elasticsearch-trace-update", "trace")
+ traceSnapshot.histogram("elapsed-time").get.numberOfMeasurements should be(1)
+
+ val segmentSnapshot = takeSnapshotOf("Elasticsearch[UpdateRequest]", "trace-segment",
+ tags = Map(
+ "trace" -> "elasticsearch-trace-update",
+ "category" -> SegmentCategory.Database,
+ "library" -> ElasticsearchExtension.SegmentLibraryName))
+
+ segmentSnapshot.histogram("elapsed-time").get.numberOfMeasurements should be(100)
+ }
+
+ "record the execution time of DELETE operation" in {
+ Tracer.withContext(newContext("elasticsearch-trace-delete")) {
+ for (id ← 1 to 100) {
+ client.delete(new delete.DeleteRequest("twitter", "tweet", id.toString)).get()
+ }
+
+ Tracer.currentContext.finish()
+ }
+
+ val elasticsearchSnapshot = takeSnapshotOf("elasticsearch-requests", "elasticsearch-requests")
+ elasticsearchSnapshot.histogram("writes").get.numberOfMeasurements should be(100)
+
+ val traceSnapshot = takeSnapshotOf("elasticsearch-trace-delete", "trace")
+ traceSnapshot.histogram("elapsed-time").get.numberOfMeasurements should be(1)
+
+ val segmentSnapshot = takeSnapshotOf("Elasticsearch[DeleteRequest]", "trace-segment",
+ tags = Map(
+ "trace" -> "elasticsearch-trace-delete",
+ "category" -> SegmentCategory.Database,
+ "library" -> ElasticsearchExtension.SegmentLibraryName))
+
+ segmentSnapshot.histogram("elapsed-time").get.numberOfMeasurements should be(100)
+
+ }
+
+ "count all ERRORS" in {
+ Tracer.withContext(newContext("elasticsearch-trace-errors")) {
+ for (id ← 1 to 10) {
+ intercept[ExecutionException] {
+ client.delete(new delete.DeleteRequest("index name with spaces", "tweet", id.toString)).get()
+ }
+ }
+
+ Tracer.currentContext.finish()
+ }
+
+ val elasticsearchSnapshot = takeSnapshotOf("elasticsearch-requests", "elasticsearch-requests")
+ elasticsearchSnapshot.counter("errors").get.count should be(10)
+ }
+ }
+}
+
+class NoOpSlowRequestProcessor extends SlowRequestProcessor {
+ override def process(request: ActionRequest[_], executionTimeInMillis: Long, queryThresholdInMillis: Long): Unit = { /*do nothing!!!*/ }
+}
+
+class NoOpElasticsearchErrorProcessor extends ElasticsearchErrorProcessor {
+ override def process(request: ActionRequest[_], ex: Throwable): Unit = { /*do nothing!!!*/ }
+}
+
+class NoOpElasticsearchNameGenerator extends ElasticsearchNameGenerator {
+ override def generateElasticsearchSegmentName(request: ActionRequest[_]): String = s"Elasticsearch[${request.getClass.getSimpleName}]"
+} \ No newline at end of file
diff --git a/project/Dependencies.scala b/project/Dependencies.scala
index 1560e220..83565c75 100644
--- a/project/Dependencies.scala
+++ b/project/Dependencies.scala
@@ -28,6 +28,7 @@ object Dependencies {
val slf4jVersion = "1.7.7"
val play23Version = "2.3.10"
val play24Version = "2.4.4"
+ val elasticsearchVersion = "2.1.0"
val sprayJson = "io.spray" %% "spray-json" % "1.3.1"
val sprayJsonLenses = "net.virtual-void" %% "json-lenses" % "0.6.0"
@@ -67,6 +68,8 @@ object Dependencies {
val playTest24 = "org.scalatestplus" %% "play" % "1.4.0-M2"
val typesafeConfig = "com.typesafe" % "config" % "1.2.1"
+ val elasticsearch = "org.elasticsearch" % "elasticsearch" % elasticsearchVersion
+
def compile (deps: ModuleID*): Seq[ModuleID] = deps map (_ % "compile")
def provided (deps: ModuleID*): Seq[ModuleID] = deps map (_ % "provided")
def test (deps: ModuleID*): Seq[ModuleID] = deps map (_ % "test")
diff --git a/project/Projects.scala b/project/Projects.scala
index c7933008..5cb230e5 100644
--- a/project/Projects.scala
+++ b/project/Projects.scala
@@ -206,6 +206,17 @@ object Projects extends Build {
test(scalatest, slf4jApi) ++
compile(aspectJ))
+ lazy val kamonElasticsearch = Project("kamon-elasticsearch", file("kamon-elasticsearch"))
+ .dependsOn(kamonCore % "compile->compile;test->test")
+ .settings(basicSettings: _*)
+ .settings(formatSettings: _*)
+ .settings(aspectJSettings: _*)
+ .settings(
+ libraryDependencies ++=
+ compile(elasticsearch) ++
+ test(scalatest, akkaTestKit, slf4jApi) ++
+ provided(aspectJ))
+
lazy val kamonAnnotation = Project("kamon-annotation", file("kamon-annotation"))
.dependsOn(kamonCore % "compile->compile;test->test")
.settings(basicSettings: _*)