diff options
Diffstat (limited to 'kamon-core/src/main')
18 files changed, 1671 insertions, 516 deletions
diff --git a/kamon-core/src/main/colfer/context.colf b/kamon-core/src/main/colfer/context.colf new file mode 100644 index 00000000..26421cba --- /dev/null +++ b/kamon-core/src/main/colfer/context.colf @@ -0,0 +1,10 @@ +package kamon + +type Entry struct { + name text + content binary +} + +type Context struct { + entries []Entry +}
\ No newline at end of file diff --git a/kamon-core/src/main/java/kamon/context/encoding/Context.java b/kamon-core/src/main/java/kamon/context/encoding/Context.java new file mode 100644 index 00000000..db6ed7a9 --- /dev/null +++ b/kamon-core/src/main/java/kamon/context/encoding/Context.java @@ -0,0 +1,359 @@ +package kamon.context.encoding; + + +// Code generated by colf(1); DO NOT EDIT. + + +import static java.lang.String.format; +import java.io.IOException; +import java.io.InputStream; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.io.ObjectStreamException; +import java.io.OutputStream; +import java.io.Serializable; +import java.util.InputMismatchException; +import java.nio.BufferOverflowException; +import java.nio.BufferUnderflowException; + + +/** + * Data bean with built-in serialization support. + + * @author generated by colf(1) + * @see <a href="https://github.com/pascaldekloe/colfer">Colfer's home</a> + */ +@javax.annotation.Generated(value="colf(1)", comments="Colfer from schema file context.colf") +public class Context implements Serializable { + + /** The upper limit for serial byte sizes. */ + public static int colferSizeMax = 16 * 1024 * 1024; + + /** The upper limit for the number of elements in a list. */ + public static int colferListMax = 64 * 1024; + + + + + public Entry[] entries; + + + /** Default constructor */ + public Context() { + init(); + } + + private static final Entry[] _zeroEntries = new Entry[0]; + + /** Colfer zero values. */ + private void init() { + entries = _zeroEntries; + } + + /** + * {@link #reset(InputStream) Reusable} deserialization of Colfer streams. + */ + public static class Unmarshaller { + + /** The data source. */ + protected InputStream in; + + /** The read buffer. */ + public byte[] buf; + + /** The {@link #buf buffer}'s data start index, inclusive. */ + protected int offset; + + /** The {@link #buf buffer}'s data end index, exclusive. */ + protected int i; + + + /** + * @param in the data source or {@code null}. + * @param buf the initial buffer or {@code null}. + */ + public Unmarshaller(InputStream in, byte[] buf) { + // TODO: better size estimation + if (buf == null || buf.length == 0) + buf = new byte[Math.min(Context.colferSizeMax, 2048)]; + this.buf = buf; + reset(in); + } + + /** + * Reuses the marshaller. + * @param in the data source or {@code null}. + * @throws IllegalStateException on pending data. + */ + public void reset(InputStream in) { + if (this.i != this.offset) throw new IllegalStateException("colfer: pending data"); + this.in = in; + this.offset = 0; + this.i = 0; + } + + /** + * Deserializes the following object. + * @return the result or {@code null} when EOF. + * @throws IOException from the input stream. + * @throws SecurityException on an upper limit breach defined by either {@link #colferSizeMax} or {@link #colferListMax}. + * @throws InputMismatchException when the data does not match this object's schema. + */ + public Context next() throws IOException { + if (in == null) return null; + + while (true) { + if (this.i > this.offset) { + try { + Context o = new Context(); + this.offset = o.unmarshal(this.buf, this.offset, this.i); + return o; + } catch (BufferUnderflowException e) { + } + } + // not enough data + + if (this.i <= this.offset) { + this.offset = 0; + this.i = 0; + } else if (i == buf.length) { + byte[] src = this.buf; + // TODO: better size estimation + if (offset == 0) this.buf = new byte[Math.min(Context.colferSizeMax, this.buf.length * 4)]; + System.arraycopy(src, this.offset, this.buf, 0, this.i - this.offset); + this.i -= this.offset; + this.offset = 0; + } + assert this.i < this.buf.length; + + int n = in.read(buf, i, buf.length - i); + if (n < 0) { + if (this.i > this.offset) + throw new InputMismatchException("colfer: pending data with EOF"); + return null; + } + assert n > 0; + i += n; + } + } + + } + + + /** + * Serializes the object. + * All {@code null} elements in {@link #entries} will be replaced with a {@code new} value. + * @param out the data destination. + * @param buf the initial buffer or {@code null}. + * @return the final buffer. When the serial fits into {@code buf} then the return is {@code buf}. + * Otherwise the return is a new buffer, large enough to hold the whole serial. + * @throws IOException from {@code out}. + * @throws IllegalStateException on an upper limit breach defined by either {@link #colferSizeMax} or {@link #colferListMax}. + */ + public byte[] marshal(OutputStream out, byte[] buf) throws IOException { + // TODO: better size estimation + if (buf == null || buf.length == 0) + buf = new byte[Math.min(Context.colferSizeMax, 2048)]; + + while (true) { + int i; + try { + i = marshal(buf, 0); + } catch (BufferOverflowException e) { + buf = new byte[Math.min(Context.colferSizeMax, buf.length * 4)]; + continue; + } + + out.write(buf, 0, i); + return buf; + } + } + + /** + * Serializes the object. + * All {@code null} elements in {@link #entries} will be replaced with a {@code new} value. + * @param buf the data destination. + * @param offset the initial index for {@code buf}, inclusive. + * @return the final index for {@code buf}, exclusive. + * @throws BufferOverflowException when {@code buf} is too small. + * @throws IllegalStateException on an upper limit breach defined by either {@link #colferSizeMax} or {@link #colferListMax}. + */ + public int marshal(byte[] buf, int offset) { + int i = offset; + + try { + if (this.entries.length != 0) { + buf[i++] = (byte) 0; + Entry[] a = this.entries; + + int x = a.length; + if (x > Context.colferListMax) + throw new IllegalStateException(format("colfer: kamon/context/kamon.Context.entries length %d exceeds %d elements", x, Context.colferListMax)); + while (x > 0x7f) { + buf[i++] = (byte) (x | 0x80); + x >>>= 7; + } + buf[i++] = (byte) x; + + for (int ai = 0; ai < a.length; ai++) { + Entry o = a[ai]; + if (o == null) { + o = new Entry(); + a[ai] = o; + } + i = o.marshal(buf, i); + } + } + + buf[i++] = (byte) 0x7f; + return i; + } catch (ArrayIndexOutOfBoundsException e) { + if (i - offset > Context.colferSizeMax) + throw new IllegalStateException(format("colfer: kamon/context/kamon.Context exceeds %d bytes", Context.colferSizeMax)); + if (i > buf.length) throw new BufferOverflowException(); + throw e; + } + } + + /** + * Deserializes the object. + * @param buf the data source. + * @param offset the initial index for {@code buf}, inclusive. + * @return the final index for {@code buf}, exclusive. + * @throws BufferUnderflowException when {@code buf} is incomplete. (EOF) + * @throws SecurityException on an upper limit breach defined by either {@link #colferSizeMax} or {@link #colferListMax}. + * @throws InputMismatchException when the data does not match this object's schema. + */ + public int unmarshal(byte[] buf, int offset) { + return unmarshal(buf, offset, buf.length); + } + + /** + * Deserializes the object. + * @param buf the data source. + * @param offset the initial index for {@code buf}, inclusive. + * @param end the index limit for {@code buf}, exclusive. + * @return the final index for {@code buf}, exclusive. + * @throws BufferUnderflowException when {@code buf} is incomplete. (EOF) + * @throws SecurityException on an upper limit breach defined by either {@link #colferSizeMax} or {@link #colferListMax}. + * @throws InputMismatchException when the data does not match this object's schema. + */ + public int unmarshal(byte[] buf, int offset, int end) { + if (end > buf.length) end = buf.length; + int i = offset; + + try { + byte header = buf[i++]; + + if (header == (byte) 0) { + int length = 0; + for (int shift = 0; true; shift += 7) { + byte b = buf[i++]; + length |= (b & 0x7f) << shift; + if (shift == 28 || b >= 0) break; + } + if (length < 0 || length > Context.colferListMax) + throw new SecurityException(format("colfer: kamon/context/kamon.Context.entries length %d exceeds %d elements", length, Context.colferListMax)); + + Entry[] a = new Entry[length]; + for (int ai = 0; ai < length; ai++) { + Entry o = new Entry(); + i = o.unmarshal(buf, i, end); + a[ai] = o; + } + this.entries = a; + header = buf[i++]; + } + + if (header != (byte) 0x7f) + throw new InputMismatchException(format("colfer: unknown header at byte %d", i - 1)); + } finally { + if (i > end && end - offset < Context.colferSizeMax) throw new BufferUnderflowException(); + if (i < 0 || i - offset > Context.colferSizeMax) + throw new SecurityException(format("colfer: kamon/context/kamon.Context exceeds %d bytes", Context.colferSizeMax)); + if (i > end) throw new BufferUnderflowException(); + } + + return i; + } + + // {@link Serializable} version number. + private static final long serialVersionUID = 1L; + + // {@link Serializable} Colfer extension. + private void writeObject(ObjectOutputStream out) throws IOException { + // TODO: better size estimation + byte[] buf = new byte[1024]; + int n; + while (true) try { + n = marshal(buf, 0); + break; + } catch (BufferUnderflowException e) { + buf = new byte[4 * buf.length]; + } + + out.writeInt(n); + out.write(buf, 0, n); + } + + // {@link Serializable} Colfer extension. + private void readObject(ObjectInputStream in) throws ClassNotFoundException, IOException { + init(); + + int n = in.readInt(); + byte[] buf = new byte[n]; + in.readFully(buf); + unmarshal(buf, 0); + } + + // {@link Serializable} Colfer extension. + private void readObjectNoData() throws ObjectStreamException { + init(); + } + + /** + * Gets kamon/context/kamon.Context.entries. + * @return the value. + */ + public Entry[] getEntries() { + return this.entries; + } + + /** + * Sets kamon/context/kamon.Context.entries. + * @param value the replacement. + */ + public void setEntries(Entry[] value) { + this.entries = value; + } + + /** + * Sets kamon/context/kamon.Context.entries. + * @param value the replacement. + * @return {link this}. + */ + public Context withEntries(Entry[] value) { + this.entries = value; + return this; + } + + @Override + public final int hashCode() { + int h = 1; + for (Entry o : this.entries) h = 31 * h + (o == null ? 0 : o.hashCode()); + return h; + } + + @Override + public final boolean equals(Object o) { + return o instanceof Context && equals((Context) o); + } + + public final boolean equals(Context o) { + if (o == null) return false; + if (o == this) return true; + return o.getClass() == Context.class + && java.util.Arrays.equals(this.entries, o.entries); + } + +} diff --git a/kamon-core/src/main/java/kamon/context/encoding/Entry.java b/kamon-core/src/main/java/kamon/context/encoding/Entry.java new file mode 100644 index 00000000..d7734c13 --- /dev/null +++ b/kamon-core/src/main/java/kamon/context/encoding/Entry.java @@ -0,0 +1,442 @@ +package kamon.context.encoding; + + +// Code generated by colf(1); DO NOT EDIT. + + +import static java.lang.String.format; +import java.io.IOException; +import java.io.InputStream; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.io.ObjectStreamException; +import java.io.OutputStream; +import java.io.Serializable; +import java.nio.charset.StandardCharsets; +import java.util.InputMismatchException; +import java.nio.BufferOverflowException; +import java.nio.BufferUnderflowException; + + +/** + * Data bean with built-in serialization support. + + * @author generated by colf(1) + * @see <a href="https://github.com/pascaldekloe/colfer">Colfer's home</a> + */ +@javax.annotation.Generated(value="colf(1)", comments="Colfer from schema file context.colf") +public class Entry implements Serializable { + + /** The upper limit for serial byte sizes. */ + public static int colferSizeMax = 16 * 1024 * 1024; + + + + + public String name; + + public byte[] content; + + + /** Default constructor */ + public Entry() { + init(); + } + + private static final byte[] _zeroBytes = new byte[0]; + + /** Colfer zero values. */ + private void init() { + name = ""; + content = _zeroBytes; + } + + /** + * {@link #reset(InputStream) Reusable} deserialization of Colfer streams. + */ + public static class Unmarshaller { + + /** The data source. */ + protected InputStream in; + + /** The read buffer. */ + public byte[] buf; + + /** The {@link #buf buffer}'s data start index, inclusive. */ + protected int offset; + + /** The {@link #buf buffer}'s data end index, exclusive. */ + protected int i; + + + /** + * @param in the data source or {@code null}. + * @param buf the initial buffer or {@code null}. + */ + public Unmarshaller(InputStream in, byte[] buf) { + // TODO: better size estimation + if (buf == null || buf.length == 0) + buf = new byte[Math.min(Entry.colferSizeMax, 2048)]; + this.buf = buf; + reset(in); + } + + /** + * Reuses the marshaller. + * @param in the data source or {@code null}. + * @throws IllegalStateException on pending data. + */ + public void reset(InputStream in) { + if (this.i != this.offset) throw new IllegalStateException("colfer: pending data"); + this.in = in; + this.offset = 0; + this.i = 0; + } + + /** + * Deserializes the following object. + * @return the result or {@code null} when EOF. + * @throws IOException from the input stream. + * @throws SecurityException on an upper limit breach defined by {@link #colferSizeMax}. + * @throws InputMismatchException when the data does not match this object's schema. + */ + public Entry next() throws IOException { + if (in == null) return null; + + while (true) { + if (this.i > this.offset) { + try { + Entry o = new Entry(); + this.offset = o.unmarshal(this.buf, this.offset, this.i); + return o; + } catch (BufferUnderflowException e) { + } + } + // not enough data + + if (this.i <= this.offset) { + this.offset = 0; + this.i = 0; + } else if (i == buf.length) { + byte[] src = this.buf; + // TODO: better size estimation + if (offset == 0) this.buf = new byte[Math.min(Entry.colferSizeMax, this.buf.length * 4)]; + System.arraycopy(src, this.offset, this.buf, 0, this.i - this.offset); + this.i -= this.offset; + this.offset = 0; + } + assert this.i < this.buf.length; + + int n = in.read(buf, i, buf.length - i); + if (n < 0) { + if (this.i > this.offset) + throw new InputMismatchException("colfer: pending data with EOF"); + return null; + } + assert n > 0; + i += n; + } + } + + } + + + /** + * Serializes the object. + * @param out the data destination. + * @param buf the initial buffer or {@code null}. + * @return the final buffer. When the serial fits into {@code buf} then the return is {@code buf}. + * Otherwise the return is a new buffer, large enough to hold the whole serial. + * @throws IOException from {@code out}. + * @throws IllegalStateException on an upper limit breach defined by {@link #colferSizeMax}. + */ + public byte[] marshal(OutputStream out, byte[] buf) throws IOException { + // TODO: better size estimation + if (buf == null || buf.length == 0) + buf = new byte[Math.min(Entry.colferSizeMax, 2048)]; + + while (true) { + int i; + try { + i = marshal(buf, 0); + } catch (BufferOverflowException e) { + buf = new byte[Math.min(Entry.colferSizeMax, buf.length * 4)]; + continue; + } + + out.write(buf, 0, i); + return buf; + } + } + + /** + * Serializes the object. + * @param buf the data destination. + * @param offset the initial index for {@code buf}, inclusive. + * @return the final index for {@code buf}, exclusive. + * @throws BufferOverflowException when {@code buf} is too small. + * @throws IllegalStateException on an upper limit breach defined by {@link #colferSizeMax}. + */ + public int marshal(byte[] buf, int offset) { + int i = offset; + + try { + if (! this.name.isEmpty()) { + buf[i++] = (byte) 0; + int start = ++i; + + String s = this.name; + for (int sIndex = 0, sLength = s.length(); sIndex < sLength; sIndex++) { + char c = s.charAt(sIndex); + if (c < '\u0080') { + buf[i++] = (byte) c; + } else if (c < '\u0800') { + buf[i++] = (byte) (192 | c >>> 6); + buf[i++] = (byte) (128 | c & 63); + } else if (c < '\ud800' || c > '\udfff') { + buf[i++] = (byte) (224 | c >>> 12); + buf[i++] = (byte) (128 | c >>> 6 & 63); + buf[i++] = (byte) (128 | c & 63); + } else { + int cp = 0; + if (++sIndex < sLength) cp = Character.toCodePoint(c, s.charAt(sIndex)); + if ((cp >= 1 << 16) && (cp < 1 << 21)) { + buf[i++] = (byte) (240 | cp >>> 18); + buf[i++] = (byte) (128 | cp >>> 12 & 63); + buf[i++] = (byte) (128 | cp >>> 6 & 63); + buf[i++] = (byte) (128 | cp & 63); + } else + buf[i++] = (byte) '?'; + } + } + int size = i - start; + if (size > Entry.colferSizeMax) + throw new IllegalStateException(format("colfer: kamon/context/kamon.Entry.name size %d exceeds %d UTF-8 bytes", size, Entry.colferSizeMax)); + + int ii = start - 1; + if (size > 0x7f) { + i++; + for (int x = size; x >= 1 << 14; x >>>= 7) i++; + System.arraycopy(buf, start, buf, i - size, size); + + do { + buf[ii++] = (byte) (size | 0x80); + size >>>= 7; + } while (size > 0x7f); + } + buf[ii] = (byte) size; + } + + if (this.content.length != 0) { + buf[i++] = (byte) 1; + + int size = this.content.length; + if (size > Entry.colferSizeMax) + throw new IllegalStateException(format("colfer: kamon/context/kamon.Entry.content size %d exceeds %d bytes", size, Entry.colferSizeMax)); + + int x = size; + while (x > 0x7f) { + buf[i++] = (byte) (x | 0x80); + x >>>= 7; + } + buf[i++] = (byte) x; + + int start = i; + i += size; + System.arraycopy(this.content, 0, buf, start, size); + } + + buf[i++] = (byte) 0x7f; + return i; + } catch (ArrayIndexOutOfBoundsException e) { + if (i - offset > Entry.colferSizeMax) + throw new IllegalStateException(format("colfer: kamon/context/kamon.Entry exceeds %d bytes", Entry.colferSizeMax)); + if (i > buf.length) throw new BufferOverflowException(); + throw e; + } + } + + /** + * Deserializes the object. + * @param buf the data source. + * @param offset the initial index for {@code buf}, inclusive. + * @return the final index for {@code buf}, exclusive. + * @throws BufferUnderflowException when {@code buf} is incomplete. (EOF) + * @throws SecurityException on an upper limit breach defined by {@link #colferSizeMax}. + * @throws InputMismatchException when the data does not match this object's schema. + */ + public int unmarshal(byte[] buf, int offset) { + return unmarshal(buf, offset, buf.length); + } + + /** + * Deserializes the object. + * @param buf the data source. + * @param offset the initial index for {@code buf}, inclusive. + * @param end the index limit for {@code buf}, exclusive. + * @return the final index for {@code buf}, exclusive. + * @throws BufferUnderflowException when {@code buf} is incomplete. (EOF) + * @throws SecurityException on an upper limit breach defined by {@link #colferSizeMax}. + * @throws InputMismatchException when the data does not match this object's schema. + */ + public int unmarshal(byte[] buf, int offset, int end) { + if (end > buf.length) end = buf.length; + int i = offset; + + try { + byte header = buf[i++]; + + if (header == (byte) 0) { + int size = 0; + for (int shift = 0; true; shift += 7) { + byte b = buf[i++]; + size |= (b & 0x7f) << shift; + if (shift == 28 || b >= 0) break; + } + if (size < 0 || size > Entry.colferSizeMax) + throw new SecurityException(format("colfer: kamon/context/kamon.Entry.name size %d exceeds %d UTF-8 bytes", size, Entry.colferSizeMax)); + + int start = i; + i += size; + this.name = new String(buf, start, size, StandardCharsets.UTF_8); + header = buf[i++]; + } + + if (header == (byte) 1) { + int size = 0; + for (int shift = 0; true; shift += 7) { + byte b = buf[i++]; + size |= (b & 0x7f) << shift; + if (shift == 28 || b >= 0) break; + } + if (size < 0 || size > Entry.colferSizeMax) + throw new SecurityException(format("colfer: kamon/context/kamon.Entry.content size %d exceeds %d bytes", size, Entry.colferSizeMax)); + + this.content = new byte[size]; + int start = i; + i += size; + System.arraycopy(buf, start, this.content, 0, size); + + header = buf[i++]; + } + + if (header != (byte) 0x7f) + throw new InputMismatchException(format("colfer: unknown header at byte %d", i - 1)); + } finally { + if (i > end && end - offset < Entry.colferSizeMax) throw new BufferUnderflowException(); + if (i < 0 || i - offset > Entry.colferSizeMax) + throw new SecurityException(format("colfer: kamon/context/kamon.Entry exceeds %d bytes", Entry.colferSizeMax)); + if (i > end) throw new BufferUnderflowException(); + } + + return i; + } + + // {@link Serializable} version number. + private static final long serialVersionUID = 2L; + + // {@link Serializable} Colfer extension. + private void writeObject(ObjectOutputStream out) throws IOException { + // TODO: better size estimation + byte[] buf = new byte[1024]; + int n; + while (true) try { + n = marshal(buf, 0); + break; + } catch (BufferUnderflowException e) { + buf = new byte[4 * buf.length]; + } + + out.writeInt(n); + out.write(buf, 0, n); + } + + // {@link Serializable} Colfer extension. + private void readObject(ObjectInputStream in) throws ClassNotFoundException, IOException { + init(); + + int n = in.readInt(); + byte[] buf = new byte[n]; + in.readFully(buf); + unmarshal(buf, 0); + } + + // {@link Serializable} Colfer extension. + private void readObjectNoData() throws ObjectStreamException { + init(); + } + + /** + * Gets kamon/context/kamon.Entry.name. + * @return the value. + */ + public String getName() { + return this.name; + } + + /** + * Sets kamon/context/kamon.Entry.name. + * @param value the replacement. + */ + public void setName(String value) { + this.name = value; + } + + /** + * Sets kamon/context/kamon.Entry.name. + * @param value the replacement. + * @return {link this}. + */ + public Entry withName(String value) { + this.name = value; + return this; + } + + /** + * Gets kamon/context/kamon.Entry.content. + * @return the value. + */ + public byte[] getContent() { + return this.content; + } + + /** + * Sets kamon/context/kamon.Entry.content. + * @param value the replacement. + */ + public void setContent(byte[] value) { + this.content = value; + } + + /** + * Sets kamon/context/kamon.Entry.content. + * @param value the replacement. + * @return {link this}. + */ + public Entry withContent(byte[] value) { + this.content = value; + return this; + } + + @Override + public final int hashCode() { + int h = 1; + if (this.name != null) h = 31 * h + this.name.hashCode(); + for (byte b : this.content) h = 31 * h + b; + return h; + } + + @Override + public final boolean equals(Object o) { + return o instanceof Entry && equals((Entry) o); + } + + public final boolean equals(Entry o) { + if (o == null) return false; + if (o == this) return true; + return o.getClass() == Entry.class + && (this.name == null ? o.name == null : this.name.equals(o.name)) + && java.util.Arrays.equals(this.content, o.content); + } + +} diff --git a/kamon-core/src/main/resources/reference.conf b/kamon-core/src/main/resources/reference.conf index 6ad06325..ad180f1c 100644 --- a/kamon-core/src/main/resources/reference.conf +++ b/kamon-core/src/main/resources/reference.conf @@ -85,20 +85,71 @@ kamon { reporter-queue-size = 1024 + # Decide whether a new, locally created Span should have the same Span Identifier as it's remote parent (if any) or + # get a new local identifier. Certain tracing systems use the same Span Identifier to represent both sides (client + # and server) of a RPC call, if you are reporting data to such systems then this option should be enabled. + # + # If you are using Zipkin, keep this option enabled. If you are using Jaeger, disable it. + join-remote-parents-with-same-span-id = no + # Configures a sample that decides which traces should be reported to the trace backends. The possible values are: # - always: report all traces. # - never: don't report any trace. - # - random: use the random tracer. + # - random: randomly decide using the probability defined in the random-sampler.probability setting. # sampler = "random" # The random sampler uses the "chance" setting and a random number to take a decision, if the random number is # on the upper (chance * 100) percent of the number spectrum the trace will be sampled. E.g. a chance of 0.01 will # hint that 1% of all traces should be reported. - sampler-random { + random-sampler { + + # Probability of a span being sampled. Must be a value between 0 and 1. + probability = 0.01 + } + + # The IdentityProvider used to generate Trace and Span Identifiers in Kamon. There are two default implementations + # that ship with Kamon: + # - kamon.trace.IdentityProvider$Default: Creates 8-byte identifiers for both Traces and Spans. + # - kamon.trace.IdentityProvider$DoubleSizeTraceID: Creates 16-byte identifiers for Traces and 8-byte identifiers + # for Spans. + # + # Any external implementation can be configured here, as long as it can be instantiated with a parameterless constructor. + identity-provider = "kamon.trace.IdentityProvider$Default" + + # The SpanContextCodecs are used to encode/decode the SpanContext data into simple TextMaps, HTTP Headers or Binary + # carriers. The decision about which one to use is based on the kamon.trace.SpanContextCodec.Format instance passed + # to inject/extract calls. + # + # Any external implementation can be configured here, as long as it can be instantiated with a single parameter + # constructor that accepts a IdentityProvider. + span-context-codec { + + # Encodes/Decodes the SpanContext data using a simple key/value pair. Since this is very rarely going to be used + # we default to using the same codec for HTTP Headers, as it is built on top of a TextMap. + text-map = "kamon.trace.SpanContextCodec$ExtendedB3" + + # Encodes/Decodes the SpanContext into a TextMap with HTTP Header friendly, URLEncoded values. The default + # implementation follows the guidelines of B3 propagation. See more at https://github.com/openzipkin/b3-propagation. + http-headers = "kamon.trace.SpanContextCodec$ExtendedB3" - # Chance of a span being sampled. Must be a value between 0 and 1. - chance = 0.01 + # Encodes/Decodes the SpanContext using a binary representation. + binary = "TODO" + } + + + } + + context { + encoding { + + http-headers { + span = "kamon.trace.SpanCodec$B3" + } + + binary { + # span = "kamon.trace.propagation.Binary" + } } } diff --git a/kamon-core/src/main/scala/kamon/Kamon.scala b/kamon-core/src/main/scala/kamon/Kamon.scala index ecbc796e..b1490e32 100644 --- a/kamon-core/src/main/scala/kamon/Kamon.scala +++ b/kamon-core/src/main/scala/kamon/Kamon.scala @@ -16,24 +16,23 @@ package kamon import com.typesafe.config.{Config, ConfigFactory} -import io.opentracing.propagation.Format -import io.opentracing.{ActiveSpan, Span, SpanContext} import kamon.metric._ -import kamon.trace.Tracer +import kamon.trace._ import kamon.util.{Filters, MeasurementUnit, Registration} import scala.concurrent.Future import java.time.Duration import java.util.concurrent.{Executors, ScheduledExecutorService, ScheduledThreadPoolExecutor} -import io.opentracing.ActiveSpan.Continuation +import kamon.context.{Context, Storage} import org.slf4j.LoggerFactory import scala.util.Try -object Kamon extends MetricLookup with ReporterRegistry with io.opentracing.Tracer { +object Kamon extends MetricLookup with ReporterRegistry with Tracer { private val logger = LoggerFactory.getLogger("kamon.Kamon") + @volatile private var _config = ConfigFactory.load() @volatile private var _environment = Environment.fromConfig(_config) @volatile private var _filters = Filters.fromConfig(_config) @@ -41,7 +40,8 @@ object Kamon extends MetricLookup with ReporterRegistry with io.opentracing.Trac private val _scheduler = Executors.newScheduledThreadPool(schedulerPoolSize(_config), numberedThreadFactory("kamon-scheduler")) private val _metrics = new MetricRegistry(_config, _scheduler) private val _reporters = new ReporterRegistryImpl(_metrics, _config) - private val _tracer = new Tracer(Kamon, _reporters, _config) + private val _tracer = Tracer.Default(Kamon, _reporters, _config) + private val _contextStorage = Storage.ThreadLocal() private var _onReconfigureHooks = Seq.empty[OnReconfigureHook] def environment: Environment = @@ -56,6 +56,7 @@ object Kamon extends MetricLookup with ReporterRegistry with io.opentracing.Trac _filters = Filters.fromConfig(config) _metrics.reconfigure(config) _reporters.reconfigure(config) + _tracer.reconfigure(config) _onReconfigureHooks.foreach(hook => { Try(hook.onReconfigure(config)).failed.foreach(error => @@ -90,73 +91,28 @@ object Kamon extends MetricLookup with ReporterRegistry with io.opentracing.Trac def tracer: Tracer = _tracer - override def buildSpan(operationName: String): io.opentracing.Tracer.SpanBuilder = + override def buildSpan(operationName: String): Tracer.SpanBuilder = _tracer.buildSpan(operationName) - override def extract[C](format: Format[C], carrier: C): SpanContext = - _tracer.extract(format, carrier) - - override def inject[C](spanContext: SpanContext, format: Format[C], carrier: C): Unit = - _tracer.inject(spanContext, format, carrier) - - override def activeSpan(): ActiveSpan = - _tracer.activeSpan() - override def makeActive(span: Span): ActiveSpan = - _tracer.makeActive(span) + override def identityProvider: IdentityProvider = + _tracer.identityProvider + def currentContext(): Context = + _contextStorage.current() - /** - * Makes the provided Span active before code is evaluated and deactivates it afterwards. - */ - def withSpan[T](span: Span)(code: => T): T = { - val activeSpan = makeActive(span) - val evaluatedCode = code - activeSpan.deactivate() - evaluatedCode - } + def storeContext(context: Context): Storage.Scope = + _contextStorage.store(context) - /** - * Actives the provided Continuation before code is evaluated and deactivates it afterwards. - */ - def withContinuation[T](continuation: Continuation)(code: => T): T = { - if(continuation == null) - code - else { - val activeSpan = continuation.activate() - val evaluatedCode = code - activeSpan.deactivate() - evaluatedCode + def withContext[T](context: Context)(f: => T): T = { + val scope = _contextStorage.store(context) + try { + f + } finally { + scope.close() } } - /** - * Captures a continuation from the currently active Span (if any). - */ - def activeSpanContinuation(): Continuation = { - val activeSpan = Kamon.activeSpan() - if(activeSpan == null) - null - else - activeSpan.capture() - } - - /** - * Runs the provided closure with the currently active Span (if any). - */ - def onActiveSpan[T](code: ActiveSpan => T): Unit = { - val activeSpan = Kamon.activeSpan() - if(activeSpan != null) - code(activeSpan) - } - - /** - * Evaluates the provided closure with the currently active Span (if any) and returns the evaluation result. If there - * was no active Span then the provided fallback value - */ - def fromActiveSpan[T](code: ActiveSpan => T): Option[T] = - Option(activeSpan()).map(code) - override def loadReportersFromConfig(): Unit = _reporters.loadReportersFromConfig() diff --git a/kamon-core/src/main/scala/kamon/ReporterRegistry.scala b/kamon-core/src/main/scala/kamon/ReporterRegistry.scala index 5f46edf6..f0d744e5 100644 --- a/kamon-core/src/main/scala/kamon/ReporterRegistry.scala +++ b/kamon-core/src/main/scala/kamon/ReporterRegistry.scala @@ -20,9 +20,11 @@ import java.util.concurrent.atomic.{AtomicLong, AtomicReference} import java.util.concurrent._ import com.typesafe.config.Config +import kamon.ReporterRegistry.SpanSink import kamon.metric._ import kamon.trace.Span -import kamon.util.{DynamicAccess, Registration} +import kamon.trace.Span.FinishedSpan +import kamon.util.{CallingThreadExecutionContext, DynamicAccess, Registration} import org.slf4j.LoggerFactory import scala.concurrent.{ExecutionContext, ExecutionContextExecutorService, Future} @@ -42,6 +44,12 @@ trait ReporterRegistry { def stopAllReporters(): Future[Unit] } +object ReporterRegistry { + private[kamon] trait SpanSink { + def reportSpan(finishedSpan: FinishedSpan): Unit + } +} + sealed trait Reporter { def start(): Unit def stop(): Unit @@ -53,10 +61,10 @@ trait MetricReporter extends Reporter { } trait SpanReporter extends Reporter { - def reportSpans(spans: Seq[Span.CompletedSpan]): Unit + def reportSpans(spans: Seq[Span.FinishedSpan]): Unit } -class ReporterRegistryImpl(metrics: MetricsSnapshotGenerator, initialConfig: Config) extends ReporterRegistry { +class ReporterRegistryImpl(metrics: MetricsSnapshotGenerator, initialConfig: Config) extends ReporterRegistry with SpanSink { private val logger = LoggerFactory.getLogger(classOf[ReporterRegistry]) private val registryExecutionContext = Executors.newScheduledThreadPool(2, threadFactory("kamon-reporter-registry")) private val reporterCounter = new AtomicLong(0L) @@ -212,7 +220,7 @@ class ReporterRegistryImpl(metrics: MetricsSnapshotGenerator, initialConfig: Con } } - private[kamon] def reportSpan(span: Span.CompletedSpan): Unit = { + def reportSpan(span: Span.FinishedSpan): Unit = { spanReporters.foreach { case (_, reporterEntry) => if(reporterEntry.isActive) reporterEntry.buffer.offer(span) @@ -251,7 +259,7 @@ class ReporterRegistryImpl(metrics: MetricsSnapshotGenerator, initialConfig: Con val bufferCapacity: Int, val executionContext: ExecutionContextExecutorService ) { - val buffer = new ArrayBlockingQueue[Span.CompletedSpan](bufferCapacity) + val buffer = new ArrayBlockingQueue[Span.FinishedSpan](bufferCapacity) } private class MetricReporterTicker(snapshotGenerator: MetricsSnapshotGenerator, reporterEntries: TrieMap[Long, MetricReporterEntry]) extends Runnable { @@ -290,7 +298,7 @@ class ReporterRegistryImpl(metrics: MetricsSnapshotGenerator, initialConfig: Con spanReporters.foreach { case (_, entry) => - val spanBatch = new java.util.ArrayList[Span.CompletedSpan](entry.bufferCapacity) + val spanBatch = new java.util.ArrayList[Span.FinishedSpan](entry.bufferCapacity) entry.buffer.drainTo(spanBatch, entry.bufferCapacity) Future { diff --git a/kamon-core/src/main/scala/kamon/context/Codec.scala b/kamon-core/src/main/scala/kamon/context/Codec.scala new file mode 100644 index 00000000..50b7e93d --- /dev/null +++ b/kamon-core/src/main/scala/kamon/context/Codec.scala @@ -0,0 +1,130 @@ +package kamon +package context + +import com.typesafe.config.Config +import kamon.trace.IdentityProvider +import kamon.util.DynamicAccess +import org.slf4j.LoggerFactory + +import scala.collection.mutable + +class Codec(identityProvider: IdentityProvider, initialConfig: Config) { + private val log = LoggerFactory.getLogger(classOf[Codec]) + + @volatile private var httpHeaders: Codec.ForContext[TextMap] = new Codec.HttpHeaders(Map.empty) + //val Binary: Codec.ForContext[ByteBuffer] = _ + reconfigure(initialConfig) + + + def HttpHeaders: Codec.ForContext[TextMap] = + httpHeaders + + def reconfigure(config: Config): Unit = { + httpHeaders = new Codec.HttpHeaders(readEntryCodecs("kamon.context.encoding.http-headers", config)) + } + + private def readEntryCodecs[T](rootKey: String, config: Config): Map[String, Codec.ForEntry[T]] = { + val rootConfig = config.getConfig(rootKey) + val dynamic = new DynamicAccess(getClass.getClassLoader) + val entries = Map.newBuilder[String, Codec.ForEntry[T]] + + rootConfig.topLevelKeys.foreach(key => { + try { + val fqcn = rootConfig.getString(key) + entries += ((key, dynamic.createInstanceFor[Codec.ForEntry[T]](fqcn, Nil).get)) + } catch { + case e: Throwable => + log.error(s"Failed to initialize codec for key [$key]", e) + } + }) + + entries.result() + } +} + +object Codec { + + trait ForContext[T] { + def encode(context: Context): T + def decode(carrier: T): Context + } + + trait ForEntry[T] { + def encode(context: Context): T + def decode(carrier: T, context: Context): Context + } + + final class HttpHeaders(entryCodecs: Map[String, Codec.ForEntry[TextMap]]) extends Codec.ForContext[TextMap] { + private val log = LoggerFactory.getLogger(classOf[HttpHeaders]) + + override def encode(context: Context): TextMap = { + val encoded = TextMap.Default() + + context.entries.foreach { + case (key, _) if key.broadcast => + entryCodecs.get(key.name) match { + case Some(codec) => + try { + codec.encode(context).values.foreach(pair => encoded.put(pair._1, pair._2)) + } catch { + case e: Throwable => log.error(s"Failed to encode key [${key.name}]", e) + } + + case None => + log.error("Context key [{}] should be encoded in HttpHeaders but no codec was found for it.", key.name) + } + } + + encoded + } + + override def decode(carrier: TextMap): Context = { + var context: Context = Context.Empty + + try { + context = entryCodecs.foldLeft(context)((ctx, codecEntry) => { + val (_, codec) = codecEntry + codec.decode(carrier, ctx) + }) + + } catch { + case e: Throwable => + log.error("Failed to decode context from HttpHeaders", e) + } + + context + } + } +} + + +trait TextMap { + + def get(key: String): Option[String] + + def put(key: String, value: String): Unit + + def values: Iterator[(String, String)] +} + +object TextMap { + + class Default extends TextMap { + private val storage = + mutable.Map.empty[String, String] + + override def get(key: String): Option[String] = + storage.get(key) + + override def put(key: String, value: String): Unit = + storage.put(key, value) + + override def values: Iterator[(String, String)] = + storage.toIterator + } + + object Default { + def apply(): Default = + new Default() + } +}
\ No newline at end of file diff --git a/kamon-core/src/main/scala/kamon/context/Context.scala b/kamon-core/src/main/scala/kamon/context/Context.scala new file mode 100644 index 00000000..f8a4662f --- /dev/null +++ b/kamon-core/src/main/scala/kamon/context/Context.scala @@ -0,0 +1,50 @@ +package kamon.context + +class Context private (private[context] val entries: Map[Key[_], Any]) { + def get[T](key: Key[T]): T = + entries.get(key).getOrElse(key.emptyValue).asInstanceOf[T] + + def withKey[T](key: Key[T], value: T): Context = + new Context(entries.updated(key, value)) +} + +object Context { + val Empty = new Context(Map.empty) + + def apply(): Context = + Empty + + def create(): Context = + Empty + + def apply[T](key: Key[T], value: T): Context = + new Context(Map(key -> value)) + + def create[T](key: Key[T], value: T): Context = + apply(key, value) +} + + +trait Key[T] { + def name: String + def emptyValue: T + def broadcast: Boolean +} + +object Key { + + def local[T](name: String, emptyValue: T): Key[T] = + new Default[T](name, emptyValue, false) + + def broadcast[T](name: String, emptyValue: T): Key[T] = + new Default[T](name, emptyValue, true) + + + private class Default[T](val name: String, val emptyValue: T, val broadcast: Boolean) extends Key[T] { + override def hashCode(): Int = + name.hashCode + + override def equals(that: Any): Boolean = + that.isInstanceOf[Default[_]] && that.asInstanceOf[Default[_]].name == this.name + } +}
\ No newline at end of file diff --git a/kamon-core/src/main/scala/kamon/util/Mixin.scala b/kamon-core/src/main/scala/kamon/context/Mixin.scala index 348b34f1..64e03748 100644 --- a/kamon-core/src/main/scala/kamon/util/Mixin.scala +++ b/kamon-core/src/main/scala/kamon/context/Mixin.scala @@ -13,33 +13,33 @@ * ========================================================================================= */ -package kamon -package util +package kamon.context + +import kamon.Kamon -import io.opentracing.ActiveSpan -import io.opentracing.ActiveSpan.Continuation /** - * Utility trait that marks objects carrying an ActiveSpan.Continuation. + * Utility trait that marks objects carrying a reference to a Span. + * */ -trait HasContinuation { - def continuation: Continuation +trait HasContext { + def context: Context } -object HasContinuation { - private class Default(val continuation: Continuation) extends HasContinuation +object HasContext { + private case class Default(context: Context) extends HasContext /** - * Construct a HasContinuation instance by capturing a continuation from the provided active span. + * Construct a HasSpan instance that references the provided Span. + * */ - def from(activeSpan: ActiveSpan): HasContinuation = { - val continuation = if(activeSpan == null) null else activeSpan.capture() - new Default(continuation) - } + def from(context: Context): HasContext = + Default(context) /** - * Constructs a new HasContinuation instance using Kamon's tracer currently active span. + * Construct a HasSpan instance that references the currently ActiveSpan in Kamon's tracer. + * */ - def fromTracerActiveSpan(): HasContinuation = - new Default(Kamon.activeSpanContinuation()) + def fromCurrentContext(): HasContext = + Default(Kamon.currentContext()) } diff --git a/kamon-core/src/main/scala/kamon/context/Storage.scala b/kamon-core/src/main/scala/kamon/context/Storage.scala new file mode 100644 index 00000000..6b92ff85 --- /dev/null +++ b/kamon-core/src/main/scala/kamon/context/Storage.scala @@ -0,0 +1,39 @@ +package kamon.context + +trait Storage { + def current(): Context + def store(context: Context): Storage.Scope +} + +object Storage { + + trait Scope { + def context: Context + def close(): Unit + } + + + class ThreadLocal extends Storage { + private val tls = new java.lang.ThreadLocal[Context]() { + override def initialValue(): Context = Context.Empty + } + + override def current(): Context = + tls.get() + + override def store(context: Context): Scope = { + val newContext = context + val previousContext = tls.get() + tls.set(newContext) + + new Scope { + override def context: Context = newContext + override def close(): Unit = tls.set(previousContext) + } + } + } + + object ThreadLocal { + def apply(): ThreadLocal = new ThreadLocal() + } +}
\ No newline at end of file diff --git a/kamon-core/src/main/scala/kamon/trace/IdentityProvider.scala b/kamon-core/src/main/scala/kamon/trace/IdentityProvider.scala new file mode 100644 index 00000000..937200f5 --- /dev/null +++ b/kamon-core/src/main/scala/kamon/trace/IdentityProvider.scala @@ -0,0 +1,106 @@ +package kamon.trace + +import java.nio.ByteBuffer +import java.util.concurrent.ThreadLocalRandom + +import kamon.util.HexCodec + +import scala.util.Try + +trait IdentityProvider { + def traceIdGenerator(): IdentityProvider.Generator + def spanIdGenerator(): IdentityProvider.Generator +} + +object IdentityProvider { + case class Identifier(string: String, bytes: Array[Byte]) { + + override def equals(obj: Any): Boolean = { + if(obj != null && obj.isInstanceOf[Identifier]) + obj.asInstanceOf[Identifier].string == string + else false + } + } + + val NoIdentifier = Identifier("", new Array[Byte](0)) + + trait Generator { + def generate(): Identifier + def from(string: String): Identifier + def from(bytes: Array[Byte]): Identifier + } + + + class Default extends IdentityProvider { + protected val longGenerator = new Generator { + override def generate(): Identifier = { + val data = ByteBuffer.wrap(new Array[Byte](8)) + val random = ThreadLocalRandom.current().nextLong() + data.putLong(random) + + Identifier(HexCodec.toLowerHex(random), data.array()) + } + + override def from(string: String): Identifier = Try { + val identifierLong = HexCodec.lowerHexToUnsignedLong(string) + val data = ByteBuffer.allocate(8) + data.putLong(identifierLong) + + Identifier(string, data.array()) + } getOrElse(IdentityProvider.NoIdentifier) + + override def from(bytes: Array[Byte]): Identifier = Try { + val buffer = ByteBuffer.wrap(bytes) + val identifierLong = buffer.getLong + + Identifier(HexCodec.toLowerHex(identifierLong), bytes) + } getOrElse(IdentityProvider.NoIdentifier) + } + + override def traceIdGenerator(): Generator = longGenerator + override def spanIdGenerator(): Generator = longGenerator + } + + object Default { + def apply(): Default = new Default() + } + + + class DoubleSizeTraceID extends Default { + private val doubleLongGenerator = new Generator { + override def generate(): Identifier = { + val data = ByteBuffer.wrap(new Array[Byte](16)) + val highLong = ThreadLocalRandom.current().nextLong() + val lowLong = ThreadLocalRandom.current().nextLong() + data.putLong(highLong) + data.putLong(lowLong) + + Identifier(HexCodec.toLowerHex(highLong) + HexCodec.toLowerHex(lowLong), data.array()) + } + + override def from(string: String): Identifier = Try { + val highPart = HexCodec.lowerHexToUnsignedLong(string.substring(0, 16)) + val lowPart = HexCodec.lowerHexToUnsignedLong(string.substring(16, 32)) + val data = ByteBuffer.allocate(16) + data.putLong(highPart) + data.putLong(lowPart) + + Identifier(string, data.array()) + } getOrElse(IdentityProvider.NoIdentifier) + + override def from(bytes: Array[Byte]): Identifier = Try { + val buffer = ByteBuffer.wrap(bytes) + val highLong = buffer.getLong + val lowLong = buffer.getLong + + Identifier(HexCodec.toLowerHex(highLong) + HexCodec.toLowerHex(lowLong), bytes) + } getOrElse(IdentityProvider.NoIdentifier) + } + + override def traceIdGenerator(): Generator = doubleLongGenerator + } + + object DoubleSizeTraceID { + def apply(): DoubleSizeTraceID = new DoubleSizeTraceID() + } +}
\ No newline at end of file diff --git a/kamon-core/src/main/scala/kamon/trace/Sampler.scala b/kamon-core/src/main/scala/kamon/trace/Sampler.scala index 0347a151..3f366175 100644 --- a/kamon-core/src/main/scala/kamon/trace/Sampler.scala +++ b/kamon-core/src/main/scala/kamon/trace/Sampler.scala @@ -15,39 +15,44 @@ package kamon.trace +import java.util.concurrent.ThreadLocalRandom +import kamon.trace.SpanContext.SamplingDecision + trait Sampler { - def decide(spanID: Long): Boolean + def decide(operationName: String, builderTags: Map[String, Span.TagValue]): SamplingDecision } object Sampler { - val always = new Constant(true) - val never = new Constant(false) + val Always = new Constant(SamplingDecision.Sample) + val Never = new Constant(SamplingDecision.DoNotSample) - def random(chance: Double): Sampler = { - assert(chance >= 0D && chance <= 1.0D, "Change should be >= 0 and <= 1.0") + def random(probability: Double): Sampler = { + assert(probability >= 0D && probability <= 1.0D, "The probability should be >= 0 and <= 1.0") - chance match { - case 0D => never - case 1.0D => always + probability match { + case 0D => Never + case 1.0D => Always case anyOther => new Random(anyOther) } } - class Constant(decision: Boolean) extends Sampler { - override def decide(spanID: Long): Boolean = decision + class Constant(decision: SamplingDecision) extends Sampler { + override def decide(operationName: String, builderTags: Map[String, Span.TagValue]): SamplingDecision = decision override def toString: String = s"Sampler.Constant(decision = $decision)" } - class Random(chance: Double) extends Sampler { - val upperBoundary = Long.MaxValue * chance + class Random(probability: Double) extends Sampler { + val upperBoundary = Long.MaxValue * probability val lowerBoundary = -upperBoundary - override def decide(spanID: Long): Boolean = - spanID >= lowerBoundary && spanID <= upperBoundary + override def decide(operationName: String, builderTags: Map[String, Span.TagValue]): SamplingDecision = { + val random = ThreadLocalRandom.current().nextLong() + if(random >= lowerBoundary && random <= upperBoundary) SamplingDecision.Sample else SamplingDecision.DoNotSample + } override def toString: String = - s"Sampler.Random(chance = $chance)" + s"Sampler.Random(probability = $probability)" } } diff --git a/kamon-core/src/main/scala/kamon/trace/Span.scala b/kamon-core/src/main/scala/kamon/trace/Span.scala index 464559e3..a4424a45 100644 --- a/kamon-core/src/main/scala/kamon/trace/Span.scala +++ b/kamon-core/src/main/scala/kamon/trace/Span.scala @@ -16,176 +16,220 @@ package kamon package trace - -import scala.collection.JavaConverters._ +import kamon.ReporterRegistry.SpanSink +import kamon.context.Key +import kamon.trace.SpanContext.SamplingDecision import kamon.util.{Clock, MeasurementUnit} -class Span(spanContext: SpanContext, initialOperationName: String, initialTags: Map[String, String], startTimestampMicros: Long, - reporterRegistry: ReporterRegistryImpl) extends io.opentracing.Span { - private var isOpen: Boolean = true - private val sampled: Boolean = spanContext.sampled - private var operationName: String = initialOperationName - private var endTimestampMicros: Long = 0 +trait Span { - private var tags = initialTags - private var logs = List.empty[Span.LogEntry] - private var additionalMetricTags = Map.empty[String, String] + def isEmpty(): Boolean + def isLocal(): Boolean + def nonEmpty(): Boolean = !isEmpty() + def isRemote(): Boolean = !isLocal() - override def log(fields: java.util.Map[String, _]): Span = - log(fields.asScala.asInstanceOf[Map[String, _]]) + def context(): SpanContext - def log(fields: Map[String, _]): Span = synchronized { - if (sampled && isOpen) - logs = Span.LogEntry(Clock.microTimestamp(), fields) :: logs - this - } + def annotate(annotation: Span.Annotation): Span - def log(timestampMicroseconds: Long, fields: Map[String, _]): Span = synchronized { - if(sampled && isOpen) - logs = Span.LogEntry(timestampMicroseconds, fields) :: logs - this - } + def addSpanTag(key: String, value: String): Span - override def log(timestampMicroseconds: Long, fields: java.util.Map[String, _]): Span = - log(timestampMicroseconds, fields.asScala.asInstanceOf[Map[String, _]]) + def addSpanTag(key: String, value: Long): Span - override def log(event: String): Span = synchronized { - if(sampled && isOpen) - logs = Span.LogEntry(Clock.microTimestamp(), Map("event" -> event)) :: logs - this - } + def addSpanTag(key: String, value: Boolean): Span - override def log(timestampMicroseconds: Long, event: String): Span = synchronized { - if(sampled && isOpen) - logs = Span.LogEntry(timestampMicroseconds, Map("event" -> event)) :: logs - this - } + def addMetricTag(key: String, value: String): Span - override def log(eventName: String, payload: scala.Any): Span = synchronized { - if(sampled && isOpen) - logs = Span.LogEntry(Clock.microTimestamp(), Map(eventName -> payload)) :: logs - this - } + def setOperationName(name: String): Span + + def disableMetricsCollection(): Span + + def finish(finishTimestampMicros: Long): Unit + + def finish(): Unit = + finish(Clock.microTimestamp()) + + def annotate(name: String): Span = + annotate(Span.Annotation(Clock.microTimestamp(), name, Map.empty)) - override def log(timestampMicroseconds: Long, eventName: String, payload: scala.Any): Span = synchronized { - if(sampled && isOpen) - logs = Span.LogEntry(timestampMicroseconds, Map(eventName -> payload)) :: logs - this + def annotate(name: String, fields: Map[String, String]): Span = + annotate(Span.Annotation(Clock.microTimestamp(), name, fields)) + + def annotate(timestampMicroseconds: Long, name: String, fields: Map[String, String]): Span = + annotate(Span.Annotation(timestampMicroseconds, name, fields)) + +} + +object Span { + + val ContextKey = Key.broadcast[Span]("span", Span.Empty) + + object Empty extends Span { + override val context: SpanContext = SpanContext.EmptySpanContext + override def isEmpty(): Boolean = true + override def isLocal(): Boolean = true + override def annotate(annotation: Annotation): Span = this + override def addSpanTag(key: String, value: String): Span = this + override def addSpanTag(key: String, value: Long): Span = this + override def addSpanTag(key: String, value: Boolean): Span = this + override def addMetricTag(key: String, value: String): Span = this + override def setOperationName(name: String): Span = this + override def disableMetricsCollection(): Span = this + override def finish(finishTimestampMicros: Long): Unit = {} } - override def getBaggageItem(key: String): String = - spanContext.getBaggage(key) + /** + * + * @param spanContext + * @param initialOperationName + * @param initialSpanTags + * @param startTimestampMicros + * @param spanSink + */ + final class Local(spanContext: SpanContext, initialOperationName: String, initialSpanTags: Map[String, Span.TagValue], + initialMetricTags: Map[String, String], startTimestampMicros: Long, spanSink: SpanSink) extends Span { + + private var collectMetrics: Boolean = true + private var open: Boolean = true + private val sampled: Boolean = spanContext.samplingDecision == SamplingDecision.Sample + private var operationName: String = initialOperationName + + private var spanTags: Map[String, Span.TagValue] = initialSpanTags + private var customMetricTags = initialMetricTags + private var annotations = List.empty[Span.Annotation] + + override def isEmpty(): Boolean = false + override def isLocal(): Boolean = true + + def annotate(annotation: Annotation): Span = synchronized { + if(sampled && open) + annotations = annotation :: annotations + this + } - override def context(): SpanContext = - spanContext + override def addSpanTag(key: String, value: String): Span = synchronized { + if(sampled && open) + spanTags = spanTags + (key -> TagValue.String(value)) + this + } - override def setTag(key: String, value: String): Span = synchronized { - if (isOpen) { - extractMetricTag(key, value) - if(sampled) - tags = tags ++ Map(key -> value) + override def addSpanTag(key: String, value: Long): Span = synchronized { + if(sampled && open) + spanTags = spanTags + (key -> TagValue.Number(value)) + this } - this - } - override def setTag(key: String, value: Boolean): Span = { - if (isOpen) { - val tagValue = if(value) Span.BooleanTagTrueValue else Span.BooleanTagFalseValue - extractMetricTag(key, tagValue) - if(sampled) - tags = tags + (key -> tagValue) + override def addSpanTag(key: String, value: Boolean): Span = synchronized { + if(sampled && open) { + val tagValue = if (value) TagValue.True else TagValue.False + spanTags = spanTags + (key -> tagValue) + } + this } - this - } - override def setTag(key: String, value: Number): Span = { - if (isOpen) { - val tagValue = String.valueOf(value) - extractMetricTag(key, tagValue) - if(sampled) - tags = tags + (key -> tagValue) + override def addMetricTag(key: String, value: String): Span = synchronized { + if(sampled && open && collectMetrics) + customMetricTags = customMetricTags + (key -> value) + this } - this - } - def setMetricTag(key: String, value: String): Span = synchronized { - if (isOpen) - additionalMetricTags = additionalMetricTags ++ Map(key -> value) - this - } + override def disableMetricsCollection(): Span = synchronized { + collectMetrics = false + this + } - override def setBaggageItem(key: String, value: String): Span = synchronized { - if (isOpen) - spanContext.addBaggageItem(key, value) - this - } + override def context(): SpanContext = + spanContext - override def setOperationName(operationName: String): Span = synchronized { - if(isOpen) - this.operationName = operationName - this - } + override def setOperationName(operationName: String): Span = synchronized { + if(open) + this.operationName = operationName + this + } - private def extractMetricTag(tag: String, value: String): Unit = - if(tag.startsWith(Span.MetricTagPrefix)) - additionalMetricTags = additionalMetricTags ++ Map(tag.substring(Span.MetricTagPrefix.length) -> value) + override def finish(finishMicros: Long): Unit = synchronized { + if (open) { + open = false - override def finish(): Unit = - finish(Clock.microTimestamp()) + if(collectMetrics) + recordSpanMetrics(finishMicros) + + if(sampled) + spanSink.reportSpan(toFinishedSpan(finishMicros)) + } + } + + private def toFinishedSpan(endTimestampMicros: Long): Span.FinishedSpan = + Span.FinishedSpan(spanContext, operationName, startTimestampMicros, endTimestampMicros, spanTags, annotations) + + private def recordSpanMetrics(endTimestampMicros: Long): Unit = { + val elapsedTime = endTimestampMicros - startTimestampMicros + val metricTags = Map("operation" -> operationName) ++ customMetricTags - override def finish(finishMicros: Long): Unit = synchronized { - if (isOpen) { - isOpen = false - endTimestampMicros = finishMicros - recordSpanMetrics() + val isError = spanTags.get("error").exists { + errorTag => errorTag != null && errorTag.equals(Span.TagValue.True) + } - if(sampled) - reporterRegistry.reportSpan(completedSpan) + val refinedMetricTags = if(isError) + metricTags + ("error" -> "true") + else + metricTags + + val latencyHistogram = Span.Metrics.SpanProcessingTimeMetric.refine(refinedMetricTags) + latencyHistogram.record(elapsedTime) } } - private def completedSpan: Span.CompletedSpan = - Span.CompletedSpan(spanContext, operationName, startTimestampMicros, endTimestampMicros, tags, logs) + object Local { + def apply(spanContext: SpanContext, initialOperationName: String, initialSpanTags: Map[String, Span.TagValue], + initialMetricTags: Map[String, String], startTimestampMicros: Long, reporterRegistry: ReporterRegistryImpl): Local = + new Local(spanContext, initialOperationName, initialSpanTags, initialMetricTags, startTimestampMicros, reporterRegistry) + } - private def recordSpanMetrics(): Unit = { - val elapsedTime = endTimestampMicros - startTimestampMicros - val metricTags = Map("operation" -> operationName) ++ additionalMetricTags - val isError = tags.get("error").exists { - errorTag => errorTag != null && errorTag.equals(Span.BooleanTagTrueValue) - } + final class Remote(val context: SpanContext) extends Span { + override def isEmpty(): Boolean = false + override def isLocal(): Boolean = false + override def annotate(annotation: Annotation): Span = this + override def addSpanTag(key: String, value: String): Span = this + override def addSpanTag(key: String, value: Long): Span = this + override def addSpanTag(key: String, value: Boolean): Span = this + override def addMetricTag(key: String, value: String): Span = this + override def setOperationName(name: String): Span = this + override def disableMetricsCollection(): Span = this + override def finish(finishTimestampMicros: Long): Unit = {} + } - val refinedTags = if(isError) { - metricTags + ("error" -> Span.BooleanTagTrueValue) - } else { - metricTags - } + object Remote { + def apply(spanContext: SpanContext): Remote = + new Remote(spanContext) + } + + sealed trait TagValue + object TagValue { + sealed trait Boolean extends TagValue + case object True extends Boolean + case object False extends Boolean - val latencyHistogram = Span.Metrics.SpanProcessingTimeMetric.refine(refinedTags) - latencyHistogram.record(elapsedTime) + case class String(string: java.lang.String) extends TagValue + case class Number(number: Long) extends TagValue } -} -object Span { object Metrics { val SpanProcessingTimeMetric = Kamon.histogram("span.processing-time", MeasurementUnit.time.microseconds) val SpanErrorCount = Kamon.counter("span.error-count") } - val MetricTagPrefix = "metric." - val BooleanTagTrueValue = "1" - val BooleanTagFalseValue = "0" - - case class LogEntry(timestamp: Long, fields: Map[String, _]) + case class Annotation(timestampMicros: Long, name: String, fields: Map[String, String]) - case class CompletedSpan( + case class FinishedSpan( context: SpanContext, operationName: String, startTimestampMicros: Long, endTimestampMicros: Long, - tags: Map[String, String], - logs: Seq[LogEntry] + tags: Map[String, Span.TagValue], + annotations: Seq[Annotation] ) }
\ No newline at end of file diff --git a/kamon-core/src/main/scala/kamon/trace/SpanCodec.scala b/kamon-core/src/main/scala/kamon/trace/SpanCodec.scala new file mode 100644 index 00000000..e04ceb03 --- /dev/null +++ b/kamon-core/src/main/scala/kamon/trace/SpanCodec.scala @@ -0,0 +1,99 @@ +/* ========================================================================================= + * Copyright © 2013-2017 the kamon project <http://kamon.io/> + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + +package kamon.trace + +import java.net.{URLDecoder, URLEncoder} + +import kamon.Kamon +import kamon.context.{Codec, Context, TextMap} +import kamon.trace.SpanContext.SamplingDecision + + +object SpanCodec { + + class B3 extends Codec.ForEntry[TextMap] { + import B3.Headers + + override def encode(context: Context): TextMap = { + val span = context.get(Span.ContextKey) + val carrier = TextMap.Default() + + if(span.nonEmpty()) { + val spanContext = span.context + carrier.put(Headers.TraceIdentifier, urlEncode(spanContext.traceID.string)) + carrier.put(Headers.SpanIdentifier, urlEncode(spanContext.spanID.string)) + carrier.put(Headers.ParentSpanIdentifier, urlEncode(spanContext.parentID.string)) + + encodeSamplingDecision(spanContext.samplingDecision).foreach { samplingDecision => + carrier.put(Headers.Sampled, samplingDecision) + } + } + + carrier + } + + override def decode(carrier: TextMap, context: Context): Context = { + val identityProvider = Kamon.tracer.identityProvider + val traceID = carrier.get(Headers.TraceIdentifier) + .map(id => identityProvider.traceIdGenerator().from(urlDecode(id))) + .getOrElse(IdentityProvider.NoIdentifier) + + val spanID = carrier.get(Headers.SpanIdentifier) + .map(id => identityProvider.spanIdGenerator().from(urlDecode(id))) + .getOrElse(IdentityProvider.NoIdentifier) + + if(traceID != IdentityProvider.NoIdentifier && spanID != IdentityProvider.NoIdentifier) { + val parentID = carrier.get(Headers.ParentSpanIdentifier) + .map(id => identityProvider.spanIdGenerator().from(urlDecode(id))) + .getOrElse(IdentityProvider.NoIdentifier) + + val flags = carrier.get(Headers.Flags) + + val samplingDecision = flags.orElse(carrier.get(Headers.Sampled)) match { + case Some(sampled) if sampled == "1" => SamplingDecision.Sample + case Some(sampled) if sampled == "0" => SamplingDecision.DoNotSample + case _ => SamplingDecision.Unknown + } + + context.withKey(Span.ContextKey, Span.Remote(SpanContext(traceID, spanID, parentID, samplingDecision))) + + } else context + } + + private def encodeSamplingDecision(samplingDecision: SamplingDecision): Option[String] = samplingDecision match { + case SamplingDecision.Sample => Some("1") + case SamplingDecision.DoNotSample => Some("0") + case SamplingDecision.Unknown => None + } + + private def urlEncode(s: String): String = URLEncoder.encode(s, "UTF-8") + private def urlDecode(s: String): String = URLDecoder.decode(s, "UTF-8") + } + + object B3 { + + def apply(): B3 = + new B3() + + object Headers { + val TraceIdentifier = "X-B3-TraceId" + val ParentSpanIdentifier = "X-B3-ParentSpanId" + val SpanIdentifier = "X-B3-SpanId" + val Sampled = "X-B3-Sampled" + val Flags = "X-B3-Flags" + } + } +}
\ No newline at end of file diff --git a/kamon-core/src/main/scala/kamon/trace/SpanContext.scala b/kamon-core/src/main/scala/kamon/trace/SpanContext.scala index b37e208b..4d013881 100644 --- a/kamon-core/src/main/scala/kamon/trace/SpanContext.scala +++ b/kamon-core/src/main/scala/kamon/trace/SpanContext.scala @@ -15,26 +15,51 @@ package kamon.trace -import java.lang -import java.util.{Map => JavaMap} +import kamon.trace.IdentityProvider.Identifier +import kamon.trace.SpanContext.SamplingDecision -import scala.collection.JavaConverters._ +/** + * + * @param traceID + * @param spanID + * @param parentID + * @param samplingDecision + */ +case class SpanContext(traceID: Identifier, spanID: Identifier, parentID: Identifier, samplingDecision: SamplingDecision) { -class SpanContext(val traceID: Long, val spanID: Long, val parentID: Long, val sampled: Boolean, - private var baggage: Map[String, String]) extends io.opentracing.SpanContext { + def createChild(childSpanID: Identifier, samplingDecision: SamplingDecision): SpanContext = + this.copy(parentID = this.spanID, spanID = childSpanID) +} - private[kamon] def addBaggageItem(key: String, value: String): Unit = synchronized { - baggage = baggage + (key -> value) - } +object SpanContext { - private[kamon] def getBaggage(key: String): String = synchronized { - baggage.get(key).getOrElse(null) - } + val EmptySpanContext = SpanContext( + traceID = IdentityProvider.NoIdentifier, + spanID = IdentityProvider.NoIdentifier, + parentID = IdentityProvider.NoIdentifier, + samplingDecision = SamplingDecision.DoNotSample + ) + + + sealed trait SamplingDecision - private[kamon] def baggageMap: Map[String, String] = - baggage + object SamplingDecision { + + /** + * The Trace is sampled, all child Spans should be sampled as well. + */ + case object Sample extends SamplingDecision + + /** + * The Trace is not sampled, none of the child Spans should be sampled. + */ + case object DoNotSample extends SamplingDecision + + /** + * The sampling decision has not been taken yet, the Tracer is free to decide when creating a Span. + */ + case object Unknown extends SamplingDecision - override def baggageItems(): lang.Iterable[JavaMap.Entry[String, String]] = synchronized { - baggage.asJava.entrySet() } -} + +}
\ No newline at end of file diff --git a/kamon-core/src/main/scala/kamon/trace/SpanContextCodec.scala b/kamon-core/src/main/scala/kamon/trace/SpanContextCodec.scala deleted file mode 100644 index 8e3a446b..00000000 --- a/kamon-core/src/main/scala/kamon/trace/SpanContextCodec.scala +++ /dev/null @@ -1,105 +0,0 @@ -/* ========================================================================================= - * Copyright © 2013-2017 the kamon project <http://kamon.io/> - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file - * except in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the - * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, - * either express or implied. See the License for the specific language governing permissions - * and limitations under the License. - * ========================================================================================= - */ - - -package kamon.trace - -import java.net.{URLDecoder, URLEncoder} -import java.util.concurrent.ThreadLocalRandom - -import scala.collection.JavaConverters._ -import io.opentracing.propagation.TextMap -import kamon.util.HexCodec - - -trait SpanContextCodec[T] { - def inject(spanContext: SpanContext, carrier: T): Unit - def extract(carrier: T, sampler: Sampler): SpanContext -} - -object SpanContextCodec { - - val TextMap: SpanContextCodec[TextMap] = new TextMapSpanCodec( - traceIDKey = "TRACE_ID", - parentIDKey = "PARENT_ID", - spanIDKey = "SPAN_ID", - sampledKey = "SAMPLED", - baggagePrefix = "BAGGAGE_", - baggageValueEncoder = identity, - baggageValueDecoder = identity - ) - - val ZipkinB3: SpanContextCodec[TextMap] = new TextMapSpanCodec( - traceIDKey = "X-B3-TraceId", - parentIDKey = "X-B3-ParentSpanId", - spanIDKey = "X-B3-SpanId", - sampledKey = "X-B3-Sampled", - baggagePrefix = "X-B3-Baggage-", - baggageValueEncoder = urlEncode, - baggageValueDecoder = urlDecode - ) - - private def urlEncode(s: String): String = URLEncoder.encode(s, "UTF-8") - private def urlDecode(s: String): String = URLDecoder.decode(s, "UTF-8") - - private class TextMapSpanCodec(traceIDKey: String, parentIDKey: String, spanIDKey: String, sampledKey: String, baggagePrefix: String, - baggageValueEncoder: String => String, baggageValueDecoder: String => String) extends SpanContextCodec[TextMap] { - - override def inject(spanContext: SpanContext, carrier: TextMap): Unit = { - carrier.put(traceIDKey, encodeLong(spanContext.traceID)) - carrier.put(parentIDKey, encodeLong(spanContext.parentID)) - carrier.put(spanIDKey, encodeLong(spanContext.spanID)) - - spanContext.baggageItems().iterator().asScala.foreach { entry => - carrier.put(baggagePrefix + entry.getKey, baggageValueEncoder(entry.getValue)) - } - } - - override def extract(carrier: TextMap, sampler: Sampler): SpanContext = { - var traceID: String = null - var parentID: String = null - var spanID: String = null - var sampled: String = null - var baggage: Map[String, String] = Map.empty - - carrier.iterator().asScala.foreach { entry => - if(entry.getKey.equals(traceIDKey)) - traceID = baggageValueDecoder(entry.getValue) - else if(entry.getKey.equals(parentIDKey)) - parentID = baggageValueDecoder(entry.getValue) - else if(entry.getKey.equals(spanIDKey)) - spanID = baggageValueDecoder(entry.getValue) - else if(entry.getKey.equals(sampledKey)) - sampled = entry.getValue - else if(entry.getKey.startsWith(baggagePrefix)) - baggage = baggage + (entry.getKey.substring(baggagePrefix.length) -> baggageValueDecoder(entry.getValue)) - } - - if(traceID != null && spanID != null) { - val actualParent = if(parentID == null) 0L else decodeLong(parentID) - val isSampled = if(sampled == null) sampler.decide(ThreadLocalRandom.current().nextLong()) else sampled.equals("1") - - new SpanContext(decodeLong(traceID), decodeLong(spanID), actualParent, isSampled, baggage) - } else null - } - - private def decodeLong(input: String): Long = - HexCodec.lowerHexToUnsignedLong(input) - - private def encodeLong(input: Long): String = - HexCodec.toLowerHex(input) - - } -} diff --git a/kamon-core/src/main/scala/kamon/trace/Tracer.scala b/kamon-core/src/main/scala/kamon/trace/Tracer.scala index 19067f5e..7d8830ca 100644 --- a/kamon-core/src/main/scala/kamon/trace/Tracer.scala +++ b/kamon-core/src/main/scala/kamon/trace/Tracer.scala @@ -13,148 +13,156 @@ * ========================================================================================= */ - package kamon.trace -import java.util.concurrent.ThreadLocalRandom - import com.typesafe.config.Config -import io.opentracing.propagation.{Format, TextMap} -import io.opentracing.propagation.Format.Builtin.{BINARY, HTTP_HEADERS, TEXT_MAP} -import io.opentracing.util.ThreadLocalActiveSpanSource -import kamon.ReporterRegistryImpl +import kamon.{Kamon, ReporterRegistryImpl} import kamon.metric.MetricLookup -import kamon.util.Clock +import kamon.trace.Span.TagValue +import kamon.trace.SpanContext.SamplingDecision +import kamon.trace.Tracer.SpanBuilder +import kamon.util.{Clock, DynamicAccess} import org.slf4j.LoggerFactory +import scala.collection.immutable +import scala.util.Try -class Tracer(metrics: MetricLookup, reporterRegistry: ReporterRegistryImpl, initialConfig: Config) - extends ThreadLocalActiveSpanSource with io.opentracing.Tracer { +trait Tracer { + def buildSpan(operationName: String): SpanBuilder + def identityProvider: IdentityProvider +} - private val logger = LoggerFactory.getLogger(classOf[Tracer]) - private val tracerMetrics = new TracerMetrics(metrics) +object Tracer { - @volatile private var configuredSampler: Sampler = Sampler.never - @volatile private var textMapSpanContextCodec = SpanContextCodec.TextMap - @volatile private var httpHeaderSpanContextCodec = SpanContextCodec.ZipkinB3 + final class Default(metrics: MetricLookup, reporterRegistry: ReporterRegistryImpl, initialConfig: Config) extends Tracer { + private val logger = LoggerFactory.getLogger(classOf[Tracer]) - reconfigure(initialConfig) + private[Tracer] val tracerMetrics = new TracerMetrics(metrics) + @volatile private[Tracer] var joinRemoteParentsWithSameSpanID: Boolean = true + @volatile private[Tracer] var configuredSampler: Sampler = Sampler.Never + @volatile private[Tracer] var _identityProvider: IdentityProvider = IdentityProvider.Default() - override def buildSpan(operationName: String): io.opentracing.Tracer.SpanBuilder = - new SpanBuilder(operationName) + reconfigure(initialConfig) - override def extract[C](format: Format[C], carrier: C): io.opentracing.SpanContext = format match { - case HTTP_HEADERS => httpHeaderSpanContextCodec.extract(carrier.asInstanceOf[TextMap], configuredSampler) - case TEXT_MAP => textMapSpanContextCodec.extract(carrier.asInstanceOf[TextMap], configuredSampler) - case BINARY => null // TODO: Implement Binary Encoding - case _ => null - } + override def buildSpan(operationName: String): SpanBuilder = + new SpanBuilder(operationName, this, reporterRegistry) - override def inject[C](spanContext: io.opentracing.SpanContext, format: Format[C], carrier: C): Unit = format match { - case HTTP_HEADERS => httpHeaderSpanContextCodec.inject(spanContext.asInstanceOf[SpanContext], carrier.asInstanceOf[TextMap]) - case TEXT_MAP => textMapSpanContextCodec.inject(spanContext.asInstanceOf[SpanContext], carrier.asInstanceOf[TextMap]) - case BINARY => - case _ => - } + override def identityProvider: IdentityProvider = + this._identityProvider + + def sampler: Sampler = + configuredSampler + + private[kamon] def reconfigure(config: Config): Unit = synchronized { + Try { + val dynamic = new DynamicAccess(getClass.getClassLoader) + val traceConfig = config.getConfig("kamon.trace") - def sampler: Sampler = - configuredSampler + val newSampler = traceConfig.getString("sampler") match { + case "always" => Sampler.Always + case "never" => Sampler.Never + case "random" => Sampler.random(traceConfig.getDouble("random-sampler.probability")) + case other => sys.error(s"Unexpected sampler name $other.") + } + + val newJoinRemoteParentsWithSameSpanID = traceConfig.getBoolean("join-remote-parents-with-same-span-id") - def setTextMapSpanContextCodec(codec: SpanContextCodec[TextMap]): Unit = - this.textMapSpanContextCodec = codec + val newIdentityProvider = dynamic.createInstanceFor[IdentityProvider]( + traceConfig.getString("identity-provider"), immutable.Seq.empty[(Class[_], AnyRef)] + ).get - def setHttpHeaderSpanContextCodec(codec: SpanContextCodec[TextMap]): Unit = - this.httpHeaderSpanContextCodec = codec + configuredSampler = newSampler + joinRemoteParentsWithSameSpanID = newJoinRemoteParentsWithSameSpanID + _identityProvider = newIdentityProvider - private class SpanBuilder(operationName: String) extends io.opentracing.Tracer.SpanBuilder { - private var parentContext: SpanContext = _ + }.failed.foreach { + ex => logger.error("Unable to reconfigure Kamon Tracer", ex) + } + } + } + + object Default { + def apply(metrics: MetricLookup, reporterRegistry: ReporterRegistryImpl, initialConfig: Config): Default = + new Default(metrics, reporterRegistry, initialConfig) + } + + final class SpanBuilder(operationName: String, tracer: Tracer.Default, reporterRegistry: ReporterRegistryImpl) { + private var parentSpan: Span = _ private var startTimestamp = 0L - private var initialTags = Map.empty[String, String] + private var initialSpanTags = Map.empty[String, Span.TagValue] + private var initialMetricTags = Map.empty[String, String] private var useActiveSpanAsParent = true - override def asChildOf(parent: io.opentracing.SpanContext): io.opentracing.Tracer.SpanBuilder = parent match { - case spanContext: kamon.trace.SpanContext => - this.parentContext = spanContext - this - case null => this - case _ => logger.error("Can't extract the parent ID from a non-Kamon SpanContext"); this + def asChildOf(parent: Span): SpanBuilder = { + if(parent != Span.Empty) this.parentSpan = parent + this } - override def asChildOf(parent: io.opentracing.BaseSpan[_]): io.opentracing.Tracer.SpanBuilder = - asChildOf(parent.context()) - - override def addReference(referenceType: String, referencedContext: io.opentracing.SpanContext): io.opentracing.Tracer.SpanBuilder = { - if(referenceType != null && referenceType.equals(io.opentracing.References.CHILD_OF)) { - asChildOf(referencedContext) - } else this + def withMetricTag(key: String, value: String): SpanBuilder = { + this.initialMetricTags = this.initialMetricTags + (key -> value) + this } - override def withTag(key: String, value: String): io.opentracing.Tracer.SpanBuilder = { - this.initialTags = this.initialTags + (key -> value) + def withSpanTag(key: String, value: String): SpanBuilder = { + this.initialSpanTags = this.initialSpanTags + (key -> TagValue.String(value)) this } - override def withTag(key: String, value: Boolean): io.opentracing.Tracer.SpanBuilder = { - this.initialTags = this.initialTags + (key -> value.toString) + def withSpanTag(key: String, value: Long): SpanBuilder = { + this.initialSpanTags = this.initialSpanTags + (key -> TagValue.Number(value)) this } - override def withTag(key: String, value: Number): io.opentracing.Tracer.SpanBuilder = { - this.initialTags = this.initialTags + (key -> value.toString) + def withSpanTag(key: String, value: Boolean): SpanBuilder = { + val tagValue = if (value) TagValue.True else TagValue.False + this.initialSpanTags = this.initialSpanTags + (key -> tagValue) this } - override def withStartTimestamp(microseconds: Long): io.opentracing.Tracer.SpanBuilder = { + def withStartTimestamp(microseconds: Long): SpanBuilder = { this.startTimestamp = microseconds this } - override def ignoreActiveSpan(): io.opentracing.Tracer.SpanBuilder = { + def ignoreActiveSpan(): SpanBuilder = { this.useActiveSpanAsParent = false this } - override def start(): io.opentracing.Span = - startManual() + def start(): Span = { + val startTimestampMicros = if(startTimestamp != 0L) startTimestamp else Clock.microTimestamp() - override def startActive(): io.opentracing.ActiveSpan = - makeActive(startManual()) + val parentSpan: Option[Span] = Option(this.parentSpan) + .orElse(if(useActiveSpanAsParent) Some(Kamon.currentContext().get(Span.ContextKey)) else None) + .filter(span => span != Span.Empty) - override def startManual(): Span = { - val startTimestampMicros = if(startTimestamp != 0L) startTimestamp else Clock.microTimestamp() + val samplingDecision: SamplingDecision = parentSpan + .map(_.context.samplingDecision) + .filter(_ != SamplingDecision.Unknown) + .getOrElse(tracer.sampler.decide(operationName, initialSpanTags)) - if(parentContext == null && useActiveSpanAsParent) { - val possibleParent = activeSpan() - if(possibleParent != null) - parentContext = possibleParent.context().asInstanceOf[SpanContext] + val spanContext = parentSpan match { + case Some(parent) => joinParentContext(parent, samplingDecision) + case None => newSpanContext(samplingDecision) } - val spanContext = - if(parentContext != null) - new SpanContext(parentContext.traceID, createID(), parentContext.spanID, parentContext.sampled, parentContext.baggageMap) - else { - val traceID = createID() - new SpanContext(traceID, traceID, 0L, configuredSampler.decide(traceID), Map.empty) - } - - tracerMetrics.createdSpans.increment() - new Span(spanContext, operationName, initialTags, startTimestampMicros, reporterRegistry) + tracer.tracerMetrics.createdSpans.increment() + Span.Local(spanContext, operationName, initialSpanTags, initialMetricTags, startTimestampMicros, reporterRegistry) } - private def createID(): Long = - ThreadLocalRandom.current().nextLong() - } - - - private[kamon] def reconfigure(config: Config): Unit = synchronized { - val traceConfig = config.getConfig("kamon.trace") - - configuredSampler = traceConfig.getString("sampler") match { - case "always" => Sampler.always - case "never" => Sampler.never - case "random" => Sampler.random(traceConfig.getDouble("sampler-random.chance")) - case other => sys.error(s"Unexpected sampler name $other.") - } + private def joinParentContext(parent: Span, samplingDecision: SamplingDecision): SpanContext = + if(parent.isRemote() && tracer.joinRemoteParentsWithSameSpanID) + parent.context().copy(samplingDecision = samplingDecision) + else + parent.context().createChild(tracer._identityProvider.spanIdGenerator().generate(), samplingDecision) + + private def newSpanContext(samplingDecision: SamplingDecision): SpanContext = + SpanContext( + traceID = tracer._identityProvider.traceIdGenerator().generate(), + spanID = tracer._identityProvider.spanIdGenerator().generate(), + parentID = IdentityProvider.NoIdentifier, + samplingDecision = samplingDecision + ) } private final class TracerMetrics(metricLookup: MetricLookup) { diff --git a/kamon-core/src/main/scala/kamon/util/BaggageOnMDC.scala b/kamon-core/src/main/scala/kamon/util/BaggageOnMDC.scala deleted file mode 100644 index 885a73d9..00000000 --- a/kamon-core/src/main/scala/kamon/util/BaggageOnMDC.scala +++ /dev/null @@ -1,72 +0,0 @@ -/* - * ========================================================================================= - * Copyright © 2013-2015 the kamon project <http://kamon.io/> - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file - * except in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the - * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, - * either express or implied. See the License for the specific language governing permissions - * and limitations under the License. - * ========================================================================================= - */ -package kamon.util - -import java.util.function.Supplier - -import kamon.trace.{SpanContext => KamonSpanContext} -import kamon.Kamon -import org.slf4j.MDC - -import scala.collection.JavaConverters._ - -object BaggageOnMDC { - - /** - * Copy all baggage keys into SLF4J's MDC, evaluates the provided piece of code and removes the baggage keys - * afterwards, only when there is a currently active span. Optionally adds the Trace ID as well. - * - */ - def withBaggageOnMDC[T](includeTraceID: Boolean, code: => T): T = { - val activeSpan = Kamon.activeSpan() - if(activeSpan == null) - code - else { - val baggageItems = activeSpan.context().baggageItems().asScala - baggageItems.foreach(entry => MDC.put(entry.getKey, entry.getValue)) - if(includeTraceID) - addTraceIDToMDC(activeSpan.context()) - - val evaluatedCode = code - - baggageItems.foreach(entry => MDC.remove(entry.getKey)) - if(includeTraceID) - removeTraceIDFromMDC() - - evaluatedCode - - } - } - - def withBaggageOnMDC[T](code: Supplier[T]): T = - withBaggageOnMDC(true, code.get()) - - def withBaggageOnMDC[T](includeTraceID: Boolean, code: Supplier[T]): T = - withBaggageOnMDC(includeTraceID, code.get()) - - def withBaggageOnMDC[T](code: => T): T = - withBaggageOnMDC(true, code) - - private val TraceIDKey = "trace_id" - - private def addTraceIDToMDC(context: io.opentracing.SpanContext): Unit = context match { - case ctx: KamonSpanContext => MDC.put(TraceIDKey, HexCodec.toLowerHex(ctx.traceID)) - case _ => - } - - private def removeTraceIDFromMDC(): Unit = - MDC.remove(TraceIDKey) -} |