From f24c1a7a4b96dcfb2609c6f512f34dd6d54de439 Mon Sep 17 00:00:00 2001 From: Ivan Topolnjak Date: Fri, 28 Apr 2017 14:56:02 +0200 Subject: implement MinMaxCounter and Gauge, include them in the InstrumentFactory --- .../src/main/java/kamon/jsr166/LongAdder.java | 224 +++++++++++++ .../src/main/java/kamon/jsr166/LongMaxUpdater.java | 191 +++++++++++ .../src/main/java/kamon/jsr166/Striped64.java | 371 +++++++++++++++++++++ .../src/main/java/kamon/util/GlobPathFilter.java | 110 ++++++ .../main/scala/kamon/metric/EntitySnapshot.scala | 11 + .../src/main/scala/kamon/metric/TickSnapshot.scala | 17 + .../main/scala/kamon/metric/instrument/Gauge.scala | 27 +- .../metric/instrument/InstrumentFactory.scala | 11 +- .../metric/instrument/InstrumentSnapshot.scala | 28 +- .../kamon/metric/instrument/MinMaxCounter.scala | 65 +++- .../kamon/metric/snapshot/EntitySnapshot.scala | 12 - .../scala/kamon/metric/snapshot/TickSnapshot.scala | 17 - 12 files changed, 1027 insertions(+), 57 deletions(-) create mode 100644 kamon-core/src/main/java/kamon/jsr166/LongAdder.java create mode 100644 kamon-core/src/main/java/kamon/jsr166/LongMaxUpdater.java create mode 100644 kamon-core/src/main/java/kamon/jsr166/Striped64.java create mode 100644 kamon-core/src/main/java/kamon/util/GlobPathFilter.java create mode 100644 kamon-core/src/main/scala/kamon/metric/EntitySnapshot.scala create mode 100644 kamon-core/src/main/scala/kamon/metric/TickSnapshot.scala delete mode 100644 kamon-core/src/main/scala/kamon/metric/snapshot/EntitySnapshot.scala delete mode 100644 kamon-core/src/main/scala/kamon/metric/snapshot/TickSnapshot.scala (limited to 'kamon-core/src') diff --git a/kamon-core/src/main/java/kamon/jsr166/LongAdder.java b/kamon-core/src/main/java/kamon/jsr166/LongAdder.java new file mode 100644 index 00000000..7e47ae63 --- /dev/null +++ b/kamon-core/src/main/java/kamon/jsr166/LongAdder.java @@ -0,0 +1,224 @@ +/* + +Note: this was copied from Doug Lea's CVS repository + http://gee.cs.oswego.edu/cgi-bin/viewcvs.cgi/jsr166/src/jsr166e/ + +LongAdder.java version 1.14 + +*/ + + +/* + * Written by Doug Lea with assistance from members of JCP JSR-166 + * Expert Group and released to the public domain, as explained at + * http://creativecommons.org/publicdomain/zero/1.0/ + */ + +package kamon.jsr166; + +import java.io.Serializable; + +/** + * One or more variables that together maintain an initially zero + * {@code long} sum. When updates (method {@link #add}) are contended + * across threads, the set of variables may grow dynamically to reduce + * contention. Method {@link #sum} (or, equivalently, {@link + * #longValue}) returns the current total combined across the + * variables maintaining the sum. + * + *

This class is usually preferable to {@link java.util.concurrent.atomic.AtomicLong} when + * multiple threads update a common sum that is used for purposes such + * as collecting statistics, not for fine-grained synchronization + * control. Under low update contention, the two classes have similar + * characteristics. But under high contention, expected throughput of + * this class is significantly higher, at the expense of higher space + * consumption. + * + *

This class extends {@link Number}, but does not define + * methods such as {@code equals}, {@code hashCode} and {@code + * compareTo} because instances are expected to be mutated, and so are + * not useful as collection keys. + * + *

jsr166e note: This class is targeted to be placed in + * java.util.concurrent.atomic. + * + * @since 1.8 + * @author Doug Lea + */ +public class LongAdder extends Striped64 implements Serializable { + private static final long serialVersionUID = 7249069246863182397L; + + /** + * Version of plus for use in retryUpdate + */ + final long fn(long v, long x) { return v + x; } + + /** + * Creates a new adder with initial sum of zero. + */ + public LongAdder() { + } + + /** + * Adds the given value. + * + * @param x the value to add + */ + public void add(long x) { + Cell[] as; long b, v; HashCode hc; Cell a; int n; + if ((as = cells) != null || !casBase(b = base, b + x)) { + boolean uncontended = true; + int h = (hc = threadHashCode.get()).code; + if (as == null || (n = as.length) < 1 || + (a = as[(n - 1) & h]) == null || + !(uncontended = a.cas(v = a.value, v + x))) + retryUpdate(x, hc, uncontended); + } + } + + /** + * Equivalent to {@code add(1)}. + */ + public void increment() { + add(1L); + } + + /** + * Equivalent to {@code add(-1)}. + */ + public void decrement() { + add(-1L); + } + + /** + * Returns the current sum. The returned value is NOT an + * atomic snapshot; invocation in the absence of concurrent + * updates returns an accurate result, but concurrent updates that + * occur while the sum is being calculated might not be + * incorporated. + * + * @return the sum + */ + public long sum() { + long sum = base; + Cell[] as = cells; + if (as != null) { + int n = as.length; + for (int i = 0; i < n; ++i) { + Cell a = as[i]; + if (a != null) + sum += a.value; + } + } + return sum; + } + + /** + * Resets variables maintaining the sum to zero. This method may + * be a useful alternative to creating a new adder, but is only + * effective if there are no concurrent updates. Because this + * method is intrinsically racy, it should only be used when it is + * known that no threads are concurrently updating. + */ + public void reset() { + internalReset(0L); + } + + /** + * Equivalent in effect to {@link #sum} followed by {@link + * #reset}. This method may apply for example during quiescent + * points between multithreaded computations. If there are + * updates concurrent with this method, the returned value is + * not guaranteed to be the final value occurring before + * the reset. + * + * @return the sum + */ + public long sumThenReset() { + long sum = base; + Cell[] as = cells; + base = 0L; + if (as != null) { + int n = as.length; + for (int i = 0; i < n; ++i) { + Cell a = as[i]; + if (a != null) { + sum += a.value; + a.value = 0L; + } + } + } + return sum; + } + + public long sumAndReset() { + long sum = getAndSetBase(0L); + Cell[] as = cells; + if (as != null) { + int n = as.length; + for (int i = 0; i < n; ++i) { + Cell a = as[i]; + if (a != null) { + sum += a.getAndSet(0L); + } + } + } + return sum; + } + + /** + * Returns the String representation of the {@link #sum}. + * @return the String representation of the {@link #sum} + */ + public String toString() { + return Long.toString(sum()); + } + + /** + * Equivalent to {@link #sum}. + * + * @return the sum + */ + public long longValue() { + return sum(); + } + + /** + * Returns the {@link #sum} as an {@code int} after a narrowing + * primitive conversion. + */ + public int intValue() { + return (int)sum(); + } + + /** + * Returns the {@link #sum} as a {@code float} + * after a widening primitive conversion. + */ + public float floatValue() { + return (float)sum(); + } + + /** + * Returns the {@link #sum} as a {@code double} after a widening + * primitive conversion. + */ + public double doubleValue() { + return (double)sum(); + } + + private void writeObject(java.io.ObjectOutputStream s) + throws java.io.IOException { + s.defaultWriteObject(); + s.writeLong(sum()); + } + + private void readObject(java.io.ObjectInputStream s) + throws java.io.IOException, ClassNotFoundException { + s.defaultReadObject(); + busy = 0; + cells = null; + base = s.readLong(); + } + +} diff --git a/kamon-core/src/main/java/kamon/jsr166/LongMaxUpdater.java b/kamon-core/src/main/java/kamon/jsr166/LongMaxUpdater.java new file mode 100644 index 00000000..fc9ea4e5 --- /dev/null +++ b/kamon-core/src/main/java/kamon/jsr166/LongMaxUpdater.java @@ -0,0 +1,191 @@ +/* + * Written by Doug Lea with assistance from members of JCP JSR-166 + * Expert Group and released to the public domain, as explained at + * http://creativecommons.org/publicdomain/zero/1.0/ + */ + +package kamon.jsr166; +import java.io.Serializable; + +/** + * One or more variables that together maintain a running {@code long} + * maximum with initial value {@code Long.MIN_VALUE}. When updates + * (method {@link #update}) are contended across threads, the set of + * variables may grow dynamically to reduce contention. Method {@link + * #max} (or, equivalently, {@link #longValue}) returns the current + * maximum across the variables maintaining updates. + * + *

This class extends {@link Number}, but does not define + * methods such as {@code equals}, {@code hashCode} and {@code + * compareTo} because instances are expected to be mutated, and so are + * not useful as collection keys. + * + *

jsr166e note: This class is targeted to be placed in + * java.util.concurrent.atomic. + * + * @since 1.8 + * @author Doug Lea + */ +public class LongMaxUpdater extends Striped64 implements Serializable { + private static final long serialVersionUID = 7249069246863182397L; + + /** + * Version of max for use in retryUpdate + */ + final long fn(long v, long x) { return v > x ? v : x; } + + /** + * Creates a new instance with initial maximum of {@code + * Long.MIN_VALUE}. + */ + public LongMaxUpdater() { + base = Long.MIN_VALUE; + } + + /** + * Creates a new instance with the given initialValue + */ + public LongMaxUpdater(long initialValue) { + base = initialValue; + } + + + /** + * Updates the maximum to be at least the given value. + * + * @param x the value to update + */ + public void update(long x) { + Cell[] as; long b, v; HashCode hc; Cell a; int n; + if ((as = cells) != null || + (b = base) < x && !casBase(b, x)) { + boolean uncontended = true; + int h = (hc = threadHashCode.get()).code; + if (as == null || (n = as.length) < 1 || + (a = as[(n - 1) & h]) == null || + ((v = a.value) < x && !(uncontended = a.cas(v, x)))) + retryUpdate(x, hc, uncontended); + } + } + + /** + * Returns the current maximum. The returned value is + * NOT an atomic snapshot; invocation in the absence of + * concurrent updates returns an accurate result, but concurrent + * updates that occur while the value is being calculated might + * not be incorporated. + * + * @return the maximum + */ + public long max() { + Cell[] as = cells; + long max = base; + if (as != null) { + int n = as.length; + long v; + for (int i = 0; i < n; ++i) { + Cell a = as[i]; + if (a != null && (v = a.value) > max) + max = v; + } + } + return max; + } + + /** + * Resets variables maintaining updates to {@code Long.MIN_VALUE}. + * This method may be a useful alternative to creating a new + * updater, but is only effective if there are no concurrent + * updates. Because this method is intrinsically racy, it should + * only be used when it is known that no threads are concurrently + * updating. + */ + public void reset() { + internalReset(Long.MIN_VALUE); + } + + /** + * Equivalent in effect to {@link #max} followed by {@link + * #reset}. This method may apply for example during quiescent + * points between multithreaded computations. If there are + * updates concurrent with this method, the returned value is + * not guaranteed to be the final value occurring before + * the reset. + * + * @return the maximum + */ + public long maxThenReset(long newValue) { + Cell[] as = cells; + long max = base; + base = newValue; + if (as != null) { + int n = as.length; + for (int i = 0; i < n; ++i) { + Cell a = as[i]; + if (a != null) { + long v = a.value; + a.value = newValue; + if (v > max) + max = v; + } + } + } + return max; + } + + /** + * Returns the String representation of the {@link #max}. + * @return the String representation of the {@link #max} + */ + public String toString() { + return Long.toString(max()); + } + + /** + * Equivalent to {@link #max}. + * + * @return the maximum + */ + public long longValue() { + return max(); + } + + /** + * Returns the {@link #max} as an {@code int} after a narrowing + * primitive conversion. + */ + public int intValue() { + return (int)max(); + } + + /** + * Returns the {@link #max} as a {@code float} + * after a widening primitive conversion. + */ + public float floatValue() { + return (float)max(); + } + + /** + * Returns the {@link #max} as a {@code double} after a widening + * primitive conversion. + */ + public double doubleValue() { + return (double)max(); + } + + private void writeObject(java.io.ObjectOutputStream s) + throws java.io.IOException { + s.defaultWriteObject(); + s.writeLong(max()); + } + + private void readObject(java.io.ObjectInputStream s) + throws java.io.IOException, ClassNotFoundException { + s.defaultReadObject(); + busy = 0; + cells = null; + base = s.readLong(); + } + +} diff --git a/kamon-core/src/main/java/kamon/jsr166/Striped64.java b/kamon-core/src/main/java/kamon/jsr166/Striped64.java new file mode 100644 index 00000000..8fbfa4ba --- /dev/null +++ b/kamon-core/src/main/java/kamon/jsr166/Striped64.java @@ -0,0 +1,371 @@ +/* + +Note: this was copied from Doug Lea's CVS repository + http://gee.cs.oswego.edu/cgi-bin/viewcvs.cgi/jsr166/src/jsr166e/ + +Striped64.java version 1.8 + +*/ + + +/* + * Written by Doug Lea with assistance from members of JCP JSR-166 + * Expert Group and released to the public domain, as explained at + * http://creativecommons.org/publicdomain/zero/1.0/ + */ + +package kamon.jsr166; + +import java.util.Random; + +/** + * A package-local class holding common representation and mechanics + * for classes supporting dynamic striping on 64bit values. The class + * extends Number so that concrete subclasses must publicly do so. + */ +abstract class Striped64 extends Number { + /* + * This class maintains a lazily-initialized table of atomically + * updated variables, plus an extra "base" field. The table size + * is a power of two. Indexing uses masked per-thread hash codes. + * Nearly all declarations in this class are package-private, + * accessed directly by subclasses. + * + * Table entries are of class Cell; a variant of AtomicLong padded + * to reduce cache contention on most processors. Padding is + * overkill for most Atomics because they are usually irregularly + * scattered in memory and thus don't interfere much with each + * other. But Atomic objects residing in arrays will tend to be + * placed adjacent to each other, and so will most often share + * cache lines (with a huge negative performance impact) without + * this precaution. + * + * In part because Cells are relatively large, we avoid creating + * them until they are needed. When there is no contention, all + * updates are made to the base field. Upon first contention (a + * failed CAS on base update), the table is initialized to size 2. + * The table size is doubled upon further contention until + * reaching the nearest power of two greater than or equal to the + * number of CPUS. Table slots remain empty (null) until they are + * needed. + * + * A single spinlock ("busy") is used for initializing and + * resizing the table, as well as populating slots with new Cells. + * There is no need for a blocking lock; when the lock is not + * available, threads try other slots (or the base). During these + * retries, there is increased contention and reduced locality, + * which is still better than alternatives. + * + * Per-thread hash codes are initialized to random values. + * Contention and/or table collisions are indicated by failed + * CASes when performing an update operation (see method + * retryUpdate). Upon a collision, if the table size is less than + * the capacity, it is doubled in size unless some other thread + * holds the lock. If a hashed slot is empty, and lock is + * available, a new Cell is created. Otherwise, if the slot + * exists, a CAS is tried. Retries proceed by "double hashing", + * using a secondary hash (Marsaglia XorShift) to try to find a + * free slot. + * + * The table size is capped because, when there are more threads + * than CPUs, supposing that each thread were bound to a CPU, + * there would exist a perfect hash function mapping threads to + * slots that eliminates collisions. When we reach capacity, we + * search for this mapping by randomly varying the hash codes of + * colliding threads. Because search is random, and collisions + * only become known via CAS failures, convergence can be slow, + * and because threads are typically not bound to CPUS forever, + * may not occur at all. However, despite these limitations, + * observed contention rates are typically low in these cases. + * + * It is possible for a Cell to become unused when threads that + * once hashed to it terminate, as well as in the case where + * doubling the table causes no thread to hash to it under + * expanded mask. We do not try to detect or remove such cells, + * under the assumption that for long-running instances, observed + * contention levels will recur, so the cells will eventually be + * needed again; and for short-lived ones, it does not matter. + */ + + /** + * Padded variant of AtomicLong supporting only raw accesses plus CAS. + * The value field is placed between pads, hoping that the JVM doesn't + * reorder them. + * + * JVM intrinsics note: It would be possible to use a release-only + * form of CAS here, if it were provided. + */ + static final class Cell { + volatile long p0, p1, p2, p3, p4, p5, p6; + volatile long value; + volatile long q0, q1, q2, q3, q4, q5, q6; + Cell(long x) { value = x; } + + final boolean cas(long cmp, long val) { + return UNSAFE.compareAndSwapLong(this, valueOffset, cmp, val); + } + + final long getAndSet(long val) { + long v; + do { + v = UNSAFE.getLongVolatile(this, valueOffset); + } while (!UNSAFE.compareAndSwapLong(this, valueOffset, v, val)); + return v; + } + + // Unsafe mechanics + private static final sun.misc.Unsafe UNSAFE; + private static final long valueOffset; + static { + try { + UNSAFE = getUnsafe(); + Class ak = Cell.class; + valueOffset = UNSAFE.objectFieldOffset + (ak.getDeclaredField("value")); + } catch (Exception e) { + throw new Error(e); + } + } + + } + + /** + * Holder for the thread-local hash code. The code is initially + * random, but may be set to a different value upon collisions. + */ + static final class HashCode { + static final Random rng = new Random(); + int code; + HashCode() { + int h = rng.nextInt(); // Avoid zero to allow xorShift rehash + code = (h == 0) ? 1 : h; + } + } + + /** + * The corresponding ThreadLocal class + */ + static final class ThreadHashCode extends ThreadLocal { + public HashCode initialValue() { return new HashCode(); } + } + + /** + * Static per-thread hash codes. Shared across all instances to + * reduce ThreadLocal pollution and because adjustments due to + * collisions in one table are likely to be appropriate for + * others. + */ + static final ThreadHashCode threadHashCode = new ThreadHashCode(); + + /** Number of CPUS, to place bound on table size */ + static final int NCPU = Runtime.getRuntime().availableProcessors(); + + /** + * Table of cells. When non-null, size is a power of 2. + */ + transient volatile Cell[] cells; + + /** + * Base value, used mainly when there is no contention, but also as + * a fallback during table initialization races. Updated via CAS. + */ + transient volatile long base; + + /** + * Spinlock (locked via CAS) used when resizing and/or creating Cells. + */ + transient volatile int busy; + + /** + * Package-private default constructor + */ + Striped64() { + } + + /** + * CASes the base field. + */ + final boolean casBase(long cmp, long val) { + return UNSAFE.compareAndSwapLong(this, baseOffset, cmp, val); + } + + /** + * CASes the base field. + */ + final long getAndSetBase(long val) { + long v; + do { + v = UNSAFE.getLongVolatile(this, baseOffset); + } while (!UNSAFE.compareAndSwapLong(this, baseOffset, v, val)); + return v; + } + + /** + * CASes the busy field from 0 to 1 to acquire lock. + */ + final boolean casBusy() { + return UNSAFE.compareAndSwapInt(this, busyOffset, 0, 1); + } + + /** + * Computes the function of current and new value. Subclasses + * should open-code this update function for most uses, but the + * virtualized form is needed within retryUpdate. + * + * @param currentValue the current value (of either base or a cell) + * @param newValue the argument from a user update call + * @return result of the update function + */ + abstract long fn(long currentValue, long newValue); + + /** + * Handles cases of updates involving initialization, resizing, + * creating new Cells, and/or contention. See above for + * explanation. This method suffers the usual non-modularity + * problems of optimistic retry code, relying on rechecked sets of + * reads. + * + * @param x the value + * @param hc the hash code holder + * @param wasUncontended false if CAS failed before call + */ + final void retryUpdate(long x, HashCode hc, boolean wasUncontended) { + int h = hc.code; + boolean collide = false; // True if last slot nonempty + for (;;) { + Cell[] as; Cell a; int n; long v; + if ((as = cells) != null && (n = as.length) > 0) { + if ((a = as[(n - 1) & h]) == null) { + if (busy == 0) { // Try to attach new Cell + Cell r = new Cell(x); // Optimistically create + if (busy == 0 && casBusy()) { + boolean created = false; + try { // Recheck under lock + Cell[] rs; int m, j; + if ((rs = cells) != null && + (m = rs.length) > 0 && + rs[j = (m - 1) & h] == null) { + rs[j] = r; + created = true; + } + } finally { + busy = 0; + } + if (created) + break; + continue; // Slot is now non-empty + } + } + collide = false; + } + else if (!wasUncontended) // CAS already known to fail + wasUncontended = true; // Continue after rehash + else if (a.cas(v = a.value, fn(v, x))) + break; + else if (n >= NCPU || cells != as) + collide = false; // At max size or stale + else if (!collide) + collide = true; + else if (busy == 0 && casBusy()) { + try { + if (cells == as) { // Expand table unless stale + Cell[] rs = new Cell[n << 1]; + for (int i = 0; i < n; ++i) + rs[i] = as[i]; + cells = rs; + } + } finally { + busy = 0; + } + collide = false; + continue; // Retry with expanded table + } + h ^= h << 13; // Rehash + h ^= h >>> 17; + h ^= h << 5; + } + else if (busy == 0 && cells == as && casBusy()) { + boolean init = false; + try { // Initialize table + if (cells == as) { + Cell[] rs = new Cell[2]; + rs[h & 1] = new Cell(x); + cells = rs; + init = true; + } + } finally { + busy = 0; + } + if (init) + break; + } + else if (casBase(v = base, fn(v, x))) + break; // Fall back on using base + } + hc.code = h; // Record index for next time + } + + + /** + * Sets base and all cells to the given value. + */ + final void internalReset(long initialValue) { + Cell[] as = cells; + base = initialValue; + if (as != null) { + int n = as.length; + for (int i = 0; i < n; ++i) { + Cell a = as[i]; + if (a != null) + a.value = initialValue; + } + } + } + + // Unsafe mechanics + private static final sun.misc.Unsafe UNSAFE; + private static final long baseOffset; + private static final long busyOffset; + static { + try { + UNSAFE = getUnsafe(); + Class sk = Striped64.class; + baseOffset = UNSAFE.objectFieldOffset + (sk.getDeclaredField("base")); + busyOffset = UNSAFE.objectFieldOffset + (sk.getDeclaredField("busy")); + } catch (Exception e) { + throw new Error(e); + } + } + + /** + * Returns a sun.misc.Unsafe. Suitable for use in a 3rd party package. + * Replace with a simple call to Unsafe.getUnsafe when integrating + * into a jdk. + * + * @return a sun.misc.Unsafe + */ + private static sun.misc.Unsafe getUnsafe() { + try { + return sun.misc.Unsafe.getUnsafe(); + } catch (SecurityException tryReflectionInstead) {} + try { + return java.security.AccessController.doPrivileged + (new java.security.PrivilegedExceptionAction() { + public sun.misc.Unsafe run() throws Exception { + Class k = sun.misc.Unsafe.class; + for (java.lang.reflect.Field f : k.getDeclaredFields()) { + f.setAccessible(true); + Object x = f.get(null); + if (k.isInstance(x)) + return k.cast(x); + } + throw new NoSuchFieldError("the Unsafe"); + }}); + } catch (java.security.PrivilegedActionException e) { + throw new RuntimeException("Could not initialize intrinsics", + e.getCause()); + } + } +} diff --git a/kamon-core/src/main/java/kamon/util/GlobPathFilter.java b/kamon-core/src/main/java/kamon/util/GlobPathFilter.java new file mode 100644 index 00000000..1bfbaefc --- /dev/null +++ b/kamon-core/src/main/java/kamon/util/GlobPathFilter.java @@ -0,0 +1,110 @@ +/* + * ========================================================================================= + * Copyright 2013-2014 the kamon project + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + +package kamon.util; + +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +/** + * Default implementation of PathFilter. Uses glob based includes and excludes to determine whether to export. + * + * @author John E. Bailey + * @author David M. Lloyd + */ +public final class GlobPathFilter { + private static final Pattern GLOB_PATTERN = Pattern.compile("(\\*\\*?)|(\\?)|(\\\\.)|(/+)|([^*?]+)"); + + private final String glob; + private final Pattern pattern; + + /** + * Construct a new instance. + * + * @param glob the path glob to match + */ + public GlobPathFilter(final String glob) { + pattern = getGlobPattern(glob); + this.glob = glob; + } + + /** + * Determine whether a path should be accepted. + * + * @param path the path to check + * @return true if the path should be accepted, false if not + */ + public boolean accept(final String path) { + return pattern.matcher(path).matches(); + } + + /** + * Get a regular expression pattern which accept any path names which match the given glob. The glob patterns + * function similarly to {@code ant} file patterns. Valid metacharacters in the glob pattern include: + *

+ * In addition, any glob pattern matches all subdirectories thereof. A glob pattern ending in {@code /} is equivalent + * to a glob pattern ending in /** in that the named directory is not itself included in the glob. + *

+ * See also: "Patterns" in the Ant Manual + * + * @param glob the glob to match + * + * @return the pattern + */ + private static Pattern getGlobPattern(final String glob) { + StringBuilder patternBuilder = new StringBuilder(); + final Matcher m = GLOB_PATTERN.matcher(glob); + boolean lastWasSlash = false; + while (m.find()) { + lastWasSlash = false; + String grp; + if ((grp = m.group(1)) != null) { + // match a * or ** + if (grp.length() == 2) { + // it's a *workers are able to process multiple metrics* + patternBuilder.append(".*"); + } else { + // it's a * + patternBuilder.append("[^/]*"); + } + } else if ((grp = m.group(2)) != null) { + // match a '?' glob pattern; any non-slash character + patternBuilder.append("[^/]"); + } else if ((grp = m.group(3)) != null) { + // backslash-escaped value + patternBuilder.append(Pattern.quote(m.group().substring(1))); + } else if ((grp = m.group(4)) != null) { + // match any number of / chars + patternBuilder.append("/+"); + lastWasSlash = true; + } else { + // some other string + patternBuilder.append(Pattern.quote(m.group())); + } + } + if (lastWasSlash) { + // ends in /, append ** + patternBuilder.append(".*"); + } + return Pattern.compile(patternBuilder.toString()); + } +} diff --git a/kamon-core/src/main/scala/kamon/metric/EntitySnapshot.scala b/kamon-core/src/main/scala/kamon/metric/EntitySnapshot.scala new file mode 100644 index 00000000..e51e80cc --- /dev/null +++ b/kamon-core/src/main/scala/kamon/metric/EntitySnapshot.scala @@ -0,0 +1,11 @@ +package kamon.metric + +import kamon.metric.instrument.{DistributionSnapshot, SingleValueSnapshot} + +trait EntitySnapshot { + def entity: Entity + def histograms: Seq[DistributionSnapshot] + def minMaxCounters: Seq[DistributionSnapshot] + def gauges: Seq[SingleValueSnapshot] + def counters: Seq[SingleValueSnapshot] +} \ No newline at end of file diff --git a/kamon-core/src/main/scala/kamon/metric/TickSnapshot.scala b/kamon-core/src/main/scala/kamon/metric/TickSnapshot.scala new file mode 100644 index 00000000..4248180c --- /dev/null +++ b/kamon-core/src/main/scala/kamon/metric/TickSnapshot.scala @@ -0,0 +1,17 @@ +package kamon.metric + +import java.time.Instant + + +trait TickSnapshot { + def interval: Interval + def entities: Seq[EntitySnapshot] +} + +trait Interval { + def from: Instant + def to: Instant +} + + + diff --git a/kamon-core/src/main/scala/kamon/metric/instrument/Gauge.scala b/kamon-core/src/main/scala/kamon/metric/instrument/Gauge.scala index bb31e30a..5263d258 100644 --- a/kamon-core/src/main/scala/kamon/metric/instrument/Gauge.scala +++ b/kamon-core/src/main/scala/kamon/metric/instrument/Gauge.scala @@ -1,5 +1,7 @@ package kamon.metric.instrument +import java.util.concurrent.atomic.AtomicLong + import kamon.metric.Entity import kamon.util.MeasurementUnit @@ -13,6 +15,27 @@ trait Gauge { def set(value: Long): Unit } -object Gauge { - def apply(entity: Entity, name: String): Gauge = ??? + +class AtomicLongGauge(entity: Entity, name: String, val measurementUnit: MeasurementUnit) + extends Gauge with SingleValueSnapshotInstrument { + + private val currentValue = new AtomicLong(0L) + + def increment(): Unit = + currentValue.incrementAndGet() + + def increment(times: Long): Unit = + currentValue.addAndGet(times) + + def decrement(): Unit = + currentValue.decrementAndGet() + + def decrement(times: Long): Unit = + currentValue.addAndGet(-times) + + def set(value: Long): Unit = + currentValue.set(value) + + def snapshot(): SingleValueSnapshot = + SingleValueSnapshot(name, measurementUnit, currentValue.get()) } diff --git a/kamon-core/src/main/scala/kamon/metric/instrument/InstrumentFactory.scala b/kamon-core/src/main/scala/kamon/metric/instrument/InstrumentFactory.scala index 1ccd5899..fb6dfe27 100644 --- a/kamon-core/src/main/scala/kamon/metric/instrument/InstrumentFactory.scala +++ b/kamon-core/src/main/scala/kamon/metric/instrument/InstrumentFactory.scala @@ -16,7 +16,7 @@ private[metric] class InstrumentFactory private ( customSettings: Map[(String, String), CustomInstrumentSettings]) { def buildHistogram(entity: Entity, name: String, dynamicRange: DynamicRange = defaultHistogramDynamicRange, - measurementUnit: MeasurementUnit = MeasurementUnit.none): Histogram = { + measurementUnit: MeasurementUnit = MeasurementUnit.none): Histogram with DistributionSnapshotInstrument = { new HdrHistogram( entity, @@ -29,16 +29,17 @@ private[metric] class InstrumentFactory private ( def buildMinMaxCounter(entity: Entity, name: String, dynamicRange: DynamicRange = defaultMMCounterDynamicRange, sampleInterval: Duration = defaultMMCounterSampleRate, measurementUnit: MeasurementUnit = MeasurementUnit.none): MinMaxCounter = { - MinMaxCounter( + val underlyingHistogram = buildHistogram(entity, name, dynamicRange, measurementUnit) + new PaddedMinMaxCounter( entity, name, - instrumentDynamicRange(entity, name, dynamicRange), + underlyingHistogram, instrumentSampleInterval(entity, name, sampleInterval) ) } - def buildGauge(entity: Entity, name: String, measurementUnit: MeasurementUnit = MeasurementUnit.none): Gauge = - Gauge(entity, name) + def buildGauge(entity: Entity, name: String, measurementUnit: MeasurementUnit = MeasurementUnit.none): Gauge with SingleValueSnapshotInstrument = + new AtomicLongGauge(entity, name, measurementUnit) def buildCounter(entity: Entity, name: String, measurementUnit: MeasurementUnit = MeasurementUnit.none): Counter with SingleValueSnapshotInstrument = new LongAdderCounter(entity, name, measurementUnit) diff --git a/kamon-core/src/main/scala/kamon/metric/instrument/InstrumentSnapshot.scala b/kamon-core/src/main/scala/kamon/metric/instrument/InstrumentSnapshot.scala index 58e10c54..ffb00080 100644 --- a/kamon-core/src/main/scala/kamon/metric/instrument/InstrumentSnapshot.scala +++ b/kamon-core/src/main/scala/kamon/metric/instrument/InstrumentSnapshot.scala @@ -2,21 +2,18 @@ package kamon.metric.instrument import kamon.util.MeasurementUnit - +/** + * Snapshot for instruments that internally track a single value. Meant to be used for counters and gauges. + * + */ case class SingleValueSnapshot(name: String, measurementUnit: MeasurementUnit, value: Long) +/** + * Snapshot for instruments that internally the distribution of values in a defined dynamic range. Meant to be used + * with histograms and min max counters. + */ case class DistributionSnapshot(name: String, measurementUnit: MeasurementUnit, dynamicRange: DynamicRange, distribution: Distribution) -trait DistributionSnapshotInstrument { - def snapshot(): DistributionSnapshot -} - -trait SingleValueSnapshotInstrument { - def snapshot(): SingleValueSnapshot -} - - - trait Distribution { def buckets: Seq[Bucket] @@ -42,3 +39,12 @@ trait Percentile { def value: Long def countUnderQuantile: Long } + + +trait DistributionSnapshotInstrument { + private[kamon] def snapshot(): DistributionSnapshot +} + +trait SingleValueSnapshotInstrument { + private[kamon] def snapshot(): SingleValueSnapshot +} diff --git a/kamon-core/src/main/scala/kamon/metric/instrument/MinMaxCounter.scala b/kamon-core/src/main/scala/kamon/metric/instrument/MinMaxCounter.scala index 8a43865f..cddd8ed9 100644 --- a/kamon-core/src/main/scala/kamon/metric/instrument/MinMaxCounter.scala +++ b/kamon-core/src/main/scala/kamon/metric/instrument/MinMaxCounter.scala @@ -1,7 +1,10 @@ package kamon.metric.instrument +import java.lang.Math.abs import java.time.Duration +import java.util.concurrent.atomic.{AtomicLong, AtomicReference} +import kamon.jsr166.LongMaxUpdater import kamon.metric.Entity import kamon.util.MeasurementUnit @@ -14,18 +17,60 @@ trait MinMaxCounter { def increment(times: Long): Unit def decrement(): Unit def decrement(times: Long): Unit + def sample(): Unit } -object MinMaxCounter { - def apply(entity: Entity, name: String, dynamicRange2: DynamicRange, sampleInterval2: Duration): MinMaxCounter = new MinMaxCounter { - override def measurementUnit: MeasurementUnit = ??? +class PaddedMinMaxCounter(entity: Entity, name: String, underlyingHistogram: Histogram with DistributionSnapshotInstrument, + val sampleInterval: Duration) extends MinMaxCounter with DistributionSnapshotInstrument { - override def sampleInterval: Duration = sampleInterval2 - override def increment(): Unit = ??? - override def increment(times: Long): Unit = ??? - override def decrement(): Unit = ??? - override def decrement(times: Long): Unit = ??? - override def dynamicRange: DynamicRange = dynamicRange2 + private val min = new LongMaxUpdater(0L) + private val max = new LongMaxUpdater(0L) + private val sum = new AtomicLong() + + def dynamicRange: DynamicRange = + underlyingHistogram.dynamicRange + + def measurementUnit: MeasurementUnit = + underlyingHistogram.measurementUnit + + private[kamon] def snapshot(): DistributionSnapshot = + underlyingHistogram.snapshot() + + def increment(): Unit = + increment(1L) + + def increment(times: Long): Unit = { + val currentValue = sum.addAndGet(times) + max.update(currentValue) } -} + + def decrement(): Unit = + decrement(1L) + + def decrement(times: Long): Unit = { + val currentValue = sum.addAndGet(-times) + min.update(-currentValue) + } + + def sample(): Unit = { + val currentValue = { + val value = sum.get() + if (value <= 0) 0 else value + } + + val currentMin = { + val rawMin = min.maxThenReset(-currentValue) + if (rawMin >= 0) + 0 + else + abs(rawMin) + } + + val currentMax = max.maxThenReset(currentValue) + + underlyingHistogram.record(currentValue) + underlyingHistogram.record(currentMin) + underlyingHistogram.record(currentMax) + } +} \ No newline at end of file diff --git a/kamon-core/src/main/scala/kamon/metric/snapshot/EntitySnapshot.scala b/kamon-core/src/main/scala/kamon/metric/snapshot/EntitySnapshot.scala deleted file mode 100644 index 41fde2c0..00000000 --- a/kamon-core/src/main/scala/kamon/metric/snapshot/EntitySnapshot.scala +++ /dev/null @@ -1,12 +0,0 @@ -package kamon.metric.snapshot - -import kamon.metric.Entity -import kamon.metric.instrument.{DistributionSnapshot, SingleValueSnapshot} - -trait EntitySnapshot { - def entity: Entity - def histograms: Seq[DistributionSnapshot] - def minMaxCounters: Seq[DistributionSnapshot] - def gauges: Seq[SingleValueSnapshot] - def counters: Seq[SingleValueSnapshot] -} \ No newline at end of file diff --git a/kamon-core/src/main/scala/kamon/metric/snapshot/TickSnapshot.scala b/kamon-core/src/main/scala/kamon/metric/snapshot/TickSnapshot.scala deleted file mode 100644 index a1bdd6b1..00000000 --- a/kamon-core/src/main/scala/kamon/metric/snapshot/TickSnapshot.scala +++ /dev/null @@ -1,17 +0,0 @@ -package kamon.metric.snapshot - -import java.time.Instant - - -trait TickSnapshot { - def interval: Interval - def entities: Seq[EntitySnapshot] -} - -trait Interval { - def from: Instant - def to: Instant -} - - - -- cgit v1.2.3