summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJakob Odersky <jakob@odersky.com>2018-11-12 14:08:15 -0800
committerJakob Odersky <jakob@odersky.com>2018-11-12 14:08:15 -0800
commit4b672ab5fc91b2b3c13e23f130f810b79ab7c928 (patch)
treed9fec4768fb91256a863dba607d29f82b6c8115a
downloadescale-4b672ab5fc91b2b3c13e23f130f810b79ab7c928.tar.gz
escale-4b672ab5fc91b2b3c13e23f130f810b79ab7c928.tar.bz2
escale-4b672ab5fc91b2b3c13e23f130f810b79ab7c928.zip
Initial commit
-rw-r--r--.gitignore1
-rw-r--r--build.sbt16
-rw-r--r--example/src/main/scala/example/main.scala30
-rw-r--r--project/build.properties1
-rw-r--r--project/plugins.sbt1
-rw-r--r--src/main/scala/escale/api.scala73
6 files changed, 122 insertions, 0 deletions
diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..9f97022
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1 @@
+target/ \ No newline at end of file
diff --git a/build.sbt b/build.sbt
new file mode 100644
index 0000000..b7d8353
--- /dev/null
+++ b/build.sbt
@@ -0,0 +1,16 @@
+lazy val escale = project
+ .in(file("."))
+ .settings(
+ scalaVersion := "2.12.7",
+ libraryDependencies ++= Seq(
+ "org.scala-lang" % "scala-reflect" % scalaVersion.value,
+ "org.scala-lang.modules" %% "scala-async" % "0.9.7"
+ )
+ )
+
+lazy val example = project
+ .in(file("example"))
+ .dependsOn(escale)
+ .settings(
+ scalaVersion := "2.12.7"
+ )
diff --git a/example/src/main/scala/example/main.scala b/example/src/main/scala/example/main.scala
new file mode 100644
index 0000000..f6d0a48
--- /dev/null
+++ b/example/src/main/scala/example/main.scala
@@ -0,0 +1,30 @@
+package example
+
+import escale.Channel
+import scala.async.Async._
+import scala.concurrent.Await
+import scala.concurrent.ExecutionContext.Implicits.global
+import scala.concurrent.duration._
+
+object Main extends App {
+
+ val ch = Channel[Int](0)
+
+ val p1 = async {
+ await(ch.put(1))
+ await(ch.put(2))
+ await(ch.put(2))
+ await(ch.put(5))
+ }
+
+ val p2 = async {
+ await(ch.take())
+ await(ch.take())
+ await(ch.take())
+ await(ch.take())
+ }
+
+ val result = Await.result(p2, 3.seconds)
+ println(result)
+
+}
diff --git a/project/build.properties b/project/build.properties
new file mode 100644
index 0000000..7c58a83
--- /dev/null
+++ b/project/build.properties
@@ -0,0 +1 @@
+sbt.version=1.2.6
diff --git a/project/plugins.sbt b/project/plugins.sbt
new file mode 100644
index 0000000..f86e373
--- /dev/null
+++ b/project/plugins.sbt
@@ -0,0 +1 @@
+addSbtPlugin("com.geirsson" % "sbt-scalafmt" % "1.5.1")
diff --git a/src/main/scala/escale/api.scala b/src/main/scala/escale/api.scala
new file mode 100644
index 0000000..57cb31d
--- /dev/null
+++ b/src/main/scala/escale/api.scala
@@ -0,0 +1,73 @@
+package escale
+
+import scala.collection.mutable
+import scala.concurrent.{Future, Promise}
+
+class Channel[A](capacity: Int) {
+ require(capacity >= 0, "capacity must be >= 0")
+ import Channel._
+
+ private val puts = mutable.Queue.empty[(Handler[Unit], A)]
+ private val takes = mutable.Queue.empty[Handler[A]]
+
+ private val buffer = mutable.Queue.empty[A]
+
+ def put(value: A): Future[Unit] = synchronized {
+ val handler = new Handler[Unit]
+
+ if (takes.size > 0) {
+ val th = takes.dequeue()
+ th.promise.success(value)
+ handler.promise.success(())
+ } else if (buffer.size < capacity) {
+ buffer.enqueue(value)
+ handler.promise.success(())
+ } else {
+ if (puts.size >= MaxOps) {
+ handler.promise.failure(
+ new IllegalArgumentException("Too many pending put operations."))
+ } else {
+ puts.enqueue(handler -> value)
+ }
+ }
+ handler.promise.future
+ }
+ def take(): Future[A] = synchronized {
+ val handler = new Handler[A]
+
+ if (puts.size > 0) {
+ val (ph, pd) = puts.dequeue()
+ val data = if (capacity == 0) {
+ pd
+ } else {
+ val d = buffer.dequeue()
+ buffer.enqueue(pd)
+ d
+ }
+ ph.promise.success(())
+ handler.promise.success(data)
+ } else if (buffer.isEmpty) {
+ if (takes.size >= MaxOps) {
+ handler.promise.failure(
+ new IllegalArgumentException("Too many pending take operations."))
+ } else {
+ takes.enqueue(handler)
+ }
+ } else {
+ handler.promise.success(buffer.dequeue())
+ }
+ handler.promise.future
+ }
+
+}
+
+object Channel {
+ final val MaxOps = 2
+
+ def apply[A](capacity: Int): Channel[A] = new Channel[A](capacity)
+
+}
+
+class Handler[A] {
+ val promise = Promise[A]
+}