author    Ivan Topolnjak <ivantopo@gmail.com>  2017-08-21 09:23:07 +0200
committer Ivan Topolnjak <ivantopo@gmail.com>  2017-08-21 10:37:08 +0200
commit    a152a3098b564ed43766a857b32b7c7d7445f9ce (patch)
tree      7651f61e598f316ee9dca415c5a5c67ce530bad5
parent    3cb974e5dfd381b9b28ffef9977047cf35242121 (diff)
download  Kamon-a152a3098b564ed43766a857b32b7c7d7445f9ce.tar.gz
          Kamon-a152a3098b564ed43766a857b32b7c7d7445f9ce.tar.bz2
          Kamon-a152a3098b564ed43766a857b32b7c7d7445f9ce.zip
binary encoding of context and entries
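This commit replaces the HTTP-headers-only `Codec` with a `Codecs` registry that also handles a binary representation, exposed through `Kamon.contextCodec()`. A minimal round-trip sketch, assembled from the `ContextCodecSpec` changes below (`StringKey` and `StringBroadcastKey` come from the kamon-testkit `ContextTesting` helper; only broadcast keys survive propagation):

```
import kamon.Kamon
import kamon.context.Context
import kamon.testkit.ContextTesting

object BinaryRoundTrip extends App with ContextTesting {
  val codecs = Kamon.contextCodec()

  // Only broadcast keys are propagated; local keys exist in-process only.
  val context = Context.create()
    .withKey(StringKey, Some("local-only"))
    .withKey(StringBroadcastKey, Some("propagated"))

  val buffer  = codecs.Binary.encode(context)
  val decoded = codecs.Binary.decode(buffer)

  assert(decoded.get(StringKey).isEmpty)                         // dropped on decode
  assert(decoded.get(StringBroadcastKey).contains("propagated")) // round-tripped
}
```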
-rw-r--r--  kamon-core-tests/src/test/scala/kamon/context/ContextCodecSpec.scala  100
-rw-r--r--  kamon-core/src/main/colfer/Context.colf (renamed from kamon-core/src/main/colfer/context.colf)  2
-rw-r--r--  kamon-core/src/main/colfer/Span.colf  8
-rw-r--r--  kamon-core/src/main/colfer/building.md  5
-rw-r--r--  kamon-core/src/main/java/kamon/context/generated/binary/context/Context.java (renamed from kamon-core/src/main/java/kamon/context/encoding/Context.java)  18
-rw-r--r--  kamon-core/src/main/java/kamon/context/generated/binary/context/Entry.java (renamed from kamon-core/src/main/java/kamon/context/encoding/Entry.java)  28
-rw-r--r--  kamon-core/src/main/java/kamon/context/generated/binary/span/Span.java  524
-rw-r--r--  kamon-core/src/main/resources/reference.conf  40
-rw-r--r--  kamon-core/src/main/scala/kamon/Kamon.scala  24
-rw-r--r--  kamon-core/src/main/scala/kamon/ReporterRegistry.scala  425
-rw-r--r--  kamon-core/src/main/scala/kamon/context/Codec.scala  144
-rw-r--r--  kamon-core/src/main/scala/kamon/context/Codecs.scala  245
-rw-r--r--  kamon-core/src/main/scala/kamon/trace/Span.scala  4
-rw-r--r--  kamon-core/src/main/scala/kamon/trace/SpanCodec.scala  71
-rw-r--r--  kamon-core/src/main/scala/kamon/trace/Tracer.scala  15
-rw-r--r--  kamon-testkit/src/main/resources/reference.conf  13
-rw-r--r--  kamon-testkit/src/main/scala/kamon/testkit/ContextTesting.scala  8
17 files changed, 1238 insertions(+), 436 deletions(-)
diff --git a/kamon-core-tests/src/test/scala/kamon/context/ContextCodecSpec.scala b/kamon-core-tests/src/test/scala/kamon/context/ContextCodecSpec.scala
index 242c3345..ceac4e58 100644
--- a/kamon-core-tests/src/test/scala/kamon/context/ContextCodecSpec.scala
+++ b/kamon-core-tests/src/test/scala/kamon/context/ContextCodecSpec.scala
@@ -1,18 +1,110 @@
package kamon.context
+import java.nio.ByteBuffer
+
import kamon.Kamon
-import org.scalatest.{Matchers, WordSpec}
+import kamon.testkit.ContextTesting
+import org.scalatest.{Matchers, OptionValues, WordSpec}
-class ContextCodecSpec extends WordSpec with Matchers {
+class ContextCodecSpec extends WordSpec with Matchers with ContextTesting with OptionValues {
"the Context Codec" when {
"encoding/decoding to HttpHeaders" should {
- "encode stuff" in {
+ "round trip a empty context" in {
+ val textMap = ContextCodec.HttpHeaders.encode(Context.Empty)
+ val decodedContext = ContextCodec.HttpHeaders.decode(textMap)
+
+ decodedContext shouldBe Context.Empty
+ }
+
+ "round trip a context with only local keys" in {
+ val localOnlyContext = Context.create(StringKey, Some("string-value"))
+ val textMap = ContextCodec.HttpHeaders.encode(localOnlyContext)
+ val decodedContext = ContextCodec.HttpHeaders.decode(textMap)
+
+ decodedContext shouldBe Context.Empty
+ }
+
+ "round trip a context with local and broadcast keys" in {
+ val initialContext = Context.create()
+ .withKey(StringKey, Some("string-value"))
+ .withKey(StringBroadcastKey, Some("this-should-be-round-tripped"))
+
+ val textMap = ContextCodec.HttpHeaders.encode(initialContext)
+ val decodedContext = ContextCodec.HttpHeaders.decode(textMap)
+
+ decodedContext.get(StringKey) shouldBe empty
+ decodedContext.get(StringBroadcastKey).value shouldBe "this-should-be-round-tripped"
+ }
+ }
+
+ "encoding/decoding to Binary" should {
+ "round trip a empty context" in {
+ val byteBuffer = ContextCodec.Binary.encode(Context.Empty)
+
+ val decodedContext = ContextCodec.Binary.decode(byteBuffer)
+
+ decodedContext shouldBe Context.Empty
+ }
+
+ "round trip a context with only local keys" in {
+ val localOnlyContext = Context.create(StringKey, Some("string-value"))
+ val byteBuffer = ContextCodec.Binary.encode(localOnlyContext)
+ val decodedContext = ContextCodec.Binary.decode(byteBuffer)
+
+ decodedContext shouldBe Context.Empty
+ }
+ "round trip a context with local and broadcast keys" in {
+ val initialContext = Context.create()
+ .withKey(StringKey, Some("string-value"))
+ .withKey(StringBroadcastKey, Some("this-should-be-round-tripped"))
+ val byteBuffer = ContextCodec.Binary.encode(initialContext)
+ val decodedContext = ContextCodec.Binary.decode(byteBuffer)
+ decodedContext.get(StringKey) shouldBe empty
+ decodedContext.get(StringBroadcastKey).value shouldBe "this-should-be-round-tripped"
}
}
}
- val ContextCodec = new Codec(Kamon.config())
+ val ContextCodec = new Codecs(Kamon.config())
+}
+
+object SimpleStringCodec {
+ final class Headers extends Codecs.ForEntry[TextMap] {
+ private val dataKey = "X-String-Value"
+
+ override def encode(context: Context): TextMap = {
+ val textMap = TextMap.Default()
+ context.get(ContextTesting.StringBroadcastKey).foreach { value =>
+ textMap.put(dataKey, value)
+ }
+
+ textMap
+ }
+
+ override def decode(carrier: TextMap, context: Context): Context = {
+ carrier.get(dataKey) match {
+ case value @ Some(_) => context.withKey(ContextTesting.StringBroadcastKey, value)
+ case None => context
+ }
+ }
+ }
+
+ final class Binary extends Codecs.ForEntry[ByteBuffer] {
+ val emptyBuffer = ByteBuffer.allocate(0)
+
+ override def encode(context: Context): ByteBuffer = {
+ context.get(ContextTesting.StringBroadcastKey) match {
+ case Some(value) => ByteBuffer.wrap(value.getBytes)
+ case None => emptyBuffer
+ }
+ }
+
+ override def decode(carrier: ByteBuffer, context: Context): Context = {
+ context.withKey(ContextTesting.StringBroadcastKey, Some(new String(carrier.array())))
+ }
+ }
+
}
diff --git a/kamon-core/src/main/colfer/context.colf b/kamon-core/src/main/colfer/Context.colf
index 26421cba..f84d7a56 100644
--- a/kamon-core/src/main/colfer/context.colf
+++ b/kamon-core/src/main/colfer/Context.colf
@@ -1,4 +1,4 @@
-package kamon
+package context
type Entry struct {
name text
diff --git a/kamon-core/src/main/colfer/Span.colf b/kamon-core/src/main/colfer/Span.colf
new file mode 100644
index 00000000..4edbed7b
--- /dev/null
+++ b/kamon-core/src/main/colfer/Span.colf
@@ -0,0 +1,8 @@
+package span
+
+type Span struct {
+ traceID binary
+ spanID binary
+ parentID binary
+ samplingDecision uint8
+} \ No newline at end of file
diff --git a/kamon-core/src/main/colfer/building.md b/kamon-core/src/main/colfer/building.md
new file mode 100644
index 00000000..f510a44f
--- /dev/null
+++ b/kamon-core/src/main/colfer/building.md
@@ -0,0 +1,5 @@
+Just download and install the colfer compiler and run this command from the colfer folder:
+
+```
+colfer -b ../java -p kamon/context/generated/binary java
+``` \ No newline at end of file
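The generated beans can also be driven directly. A hedged sketch of marshalling the `Span` bean generated below, using only the `marshal`/`unmarshal` signatures visible in the generated source (the 256-byte buffer mirrors the new `binary-buffer-size` default; a larger span would need a bigger buffer or the stream-based `marshal(OutputStream, byte[])` overload):

```
import kamon.context.generated.binary.span.{Span => ColferSpan}

object GeneratedBeanExample extends App {
  val span = new ColferSpan()
  span.setTraceID(Array[Byte](1, 2, 3, 4))
  span.setSpanID(Array[Byte](5, 6, 7, 8))
  span.setSamplingDecision(1.toByte)

  // marshal writes the serialized form into the buffer and returns the exclusive end index.
  val buffer = Array.ofDim[Byte](256)
  val length = span.marshal(buffer, 0)

  val decoded = new ColferSpan()
  decoded.unmarshal(buffer, 0, length)
  assert(decoded == span) // the generated equals compares all four fields
}
```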
diff --git a/kamon-core/src/main/java/kamon/context/encoding/Context.java b/kamon-core/src/main/java/kamon/context/generated/binary/context/Context.java
index db6ed7a9..a9917e99 100644
--- a/kamon-core/src/main/java/kamon/context/encoding/Context.java
+++ b/kamon-core/src/main/java/kamon/context/generated/binary/context/Context.java
@@ -1,4 +1,4 @@
-package kamon.context.encoding;
+package kamon.context.generated.binary.context;
// Code generated by colf(1); DO NOT EDIT.
@@ -23,7 +23,7 @@ import java.nio.BufferUnderflowException;
* @author generated by colf(1)
* @see <a href="https://github.com/pascaldekloe/colfer">Colfer's home</a>
*/
-@javax.annotation.Generated(value="colf(1)", comments="Colfer from schema file context.colf")
+@javax.annotation.Generated(value="colf(1)", comments="Colfer from schema file Context.colf")
public class Context implements Serializable {
/** The upper limit for serial byte sizes. */
@@ -188,7 +188,7 @@ public class Context implements Serializable {
int x = a.length;
if (x > Context.colferListMax)
- throw new IllegalStateException(format("colfer: kamon/context/kamon.Context.entries length %d exceeds %d elements", x, Context.colferListMax));
+ throw new IllegalStateException(format("colfer: kamon/context/generated/binary/context.Context.entries length %d exceeds %d elements", x, Context.colferListMax));
while (x > 0x7f) {
buf[i++] = (byte) (x | 0x80);
x >>>= 7;
@@ -209,7 +209,7 @@ public class Context implements Serializable {
return i;
} catch (ArrayIndexOutOfBoundsException e) {
if (i - offset > Context.colferSizeMax)
- throw new IllegalStateException(format("colfer: kamon/context/kamon.Context exceeds %d bytes", Context.colferSizeMax));
+ throw new IllegalStateException(format("colfer: kamon/context/generated/binary/context.Context exceeds %d bytes", Context.colferSizeMax));
if (i > buf.length) throw new BufferOverflowException();
throw e;
}
@@ -253,7 +253,7 @@ public class Context implements Serializable {
if (shift == 28 || b >= 0) break;
}
if (length < 0 || length > Context.colferListMax)
- throw new SecurityException(format("colfer: kamon/context/kamon.Context.entries length %d exceeds %d elements", length, Context.colferListMax));
+ throw new SecurityException(format("colfer: kamon/context/generated/binary/context.Context.entries length %d exceeds %d elements", length, Context.colferListMax));
Entry[] a = new Entry[length];
for (int ai = 0; ai < length; ai++) {
@@ -270,7 +270,7 @@ public class Context implements Serializable {
} finally {
if (i > end && end - offset < Context.colferSizeMax) throw new BufferUnderflowException();
if (i < 0 || i - offset > Context.colferSizeMax)
- throw new SecurityException(format("colfer: kamon/context/kamon.Context exceeds %d bytes", Context.colferSizeMax));
+ throw new SecurityException(format("colfer: kamon/context/generated/binary/context.Context exceeds %d bytes", Context.colferSizeMax));
if (i > end) throw new BufferUnderflowException();
}
@@ -312,7 +312,7 @@ public class Context implements Serializable {
}
/**
- * Gets kamon/context/kamon.Context.entries.
+ * Gets kamon/context/generated/binary/context.Context.entries.
* @return the value.
*/
public Entry[] getEntries() {
@@ -320,7 +320,7 @@ public class Context implements Serializable {
}
/**
- * Sets kamon/context/kamon.Context.entries.
+ * Sets kamon/context/generated/binary/context.Context.entries.
* @param value the replacement.
*/
public void setEntries(Entry[] value) {
@@ -328,7 +328,7 @@ public class Context implements Serializable {
}
/**
- * Sets kamon/context/kamon.Context.entries.
+ * Sets kamon/context/generated/binary/context.Context.entries.
* @param value the replacement.
* @return {link this}.
*/
diff --git a/kamon-core/src/main/java/kamon/context/encoding/Entry.java b/kamon-core/src/main/java/kamon/context/generated/binary/context/Entry.java
index d7734c13..dc75b10d 100644
--- a/kamon-core/src/main/java/kamon/context/encoding/Entry.java
+++ b/kamon-core/src/main/java/kamon/context/generated/binary/context/Entry.java
@@ -1,4 +1,4 @@
-package kamon.context.encoding;
+package kamon.context.generated.binary.context;
// Code generated by colf(1); DO NOT EDIT.
@@ -24,7 +24,7 @@ import java.nio.BufferUnderflowException;
* @author generated by colf(1)
* @see <a href="https://github.com/pascaldekloe/colfer">Colfer's home</a>
*/
-@javax.annotation.Generated(value="colf(1)", comments="Colfer from schema file context.colf")
+@javax.annotation.Generated(value="colf(1)", comments="Colfer from schema file Context.colf")
public class Entry implements Serializable {
/** The upper limit for serial byte sizes. */
@@ -211,7 +211,7 @@ public class Entry implements Serializable {
}
int size = i - start;
if (size > Entry.colferSizeMax)
- throw new IllegalStateException(format("colfer: kamon/context/kamon.Entry.name size %d exceeds %d UTF-8 bytes", size, Entry.colferSizeMax));
+ throw new IllegalStateException(format("colfer: kamon/context/generated/binary/context.Entry.name size %d exceeds %d UTF-8 bytes", size, Entry.colferSizeMax));
int ii = start - 1;
if (size > 0x7f) {
@@ -232,7 +232,7 @@ public class Entry implements Serializable {
int size = this.content.length;
if (size > Entry.colferSizeMax)
- throw new IllegalStateException(format("colfer: kamon/context/kamon.Entry.content size %d exceeds %d bytes", size, Entry.colferSizeMax));
+ throw new IllegalStateException(format("colfer: kamon/context/generated/binary/context.Entry.content size %d exceeds %d bytes", size, Entry.colferSizeMax));
int x = size;
while (x > 0x7f) {
@@ -250,7 +250,7 @@ public class Entry implements Serializable {
return i;
} catch (ArrayIndexOutOfBoundsException e) {
if (i - offset > Entry.colferSizeMax)
- throw new IllegalStateException(format("colfer: kamon/context/kamon.Entry exceeds %d bytes", Entry.colferSizeMax));
+ throw new IllegalStateException(format("colfer: kamon/context/generated/binary/context.Entry exceeds %d bytes", Entry.colferSizeMax));
if (i > buf.length) throw new BufferOverflowException();
throw e;
}
@@ -294,7 +294,7 @@ public class Entry implements Serializable {
if (shift == 28 || b >= 0) break;
}
if (size < 0 || size > Entry.colferSizeMax)
- throw new SecurityException(format("colfer: kamon/context/kamon.Entry.name size %d exceeds %d UTF-8 bytes", size, Entry.colferSizeMax));
+ throw new SecurityException(format("colfer: kamon/context/generated/binary/context.Entry.name size %d exceeds %d UTF-8 bytes", size, Entry.colferSizeMax));
int start = i;
i += size;
@@ -310,7 +310,7 @@ public class Entry implements Serializable {
if (shift == 28 || b >= 0) break;
}
if (size < 0 || size > Entry.colferSizeMax)
- throw new SecurityException(format("colfer: kamon/context/kamon.Entry.content size %d exceeds %d bytes", size, Entry.colferSizeMax));
+ throw new SecurityException(format("colfer: kamon/context/generated/binary/context.Entry.content size %d exceeds %d bytes", size, Entry.colferSizeMax));
this.content = new byte[size];
int start = i;
@@ -325,7 +325,7 @@ public class Entry implements Serializable {
} finally {
if (i > end && end - offset < Entry.colferSizeMax) throw new BufferUnderflowException();
if (i < 0 || i - offset > Entry.colferSizeMax)
- throw new SecurityException(format("colfer: kamon/context/kamon.Entry exceeds %d bytes", Entry.colferSizeMax));
+ throw new SecurityException(format("colfer: kamon/context/generated/binary/context.Entry exceeds %d bytes", Entry.colferSizeMax));
if (i > end) throw new BufferUnderflowException();
}
@@ -367,7 +367,7 @@ public class Entry implements Serializable {
}
/**
- * Gets kamon/context/kamon.Entry.name.
+ * Gets kamon/context/generated/binary/context.Entry.name.
* @return the value.
*/
public String getName() {
@@ -375,7 +375,7 @@ public class Entry implements Serializable {
}
/**
- * Sets kamon/context/kamon.Entry.name.
+ * Sets kamon/context/generated/binary/context.Entry.name.
* @param value the replacement.
*/
public void setName(String value) {
@@ -383,7 +383,7 @@ public class Entry implements Serializable {
}
/**
- * Sets kamon/context/kamon.Entry.name.
+ * Sets kamon/context/generated/binary/context.Entry.name.
* @param value the replacement.
* @return {link this}.
*/
@@ -393,7 +393,7 @@ public class Entry implements Serializable {
}
/**
- * Gets kamon/context/kamon.Entry.content.
+ * Gets kamon/context/generated/binary/context.Entry.content.
* @return the value.
*/
public byte[] getContent() {
@@ -401,7 +401,7 @@ public class Entry implements Serializable {
}
/**
- * Sets kamon/context/kamon.Entry.content.
+ * Sets kamon/context/generated/binary/context.Entry.content.
* @param value the replacement.
*/
public void setContent(byte[] value) {
@@ -409,7 +409,7 @@ public class Entry implements Serializable {
}
/**
- * Sets kamon/context/kamon.Entry.content.
+ * Sets kamon/context/generated/binary/context.Entry.content.
* @param value the replacement.
* @return {link this}.
*/
diff --git a/kamon-core/src/main/java/kamon/context/generated/binary/span/Span.java b/kamon-core/src/main/java/kamon/context/generated/binary/span/Span.java
new file mode 100644
index 00000000..ea7a6fe8
--- /dev/null
+++ b/kamon-core/src/main/java/kamon/context/generated/binary/span/Span.java
@@ -0,0 +1,524 @@
+package kamon.context.generated.binary.span;
+
+
+// Code generated by colf(1); DO NOT EDIT.
+
+
+import static java.lang.String.format;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.ObjectStreamException;
+import java.io.OutputStream;
+import java.io.Serializable;
+import java.util.InputMismatchException;
+import java.nio.BufferOverflowException;
+import java.nio.BufferUnderflowException;
+
+
+/**
+ * Data bean with built-in serialization support.
+
+ * @author generated by colf(1)
+ * @see <a href="https://github.com/pascaldekloe/colfer">Colfer's home</a>
+ */
+@javax.annotation.Generated(value="colf(1)", comments="Colfer from schema file Span.colf")
+public class Span implements Serializable {
+
+ /** The upper limit for serial byte sizes. */
+ public static int colferSizeMax = 16 * 1024 * 1024;
+
+
+
+
+ public byte[] traceID;
+
+ public byte[] spanID;
+
+ public byte[] parentID;
+
+ public byte samplingDecision;
+
+
+ /** Default constructor */
+ public Span() {
+ init();
+ }
+
+ private static final byte[] _zeroBytes = new byte[0];
+
+ /** Colfer zero values. */
+ private void init() {
+ traceID = _zeroBytes;
+ spanID = _zeroBytes;
+ parentID = _zeroBytes;
+ }
+
+ /**
+ * {@link #reset(InputStream) Reusable} deserialization of Colfer streams.
+ */
+ public static class Unmarshaller {
+
+ /** The data source. */
+ protected InputStream in;
+
+ /** The read buffer. */
+ public byte[] buf;
+
+ /** The {@link #buf buffer}'s data start index, inclusive. */
+ protected int offset;
+
+ /** The {@link #buf buffer}'s data end index, exclusive. */
+ protected int i;
+
+
+ /**
+ * @param in the data source or {@code null}.
+ * @param buf the initial buffer or {@code null}.
+ */
+ public Unmarshaller(InputStream in, byte[] buf) {
+ // TODO: better size estimation
+ if (buf == null || buf.length == 0)
+ buf = new byte[Math.min(Span.colferSizeMax, 2048)];
+ this.buf = buf;
+ reset(in);
+ }
+
+ /**
+ * Reuses the marshaller.
+ * @param in the data source or {@code null}.
+ * @throws IllegalStateException on pending data.
+ */
+ public void reset(InputStream in) {
+ if (this.i != this.offset) throw new IllegalStateException("colfer: pending data");
+ this.in = in;
+ this.offset = 0;
+ this.i = 0;
+ }
+
+ /**
+ * Deserializes the following object.
+ * @return the result or {@code null} when EOF.
+ * @throws IOException from the input stream.
+ * @throws SecurityException on an upper limit breach defined by {@link #colferSizeMax}.
+ * @throws InputMismatchException when the data does not match this object's schema.
+ */
+ public Span next() throws IOException {
+ if (in == null) return null;
+
+ while (true) {
+ if (this.i > this.offset) {
+ try {
+ Span o = new Span();
+ this.offset = o.unmarshal(this.buf, this.offset, this.i);
+ return o;
+ } catch (BufferUnderflowException e) {
+ }
+ }
+ // not enough data
+
+ if (this.i <= this.offset) {
+ this.offset = 0;
+ this.i = 0;
+ } else if (i == buf.length) {
+ byte[] src = this.buf;
+ // TODO: better size estimation
+ if (offset == 0) this.buf = new byte[Math.min(Span.colferSizeMax, this.buf.length * 4)];
+ System.arraycopy(src, this.offset, this.buf, 0, this.i - this.offset);
+ this.i -= this.offset;
+ this.offset = 0;
+ }
+ assert this.i < this.buf.length;
+
+ int n = in.read(buf, i, buf.length - i);
+ if (n < 0) {
+ if (this.i > this.offset)
+ throw new InputMismatchException("colfer: pending data with EOF");
+ return null;
+ }
+ assert n > 0;
+ i += n;
+ }
+ }
+
+ }
+
+
+ /**
+ * Serializes the object.
+ * @param out the data destination.
+ * @param buf the initial buffer or {@code null}.
+ * @return the final buffer. When the serial fits into {@code buf} then the return is {@code buf}.
+ * Otherwise the return is a new buffer, large enough to hold the whole serial.
+ * @throws IOException from {@code out}.
+ * @throws IllegalStateException on an upper limit breach defined by {@link #colferSizeMax}.
+ */
+ public byte[] marshal(OutputStream out, byte[] buf) throws IOException {
+ // TODO: better size estimation
+ if (buf == null || buf.length == 0)
+ buf = new byte[Math.min(Span.colferSizeMax, 2048)];
+
+ while (true) {
+ int i;
+ try {
+ i = marshal(buf, 0);
+ } catch (BufferOverflowException e) {
+ buf = new byte[Math.min(Span.colferSizeMax, buf.length * 4)];
+ continue;
+ }
+
+ out.write(buf, 0, i);
+ return buf;
+ }
+ }
+
+ /**
+ * Serializes the object.
+ * @param buf the data destination.
+ * @param offset the initial index for {@code buf}, inclusive.
+ * @return the final index for {@code buf}, exclusive.
+ * @throws BufferOverflowException when {@code buf} is too small.
+ * @throws IllegalStateException on an upper limit breach defined by {@link #colferSizeMax}.
+ */
+ public int marshal(byte[] buf, int offset) {
+ int i = offset;
+
+ try {
+ if (this.traceID.length != 0) {
+ buf[i++] = (byte) 0;
+
+ int size = this.traceID.length;
+ if (size > Span.colferSizeMax)
+ throw new IllegalStateException(format("colfer: kamon/context/generated/binary/span.Span.traceID size %d exceeds %d bytes", size, Span.colferSizeMax));
+
+ int x = size;
+ while (x > 0x7f) {
+ buf[i++] = (byte) (x | 0x80);
+ x >>>= 7;
+ }
+ buf[i++] = (byte) x;
+
+ int start = i;
+ i += size;
+ System.arraycopy(this.traceID, 0, buf, start, size);
+ }
+
+ if (this.spanID.length != 0) {
+ buf[i++] = (byte) 1;
+
+ int size = this.spanID.length;
+ if (size > Span.colferSizeMax)
+ throw new IllegalStateException(format("colfer: kamon/context/generated/binary/span.Span.spanID size %d exceeds %d bytes", size, Span.colferSizeMax));
+
+ int x = size;
+ while (x > 0x7f) {
+ buf[i++] = (byte) (x | 0x80);
+ x >>>= 7;
+ }
+ buf[i++] = (byte) x;
+
+ int start = i;
+ i += size;
+ System.arraycopy(this.spanID, 0, buf, start, size);
+ }
+
+ if (this.parentID.length != 0) {
+ buf[i++] = (byte) 2;
+
+ int size = this.parentID.length;
+ if (size > Span.colferSizeMax)
+ throw new IllegalStateException(format("colfer: kamon/context/generated/binary/span.Span.parentID size %d exceeds %d bytes", size, Span.colferSizeMax));
+
+ int x = size;
+ while (x > 0x7f) {
+ buf[i++] = (byte) (x | 0x80);
+ x >>>= 7;
+ }
+ buf[i++] = (byte) x;
+
+ int start = i;
+ i += size;
+ System.arraycopy(this.parentID, 0, buf, start, size);
+ }
+
+ if (this.samplingDecision != 0) {
+ buf[i++] = (byte) 3;
+ buf[i++] = this.samplingDecision;
+ }
+
+ buf[i++] = (byte) 0x7f;
+ return i;
+ } catch (ArrayIndexOutOfBoundsException e) {
+ if (i - offset > Span.colferSizeMax)
+ throw new IllegalStateException(format("colfer: kamon/context/generated/binary/span.Span exceeds %d bytes", Span.colferSizeMax));
+ if (i > buf.length) throw new BufferOverflowException();
+ throw e;
+ }
+ }
+
+ /**
+ * Deserializes the object.
+ * @param buf the data source.
+ * @param offset the initial index for {@code buf}, inclusive.
+ * @return the final index for {@code buf}, exclusive.
+ * @throws BufferUnderflowException when {@code buf} is incomplete. (EOF)
+ * @throws SecurityException on an upper limit breach defined by {@link #colferSizeMax}.
+ * @throws InputMismatchException when the data does not match this object's schema.
+ */
+ public int unmarshal(byte[] buf, int offset) {
+ return unmarshal(buf, offset, buf.length);
+ }
+
+ /**
+ * Deserializes the object.
+ * @param buf the data source.
+ * @param offset the initial index for {@code buf}, inclusive.
+ * @param end the index limit for {@code buf}, exclusive.
+ * @return the final index for {@code buf}, exclusive.
+ * @throws BufferUnderflowException when {@code buf} is incomplete. (EOF)
+ * @throws SecurityException on an upper limit breach defined by {@link #colferSizeMax}.
+ * @throws InputMismatchException when the data does not match this object's schema.
+ */
+ public int unmarshal(byte[] buf, int offset, int end) {
+ if (end > buf.length) end = buf.length;
+ int i = offset;
+
+ try {
+ byte header = buf[i++];
+
+ if (header == (byte) 0) {
+ int size = 0;
+ for (int shift = 0; true; shift += 7) {
+ byte b = buf[i++];
+ size |= (b & 0x7f) << shift;
+ if (shift == 28 || b >= 0) break;
+ }
+ if (size < 0 || size > Span.colferSizeMax)
+ throw new SecurityException(format("colfer: kamon/context/generated/binary/span.Span.traceID size %d exceeds %d bytes", size, Span.colferSizeMax));
+
+ this.traceID = new byte[size];
+ int start = i;
+ i += size;
+ System.arraycopy(buf, start, this.traceID, 0, size);
+
+ header = buf[i++];
+ }
+
+ if (header == (byte) 1) {
+ int size = 0;
+ for (int shift = 0; true; shift += 7) {
+ byte b = buf[i++];
+ size |= (b & 0x7f) << shift;
+ if (shift == 28 || b >= 0) break;
+ }
+ if (size < 0 || size > Span.colferSizeMax)
+ throw new SecurityException(format("colfer: kamon/context/generated/binary/span.Span.spanID size %d exceeds %d bytes", size, Span.colferSizeMax));
+
+ this.spanID = new byte[size];
+ int start = i;
+ i += size;
+ System.arraycopy(buf, start, this.spanID, 0, size);
+
+ header = buf[i++];
+ }
+
+ if (header == (byte) 2) {
+ int size = 0;
+ for (int shift = 0; true; shift += 7) {
+ byte b = buf[i++];
+ size |= (b & 0x7f) << shift;
+ if (shift == 28 || b >= 0) break;
+ }
+ if (size < 0 || size > Span.colferSizeMax)
+ throw new SecurityException(format("colfer: kamon/context/generated/binary/span.Span.parentID size %d exceeds %d bytes", size, Span.colferSizeMax));
+
+ this.parentID = new byte[size];
+ int start = i;
+ i += size;
+ System.arraycopy(buf, start, this.parentID, 0, size);
+
+ header = buf[i++];
+ }
+
+ if (header == (byte) 3) {
+ this.samplingDecision = buf[i++];
+ header = buf[i++];
+ }
+
+ if (header != (byte) 0x7f)
+ throw new InputMismatchException(format("colfer: unknown header at byte %d", i - 1));
+ } finally {
+ if (i > end && end - offset < Span.colferSizeMax) throw new BufferUnderflowException();
+ if (i < 0 || i - offset > Span.colferSizeMax)
+ throw new SecurityException(format("colfer: kamon/context/generated/binary/span.Span exceeds %d bytes", Span.colferSizeMax));
+ if (i > end) throw new BufferUnderflowException();
+ }
+
+ return i;
+ }
+
+ // {@link Serializable} version number.
+ private static final long serialVersionUID = 4L;
+
+ // {@link Serializable} Colfer extension.
+ private void writeObject(ObjectOutputStream out) throws IOException {
+ // TODO: better size estimation
+ byte[] buf = new byte[1024];
+ int n;
+ while (true) try {
+ n = marshal(buf, 0);
+ break;
+ } catch (BufferUnderflowException e) {
+ buf = new byte[4 * buf.length];
+ }
+
+ out.writeInt(n);
+ out.write(buf, 0, n);
+ }
+
+ // {@link Serializable} Colfer extension.
+ private void readObject(ObjectInputStream in) throws ClassNotFoundException, IOException {
+ init();
+
+ int n = in.readInt();
+ byte[] buf = new byte[n];
+ in.readFully(buf);
+ unmarshal(buf, 0);
+ }
+
+ // {@link Serializable} Colfer extension.
+ private void readObjectNoData() throws ObjectStreamException {
+ init();
+ }
+
+ /**
+ * Gets kamon/context/generated/binary/span.Span.traceID.
+ * @return the value.
+ */
+ public byte[] getTraceID() {
+ return this.traceID;
+ }
+
+ /**
+ * Sets kamon/context/generated/binary/span.Span.traceID.
+ * @param value the replacement.
+ */
+ public void setTraceID(byte[] value) {
+ this.traceID = value;
+ }
+
+ /**
+ * Sets kamon/context/generated/binary/span.Span.traceID.
+ * @param value the replacement.
+ * @return {link this}.
+ */
+ public Span withTraceID(byte[] value) {
+ this.traceID = value;
+ return this;
+ }
+
+ /**
+ * Gets kamon/context/generated/binary/span.Span.spanID.
+ * @return the value.
+ */
+ public byte[] getSpanID() {
+ return this.spanID;
+ }
+
+ /**
+ * Sets kamon/context/generated/binary/span.Span.spanID.
+ * @param value the replacement.
+ */
+ public void setSpanID(byte[] value) {
+ this.spanID = value;
+ }
+
+ /**
+ * Sets kamon/context/generated/binary/span.Span.spanID.
+ * @param value the replacement.
+ * @return {link this}.
+ */
+ public Span withSpanID(byte[] value) {
+ this.spanID = value;
+ return this;
+ }
+
+ /**
+ * Gets kamon/context/generated/binary/span.Span.parentID.
+ * @return the value.
+ */
+ public byte[] getParentID() {
+ return this.parentID;
+ }
+
+ /**
+ * Sets kamon/context/generated/binary/span.Span.parentID.
+ * @param value the replacement.
+ */
+ public void setParentID(byte[] value) {
+ this.parentID = value;
+ }
+
+ /**
+ * Sets kamon/context/generated/binary/span.Span.parentID.
+ * @param value the replacement.
+ * @return {link this}.
+ */
+ public Span withParentID(byte[] value) {
+ this.parentID = value;
+ return this;
+ }
+
+ /**
+ * Gets kamon/context/generated/binary/span.Span.samplingDecision.
+ * @return the value.
+ */
+ public byte getSamplingDecision() {
+ return this.samplingDecision;
+ }
+
+ /**
+ * Sets kamon/context/generated/binary/span.Span.samplingDecision.
+ * @param value the replacement.
+ */
+ public void setSamplingDecision(byte value) {
+ this.samplingDecision = value;
+ }
+
+ /**
+ * Sets kamon/context/generated/binary/span.Span.samplingDecision.
+ * @param value the replacement.
+ * @return {link this}.
+ */
+ public Span withSamplingDecision(byte value) {
+ this.samplingDecision = value;
+ return this;
+ }
+
+ @Override
+ public final int hashCode() {
+ int h = 1;
+ for (byte b : this.traceID) h = 31 * h + b;
+ for (byte b : this.spanID) h = 31 * h + b;
+ for (byte b : this.parentID) h = 31 * h + b;
+ h = 31 * h + (this.samplingDecision & 0xff);
+ return h;
+ }
+
+ @Override
+ public final boolean equals(Object o) {
+ return o instanceof Span && equals((Span) o);
+ }
+
+ public final boolean equals(Span o) {
+ if (o == null) return false;
+ if (o == this) return true;
+ return o.getClass() == Span.class
+ && java.util.Arrays.equals(this.traceID, o.traceID)
+ && java.util.Arrays.equals(this.spanID, o.spanID)
+ && java.util.Arrays.equals(this.parentID, o.parentID)
+ && this.samplingDecision == o.samplingDecision;
+ }
+
+}
diff --git a/kamon-core/src/main/resources/reference.conf b/kamon-core/src/main/resources/reference.conf
index 5b885fd0..e0002194 100644
--- a/kamon-core/src/main/resources/reference.conf
+++ b/kamon-core/src/main/resources/reference.conf
@@ -123,39 +123,27 @@ kamon {
# with the name of the operation on the parent Span, if any.
scope-spans-to-parent = yes
}
-
- # The SpanContextCodecs are used to encode/decode the SpanContext data into simple TextMaps, HTTP Headers or Binary
- # carriers. The decision about which one to use is based on the kamon.trace.SpanContextCodec.Format instance passed
- # to inject/extract calls.
- #
- # Any external implementation can be configured here, as long as it can be instantiated with a single parameter
- # constructor that accepts a IdentityProvider.
- span-context-codec {
-
- # Encodes/Decodes the SpanContext data using a simple key/value pair. Since this is very rarely going to be used
- # we default to using the same codec for HTTP Headers, as it is built on top of a TextMap.
- text-map = "kamon.trace.SpanContextCodec$ExtendedB3"
-
- # Encodes/Decodes the SpanContext into a TextMap with HTTP Header friendly, URLEncoded values. The default
- # implementation follows the guidelines of B3 propagation. See more at https://github.com/openzipkin/b3-propagation.
- http-headers = "kamon.trace.SpanContextCodec$ExtendedB3"
-
- # Encodes/Decodes the SpanContext using a binary representation.
- binary = "TODO"
- }
-
-
}
context {
- encoding {
- http-headers {
+ # Codecs are used to encode/decode Context keys when a Context must be propagated either through HTTP headers or
+ # Binary transports. Only broadcast keys configured below will be processed by the context Codecs. The FQCN of
+ # the appropriate Codec for each key must be provided, otherwise the key will be ignored.
+ #
+ codecs {
+
+ # Size of the encoding buffer for the Binary Codec.
+ binary-buffer-size = 256
+
+ # Codecs to be used when propagating a Context through an HTTP Headers transport.
+ http-headers-keys {
span = "kamon.trace.SpanCodec$B3"
}
- binary {
- # span = "kamon.trace.propagation.Binary"
+ # Codecs to be used when propagating a Context through a Binary transport.
+ binary-keys {
+ span = "kamon.trace.SpanCodec$Colfer"
}
}
}
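With this layout, propagating an additional broadcast key is purely a configuration concern. A hypothetical `application.conf` override (the `correlation` key name and `com.example.CorrelationCodec` FQCN are illustrative only; the class would implement `kamon.context.Codecs.ForEntry[TextMap]`, as the `SimpleStringCodec.Headers` example in `ContextCodecSpec` does):

```
kamon.context.codecs {
  http-headers-keys {
    # hypothetical: propagate the "correlation" broadcast key over HTTP headers
    correlation = "com.example.CorrelationCodec"
  }
}
```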
diff --git a/kamon-core/src/main/scala/kamon/Kamon.scala b/kamon-core/src/main/scala/kamon/Kamon.scala
index 2c0561e2..f251b1ec 100644
--- a/kamon-core/src/main/scala/kamon/Kamon.scala
+++ b/kamon-core/src/main/scala/kamon/Kamon.scala
@@ -24,7 +24,7 @@ import scala.concurrent.Future
import java.time.Duration
import java.util.concurrent.{Executors, ScheduledExecutorService, ScheduledThreadPoolExecutor}
-import kamon.context.{Codec, Context, Storage}
+import kamon.context.{Codecs, Context, Storage}
import org.slf4j.LoggerFactory
import scala.util.Try
@@ -39,10 +39,10 @@ object Kamon extends MetricLookup with ReporterRegistry with Tracer {
private val _scheduler = Executors.newScheduledThreadPool(schedulerPoolSize(_config), numberedThreadFactory("kamon-scheduler"))
private val _metrics = new MetricRegistry(_config, _scheduler)
- private val _reporters = new ReporterRegistryImpl(_metrics, _config)
- private val _tracer = Tracer.Default(Kamon, _reporters, _config)
+ private val _reporterRegistry = new ReporterRegistry.Default(_metrics, _config)
+ private val _tracer = Tracer.Default(Kamon, _reporterRegistry, _config)
private val _contextStorage = Storage.ThreadLocal()
- private val _contextCodec = new Codec(_config)
+ private val _contextCodec = new Codecs(_config)
private var _onReconfigureHooks = Seq.empty[OnReconfigureHook]
def environment: Environment =
@@ -56,7 +56,7 @@ object Kamon extends MetricLookup with ReporterRegistry with Tracer {
_environment = Environment.fromConfig(config)
_filters = Filters.fromConfig(config)
_metrics.reconfigure(config)
- _reporters.reconfigure(config)
+ _reporterRegistry.reconfigure(config)
_tracer.reconfigure(config)
_contextCodec.reconfigure(config)
@@ -100,7 +100,7 @@ object Kamon extends MetricLookup with ReporterRegistry with Tracer {
override def identityProvider: IdentityProvider =
_tracer.identityProvider
- def contextCodec(): Codec =
+ def contextCodec(): Codecs =
_contextCodec
def currentContext(): Context =
@@ -120,22 +120,22 @@ object Kamon extends MetricLookup with ReporterRegistry with Tracer {
override def loadReportersFromConfig(): Unit =
- _reporters.loadReportersFromConfig()
+ _reporterRegistry.loadReportersFromConfig()
override def addReporter(reporter: MetricReporter): Registration =
- _reporters.addReporter(reporter)
+ _reporterRegistry.addReporter(reporter)
override def addReporter(reporter: MetricReporter, name: String): Registration =
- _reporters.addReporter(reporter, name)
+ _reporterRegistry.addReporter(reporter, name)
override def addReporter(reporter: SpanReporter): Registration =
- _reporters.addReporter(reporter)
+ _reporterRegistry.addReporter(reporter)
override def addReporter(reporter: SpanReporter, name: String): Registration =
- _reporters.addReporter(reporter, name)
+ _reporterRegistry.addReporter(reporter, name)
override def stopAllReporters(): Future[Unit] =
- _reporters.stopAllReporters()
+ _reporterRegistry.stopAllReporters()
def filter(filterName: String, pattern: String): Boolean =
_filters.accept(filterName, pattern)
diff --git a/kamon-core/src/main/scala/kamon/ReporterRegistry.scala b/kamon-core/src/main/scala/kamon/ReporterRegistry.scala
index f0d744e5..ff135f60 100644
--- a/kamon-core/src/main/scala/kamon/ReporterRegistry.scala
+++ b/kamon-core/src/main/scala/kamon/ReporterRegistry.scala
@@ -33,6 +33,20 @@ import scala.util.control.NonFatal
import scala.collection.JavaConverters._
import scala.collection.concurrent.TrieMap
+sealed trait Reporter {
+ def start(): Unit
+ def stop(): Unit
+ def reconfigure(config: Config): Unit
+}
+
+trait MetricReporter extends Reporter {
+ def reportTickSnapshot(snapshot: TickSnapshot): Unit
+}
+
+trait SpanReporter extends Reporter {
+ def reportSpans(spans: Seq[Span.FinishedSpan]): Unit
+}
+
trait ReporterRegistry {
def loadReportersFromConfig(): Unit
@@ -48,274 +62,261 @@ object ReporterRegistry {
private[kamon] trait SpanSink {
def reportSpan(finishedSpan: FinishedSpan): Unit
}
-}
-
-sealed trait Reporter {
- def start(): Unit
- def stop(): Unit
- def reconfigure(config: Config): Unit
-}
-trait MetricReporter extends Reporter {
- def reportTickSnapshot(snapshot: TickSnapshot): Unit
-}
-
-trait SpanReporter extends Reporter {
- def reportSpans(spans: Seq[Span.FinishedSpan]): Unit
-}
-
-class ReporterRegistryImpl(metrics: MetricsSnapshotGenerator, initialConfig: Config) extends ReporterRegistry with SpanSink {
- private val logger = LoggerFactory.getLogger(classOf[ReporterRegistry])
- private val registryExecutionContext = Executors.newScheduledThreadPool(2, threadFactory("kamon-reporter-registry"))
- private val reporterCounter = new AtomicLong(0L)
- private var registryConfiguration = readRegistryConfiguration(initialConfig)
-
- private val metricReporters = TrieMap[Long, MetricReporterEntry]()
- private val metricReporterTickerSchedule = new AtomicReference[ScheduledFuture[_]]()
- private val spanReporters = TrieMap[Long, SpanReporterEntry]()
- private val spanReporterTickerSchedule = new AtomicReference[ScheduledFuture[_]]()
-
-
- reconfigure(initialConfig)
-
- override def loadReportersFromConfig(): Unit = {
- if(registryConfiguration.configuredReporters.isEmpty)
- logger.info("The kamon.reporters setting is empty, no reporters have been started.")
- else {
- registryConfiguration.configuredReporters.foreach { reporterFQCN =>
- val dynamicAccess = new DynamicAccess(getClass.getClassLoader)
- dynamicAccess.createInstanceFor[Reporter](reporterFQCN, Nil).map({
- case mr: MetricReporter =>
- addMetricReporter(mr, "loaded-from-config: " + reporterFQCN)
- logger.info("Loaded metric reporter [{}]", reporterFQCN)
-
- case sr: SpanReporter =>
- addSpanReporter(sr, "loaded-from-config: " + reporterFQCN)
- logger.info("Loaded span reporter [{}]", reporterFQCN)
-
- }).failed.foreach {
- t => logger.error(s"Failed to load configured reporter [$reporterFQCN]", t)
+ private[kamon] class Default(metrics: MetricsSnapshotGenerator, initialConfig: Config) extends ReporterRegistry with SpanSink {
+ private val logger = LoggerFactory.getLogger(classOf[ReporterRegistry])
+ private val registryExecutionContext = Executors.newScheduledThreadPool(2, threadFactory("kamon-reporter-registry"))
+ private val reporterCounter = new AtomicLong(0L)
+ private var registryConfiguration = readRegistryConfiguration(initialConfig)
+
+ private val metricReporters = TrieMap[Long, MetricReporterEntry]()
+ private val metricReporterTickerSchedule = new AtomicReference[ScheduledFuture[_]]()
+ private val spanReporters = TrieMap[Long, SpanReporterEntry]()
+ private val spanReporterTickerSchedule = new AtomicReference[ScheduledFuture[_]]()
+
+
+ reconfigure(initialConfig)
+
+ override def loadReportersFromConfig(): Unit = {
+ if(registryConfiguration.configuredReporters.isEmpty)
+ logger.info("The kamon.reporters setting is empty, no reporters have been started.")
+ else {
+ registryConfiguration.configuredReporters.foreach { reporterFQCN =>
+ val dynamicAccess = new DynamicAccess(getClass.getClassLoader)
+ dynamicAccess.createInstanceFor[Reporter](reporterFQCN, Nil).map({
+ case mr: MetricReporter =>
+ addMetricReporter(mr, "loaded-from-config: " + reporterFQCN)
+ logger.info("Loaded metric reporter [{}]", reporterFQCN)
+
+ case sr: SpanReporter =>
+ addSpanReporter(sr, "loaded-from-config: " + reporterFQCN)
+ logger.info("Loaded span reporter [{}]", reporterFQCN)
+
+ }).failed.foreach {
+ t => logger.error(s"Failed to load configured reporter [$reporterFQCN]", t)
+ }
}
}
}
- }
- override def addReporter(reporter: MetricReporter): Registration =
- addMetricReporter(reporter, reporter.getClass.getName())
+ override def addReporter(reporter: MetricReporter): Registration =
+ addMetricReporter(reporter, reporter.getClass.getName())
- override def addReporter(reporter: MetricReporter, name: String): Registration =
- addMetricReporter(reporter, name)
+ override def addReporter(reporter: MetricReporter, name: String): Registration =
+ addMetricReporter(reporter, name)
- override def addReporter(reporter: SpanReporter): Registration =
- addSpanReporter(reporter, reporter.getClass.getName())
+ override def addReporter(reporter: SpanReporter): Registration =
+ addSpanReporter(reporter, reporter.getClass.getName())
- override def addReporter(reporter: SpanReporter, name: String): Registration =
- addSpanReporter(reporter, name)
+ override def addReporter(reporter: SpanReporter, name: String): Registration =
+ addSpanReporter(reporter, name)
- private def addMetricReporter(reporter: MetricReporter, name: String): Registration = synchronized {
- val executor = Executors.newSingleThreadExecutor(threadFactory(name))
- val reporterEntry = new MetricReporterEntry(
- id = reporterCounter.getAndIncrement(),
- name = name,
- reporter = reporter,
- executionContext = ExecutionContext.fromExecutorService(executor)
- )
+ private def addMetricReporter(reporter: MetricReporter, name: String): Registration = synchronized {
+ val executor = Executors.newSingleThreadExecutor(threadFactory(name))
+ val reporterEntry = new MetricReporterEntry(
+ id = reporterCounter.getAndIncrement(),
+ name = name,
+ reporter = reporter,
+ executionContext = ExecutionContext.fromExecutorService(executor)
+ )
- Future(reporterEntry.reporter.start())(reporterEntry.executionContext)
+ Future(reporterEntry.reporter.start())(reporterEntry.executionContext)
- if(metricReporters.isEmpty)
- reStartMetricTicker()
+ if(metricReporters.isEmpty)
+ reStartMetricTicker()
- metricReporters.put(reporterEntry.id, reporterEntry)
- createRegistration(reporterEntry.id, metricReporters)
+ metricReporters.put(reporterEntry.id, reporterEntry)
+ createRegistration(reporterEntry.id, metricReporters)
- }
+ }
- private def addSpanReporter(reporter: SpanReporter, name: String): Registration = synchronized {
- val executor = Executors.newSingleThreadExecutor(threadFactory(name))
- val reporterEntry = new SpanReporterEntry(
- id = reporterCounter.incrementAndGet(),
- name = name,
- reporter = reporter,
- bufferCapacity = registryConfiguration.traceReporterQueueSize,
- executionContext = ExecutionContext.fromExecutorService(executor)
- )
+ private def addSpanReporter(reporter: SpanReporter, name: String): Registration = synchronized {
+ val executor = Executors.newSingleThreadExecutor(threadFactory(name))
+ val reporterEntry = new SpanReporterEntry(
+ id = reporterCounter.incrementAndGet(),
+ name = name,
+ reporter = reporter,
+ bufferCapacity = registryConfiguration.traceReporterQueueSize,
+ executionContext = ExecutionContext.fromExecutorService(executor)
+ )
- Future(reporterEntry.reporter.start())(reporterEntry.executionContext)
+ Future(reporterEntry.reporter.start())(reporterEntry.executionContext)
- if(spanReporters.isEmpty)
- reStartTraceTicker()
+ if(spanReporters.isEmpty)
+ reStartTraceTicker()
- spanReporters.put(reporterEntry.id, reporterEntry)
- createRegistration(reporterEntry.id, spanReporters)
- }
+ spanReporters.put(reporterEntry.id, reporterEntry)
+ createRegistration(reporterEntry.id, spanReporters)
+ }
- private def createRegistration(id: Long, target: TrieMap[Long, _]): Registration = new Registration {
- override def cancel(): Boolean =
- target.remove(id).nonEmpty
- }
+ private def createRegistration(id: Long, target: TrieMap[Long, _]): Registration = new Registration {
+ override def cancel(): Boolean =
+ target.remove(id).nonEmpty
+ }
- override def stopAllReporters(): Future[Unit] = {
- implicit val stopReporterExeContext = ExecutionContext.fromExecutor(registryExecutionContext)
- val reporterStopFutures = Vector.newBuilder[Future[Unit]]
+ override def stopAllReporters(): Future[Unit] = {
+ implicit val stopReporterExeContext = ExecutionContext.fromExecutor(registryExecutionContext)
+ val reporterStopFutures = Vector.newBuilder[Future[Unit]]
- while(metricReporters.nonEmpty) {
- val (idToRemove, _) = metricReporters.head
- metricReporters.remove(idToRemove).foreach { entry =>
- reporterStopFutures += stopMetricReporter(entry)
+ while(metricReporters.nonEmpty) {
+ val (idToRemove, _) = metricReporters.head
+ metricReporters.remove(idToRemove).foreach { entry =>
+ reporterStopFutures += stopMetricReporter(entry)
+ }
}
- }
- while(spanReporters.nonEmpty) {
- val (idToRemove, _) = spanReporters.head
- spanReporters.remove(idToRemove).foreach { entry =>
- reporterStopFutures += stopSpanReporter(entry)
+ while(spanReporters.nonEmpty) {
+ val (idToRemove, _) = spanReporters.head
+ spanReporters.remove(idToRemove).foreach { entry =>
+ reporterStopFutures += stopSpanReporter(entry)
+ }
}
- }
- Future.sequence(reporterStopFutures.result()).map(_ => Try((): Unit))
- }
+ Future.sequence(reporterStopFutures.result()).map(_ => Try((): Unit))
+ }
- private[kamon] def reconfigure(config: Config): Unit = synchronized {
- val newConfig = readRegistryConfiguration(config)
+ private[kamon] def reconfigure(config: Config): Unit = synchronized {
+ val newConfig = readRegistryConfiguration(config)
- if(newConfig.metricTickInterval != registryConfiguration.metricTickInterval && metricReporters.nonEmpty)
- reStartMetricTicker()
+ if(newConfig.metricTickInterval != registryConfiguration.metricTickInterval && metricReporters.nonEmpty)
+ reStartMetricTicker()
- if(newConfig.traceTickInterval != registryConfiguration.metricTickInterval && spanReporters.nonEmpty)
- reStartTraceTicker()
+ if(newConfig.traceTickInterval != registryConfiguration.metricTickInterval && spanReporters.nonEmpty)
+ reStartTraceTicker()
- // Reconfigure all registered reporters
- metricReporters.foreach { case (_, entry) => Future(entry.reporter.reconfigure(config))(entry.executionContext) }
- spanReporters.foreach { case (_, entry) => Future(entry.reporter.reconfigure(config))(entry.executionContext) }
- registryConfiguration = newConfig
- }
+ // Reconfigure all registered reporters
+ metricReporters.foreach { case (_, entry) => Future(entry.reporter.reconfigure(config))(entry.executionContext) }
+ spanReporters.foreach { case (_, entry) => Future(entry.reporter.reconfigure(config))(entry.executionContext) }
+ registryConfiguration = newConfig
+ }
- private def reStartMetricTicker(): Unit = {
- val tickIntervalMillis = registryConfiguration.metricTickInterval.toMillis
- val currentMetricTicker = metricReporterTickerSchedule.get()
+ private def reStartMetricTicker(): Unit = {
+ val tickIntervalMillis = registryConfiguration.metricTickInterval.toMillis
+ val currentMetricTicker = metricReporterTickerSchedule.get()
- if(currentMetricTicker != null)
- currentMetricTicker.cancel(false)
+ if(currentMetricTicker != null)
+ currentMetricTicker.cancel(false)
- metricReporterTickerSchedule.set {
- registryExecutionContext.scheduleAtFixedRate(
- new MetricReporterTicker(metrics, metricReporters), tickIntervalMillis, tickIntervalMillis, TimeUnit.MILLISECONDS
- )
+ metricReporterTickerSchedule.set {
+ registryExecutionContext.scheduleAtFixedRate(
+ new MetricReporterTicker(metrics, metricReporters), tickIntervalMillis, tickIntervalMillis, TimeUnit.MILLISECONDS
+ )
+ }
}
- }
- private def reStartTraceTicker(): Unit = {
- val tickIntervalMillis = registryConfiguration.traceTickInterval.toMillis
- val currentSpanTicker = spanReporterTickerSchedule.get()
- if(currentSpanTicker != null)
- currentSpanTicker.cancel(false)
+ private def reStartTraceTicker(): Unit = {
+ val tickIntervalMillis = registryConfiguration.traceTickInterval.toMillis
+ val currentSpanTicker = spanReporterTickerSchedule.get()
+ if(currentSpanTicker != null)
+ currentSpanTicker.cancel(false)
- spanReporterTickerSchedule.set {
- registryExecutionContext.scheduleAtFixedRate(
- new SpanReporterTicker(spanReporters), tickIntervalMillis, tickIntervalMillis, TimeUnit.MILLISECONDS
- )
+ spanReporterTickerSchedule.set {
+ registryExecutionContext.scheduleAtFixedRate(
+ new SpanReporterTicker(spanReporters), tickIntervalMillis, tickIntervalMillis, TimeUnit.MILLISECONDS
+ )
+ }
}
- }
- def reportSpan(span: Span.FinishedSpan): Unit = {
- spanReporters.foreach { case (_, reporterEntry) =>
- if(reporterEntry.isActive)
- reporterEntry.buffer.offer(span)
+ def reportSpan(span: Span.FinishedSpan): Unit = {
+ spanReporters.foreach { case (_, reporterEntry) =>
+ if(reporterEntry.isActive)
+ reporterEntry.buffer.offer(span)
+ }
}
- }
- private def stopMetricReporter(entry: MetricReporterEntry): Future[Unit] = {
- entry.isActive = false
+ private def stopMetricReporter(entry: MetricReporterEntry): Future[Unit] = {
+ entry.isActive = false
- Future(entry.reporter.stop())(entry.executionContext).andThen {
- case _ => entry.executionContext.shutdown()
- }(ExecutionContext.fromExecutor(registryExecutionContext))
- }
+ Future(entry.reporter.stop())(entry.executionContext).andThen {
+ case _ => entry.executionContext.shutdown()
+ }(ExecutionContext.fromExecutor(registryExecutionContext))
+ }
- private def stopSpanReporter(entry: SpanReporterEntry): Future[Unit] = {
- entry.isActive = false
+ private def stopSpanReporter(entry: SpanReporterEntry): Future[Unit] = {
+ entry.isActive = false
- Future(entry.reporter.stop())(entry.executionContext).andThen {
- case _ => entry.executionContext.shutdown()
- }(ExecutionContext.fromExecutor(registryExecutionContext))
- }
+ Future(entry.reporter.stop())(entry.executionContext).andThen {
+ case _ => entry.executionContext.shutdown()
+ }(ExecutionContext.fromExecutor(registryExecutionContext))
+ }
- private class MetricReporterEntry(
- @volatile var isActive: Boolean = true,
- val id: Long,
- val name: String,
- val reporter: MetricReporter,
- val executionContext: ExecutionContextExecutorService
- )
-
- private class SpanReporterEntry(
- @volatile var isActive: Boolean = true,
- val id: Long,
- val name: String,
- val reporter: SpanReporter,
- val bufferCapacity: Int,
- val executionContext: ExecutionContextExecutorService
- ) {
- val buffer = new ArrayBlockingQueue[Span.FinishedSpan](bufferCapacity)
- }
+ private class MetricReporterEntry(
+ @volatile var isActive: Boolean = true,
+ val id: Long,
+ val name: String,
+ val reporter: MetricReporter,
+ val executionContext: ExecutionContextExecutorService
+ )
- private class MetricReporterTicker(snapshotGenerator: MetricsSnapshotGenerator, reporterEntries: TrieMap[Long, MetricReporterEntry]) extends Runnable {
- val logger = LoggerFactory.getLogger(classOf[MetricReporterTicker])
- var lastTick = System.currentTimeMillis()
+ private class SpanReporterEntry(
+ @volatile var isActive: Boolean = true,
+ val id: Long,
+ val name: String,
+ val reporter: SpanReporter,
+ val bufferCapacity: Int,
+ val executionContext: ExecutionContextExecutorService
+ ) {
+ val buffer = new ArrayBlockingQueue[Span.FinishedSpan](bufferCapacity)
+ }
- def run(): Unit = try {
- val currentTick = System.currentTimeMillis()
- val tickSnapshot = TickSnapshot(
- interval = Interval(lastTick, currentTick),
- metrics = snapshotGenerator.snapshot()
- )
+ private class MetricReporterTicker(snapshotGenerator: MetricsSnapshotGenerator, reporterEntries: TrieMap[Long, MetricReporterEntry]) extends Runnable {
+ val logger = LoggerFactory.getLogger(classOf[MetricReporterTicker])
+ var lastTick = System.currentTimeMillis()
- reporterEntries.foreach { case (_, entry) =>
- Future {
- Try {
- if (entry.isActive)
- entry.reporter.reportTickSnapshot(tickSnapshot)
+ def run(): Unit = try {
+ val currentTick = System.currentTimeMillis()
+ val tickSnapshot = TickSnapshot(
+ interval = Interval(lastTick, currentTick),
+ metrics = snapshotGenerator.snapshot()
+ )
- }.failed.foreach { error =>
- logger.error(s"Reporter [${entry.name}] failed to process a metrics tick.", error)
- }
+ reporterEntries.foreach { case (_, entry) =>
+ Future {
+ Try {
+ if (entry.isActive)
+ entry.reporter.reportTickSnapshot(tickSnapshot)
- }(entry.executionContext)
- }
+ }.failed.foreach { error =>
+ logger.error(s"Reporter [${entry.name}] failed to process a metrics tick.", error)
+ }
- lastTick = currentTick
+ }(entry.executionContext)
+ }
- } catch {
- case NonFatal(t) => logger.error("Error while running a tick", t)
+ lastTick = currentTick
+
+ } catch {
+ case NonFatal(t) => logger.error("Error while running a tick", t)
+ }
}
- }
- private class SpanReporterTicker(spanReporters: TrieMap[Long, SpanReporterEntry]) extends Runnable {
- override def run(): Unit = {
- spanReporters.foreach {
- case (_, entry) =>
+ private class SpanReporterTicker(spanReporters: TrieMap[Long, SpanReporterEntry]) extends Runnable {
+ override def run(): Unit = {
+ spanReporters.foreach {
+ case (_, entry) =>
- val spanBatch = new java.util.ArrayList[Span.FinishedSpan](entry.bufferCapacity)
- entry.buffer.drainTo(spanBatch, entry.bufferCapacity)
+ val spanBatch = new java.util.ArrayList[Span.FinishedSpan](entry.bufferCapacity)
+ entry.buffer.drainTo(spanBatch, entry.bufferCapacity)
- Future {
- entry.reporter.reportSpans(spanBatch.asScala)
- }(entry.executionContext)
+ Future {
+ entry.reporter.reportSpans(spanBatch.asScala)
+ }(entry.executionContext)
+ }
}
}
- }
- private def readRegistryConfiguration(config: Config): Configuration =
- Configuration(
- metricTickInterval = config.getDuration("kamon.metric.tick-interval"),
- traceTickInterval = config.getDuration("kamon.trace.tick-interval"),
- traceReporterQueueSize = config.getInt("kamon.trace.reporter-queue-size"),
- configuredReporters = config.getStringList("kamon.reporters").asScala
- )
+ private def readRegistryConfiguration(config: Config): Configuration =
+ Configuration(
+ metricTickInterval = config.getDuration("kamon.metric.tick-interval"),
+ traceTickInterval = config.getDuration("kamon.trace.tick-interval"),
+ traceReporterQueueSize = config.getInt("kamon.trace.reporter-queue-size"),
+ configuredReporters = config.getStringList("kamon.reporters").asScala
+ )
+
+ private case class Configuration(metricTickInterval: Duration, traceTickInterval: Duration,
+ traceReporterQueueSize: Int, configuredReporters: Seq[String])
+ }
+}
- private case class Configuration(metricTickInterval: Duration, traceTickInterval: Duration,
- traceReporterQueueSize: Int, configuredReporters: Seq[String])
-} \ No newline at end of file
diff --git a/kamon-core/src/main/scala/kamon/context/Codec.scala b/kamon-core/src/main/scala/kamon/context/Codec.scala
deleted file mode 100644
index 19ec2a20..00000000
--- a/kamon-core/src/main/scala/kamon/context/Codec.scala
+++ /dev/null
@@ -1,144 +0,0 @@
-/* =========================================================================================
- * Copyright © 2013-2017 the kamon project <http://kamon.io/>
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
- * except in compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the
- * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific language governing permissions
- * and limitations under the License.
- * =========================================================================================
- */
-
-package kamon
-package context
-
-import com.typesafe.config.Config
-import kamon.util.DynamicAccess
-import org.slf4j.LoggerFactory
-
-import scala.collection.mutable
-
-class Codec(initialConfig: Config) {
- private val log = LoggerFactory.getLogger(classOf[Codec])
-
- @volatile private var httpHeaders: Codec.ForContext[TextMap] = new Codec.HttpHeaders(Map.empty)
- //val Binary: Codec.ForContext[ByteBuffer] = _
- reconfigure(initialConfig)
-
-
- def HttpHeaders: Codec.ForContext[TextMap] =
- httpHeaders
-
- def reconfigure(config: Config): Unit = {
- httpHeaders = new Codec.HttpHeaders(readEntryCodecs("kamon.context.encoding.http-headers", config))
- }
-
- private def readEntryCodecs[T](rootKey: String, config: Config): Map[String, Codec.ForEntry[T]] = {
- val rootConfig = config.getConfig(rootKey)
- val dynamic = new DynamicAccess(getClass.getClassLoader)
- val entries = Map.newBuilder[String, Codec.ForEntry[T]]
-
- rootConfig.topLevelKeys.foreach(key => {
- try {
- val fqcn = rootConfig.getString(key)
- entries += ((key, dynamic.createInstanceFor[Codec.ForEntry[T]](fqcn, Nil).get))
- } catch {
- case e: Throwable =>
- log.error(s"Failed to initialize codec for key [$key]", e)
- }
- })
-
- entries.result()
- }
-}
-
-object Codec {
-
- trait ForContext[T] {
- def encode(context: Context): T
- def decode(carrier: T): Context
- }
-
- trait ForEntry[T] {
- def encode(context: Context): T
- def decode(carrier: T, context: Context): Context
- }
-
- final class HttpHeaders(entryCodecs: Map[String, Codec.ForEntry[TextMap]]) extends Codec.ForContext[TextMap] {
- private val log = LoggerFactory.getLogger(classOf[HttpHeaders])
-
- override def encode(context: Context): TextMap = {
- val encoded = TextMap.Default()
-
- context.entries.foreach {
- case (key, _) if key.broadcast =>
- entryCodecs.get(key.name) match {
- case Some(codec) =>
- try {
- codec.encode(context).values.foreach(pair => encoded.put(pair._1, pair._2))
- } catch {
- case e: Throwable => log.error(s"Failed to encode key [${key.name}]", e)
- }
-
- case None =>
- log.error("Context key [{}] should be encoded in HttpHeaders but no codec was found for it.", key.name)
- }
- }
-
- encoded
- }
-
- override def decode(carrier: TextMap): Context = {
- var context: Context = Context.Empty
-
- try {
- context = entryCodecs.foldLeft(context)((ctx, codecEntry) => {
- val (_, codec) = codecEntry
- codec.decode(carrier, ctx)
- })
-
- } catch {
- case e: Throwable =>
- log.error("Failed to decode context from HttpHeaders", e)
- }
-
- context
- }
- }
-}
-
-
-trait TextMap {
-
- def get(key: String): Option[String]
-
- def put(key: String, value: String): Unit
-
- def values: Iterator[(String, String)]
-}
-
-object TextMap {
-
- class Default extends TextMap {
- private val storage =
- mutable.Map.empty[String, String]
-
- override def get(key: String): Option[String] =
- storage.get(key)
-
- override def put(key: String, value: String): Unit =
- storage.put(key, value)
-
- override def values: Iterator[(String, String)] =
- storage.toIterator
- }
-
- object Default {
- def apply(): Default =
- new Default()
- }
-}
\ No newline at end of file
diff --git a/kamon-core/src/main/scala/kamon/context/Codecs.scala b/kamon-core/src/main/scala/kamon/context/Codecs.scala
new file mode 100644
index 00000000..b50e991d
--- /dev/null
+++ b/kamon-core/src/main/scala/kamon/context/Codecs.scala
@@ -0,0 +1,245 @@
+/* =========================================================================================
+ * Copyright © 2013-2017 the kamon project <http://kamon.io/>
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
+ * except in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
+ * =========================================================================================
+ */
+
+package kamon
+package context
+
+import java.nio.ByteBuffer
+
+import com.typesafe.config.Config
+import kamon.util.DynamicAccess
+import org.slf4j.LoggerFactory
+import kamon.context.generated.binary.context.{Context => ColferContext, Entry => ColferEntry}
+
+import scala.collection.mutable
+
+class Codecs(initialConfig: Config) {
+ private val log = LoggerFactory.getLogger(classOf[Codecs])
+ @volatile private var httpHeaders: Codecs.ForContext[TextMap] = new Codecs.HttpHeaders(Map.empty)
+ @volatile private var binary: Codecs.ForContext[ByteBuffer] = new Codecs.Binary(256, Map.empty)
+
+ reconfigure(initialConfig)
+
+
+ def HttpHeaders: Codecs.ForContext[TextMap] =
+ httpHeaders
+
+ def Binary: Codecs.ForContext[ByteBuffer] =
+ binary
+
+ def reconfigure(config: Config): Unit = {
+ try {
+ val codecsConfig = config.getConfig("kamon.context.codecs")
+ httpHeaders = new Codecs.HttpHeaders(readEntryCodecs("http-headers-keys", codecsConfig))
+ binary = new Codecs.Binary(codecsConfig.getBytes("binary-buffer-size"), readEntryCodecs("binary-keys", codecsConfig))
+ } catch {
+ case t: Throwable => log.error("Failed to initialize Context Codecs", t)
+ }
+ }
+
+ private def readEntryCodecs[T](rootKey: String, config: Config): Map[String, Codecs.ForEntry[T]] = {
+ val rootConfig = config.getConfig(rootKey)
+ val dynamic = new DynamicAccess(getClass.getClassLoader)
+ val entries = Map.newBuilder[String, Codecs.ForEntry[T]]
+
+ rootConfig.topLevelKeys.foreach(key => {
+ try {
+ val fqcn = rootConfig.getString(key)
+ entries += ((key, dynamic.createInstanceFor[Codecs.ForEntry[T]](fqcn, Nil).get))
+ } catch {
+ case e: Throwable =>
+ log.error(s"Failed to initialize codec for key [$key]", e)
+ }
+ })
+
+ entries.result()
+ }
+}
+
+object Codecs {
+
+ trait ForContext[T] {
+ def encode(context: Context): T
+ def decode(carrier: T): Context
+ }
+
+ trait ForEntry[T] {
+ def encode(context: Context): T
+ def decode(carrier: T, context: Context): Context
+ }
+
+ final class HttpHeaders(entryCodecs: Map[String, Codecs.ForEntry[TextMap]]) extends Codecs.ForContext[TextMap] {
+ private val log = LoggerFactory.getLogger(classOf[HttpHeaders])
+
+ override def encode(context: Context): TextMap = {
+ val encoded = TextMap.Default()
+
+ context.entries.foreach {
+ case (key, _) if key.broadcast =>
+ entryCodecs.get(key.name) match {
+ case Some(codec) =>
+ try {
+ codec.encode(context).values.foreach(pair => encoded.put(pair._1, pair._2))
+ } catch {
+ case e: Throwable => log.error(s"Failed to encode key [${key.name}]", e)
+ }
+
+ case None =>
+ log.error("Context key [{}] should be encoded in HttpHeaders but no codec was found for it.", key.name)
+ }
+
+ case _ => // All non-broadcast keys should be ignored.
+ }
+
+ encoded
+ }
+
+ override def decode(carrier: TextMap): Context = {
+ var context: Context = Context.Empty
+
+ try {
+ context = entryCodecs.foldLeft(context)((ctx, codecEntry) => {
+ val (_, codec) = codecEntry
+ codec.decode(carrier, ctx)
+ })
+
+ } catch {
+ case e: Throwable =>
+ log.error("Failed to decode context from HttpHeaders", e)
+ }
+
+ context
+ }
+ }
+
+
+ final class Binary(bufferSize: Long, entryCodecs: Map[String, Codecs.ForEntry[ByteBuffer]]) extends Codecs.ForContext[ByteBuffer] {
+ private val log = LoggerFactory.getLogger(classOf[Binary])
+ private val binaryBuffer = newThreadLocalBuffer(bufferSize)
+ private val emptyBuffer = ByteBuffer.allocate(0)
+
+ override def encode(context: Context): ByteBuffer = {
+ val entries = context.entries
+ if(entries.isEmpty)
+ emptyBuffer
+ else {
+ var colferEntries: List[ColferEntry] = Nil
+ entries.foreach {
+ case (key, _) if key.broadcast =>
+ entryCodecs.get(key.name) match {
+ case Some(entryCodec) =>
+ try {
+ val entryData = entryCodec.encode(context)
+ if(entryData.capacity() > 0) {
+ val colferEntry = new ColferEntry()
+ colferEntry.setName(key.name)
+ colferEntry.setContent(entryData.array())
+ colferEntries = colferEntry :: colferEntries
+ }
+ } catch {
+ case throwable: Throwable =>
+ log.error(s"Failed to encode broadcast context key [${key.name}]", throwable)
+ }
+
+ case None =>
+ log.error("Failed to encode broadcast context key [{}]. No codec found.", key.name)
+ }
+
+ case _ => // All non-broadcast keys should be ignored.
+ }
+
+ if(colferEntries.isEmpty)
+ emptyBuffer
+ else {
+ val buffer = binaryBuffer.get()
+ val colferContext = new ColferContext()
+ colferContext.setEntries(colferEntries.toArray)
+ val marshalledSize = colferContext.marshal(buffer, 0)
+
+ val data = Array.ofDim[Byte](marshalledSize)
+ System.arraycopy(buffer, 0, data, 0, marshalledSize)
+ ByteBuffer.wrap(data)
+ }
+ }
+ }
+
+
+ override def decode(carrier: ByteBuffer): Context = {
+ if(carrier.capacity() == 0)
+ Context.Empty
+ else {
+ var context: Context = Context.Empty
+
+ try {
+ val colferContext = new ColferContext()
+ colferContext.unmarshal(carrier.array(), 0)
+
+ colferContext.entries.foreach(colferEntry => {
+ entryCodecs.get(colferEntry.getName()) match {
+ case Some(entryCodec) =>
+ context = entryCodec.decode(ByteBuffer.wrap(colferEntry.content), context)
+
+ case None =>
+ log.error("Failed to decode entry [{}] with Binary context codec. No entry found for the key.", colferEntry.getName())
+ }
+ })
+
+ } catch {
+ case e: Throwable =>
+ log.error("Failed to decode context from Binary", e)
+ }
+
+ context
+ }
+ }
+
+
+ private def newThreadLocalBuffer(size: Long): ThreadLocal[Array[Byte]] = new ThreadLocal[Array[Byte]] {
+ override def initialValue(): Array[Byte] = Array.ofDim[Byte](size.toInt)
+ }
+ }
+}
+
+
+trait TextMap {
+
+ def get(key: String): Option[String]
+
+ def put(key: String, value: String): Unit
+
+ def values: Iterator[(String, String)]
+}
+
+object TextMap {
+
+ class Default extends TextMap {
+ private val storage =
+ mutable.Map.empty[String, String]
+
+ override def get(key: String): Option[String] =
+ storage.get(key)
+
+ override def put(key: String, value: String): Unit =
+ storage.put(key, value)
+
+ override def values: Iterator[(String, String)] =
+ storage.toIterator
+ }
+
+ object Default {
+ def apply(): Default =
+ new Default()
+ }
+}
\ No newline at end of file
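
The ForEntry trait introduced above is the extension point for propagating custom broadcast keys. Below is a minimal sketch of an HTTP-headers entry codec; the user-id key, the X-User-ID header and the kamon.example package are hypothetical, chosen only to illustrate the encode/decode contract:

    package kamon.example

    import kamon.context.{Codecs, Context, Key, TextMap}

    object UserIDCodec {
      // Hypothetical broadcast key; only broadcast keys reach the codecs.
      val UserID = Key.broadcast[Option[String]]("user-id", None)
    }

    class UserIDCodec extends Codecs.ForEntry[TextMap] {
      import UserIDCodec.UserID

      // Write the key's value, when present, as a single HTTP header.
      override def encode(context: Context): TextMap = {
        val textMap = TextMap.Default()
        context.get(UserID).foreach(id => textMap.put("X-User-ID", id))
        textMap
      }

      // Read the header back, leaving the rest of the context untouched.
      override def decode(carrier: TextMap, context: Context): Context =
        context.withKey(UserID, carrier.get("X-User-ID"))
    }

Registered under kamon.context.codecs.http-headers-keys, a class like this is instantiated reflectively by readEntryCodecs and invoked by HttpHeaders.encode for every broadcast entry in the context.
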
diff --git a/kamon-core/src/main/scala/kamon/trace/Span.scala b/kamon-core/src/main/scala/kamon/trace/Span.scala
index ea28142e..3158aa73 100644
--- a/kamon-core/src/main/scala/kamon/trace/Span.scala
+++ b/kamon-core/src/main/scala/kamon/trace/Span.scala
@@ -195,9 +195,9 @@ object Span {
object Local {
def apply(spanContext: SpanContext, parent: Option[Span], initialOperationName: String, initialSpanTags: Map[String, Span.TagValue],
- initialMetricTags: Map[String, String], startTimestampMicros: Long, reporterRegistry: ReporterRegistryImpl,
+ initialMetricTags: Map[String, String], startTimestampMicros: Long, spanSink: SpanSink,
scopeSpanMetrics: Boolean): Local =
- new Local(spanContext, parent, initialOperationName, initialSpanTags, initialMetricTags, startTimestampMicros, reporterRegistry, scopeSpanMetrics)
+ new Local(spanContext, parent, initialOperationName, initialSpanTags, initialMetricTags, startTimestampMicros, spanSink, scopeSpanMetrics)
}
diff --git a/kamon-core/src/main/scala/kamon/trace/SpanCodec.scala b/kamon-core/src/main/scala/kamon/trace/SpanCodec.scala
index 96317696..ae78ee67 100644
--- a/kamon-core/src/main/scala/kamon/trace/SpanCodec.scala
+++ b/kamon-core/src/main/scala/kamon/trace/SpanCodec.scala
@@ -16,15 +16,17 @@
package kamon.trace
import java.net.{URLDecoder, URLEncoder}
+import java.nio.ByteBuffer
import kamon.Kamon
-import kamon.context.{Codec, Context, TextMap}
+import kamon.context.{Codecs, Context, TextMap}
+import kamon.context.generated.binary.span.{Span => ColferSpan}
import kamon.trace.SpanContext.SamplingDecision
object SpanCodec {
- class B3 extends Codec.ForEntry[TextMap] {
+ class B3 extends Codecs.ForEntry[TextMap] {
import B3.Headers
override def encode(context: Context): TextMap = {
@@ -96,4 +98,69 @@ object SpanCodec {
val Flags = "X-B3-Flags"
}
}
+
+
+ class Colfer extends Codecs.ForEntry[ByteBuffer] {
+ val emptyBuffer = ByteBuffer.allocate(0)
+
+ override def encode(context: Context): ByteBuffer = {
+ val span = context.get(Span.ContextKey)
+ if(span.nonEmpty()) {
+ val marshalBuffer = Colfer.codecBuffer.get()
+ val colferSpan = new ColferSpan()
+ val spanContext = span.context()
+
+ colferSpan.setTraceID(spanContext.traceID.bytes)
+ colferSpan.setSpanID(spanContext.spanID.bytes)
+ colferSpan.setParentID(spanContext.parentID.bytes)
+ colferSpan.setSamplingDecision(samplingDecisionToByte(spanContext.samplingDecision))
+
+ val marshalledSize = colferSpan.marshal(marshalBuffer, 0)
+ val buffer = ByteBuffer.allocate(marshalledSize)
+ buffer.put(marshalBuffer, 0, marshalledSize)
+ buffer
+
+ } else emptyBuffer
+ }
+
+ override def decode(carrier: ByteBuffer, context: Context): Context = {
+ carrier.clear()
+
+ if(carrier.capacity() == 0)
+ context
+ else {
+ val identityProvider = Kamon.tracer.identityProvider
+ val colferSpan = new ColferSpan()
+ colferSpan.unmarshal(carrier.array(), 0)
+
+ val spanContext = SpanContext(
+ traceID = identityProvider.traceIdGenerator().from(colferSpan.traceID),
+ spanID = identityProvider.traceIdGenerator().from(colferSpan.spanID),
+ parentID = identityProvider.traceIdGenerator().from(colferSpan.parentID),
+ samplingDecision = byteToSamplingDecision(colferSpan.samplingDecision)
+ )
+
+ context.withKey(Span.ContextKey, Span.Remote(spanContext))
+ }
+ }
+
+
+ private def samplingDecisionToByte(samplingDecision: SamplingDecision): Byte = samplingDecision match {
+ case SamplingDecision.Sample => 1
+ case SamplingDecision.DoNotSample => 2
+ case SamplingDecision.Unknown => 3
+ }
+
+ private def byteToSamplingDecision(byte: Byte): SamplingDecision = byte match {
+ case 1 => SamplingDecision.Sample
+ case 2 => SamplingDecision.DoNotSample
+ case _ => SamplingDecision.Unknown
+ }
+ }
+
+ object Colfer {
+ private val codecBuffer = new ThreadLocal[Array[Byte]] {
+ override def initialValue(): Array[Byte] = Array.ofDim[Byte](256)
+ }
+ }
}
\ No newline at end of file
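
The Colfer entry codec above round-trips only the span identifiers plus the sampling decision, marshalling Sample, DoNotSample and Unknown as bytes 1, 2 and 3; any other byte decodes to Unknown. A usage sketch follows; the start() call on SpanBuilder is an assumption here, everything else appears in the diff:

    import java.nio.ByteBuffer
    import kamon.Kamon
    import kamon.context.Context
    import kamon.trace.{Span, SpanCodec}

    // A span to propagate; start() is assumed from the SpanBuilder API.
    val span = Kamon.tracer.buildSpan("example-operation").start()
    val context = Context.create(Span.ContextKey, span)

    val codec = new SpanCodec.Colfer()
    val encoded: ByteBuffer = codec.encode(context)

    // Decoding yields a Span.Remote carrying the same IDs and sampling decision.
    val decoded = codec.decode(encoded, Context.Empty)
    val remoteSpan = decoded.get(Span.ContextKey)
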
diff --git a/kamon-core/src/main/scala/kamon/trace/Tracer.scala b/kamon-core/src/main/scala/kamon/trace/Tracer.scala
index f2f1918c..5f61f3aa 100644
--- a/kamon-core/src/main/scala/kamon/trace/Tracer.scala
+++ b/kamon-core/src/main/scala/kamon/trace/Tracer.scala
@@ -16,7 +16,8 @@
package kamon.trace
import com.typesafe.config.Config
-import kamon.{Kamon, ReporterRegistryImpl}
+import kamon.ReporterRegistry.SpanSink
+import kamon.Kamon
import kamon.metric.MetricLookup
import kamon.trace.Span.TagValue
import kamon.trace.SpanContext.SamplingDecision
@@ -34,7 +35,7 @@ trait Tracer {
object Tracer {
- final class Default(metrics: MetricLookup, reporterRegistry: ReporterRegistryImpl, initialConfig: Config) extends Tracer {
+ final class Default(metrics: MetricLookup, spanSink: SpanSink, initialConfig: Config) extends Tracer {
private val logger = LoggerFactory.getLogger(classOf[Tracer])
private[Tracer] val tracerMetrics = new TracerMetrics(metrics)
@@ -46,7 +47,7 @@ object Tracer {
reconfigure(initialConfig)
override def buildSpan(operationName: String): SpanBuilder =
- new SpanBuilder(operationName, this, reporterRegistry)
+ new SpanBuilder(operationName, this, spanSink)
override def identityProvider: IdentityProvider =
this._identityProvider
@@ -84,11 +85,11 @@ object Tracer {
}
object Default {
- def apply(metrics: MetricLookup, reporterRegistry: ReporterRegistryImpl, initialConfig: Config): Default =
- new Default(metrics, reporterRegistry, initialConfig)
+ def apply(metrics: MetricLookup, spanSink: SpanSink, initialConfig: Config): Default =
+ new Default(metrics, spanSink, initialConfig)
}
- final class SpanBuilder(operationName: String, tracer: Tracer.Default, reporterRegistry: ReporterRegistryImpl) {
+ final class SpanBuilder(operationName: String, tracer: Tracer.Default, spanSink: SpanSink) {
private var parentSpan: Span = _
private var startTimestamp = 0L
private var initialSpanTags = Map.empty[String, Span.TagValue]
@@ -157,7 +158,7 @@ object Tracer {
}
tracer.tracerMetrics.createdSpans.increment()
- Span.Local(spanContext, nonRemoteParent, operationName, initialSpanTags, initialMetricTags, startTimestampMicros, reporterRegistry, tracer.scopeSpanMetrics)
+ Span.Local(spanContext, nonRemoteParent, operationName, initialSpanTags, initialMetricTags, startTimestampMicros, spanSink, tracer.scopeSpanMetrics)
}
private def joinParentContext(parent: Span, samplingDecision: SamplingDecision): SpanContext =
diff --git a/kamon-testkit/src/main/resources/reference.conf b/kamon-testkit/src/main/resources/reference.conf
new file mode 100644
index 00000000..e20cf755
--- /dev/null
+++ b/kamon-testkit/src/main/resources/reference.conf
@@ -0,0 +1,13 @@
+kamon {
+
+
+ context.codecs {
+ http-headers-keys {
+ string-broadcast-key = "kamon.context.SimpleStringCodec$Headers"
+ }
+
+ binary-keys {
+ string-broadcast-key = "kamon.context.SimpleStringCodec$Binary"
+ }
+ }
+}
\ No newline at end of file
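
Each entry under http-headers-keys and binary-keys maps a broadcast key name to the fully qualified class name of a Codecs.ForEntry implementation, which readEntryCodecs instantiates reflectively whenever the codecs are (re)configured. An application wires its own keys the same way; a sketch, with hypothetical user-id codecs and an illustrative buffer size:

    kamon.context.codecs {
      # Size, in bytes, of the thread-local buffer used by the binary codec.
      binary-buffer-size = 256

      http-headers-keys {
        user-id = "kamon.example.UserIDCodec"
      }

      binary-keys {
        user-id = "kamon.example.UserIDBinaryCodec"
      }
    }
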
diff --git a/kamon-testkit/src/main/scala/kamon/testkit/ContextTesting.scala b/kamon-testkit/src/main/scala/kamon/testkit/ContextTesting.scala
index 507f17ad..8fd51024 100644
--- a/kamon-testkit/src/main/scala/kamon/testkit/ContextTesting.scala
+++ b/kamon-testkit/src/main/scala/kamon/testkit/ContextTesting.scala
@@ -3,9 +3,11 @@ package kamon.testkit
import kamon.context.{Context, Key}
trait ContextTesting {
- val TestLocalKey = Key.local[Option[String]]("test-local-key", None)
- val TestBroadcastKey = Key.broadcast[Option[String]]("test-local-key", None)
+ val StringKey = Key.local[Option[String]]("string-key", None)
+ val StringBroadcastKey = Key.broadcast[Option[String]]("string-broadcast-key", None)
def contextWithLocal(value: String): Context =
- Context.create(TestLocalKey, Some(value))
+ Context.create(StringKey, Some(value))
}
+
+object ContextTesting extends ContextTesting
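
Putting the pieces together, a binary round trip of the broadcast key defined above could look like the following sketch. It assumes the SimpleStringCodec implementations named in the testkit reference.conf are on the classpath:

    import com.typesafe.config.ConfigFactory
    import kamon.context.{Codecs, Context}
    import kamon.testkit.ContextTesting._

    // ConfigFactory.load() picks up the testkit reference.conf shown above.
    val codecs = new Codecs(ConfigFactory.load())

    val context = Context.create(StringBroadcastKey, Some("some-value"))
    val binary = codecs.Binary.encode(context)
    val decoded = codecs.Binary.decode(binary)

    decoded.get(StringBroadcastKey) // expected: Some("some-value")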