diff options
Diffstat (limited to 'kamon-core/src/legacy-main/scala/kamon/util')
19 files changed, 905 insertions, 0 deletions
diff --git a/kamon-core/src/legacy-main/scala/kamon/util/ConfigTools.scala b/kamon-core/src/legacy-main/scala/kamon/util/ConfigTools.scala new file mode 100644 index 00000000..cbf530e4 --- /dev/null +++ b/kamon-core/src/legacy-main/scala/kamon/util/ConfigTools.scala @@ -0,0 +1,48 @@ +/* + * ========================================================================================= + * Copyright © 2013-2015 the kamon project <http://kamon.io/> + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + +package kamon.util + +import java.util.concurrent.TimeUnit + +import com.typesafe.config.Config + +import scala.concurrent.duration.FiniteDuration + +import kamon.metric.instrument.{Memory, Time} + +object ConfigTools { + implicit class Syntax(val config: Config) extends AnyVal { + // We are using the deprecated .getNanoseconds option to keep Kamon source code compatible with + // versions of Akka using older typesafe-config versions. + + def getFiniteDuration(path: String): FiniteDuration = + FiniteDuration(config.getDuration(path, TimeUnit.NANOSECONDS), TimeUnit.NANOSECONDS) + + def firstLevelKeys: Set[String] = { + import scala.collection.JavaConverters._ + + config.entrySet().asScala.map { + case entry ⇒ entry.getKey.takeWhile(_ != '.') + } toSet + } + + def time(path: String): Time = Time(config.getString(path)) + + def memory(path: String): Memory = Memory(config.getString(path)) + } + +} diff --git a/kamon-core/src/legacy-main/scala/kamon/util/FastDispatch.scala b/kamon-core/src/legacy-main/scala/kamon/util/FastDispatch.scala new file mode 100644 index 00000000..7a94aefc --- /dev/null +++ b/kamon-core/src/legacy-main/scala/kamon/util/FastDispatch.scala @@ -0,0 +1,38 @@ +/* + * ========================================================================================= + * Copyright © 2013-2015 the kamon project <http://kamon.io/> + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + +package kamon.util + +import akka.actor.ActorRef + +import scala.concurrent.{ExecutionContext, Future} + +/** + * Extension for Future[ActorRef]. Try to dispatch a message to a Future[ActorRef] in the same thread if it has already + * completed or do the regular scheduling otherwise. Specially useful when using the ModuleSupervisor extension to + * create actors. + */ +object FastDispatch { + implicit class Syntax(val target: Future[ActorRef]) extends AnyVal { + + def fastDispatch(message: Any)(implicit ec: ExecutionContext): Unit = + if (target.isCompleted) + target.value.get.map(_ ! message) + else + target.map(_ ! message) + } + +} diff --git a/kamon-core/src/legacy-main/scala/kamon/util/FunctionalInterfaces.scala b/kamon-core/src/legacy-main/scala/kamon/util/FunctionalInterfaces.scala new file mode 100644 index 00000000..8dab6519 --- /dev/null +++ b/kamon-core/src/legacy-main/scala/kamon/util/FunctionalInterfaces.scala @@ -0,0 +1,25 @@ +/* + * ========================================================================================= + * Copyright © 2013-2015 the kamon project <http://kamon.io/> + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + +package kamon.util + +trait Supplier[T] { + def get: T +} + +trait Function[T, R] { + def apply(t: T): R +}
\ No newline at end of file diff --git a/kamon-core/src/legacy-main/scala/kamon/util/JavaTags.scala b/kamon-core/src/legacy-main/scala/kamon/util/JavaTags.scala new file mode 100644 index 00000000..90bece5c --- /dev/null +++ b/kamon-core/src/legacy-main/scala/kamon/util/JavaTags.scala @@ -0,0 +1,14 @@ +package kamon.util + +object JavaTags { + + /** + * Helper method to transform Java maps into Scala maps. Typically this will be used as a static import from + * Java code that wants to use tags, as tags are defined as scala.collection.mutable.Map[String, String] and + * creating them directly from Java is quite verbose. + */ + def tagsFromMap(tags: java.util.Map[String, String]): Map[String, String] = { + import scala.collection.JavaConversions._ + tags.toMap + } +} diff --git a/kamon-core/src/legacy-main/scala/kamon/util/Latency.scala b/kamon-core/src/legacy-main/scala/kamon/util/Latency.scala new file mode 100644 index 00000000..52e044f8 --- /dev/null +++ b/kamon-core/src/legacy-main/scala/kamon/util/Latency.scala @@ -0,0 +1,29 @@ +/* + * ========================================================================================= + * Copyright © 2013-2015 the kamon project <http://kamon.io/> + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + +package kamon.util + +import kamon.metric.instrument.Histogram + +object Latency { + def measure[A](histogram: Histogram)(thunk: ⇒ A): A = { + val start = RelativeNanoTimestamp.now + try thunk finally { + val latency = NanoInterval.since(start).nanos + histogram.record(latency) + } + } +} diff --git a/kamon-core/src/legacy-main/scala/kamon/util/LazyActorRef.scala b/kamon-core/src/legacy-main/scala/kamon/util/LazyActorRef.scala new file mode 100644 index 00000000..a07abea6 --- /dev/null +++ b/kamon-core/src/legacy-main/scala/kamon/util/LazyActorRef.scala @@ -0,0 +1,69 @@ +/* + * ========================================================================================= + * Copyright © 2013-2015 the kamon project <http://kamon.io/> + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + +package kamon.util + +import java.util +import java.util.concurrent.ConcurrentLinkedQueue + +import akka.actor.{Actor, ActorRef} +import org.HdrHistogram.WriterReaderPhaser + +import scala.annotation.tailrec + +/** + * A LazyActorRef accumulates messages sent to an actor that doesn't exist yet. Once the actor is created and + * the LazyActorRef is pointed to it, all the accumulated messages are flushed and any new message sent to the + * LazyActorRef will immediately be sent to the pointed ActorRef. + * + * This is intended to be used during Kamon's initialization where some components need to use ActorRefs to work + * (like subscriptions and the trace incubator) but our internal ActorSystem is not yet ready to create the + * required actors. + */ +class LazyActorRef { + private val _refPhaser = new WriterReaderPhaser + private val _backlog = new ConcurrentLinkedQueue[(Any, ActorRef)]() + @volatile private var _target: Option[ActorRef] = None + + def tell(message: Any)(implicit sender: ActorRef = Actor.noSender): Unit = { + val criticalEnter = _refPhaser.writerCriticalSectionEnter() + try { + _target.map(_.tell(message, sender)) getOrElse { + _backlog.add((message, sender)) + } + + } finally { _refPhaser.writerCriticalSectionExit(criticalEnter) } + } + + def point(target: ActorRef): Unit = { + @tailrec def drain(q: util.Queue[(Any, ActorRef)]): Unit = if (!q.isEmpty) { + val (msg, sender) = q.poll() + target.tell(msg, sender) + drain(q) + } + + try { + _refPhaser.readerLock() + + if (_target.isEmpty) { + _target = Some(target) + _refPhaser.flipPhase(1000L) + drain(_backlog) + + } else sys.error("A LazyActorRef cannot be pointed more than once.") + } finally { _refPhaser.readerUnlock() } + } +} diff --git a/kamon-core/src/legacy-main/scala/kamon/util/MapMerge.scala b/kamon-core/src/legacy-main/scala/kamon/util/MapMerge.scala new file mode 100644 index 00000000..6fc6fb15 --- /dev/null +++ b/kamon-core/src/legacy-main/scala/kamon/util/MapMerge.scala @@ -0,0 +1,43 @@ +/* + * ========================================================================================= + * Copyright © 2013-2015 the kamon project <http://kamon.io/> + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + +package kamon.util + +object MapMerge { + + /** + * Merge two immutable maps with the same key and value types, using the provided valueMerge function. + */ + implicit class Syntax[K, V](val map: Map[K, V]) extends AnyVal { + def merge(that: Map[K, V], valueMerge: (V, V) ⇒ V): Map[K, V] = { + val merged = Map.newBuilder[K, V] + + map.foreach { + case (key, value) ⇒ + val mergedValue = that.get(key).map(v ⇒ valueMerge(value, v)).getOrElse(value) + merged += key → mergedValue + } + + that.foreach { + case kv @ (key, _) if !map.contains(key) ⇒ merged += kv + case other ⇒ // ignore, already included. + } + + merged.result(); + } + } + +} diff --git a/kamon-core/src/legacy-main/scala/kamon/util/NeedToScale.scala b/kamon-core/src/legacy-main/scala/kamon/util/NeedToScale.scala new file mode 100644 index 00000000..1397050f --- /dev/null +++ b/kamon-core/src/legacy-main/scala/kamon/util/NeedToScale.scala @@ -0,0 +1,37 @@ +/* + * ========================================================================================= + * Copyright © 2013-2015 the kamon project <http://kamon.io/> + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + +package kamon.util + +import com.typesafe.config.Config +import kamon.metric.instrument.{Memory, Time} +import kamon.util.ConfigTools._ + +object NeedToScale { + val TimeUnits = "time-units" + val MemoryUnits = "memory-units" + + def unapply(config: Config): Option[(Option[Time], Option[Memory])] = { + val scaleTimeTo: Option[Time] = + if (config.hasPath(TimeUnits)) Some(config.time(TimeUnits)) else None + + val scaleMemoryTo: Option[Memory] = + if (config.hasPath(MemoryUnits)) Some(config.memory(MemoryUnits)) else None + if (scaleTimeTo.isDefined || scaleMemoryTo.isDefined) Some(scaleTimeTo → scaleMemoryTo) + else None + } +} + diff --git a/kamon-core/src/legacy-main/scala/kamon/util/PaddedAtomicLong.scala b/kamon-core/src/legacy-main/scala/kamon/util/PaddedAtomicLong.scala new file mode 100644 index 00000000..9019eb4c --- /dev/null +++ b/kamon-core/src/legacy-main/scala/kamon/util/PaddedAtomicLong.scala @@ -0,0 +1,25 @@ +/* + * ========================================================================================= + * Copyright © 2013-2014 the kamon project <http://kamon.io/> + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + +package kamon.util + +import java.util.concurrent.atomic.AtomicLong + +class PaddedAtomicLong(value: Long = 0) extends AtomicLong(value) { + @volatile var p1, p2, p3, p4, p5, p6 = 7L + + protected def sumPaddingToPreventOptimisation() = p1 + p2 + p3 + p4 + p5 + p6 +}
\ No newline at end of file diff --git a/kamon-core/src/legacy-main/scala/kamon/util/PathFilter.scala b/kamon-core/src/legacy-main/scala/kamon/util/PathFilter.scala new file mode 100644 index 00000000..50e14ace --- /dev/null +++ b/kamon-core/src/legacy-main/scala/kamon/util/PathFilter.scala @@ -0,0 +1,5 @@ +package kamon.util + +trait PathFilter { + def accept(path: String): Boolean +} diff --git a/kamon-core/src/legacy-main/scala/kamon/util/RegexPathFilter.scala b/kamon-core/src/legacy-main/scala/kamon/util/RegexPathFilter.scala new file mode 100644 index 00000000..848fca87 --- /dev/null +++ b/kamon-core/src/legacy-main/scala/kamon/util/RegexPathFilter.scala @@ -0,0 +1,27 @@ +/* + * ========================================================================================= + * Copyright © 2013-2015 the kamon project <http://kamon.io/> + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + +package kamon.util + +case class RegexPathFilter(path: String) extends PathFilter { + private val pathRegex = path.r + override def accept(path: String): Boolean = { + path match { + case pathRegex(_*) ⇒ true + case _ ⇒ false + } + } +} diff --git a/kamon-core/src/legacy-main/scala/kamon/util/SameThreadExecutionContext.scala b/kamon-core/src/legacy-main/scala/kamon/util/SameThreadExecutionContext.scala new file mode 100644 index 00000000..5fb0d066 --- /dev/null +++ b/kamon-core/src/legacy-main/scala/kamon/util/SameThreadExecutionContext.scala @@ -0,0 +1,30 @@ +/* + * ========================================================================================= + * Copyright © 2013-2015 the kamon project <http://kamon.io/> + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + +package kamon.util + +import kamon.util.logger.LazyLogger +import scala.concurrent.ExecutionContext + +/** + * For small code blocks that don't need to be run on a separate thread. + */ +object SameThreadExecutionContext extends ExecutionContext { + val logger = LazyLogger("SameThreadExecutionContext") + + override def execute(runnable: Runnable): Unit = runnable.run + override def reportFailure(t: Throwable): Unit = logger.error(t.getMessage, t) +} diff --git a/kamon-core/src/legacy-main/scala/kamon/util/Sequencer.scala b/kamon-core/src/legacy-main/scala/kamon/util/Sequencer.scala new file mode 100644 index 00000000..f368e54f --- /dev/null +++ b/kamon-core/src/legacy-main/scala/kamon/util/Sequencer.scala @@ -0,0 +1,40 @@ +/* + * ========================================================================================= + * Copyright © 2013-2014 the kamon project <http://kamon.io/> + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + +package kamon.util + +/** + * This class implements an extremely efficient, thread-safe way to generate a + * incrementing sequence of Longs with a simple Long overflow protection. + */ +class Sequencer { + private val CloseToOverflow = Long.MaxValue - 1000000000 + private val sequenceNumber = new PaddedAtomicLong(1L) + + def next(): Long = { + val current = sequenceNumber.getAndIncrement + + // check if this value is getting close to overflow? + if (current > CloseToOverflow) { + sequenceNumber.set(current - CloseToOverflow) // we need maintain the order + } + current + } +} + +object Sequencer { + def apply(): Sequencer = new Sequencer() +}
\ No newline at end of file diff --git a/kamon-core/src/legacy-main/scala/kamon/util/Timestamp.scala b/kamon-core/src/legacy-main/scala/kamon/util/Timestamp.scala new file mode 100644 index 00000000..002dc470 --- /dev/null +++ b/kamon-core/src/legacy-main/scala/kamon/util/Timestamp.scala @@ -0,0 +1,107 @@ +/* + * ========================================================================================= + * Copyright © 2013-2015 the kamon project <http://kamon.io/> + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + +package kamon.util + +/** + * Epoch time stamp. + */ +case class Timestamp(seconds: Long) extends AnyVal { + def <(that: Timestamp): Boolean = this.seconds < that.seconds + def >(that: Timestamp): Boolean = this.seconds > that.seconds + def ==(that: Timestamp): Boolean = this.seconds == that.seconds + def >=(that: Timestamp): Boolean = this.seconds >= that.seconds + def <=(that: Timestamp): Boolean = this.seconds <= that.seconds + + override def toString: String = String.valueOf(seconds) + ".seconds" +} + +object Timestamp { + def now: Timestamp = new Timestamp(System.currentTimeMillis() / 1000) + def earlier(l: Timestamp, r: Timestamp): Timestamp = if (l <= r) l else r + def later(l: Timestamp, r: Timestamp): Timestamp = if (l >= r) l else r +} + +/** + * Epoch time stamp in milliseconds. + */ +case class MilliTimestamp(millis: Long) extends AnyVal { + override def toString: String = String.valueOf(millis) + ".millis" + + def toTimestamp: Timestamp = new Timestamp(millis / 1000) + def toRelativeNanoTimestamp: RelativeNanoTimestamp = { + val diff = (System.currentTimeMillis() - millis) * 1000000 + new RelativeNanoTimestamp(System.nanoTime() - diff) + } +} + +object MilliTimestamp { + def now: MilliTimestamp = new MilliTimestamp(System.currentTimeMillis()) +} + +/** + * Epoch time stamp in nanoseconds. + * + * NOTE: This doesn't have any better precision than MilliTimestamp, it is just a convenient way to get a epoch + * timestamp in nanoseconds. + */ +case class NanoTimestamp(nanos: Long) extends AnyVal { + def -(that: NanoTimestamp) = new NanoTimestamp(nanos - that.nanos) + def +(that: NanoTimestamp) = new NanoTimestamp(nanos + that.nanos) + override def toString: String = String.valueOf(nanos) + ".nanos" +} + +object NanoTimestamp { + def now: NanoTimestamp = new NanoTimestamp(System.currentTimeMillis() * 1000000) +} + +/** + * Number of nanoseconds between a arbitrary origin timestamp provided by the JVM via System.nanoTime() + */ +case class RelativeNanoTimestamp(nanos: Long) extends AnyVal { + def -(that: RelativeNanoTimestamp) = new RelativeNanoTimestamp(nanos - that.nanos) + def +(that: RelativeNanoTimestamp) = new RelativeNanoTimestamp(nanos + that.nanos) + override def toString: String = String.valueOf(nanos) + ".nanos" + + def toMilliTimestamp: MilliTimestamp = + new MilliTimestamp(System.currentTimeMillis - ((System.nanoTime - nanos) / 1000000)) +} + +object RelativeNanoTimestamp { + val zero = new RelativeNanoTimestamp(0L) + + def now: RelativeNanoTimestamp = new RelativeNanoTimestamp(System.nanoTime()) + def relativeTo(milliTimestamp: MilliTimestamp): RelativeNanoTimestamp = + new RelativeNanoTimestamp(now.nanos - (MilliTimestamp.now.millis - milliTimestamp.millis) * 1000000) +} + +/** + * Number of nanoseconds that passed between two points in time. + */ +case class NanoInterval(nanos: Long) extends AnyVal { + def <(that: NanoInterval): Boolean = this.nanos < that.nanos + def >(that: NanoInterval): Boolean = this.nanos > that.nanos + def ==(that: NanoInterval): Boolean = this.nanos == that.nanos + def >=(that: NanoInterval): Boolean = this.nanos >= that.nanos + def <=(that: NanoInterval): Boolean = this.nanos <= that.nanos + + override def toString: String = String.valueOf(nanos) + ".nanos" +} + +object NanoInterval { + def default: NanoInterval = new NanoInterval(0L) + def since(relative: RelativeNanoTimestamp): NanoInterval = new NanoInterval(System.nanoTime() - relative.nanos) +} diff --git a/kamon-core/src/legacy-main/scala/kamon/util/TriemapAtomicGetOrElseUpdate.scala b/kamon-core/src/legacy-main/scala/kamon/util/TriemapAtomicGetOrElseUpdate.scala new file mode 100644 index 00000000..d1197a5a --- /dev/null +++ b/kamon-core/src/legacy-main/scala/kamon/util/TriemapAtomicGetOrElseUpdate.scala @@ -0,0 +1,45 @@ +/* + * ========================================================================================= + * Copyright © 2013-2015 the kamon project <http://kamon.io/> + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ +package kamon.util + +import scala.collection.concurrent.TrieMap + +object TriemapAtomicGetOrElseUpdate { + + /** + * Workaround to the non thread-safe [[scala.collection.concurrent.TrieMap#getOrElseUpdate]] method. More details on + * why this is necessary can be found at [[https://issues.scala-lang.org/browse/SI-7943]]. + */ + implicit class Syntax[K, V](val trieMap: TrieMap[K, V]) extends AnyVal { + + def atomicGetOrElseUpdate(key: K, op: ⇒ V): V = + atomicGetOrElseUpdate(key, op, { v: V ⇒ Unit }) + + def atomicGetOrElseUpdate(key: K, op: ⇒ V, cleanup: V ⇒ Unit): V = + trieMap.get(key) match { + case Some(v) ⇒ v + case None ⇒ + val d = op + trieMap.putIfAbsent(key, d).map { oldValue ⇒ + // If there was an old value then `d` was never added + // and thus need to be cleanup. + cleanup(d) + oldValue + + } getOrElse (d) + } + } +} diff --git a/kamon-core/src/legacy-main/scala/kamon/util/executors/ExecutorMetricRecorder.scala b/kamon-core/src/legacy-main/scala/kamon/util/executors/ExecutorMetricRecorder.scala new file mode 100644 index 00000000..a4bca570 --- /dev/null +++ b/kamon-core/src/legacy-main/scala/kamon/util/executors/ExecutorMetricRecorder.scala @@ -0,0 +1,99 @@ +/* + * ========================================================================================= + * Copyright © 2013-2016 the kamon project <http://kamon.io/> + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + +package kamon.util.executors + +import kamon.metric.{EntityRecorderFactory, GenericEntityRecorder} +import kamon.metric.instrument.{Gauge, MinMaxCounter, DifferentialValueCollector, InstrumentFactory} +import java.util.concurrent.{ForkJoinPool ⇒ JavaForkJoinPool, ThreadPoolExecutor} +import kamon.util.executors.ForkJoinPools.ForkJoinMetrics + +import scala.concurrent.forkjoin.ForkJoinPool + +object ForkJoinPools extends ForkJoinLowPriority { + trait ForkJoinMetrics[T] { + def getParallelism(fjp: T): Long + def getPoolSize(fjp: T): Long + def getActiveThreadCount(fjp: T): Long + def getRunningThreadCount(fjp: T): Long + def getQueuedTaskCount(fjp: T): Long + def getQueuedSubmissionCount(fjp: T): Long + } + + implicit object JavaForkJoin extends ForkJoinMetrics[JavaForkJoinPool] { + def getParallelism(fjp: JavaForkJoinPool) = fjp.getParallelism + def getPoolSize(fjp: JavaForkJoinPool) = fjp.getPoolSize.toLong + def getRunningThreadCount(fjp: JavaForkJoinPool) = fjp.getActiveThreadCount.toLong + def getActiveThreadCount(fjp: JavaForkJoinPool) = fjp.getRunningThreadCount.toLong + def getQueuedTaskCount(fjp: JavaForkJoinPool) = fjp.getQueuedTaskCount + def getQueuedSubmissionCount(fjp: JavaForkJoinPool) = fjp.getQueuedSubmissionCount + } +} + +trait ForkJoinLowPriority { + implicit object ScalaForkJoin extends ForkJoinMetrics[ForkJoinPool] { + def getParallelism(fjp: ForkJoinPool) = fjp.getParallelism + def getPoolSize(fjp: ForkJoinPool) = fjp.getPoolSize.toLong + def getRunningThreadCount(fjp: ForkJoinPool) = fjp.getActiveThreadCount.toLong + def getActiveThreadCount(fjp: ForkJoinPool) = fjp.getRunningThreadCount.toLong + def getQueuedTaskCount(fjp: ForkJoinPool) = fjp.getQueuedTaskCount + def getQueuedSubmissionCount(fjp: ForkJoinPool) = fjp.getQueuedSubmissionCount + } +} + +abstract class ForkJoinPoolMetrics(instrumentFactory: InstrumentFactory) extends GenericEntityRecorder(instrumentFactory) { + def paralellism: MinMaxCounter + def poolSize: Gauge + def activeThreads: Gauge + def runningThreads: Gauge + def queuedTaskCount: Gauge + def queuedSubmissionCount: Gauge +} + +object ForkJoinPoolMetrics { + def factory[T: ForkJoinMetrics](fjp: T, categoryName: String) = new EntityRecorderFactory[ForkJoinPoolMetrics] { + val forkJoinMetrics = implicitly[ForkJoinMetrics[T]] + + def category: String = categoryName + def createRecorder(instrumentFactory: InstrumentFactory) = new ForkJoinPoolMetrics(instrumentFactory) { + val paralellism = minMaxCounter("parallelism") + paralellism.increment(forkJoinMetrics.getParallelism(fjp)) // Steady value. + + val poolSize = gauge("pool-size", forkJoinMetrics.getPoolSize(fjp)) + val activeThreads = gauge("active-threads", forkJoinMetrics.getActiveThreadCount(fjp)) + val runningThreads = gauge("running-threads", forkJoinMetrics.getRunningThreadCount(fjp)) + val queuedTaskCount = gauge("queued-task-count", forkJoinMetrics.getQueuedTaskCount(fjp)) + val queuedSubmissionCount = gauge("queued-submission-count", forkJoinMetrics.getQueuedSubmissionCount(fjp)) + } + } +} + +class ThreadPoolExecutorMetrics(tpe: ThreadPoolExecutor, instrumentFactory: InstrumentFactory) extends GenericEntityRecorder(instrumentFactory) { + val corePoolSize = gauge("core-pool-size", tpe.getCorePoolSize.toLong) + val maxPoolSize = gauge("max-pool-size", tpe.getMaximumPoolSize.toLong) + val poolSize = gauge("pool-size", tpe.getPoolSize.toLong) + val activeThreads = gauge("active-threads", tpe.getActiveCount.toLong) + val processedTasks = gauge("processed-tasks", DifferentialValueCollector(() ⇒ { + tpe.getTaskCount + })) +} + +object ThreadPoolExecutorMetrics { + def factory(tpe: ThreadPoolExecutor, cat: String) = new EntityRecorderFactory[ThreadPoolExecutorMetrics] { + def category: String = cat + def createRecorder(instrumentFactory: InstrumentFactory) = new ThreadPoolExecutorMetrics(tpe, instrumentFactory) + } +} diff --git a/kamon-core/src/legacy-main/scala/kamon/util/executors/ExecutorServiceMetrics.scala b/kamon-core/src/legacy-main/scala/kamon/util/executors/ExecutorServiceMetrics.scala new file mode 100644 index 00000000..60612beb --- /dev/null +++ b/kamon-core/src/legacy-main/scala/kamon/util/executors/ExecutorServiceMetrics.scala @@ -0,0 +1,134 @@ +/* + * ========================================================================================= + * Copyright © 2013-2015 the kamon project <http://kamon.io/> + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + +package kamon.util.executors + +import java.util.concurrent.{ExecutorService, ForkJoinPool ⇒ JavaForkJoinPool, ThreadPoolExecutor} + +import kamon.Kamon +import kamon.metric.Entity + +import scala.concurrent.forkjoin.ForkJoinPool +import scala.util.control.NoStackTrace + +object ExecutorServiceMetrics { + val Category = "executor-service" + + private val DelegatedExecutor = Class.forName("java.util.concurrent.Executors$DelegatedExecutorService") + private val FinalizableDelegated = Class.forName("java.util.concurrent.Executors$FinalizableDelegatedExecutorService") + private val DelegateScheduled = Class.forName("java.util.concurrent.Executors$DelegatedScheduledExecutorService") + private val JavaForkJoinPool = classOf[JavaForkJoinPool] + private val ScalaForkJoinPool = classOf[ForkJoinPool] + + private val executorField = { + // executorService is private :( + val field = DelegatedExecutor.getDeclaredField("e") + field.setAccessible(true) + field + } + + /** + * + * Register the [[http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/ThreadPoolExecutor.html ThreadPoolExecutor]] to Monitor. + * + * @param name The name of the [[ThreadPoolExecutor]] + * @param threadPool The intance of the [[ThreadPoolExecutor]] + * @param tags The tags associated to the [[ThreadPoolExecutor]] + */ + @inline private def registerThreadPool(name: String, threadPool: ThreadPoolExecutor, tags: Map[String, String]): Entity = { + val threadPoolEntity = Entity(name, Category, tags + ("executor-type" → "thread-pool-executor")) + Kamon.metrics.entity(ThreadPoolExecutorMetrics.factory(threadPool, Category), threadPoolEntity) + threadPoolEntity + } + + /** + * + * Register the [[http://www.scala-lang.org/api/current/index.html#scala.collection.parallel.TaskSupport ForkJoinPool]] to Monitor. + * + * @param name The name of the [[ForkJoinPool]] + * @param forkJoinPool The instance of the [[ForkJoinPool]] + * @param tags The tags associated to the [[ForkJoinPool]] + */ + @inline private def registerScalaForkJoin(name: String, forkJoinPool: ForkJoinPool, tags: Map[String, String]): Entity = { + val forkJoinEntity = Entity(name, Category, tags + ("executor-type" → "fork-join-pool")) + Kamon.metrics.entity(ForkJoinPoolMetrics.factory(forkJoinPool, Category), forkJoinEntity) + forkJoinEntity + } + + /** + * + * Register the [[https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ForkJoinPool.html JavaForkJoinPool]] to Monitor. + * + * @param name The name of the [[JavaForkJoinPool]] + * @param forkJoinPool The instance of the [[JavaForkJoinPool]] + * @param tags The tags associated to the [[JavaForkJoinPool]] + */ + @inline private def registerJavaForkJoin(name: String, forkJoinPool: JavaForkJoinPool, tags: Map[String, String]): Entity = { + val forkJoinEntity = Entity(name, Category, tags + ("executor-type" → "fork-join-pool")) + Kamon.metrics.entity(ForkJoinPoolMetrics.factory(forkJoinPool, Category), forkJoinEntity) + forkJoinEntity + } + + /** + * + * Register the [[https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ExecutorService.html ExecutorService]] to Monitor. + * + * @param name The name of the [[ExecutorService]] + * @param executorService The instance of the [[ExecutorService]] + * @param tags The tags associated to the [[ExecutorService]] + */ + def register(name: String, executorService: ExecutorService, tags: Map[String, String]): Entity = executorService match { + case threadPoolExecutor: ThreadPoolExecutor ⇒ registerThreadPool(name, threadPoolExecutor, tags) + case scalaForkJoinPool: ForkJoinPool if scalaForkJoinPool.getClass.isAssignableFrom(ScalaForkJoinPool) ⇒ registerScalaForkJoin(name, scalaForkJoinPool, tags) + case javaForkJoinPool: JavaForkJoinPool if javaForkJoinPool.getClass.isAssignableFrom(JavaForkJoinPool) ⇒ registerJavaForkJoin(name, javaForkJoinPool, tags) + case delegatedExecutor: ExecutorService if delegatedExecutor.getClass.isAssignableFrom(DelegatedExecutor) ⇒ registerDelegatedExecutor(name, delegatedExecutor, tags) + case delegatedScheduledExecutor: ExecutorService if delegatedScheduledExecutor.getClass.isAssignableFrom(DelegateScheduled) ⇒ registerDelegatedExecutor(name, delegatedScheduledExecutor, tags) + case finalizableDelegatedExecutor: ExecutorService if finalizableDelegatedExecutor.getClass.isAssignableFrom(FinalizableDelegated) ⇒ registerDelegatedExecutor(name, finalizableDelegatedExecutor, tags) + case other ⇒ throw NotSupportedException(s"The ExecutorService $name is not supported.") + } + + //Java variant + def register(name: String, executorService: ExecutorService, tags: java.util.Map[String, String]): Entity = { + import scala.collection.JavaConverters._ + register(name, executorService, tags.asScala.toMap) + } + + /** + * + * Register the [[https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ExecutorService.html ExecutorService]] to Monitor. + * + * @param name The name of the [[ExecutorService]] + * @param executorService The instance of the [[ExecutorService]] + */ + def register(name: String, executorService: ExecutorService): Entity = { + register(name, executorService, Map.empty[String, String]) + } + + def remove(entity: Entity): Unit = Kamon.metrics.removeEntity(entity) + + /** + * INTERNAL USAGE ONLY + */ + private def registerDelegatedExecutor(name: String, executor: ExecutorService, tags: Map[String, String]) = { + val underlyingExecutor = executorField.get(executor) match { + case executorService: ExecutorService ⇒ executorService + case other ⇒ other + } + register(name, underlyingExecutor.asInstanceOf[ExecutorService], tags) + } + + case class NotSupportedException(message: String) extends RuntimeException with NoStackTrace +}
\ No newline at end of file diff --git a/kamon-core/src/legacy-main/scala/kamon/util/http/HttpServerMetrics.scala b/kamon-core/src/legacy-main/scala/kamon/util/http/HttpServerMetrics.scala new file mode 100644 index 00000000..929fef4f --- /dev/null +++ b/kamon-core/src/legacy-main/scala/kamon/util/http/HttpServerMetrics.scala @@ -0,0 +1,41 @@ +/* + * ========================================================================================= + * Copyright © 2013-2015 the kamon project <http://kamon.io/> + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + +package kamon.util.http + +import kamon.metric.{EntityRecorderFactory, GenericEntityRecorder} +import kamon.metric.instrument.InstrumentFactory + +/** + * Counts HTTP response status codes into per status code and per trace name + status counters. If recording a HTTP + * response with status 500 for the trace "GetUser", the counter with name "500" as well as the counter with name + * "GetUser_500" will be incremented. + */ +class HttpServerMetrics(instrumentFactory: InstrumentFactory) extends GenericEntityRecorder(instrumentFactory) { + + def recordResponse(statusCode: String): Unit = + counter(statusCode).increment() + + def recordResponse(traceName: String, statusCode: String): Unit = { + recordResponse(statusCode) + counter(traceName + "_" + statusCode).increment() + } +} + +object HttpServerMetrics extends EntityRecorderFactory[HttpServerMetrics] { + def category: String = "http-server" + def createRecorder(instrumentFactory: InstrumentFactory): HttpServerMetrics = new HttpServerMetrics(instrumentFactory) +} diff --git a/kamon-core/src/legacy-main/scala/kamon/util/logger/LazyLogger.scala b/kamon-core/src/legacy-main/scala/kamon/util/logger/LazyLogger.scala new file mode 100644 index 00000000..11be7bbe --- /dev/null +++ b/kamon-core/src/legacy-main/scala/kamon/util/logger/LazyLogger.scala @@ -0,0 +1,49 @@ +/* ========================================================================================= + * Copyright © 2013-2015 the kamon project <http://kamon.io/> + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + +package kamon.util.logger + +import org.slf4j.{Logger ⇒ SLF4JLogger} + +class LazyLogger(val logger: SLF4JLogger) { + + @inline final def isTraceEnabled = logger.isTraceEnabled + @inline final def trace(msg: ⇒ String): Unit = if (isTraceEnabled) logger.trace(msg.toString) + @inline final def trace(msg: ⇒ String, t: ⇒ Throwable): Unit = if (isTraceEnabled) logger.trace(msg, t) + + @inline final def isDebugEnabled = logger.isDebugEnabled + @inline final def debug(msg: ⇒ String): Unit = if (isDebugEnabled) logger.debug(msg.toString) + @inline final def debug(msg: ⇒ String, t: ⇒ Throwable): Unit = if (isDebugEnabled) logger.debug(msg, t) + + @inline final def isErrorEnabled = logger.isErrorEnabled + @inline final def error(msg: ⇒ String): Unit = if (isErrorEnabled) logger.error(msg.toString) + @inline final def error(msg: ⇒ String, t: ⇒ Throwable): Unit = if (isErrorEnabled) logger.error(msg, t) + + @inline final def isInfoEnabled = logger.isInfoEnabled + @inline final def info(msg: ⇒ String): Unit = if (isInfoEnabled) logger.info(msg.toString) + @inline final def info(msg: ⇒ String, t: ⇒ Throwable): Unit = if (isInfoEnabled) logger.info(msg, t) + + @inline final def isWarnEnabled = logger.isWarnEnabled + @inline final def warn(msg: ⇒ String): Unit = if (isWarnEnabled) logger.warn(msg.toString) + @inline final def warn(msg: ⇒ String, t: ⇒ Throwable): Unit = if (isWarnEnabled) logger.warn(msg, t) +} + +object LazyLogger { + import scala.reflect.{classTag, ClassTag} + + def apply(name: String): LazyLogger = new LazyLogger(org.slf4j.LoggerFactory.getLogger(name)) + def apply(cls: Class[_]): LazyLogger = apply(cls.getName) + def apply[C: ClassTag](): LazyLogger = apply(classTag[C].runtimeClass.getName) +}
\ No newline at end of file |