aboutsummaryrefslogtreecommitdiff
path: root/kamon-datadog/src/main/scala/kamon/datadog/DatadogMetricsSender.scala
blob: 0f67cc345ba64d1dc6172392a9a72fbe8c949191 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
/*
 * =========================================================================================
 * Copyright © 2013-2014 the kamon project <http://kamon.io/>
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
 * except in compliance with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software distributed under the
 * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
 * either express or implied. See the License for the specific language governing permissions
 * and limitations under the License.
 * =========================================================================================
 */

package kamon.datadog

import akka.actor.{ ActorSystem, Props, ActorRef, Actor }
import akka.io.{ Udp, IO }
import java.net.InetSocketAddress
import akka.util.ByteString
import kamon.metric.Subscriptions.TickMetricSnapshot
import java.text.{ DecimalFormatSymbols, DecimalFormat }
import kamon.metric.instrument.{ Counter, Histogram }
import kamon.metric.{ MetricIdentity, MetricGroupIdentity }
import java.util.Locale

/**
 * Actor that converts every `TickMetricSnapshot` it receives into text measurements and sends them to `remote`
 * over UDP.
 *
 * On construction it asks the UDP extension for a simple sender. Until `Udp.SimpleSenderReady` arrives the actor
 * only handles that message; afterwards it only handles `TickMetricSnapshot`.
 *
 * @param remote               address every packet is sent to.
 * @param maxPacketSizeInBytes size limit handed to the `MetricDataPacketBuilder` created for each tick.
 */
class DatadogMetricsSender(remote: InetSocketAddress, maxPacketSizeInBytes: Long) extends Actor with UdpExtensionProvider {

  // Provides the implicit ActorSystem required by `udpExtension`.
  import context.system

  /** First segment of every metric name, read from the `kamon.datadog.application-name` setting. */
  val appName: String = context.system.settings.config.getString("kamon.datadog.application-name")
  val symbols: DecimalFormatSymbols = DecimalFormatSymbols.getInstance(Locale.US)
  symbols.setDecimalSeparator('.') // Just in case there is some weird locale config we are not aware of.

  // Absurdly high number of decimal digits, let the other end lose precision if it needs to.
  val samplingRateFormat: DecimalFormat = new DecimalFormat("#.################################################################", symbols)

  // Request a simple UDP sender; the Udp.SimpleSenderReady reply is handled in `receive`.
  udpExtension ! Udp.SimpleSender

  /** Initial behavior: waits for the UDP sender and then switches to `ready`, using the replying actor as sender. */
  def receive: Receive = {
    case Udp.SimpleSenderReady =>
      context.become(ready(sender))
  }

  /**
   * Behavior once the UDP sender is available.
   *
   * @param udpSender actor that receives the `Udp.Send` commands.
   */
  def ready(udpSender: ActorRef): Receive = {
    case tick: TickMetricSnapshot => writeMetricsToRemote(tick, udpSender)
  }

  /**
   * Encodes every metric contained in `tick` and sends the result through `udpSender`.
   *
   * A histogram snapshot produces one measurement per record; a counter snapshot produces a single measurement.
   * A fresh packet builder is used for each tick and flushed at the end, so nothing is carried over between ticks.
   *
   * @param tick      snapshot whose metrics are reported.
   * @param udpSender actor that receives the `Udp.Send` commands.
   */
  def writeMetricsToRemote(tick: TickMetricSnapshot, udpSender: ActorRef): Unit = {
    val packetBuilder = new MetricDataPacketBuilder(maxPacketSizeInBytes, udpSender, remote)

    for {
      (groupIdentity, groupSnapshot) <- tick.metrics
      (metricIdentity, metricSnapshot) <- groupSnapshot.metrics
    } {

      val key = buildMetricName(groupIdentity, metricIdentity)

      metricSnapshot match {
        case hs: Histogram.Snapshot =>
          hs.recordsIterator.foreach { record =>
            val measurementData = formatMeasurement(groupIdentity, metricIdentity, encodeStatsDTimer(record.level, record.count))
            packetBuilder.appendMeasurement(key, measurementData)
          }

        case cs: Counter.Snapshot =>
          val measurementData = formatMeasurement(groupIdentity, metricIdentity, encodeStatsDCounter(cs.count))
          packetBuilder.appendMeasurement(key, measurementData)
      }
    }
    packetBuilder.flush()
  }

  /**
   * Appends the identification tag of the group to an already encoded measurement.
   *
   * @return `measurementData` followed by the result of `buildIdentificationTag`.
   */
  def formatMeasurement(groupIdentity: MetricGroupIdentity, metricIdentity: MetricIdentity, measurementData: String): String =
    measurementData + buildIdentificationTag(groupIdentity, metricIdentity)

  /**
   * Encodes a histogram record as `<level>|ms`, followed by `|@<rate>` where the rate is `1 / count`.
   * The rate suffix is omitted when it equals 1, i.e. when `count` is 1.
   *
   * @param level recorded value.
   * @param count number of times `level` was recorded; presumably always positive, since a count of 0 would
   *              make the rate infinite.
   */
  def encodeStatsDTimer(level: Long, count: Long): String = {
    val samplingRate: Double = 1D / count
    level.toString + "|ms" + (if (samplingRate != 1D) "|@" + samplingRateFormat.format(samplingRate) else "")
  }

