author    Luc Bourlier <luc.bourlier@typesafe.com>    2015-09-09 09:57:58 +0100
committer Sean Owen <sowen@cloudera.com>              2015-09-09 09:57:58 +0100
commit    c1bc4f439f54625c01a585691e5293cd9961eb0c (patch)
tree      4b3688eae83147aa50d2a55524f8eabfaae242d0 /streaming/src/main
parent    91a577d2778ab5946f0c40cb80c89de24e3d10e8 (diff)
[SPARK-10227] fatal warnings with sbt on Scala 2.11
The bulk of the changes concern the `@transient` annotation on class parameters. The compiler often does not generate a field for such a parameter, in which case the annotation would be unnecessary; but if the class parameter is used in methods, a field is created, so it is safer to keep the annotation. The remainder are some potential bugs and deprecated syntax.

Author: Luc Bourlier <luc.bourlier@typesafe.com>

Closes #8433 from skyluc/issue/sbt-2.11.
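For context, here is a minimal, self-contained sketch (all names invented, not taken from the patch) of the Scala behaviour described above: a constructor parameter only becomes a field when it is referenced outside the constructor body, and `@transient` on a parameter that never becomes a field triggers a "no valid targets for annotation" warning, which `-Xfatal-warnings` turns into a build failure on Scala 2.11.

// Standalone sketch, no Spark dependency; class names are hypothetical.
class Config(val master: String)

// `conf` is only read while the constructor runs, so scalac generates no field
// for it; `@transient` here would have no valid target, and with
// -Xfatal-warnings that warning fails the build. Dropping the annotation, as
// this patch does for Checkpoint, InputDStream, etc., avoids the warning.
class ConstructorOnly(/* @transient */ conf: Config) extends Serializable {
  val master: String = conf.master // evaluated once, at construction time
}

// Here the parameter is referenced from a method, so a field is generated and
// would be serialized with the instance; declaring it `@transient private val`
// makes the field explicit, keeps it out of serialization, and gives the
// annotation a valid target.
class UsedInMethod(@transient private val conf: Config) extends Serializable {
  def master: String = conf.master // referenced after construction => field exists
}

object TransientDemo {
  def main(args: Array[String]): Unit = {
    val cfg = new Config("local[2]")
    println(new ConstructorOnly(cfg).master)
    println(new UsedInMethod(cfg).master)
  }
}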
Diffstat (limited to 'streaming/src/main')
-rw-r--r-- streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala | 2
-rw-r--r-- streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala | 12
-rw-r--r-- streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala | 2
-rw-r--r-- streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala | 2
-rw-r--r-- streaming/src/main/scala/org/apache/spark/streaming/dstream/PluggableInputDStream.scala | 2
-rw-r--r-- streaming/src/main/scala/org/apache/spark/streaming/dstream/QueueInputDStream.scala | 4
-rw-r--r-- streaming/src/main/scala/org/apache/spark/streaming/dstream/RawInputDStream.scala | 2
-rw-r--r-- streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala | 2
-rw-r--r-- streaming/src/main/scala/org/apache/spark/streaming/dstream/SocketInputDStream.scala | 2
-rw-r--r-- streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala | 22
10 files changed, 26 insertions, 26 deletions
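A plausible reading of the `_blockIds` rename in `WriteAheadLogBackedBlockRDD` below: once the parameter is declared as `@transient private val` it becomes a real field, so it can no longer reuse the name of the member the parent class already exposes, and the underscore-prefixed copy is passed up to the superclass constructor instead. A small standalone sketch of that pattern (class and member names invented, not Spark's):

// Parent already exposes `ids` as a member of its own.
class ParentRDDLike(@transient val ids: Array[String]) extends Serializable

// The subclass keeps its own transient field for the same data: `@transient
// private val` forces a field (so the annotation has a valid target), and the
// leading underscore avoids clashing with the inherited `ids`.
class ChildRDDLike(@transient private val _ids: Array[String])
  extends ParentRDDLike(_ids) {
  require(_ids.nonEmpty, "ids must not be empty")
  def count: Int = _ids.length
}

object RenameDemo {
  def main(args: Array[String]): Unit = {
    println(new ChildRDDLike(Array("block-0", "block-1")).count) // prints 2
  }
}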
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
index 27024ecfd9..8a6050f522 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
@@ -32,7 +32,7 @@ import org.apache.spark.streaming.scheduler.JobGenerator
private[streaming]
-class Checkpoint(@transient ssc: StreamingContext, val checkpointTime: Time)
+class Checkpoint(ssc: StreamingContext, val checkpointTime: Time)
extends Logging with Serializable {
val master = ssc.sc.master
val framework = ssc.sc.appName
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala
index 2c373640d2..dfc569451d 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala
@@ -170,7 +170,7 @@ private[python] object PythonDStream {
*/
private[python] abstract class PythonDStream(
parent: DStream[_],
- @transient pfunc: PythonTransformFunction)
+ pfunc: PythonTransformFunction)
extends DStream[Array[Byte]] (parent.ssc) {
val func = new TransformFunction(pfunc)
@@ -187,7 +187,7 @@ private[python] abstract class PythonDStream(
*/
private[python] class PythonTransformedDStream (
parent: DStream[_],
- @transient pfunc: PythonTransformFunction)
+ pfunc: PythonTransformFunction)
extends PythonDStream(parent, pfunc) {
override def compute(validTime: Time): Option[RDD[Array[Byte]]] = {
@@ -206,7 +206,7 @@ private[python] class PythonTransformedDStream (
private[python] class PythonTransformed2DStream(
parent: DStream[_],
parent2: DStream[_],
- @transient pfunc: PythonTransformFunction)
+ pfunc: PythonTransformFunction)
extends DStream[Array[Byte]] (parent.ssc) {
val func = new TransformFunction(pfunc)
@@ -230,7 +230,7 @@ private[python] class PythonTransformed2DStream(
*/
private[python] class PythonStateDStream(
parent: DStream[Array[Byte]],
- @transient reduceFunc: PythonTransformFunction)
+ reduceFunc: PythonTransformFunction)
extends PythonDStream(parent, reduceFunc) {
super.persist(StorageLevel.MEMORY_ONLY)
@@ -252,8 +252,8 @@ private[python] class PythonStateDStream(
*/
private[python] class PythonReducedWindowedDStream(
parent: DStream[Array[Byte]],
- @transient preduceFunc: PythonTransformFunction,
- @transient pinvReduceFunc: PythonTransformFunction,
+ preduceFunc: PythonTransformFunction,
+ @transient private val pinvReduceFunc: PythonTransformFunction,
_windowDuration: Duration,
_slideDuration: Duration)
extends PythonDStream(parent, preduceFunc) {
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
index c358f5b5bd..40208a6486 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
@@ -70,7 +70,7 @@ import org.apache.spark.util.{SerializableConfiguration, TimeStampedHashMap, Uti
*/
private[streaming]
class FileInputDStream[K, V, F <: NewInputFormat[K, V]](
- @transient ssc_ : StreamingContext,
+ ssc_ : StreamingContext,
directory: String,
filter: Path => Boolean = FileInputDStream.defaultFilter,
newFilesOnly: Boolean = true,
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala
index a6c4cd220e..95994c983c 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala
@@ -39,7 +39,7 @@ import org.apache.spark.util.Utils
*
* @param ssc_ Streaming context that will execute this input stream
*/
-abstract class InputDStream[T: ClassTag] (@transient ssc_ : StreamingContext)
+abstract class InputDStream[T: ClassTag] (ssc_ : StreamingContext)
extends DStream[T](ssc_) {
private[streaming] var lastValidTime: Time = null
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/PluggableInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/PluggableInputDStream.scala
index 186e1bf03a..002aac9f43 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/PluggableInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/PluggableInputDStream.scala
@@ -23,7 +23,7 @@ import org.apache.spark.streaming.receiver.Receiver
private[streaming]
class PluggableInputDStream[T: ClassTag](
- @transient ssc_ : StreamingContext,
+ ssc_ : StreamingContext,
receiver: Receiver[T]) extends ReceiverInputDStream[T](ssc_) {
def getReceiver(): Receiver[T] = {
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/QueueInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/QueueInputDStream.scala
index bab78a3536..a2685046e0 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/QueueInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/QueueInputDStream.scala
@@ -27,7 +27,7 @@ import org.apache.spark.streaming.{Time, StreamingContext}
private[streaming]
class QueueInputDStream[T: ClassTag](
- @transient ssc: StreamingContext,
+ ssc: StreamingContext,
val queue: Queue[RDD[T]],
oneAtATime: Boolean,
defaultRDD: RDD[T]
@@ -57,7 +57,7 @@ class QueueInputDStream[T: ClassTag](
if (oneAtATime) {
Some(buffer.head)
} else {
- Some(new UnionRDD(ssc.sc, buffer.toSeq))
+ Some(new UnionRDD(context.sc, buffer.toSeq))
}
} else if (defaultRDD != null) {
Some(defaultRDD)
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/RawInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/RawInputDStream.scala
index e2925b9e03..5a9eda7c12 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/RawInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/RawInputDStream.scala
@@ -39,7 +39,7 @@ import org.apache.spark.streaming.receiver.Receiver
*/
private[streaming]
class RawInputDStream[T: ClassTag](
- @transient ssc_ : StreamingContext,
+ ssc_ : StreamingContext,
host: String,
port: Int,
storageLevel: StorageLevel
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala
index 6c139f32da..87c20afd5c 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala
@@ -38,7 +38,7 @@ import org.apache.spark.streaming.{StreamingContext, Time}
* @param ssc_ Streaming context that will execute this input stream
* @tparam T Class type of the object of this stream
*/
-abstract class ReceiverInputDStream[T: ClassTag](@transient ssc_ : StreamingContext)
+abstract class ReceiverInputDStream[T: ClassTag](ssc_ : StreamingContext)
extends InputDStream[T](ssc_) {
/**
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/SocketInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/SocketInputDStream.scala
index 5ce5b7aae6..de84e0c9a4 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/SocketInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/SocketInputDStream.scala
@@ -32,7 +32,7 @@ import org.apache.spark.streaming.receiver.Receiver
private[streaming]
class SocketInputDStream[T: ClassTag](
- @transient ssc_ : StreamingContext,
+ ssc_ : StreamingContext,
host: String,
port: Int,
bytesToObjects: InputStream => Iterator[T],
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala b/streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala
index e081ffe46f..f811784b25 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala
@@ -61,7 +61,7 @@ class WriteAheadLogBackedBlockRDDPartition(
*
*
* @param sc SparkContext
- * @param blockIds Ids of the blocks that contains this RDD's data
+ * @param _blockIds Ids of the blocks that contains this RDD's data
* @param walRecordHandles Record handles in write ahead logs that contain this RDD's data
* @param isBlockIdValid Whether the block Ids are valid (i.e., the blocks are present in the Spark
* executors). If not, then block lookups by the block ids will be skipped.
@@ -73,23 +73,23 @@ class WriteAheadLogBackedBlockRDDPartition(
*/
private[streaming]
class WriteAheadLogBackedBlockRDD[T: ClassTag](
- @transient sc: SparkContext,
- @transient blockIds: Array[BlockId],
+ sc: SparkContext,
+ @transient private val _blockIds: Array[BlockId],
@transient val walRecordHandles: Array[WriteAheadLogRecordHandle],
- @transient isBlockIdValid: Array[Boolean] = Array.empty,
+ @transient private val isBlockIdValid: Array[Boolean] = Array.empty,
storeInBlockManager: Boolean = false,
storageLevel: StorageLevel = StorageLevel.MEMORY_ONLY_SER)
- extends BlockRDD[T](sc, blockIds) {
+ extends BlockRDD[T](sc, _blockIds) {
require(
- blockIds.length == walRecordHandles.length,
- s"Number of block Ids (${blockIds.length}) must be " +
+ _blockIds.length == walRecordHandles.length,
+ s"Number of block Ids (${_blockIds.length}) must be " +
s" same as number of WAL record handles (${walRecordHandles.length})")
require(
- isBlockIdValid.isEmpty || isBlockIdValid.length == blockIds.length,
+ isBlockIdValid.isEmpty || isBlockIdValid.length == _blockIds.length,
s"Number of elements in isBlockIdValid (${isBlockIdValid.length}) must be " +
- s" same as number of block Ids (${blockIds.length})")
+ s" same as number of block Ids (${_blockIds.length})")
// Hadoop configuration is not serializable, so broadcast it as a serializable.
@transient private val hadoopConfig = sc.hadoopConfiguration
@@ -99,9 +99,9 @@ class WriteAheadLogBackedBlockRDD[T: ClassTag](
override def getPartitions: Array[Partition] = {
assertValid()
- Array.tabulate(blockIds.length) { i =>
+ Array.tabulate(_blockIds.length) { i =>
val isValid = if (isBlockIdValid.length == 0) true else isBlockIdValid(i)
- new WriteAheadLogBackedBlockRDDPartition(i, blockIds(i), isValid, walRecordHandles(i))
+ new WriteAheadLogBackedBlockRDDPartition(i, _blockIds(i), isValid, walRecordHandles(i))
}
}