diff options
author | Diego Parra <diegolparra@gmail.com> | 2016-03-30 21:58:43 -0300 |
---|---|---|
committer | Diego Parra <diegolparra@gmail.com> | 2016-03-30 21:58:43 -0300 |
commit | 0d793ffbf2af4b3a8d96de9d9c9edc7bd5b2af97 (patch) | |
tree | b3f42e10a73e8bd8afad445e17e102392675b264 | |
parent | 37a3f7ead3d6b461a8d8c852866731f6fb3d26dc (diff) | |
parent | 273f2da48d9aacad0aafd5e8b7f9d59f84f1a011 (diff) | |
download | Kamon-0d793ffbf2af4b3a8d96de9d9c9edc7bd5b2af97.tar.gz Kamon-0d793ffbf2af4b3a8d96de9d9c9edc7bd5b2af97.tar.bz2 Kamon-0d793ffbf2af4b3a8d96de9d9c9edc7bd5b2af97.zip |
Merge pull request #309 from jtjeferreira/kamon-elasticsearch
+ kamon-elasticsearch: new integration module
10 files changed, 631 insertions, 0 deletions
diff --git a/kamon-elasticsearch/src/main/resources/META-INF/aop.xml b/kamon-elasticsearch/src/main/resources/META-INF/aop.xml new file mode 100644 index 00000000..f9281f3f --- /dev/null +++ b/kamon-elasticsearch/src/main/resources/META-INF/aop.xml @@ -0,0 +1,11 @@ +<!DOCTYPE aspectj PUBLIC "-//AspectJ//DTD//EN" "http://www.eclipse.org/aspectj/dtd/aspectj.dtd"> + +<aspectj> + <aspects> + <aspect name="kamon.elasticsearch.instrumentation.RequestInstrumentation"/> + </aspects> + + <weaver> + <include within="org.elasticsearch.client..*"/> + </weaver> +</aspectj> diff --git a/kamon-elasticsearch/src/main/resources/reference.conf b/kamon-elasticsearch/src/main/resources/reference.conf new file mode 100644 index 00000000..544e2092 --- /dev/null +++ b/kamon-elasticsearch/src/main/resources/reference.conf @@ -0,0 +1,24 @@ +# =========================================== # +# Kamon-elasticsearch Reference Configuration # +# =========================================== # + +kamon { + elasticsearch { + slow-query-threshold = 2 seconds + + # Fully qualified name of the implementation of kamon.elasticsearch.SlowRequestProcessor. + slow-query-processor = kamon.elasticsearch.DefaultSlowRequestProcessor + + # Fully qualified name of the implementation of kamon.elasticsearch.ElasticsearchErrorProcessor. + elasticsearch-error-processor = kamon.elasticsearch.DefaultElasticsearchErrorProcessor + + # Fully qualified name of the implementation of kamon.elasticsearch.ElasticsearchNameGenerator that will be used for assigning names to segments. + name-generator = kamon.elasticsearch.DefaultElasticsearchNameGenerator + } + + modules { + kamon-elasticsearch { + requires-aspectj = yes + } + } +}
\ No newline at end of file diff --git a/kamon-elasticsearch/src/main/scala/kamon/elasticsearch/Elasticsearch.scala b/kamon-elasticsearch/src/main/scala/kamon/elasticsearch/Elasticsearch.scala new file mode 100644 index 00000000..7c4bfb65 --- /dev/null +++ b/kamon-elasticsearch/src/main/scala/kamon/elasticsearch/Elasticsearch.scala @@ -0,0 +1,82 @@ +/* ========================================================================================= + * Copyright © 2013-2014 the kamon project <http://kamon.io/> + * + * Licensed under the Apache License, Version 2.0 (the "License") you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + +package kamon.elasticsearch + +import org.elasticsearch.action.ActionRequest + +import akka.actor.ReflectiveDynamicAccess +import kamon.Kamon +import kamon.util.ConfigTools.Syntax + +object ElasticsearchExtension { + val SegmentLibraryName = "elasticsearch" + + private val config = Kamon.config.getConfig("kamon.elasticsearch") + + private val dynamic = new ReflectiveDynamicAccess(getClass.getClassLoader) + + private val nameGeneratorFQN = config.getString("name-generator") + private val nameGenerator: ElasticsearchNameGenerator = dynamic.createInstanceFor[ElasticsearchNameGenerator](nameGeneratorFQN, Nil).get + + private val slowQueryProcessorClass = config.getString("slow-query-processor") + private val slowQueryProcessor: SlowRequestProcessor = dynamic.createInstanceFor[SlowRequestProcessor](slowQueryProcessorClass, Nil).get + + private val elasticsearchErrorProcessorClass = config.getString("elasticsearch-error-processor") + private val elasticsearchErrorProcessor: ElasticsearchErrorProcessor = dynamic.createInstanceFor[ElasticsearchErrorProcessor](elasticsearchErrorProcessorClass, Nil).get + + val slowQueryThreshold = config.getFiniteDuration("slow-query-threshold").toMillis + + def processSlowQuery(request: ActionRequest[_], executionTime: Long) = slowQueryProcessor.process(request, executionTime, slowQueryThreshold) + def processSqlError(request: ActionRequest[_], ex: Throwable) = elasticsearchErrorProcessor.process(request, ex) + def generateElasticsearchSegmentName(request: ActionRequest[_]): String = nameGenerator.generateElasticsearchSegmentName(request) +} + +trait SlowRequestProcessor { + def process(request: ActionRequest[_], executionTime: Long, queryThreshold: Long): Unit +} + +trait ElasticsearchErrorProcessor { + def process(request: ActionRequest[_], ex: Throwable): Unit +} + +trait ElasticsearchNameGenerator { + def generateElasticsearchSegmentName(request: ActionRequest[_]): String +} + +class DefaultElasticsearchNameGenerator extends ElasticsearchNameGenerator { + def generateElasticsearchSegmentName(request: ActionRequest[_]): String = s"Elasticsearch[${request.getClass.getSimpleName}]" +} + +class DefaultElasticsearchErrorProcessor extends ElasticsearchErrorProcessor { + + import org.slf4j.LoggerFactory + + val log = LoggerFactory.getLogger(classOf[DefaultElasticsearchErrorProcessor]) + + override def process(request: ActionRequest[_], cause: Throwable): Unit = { + log.error(s"the request [$request] failed with exception [${cause.getMessage}]") + } +} + +class DefaultSlowRequestProcessor extends SlowRequestProcessor { + import org.slf4j.LoggerFactory + + val log = LoggerFactory.getLogger(classOf[DefaultSlowRequestProcessor]) + + override def process(request: ActionRequest[_], executionTimeInMillis: Long, queryThresholdInMillis: Long): Unit = { + log.warn(s"The request [$request] took $executionTimeInMillis ms and the slow query threshold is $queryThresholdInMillis ms") + } +} diff --git a/kamon-elasticsearch/src/main/scala/kamon/elasticsearch/instrumentation/RequestInstrumentation.scala b/kamon-elasticsearch/src/main/scala/kamon/elasticsearch/instrumentation/RequestInstrumentation.scala new file mode 100644 index 00000000..ba4b1ded --- /dev/null +++ b/kamon-elasticsearch/src/main/scala/kamon/elasticsearch/instrumentation/RequestInstrumentation.scala @@ -0,0 +1,86 @@ +/* ========================================================================================= + * Copyright © 2013-2014 the kamon project <http://kamon.io/> + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + +package kamon.elasticsearch.instrumentation + +import java.util.concurrent.TimeUnit.{ NANOSECONDS ⇒ nanos } + +import org.aspectj.lang.ProceedingJoinPoint +import org.aspectj.lang.annotation.Around +import org.aspectj.lang.annotation.Aspect +import org.aspectj.lang.annotation.Pointcut +import org.elasticsearch.action._ +import org.slf4j.LoggerFactory + +import kamon.Kamon +import kamon.elasticsearch.ElasticsearchExtension +import kamon.elasticsearch.metric.RequestsMetrics +import kamon.trace.SegmentCategory +import kamon.trace.TraceContext +import kamon.trace.Tracer + +@Aspect +class RequestInstrumentation { + + import RequestInstrumentation._ + + @Pointcut("execution(* org.elasticsearch.client.ElasticsearchClient.execute*(..)) && args(action, request, listener)") + def onExecuteListener[Request <: ActionRequest[Request], Response <: ActionResponse, RequestBuilder <: ActionRequestBuilder[Request, Response, RequestBuilder]](action: Action[Request, Response, RequestBuilder], request: Request, listener: ActionListener[Response]): Unit = {} + + @Around("onExecuteListener(action, request, listener) ") + def aroundExecuteListener[Request <: ActionRequest[Request], Response <: ActionResponse, RequestBuilder <: ActionRequestBuilder[Request, Response, RequestBuilder]](pjp: ProceedingJoinPoint, action: Action[Request, Response, RequestBuilder], request: Request, listener: ActionListener[Response]): Unit = { + Tracer.currentContext.collect { ctx ⇒ + implicit val requestRecorder = Kamon.metrics.entity(RequestsMetrics, "elasticsearch-requests") + val segment = generateSegment(ctx, request) + val start = System.nanoTime() + + pjp.proceed(Array(action, request, new ActionListener[Response] { + def onFailure(e: Throwable): Unit = { requestRecorder.errors.increment(); segment.finish(); listener.onFailure(e) } + def onResponse(response: Response): Unit = { recordTrace(request, response, start); segment.finish(); listener.onResponse(response) } + })) + + } + } getOrElse pjp.proceed() + + def recordTrace[Request <: ActionRequest[Request], Response <: ActionResponse](request: Request, response: Response, start: Long)(implicit requestRecorder: RequestsMetrics) { + val timeSpent = System.nanoTime() - start + request match { + case r: get.GetRequest ⇒ requestRecorder.reads.record(timeSpent) + case r: search.SearchRequest ⇒ requestRecorder.reads.record(timeSpent) + case r: index.IndexRequest ⇒ requestRecorder.writes.record(timeSpent) + case r: update.UpdateRequest ⇒ requestRecorder.writes.record(timeSpent) + case r: delete.DeleteRequest ⇒ requestRecorder.writes.record(timeSpent) + case _ ⇒ + log.debug(s"Unable to parse request [$request]") + } + + val timeSpentInMillis = nanos.toMillis(timeSpent) + + if (timeSpentInMillis >= ElasticsearchExtension.slowQueryThreshold) { + requestRecorder.slows.increment() + ElasticsearchExtension.processSlowQuery(request, timeSpentInMillis) + } + } + + def generateSegment(ctx: TraceContext, request: ActionRequest[_]) = { + val segmentName = ElasticsearchExtension.generateElasticsearchSegmentName(request) + val segment = ctx.startSegment(segmentName, SegmentCategory.Database, ElasticsearchExtension.SegmentLibraryName) + segment + } +} + +object RequestInstrumentation { + val log = LoggerFactory.getLogger(classOf[RequestInstrumentation]) +} diff --git a/kamon-elasticsearch/src/main/scala/kamon/elasticsearch/metric/RequestsMetrics.scala b/kamon-elasticsearch/src/main/scala/kamon/elasticsearch/metric/RequestsMetrics.scala new file mode 100644 index 00000000..f2b8d721 --- /dev/null +++ b/kamon-elasticsearch/src/main/scala/kamon/elasticsearch/metric/RequestsMetrics.scala @@ -0,0 +1,32 @@ +/* + * ========================================================================================= + * Copyright © 2013-2014 the kamon project <http://kamon.io/> + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + +package kamon.elasticsearch.metric + +import kamon.metric._ +import kamon.metric.instrument.{ Time, InstrumentFactory } + +class RequestsMetrics(instrumentFactory: InstrumentFactory) extends GenericEntityRecorder(instrumentFactory) { + val reads = histogram("reads", Time.Nanoseconds) + val writes = histogram("writes", Time.Nanoseconds) + val slows = counter("slows") + val errors = counter("errors") +} + +object RequestsMetrics extends EntityRecorderFactory[RequestsMetrics] { + def category: String = "elasticsearch-requests" + def createRecorder(instrumentFactory: InstrumentFactory): RequestsMetrics = new RequestsMetrics(instrumentFactory) +}
\ No newline at end of file diff --git a/kamon-elasticsearch/src/test/resources/logback.xml b/kamon-elasticsearch/src/test/resources/logback.xml new file mode 100644 index 00000000..c336bbfe --- /dev/null +++ b/kamon-elasticsearch/src/test/resources/logback.xml @@ -0,0 +1,12 @@ +<configuration> + <statusListener class="ch.qos.logback.core.status.NopStatusListener"/> + <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender"> + <encoder> + <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern> + </encoder> + </appender> + + <root level="OFF"> + <appender-ref ref="STDOUT"/> + </root> +</configuration>
\ No newline at end of file diff --git a/kamon-elasticsearch/src/test/scala/kamon/elasticsearch/instrumentation/AsyncRequestInstrumentationSpec.scala b/kamon-elasticsearch/src/test/scala/kamon/elasticsearch/instrumentation/AsyncRequestInstrumentationSpec.scala new file mode 100644 index 00000000..db94b108 --- /dev/null +++ b/kamon-elasticsearch/src/test/scala/kamon/elasticsearch/instrumentation/AsyncRequestInstrumentationSpec.scala @@ -0,0 +1,178 @@ +/* ========================================================================================= + * Copyright © 2013-2014 the kamon project <http://kamon.io/> + * + * Licensed under the Apache License, Version 2.0 (the "License") you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + +package kamon.elasticsearch.instrumentation + +import org.elasticsearch.common.settings.Settings +import org.elasticsearch.indices.InvalidIndexNameException +import org.elasticsearch.node.NodeBuilder.nodeBuilder + +import com.typesafe.config.ConfigFactory + +import kamon.elasticsearch.ElasticsearchExtension +import kamon.testkit.BaseKamonSpec +import kamon.trace.SegmentCategory +import kamon.trace.Tracer + +class AsyncRequestInstrumentationSpec extends BaseKamonSpec("elasticsearch-spec") { + override lazy val config = + ConfigFactory.parseString( + """ + |kamon { + | elasticsearch { + | slow-query-threshold = 100 milliseconds + | + | # Fully qualified name of the implementation of kamon.elasticsearch.SlowRequestProcessor. + | slow-query-processor = kamon.elasticsearch.instrumentation.NoOpSlowRequestProcessor + | + | # Fully qualified name of the implementation of kamon.elasticsearch.ElasticsearchErrorProcessor. + | elasticsearch-error-processor = kamon.elasticsearch.instrumentation.NoOpElasticsearchErrorProcessor + | + | # Fully qualified name of the implementation of kamon.elasticsearch.ElasticsearchNameGenerator + | name-generator = kamon.elasticsearch.instrumentation.NoOpElasticsearchNameGenerator + | } + |} + """.stripMargin) + + val node = nodeBuilder() + .local(true) + .settings(Settings.builder().put("path.home", System.getProperty("java.io.tmpdir") + "/elasticsearch")) + .node() + val client = node.client(); + + "the RequestInstrumentation" should { + "record the execution time of async INDEX operation" in { + Tracer.withContext(newContext("elasticsearch-trace-index")) { + for (id ← 1 to 100) { + client.prepareIndex("twitter", "tweet", id.toString) + .setSource("{" + + "\"user\":\"kimchy\"," + + "\"postDate\":\"2013-01-30\"," + + "\"message\":\"trying out Elasticsearch\"" + + "}") + .execute().actionGet() + } + + Tracer.currentContext.finish() + } + + val elasticsearchSnapshot = takeSnapshotOf("elasticsearch-requests", "elasticsearch-requests") + elasticsearchSnapshot.histogram("writes").get.numberOfMeasurements should be(100) + + val traceSnapshot = takeSnapshotOf("elasticsearch-trace-index", "trace") + traceSnapshot.histogram("elapsed-time").get.numberOfMeasurements should be(1) + + val segmentSnapshot = takeSnapshotOf("Elasticsearch[IndexRequest]", "trace-segment", + tags = Map( + "trace" -> "elasticsearch-trace-index", + "category" -> SegmentCategory.Database, + "library" -> ElasticsearchExtension.SegmentLibraryName)) + + segmentSnapshot.histogram("elapsed-time").get.numberOfMeasurements should be(100) + } + + "record the execution time of async GET operation" in { + Tracer.withContext(newContext("elasticsearch-trace-get")) { + for (id ← 1 to 100) { + client.prepareGet("twitter", "tweet", id.toString).execute().actionGet() + } + + Tracer.currentContext.finish() + } + + val elasticsearchSnapshot = takeSnapshotOf("elasticsearch-requests", "elasticsearch-requests") + elasticsearchSnapshot.histogram("reads").get.numberOfMeasurements should be(100) + + val traceSnapshot = takeSnapshotOf("elasticsearch-trace-get", "trace") + traceSnapshot.histogram("elapsed-time").get.numberOfMeasurements should be(1) + + val segmentSnapshot = takeSnapshotOf("Elasticsearch[GetRequest]", "trace-segment", + tags = Map( + "trace" -> "elasticsearch-trace-get", + "category" -> SegmentCategory.Database, + "library" -> ElasticsearchExtension.SegmentLibraryName)) + + segmentSnapshot.histogram("elapsed-time").get.numberOfMeasurements should be(100) + } + + "record the execution time of async UPDATE operation" in { + Tracer.withContext(newContext("elasticsearch-trace-update")) { + for (id ← 1 to 100) { + client.prepareUpdate("twitter", "tweet", id.toString) + .setDoc("{" + + "\"updated\":\"updated\"" + + "}") + .execute().actionGet() + } + + Tracer.currentContext.finish() + } + + val elasticsearchSnapshot = takeSnapshotOf("elasticsearch-requests", "elasticsearch-requests") + elasticsearchSnapshot.histogram("writes").get.numberOfMeasurements should be(100) + + val traceSnapshot = takeSnapshotOf("elasticsearch-trace-update", "trace") + traceSnapshot.histogram("elapsed-time").get.numberOfMeasurements should be(1) + + val segmentSnapshot = takeSnapshotOf("Elasticsearch[UpdateRequest]", "trace-segment", + tags = Map( + "trace" -> "elasticsearch-trace-update", + "category" -> SegmentCategory.Database, + "library" -> ElasticsearchExtension.SegmentLibraryName)) + + segmentSnapshot.histogram("elapsed-time").get.numberOfMeasurements should be(100) + } + + "record the execution time of async DELETE operation" in { + Tracer.withContext(newContext("elasticsearch-trace-delete")) { + for (id ← 1 to 100) { + client.prepareDelete("twitter", "tweet", id.toString).execute().actionGet() + } + + Tracer.currentContext.finish() + } + + val elasticsearchSnapshot = takeSnapshotOf("elasticsearch-requests", "elasticsearch-requests") + elasticsearchSnapshot.histogram("writes").get.numberOfMeasurements should be(100) + + val traceSnapshot = takeSnapshotOf("elasticsearch-trace-delete", "trace") + traceSnapshot.histogram("elapsed-time").get.numberOfMeasurements should be(1) + + val segmentSnapshot = takeSnapshotOf("Elasticsearch[DeleteRequest]", "trace-segment", + tags = Map( + "trace" -> "elasticsearch-trace-delete", + "category" -> SegmentCategory.Database, + "library" -> ElasticsearchExtension.SegmentLibraryName)) + + segmentSnapshot.histogram("elapsed-time").get.numberOfMeasurements should be(100) + + } + + "count all ERRORS async" in { + Tracer.withContext(newContext("elasticsearch-trace-errors")) { + for (id ← 1 to 10) { + intercept[InvalidIndexNameException] { + client.prepareDelete("index name with spaces", "tweet", id.toString).execute().actionGet() + } + } + + Tracer.currentContext.finish() + } + + val elasticsearchSnapshot = takeSnapshotOf("elasticsearch-requests", "elasticsearch-requests") + elasticsearchSnapshot.counter("errors").get.count should be(10) + } + } +}
\ No newline at end of file diff --git a/kamon-elasticsearch/src/test/scala/kamon/elasticsearch/instrumentation/RequestInstrumentationSpec.scala b/kamon-elasticsearch/src/test/scala/kamon/elasticsearch/instrumentation/RequestInstrumentationSpec.scala new file mode 100644 index 00000000..106b225a --- /dev/null +++ b/kamon-elasticsearch/src/test/scala/kamon/elasticsearch/instrumentation/RequestInstrumentationSpec.scala @@ -0,0 +1,192 @@ +/* ========================================================================================= + * Copyright © 2013-2014 the kamon project <http://kamon.io/> + * + * Licensed under the Apache License, Version 2.0 (the "License") you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + +package kamon.elasticsearch.instrumentation + +import java.util.concurrent.ExecutionException + +import org.elasticsearch.action._ +import org.elasticsearch.common.settings.Settings +import org.elasticsearch.node.NodeBuilder.nodeBuilder + +import com.typesafe.config.ConfigFactory + +import kamon.elasticsearch._ +import kamon.testkit.BaseKamonSpec +import kamon.trace.SegmentCategory +import kamon.trace.Tracer + +class RequestInstrumentationSpec extends BaseKamonSpec("elasticsearch-spec") { + override lazy val config = + ConfigFactory.parseString( + """ + |kamon { + | elasticsearch { + | slow-query-threshold = 100 milliseconds + | + | # Fully qualified name of the implementation of kamon.elasticsearch.SlowRequestProcessor. + | slow-query-processor = kamon.elasticsearch.instrumentation.NoOpSlowRequestProcessor + | + | # Fully qualified name of the implementation of kamon.elasticsearch.ElasticsearchErrorProcessor. + | elasticsearch-error-processor = kamon.elasticsearch.instrumentation.NoOpElasticsearchErrorProcessor + | + | # Fully qualified name of the implementation of kamon.elasticsearch.ElasticsearchNameGenerator + | name-generator = kamon.elasticsearch.instrumentation.NoOpElasticsearchNameGenerator + | } + |} + """.stripMargin) + + val node = nodeBuilder() + .local(true) + .settings(Settings.builder().put("path.home", System.getProperty("java.io.tmpdir") + "/elasticsearch")) + .node() + val client = node.client(); + + "the RequestInstrumentation" should { + "record the execution time of INDEX operation" in { + Tracer.withContext(newContext("elasticsearch-trace-index")) { + for (id ← 1 to 100) { + client.index(new index.IndexRequest("twitter", "tweet", id.toString) + .source("{" + + "\"user\":\"kimchy\"," + + "\"postDate\":\"2013-01-30\"," + + "\"message\":\"trying out Elasticsearch\"" + + "}")).get() + } + + Tracer.currentContext.finish() + } + + val elasticsearchSnapshot = takeSnapshotOf("elasticsearch-requests", "elasticsearch-requests") + elasticsearchSnapshot.histogram("writes").get.numberOfMeasurements should be(100) + + val traceSnapshot = takeSnapshotOf("elasticsearch-trace-index", "trace") + traceSnapshot.histogram("elapsed-time").get.numberOfMeasurements should be(1) + + val segmentSnapshot = takeSnapshotOf("Elasticsearch[IndexRequest]", "trace-segment", + tags = Map( + "trace" -> "elasticsearch-trace-index", + "category" -> SegmentCategory.Database, + "library" -> ElasticsearchExtension.SegmentLibraryName)) + + segmentSnapshot.histogram("elapsed-time").get.numberOfMeasurements should be(100) + } + + "record the execution time of GET operation" in { + Tracer.withContext(newContext("elasticsearch-trace-get")) { + for (id ← 1 to 100) { + client.get(new get.GetRequest("twitter", "tweet", id.toString)).get() + } + + Tracer.currentContext.finish() + } + + val elasticsearchSnapshot = takeSnapshotOf("elasticsearch-requests", "elasticsearch-requests") + elasticsearchSnapshot.histogram("reads").get.numberOfMeasurements should be(100) + + val traceSnapshot = takeSnapshotOf("elasticsearch-trace-get", "trace") + traceSnapshot.histogram("elapsed-time").get.numberOfMeasurements should be(1) + + val segmentSnapshot = takeSnapshotOf("Elasticsearch[GetRequest]", "trace-segment", + tags = Map( + "trace" -> "elasticsearch-trace-get", + "category" -> SegmentCategory.Database, + "library" -> ElasticsearchExtension.SegmentLibraryName)) + + segmentSnapshot.histogram("elapsed-time").get.numberOfMeasurements should be(100) + } + + "record the execution time of UPDATE operation" in { + Tracer.withContext(newContext("elasticsearch-trace-update")) { + for (id ← 1 to 100) { + client.update( + new update.UpdateRequest("twitter", "tweet", id.toString) + .doc("{" + + "\"updated\":\"updated\"" + + "}")) + .get() + } + + Tracer.currentContext.finish() + } + + val elasticsearchSnapshot = takeSnapshotOf("elasticsearch-requests", "elasticsearch-requests") + elasticsearchSnapshot.histogram("writes").get.numberOfMeasurements should be(100) + + val traceSnapshot = takeSnapshotOf("elasticsearch-trace-update", "trace") + traceSnapshot.histogram("elapsed-time").get.numberOfMeasurements should be(1) + + val segmentSnapshot = takeSnapshotOf("Elasticsearch[UpdateRequest]", "trace-segment", + tags = Map( + "trace" -> "elasticsearch-trace-update", + "category" -> SegmentCategory.Database, + "library" -> ElasticsearchExtension.SegmentLibraryName)) + + segmentSnapshot.histogram("elapsed-time").get.numberOfMeasurements should be(100) + } + + "record the execution time of DELETE operation" in { + Tracer.withContext(newContext("elasticsearch-trace-delete")) { + for (id ← 1 to 100) { + client.delete(new delete.DeleteRequest("twitter", "tweet", id.toString)).get() + } + + Tracer.currentContext.finish() + } + + val elasticsearchSnapshot = takeSnapshotOf("elasticsearch-requests", "elasticsearch-requests") + elasticsearchSnapshot.histogram("writes").get.numberOfMeasurements should be(100) + + val traceSnapshot = takeSnapshotOf("elasticsearch-trace-delete", "trace") + traceSnapshot.histogram("elapsed-time").get.numberOfMeasurements should be(1) + + val segmentSnapshot = takeSnapshotOf("Elasticsearch[DeleteRequest]", "trace-segment", + tags = Map( + "trace" -> "elasticsearch-trace-delete", + "category" -> SegmentCategory.Database, + "library" -> ElasticsearchExtension.SegmentLibraryName)) + + segmentSnapshot.histogram("elapsed-time").get.numberOfMeasurements should be(100) + + } + + "count all ERRORS" in { + Tracer.withContext(newContext("elasticsearch-trace-errors")) { + for (id ← 1 to 10) { + intercept[ExecutionException] { + client.delete(new delete.DeleteRequest("index name with spaces", "tweet", id.toString)).get() + } + } + + Tracer.currentContext.finish() + } + + val elasticsearchSnapshot = takeSnapshotOf("elasticsearch-requests", "elasticsearch-requests") + elasticsearchSnapshot.counter("errors").get.count should be(10) + } + } +} + +class NoOpSlowRequestProcessor extends SlowRequestProcessor { + override def process(request: ActionRequest[_], executionTimeInMillis: Long, queryThresholdInMillis: Long): Unit = { /*do nothing!!!*/ } +} + +class NoOpElasticsearchErrorProcessor extends ElasticsearchErrorProcessor { + override def process(request: ActionRequest[_], ex: Throwable): Unit = { /*do nothing!!!*/ } +} + +class NoOpElasticsearchNameGenerator extends ElasticsearchNameGenerator { + override def generateElasticsearchSegmentName(request: ActionRequest[_]): String = s"Elasticsearch[${request.getClass.getSimpleName}]" +}
\ No newline at end of file diff --git a/project/Dependencies.scala b/project/Dependencies.scala index 1560e220..83565c75 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -28,6 +28,7 @@ object Dependencies { val slf4jVersion = "1.7.7" val play23Version = "2.3.10" val play24Version = "2.4.4" + val elasticsearchVersion = "2.1.0" val sprayJson = "io.spray" %% "spray-json" % "1.3.1" val sprayJsonLenses = "net.virtual-void" %% "json-lenses" % "0.6.0" @@ -67,6 +68,8 @@ object Dependencies { val playTest24 = "org.scalatestplus" %% "play" % "1.4.0-M2" val typesafeConfig = "com.typesafe" % "config" % "1.2.1" + val elasticsearch = "org.elasticsearch" % "elasticsearch" % elasticsearchVersion + def compile (deps: ModuleID*): Seq[ModuleID] = deps map (_ % "compile") def provided (deps: ModuleID*): Seq[ModuleID] = deps map (_ % "provided") def test (deps: ModuleID*): Seq[ModuleID] = deps map (_ % "test") diff --git a/project/Projects.scala b/project/Projects.scala index c7933008..5cb230e5 100644 --- a/project/Projects.scala +++ b/project/Projects.scala @@ -206,6 +206,17 @@ object Projects extends Build { test(scalatest, slf4jApi) ++ compile(aspectJ)) + lazy val kamonElasticsearch = Project("kamon-elasticsearch", file("kamon-elasticsearch")) + .dependsOn(kamonCore % "compile->compile;test->test") + .settings(basicSettings: _*) + .settings(formatSettings: _*) + .settings(aspectJSettings: _*) + .settings( + libraryDependencies ++= + compile(elasticsearch) ++ + test(scalatest, akkaTestKit, slf4jApi) ++ + provided(aspectJ)) + lazy val kamonAnnotation = Project("kamon-annotation", file("kamon-annotation")) .dependsOn(kamonCore % "compile->compile;test->test") .settings(basicSettings: _*) |