aboutsummaryrefslogtreecommitdiff
path: root/kamon-play/src/main/scala/kamon/play/instrumentation/WSInstrumentation.scala
blob: 2fedcd707343c36e451c611811fead6984c3d79e (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
package kamon.play.instrumentation

import javax.net.ssl.SSLContext
import org.aspectj.lang.annotation.{ Around, Pointcut, Aspect }
import org.aspectj.lang.ProceedingJoinPoint
import com.ning.http.client._
import com.ning.http.client.filter.{ RequestFilter, FilterContext }
import kamon.trace.{ SegmentCompletionHandle, TraceRecorder }
import kamon.metrics.TraceMetrics.HttpClientRequest

@Aspect
class WSInstrumentation {

  @Pointcut("call(* play.api.libs.ws.WS$.newClient(..))")
  def onNewAsyncHttpClient(): Unit = {}

  @Around("onNewAsyncHttpClient()")
  def aroundNewAsyncHttpClient(pjp: ProceedingJoinPoint): Any = {
    val playConfig = play.api.Play.maybeApplication.map(_.configuration)
    val wsTimeout = playConfig.flatMap(_.getMilliseconds("ws.timeout"))
    val asyncHttpConfig = new AsyncHttpClientConfig.Builder()
      .setConnectionTimeoutInMs(playConfig.flatMap(_.getMilliseconds("ws.timeout.connection")).orElse(wsTimeout).getOrElse(120000L).toInt)
      .setIdleConnectionTimeoutInMs(playConfig.flatMap(_.getMilliseconds("ws.timeout.idle")).orElse(wsTimeout).getOrElse(120000L).toInt)
      .setRequestTimeoutInMs(playConfig.flatMap(_.getMilliseconds("ws.timeout.request")).getOrElse(120000L).toInt)
      .setFollowRedirects(playConfig.flatMap(_.getBoolean("ws.followRedirects")).getOrElse(true))
      .setUseProxyProperties(playConfig.flatMap(_.getBoolean("ws.useProxyProperties")).getOrElse(true))

    playConfig.flatMap(_.getString("ws.useragent")).map { useragent 
      asyncHttpConfig.setUserAgent(useragent)
    }
    if (!playConfig.flatMap(_.getBoolean("ws.acceptAnyCertificate")).getOrElse(false)) {
      asyncHttpConfig.setSSLContext(SSLContext.getDefault)
    }

    asyncHttpConfig.addRequestFilter(new KamonRequestFilter())

    new AsyncHttpClient(asyncHttpConfig.build())
  }
}

class KamonRequestFilter extends RequestFilter {
  import KamonRequestFilter._

  override def filter(ctx: FilterContext[_]): FilterContext[_] = {
    val completionHandle = TraceRecorder.startSegment(HttpClientRequest(ctx.getRequest.getRawURI.toString(), UserTime), basicRequestAttributes(ctx.getRequest()))
    new FilterContext.FilterContextBuilder(ctx).asyncHandler(new AsyncHandlerWrapper[Response](ctx.getAsyncHandler(), completionHandle)).build()
  }

  class AsyncHandlerWrapper[T](asyncHandler: AsyncHandler[_], completionHandle: Option[SegmentCompletionHandle]) extends AsyncCompletionHandler[T] {
    override def onCompleted(response: Response): T = {
      completionHandle.map(_.finish(Map.empty))
      asyncHandler.onCompleted().asInstanceOf[T]
    }
    override def onThrowable(t: Throwable) = {
      asyncHandler.onThrowable(t)
    }
  }
}

object KamonRequestFilter {
  val UserTime = "UserTime"

  def basicRequestAttributes(request: Request): Map[String, String] = {
    Map[String, String](
      "host" -> request.getHeaders().getFirstValue("host"),
      "path" -> request.getURI.getPath,
      "method" -> request.getMethod)
  }
}