author     Patrick Wendell <pwendell@gmail.com>  2014-01-13 12:18:05 -0800
committer  Patrick Wendell <pwendell@gmail.com>  2014-01-13 12:18:05 -0800
commit     b93f9d42f21f03163734ef97b2871db945e166da (patch)
tree       ec06b7bd5e6dd7c61061995c94f8d75661c58918
parent     e6ed13f255d70de422711b979447690cdab7423b (diff)
parent     ffa1d38ef19a7d5c5c2fc173d1d2f54267449f80 (diff)
Merge pull request #400 from tdas/dstream-move
Moved DStream and PairDStream to org.apache.spark.streaming.dstream

Similar to the package location of `org.apache.spark.rdd.RDD`, `DStream` has been moved from `org.apache.spark.streaming.DStream` to `org.apache.spark.streaming.dstream.DStream`. I know that the package name is a little long, but I think it's better to keep it consistent with Spark's structure.

Also fixed persistence of windowed DStreams. The RDDs generated by a windowed DStream are essentially unions of the underlying RDDs, and persisting these union RDDs would store numerous copies of the underlying data. Instead, setting the persistence level on the windowed DStream now sets the persistence level of the underlying DStream.
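For readers updating their imports, a minimal sketch of the new package location (the socket source, master URL, and batch interval below are illustrative, not part of this patch):

    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.StreamingContext._
    import org.apache.spark.streaming.dstream.DStream  // previously org.apache.spark.streaming.DStream

    object DStreamMoveExample {
      def main(args: Array[String]) {
        val ssc = new StreamingContext("local[2]", "dstream-move", Seconds(1))
        val lines: DStream[String] = ssc.socketTextStream("localhost", 9999)
        // PairDStreamFunctions (now also under ...streaming.dstream) supplies
        // reduceByKey via the implicit conversions from StreamingContext._
        lines.flatMap(_.split(" ")).map(w => (w, 1)).reduceByKey(_ + _).print()
        ssc.start()
        ssc.awaitTermination()
      }
    }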
-rw-r--r--  docs/streaming-programming-guide.md | 2
-rw-r--r--  examples/src/main/scala/org/apache/spark/streaming/examples/RecoverableNetworkWordCount.scala | 2
-rw-r--r--  external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala | 4
-rw-r--r--  external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala | 6
-rw-r--r--  external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTUtils.scala | 4
-rw-r--r--  external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterUtils.scala | 7
-rw-r--r--  external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQUtils.scala | 3
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala | 6
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala | 5
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStream.scala | 3
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala | 1
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala | 1
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala | 5
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala (renamed from streaming/src/main/scala/org/apache/spark/streaming/DStream.scala) | 13
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala (renamed from streaming/src/main/scala/org/apache/spark/streaming/DStreamCheckpointData.scala) | 10
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala | 2
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/dstream/FilteredDStream.scala | 2
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/dstream/FlatMapValuedDStream.scala | 2
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/dstream/FlatMappedDStream.scala | 2
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/dstream/ForEachDStream.scala | 2
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/dstream/GlommedDStream.scala | 2
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala | 2
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/dstream/MapPartitionedDStream.scala | 2
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/dstream/MapValuedDStream.scala | 2
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/dstream/MappedDStream.scala | 2
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala (renamed from streaming/src/main/scala/org/apache/spark/streaming/PairDStreamFunctions.scala) | 3
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/dstream/ReducedWindowedDStream.scala | 2
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/dstream/ShuffledDStream.scala | 2
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/dstream/StateDStream.scala | 2
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/dstream/TransformedDStream.scala | 2
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/dstream/UnionDStream.scala | 3
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/dstream/WindowedDStream.scala | 17
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/util/MasterFailureTest.scala | 2
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala | 1
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala | 2
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala | 3
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala | 1
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala | 2
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/WindowOperationsSuite.scala | 15
-rw-r--r--  tools/src/main/scala/org/apache/spark/tools/JavaAPICompletenessChecker.scala | 50
40 files changed, 121 insertions, 78 deletions
diff --git a/docs/streaming-programming-guide.md b/docs/streaming-programming-guide.md
index 3273817c78..4e8a680a75 100644
--- a/docs/streaming-programming-guide.md
+++ b/docs/streaming-programming-guide.md
@@ -167,7 +167,7 @@ Spark Streaming features windowed computations, which allow you to apply transfo
</tr>
</table>
-A complete list of DStream operations is available in the API documentation of [DStream](api/streaming/index.html#org.apache.spark.streaming.DStream) and [PairDStreamFunctions](api/streaming/index.html#org.apache.spark.streaming.PairDStreamFunctions).
+A complete list of DStream operations is available in the API documentation of [DStream](api/streaming/index.html#org.apache.spark.streaming.dstream.DStream) and [PairDStreamFunctions](api/streaming/index.html#org.apache.spark.streaming.dstream.PairDStreamFunctions).
## Output Operations
When an output operator is called, it triggers the computation of a stream. Currently the following output operators are defined:
diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/RecoverableNetworkWordCount.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/RecoverableNetworkWordCount.scala
index d51e6e9418..8c5d0bd568 100644
--- a/examples/src/main/scala/org/apache/spark/streaming/examples/RecoverableNetworkWordCount.scala
+++ b/examples/src/main/scala/org/apache/spark/streaming/examples/RecoverableNetworkWordCount.scala
@@ -82,7 +82,7 @@ object RecoverableNetworkWordCount {
val lines = ssc.socketTextStream(ip, port)
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
- wordCounts.foreach((rdd: RDD[(String, Int)], time: Time) => {
+ wordCounts.foreachRDD((rdd: RDD[(String, Int)], time: Time) => {
val counts = "Counts at time " + time + " " + rdd.collect().mkString("[", ", ", "]")
println(counts)
println("Appending to " + outputFile.getAbsolutePath)
diff --git a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala
index 834b775d4f..d53b66dd46 100644
--- a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala
+++ b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala
@@ -18,8 +18,9 @@
package org.apache.spark.streaming.flume
import org.apache.spark.storage.StorageLevel
-import org.apache.spark.streaming.{StreamingContext, DStream}
+import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.api.java.{JavaStreamingContext, JavaDStream}
+import org.apache.spark.streaming.dstream.DStream
object FlumeUtils {
/**
@@ -42,6 +43,7 @@ object FlumeUtils {
/**
* Creates a input stream from a Flume source.
+ * Storage level of the data will be the default StorageLevel.MEMORY_AND_DISK_SER_2.
* @param hostname Hostname of the slave machine to which the flume data will be sent
* @param port Port of the slave machine to which the flume data will be sent
*/
diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala
index c2d851f943..37c03be4e7 100644
--- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala
+++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala
@@ -26,8 +26,9 @@ import java.util.{Map => JMap}
import kafka.serializer.{Decoder, StringDecoder}
import org.apache.spark.storage.StorageLevel
-import org.apache.spark.streaming.{StreamingContext, DStream}
+import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.api.java.{JavaStreamingContext, JavaPairDStream}
+import org.apache.spark.streaming.dstream.DStream
object KafkaUtils {
@@ -77,6 +78,7 @@ object KafkaUtils {
/**
* Create an input stream that pulls messages form a Kafka Broker.
+ * Storage level of the data will be the default StorageLevel.MEMORY_AND_DISK_SER_2.
* @param jssc JavaStreamingContext object
* @param zkQuorum Zookeeper quorum (hostname:port,hostname:port,..)
* @param groupId The group id for this consumer
@@ -127,7 +129,7 @@ object KafkaUtils {
* see http://kafka.apache.org/08/configuration.html
* @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed
* in its own thread
- * @param storageLevel RDD storage level. Defaults to MEMORY_AND_DISK_2.
+ * @param storageLevel RDD storage level.
*/
def createStream[K, V, U <: Decoder[_], T <: Decoder[_]](
jssc: JavaStreamingContext,
diff --git a/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTUtils.scala b/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTUtils.scala
index 0e6c25dbee..3636e46bb8 100644
--- a/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTUtils.scala
+++ b/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTUtils.scala
@@ -18,9 +18,10 @@
package org.apache.spark.streaming.mqtt
import org.apache.spark.storage.StorageLevel
-import org.apache.spark.streaming.{StreamingContext, DStream}
+import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.api.java.{JavaStreamingContext, JavaDStream}
import scala.reflect.ClassTag
+import org.apache.spark.streaming.dstream.DStream
object MQTTUtils {
/**
@@ -43,6 +44,7 @@ object MQTTUtils {
/**
* Create an input stream that receives messages pushed by a MQTT publisher.
+ * Storage level of the data will be the default StorageLevel.MEMORY_AND_DISK_SER_2.
* @param jssc JavaStreamingContext object
* @param brokerUrl Url of remote MQTT publisher
* @param topic Topic name to subscribe to
diff --git a/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterUtils.scala b/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterUtils.scala
index 5e506ffabc..b8bae7b6d3 100644
--- a/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterUtils.scala
+++ b/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterUtils.scala
@@ -20,8 +20,9 @@ package org.apache.spark.streaming.twitter
import twitter4j.Status
import twitter4j.auth.Authorization
import org.apache.spark.storage.StorageLevel
-import org.apache.spark.streaming.{StreamingContext, DStream}
+import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.api.java.{JavaDStream, JavaStreamingContext}
+import org.apache.spark.streaming.dstream.DStream
object TwitterUtils {
/**
@@ -50,6 +51,7 @@ object TwitterUtils {
* OAuth authentication; this requires the system properties twitter4j.oauth.consumerKey,
* twitter4j.oauth.consumerSecret, twitter4j.oauth.accessToken and
* twitter4j.oauth.accessTokenSecret.
+ * Storage level of the data will be the default StorageLevel.MEMORY_AND_DISK_SER_2.
* @param jssc JavaStreamingContext object
*/
def createStream(jssc: JavaStreamingContext): JavaDStream[Status] = {
@@ -61,6 +63,7 @@ object TwitterUtils {
* OAuth authentication; this requires the system properties twitter4j.oauth.consumerKey,
* twitter4j.oauth.consumerSecret, twitter4j.oauth.accessToken and
* twitter4j.oauth.accessTokenSecret.
+ * Storage level of the data will be the default StorageLevel.MEMORY_AND_DISK_SER_2.
* @param jssc JavaStreamingContext object
* @param filters Set of filter strings to get only those tweets that match them
*/
@@ -87,6 +90,7 @@ object TwitterUtils {
/**
* Create a input stream that returns tweets received from Twitter.
+ * Storage level of the data will be the default StorageLevel.MEMORY_AND_DISK_SER_2.
* @param jssc JavaStreamingContext object
* @param twitterAuth Twitter4J Authorization
*/
@@ -96,6 +100,7 @@ object TwitterUtils {
/**
* Create a input stream that returns tweets received from Twitter.
+ * Storage level of the data will be the default StorageLevel.MEMORY_AND_DISK_SER_2.
* @param jssc JavaStreamingContext object
* @param twitterAuth Twitter4J Authorization
* @param filters Set of filter strings to get only those tweets that match them
diff --git a/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQUtils.scala b/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQUtils.scala
index 546d9df3b5..7a14b3d2bf 100644
--- a/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQUtils.scala
+++ b/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQUtils.scala
@@ -25,8 +25,9 @@ import akka.zeromq.Subscribe
import org.apache.spark.api.java.function.{Function => JFunction}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receivers.ReceiverSupervisorStrategy
-import org.apache.spark.streaming.{StreamingContext, DStream}
+import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.api.java.{JavaStreamingContext, JavaDStream}
+import org.apache.spark.streaming.dstream.DStream
object ZeroMQUtils {
/**
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala b/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala
index 668e5324e6..8faa79f8c7 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala
@@ -17,11 +17,11 @@
package org.apache.spark.streaming
-import org.apache.spark.streaming.dstream.{NetworkInputDStream, InputDStream}
+import scala.collection.mutable.ArrayBuffer
import java.io.{ObjectInputStream, IOException, ObjectOutputStream}
-import collection.mutable.ArrayBuffer
import org.apache.spark.Logging
import org.apache.spark.streaming.scheduler.Job
+import org.apache.spark.streaming.dstream.{DStream, NetworkInputDStream, InputDStream}
final private[streaming] class DStreamGraph extends Serializable with Logging {
@@ -78,7 +78,7 @@ final private[streaming] class DStreamGraph extends Serializable with Logging {
def remember(duration: Duration) {
this.synchronized {
if (rememberDuration != null) {
- throw new Exception("Batch duration already set as " + batchDuration +
+ throw new Exception("Remember duration already set as " + batchDuration +
". cannot set it again.")
}
rememberDuration = duration
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
index ee83ae902b..7b27933403 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
@@ -168,7 +168,7 @@ class StreamingContext private[streaming] (
}
/**
- * Set the context to periodically checkpoint the DStream operations for master
+ * Set the context to periodically checkpoint the DStream operations for driver
* fault-tolerance.
* @param directory HDFS-compatible directory where the checkpoint data will be reliably stored.
* Note that this must be a fault-tolerant file system like HDFS for
@@ -220,7 +220,7 @@ class StreamingContext private[streaming] (
def actorStream[T: ClassTag](
props: Props,
name: String,
- storageLevel: StorageLevel = StorageLevel.MEMORY_ONLY_SER_2,
+ storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2,
supervisorStrategy: SupervisorStrategy = ReceiverSupervisorStrategy.defaultStrategy
): DStream[T] = {
networkStream(new ActorReceiver[T](props, name, storageLevel, supervisorStrategy))
@@ -272,6 +272,7 @@ class StreamingContext private[streaming] (
* @param hostname Hostname to connect to for receiving data
* @param port Port to connect to for receiving data
* @param storageLevel Storage level to use for storing the received objects
+ * (default: StorageLevel.MEMORY_AND_DISK_SER_2)
* @tparam T Type of the objects in the received blocks
*/
def rawSocketStream[T: ClassTag](
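As context for the "master" -> "driver" doc fix above, a minimal sketch of enabling periodic checkpointing (master URL, app name, and path are illustrative; the directory must be on a fault-tolerant file system such as HDFS):

    import org.apache.spark.streaming.{Seconds, StreamingContext}

    val ssc = new StreamingContext("local[2]", "checkpoint-demo", Seconds(1))
    // Periodically checkpoint DStream operations so the driver can recover.
    ssc.checkpoint("hdfs://namenode:8020/user/spark/checkpoints")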
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStream.scala
index d29033df32..c92854ccd9 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStream.scala
@@ -17,13 +17,14 @@
package org.apache.spark.streaming.api.java
-import org.apache.spark.streaming.{Duration, Time, DStream}
+import org.apache.spark.streaming.{Duration, Time}
import org.apache.spark.api.java.function.{Function => JFunction}
import org.apache.spark.api.java.JavaRDD
import org.apache.spark.storage.StorageLevel
import org.apache.spark.rdd.RDD
import scala.reflect.ClassTag
+import org.apache.spark.streaming.dstream.DStream
/**
* A Discretized Stream (DStream), the basic abstraction in Spark Streaming, is a continuous
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala
index cea4795eb5..1ec4492bca 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala
@@ -30,6 +30,7 @@ import org.apache.spark.api.java.function.{Function3 => JFunction3, _}
import java.util
import org.apache.spark.rdd.RDD
import JavaDStream._
+import org.apache.spark.streaming.dstream.DStream
trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T, R]]
extends Serializable {
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala
index 6c3467d405..6bb985ca54 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala
@@ -35,6 +35,7 @@ import org.apache.spark.storage.StorageLevel
import com.google.common.base.Optional
import org.apache.spark.rdd.RDD
import org.apache.spark.rdd.PairRDDFunctions
+import org.apache.spark.streaming.dstream.DStream
class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
implicit val kManifest: ClassTag[K],
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
index b4c46f5e50..a2f0b88cb0 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
@@ -36,6 +36,7 @@ import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming._
import org.apache.spark.streaming.scheduler.StreamingListener
import org.apache.hadoop.conf.Configuration
+import org.apache.spark.streaming.dstream.DStream
/**
* A StreamingContext is the main entry point for Spark Streaming functionality. Besides the basic
@@ -150,7 +151,6 @@ class JavaStreamingContext(val ssc: StreamingContext) {
* @param hostname Hostname to connect to for receiving data
* @param port Port to connect to for receiving data
* @param storageLevel Storage level to use for storing the received objects
- * (default: StorageLevel.MEMORY_AND_DISK_SER_2)
*/
def socketTextStream(hostname: String, port: Int, storageLevel: StorageLevel)
: JavaDStream[String] = {
@@ -160,7 +160,7 @@ class JavaStreamingContext(val ssc: StreamingContext) {
/**
* Create a input stream from network source hostname:port. Data is received using
* a TCP socket and the receive bytes is interpreted as UTF8 encoded \n delimited
- * lines.
+ * lines. Storage level of the data will be the default StorageLevel.MEMORY_AND_DISK_SER_2.
* @param hostname Hostname to connect to for receiving data
* @param port Port to connect to for receiving data
*/
@@ -301,6 +301,7 @@ class JavaStreamingContext(val ssc: StreamingContext) {
/**
* Create an input stream with any arbitrary user implemented actor receiver.
+ * Storage level of the data will be the default StorageLevel.MEMORY_AND_DISK_SER_2.
* @param props Props object defining creation of the actor
* @param name Name of the actor
*
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/DStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
index f760093579..a7c4cca7ea 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/DStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
@@ -15,22 +15,23 @@
* limitations under the License.
*/
-package org.apache.spark.streaming
+package org.apache.spark.streaming.dstream
-import java.io.{IOException, ObjectInputStream, ObjectOutputStream}
import scala.deprecated
import scala.collection.mutable.HashMap
import scala.reflect.ClassTag
-import StreamingContext._
+import java.io.{IOException, ObjectInputStream, ObjectOutputStream}
import org.apache.spark.Logging
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel
-import org.apache.spark.streaming.dstream._
-import org.apache.spark.streaming.scheduler.Job
import org.apache.spark.util.MetadataCleaner
+import org.apache.spark.streaming._
+import org.apache.spark.streaming.StreamingContext._
+import org.apache.spark.streaming.scheduler.Job
+import org.apache.spark.streaming.Duration
/**
* A Discretized Stream (DStream), the basic abstraction in Spark Streaming, is a continuous
@@ -42,7 +43,7 @@ import org.apache.spark.util.MetadataCleaner
* by a parent DStream.
*
* This class contains the basic operations available on all DStreams, such as `map`, `filter` and
- * `window`. In addition, [[org.apache.spark.streaming.PairDStreamFunctions]] contains operations available
+ * `window`. In addition, [[org.apache.spark.streaming.dstream.PairDStreamFunctions]] contains operations available
* only on DStreams of key-value pairs, such as `groupByKeyAndWindow` and `join`. These operations
* are automatically available on any DStream of the right type (e.g., DStream[(Int, Int)] through
* implicit conversions when `spark.streaming.StreamingContext._` is imported.
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/DStreamCheckpointData.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala
index 671f7bbce7..2da4127f47 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/DStreamCheckpointData.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala
@@ -15,17 +15,15 @@
* limitations under the License.
*/
-package org.apache.spark.streaming
+package org.apache.spark.streaming.dstream
-import scala.collection.mutable.{HashMap, HashSet}
+import scala.collection.mutable.HashMap
import scala.reflect.ClassTag
-
+import java.io.{ObjectInputStream, IOException}
import org.apache.hadoop.fs.Path
import org.apache.hadoop.fs.FileSystem
-
import org.apache.spark.Logging
-
-import java.io.{ObjectInputStream, IOException}
+import org.apache.spark.streaming.Time
private[streaming]
class DStreamCheckpointData[T: ClassTag] (dstream: DStream[T])
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
index f10d483634..37c46b26a5 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
@@ -25,7 +25,7 @@ import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}
import org.apache.spark.rdd.RDD
import org.apache.spark.rdd.UnionRDD
-import org.apache.spark.streaming.{DStreamCheckpointData, StreamingContext, Time}
+import org.apache.spark.streaming.{StreamingContext, Time}
import org.apache.spark.util.TimeStampedHashMap
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FilteredDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FilteredDStream.scala
index db2e0a4cee..c81534ae58 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FilteredDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FilteredDStream.scala
@@ -17,7 +17,7 @@
package org.apache.spark.streaming.dstream
-import org.apache.spark.streaming.{Duration, DStream, Time}
+import org.apache.spark.streaming.{Duration, Time}
import org.apache.spark.rdd.RDD
import scala.reflect.ClassTag
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FlatMapValuedDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FlatMapValuedDStream.scala
index 244dc3ee4f..6586234554 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FlatMapValuedDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FlatMapValuedDStream.scala
@@ -17,7 +17,7 @@
package org.apache.spark.streaming.dstream
-import org.apache.spark.streaming.{Duration, DStream, Time}
+import org.apache.spark.streaming.{Duration, Time}
import org.apache.spark.rdd.RDD
import org.apache.spark.SparkContext._
import scala.reflect.ClassTag
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FlatMappedDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FlatMappedDStream.scala
index 336c4b7a92..c7bb2833ea 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FlatMappedDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FlatMappedDStream.scala
@@ -17,7 +17,7 @@
package org.apache.spark.streaming.dstream
-import org.apache.spark.streaming.{Duration, DStream, Time}
+import org.apache.spark.streaming.{Duration, Time}
import org.apache.spark.rdd.RDD
import scala.reflect.ClassTag
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ForEachDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ForEachDStream.scala
index 364abcde68..905bc723f6 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ForEachDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ForEachDStream.scala
@@ -18,7 +18,7 @@
package org.apache.spark.streaming.dstream
import org.apache.spark.rdd.RDD
-import org.apache.spark.streaming.{Duration, DStream, Time}
+import org.apache.spark.streaming.{Duration, Time}
import org.apache.spark.streaming.scheduler.Job
import scala.reflect.ClassTag
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/GlommedDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/GlommedDStream.scala
index 23136f44fa..a9bb51f054 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/GlommedDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/GlommedDStream.scala
@@ -17,7 +17,7 @@
package org.apache.spark.streaming.dstream
-import org.apache.spark.streaming.{Duration, DStream, Time}
+import org.apache.spark.streaming.{Duration, Time}
import org.apache.spark.rdd.RDD
import scala.reflect.ClassTag
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala
index 8f84232cab..a1075ad304 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala
@@ -17,7 +17,7 @@
package org.apache.spark.streaming.dstream
-import org.apache.spark.streaming.{Time, Duration, StreamingContext, DStream}
+import org.apache.spark.streaming.{Time, Duration, StreamingContext}
import scala.reflect.ClassTag
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/MapPartitionedDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/MapPartitionedDStream.scala
index 8a04060e5b..3d8ee29df1 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/MapPartitionedDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/MapPartitionedDStream.scala
@@ -17,7 +17,7 @@
package org.apache.spark.streaming.dstream
-import org.apache.spark.streaming.{Duration, DStream, Time}
+import org.apache.spark.streaming.{Duration, Time}
import org.apache.spark.rdd.RDD
import scala.reflect.ClassTag
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/MapValuedDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/MapValuedDStream.scala
index 0ce364fd46..7aea1f945d 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/MapValuedDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/MapValuedDStream.scala
@@ -17,7 +17,7 @@
package org.apache.spark.streaming.dstream
-import org.apache.spark.streaming.{Duration, DStream, Time}
+import org.apache.spark.streaming.{Duration, Time}
import org.apache.spark.rdd.RDD
import org.apache.spark.SparkContext._
import scala.reflect.ClassTag
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/MappedDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/MappedDStream.scala
index c0b7491d09..02704a8d1c 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/MappedDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/MappedDStream.scala
@@ -17,7 +17,7 @@
package org.apache.spark.streaming.dstream
-import org.apache.spark.streaming.{Duration, DStream, Time}
+import org.apache.spark.streaming.{Duration, Time}
import org.apache.spark.rdd.RDD
import scala.reflect.ClassTag
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/PairDStreamFunctions.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala
index 69d80c3711..6b3e48382e 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/PairDStreamFunctions.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.spark.streaming
+package org.apache.spark.streaming.dstream
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.dstream._
@@ -33,6 +33,7 @@ import org.apache.hadoop.mapreduce.{OutputFormat => NewOutputFormat}
import org.apache.hadoop.mapred.OutputFormat
import org.apache.hadoop.security.UserGroupInformation
import org.apache.hadoop.conf.Configuration
+import org.apache.spark.streaming.{Time, Duration}
class PairDStreamFunctions[K: ClassTag, V: ClassTag](self: DStream[(K,V)])
extends Serializable {
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReducedWindowedDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReducedWindowedDStream.scala
index db56345ca8..7a6b1ea35e 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReducedWindowedDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReducedWindowedDStream.scala
@@ -26,7 +26,7 @@ import org.apache.spark.SparkContext._
import org.apache.spark.storage.StorageLevel
import scala.collection.mutable.ArrayBuffer
-import org.apache.spark.streaming.{Duration, Interval, Time, DStream}
+import org.apache.spark.streaming.{Duration, Interval, Time}
import scala.collection.mutable.ArrayBuffer
import scala.reflect.ClassTag
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ShuffledDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ShuffledDStream.scala
index 84e69f277b..880a89bc36 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ShuffledDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ShuffledDStream.scala
@@ -20,7 +20,7 @@ package org.apache.spark.streaming.dstream
import org.apache.spark.Partitioner
import org.apache.spark.rdd.RDD
import org.apache.spark.SparkContext._
-import org.apache.spark.streaming.{Duration, DStream, Time}
+import org.apache.spark.streaming.{Duration, Time}
import scala.reflect.ClassTag
private[streaming]
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/StateDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/StateDStream.scala
index b34ba7b9b4..9d8889b655 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/StateDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/StateDStream.scala
@@ -21,7 +21,7 @@ import org.apache.spark.rdd.RDD
import org.apache.spark.Partitioner
import org.apache.spark.SparkContext._
import org.apache.spark.storage.StorageLevel
-import org.apache.spark.streaming.{Duration, Time, DStream}
+import org.apache.spark.streaming.{Duration, Time}
import scala.reflect.ClassTag
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/TransformedDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/TransformedDStream.scala
index aeea060df7..7cd4554282 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/TransformedDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/TransformedDStream.scala
@@ -18,7 +18,7 @@
package org.apache.spark.streaming.dstream
import org.apache.spark.rdd.RDD
-import org.apache.spark.streaming.{Duration, DStream, Time}
+import org.apache.spark.streaming.{Duration, Time}
import scala.reflect.ClassTag
private[streaming]
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/UnionDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/UnionDStream.scala
index 0d84ec84f2..4ecba03ab5 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/UnionDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/UnionDStream.scala
@@ -17,9 +17,8 @@
package org.apache.spark.streaming.dstream
-import org.apache.spark.streaming.{Duration, DStream, Time}
+import org.apache.spark.streaming.{Duration, Time}
import org.apache.spark.rdd.RDD
-import collection.mutable.ArrayBuffer
import org.apache.spark.rdd.UnionRDD
import scala.collection.mutable.ArrayBuffer
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/WindowedDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/WindowedDStream.scala
index 89c43ff935..6301772468 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/WindowedDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/WindowedDStream.scala
@@ -32,13 +32,14 @@ class WindowedDStream[T: ClassTag](
extends DStream[T](parent.ssc) {
if (!_windowDuration.isMultipleOf(parent.slideDuration))
- throw new Exception("The window duration of WindowedDStream (" + _slideDuration + ") " +
- "must be multiple of the slide duration of parent DStream (" + parent.slideDuration + ")")
+ throw new Exception("The window duration of windowed DStream (" + _slideDuration + ") " +
+ "must be a multiple of the slide duration of parent DStream (" + parent.slideDuration + ")")
if (!_slideDuration.isMultipleOf(parent.slideDuration))
- throw new Exception("The slide duration of WindowedDStream (" + _slideDuration + ") " +
- "must be multiple of the slide duration of parent DStream (" + parent.slideDuration + ")")
+ throw new Exception("The slide duration of windowed DStream (" + _slideDuration + ") " +
+ "must be a multiple of the slide duration of parent DStream (" + parent.slideDuration + ")")
+ // Persist parent level by default, as those RDDs are going to be obviously reused.
parent.persist(StorageLevel.MEMORY_ONLY_SER)
def windowDuration: Duration = _windowDuration
@@ -49,6 +50,14 @@ class WindowedDStream[T: ClassTag](
override def parentRememberDuration: Duration = rememberDuration + windowDuration
+ override def persist(level: StorageLevel): DStream[T] = {
+ // Do not let this windowed DStream be persisted as windowed (union-ed) RDDs share underlying
+ // RDDs and persisting the windowed RDDs would store numerous copies of the underlying data.
+ // Instead control the persistence of the parent DStream.
+ parent.persist(level)
+ this
+ }
+
override def compute(validTime: Time): Option[RDD[T]] = {
val currentWindow = new Interval(validTime - windowDuration + parent.slideDuration, validTime)
val rddsInWindow = parent.slice(currentWindow)
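The `persist` override above is the behavioral half of this patch. A minimal sketch of what it means in practice (source stream and durations are illustrative), matching the regression test added in WindowOperationsSuite below:

    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    val ssc = new StreamingContext("local[2]", "window-persist", Seconds(10))
    val input = ssc.socketTextStream("localhost", 9999)
    val windowed = input.window(Seconds(30), Seconds(10))

    windowed.persist(StorageLevel.MEMORY_ONLY)
    // The windowed stream itself stays at StorageLevel.NONE; the level is
    // pushed down to `input`, so RDDs shared by overlapping windows are
    // cached once rather than once per window that contains them.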
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/MasterFailureTest.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/MasterFailureTest.scala
index 592e84791b..be67af3a64 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/util/MasterFailureTest.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/util/MasterFailureTest.scala
@@ -20,7 +20,7 @@ package org.apache.spark.streaming.util
import org.apache.spark.Logging
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming._
-import org.apache.spark.streaming.dstream.ForEachDStream
+import org.apache.spark.streaming.dstream.{DStream, ForEachDStream}
import StreamingContext._
import scala.util.Random
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala
index 9406e0e20a..7037aae234 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala
@@ -24,6 +24,7 @@ import org.apache.spark.SparkContext._
import util.ManualClock
import org.apache.spark.{SparkContext, SparkConf}
+import org.apache.spark.streaming.dstream.DStream
class BasicOperationsSuite extends TestSuiteBase {
test("map") {
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
index 67ce5bc566..0c68c44ddb 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
@@ -26,7 +26,7 @@ import com.google.common.io.Files
import org.apache.hadoop.fs.{Path, FileSystem}
import org.apache.hadoop.conf.Configuration
import org.apache.spark.streaming.StreamingContext._
-import org.apache.spark.streaming.dstream.FileInputDStream
+import org.apache.spark.streaming.dstream.{DStream, FileInputDStream}
import org.apache.spark.streaming.util.ManualClock
import org.apache.spark.util.Utils
import org.apache.spark.SparkConf
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
index a477d200c9..f7f3346f81 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
@@ -23,6 +23,7 @@ import org.scalatest.concurrent.Timeouts
import org.scalatest.time.SpanSugar._
import org.apache.spark.{SparkException, SparkConf, SparkContext}
import org.apache.spark.util.{Utils, MetadataCleaner}
+import org.apache.spark.streaming.dstream.DStream
class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts {
@@ -186,7 +187,7 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts {
ssc = new StreamingContext(master, appName, batchDuration)
val inputStream = addInputStream(ssc)
inputStream.map(x => { throw new TestException("error in map task"); x})
- .foreach(_.count)
+ .foreachRDD(_.count)
val exception = intercept[Exception] {
ssc.start()
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala
index fa64142096..9e0f2c900e 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala
@@ -20,6 +20,7 @@ package org.apache.spark.streaming
import org.apache.spark.streaming.scheduler._
import scala.collection.mutable.ArrayBuffer
import org.scalatest.matchers.ShouldMatchers
+import org.apache.spark.streaming.dstream.DStream
class StreamingListenerSuite extends TestSuiteBase with ShouldMatchers {
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
index 9b2bb57e77..535e5bd1f1 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
@@ -17,7 +17,7 @@
package org.apache.spark.streaming
-import org.apache.spark.streaming.dstream.{InputDStream, ForEachDStream}
+import org.apache.spark.streaming.dstream.{DStream, InputDStream, ForEachDStream}
import org.apache.spark.streaming.util.ManualClock
import scala.collection.mutable.ArrayBuffer
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/WindowOperationsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/WindowOperationsSuite.scala
index c39abfc21b..471c99fab4 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/WindowOperationsSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/WindowOperationsSuite.scala
@@ -18,6 +18,8 @@
package org.apache.spark.streaming
import org.apache.spark.streaming.StreamingContext._
+import org.apache.spark.streaming.dstream.DStream
+import org.apache.spark.storage.StorageLevel
class WindowOperationsSuite extends TestSuiteBase {
@@ -143,6 +145,19 @@ class WindowOperationsSuite extends TestSuiteBase {
Seconds(3)
)
+ test("window - persistence level") {
+ val input = Seq( Seq(0), Seq(1), Seq(2), Seq(3), Seq(4), Seq(5))
+ val ssc = new StreamingContext(conf, batchDuration)
+ val inputStream = new TestInputStream[Int](ssc, input, 1)
+ val windowStream1 = inputStream.window(batchDuration * 2)
+ assert(windowStream1.storageLevel === StorageLevel.NONE)
+ assert(inputStream.storageLevel === StorageLevel.MEMORY_ONLY_SER)
+ windowStream1.persist(StorageLevel.MEMORY_ONLY)
+ assert(windowStream1.storageLevel === StorageLevel.NONE)
+ assert(inputStream.storageLevel === StorageLevel.MEMORY_ONLY)
+ ssc.stop()
+ }
+
// Testing naive reduceByKeyAndWindow (without invertible function)
testReduceByKeyAndWindow(
diff --git a/tools/src/main/scala/org/apache/spark/tools/JavaAPICompletenessChecker.scala b/tools/src/main/scala/org/apache/spark/tools/JavaAPICompletenessChecker.scala
index f670f65bf5..4886cd6ea8 100644
--- a/tools/src/main/scala/org/apache/spark/tools/JavaAPICompletenessChecker.scala
+++ b/tools/src/main/scala/org/apache/spark/tools/JavaAPICompletenessChecker.scala
@@ -24,8 +24,9 @@ import scala.collection.mutable.ArrayBuffer
import org.apache.spark._
import org.apache.spark.api.java._
import org.apache.spark.rdd.{RDD, DoubleRDDFunctions, PairRDDFunctions, OrderedRDDFunctions}
-import org.apache.spark.streaming.{PairDStreamFunctions, DStream, StreamingContext}
+import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.api.java.{JavaPairDStream, JavaDStream, JavaStreamingContext}
+import org.apache.spark.streaming.dstream.{DStream, PairDStreamFunctions}
private[spark] abstract class SparkType(val name: String)
@@ -147,7 +148,7 @@ object JavaAPICompletenessChecker {
} else {
ParameterizedType(classOf[JavaRDD[_]].getName, parameters.map(applySubs))
}
- case "org.apache.spark.streaming.DStream" =>
+ case "org.apache.spark.streaming.dstream.DStream" =>
if (parameters(0).name == classOf[Tuple2[_, _]].getName) {
val tupleParams =
parameters(0).asInstanceOf[ParameterizedType].parameters.map(applySubs)
@@ -248,30 +249,29 @@ object JavaAPICompletenessChecker {
"org.apache.spark.SparkContext.getSparkHome",
"org.apache.spark.SparkContext.executorMemoryRequested",
"org.apache.spark.SparkContext.getExecutorStorageStatus",
- "org.apache.spark.streaming.DStream.generatedRDDs",
- "org.apache.spark.streaming.DStream.zeroTime",
- "org.apache.spark.streaming.DStream.rememberDuration",
- "org.apache.spark.streaming.DStream.storageLevel",
- "org.apache.spark.streaming.DStream.mustCheckpoint",
- "org.apache.spark.streaming.DStream.checkpointDuration",
- "org.apache.spark.streaming.DStream.checkpointData",
- "org.apache.spark.streaming.DStream.graph",
- "org.apache.spark.streaming.DStream.isInitialized",
- "org.apache.spark.streaming.DStream.parentRememberDuration",
- "org.apache.spark.streaming.DStream.initialize",
- "org.apache.spark.streaming.DStream.validate",
- "org.apache.spark.streaming.DStream.setContext",
- "org.apache.spark.streaming.DStream.setGraph",
- "org.apache.spark.streaming.DStream.remember",
- "org.apache.spark.streaming.DStream.getOrCompute",
- "org.apache.spark.streaming.DStream.generateJob",
- "org.apache.spark.streaming.DStream.clearOldMetadata",
- "org.apache.spark.streaming.DStream.addMetadata",
- "org.apache.spark.streaming.DStream.updateCheckpointData",
- "org.apache.spark.streaming.DStream.restoreCheckpointData",
- "org.apache.spark.streaming.DStream.isTimeValid",
+ "org.apache.spark.streaming.dstream.DStream.generatedRDDs",
+ "org.apache.spark.streaming.dstream.DStream.zeroTime",
+ "org.apache.spark.streaming.dstream.DStream.rememberDuration",
+ "org.apache.spark.streaming.dstream.DStream.storageLevel",
+ "org.apache.spark.streaming.dstream.DStream.mustCheckpoint",
+ "org.apache.spark.streaming.dstream.DStream.checkpointDuration",
+ "org.apache.spark.streaming.dstream.DStream.checkpointData",
+ "org.apache.spark.streaming.dstream.DStream.graph",
+ "org.apache.spark.streaming.dstream.DStream.isInitialized",
+ "org.apache.spark.streaming.dstream.DStream.parentRememberDuration",
+ "org.apache.spark.streaming.dstream.DStream.initialize",
+ "org.apache.spark.streaming.dstream.DStream.validate",
+ "org.apache.spark.streaming.dstream.DStream.setContext",
+ "org.apache.spark.streaming.dstream.DStream.setGraph",
+ "org.apache.spark.streaming.dstream.DStream.remember",
+ "org.apache.spark.streaming.dstream.DStream.getOrCompute",
+ "org.apache.spark.streaming.dstream.DStream.generateJob",
+ "org.apache.spark.streaming.dstream.DStream.clearOldMetadata",
+ "org.apache.spark.streaming.dstream.DStream.addMetadata",
+ "org.apache.spark.streaming.dstream.DStream.updateCheckpointData",
+ "org.apache.spark.streaming.dstream.DStream.restoreCheckpointData",
+ "org.apache.spark.streaming.dstream.DStream.isTimeValid",
"org.apache.spark.streaming.StreamingContext.nextNetworkInputStreamId",
- "org.apache.spark.streaming.StreamingContext.networkInputTracker",
"org.apache.spark.streaming.StreamingContext.checkpointDir",
"org.apache.spark.streaming.StreamingContext.checkpointDuration",
"org.apache.spark.streaming.StreamingContext.receiverJobThread",