From 4b672ab5fc91b2b3c13e23f130f810b79ab7c928 Mon Sep 17 00:00:00 2001 From: Jakob Odersky Date: Mon, 12 Nov 2018 14:08:15 -0800 Subject: Initial commit --- .gitignore | 1 + build.sbt | 16 +++++++ example/src/main/scala/example/main.scala | 30 +++++++++++++ project/build.properties | 1 + project/plugins.sbt | 1 + src/main/scala/escale/api.scala | 73 +++++++++++++++++++++++++++++++ 6 files changed, 122 insertions(+) create mode 100644 .gitignore create mode 100644 build.sbt create mode 100644 example/src/main/scala/example/main.scala create mode 100644 project/build.properties create mode 100644 project/plugins.sbt create mode 100644 src/main/scala/escale/api.scala diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..9f97022 --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +target/ \ No newline at end of file diff --git a/build.sbt b/build.sbt new file mode 100644 index 0000000..b7d8353 --- /dev/null +++ b/build.sbt @@ -0,0 +1,16 @@ +lazy val escale = project + .in(file(".")) + .settings( + scalaVersion := "2.12.7", + libraryDependencies ++= Seq( + "org.scala-lang" % "scala-reflect" % scalaVersion.value, + "org.scala-lang.modules" %% "scala-async" % "0.9.7" + ) + ) + +lazy val example = project + .in(file("example")) + .dependsOn(escale) + .settings( + scalaVersion := "2.12.7" + ) diff --git a/example/src/main/scala/example/main.scala b/example/src/main/scala/example/main.scala new file mode 100644 index 0000000..f6d0a48 --- /dev/null +++ b/example/src/main/scala/example/main.scala @@ -0,0 +1,30 @@ +package example + +import escale.Channel +import scala.async.Async._ +import scala.concurrent.Await +import scala.concurrent.ExecutionContext.Implicits.global +import scala.concurrent.duration._ + +object Main extends App { + + val ch = Channel[Int](0) + + val p1 = async { + await(ch.put(1)) + await(ch.put(2)) + await(ch.put(2)) + await(ch.put(5)) + } + + val p2 = async { + await(ch.take()) + await(ch.take()) + await(ch.take()) + await(ch.take()) + } + + val result = Await.result(p2, 3.seconds) + println(result) + +} diff --git a/project/build.properties b/project/build.properties new file mode 100644 index 0000000..7c58a83 --- /dev/null +++ b/project/build.properties @@ -0,0 +1 @@ +sbt.version=1.2.6 diff --git a/project/plugins.sbt b/project/plugins.sbt new file mode 100644 index 0000000..f86e373 --- /dev/null +++ b/project/plugins.sbt @@ -0,0 +1 @@ +addSbtPlugin("com.geirsson" % "sbt-scalafmt" % "1.5.1") diff --git a/src/main/scala/escale/api.scala b/src/main/scala/escale/api.scala new file mode 100644 index 0000000..57cb31d --- /dev/null +++ b/src/main/scala/escale/api.scala @@ -0,0 +1,73 @@ +package escale + +import scala.collection.mutable +import scala.concurrent.{Future, Promise} + +class Channel[A](capacity: Int) { + require(capacity >= 0, "capacity must be >= 0") + import Channel._ + + private val puts = mutable.Queue.empty[(Handler[Unit], A)] + private val takes = mutable.Queue.empty[Handler[A]] + + private val buffer = mutable.Queue.empty[A] + + def put(value: A): Future[Unit] = synchronized { + val handler = new Handler[Unit] + + if (takes.size > 0) { + val th = takes.dequeue() + th.promise.success(value) + handler.promise.success(()) + } else if (buffer.size < capacity) { + buffer.enqueue(value) + handler.promise.success(()) + } else { + if (puts.size >= MaxOps) { + handler.promise.failure( + new IllegalArgumentException("Too many pending put operations.")) + } else { + puts.enqueue(handler -> value) + } + } + handler.promise.future + } + def take(): Future[A] = synchronized { + val handler = new Handler[A] + + if (puts.size > 0) { + val (ph, pd) = puts.dequeue() + val data = if (capacity == 0) { + pd + } else { + val d = buffer.dequeue() + buffer.enqueue(pd) + d + } + ph.promise.success(()) + handler.promise.success(data) + } else if (buffer.isEmpty) { + if (takes.size >= MaxOps) { + handler.promise.failure( + new IllegalArgumentException("Too many pending take operations.")) + } else { + takes.enqueue(handler) + } + } else { + handler.promise.success(buffer.dequeue()) + } + handler.promise.future + } + +} + +object Channel { + final val MaxOps = 2 + + def apply[A](capacity: Int): Channel[A] = new Channel[A](capacity) + +} + +class Handler[A] { + val promise = Promise[A] +} -- cgit v1.2.3