blob: 2fedcd707343c36e451c611811fead6984c3d79e (
plain) (
blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
|
package kamon.play.instrumentation
import javax.net.ssl.SSLContext
import org.aspectj.lang.annotation.{ Around, Pointcut, Aspect }
import org.aspectj.lang.ProceedingJoinPoint
import com.ning.http.client._
import com.ning.http.client.filter.{ RequestFilter, FilterContext }
import kamon.trace.{ SegmentCompletionHandle, TraceRecorder }
import kamon.metrics.TraceMetrics.HttpClientRequest
/**
 * AspectJ aspect that intercepts calls to Play's `WS.newClient` and returns an
 * `AsyncHttpClient` configured from the Play `ws.*` settings with a
 * [[KamonRequestFilter]] registered as a request filter.
 */
@Aspect
class WSInstrumentation {

  /** Pointcut matching any call to `play.api.libs.ws.WS$.newClient(..)`. */
  @Pointcut("call(* play.api.libs.ws.WS$.newClient(..))")
  def onNewAsyncHttpClient(): Unit = {}

  /**
   * Around advice for [[onNewAsyncHttpClient]].
   *
   * The intercepted call is never proceeded; a client is built here instead.
   *
   * @param pjp the intercepted join point (unused, `proceed` is not invoked)
   * @return a new `AsyncHttpClient` whose configuration includes a [[KamonRequestFilter]]
   */
  @Around("onNewAsyncHttpClient()")
  def aroundNewAsyncHttpClient(pjp: ProceedingJoinPoint): Any = {
    // Configuration of the running Play application, if there is one.
    val configuration = play.api.Play.maybeApplication.map(_.configuration)

    def millis(key: String) = configuration.flatMap(_.getMilliseconds(key))
    def flag(key: String) = configuration.flatMap(_.getBoolean(key))

    // Used whenever neither the specific key nor its fallback is configured.
    val defaultTimeout = 120000L
    // Shared fallback for the connection and idle timeouts (not for the request timeout).
    val sharedTimeout = millis("ws.timeout")

    val builder = new AsyncHttpClientConfig.Builder()
    builder.setConnectionTimeoutInMs(millis("ws.timeout.connection").orElse(sharedTimeout).getOrElse(defaultTimeout).toInt)
    builder.setIdleConnectionTimeoutInMs(millis("ws.timeout.idle").orElse(sharedTimeout).getOrElse(defaultTimeout).toInt)
    builder.setRequestTimeoutInMs(millis("ws.timeout.request").getOrElse(defaultTimeout).toInt)
    builder.setFollowRedirects(flag("ws.followRedirects").getOrElse(true))
    builder.setUseProxyProperties(flag("ws.useProxyProperties").getOrElse(true))

    // The user agent is only set when explicitly configured.
    configuration.flatMap(_.getString("ws.useragent")).foreach { agent ⇒
      builder.setUserAgent(agent)
    }

    // The default SSL context is installed unless "ws.acceptAnyCertificate" is true.
    val acceptAnyCertificate = flag("ws.acceptAnyCertificate").getOrElse(false)
    if (!acceptAnyCertificate) {
      builder.setSSLContext(SSLContext.getDefault)
    }

    builder.addRequestFilter(new KamonRequestFilter())
    new AsyncHttpClient(builder.build())
  }
}
/**
 * Request filter that starts a Kamon trace segment for every outgoing request and wraps
 * the request's async handler so that the segment is finished when the request ends.
 */
class KamonRequestFilter extends RequestFilter {
  import KamonRequestFilter._

  /**
   * Starts a segment for the request carried by `ctx` and returns a copy of the context
   * whose async handler is wrapped in an [[AsyncHandlerWrapper]].
   *
   * @param ctx the filter context of the request about to be executed
   * @return a new filter context built from `ctx` with the wrapping handler installed
   */
  override def filter(ctx: FilterContext[_]): FilterContext[_] = {
    val completionHandle = TraceRecorder.startSegment(HttpClientRequest(ctx.getRequest.getRawURI.toString(), UserTime), basicRequestAttributes(ctx.getRequest()))
    new FilterContext.FilterContextBuilder(ctx).asyncHandler(new AsyncHandlerWrapper[Response](ctx.getAsyncHandler(), completionHandle)).build()
  }

  /**
   * Completion handler that finishes the segment (if any) and then delegates to the
   * wrapped handler.
   *
   * @param asyncHandler     the handler originally attached to the request
   * @param completionHandle handle of the segment started for the request, if one was started
   */
  class AsyncHandlerWrapper[T](asyncHandler: AsyncHandler[_], completionHandle: Option[SegmentCompletionHandle]) extends AsyncCompletionHandler[T] {

    // Ensures the segment is finished once even if both callbacks end up being invoked.
    private val segmentFinished = new java.util.concurrent.atomic.AtomicBoolean(false)

    private def finishSegment(): Unit =
      if (segmentFinished.compareAndSet(false, true)) completionHandle.foreach(_.finish(Map.empty))

    /**
     * Finishes the segment and hands the completed response to the wrapped handler.
     *
     * This wrapper does not forward status/headers/body events, so a wrapped
     * `AsyncCompletionHandler` has accumulated nothing on its own; it is therefore given
     * the response assembled here rather than asked to build one itself.
     */
    override def onCompleted(response: Response): T = {
      finishSegment()
      asyncHandler match {
        case completionHandler: AsyncCompletionHandler[_] ⇒
          completionHandler.onCompleted(response).asInstanceOf[T]
        case other ⇒
          // NOTE(review): a plain AsyncHandler has no response-taking callback and never
          // saw the intermediate events; confirm whether such handlers need full delegation.
          other.onCompleted().asInstanceOf[T]
      }
    }

    /** Finishes the segment for the failed request and forwards the error to the wrapped handler. */
    override def onThrowable(t: Throwable): Unit = {
      finishSegment()
      asyncHandler.onThrowable(t)
    }
  }
}
/** Constants and helpers shared by [[KamonRequestFilter]] instances. */
object KamonRequestFilter {

  /** Tag passed alongside the raw URI when a `HttpClientRequest` segment is started. */
  val UserTime = "UserTime"

  /**
   * Builds the attributes attached to the segment of an outgoing request.
   *
   * The "host" entry is taken from the request's `host` header; when that header is not
   * present the host of the request URI is used, and an empty string if neither is
   * available, so the map never carries a `null` value for "host".
   *
   * @param request the outgoing request
   * @return a map with the keys "host", "path" and "method"
   */
  def basicRequestAttributes(request: Request): Map[String, String] = {
    // Both lookups may yield null; Option(...) turns that into None.
    val host = Option(request.getHeaders().getFirstValue("host"))
      .orElse(Option(request.getURI.getHost))
      .getOrElse("")

    Map[String, String](
      "host" -> host,
      "path" -> request.getURI.getPath,
      "method" -> request.getMethod)
  }
}
|