From 7ab87deaa91bd19527da2c72937b1e2d1a97ef12 Mon Sep 17 00:00:00 2001 From: Jakob Odersky Date: Tue, 11 Oct 2016 01:08:14 -0700 Subject: Initial commit --- .gitignore | 7 ++ build.sbt | 14 +++ project/build.properties | 1 + src/main/scala/gh/GitHub.scala | 31 ++++++ src/main/scala/gh/Http.scala | 114 ++++++++++++++++++++++ src/main/scala/gh/Issues.scala | 91 +++++++++++++++++ src/main/scala/gh/SnakifiedSprayJsonSupport.scala | 52 ++++++++++ 7 files changed, 310 insertions(+) create mode 100644 .gitignore create mode 100644 build.sbt create mode 100644 project/build.properties create mode 100644 src/main/scala/gh/GitHub.scala create mode 100644 src/main/scala/gh/Http.scala create mode 100644 src/main/scala/gh/Issues.scala create mode 100644 src/main/scala/gh/SnakifiedSprayJsonSupport.scala diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..5066109 --- /dev/null +++ b/.gitignore @@ -0,0 +1,7 @@ +target/ + +*.class +*~ + +.ensime +.ensime_cache/ diff --git a/build.sbt b/build.sbt new file mode 100644 index 0000000..08620fd --- /dev/null +++ b/build.sbt @@ -0,0 +1,14 @@ +name := "akka-github" + +scalaVersion := "2.11.8" + +val AkkaVersion = "2.4.11" + +libraryDependencies ++= Seq( + "com.typesafe.akka" %% "akka-actor" % AkkaVersion, + "com.typesafe.akka" %% "akka-stream" % AkkaVersion, + "com.typesafe.akka" %% "akka-http-core" % AkkaVersion, + "com.typesafe.akka" %% "akka-http-testkit" % AkkaVersion, + "com.typesafe.akka" %% "akka-http-experimental" % AkkaVersion, + "io.spray" %% "spray-json" % "1.3.2" +) diff --git a/project/build.properties b/project/build.properties new file mode 100644 index 0000000..35c88ba --- /dev/null +++ b/project/build.properties @@ -0,0 +1 @@ +sbt.version=0.13.12 diff --git a/src/main/scala/gh/GitHub.scala b/src/main/scala/gh/GitHub.scala new file mode 100644 index 0000000..6cc9764 --- /dev/null +++ b/src/main/scala/gh/GitHub.scala @@ -0,0 +1,31 @@ +package gh + +import akka.actor.ActorSystem +import akka.stream.ActorMaterializer +import akka.stream.scaladsl.{ Keep, Sink } +import scala.concurrent.Await +import scala.concurrent.duration._ +import scala.util.control.NonFatal + + +object GitHub extends Issues with AkkaHttpClient { + + implicit val system = ActorSystem("gh") + implicit val materializer = ActorMaterializer() + + def client = new AkkaHttpClient("api.github.com") + +} + +object Main extends App { + import GitHub._ + + val done = issues.get("jodersky", "sbt-jni").toMat(Sink.foreach(println))(Keep.right).run() + + try { + println(Await.result(done, 10.seconds)) + } finally { + GitHub.system.shutdown() + } + +} diff --git a/src/main/scala/gh/Http.scala b/src/main/scala/gh/Http.scala new file mode 100644 index 0000000..c72a24e --- /dev/null +++ b/src/main/scala/gh/Http.scala @@ -0,0 +1,114 @@ +package gh + +import akka.NotUsed +import akka.actor.ActorSystem +import akka.http.scaladsl.model.{ MediaTypes, ResponseEntity } +import akka.http.scaladsl.model.Uri.Query +import akka.http.scaladsl.model.headers.Accept +import akka.http.scaladsl.model.{ ContentType, RequestEntity, Uri } +import akka.http.scaladsl.model.Uri.Path +import akka.http.scaladsl.model.headers.{ Link, LinkValue } +import akka.http.scaladsl.model.headers.LinkParams +import akka.http.scaladsl.model.{ HttpRequest, HttpResponse, HttpHeader} +import akka.http.scaladsl.unmarshalling.{ Unmarshal, Unmarshaller } +import akka.http.scaladsl.unmarshalling.FromResponseUnmarshaller +import akka.stream.{ BidiShape, Materializer } +import akka.stream.scaladsl.{ BidiFlow, Broadcast, Flow, GraphDSL, Merge, Sink, Source } +import akka.http.scaladsl.{HttpExt, Http} +import scala.concurrent.ExecutionContext +import spray.json.{ JsonParser, JsonReader } +import spray.json.JsonReader + +trait HttpClient { + + def client: HttpClient + + trait HttpClient { + def get[A: JsonReader](path: String): Source[A, NotUsed] + } + +} + +trait AkkaHttpClient extends HttpClient { + + class AkkaHttpClient(host: String)(implicit system: ActorSystem, materializer: Materializer) extends HttpClient { + + val http = Http(system) + + def get[A: JsonReader](path: String): Source[A, NotUsed] = { + implicit val ec = materializer.executionContext + + val uri = Uri() withScheme("https") withHost(host) withPath(Path(path)) + val request = HttpRequest(uri = uri) withHeaders(Accept(MediaTypes.`application/json`)) + + println(request) + val um = implicitly[FromResponseUnmarshaller[String]].map(text => { + val v = JsonParser(text) + println(v.prettyPrint) + v.convertTo[A] + }) + + Source.single(request) + .via(paginate(100).join(http.outgoingConnectionHttps(host))) + .mapAsync(10)(um.apply) + } + + + def paginate(itemsPerPage: Int): + BidiFlow[HttpRequest, HttpRequest, HttpResponse, HttpResponse, NotUsed] = { + BidiFlow.fromGraph(GraphDSL.create() { implicit builder => + import GraphDSL.Implicits._ + + val initialRequest = builder.add(Flow[HttpRequest].take(1)) + + // set number of items per page + val perPage = builder.add(Flow[HttpRequest].map{ req => + val uri = req.uri withQuery Query("per_page" -> itemsPerPage.toString) + req withUri uri + }) + + // get next request from a paginated response + val nextPage = builder.add(Flow[HttpResponse].takeWhile{ + case NextPage(_) => true + case _ => false + }.map{ + case NextPage(next) => next + }) + + val merge = builder.add(Merge[HttpRequest](2)) + val split = builder.add(Broadcast[HttpResponse](2)) + + initialRequest ~> perPage ~> merge + split.out(0) ~> nextPage ~> merge + + BidiShape(initialRequest.in, merge.out, split.in, split.out(1)) + }) + } + + def unmarshal[A](implicit u: Unmarshaller[ResponseEntity, A], ec: ExecutionContext): Flow[HttpResponse, A, NotUsed] = { + Flow[HttpResponse].mapAsync(10){ response => + if (response.status.isSuccess()) { + Unmarshal(response.entity).to[A] + } else { + throw new RuntimeException("Unsuccessful response status code: " + response.status.value) + } + } + } + + // Extractor for responses containing a 'Link' header to a next page, i.e. + // Link: ; rel="next" + private object NextPage { + def unapply(response: HttpResponse): Option[(HttpRequest)] = { + val next = for ( + Link(values) <- response.header[Link].toSeq; + LinkValue(nextUri, Seq(LinkParams.rel("next"))) <- values + ) yield { + HttpRequest(uri = nextUri) + } + next.headOption + } + } + + } + +} diff --git a/src/main/scala/gh/Issues.scala b/src/main/scala/gh/Issues.scala new file mode 100644 index 0000000..6187d2d --- /dev/null +++ b/src/main/scala/gh/Issues.scala @@ -0,0 +1,91 @@ +package gh + +import akka.NotUsed +import akka.stream.scaladsl.Source +import scala.concurrent.Future +import spray.json.{ DefaultJsonProtocol, JsonFormat } + +trait Issues { self: HttpClient => + import self._ + case class Issue( + id: Int, + number: Int, + state: String, + title: String, + body: String, + user: User, + labels: Seq[Label], + assignee: Option[User], + milestone: Option[Milestone], + locked: Boolean, + comments: Int, + closedAt: Option[String], + createdAt: String, + updatedAt: String + ) + + case class User( + login: String, + id: Int, + avatarUrl: String, + gravatarId: String, + `type`: String, + siteAdmin: Boolean + ) + + case class Label( + name: String, + color: String + ) + + case class Milestone( + id: Int, + state: String, + title: String, + description: String, + creator: User, + openIssues: Int, + closedIssues: Int, + createdAt: String, + updatedAt: String, + closedAt: String, + dueOn: String + ) + + case class Comment( + id: Int, + body: String, + user: User, + createdAt: String, + updatedAt: String + ) + + private object JsonSupport extends DefaultJsonProtocol with SnakifiedSprayJsonSupport { + implicit val userFormat = jsonFormat6(User) + implicit val commentFormat = jsonFormat5(Comment) + implicit val milestoneFormat = jsonFormat11(Milestone) + implicit val labelFormat = jsonFormat2(Label) + implicit val itemFormat = jsonFormat14(Issue) + } + import JsonSupport._ + + object issues { + def get( + owner: String, + repo: String, + milestone: String = "*", + state: String = "all", + assignee: String = "*", + creator: String = "*", + mentioned: String = "*", + labels: String = "*", + sort: String = "created", + direction: String = "desc", + since: String = "1970-01-01T00:00:00" + ): Source[Issue, NotUsed] = { + client.get[collection.immutable.Seq[Issue]]("/repos/" + owner + "/" + repo + "/issues").mapConcat(x => x) + } + + } + +} diff --git a/src/main/scala/gh/SnakifiedSprayJsonSupport.scala b/src/main/scala/gh/SnakifiedSprayJsonSupport.scala new file mode 100644 index 0000000..529b098 --- /dev/null +++ b/src/main/scala/gh/SnakifiedSprayJsonSupport.scala @@ -0,0 +1,52 @@ +/** + * The MIT License (MIT) + * + * Copyright (c) 2013-2015 Andrew Snare, Age Mooij + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ +package gh +import spray.json._ + +/** + * A custom version of the Spray DefaultJsonProtocol with a modified field naming strategy + */ +trait SnakifiedSprayJsonSupport extends DefaultJsonProtocol { + import reflect._ + + /** + * This is the most important piece of code in this object! + * It overrides the default naming scheme used by spray-json and replaces it with a scheme that turns camelcased + * names into snakified names (i.e. using underscores as word separators). + */ + override protected def extractFieldNames(classTag: ClassTag[_]) = { + import java.util.Locale + + def snakify(name: String) = { + val pass1 = """([A-Z]+)([A-Z][a-z])""".r.replaceAllIn(name, "$1_$2") + val pass2 = """([a-z\d])([A-Z])""".r.replaceAllIn(pass1, "$1_$2") + + pass2.toLowerCase(Locale.US) + } + + super.extractFieldNames(classTag).map{ snakify(_) } + } +} + +object SnakifiedSprayJsonSupport extends SnakifiedSprayJsonSupport -- cgit v1.2.3