From a152a3098b564ed43766a857b32b7c7d7445f9ce Mon Sep 17 00:00:00 2001 From: Ivan Topolnjak Date: Mon, 21 Aug 2017 09:23:07 +0200 Subject: binary encoding of context and entries --- kamon-core/src/main/colfer/Context.colf | 10 + kamon-core/src/main/colfer/Span.colf | 8 + kamon-core/src/main/colfer/building.md | 5 + kamon-core/src/main/colfer/context.colf | 10 - .../main/java/kamon/context/encoding/Context.java | 359 -------------- .../main/java/kamon/context/encoding/Entry.java | 442 ----------------- .../context/generated/binary/context/Context.java | 359 ++++++++++++++ .../context/generated/binary/context/Entry.java | 442 +++++++++++++++++ .../kamon/context/generated/binary/span/Span.java | 524 +++++++++++++++++++++ kamon-core/src/main/resources/reference.conf | 40 +- kamon-core/src/main/scala/kamon/Kamon.scala | 24 +- .../src/main/scala/kamon/ReporterRegistry.scala | 425 ++++++++--------- .../src/main/scala/kamon/context/Codec.scala | 144 ------ .../src/main/scala/kamon/context/Codecs.scala | 245 ++++++++++ kamon-core/src/main/scala/kamon/trace/Span.scala | 4 +- .../src/main/scala/kamon/trace/SpanCodec.scala | 71 ++- kamon-core/src/main/scala/kamon/trace/Tracer.scala | 15 +- 17 files changed, 1911 insertions(+), 1216 deletions(-) create mode 100644 kamon-core/src/main/colfer/Context.colf create mode 100644 kamon-core/src/main/colfer/Span.colf create mode 100644 kamon-core/src/main/colfer/building.md delete mode 100644 kamon-core/src/main/colfer/context.colf delete mode 100644 kamon-core/src/main/java/kamon/context/encoding/Context.java delete mode 100644 kamon-core/src/main/java/kamon/context/encoding/Entry.java create mode 100644 kamon-core/src/main/java/kamon/context/generated/binary/context/Context.java create mode 100644 kamon-core/src/main/java/kamon/context/generated/binary/context/Entry.java create mode 100644 kamon-core/src/main/java/kamon/context/generated/binary/span/Span.java delete mode 100644 kamon-core/src/main/scala/kamon/context/Codec.scala create mode 100644 kamon-core/src/main/scala/kamon/context/Codecs.scala (limited to 'kamon-core/src/main') diff --git a/kamon-core/src/main/colfer/Context.colf b/kamon-core/src/main/colfer/Context.colf new file mode 100644 index 00000000..f84d7a56 --- /dev/null +++ b/kamon-core/src/main/colfer/Context.colf @@ -0,0 +1,10 @@ +package context + +type Entry struct { + name text + content binary +} + +type Context struct { + entries []Entry +} \ No newline at end of file diff --git a/kamon-core/src/main/colfer/Span.colf b/kamon-core/src/main/colfer/Span.colf new file mode 100644 index 00000000..4edbed7b --- /dev/null +++ b/kamon-core/src/main/colfer/Span.colf @@ -0,0 +1,8 @@ +package span + +type Span struct { + traceID binary + spanID binary + parentID binary + samplingDecision uint8 +} \ No newline at end of file diff --git a/kamon-core/src/main/colfer/building.md b/kamon-core/src/main/colfer/building.md new file mode 100644 index 00000000..f510a44f --- /dev/null +++ b/kamon-core/src/main/colfer/building.md @@ -0,0 +1,5 @@ +Just download and install the colver compiler and run this command from the colfer folder: + +``` +colfer -b ../java -p kamon/context/generated/binary java +``` \ No newline at end of file diff --git a/kamon-core/src/main/colfer/context.colf b/kamon-core/src/main/colfer/context.colf deleted file mode 100644 index 26421cba..00000000 --- a/kamon-core/src/main/colfer/context.colf +++ /dev/null @@ -1,10 +0,0 @@ -package kamon - -type Entry struct { - name text - content binary -} - -type Context struct { - entries []Entry -} \ No newline at end of file diff --git a/kamon-core/src/main/java/kamon/context/encoding/Context.java b/kamon-core/src/main/java/kamon/context/encoding/Context.java deleted file mode 100644 index db6ed7a9..00000000 --- a/kamon-core/src/main/java/kamon/context/encoding/Context.java +++ /dev/null @@ -1,359 +0,0 @@ -package kamon.context.encoding; - - -// Code generated by colf(1); DO NOT EDIT. - - -import static java.lang.String.format; -import java.io.IOException; -import java.io.InputStream; -import java.io.ObjectInputStream; -import java.io.ObjectOutputStream; -import java.io.ObjectStreamException; -import java.io.OutputStream; -import java.io.Serializable; -import java.util.InputMismatchException; -import java.nio.BufferOverflowException; -import java.nio.BufferUnderflowException; - - -/** - * Data bean with built-in serialization support. - - * @author generated by colf(1) - * @see Colfer's home - */ -@javax.annotation.Generated(value="colf(1)", comments="Colfer from schema file context.colf") -public class Context implements Serializable { - - /** The upper limit for serial byte sizes. */ - public static int colferSizeMax = 16 * 1024 * 1024; - - /** The upper limit for the number of elements in a list. */ - public static int colferListMax = 64 * 1024; - - - - - public Entry[] entries; - - - /** Default constructor */ - public Context() { - init(); - } - - private static final Entry[] _zeroEntries = new Entry[0]; - - /** Colfer zero values. */ - private void init() { - entries = _zeroEntries; - } - - /** - * {@link #reset(InputStream) Reusable} deserialization of Colfer streams. - */ - public static class Unmarshaller { - - /** The data source. */ - protected InputStream in; - - /** The read buffer. */ - public byte[] buf; - - /** The {@link #buf buffer}'s data start index, inclusive. */ - protected int offset; - - /** The {@link #buf buffer}'s data end index, exclusive. */ - protected int i; - - - /** - * @param in the data source or {@code null}. - * @param buf the initial buffer or {@code null}. - */ - public Unmarshaller(InputStream in, byte[] buf) { - // TODO: better size estimation - if (buf == null || buf.length == 0) - buf = new byte[Math.min(Context.colferSizeMax, 2048)]; - this.buf = buf; - reset(in); - } - - /** - * Reuses the marshaller. - * @param in the data source or {@code null}. - * @throws IllegalStateException on pending data. - */ - public void reset(InputStream in) { - if (this.i != this.offset) throw new IllegalStateException("colfer: pending data"); - this.in = in; - this.offset = 0; - this.i = 0; - } - - /** - * Deserializes the following object. - * @return the result or {@code null} when EOF. - * @throws IOException from the input stream. - * @throws SecurityException on an upper limit breach defined by either {@link #colferSizeMax} or {@link #colferListMax}. - * @throws InputMismatchException when the data does not match this object's schema. - */ - public Context next() throws IOException { - if (in == null) return null; - - while (true) { - if (this.i > this.offset) { - try { - Context o = new Context(); - this.offset = o.unmarshal(this.buf, this.offset, this.i); - return o; - } catch (BufferUnderflowException e) { - } - } - // not enough data - - if (this.i <= this.offset) { - this.offset = 0; - this.i = 0; - } else if (i == buf.length) { - byte[] src = this.buf; - // TODO: better size estimation - if (offset == 0) this.buf = new byte[Math.min(Context.colferSizeMax, this.buf.length * 4)]; - System.arraycopy(src, this.offset, this.buf, 0, this.i - this.offset); - this.i -= this.offset; - this.offset = 0; - } - assert this.i < this.buf.length; - - int n = in.read(buf, i, buf.length - i); - if (n < 0) { - if (this.i > this.offset) - throw new InputMismatchException("colfer: pending data with EOF"); - return null; - } - assert n > 0; - i += n; - } - } - - } - - - /** - * Serializes the object. - * All {@code null} elements in {@link #entries} will be replaced with a {@code new} value. - * @param out the data destination. - * @param buf the initial buffer or {@code null}. - * @return the final buffer. When the serial fits into {@code buf} then the return is {@code buf}. - * Otherwise the return is a new buffer, large enough to hold the whole serial. - * @throws IOException from {@code out}. - * @throws IllegalStateException on an upper limit breach defined by either {@link #colferSizeMax} or {@link #colferListMax}. - */ - public byte[] marshal(OutputStream out, byte[] buf) throws IOException { - // TODO: better size estimation - if (buf == null || buf.length == 0) - buf = new byte[Math.min(Context.colferSizeMax, 2048)]; - - while (true) { - int i; - try { - i = marshal(buf, 0); - } catch (BufferOverflowException e) { - buf = new byte[Math.min(Context.colferSizeMax, buf.length * 4)]; - continue; - } - - out.write(buf, 0, i); - return buf; - } - } - - /** - * Serializes the object. - * All {@code null} elements in {@link #entries} will be replaced with a {@code new} value. - * @param buf the data destination. - * @param offset the initial index for {@code buf}, inclusive. - * @return the final index for {@code buf}, exclusive. - * @throws BufferOverflowException when {@code buf} is too small. - * @throws IllegalStateException on an upper limit breach defined by either {@link #colferSizeMax} or {@link #colferListMax}. - */ - public int marshal(byte[] buf, int offset) { - int i = offset; - - try { - if (this.entries.length != 0) { - buf[i++] = (byte) 0; - Entry[] a = this.entries; - - int x = a.length; - if (x > Context.colferListMax) - throw new IllegalStateException(format("colfer: kamon/context/kamon.Context.entries length %d exceeds %d elements", x, Context.colferListMax)); - while (x > 0x7f) { - buf[i++] = (byte) (x | 0x80); - x >>>= 7; - } - buf[i++] = (byte) x; - - for (int ai = 0; ai < a.length; ai++) { - Entry o = a[ai]; - if (o == null) { - o = new Entry(); - a[ai] = o; - } - i = o.marshal(buf, i); - } - } - - buf[i++] = (byte) 0x7f; - return i; - } catch (ArrayIndexOutOfBoundsException e) { - if (i - offset > Context.colferSizeMax) - throw new IllegalStateException(format("colfer: kamon/context/kamon.Context exceeds %d bytes", Context.colferSizeMax)); - if (i > buf.length) throw new BufferOverflowException(); - throw e; - } - } - - /** - * Deserializes the object. - * @param buf the data source. - * @param offset the initial index for {@code buf}, inclusive. - * @return the final index for {@code buf}, exclusive. - * @throws BufferUnderflowException when {@code buf} is incomplete. (EOF) - * @throws SecurityException on an upper limit breach defined by either {@link #colferSizeMax} or {@link #colferListMax}. - * @throws InputMismatchException when the data does not match this object's schema. - */ - public int unmarshal(byte[] buf, int offset) { - return unmarshal(buf, offset, buf.length); - } - - /** - * Deserializes the object. - * @param buf the data source. - * @param offset the initial index for {@code buf}, inclusive. - * @param end the index limit for {@code buf}, exclusive. - * @return the final index for {@code buf}, exclusive. - * @throws BufferUnderflowException when {@code buf} is incomplete. (EOF) - * @throws SecurityException on an upper limit breach defined by either {@link #colferSizeMax} or {@link #colferListMax}. - * @throws InputMismatchException when the data does not match this object's schema. - */ - public int unmarshal(byte[] buf, int offset, int end) { - if (end > buf.length) end = buf.length; - int i = offset; - - try { - byte header = buf[i++]; - - if (header == (byte) 0) { - int length = 0; - for (int shift = 0; true; shift += 7) { - byte b = buf[i++]; - length |= (b & 0x7f) << shift; - if (shift == 28 || b >= 0) break; - } - if (length < 0 || length > Context.colferListMax) - throw new SecurityException(format("colfer: kamon/context/kamon.Context.entries length %d exceeds %d elements", length, Context.colferListMax)); - - Entry[] a = new Entry[length]; - for (int ai = 0; ai < length; ai++) { - Entry o = new Entry(); - i = o.unmarshal(buf, i, end); - a[ai] = o; - } - this.entries = a; - header = buf[i++]; - } - - if (header != (byte) 0x7f) - throw new InputMismatchException(format("colfer: unknown header at byte %d", i - 1)); - } finally { - if (i > end && end - offset < Context.colferSizeMax) throw new BufferUnderflowException(); - if (i < 0 || i - offset > Context.colferSizeMax) - throw new SecurityException(format("colfer: kamon/context/kamon.Context exceeds %d bytes", Context.colferSizeMax)); - if (i > end) throw new BufferUnderflowException(); - } - - return i; - } - - // {@link Serializable} version number. - private static final long serialVersionUID = 1L; - - // {@link Serializable} Colfer extension. - private void writeObject(ObjectOutputStream out) throws IOException { - // TODO: better size estimation - byte[] buf = new byte[1024]; - int n; - while (true) try { - n = marshal(buf, 0); - break; - } catch (BufferUnderflowException e) { - buf = new byte[4 * buf.length]; - } - - out.writeInt(n); - out.write(buf, 0, n); - } - - // {@link Serializable} Colfer extension. - private void readObject(ObjectInputStream in) throws ClassNotFoundException, IOException { - init(); - - int n = in.readInt(); - byte[] buf = new byte[n]; - in.readFully(buf); - unmarshal(buf, 0); - } - - // {@link Serializable} Colfer extension. - private void readObjectNoData() throws ObjectStreamException { - init(); - } - - /** - * Gets kamon/context/kamon.Context.entries. - * @return the value. - */ - public Entry[] getEntries() { - return this.entries; - } - - /** - * Sets kamon/context/kamon.Context.entries. - * @param value the replacement. - */ - public void setEntries(Entry[] value) { - this.entries = value; - } - - /** - * Sets kamon/context/kamon.Context.entries. - * @param value the replacement. - * @return {link this}. - */ - public Context withEntries(Entry[] value) { - this.entries = value; - return this; - } - - @Override - public final int hashCode() { - int h = 1; - for (Entry o : this.entries) h = 31 * h + (o == null ? 0 : o.hashCode()); - return h; - } - - @Override - public final boolean equals(Object o) { - return o instanceof Context && equals((Context) o); - } - - public final boolean equals(Context o) { - if (o == null) return false; - if (o == this) return true; - return o.getClass() == Context.class - && java.util.Arrays.equals(this.entries, o.entries); - } - -} diff --git a/kamon-core/src/main/java/kamon/context/encoding/Entry.java b/kamon-core/src/main/java/kamon/context/encoding/Entry.java deleted file mode 100644 index d7734c13..00000000 --- a/kamon-core/src/main/java/kamon/context/encoding/Entry.java +++ /dev/null @@ -1,442 +0,0 @@ -package kamon.context.encoding; - - -// Code generated by colf(1); DO NOT EDIT. - - -import static java.lang.String.format; -import java.io.IOException; -import java.io.InputStream; -import java.io.ObjectInputStream; -import java.io.ObjectOutputStream; -import java.io.ObjectStreamException; -import java.io.OutputStream; -import java.io.Serializable; -import java.nio.charset.StandardCharsets; -import java.util.InputMismatchException; -import java.nio.BufferOverflowException; -import java.nio.BufferUnderflowException; - - -/** - * Data bean with built-in serialization support. - - * @author generated by colf(1) - * @see Colfer's home - */ -@javax.annotation.Generated(value="colf(1)", comments="Colfer from schema file context.colf") -public class Entry implements Serializable { - - /** The upper limit for serial byte sizes. */ - public static int colferSizeMax = 16 * 1024 * 1024; - - - - - public String name; - - public byte[] content; - - - /** Default constructor */ - public Entry() { - init(); - } - - private static final byte[] _zeroBytes = new byte[0]; - - /** Colfer zero values. */ - private void init() { - name = ""; - content = _zeroBytes; - } - - /** - * {@link #reset(InputStream) Reusable} deserialization of Colfer streams. - */ - public static class Unmarshaller { - - /** The data source. */ - protected InputStream in; - - /** The read buffer. */ - public byte[] buf; - - /** The {@link #buf buffer}'s data start index, inclusive. */ - protected int offset; - - /** The {@link #buf buffer}'s data end index, exclusive. */ - protected int i; - - - /** - * @param in the data source or {@code null}. - * @param buf the initial buffer or {@code null}. - */ - public Unmarshaller(InputStream in, byte[] buf) { - // TODO: better size estimation - if (buf == null || buf.length == 0) - buf = new byte[Math.min(Entry.colferSizeMax, 2048)]; - this.buf = buf; - reset(in); - } - - /** - * Reuses the marshaller. - * @param in the data source or {@code null}. - * @throws IllegalStateException on pending data. - */ - public void reset(InputStream in) { - if (this.i != this.offset) throw new IllegalStateException("colfer: pending data"); - this.in = in; - this.offset = 0; - this.i = 0; - } - - /** - * Deserializes the following object. - * @return the result or {@code null} when EOF. - * @throws IOException from the input stream. - * @throws SecurityException on an upper limit breach defined by {@link #colferSizeMax}. - * @throws InputMismatchException when the data does not match this object's schema. - */ - public Entry next() throws IOException { - if (in == null) return null; - - while (true) { - if (this.i > this.offset) { - try { - Entry o = new Entry(); - this.offset = o.unmarshal(this.buf, this.offset, this.i); - return o; - } catch (BufferUnderflowException e) { - } - } - // not enough data - - if (this.i <= this.offset) { - this.offset = 0; - this.i = 0; - } else if (i == buf.length) { - byte[] src = this.buf; - // TODO: better size estimation - if (offset == 0) this.buf = new byte[Math.min(Entry.colferSizeMax, this.buf.length * 4)]; - System.arraycopy(src, this.offset, this.buf, 0, this.i - this.offset); - this.i -= this.offset; - this.offset = 0; - } - assert this.i < this.buf.length; - - int n = in.read(buf, i, buf.length - i); - if (n < 0) { - if (this.i > this.offset) - throw new InputMismatchException("colfer: pending data with EOF"); - return null; - } - assert n > 0; - i += n; - } - } - - } - - - /** - * Serializes the object. - * @param out the data destination. - * @param buf the initial buffer or {@code null}. - * @return the final buffer. When the serial fits into {@code buf} then the return is {@code buf}. - * Otherwise the return is a new buffer, large enough to hold the whole serial. - * @throws IOException from {@code out}. - * @throws IllegalStateException on an upper limit breach defined by {@link #colferSizeMax}. - */ - public byte[] marshal(OutputStream out, byte[] buf) throws IOException { - // TODO: better size estimation - if (buf == null || buf.length == 0) - buf = new byte[Math.min(Entry.colferSizeMax, 2048)]; - - while (true) { - int i; - try { - i = marshal(buf, 0); - } catch (BufferOverflowException e) { - buf = new byte[Math.min(Entry.colferSizeMax, buf.length * 4)]; - continue; - } - - out.write(buf, 0, i); - return buf; - } - } - - /** - * Serializes the object. - * @param buf the data destination. - * @param offset the initial index for {@code buf}, inclusive. - * @return the final index for {@code buf}, exclusive. - * @throws BufferOverflowException when {@code buf} is too small. - * @throws IllegalStateException on an upper limit breach defined by {@link #colferSizeMax}. - */ - public int marshal(byte[] buf, int offset) { - int i = offset; - - try { - if (! this.name.isEmpty()) { - buf[i++] = (byte) 0; - int start = ++i; - - String s = this.name; - for (int sIndex = 0, sLength = s.length(); sIndex < sLength; sIndex++) { - char c = s.charAt(sIndex); - if (c < '\u0080') { - buf[i++] = (byte) c; - } else if (c < '\u0800') { - buf[i++] = (byte) (192 | c >>> 6); - buf[i++] = (byte) (128 | c & 63); - } else if (c < '\ud800' || c > '\udfff') { - buf[i++] = (byte) (224 | c >>> 12); - buf[i++] = (byte) (128 | c >>> 6 & 63); - buf[i++] = (byte) (128 | c & 63); - } else { - int cp = 0; - if (++sIndex < sLength) cp = Character.toCodePoint(c, s.charAt(sIndex)); - if ((cp >= 1 << 16) && (cp < 1 << 21)) { - buf[i++] = (byte) (240 | cp >>> 18); - buf[i++] = (byte) (128 | cp >>> 12 & 63); - buf[i++] = (byte) (128 | cp >>> 6 & 63); - buf[i++] = (byte) (128 | cp & 63); - } else - buf[i++] = (byte) '?'; - } - } - int size = i - start; - if (size > Entry.colferSizeMax) - throw new IllegalStateException(format("colfer: kamon/context/kamon.Entry.name size %d exceeds %d UTF-8 bytes", size, Entry.colferSizeMax)); - - int ii = start - 1; - if (size > 0x7f) { - i++; - for (int x = size; x >= 1 << 14; x >>>= 7) i++; - System.arraycopy(buf, start, buf, i - size, size); - - do { - buf[ii++] = (byte) (size | 0x80); - size >>>= 7; - } while (size > 0x7f); - } - buf[ii] = (byte) size; - } - - if (this.content.length != 0) { - buf[i++] = (byte) 1; - - int size = this.content.length; - if (size > Entry.colferSizeMax) - throw new IllegalStateException(format("colfer: kamon/context/kamon.Entry.content size %d exceeds %d bytes", size, Entry.colferSizeMax)); - - int x = size; - while (x > 0x7f) { - buf[i++] = (byte) (x | 0x80); - x >>>= 7; - } - buf[i++] = (byte) x; - - int start = i; - i += size; - System.arraycopy(this.content, 0, buf, start, size); - } - - buf[i++] = (byte) 0x7f; - return i; - } catch (ArrayIndexOutOfBoundsException e) { - if (i - offset > Entry.colferSizeMax) - throw new IllegalStateException(format("colfer: kamon/context/kamon.Entry exceeds %d bytes", Entry.colferSizeMax)); - if (i > buf.length) throw new BufferOverflowException(); - throw e; - } - } - - /** - * Deserializes the object. - * @param buf the data source. - * @param offset the initial index for {@code buf}, inclusive. - * @return the final index for {@code buf}, exclusive. - * @throws BufferUnderflowException when {@code buf} is incomplete. (EOF) - * @throws SecurityException on an upper limit breach defined by {@link #colferSizeMax}. - * @throws InputMismatchException when the data does not match this object's schema. - */ - public int unmarshal(byte[] buf, int offset) { - return unmarshal(buf, offset, buf.length); - } - - /** - * Deserializes the object. - * @param buf the data source. - * @param offset the initial index for {@code buf}, inclusive. - * @param end the index limit for {@code buf}, exclusive. - * @return the final index for {@code buf}, exclusive. - * @throws BufferUnderflowException when {@code buf} is incomplete. (EOF) - * @throws SecurityException on an upper limit breach defined by {@link #colferSizeMax}. - * @throws InputMismatchException when the data does not match this object's schema. - */ - public int unmarshal(byte[] buf, int offset, int end) { - if (end > buf.length) end = buf.length; - int i = offset; - - try { - byte header = buf[i++]; - - if (header == (byte) 0) { - int size = 0; - for (int shift = 0; true; shift += 7) { - byte b = buf[i++]; - size |= (b & 0x7f) << shift; - if (shift == 28 || b >= 0) break; - } - if (size < 0 || size > Entry.colferSizeMax) - throw new SecurityException(format("colfer: kamon/context/kamon.Entry.name size %d exceeds %d UTF-8 bytes", size, Entry.colferSizeMax)); - - int start = i; - i += size; - this.name = new String(buf, start, size, StandardCharsets.UTF_8); - header = buf[i++]; - } - - if (header == (byte) 1) { - int size = 0; - for (int shift = 0; true; shift += 7) { - byte b = buf[i++]; - size |= (b & 0x7f) << shift; - if (shift == 28 || b >= 0) break; - } - if (size < 0 || size > Entry.colferSizeMax) - throw new SecurityException(format("colfer: kamon/context/kamon.Entry.content size %d exceeds %d bytes", size, Entry.colferSizeMax)); - - this.content = new byte[size]; - int start = i; - i += size; - System.arraycopy(buf, start, this.content, 0, size); - - header = buf[i++]; - } - - if (header != (byte) 0x7f) - throw new InputMismatchException(format("colfer: unknown header at byte %d", i - 1)); - } finally { - if (i > end && end - offset < Entry.colferSizeMax) throw new BufferUnderflowException(); - if (i < 0 || i - offset > Entry.colferSizeMax) - throw new SecurityException(format("colfer: kamon/context/kamon.Entry exceeds %d bytes", Entry.colferSizeMax)); - if (i > end) throw new BufferUnderflowException(); - } - - return i; - } - - // {@link Serializable} version number. - private static final long serialVersionUID = 2L; - - // {@link Serializable} Colfer extension. - private void writeObject(ObjectOutputStream out) throws IOException { - // TODO: better size estimation - byte[] buf = new byte[1024]; - int n; - while (true) try { - n = marshal(buf, 0); - break; - } catch (BufferUnderflowException e) { - buf = new byte[4 * buf.length]; - } - - out.writeInt(n); - out.write(buf, 0, n); - } - - // {@link Serializable} Colfer extension. - private void readObject(ObjectInputStream in) throws ClassNotFoundException, IOException { - init(); - - int n = in.readInt(); - byte[] buf = new byte[n]; - in.readFully(buf); - unmarshal(buf, 0); - } - - // {@link Serializable} Colfer extension. - private void readObjectNoData() throws ObjectStreamException { - init(); - } - - /** - * Gets kamon/context/kamon.Entry.name. - * @return the value. - */ - public String getName() { - return this.name; - } - - /** - * Sets kamon/context/kamon.Entry.name. - * @param value the replacement. - */ - public void setName(String value) { - this.name = value; - } - - /** - * Sets kamon/context/kamon.Entry.name. - * @param value the replacement. - * @return {link this}. - */ - public Entry withName(String value) { - this.name = value; - return this; - } - - /** - * Gets kamon/context/kamon.Entry.content. - * @return the value. - */ - public byte[] getContent() { - return this.content; - } - - /** - * Sets kamon/context/kamon.Entry.content. - * @param value the replacement. - */ - public void setContent(byte[] value) { - this.content = value; - } - - /** - * Sets kamon/context/kamon.Entry.content. - * @param value the replacement. - * @return {link this}. - */ - public Entry withContent(byte[] value) { - this.content = value; - return this; - } - - @Override - public final int hashCode() { - int h = 1; - if (this.name != null) h = 31 * h + this.name.hashCode(); - for (byte b : this.content) h = 31 * h + b; - return h; - } - - @Override - public final boolean equals(Object o) { - return o instanceof Entry && equals((Entry) o); - } - - public final boolean equals(Entry o) { - if (o == null) return false; - if (o == this) return true; - return o.getClass() == Entry.class - && (this.name == null ? o.name == null : this.name.equals(o.name)) - && java.util.Arrays.equals(this.content, o.content); - } - -} diff --git a/kamon-core/src/main/java/kamon/context/generated/binary/context/Context.java b/kamon-core/src/main/java/kamon/context/generated/binary/context/Context.java new file mode 100644 index 00000000..a9917e99 --- /dev/null +++ b/kamon-core/src/main/java/kamon/context/generated/binary/context/Context.java @@ -0,0 +1,359 @@ +package kamon.context.generated.binary.context; + + +// Code generated by colf(1); DO NOT EDIT. + + +import static java.lang.String.format; +import java.io.IOException; +import java.io.InputStream; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.io.ObjectStreamException; +import java.io.OutputStream; +import java.io.Serializable; +import java.util.InputMismatchException; +import java.nio.BufferOverflowException; +import java.nio.BufferUnderflowException; + + +/** + * Data bean with built-in serialization support. + + * @author generated by colf(1) + * @see Colfer's home + */ +@javax.annotation.Generated(value="colf(1)", comments="Colfer from schema file Context.colf") +public class Context implements Serializable { + + /** The upper limit for serial byte sizes. */ + public static int colferSizeMax = 16 * 1024 * 1024; + + /** The upper limit for the number of elements in a list. */ + public static int colferListMax = 64 * 1024; + + + + + public Entry[] entries; + + + /** Default constructor */ + public Context() { + init(); + } + + private static final Entry[] _zeroEntries = new Entry[0]; + + /** Colfer zero values. */ + private void init() { + entries = _zeroEntries; + } + + /** + * {@link #reset(InputStream) Reusable} deserialization of Colfer streams. + */ + public static class Unmarshaller { + + /** The data source. */ + protected InputStream in; + + /** The read buffer. */ + public byte[] buf; + + /** The {@link #buf buffer}'s data start index, inclusive. */ + protected int offset; + + /** The {@link #buf buffer}'s data end index, exclusive. */ + protected int i; + + + /** + * @param in the data source or {@code null}. + * @param buf the initial buffer or {@code null}. + */ + public Unmarshaller(InputStream in, byte[] buf) { + // TODO: better size estimation + if (buf == null || buf.length == 0) + buf = new byte[Math.min(Context.colferSizeMax, 2048)]; + this.buf = buf; + reset(in); + } + + /** + * Reuses the marshaller. + * @param in the data source or {@code null}. + * @throws IllegalStateException on pending data. + */ + public void reset(InputStream in) { + if (this.i != this.offset) throw new IllegalStateException("colfer: pending data"); + this.in = in; + this.offset = 0; + this.i = 0; + } + + /** + * Deserializes the following object. + * @return the result or {@code null} when EOF. + * @throws IOException from the input stream. + * @throws SecurityException on an upper limit breach defined by either {@link #colferSizeMax} or {@link #colferListMax}. + * @throws InputMismatchException when the data does not match this object's schema. + */ + public Context next() throws IOException { + if (in == null) return null; + + while (true) { + if (this.i > this.offset) { + try { + Context o = new Context(); + this.offset = o.unmarshal(this.buf, this.offset, this.i); + return o; + } catch (BufferUnderflowException e) { + } + } + // not enough data + + if (this.i <= this.offset) { + this.offset = 0; + this.i = 0; + } else if (i == buf.length) { + byte[] src = this.buf; + // TODO: better size estimation + if (offset == 0) this.buf = new byte[Math.min(Context.colferSizeMax, this.buf.length * 4)]; + System.arraycopy(src, this.offset, this.buf, 0, this.i - this.offset); + this.i -= this.offset; + this.offset = 0; + } + assert this.i < this.buf.length; + + int n = in.read(buf, i, buf.length - i); + if (n < 0) { + if (this.i > this.offset) + throw new InputMismatchException("colfer: pending data with EOF"); + return null; + } + assert n > 0; + i += n; + } + } + + } + + + /** + * Serializes the object. + * All {@code null} elements in {@link #entries} will be replaced with a {@code new} value. + * @param out the data destination. + * @param buf the initial buffer or {@code null}. + * @return the final buffer. When the serial fits into {@code buf} then the return is {@code buf}. + * Otherwise the return is a new buffer, large enough to hold the whole serial. + * @throws IOException from {@code out}. + * @throws IllegalStateException on an upper limit breach defined by either {@link #colferSizeMax} or {@link #colferListMax}. + */ + public byte[] marshal(OutputStream out, byte[] buf) throws IOException { + // TODO: better size estimation + if (buf == null || buf.length == 0) + buf = new byte[Math.min(Context.colferSizeMax, 2048)]; + + while (true) { + int i; + try { + i = marshal(buf, 0); + } catch (BufferOverflowException e) { + buf = new byte[Math.min(Context.colferSizeMax, buf.length * 4)]; + continue; + } + + out.write(buf, 0, i); + return buf; + } + } + + /** + * Serializes the object. + * All {@code null} elements in {@link #entries} will be replaced with a {@code new} value. + * @param buf the data destination. + * @param offset the initial index for {@code buf}, inclusive. + * @return the final index for {@code buf}, exclusive. + * @throws BufferOverflowException when {@code buf} is too small. + * @throws IllegalStateException on an upper limit breach defined by either {@link #colferSizeMax} or {@link #colferListMax}. + */ + public int marshal(byte[] buf, int offset) { + int i = offset; + + try { + if (this.entries.length != 0) { + buf[i++] = (byte) 0; + Entry[] a = this.entries; + + int x = a.length; + if (x > Context.colferListMax) + throw new IllegalStateException(format("colfer: kamon/context/generated/binary/context.Context.entries length %d exceeds %d elements", x, Context.colferListMax)); + while (x > 0x7f) { + buf[i++] = (byte) (x | 0x80); + x >>>= 7; + } + buf[i++] = (byte) x; + + for (int ai = 0; ai < a.length; ai++) { + Entry o = a[ai]; + if (o == null) { + o = new Entry(); + a[ai] = o; + } + i = o.marshal(buf, i); + } + } + + buf[i++] = (byte) 0x7f; + return i; + } catch (ArrayIndexOutOfBoundsException e) { + if (i - offset > Context.colferSizeMax) + throw new IllegalStateException(format("colfer: kamon/context/generated/binary/context.Context exceeds %d bytes", Context.colferSizeMax)); + if (i > buf.length) throw new BufferOverflowException(); + throw e; + } + } + + /** + * Deserializes the object. + * @param buf the data source. + * @param offset the initial index for {@code buf}, inclusive. + * @return the final index for {@code buf}, exclusive. + * @throws BufferUnderflowException when {@code buf} is incomplete. (EOF) + * @throws SecurityException on an upper limit breach defined by either {@link #colferSizeMax} or {@link #colferListMax}. + * @throws InputMismatchException when the data does not match this object's schema. + */ + public int unmarshal(byte[] buf, int offset) { + return unmarshal(buf, offset, buf.length); + } + + /** + * Deserializes the object. + * @param buf the data source. + * @param offset the initial index for {@code buf}, inclusive. + * @param end the index limit for {@code buf}, exclusive. + * @return the final index for {@code buf}, exclusive. + * @throws BufferUnderflowException when {@code buf} is incomplete. (EOF) + * @throws SecurityException on an upper limit breach defined by either {@link #colferSizeMax} or {@link #colferListMax}. + * @throws InputMismatchException when the data does not match this object's schema. + */ + public int unmarshal(byte[] buf, int offset, int end) { + if (end > buf.length) end = buf.length; + int i = offset; + + try { + byte header = buf[i++]; + + if (header == (byte) 0) { + int length = 0; + for (int shift = 0; true; shift += 7) { + byte b = buf[i++]; + length |= (b & 0x7f) << shift; + if (shift == 28 || b >= 0) break; + } + if (length < 0 || length > Context.colferListMax) + throw new SecurityException(format("colfer: kamon/context/generated/binary/context.Context.entries length %d exceeds %d elements", length, Context.colferListMax)); + + Entry[] a = new Entry[length]; + for (int ai = 0; ai < length; ai++) { + Entry o = new Entry(); + i = o.unmarshal(buf, i, end); + a[ai] = o; + } + this.entries = a; + header = buf[i++]; + } + + if (header != (byte) 0x7f) + throw new InputMismatchException(format("colfer: unknown header at byte %d", i - 1)); + } finally { + if (i > end && end - offset < Context.colferSizeMax) throw new BufferUnderflowException(); + if (i < 0 || i - offset > Context.colferSizeMax) + throw new SecurityException(format("colfer: kamon/context/generated/binary/context.Context exceeds %d bytes", Context.colferSizeMax)); + if (i > end) throw new BufferUnderflowException(); + } + + return i; + } + + // {@link Serializable} version number. + private static final long serialVersionUID = 1L; + + // {@link Serializable} Colfer extension. + private void writeObject(ObjectOutputStream out) throws IOException { + // TODO: better size estimation + byte[] buf = new byte[1024]; + int n; + while (true) try { + n = marshal(buf, 0); + break; + } catch (BufferUnderflowException e) { + buf = new byte[4 * buf.length]; + } + + out.writeInt(n); + out.write(buf, 0, n); + } + + // {@link Serializable} Colfer extension. + private void readObject(ObjectInputStream in) throws ClassNotFoundException, IOException { + init(); + + int n = in.readInt(); + byte[] buf = new byte[n]; + in.readFully(buf); + unmarshal(buf, 0); + } + + // {@link Serializable} Colfer extension. + private void readObjectNoData() throws ObjectStreamException { + init(); + } + + /** + * Gets kamon/context/generated/binary/context.Context.entries. + * @return the value. + */ + public Entry[] getEntries() { + return this.entries; + } + + /** + * Sets kamon/context/generated/binary/context.Context.entries. + * @param value the replacement. + */ + public void setEntries(Entry[] value) { + this.entries = value; + } + + /** + * Sets kamon/context/generated/binary/context.Context.entries. + * @param value the replacement. + * @return {link this}. + */ + public Context withEntries(Entry[] value) { + this.entries = value; + return this; + } + + @Override + public final int hashCode() { + int h = 1; + for (Entry o : this.entries) h = 31 * h + (o == null ? 0 : o.hashCode()); + return h; + } + + @Override + public final boolean equals(Object o) { + return o instanceof Context && equals((Context) o); + } + + public final boolean equals(Context o) { + if (o == null) return false; + if (o == this) return true; + return o.getClass() == Context.class + && java.util.Arrays.equals(this.entries, o.entries); + } + +} diff --git a/kamon-core/src/main/java/kamon/context/generated/binary/context/Entry.java b/kamon-core/src/main/java/kamon/context/generated/binary/context/Entry.java new file mode 100644 index 00000000..dc75b10d --- /dev/null +++ b/kamon-core/src/main/java/kamon/context/generated/binary/context/Entry.java @@ -0,0 +1,442 @@ +package kamon.context.generated.binary.context; + + +// Code generated by colf(1); DO NOT EDIT. + + +import static java.lang.String.format; +import java.io.IOException; +import java.io.InputStream; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.io.ObjectStreamException; +import java.io.OutputStream; +import java.io.Serializable; +import java.nio.charset.StandardCharsets; +import java.util.InputMismatchException; +import java.nio.BufferOverflowException; +import java.nio.BufferUnderflowException; + + +/** + * Data bean with built-in serialization support. + + * @author generated by colf(1) + * @see Colfer's home + */ +@javax.annotation.Generated(value="colf(1)", comments="Colfer from schema file Context.colf") +public class Entry implements Serializable { + + /** The upper limit for serial byte sizes. */ + public static int colferSizeMax = 16 * 1024 * 1024; + + + + + public String name; + + public byte[] content; + + + /** Default constructor */ + public Entry() { + init(); + } + + private static final byte[] _zeroBytes = new byte[0]; + + /** Colfer zero values. */ + private void init() { + name = ""; + content = _zeroBytes; + } + + /** + * {@link #reset(InputStream) Reusable} deserialization of Colfer streams. + */ + public static class Unmarshaller { + + /** The data source. */ + protected InputStream in; + + /** The read buffer. */ + public byte[] buf; + + /** The {@link #buf buffer}'s data start index, inclusive. */ + protected int offset; + + /** The {@link #buf buffer}'s data end index, exclusive. */ + protected int i; + + + /** + * @param in the data source or {@code null}. + * @param buf the initial buffer or {@code null}. + */ + public Unmarshaller(InputStream in, byte[] buf) { + // TODO: better size estimation + if (buf == null || buf.length == 0) + buf = new byte[Math.min(Entry.colferSizeMax, 2048)]; + this.buf = buf; + reset(in); + } + + /** + * Reuses the marshaller. + * @param in the data source or {@code null}. + * @throws IllegalStateException on pending data. + */ + public void reset(InputStream in) { + if (this.i != this.offset) throw new IllegalStateException("colfer: pending data"); + this.in = in; + this.offset = 0; + this.i = 0; + } + + /** + * Deserializes the following object. + * @return the result or {@code null} when EOF. + * @throws IOException from the input stream. + * @throws SecurityException on an upper limit breach defined by {@link #colferSizeMax}. + * @throws InputMismatchException when the data does not match this object's schema. + */ + public Entry next() throws IOException { + if (in == null) return null; + + while (true) { + if (this.i > this.offset) { + try { + Entry o = new Entry(); + this.offset = o.unmarshal(this.buf, this.offset, this.i); + return o; + } catch (BufferUnderflowException e) { + } + } + // not enough data + + if (this.i <= this.offset) { + this.offset = 0; + this.i = 0; + } else if (i == buf.length) { + byte[] src = this.buf; + // TODO: better size estimation + if (offset == 0) this.buf = new byte[Math.min(Entry.colferSizeMax, this.buf.length * 4)]; + System.arraycopy(src, this.offset, this.buf, 0, this.i - this.offset); + this.i -= this.offset; + this.offset = 0; + } + assert this.i < this.buf.length; + + int n = in.read(buf, i, buf.length - i); + if (n < 0) { + if (this.i > this.offset) + throw new InputMismatchException("colfer: pending data with EOF"); + return null; + } + assert n > 0; + i += n; + } + } + + } + + + /** + * Serializes the object. + * @param out the data destination. + * @param buf the initial buffer or {@code null}. + * @return the final buffer. When the serial fits into {@code buf} then the return is {@code buf}. + * Otherwise the return is a new buffer, large enough to hold the whole serial. + * @throws IOException from {@code out}. + * @throws IllegalStateException on an upper limit breach defined by {@link #colferSizeMax}. + */ + public byte[] marshal(OutputStream out, byte[] buf) throws IOException { + // TODO: better size estimation + if (buf == null || buf.length == 0) + buf = new byte[Math.min(Entry.colferSizeMax, 2048)]; + + while (true) { + int i; + try { + i = marshal(buf, 0); + } catch (BufferOverflowException e) { + buf = new byte[Math.min(Entry.colferSizeMax, buf.length * 4)]; + continue; + } + + out.write(buf, 0, i); + return buf; + } + } + + /** + * Serializes the object. + * @param buf the data destination. + * @param offset the initial index for {@code buf}, inclusive. + * @return the final index for {@code buf}, exclusive. + * @throws BufferOverflowException when {@code buf} is too small. + * @throws IllegalStateException on an upper limit breach defined by {@link #colferSizeMax}. + */ + public int marshal(byte[] buf, int offset) { + int i = offset; + + try { + if (! this.name.isEmpty()) { + buf[i++] = (byte) 0; + int start = ++i; + + String s = this.name; + for (int sIndex = 0, sLength = s.length(); sIndex < sLength; sIndex++) { + char c = s.charAt(sIndex); + if (c < '\u0080') { + buf[i++] = (byte) c; + } else if (c < '\u0800') { + buf[i++] = (byte) (192 | c >>> 6); + buf[i++] = (byte) (128 | c & 63); + } else if (c < '\ud800' || c > '\udfff') { + buf[i++] = (byte) (224 | c >>> 12); + buf[i++] = (byte) (128 | c >>> 6 & 63); + buf[i++] = (byte) (128 | c & 63); + } else { + int cp = 0; + if (++sIndex < sLength) cp = Character.toCodePoint(c, s.charAt(sIndex)); + if ((cp >= 1 << 16) && (cp < 1 << 21)) { + buf[i++] = (byte) (240 | cp >>> 18); + buf[i++] = (byte) (128 | cp >>> 12 & 63); + buf[i++] = (byte) (128 | cp >>> 6 & 63); + buf[i++] = (byte) (128 | cp & 63); + } else + buf[i++] = (byte) '?'; + } + } + int size = i - start; + if (size > Entry.colferSizeMax) + throw new IllegalStateException(format("colfer: kamon/context/generated/binary/context.Entry.name size %d exceeds %d UTF-8 bytes", size, Entry.colferSizeMax)); + + int ii = start - 1; + if (size > 0x7f) { + i++; + for (int x = size; x >= 1 << 14; x >>>= 7) i++; + System.arraycopy(buf, start, buf, i - size, size); + + do { + buf[ii++] = (byte) (size | 0x80); + size >>>= 7; + } while (size > 0x7f); + } + buf[ii] = (byte) size; + } + + if (this.content.length != 0) { + buf[i++] = (byte) 1; + + int size = this.content.length; + if (size > Entry.colferSizeMax) + throw new IllegalStateException(format("colfer: kamon/context/generated/binary/context.Entry.content size %d exceeds %d bytes", size, Entry.colferSizeMax)); + + int x = size; + while (x > 0x7f) { + buf[i++] = (byte) (x | 0x80); + x >>>= 7; + } + buf[i++] = (byte) x; + + int start = i; + i += size; + System.arraycopy(this.content, 0, buf, start, size); + } + + buf[i++] = (byte) 0x7f; + return i; + } catch (ArrayIndexOutOfBoundsException e) { + if (i - offset > Entry.colferSizeMax) + throw new IllegalStateException(format("colfer: kamon/context/generated/binary/context.Entry exceeds %d bytes", Entry.colferSizeMax)); + if (i > buf.length) throw new BufferOverflowException(); + throw e; + } + } + + /** + * Deserializes the object. + * @param buf the data source. + * @param offset the initial index for {@code buf}, inclusive. + * @return the final index for {@code buf}, exclusive. + * @throws BufferUnderflowException when {@code buf} is incomplete. (EOF) + * @throws SecurityException on an upper limit breach defined by {@link #colferSizeMax}. + * @throws InputMismatchException when the data does not match this object's schema. + */ + public int unmarshal(byte[] buf, int offset) { + return unmarshal(buf, offset, buf.length); + } + + /** + * Deserializes the object. + * @param buf the data source. + * @param offset the initial index for {@code buf}, inclusive. + * @param end the index limit for {@code buf}, exclusive. + * @return the final index for {@code buf}, exclusive. + * @throws BufferUnderflowException when {@code buf} is incomplete. (EOF) + * @throws SecurityException on an upper limit breach defined by {@link #colferSizeMax}. + * @throws InputMismatchException when the data does not match this object's schema. + */ + public int unmarshal(byte[] buf, int offset, int end) { + if (end > buf.length) end = buf.length; + int i = offset; + + try { + byte header = buf[i++]; + + if (header == (byte) 0) { + int size = 0; + for (int shift = 0; true; shift += 7) { + byte b = buf[i++]; + size |= (b & 0x7f) << shift; + if (shift == 28 || b >= 0) break; + } + if (size < 0 || size > Entry.colferSizeMax) + throw new SecurityException(format("colfer: kamon/context/generated/binary/context.Entry.name size %d exceeds %d UTF-8 bytes", size, Entry.colferSizeMax)); + + int start = i; + i += size; + this.name = new String(buf, start, size, StandardCharsets.UTF_8); + header = buf[i++]; + } + + if (header == (byte) 1) { + int size = 0; + for (int shift = 0; true; shift += 7) { + byte b = buf[i++]; + size |= (b & 0x7f) << shift; + if (shift == 28 || b >= 0) break; + } + if (size < 0 || size > Entry.colferSizeMax) + throw new SecurityException(format("colfer: kamon/context/generated/binary/context.Entry.content size %d exceeds %d bytes", size, Entry.colferSizeMax)); + + this.content = new byte[size]; + int start = i; + i += size; + System.arraycopy(buf, start, this.content, 0, size); + + header = buf[i++]; + } + + if (header != (byte) 0x7f) + throw new InputMismatchException(format("colfer: unknown header at byte %d", i - 1)); + } finally { + if (i > end && end - offset < Entry.colferSizeMax) throw new BufferUnderflowException(); + if (i < 0 || i - offset > Entry.colferSizeMax) + throw new SecurityException(format("colfer: kamon/context/generated/binary/context.Entry exceeds %d bytes", Entry.colferSizeMax)); + if (i > end) throw new BufferUnderflowException(); + } + + return i; + } + + // {@link Serializable} version number. + private static final long serialVersionUID = 2L; + + // {@link Serializable} Colfer extension. + private void writeObject(ObjectOutputStream out) throws IOException { + // TODO: better size estimation + byte[] buf = new byte[1024]; + int n; + while (true) try { + n = marshal(buf, 0); + break; + } catch (BufferUnderflowException e) { + buf = new byte[4 * buf.length]; + } + + out.writeInt(n); + out.write(buf, 0, n); + } + + // {@link Serializable} Colfer extension. + private void readObject(ObjectInputStream in) throws ClassNotFoundException, IOException { + init(); + + int n = in.readInt(); + byte[] buf = new byte[n]; + in.readFully(buf); + unmarshal(buf, 0); + } + + // {@link Serializable} Colfer extension. + private void readObjectNoData() throws ObjectStreamException { + init(); + } + + /** + * Gets kamon/context/generated/binary/context.Entry.name. + * @return the value. + */ + public String getName() { + return this.name; + } + + /** + * Sets kamon/context/generated/binary/context.Entry.name. + * @param value the replacement. + */ + public void setName(String value) { + this.name = value; + } + + /** + * Sets kamon/context/generated/binary/context.Entry.name. + * @param value the replacement. + * @return {link this}. + */ + public Entry withName(String value) { + this.name = value; + return this; + } + + /** + * Gets kamon/context/generated/binary/context.Entry.content. + * @return the value. + */ + public byte[] getContent() { + return this.content; + } + + /** + * Sets kamon/context/generated/binary/context.Entry.content. + * @param value the replacement. + */ + public void setContent(byte[] value) { + this.content = value; + } + + /** + * Sets kamon/context/generated/binary/context.Entry.content. + * @param value the replacement. + * @return {link this}. + */ + public Entry withContent(byte[] value) { + this.content = value; + return this; + } + + @Override + public final int hashCode() { + int h = 1; + if (this.name != null) h = 31 * h + this.name.hashCode(); + for (byte b : this.content) h = 31 * h + b; + return h; + } + + @Override + public final boolean equals(Object o) { + return o instanceof Entry && equals((Entry) o); + } + + public final boolean equals(Entry o) { + if (o == null) return false; + if (o == this) return true; + return o.getClass() == Entry.class + && (this.name == null ? o.name == null : this.name.equals(o.name)) + && java.util.Arrays.equals(this.content, o.content); + } + +} diff --git a/kamon-core/src/main/java/kamon/context/generated/binary/span/Span.java b/kamon-core/src/main/java/kamon/context/generated/binary/span/Span.java new file mode 100644 index 00000000..ea7a6fe8 --- /dev/null +++ b/kamon-core/src/main/java/kamon/context/generated/binary/span/Span.java @@ -0,0 +1,524 @@ +package kamon.context.generated.binary.span; + + +// Code generated by colf(1); DO NOT EDIT. + + +import static java.lang.String.format; +import java.io.IOException; +import java.io.InputStream; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.io.ObjectStreamException; +import java.io.OutputStream; +import java.io.Serializable; +import java.util.InputMismatchException; +import java.nio.BufferOverflowException; +import java.nio.BufferUnderflowException; + + +/** + * Data bean with built-in serialization support. + + * @author generated by colf(1) + * @see Colfer's home + */ +@javax.annotation.Generated(value="colf(1)", comments="Colfer from schema file Span.colf") +public class Span implements Serializable { + + /** The upper limit for serial byte sizes. */ + public static int colferSizeMax = 16 * 1024 * 1024; + + + + + public byte[] traceID; + + public byte[] spanID; + + public byte[] parentID; + + public byte samplingDecision; + + + /** Default constructor */ + public Span() { + init(); + } + + private static final byte[] _zeroBytes = new byte[0]; + + /** Colfer zero values. */ + private void init() { + traceID = _zeroBytes; + spanID = _zeroBytes; + parentID = _zeroBytes; + } + + /** + * {@link #reset(InputStream) Reusable} deserialization of Colfer streams. + */ + public static class Unmarshaller { + + /** The data source. */ + protected InputStream in; + + /** The read buffer. */ + public byte[] buf; + + /** The {@link #buf buffer}'s data start index, inclusive. */ + protected int offset; + + /** The {@link #buf buffer}'s data end index, exclusive. */ + protected int i; + + + /** + * @param in the data source or {@code null}. + * @param buf the initial buffer or {@code null}. + */ + public Unmarshaller(InputStream in, byte[] buf) { + // TODO: better size estimation + if (buf == null || buf.length == 0) + buf = new byte[Math.min(Span.colferSizeMax, 2048)]; + this.buf = buf; + reset(in); + } + + /** + * Reuses the marshaller. + * @param in the data source or {@code null}. + * @throws IllegalStateException on pending data. + */ + public void reset(InputStream in) { + if (this.i != this.offset) throw new IllegalStateException("colfer: pending data"); + this.in = in; + this.offset = 0; + this.i = 0; + } + + /** + * Deserializes the following object. + * @return the result or {@code null} when EOF. + * @throws IOException from the input stream. + * @throws SecurityException on an upper limit breach defined by {@link #colferSizeMax}. + * @throws InputMismatchException when the data does not match this object's schema. + */ + public Span next() throws IOException { + if (in == null) return null; + + while (true) { + if (this.i > this.offset) { + try { + Span o = new Span(); + this.offset = o.unmarshal(this.buf, this.offset, this.i); + return o; + } catch (BufferUnderflowException e) { + } + } + // not enough data + + if (this.i <= this.offset) { + this.offset = 0; + this.i = 0; + } else if (i == buf.length) { + byte[] src = this.buf; + // TODO: better size estimation + if (offset == 0) this.buf = new byte[Math.min(Span.colferSizeMax, this.buf.length * 4)]; + System.arraycopy(src, this.offset, this.buf, 0, this.i - this.offset); + this.i -= this.offset; + this.offset = 0; + } + assert this.i < this.buf.length; + + int n = in.read(buf, i, buf.length - i); + if (n < 0) { + if (this.i > this.offset) + throw new InputMismatchException("colfer: pending data with EOF"); + return null; + } + assert n > 0; + i += n; + } + } + + } + + + /** + * Serializes the object. + * @param out the data destination. + * @param buf the initial buffer or {@code null}. + * @return the final buffer. When the serial fits into {@code buf} then the return is {@code buf}. + * Otherwise the return is a new buffer, large enough to hold the whole serial. + * @throws IOException from {@code out}. + * @throws IllegalStateException on an upper limit breach defined by {@link #colferSizeMax}. + */ + public byte[] marshal(OutputStream out, byte[] buf) throws IOException { + // TODO: better size estimation + if (buf == null || buf.length == 0) + buf = new byte[Math.min(Span.colferSizeMax, 2048)]; + + while (true) { + int i; + try { + i = marshal(buf, 0); + } catch (BufferOverflowException e) { + buf = new byte[Math.min(Span.colferSizeMax, buf.length * 4)]; + continue; + } + + out.write(buf, 0, i); + return buf; + } + } + + /** + * Serializes the object. + * @param buf the data destination. + * @param offset the initial index for {@code buf}, inclusive. + * @return the final index for {@code buf}, exclusive. + * @throws BufferOverflowException when {@code buf} is too small. + * @throws IllegalStateException on an upper limit breach defined by {@link #colferSizeMax}. + */ + public int marshal(byte[] buf, int offset) { + int i = offset; + + try { + if (this.traceID.length != 0) { + buf[i++] = (byte) 0; + + int size = this.traceID.length; + if (size > Span.colferSizeMax) + throw new IllegalStateException(format("colfer: kamon/context/generated/binary/span.Span.traceID size %d exceeds %d bytes", size, Span.colferSizeMax)); + + int x = size; + while (x > 0x7f) { + buf[i++] = (byte) (x | 0x80); + x >>>= 7; + } + buf[i++] = (byte) x; + + int start = i; + i += size; + System.arraycopy(this.traceID, 0, buf, start, size); + } + + if (this.spanID.length != 0) { + buf[i++] = (byte) 1; + + int size = this.spanID.length; + if (size > Span.colferSizeMax) + throw new IllegalStateException(format("colfer: kamon/context/generated/binary/span.Span.spanID size %d exceeds %d bytes", size, Span.colferSizeMax)); + + int x = size; + while (x > 0x7f) { + buf[i++] = (byte) (x | 0x80); + x >>>= 7; + } + buf[i++] = (byte) x; + + int start = i; + i += size; + System.arraycopy(this.spanID, 0, buf, start, size); + } + + if (this.parentID.length != 0) { + buf[i++] = (byte) 2; + + int size = this.parentID.length; + if (size > Span.colferSizeMax) + throw new IllegalStateException(format("colfer: kamon/context/generated/binary/span.Span.parentID size %d exceeds %d bytes", size, Span.colferSizeMax)); + + int x = size; + while (x > 0x7f) { + buf[i++] = (byte) (x | 0x80); + x >>>= 7; + } + buf[i++] = (byte) x; + + int start = i; + i += size; + System.arraycopy(this.parentID, 0, buf, start, size); + } + + if (this.samplingDecision != 0) { + buf[i++] = (byte) 3; + buf[i++] = this.samplingDecision; + } + + buf[i++] = (byte) 0x7f; + return i; + } catch (ArrayIndexOutOfBoundsException e) { + if (i - offset > Span.colferSizeMax) + throw new IllegalStateException(format("colfer: kamon/context/generated/binary/span.Span exceeds %d bytes", Span.colferSizeMax)); + if (i > buf.length) throw new BufferOverflowException(); + throw e; + } + } + + /** + * Deserializes the object. + * @param buf the data source. + * @param offset the initial index for {@code buf}, inclusive. + * @return the final index for {@code buf}, exclusive. + * @throws BufferUnderflowException when {@code buf} is incomplete. (EOF) + * @throws SecurityException on an upper limit breach defined by {@link #colferSizeMax}. + * @throws InputMismatchException when the data does not match this object's schema. + */ + public int unmarshal(byte[] buf, int offset) { + return unmarshal(buf, offset, buf.length); + } + + /** + * Deserializes the object. + * @param buf the data source. + * @param offset the initial index for {@code buf}, inclusive. + * @param end the index limit for {@code buf}, exclusive. + * @return the final index for {@code buf}, exclusive. + * @throws BufferUnderflowException when {@code buf} is incomplete. (EOF) + * @throws SecurityException on an upper limit breach defined by {@link #colferSizeMax}. + * @throws InputMismatchException when the data does not match this object's schema. + */ + public int unmarshal(byte[] buf, int offset, int end) { + if (end > buf.length) end = buf.length; + int i = offset; + + try { + byte header = buf[i++]; + + if (header == (byte) 0) { + int size = 0; + for (int shift = 0; true; shift += 7) { + byte b = buf[i++]; + size |= (b & 0x7f) << shift; + if (shift == 28 || b >= 0) break; + } + if (size < 0 || size > Span.colferSizeMax) + throw new SecurityException(format("colfer: kamon/context/generated/binary/span.Span.traceID size %d exceeds %d bytes", size, Span.colferSizeMax)); + + this.traceID = new byte[size]; + int start = i; + i += size; + System.arraycopy(buf, start, this.traceID, 0, size); + + header = buf[i++]; + } + + if (header == (byte) 1) { + int size = 0; + for (int shift = 0; true; shift += 7) { + byte b = buf[i++]; + size |= (b & 0x7f) << shift; + if (shift == 28 || b >= 0) break; + } + if (size < 0 || size > Span.colferSizeMax) + throw new SecurityException(format("colfer: kamon/context/generated/binary/span.Span.spanID size %d exceeds %d bytes", size, Span.colferSizeMax)); + + this.spanID = new byte[size]; + int start = i; + i += size; + System.arraycopy(buf, start, this.spanID, 0, size); + + header = buf[i++]; + } + + if (header == (byte) 2) { + int size = 0; + for (int shift = 0; true; shift += 7) { + byte b = buf[i++]; + size |= (b & 0x7f) << shift; + if (shift == 28 || b >= 0) break; + } + if (size < 0 || size > Span.colferSizeMax) + throw new SecurityException(format("colfer: kamon/context/generated/binary/span.Span.parentID size %d exceeds %d bytes", size, Span.colferSizeMax)); + + this.parentID = new byte[size]; + int start = i; + i += size; + System.arraycopy(buf, start, this.parentID, 0, size); + + header = buf[i++]; + } + + if (header == (byte) 3) { + this.samplingDecision = buf[i++]; + header = buf[i++]; + } + + if (header != (byte) 0x7f) + throw new InputMismatchException(format("colfer: unknown header at byte %d", i - 1)); + } finally { + if (i > end && end - offset < Span.colferSizeMax) throw new BufferUnderflowException(); + if (i < 0 || i - offset > Span.colferSizeMax) + throw new SecurityException(format("colfer: kamon/context/generated/binary/span.Span exceeds %d bytes", Span.colferSizeMax)); + if (i > end) throw new BufferUnderflowException(); + } + + return i; + } + + // {@link Serializable} version number. + private static final long serialVersionUID = 4L; + + // {@link Serializable} Colfer extension. + private void writeObject(ObjectOutputStream out) throws IOException { + // TODO: better size estimation + byte[] buf = new byte[1024]; + int n; + while (true) try { + n = marshal(buf, 0); + break; + } catch (BufferUnderflowException e) { + buf = new byte[4 * buf.length]; + } + + out.writeInt(n); + out.write(buf, 0, n); + } + + // {@link Serializable} Colfer extension. + private void readObject(ObjectInputStream in) throws ClassNotFoundException, IOException { + init(); + + int n = in.readInt(); + byte[] buf = new byte[n]; + in.readFully(buf); + unmarshal(buf, 0); + } + + // {@link Serializable} Colfer extension. + private void readObjectNoData() throws ObjectStreamException { + init(); + } + + /** + * Gets kamon/context/generated/binary/span.Span.traceID. + * @return the value. + */ + public byte[] getTraceID() { + return this.traceID; + } + + /** + * Sets kamon/context/generated/binary/span.Span.traceID. + * @param value the replacement. + */ + public void setTraceID(byte[] value) { + this.traceID = value; + } + + /** + * Sets kamon/context/generated/binary/span.Span.traceID. + * @param value the replacement. + * @return {link this}. + */ + public Span withTraceID(byte[] value) { + this.traceID = value; + return this; + } + + /** + * Gets kamon/context/generated/binary/span.Span.spanID. + * @return the value. + */ + public byte[] getSpanID() { + return this.spanID; + } + + /** + * Sets kamon/context/generated/binary/span.Span.spanID. + * @param value the replacement. + */ + public void setSpanID(byte[] value) { + this.spanID = value; + } + + /** + * Sets kamon/context/generated/binary/span.Span.spanID. + * @param value the replacement. + * @return {link this}. + */ + public Span withSpanID(byte[] value) { + this.spanID = value; + return this; + } + + /** + * Gets kamon/context/generated/binary/span.Span.parentID. + * @return the value. + */ + public byte[] getParentID() { + return this.parentID; + } + + /** + * Sets kamon/context/generated/binary/span.Span.parentID. + * @param value the replacement. + */ + public void setParentID(byte[] value) { + this.parentID = value; + } + + /** + * Sets kamon/context/generated/binary/span.Span.parentID. + * @param value the replacement. + * @return {link this}. + */ + public Span withParentID(byte[] value) { + this.parentID = value; + return this; + } + + /** + * Gets kamon/context/generated/binary/span.Span.samplingDecision. + * @return the value. + */ + public byte getSamplingDecision() { + return this.samplingDecision; + } + + /** + * Sets kamon/context/generated/binary/span.Span.samplingDecision. + * @param value the replacement. + */ + public void setSamplingDecision(byte value) { + this.samplingDecision = value; + } + + /** + * Sets kamon/context/generated/binary/span.Span.samplingDecision. + * @param value the replacement. + * @return {link this}. + */ + public Span withSamplingDecision(byte value) { + this.samplingDecision = value; + return this; + } + + @Override + public final int hashCode() { + int h = 1; + for (byte b : this.traceID) h = 31 * h + b; + for (byte b : this.spanID) h = 31 * h + b; + for (byte b : this.parentID) h = 31 * h + b; + h = 31 * h + (this.samplingDecision & 0xff); + return h; + } + + @Override + public final boolean equals(Object o) { + return o instanceof Span && equals((Span) o); + } + + public final boolean equals(Span o) { + if (o == null) return false; + if (o == this) return true; + return o.getClass() == Span.class + && java.util.Arrays.equals(this.traceID, o.traceID) + && java.util.Arrays.equals(this.spanID, o.spanID) + && java.util.Arrays.equals(this.parentID, o.parentID) + && this.samplingDecision == o.samplingDecision; + } + +} diff --git a/kamon-core/src/main/resources/reference.conf b/kamon-core/src/main/resources/reference.conf index 5b885fd0..e0002194 100644 --- a/kamon-core/src/main/resources/reference.conf +++ b/kamon-core/src/main/resources/reference.conf @@ -123,39 +123,27 @@ kamon { # with the name of the operation on the parent Span, if any. scope-spans-to-parent = yes } - - # The SpanContextCodecs are used to encode/decode the SpanContext data into simple TextMaps, HTTP Headers or Binary - # carriers. The decision about which one to use is based on the kamon.trace.SpanContextCodec.Format instance passed - # to inject/extract calls. - # - # Any external implementation can be configured here, as long as it can be instantiated with a single parameter - # constructor that accepts a IdentityProvider. - span-context-codec { - - # Encodes/Decodes the SpanContext data using a simple key/value pair. Since this is very rarely going to be used - # we default to using the same codec for HTTP Headers, as it is built on top of a TextMap. - text-map = "kamon.trace.SpanContextCodec$ExtendedB3" - - # Encodes/Decodes the SpanContext into a TextMap with HTTP Header friendly, URLEncoded values. The default - # implementation follows the guidelines of B3 propagation. See more at https://github.com/openzipkin/b3-propagation. - http-headers = "kamon.trace.SpanContextCodec$ExtendedB3" - - # Encodes/Decodes the SpanContext using a binary representation. - binary = "TODO" - } - - } context { - encoding { - http-headers { + # Codecs are used to encode/decode Context keys when a Context must be propagated either through HTTP headers or + # Binary transports. Only broadcast keys configured bellow will be processed by the context Codec. The FQCN of + # the appropriate Codecs for each key must be provided, otherwise keys will be ignored. + # + codecs { + + # Size of the encoding buffer for the Binary Codec. + binary-buffer-size = 256 + + # Codecs to be used when propagating a Context through a HTTP Headers transport. + http-headers-keys { span = "kamon.trace.SpanCodec$B3" } - binary { - # span = "kamon.trace.propagation.Binary" + # Codecs to be used when propagating a Context through a Binary transport. + binary-keys { + span = "kamon.trace.SpanCodec$Colfer" } } } diff --git a/kamon-core/src/main/scala/kamon/Kamon.scala b/kamon-core/src/main/scala/kamon/Kamon.scala index 2c0561e2..f251b1ec 100644 --- a/kamon-core/src/main/scala/kamon/Kamon.scala +++ b/kamon-core/src/main/scala/kamon/Kamon.scala @@ -24,7 +24,7 @@ import scala.concurrent.Future import java.time.Duration import java.util.concurrent.{Executors, ScheduledExecutorService, ScheduledThreadPoolExecutor} -import kamon.context.{Codec, Context, Storage} +import kamon.context.{Codecs, Context, Storage} import org.slf4j.LoggerFactory import scala.util.Try @@ -39,10 +39,10 @@ object Kamon extends MetricLookup with ReporterRegistry with Tracer { private val _scheduler = Executors.newScheduledThreadPool(schedulerPoolSize(_config), numberedThreadFactory("kamon-scheduler")) private val _metrics = new MetricRegistry(_config, _scheduler) - private val _reporters = new ReporterRegistryImpl(_metrics, _config) - private val _tracer = Tracer.Default(Kamon, _reporters, _config) + private val _reporterRegistry = new ReporterRegistry.Default(_metrics, _config) + private val _tracer = Tracer.Default(Kamon, _reporterRegistry, _config) private val _contextStorage = Storage.ThreadLocal() - private val _contextCodec = new Codec(_config) + private val _contextCodec = new Codecs(_config) private var _onReconfigureHooks = Seq.empty[OnReconfigureHook] def environment: Environment = @@ -56,7 +56,7 @@ object Kamon extends MetricLookup with ReporterRegistry with Tracer { _environment = Environment.fromConfig(config) _filters = Filters.fromConfig(config) _metrics.reconfigure(config) - _reporters.reconfigure(config) + _reporterRegistry.reconfigure(config) _tracer.reconfigure(config) _contextCodec.reconfigure(config) @@ -100,7 +100,7 @@ object Kamon extends MetricLookup with ReporterRegistry with Tracer { override def identityProvider: IdentityProvider = _tracer.identityProvider - def contextCodec(): Codec = + def contextCodec(): Codecs = _contextCodec def currentContext(): Context = @@ -120,22 +120,22 @@ object Kamon extends MetricLookup with ReporterRegistry with Tracer { override def loadReportersFromConfig(): Unit = - _reporters.loadReportersFromConfig() + _reporterRegistry.loadReportersFromConfig() override def addReporter(reporter: MetricReporter): Registration = - _reporters.addReporter(reporter) + _reporterRegistry.addReporter(reporter) override def addReporter(reporter: MetricReporter, name: String): Registration = - _reporters.addReporter(reporter, name) + _reporterRegistry.addReporter(reporter, name) override def addReporter(reporter: SpanReporter): Registration = - _reporters.addReporter(reporter) + _reporterRegistry.addReporter(reporter) override def addReporter(reporter: SpanReporter, name: String): Registration = - _reporters.addReporter(reporter, name) + _reporterRegistry.addReporter(reporter, name) override def stopAllReporters(): Future[Unit] = - _reporters.stopAllReporters() + _reporterRegistry.stopAllReporters() def filter(filterName: String, pattern: String): Boolean = _filters.accept(filterName, pattern) diff --git a/kamon-core/src/main/scala/kamon/ReporterRegistry.scala b/kamon-core/src/main/scala/kamon/ReporterRegistry.scala index f0d744e5..ff135f60 100644 --- a/kamon-core/src/main/scala/kamon/ReporterRegistry.scala +++ b/kamon-core/src/main/scala/kamon/ReporterRegistry.scala @@ -33,6 +33,20 @@ import scala.util.control.NonFatal import scala.collection.JavaConverters._ import scala.collection.concurrent.TrieMap +sealed trait Reporter { + def start(): Unit + def stop(): Unit + def reconfigure(config: Config): Unit +} + +trait MetricReporter extends Reporter { + def reportTickSnapshot(snapshot: TickSnapshot): Unit +} + +trait SpanReporter extends Reporter { + def reportSpans(spans: Seq[Span.FinishedSpan]): Unit +} + trait ReporterRegistry { def loadReportersFromConfig(): Unit @@ -48,274 +62,261 @@ object ReporterRegistry { private[kamon] trait SpanSink { def reportSpan(finishedSpan: FinishedSpan): Unit } -} - -sealed trait Reporter { - def start(): Unit - def stop(): Unit - def reconfigure(config: Config): Unit -} -trait MetricReporter extends Reporter { - def reportTickSnapshot(snapshot: TickSnapshot): Unit -} - -trait SpanReporter extends Reporter { - def reportSpans(spans: Seq[Span.FinishedSpan]): Unit -} - -class ReporterRegistryImpl(metrics: MetricsSnapshotGenerator, initialConfig: Config) extends ReporterRegistry with SpanSink { - private val logger = LoggerFactory.getLogger(classOf[ReporterRegistry]) - private val registryExecutionContext = Executors.newScheduledThreadPool(2, threadFactory("kamon-reporter-registry")) - private val reporterCounter = new AtomicLong(0L) - private var registryConfiguration = readRegistryConfiguration(initialConfig) - - private val metricReporters = TrieMap[Long, MetricReporterEntry]() - private val metricReporterTickerSchedule = new AtomicReference[ScheduledFuture[_]]() - private val spanReporters = TrieMap[Long, SpanReporterEntry]() - private val spanReporterTickerSchedule = new AtomicReference[ScheduledFuture[_]]() - - - reconfigure(initialConfig) - - override def loadReportersFromConfig(): Unit = { - if(registryConfiguration.configuredReporters.isEmpty) - logger.info("The kamon.reporters setting is empty, no reporters have been started.") - else { - registryConfiguration.configuredReporters.foreach { reporterFQCN => - val dynamicAccess = new DynamicAccess(getClass.getClassLoader) - dynamicAccess.createInstanceFor[Reporter](reporterFQCN, Nil).map({ - case mr: MetricReporter => - addMetricReporter(mr, "loaded-from-config: " + reporterFQCN) - logger.info("Loaded metric reporter [{}]", reporterFQCN) - - case sr: SpanReporter => - addSpanReporter(sr, "loaded-from-config: " + reporterFQCN) - logger.info("Loaded span reporter [{}]", reporterFQCN) - - }).failed.foreach { - t => logger.error(s"Failed to load configured reporter [$reporterFQCN]", t) + private[kamon] class Default(metrics: MetricsSnapshotGenerator, initialConfig: Config) extends ReporterRegistry with SpanSink { + private val logger = LoggerFactory.getLogger(classOf[ReporterRegistry]) + private val registryExecutionContext = Executors.newScheduledThreadPool(2, threadFactory("kamon-reporter-registry")) + private val reporterCounter = new AtomicLong(0L) + private var registryConfiguration = readRegistryConfiguration(initialConfig) + + private val metricReporters = TrieMap[Long, MetricReporterEntry]() + private val metricReporterTickerSchedule = new AtomicReference[ScheduledFuture[_]]() + private val spanReporters = TrieMap[Long, SpanReporterEntry]() + private val spanReporterTickerSchedule = new AtomicReference[ScheduledFuture[_]]() + + + reconfigure(initialConfig) + + override def loadReportersFromConfig(): Unit = { + if(registryConfiguration.configuredReporters.isEmpty) + logger.info("The kamon.reporters setting is empty, no reporters have been started.") + else { + registryConfiguration.configuredReporters.foreach { reporterFQCN => + val dynamicAccess = new DynamicAccess(getClass.getClassLoader) + dynamicAccess.createInstanceFor[Reporter](reporterFQCN, Nil).map({ + case mr: MetricReporter => + addMetricReporter(mr, "loaded-from-config: " + reporterFQCN) + logger.info("Loaded metric reporter [{}]", reporterFQCN) + + case sr: SpanReporter => + addSpanReporter(sr, "loaded-from-config: " + reporterFQCN) + logger.info("Loaded span reporter [{}]", reporterFQCN) + + }).failed.foreach { + t => logger.error(s"Failed to load configured reporter [$reporterFQCN]", t) + } } } } - } - override def addReporter(reporter: MetricReporter): Registration = - addMetricReporter(reporter, reporter.getClass.getName()) + override def addReporter(reporter: MetricReporter): Registration = + addMetricReporter(reporter, reporter.getClass.getName()) - override def addReporter(reporter: MetricReporter, name: String): Registration = - addMetricReporter(reporter, name) + override def addReporter(reporter: MetricReporter, name: String): Registration = + addMetricReporter(reporter, name) - override def addReporter(reporter: SpanReporter): Registration = - addSpanReporter(reporter, reporter.getClass.getName()) + override def addReporter(reporter: SpanReporter): Registration = + addSpanReporter(reporter, reporter.getClass.getName()) - override def addReporter(reporter: SpanReporter, name: String): Registration = - addSpanReporter(reporter, name) + override def addReporter(reporter: SpanReporter, name: String): Registration = + addSpanReporter(reporter, name) - private def addMetricReporter(reporter: MetricReporter, name: String): Registration = synchronized { - val executor = Executors.newSingleThreadExecutor(threadFactory(name)) - val reporterEntry = new MetricReporterEntry( - id = reporterCounter.getAndIncrement(), - name = name, - reporter = reporter, - executionContext = ExecutionContext.fromExecutorService(executor) - ) + private def addMetricReporter(reporter: MetricReporter, name: String): Registration = synchronized { + val executor = Executors.newSingleThreadExecutor(threadFactory(name)) + val reporterEntry = new MetricReporterEntry( + id = reporterCounter.getAndIncrement(), + name = name, + reporter = reporter, + executionContext = ExecutionContext.fromExecutorService(executor) + ) - Future(reporterEntry.reporter.start())(reporterEntry.executionContext) + Future(reporterEntry.reporter.start())(reporterEntry.executionContext) - if(metricReporters.isEmpty) - reStartMetricTicker() + if(metricReporters.isEmpty) + reStartMetricTicker() - metricReporters.put(reporterEntry.id, reporterEntry) - createRegistration(reporterEntry.id, metricReporters) + metricReporters.put(reporterEntry.id, reporterEntry) + createRegistration(reporterEntry.id, metricReporters) - } + } - private def addSpanReporter(reporter: SpanReporter, name: String): Registration = synchronized { - val executor = Executors.newSingleThreadExecutor(threadFactory(name)) - val reporterEntry = new SpanReporterEntry( - id = reporterCounter.incrementAndGet(), - name = name, - reporter = reporter, - bufferCapacity = registryConfiguration.traceReporterQueueSize, - executionContext = ExecutionContext.fromExecutorService(executor) - ) + private def addSpanReporter(reporter: SpanReporter, name: String): Registration = synchronized { + val executor = Executors.newSingleThreadExecutor(threadFactory(name)) + val reporterEntry = new SpanReporterEntry( + id = reporterCounter.incrementAndGet(), + name = name, + reporter = reporter, + bufferCapacity = registryConfiguration.traceReporterQueueSize, + executionContext = ExecutionContext.fromExecutorService(executor) + ) - Future(reporterEntry.reporter.start())(reporterEntry.executionContext) + Future(reporterEntry.reporter.start())(reporterEntry.executionContext) - if(spanReporters.isEmpty) - reStartTraceTicker() + if(spanReporters.isEmpty) + reStartTraceTicker() - spanReporters.put(reporterEntry.id, reporterEntry) - createRegistration(reporterEntry.id, spanReporters) - } + spanReporters.put(reporterEntry.id, reporterEntry) + createRegistration(reporterEntry.id, spanReporters) + } - private def createRegistration(id: Long, target: TrieMap[Long, _]): Registration = new Registration { - override def cancel(): Boolean = - target.remove(id).nonEmpty - } + private def createRegistration(id: Long, target: TrieMap[Long, _]): Registration = new Registration { + override def cancel(): Boolean = + target.remove(id).nonEmpty + } - override def stopAllReporters(): Future[Unit] = { - implicit val stopReporterExeContext = ExecutionContext.fromExecutor(registryExecutionContext) - val reporterStopFutures = Vector.newBuilder[Future[Unit]] + override def stopAllReporters(): Future[Unit] = { + implicit val stopReporterExeContext = ExecutionContext.fromExecutor(registryExecutionContext) + val reporterStopFutures = Vector.newBuilder[Future[Unit]] - while(metricReporters.nonEmpty) { - val (idToRemove, _) = metricReporters.head - metricReporters.remove(idToRemove).foreach { entry => - reporterStopFutures += stopMetricReporter(entry) + while(metricReporters.nonEmpty) { + val (idToRemove, _) = metricReporters.head + metricReporters.remove(idToRemove).foreach { entry => + reporterStopFutures += stopMetricReporter(entry) + } } - } - while(spanReporters.nonEmpty) { - val (idToRemove, _) = spanReporters.head - spanReporters.remove(idToRemove).foreach { entry => - reporterStopFutures += stopSpanReporter(entry) + while(spanReporters.nonEmpty) { + val (idToRemove, _) = spanReporters.head + spanReporters.remove(idToRemove).foreach { entry => + reporterStopFutures += stopSpanReporter(entry) + } } - } - Future.sequence(reporterStopFutures.result()).map(_ => Try((): Unit)) - } + Future.sequence(reporterStopFutures.result()).map(_ => Try((): Unit)) + } - private[kamon] def reconfigure(config: Config): Unit = synchronized { - val newConfig = readRegistryConfiguration(config) + private[kamon] def reconfigure(config: Config): Unit = synchronized { + val newConfig = readRegistryConfiguration(config) - if(newConfig.metricTickInterval != registryConfiguration.metricTickInterval && metricReporters.nonEmpty) - reStartMetricTicker() + if(newConfig.metricTickInterval != registryConfiguration.metricTickInterval && metricReporters.nonEmpty) + reStartMetricTicker() - if(newConfig.traceTickInterval != registryConfiguration.metricTickInterval && spanReporters.nonEmpty) - reStartTraceTicker() + if(newConfig.traceTickInterval != registryConfiguration.metricTickInterval && spanReporters.nonEmpty) + reStartTraceTicker() - // Reconfigure all registered reporters - metricReporters.foreach { case (_, entry) => Future(entry.reporter.reconfigure(config))(entry.executionContext) } - spanReporters.foreach { case (_, entry) => Future(entry.reporter.reconfigure(config))(entry.executionContext) } - registryConfiguration = newConfig - } + // Reconfigure all registered reporters + metricReporters.foreach { case (_, entry) => Future(entry.reporter.reconfigure(config))(entry.executionContext) } + spanReporters.foreach { case (_, entry) => Future(entry.reporter.reconfigure(config))(entry.executionContext) } + registryConfiguration = newConfig + } - private def reStartMetricTicker(): Unit = { - val tickIntervalMillis = registryConfiguration.metricTickInterval.toMillis - val currentMetricTicker = metricReporterTickerSchedule.get() + private def reStartMetricTicker(): Unit = { + val tickIntervalMillis = registryConfiguration.metricTickInterval.toMillis + val currentMetricTicker = metricReporterTickerSchedule.get() - if(currentMetricTicker != null) - currentMetricTicker.cancel(false) + if(currentMetricTicker != null) + currentMetricTicker.cancel(false) - metricReporterTickerSchedule.set { - registryExecutionContext.scheduleAtFixedRate( - new MetricReporterTicker(metrics, metricReporters), tickIntervalMillis, tickIntervalMillis, TimeUnit.MILLISECONDS - ) + metricReporterTickerSchedule.set { + registryExecutionContext.scheduleAtFixedRate( + new MetricReporterTicker(metrics, metricReporters), tickIntervalMillis, tickIntervalMillis, TimeUnit.MILLISECONDS + ) + } } - } - private def reStartTraceTicker(): Unit = { - val tickIntervalMillis = registryConfiguration.traceTickInterval.toMillis - val currentSpanTicker = spanReporterTickerSchedule.get() - if(currentSpanTicker != null) - currentSpanTicker.cancel(false) + private def reStartTraceTicker(): Unit = { + val tickIntervalMillis = registryConfiguration.traceTickInterval.toMillis + val currentSpanTicker = spanReporterTickerSchedule.get() + if(currentSpanTicker != null) + currentSpanTicker.cancel(false) - spanReporterTickerSchedule.set { - registryExecutionContext.scheduleAtFixedRate( - new SpanReporterTicker(spanReporters), tickIntervalMillis, tickIntervalMillis, TimeUnit.MILLISECONDS - ) + spanReporterTickerSchedule.set { + registryExecutionContext.scheduleAtFixedRate( + new SpanReporterTicker(spanReporters), tickIntervalMillis, tickIntervalMillis, TimeUnit.MILLISECONDS + ) + } } - } - def reportSpan(span: Span.FinishedSpan): Unit = { - spanReporters.foreach { case (_, reporterEntry) => - if(reporterEntry.isActive) - reporterEntry.buffer.offer(span) + def reportSpan(span: Span.FinishedSpan): Unit = { + spanReporters.foreach { case (_, reporterEntry) => + if(reporterEntry.isActive) + reporterEntry.buffer.offer(span) + } } - } - private def stopMetricReporter(entry: MetricReporterEntry): Future[Unit] = { - entry.isActive = false + private def stopMetricReporter(entry: MetricReporterEntry): Future[Unit] = { + entry.isActive = false - Future(entry.reporter.stop())(entry.executionContext).andThen { - case _ => entry.executionContext.shutdown() - }(ExecutionContext.fromExecutor(registryExecutionContext)) - } + Future(entry.reporter.stop())(entry.executionContext).andThen { + case _ => entry.executionContext.shutdown() + }(ExecutionContext.fromExecutor(registryExecutionContext)) + } - private def stopSpanReporter(entry: SpanReporterEntry): Future[Unit] = { - entry.isActive = false + private def stopSpanReporter(entry: SpanReporterEntry): Future[Unit] = { + entry.isActive = false - Future(entry.reporter.stop())(entry.executionContext).andThen { - case _ => entry.executionContext.shutdown() - }(ExecutionContext.fromExecutor(registryExecutionContext)) - } + Future(entry.reporter.stop())(entry.executionContext).andThen { + case _ => entry.executionContext.shutdown() + }(ExecutionContext.fromExecutor(registryExecutionContext)) + } - private class MetricReporterEntry( - @volatile var isActive: Boolean = true, - val id: Long, - val name: String, - val reporter: MetricReporter, - val executionContext: ExecutionContextExecutorService - ) - - private class SpanReporterEntry( - @volatile var isActive: Boolean = true, - val id: Long, - val name: String, - val reporter: SpanReporter, - val bufferCapacity: Int, - val executionContext: ExecutionContextExecutorService - ) { - val buffer = new ArrayBlockingQueue[Span.FinishedSpan](bufferCapacity) - } + private class MetricReporterEntry( + @volatile var isActive: Boolean = true, + val id: Long, + val name: String, + val reporter: MetricReporter, + val executionContext: ExecutionContextExecutorService + ) - private class MetricReporterTicker(snapshotGenerator: MetricsSnapshotGenerator, reporterEntries: TrieMap[Long, MetricReporterEntry]) extends Runnable { - val logger = LoggerFactory.getLogger(classOf[MetricReporterTicker]) - var lastTick = System.currentTimeMillis() + private class SpanReporterEntry( + @volatile var isActive: Boolean = true, + val id: Long, + val name: String, + val reporter: SpanReporter, + val bufferCapacity: Int, + val executionContext: ExecutionContextExecutorService + ) { + val buffer = new ArrayBlockingQueue[Span.FinishedSpan](bufferCapacity) + } - def run(): Unit = try { - val currentTick = System.currentTimeMillis() - val tickSnapshot = TickSnapshot( - interval = Interval(lastTick, currentTick), - metrics = snapshotGenerator.snapshot() - ) + private class MetricReporterTicker(snapshotGenerator: MetricsSnapshotGenerator, reporterEntries: TrieMap[Long, MetricReporterEntry]) extends Runnable { + val logger = LoggerFactory.getLogger(classOf[MetricReporterTicker]) + var lastTick = System.currentTimeMillis() - reporterEntries.foreach { case (_, entry) => - Future { - Try { - if (entry.isActive) - entry.reporter.reportTickSnapshot(tickSnapshot) + def run(): Unit = try { + val currentTick = System.currentTimeMillis() + val tickSnapshot = TickSnapshot( + interval = Interval(lastTick, currentTick), + metrics = snapshotGenerator.snapshot() + ) - }.failed.foreach { error => - logger.error(s"Reporter [${entry.name}] failed to process a metrics tick.", error) - } + reporterEntries.foreach { case (_, entry) => + Future { + Try { + if (entry.isActive) + entry.reporter.reportTickSnapshot(tickSnapshot) - }(entry.executionContext) - } + }.failed.foreach { error => + logger.error(s"Reporter [${entry.name}] failed to process a metrics tick.", error) + } - lastTick = currentTick + }(entry.executionContext) + } - } catch { - case NonFatal(t) => logger.error("Error while running a tick", t) + lastTick = currentTick + + } catch { + case NonFatal(t) => logger.error("Error while running a tick", t) + } } - } - private class SpanReporterTicker(spanReporters: TrieMap[Long, SpanReporterEntry]) extends Runnable { - override def run(): Unit = { - spanReporters.foreach { - case (_, entry) => + private class SpanReporterTicker(spanReporters: TrieMap[Long, SpanReporterEntry]) extends Runnable { + override def run(): Unit = { + spanReporters.foreach { + case (_, entry) => - val spanBatch = new java.util.ArrayList[Span.FinishedSpan](entry.bufferCapacity) - entry.buffer.drainTo(spanBatch, entry.bufferCapacity) + val spanBatch = new java.util.ArrayList[Span.FinishedSpan](entry.bufferCapacity) + entry.buffer.drainTo(spanBatch, entry.bufferCapacity) - Future { - entry.reporter.reportSpans(spanBatch.asScala) - }(entry.executionContext) + Future { + entry.reporter.reportSpans(spanBatch.asScala) + }(entry.executionContext) + } } } - } - private def readRegistryConfiguration(config: Config): Configuration = - Configuration( - metricTickInterval = config.getDuration("kamon.metric.tick-interval"), - traceTickInterval = config.getDuration("kamon.trace.tick-interval"), - traceReporterQueueSize = config.getInt("kamon.trace.reporter-queue-size"), - configuredReporters = config.getStringList("kamon.reporters").asScala - ) + private def readRegistryConfiguration(config: Config): Configuration = + Configuration( + metricTickInterval = config.getDuration("kamon.metric.tick-interval"), + traceTickInterval = config.getDuration("kamon.trace.tick-interval"), + traceReporterQueueSize = config.getInt("kamon.trace.reporter-queue-size"), + configuredReporters = config.getStringList("kamon.reporters").asScala + ) + + private case class Configuration(metricTickInterval: Duration, traceTickInterval: Duration, + traceReporterQueueSize: Int, configuredReporters: Seq[String]) + } +} - private case class Configuration(metricTickInterval: Duration, traceTickInterval: Duration, - traceReporterQueueSize: Int, configuredReporters: Seq[String]) -} \ No newline at end of file diff --git a/kamon-core/src/main/scala/kamon/context/Codec.scala b/kamon-core/src/main/scala/kamon/context/Codec.scala deleted file mode 100644 index 19ec2a20..00000000 --- a/kamon-core/src/main/scala/kamon/context/Codec.scala +++ /dev/null @@ -1,144 +0,0 @@ -/* ========================================================================================= - * Copyright © 2013-2017 the kamon project - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file - * except in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the - * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, - * either express or implied. See the License for the specific language governing permissions - * and limitations under the License. - * ========================================================================================= - */ - -package kamon -package context - -import com.typesafe.config.Config -import kamon.util.DynamicAccess -import org.slf4j.LoggerFactory - -import scala.collection.mutable - -class Codec(initialConfig: Config) { - private val log = LoggerFactory.getLogger(classOf[Codec]) - - @volatile private var httpHeaders: Codec.ForContext[TextMap] = new Codec.HttpHeaders(Map.empty) - //val Binary: Codec.ForContext[ByteBuffer] = _ - reconfigure(initialConfig) - - - def HttpHeaders: Codec.ForContext[TextMap] = - httpHeaders - - def reconfigure(config: Config): Unit = { - httpHeaders = new Codec.HttpHeaders(readEntryCodecs("kamon.context.encoding.http-headers", config)) - } - - private def readEntryCodecs[T](rootKey: String, config: Config): Map[String, Codec.ForEntry[T]] = { - val rootConfig = config.getConfig(rootKey) - val dynamic = new DynamicAccess(getClass.getClassLoader) - val entries = Map.newBuilder[String, Codec.ForEntry[T]] - - rootConfig.topLevelKeys.foreach(key => { - try { - val fqcn = rootConfig.getString(key) - entries += ((key, dynamic.createInstanceFor[Codec.ForEntry[T]](fqcn, Nil).get)) - } catch { - case e: Throwable => - log.error(s"Failed to initialize codec for key [$key]", e) - } - }) - - entries.result() - } -} - -object Codec { - - trait ForContext[T] { - def encode(context: Context): T - def decode(carrier: T): Context - } - - trait ForEntry[T] { - def encode(context: Context): T - def decode(carrier: T, context: Context): Context - } - - final class HttpHeaders(entryCodecs: Map[String, Codec.ForEntry[TextMap]]) extends Codec.ForContext[TextMap] { - private val log = LoggerFactory.getLogger(classOf[HttpHeaders]) - - override def encode(context: Context): TextMap = { - val encoded = TextMap.Default() - - context.entries.foreach { - case (key, _) if key.broadcast => - entryCodecs.get(key.name) match { - case Some(codec) => - try { - codec.encode(context).values.foreach(pair => encoded.put(pair._1, pair._2)) - } catch { - case e: Throwable => log.error(s"Failed to encode key [${key.name}]", e) - } - - case None => - log.error("Context key [{}] should be encoded in HttpHeaders but no codec was found for it.", key.name) - } - } - - encoded - } - - override def decode(carrier: TextMap): Context = { - var context: Context = Context.Empty - - try { - context = entryCodecs.foldLeft(context)((ctx, codecEntry) => { - val (_, codec) = codecEntry - codec.decode(carrier, ctx) - }) - - } catch { - case e: Throwable => - log.error("Failed to decode context from HttpHeaders", e) - } - - context - } - } -} - - -trait TextMap { - - def get(key: String): Option[String] - - def put(key: String, value: String): Unit - - def values: Iterator[(String, String)] -} - -object TextMap { - - class Default extends TextMap { - private val storage = - mutable.Map.empty[String, String] - - override def get(key: String): Option[String] = - storage.get(key) - - override def put(key: String, value: String): Unit = - storage.put(key, value) - - override def values: Iterator[(String, String)] = - storage.toIterator - } - - object Default { - def apply(): Default = - new Default() - } -} \ No newline at end of file diff --git a/kamon-core/src/main/scala/kamon/context/Codecs.scala b/kamon-core/src/main/scala/kamon/context/Codecs.scala new file mode 100644 index 00000000..b50e991d --- /dev/null +++ b/kamon-core/src/main/scala/kamon/context/Codecs.scala @@ -0,0 +1,245 @@ +/* ========================================================================================= + * Copyright © 2013-2017 the kamon project + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + +package kamon +package context + +import java.nio.ByteBuffer + +import com.typesafe.config.Config +import kamon.util.DynamicAccess +import org.slf4j.LoggerFactory +import kamon.context.generated.binary.context.{Context => ColferContext, Entry => ColferEntry} + +import scala.collection.mutable + +class Codecs(initialConfig: Config) { + private val log = LoggerFactory.getLogger(classOf[Codecs]) + @volatile private var httpHeaders: Codecs.ForContext[TextMap] = new Codecs.HttpHeaders(Map.empty) + @volatile private var binary: Codecs.ForContext[ByteBuffer] = new Codecs.Binary(256, Map.empty) + + reconfigure(initialConfig) + + + def HttpHeaders: Codecs.ForContext[TextMap] = + httpHeaders + + def Binary: Codecs.ForContext[ByteBuffer] = + binary + + def reconfigure(config: Config): Unit = { + try { + val codecsConfig = config.getConfig("kamon.context.codecs") + httpHeaders = new Codecs.HttpHeaders(readEntryCodecs("http-headers-keys", codecsConfig)) + binary = new Codecs.Binary(codecsConfig.getBytes("binary-buffer-size"), readEntryCodecs("binary-keys", codecsConfig)) + } catch { + case t: Throwable => log.error("Failed to initialize Context Codecs", t) + } + } + + private def readEntryCodecs[T](rootKey: String, config: Config): Map[String, Codecs.ForEntry[T]] = { + val rootConfig = config.getConfig(rootKey) + val dynamic = new DynamicAccess(getClass.getClassLoader) + val entries = Map.newBuilder[String, Codecs.ForEntry[T]] + + rootConfig.topLevelKeys.foreach(key => { + try { + val fqcn = rootConfig.getString(key) + entries += ((key, dynamic.createInstanceFor[Codecs.ForEntry[T]](fqcn, Nil).get)) + } catch { + case e: Throwable => + log.error(s"Failed to initialize codec for key [$key]", e) + } + }) + + entries.result() + } +} + +object Codecs { + + trait ForContext[T] { + def encode(context: Context): T + def decode(carrier: T): Context + } + + trait ForEntry[T] { + def encode(context: Context): T + def decode(carrier: T, context: Context): Context + } + + final class HttpHeaders(entryCodecs: Map[String, Codecs.ForEntry[TextMap]]) extends Codecs.ForContext[TextMap] { + private val log = LoggerFactory.getLogger(classOf[HttpHeaders]) + + override def encode(context: Context): TextMap = { + val encoded = TextMap.Default() + + context.entries.foreach { + case (key, _) if key.broadcast => + entryCodecs.get(key.name) match { + case Some(codec) => + try { + codec.encode(context).values.foreach(pair => encoded.put(pair._1, pair._2)) + } catch { + case e: Throwable => log.error(s"Failed to encode key [${key.name}]", e) + } + + case None => + log.error("Context key [{}] should be encoded in HttpHeaders but no codec was found for it.", key.name) + } + + case _ => // All non-broadcast keys should be ignored. + } + + encoded + } + + override def decode(carrier: TextMap): Context = { + var context: Context = Context.Empty + + try { + context = entryCodecs.foldLeft(context)((ctx, codecEntry) => { + val (_, codec) = codecEntry + codec.decode(carrier, ctx) + }) + + } catch { + case e: Throwable => + log.error("Failed to decode context from HttpHeaders", e) + } + + context + } + } + + + final class Binary(bufferSize: Long, entryCodecs: Map[String, Codecs.ForEntry[ByteBuffer]]) extends Codecs.ForContext[ByteBuffer] { + private val log = LoggerFactory.getLogger(classOf[Binary]) + private val binaryBuffer = newThreadLocalBuffer(bufferSize) + private val emptyBuffer = ByteBuffer.allocate(0) + + override def encode(context: Context): ByteBuffer = { + val entries = context.entries + if(entries.isEmpty) + emptyBuffer + else { + var colferEntries: List[ColferEntry] = Nil + entries.foreach { + case (key, _) if key.broadcast => + entryCodecs.get(key.name) match { + case Some(entryCodec) => + try { + val entryData = entryCodec.encode(context) + if(entryData.capacity() > 0) { + val colferEntry = new ColferEntry() + colferEntry.setName(key.name) + colferEntry.setContent(entryData.array()) + colferEntries = colferEntry :: colferEntries + } + } catch { + case throwable: Throwable => + log.error(s"Failed to encode broadcast context key [${key.name}]", throwable) + } + + case None => + log.error("Failed to encode broadcast context key [{}]. No codec found.", key.name) + } + + case _ => // All non-broadcast keys should be ignored. + } + + if(colferEntries.isEmpty) + emptyBuffer + else { + val buffer = binaryBuffer.get() + val colferContext = new ColferContext() + colferContext.setEntries(colferEntries.toArray) + val marshalledSize = colferContext.marshal(buffer, 0) + + val data = Array.ofDim[Byte](marshalledSize) + System.arraycopy(buffer, 0, data, 0, marshalledSize) + ByteBuffer.wrap(data) + } + } + } + + + override def decode(carrier: ByteBuffer): Context = { + if(carrier.capacity() == 0) + Context.Empty + else { + var context: Context = Context.Empty + + try { + val colferContext = new ColferContext() + colferContext.unmarshal(carrier.array(), 0) + + colferContext.entries.foreach(colferEntry => { + entryCodecs.get(colferEntry.getName()) match { + case Some(entryCodec) => + context = entryCodec.decode(ByteBuffer.wrap(colferEntry.content), context) + + case None => + log.error("Failed to decode entry [{}] with Binary context codec. No entry found for the key.", colferEntry.getName()) + } + }) + + } catch { + case e: Throwable => + log.error("Failed to decode context from Binary", e) + } + + context + } + } + + + private def newThreadLocalBuffer(size: Long): ThreadLocal[Array[Byte]] = new ThreadLocal[Array[Byte]] { + override def initialValue(): Array[Byte] = Array.ofDim[Byte](size.toInt) + } + } +} + + +trait TextMap { + + def get(key: String): Option[String] + + def put(key: String, value: String): Unit + + def values: Iterator[(String, String)] +} + +object TextMap { + + class Default extends TextMap { + private val storage = + mutable.Map.empty[String, String] + + override def get(key: String): Option[String] = + storage.get(key) + + override def put(key: String, value: String): Unit = + storage.put(key, value) + + override def values: Iterator[(String, String)] = + storage.toIterator + } + + object Default { + def apply(): Default = + new Default() + } +} \ No newline at end of file diff --git a/kamon-core/src/main/scala/kamon/trace/Span.scala b/kamon-core/src/main/scala/kamon/trace/Span.scala index ea28142e..3158aa73 100644 --- a/kamon-core/src/main/scala/kamon/trace/Span.scala +++ b/kamon-core/src/main/scala/kamon/trace/Span.scala @@ -195,9 +195,9 @@ object Span { object Local { def apply(spanContext: SpanContext, parent: Option[Span], initialOperationName: String, initialSpanTags: Map[String, Span.TagValue], - initialMetricTags: Map[String, String], startTimestampMicros: Long, reporterRegistry: ReporterRegistryImpl, + initialMetricTags: Map[String, String], startTimestampMicros: Long, spanSink: SpanSink, scopeSpanMetrics: Boolean): Local = - new Local(spanContext, parent, initialOperationName, initialSpanTags, initialMetricTags, startTimestampMicros, reporterRegistry, scopeSpanMetrics) + new Local(spanContext, parent, initialOperationName, initialSpanTags, initialMetricTags, startTimestampMicros, spanSink, scopeSpanMetrics) } diff --git a/kamon-core/src/main/scala/kamon/trace/SpanCodec.scala b/kamon-core/src/main/scala/kamon/trace/SpanCodec.scala index 96317696..ae78ee67 100644 --- a/kamon-core/src/main/scala/kamon/trace/SpanCodec.scala +++ b/kamon-core/src/main/scala/kamon/trace/SpanCodec.scala @@ -16,15 +16,17 @@ package kamon.trace import java.net.{URLDecoder, URLEncoder} +import java.nio.ByteBuffer import kamon.Kamon -import kamon.context.{Codec, Context, TextMap} +import kamon.context.{Codecs, Context, TextMap} +import kamon.context.generated.binary.span.{Span => ColferSpan} import kamon.trace.SpanContext.SamplingDecision object SpanCodec { - class B3 extends Codec.ForEntry[TextMap] { + class B3 extends Codecs.ForEntry[TextMap] { import B3.Headers override def encode(context: Context): TextMap = { @@ -96,4 +98,69 @@ object SpanCodec { val Flags = "X-B3-Flags" } } + + + class Colfer extends Codecs.ForEntry[ByteBuffer] { + val emptyBuffer = ByteBuffer.allocate(0) + + override def encode(context: Context): ByteBuffer = { + val span = context.get(Span.ContextKey) + if(span.nonEmpty()) { + val marshalBuffer = Colfer.codecBuffer.get() + val colferSpan = new ColferSpan() + val spanContext = span.context() + + colferSpan.setTraceID(spanContext.traceID.bytes) + colferSpan.setSpanID(spanContext.spanID.bytes) + colferSpan.setParentID(spanContext.parentID.bytes) + colferSpan.setSamplingDecision(samplingDecisionToByte(spanContext.samplingDecision)) + + val marshalledSize = colferSpan.marshal(marshalBuffer, 0) + val buffer = ByteBuffer.allocate(marshalledSize) + buffer.put(marshalBuffer, 0, marshalledSize) + buffer + + } else emptyBuffer + } + + override def decode(carrier: ByteBuffer, context: Context): Context = { + carrier.clear() + + if(carrier.capacity() == 0) + context + else { + val identityProvider = Kamon.tracer.identityProvider + val colferSpan = new ColferSpan() + colferSpan.unmarshal(carrier.array(), 0) + + val spanContext = SpanContext( + traceID = identityProvider.traceIdGenerator().from(colferSpan.traceID), + spanID = identityProvider.traceIdGenerator().from(colferSpan.spanID), + parentID = identityProvider.traceIdGenerator().from(colferSpan.parentID), + samplingDecision = byteToSamplingDecision(colferSpan.samplingDecision) + ) + + context.withKey(Span.ContextKey, Span.Remote(spanContext)) + } + } + + + private def samplingDecisionToByte(samplingDecision: SamplingDecision): Byte = samplingDecision match { + case SamplingDecision.Sample => 1 + case SamplingDecision.DoNotSample => 2 + case SamplingDecision.Unknown => 3 + } + + private def byteToSamplingDecision(byte: Byte): SamplingDecision = byte match { + case 1 => SamplingDecision.Sample + case 2 => SamplingDecision.DoNotSample + case _ => SamplingDecision.Unknown + } + } + + object Colfer { + private val codecBuffer = new ThreadLocal[Array[Byte]] { + override def initialValue(): Array[Byte] = Array.ofDim[Byte](256) + } + } } \ No newline at end of file diff --git a/kamon-core/src/main/scala/kamon/trace/Tracer.scala b/kamon-core/src/main/scala/kamon/trace/Tracer.scala index f2f1918c..5f61f3aa 100644 --- a/kamon-core/src/main/scala/kamon/trace/Tracer.scala +++ b/kamon-core/src/main/scala/kamon/trace/Tracer.scala @@ -16,7 +16,8 @@ package kamon.trace import com.typesafe.config.Config -import kamon.{Kamon, ReporterRegistryImpl} +import kamon.ReporterRegistry.SpanSink +import kamon.Kamon import kamon.metric.MetricLookup import kamon.trace.Span.TagValue import kamon.trace.SpanContext.SamplingDecision @@ -34,7 +35,7 @@ trait Tracer { object Tracer { - final class Default(metrics: MetricLookup, reporterRegistry: ReporterRegistryImpl, initialConfig: Config) extends Tracer { + final class Default(metrics: MetricLookup, spanSink: SpanSink, initialConfig: Config) extends Tracer { private val logger = LoggerFactory.getLogger(classOf[Tracer]) private[Tracer] val tracerMetrics = new TracerMetrics(metrics) @@ -46,7 +47,7 @@ object Tracer { reconfigure(initialConfig) override def buildSpan(operationName: String): SpanBuilder = - new SpanBuilder(operationName, this, reporterRegistry) + new SpanBuilder(operationName, this, spanSink) override def identityProvider: IdentityProvider = this._identityProvider @@ -84,11 +85,11 @@ object Tracer { } object Default { - def apply(metrics: MetricLookup, reporterRegistry: ReporterRegistryImpl, initialConfig: Config): Default = - new Default(metrics, reporterRegistry, initialConfig) + def apply(metrics: MetricLookup, spanSink: SpanSink, initialConfig: Config): Default = + new Default(metrics, spanSink, initialConfig) } - final class SpanBuilder(operationName: String, tracer: Tracer.Default, reporterRegistry: ReporterRegistryImpl) { + final class SpanBuilder(operationName: String, tracer: Tracer.Default, spanSink: SpanSink) { private var parentSpan: Span = _ private var startTimestamp = 0L private var initialSpanTags = Map.empty[String, Span.TagValue] @@ -157,7 +158,7 @@ object Tracer { } tracer.tracerMetrics.createdSpans.increment() - Span.Local(spanContext, nonRemoteParent, operationName, initialSpanTags, initialMetricTags, startTimestampMicros, reporterRegistry, tracer.scopeSpanMetrics) + Span.Local(spanContext, nonRemoteParent, operationName, initialSpanTags, initialMetricTags, startTimestampMicros, spanSink, tracer.scopeSpanMetrics) } private def joinParentContext(parent: Span, samplingDecision: SamplingDecision): SpanContext = -- cgit v1.2.3