aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorLuc Bourlier <luc.bourlier@typesafe.com>2015-09-09 09:57:58 +0100
committerSean Owen <sowen@cloudera.com>2015-09-09 09:57:58 +0100
commitc1bc4f439f54625c01a585691e5293cd9961eb0c (patch)
tree4b3688eae83147aa50d2a55524f8eabfaae242d0
parent91a577d2778ab5946f0c40cb80c89de24e3d10e8 (diff)
downloadspark-c1bc4f439f54625c01a585691e5293cd9961eb0c.tar.gz
spark-c1bc4f439f54625c01a585691e5293cd9961eb0c.tar.bz2
spark-c1bc4f439f54625c01a585691e5293cd9961eb0c.zip
[SPARK-10227] fatal warnings with sbt on Scala 2.11
The bulk of the changes are on `transient` annotation on class parameter. Often the compiler doesn't generate a field for this parameters, so the the transient annotation would be unnecessary. But if the class parameter are used in methods, then fields are created. So it is safer to keep the annotations. The remainder are some potential bugs, and deprecated syntax. Author: Luc Bourlier <luc.bourlier@typesafe.com> Closes #8433 from skyluc/issue/sbt-2.11.
-rw-r--r--core/src/main/scala/org/apache/spark/Accumulators.scala2
-rw-r--r--core/src/main/scala/org/apache/spark/Dependency.scala2
-rw-r--r--core/src/main/scala/org/apache/spark/Partitioner.scala4
-rw-r--r--core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala2
-rw-r--r--core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala4
-rw-r--r--core/src/main/scala/org/apache/spark/input/PortableDataStream.scala4
-rw-r--r--core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala2
-rw-r--r--core/src/main/scala/org/apache/spark/rdd/BinaryFileRDD.scala6
-rw-r--r--core/src/main/scala/org/apache/spark/rdd/BlockRDD.scala4
-rw-r--r--core/src/main/scala/org/apache/spark/rdd/CartesianRDD.scala4
-rw-r--r--core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala2
-rw-r--r--core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala8
-rw-r--r--core/src/main/scala/org/apache/spark/rdd/LocalCheckpointRDD.scala2
-rw-r--r--core/src/main/scala/org/apache/spark/rdd/LocalRDDCheckpointData.scala2
-rw-r--r--core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala18
-rw-r--r--core/src/main/scala/org/apache/spark/rdd/ParallelCollectionRDD.scala4
-rw-r--r--core/src/main/scala/org/apache/spark/rdd/PartitionPruningRDD.scala6
-rw-r--r--core/src/main/scala/org/apache/spark/rdd/PartitionwiseSampledRDD.scala4
-rw-r--r--core/src/main/scala/org/apache/spark/rdd/RDDCheckpointData.scala2
-rw-r--r--core/src/main/scala/org/apache/spark/rdd/ReliableCheckpointRDD.scala2
-rw-r--r--core/src/main/scala/org/apache/spark/rdd/ReliableRDDCheckpointData.scala2
-rw-r--r--core/src/main/scala/org/apache/spark/rdd/SqlNewHadoopRDD.scala6
-rw-r--r--core/src/main/scala/org/apache/spark/rdd/UnionRDD.scala4
-rw-r--r--core/src/main/scala/org/apache/spark/rdd/ZippedPartitionsRDD.scala2
-rw-r--r--core/src/main/scala/org/apache/spark/rdd/ZippedWithIndexRDD.scala2
-rw-r--r--core/src/main/scala/org/apache/spark/rpc/RpcEndpointRef.scala2
-rw-r--r--core/src/main/scala/org/apache/spark/rpc/akka/AkkaRpcEnv.scala21
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala2
-rw-r--r--core/src/main/scala/org/apache/spark/shuffle/unsafe/UnsafeShuffleManager.scala2
-rw-r--r--core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala2
-rw-r--r--core/src/main/scala/org/apache/spark/util/ShutdownHookManager.scala4
-rw-r--r--core/src/main/scala/org/apache/spark/util/logging/RollingFileAppender.scala46
-rw-r--r--external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala2
-rw-r--r--external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumePollingInputDStream.scala2
-rw-r--r--external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala4
-rw-r--r--external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala2
-rw-r--r--external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTInputDStream.scala2
-rw-r--r--external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterInputDStream.scala2
-rw-r--r--graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala4
-rw-r--r--graphx/src/main/scala/org/apache/spark/graphx/Graph.scala6
-rw-r--r--graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala4
-rw-r--r--mllib/src/main/scala/org/apache/spark/mllib/rdd/RandomRDD.scala12
-rw-r--r--repl/scala-2.11/src/main/scala/org/apache/spark/repl/SparkILoop.scala2
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringExpressions.scala2
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/types/AbstractDataType.scala2
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala10
-rw-r--r--sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala2
-rw-r--r--sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala8
-rw-r--r--sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala2
-rw-r--r--sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala10
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala2
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala12
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala2
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala2
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/dstream/PluggableInputDStream.scala2
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/dstream/QueueInputDStream.scala4
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/dstream/RawInputDStream.scala2
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala2
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/dstream/SocketInputDStream.scala2
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala22
60 files changed, 158 insertions, 151 deletions
diff --git a/core/src/main/scala/org/apache/spark/Accumulators.scala b/core/src/main/scala/org/apache/spark/Accumulators.scala
index c39c8667d0..5592b75afb 100644
--- a/core/src/main/scala/org/apache/spark/Accumulators.scala
+++ b/core/src/main/scala/org/apache/spark/Accumulators.scala
@@ -47,7 +47,7 @@ import org.apache.spark.util.Utils
* @tparam T partial data that can be added in
*/
class Accumulable[R, T] private[spark] (
- @transient initialValue: R,
+ initialValue: R,
param: AccumulableParam[R, T],
val name: Option[String],
internal: Boolean)
diff --git a/core/src/main/scala/org/apache/spark/Dependency.scala b/core/src/main/scala/org/apache/spark/Dependency.scala
index fc8cdde934..cfeeb3902c 100644
--- a/core/src/main/scala/org/apache/spark/Dependency.scala
+++ b/core/src/main/scala/org/apache/spark/Dependency.scala
@@ -66,7 +66,7 @@ abstract class NarrowDependency[T](_rdd: RDD[T]) extends Dependency[T] {
*/
@DeveloperApi
class ShuffleDependency[K, V, C](
- @transient _rdd: RDD[_ <: Product2[K, V]],
+ @transient private val _rdd: RDD[_ <: Product2[K, V]],
val partitioner: Partitioner,
val serializer: Option[Serializer] = None,
val keyOrdering: Option[Ordering[K]] = None,
diff --git a/core/src/main/scala/org/apache/spark/Partitioner.scala b/core/src/main/scala/org/apache/spark/Partitioner.scala
index 29e581bb57..e4df7af81a 100644
--- a/core/src/main/scala/org/apache/spark/Partitioner.scala
+++ b/core/src/main/scala/org/apache/spark/Partitioner.scala
@@ -104,8 +104,8 @@ class HashPartitioner(partitions: Int) extends Partitioner {
* the value of `partitions`.
*/
class RangePartitioner[K : Ordering : ClassTag, V](
- @transient partitions: Int,
- @transient rdd: RDD[_ <: Product2[K, V]],
+ partitions: Int,
+ rdd: RDD[_ <: Product2[K, V]],
private var ascending: Boolean = true)
extends Partitioner {
diff --git a/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala b/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala
index f5dd36cbcf..ae5926dd53 100644
--- a/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala
+++ b/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala
@@ -37,7 +37,7 @@ import org.apache.spark.util.SerializableJobConf
* a filename to write to, etc, exactly like in a Hadoop MapReduce job.
*/
private[spark]
-class SparkHadoopWriter(@transient jobConf: JobConf)
+class SparkHadoopWriter(jobConf: JobConf)
extends Logging
with SparkHadoopMapRedUtil
with Serializable {
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
index b4d152b336..69da180593 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -41,7 +41,7 @@ import org.apache.spark.util.{SerializableConfiguration, Utils}
import scala.util.control.NonFatal
private[spark] class PythonRDD(
- @transient parent: RDD[_],
+ parent: RDD[_],
command: Array[Byte],
envVars: JMap[String, String],
pythonIncludes: JList[String],
@@ -785,7 +785,7 @@ class BytesToString extends org.apache.spark.api.java.function.Function[Array[By
* Internal class that acts as an `AccumulatorParam` for Python accumulators. Inside, it
* collects a list of pickled strings that we pass to Python through a socket.
*/
-private class PythonAccumulatorParam(@transient serverHost: String, serverPort: Int)
+private class PythonAccumulatorParam(@transient private val serverHost: String, serverPort: Int)
extends AccumulatorParam[JList[Array[Byte]]] {
Utils.checkHost(serverHost, "Expected hostname")
diff --git a/core/src/main/scala/org/apache/spark/input/PortableDataStream.scala b/core/src/main/scala/org/apache/spark/input/PortableDataStream.scala
index a5ad47293f..e2ffc3b64e 100644
--- a/core/src/main/scala/org/apache/spark/input/PortableDataStream.scala
+++ b/core/src/main/scala/org/apache/spark/input/PortableDataStream.scala
@@ -131,8 +131,8 @@ private[spark] class StreamInputFormat extends StreamFileInputFormat[PortableDat
*/
@Experimental
class PortableDataStream(
- @transient isplit: CombineFileSplit,
- @transient context: TaskAttemptContext,
+ isplit: CombineFileSplit,
+ context: TaskAttemptContext,
index: Integer)
extends Serializable {
diff --git a/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala b/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala
index 4b851bcb36..70a42f9045 100644
--- a/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala
+++ b/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala
@@ -137,7 +137,7 @@ class NettyBlockTransferService(conf: SparkConf, securityManager: SecurityManage
new RpcResponseCallback {
override def onSuccess(response: Array[Byte]): Unit = {
logTrace(s"Successfully uploaded block $blockId")
- result.success()
+ result.success((): Unit)
}
override def onFailure(e: Throwable): Unit = {
logError(s"Error while uploading block $blockId", e)
diff --git a/core/src/main/scala/org/apache/spark/rdd/BinaryFileRDD.scala b/core/src/main/scala/org/apache/spark/rdd/BinaryFileRDD.scala
index 1f755db485..6fec00dcd0 100644
--- a/core/src/main/scala/org/apache/spark/rdd/BinaryFileRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/BinaryFileRDD.scala
@@ -28,7 +28,7 @@ private[spark] class BinaryFileRDD[T](
inputFormatClass: Class[_ <: StreamFileInputFormat[T]],
keyClass: Class[String],
valueClass: Class[T],
- @transient conf: Configuration,
+ conf: Configuration,
minPartitions: Int)
extends NewHadoopRDD[String, T](sc, inputFormatClass, keyClass, valueClass, conf) {
@@ -36,10 +36,10 @@ private[spark] class BinaryFileRDD[T](
val inputFormat = inputFormatClass.newInstance
inputFormat match {
case configurable: Configurable =>
- configurable.setConf(conf)
+ configurable.setConf(getConf)
case _ =>
}
- val jobContext = newJobContext(conf, jobId)
+ val jobContext = newJobContext(getConf, jobId)
inputFormat.setMinPartitions(jobContext, minPartitions)
val rawSplits = inputFormat.getSplits(jobContext).toArray
val result = new Array[Partition](rawSplits.size)
diff --git a/core/src/main/scala/org/apache/spark/rdd/BlockRDD.scala b/core/src/main/scala/org/apache/spark/rdd/BlockRDD.scala
index 9220302637..fc1710fbad 100644
--- a/core/src/main/scala/org/apache/spark/rdd/BlockRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/BlockRDD.scala
@@ -28,7 +28,7 @@ private[spark] class BlockRDDPartition(val blockId: BlockId, idx: Int) extends P
}
private[spark]
-class BlockRDD[T: ClassTag](@transient sc: SparkContext, @transient val blockIds: Array[BlockId])
+class BlockRDD[T: ClassTag](sc: SparkContext, @transient val blockIds: Array[BlockId])
extends RDD[T](sc, Nil) {
@transient lazy val _locations = BlockManager.blockIdsToHosts(blockIds, SparkEnv.get)
@@ -64,7 +64,7 @@ class BlockRDD[T: ClassTag](@transient sc: SparkContext, @transient val blockIds
*/
private[spark] def removeBlocks() {
blockIds.foreach { blockId =>
- sc.env.blockManager.master.removeBlock(blockId)
+ sparkContext.env.blockManager.master.removeBlock(blockId)
}
_isValid = false
}
diff --git a/core/src/main/scala/org/apache/spark/rdd/CartesianRDD.scala b/core/src/main/scala/org/apache/spark/rdd/CartesianRDD.scala
index c1d6971787..18e8cddbc4 100644
--- a/core/src/main/scala/org/apache/spark/rdd/CartesianRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/CartesianRDD.scala
@@ -27,8 +27,8 @@ import org.apache.spark.util.Utils
private[spark]
class CartesianPartition(
idx: Int,
- @transient rdd1: RDD[_],
- @transient rdd2: RDD[_],
+ @transient private val rdd1: RDD[_],
+ @transient private val rdd2: RDD[_],
s1Index: Int,
s2Index: Int
) extends Partition {
diff --git a/core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala b/core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala
index 72fe215dae..b0364623af 100644
--- a/core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala
@@ -29,7 +29,7 @@ private[spark] class CheckpointRDDPartition(val index: Int) extends Partition
/**
* An RDD that recovers checkpointed data from storage.
*/
-private[spark] abstract class CheckpointRDD[T: ClassTag](@transient sc: SparkContext)
+private[spark] abstract class CheckpointRDD[T: ClassTag](sc: SparkContext)
extends RDD[T](sc, Nil) {
// CheckpointRDD should not be checkpointed again
diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
index e1f8719eea..8f2655d63b 100644
--- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
@@ -51,7 +51,7 @@ import org.apache.spark.storage.StorageLevel
/**
* A Spark split class that wraps around a Hadoop InputSplit.
*/
-private[spark] class HadoopPartition(rddId: Int, idx: Int, @transient s: InputSplit)
+private[spark] class HadoopPartition(rddId: Int, idx: Int, s: InputSplit)
extends Partition {
val inputSplit = new SerializableWritable[InputSplit](s)
@@ -99,7 +99,7 @@ private[spark] class HadoopPartition(rddId: Int, idx: Int, @transient s: InputSp
*/
@DeveloperApi
class HadoopRDD[K, V](
- @transient sc: SparkContext,
+ sc: SparkContext,
broadcastedConf: Broadcast[SerializableConfiguration],
initLocalJobConfFuncOpt: Option[JobConf => Unit],
inputFormatClass: Class[_ <: InputFormat[K, V]],
@@ -109,7 +109,7 @@ class HadoopRDD[K, V](
extends RDD[(K, V)](sc, Nil) with Logging {
if (initLocalJobConfFuncOpt.isDefined) {
- sc.clean(initLocalJobConfFuncOpt.get)
+ sparkContext.clean(initLocalJobConfFuncOpt.get)
}
def this(
@@ -137,7 +137,7 @@ class HadoopRDD[K, V](
// used to build JobTracker ID
private val createTime = new Date()
- private val shouldCloneJobConf = sc.conf.getBoolean("spark.hadoop.cloneConf", false)
+ private val shouldCloneJobConf = sparkContext.conf.getBoolean("spark.hadoop.cloneConf", false)
// Returns a JobConf that will be used on slaves to obtain input splits for Hadoop reads.
protected def getJobConf(): JobConf = {
diff --git a/core/src/main/scala/org/apache/spark/rdd/LocalCheckpointRDD.scala b/core/src/main/scala/org/apache/spark/rdd/LocalCheckpointRDD.scala
index daa5779d68..bfe19195fc 100644
--- a/core/src/main/scala/org/apache/spark/rdd/LocalCheckpointRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/LocalCheckpointRDD.scala
@@ -35,7 +35,7 @@ import org.apache.spark.storage.RDDBlockId
* @param numPartitions the number of partitions in the checkpointed RDD
*/
private[spark] class LocalCheckpointRDD[T: ClassTag](
- @transient sc: SparkContext,
+ sc: SparkContext,
rddId: Int,
numPartitions: Int)
extends CheckpointRDD[T](sc) {
diff --git a/core/src/main/scala/org/apache/spark/rdd/LocalRDDCheckpointData.scala b/core/src/main/scala/org/apache/spark/rdd/LocalRDDCheckpointData.scala
index d6fad89684..c115e0ff74 100644
--- a/core/src/main/scala/org/apache/spark/rdd/LocalRDDCheckpointData.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/LocalRDDCheckpointData.scala
@@ -31,7 +31,7 @@ import org.apache.spark.util.Utils
* is written to the local, ephemeral block storage that lives in each executor. This is useful
* for use cases where RDDs build up long lineages that need to be truncated often (e.g. GraphX).
*/
-private[spark] class LocalRDDCheckpointData[T: ClassTag](@transient rdd: RDD[T])
+private[spark] class LocalRDDCheckpointData[T: ClassTag](@transient private val rdd: RDD[T])
extends RDDCheckpointData[T](rdd) with Logging {
/**
diff --git a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
index 6a9c004d65..174979aaeb 100644
--- a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
@@ -40,7 +40,7 @@ import org.apache.spark.storage.StorageLevel
private[spark] class NewHadoopPartition(
rddId: Int,
val index: Int,
- @transient rawSplit: InputSplit with Writable)
+ rawSplit: InputSplit with Writable)
extends Partition {
val serializableHadoopSplit = new SerializableWritable(rawSplit)
@@ -68,14 +68,14 @@ class NewHadoopRDD[K, V](
inputFormatClass: Class[_ <: InputFormat[K, V]],
keyClass: Class[K],
valueClass: Class[V],
- @transient conf: Configuration)
+ @transient private val _conf: Configuration)
extends RDD[(K, V)](sc, Nil)
with SparkHadoopMapReduceUtil
with Logging {
// A Hadoop Configuration can be about 10 KB, which is pretty big, so broadcast it
- private val confBroadcast = sc.broadcast(new SerializableConfiguration(conf))
- // private val serializableConf = new SerializableWritable(conf)
+ private val confBroadcast = sc.broadcast(new SerializableConfiguration(_conf))
+ // private val serializableConf = new SerializableWritable(_conf)
private val jobTrackerId: String = {
val formatter = new SimpleDateFormat("yyyyMMddHHmm")
@@ -88,10 +88,10 @@ class NewHadoopRDD[K, V](
val inputFormat = inputFormatClass.newInstance
inputFormat match {
case configurable: Configurable =>
- configurable.setConf(conf)
+ configurable.setConf(_conf)
case _ =>
}
- val jobContext = newJobContext(conf, jobId)
+ val jobContext = newJobContext(_conf, jobId)
val rawSplits = inputFormat.getSplits(jobContext).toArray
val result = new Array[Partition](rawSplits.size)
for (i <- 0 until rawSplits.size) {
@@ -262,7 +262,7 @@ private[spark] class WholeTextFileRDD(
inputFormatClass: Class[_ <: WholeTextFileInputFormat],
keyClass: Class[String],
valueClass: Class[String],
- @transient conf: Configuration,
+ conf: Configuration,
minPartitions: Int)
extends NewHadoopRDD[String, String](sc, inputFormatClass, keyClass, valueClass, conf) {
@@ -270,10 +270,10 @@ private[spark] class WholeTextFileRDD(
val inputFormat = inputFormatClass.newInstance
inputFormat match {
case configurable: Configurable =>
- configurable.setConf(conf)
+ configurable.setConf(getConf)
case _ =>
}
- val jobContext = newJobContext(conf, jobId)
+ val jobContext = newJobContext(getConf, jobId)
inputFormat.setMinPartitions(jobContext, minPartitions)
val rawSplits = inputFormat.getSplits(jobContext).toArray
val result = new Array[Partition](rawSplits.size)
diff --git a/core/src/main/scala/org/apache/spark/rdd/ParallelCollectionRDD.scala b/core/src/main/scala/org/apache/spark/rdd/ParallelCollectionRDD.scala
index e2394e28f8..582fa93afe 100644
--- a/core/src/main/scala/org/apache/spark/rdd/ParallelCollectionRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/ParallelCollectionRDD.scala
@@ -83,8 +83,8 @@ private[spark] class ParallelCollectionPartition[T: ClassTag](
}
private[spark] class ParallelCollectionRDD[T: ClassTag](
- @transient sc: SparkContext,
- @transient data: Seq[T],
+ sc: SparkContext,
+ @transient private val data: Seq[T],
numSlices: Int,
locationPrefs: Map[Int, Seq[String]])
extends RDD[T](sc, Nil) {
diff --git a/core/src/main/scala/org/apache/spark/rdd/PartitionPruningRDD.scala b/core/src/main/scala/org/apache/spark/rdd/PartitionPruningRDD.scala
index a00f4c1cdf..d6a37e8cc5 100644
--- a/core/src/main/scala/org/apache/spark/rdd/PartitionPruningRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/PartitionPruningRDD.scala
@@ -32,7 +32,7 @@ private[spark] class PartitionPruningRDDPartition(idx: Int, val parentSplit: Par
* Represents a dependency between the PartitionPruningRDD and its parent. In this
* case, the child RDD contains a subset of partitions of the parents'.
*/
-private[spark] class PruneDependency[T](rdd: RDD[T], @transient partitionFilterFunc: Int => Boolean)
+private[spark] class PruneDependency[T](rdd: RDD[T], partitionFilterFunc: Int => Boolean)
extends NarrowDependency[T](rdd) {
@transient
@@ -55,8 +55,8 @@ private[spark] class PruneDependency[T](rdd: RDD[T], @transient partitionFilterF
*/
@DeveloperApi
class PartitionPruningRDD[T: ClassTag](
- @transient prev: RDD[T],
- @transient partitionFilterFunc: Int => Boolean)
+ prev: RDD[T],
+ partitionFilterFunc: Int => Boolean)
extends RDD[T](prev.context, List(new PruneDependency(prev, partitionFilterFunc))) {
override def compute(split: Partition, context: TaskContext): Iterator[T] = {
diff --git a/core/src/main/scala/org/apache/spark/rdd/PartitionwiseSampledRDD.scala b/core/src/main/scala/org/apache/spark/rdd/PartitionwiseSampledRDD.scala
index a637d6f15b..3b1acacf40 100644
--- a/core/src/main/scala/org/apache/spark/rdd/PartitionwiseSampledRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/PartitionwiseSampledRDD.scala
@@ -47,8 +47,8 @@ class PartitionwiseSampledRDDPartition(val prev: Partition, val seed: Long)
private[spark] class PartitionwiseSampledRDD[T: ClassTag, U: ClassTag](
prev: RDD[T],
sampler: RandomSampler[T, U],
- @transient preservesPartitioning: Boolean,
- @transient seed: Long = Utils.random.nextLong)
+ preservesPartitioning: Boolean,
+ @transient private val seed: Long = Utils.random.nextLong)
extends RDD[U](prev) {
@transient override val partitioner = if (preservesPartitioning) prev.partitioner else None
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDDCheckpointData.scala b/core/src/main/scala/org/apache/spark/rdd/RDDCheckpointData.scala
index 0e43520870..429514b4f6 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDDCheckpointData.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDDCheckpointData.scala
@@ -36,7 +36,7 @@ private[spark] object CheckpointState extends Enumeration {
* as well as, manages the post-checkpoint state by providing the updated partitions,
* iterator and preferred locations of the checkpointed RDD.
*/
-private[spark] abstract class RDDCheckpointData[T: ClassTag](@transient rdd: RDD[T])
+private[spark] abstract class RDDCheckpointData[T: ClassTag](@transient private val rdd: RDD[T])
extends Serializable {
import CheckpointState._
diff --git a/core/src/main/scala/org/apache/spark/rdd/ReliableCheckpointRDD.scala b/core/src/main/scala/org/apache/spark/rdd/ReliableCheckpointRDD.scala
index 35d8b0bfd1..1c3b5da19c 100644
--- a/core/src/main/scala/org/apache/spark/rdd/ReliableCheckpointRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/ReliableCheckpointRDD.scala
@@ -32,7 +32,7 @@ import org.apache.spark.util.{SerializableConfiguration, Utils}
* An RDD that reads from checkpoint files previously written to reliable storage.
*/
private[spark] class ReliableCheckpointRDD[T: ClassTag](
- @transient sc: SparkContext,
+ sc: SparkContext,
val checkpointPath: String)
extends CheckpointRDD[T](sc) {
diff --git a/core/src/main/scala/org/apache/spark/rdd/ReliableRDDCheckpointData.scala b/core/src/main/scala/org/apache/spark/rdd/ReliableRDDCheckpointData.scala
index 1df8eef5ff..e9f6060301 100644
--- a/core/src/main/scala/org/apache/spark/rdd/ReliableRDDCheckpointData.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/ReliableRDDCheckpointData.scala
@@ -28,7 +28,7 @@ import org.apache.spark.util.SerializableConfiguration
* An implementation of checkpointing that writes the RDD data to reliable storage.
* This allows drivers to be restarted on failure with previously computed state.
*/
-private[spark] class ReliableRDDCheckpointData[T: ClassTag](@transient rdd: RDD[T])
+private[spark] class ReliableRDDCheckpointData[T: ClassTag](@transient private val rdd: RDD[T])
extends RDDCheckpointData[T](rdd) with Logging {
// The directory to which the associated RDD has been checkpointed to
diff --git a/core/src/main/scala/org/apache/spark/rdd/SqlNewHadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/SqlNewHadoopRDD.scala
index fa3fecc80c..9babe56267 100644
--- a/core/src/main/scala/org/apache/spark/rdd/SqlNewHadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/SqlNewHadoopRDD.scala
@@ -39,7 +39,7 @@ import org.apache.spark.util.{SerializableConfiguration, ShutdownHookManager, Ut
private[spark] class SqlNewHadoopPartition(
rddId: Int,
val index: Int,
- @transient rawSplit: InputSplit with Writable)
+ rawSplit: InputSplit with Writable)
extends SparkPartition {
val serializableHadoopSplit = new SerializableWritable(rawSplit)
@@ -61,9 +61,9 @@ private[spark] class SqlNewHadoopPartition(
* changes based on [[org.apache.spark.rdd.HadoopRDD]].
*/
private[spark] class SqlNewHadoopRDD[V: ClassTag](
- @transient sc : SparkContext,
+ sc : SparkContext,
broadcastedConf: Broadcast[SerializableConfiguration],
- @transient initDriverSideJobFuncOpt: Option[Job => Unit],
+ @transient private val initDriverSideJobFuncOpt: Option[Job => Unit],
initLocalJobFuncOpt: Option[Job => Unit],
inputFormatClass: Class[_ <: InputFormat[Void, V]],
valueClass: Class[V])
diff --git a/core/src/main/scala/org/apache/spark/rdd/UnionRDD.scala b/core/src/main/scala/org/apache/spark/rdd/UnionRDD.scala
index 3986645350..66cf4369da 100644
--- a/core/src/main/scala/org/apache/spark/rdd/UnionRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/UnionRDD.scala
@@ -37,9 +37,9 @@ import org.apache.spark.util.Utils
*/
private[spark] class UnionPartition[T: ClassTag](
idx: Int,
- @transient rdd: RDD[T],
+ @transient private val rdd: RDD[T],
val parentRddIndex: Int,
- @transient parentRddPartitionIndex: Int)
+ @transient private val parentRddPartitionIndex: Int)
extends Partition {
var parentPartition: Partition = rdd.partitions(parentRddPartitionIndex)
diff --git a/core/src/main/scala/org/apache/spark/rdd/ZippedPartitionsRDD.scala b/core/src/main/scala/org/apache/spark/rdd/ZippedPartitionsRDD.scala
index b3c64394ab..70bf04de64 100644
--- a/core/src/main/scala/org/apache/spark/rdd/ZippedPartitionsRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/ZippedPartitionsRDD.scala
@@ -26,7 +26,7 @@ import org.apache.spark.util.Utils
private[spark] class ZippedPartitionsPartition(
idx: Int,
- @transient rdds: Seq[RDD[_]],
+ @transient private val rdds: Seq[RDD[_]],
@transient val preferredLocations: Seq[String])
extends Partition {
diff --git a/core/src/main/scala/org/apache/spark/rdd/ZippedWithIndexRDD.scala b/core/src/main/scala/org/apache/spark/rdd/ZippedWithIndexRDD.scala
index e277ae28d5..32931d59ac 100644
--- a/core/src/main/scala/org/apache/spark/rdd/ZippedWithIndexRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/ZippedWithIndexRDD.scala
@@ -37,7 +37,7 @@ class ZippedWithIndexRDDPartition(val prev: Partition, val startIndex: Long)
* @tparam T parent RDD item type
*/
private[spark]
-class ZippedWithIndexRDD[T: ClassTag](@transient prev: RDD[T]) extends RDD[(T, Long)](prev) {
+class ZippedWithIndexRDD[T: ClassTag](prev: RDD[T]) extends RDD[(T, Long)](prev) {
/** The start index of each partition. */
@transient private val startIndices: Array[Long] = {
diff --git a/core/src/main/scala/org/apache/spark/rpc/RpcEndpointRef.scala b/core/src/main/scala/org/apache/spark/rpc/RpcEndpointRef.scala
index 7409ac8859..f25710bb5b 100644
--- a/core/src/main/scala/org/apache/spark/rpc/RpcEndpointRef.scala
+++ b/core/src/main/scala/org/apache/spark/rpc/RpcEndpointRef.scala
@@ -26,7 +26,7 @@ import org.apache.spark.{SparkException, Logging, SparkConf}
/**
* A reference for a remote [[RpcEndpoint]]. [[RpcEndpointRef]] is thread-safe.
*/
-private[spark] abstract class RpcEndpointRef(@transient conf: SparkConf)
+private[spark] abstract class RpcEndpointRef(conf: SparkConf)
extends Serializable with Logging {
private[this] val maxRetries = RpcUtils.numRetries(conf)
diff --git a/core/src/main/scala/org/apache/spark/rpc/akka/AkkaRpcEnv.scala b/core/src/main/scala/org/apache/spark/rpc/akka/AkkaRpcEnv.scala
index fc17542abf..ad67e1c5ad 100644
--- a/core/src/main/scala/org/apache/spark/rpc/akka/AkkaRpcEnv.scala
+++ b/core/src/main/scala/org/apache/spark/rpc/akka/AkkaRpcEnv.scala
@@ -87,9 +87,9 @@ private[spark] class AkkaRpcEnv private[akka] (
override def setupEndpoint(name: String, endpoint: RpcEndpoint): RpcEndpointRef = {
@volatile var endpointRef: AkkaRpcEndpointRef = null
- // Use lazy because the Actor needs to use `endpointRef`.
+ // Use defered function because the Actor needs to use `endpointRef`.
// So `actorRef` should be created after assigning `endpointRef`.
- lazy val actorRef = actorSystem.actorOf(Props(new Actor with ActorLogReceive with Logging {
+ val actorRef = () => actorSystem.actorOf(Props(new Actor with ActorLogReceive with Logging {
assert(endpointRef != null)
@@ -272,13 +272,20 @@ private[akka] class ErrorMonitor extends Actor with ActorLogReceive with Logging
}
private[akka] class AkkaRpcEndpointRef(
- @transient defaultAddress: RpcAddress,
- @transient _actorRef: => ActorRef,
- @transient conf: SparkConf,
- @transient initInConstructor: Boolean = true)
+ @transient private val defaultAddress: RpcAddress,
+ @transient private val _actorRef: () => ActorRef,
+ conf: SparkConf,
+ initInConstructor: Boolean)
extends RpcEndpointRef(conf) with Logging {
- lazy val actorRef = _actorRef
+ def this(
+ defaultAddress: RpcAddress,
+ _actorRef: ActorRef,
+ conf: SparkConf) = {
+ this(defaultAddress, () => _actorRef, conf, true)
+ }
+
+ lazy val actorRef = _actorRef()
override lazy val address: RpcAddress = {
val akkaAddress = actorRef.path.address
diff --git a/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala b/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala
index c4dc080e2b..fb693721a9 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala
@@ -44,7 +44,7 @@ private[spark] class ResultTask[T, U](
stageAttemptId: Int,
taskBinary: Broadcast[Array[Byte]],
partition: Partition,
- @transient locs: Seq[TaskLocation],
+ locs: Seq[TaskLocation],
val outputId: Int,
internalAccumulators: Seq[Accumulator[Long]])
extends Task[U](stageId, stageAttemptId, partition.index, internalAccumulators)
diff --git a/core/src/main/scala/org/apache/spark/shuffle/unsafe/UnsafeShuffleManager.scala b/core/src/main/scala/org/apache/spark/shuffle/unsafe/UnsafeShuffleManager.scala
index df7bbd6424..75f22f642b 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/unsafe/UnsafeShuffleManager.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/unsafe/UnsafeShuffleManager.scala
@@ -159,7 +159,7 @@ private[spark] class UnsafeShuffleManager(conf: SparkConf) extends ShuffleManage
mapId: Int,
context: TaskContext): ShuffleWriter[K, V] = {
handle match {
- case unsafeShuffleHandle: UnsafeShuffleHandle[K, V] =>
+ case unsafeShuffleHandle: UnsafeShuffleHandle[K @unchecked, V @unchecked] =>
numMapsForShufflesThatUsedNewPath.putIfAbsent(handle.shuffleId, unsafeShuffleHandle.numMaps)
val env = SparkEnv.get
new UnsafeShuffleWriter(
diff --git a/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala b/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala
index 150d82b393..1b49dca9dc 100644
--- a/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala
+++ b/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala
@@ -94,7 +94,7 @@ private[spark] object ClosureCleaner extends Logging {
if (cls.isPrimitive) {
cls match {
case java.lang.Boolean.TYPE => new java.lang.Boolean(false)
- case java.lang.Character.TYPE => new java.lang.Character('\0')
+ case java.lang.Character.TYPE => new java.lang.Character('\u0000')
case java.lang.Void.TYPE =>
// This should not happen because `Foo(void x) {}` does not compile.
throw new IllegalStateException("Unexpected void parameter in constructor")
diff --git a/core/src/main/scala/org/apache/spark/util/ShutdownHookManager.scala b/core/src/main/scala/org/apache/spark/util/ShutdownHookManager.scala
index 61ff9b89ec..db4a8b304e 100644
--- a/core/src/main/scala/org/apache/spark/util/ShutdownHookManager.scala
+++ b/core/src/main/scala/org/apache/spark/util/ShutdownHookManager.scala
@@ -217,7 +217,9 @@ private [util] class SparkShutdownHookManager {
}
Try(Utils.classForName("org.apache.hadoop.util.ShutdownHookManager")) match {
case Success(shmClass) =>
- val fsPriority = classOf[FileSystem].getField("SHUTDOWN_HOOK_PRIORITY").get()
+ val fsPriority = classOf[FileSystem]
+ .getField("SHUTDOWN_HOOK_PRIORITY")
+ .get(null) // static field, the value is not used
.asInstanceOf[Int]
val shm = shmClass.getMethod("get").invoke(null)
shm.getClass().getMethod("addShutdownHook", classOf[Runnable], classOf[Int])
diff --git a/core/src/main/scala/org/apache/spark/util/logging/RollingFileAppender.scala b/core/src/main/scala/org/apache/spark/util/logging/RollingFileAppender.scala
index 7138b4b8e4..1e8476c4a0 100644
--- a/core/src/main/scala/org/apache/spark/util/logging/RollingFileAppender.scala
+++ b/core/src/main/scala/org/apache/spark/util/logging/RollingFileAppender.scala
@@ -79,32 +79,30 @@ private[spark] class RollingFileAppender(
val rolloverSuffix = rollingPolicy.generateRolledOverFileSuffix()
val rolloverFile = new File(
activeFile.getParentFile, activeFile.getName + rolloverSuffix).getAbsoluteFile
- try {
- logDebug(s"Attempting to rollover file $activeFile to file $rolloverFile")
- if (activeFile.exists) {
- if (!rolloverFile.exists) {
- Files.move(activeFile, rolloverFile)
- logInfo(s"Rolled over $activeFile to $rolloverFile")
- } else {
- // In case the rollover file name clashes, make a unique file name.
- // The resultant file names are long and ugly, so this is used only
- // if there is a name collision. This can be avoided by the using
- // the right pattern such that name collisions do not occur.
- var i = 0
- var altRolloverFile: File = null
- do {
- altRolloverFile = new File(activeFile.getParent,
- s"${activeFile.getName}$rolloverSuffix--$i").getAbsoluteFile
- i += 1
- } while (i < 10000 && altRolloverFile.exists)
-
- logWarning(s"Rollover file $rolloverFile already exists, " +
- s"rolled over $activeFile to file $altRolloverFile")
- Files.move(activeFile, altRolloverFile)
- }
+ logDebug(s"Attempting to rollover file $activeFile to file $rolloverFile")
+ if (activeFile.exists) {
+ if (!rolloverFile.exists) {
+ Files.move(activeFile, rolloverFile)
+ logInfo(s"Rolled over $activeFile to $rolloverFile")
} else {
- logWarning(s"File $activeFile does not exist")
+ // In case the rollover file name clashes, make a unique file name.
+ // The resultant file names are long and ugly, so this is used only
+ // if there is a name collision. This can be avoided by the using
+ // the right pattern such that name collisions do not occur.
+ var i = 0
+ var altRolloverFile: File = null
+ do {
+ altRolloverFile = new File(activeFile.getParent,
+ s"${activeFile.getName}$rolloverSuffix--$i").getAbsoluteFile
+ i += 1
+ } while (i < 10000 && altRolloverFile.exists)
+
+ logWarning(s"Rollover file $rolloverFile already exists, " +
+ s"rolled over $activeFile to file $altRolloverFile")
+ Files.move(activeFile, altRolloverFile)
}
+ } else {
+ logWarning(s"File $activeFile does not exist")
}
}
diff --git a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala
index 2bf99cb3cb..c8780aa83b 100644
--- a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala
+++ b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala
@@ -43,7 +43,7 @@ import org.jboss.netty.handler.codec.compression._
private[streaming]
class FlumeInputDStream[T: ClassTag](
- @transient ssc_ : StreamingContext,
+ ssc_ : StreamingContext,
host: String,
port: Int,
storageLevel: StorageLevel,
diff --git a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumePollingInputDStream.scala b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumePollingInputDStream.scala
index 0bc46209b8..3b936d88ab 100644
--- a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumePollingInputDStream.scala
+++ b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumePollingInputDStream.scala
@@ -46,7 +46,7 @@ import org.apache.spark.streaming.flume.sink._
* @tparam T Class type of the object of this stream
*/
private[streaming] class FlumePollingInputDStream[T: ClassTag](
- @transient _ssc: StreamingContext,
+ _ssc: StreamingContext,
val addresses: Seq[InetSocketAddress],
val maxBatchSize: Int,
val parallelism: Int,
diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala
index 1000094e93..8a087474d3 100644
--- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala
+++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala
@@ -58,7 +58,7 @@ class DirectKafkaInputDStream[
U <: Decoder[K]: ClassTag,
T <: Decoder[V]: ClassTag,
R: ClassTag](
- @transient ssc_ : StreamingContext,
+ ssc_ : StreamingContext,
val kafkaParams: Map[String, String],
val fromOffsets: Map[TopicAndPartition, Long],
messageHandler: MessageAndMetadata[K, V] => R
@@ -79,7 +79,7 @@ class DirectKafkaInputDStream[
override protected[streaming] val rateController: Option[RateController] = {
if (RateController.isBackPressureEnabled(ssc.conf)) {
Some(new DirectKafkaRateController(id,
- RateEstimator.create(ssc.conf, ssc_.graph.batchDuration)))
+ RateEstimator.create(ssc.conf, context.graph.batchDuration)))
} else {
None
}
diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala
index 04b2dc10d3..38730fecf3 100644
--- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala
+++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala
@@ -48,7 +48,7 @@ class KafkaInputDStream[
V: ClassTag,
U <: Decoder[_]: ClassTag,
T <: Decoder[_]: ClassTag](
- @transient ssc_ : StreamingContext,
+ ssc_ : StreamingContext,
kafkaParams: Map[String, String],
topics: Map[String, Int],
useReliableReceiver: Boolean,
diff --git a/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTInputDStream.scala b/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTInputDStream.scala
index 7c2f18cb35..116c170489 100644
--- a/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTInputDStream.scala
+++ b/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTInputDStream.scala
@@ -38,7 +38,7 @@ import org.apache.spark.streaming.receiver.Receiver
private[streaming]
class MQTTInputDStream(
- @transient ssc_ : StreamingContext,
+ ssc_ : StreamingContext,
brokerUrl: String,
topic: String,
storageLevel: StorageLevel
diff --git a/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterInputDStream.scala b/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterInputDStream.scala
index 7cf02d85d7..d7de74b350 100644
--- a/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterInputDStream.scala
+++ b/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterInputDStream.scala
@@ -39,7 +39,7 @@ import org.apache.spark.streaming.receiver.Receiver
*/
private[streaming]
class TwitterInputDStream(
- @transient ssc_ : StreamingContext,
+ ssc_ : StreamingContext,
twitterAuth: Option[Authorization],
filters: Seq[String],
storageLevel: StorageLevel
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala b/graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala
index 4611a3ace2..ee7302a1ed 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala
@@ -38,8 +38,8 @@ import org.apache.spark.graphx.impl.EdgeRDDImpl
* `impl.ReplicatedVertexView`.
*/
abstract class EdgeRDD[ED](
- @transient sc: SparkContext,
- @transient deps: Seq[Dependency[_]]) extends RDD[Edge[ED]](sc, deps) {
+ sc: SparkContext,
+ deps: Seq[Dependency[_]]) extends RDD[Edge[ED]](sc, deps) {
// scalastyle:off structural.type
private[graphx] def partitionsRDD: RDD[(PartitionID, EdgePartition[ED, VD])] forSome { type VD }
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala b/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala
index db73a8abc5..869caa340f 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala
@@ -46,7 +46,7 @@ abstract class Graph[VD: ClassTag, ED: ClassTag] protected () extends Serializab
* @note vertex ids are unique.
* @return an RDD containing the vertices in this graph
*/
- @transient val vertices: VertexRDD[VD]
+ val vertices: VertexRDD[VD]
/**
* An RDD containing the edges and their associated attributes. The entries in the RDD contain
@@ -59,7 +59,7 @@ abstract class Graph[VD: ClassTag, ED: ClassTag] protected () extends Serializab
* along with their vertex data.
*
*/
- @transient val edges: EdgeRDD[ED]
+ val edges: EdgeRDD[ED]
/**
* An RDD containing the edge triplets, which are edges along with the vertex data associated with
@@ -77,7 +77,7 @@ abstract class Graph[VD: ClassTag, ED: ClassTag] protected () extends Serializab
* val numInvalid = graph.triplets.map(e => if (e.src.data == e.dst.data) 1 else 0).sum
* }}}
*/
- @transient val triplets: RDD[EdgeTriplet[VD, ED]]
+ val triplets: RDD[EdgeTriplet[VD, ED]]
/**
* Caches the vertices and edges associated with this graph at the specified storage level,
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala b/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala
index a9f04b559c..1ef7a78fbc 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala
@@ -55,8 +55,8 @@ import org.apache.spark.graphx.impl.VertexRDDImpl
* @tparam VD the vertex attribute associated with each vertex in the set.
*/
abstract class VertexRDD[VD](
- @transient sc: SparkContext,
- @transient deps: Seq[Dependency[_]]) extends RDD[(VertexId, VD)](sc, deps) {
+ sc: SparkContext,
+ deps: Seq[Dependency[_]]) extends RDD[(VertexId, VD)](sc, deps) {
implicit protected def vdTag: ClassTag[VD]
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/rdd/RandomRDD.scala b/mllib/src/main/scala/org/apache/spark/mllib/rdd/RandomRDD.scala
index 910eff9540..f8cea7ecea 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/rdd/RandomRDD.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/rdd/RandomRDD.scala
@@ -35,11 +35,11 @@ private[mllib] class RandomRDDPartition[T](override val index: Int,
}
// These two classes are necessary since Range objects in Scala cannot have size > Int.MaxValue
-private[mllib] class RandomRDD[T: ClassTag](@transient sc: SparkContext,
+private[mllib] class RandomRDD[T: ClassTag](sc: SparkContext,
size: Long,
numPartitions: Int,
- @transient rng: RandomDataGenerator[T],
- @transient seed: Long = Utils.random.nextLong) extends RDD[T](sc, Nil) {
+ @transient private val rng: RandomDataGenerator[T],
+ @transient private val seed: Long = Utils.random.nextLong) extends RDD[T](sc, Nil) {
require(size > 0, "Positive RDD size required.")
require(numPartitions > 0, "Positive number of partitions required")
@@ -56,12 +56,12 @@ private[mllib] class RandomRDD[T: ClassTag](@transient sc: SparkContext,
}
}
-private[mllib] class RandomVectorRDD(@transient sc: SparkContext,
+private[mllib] class RandomVectorRDD(sc: SparkContext,
size: Long,
vectorSize: Int,
numPartitions: Int,
- @transient rng: RandomDataGenerator[Double],
- @transient seed: Long = Utils.random.nextLong) extends RDD[Vector](sc, Nil) {
+ @transient private val rng: RandomDataGenerator[Double],
+ @transient private val seed: Long = Utils.random.nextLong) extends RDD[Vector](sc, Nil) {
require(size > 0, "Positive RDD size required.")
require(numPartitions > 0, "Positive number of partitions required")
diff --git a/repl/scala-2.11/src/main/scala/org/apache/spark/repl/SparkILoop.scala b/repl/scala-2.11/src/main/scala/org/apache/spark/repl/SparkILoop.scala
index bf609ff0f6..33d262558b 100644
--- a/repl/scala-2.11/src/main/scala/org/apache/spark/repl/SparkILoop.scala
+++ b/repl/scala-2.11/src/main/scala/org/apache/spark/repl/SparkILoop.scala
@@ -118,5 +118,5 @@ object SparkILoop {
}
}
}
- def run(lines: List[String]): String = run(lines map (_ + "\n") mkString)
+ def run(lines: List[String]): String = run(lines.map(_ + "\n").mkString)
}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringExpressions.scala
index 48d02bb534..a09d5b6e3a 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringExpressions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringExpressions.scala
@@ -255,7 +255,7 @@ object StringTranslate {
val dict = new HashMap[Character, Character]()
var i = 0
while (i < matching.length()) {
- val rep = if (i < replace.length()) replace.charAt(i) else '\0'
+ val rep = if (i < replace.length()) replace.charAt(i) else '\u0000'
if (null == dict.get(matching.charAt(i))) {
dict.put(matching.charAt(i), rep)
}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/AbstractDataType.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/AbstractDataType.scala
index e0667c6294..1d2d007c2b 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/AbstractDataType.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/AbstractDataType.scala
@@ -126,7 +126,7 @@ protected[sql] object AnyDataType extends AbstractDataType {
*/
protected[sql] abstract class AtomicType extends DataType {
private[sql] type InternalType
- @transient private[sql] val tag: TypeTag[InternalType]
+ private[sql] val tag: TypeTag[InternalType]
private[sql] val ordering: Ordering[InternalType]
@transient private[sql] val classTag = ScalaReflectionLock.synchronized {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala
index 879fd69863..9a573db0c0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala
@@ -39,7 +39,7 @@ import org.apache.spark.util.SerializableConfiguration
private[sql] abstract class BaseWriterContainer(
@transient val relation: HadoopFsRelation,
- @transient job: Job,
+ @transient private val job: Job,
isAppend: Boolean)
extends SparkHadoopMapReduceUtil
with Logging
@@ -222,8 +222,8 @@ private[sql] abstract class BaseWriterContainer(
* A writer that writes all of the rows in a partition to a single file.
*/
private[sql] class DefaultWriterContainer(
- @transient relation: HadoopFsRelation,
- @transient job: Job,
+ relation: HadoopFsRelation,
+ job: Job,
isAppend: Boolean)
extends BaseWriterContainer(relation, job, isAppend) {
@@ -286,8 +286,8 @@ private[sql] class DefaultWriterContainer(
* writer externally sorts the remaining rows and then writes out them out one file at a time.
*/
private[sql] class DynamicPartitionWriterContainer(
- @transient relation: HadoopFsRelation,
- @transient job: Job,
+ relation: HadoopFsRelation,
+ job: Job,
partitionColumns: Seq[Attribute],
dataColumns: Seq[Attribute],
inputSchema: Seq[Attribute],
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index b8da0840ae..0a5569b0a4 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -767,7 +767,7 @@ private[hive] case class InsertIntoHiveTable(
private[hive] case class MetastoreRelation
(databaseName: String, tableName: String, alias: Option[String])
(val table: HiveTable)
- (@transient sqlContext: SQLContext)
+ (@transient private val sqlContext: SQLContext)
extends LeafNode with MultiInstanceRelation with FileRelation {
override def equals(other: Any): Boolean = other match {
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
index dc35569085..e35468a624 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
@@ -54,10 +54,10 @@ private[hive] sealed trait TableReader {
*/
private[hive]
class HadoopTableReader(
- @transient attributes: Seq[Attribute],
- @transient relation: MetastoreRelation,
- @transient sc: HiveContext,
- @transient hiveExtraConf: HiveConf)
+ @transient private val attributes: Seq[Attribute],
+ @transient private val relation: MetastoreRelation,
+ @transient private val sc: HiveContext,
+ hiveExtraConf: HiveConf)
extends TableReader with Logging {
// Hadoop honors "mapred.map.tasks" as hint, but will ignore when mapred.job.tracker is "local".
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala
index c7651daffe..32bddbaeae 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala
@@ -53,7 +53,7 @@ case class ScriptTransformation(
script: String,
output: Seq[Attribute],
child: SparkPlan,
- ioschema: HiveScriptIOSchema)(@transient sc: HiveContext)
+ ioschema: HiveScriptIOSchema)(@transient private val sc: HiveContext)
extends UnaryNode {
override def otherCopyArgs: Seq[HiveContext] = sc :: Nil
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala
index 8dc796b056..29a6f08f40 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala
@@ -45,7 +45,7 @@ import org.apache.spark.util.SerializableJobConf
* It is based on [[SparkHadoopWriter]].
*/
private[hive] class SparkHiveWriterContainer(
- @transient jobConf: JobConf,
+ jobConf: JobConf,
fileSinkConf: FileSinkDesc)
extends Logging
with SparkHadoopMapRedUtil
@@ -163,7 +163,7 @@ private[spark] object SparkHiveDynamicPartitionWriterContainer {
}
private[spark] class SparkHiveDynamicPartitionWriterContainer(
- @transient jobConf: JobConf,
+ jobConf: JobConf,
fileSinkConf: FileSinkDesc,
dynamicPartColNames: Array[String])
extends SparkHiveWriterContainer(jobConf, fileSinkConf) {
@@ -194,10 +194,10 @@ private[spark] class SparkHiveDynamicPartitionWriterContainer(
// Better solution is to add a step similar to what Hive FileSinkOperator.jobCloseOp does:
// calling something like Utilities.mvFileToFinalPath to cleanup the output directory and then
// load it with loadDynamicPartitions/loadPartition/loadTable.
- val oldMarker = jobConf.getBoolean(SUCCESSFUL_JOB_OUTPUT_DIR_MARKER, true)
- jobConf.setBoolean(SUCCESSFUL_JOB_OUTPUT_DIR_MARKER, false)
+ val oldMarker = conf.value.getBoolean(SUCCESSFUL_JOB_OUTPUT_DIR_MARKER, true)
+ conf.value.setBoolean(SUCCESSFUL_JOB_OUTPUT_DIR_MARKER, false)
super.commitJob()
- jobConf.setBoolean(SUCCESSFUL_JOB_OUTPUT_DIR_MARKER, oldMarker)
+ conf.value.setBoolean(SUCCESSFUL_JOB_OUTPUT_DIR_MARKER, oldMarker)
}
override def getLocalFileWriter(row: InternalRow, schema: StructType)
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
index 27024ecfd9..8a6050f522 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
@@ -32,7 +32,7 @@ import org.apache.spark.streaming.scheduler.JobGenerator
private[streaming]
-class Checkpoint(@transient ssc: StreamingContext, val checkpointTime: Time)
+class Checkpoint(ssc: StreamingContext, val checkpointTime: Time)
extends Logging with Serializable {
val master = ssc.sc.master
val framework = ssc.sc.appName
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala
index 2c373640d2..dfc569451d 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala
@@ -170,7 +170,7 @@ private[python] object PythonDStream {
*/
private[python] abstract class PythonDStream(
parent: DStream[_],
- @transient pfunc: PythonTransformFunction)
+ pfunc: PythonTransformFunction)
extends DStream[Array[Byte]] (parent.ssc) {
val func = new TransformFunction(pfunc)
@@ -187,7 +187,7 @@ private[python] abstract class PythonDStream(
*/
private[python] class PythonTransformedDStream (
parent: DStream[_],
- @transient pfunc: PythonTransformFunction)
+ pfunc: PythonTransformFunction)
extends PythonDStream(parent, pfunc) {
override def compute(validTime: Time): Option[RDD[Array[Byte]]] = {
@@ -206,7 +206,7 @@ private[python] class PythonTransformedDStream (
private[python] class PythonTransformed2DStream(
parent: DStream[_],
parent2: DStream[_],
- @transient pfunc: PythonTransformFunction)
+ pfunc: PythonTransformFunction)
extends DStream[Array[Byte]] (parent.ssc) {
val func = new TransformFunction(pfunc)
@@ -230,7 +230,7 @@ private[python] class PythonTransformed2DStream(
*/
private[python] class PythonStateDStream(
parent: DStream[Array[Byte]],
- @transient reduceFunc: PythonTransformFunction)
+ reduceFunc: PythonTransformFunction)
extends PythonDStream(parent, reduceFunc) {
super.persist(StorageLevel.MEMORY_ONLY)
@@ -252,8 +252,8 @@ private[python] class PythonStateDStream(
*/
private[python] class PythonReducedWindowedDStream(
parent: DStream[Array[Byte]],
- @transient preduceFunc: PythonTransformFunction,
- @transient pinvReduceFunc: PythonTransformFunction,
+ preduceFunc: PythonTransformFunction,
+ @transient private val pinvReduceFunc: PythonTransformFunction,
_windowDuration: Duration,
_slideDuration: Duration)
extends PythonDStream(parent, preduceFunc) {
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
index c358f5b5bd..40208a6486 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
@@ -70,7 +70,7 @@ import org.apache.spark.util.{SerializableConfiguration, TimeStampedHashMap, Uti
*/
private[streaming]
class FileInputDStream[K, V, F <: NewInputFormat[K, V]](
- @transient ssc_ : StreamingContext,
+ ssc_ : StreamingContext,
directory: String,
filter: Path => Boolean = FileInputDStream.defaultFilter,
newFilesOnly: Boolean = true,
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala
index a6c4cd220e..95994c983c 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala
@@ -39,7 +39,7 @@ import org.apache.spark.util.Utils
*
* @param ssc_ Streaming context that will execute this input stream
*/
-abstract class InputDStream[T: ClassTag] (@transient ssc_ : StreamingContext)
+abstract class InputDStream[T: ClassTag] (ssc_ : StreamingContext)
extends DStream[T](ssc_) {
private[streaming] var lastValidTime: Time = null
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/PluggableInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/PluggableInputDStream.scala
index 186e1bf03a..002aac9f43 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/PluggableInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/PluggableInputDStream.scala
@@ -23,7 +23,7 @@ import org.apache.spark.streaming.receiver.Receiver
private[streaming]
class PluggableInputDStream[T: ClassTag](
- @transient ssc_ : StreamingContext,
+ ssc_ : StreamingContext,
receiver: Receiver[T]) extends ReceiverInputDStream[T](ssc_) {
def getReceiver(): Receiver[T] = {
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/QueueInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/QueueInputDStream.scala
index bab78a3536..a2685046e0 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/QueueInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/QueueInputDStream.scala
@@ -27,7 +27,7 @@ import org.apache.spark.streaming.{Time, StreamingContext}
private[streaming]
class QueueInputDStream[T: ClassTag](
- @transient ssc: StreamingContext,
+ ssc: StreamingContext,
val queue: Queue[RDD[T]],
oneAtATime: Boolean,
defaultRDD: RDD[T]
@@ -57,7 +57,7 @@ class QueueInputDStream[T: ClassTag](
if (oneAtATime) {
Some(buffer.head)
} else {
- Some(new UnionRDD(ssc.sc, buffer.toSeq))
+ Some(new UnionRDD(context.sc, buffer.toSeq))
}
} else if (defaultRDD != null) {
Some(defaultRDD)
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/RawInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/RawInputDStream.scala
index e2925b9e03..5a9eda7c12 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/RawInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/RawInputDStream.scala
@@ -39,7 +39,7 @@ import org.apache.spark.streaming.receiver.Receiver
*/
private[streaming]
class RawInputDStream[T: ClassTag](
- @transient ssc_ : StreamingContext,
+ ssc_ : StreamingContext,
host: String,
port: Int,
storageLevel: StorageLevel
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala
index 6c139f32da..87c20afd5c 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala
@@ -38,7 +38,7 @@ import org.apache.spark.streaming.{StreamingContext, Time}
* @param ssc_ Streaming context that will execute this input stream
* @tparam T Class type of the object of this stream
*/
-abstract class ReceiverInputDStream[T: ClassTag](@transient ssc_ : StreamingContext)
+abstract class ReceiverInputDStream[T: ClassTag](ssc_ : StreamingContext)
extends InputDStream[T](ssc_) {
/**
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/SocketInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/SocketInputDStream.scala
index 5ce5b7aae6..de84e0c9a4 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/SocketInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/SocketInputDStream.scala
@@ -32,7 +32,7 @@ import org.apache.spark.streaming.receiver.Receiver
private[streaming]
class SocketInputDStream[T: ClassTag](
- @transient ssc_ : StreamingContext,
+ ssc_ : StreamingContext,
host: String,
port: Int,
bytesToObjects: InputStream => Iterator[T],
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala b/streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala
index e081ffe46f..f811784b25 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala
@@ -61,7 +61,7 @@ class WriteAheadLogBackedBlockRDDPartition(
*
*
* @param sc SparkContext
- * @param blockIds Ids of the blocks that contains this RDD's data
+ * @param _blockIds Ids of the blocks that contains this RDD's data
* @param walRecordHandles Record handles in write ahead logs that contain this RDD's data
* @param isBlockIdValid Whether the block Ids are valid (i.e., the blocks are present in the Spark
* executors). If not, then block lookups by the block ids will be skipped.
@@ -73,23 +73,23 @@ class WriteAheadLogBackedBlockRDDPartition(
*/
private[streaming]
class WriteAheadLogBackedBlockRDD[T: ClassTag](
- @transient sc: SparkContext,
- @transient blockIds: Array[BlockId],
+ sc: SparkContext,
+ @transient private val _blockIds: Array[BlockId],
@transient val walRecordHandles: Array[WriteAheadLogRecordHandle],
- @transient isBlockIdValid: Array[Boolean] = Array.empty,
+ @transient private val isBlockIdValid: Array[Boolean] = Array.empty,
storeInBlockManager: Boolean = false,
storageLevel: StorageLevel = StorageLevel.MEMORY_ONLY_SER)
- extends BlockRDD[T](sc, blockIds) {
+ extends BlockRDD[T](sc, _blockIds) {
require(
- blockIds.length == walRecordHandles.length,
- s"Number of block Ids (${blockIds.length}) must be " +
+ _blockIds.length == walRecordHandles.length,
+ s"Number of block Ids (${_blockIds.length}) must be " +
s" same as number of WAL record handles (${walRecordHandles.length})")
require(
- isBlockIdValid.isEmpty || isBlockIdValid.length == blockIds.length,
+ isBlockIdValid.isEmpty || isBlockIdValid.length == _blockIds.length,
s"Number of elements in isBlockIdValid (${isBlockIdValid.length}) must be " +
- s" same as number of block Ids (${blockIds.length})")
+ s" same as number of block Ids (${_blockIds.length})")
// Hadoop configuration is not serializable, so broadcast it as a serializable.
@transient private val hadoopConfig = sc.hadoopConfiguration
@@ -99,9 +99,9 @@ class WriteAheadLogBackedBlockRDD[T: ClassTag](
override def getPartitions: Array[Partition] = {
assertValid()
- Array.tabulate(blockIds.length) { i =>
+ Array.tabulate(_blockIds.length) { i =>
val isValid = if (isBlockIdValid.length == 0) true else isBlockIdValid(i)
- new WriteAheadLogBackedBlockRDDPartition(i, blockIds(i), isValid, walRecordHandles(i))
+ new WriteAheadLogBackedBlockRDDPartition(i, _blockIds(i), isValid, walRecordHandles(i))
}
}