  /** Encodes a counter value as `<count>|c`. */
  def encodeStatsDCounter(count: Long): String = count.toString + "|c"

  /** Builds the metric name as `<appName>.<group category name>.<metric name>`. */
  def buildMetricName(groupIdentity: MetricGroupIdentity, metricIdentity: MetricIdentity): String =
    s"$appName.${groupIdentity.category.name}.${metricIdentity.name}"

  /**
   * Builds the tag suffix `|#<group category name>:<group name>`.
   *
   * `metricIdentity` is accepted but does not take part in the tag.
   */
  def buildIdentificationTag(groupIdentity: MetricGroupIdentity, metricIdentity: MetricIdentity): String = {
    // Make the automatic HTTP trace names a bit more friendly
    val normalizedEntityName = groupIdentity.name.replace(": ", ":")
    s"|#${groupIdentity.category.name}:${normalizedEntityName}"
  }
}

/** Factory for the `Props` used to create [[DatadogMetricsSender]] actors. */
object DatadogMetricsSender {

  /**
   * Builds the `Props` that instantiate a `DatadogMetricsSender`.
   *
   * @param remote        address the sender will send its packets to.
   * @param maxPacketSize forwarded to the actor as its `maxPacketSizeInBytes` constructor argument.
   */
  def props(remote: InetSocketAddress, maxPacketSize: Long): Props = {
    Props(new DatadogMetricsSender(remote = remote, maxPacketSizeInBytes = maxPacketSize))
  }
}

/** Mix-in exposing the actor obtained from `IO(Udp)`, kept behind a method so implementations can override it. */
trait UdpExtensionProvider {

  /**
   * Looks up the `Udp` I/O extension for the given actor system.
   *
   * @param system actor system, taken implicitly, that the lookup is performed against.
   * @return the `ActorRef` produced by `IO(Udp)`.
   */
  def udpExtension(implicit system: ActorSystem): ActorRef = {
    val udpManager = IO(Udp)(system)
    udpManager
  }
}

/**
 * Groups measurement lines into packets and hands each packet to `udpSender` as a `Udp.Send` addressed to `remote`.
 *
 * Lines are joined with `metricSeparator`. A new packet is started whenever adding the next line would take the
 * current one past `maxPacketSizeInBytes`. Sizes are counted in UTF-8 bytes, the same charset used to encode the
 * payload, so names with non-ASCII characters are accounted for correctly. A single line that is larger than the
 * limit on its own is still sent, alone in its own packet. Empty packets are never sent.
 *
 * @param maxPacketSizeInBytes maximum payload size of a packet holding more than one line.
 * @param udpSender            actor that receives the `Udp.Send` commands.
 * @param remote               destination address of every packet.
 */
class MetricDataPacketBuilder(maxPacketSizeInBytes: Long, udpSender: ActorRef, remote: InetSocketAddress) {
  val metricSeparator = "\n"
  val measurementSeparator = ":"
  // Not read or written anywhere in this class; kept so the public members stay the same.
  var lastKey = ""
  var buffer = new StringBuilder()

  // Charset used both for measuring sizes and for encoding the payload, so the two can never disagree.
  private val payloadCharset = "UTF-8"

  // Encoded size of the text currently held in `buffer`. Updated on every append/clear performed by this class;
  // it is not aware of changes made to `buffer` from the outside.
  private var bufferSizeInBytes = 0L

  /**
   * Adds the line `<key>:<measurementData>` to the current packet, sending the current packet first when the line
   * does not fit in it.
   */
  def appendMeasurement(key: String, measurementData: String): Unit = {
    val data = key + measurementSeparator + measurementData

    if (buffer.isEmpty) {
      // Nothing to send yet: the line simply becomes the first one of the packet, even if it is oversized.
      appendToBuffer(data)
    } else if (fitsOnBuffer(metricSeparator + data)) {
      appendToBuffer(metricSeparator + data)
    } else {
      flush()
      appendToBuffer(data)
    }
  }

  /** Tells whether `data` can be added to the current packet without exceeding `maxPacketSizeInBytes`. */
  def fitsOnBuffer(data: String): Boolean = (bufferSizeInBytes + encodedSize(data)) <= maxPacketSizeInBytes

  /** Sends the pending packet, if there is one, and starts a new empty packet. */
  def flush(): Unit =
    if (buffer.nonEmpty) {
      flushToUDP(buffer.toString)
      buffer.clear()
      bufferSizeInBytes = 0L
    }

  private def appendToBuffer(text: String): Unit = {
    buffer.append(text)
    bufferSizeInBytes += encodedSize(text)
  }

  private def encodedSize(text: String): Long = text.getBytes(payloadCharset).length.toLong

  private def flushToUDP(data: String): Unit = udpSender ! Udp.Send(ByteString(data, payloadCharset), remote)
}