aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorIvan Topolnjak <ivantopo@gmail.com>2017-07-19 14:47:39 +0200
committerIvan Topolnjak <ivantopo@gmail.com>2017-07-19 14:47:39 +0200
commitca6659c419e434b1b8caa6e9551420aad2230e77 (patch)
treefc8d8aa588a7d99a792cd3db23c8688f0ec6852b
parent3076d7b7a499d1d7d3d2bc447d989e383dbb1b40 (diff)
downloadKamon-ca6659c419e434b1b8caa6e9551420aad2230e77.tar.gz
Kamon-ca6659c419e434b1b8caa6e9551420aad2230e77.tar.bz2
Kamon-ca6659c419e434b1b8caa6e9551420aad2230e77.zip
start writting Span.Real tests
-rw-r--r--kamon-core/src/main/scala/kamon/ReporterRegistry.scala14
-rw-r--r--kamon-core/src/main/scala/kamon/trace/ActiveSpan.scala3
-rw-r--r--kamon-core/src/main/scala/kamon/trace/Span.scala16
-rw-r--r--kamon-core/src/test/scala/kamon/testkit/Reconfigure.scala26
-rw-r--r--kamon-core/src/test/scala/kamon/testkit/TestSpanReporter.scala23
-rw-r--r--kamon-core/src/test/scala/kamon/trace/RealSpanSpec.scala41
6 files changed, 115 insertions, 8 deletions
diff --git a/kamon-core/src/main/scala/kamon/ReporterRegistry.scala b/kamon-core/src/main/scala/kamon/ReporterRegistry.scala
index 8a36a7c7..f0d744e5 100644
--- a/kamon-core/src/main/scala/kamon/ReporterRegistry.scala
+++ b/kamon-core/src/main/scala/kamon/ReporterRegistry.scala
@@ -20,9 +20,11 @@ import java.util.concurrent.atomic.{AtomicLong, AtomicReference}
import java.util.concurrent._
import com.typesafe.config.Config
+import kamon.ReporterRegistry.SpanSink
import kamon.metric._
import kamon.trace.Span
-import kamon.util.{DynamicAccess, Registration}
+import kamon.trace.Span.FinishedSpan
+import kamon.util.{CallingThreadExecutionContext, DynamicAccess, Registration}
import org.slf4j.LoggerFactory
import scala.concurrent.{ExecutionContext, ExecutionContextExecutorService, Future}
@@ -42,6 +44,12 @@ trait ReporterRegistry {
def stopAllReporters(): Future[Unit]
}
+object ReporterRegistry {
+ private[kamon] trait SpanSink {
+ def reportSpan(finishedSpan: FinishedSpan): Unit
+ }
+}
+
sealed trait Reporter {
def start(): Unit
def stop(): Unit
@@ -56,7 +64,7 @@ trait SpanReporter extends Reporter {
def reportSpans(spans: Seq[Span.FinishedSpan]): Unit
}
-class ReporterRegistryImpl(metrics: MetricsSnapshotGenerator, initialConfig: Config) extends ReporterRegistry {
+class ReporterRegistryImpl(metrics: MetricsSnapshotGenerator, initialConfig: Config) extends ReporterRegistry with SpanSink {
private val logger = LoggerFactory.getLogger(classOf[ReporterRegistry])
private val registryExecutionContext = Executors.newScheduledThreadPool(2, threadFactory("kamon-reporter-registry"))
private val reporterCounter = new AtomicLong(0L)
@@ -212,7 +220,7 @@ class ReporterRegistryImpl(metrics: MetricsSnapshotGenerator, initialConfig: Con
}
}
- private[kamon] def reportSpan(span: Span.FinishedSpan): Unit = {
+ def reportSpan(span: Span.FinishedSpan): Unit = {
spanReporters.foreach { case (_, reporterEntry) =>
if(reporterEntry.isActive)
reporterEntry.buffer.offer(span)
diff --git a/kamon-core/src/main/scala/kamon/trace/ActiveSpan.scala b/kamon-core/src/main/scala/kamon/trace/ActiveSpan.scala
index c30a7157..b6e5d5e9 100644
--- a/kamon-core/src/main/scala/kamon/trace/ActiveSpan.scala
+++ b/kamon-core/src/main/scala/kamon/trace/ActiveSpan.scala
@@ -66,6 +66,9 @@ object ActiveSpan {
override def capture(): Continuation =
wrappedSpan.capture()
+
+ override def capture(activeSpanSource: ActiveSpanSource): Continuation =
+ wrappedSpan.capture(activeSpanSource)
}
object Default {
diff --git a/kamon-core/src/main/scala/kamon/trace/Span.scala b/kamon-core/src/main/scala/kamon/trace/Span.scala
index 01cfbfc3..113ec3de 100644
--- a/kamon-core/src/main/scala/kamon/trace/Span.scala
+++ b/kamon-core/src/main/scala/kamon/trace/Span.scala
@@ -16,9 +16,9 @@
package kamon
package trace
+import kamon.ReporterRegistry.SpanSink
import kamon.trace.SpanContext.SamplingDecision
-import scala.collection.JavaConverters._
import kamon.util.{Clock, MeasurementUnit}
/**
@@ -31,6 +31,8 @@ trait BaseSpan {
def capture(): Continuation
+ def capture(activeSpanSource: ActiveSpanSource): Continuation
+
def annotate(annotation: Span.Annotation): Span
def addSpanTag(key: String, value: String): Span
@@ -76,6 +78,7 @@ object Span {
final class Empty(activeSpanSource: ActiveSpanSource) extends Span {
override val context: SpanContext = SpanContext.EmptySpanContext
override def capture(): Continuation = Continuation.Default(this, activeSpanSource)
+ override def capture(activeSpanSource: ActiveSpanSource): Continuation = Continuation.Default(this, activeSpanSource)
override def annotate(annotation: Annotation): Span = this
override def addSpanTag(key: String, value: String): Span = this
@@ -99,10 +102,10 @@ object Span {
* @param initialOperationName
* @param initialSpanTags
* @param startTimestampMicros
- * @param reporterRegistry
+ * @param spanSink
*/
final class Real(spanContext: SpanContext, initialOperationName: String, initialSpanTags: Map[String, Span.TagValue],
- initialMetricTags: Map[String, String], startTimestampMicros: Long, reporterRegistry: ReporterRegistryImpl, tracer: Tracer) extends Span {
+ initialMetricTags: Map[String, String], startTimestampMicros: Long, spanSink: SpanSink, activeSpanSource: ActiveSpanSource) extends Span {
private var collectMetrics: Boolean = true
private var open: Boolean = true
@@ -175,12 +178,15 @@ object Span {
recordSpanMetrics(finishMicros)
if(sampled)
- reporterRegistry.reportSpan(toFinishedSpan(finishMicros))
+ spanSink.reportSpan(toFinishedSpan(finishMicros))
}
}
override def capture(): Continuation =
- Continuation.Default(this, tracer)
+ Continuation.Default(this, activeSpanSource)
+
+ override def capture(activeSpanSource: ActiveSpanSource): Continuation =
+ Continuation.Default(this, activeSpanSource)
private def toFinishedSpan(endTimestampMicros: Long): Span.FinishedSpan =
Span.FinishedSpan(spanContext, operationName, startTimestampMicros, endTimestampMicros, spanTags, annotations)
diff --git a/kamon-core/src/test/scala/kamon/testkit/Reconfigure.scala b/kamon-core/src/test/scala/kamon/testkit/Reconfigure.scala
new file mode 100644
index 00000000..4b3b2cdb
--- /dev/null
+++ b/kamon-core/src/test/scala/kamon/testkit/Reconfigure.scala
@@ -0,0 +1,26 @@
+package kamon.testkit
+
+import com.typesafe.config.ConfigFactory
+import kamon.Kamon
+
+trait Reconfigure {
+
+ def enableFastSpanFlushing(): Unit = {
+ applyConfig("kamon.trace.tick-interval = 1 millisecond")
+ }
+
+ def sampleAlways(): Unit = {
+ applyConfig("kamon.trace.sampler = always")
+ }
+
+ def sampleNever(): Unit = {
+ applyConfig("kamon.trace.sampler = never")
+ }
+
+ private def applyConfig(configString: String): Unit = {
+ Kamon.reconfigure(ConfigFactory.parseString(configString).withFallback(Kamon.config()))
+ }
+
+
+
+}
diff --git a/kamon-core/src/test/scala/kamon/testkit/TestSpanReporter.scala b/kamon-core/src/test/scala/kamon/testkit/TestSpanReporter.scala
new file mode 100644
index 00000000..8ea2d433
--- /dev/null
+++ b/kamon-core/src/test/scala/kamon/testkit/TestSpanReporter.scala
@@ -0,0 +1,23 @@
+package kamon.testkit
+
+import java.util.concurrent.LinkedBlockingQueue
+
+import com.typesafe.config.Config
+import kamon.SpanReporter
+import kamon.trace.Span
+import kamon.trace.Span.FinishedSpan
+
+class TestSpanReporter() extends SpanReporter {
+ import scala.collection.JavaConverters._
+ private val reportedSpans = new LinkedBlockingQueue[FinishedSpan]()
+
+ override def reportSpans(spans: Seq[Span.FinishedSpan]): Unit =
+ reportedSpans.addAll(spans.asJava)
+
+ def nextSpan(): Option[FinishedSpan] =
+ Option(reportedSpans.poll())
+
+ override def start(): Unit = {}
+ override def stop(): Unit = {}
+ override def reconfigure(config: Config): Unit = {}
+}
diff --git a/kamon-core/src/test/scala/kamon/trace/RealSpanSpec.scala b/kamon-core/src/test/scala/kamon/trace/RealSpanSpec.scala
new file mode 100644
index 00000000..61e651c2
--- /dev/null
+++ b/kamon-core/src/test/scala/kamon/trace/RealSpanSpec.scala
@@ -0,0 +1,41 @@
+package kamon.trace
+
+import kamon.testkit.{Reconfigure, TestSpanReporter}
+import kamon.util.Registration
+import kamon.Kamon
+import org.scalatest.concurrent.Eventually
+import org.scalatest.{BeforeAndAfterAll, Matchers, OptionValues, WordSpec}
+import org.scalatest.time.SpanSugar._
+
+class RealSpanSpec extends WordSpec with Matchers with BeforeAndAfterAll with Eventually with OptionValues with Reconfigure {
+
+ "a real span" when {
+ "sampled" should {
+ "be sent to the Span reporters" in {
+
+ Kamon.buildSpan("test-span")
+ .withSpanTag("test", "value")
+ .start()
+ .finish()
+
+ eventually(timeout(50 milliseconds)) {
+ val finishedSpan = reporter.nextSpan().value
+ finishedSpan.operationName shouldBe("test-span")
+ }
+ }
+ }
+ }
+
+ @volatile var registration: Registration = _
+ val reporter = new TestSpanReporter()
+
+ override protected def beforeAll(): Unit = {
+ enableFastSpanFlushing()
+ sampleAlways()
+ registration = Kamon.addReporter(reporter)
+ }
+
+ override protected def afterAll(): Unit = {
+ registration.cancel()
+ }
+}