diff options
author | Ivan Topolnjak <ivantopo@gmail.com> | 2017-08-21 09:23:07 +0200 |
---|---|---|
committer | Ivan Topolnjak <ivantopo@gmail.com> | 2017-08-21 10:37:08 +0200 |
commit | a152a3098b564ed43766a857b32b7c7d7445f9ce (patch) | |
tree | 7651f61e598f316ee9dca415c5a5c67ce530bad5 /kamon-core/src | |
parent | 3cb974e5dfd381b9b28ffef9977047cf35242121 (diff) | |
download | Kamon-a152a3098b564ed43766a857b32b7c7d7445f9ce.tar.gz Kamon-a152a3098b564ed43766a857b32b7c7d7445f9ce.tar.bz2 Kamon-a152a3098b564ed43766a857b32b7c7d7445f9ce.zip |
binary encoding of context and entries
Diffstat (limited to 'kamon-core/src')
14 files changed, 1124 insertions, 429 deletions
diff --git a/kamon-core/src/main/colfer/context.colf b/kamon-core/src/main/colfer/Context.colf index 26421cba..f84d7a56 100644 --- a/kamon-core/src/main/colfer/context.colf +++ b/kamon-core/src/main/colfer/Context.colf @@ -1,4 +1,4 @@ -package kamon +package context type Entry struct { name text diff --git a/kamon-core/src/main/colfer/Span.colf b/kamon-core/src/main/colfer/Span.colf new file mode 100644 index 00000000..4edbed7b --- /dev/null +++ b/kamon-core/src/main/colfer/Span.colf @@ -0,0 +1,8 @@ +package span + +type Span struct { + traceID binary + spanID binary + parentID binary + samplingDecision uint8 +}
\ No newline at end of file diff --git a/kamon-core/src/main/colfer/building.md b/kamon-core/src/main/colfer/building.md new file mode 100644 index 00000000..f510a44f --- /dev/null +++ b/kamon-core/src/main/colfer/building.md @@ -0,0 +1,5 @@ +Just download and install the colver compiler and run this command from the colfer folder: + +``` +colfer -b ../java -p kamon/context/generated/binary java +```
\ No newline at end of file diff --git a/kamon-core/src/main/java/kamon/context/encoding/Context.java b/kamon-core/src/main/java/kamon/context/generated/binary/context/Context.java index db6ed7a9..a9917e99 100644 --- a/kamon-core/src/main/java/kamon/context/encoding/Context.java +++ b/kamon-core/src/main/java/kamon/context/generated/binary/context/Context.java @@ -1,4 +1,4 @@ -package kamon.context.encoding; +package kamon.context.generated.binary.context; // Code generated by colf(1); DO NOT EDIT. @@ -23,7 +23,7 @@ import java.nio.BufferUnderflowException; * @author generated by colf(1) * @see <a href="https://github.com/pascaldekloe/colfer">Colfer's home</a> */ -@javax.annotation.Generated(value="colf(1)", comments="Colfer from schema file context.colf") +@javax.annotation.Generated(value="colf(1)", comments="Colfer from schema file Context.colf") public class Context implements Serializable { /** The upper limit for serial byte sizes. */ @@ -188,7 +188,7 @@ public class Context implements Serializable { int x = a.length; if (x > Context.colferListMax) - throw new IllegalStateException(format("colfer: kamon/context/kamon.Context.entries length %d exceeds %d elements", x, Context.colferListMax)); + throw new IllegalStateException(format("colfer: kamon/context/generated/binary/context.Context.entries length %d exceeds %d elements", x, Context.colferListMax)); while (x > 0x7f) { buf[i++] = (byte) (x | 0x80); x >>>= 7; @@ -209,7 +209,7 @@ public class Context implements Serializable { return i; } catch (ArrayIndexOutOfBoundsException e) { if (i - offset > Context.colferSizeMax) - throw new IllegalStateException(format("colfer: kamon/context/kamon.Context exceeds %d bytes", Context.colferSizeMax)); + throw new IllegalStateException(format("colfer: kamon/context/generated/binary/context.Context exceeds %d bytes", Context.colferSizeMax)); if (i > buf.length) throw new BufferOverflowException(); throw e; } @@ -253,7 +253,7 @@ public class Context implements Serializable { if (shift == 28 || b >= 0) break; } if (length < 0 || length > Context.colferListMax) - throw new SecurityException(format("colfer: kamon/context/kamon.Context.entries length %d exceeds %d elements", length, Context.colferListMax)); + throw new SecurityException(format("colfer: kamon/context/generated/binary/context.Context.entries length %d exceeds %d elements", length, Context.colferListMax)); Entry[] a = new Entry[length]; for (int ai = 0; ai < length; ai++) { @@ -270,7 +270,7 @@ public class Context implements Serializable { } finally { if (i > end && end - offset < Context.colferSizeMax) throw new BufferUnderflowException(); if (i < 0 || i - offset > Context.colferSizeMax) - throw new SecurityException(format("colfer: kamon/context/kamon.Context exceeds %d bytes", Context.colferSizeMax)); + throw new SecurityException(format("colfer: kamon/context/generated/binary/context.Context exceeds %d bytes", Context.colferSizeMax)); if (i > end) throw new BufferUnderflowException(); } @@ -312,7 +312,7 @@ public class Context implements Serializable { } /** - * Gets kamon/context/kamon.Context.entries. + * Gets kamon/context/generated/binary/context.Context.entries. * @return the value. */ public Entry[] getEntries() { @@ -320,7 +320,7 @@ public class Context implements Serializable { } /** - * Sets kamon/context/kamon.Context.entries. + * Sets kamon/context/generated/binary/context.Context.entries. * @param value the replacement. */ public void setEntries(Entry[] value) { @@ -328,7 +328,7 @@ public class Context implements Serializable { } /** - * Sets kamon/context/kamon.Context.entries. + * Sets kamon/context/generated/binary/context.Context.entries. * @param value the replacement. * @return {link this}. */ diff --git a/kamon-core/src/main/java/kamon/context/encoding/Entry.java b/kamon-core/src/main/java/kamon/context/generated/binary/context/Entry.java index d7734c13..dc75b10d 100644 --- a/kamon-core/src/main/java/kamon/context/encoding/Entry.java +++ b/kamon-core/src/main/java/kamon/context/generated/binary/context/Entry.java @@ -1,4 +1,4 @@ -package kamon.context.encoding; +package kamon.context.generated.binary.context; // Code generated by colf(1); DO NOT EDIT. @@ -24,7 +24,7 @@ import java.nio.BufferUnderflowException; * @author generated by colf(1) * @see <a href="https://github.com/pascaldekloe/colfer">Colfer's home</a> */ -@javax.annotation.Generated(value="colf(1)", comments="Colfer from schema file context.colf") +@javax.annotation.Generated(value="colf(1)", comments="Colfer from schema file Context.colf") public class Entry implements Serializable { /** The upper limit for serial byte sizes. */ @@ -211,7 +211,7 @@ public class Entry implements Serializable { } int size = i - start; if (size > Entry.colferSizeMax) - throw new IllegalStateException(format("colfer: kamon/context/kamon.Entry.name size %d exceeds %d UTF-8 bytes", size, Entry.colferSizeMax)); + throw new IllegalStateException(format("colfer: kamon/context/generated/binary/context.Entry.name size %d exceeds %d UTF-8 bytes", size, Entry.colferSizeMax)); int ii = start - 1; if (size > 0x7f) { @@ -232,7 +232,7 @@ public class Entry implements Serializable { int size = this.content.length; if (size > Entry.colferSizeMax) - throw new IllegalStateException(format("colfer: kamon/context/kamon.Entry.content size %d exceeds %d bytes", size, Entry.colferSizeMax)); + throw new IllegalStateException(format("colfer: kamon/context/generated/binary/context.Entry.content size %d exceeds %d bytes", size, Entry.colferSizeMax)); int x = size; while (x > 0x7f) { @@ -250,7 +250,7 @@ public class Entry implements Serializable { return i; } catch (ArrayIndexOutOfBoundsException e) { if (i - offset > Entry.colferSizeMax) - throw new IllegalStateException(format("colfer: kamon/context/kamon.Entry exceeds %d bytes", Entry.colferSizeMax)); + throw new IllegalStateException(format("colfer: kamon/context/generated/binary/context.Entry exceeds %d bytes", Entry.colferSizeMax)); if (i > buf.length) throw new BufferOverflowException(); throw e; } @@ -294,7 +294,7 @@ public class Entry implements Serializable { if (shift == 28 || b >= 0) break; } if (size < 0 || size > Entry.colferSizeMax) - throw new SecurityException(format("colfer: kamon/context/kamon.Entry.name size %d exceeds %d UTF-8 bytes", size, Entry.colferSizeMax)); + throw new SecurityException(format("colfer: kamon/context/generated/binary/context.Entry.name size %d exceeds %d UTF-8 bytes", size, Entry.colferSizeMax)); int start = i; i += size; @@ -310,7 +310,7 @@ public class Entry implements Serializable { if (shift == 28 || b >= 0) break; } if (size < 0 || size > Entry.colferSizeMax) - throw new SecurityException(format("colfer: kamon/context/kamon.Entry.content size %d exceeds %d bytes", size, Entry.colferSizeMax)); + throw new SecurityException(format("colfer: kamon/context/generated/binary/context.Entry.content size %d exceeds %d bytes", size, Entry.colferSizeMax)); this.content = new byte[size]; int start = i; @@ -325,7 +325,7 @@ public class Entry implements Serializable { } finally { if (i > end && end - offset < Entry.colferSizeMax) throw new BufferUnderflowException(); if (i < 0 || i - offset > Entry.colferSizeMax) - throw new SecurityException(format("colfer: kamon/context/kamon.Entry exceeds %d bytes", Entry.colferSizeMax)); + throw new SecurityException(format("colfer: kamon/context/generated/binary/context.Entry exceeds %d bytes", Entry.colferSizeMax)); if (i > end) throw new BufferUnderflowException(); } @@ -367,7 +367,7 @@ public class Entry implements Serializable { } /** - * Gets kamon/context/kamon.Entry.name. + * Gets kamon/context/generated/binary/context.Entry.name. * @return the value. */ public String getName() { @@ -375,7 +375,7 @@ public class Entry implements Serializable { } /** - * Sets kamon/context/kamon.Entry.name. + * Sets kamon/context/generated/binary/context.Entry.name. * @param value the replacement. */ public void setName(String value) { @@ -383,7 +383,7 @@ public class Entry implements Serializable { } /** - * Sets kamon/context/kamon.Entry.name. + * Sets kamon/context/generated/binary/context.Entry.name. * @param value the replacement. * @return {link this}. */ @@ -393,7 +393,7 @@ public class Entry implements Serializable { } /** - * Gets kamon/context/kamon.Entry.content. + * Gets kamon/context/generated/binary/context.Entry.content. * @return the value. */ public byte[] getContent() { @@ -401,7 +401,7 @@ public class Entry implements Serializable { } /** - * Sets kamon/context/kamon.Entry.content. + * Sets kamon/context/generated/binary/context.Entry.content. * @param value the replacement. */ public void setContent(byte[] value) { @@ -409,7 +409,7 @@ public class Entry implements Serializable { } /** - * Sets kamon/context/kamon.Entry.content. + * Sets kamon/context/generated/binary/context.Entry.content. * @param value the replacement. * @return {link this}. */ diff --git a/kamon-core/src/main/java/kamon/context/generated/binary/span/Span.java b/kamon-core/src/main/java/kamon/context/generated/binary/span/Span.java new file mode 100644 index 00000000..ea7a6fe8 --- /dev/null +++ b/kamon-core/src/main/java/kamon/context/generated/binary/span/Span.java @@ -0,0 +1,524 @@ +package kamon.context.generated.binary.span; + + +// Code generated by colf(1); DO NOT EDIT. + + +import static java.lang.String.format; +import java.io.IOException; +import java.io.InputStream; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.io.ObjectStreamException; +import java.io.OutputStream; +import java.io.Serializable; +import java.util.InputMismatchException; +import java.nio.BufferOverflowException; +import java.nio.BufferUnderflowException; + + +/** + * Data bean with built-in serialization support. + + * @author generated by colf(1) + * @see <a href="https://github.com/pascaldekloe/colfer">Colfer's home</a> + */ +@javax.annotation.Generated(value="colf(1)", comments="Colfer from schema file Span.colf") +public class Span implements Serializable { + + /** The upper limit for serial byte sizes. */ + public static int colferSizeMax = 16 * 1024 * 1024; + + + + + public byte[] traceID; + + public byte[] spanID; + + public byte[] parentID; + + public byte samplingDecision; + + + /** Default constructor */ + public Span() { + init(); + } + + private static final byte[] _zeroBytes = new byte[0]; + + /** Colfer zero values. */ + private void init() { + traceID = _zeroBytes; + spanID = _zeroBytes; + parentID = _zeroBytes; + } + + /** + * {@link #reset(InputStream) Reusable} deserialization of Colfer streams. + */ + public static class Unmarshaller { + + /** The data source. */ + protected InputStream in; + + /** The read buffer. */ + public byte[] buf; + + /** The {@link #buf buffer}'s data start index, inclusive. */ + protected int offset; + + /** The {@link #buf buffer}'s data end index, exclusive. */ + protected int i; + + + /** + * @param in the data source or {@code null}. + * @param buf the initial buffer or {@code null}. + */ + public Unmarshaller(InputStream in, byte[] buf) { + // TODO: better size estimation + if (buf == null || buf.length == 0) + buf = new byte[Math.min(Span.colferSizeMax, 2048)]; + this.buf = buf; + reset(in); + } + + /** + * Reuses the marshaller. + * @param in the data source or {@code null}. + * @throws IllegalStateException on pending data. + */ + public void reset(InputStream in) { + if (this.i != this.offset) throw new IllegalStateException("colfer: pending data"); + this.in = in; + this.offset = 0; + this.i = 0; + } + + /** + * Deserializes the following object. + * @return the result or {@code null} when EOF. + * @throws IOException from the input stream. + * @throws SecurityException on an upper limit breach defined by {@link #colferSizeMax}. + * @throws InputMismatchException when the data does not match this object's schema. + */ + public Span next() throws IOException { + if (in == null) return null; + + while (true) { + if (this.i > this.offset) { + try { + Span o = new Span(); + this.offset = o.unmarshal(this.buf, this.offset, this.i); + return o; + } catch (BufferUnderflowException e) { + } + } + // not enough data + + if (this.i <= this.offset) { + this.offset = 0; + this.i = 0; + } else if (i == buf.length) { + byte[] src = this.buf; + // TODO: better size estimation + if (offset == 0) this.buf = new byte[Math.min(Span.colferSizeMax, this.buf.length * 4)]; + System.arraycopy(src, this.offset, this.buf, 0, this.i - this.offset); + this.i -= this.offset; + this.offset = 0; + } + assert this.i < this.buf.length; + + int n = in.read(buf, i, buf.length - i); + if (n < 0) { + if (this.i > this.offset) + throw new InputMismatchException("colfer: pending data with EOF"); + return null; + } + assert n > 0; + i += n; + } + } + + } + + + /** + * Serializes the object. + * @param out the data destination. + * @param buf the initial buffer or {@code null}. + * @return the final buffer. When the serial fits into {@code buf} then the return is {@code buf}. + * Otherwise the return is a new buffer, large enough to hold the whole serial. + * @throws IOException from {@code out}. + * @throws IllegalStateException on an upper limit breach defined by {@link #colferSizeMax}. + */ + public byte[] marshal(OutputStream out, byte[] buf) throws IOException { + // TODO: better size estimation + if (buf == null || buf.length == 0) + buf = new byte[Math.min(Span.colferSizeMax, 2048)]; + + while (true) { + int i; + try { + i = marshal(buf, 0); + } catch (BufferOverflowException e) { + buf = new byte[Math.min(Span.colferSizeMax, buf.length * 4)]; + continue; + } + + out.write(buf, 0, i); + return buf; + } + } + + /** + * Serializes the object. + * @param buf the data destination. + * @param offset the initial index for {@code buf}, inclusive. + * @return the final index for {@code buf}, exclusive. + * @throws BufferOverflowException when {@code buf} is too small. + * @throws IllegalStateException on an upper limit breach defined by {@link #colferSizeMax}. + */ + public int marshal(byte[] buf, int offset) { + int i = offset; + + try { + if (this.traceID.length != 0) { + buf[i++] = (byte) 0; + + int size = this.traceID.length; + if (size > Span.colferSizeMax) + throw new IllegalStateException(format("colfer: kamon/context/generated/binary/span.Span.traceID size %d exceeds %d bytes", size, Span.colferSizeMax)); + + int x = size; + while (x > 0x7f) { + buf[i++] = (byte) (x | 0x80); + x >>>= 7; + } + buf[i++] = (byte) x; + + int start = i; + i += size; + System.arraycopy(this.traceID, 0, buf, start, size); + } + + if (this.spanID.length != 0) { + buf[i++] = (byte) 1; + + int size = this.spanID.length; + if (size > Span.colferSizeMax) + throw new IllegalStateException(format("colfer: kamon/context/generated/binary/span.Span.spanID size %d exceeds %d bytes", size, Span.colferSizeMax)); + + int x = size; + while (x > 0x7f) { + buf[i++] = (byte) (x | 0x80); + x >>>= 7; + } + buf[i++] = (byte) x; + + int start = i; + i += size; + System.arraycopy(this.spanID, 0, buf, start, size); + } + + if (this.parentID.length != 0) { + buf[i++] = (byte) 2; + + int size = this.parentID.length; + if (size > Span.colferSizeMax) + throw new IllegalStateException(format("colfer: kamon/context/generated/binary/span.Span.parentID size %d exceeds %d bytes", size, Span.colferSizeMax)); + + int x = size; + while (x > 0x7f) { + buf[i++] = (byte) (x | 0x80); + x >>>= 7; + } + buf[i++] = (byte) x; + + int start = i; + i += size; + System.arraycopy(this.parentID, 0, buf, start, size); + } + + if (this.samplingDecision != 0) { + buf[i++] = (byte) 3; + buf[i++] = this.samplingDecision; + } + + buf[i++] = (byte) 0x7f; + return i; + } catch (ArrayIndexOutOfBoundsException e) { + if (i - offset > Span.colferSizeMax) + throw new IllegalStateException(format("colfer: kamon/context/generated/binary/span.Span exceeds %d bytes", Span.colferSizeMax)); + if (i > buf.length) throw new BufferOverflowException(); + throw e; + } + } + + /** + * Deserializes the object. + * @param buf the data source. + * @param offset the initial index for {@code buf}, inclusive. + * @return the final index for {@code buf}, exclusive. + * @throws BufferUnderflowException when {@code buf} is incomplete. (EOF) + * @throws SecurityException on an upper limit breach defined by {@link #colferSizeMax}. + * @throws InputMismatchException when the data does not match this object's schema. + */ + public int unmarshal(byte[] buf, int offset) { + return unmarshal(buf, offset, buf.length); + } + + /** + * Deserializes the object. + * @param buf the data source. + * @param offset the initial index for {@code buf}, inclusive. + * @param end the index limit for {@code buf}, exclusive. + * @return the final index for {@code buf}, exclusive. + * @throws BufferUnderflowException when {@code buf} is incomplete. (EOF) + * @throws SecurityException on an upper limit breach defined by {@link #colferSizeMax}. + * @throws InputMismatchException when the data does not match this object's schema. + */ + public int unmarshal(byte[] buf, int offset, int end) { + if (end > buf.length) end = buf.length; + int i = offset; + + try { + byte header = buf[i++]; + + if (header == (byte) 0) { + int size = 0; + for (int shift = 0; true; shift += 7) { + byte b = buf[i++]; + size |= (b & 0x7f) << shift; + if (shift == 28 || b >= 0) break; + } + if (size < 0 || size > Span.colferSizeMax) + throw new SecurityException(format("colfer: kamon/context/generated/binary/span.Span.traceID size %d exceeds %d bytes", size, Span.colferSizeMax)); + + this.traceID = new byte[size]; + int start = i; + i += size; + System.arraycopy(buf, start, this.traceID, 0, size); + + header = buf[i++]; + } + + if (header == (byte) 1) { + int size = 0; + for (int shift = 0; true; shift += 7) { + byte b = buf[i++]; + size |= (b & 0x7f) << shift; + if (shift == 28 || b >= 0) break; + } + if (size < 0 || size > Span.colferSizeMax) + throw new SecurityException(format("colfer: kamon/context/generated/binary/span.Span.spanID size %d exceeds %d bytes", size, Span.colferSizeMax)); + + this.spanID = new byte[size]; + int start = i; + i += size; + System.arraycopy(buf, start, this.spanID, 0, size); + + header = buf[i++]; + } + + if (header == (byte) 2) { + int size = 0; + for (int shift = 0; true; shift += 7) { + byte b = buf[i++]; + size |= (b & 0x7f) << shift; + if (shift == 28 || b >= 0) break; + } + if (size < 0 || size > Span.colferSizeMax) + throw new SecurityException(format("colfer: kamon/context/generated/binary/span.Span.parentID size %d exceeds %d bytes", size, Span.colferSizeMax)); + + this.parentID = new byte[size]; + int start = i; + i += size; + System.arraycopy(buf, start, this.parentID, 0, size); + + header = buf[i++]; + } + + if (header == (byte) 3) { + this.samplingDecision = buf[i++]; + header = buf[i++]; + } + + if (header != (byte) 0x7f) + throw new InputMismatchException(format("colfer: unknown header at byte %d", i - 1)); + } finally { + if (i > end && end - offset < Span.colferSizeMax) throw new BufferUnderflowException(); + if (i < 0 || i - offset > Span.colferSizeMax) + throw new SecurityException(format("colfer: kamon/context/generated/binary/span.Span exceeds %d bytes", Span.colferSizeMax)); + if (i > end) throw new BufferUnderflowException(); + } + + return i; + } + + // {@link Serializable} version number. + private static final long serialVersionUID = 4L; + + // {@link Serializable} Colfer extension. + private void writeObject(ObjectOutputStream out) throws IOException { + // TODO: better size estimation + byte[] buf = new byte[1024]; + int n; + while (true) try { + n = marshal(buf, 0); + break; + } catch (BufferUnderflowException e) { + buf = new byte[4 * buf.length]; + } + + out.writeInt(n); + out.write(buf, 0, n); + } + + // {@link Serializable} Colfer extension. + private void readObject(ObjectInputStream in) throws ClassNotFoundException, IOException { + init(); + + int n = in.readInt(); + byte[] buf = new byte[n]; + in.readFully(buf); + unmarshal(buf, 0); + } + + // {@link Serializable} Colfer extension. + private void readObjectNoData() throws ObjectStreamException { + init(); + } + + /** + * Gets kamon/context/generated/binary/span.Span.traceID. + * @return the value. + */ + public byte[] getTraceID() { + return this.traceID; + } + + /** + * Sets kamon/context/generated/binary/span.Span.traceID. + * @param value the replacement. + */ + public void setTraceID(byte[] value) { + this.traceID = value; + } + + /** + * Sets kamon/context/generated/binary/span.Span.traceID. + * @param value the replacement. + * @return {link this}. + */ + public Span withTraceID(byte[] value) { + this.traceID = value; + return this; + } + + /** + * Gets kamon/context/generated/binary/span.Span.spanID. + * @return the value. + */ + public byte[] getSpanID() { + return this.spanID; + } + + /** + * Sets kamon/context/generated/binary/span.Span.spanID. + * @param value the replacement. + */ + public void setSpanID(byte[] value) { + this.spanID = value; + } + + /** + * Sets kamon/context/generated/binary/span.Span.spanID. + * @param value the replacement. + * @return {link this}. + */ + public Span withSpanID(byte[] value) { + this.spanID = value; + return this; + } + + /** + * Gets kamon/context/generated/binary/span.Span.parentID. + * @return the value. + */ + public byte[] getParentID() { + return this.parentID; + } + + /** + * Sets kamon/context/generated/binary/span.Span.parentID. + * @param value the replacement. + */ + public void setParentID(byte[] value) { + this.parentID = value; + } + + /** + * Sets kamon/context/generated/binary/span.Span.parentID. + * @param value the replacement. + * @return {link this}. + */ + public Span withParentID(byte[] value) { + this.parentID = value; + return this; + } + + /** + * Gets kamon/context/generated/binary/span.Span.samplingDecision. + * @return the value. + */ + public byte getSamplingDecision() { + return this.samplingDecision; + } + + /** + * Sets kamon/context/generated/binary/span.Span.samplingDecision. + * @param value the replacement. + */ + public void setSamplingDecision(byte value) { + this.samplingDecision = value; + } + + /** + * Sets kamon/context/generated/binary/span.Span.samplingDecision. + * @param value the replacement. + * @return {link this}. + */ + public Span withSamplingDecision(byte value) { + this.samplingDecision = value; + return this; + } + + @Override + public final int hashCode() { + int h = 1; + for (byte b : this.traceID) h = 31 * h + b; + for (byte b : this.spanID) h = 31 * h + b; + for (byte b : this.parentID) h = 31 * h + b; + h = 31 * h + (this.samplingDecision & 0xff); + return h; + } + + @Override + public final boolean equals(Object o) { + return o instanceof Span && equals((Span) o); + } + + public final boolean equals(Span o) { + if (o == null) return false; + if (o == this) return true; + return o.getClass() == Span.class + && java.util.Arrays.equals(this.traceID, o.traceID) + && java.util.Arrays.equals(this.spanID, o.spanID) + && java.util.Arrays.equals(this.parentID, o.parentID) + && this.samplingDecision == o.samplingDecision; + } + +} diff --git a/kamon-core/src/main/resources/reference.conf b/kamon-core/src/main/resources/reference.conf index 5b885fd0..e0002194 100644 --- a/kamon-core/src/main/resources/reference.conf +++ b/kamon-core/src/main/resources/reference.conf @@ -123,39 +123,27 @@ kamon { # with the name of the operation on the parent Span, if any. scope-spans-to-parent = yes } - - # The SpanContextCodecs are used to encode/decode the SpanContext data into simple TextMaps, HTTP Headers or Binary - # carriers. The decision about which one to use is based on the kamon.trace.SpanContextCodec.Format instance passed - # to inject/extract calls. - # - # Any external implementation can be configured here, as long as it can be instantiated with a single parameter - # constructor that accepts a IdentityProvider. - span-context-codec { - - # Encodes/Decodes the SpanContext data using a simple key/value pair. Since this is very rarely going to be used - # we default to using the same codec for HTTP Headers, as it is built on top of a TextMap. - text-map = "kamon.trace.SpanContextCodec$ExtendedB3" - - # Encodes/Decodes the SpanContext into a TextMap with HTTP Header friendly, URLEncoded values. The default - # implementation follows the guidelines of B3 propagation. See more at https://github.com/openzipkin/b3-propagation. - http-headers = "kamon.trace.SpanContextCodec$ExtendedB3" - - # Encodes/Decodes the SpanContext using a binary representation. - binary = "TODO" - } - - } context { - encoding { - http-headers { + # Codecs are used to encode/decode Context keys when a Context must be propagated either through HTTP headers or + # Binary transports. Only broadcast keys configured bellow will be processed by the context Codec. The FQCN of + # the appropriate Codecs for each key must be provided, otherwise keys will be ignored. + # + codecs { + + # Size of the encoding buffer for the Binary Codec. + binary-buffer-size = 256 + + # Codecs to be used when propagating a Context through a HTTP Headers transport. + http-headers-keys { span = "kamon.trace.SpanCodec$B3" } - binary { - # span = "kamon.trace.propagation.Binary" + # Codecs to be used when propagating a Context through a Binary transport. + binary-keys { + span = "kamon.trace.SpanCodec$Colfer" } } } diff --git a/kamon-core/src/main/scala/kamon/Kamon.scala b/kamon-core/src/main/scala/kamon/Kamon.scala index 2c0561e2..f251b1ec 100644 --- a/kamon-core/src/main/scala/kamon/Kamon.scala +++ b/kamon-core/src/main/scala/kamon/Kamon.scala @@ -24,7 +24,7 @@ import scala.concurrent.Future import java.time.Duration import java.util.concurrent.{Executors, ScheduledExecutorService, ScheduledThreadPoolExecutor} -import kamon.context.{Codec, Context, Storage} +import kamon.context.{Codecs, Context, Storage} import org.slf4j.LoggerFactory import scala.util.Try @@ -39,10 +39,10 @@ object Kamon extends MetricLookup with ReporterRegistry with Tracer { private val _scheduler = Executors.newScheduledThreadPool(schedulerPoolSize(_config), numberedThreadFactory("kamon-scheduler")) private val _metrics = new MetricRegistry(_config, _scheduler) - private val _reporters = new ReporterRegistryImpl(_metrics, _config) - private val _tracer = Tracer.Default(Kamon, _reporters, _config) + private val _reporterRegistry = new ReporterRegistry.Default(_metrics, _config) + private val _tracer = Tracer.Default(Kamon, _reporterRegistry, _config) private val _contextStorage = Storage.ThreadLocal() - private val _contextCodec = new Codec(_config) + private val _contextCodec = new Codecs(_config) private var _onReconfigureHooks = Seq.empty[OnReconfigureHook] def environment: Environment = @@ -56,7 +56,7 @@ object Kamon extends MetricLookup with ReporterRegistry with Tracer { _environment = Environment.fromConfig(config) _filters = Filters.fromConfig(config) _metrics.reconfigure(config) - _reporters.reconfigure(config) + _reporterRegistry.reconfigure(config) _tracer.reconfigure(config) _contextCodec.reconfigure(config) @@ -100,7 +100,7 @@ object Kamon extends MetricLookup with ReporterRegistry with Tracer { override def identityProvider: IdentityProvider = _tracer.identityProvider - def contextCodec(): Codec = + def contextCodec(): Codecs = _contextCodec def currentContext(): Context = @@ -120,22 +120,22 @@ object Kamon extends MetricLookup with ReporterRegistry with Tracer { override def loadReportersFromConfig(): Unit = - _reporters.loadReportersFromConfig() + _reporterRegistry.loadReportersFromConfig() override def addReporter(reporter: MetricReporter): Registration = - _reporters.addReporter(reporter) + _reporterRegistry.addReporter(reporter) override def addReporter(reporter: MetricReporter, name: String): Registration = - _reporters.addReporter(reporter, name) + _reporterRegistry.addReporter(reporter, name) override def addReporter(reporter: SpanReporter): Registration = - _reporters.addReporter(reporter) + _reporterRegistry.addReporter(reporter) override def addReporter(reporter: SpanReporter, name: String): Registration = - _reporters.addReporter(reporter, name) + _reporterRegistry.addReporter(reporter, name) override def stopAllReporters(): Future[Unit] = - _reporters.stopAllReporters() + _reporterRegistry.stopAllReporters() def filter(filterName: String, pattern: String): Boolean = _filters.accept(filterName, pattern) diff --git a/kamon-core/src/main/scala/kamon/ReporterRegistry.scala b/kamon-core/src/main/scala/kamon/ReporterRegistry.scala index f0d744e5..ff135f60 100644 --- a/kamon-core/src/main/scala/kamon/ReporterRegistry.scala +++ b/kamon-core/src/main/scala/kamon/ReporterRegistry.scala @@ -33,6 +33,20 @@ import scala.util.control.NonFatal import scala.collection.JavaConverters._ import scala.collection.concurrent.TrieMap +sealed trait Reporter { + def start(): Unit + def stop(): Unit + def reconfigure(config: Config): Unit +} + +trait MetricReporter extends Reporter { + def reportTickSnapshot(snapshot: TickSnapshot): Unit +} + +trait SpanReporter extends Reporter { + def reportSpans(spans: Seq[Span.FinishedSpan]): Unit +} + trait ReporterRegistry { def loadReportersFromConfig(): Unit @@ -48,274 +62,261 @@ object ReporterRegistry { private[kamon] trait SpanSink { def reportSpan(finishedSpan: FinishedSpan): Unit } -} - -sealed trait Reporter { - def start(): Unit - def stop(): Unit - def reconfigure(config: Config): Unit -} -trait MetricReporter extends Reporter { - def reportTickSnapshot(snapshot: TickSnapshot): Unit -} - -trait SpanReporter extends Reporter { - def reportSpans(spans: Seq[Span.FinishedSpan]): Unit -} - -class ReporterRegistryImpl(metrics: MetricsSnapshotGenerator, initialConfig: Config) extends ReporterRegistry with SpanSink { - private val logger = LoggerFactory.getLogger(classOf[ReporterRegistry]) - private val registryExecutionContext = Executors.newScheduledThreadPool(2, threadFactory("kamon-reporter-registry")) - private val reporterCounter = new AtomicLong(0L) - private var registryConfiguration = readRegistryConfiguration(initialConfig) - - private val metricReporters = TrieMap[Long, MetricReporterEntry]() - private val metricReporterTickerSchedule = new AtomicReference[ScheduledFuture[_]]() - private val spanReporters = TrieMap[Long, SpanReporterEntry]() - private val spanReporterTickerSchedule = new AtomicReference[ScheduledFuture[_]]() - - - reconfigure(initialConfig) - - override def loadReportersFromConfig(): Unit = { - if(registryConfiguration.configuredReporters.isEmpty) - logger.info("The kamon.reporters setting is empty, no reporters have been started.") - else { - registryConfiguration.configuredReporters.foreach { reporterFQCN => - val dynamicAccess = new DynamicAccess(getClass.getClassLoader) - dynamicAccess.createInstanceFor[Reporter](reporterFQCN, Nil).map({ - case mr: MetricReporter => - addMetricReporter(mr, "loaded-from-config: " + reporterFQCN) - logger.info("Loaded metric reporter [{}]", reporterFQCN) - - case sr: SpanReporter => - addSpanReporter(sr, "loaded-from-config: " + reporterFQCN) - logger.info("Loaded span reporter [{}]", reporterFQCN) - - }).failed.foreach { - t => logger.error(s"Failed to load configured reporter [$reporterFQCN]", t) + private[kamon] class Default(metrics: MetricsSnapshotGenerator, initialConfig: Config) extends ReporterRegistry with SpanSink { + private val logger = LoggerFactory.getLogger(classOf[ReporterRegistry]) + private val registryExecutionContext = Executors.newScheduledThreadPool(2, threadFactory("kamon-reporter-registry")) + private val reporterCounter = new AtomicLong(0L) + private var registryConfiguration = readRegistryConfiguration(initialConfig) + + private val metricReporters = TrieMap[Long, MetricReporterEntry]() + private val metricReporterTickerSchedule = new AtomicReference[ScheduledFuture[_]]() + private val spanReporters = TrieMap[Long, SpanReporterEntry]() + private val spanReporterTickerSchedule = new AtomicReference[ScheduledFuture[_]]() + + + reconfigure(initialConfig) + + override def loadReportersFromConfig(): Unit = { + if(registryConfiguration.configuredReporters.isEmpty) + logger.info("The kamon.reporters setting is empty, no reporters have been started.") + else { + registryConfiguration.configuredReporters.foreach { reporterFQCN => + val dynamicAccess = new DynamicAccess(getClass.getClassLoader) + dynamicAccess.createInstanceFor[Reporter](reporterFQCN, Nil).map({ + case mr: MetricReporter => + addMetricReporter(mr, "loaded-from-config: " + reporterFQCN) + logger.info("Loaded metric reporter [{}]", reporterFQCN) + + case sr: SpanReporter => + addSpanReporter(sr, "loaded-from-config: " + reporterFQCN) + logger.info("Loaded span reporter [{}]", reporterFQCN) + + }).failed.foreach { + t => logger.error(s"Failed to load configured reporter [$reporterFQCN]", t) + } } } } - } - override def addReporter(reporter: MetricReporter): Registration = - addMetricReporter(reporter, reporter.getClass.getName()) + override def addReporter(reporter: MetricReporter): Registration = + addMetricReporter(reporter, reporter.getClass.getName()) - override def addReporter(reporter: MetricReporter, name: String): Registration = - addMetricReporter(reporter, name) + override def addReporter(reporter: MetricReporter, name: String): Registration = + addMetricReporter(reporter, name) - override def addReporter(reporter: SpanReporter): Registration = - addSpanReporter(reporter, reporter.getClass.getName()) + override def addReporter(reporter: SpanReporter): Registration = + addSpanReporter(reporter, reporter.getClass.getName()) - override def addReporter(reporter: SpanReporter, name: String): Registration = - addSpanReporter(reporter, name) + override def addReporter(reporter: SpanReporter, name: String): Registration = + addSpanReporter(reporter, name) - private def addMetricReporter(reporter: MetricReporter, name: String): Registration = synchronized { - val executor = Executors.newSingleThreadExecutor(threadFactory(name)) - val reporterEntry = new MetricReporterEntry( - id = reporterCounter.getAndIncrement(), - name = name, - reporter = reporter, - executionContext = ExecutionContext.fromExecutorService(executor) - ) + private def addMetricReporter(reporter: MetricReporter, name: String): Registration = synchronized { + val executor = Executors.newSingleThreadExecutor(threadFactory(name)) + val reporterEntry = new MetricReporterEntry( + id = reporterCounter.getAndIncrement(), + name = name, + reporter = reporter, + executionContext = ExecutionContext.fromExecutorService(executor) + ) - Future(reporterEntry.reporter.start())(reporterEntry.executionContext) + Future(reporterEntry.reporter.start())(reporterEntry.executionContext) - if(metricReporters.isEmpty) - reStartMetricTicker() + if(metricReporters.isEmpty) + reStartMetricTicker() - metricReporters.put(reporterEntry.id, reporterEntry) - createRegistration(reporterEntry.id, metricReporters) + metricReporters.put(reporterEntry.id, reporterEntry) + createRegistration(reporterEntry.id, metricReporters) - } + } - private def addSpanReporter(reporter: SpanReporter, name: String): Registration = synchronized { - val executor = Executors.newSingleThreadExecutor(threadFactory(name)) - val reporterEntry = new SpanReporterEntry( - id = reporterCounter.incrementAndGet(), - name = name, - reporter = reporter, - bufferCapacity = registryConfiguration.traceReporterQueueSize, - executionContext = ExecutionContext.fromExecutorService(executor) - ) + private def addSpanReporter(reporter: SpanReporter, name: String): Registration = synchronized { + val executor = Executors.newSingleThreadExecutor(threadFactory(name)) + val reporterEntry = new SpanReporterEntry( + id = reporterCounter.incrementAndGet(), + name = name, + reporter = reporter, + bufferCapacity = registryConfiguration.traceReporterQueueSize, + executionContext = ExecutionContext.fromExecutorService(executor) + ) - Future(reporterEntry.reporter.start())(reporterEntry.executionContext) + Future(reporterEntry.reporter.start())(reporterEntry.executionContext) - if(spanReporters.isEmpty) - reStartTraceTicker() + if(spanReporters.isEmpty) + reStartTraceTicker() - spanReporters.put(reporterEntry.id, reporterEntry) - createRegistration(reporterEntry.id, spanReporters) - } + spanReporters.put(reporterEntry.id, reporterEntry) + createRegistration(reporterEntry.id, spanReporters) + } - private def createRegistration(id: Long, target: TrieMap[Long, _]): Registration = new Registration { - override def cancel(): Boolean = - target.remove(id).nonEmpty - } + private def createRegistration(id: Long, target: TrieMap[Long, _]): Registration = new Registration { + override def cancel(): Boolean = + target.remove(id).nonEmpty + } - override def stopAllReporters(): Future[Unit] = { - implicit val stopReporterExeContext = ExecutionContext.fromExecutor(registryExecutionContext) - val reporterStopFutures = Vector.newBuilder[Future[Unit]] + override def stopAllReporters(): Future[Unit] = { + implicit val stopReporterExeContext = ExecutionContext.fromExecutor(registryExecutionContext) + val reporterStopFutures = Vector.newBuilder[Future[Unit]] - while(metricReporters.nonEmpty) { - val (idToRemove, _) = metricReporters.head - metricReporters.remove(idToRemove).foreach { entry => - reporterStopFutures += stopMetricReporter(entry) + while(metricReporters.nonEmpty) { + val (idToRemove, _) = metricReporters.head + metricReporters.remove(idToRemove).foreach { entry => + reporterStopFutures += stopMetricReporter(entry) + } } - } - while(spanReporters.nonEmpty) { - val (idToRemove, _) = spanReporters.head - spanReporters.remove(idToRemove).foreach { entry => - reporterStopFutures += stopSpanReporter(entry) + while(spanReporters.nonEmpty) { + val (idToRemove, _) = spanReporters.head + spanReporters.remove(idToRemove).foreach { entry => + reporterStopFutures += stopSpanReporter(entry) + } } - } - Future.sequence(reporterStopFutures.result()).map(_ => Try((): Unit)) - } + Future.sequence(reporterStopFutures.result()).map(_ => Try((): Unit)) + } - private[kamon] def reconfigure(config: Config): Unit = synchronized { - val newConfig = readRegistryConfiguration(config) + private[kamon] def reconfigure(config: Config): Unit = synchronized { + val newConfig = readRegistryConfiguration(config) - if(newConfig.metricTickInterval != registryConfiguration.metricTickInterval && metricReporters.nonEmpty) - reStartMetricTicker() + if(newConfig.metricTickInterval != registryConfiguration.metricTickInterval && metricReporters.nonEmpty) + reStartMetricTicker() - if(newConfig.traceTickInterval != registryConfiguration.metricTickInterval && spanReporters.nonEmpty) - reStartTraceTicker() + if(newConfig.traceTickInterval != registryConfiguration.metricTickInterval && spanReporters.nonEmpty) + reStartTraceTicker() - // Reconfigure all registered reporters - metricReporters.foreach { case (_, entry) => Future(entry.reporter.reconfigure(config))(entry.executionContext) } - spanReporters.foreach { case (_, entry) => Future(entry.reporter.reconfigure(config))(entry.executionContext) } - registryConfiguration = newConfig - } + // Reconfigure all registered reporters + metricReporters.foreach { case (_, entry) => Future(entry.reporter.reconfigure(config))(entry.executionContext) } + spanReporters.foreach { case (_, entry) => Future(entry.reporter.reconfigure(config))(entry.executionContext) } + registryConfiguration = newConfig + } - private def reStartMetricTicker(): Unit = { - val tickIntervalMillis = registryConfiguration.metricTickInterval.toMillis - val currentMetricTicker = metricReporterTickerSchedule.get() + private def reStartMetricTicker(): Unit = { + val tickIntervalMillis = registryConfiguration.metricTickInterval.toMillis + val currentMetricTicker = metricReporterTickerSchedule.get() - if(currentMetricTicker != null) - currentMetricTicker.cancel(false) + if(currentMetricTicker != null) + currentMetricTicker.cancel(false) - metricReporterTickerSchedule.set { - registryExecutionContext.scheduleAtFixedRate( - new MetricReporterTicker(metrics, metricReporters), tickIntervalMillis, tickIntervalMillis, TimeUnit.MILLISECONDS - ) + metricReporterTickerSchedule.set { + registryExecutionContext.scheduleAtFixedRate( + new MetricReporterTicker(metrics, metricReporters), tickIntervalMillis, tickIntervalMillis, TimeUnit.MILLISECONDS + ) + } } - } - private def reStartTraceTicker(): Unit = { - val tickIntervalMillis = registryConfiguration.traceTickInterval.toMillis - val currentSpanTicker = spanReporterTickerSchedule.get() - if(currentSpanTicker != null) - currentSpanTicker.cancel(false) + private def reStartTraceTicker(): Unit = { + val tickIntervalMillis = registryConfiguration.traceTickInterval.toMillis + val currentSpanTicker = spanReporterTickerSchedule.get() + if(currentSpanTicker != null) + currentSpanTicker.cancel(false) - spanReporterTickerSchedule.set { - registryExecutionContext.scheduleAtFixedRate( - new SpanReporterTicker(spanReporters), tickIntervalMillis, tickIntervalMillis, TimeUnit.MILLISECONDS - ) + spanReporterTickerSchedule.set { + registryExecutionContext.scheduleAtFixedRate( + new SpanReporterTicker(spanReporters), tickIntervalMillis, tickIntervalMillis, TimeUnit.MILLISECONDS + ) + } } - } - def reportSpan(span: Span.FinishedSpan): Unit = { - spanReporters.foreach { case (_, reporterEntry) => - if(reporterEntry.isActive) - reporterEntry.buffer.offer(span) + def reportSpan(span: Span.FinishedSpan): Unit = { + spanReporters.foreach { case (_, reporterEntry) => + if(reporterEntry.isActive) + reporterEntry.buffer.offer(span) + } } - } - private def stopMetricReporter(entry: MetricReporterEntry): Future[Unit] = { - entry.isActive = false + private def stopMetricReporter(entry: MetricReporterEntry): Future[Unit] = { + entry.isActive = false - Future(entry.reporter.stop())(entry.executionContext).andThen { - case _ => entry.executionContext.shutdown() - }(ExecutionContext.fromExecutor(registryExecutionContext)) - } + Future(entry.reporter.stop())(entry.executionContext).andThen { + case _ => entry.executionContext.shutdown() + }(ExecutionContext.fromExecutor(registryExecutionContext)) + } - private def stopSpanReporter(entry: SpanReporterEntry): Future[Unit] = { - entry.isActive = false + private def stopSpanReporter(entry: SpanReporterEntry): Future[Unit] = { + entry.isActive = false - Future(entry.reporter.stop())(entry.executionContext).andThen { - case _ => entry.executionContext.shutdown() - }(ExecutionContext.fromExecutor(registryExecutionContext)) - } + Future(entry.reporter.stop())(entry.executionContext).andThen { + case _ => entry.executionContext.shutdown() + }(ExecutionContext.fromExecutor(registryExecutionContext)) + } - private class MetricReporterEntry( - @volatile var isActive: Boolean = true, - val id: Long, - val name: String, - val reporter: MetricReporter, - val executionContext: ExecutionContextExecutorService - ) - - private class SpanReporterEntry( - @volatile var isActive: Boolean = true, - val id: Long, - val name: String, - val reporter: SpanReporter, - val bufferCapacity: Int, - val executionContext: ExecutionContextExecutorService - ) { - val buffer = new ArrayBlockingQueue[Span.FinishedSpan](bufferCapacity) - } + private class MetricReporterEntry( + @volatile var isActive: Boolean = true, + val id: Long, + val name: String, + val reporter: MetricReporter, + val executionContext: ExecutionContextExecutorService + ) - private class MetricReporterTicker(snapshotGenerator: MetricsSnapshotGenerator, reporterEntries: TrieMap[Long, MetricReporterEntry]) extends Runnable { - val logger = LoggerFactory.getLogger(classOf[MetricReporterTicker]) - var lastTick = System.currentTimeMillis() + private class SpanReporterEntry( + @volatile var isActive: Boolean = true, + val id: Long, + val name: String, + val reporter: SpanReporter, + val bufferCapacity: Int, + val executionContext: ExecutionContextExecutorService + ) { + val buffer = new ArrayBlockingQueue[Span.FinishedSpan](bufferCapacity) + } - def run(): Unit = try { - val currentTick = System.currentTimeMillis() - val tickSnapshot = TickSnapshot( - interval = Interval(lastTick, currentTick), - metrics = snapshotGenerator.snapshot() - ) + private class MetricReporterTicker(snapshotGenerator: MetricsSnapshotGenerator, reporterEntries: TrieMap[Long, MetricReporterEntry]) extends Runnable { + val logger = LoggerFactory.getLogger(classOf[MetricReporterTicker]) + var lastTick = System.currentTimeMillis() - reporterEntries.foreach { case (_, entry) => - Future { - Try { - if (entry.isActive) - entry.reporter.reportTickSnapshot(tickSnapshot) + def run(): Unit = try { + val currentTick = System.currentTimeMillis() + val tickSnapshot = TickSnapshot( + interval = Interval(lastTick, currentTick), + metrics = snapshotGenerator.snapshot() + ) - }.failed.foreach { error => - logger.error(s"Reporter [${entry.name}] failed to process a metrics tick.", error) - } + reporterEntries.foreach { case (_, entry) => + Future { + Try { + if (entry.isActive) + entry.reporter.reportTickSnapshot(tickSnapshot) - }(entry.executionContext) - } + }.failed.foreach { error => + logger.error(s"Reporter [${entry.name}] failed to process a metrics tick.", error) + } - lastTick = currentTick + }(entry.executionContext) + } - } catch { - case NonFatal(t) => logger.error("Error while running a tick", t) + lastTick = currentTick + + } catch { + case NonFatal(t) => logger.error("Error while running a tick", t) + } } - } - private class SpanReporterTicker(spanReporters: TrieMap[Long, SpanReporterEntry]) extends Runnable { - override def run(): Unit = { - spanReporters.foreach { - case (_, entry) => + private class SpanReporterTicker(spanReporters: TrieMap[Long, SpanReporterEntry]) extends Runnable { + override def run(): Unit = { + spanReporters.foreach { + case (_, entry) => - val spanBatch = new java.util.ArrayList[Span.FinishedSpan](entry.bufferCapacity) - entry.buffer.drainTo(spanBatch, entry.bufferCapacity) + val spanBatch = new java.util.ArrayList[Span.FinishedSpan](entry.bufferCapacity) + entry.buffer.drainTo(spanBatch, entry.bufferCapacity) - Future { - entry.reporter.reportSpans(spanBatch.asScala) - }(entry.executionContext) + Future { + entry.reporter.reportSpans(spanBatch.asScala) + }(entry.executionContext) + } } } - } - private def readRegistryConfiguration(config: Config): Configuration = - Configuration( - metricTickInterval = config.getDuration("kamon.metric.tick-interval"), - traceTickInterval = config.getDuration("kamon.trace.tick-interval"), - traceReporterQueueSize = config.getInt("kamon.trace.reporter-queue-size"), - configuredReporters = config.getStringList("kamon.reporters").asScala - ) + private def readRegistryConfiguration(config: Config): Configuration = + Configuration( + metricTickInterval = config.getDuration("kamon.metric.tick-interval"), + traceTickInterval = config.getDuration("kamon.trace.tick-interval"), + traceReporterQueueSize = config.getInt("kamon.trace.reporter-queue-size"), + configuredReporters = config.getStringList("kamon.reporters").asScala + ) + + private case class Configuration(metricTickInterval: Duration, traceTickInterval: Duration, + traceReporterQueueSize: Int, configuredReporters: Seq[String]) + } +} - private case class Configuration(metricTickInterval: Duration, traceTickInterval: Duration, - traceReporterQueueSize: Int, configuredReporters: Seq[String]) -}
\ No newline at end of file diff --git a/kamon-core/src/main/scala/kamon/context/Codec.scala b/kamon-core/src/main/scala/kamon/context/Codec.scala deleted file mode 100644 index 19ec2a20..00000000 --- a/kamon-core/src/main/scala/kamon/context/Codec.scala +++ /dev/null @@ -1,144 +0,0 @@ -/* ========================================================================================= - * Copyright © 2013-2017 the kamon project <http://kamon.io/> - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file - * except in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the - * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, - * either express or implied. See the License for the specific language governing permissions - * and limitations under the License. - * ========================================================================================= - */ - -package kamon -package context - -import com.typesafe.config.Config -import kamon.util.DynamicAccess -import org.slf4j.LoggerFactory - -import scala.collection.mutable - -class Codec(initialConfig: Config) { - private val log = LoggerFactory.getLogger(classOf[Codec]) - - @volatile private var httpHeaders: Codec.ForContext[TextMap] = new Codec.HttpHeaders(Map.empty) - //val Binary: Codec.ForContext[ByteBuffer] = _ - reconfigure(initialConfig) - - - def HttpHeaders: Codec.ForContext[TextMap] = - httpHeaders - - def reconfigure(config: Config): Unit = { - httpHeaders = new Codec.HttpHeaders(readEntryCodecs("kamon.context.encoding.http-headers", config)) - } - - private def readEntryCodecs[T](rootKey: String, config: Config): Map[String, Codec.ForEntry[T]] = { - val rootConfig = config.getConfig(rootKey) - val dynamic = new DynamicAccess(getClass.getClassLoader) - val entries = Map.newBuilder[String, Codec.ForEntry[T]] - - rootConfig.topLevelKeys.foreach(key => { - try { - val fqcn = rootConfig.getString(key) - entries += ((key, dynamic.createInstanceFor[Codec.ForEntry[T]](fqcn, Nil).get)) - } catch { - case e: Throwable => - log.error(s"Failed to initialize codec for key [$key]", e) - } - }) - - entries.result() - } -} - -object Codec { - - trait ForContext[T] { - def encode(context: Context): T - def decode(carrier: T): Context - } - - trait ForEntry[T] { - def encode(context: Context): T - def decode(carrier: T, context: Context): Context - } - - final class HttpHeaders(entryCodecs: Map[String, Codec.ForEntry[TextMap]]) extends Codec.ForContext[TextMap] { - private val log = LoggerFactory.getLogger(classOf[HttpHeaders]) - - override def encode(context: Context): TextMap = { - val encoded = TextMap.Default() - - context.entries.foreach { - case (key, _) if key.broadcast => - entryCodecs.get(key.name) match { - case Some(codec) => - try { - codec.encode(context).values.foreach(pair => encoded.put(pair._1, pair._2)) - } catch { - case e: Throwable => log.error(s"Failed to encode key [${key.name}]", e) - } - - case None => - log.error("Context key [{}] should be encoded in HttpHeaders but no codec was found for it.", key.name) - } - } - - encoded - } - - override def decode(carrier: TextMap): Context = { - var context: Context = Context.Empty - - try { - context = entryCodecs.foldLeft(context)((ctx, codecEntry) => { - val (_, codec) = codecEntry - codec.decode(carrier, ctx) - }) - - } catch { - case e: Throwable => - log.error("Failed to decode context from HttpHeaders", e) - } - - context - } - } -} - - -trait TextMap { - - def get(key: String): Option[String] - - def put(key: String, value: String): Unit - - def values: Iterator[(String, String)] -} - -object TextMap { - - class Default extends TextMap { - private val storage = - mutable.Map.empty[String, String] - - override def get(key: String): Option[String] = - storage.get(key) - - override def put(key: String, value: String): Unit = - storage.put(key, value) - - override def values: Iterator[(String, String)] = - storage.toIterator - } - - object Default { - def apply(): Default = - new Default() - } -}
\ No newline at end of file diff --git a/kamon-core/src/main/scala/kamon/context/Codecs.scala b/kamon-core/src/main/scala/kamon/context/Codecs.scala new file mode 100644 index 00000000..b50e991d --- /dev/null +++ b/kamon-core/src/main/scala/kamon/context/Codecs.scala @@ -0,0 +1,245 @@ +/* ========================================================================================= + * Copyright © 2013-2017 the kamon project <http://kamon.io/> + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + +package kamon +package context + +import java.nio.ByteBuffer + +import com.typesafe.config.Config +import kamon.util.DynamicAccess +import org.slf4j.LoggerFactory +import kamon.context.generated.binary.context.{Context => ColferContext, Entry => ColferEntry} + +import scala.collection.mutable + +class Codecs(initialConfig: Config) { + private val log = LoggerFactory.getLogger(classOf[Codecs]) + @volatile private var httpHeaders: Codecs.ForContext[TextMap] = new Codecs.HttpHeaders(Map.empty) + @volatile private var binary: Codecs.ForContext[ByteBuffer] = new Codecs.Binary(256, Map.empty) + + reconfigure(initialConfig) + + + def HttpHeaders: Codecs.ForContext[TextMap] = + httpHeaders + + def Binary: Codecs.ForContext[ByteBuffer] = + binary + + def reconfigure(config: Config): Unit = { + try { + val codecsConfig = config.getConfig("kamon.context.codecs") + httpHeaders = new Codecs.HttpHeaders(readEntryCodecs("http-headers-keys", codecsConfig)) + binary = new Codecs.Binary(codecsConfig.getBytes("binary-buffer-size"), readEntryCodecs("binary-keys", codecsConfig)) + } catch { + case t: Throwable => log.error("Failed to initialize Context Codecs", t) + } + } + + private def readEntryCodecs[T](rootKey: String, config: Config): Map[String, Codecs.ForEntry[T]] = { + val rootConfig = config.getConfig(rootKey) + val dynamic = new DynamicAccess(getClass.getClassLoader) + val entries = Map.newBuilder[String, Codecs.ForEntry[T]] + + rootConfig.topLevelKeys.foreach(key => { + try { + val fqcn = rootConfig.getString(key) + entries += ((key, dynamic.createInstanceFor[Codecs.ForEntry[T]](fqcn, Nil).get)) + } catch { + case e: Throwable => + log.error(s"Failed to initialize codec for key [$key]", e) + } + }) + + entries.result() + } +} + +object Codecs { + + trait ForContext[T] { + def encode(context: Context): T + def decode(carrier: T): Context + } + + trait ForEntry[T] { + def encode(context: Context): T + def decode(carrier: T, context: Context): Context + } + + final class HttpHeaders(entryCodecs: Map[String, Codecs.ForEntry[TextMap]]) extends Codecs.ForContext[TextMap] { + private val log = LoggerFactory.getLogger(classOf[HttpHeaders]) + + override def encode(context: Context): TextMap = { + val encoded = TextMap.Default() + + context.entries.foreach { + case (key, _) if key.broadcast => + entryCodecs.get(key.name) match { + case Some(codec) => + try { + codec.encode(context).values.foreach(pair => encoded.put(pair._1, pair._2)) + } catch { + case e: Throwable => log.error(s"Failed to encode key [${key.name}]", e) + } + + case None => + log.error("Context key [{}] should be encoded in HttpHeaders but no codec was found for it.", key.name) + } + + case _ => // All non-broadcast keys should be ignored. + } + + encoded + } + + override def decode(carrier: TextMap): Context = { + var context: Context = Context.Empty + + try { + context = entryCodecs.foldLeft(context)((ctx, codecEntry) => { + val (_, codec) = codecEntry + codec.decode(carrier, ctx) + }) + + } catch { + case e: Throwable => + log.error("Failed to decode context from HttpHeaders", e) + } + + context + } + } + + + final class Binary(bufferSize: Long, entryCodecs: Map[String, Codecs.ForEntry[ByteBuffer]]) extends Codecs.ForContext[ByteBuffer] { + private val log = LoggerFactory.getLogger(classOf[Binary]) + private val binaryBuffer = newThreadLocalBuffer(bufferSize) + private val emptyBuffer = ByteBuffer.allocate(0) + + override def encode(context: Context): ByteBuffer = { + val entries = context.entries + if(entries.isEmpty) + emptyBuffer + else { + var colferEntries: List[ColferEntry] = Nil + entries.foreach { + case (key, _) if key.broadcast => + entryCodecs.get(key.name) match { + case Some(entryCodec) => + try { + val entryData = entryCodec.encode(context) + if(entryData.capacity() > 0) { + val colferEntry = new ColferEntry() + colferEntry.setName(key.name) + colferEntry.setContent(entryData.array()) + colferEntries = colferEntry :: colferEntries + } + } catch { + case throwable: Throwable => + log.error(s"Failed to encode broadcast context key [${key.name}]", throwable) + } + + case None => + log.error("Failed to encode broadcast context key [{}]. No codec found.", key.name) + } + + case _ => // All non-broadcast keys should be ignored. + } + + if(colferEntries.isEmpty) + emptyBuffer + else { + val buffer = binaryBuffer.get() + val colferContext = new ColferContext() + colferContext.setEntries(colferEntries.toArray) + val marshalledSize = colferContext.marshal(buffer, 0) + + val data = Array.ofDim[Byte](marshalledSize) + System.arraycopy(buffer, 0, data, 0, marshalledSize) + ByteBuffer.wrap(data) + } + } + } + + + override def decode(carrier: ByteBuffer): Context = { + if(carrier.capacity() == 0) + Context.Empty + else { + var context: Context = Context.Empty + + try { + val colferContext = new ColferContext() + colferContext.unmarshal(carrier.array(), 0) + + colferContext.entries.foreach(colferEntry => { + entryCodecs.get(colferEntry.getName()) match { + case Some(entryCodec) => + context = entryCodec.decode(ByteBuffer.wrap(colferEntry.content), context) + + case None => + log.error("Failed to decode entry [{}] with Binary context codec. No entry found for the key.", colferEntry.getName()) + } + }) + + } catch { + case e: Throwable => + log.error("Failed to decode context from Binary", e) + } + + context + } + } + + + private def newThreadLocalBuffer(size: Long): ThreadLocal[Array[Byte]] = new ThreadLocal[Array[Byte]] { + override def initialValue(): Array[Byte] = Array.ofDim[Byte](size.toInt) + } + } +} + + +trait TextMap { + + def get(key: String): Option[String] + + def put(key: String, value: String): Unit + + def values: Iterator[(String, String)] +} + +object TextMap { + + class Default extends TextMap { + private val storage = + mutable.Map.empty[String, String] + + override def get(key: String): Option[String] = + storage.get(key) + + override def put(key: String, value: String): Unit = + storage.put(key, value) + + override def values: Iterator[(String, String)] = + storage.toIterator + } + + object Default { + def apply(): Default = + new Default() + } +}
\ No newline at end of file diff --git a/kamon-core/src/main/scala/kamon/trace/Span.scala b/kamon-core/src/main/scala/kamon/trace/Span.scala index ea28142e..3158aa73 100644 --- a/kamon-core/src/main/scala/kamon/trace/Span.scala +++ b/kamon-core/src/main/scala/kamon/trace/Span.scala @@ -195,9 +195,9 @@ object Span { object Local { def apply(spanContext: SpanContext, parent: Option[Span], initialOperationName: String, initialSpanTags: Map[String, Span.TagValue], - initialMetricTags: Map[String, String], startTimestampMicros: Long, reporterRegistry: ReporterRegistryImpl, + initialMetricTags: Map[String, String], startTimestampMicros: Long, spanSink: SpanSink, scopeSpanMetrics: Boolean): Local = - new Local(spanContext, parent, initialOperationName, initialSpanTags, initialMetricTags, startTimestampMicros, reporterRegistry, scopeSpanMetrics) + new Local(spanContext, parent, initialOperationName, initialSpanTags, initialMetricTags, startTimestampMicros, spanSink, scopeSpanMetrics) } diff --git a/kamon-core/src/main/scala/kamon/trace/SpanCodec.scala b/kamon-core/src/main/scala/kamon/trace/SpanCodec.scala index 96317696..ae78ee67 100644 --- a/kamon-core/src/main/scala/kamon/trace/SpanCodec.scala +++ b/kamon-core/src/main/scala/kamon/trace/SpanCodec.scala @@ -16,15 +16,17 @@ package kamon.trace import java.net.{URLDecoder, URLEncoder} +import java.nio.ByteBuffer import kamon.Kamon -import kamon.context.{Codec, Context, TextMap} +import kamon.context.{Codecs, Context, TextMap} +import kamon.context.generated.binary.span.{Span => ColferSpan} import kamon.trace.SpanContext.SamplingDecision object SpanCodec { - class B3 extends Codec.ForEntry[TextMap] { + class B3 extends Codecs.ForEntry[TextMap] { import B3.Headers override def encode(context: Context): TextMap = { @@ -96,4 +98,69 @@ object SpanCodec { val Flags = "X-B3-Flags" } } + + + class Colfer extends Codecs.ForEntry[ByteBuffer] { + val emptyBuffer = ByteBuffer.allocate(0) + + override def encode(context: Context): ByteBuffer = { + val span = context.get(Span.ContextKey) + if(span.nonEmpty()) { + val marshalBuffer = Colfer.codecBuffer.get() + val colferSpan = new ColferSpan() + val spanContext = span.context() + + colferSpan.setTraceID(spanContext.traceID.bytes) + colferSpan.setSpanID(spanContext.spanID.bytes) + colferSpan.setParentID(spanContext.parentID.bytes) + colferSpan.setSamplingDecision(samplingDecisionToByte(spanContext.samplingDecision)) + + val marshalledSize = colferSpan.marshal(marshalBuffer, 0) + val buffer = ByteBuffer.allocate(marshalledSize) + buffer.put(marshalBuffer, 0, marshalledSize) + buffer + + } else emptyBuffer + } + + override def decode(carrier: ByteBuffer, context: Context): Context = { + carrier.clear() + + if(carrier.capacity() == 0) + context + else { + val identityProvider = Kamon.tracer.identityProvider + val colferSpan = new ColferSpan() + colferSpan.unmarshal(carrier.array(), 0) + + val spanContext = SpanContext( + traceID = identityProvider.traceIdGenerator().from(colferSpan.traceID), + spanID = identityProvider.traceIdGenerator().from(colferSpan.spanID), + parentID = identityProvider.traceIdGenerator().from(colferSpan.parentID), + samplingDecision = byteToSamplingDecision(colferSpan.samplingDecision) + ) + + context.withKey(Span.ContextKey, Span.Remote(spanContext)) + } + } + + + private def samplingDecisionToByte(samplingDecision: SamplingDecision): Byte = samplingDecision match { + case SamplingDecision.Sample => 1 + case SamplingDecision.DoNotSample => 2 + case SamplingDecision.Unknown => 3 + } + + private def byteToSamplingDecision(byte: Byte): SamplingDecision = byte match { + case 1 => SamplingDecision.Sample + case 2 => SamplingDecision.DoNotSample + case _ => SamplingDecision.Unknown + } + } + + object Colfer { + private val codecBuffer = new ThreadLocal[Array[Byte]] { + override def initialValue(): Array[Byte] = Array.ofDim[Byte](256) + } + } }
\ No newline at end of file diff --git a/kamon-core/src/main/scala/kamon/trace/Tracer.scala b/kamon-core/src/main/scala/kamon/trace/Tracer.scala index f2f1918c..5f61f3aa 100644 --- a/kamon-core/src/main/scala/kamon/trace/Tracer.scala +++ b/kamon-core/src/main/scala/kamon/trace/Tracer.scala @@ -16,7 +16,8 @@ package kamon.trace import com.typesafe.config.Config -import kamon.{Kamon, ReporterRegistryImpl} +import kamon.ReporterRegistry.SpanSink +import kamon.Kamon import kamon.metric.MetricLookup import kamon.trace.Span.TagValue import kamon.trace.SpanContext.SamplingDecision @@ -34,7 +35,7 @@ trait Tracer { object Tracer { - final class Default(metrics: MetricLookup, reporterRegistry: ReporterRegistryImpl, initialConfig: Config) extends Tracer { + final class Default(metrics: MetricLookup, spanSink: SpanSink, initialConfig: Config) extends Tracer { private val logger = LoggerFactory.getLogger(classOf[Tracer]) private[Tracer] val tracerMetrics = new TracerMetrics(metrics) @@ -46,7 +47,7 @@ object Tracer { reconfigure(initialConfig) override def buildSpan(operationName: String): SpanBuilder = - new SpanBuilder(operationName, this, reporterRegistry) + new SpanBuilder(operationName, this, spanSink) override def identityProvider: IdentityProvider = this._identityProvider @@ -84,11 +85,11 @@ object Tracer { } object Default { - def apply(metrics: MetricLookup, reporterRegistry: ReporterRegistryImpl, initialConfig: Config): Default = - new Default(metrics, reporterRegistry, initialConfig) + def apply(metrics: MetricLookup, spanSink: SpanSink, initialConfig: Config): Default = + new Default(metrics, spanSink, initialConfig) } - final class SpanBuilder(operationName: String, tracer: Tracer.Default, reporterRegistry: ReporterRegistryImpl) { + final class SpanBuilder(operationName: String, tracer: Tracer.Default, spanSink: SpanSink) { private var parentSpan: Span = _ private var startTimestamp = 0L private var initialSpanTags = Map.empty[String, Span.TagValue] @@ -157,7 +158,7 @@ object Tracer { } tracer.tracerMetrics.createdSpans.increment() - Span.Local(spanContext, nonRemoteParent, operationName, initialSpanTags, initialMetricTags, startTimestampMicros, reporterRegistry, tracer.scopeSpanMetrics) + Span.Local(spanContext, nonRemoteParent, operationName, initialSpanTags, initialMetricTags, startTimestampMicros, spanSink, tracer.scopeSpanMetrics) } private def joinParentContext(parent: Span, samplingDecision: SamplingDecision): SpanContext = |