aboutsummaryrefslogtreecommitdiff
path: root/streaming/src
diff options
context:
space:
mode:
authorSean Owen <sowen@cloudera.com>2014-09-23 11:58:05 -0700
committerMatei Zaharia <matei@databricks.com>2014-09-23 11:58:05 -0700
commite73b48ace0a7e0f249221240140235d33eeac36b (patch)
tree50adea378f370306aa024e6165b385e4dad1d9c2 /streaming/src
parent3b8eefa9b843c7f1e0e8dda6023272bc9f011c5c (diff)
downloadspark-e73b48ace0a7e0f249221240140235d33eeac36b.tar.gz
spark-e73b48ace0a7e0f249221240140235d33eeac36b.tar.bz2
spark-e73b48ace0a7e0f249221240140235d33eeac36b.zip
SPARK-2745 [STREAMING] Add Java friendly methods to Duration class
tdas is this what you had in mind for this JIRA? I saw this one and thought it would be easy to take care of, and helpful as I use streaming from Java. I could do the same for `Time`? Happy to do so. Author: Sean Owen <sowen@cloudera.com> Closes #2403 from srowen/SPARK-2745 and squashes the following commits: 5a9e706 [Sean Owen] Change "Duration" to "Durations" to avoid changing Duration case class API bda301c [Sean Owen] Just delegate to Scala binary operator syntax to avoid scalastyle warning 7dde949 [Sean Owen] Disable scalastyle for false positives. Add Java static factory methods seconds(), minutes() to Duration. Add Java-friendly methods to Time too, and unit tests. Remove unnecessary math.floor from Time.floor() 4dee32e [Sean Owen] Add named methods to Duration in parallel to symbolic methods for Java-friendliness. Also add unit tests for Duration, in Scala and Java.
Diffstat (limited to 'streaming/src')
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/Duration.scala39
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/Time.scala20
-rw-r--r--streaming/src/test/java/org/apache/spark/streaming/JavaDurationSuite.java84
-rw-r--r--streaming/src/test/java/org/apache/spark/streaming/JavaTimeSuite.java63
-rw-r--r--streaming/src/test/scala/org/apache/spark/streaming/DurationSuite.scala110
-rw-r--r--streaming/src/test/scala/org/apache/spark/streaming/TimeSuite.scala111
6 files changed, 425 insertions, 2 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/Duration.scala b/streaming/src/main/scala/org/apache/spark/streaming/Duration.scala
index 6bf275f5af..a0d8fb5ab9 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/Duration.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/Duration.scala
@@ -37,6 +37,25 @@ case class Duration (private val millis: Long) {
def / (that: Duration): Double = millis.toDouble / that.millis.toDouble
+ // Java-friendlier versions of the above.
+
+ def less(that: Duration): Boolean = this < that
+
+ def lessEq(that: Duration): Boolean = this <= that
+
+ def greater(that: Duration): Boolean = this > that
+
+ def greaterEq(that: Duration): Boolean = this >= that
+
+ def plus(that: Duration): Duration = this + that
+
+ def minus(that: Duration): Duration = this - that
+
+ def times(times: Int): Duration = this * times
+
+ def div(that: Duration): Double = this / that
+
+
def isMultipleOf(that: Duration): Boolean =
(this.millis % that.millis == 0)
@@ -80,4 +99,24 @@ object Minutes {
def apply(minutes: Long) = new Duration(minutes * 60000)
}
+// Java-friendlier versions of the objects above.
+// Named "Durations" instead of "Duration" to avoid changing the case class's implied API.
+
+object Durations {
+
+ /**
+ * @return [[org.apache.spark.streaming.Duration]] representing given number of milliseconds.
+ */
+ def milliseconds(milliseconds: Long) = Milliseconds(milliseconds)
+ /**
+ * @return [[org.apache.spark.streaming.Duration]] representing given number of seconds.
+ */
+ def seconds(seconds: Long) = Seconds(seconds)
+
+ /**
+ * @return [[org.apache.spark.streaming.Duration]] representing given number of minutes.
+ */
+ def minutes(minutes: Long) = Minutes(minutes)
+
+}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/Time.scala b/streaming/src/main/scala/org/apache/spark/streaming/Time.scala
index 37b3b28fa0..42c49678d2 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/Time.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/Time.scala
@@ -41,10 +41,26 @@ case class Time(private val millis: Long) {
def - (that: Duration): Time = new Time(millis - that.milliseconds)
+ // Java-friendlier versions of the above.
+
+ def less(that: Time): Boolean = this < that
+
+ def lessEq(that: Time): Boolean = this <= that
+
+ def greater(that: Time): Boolean = this > that
+
+ def greaterEq(that: Time): Boolean = this >= that
+
+ def plus(that: Duration): Time = this + that
+
+ def minus(that: Time): Duration = this - that
+
+ def minus(that: Duration): Time = this - that
+
+
def floor(that: Duration): Time = {
val t = that.milliseconds
- val m = math.floor(this.millis / t).toLong
- new Time(m * t)
+ new Time((this.millis / t) * t)
}
def isMultipleOf(that: Duration): Boolean =
diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaDurationSuite.java b/streaming/src/test/java/org/apache/spark/streaming/JavaDurationSuite.java
new file mode 100644
index 0000000000..76425fe2aa
--- /dev/null
+++ b/streaming/src/test/java/org/apache/spark/streaming/JavaDurationSuite.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.spark.streaming;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+public class JavaDurationSuite {
+
+ // Just testing the methods that are specially exposed for Java.
+ // This does not repeat all tests found in the Scala suite.
+
+ @Test
+ public void testLess() {
+ Assert.assertTrue(new Duration(999).less(new Duration(1000)));
+ }
+
+ @Test
+ public void testLessEq() {
+ Assert.assertTrue(new Duration(1000).lessEq(new Duration(1000)));
+ }
+
+ @Test
+ public void testGreater() {
+ Assert.assertTrue(new Duration(1000).greater(new Duration(999)));
+ }
+
+ @Test
+ public void testGreaterEq() {
+ Assert.assertTrue(new Duration(1000).greaterEq(new Duration(1000)));
+ }
+
+ @Test
+ public void testPlus() {
+ Assert.assertEquals(new Duration(1100), new Duration(1000).plus(new Duration(100)));
+ }
+
+ @Test
+ public void testMinus() {
+ Assert.assertEquals(new Duration(900), new Duration(1000).minus(new Duration(100)));
+ }
+
+ @Test
+ public void testTimes() {
+ Assert.assertEquals(new Duration(200), new Duration(100).times(2));
+ }
+
+ @Test
+ public void testDiv() {
+ Assert.assertEquals(200.0, new Duration(1000).div(new Duration(5)), 1.0e-12);
+ }
+
+ @Test
+ public void testMilliseconds() {
+ Assert.assertEquals(new Duration(100), Durations.milliseconds(100));
+ }
+
+ @Test
+ public void testSeconds() {
+ Assert.assertEquals(new Duration(30 * 1000), Durations.seconds(30));
+ }
+
+ @Test
+ public void testMinutes() {
+ Assert.assertEquals(new Duration(2 * 60 * 1000), Durations.minutes(2));
+ }
+
+}
diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaTimeSuite.java b/streaming/src/test/java/org/apache/spark/streaming/JavaTimeSuite.java
new file mode 100644
index 0000000000..ad6b1853e3
--- /dev/null
+++ b/streaming/src/test/java/org/apache/spark/streaming/JavaTimeSuite.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+public class JavaTimeSuite {
+
+ // Just testing the methods that are specially exposed for Java.
+ // This does not repeat all tests found in the Scala suite.
+
+ @Test
+ public void testLess() {
+ Assert.assertTrue(new Time(999).less(new Time(1000)));
+ }
+
+ @Test
+ public void testLessEq() {
+ Assert.assertTrue(new Time(1000).lessEq(new Time(1000)));
+ }
+
+ @Test
+ public void testGreater() {
+ Assert.assertTrue(new Time(1000).greater(new Time(999)));
+ }
+
+ @Test
+ public void testGreaterEq() {
+ Assert.assertTrue(new Time(1000).greaterEq(new Time(1000)));
+ }
+
+ @Test
+ public void testPlus() {
+ Assert.assertEquals(new Time(1100), new Time(1000).plus(new Duration(100)));
+ }
+
+ @Test
+ public void testMinusTime() {
+ Assert.assertEquals(new Duration(900), new Time(1000).minus(new Time(100)));
+ }
+
+ @Test
+ public void testMinusDuration() {
+ Assert.assertEquals(new Time(900), new Time(1000).minus(new Duration(100)));
+ }
+
+}
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/DurationSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/DurationSuite.scala
new file mode 100644
index 0000000000..6202250e89
--- /dev/null
+++ b/streaming/src/test/scala/org/apache/spark/streaming/DurationSuite.scala
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming
+
+class DurationSuite extends TestSuiteBase {
+
+ test("less") {
+ assert(new Duration(999) < new Duration(1000))
+ assert(new Duration(0) < new Duration(1))
+ assert(!(new Duration(1000) < new Duration(999)))
+ assert(!(new Duration(1000) < new Duration(1000)))
+ }
+
+ test("lessEq") {
+ assert(new Duration(999) <= new Duration(1000))
+ assert(new Duration(0) <= new Duration(1))
+ assert(!(new Duration(1000) <= new Duration(999)))
+ assert(new Duration(1000) <= new Duration(1000))
+ }
+
+ test("greater") {
+ assert(!(new Duration(999) > new Duration(1000)))
+ assert(!(new Duration(0) > new Duration(1)))
+ assert(new Duration(1000) > new Duration(999))
+ assert(!(new Duration(1000) > new Duration(1000)))
+ }
+
+ test("greaterEq") {
+ assert(!(new Duration(999) >= new Duration(1000)))
+ assert(!(new Duration(0) >= new Duration(1)))
+ assert(new Duration(1000) >= new Duration(999))
+ assert(new Duration(1000) >= new Duration(1000))
+ }
+
+ test("plus") {
+ assert((new Duration(1000) + new Duration(100)) == new Duration(1100))
+ assert((new Duration(1000) + new Duration(0)) == new Duration(1000))
+ }
+
+ test("minus") {
+ assert((new Duration(1000) - new Duration(100)) == new Duration(900))
+ assert((new Duration(1000) - new Duration(0)) == new Duration(1000))
+ assert((new Duration(1000) - new Duration(1000)) == new Duration(0))
+ }
+
+ test("times") {
+ assert((new Duration(100) * 2) == new Duration(200))
+ assert((new Duration(100) * 1) == new Duration(100))
+ assert((new Duration(100) * 0) == new Duration(0))
+ }
+
+ test("div") {
+ assert((new Duration(1000) / new Duration(5)) == 200.0)
+ assert((new Duration(1000) / new Duration(1)) == 1000.0)
+ assert((new Duration(1000) / new Duration(1000)) == 1.0)
+ assert((new Duration(1000) / new Duration(2000)) == 0.5)
+ }
+
+ test("isMultipleOf") {
+ assert(new Duration(1000).isMultipleOf(new Duration(5)))
+ assert(new Duration(1000).isMultipleOf(new Duration(1000)))
+ assert(new Duration(1000).isMultipleOf(new Duration(1)))
+ assert(!new Duration(1000).isMultipleOf(new Duration(6)))
+ }
+
+ test("min") {
+ assert(new Duration(999).min(new Duration(1000)) == new Duration(999))
+ assert(new Duration(1000).min(new Duration(999)) == new Duration(999))
+ assert(new Duration(1000).min(new Duration(1000)) == new Duration(1000))
+ }
+
+ test("max") {
+ assert(new Duration(999).max(new Duration(1000)) == new Duration(1000))
+ assert(new Duration(1000).max(new Duration(999)) == new Duration(1000))
+ assert(new Duration(1000).max(new Duration(1000)) == new Duration(1000))
+ }
+
+ test("isZero") {
+ assert(new Duration(0).isZero)
+ assert(!(new Duration(1).isZero))
+ }
+
+ test("Milliseconds") {
+ assert(new Duration(100) == Milliseconds(100))
+ }
+
+ test("Seconds") {
+ assert(new Duration(30 * 1000) == Seconds(30))
+ }
+
+ test("Minutes") {
+ assert(new Duration(2 * 60 * 1000) == Minutes(2))
+ }
+
+}
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/TimeSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/TimeSuite.scala
new file mode 100644
index 0000000000..5579ac3643
--- /dev/null
+++ b/streaming/src/test/scala/org/apache/spark/streaming/TimeSuite.scala
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming
+
+class TimeSuite extends TestSuiteBase {
+
+ test("less") {
+ assert(new Time(999) < new Time(1000))
+ assert(new Time(0) < new Time(1))
+ assert(!(new Time(1000) < new Time(999)))
+ assert(!(new Time(1000) < new Time(1000)))
+ }
+
+ test("lessEq") {
+ assert(new Time(999) <= new Time(1000))
+ assert(new Time(0) <= new Time(1))
+ assert(!(new Time(1000) <= new Time(999)))
+ assert(new Time(1000) <= new Time(1000))
+ }
+
+ test("greater") {
+ assert(!(new Time(999) > new Time(1000)))
+ assert(!(new Time(0) > new Time(1)))
+ assert(new Time(1000) > new Time(999))
+ assert(!(new Time(1000) > new Time(1000)))
+ }
+
+ test("greaterEq") {
+ assert(!(new Time(999) >= new Time(1000)))
+ assert(!(new Time(0) >= new Time(1)))
+ assert(new Time(1000) >= new Time(999))
+ assert(new Time(1000) >= new Time(1000))
+ }
+
+ test("plus") {
+ assert((new Time(1000) + new Duration(100)) == new Time(1100))
+ assert((new Time(1000) + new Duration(0)) == new Time(1000))
+ }
+
+ test("minus Time") {
+ assert((new Time(1000) - new Time(100)) == new Duration(900))
+ assert((new Time(1000) - new Time(0)) == new Duration(1000))
+ assert((new Time(1000) - new Time(1000)) == new Duration(0))
+ }
+
+ test("minus Duration") {
+ assert((new Time(1000) - new Duration(100)) == new Time(900))
+ assert((new Time(1000) - new Duration(0)) == new Time(1000))
+ assert((new Time(1000) - new Duration(1000)) == new Time(0))
+ }
+
+ test("floor") {
+ assert(new Time(1350).floor(new Duration(200)) == new Time(1200))
+ assert(new Time(1200).floor(new Duration(200)) == new Time(1200))
+ assert(new Time(199).floor(new Duration(200)) == new Time(0))
+ assert(new Time(1).floor(new Duration(1)) == new Time(1))
+ }
+
+ test("isMultipleOf") {
+ assert(new Time(1000).isMultipleOf(new Duration(5)))
+ assert(new Time(1000).isMultipleOf(new Duration(1000)))
+ assert(new Time(1000).isMultipleOf(new Duration(1)))
+ assert(!new Time(1000).isMultipleOf(new Duration(6)))
+ }
+
+ test("min") {
+ assert(new Time(999).min(new Time(1000)) == new Time(999))
+ assert(new Time(1000).min(new Time(999)) == new Time(999))
+ assert(new Time(1000).min(new Time(1000)) == new Time(1000))
+ }
+
+ test("max") {
+ assert(new Time(999).max(new Time(1000)) == new Time(1000))
+ assert(new Time(1000).max(new Time(999)) == new Time(1000))
+ assert(new Time(1000).max(new Time(1000)) == new Time(1000))
+ }
+
+ test("until") {
+ assert(new Time(1000).until(new Time(1100), new Duration(100)) ==
+ Seq(Time(1000)))
+ assert(new Time(1000).until(new Time(1000), new Duration(100)) ==
+ Seq())
+ assert(new Time(1000).until(new Time(1100), new Duration(30)) ==
+ Seq(Time(1000), Time(1030), Time(1060), Time(1090)))
+ }
+
+ test("to") {
+ assert(new Time(1000).to(new Time(1100), new Duration(100)) ==
+ Seq(Time(1000), Time(1100)))
+ assert(new Time(1000).to(new Time(1000), new Duration(100)) ==
+ Seq(Time(1000)))
+ assert(new Time(1000).to(new Time(1100), new Duration(30)) ==
+ Seq(Time(1000), Time(1030), Time(1060), Time(1090)))
+ }
+
+}