diff options
author | Ivan Topolnjak <ivantopo@gmail.com> | 2014-08-13 00:47:01 -0400 |
---|---|---|
committer | Ivan Topolnjak <ivantopo@gmail.com> | 2014-08-13 00:47:01 -0400 |
commit | 207034dd531687b28819980bb61831576ef5a3ea (patch) | |
tree | 927e339619d5ba74339f640683b55de17966bcc5 | |
parent | cc26f291af439812954139f1f4efc84e9ed4c1fd (diff) | |
parent | 79ccffe3fff870d8a9338576eff6caad7ed69992 (diff) | |
download | Kamon-207034dd531687b28819980bb61831576ef5a3ea.tar.gz Kamon-207034dd531687b28819980bb61831576ef5a3ea.tar.bz2 Kamon-207034dd531687b28819980bb61831576ef5a3ea.zip |
Merge branch 'master' into release-0.3_scala-2.11
22 files changed, 889 insertions, 88 deletions
diff --git a/kamon-core/src/main/java/kamon/jsr166/LongAdder.java b/kamon-core/src/main/java/kamon/jsr166/LongAdder.java new file mode 100644 index 00000000..18f91759 --- /dev/null +++ b/kamon-core/src/main/java/kamon/jsr166/LongAdder.java @@ -0,0 +1,209 @@ +/* + +Note: this was copied from Doug Lea's CVS repository + http://gee.cs.oswego.edu/cgi-bin/viewcvs.cgi/jsr166/src/jsr166e/ + +LongAdder.java version 1.14 + +*/ + + +/* + * Written by Doug Lea with assistance from members of JCP JSR-166 + * Expert Group and released to the public domain, as explained at + * http://creativecommons.org/publicdomain/zero/1.0/ + */ + +package kamon.jsr166; + +import java.io.Serializable; + +/** + * One or more variables that together maintain an initially zero + * {@code long} sum. When updates (method {@link #add}) are contended + * across threads, the set of variables may grow dynamically to reduce + * contention. Method {@link #sum} (or, equivalently, {@link + * #longValue}) returns the current total combined across the + * variables maintaining the sum. + * + * <p>This class is usually preferable to {@link java.util.concurrent.atomic.AtomicLong} when + * multiple threads update a common sum that is used for purposes such + * as collecting statistics, not for fine-grained synchronization + * control. Under low update contention, the two classes have similar + * characteristics. But under high contention, expected throughput of + * this class is significantly higher, at the expense of higher space + * consumption. + * + * <p>This class extends {@link Number}, but does <em>not</em> define + * methods such as {@code equals}, {@code hashCode} and {@code + * compareTo} because instances are expected to be mutated, and so are + * not useful as collection keys. + * + * <p><em>jsr166e note: This class is targeted to be placed in + * java.util.concurrent.atomic.</em> + * + * @since 1.8 + * @author Doug Lea + */ +public class LongAdder extends Striped64 implements Serializable { + private static final long serialVersionUID = 7249069246863182397L; + + /** + * Version of plus for use in retryUpdate + */ + final long fn(long v, long x) { return v + x; } + + /** + * Creates a new adder with initial sum of zero. + */ + public LongAdder() { + } + + /** + * Adds the given value. + * + * @param x the value to add + */ + public void add(long x) { + Cell[] as; long b, v; HashCode hc; Cell a; int n; + if ((as = cells) != null || !casBase(b = base, b + x)) { + boolean uncontended = true; + int h = (hc = threadHashCode.get()).code; + if (as == null || (n = as.length) < 1 || + (a = as[(n - 1) & h]) == null || + !(uncontended = a.cas(v = a.value, v + x))) + retryUpdate(x, hc, uncontended); + } + } + + /** + * Equivalent to {@code add(1)}. + */ + public void increment() { + add(1L); + } + + /** + * Equivalent to {@code add(-1)}. + */ + public void decrement() { + add(-1L); + } + + /** + * Returns the current sum. The returned value is <em>NOT</em> an + * atomic snapshot; invocation in the absence of concurrent + * updates returns an accurate result, but concurrent updates that + * occur while the sum is being calculated might not be + * incorporated. + * + * @return the sum + */ + public long sum() { + long sum = base; + Cell[] as = cells; + if (as != null) { + int n = as.length; + for (int i = 0; i < n; ++i) { + Cell a = as[i]; + if (a != null) + sum += a.value; + } + } + return sum; + } + + /** + * Resets variables maintaining the sum to zero. This method may + * be a useful alternative to creating a new adder, but is only + * effective if there are no concurrent updates. Because this + * method is intrinsically racy, it should only be used when it is + * known that no threads are concurrently updating. + */ + public void reset() { + internalReset(0L); + } + + /** + * Equivalent in effect to {@link #sum} followed by {@link + * #reset}. This method may apply for example during quiescent + * points between multithreaded computations. If there are + * updates concurrent with this method, the returned value is + * <em>not</em> guaranteed to be the final value occurring before + * the reset. + * + * @return the sum + */ + public long sumThenReset() { + long sum = base; + Cell[] as = cells; + base = 0L; + if (as != null) { + int n = as.length; + for (int i = 0; i < n; ++i) { + Cell a = as[i]; + if (a != null) { + sum += a.value; + a.value = 0L; + } + } + } + return sum; + } + + /** + * Returns the String representation of the {@link #sum}. + * @return the String representation of the {@link #sum} + */ + public String toString() { + return Long.toString(sum()); + } + + /** + * Equivalent to {@link #sum}. + * + * @return the sum + */ + public long longValue() { + return sum(); + } + + /** + * Returns the {@link #sum} as an {@code int} after a narrowing + * primitive conversion. + */ + public int intValue() { + return (int)sum(); + } + + /** + * Returns the {@link #sum} as a {@code float} + * after a widening primitive conversion. + */ + public float floatValue() { + return (float)sum(); + } + + /** + * Returns the {@link #sum} as a {@code double} after a widening + * primitive conversion. + */ + public double doubleValue() { + return (double)sum(); + } + + private void writeObject(java.io.ObjectOutputStream s) + throws java.io.IOException { + s.defaultWriteObject(); + s.writeLong(sum()); + } + + private void readObject(java.io.ObjectInputStream s) + throws java.io.IOException, ClassNotFoundException { + s.defaultReadObject(); + busy = 0; + cells = null; + base = s.readLong(); + } + +} diff --git a/kamon-core/src/main/java/kamon/jsr166/LongMaxUpdater.java b/kamon-core/src/main/java/kamon/jsr166/LongMaxUpdater.java new file mode 100644 index 00000000..fc9ea4e5 --- /dev/null +++ b/kamon-core/src/main/java/kamon/jsr166/LongMaxUpdater.java @@ -0,0 +1,191 @@ +/* + * Written by Doug Lea with assistance from members of JCP JSR-166 + * Expert Group and released to the public domain, as explained at + * http://creativecommons.org/publicdomain/zero/1.0/ + */ + +package kamon.jsr166; +import java.io.Serializable; + +/** + * One or more variables that together maintain a running {@code long} + * maximum with initial value {@code Long.MIN_VALUE}. When updates + * (method {@link #update}) are contended across threads, the set of + * variables may grow dynamically to reduce contention. Method {@link + * #max} (or, equivalently, {@link #longValue}) returns the current + * maximum across the variables maintaining updates. + * + * <p>This class extends {@link Number}, but does <em>not</em> define + * methods such as {@code equals}, {@code hashCode} and {@code + * compareTo} because instances are expected to be mutated, and so are + * not useful as collection keys. + * + * <p><em>jsr166e note: This class is targeted to be placed in + * java.util.concurrent.atomic.</em> + * + * @since 1.8 + * @author Doug Lea + */ +public class LongMaxUpdater extends Striped64 implements Serializable { + private static final long serialVersionUID = 7249069246863182397L; + + /** + * Version of max for use in retryUpdate + */ + final long fn(long v, long x) { return v > x ? v : x; } + + /** + * Creates a new instance with initial maximum of {@code + * Long.MIN_VALUE}. + */ + public LongMaxUpdater() { + base = Long.MIN_VALUE; + } + + /** + * Creates a new instance with the given initialValue + */ + public LongMaxUpdater(long initialValue) { + base = initialValue; + } + + + /** + * Updates the maximum to be at least the given value. + * + * @param x the value to update + */ + public void update(long x) { + Cell[] as; long b, v; HashCode hc; Cell a; int n; + if ((as = cells) != null || + (b = base) < x && !casBase(b, x)) { + boolean uncontended = true; + int h = (hc = threadHashCode.get()).code; + if (as == null || (n = as.length) < 1 || + (a = as[(n - 1) & h]) == null || + ((v = a.value) < x && !(uncontended = a.cas(v, x)))) + retryUpdate(x, hc, uncontended); + } + } + + /** + * Returns the current maximum. The returned value is + * <em>NOT</em> an atomic snapshot; invocation in the absence of + * concurrent updates returns an accurate result, but concurrent + * updates that occur while the value is being calculated might + * not be incorporated. + * + * @return the maximum + */ + public long max() { + Cell[] as = cells; + long max = base; + if (as != null) { + int n = as.length; + long v; + for (int i = 0; i < n; ++i) { + Cell a = as[i]; + if (a != null && (v = a.value) > max) + max = v; + } + } + return max; + } + + /** + * Resets variables maintaining updates to {@code Long.MIN_VALUE}. + * This method may be a useful alternative to creating a new + * updater, but is only effective if there are no concurrent + * updates. Because this method is intrinsically racy, it should + * only be used when it is known that no threads are concurrently + * updating. + */ + public void reset() { + internalReset(Long.MIN_VALUE); + } + + /** + * Equivalent in effect to {@link #max} followed by {@link + * #reset}. This method may apply for example during quiescent + * points between multithreaded computations. If there are + * updates concurrent with this method, the returned value is + * <em>not</em> guaranteed to be the final value occurring before + * the reset. + * + * @return the maximum + */ + public long maxThenReset(long newValue) { + Cell[] as = cells; + long max = base; + base = newValue; + if (as != null) { + int n = as.length; + for (int i = 0; i < n; ++i) { + Cell a = as[i]; + if (a != null) { + long v = a.value; + a.value = newValue; + if (v > max) + max = v; + } + } + } + return max; + } + + /** + * Returns the String representation of the {@link #max}. + * @return the String representation of the {@link #max} + */ + public String toString() { + return Long.toString(max()); + } + + /** + * Equivalent to {@link #max}. + * + * @return the maximum + */ + public long longValue() { + return max(); + } + + /** + * Returns the {@link #max} as an {@code int} after a narrowing + * primitive conversion. + */ + public int intValue() { + return (int)max(); + } + + /** + * Returns the {@link #max} as a {@code float} + * after a widening primitive conversion. + */ + public float floatValue() { + return (float)max(); + } + + /** + * Returns the {@link #max} as a {@code double} after a widening + * primitive conversion. + */ + public double doubleValue() { + return (double)max(); + } + + private void writeObject(java.io.ObjectOutputStream s) + throws java.io.IOException { + s.defaultWriteObject(); + s.writeLong(max()); + } + + private void readObject(java.io.ObjectInputStream s) + throws java.io.IOException, ClassNotFoundException { + s.defaultReadObject(); + busy = 0; + cells = null; + base = s.readLong(); + } + +} diff --git a/kamon-core/src/main/java/kamon/jsr166/Striped64.java b/kamon-core/src/main/java/kamon/jsr166/Striped64.java new file mode 100644 index 00000000..4840c8d7 --- /dev/null +++ b/kamon-core/src/main/java/kamon/jsr166/Striped64.java @@ -0,0 +1,352 @@ +/* + +Note: this was copied from Doug Lea's CVS repository + http://gee.cs.oswego.edu/cgi-bin/viewcvs.cgi/jsr166/src/jsr166e/ + +Striped64.java version 1.8 + +*/ + + +/* + * Written by Doug Lea with assistance from members of JCP JSR-166 + * Expert Group and released to the public domain, as explained at + * http://creativecommons.org/publicdomain/zero/1.0/ + */ + +package kamon.jsr166; + +import java.util.Random; + +/** + * A package-local class holding common representation and mechanics + * for classes supporting dynamic striping on 64bit values. The class + * extends Number so that concrete subclasses must publicly do so. + */ +abstract class Striped64 extends Number { + /* + * This class maintains a lazily-initialized table of atomically + * updated variables, plus an extra "base" field. The table size + * is a power of two. Indexing uses masked per-thread hash codes. + * Nearly all declarations in this class are package-private, + * accessed directly by subclasses. + * + * Table entries are of class Cell; a variant of AtomicLong padded + * to reduce cache contention on most processors. Padding is + * overkill for most Atomics because they are usually irregularly + * scattered in memory and thus don't interfere much with each + * other. But Atomic objects residing in arrays will tend to be + * placed adjacent to each other, and so will most often share + * cache lines (with a huge negative performance impact) without + * this precaution. + * + * In part because Cells are relatively large, we avoid creating + * them until they are needed. When there is no contention, all + * updates are made to the base field. Upon first contention (a + * failed CAS on base update), the table is initialized to size 2. + * The table size is doubled upon further contention until + * reaching the nearest power of two greater than or equal to the + * number of CPUS. Table slots remain empty (null) until they are + * needed. + * + * A single spinlock ("busy") is used for initializing and + * resizing the table, as well as populating slots with new Cells. + * There is no need for a blocking lock; when the lock is not + * available, threads try other slots (or the base). During these + * retries, there is increased contention and reduced locality, + * which is still better than alternatives. + * + * Per-thread hash codes are initialized to random values. + * Contention and/or table collisions are indicated by failed + * CASes when performing an update operation (see method + * retryUpdate). Upon a collision, if the table size is less than + * the capacity, it is doubled in size unless some other thread + * holds the lock. If a hashed slot is empty, and lock is + * available, a new Cell is created. Otherwise, if the slot + * exists, a CAS is tried. Retries proceed by "double hashing", + * using a secondary hash (Marsaglia XorShift) to try to find a + * free slot. + * + * The table size is capped because, when there are more threads + * than CPUs, supposing that each thread were bound to a CPU, + * there would exist a perfect hash function mapping threads to + * slots that eliminates collisions. When we reach capacity, we + * search for this mapping by randomly varying the hash codes of + * colliding threads. Because search is random, and collisions + * only become known via CAS failures, convergence can be slow, + * and because threads are typically not bound to CPUS forever, + * may not occur at all. However, despite these limitations, + * observed contention rates are typically low in these cases. + * + * It is possible for a Cell to become unused when threads that + * once hashed to it terminate, as well as in the case where + * doubling the table causes no thread to hash to it under + * expanded mask. We do not try to detect or remove such cells, + * under the assumption that for long-running instances, observed + * contention levels will recur, so the cells will eventually be + * needed again; and for short-lived ones, it does not matter. + */ + + /** + * Padded variant of AtomicLong supporting only raw accesses plus CAS. + * The value field is placed between pads, hoping that the JVM doesn't + * reorder them. + * + * JVM intrinsics note: It would be possible to use a release-only + * form of CAS here, if it were provided. + */ + static final class Cell { + volatile long p0, p1, p2, p3, p4, p5, p6; + volatile long value; + volatile long q0, q1, q2, q3, q4, q5, q6; + Cell(long x) { value = x; } + + final boolean cas(long cmp, long val) { + return UNSAFE.compareAndSwapLong(this, valueOffset, cmp, val); + } + + // Unsafe mechanics + private static final sun.misc.Unsafe UNSAFE; + private static final long valueOffset; + static { + try { + UNSAFE = getUnsafe(); + Class<?> ak = Cell.class; + valueOffset = UNSAFE.objectFieldOffset + (ak.getDeclaredField("value")); + } catch (Exception e) { + throw new Error(e); + } + } + + } + + /** + * Holder for the thread-local hash code. The code is initially + * random, but may be set to a different value upon collisions. + */ + static final class HashCode { + static final Random rng = new Random(); + int code; + HashCode() { + int h = rng.nextInt(); // Avoid zero to allow xorShift rehash + code = (h == 0) ? 1 : h; + } + } + + /** + * The corresponding ThreadLocal class + */ + static final class ThreadHashCode extends ThreadLocal<HashCode> { + public HashCode initialValue() { return new HashCode(); } + } + + /** + * Static per-thread hash codes. Shared across all instances to + * reduce ThreadLocal pollution and because adjustments due to + * collisions in one table are likely to be appropriate for + * others. + */ + static final ThreadHashCode threadHashCode = new ThreadHashCode(); + + /** Number of CPUS, to place bound on table size */ + static final int NCPU = Runtime.getRuntime().availableProcessors(); + + /** + * Table of cells. When non-null, size is a power of 2. + */ + transient volatile Cell[] cells; + + /** + * Base value, used mainly when there is no contention, but also as + * a fallback during table initialization races. Updated via CAS. + */ + transient volatile long base; + + /** + * Spinlock (locked via CAS) used when resizing and/or creating Cells. + */ + transient volatile int busy; + + /** + * Package-private default constructor + */ + Striped64() { + } + + /** + * CASes the base field. + */ + final boolean casBase(long cmp, long val) { + return UNSAFE.compareAndSwapLong(this, baseOffset, cmp, val); + } + + /** + * CASes the busy field from 0 to 1 to acquire lock. + */ + final boolean casBusy() { + return UNSAFE.compareAndSwapInt(this, busyOffset, 0, 1); + } + + /** + * Computes the function of current and new value. Subclasses + * should open-code this update function for most uses, but the + * virtualized form is needed within retryUpdate. + * + * @param currentValue the current value (of either base or a cell) + * @param newValue the argument from a user update call + * @return result of the update function + */ + abstract long fn(long currentValue, long newValue); + + /** + * Handles cases of updates involving initialization, resizing, + * creating new Cells, and/or contention. See above for + * explanation. This method suffers the usual non-modularity + * problems of optimistic retry code, relying on rechecked sets of + * reads. + * + * @param x the value + * @param hc the hash code holder + * @param wasUncontended false if CAS failed before call + */ + final void retryUpdate(long x, HashCode hc, boolean wasUncontended) { + int h = hc.code; + boolean collide = false; // True if last slot nonempty + for (;;) { + Cell[] as; Cell a; int n; long v; + if ((as = cells) != null && (n = as.length) > 0) { + if ((a = as[(n - 1) & h]) == null) { + if (busy == 0) { // Try to attach new Cell + Cell r = new Cell(x); // Optimistically create + if (busy == 0 && casBusy()) { + boolean created = false; + try { // Recheck under lock + Cell[] rs; int m, j; + if ((rs = cells) != null && + (m = rs.length) > 0 && + rs[j = (m - 1) & h] == null) { + rs[j] = r; + created = true; + } + } finally { + busy = 0; + } + if (created) + break; + continue; // Slot is now non-empty + } + } + collide = false; + } + else if (!wasUncontended) // CAS already known to fail + wasUncontended = true; // Continue after rehash + else if (a.cas(v = a.value, fn(v, x))) + break; + else if (n >= NCPU || cells != as) + collide = false; // At max size or stale + else if (!collide) + collide = true; + else if (busy == 0 && casBusy()) { + try { + if (cells == as) { // Expand table unless stale + Cell[] rs = new Cell[n << 1]; + for (int i = 0; i < n; ++i) + rs[i] = as[i]; + cells = rs; + } + } finally { + busy = 0; + } + collide = false; + continue; // Retry with expanded table + } + h ^= h << 13; // Rehash + h ^= h >>> 17; + h ^= h << 5; + } + else if (busy == 0 && cells == as && casBusy()) { + boolean init = false; + try { // Initialize table + if (cells == as) { + Cell[] rs = new Cell[2]; + rs[h & 1] = new Cell(x); + cells = rs; + init = true; + } + } finally { + busy = 0; + } + if (init) + break; + } + else if (casBase(v = base, fn(v, x))) + break; // Fall back on using base + } + hc.code = h; // Record index for next time + } + + + /** + * Sets base and all cells to the given value. + */ + final void internalReset(long initialValue) { + Cell[] as = cells; + base = initialValue; + if (as != null) { + int n = as.length; + for (int i = 0; i < n; ++i) { + Cell a = as[i]; + if (a != null) + a.value = initialValue; + } + } + } + + // Unsafe mechanics + private static final sun.misc.Unsafe UNSAFE; + private static final long baseOffset; + private static final long busyOffset; + static { + try { + UNSAFE = getUnsafe(); + Class<?> sk = Striped64.class; + baseOffset = UNSAFE.objectFieldOffset + (sk.getDeclaredField("base")); + busyOffset = UNSAFE.objectFieldOffset + (sk.getDeclaredField("busy")); + } catch (Exception e) { + throw new Error(e); + } + } + + /** + * Returns a sun.misc.Unsafe. Suitable for use in a 3rd party package. + * Replace with a simple call to Unsafe.getUnsafe when integrating + * into a jdk. + * + * @return a sun.misc.Unsafe + */ + private static sun.misc.Unsafe getUnsafe() { + try { + return sun.misc.Unsafe.getUnsafe(); + } catch (SecurityException tryReflectionInstead) {} + try { + return java.security.AccessController.doPrivileged + (new java.security.PrivilegedExceptionAction<sun.misc.Unsafe>() { + public sun.misc.Unsafe run() throws Exception { + Class<sun.misc.Unsafe> k = sun.misc.Unsafe.class; + for (java.lang.reflect.Field f : k.getDeclaredFields()) { + f.setAccessible(true); + Object x = f.get(null); + if (k.isInstance(x)) + return k.cast(x); + } + throw new NoSuchFieldError("the Unsafe"); + }}); + } catch (java.security.PrivilegedActionException e) { + throw new RuntimeException("Could not initialize intrinsics", + e.getCause()); + } + } +} diff --git a/kamon-core/src/main/scala/kamon/Kamon.scala b/kamon-core/src/main/scala/kamon/Kamon.scala index 44e895bd..24bbb5f0 100644 --- a/kamon-core/src/main/scala/kamon/Kamon.scala +++ b/kamon-core/src/main/scala/kamon/Kamon.scala @@ -16,13 +16,10 @@ package kamon import akka.actor._ -import akka.event.Logging.{ Info, Error } +import akka.event.Logging.Error object Kamon { trait Extension extends akka.actor.Extension { - def publishInfoMessage(system: ActorSystem, msg: String): Unit = { - system.eventStream.publish(Info("", classOf[Extension], msg)) - } def publishErrorMessage(system: ActorSystem, msg: String, cause: Throwable): Unit = { system.eventStream.publish(new Error(cause, "", classOf[Extension], msg)) } diff --git a/kamon-core/src/main/scala/kamon/instrumentation/akka/ActorCellInstrumentation.scala b/kamon-core/src/main/scala/kamon/instrumentation/akka/ActorCellInstrumentation.scala index 446bc487..235f5143 100644 --- a/kamon-core/src/main/scala/kamon/instrumentation/akka/ActorCellInstrumentation.scala +++ b/kamon-core/src/main/scala/kamon/instrumentation/akka/ActorCellInstrumentation.scala @@ -52,7 +52,7 @@ class ActorCellInstrumentation { val cellWithMetrics = cell.asInstanceOf[ActorCellMetrics] try { - TraceRecorder.withTraceContext(contextAndTimestamp.traceContext) { + TraceRecorder.withInlineTraceContextReplacement(contextAndTimestamp.traceContext) { pjp.proceed() } } finally { diff --git a/kamon-core/src/main/scala/kamon/instrumentation/akka/ActorLoggingInstrumentation.scala b/kamon-core/src/main/scala/kamon/instrumentation/akka/ActorLoggingInstrumentation.scala index ee9d442f..82b8304d 100644 --- a/kamon-core/src/main/scala/kamon/instrumentation/akka/ActorLoggingInstrumentation.scala +++ b/kamon-core/src/main/scala/kamon/instrumentation/akka/ActorLoggingInstrumentation.scala @@ -40,7 +40,7 @@ class ActorLoggingInstrumentation { @Around("withMdcInvocation(logSource, logEvent, logStatement)") def aroundWithMdcInvocation(pjp: ProceedingJoinPoint, logSource: String, logEvent: TraceContextAware, logStatement: () ⇒ _): Unit = { - TraceRecorder.withTraceContext(logEvent.traceContext) { + TraceRecorder.withInlineTraceContextReplacement(logEvent.traceContext) { pjp.proceed() } } diff --git a/kamon-core/src/main/scala/kamon/instrumentation/akka/ActorSystemMessageInstrumentation.scala b/kamon-core/src/main/scala/kamon/instrumentation/akka/ActorSystemMessageInstrumentation.scala index 9b6b6866..7845e90d 100644 --- a/kamon-core/src/main/scala/kamon/instrumentation/akka/ActorSystemMessageInstrumentation.scala +++ b/kamon-core/src/main/scala/kamon/instrumentation/akka/ActorSystemMessageInstrumentation.scala @@ -31,7 +31,7 @@ class ActorSystemMessageInstrumentation { def aroundSystemMessageInvoke(pjp: ProceedingJoinPoint, messages: EarliestFirstSystemMessageList): Any = { if (messages.nonEmpty) { val ctx = messages.head.asInstanceOf[TraceContextAware].traceContext - TraceRecorder.withTraceContext(ctx)(pjp.proceed()) + TraceRecorder.withInlineTraceContextReplacement(ctx)(pjp.proceed()) } else pjp.proceed() } @@ -73,7 +73,7 @@ class TraceContextIntoRepointableActorRefMixin { @Around("repointableActorRefCreation(repointableActorRef)") def afterRepointableActorRefCreation(pjp: ProceedingJoinPoint, repointableActorRef: TraceContextAware): Any = { - TraceRecorder.withTraceContext(repointableActorRef.traceContext) { + TraceRecorder.withInlineTraceContextReplacement(repointableActorRef.traceContext) { pjp.proceed() } } diff --git a/kamon-core/src/main/scala/kamon/metric/instrument/Counter.scala b/kamon-core/src/main/scala/kamon/metric/instrument/Counter.scala index b592bcd3..0f29ba6f 100644 --- a/kamon-core/src/main/scala/kamon/metric/instrument/Counter.scala +++ b/kamon-core/src/main/scala/kamon/metric/instrument/Counter.scala @@ -16,7 +16,7 @@ package kamon.metric.instrument -import jsr166e.LongAdder +import kamon.jsr166.LongAdder import kamon.metric.{ CollectionContext, MetricSnapshot, MetricRecorder } trait Counter extends MetricRecorder { diff --git a/kamon-core/src/main/scala/kamon/metric/instrument/MinMaxCounter.scala b/kamon-core/src/main/scala/kamon/metric/instrument/MinMaxCounter.scala index 471e7bd4..4882d2aa 100644 --- a/kamon-core/src/main/scala/kamon/metric/instrument/MinMaxCounter.scala +++ b/kamon-core/src/main/scala/kamon/metric/instrument/MinMaxCounter.scala @@ -21,7 +21,7 @@ import java.util.concurrent.TimeUnit import java.util.concurrent.atomic.AtomicReference import akka.actor.{ ActorSystem, Cancellable } import com.typesafe.config.Config -import jsr166e.LongMaxUpdater +import kamon.jsr166.LongMaxUpdater import kamon.metric.{ Scale, MetricRecorder, CollectionContext } import kamon.util.PaddedAtomicLong import scala.concurrent.duration.FiniteDuration @@ -33,6 +33,7 @@ trait MinMaxCounter extends MetricRecorder { def increment(times: Long): Unit def decrement() def decrement(times: Long) + def refreshValues(): Unit } object MinMaxCounter { @@ -63,14 +64,11 @@ object MinMaxCounter { } class PaddedMinMaxCounter(underlyingHistogram: Histogram) extends MinMaxCounter { - private val min = new LongMaxUpdater - private val max = new LongMaxUpdater + private val min = new LongMaxUpdater(0L) + private val max = new LongMaxUpdater(0L) private val sum = new PaddedAtomicLong val refreshValuesSchedule = new AtomicReference[Cancellable]() - min.update(0L) - max.update(0L) - def increment(): Unit = increment(1L) def increment(times: Long): Unit = { @@ -98,19 +96,21 @@ class PaddedMinMaxCounter(underlyingHistogram: Histogram) extends MinMaxCounter def refreshValues(): Unit = { val currentValue = { val value = sum.get() - if (value < 0) 0 else value + if (value <= 0) 0 else value } val currentMin = { - val minAbs = abs(min.maxThenReset()) - if (minAbs <= currentValue) minAbs else 0 + val rawMin = min.maxThenReset(-currentValue) + if (rawMin >= 0) + 0 + else + abs(rawMin) } + val currentMax = max.maxThenReset(currentValue) + underlyingHistogram.record(currentValue) underlyingHistogram.record(currentMin) - underlyingHistogram.record(max.maxThenReset()) - - max.update(currentValue) - min.update(-currentValue) + underlyingHistogram.record(currentMax) } } diff --git a/kamon-core/src/test/scala/kamon/metric/instrument/MinMaxCounterSpec.scala b/kamon-core/src/test/scala/kamon/metric/instrument/MinMaxCounterSpec.scala index cb03664c..2c11adc3 100644 --- a/kamon-core/src/test/scala/kamon/metric/instrument/MinMaxCounterSpec.scala +++ b/kamon-core/src/test/scala/kamon/metric/instrument/MinMaxCounterSpec.scala @@ -17,14 +17,15 @@ package kamon.metric.instrument import java.nio.LongBuffer -import akka.actor.ActorSystem +import akka.actor._ +import akka.testkit.TestProbe import com.typesafe.config.ConfigFactory import kamon.metric.CollectionContext import kamon.metric.instrument.Histogram.MutableRecord import org.scalatest.{ Matchers, WordSpecLike } class MinMaxCounterSpec extends WordSpecLike with Matchers { - val system = ActorSystem("min-max-counter-spec") + implicit val system = ActorSystem("min-max-counter-spec") val minMaxCounterConfig = ConfigFactory.parseString( """ |refresh-interval = 1 hour @@ -83,7 +84,7 @@ class MinMaxCounterSpec extends WordSpecLike with Matchers { MutableRecord(2, 3)) // min, max and current } - "report zero as the min and current values if they current value fell bellow zero" in new MinMaxCounterFixture { + "report zero as the min and current values if the current value fell bellow zero" in new MinMaxCounterFixture { mmCounter.decrement(3) val snapshot = collectCounterSnapshot() @@ -93,6 +94,22 @@ class MinMaxCounterSpec extends WordSpecLike with Matchers { snapshot.recordsIterator.toStream should contain( MutableRecord(0, 3)) // min, max and current (even while current really is -3 } + + "never record values bellow zero in very busy situations" in new MinMaxCounterFixture { + val monitor = TestProbe() + val workers = for (workers ← 1 to 50) yield { + system.actorOf(Props(new MinMaxCounterUpdateActor(mmCounter, monitor.ref))) + } + + workers foreach (_ ! "increment") + for (refresh ← 1 to 1000) { + collectCounterSnapshot() + Thread.sleep(10) + } + + monitor.expectNoMsg() + workers foreach (_ ! PoisonPill) + } } trait MinMaxCounterFixture { @@ -106,3 +123,20 @@ class MinMaxCounterSpec extends WordSpecLike with Matchers { def collectCounterSnapshot(): Histogram.Snapshot = mmCounter.collect(collectionContext) } } + +class MinMaxCounterUpdateActor(mmc: MinMaxCounter, monitor: ActorRef) extends Actor { + val x = Array.ofDim[Int](4) + def receive = { + case "increment" ⇒ + mmc.increment() + self ! "decrement" + case "decrement" ⇒ + mmc.decrement() + self ! "increment" + try { + mmc.refreshValues() + } catch { + case _: IndexOutOfBoundsException ⇒ monitor ! "failed" + } + } +}
\ No newline at end of file diff --git a/kamon-examples/kamon-play-example/app/Global.scala b/kamon-examples/kamon-play-example/app/Global.scala index 5fbb9c7e..535ea308 100644 --- a/kamon-examples/kamon-play-example/app/Global.scala +++ b/kamon-examples/kamon-play-example/app/Global.scala @@ -17,9 +17,4 @@ import filters.TraceLocalFilter import play.api.mvc.WithFilters -object Global extends WithFilters(TraceLocalFilter){ - -} - - - +object Global extends WithFilters(TraceLocalFilter)
\ No newline at end of file diff --git a/kamon-examples/kamon-play-example/app/controllers/KamonPlayExample.scala b/kamon-examples/kamon-play-example/app/controllers/KamonPlayExample.scala index 2b2e9373..7be69f6a 100644 --- a/kamon-examples/kamon-play-example/app/controllers/KamonPlayExample.scala +++ b/kamon-examples/kamon-play-example/app/controllers/KamonPlayExample.scala @@ -15,15 +15,18 @@ * ========================================================== */ package controllers +import kamon.Kamon +import kamon.metric.UserMetrics import kamon.play.action.TraceName import play.api.Logger import play.api.libs.concurrent.Execution.Implicits.defaultContext import play.api.mvc.{Action, Controller} +import play.libs.Akka import scala.concurrent._ /** - * In order to run the example we need set the -agent parameter to the JVM but Play have some limitations when trying to set an + * In order to run the example we need set the -javaagent option to the JVM, but Play have some limitations when trying to set an * java agent in Play dev mode (ie, play run) -> https://github.com/playframework/playframework/issues/1372, so we have others options: * * The first option is set -javaagent: path-to-aspectj-weaver in your IDE or @@ -51,8 +54,9 @@ import scala.concurrent._ object KamonPlayExample extends Controller { val logger = Logger(this.getClass) + val counter = Kamon(UserMetrics)(Akka.system()).registerCounter("my-counter") - def sayHello() = Action.async { + def sayHello = Action.async { Future { logger.info("Say hello to Kamon") Ok("Say hello to Kamon") @@ -60,12 +64,20 @@ object KamonPlayExample extends Controller { } //using the Kamon TraceName Action to rename the trace name in metrics - def sayHelloWithTraceName() = TraceName("my-trace-name") { + def sayHelloWithTraceName = TraceName("my-trace-name") { Action.async { Future { - logger.info("Say hello to Kamon") - Ok("Say hello to Kamon") + logger.info("Say hello to Kamon with trace name") + Ok("Say hello to Kamon with trace name") } } } + + def incrementCounter = Action.async { + Future { + logger.info("increment") + counter.increment() + Ok("increment") + } + } } diff --git a/kamon-examples/kamon-play-example/app/filters/TraceLocalFilter.scala b/kamon-examples/kamon-play-example/app/filters/TraceLocalFilter.scala index 08ea782c..bf496530 100644 --- a/kamon-examples/kamon-play-example/app/filters/TraceLocalFilter.scala +++ b/kamon-examples/kamon-play-example/app/filters/TraceLocalFilter.scala @@ -27,8 +27,8 @@ object TraceLocalKey extends TraceLocal.TraceLocalKey { } /* - By default Kamon spreads the trace-token-header-name but sometimes is necessary pass through the application requests with some infomation like - extra headers, with kamon it's possible using TraceLocalStorage, in Play applications we can do an Action Filter or using Action Composition, + By default kamon spreads the trace-token-header-name, but sometimes is necessary pass through the application requests with some information like + extra headers, with kamon it's possible using the TraceLocalStorage, in Play applications we can do an Action Filter or using Action Composition, in this example we are using a simple filter where given a Header store the value and then put the value in the result headers.. More detailed usage of TraceLocalStorage: https://github.com/kamon-io/Kamon/blob/b17539d231da923ea854c01d2c69eb02ef1e85b1/kamon-core/src/test/scala/kamon/trace/TraceLocalSpec.scala diff --git a/kamon-examples/kamon-play-example/conf/application.conf b/kamon-examples/kamon-play-example/conf/application.conf index 4f9a60ec..65a834c6 100644 --- a/kamon-examples/kamon-play-example/conf/application.conf +++ b/kamon-examples/kamon-play-example/conf/application.conf @@ -1,11 +1,10 @@ #kamon related configuration akka { - extensions = ["kamon.statsd.StatsD"] + extensions = ["kamon.statsd.StatsD", "kamon.system.SystemMetrics", "kamon.logreporter.LogReporter"] } kamon { - - statsd { + statsd { # Hostname and port in which your StatsD is running. Remember that StatsD packets are sent using UDP and # setting unreachable hosts and/or not open ports wont be warned by the Kamon, your data wont go anywhere. hostname = "127.0.0.1" @@ -26,6 +25,10 @@ kamon { dispatcher = [ "*" ] } + # Enable system metrics + # In order to not get a ClassNotFoundException, we must register the kamon-sytem-metrics module + report-system-metrics = true + simple-metric-key-generator { # Application prefix for all metrics pushed to StatsD. The default namespacing scheme for metrics follows # this pattern: @@ -33,10 +36,17 @@ kamon { application = "kamon" } } - + + weaver { + showWeaveInfo = off + verbose = off + debug = off + showWarn = off + } + play { - include-trace-token-header = true - trace-token-header-name = "X-Trace-Token" + include-trace-token-header = true + trace-token-header-name = "X-Trace-Token" } } @@ -47,11 +57,11 @@ kamon { # ~~~~~ # The secret key is used to secure cryptographics functions. # If you deploy your application to several instances be sure to use the same key! -application.secret="3BLM`<aD^5r/L[MinNdw8Tp@915n0djY[g66OSOLi@?k`>AZE9EOphrmf;;6JsAN" +application.secret = "3BLM`<aD^5r/L[MinNdw8Tp@915n0djY[g66OSOLi@?k`>AZE9EOphrmf;;6JsAN" # The application languages # ~~~~~ -application.langs="en" +application.langs = "en" # Global object class # ~~~~~ diff --git a/kamon-examples/kamon-play-example/conf/routes b/kamon-examples/kamon-play-example/conf/routes index 122c355a..2178c946 100644 --- a/kamon-examples/kamon-play-example/conf/routes +++ b/kamon-examples/kamon-play-example/conf/routes @@ -1,3 +1,4 @@ # Routes GET /helloKamon controllers.KamonPlayExample.sayHello -GET /helloKamonWithTraceName controllers.KamonPlayExample.sayHelloWithTraceName
\ No newline at end of file +GET /helloKamonWithTraceName controllers.KamonPlayExample.sayHelloWithTraceName +GET /incrementCounter controllers.KamonPlayExample.incrementCounter
\ No newline at end of file diff --git a/kamon-examples/kamon-play-example/project/Build.scala b/kamon-examples/kamon-play-example/project/Build.scala index c348862a..c9693c24 100644 --- a/kamon-examples/kamon-play-example/project/Build.scala +++ b/kamon-examples/kamon-play-example/project/Build.scala @@ -36,10 +36,12 @@ object ApplicationBuild extends Build { )) val dependencies = Seq( - "io.kamon" %% "kamon-core" % "0.3.1", - "io.kamon" %% "kamon-play" % "0.3.1", - "io.kamon" %% "kamon-statsd" % "0.3.1", - "org.aspectj" % "aspectjweaver" % "1.8.1" + "io.kamon" %% "kamon-core" % "0.3.3", + "io.kamon" %% "kamon-play" % "0.3.3", + "io.kamon" %% "kamon-statsd" % "0.3.3", + "io.kamon" %% "kamon-log-reporter" % "0.3.3", + "io.kamon" %% "kamon-system-metrics" % "0.3.3", + "org.aspectj" % "aspectjweaver" % "1.8.1" ) val main = Project(appName, file(".")).enablePlugins(play.PlayScala, SbtWeb) diff --git a/kamon-play/src/main/scala/kamon/play/Play.scala b/kamon-play/src/main/scala/kamon/play/Play.scala index 03436458..7b8777e0 100644 --- a/kamon-play/src/main/scala/kamon/play/Play.scala +++ b/kamon-play/src/main/scala/kamon/play/Play.scala @@ -16,7 +16,8 @@ package kamon.play -import akka.actor.{ ExtendedActorSystem, Extension, ExtensionIdProvider, ExtensionId } +import akka.actor.{ ExtendedActorSystem, Extension, ExtensionId, ExtensionIdProvider } +import akka.event.Logging import kamon.Kamon import kamon.http.HttpServerMetrics import kamon.metric.Metrics @@ -27,7 +28,8 @@ object Play extends ExtensionId[PlayExtension] with ExtensionIdProvider { } class PlayExtension(private val system: ExtendedActorSystem) extends Kamon.Extension { - publishInfoMessage(system, "Play Extension Loaded!!") + val log = Logging(system, classOf[PlayExtension]) + log.info(s"Starting the Kamon(Play) extension") private val config = system.settings.config.getConfig("kamon.play") diff --git a/kamon-play/src/main/scala/kamon/play/instrumentation/RequestInstrumentation.scala b/kamon-play/src/main/scala/kamon/play/instrumentation/RequestInstrumentation.scala index 975510e9..ebac38d9 100644 --- a/kamon-play/src/main/scala/kamon/play/instrumentation/RequestInstrumentation.scala +++ b/kamon-play/src/main/scala/kamon/play/instrumentation/RequestInstrumentation.scala @@ -16,7 +16,7 @@ package kamon.play.instrumentation import kamon.Kamon -import kamon.play.{ PlayExtension, Play } +import kamon.play.{ Play, PlayExtension } import kamon.trace.{ TraceContextAware, TraceRecorder } import org.aspectj.lang.ProceedingJoinPoint import org.aspectj.lang.annotation._ @@ -52,31 +52,24 @@ class RequestInstrumentation { val essentialAction = (requestHeader: RequestHeader) ⇒ { val incomingContext = TraceRecorder.currentContext - val playExtension = Kamon(Play)(Akka.system()) - val executor = playExtension.defaultDispatcher + val executor = Kamon(Play)(Akka.system()).defaultDispatcher next(requestHeader).map { result ⇒ - TraceRecorder.currentContext.map { ctx ⇒ - recordHttpServerMetrics(result, ctx.name, playExtension) - } - TraceRecorder.finish() - incomingContext match { - case None ⇒ result - case Some(traceContext) ⇒ - val playExtension = Kamon(Play)(traceContext.system) - if (playExtension.includeTraceToken) { - result.withHeaders(playExtension.traceTokenHeaderName -> traceContext.token) - } else result - } + incomingContext.map { ctx ⇒ + val playExtension = Kamon(Play)(ctx.system) + recordHttpServerMetrics(result, ctx.name, playExtension) + if (playExtension.includeTraceToken) result.withHeaders(playExtension.traceTokenHeaderName -> ctx.token) + else result + }.getOrElse(result) }(executor) } pjp.proceed(Array(EssentialAction(essentialAction))) } - def recordHttpServerMetrics(result: Result, traceName: String, playExtension: PlayExtension): Unit = + private def recordHttpServerMetrics(result: Result, traceName: String, playExtension: PlayExtension): Unit = playExtension.httpServerMetrics.recordResponse(traceName, result.header.status.toString, 1L) @Around("execution(* play.api.GlobalSettings+.onError(..)) && args(request, ex)") diff --git a/kamon-spray/src/main/scala/spray/can/client/ClientRequestInstrumentation.scala b/kamon-spray/src/main/scala/spray/can/client/ClientRequestInstrumentation.scala index d787bda4..df1d2b59 100644 --- a/kamon-spray/src/main/scala/spray/can/client/ClientRequestInstrumentation.scala +++ b/kamon-spray/src/main/scala/spray/can/client/ClientRequestInstrumentation.scala @@ -63,7 +63,7 @@ class ClientRequestInstrumentation { @Around("copyingRequestContext(old)") def aroundCopyingRequestContext(pjp: ProceedingJoinPoint, old: SegmentCompletionHandleAware): Any = { - TraceRecorder.withTraceContext(old.traceContext) { + TraceRecorder.withInlineTraceContextReplacement(old.traceContext) { pjp.proceed() } } @@ -75,7 +75,7 @@ class ClientRequestInstrumentation { def aroundDispatchToCommander(pjp: ProceedingJoinPoint, requestContext: SegmentCompletionHandleAware, message: Any) = { requestContext.traceContext match { case ctx @ Some(_) ⇒ - TraceRecorder.withTraceContext(ctx) { + TraceRecorder.withInlineTraceContextReplacement(ctx) { if (message.isInstanceOf[HttpMessageEnd]) requestContext.segmentCompletionHandle.map(_.finish(Map.empty)) diff --git a/kamon-system-metrics/src/main/scala/kamon/system/sigar/SigarLoader.scala b/kamon-system-metrics/src/main/scala/kamon/system/sigar/SigarLoader.scala index d138ec8f..af49fca5 100644 --- a/kamon-system-metrics/src/main/scala/kamon/system/sigar/SigarLoader.scala +++ b/kamon-system-metrics/src/main/scala/kamon/system/sigar/SigarLoader.scala @@ -17,6 +17,7 @@ package kamon.system.sigar import java.io._ +import java.text.SimpleDateFormat import java.util import java.util.logging.Logger import java.util.{ ArrayList, Date, List } @@ -133,26 +134,18 @@ object SigarLoader { } private[sigar] def printBanner(sigar: Sigar) = { + val os = OperatingSystem.getInstance + def loadAverage(sigar: Sigar) = { val average = sigar.getLoadAverage (average(0), average(1), average(2)) } def uptime(sigar: Sigar) = { + val sdf = new SimpleDateFormat("yyyy-MM-dd") val uptime = sigar.getUptime val now = System.currentTimeMillis() - new Date(now - (uptime.getUptime() * 1000).toLong) - } - - def osInfo() = { - val NewLine = "\n" - val os = OperatingSystem.getInstance - val osInfo = new StringBuilder("------ OS Information ------").append(NewLine) - osInfo.append("Description: ").append(os.getDescription).append(NewLine) - .append("Name: ").append(os.getName).append(NewLine) - .append("Version: ").append(os.getVersion).append(NewLine) - .append("Arch: ").append(os.getArch).append(NewLine) - .toString() + sdf.format(new Date(now - (uptime.getUptime() * 1000).toLong)) } val message = @@ -166,7 +159,15 @@ object SigarLoader { ||_____/ \__, |___/\__\___|_| |_| |_|_| |_|\___|\__|_| |_|\___|___/______\___/ \__,_|\__,_|\___|\__,_| | __/ | | |___/ - """.stripMargin + s"\nBoot Time: ${uptime(sigar)} \nLoad Average: ${loadAverage(sigar)} \n${osInfo()}" + | + | [System Status] [OS Information] + | |--------------------------------| |----------------------------------------| + | Boot Time: %-10s Description: %s + | Load Average: %-16s Name: %s + | Version: %s + | Arch: %s + | + """.stripMargin.format(uptime(sigar), os.getDescription, loadAverage(sigar), os.getName, os.getVersion, os.getArch) log.info(message) } class Loader private[sigar] diff --git a/project/Dependencies.scala b/project/Dependencies.scala index 34c33262..c6b49944 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -34,7 +34,6 @@ object Dependencies { val playTest = "org.scalatestplus" %% "play" % "1.1.0" val slf4Api = "org.slf4j" % "slf4j-api" % slf4jVersion val slf4nop = "org.slf4j" % "slf4j-nop" % slf4jVersion - val jsr166 = "io.gatling" % "jsr166e" % "1.0" val scalaCompiler = "org.scala-lang" % "scala-compiler" % Settings.ScalaVersion val sigar = "org.fusesource" % "sigar" % "1.6.4" diff --git a/project/Projects.scala b/project/Projects.scala index 8259f2aa..cbdd7d3d 100644 --- a/project/Projects.scala +++ b/project/Projects.scala @@ -23,16 +23,19 @@ object Projects extends Build { mappings in (Compile, packageBin) ++= mappings.in(kamonMacros, Compile, packageBin).value, mappings in (Compile, packageSrc) ++= mappings.in(kamonMacros, Compile, packageSrc).value, libraryDependencies ++= - compile(akkaActor, aspectJ, aspectjWeaver, hdrHistogram, jsr166) ++ - provided(logback) ++ + compile(akkaActor, aspectJ, hdrHistogram) ++ + provided(logback, aspectjWeaver) ++ test(scalatest, akkaTestKit, sprayTestkit, akkaSlf4j, logback)) lazy val kamonSpray = Project("kamon-spray", file("kamon-spray")) + .dependsOn(kamonMacros % "compile-internal, test-internal") .settings(basicSettings: _*) .settings(formatSettings: _*) .settings(aspectJSettings: _*) .settings( + mappings in (Compile, packageBin) ++= mappings.in(kamonMacros, Compile, packageBin).value, + mappings in (Compile, packageSrc) ++= mappings.in(kamonMacros, Compile, packageSrc).value, libraryDependencies ++= compile(akkaActor, aspectJ, sprayCan, sprayClient, sprayRouting) ++ test(scalatest, akkaTestKit, sprayTestkit, slf4Api, slf4nop)) @@ -71,7 +74,7 @@ object Projects extends Build { lazy val kamonTestkit = Project("kamon-testkit", file("kamon-testkit")) .settings(basicSettings: _*) .settings(formatSettings: _*) - .settings(libraryDependencies ++= compile(akkaActor, akkaTestKit) ++ test(slf4Api, slf4nop)) + .settings(libraryDependencies ++= compile(akkaActor, akkaTestKit, aspectJ) ++ test(slf4Api, slf4nop)) .dependsOn(kamonCore) lazy val kamonPlay = Project("kamon-play", file("kamon-play")) |