aboutsummaryrefslogtreecommitdiff
path: root/external
diff options
context:
space:
mode:
authorSean Owen <sowen@cloudera.com>2015-02-19 15:35:23 -0800
committerAndrew Or <andrew@databricks.com>2015-02-19 15:35:23 -0800
commit34b7c35380c88569a1396fb4ed991a0bed4288e7 (patch)
treedeb25f1dd88477aff0dc19904cbdc9aedc0cc7d8 /external
parentad6b169dee84df175b51933b7a3ad7f0bbc52cf3 (diff)
downloadspark-34b7c35380c88569a1396fb4ed991a0bed4288e7.tar.gz
spark-34b7c35380c88569a1396fb4ed991a0bed4288e7.tar.bz2
spark-34b7c35380c88569a1396fb4ed991a0bed4288e7.zip
SPARK-4682 [CORE] Consolidate various 'Clock' classes
Another one from JoshRosen 's wish list. The first commit is much smaller and removes 2 of the 4 Clock classes. The second is much larger, necessary for consolidating the streaming one. I put together implementations in the way that seemed simplest. Almost all the change is standardizing class and method names. Author: Sean Owen <sowen@cloudera.com> Closes #4514 from srowen/SPARK-4682 and squashes the following commits: 5ed3a03 [Sean Owen] Javadoc Clock classes; make ManualClock private[spark] 169dd13 [Sean Owen] Add support for legacy org.apache.spark.streaming clock class names 277785a [Sean Owen] Reduce the net change in this patch by reversing some unnecessary syntax changes along the way b5e53df [Sean Owen] FakeClock -> ManualClock; getTime() -> getTimeMillis() 160863a [Sean Owen] Consolidate Streaming Clock class into common util Clock 7c956b2 [Sean Owen] Consolidate Clocks except for Streaming Clock
Diffstat (limited to 'external')
-rw-r--r--external/flume/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java2
-rw-r--r--external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala7
-rw-r--r--external/mqtt/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java2
-rw-r--r--external/twitter/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java2
-rw-r--r--external/zeromq/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java2
5 files changed, 7 insertions, 8 deletions
diff --git a/external/flume/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java b/external/flume/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java
index 1e24da7f5f..cfedb5a042 100644
--- a/external/flume/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java
+++ b/external/flume/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java
@@ -31,7 +31,7 @@ public abstract class LocalJavaStreamingContext {
SparkConf conf = new SparkConf()
.setMaster("local[2]")
.setAppName("test")
- .set("spark.streaming.clock", "org.apache.spark.streaming.util.ManualClock");
+ .set("spark.streaming.clock", "org.apache.spark.util.ManualClock");
ssc = new JavaStreamingContext(conf, new Duration(1000));
ssc.checkpoint("checkpoint");
}
diff --git a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala
index b57a1c71e3..e04d4088df 100644
--- a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala
+++ b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala
@@ -34,10 +34,9 @@ import org.scalatest.{BeforeAndAfter, FunSuite}
import org.apache.spark.{SparkConf, Logging}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.dstream.ReceiverInputDStream
-import org.apache.spark.streaming.util.ManualClock
import org.apache.spark.streaming.{Seconds, TestOutputStream, StreamingContext}
import org.apache.spark.streaming.flume.sink._
-import org.apache.spark.util.Utils
+import org.apache.spark.util.{ManualClock, Utils}
class FlumePollingStreamSuite extends FunSuite with BeforeAndAfter with Logging {
@@ -54,7 +53,7 @@ class FlumePollingStreamSuite extends FunSuite with BeforeAndAfter with Logging
def beforeFunction() {
logInfo("Using manual clock")
- conf.set("spark.streaming.clock", "org.apache.spark.streaming.util.ManualClock")
+ conf.set("spark.streaming.clock", "org.apache.spark.util.ManualClock")
}
before(beforeFunction())
@@ -236,7 +235,7 @@ class FlumePollingStreamSuite extends FunSuite with BeforeAndAfter with Logging
tx.commit()
tx.close()
Thread.sleep(500) // Allow some time for the events to reach
- clock.addToTime(batchDuration.milliseconds)
+ clock.advance(batchDuration.milliseconds)
}
null
}
diff --git a/external/mqtt/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java b/external/mqtt/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java
index 1e24da7f5f..cfedb5a042 100644
--- a/external/mqtt/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java
+++ b/external/mqtt/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java
@@ -31,7 +31,7 @@ public abstract class LocalJavaStreamingContext {
SparkConf conf = new SparkConf()
.setMaster("local[2]")
.setAppName("test")
- .set("spark.streaming.clock", "org.apache.spark.streaming.util.ManualClock");
+ .set("spark.streaming.clock", "org.apache.spark.util.ManualClock");
ssc = new JavaStreamingContext(conf, new Duration(1000));
ssc.checkpoint("checkpoint");
}
diff --git a/external/twitter/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java b/external/twitter/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java
index 1e24da7f5f..cfedb5a042 100644
--- a/external/twitter/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java
+++ b/external/twitter/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java
@@ -31,7 +31,7 @@ public abstract class LocalJavaStreamingContext {
SparkConf conf = new SparkConf()
.setMaster("local[2]")
.setAppName("test")
- .set("spark.streaming.clock", "org.apache.spark.streaming.util.ManualClock");
+ .set("spark.streaming.clock", "org.apache.spark.util.ManualClock");
ssc = new JavaStreamingContext(conf, new Duration(1000));
ssc.checkpoint("checkpoint");
}
diff --git a/external/zeromq/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java b/external/zeromq/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java
index 1e24da7f5f..cfedb5a042 100644
--- a/external/zeromq/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java
+++ b/external/zeromq/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java
@@ -31,7 +31,7 @@ public abstract class LocalJavaStreamingContext {
SparkConf conf = new SparkConf()
.setMaster("local[2]")
.setAppName("test")
- .set("spark.streaming.clock", "org.apache.spark.streaming.util.ManualClock");
+ .set("spark.streaming.clock", "org.apache.spark.util.ManualClock");
ssc = new JavaStreamingContext(conf, new Duration(1000));
ssc.checkpoint("checkpoint");
}