From ca6659c419e434b1b8caa6e9551420aad2230e77 Mon Sep 17 00:00:00 2001 From: Ivan Topolnjak Date: Wed, 19 Jul 2017 14:47:39 +0200 Subject: start writting Span.Real tests --- .../src/main/scala/kamon/ReporterRegistry.scala | 14 ++++++-- .../src/main/scala/kamon/trace/ActiveSpan.scala | 3 ++ kamon-core/src/main/scala/kamon/trace/Span.scala | 16 ++++++--- .../src/test/scala/kamon/testkit/Reconfigure.scala | 26 ++++++++++++++ .../scala/kamon/testkit/TestSpanReporter.scala | 23 ++++++++++++ .../src/test/scala/kamon/trace/RealSpanSpec.scala | 41 ++++++++++++++++++++++ 6 files changed, 115 insertions(+), 8 deletions(-) create mode 100644 kamon-core/src/test/scala/kamon/testkit/Reconfigure.scala create mode 100644 kamon-core/src/test/scala/kamon/testkit/TestSpanReporter.scala create mode 100644 kamon-core/src/test/scala/kamon/trace/RealSpanSpec.scala diff --git a/kamon-core/src/main/scala/kamon/ReporterRegistry.scala b/kamon-core/src/main/scala/kamon/ReporterRegistry.scala index 8a36a7c7..f0d744e5 100644 --- a/kamon-core/src/main/scala/kamon/ReporterRegistry.scala +++ b/kamon-core/src/main/scala/kamon/ReporterRegistry.scala @@ -20,9 +20,11 @@ import java.util.concurrent.atomic.{AtomicLong, AtomicReference} import java.util.concurrent._ import com.typesafe.config.Config +import kamon.ReporterRegistry.SpanSink import kamon.metric._ import kamon.trace.Span -import kamon.util.{DynamicAccess, Registration} +import kamon.trace.Span.FinishedSpan +import kamon.util.{CallingThreadExecutionContext, DynamicAccess, Registration} import org.slf4j.LoggerFactory import scala.concurrent.{ExecutionContext, ExecutionContextExecutorService, Future} @@ -42,6 +44,12 @@ trait ReporterRegistry { def stopAllReporters(): Future[Unit] } +object ReporterRegistry { + private[kamon] trait SpanSink { + def reportSpan(finishedSpan: FinishedSpan): Unit + } +} + sealed trait Reporter { def start(): Unit def stop(): Unit @@ -56,7 +64,7 @@ trait SpanReporter extends Reporter { def reportSpans(spans: Seq[Span.FinishedSpan]): Unit } -class ReporterRegistryImpl(metrics: MetricsSnapshotGenerator, initialConfig: Config) extends ReporterRegistry { +class ReporterRegistryImpl(metrics: MetricsSnapshotGenerator, initialConfig: Config) extends ReporterRegistry with SpanSink { private val logger = LoggerFactory.getLogger(classOf[ReporterRegistry]) private val registryExecutionContext = Executors.newScheduledThreadPool(2, threadFactory("kamon-reporter-registry")) private val reporterCounter = new AtomicLong(0L) @@ -212,7 +220,7 @@ class ReporterRegistryImpl(metrics: MetricsSnapshotGenerator, initialConfig: Con } } - private[kamon] def reportSpan(span: Span.FinishedSpan): Unit = { + def reportSpan(span: Span.FinishedSpan): Unit = { spanReporters.foreach { case (_, reporterEntry) => if(reporterEntry.isActive) reporterEntry.buffer.offer(span) diff --git a/kamon-core/src/main/scala/kamon/trace/ActiveSpan.scala b/kamon-core/src/main/scala/kamon/trace/ActiveSpan.scala index c30a7157..b6e5d5e9 100644 --- a/kamon-core/src/main/scala/kamon/trace/ActiveSpan.scala +++ b/kamon-core/src/main/scala/kamon/trace/ActiveSpan.scala @@ -66,6 +66,9 @@ object ActiveSpan { override def capture(): Continuation = wrappedSpan.capture() + + override def capture(activeSpanSource: ActiveSpanSource): Continuation = + wrappedSpan.capture(activeSpanSource) } object Default { diff --git a/kamon-core/src/main/scala/kamon/trace/Span.scala b/kamon-core/src/main/scala/kamon/trace/Span.scala index 01cfbfc3..113ec3de 100644 --- a/kamon-core/src/main/scala/kamon/trace/Span.scala +++ b/kamon-core/src/main/scala/kamon/trace/Span.scala @@ -16,9 +16,9 @@ package kamon package trace +import kamon.ReporterRegistry.SpanSink import kamon.trace.SpanContext.SamplingDecision -import scala.collection.JavaConverters._ import kamon.util.{Clock, MeasurementUnit} /** @@ -31,6 +31,8 @@ trait BaseSpan { def capture(): Continuation + def capture(activeSpanSource: ActiveSpanSource): Continuation + def annotate(annotation: Span.Annotation): Span def addSpanTag(key: String, value: String): Span @@ -76,6 +78,7 @@ object Span { final class Empty(activeSpanSource: ActiveSpanSource) extends Span { override val context: SpanContext = SpanContext.EmptySpanContext override def capture(): Continuation = Continuation.Default(this, activeSpanSource) + override def capture(activeSpanSource: ActiveSpanSource): Continuation = Continuation.Default(this, activeSpanSource) override def annotate(annotation: Annotation): Span = this override def addSpanTag(key: String, value: String): Span = this @@ -99,10 +102,10 @@ object Span { * @param initialOperationName * @param initialSpanTags * @param startTimestampMicros - * @param reporterRegistry + * @param spanSink */ final class Real(spanContext: SpanContext, initialOperationName: String, initialSpanTags: Map[String, Span.TagValue], - initialMetricTags: Map[String, String], startTimestampMicros: Long, reporterRegistry: ReporterRegistryImpl, tracer: Tracer) extends Span { + initialMetricTags: Map[String, String], startTimestampMicros: Long, spanSink: SpanSink, activeSpanSource: ActiveSpanSource) extends Span { private var collectMetrics: Boolean = true private var open: Boolean = true @@ -175,12 +178,15 @@ object Span { recordSpanMetrics(finishMicros) if(sampled) - reporterRegistry.reportSpan(toFinishedSpan(finishMicros)) + spanSink.reportSpan(toFinishedSpan(finishMicros)) } } override def capture(): Continuation = - Continuation.Default(this, tracer) + Continuation.Default(this, activeSpanSource) + + override def capture(activeSpanSource: ActiveSpanSource): Continuation = + Continuation.Default(this, activeSpanSource) private def toFinishedSpan(endTimestampMicros: Long): Span.FinishedSpan = Span.FinishedSpan(spanContext, operationName, startTimestampMicros, endTimestampMicros, spanTags, annotations) diff --git a/kamon-core/src/test/scala/kamon/testkit/Reconfigure.scala b/kamon-core/src/test/scala/kamon/testkit/Reconfigure.scala new file mode 100644 index 00000000..4b3b2cdb --- /dev/null +++ b/kamon-core/src/test/scala/kamon/testkit/Reconfigure.scala @@ -0,0 +1,26 @@ +package kamon.testkit + +import com.typesafe.config.ConfigFactory +import kamon.Kamon + +trait Reconfigure { + + def enableFastSpanFlushing(): Unit = { + applyConfig("kamon.trace.tick-interval = 1 millisecond") + } + + def sampleAlways(): Unit = { + applyConfig("kamon.trace.sampler = always") + } + + def sampleNever(): Unit = { + applyConfig("kamon.trace.sampler = never") + } + + private def applyConfig(configString: String): Unit = { + Kamon.reconfigure(ConfigFactory.parseString(configString).withFallback(Kamon.config())) + } + + + +} diff --git a/kamon-core/src/test/scala/kamon/testkit/TestSpanReporter.scala b/kamon-core/src/test/scala/kamon/testkit/TestSpanReporter.scala new file mode 100644 index 00000000..8ea2d433 --- /dev/null +++ b/kamon-core/src/test/scala/kamon/testkit/TestSpanReporter.scala @@ -0,0 +1,23 @@ +package kamon.testkit + +import java.util.concurrent.LinkedBlockingQueue + +import com.typesafe.config.Config +import kamon.SpanReporter +import kamon.trace.Span +import kamon.trace.Span.FinishedSpan + +class TestSpanReporter() extends SpanReporter { + import scala.collection.JavaConverters._ + private val reportedSpans = new LinkedBlockingQueue[FinishedSpan]() + + override def reportSpans(spans: Seq[Span.FinishedSpan]): Unit = + reportedSpans.addAll(spans.asJava) + + def nextSpan(): Option[FinishedSpan] = + Option(reportedSpans.poll()) + + override def start(): Unit = {} + override def stop(): Unit = {} + override def reconfigure(config: Config): Unit = {} +} diff --git a/kamon-core/src/test/scala/kamon/trace/RealSpanSpec.scala b/kamon-core/src/test/scala/kamon/trace/RealSpanSpec.scala new file mode 100644 index 00000000..61e651c2 --- /dev/null +++ b/kamon-core/src/test/scala/kamon/trace/RealSpanSpec.scala @@ -0,0 +1,41 @@ +package kamon.trace + +import kamon.testkit.{Reconfigure, TestSpanReporter} +import kamon.util.Registration +import kamon.Kamon +import org.scalatest.concurrent.Eventually +import org.scalatest.{BeforeAndAfterAll, Matchers, OptionValues, WordSpec} +import org.scalatest.time.SpanSugar._ + +class RealSpanSpec extends WordSpec with Matchers with BeforeAndAfterAll with Eventually with OptionValues with Reconfigure { + + "a real span" when { + "sampled" should { + "be sent to the Span reporters" in { + + Kamon.buildSpan("test-span") + .withSpanTag("test", "value") + .start() + .finish() + + eventually(timeout(50 milliseconds)) { + val finishedSpan = reporter.nextSpan().value + finishedSpan.operationName shouldBe("test-span") + } + } + } + } + + @volatile var registration: Registration = _ + val reporter = new TestSpanReporter() + + override protected def beforeAll(): Unit = { + enableFastSpanFlushing() + sampleAlways() + registration = Kamon.addReporter(reporter) + } + + override protected def afterAll(): Unit = { + registration.cancel() + } +} -- cgit v1.2.3