summaryrefslogtreecommitdiff
path: root/src/main/scala/gh/Http.scala
blob: c72a24e43edd532d3287bec9403e13687e2698b2 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
package gh

import akka.NotUsed
import akka.actor.ActorSystem
import akka.http.scaladsl.model.{ MediaTypes, ResponseEntity }
import akka.http.scaladsl.model.Uri.Query
import akka.http.scaladsl.model.headers.Accept
import akka.http.scaladsl.model.{ ContentType, RequestEntity, Uri }
import akka.http.scaladsl.model.Uri.Path
import akka.http.scaladsl.model.headers.{ Link, LinkValue }
import akka.http.scaladsl.model.headers.LinkParams
import akka.http.scaladsl.model.{ HttpRequest, HttpResponse, HttpHeader}
import akka.http.scaladsl.unmarshalling.{ Unmarshal, Unmarshaller }
import akka.http.scaladsl.unmarshalling.FromResponseUnmarshaller
import akka.stream.{ BidiShape, Materializer }
import akka.stream.scaladsl.{ BidiFlow, Broadcast, Flow, GraphDSL, Merge, Sink, Source }
import akka.http.scaladsl.{HttpExt, Http}
import scala.concurrent.ExecutionContext
import spray.json.{ JsonParser, JsonReader }
import spray.json.JsonReader

trait HttpClient {

  def client: HttpClient

  trait HttpClient {
    def get[A: JsonReader](path: String): Source[A, NotUsed]
  }

}

trait AkkaHttpClient extends HttpClient {

  class AkkaHttpClient(host: String)(implicit system: ActorSystem, materializer: Materializer) extends HttpClient {
    
    val http = Http(system)

    def get[A: JsonReader](path: String): Source[A, NotUsed] = {
      implicit val ec = materializer.executionContext

      val uri = Uri() withScheme("https") withHost(host) withPath(Path(path))
      val request = HttpRequest(uri = uri) withHeaders(Accept(MediaTypes.`application/json`))

      println(request)
      val um = implicitly[FromResponseUnmarshaller[String]].map(text => {
        val v = JsonParser(text)
        println(v.prettyPrint)
        v.convertTo[A]
      })

      Source.single(request)
        .via(paginate(100).join(http.outgoingConnectionHttps(host)))
        .mapAsync(10)(um.apply)
    }


    def paginate(itemsPerPage: Int):
        BidiFlow[HttpRequest, HttpRequest, HttpResponse, HttpResponse, NotUsed] = {
      BidiFlow.fromGraph(GraphDSL.create() { implicit builder =>
        import GraphDSL.Implicits._

        val initialRequest = builder.add(Flow[HttpRequest].take(1))

        // set number of items per page
        val perPage = builder.add(Flow[HttpRequest].map{ req =>
          val uri = req.uri withQuery Query("per_page" -> itemsPerPage.toString)
          req withUri uri
        })

        // get next request from a paginated response
        val nextPage = builder.add(Flow[HttpResponse].takeWhile{
          case NextPage(_) => true
          case _ => false
        }.map{
          case NextPage(next) => next
        })

        val merge = builder.add(Merge[HttpRequest](2))
        val split = builder.add(Broadcast[HttpResponse](2))

        initialRequest ~> perPage ~> merge
        split.out(0) ~> nextPage ~> merge

        BidiShape(initialRequest.in, merge.out, split.in, split.out(1))
      })
    }

    def unmarshal[A](implicit u: Unmarshaller[ResponseEntity, A], ec: ExecutionContext): Flow[HttpResponse, A, NotUsed] = {
      Flow[HttpResponse].mapAsync(10){ response =>
        if (response.status.isSuccess()) {
          Unmarshal(response.entity).to[A]
        } else {
          throw new RuntimeException("Unsuccessful response status code: " + response.status.value)
        }
      }
    }

    // Extractor for responses containing a 'Link' header to a next page, i.e.
    // Link: <https://api.github.com/user/repos?page=3&per_page=100>; rel="next"
    private object NextPage {
      def unapply(response: HttpResponse): Option[(HttpRequest)] = {
        val next = for (
          Link(values) <- response.header[Link].toSeq;
          LinkValue(nextUri, Seq(LinkParams.rel("next"))) <- values
        ) yield {
          HttpRequest(uri = nextUri)
        }
        next.headOption
      }
    }

  }

}