summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJakob Odersky <jakob@odersky.com>2016-10-11 01:08:14 -0700
committerJakob Odersky <jakob@odersky.com>2016-10-11 01:08:14 -0700
commit7ab87deaa91bd19527da2c72937b1e2d1a97ef12 (patch)
tree5846934da9697498eb7c93f5c6679739870ae5c8
downloadghstream-7ab87deaa91bd19527da2c72937b1e2d1a97ef12.tar.gz
ghstream-7ab87deaa91bd19527da2c72937b1e2d1a97ef12.tar.bz2
ghstream-7ab87deaa91bd19527da2c72937b1e2d1a97ef12.zip
Initial commitHEADmaster
-rw-r--r--.gitignore7
-rw-r--r--build.sbt14
-rw-r--r--project/build.properties1
-rw-r--r--src/main/scala/gh/GitHub.scala31
-rw-r--r--src/main/scala/gh/Http.scala114
-rw-r--r--src/main/scala/gh/Issues.scala91
-rw-r--r--src/main/scala/gh/SnakifiedSprayJsonSupport.scala52
7 files changed, 310 insertions, 0 deletions
diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..5066109
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,7 @@
+target/
+
+*.class
+*~
+
+.ensime
+.ensime_cache/
diff --git a/build.sbt b/build.sbt
new file mode 100644
index 0000000..08620fd
--- /dev/null
+++ b/build.sbt
@@ -0,0 +1,14 @@
+name := "akka-github"
+
+scalaVersion := "2.11.8"
+
+val AkkaVersion = "2.4.11"
+
+libraryDependencies ++= Seq(
+ "com.typesafe.akka" %% "akka-actor" % AkkaVersion,
+ "com.typesafe.akka" %% "akka-stream" % AkkaVersion,
+ "com.typesafe.akka" %% "akka-http-core" % AkkaVersion,
+ "com.typesafe.akka" %% "akka-http-testkit" % AkkaVersion,
+ "com.typesafe.akka" %% "akka-http-experimental" % AkkaVersion,
+ "io.spray" %% "spray-json" % "1.3.2"
+)
diff --git a/project/build.properties b/project/build.properties
new file mode 100644
index 0000000..35c88ba
--- /dev/null
+++ b/project/build.properties
@@ -0,0 +1 @@
+sbt.version=0.13.12
diff --git a/src/main/scala/gh/GitHub.scala b/src/main/scala/gh/GitHub.scala
new file mode 100644
index 0000000..6cc9764
--- /dev/null
+++ b/src/main/scala/gh/GitHub.scala
@@ -0,0 +1,31 @@
+package gh
+
+import akka.actor.ActorSystem
+import akka.stream.ActorMaterializer
+import akka.stream.scaladsl.{ Keep, Sink }
+import scala.concurrent.Await
+import scala.concurrent.duration._
+import scala.util.control.NonFatal
+
+
+object GitHub extends Issues with AkkaHttpClient {
+
+ implicit val system = ActorSystem("gh")
+ implicit val materializer = ActorMaterializer()
+
+ def client = new AkkaHttpClient("api.github.com")
+
+}
+
+object Main extends App {
+ import GitHub._
+
+ val done = issues.get("jodersky", "sbt-jni").toMat(Sink.foreach(println))(Keep.right).run()
+
+ try {
+ println(Await.result(done, 10.seconds))
+ } finally {
+ GitHub.system.shutdown()
+ }
+
+}
diff --git a/src/main/scala/gh/Http.scala b/src/main/scala/gh/Http.scala
new file mode 100644
index 0000000..c72a24e
--- /dev/null
+++ b/src/main/scala/gh/Http.scala
@@ -0,0 +1,114 @@
+package gh
+
+import akka.NotUsed
+import akka.actor.ActorSystem
+import akka.http.scaladsl.model.{ MediaTypes, ResponseEntity }
+import akka.http.scaladsl.model.Uri.Query
+import akka.http.scaladsl.model.headers.Accept
+import akka.http.scaladsl.model.{ ContentType, RequestEntity, Uri }
+import akka.http.scaladsl.model.Uri.Path
+import akka.http.scaladsl.model.headers.{ Link, LinkValue }
+import akka.http.scaladsl.model.headers.LinkParams
+import akka.http.scaladsl.model.{ HttpRequest, HttpResponse, HttpHeader}
+import akka.http.scaladsl.unmarshalling.{ Unmarshal, Unmarshaller }
+import akka.http.scaladsl.unmarshalling.FromResponseUnmarshaller
+import akka.stream.{ BidiShape, Materializer }
+import akka.stream.scaladsl.{ BidiFlow, Broadcast, Flow, GraphDSL, Merge, Sink, Source }
+import akka.http.scaladsl.{HttpExt, Http}
+import scala.concurrent.ExecutionContext
+import spray.json.{ JsonParser, JsonReader }
+import spray.json.JsonReader
+
+trait HttpClient {
+
+ def client: HttpClient
+
+ trait HttpClient {
+ def get[A: JsonReader](path: String): Source[A, NotUsed]
+ }
+
+}
+
+trait AkkaHttpClient extends HttpClient {
+
+ class AkkaHttpClient(host: String)(implicit system: ActorSystem, materializer: Materializer) extends HttpClient {
+
+ val http = Http(system)
+
+ def get[A: JsonReader](path: String): Source[A, NotUsed] = {
+ implicit val ec = materializer.executionContext
+
+ val uri = Uri() withScheme("https") withHost(host) withPath(Path(path))
+ val request = HttpRequest(uri = uri) withHeaders(Accept(MediaTypes.`application/json`))
+
+ println(request)
+ val um = implicitly[FromResponseUnmarshaller[String]].map(text => {
+ val v = JsonParser(text)
+ println(v.prettyPrint)
+ v.convertTo[A]
+ })
+
+ Source.single(request)
+ .via(paginate(100).join(http.outgoingConnectionHttps(host)))
+ .mapAsync(10)(um.apply)
+ }
+
+
+ def paginate(itemsPerPage: Int):
+ BidiFlow[HttpRequest, HttpRequest, HttpResponse, HttpResponse, NotUsed] = {
+ BidiFlow.fromGraph(GraphDSL.create() { implicit builder =>
+ import GraphDSL.Implicits._
+
+ val initialRequest = builder.add(Flow[HttpRequest].take(1))
+
+ // set number of items per page
+ val perPage = builder.add(Flow[HttpRequest].map{ req =>
+ val uri = req.uri withQuery Query("per_page" -> itemsPerPage.toString)
+ req withUri uri
+ })
+
+ // get next request from a paginated response
+ val nextPage = builder.add(Flow[HttpResponse].takeWhile{
+ case NextPage(_) => true
+ case _ => false
+ }.map{
+ case NextPage(next) => next
+ })
+
+ val merge = builder.add(Merge[HttpRequest](2))
+ val split = builder.add(Broadcast[HttpResponse](2))
+
+ initialRequest ~> perPage ~> merge
+ split.out(0) ~> nextPage ~> merge
+
+ BidiShape(initialRequest.in, merge.out, split.in, split.out(1))
+ })
+ }
+
+ def unmarshal[A](implicit u: Unmarshaller[ResponseEntity, A], ec: ExecutionContext): Flow[HttpResponse, A, NotUsed] = {
+ Flow[HttpResponse].mapAsync(10){ response =>
+ if (response.status.isSuccess()) {
+ Unmarshal(response.entity).to[A]
+ } else {
+ throw new RuntimeException("Unsuccessful response status code: " + response.status.value)
+ }
+ }
+ }
+
+ // Extractor for responses containing a 'Link' header to a next page, i.e.
+ // Link: <https://api.github.com/user/repos?page=3&per_page=100>; rel="next"
+ private object NextPage {
+ def unapply(response: HttpResponse): Option[(HttpRequest)] = {
+ val next = for (
+ Link(values) <- response.header[Link].toSeq;
+ LinkValue(nextUri, Seq(LinkParams.rel("next"))) <- values
+ ) yield {
+ HttpRequest(uri = nextUri)
+ }
+ next.headOption
+ }
+ }
+
+ }
+
+}
diff --git a/src/main/scala/gh/Issues.scala b/src/main/scala/gh/Issues.scala
new file mode 100644
index 0000000..6187d2d
--- /dev/null
+++ b/src/main/scala/gh/Issues.scala
@@ -0,0 +1,91 @@
+package gh
+
+import akka.NotUsed
+import akka.stream.scaladsl.Source
+import scala.concurrent.Future
+import spray.json.{ DefaultJsonProtocol, JsonFormat }
+
+trait Issues { self: HttpClient =>
+ import self._
+ case class Issue(
+ id: Int,
+ number: Int,
+ state: String,
+ title: String,
+ body: String,
+ user: User,
+ labels: Seq[Label],
+ assignee: Option[User],
+ milestone: Option[Milestone],
+ locked: Boolean,
+ comments: Int,
+ closedAt: Option[String],
+ createdAt: String,
+ updatedAt: String
+ )
+
+ case class User(
+ login: String,
+ id: Int,
+ avatarUrl: String,
+ gravatarId: String,
+ `type`: String,
+ siteAdmin: Boolean
+ )
+
+ case class Label(
+ name: String,
+ color: String
+ )
+
+ case class Milestone(
+ id: Int,
+ state: String,
+ title: String,
+ description: String,
+ creator: User,
+ openIssues: Int,
+ closedIssues: Int,
+ createdAt: String,
+ updatedAt: String,
+ closedAt: String,
+ dueOn: String
+ )
+
+ case class Comment(
+ id: Int,
+ body: String,
+ user: User,
+ createdAt: String,
+ updatedAt: String
+ )
+
+ private object JsonSupport extends DefaultJsonProtocol with SnakifiedSprayJsonSupport {
+ implicit val userFormat = jsonFormat6(User)
+ implicit val commentFormat = jsonFormat5(Comment)
+ implicit val milestoneFormat = jsonFormat11(Milestone)
+ implicit val labelFormat = jsonFormat2(Label)
+ implicit val itemFormat = jsonFormat14(Issue)
+ }
+ import JsonSupport._
+
+ object issues {
+ def get(
+ owner: String,
+ repo: String,
+ milestone: String = "*",
+ state: String = "all",
+ assignee: String = "*",
+ creator: String = "*",
+ mentioned: String = "*",
+ labels: String = "*",
+ sort: String = "created",
+ direction: String = "desc",
+ since: String = "1970-01-01T00:00:00"
+ ): Source[Issue, NotUsed] = {
+ client.get[collection.immutable.Seq[Issue]]("/repos/" + owner + "/" + repo + "/issues").mapConcat(x => x)
+ }
+
+ }
+
+}
diff --git a/src/main/scala/gh/SnakifiedSprayJsonSupport.scala b/src/main/scala/gh/SnakifiedSprayJsonSupport.scala
new file mode 100644
index 0000000..529b098
--- /dev/null
+++ b/src/main/scala/gh/SnakifiedSprayJsonSupport.scala
@@ -0,0 +1,52 @@
+/**
+ * The MIT License (MIT)
+ *
+ * Copyright (c) 2013-2015 Andrew Snare, Age Mooij
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+package gh
+import spray.json._
+
+/**
+ * A custom version of the Spray DefaultJsonProtocol with a modified field naming strategy
+ */
+trait SnakifiedSprayJsonSupport extends DefaultJsonProtocol {
+ import reflect._
+
+ /**
+ * This is the most important piece of code in this object!
+ * It overrides the default naming scheme used by spray-json and replaces it with a scheme that turns camelcased
+ * names into snakified names (i.e. using underscores as word separators).
+ */
+ override protected def extractFieldNames(classTag: ClassTag[_]) = {
+ import java.util.Locale
+
+ def snakify(name: String) = {
+ val pass1 = """([A-Z]+)([A-Z][a-z])""".r.replaceAllIn(name, "$1_$2")
+ val pass2 = """([a-z\d])([A-Z])""".r.replaceAllIn(pass1, "$1_$2")
+
+ pass2.toLowerCase(Locale.US)
+ }
+
+ super.extractFieldNames(classTag).map{ snakify(_) }
+ }
+}
+
+object SnakifiedSprayJsonSupport extends SnakifiedSprayJsonSupport