authorzsxwing <zsxwing@gmail.com>2014-12-25 19:46:05 -0800
committerTathagata Das <tathagata.das1565@gmail.com>2014-12-25 19:46:05 -0800
commitf9ed2b6641b9df39cee4b98a33cd5a3ddda2d146 (patch)
tree3f07c19a7e12db441ff1eba906ac5b671f644fe3 /streaming
parentf205fe477c33a541053c198cd43a5811d6cf9fe2 (diff)
[SPARK-4608][Streaming] Reorganize StreamingContext implicit to improve API convenience
There is only one implicit function, `toPairDStreamFunctions`, in `StreamingContext`. This PR does a reorganization similar to [SPARK-4397](https://issues.apache.org/jira/browse/SPARK-4397). I compiled the following code against Spark Streaming 1.1.0 and ran it with this PR; everything works fine.

```Scala
import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._

object StreamingApp {
  def main(args: Array[String]) {
    val conf = new SparkConf().setMaster("local[2]").setAppName("FileWordCount")
    val ssc = new StreamingContext(conf, Seconds(10))
    val lines = ssc.textFileStream("/some/path")
    val words = lines.flatMap(_.split(" "))
    val pairs = words.map(word => (word, 1))
    val wordCounts = pairs.reduceByKey(_ + _)
    wordCounts.print()
    ssc.start()
    ssc.awaitTermination()
  }
}
```

Author: zsxwing <zsxwing@gmail.com>

Closes #3464 from zsxwing/SPARK-4608 and squashes the following commits:

aa6d44a [zsxwing] Fix a copy-paste error
f74c190 [zsxwing] Merge branch 'master' into SPARK-4608
e6f9cc9 [zsxwing] Update the docs
27833bb [zsxwing] Remove `import StreamingContext._`
c15162c [zsxwing] Reorganize StreamingContext implicit to improve API convenience
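For readers unfamiliar with the mechanism: the import becomes unnecessary because Scala's implicit search automatically includes the companion object of the source type. Below is a minimal compile-and-run sketch of the same pattern, using hypothetical `Box`/`PairOps` names rather than Spark's classes:

```Scala
import scala.language.implicitConversions

class Box[T](val value: T)

object Box {
  // Because this implicit lives in Box's companion object, the compiler finds
  // it whenever a PairOps method is called on a Box[(K, V)] -- no import needed.
  implicit def toPairOps[K, V](box: Box[(K, V)]): PairOps[K, V] = new PairOps(box)
}

class PairOps[K, V](self: Box[(K, V)]) {
  def firstKey: K = self.value._1
}

object Demo {
  def main(args: Array[String]): Unit = {
    val b = new Box((1, "one"))
    println(b.firstKey) // compiles without importing Box.toPairOps; prints 1
  }
}
```

This is exactly why `DStream.toPairDStreamFunctions`, once moved into the `DStream` companion object, is picked up automatically, while the old `StreamingContext.toPairDStreamFunctions` required `import StreamingContext._`.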
Diffstat (limited to 'streaming')
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala | 7
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala | 3
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala | 22
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala | 4
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/dstream/ReducedWindowedDStream.scala | 2
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/package.scala | 2
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala | 1
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala | 1
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/MasterFailureTest.scala | 1
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/WindowOperationsSuite.scala | 1
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streamingtest/ImplicitSuite.scala (new) | 35
11 files changed, 58 insertions, 21 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
index ecab5510a8..8ef0787137 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
@@ -22,7 +22,6 @@ import java.util.concurrent.atomic.AtomicInteger
import scala.collection.Map
import scala.collection.mutable.Queue
-import scala.language.implicitConversions
import scala.reflect.ClassTag
import akka.actor.{Props, SupervisorStrategy}
@@ -523,9 +522,11 @@ object StreamingContext extends Logging {
private[streaming] val DEFAULT_CLEANER_TTL = 3600
- implicit def toPairDStreamFunctions[K, V](stream: DStream[(K, V)])
+ @deprecated("Replaced by implicit functions in the DStream companion object. This is " +
+ "kept here only for backward compatibility.", "1.3.0")
+ def toPairDStreamFunctions[K, V](stream: DStream[(K, V)])
(implicit kt: ClassTag[K], vt: ClassTag[V], ord: Ordering[K] = null) = {
- new PairDStreamFunctions[K, V](stream)
+ DStream.toPairDStreamFunctions(stream)(kt, vt, ord)
}
/**
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala
index bb44b906d7..de124cf40e 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala
@@ -36,7 +36,6 @@ import org.apache.spark.api.java.function.{Function => JFunction, Function2 => J
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming._
-import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.dstream.DStream
/**
@@ -815,6 +814,6 @@ object JavaPairDStream {
def scalaToJavaLong[K: ClassTag](dstream: JavaPairDStream[K, Long])
: JavaPairDStream[K, JLong] = {
- StreamingContext.toPairDStreamFunctions(dstream.dstream).mapValues(new JLong(_))
+ DStream.toPairDStreamFunctions(dstream.dstream).mapValues(new JLong(_))
}
}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
index dbf1ebbaf6..7f8651e719 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
@@ -20,8 +20,8 @@ package org.apache.spark.streaming.dstream
import java.io.{IOException, ObjectInputStream, ObjectOutputStream}
-import scala.deprecated
import scala.collection.mutable.HashMap
+import scala.language.implicitConversions
import scala.reflect.ClassTag
import scala.util.matching.Regex
@@ -29,7 +29,7 @@ import org.apache.spark.{Logging, SparkException}
import org.apache.spark.rdd.{BlockRDD, RDD}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming._
-import org.apache.spark.streaming.StreamingContext._
+import org.apache.spark.streaming.StreamingContext.rddToFileName
import org.apache.spark.streaming.scheduler.Job
import org.apache.spark.util.{CallSite, MetadataCleaner, Utils}
@@ -48,8 +48,7 @@ import org.apache.spark.util.{CallSite, MetadataCleaner, Utils}
* `window`. In addition, [[org.apache.spark.streaming.dstream.PairDStreamFunctions]] contains
* operations available only on DStreams of key-value pairs, such as `groupByKeyAndWindow` and
* `join`. These operations are automatically available on any DStream of pairs
- * (e.g., DStream[(Int, Int)] through implicit conversions when
- * `org.apache.spark.streaming.StreamingContext._` is imported.
+ * (e.g., DStream[(Int, Int)]) through implicit conversions.
*
* A DStream is internally characterized by a few basic properties:
* - A list of other DStreams that the DStream depends on
@@ -802,10 +801,21 @@ abstract class DStream[T: ClassTag] (
}
}
-private[streaming] object DStream {
+object DStream {
+
+ // `toPairDStreamFunctions` was in StreamingContext before 1.3 and users had to
+ // `import StreamingContext._` to enable it. Moving it here lets the compiler find
+ // it automatically. The old function is kept in StreamingContext for backward
+ // compatibility and simply forwards to this one.
+
+ implicit def toPairDStreamFunctions[K, V](stream: DStream[(K, V)])
+ (implicit kt: ClassTag[K], vt: ClassTag[V], ord: Ordering[K] = null):
+ PairDStreamFunctions[K, V] = {
+ new PairDStreamFunctions[K, V](stream)
+ }
/** Get the creation site of a DStream from the stack trace of when the DStream is created. */
- def getCreationSite(): CallSite = {
+ private[streaming] def getCreationSite(): CallSite = {
val SPARK_CLASS_REGEX = """^org\.apache\.spark""".r
val SPARK_STREAMING_TESTCLASS_REGEX = """^org\.apache\.spark\.streaming\.test""".r
val SPARK_EXAMPLES_CLASS_REGEX = """^org\.apache\.spark\.examples""".r
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala
index 98539e06b4..8a58571632 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala
@@ -27,12 +27,10 @@ import org.apache.hadoop.mapreduce.{OutputFormat => NewOutputFormat}
import org.apache.spark.{HashPartitioner, Partitioner, SerializableWritable}
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Duration, Time}
-import org.apache.spark.streaming.StreamingContext._
+import org.apache.spark.streaming.StreamingContext.rddToFileName
/**
* Extra functions available on DStream of (key, value) pairs through an implicit conversion.
- * Import `org.apache.spark.streaming.StreamingContext._` at the top of your program to use
- * these functions.
*/
class PairDStreamFunctions[K, V](self: DStream[(K,V)])
(implicit kt: ClassTag[K], vt: ClassTag[V], ord: Ordering[K])
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReducedWindowedDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReducedWindowedDStream.scala
index 1a47089e51..c0a5af0b65 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReducedWindowedDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReducedWindowedDStream.scala
@@ -17,8 +17,6 @@
package org.apache.spark.streaming.dstream
-import org.apache.spark.streaming.StreamingContext._
-
import org.apache.spark.rdd.RDD
import org.apache.spark.rdd.{CoGroupedRDD, MapPartitionsRDD}
import org.apache.spark.Partitioner
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/package.scala b/streaming/src/main/scala/org/apache/spark/streaming/package.scala
index 4dd985cf5a..2153ae0d34 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/package.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/package.scala
@@ -26,7 +26,7 @@ package org.apache.spark
* available only on DStreams
* of key-value pairs, such as `groupByKey` and `reduceByKey`. These operations are automatically
* available on any DStream of the right type (e.g. DStream[(Int, Int)]) through implicit
- * conversions when you `import org.apache.spark.streaming.StreamingContext._`.
+ * conversions.
*
* For the Java API of Spark Streaming, take a look at the
* [[org.apache.spark.streaming.api.java.JavaStreamingContext]] which serves as the entry point, and
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala
index 86b96785d7..199f5e7161 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala
@@ -28,7 +28,6 @@ import org.apache.spark.{SparkConf, SparkException}
import org.apache.spark.SparkContext._
import org.apache.spark.rdd.{BlockRDD, RDD}
import org.apache.spark.storage.StorageLevel
-import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.dstream.{DStream, WindowedDStream}
import org.apache.spark.HashPartitioner
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
index c97998add8..72d055eb2e 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
@@ -30,7 +30,6 @@ import org.apache.hadoop.io.{IntWritable, Text}
import org.apache.hadoop.mapred.TextOutputFormat
import org.apache.hadoop.mapreduce.lib.output.{TextOutputFormat => NewTextOutputFormat}
-import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.dstream.{DStream, FileInputDStream}
import org.apache.spark.streaming.util.ManualClock
import org.apache.spark.util.Utils
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/MasterFailureTest.scala b/streaming/src/test/scala/org/apache/spark/streaming/MasterFailureTest.scala
index 5dbb723200..e0f14fd954 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/MasterFailureTest.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/MasterFailureTest.scala
@@ -20,7 +20,6 @@ package org.apache.spark.streaming
import org.apache.spark.Logging
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.util.Utils
-import org.apache.spark.streaming.StreamingContext._
import scala.util.Random
import scala.collection.mutable.ArrayBuffer
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/WindowOperationsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/WindowOperationsSuite.scala
index 471c99fab4..a5d2bb2fde 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/WindowOperationsSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/WindowOperationsSuite.scala
@@ -17,7 +17,6 @@
package org.apache.spark.streaming
-import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.storage.StorageLevel
diff --git a/streaming/src/test/scala/org/apache/spark/streamingtest/ImplicitSuite.scala b/streaming/src/test/scala/org/apache/spark/streamingtest/ImplicitSuite.scala
new file mode 100644
index 0000000000..d0bf328f2b
--- /dev/null
+++ b/streaming/src/test/scala/org/apache/spark/streamingtest/ImplicitSuite.scala
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streamingtest
+
+/**
+ * A test suite to make sure all `implicit` functions work correctly.
+ *
+ * As `implicit` is a compiler feature, we don't need to run this class;
+ * we only need to make the compiler happy.
+ */
+class ImplicitSuite {
+
+ // We only want to test if `implicit` works well with the compiler, so we don't need a real DStream.
+ def mockDStream[T]: org.apache.spark.streaming.dstream.DStream[T] = null
+
+ def testToPairDStreamFunctions(): Unit = {
+ val dstream: org.apache.spark.streaming.dstream.DStream[(Int, Int)] = mockDStream
+ dstream.groupByKey()
+ }
+}
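As a usage note on backward compatibility: call sites written against the old API still compile, because the deprecated `StreamingContext.toPairDStreamFunctions` now forwards to `DStream.toPairDStreamFunctions`. A compile-only sketch in the same spirit as `ImplicitSuite` above (the class name `BackwardCompatSketch` is hypothetical, not part of this PR):

```Scala
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.DStream

class BackwardCompatSketch {

  // As in ImplicitSuite, a null stand-in is enough for a compile-only check.
  def mockDStream[T]: DStream[T] = null

  def oldStyle(): Unit = {
    val dstream: DStream[(Int, Int)] = mockDStream
    // Explicit call through the deprecated forwarder; compiles with a
    // deprecation warning and ends up in PairDStreamFunctions.
    StreamingContext.toPairDStreamFunctions(dstream).reduceByKey(_ + _)
  }

  def newStyle(): Unit = {
    val dstream: DStream[(Int, Int)] = mockDStream
    // Resolved automatically via the implicit in DStream's companion object.
    dstream.reduceByKey(_ + _)
  }
}
```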