-rw-r--r--  core/src/main/scala/org/apache/spark/Accumulators.scala | 2
-rw-r--r--  core/src/main/scala/org/apache/spark/Dependency.scala | 2
-rw-r--r--  core/src/main/scala/org/apache/spark/Partitioner.scala | 4
-rw-r--r--  core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala | 2
-rw-r--r--  core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala | 4
-rw-r--r--  core/src/main/scala/org/apache/spark/input/PortableDataStream.scala | 4
-rw-r--r--  core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala | 2
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/BinaryFileRDD.scala | 6
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/BlockRDD.scala | 4
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/CartesianRDD.scala | 4
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala | 2
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala | 8
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/LocalCheckpointRDD.scala | 2
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/LocalRDDCheckpointData.scala | 2
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala | 18
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/ParallelCollectionRDD.scala | 4
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/PartitionPruningRDD.scala | 6
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/PartitionwiseSampledRDD.scala | 4
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/RDDCheckpointData.scala | 2
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/ReliableCheckpointRDD.scala | 2
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/ReliableRDDCheckpointData.scala | 2
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/SqlNewHadoopRDD.scala | 6
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/UnionRDD.scala | 4
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/ZippedPartitionsRDD.scala | 2
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/ZippedWithIndexRDD.scala | 2
-rw-r--r--  core/src/main/scala/org/apache/spark/rpc/RpcEndpointRef.scala | 2
-rw-r--r--  core/src/main/scala/org/apache/spark/rpc/akka/AkkaRpcEnv.scala | 21
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala | 2
-rw-r--r--  core/src/main/scala/org/apache/spark/shuffle/unsafe/UnsafeShuffleManager.scala | 2
-rw-r--r--  core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala | 2
-rw-r--r--  core/src/main/scala/org/apache/spark/util/ShutdownHookManager.scala | 4
-rw-r--r--  core/src/main/scala/org/apache/spark/util/logging/RollingFileAppender.scala | 46
-rw-r--r--  external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala | 2
-rw-r--r--  external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumePollingInputDStream.scala | 2
-rw-r--r--  external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala | 4
-rw-r--r--  external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala | 2
-rw-r--r--  external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTInputDStream.scala | 2
-rw-r--r--  external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterInputDStream.scala | 2
-rw-r--r--  graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala | 4
-rw-r--r--  graphx/src/main/scala/org/apache/spark/graphx/Graph.scala | 6
-rw-r--r--  graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala | 4
-rw-r--r--  mllib/src/main/scala/org/apache/spark/mllib/rdd/RandomRDD.scala | 12
-rw-r--r--  repl/scala-2.11/src/main/scala/org/apache/spark/repl/SparkILoop.scala | 2
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringExpressions.scala | 2
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/types/AbstractDataType.scala | 2
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala | 10
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala | 2
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala | 8
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala | 2
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala | 10
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala | 2
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala | 12
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala | 2
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala | 2
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/dstream/PluggableInputDStream.scala | 2
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/dstream/QueueInputDStream.scala | 4
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/dstream/RawInputDStream.scala | 2
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala | 2
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/dstream/SocketInputDStream.scala | 2
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala | 22
60 files changed, 158 insertions(+), 151 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/Accumulators.scala b/core/src/main/scala/org/apache/spark/Accumulators.scala
index c39c8667d0..5592b75afb 100644
--- a/core/src/main/scala/org/apache/spark/Accumulators.scala
+++ b/core/src/main/scala/org/apache/spark/Accumulators.scala
@@ -47,7 +47,7 @@ import org.apache.spark.util.Utils
* @tparam T partial data that can be added in
*/
class Accumulable[R, T] private[spark] (
- @transient initialValue: R,
+ initialValue: R,
param: AccumulableParam[R, T],
val name: Option[String],
internal: Boolean)
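
The pattern running through this commit: Scala 2.11 warns ("no valid targets for annotation") when `@transient` is placed on a plain constructor parameter, because such a parameter is not necessarily a field at all. The fix is either to drop the annotation (the parameter is only used at construction time, as with `initialValue` here) or to make the field explicit with `@transient private val` (as in the next file). A minimal sketch of the two variants, with hypothetical classes:

    // Param used only during construction: no field is generated, so
    // @transient was a no-op and can simply be dropped.
    class Unstored(x: Int) extends Serializable {
      val doubled = x * 2
    }
    // Param referenced after construction: make the field explicit and
    // mark it @transient so it is skipped during serialization.
    class Stored(@transient private val x: Int) extends Serializable {
      def describe: String = s"x = $x"
    }
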
diff --git a/core/src/main/scala/org/apache/spark/Dependency.scala b/core/src/main/scala/org/apache/spark/Dependency.scala
index fc8cdde934..cfeeb3902c 100644
--- a/core/src/main/scala/org/apache/spark/Dependency.scala
+++ b/core/src/main/scala/org/apache/spark/Dependency.scala
@@ -66,7 +66,7 @@ abstract class NarrowDependency[T](_rdd: RDD[T]) extends Dependency[T] {
*/
@DeveloperApi
class ShuffleDependency[K, V, C](
- @transient _rdd: RDD[_ <: Product2[K, V]],
+ @transient private val _rdd: RDD[_ <: Product2[K, V]],
val partitioner: Partitioner,
val serializer: Option[Serializer] = None,
val keyOrdering: Option[Ordering[K]] = None,
diff --git a/core/src/main/scala/org/apache/spark/Partitioner.scala b/core/src/main/scala/org/apache/spark/Partitioner.scala
index 29e581bb57..e4df7af81a 100644
--- a/core/src/main/scala/org/apache/spark/Partitioner.scala
+++ b/core/src/main/scala/org/apache/spark/Partitioner.scala
@@ -104,8 +104,8 @@ class HashPartitioner(partitions: Int) extends Partitioner {
* the value of `partitions`.
*/
class RangePartitioner[K : Ordering : ClassTag, V](
- @transient partitions: Int,
- @transient rdd: RDD[_ <: Product2[K, V]],
+ partitions: Int,
+ rdd: RDD[_ <: Product2[K, V]],
private var ascending: Boolean = true)
extends Partitioner {
diff --git a/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala b/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala
index f5dd36cbcf..ae5926dd53 100644
--- a/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala
+++ b/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala
@@ -37,7 +37,7 @@ import org.apache.spark.util.SerializableJobConf
* a filename to write to, etc, exactly like in a Hadoop MapReduce job.
*/
private[spark]
-class SparkHadoopWriter(@transient jobConf: JobConf)
+class SparkHadoopWriter(jobConf: JobConf)
extends Logging
with SparkHadoopMapRedUtil
with Serializable {
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
index b4d152b336..69da180593 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -41,7 +41,7 @@ import org.apache.spark.util.{SerializableConfiguration, Utils}
import scala.util.control.NonFatal
private[spark] class PythonRDD(
- @transient parent: RDD[_],
+ parent: RDD[_],
command: Array[Byte],
envVars: JMap[String, String],
pythonIncludes: JList[String],
@@ -785,7 +785,7 @@ class BytesToString extends org.apache.spark.api.java.function.Function[Array[By
* Internal class that acts as an `AccumulatorParam` for Python accumulators. Inside, it
* collects a list of pickled strings that we pass to Python through a socket.
*/
-private class PythonAccumulatorParam(@transient serverHost: String, serverPort: Int)
+private class PythonAccumulatorParam(@transient private val serverHost: String, serverPort: Int)
extends AccumulatorParam[JList[Array[Byte]]] {
Utils.checkHost(serverHost, "Expected hostname")
diff --git a/core/src/main/scala/org/apache/spark/input/PortableDataStream.scala b/core/src/main/scala/org/apache/spark/input/PortableDataStream.scala
index a5ad47293f..e2ffc3b64e 100644
--- a/core/src/main/scala/org/apache/spark/input/PortableDataStream.scala
+++ b/core/src/main/scala/org/apache/spark/input/PortableDataStream.scala
@@ -131,8 +131,8 @@ private[spark] class StreamInputFormat extends StreamFileInputFormat[PortableDat
*/
@Experimental
class PortableDataStream(
- @transient isplit: CombineFileSplit,
- @transient context: TaskAttemptContext,
+ isplit: CombineFileSplit,
+ context: TaskAttemptContext,
index: Integer)
extends Serializable {
diff --git a/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala b/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala
index 4b851bcb36..70a42f9045 100644
--- a/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala
+++ b/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala
@@ -137,7 +137,7 @@ class NettyBlockTransferService(conf: SparkConf, securityManager: SecurityManage
new RpcResponseCallback {
override def onSuccess(response: Array[Byte]): Unit = {
logTrace(s"Successfully uploaded block $blockId")
- result.success()
+ result.success((): Unit)
}
override def onFailure(e: Throwable): Unit = {
logError(s"Error while uploading block $blockId", e)
diff --git a/core/src/main/scala/org/apache/spark/rdd/BinaryFileRDD.scala b/core/src/main/scala/org/apache/spark/rdd/BinaryFileRDD.scala
index 1f755db485..6fec00dcd0 100644
--- a/core/src/main/scala/org/apache/spark/rdd/BinaryFileRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/BinaryFileRDD.scala
@@ -28,7 +28,7 @@ private[spark] class BinaryFileRDD[T](
inputFormatClass: Class[_ <: StreamFileInputFormat[T]],
keyClass: Class[String],
valueClass: Class[T],
- @transient conf: Configuration,
+ conf: Configuration,
minPartitions: Int)
extends NewHadoopRDD[String, T](sc, inputFormatClass, keyClass, valueClass, conf) {
@@ -36,10 +36,10 @@ private[spark] class BinaryFileRDD[T](
val inputFormat = inputFormatClass.newInstance
inputFormat match {
case configurable: Configurable =>
- configurable.setConf(conf)
+ configurable.setConf(getConf)
case _ =>
}
- val jobContext = newJobContext(conf, jobId)
+ val jobContext = newJobContext(getConf, jobId)
inputFormat.setMinPartitions(jobContext, minPartitions)
val rawSplits = inputFormat.getSplits(jobContext).toArray
val result = new Array[Partition](rawSplits.size)
diff --git a/core/src/main/scala/org/apache/spark/rdd/BlockRDD.scala b/core/src/main/scala/org/apache/spark/rdd/BlockRDD.scala
index 9220302637..fc1710fbad 100644
--- a/core/src/main/scala/org/apache/spark/rdd/BlockRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/BlockRDD.scala
@@ -28,7 +28,7 @@ private[spark] class BlockRDDPartition(val blockId: BlockId, idx: Int) extends P
}
private[spark]
-class BlockRDD[T: ClassTag](@transient sc: SparkContext, @transient val blockIds: Array[BlockId])
+class BlockRDD[T: ClassTag](sc: SparkContext, @transient val blockIds: Array[BlockId])
extends RDD[T](sc, Nil) {
@transient lazy val _locations = BlockManager.blockIdsToHosts(blockIds, SparkEnv.get)
@@ -64,7 +64,7 @@ class BlockRDD[T: ClassTag](@transient sc: SparkContext, @transient val blockIds
*/
private[spark] def removeBlocks() {
blockIds.foreach { blockId =>
- sc.env.blockManager.master.removeBlock(blockId)
+ sparkContext.env.blockManager.master.removeBlock(blockId)
}
_isValid = false
}
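
With `sc` demoted to a plain constructor parameter, method bodies must stop touching it: referencing a constructor parameter from a method forces the compiler to capture it as an unannotated, non-serializable field, which is exactly what the change is trying to avoid. Hence `removeBlocks()` switches to the `sparkContext` accessor the RDD base class already stores; the `getConf` changes in BinaryFileRDD above follow the same logic. A sketch of the hazard, using simplified stand-in classes:

    class Context { def master: String = "local" }  // stands in for SparkContext; not Serializable
    abstract class BaseRDD(@transient val sparkContext: Context) extends Serializable

    class GoodRDD(sc: Context) extends BaseRDD(sc) {
      def master: String = sparkContext.master  // uses the parent's @transient field
    }
    class BadRDD(sc: Context) extends BaseRDD(sc) {
      def master: String = sc.master  // forces sc into a hidden non-transient field:
    }                                 // serializing a BadRDD now drags in the Context
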
diff --git a/core/src/main/scala/org/apache/spark/rdd/CartesianRDD.scala b/core/src/main/scala/org/apache/spark/rdd/CartesianRDD.scala
index c1d6971787..18e8cddbc4 100644
--- a/core/src/main/scala/org/apache/spark/rdd/CartesianRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/CartesianRDD.scala
@@ -27,8 +27,8 @@ import org.apache.spark.util.Utils
private[spark]
class CartesianPartition(
idx: Int,
- @transient rdd1: RDD[_],
- @transient rdd2: RDD[_],
+ @transient private val rdd1: RDD[_],
+ @transient private val rdd2: RDD[_],
s1Index: Int,
s2Index: Int
) extends Partition {
diff --git a/core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala b/core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala
index 72fe215dae..b0364623af 100644
--- a/core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala
@@ -29,7 +29,7 @@ private[spark] class CheckpointRDDPartition(val index: Int) extends Partition
/**
* An RDD that recovers checkpointed data from storage.
*/
-private[spark] abstract class CheckpointRDD[T: ClassTag](@transient sc: SparkContext)
+private[spark] abstract class CheckpointRDD[T: ClassTag](sc: SparkContext)
extends RDD[T](sc, Nil) {
// CheckpointRDD should not be checkpointed again
diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
index e1f8719eea..8f2655d63b 100644
--- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
@@ -51,7 +51,7 @@ import org.apache.spark.storage.StorageLevel
/**
* A Spark split class that wraps around a Hadoop InputSplit.
*/
-private[spark] class HadoopPartition(rddId: Int, idx: Int, @transient s: InputSplit)
+private[spark] class HadoopPartition(rddId: Int, idx: Int, s: InputSplit)
extends Partition {
val inputSplit = new SerializableWritable[InputSplit](s)
@@ -99,7 +99,7 @@ private[spark] class HadoopPartition(rddId: Int, idx: Int, @transient s: InputSp
*/
@DeveloperApi
class HadoopRDD[K, V](
- @transient sc: SparkContext,
+ sc: SparkContext,
broadcastedConf: Broadcast[SerializableConfiguration],
initLocalJobConfFuncOpt: Option[JobConf => Unit],
inputFormatClass: Class[_ <: InputFormat[K, V]],
@@ -109,7 +109,7 @@ class HadoopRDD[K, V](
extends RDD[(K, V)](sc, Nil) with Logging {
if (initLocalJobConfFuncOpt.isDefined) {
- sc.clean(initLocalJobConfFuncOpt.get)
+ sparkContext.clean(initLocalJobConfFuncOpt.get)
}
def this(
@@ -137,7 +137,7 @@ class HadoopRDD[K, V](
// used to build JobTracker ID
private val createTime = new Date()
- private val shouldCloneJobConf = sc.conf.getBoolean("spark.hadoop.cloneConf", false)
+ private val shouldCloneJobConf = sparkContext.conf.getBoolean("spark.hadoop.cloneConf", false)
// Returns a JobConf that will be used on slaves to obtain input splits for Hadoop reads.
protected def getJobConf(): JobConf = {
diff --git a/core/src/main/scala/org/apache/spark/rdd/LocalCheckpointRDD.scala b/core/src/main/scala/org/apache/spark/rdd/LocalCheckpointRDD.scala
index daa5779d68..bfe19195fc 100644
--- a/core/src/main/scala/org/apache/spark/rdd/LocalCheckpointRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/LocalCheckpointRDD.scala
@@ -35,7 +35,7 @@ import org.apache.spark.storage.RDDBlockId
* @param numPartitions the number of partitions in the checkpointed RDD
*/
private[spark] class LocalCheckpointRDD[T: ClassTag](
- @transient sc: SparkContext,
+ sc: SparkContext,
rddId: Int,
numPartitions: Int)
extends CheckpointRDD[T](sc) {
diff --git a/core/src/main/scala/org/apache/spark/rdd/LocalRDDCheckpointData.scala b/core/src/main/scala/org/apache/spark/rdd/LocalRDDCheckpointData.scala
index d6fad89684..c115e0ff74 100644
--- a/core/src/main/scala/org/apache/spark/rdd/LocalRDDCheckpointData.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/LocalRDDCheckpointData.scala
@@ -31,7 +31,7 @@ import org.apache.spark.util.Utils
* is written to the local, ephemeral block storage that lives in each executor. This is useful
* for use cases where RDDs build up long lineages that need to be truncated often (e.g. GraphX).
*/
-private[spark] class LocalRDDCheckpointData[T: ClassTag](@transient rdd: RDD[T])
+private[spark] class LocalRDDCheckpointData[T: ClassTag](@transient private val rdd: RDD[T])
extends RDDCheckpointData[T](rdd) with Logging {
/**
diff --git a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
index 6a9c004d65..174979aaeb 100644
--- a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
@@ -40,7 +40,7 @@ import org.apache.spark.storage.StorageLevel
private[spark] class NewHadoopPartition(
rddId: Int,
val index: Int,
- @transient rawSplit: InputSplit with Writable)
+ rawSplit: InputSplit with Writable)
extends Partition {
val serializableHadoopSplit = new SerializableWritable(rawSplit)
@@ -68,14 +68,14 @@ class NewHadoopRDD[K, V](
inputFormatClass: Class[_ <: InputFormat[K, V]],
keyClass: Class[K],
valueClass: Class[V],
- @transient conf: Configuration)
+ @transient private val _conf: Configuration)
extends RDD[(K, V)](sc, Nil)
with SparkHadoopMapReduceUtil
with Logging {
// A Hadoop Configuration can be about 10 KB, which is pretty big, so broadcast it
- private val confBroadcast = sc.broadcast(new SerializableConfiguration(conf))
- // private val serializableConf = new SerializableWritable(conf)
+ private val confBroadcast = sc.broadcast(new SerializableConfiguration(_conf))
+ // private val serializableConf = new SerializableWritable(_conf)
private val jobTrackerId: String = {
val formatter = new SimpleDateFormat("yyyyMMddHHmm")
@@ -88,10 +88,10 @@ class NewHadoopRDD[K, V](
val inputFormat = inputFormatClass.newInstance
inputFormat match {
case configurable: Configurable =>
- configurable.setConf(conf)
+ configurable.setConf(_conf)
case _ =>
}
- val jobContext = newJobContext(conf, jobId)
+ val jobContext = newJobContext(_conf, jobId)
val rawSplits = inputFormat.getSplits(jobContext).toArray
val result = new Array[Partition](rawSplits.size)
for (i <- 0 until rawSplits.size) {
@@ -262,7 +262,7 @@ private[spark] class WholeTextFileRDD(
inputFormatClass: Class[_ <: WholeTextFileInputFormat],
keyClass: Class[String],
valueClass: Class[String],
- @transient conf: Configuration,
+ conf: Configuration,
minPartitions: Int)
extends NewHadoopRDD[String, String](sc, inputFormatClass, keyClass, valueClass, conf) {
@@ -270,10 +270,10 @@ private[spark] class WholeTextFileRDD(
val inputFormat = inputFormatClass.newInstance
inputFormat match {
case configurable: Configurable =>
- configurable.setConf(conf)
+ configurable.setConf(getConf)
case _ =>
}
- val jobContext = newJobContext(conf, jobId)
+ val jobContext = newJobContext(getConf, jobId)
inputFormat.setMinPartitions(jobContext, minPartitions)
val rawSplits = inputFormat.getSplits(jobContext).toArray
val result = new Array[Partition](rawSplits.size)
diff --git a/core/src/main/scala/org/apache/spark/rdd/ParallelCollectionRDD.scala b/core/src/main/scala/org/apache/spark/rdd/ParallelCollectionRDD.scala
index e2394e28f8..582fa93afe 100644
--- a/core/src/main/scala/org/apache/spark/rdd/ParallelCollectionRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/ParallelCollectionRDD.scala
@@ -83,8 +83,8 @@ private[spark] class ParallelCollectionPartition[T: ClassTag](
}
private[spark] class ParallelCollectionRDD[T: ClassTag](
- @transient sc: SparkContext,
- @transient data: Seq[T],
+ sc: SparkContext,
+ @transient private val data: Seq[T],
numSlices: Int,
locationPrefs: Map[Int, Seq[String]])
extends RDD[T](sc, Nil) {
diff --git a/core/src/main/scala/org/apache/spark/rdd/PartitionPruningRDD.scala b/core/src/main/scala/org/apache/spark/rdd/PartitionPruningRDD.scala
index a00f4c1cdf..d6a37e8cc5 100644
--- a/core/src/main/scala/org/apache/spark/rdd/PartitionPruningRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/PartitionPruningRDD.scala
@@ -32,7 +32,7 @@ private[spark] class PartitionPruningRDDPartition(idx: Int, val parentSplit: Par
* Represents a dependency between the PartitionPruningRDD and its parent. In this
* case, the child RDD contains a subset of the parent's partitions.
*/
-private[spark] class PruneDependency[T](rdd: RDD[T], @transient partitionFilterFunc: Int => Boolean)
+private[spark] class PruneDependency[T](rdd: RDD[T], partitionFilterFunc: Int => Boolean)
extends NarrowDependency[T](rdd) {
@transient
@@ -55,8 +55,8 @@ private[spark] class PruneDependency[T](rdd: RDD[T], @transient partitionFilterF
*/
@DeveloperApi
class PartitionPruningRDD[T: ClassTag](
- @transient prev: RDD[T],
- @transient partitionFilterFunc: Int => Boolean)
+ prev: RDD[T],
+ partitionFilterFunc: Int => Boolean)
extends RDD[T](prev.context, List(new PruneDependency(prev, partitionFilterFunc))) {
override def compute(split: Partition, context: TaskContext): Iterator[T] = {
diff --git a/core/src/main/scala/org/apache/spark/rdd/PartitionwiseSampledRDD.scala b/core/src/main/scala/org/apache/spark/rdd/PartitionwiseSampledRDD.scala
index a637d6f15b..3b1acacf40 100644
--- a/core/src/main/scala/org/apache/spark/rdd/PartitionwiseSampledRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/PartitionwiseSampledRDD.scala
@@ -47,8 +47,8 @@ class PartitionwiseSampledRDDPartition(val prev: Partition, val seed: Long)
private[spark] class PartitionwiseSampledRDD[T: ClassTag, U: ClassTag](
prev: RDD[T],
sampler: RandomSampler[T, U],
- @transient preservesPartitioning: Boolean,
- @transient seed: Long = Utils.random.nextLong)
+ preservesPartitioning: Boolean,
+ @transient private val seed: Long = Utils.random.nextLong)
extends RDD[U](prev) {
@transient override val partitioner = if (preservesPartitioning) prev.partitioner else None
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDDCheckpointData.scala b/core/src/main/scala/org/apache/spark/rdd/RDDCheckpointData.scala
index 0e43520870..429514b4f6 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDDCheckpointData.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDDCheckpointData.scala
@@ -36,7 +36,7 @@ private[spark] object CheckpointState extends Enumeration {
* as well as, manages the post-checkpoint state by providing the updated partitions,
* iterator and preferred locations of the checkpointed RDD.
*/
-private[spark] abstract class RDDCheckpointData[T: ClassTag](@transient rdd: RDD[T])
+private[spark] abstract class RDDCheckpointData[T: ClassTag](@transient private val rdd: RDD[T])
extends Serializable {
import CheckpointState._
diff --git a/core/src/main/scala/org/apache/spark/rdd/ReliableCheckpointRDD.scala b/core/src/main/scala/org/apache/spark/rdd/ReliableCheckpointRDD.scala
index 35d8b0bfd1..1c3b5da19c 100644
--- a/core/src/main/scala/org/apache/spark/rdd/ReliableCheckpointRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/ReliableCheckpointRDD.scala
@@ -32,7 +32,7 @@ import org.apache.spark.util.{SerializableConfiguration, Utils}
* An RDD that reads from checkpoint files previously written to reliable storage.
*/
private[spark] class ReliableCheckpointRDD[T: ClassTag](
- @transient sc: SparkContext,
+ sc: SparkContext,
val checkpointPath: String)
extends CheckpointRDD[T](sc) {
diff --git a/core/src/main/scala/org/apache/spark/rdd/ReliableRDDCheckpointData.scala b/core/src/main/scala/org/apache/spark/rdd/ReliableRDDCheckpointData.scala
index 1df8eef5ff..e9f6060301 100644
--- a/core/src/main/scala/org/apache/spark/rdd/ReliableRDDCheckpointData.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/ReliableRDDCheckpointData.scala
@@ -28,7 +28,7 @@ import org.apache.spark.util.SerializableConfiguration
* An implementation of checkpointing that writes the RDD data to reliable storage.
* This allows drivers to be restarted on failure with previously computed state.
*/
-private[spark] class ReliableRDDCheckpointData[T: ClassTag](@transient rdd: RDD[T])
+private[spark] class ReliableRDDCheckpointData[T: ClassTag](@transient private val rdd: RDD[T])
extends RDDCheckpointData[T](rdd) with Logging {
// The directory to which the associated RDD has been checkpointed to
diff --git a/core/src/main/scala/org/apache/spark/rdd/SqlNewHadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/SqlNewHadoopRDD.scala
index fa3fecc80c..9babe56267 100644
--- a/core/src/main/scala/org/apache/spark/rdd/SqlNewHadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/SqlNewHadoopRDD.scala
@@ -39,7 +39,7 @@ import org.apache.spark.util.{SerializableConfiguration, ShutdownHookManager, Ut
private[spark] class SqlNewHadoopPartition(
rddId: Int,
val index: Int,
- @transient rawSplit: InputSplit with Writable)
+ rawSplit: InputSplit with Writable)
extends SparkPartition {
val serializableHadoopSplit = new SerializableWritable(rawSplit)
@@ -61,9 +61,9 @@ private[spark] class SqlNewHadoopPartition(
* changes based on [[org.apache.spark.rdd.HadoopRDD]].
*/
private[spark] class SqlNewHadoopRDD[V: ClassTag](
- @transient sc : SparkContext,
+ sc : SparkContext,
broadcastedConf: Broadcast[SerializableConfiguration],
- @transient initDriverSideJobFuncOpt: Option[Job => Unit],
+ @transient private val initDriverSideJobFuncOpt: Option[Job => Unit],
initLocalJobFuncOpt: Option[Job => Unit],
inputFormatClass: Class[_ <: InputFormat[Void, V]],
valueClass: Class[V])
diff --git a/core/src/main/scala/org/apache/spark/rdd/UnionRDD.scala b/core/src/main/scala/org/apache/spark/rdd/UnionRDD.scala
index 3986645350..66cf4369da 100644
--- a/core/src/main/scala/org/apache/spark/rdd/UnionRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/UnionRDD.scala
@@ -37,9 +37,9 @@ import org.apache.spark.util.Utils
*/
private[spark] class UnionPartition[T: ClassTag](
idx: Int,
- @transient rdd: RDD[T],
+ @transient private val rdd: RDD[T],
val parentRddIndex: Int,
- @transient parentRddPartitionIndex: Int)
+ @transient private val parentRddPartitionIndex: Int)
extends Partition {
var parentPartition: Partition = rdd.partitions(parentRddPartitionIndex)
diff --git a/core/src/main/scala/org/apache/spark/rdd/ZippedPartitionsRDD.scala b/core/src/main/scala/org/apache/spark/rdd/ZippedPartitionsRDD.scala
index b3c64394ab..70bf04de64 100644
--- a/core/src/main/scala/org/apache/spark/rdd/ZippedPartitionsRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/ZippedPartitionsRDD.scala
@@ -26,7 +26,7 @@ import org.apache.spark.util.Utils
private[spark] class ZippedPartitionsPartition(
idx: Int,
- @transient rdds: Seq[RDD[_]],
+ @transient private val rdds: Seq[RDD[_]],
@transient val preferredLocations: Seq[String])
extends Partition {
diff --git a/core/src/main/scala/org/apache/spark/rdd/ZippedWithIndexRDD.scala b/core/src/main/scala/org/apache/spark/rdd/ZippedWithIndexRDD.scala
index e277ae28d5..32931d59ac 100644
--- a/core/src/main/scala/org/apache/spark/rdd/ZippedWithIndexRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/ZippedWithIndexRDD.scala
@@ -37,7 +37,7 @@ class ZippedWithIndexRDDPartition(val prev: Partition, val startIndex: Long)
* @tparam T parent RDD item type
*/
private[spark]
-class ZippedWithIndexRDD[T: ClassTag](@transient prev: RDD[T]) extends RDD[(T, Long)](prev) {
+class ZippedWithIndexRDD[T: ClassTag](prev: RDD[T]) extends RDD[(T, Long)](prev) {
/** The start index of each partition. */
@transient private val startIndices: Array[Long] = {
diff --git a/core/src/main/scala/org/apache/spark/rpc/RpcEndpointRef.scala b/core/src/main/scala/org/apache/spark/rpc/RpcEndpointRef.scala
index 7409ac8859..f25710bb5b 100644
--- a/core/src/main/scala/org/apache/spark/rpc/RpcEndpointRef.scala
+++ b/core/src/main/scala/org/apache/spark/rpc/RpcEndpointRef.scala
@@ -26,7 +26,7 @@ import org.apache.spark.{SparkException, Logging, SparkConf}
/**
* A reference for a remote [[RpcEndpoint]]. [[RpcEndpointRef]] is thread-safe.
*/
-private[spark] abstract class RpcEndpointRef(@transient conf: SparkConf)
+private[spark] abstract class RpcEndpointRef(conf: SparkConf)
extends Serializable with Logging {
private[this] val maxRetries = RpcUtils.numRetries(conf)
diff --git a/core/src/main/scala/org/apache/spark/rpc/akka/AkkaRpcEnv.scala b/core/src/main/scala/org/apache/spark/rpc/akka/AkkaRpcEnv.scala
index fc17542abf..ad67e1c5ad 100644
--- a/core/src/main/scala/org/apache/spark/rpc/akka/AkkaRpcEnv.scala
+++ b/core/src/main/scala/org/apache/spark/rpc/akka/AkkaRpcEnv.scala
@@ -87,9 +87,9 @@ private[spark] class AkkaRpcEnv private[akka] (
override def setupEndpoint(name: String, endpoint: RpcEndpoint): RpcEndpointRef = {
@volatile var endpointRef: AkkaRpcEndpointRef = null
- // Use lazy because the Actor needs to use `endpointRef`.
+ // Use a deferred function because the Actor needs to use `endpointRef`.
// So `actorRef` should be created after assigning `endpointRef`.
- lazy val actorRef = actorSystem.actorOf(Props(new Actor with ActorLogReceive with Logging {
+ val actorRef = () => actorSystem.actorOf(Props(new Actor with ActorLogReceive with Logging {
assert(endpointRef != null)
@@ -272,13 +272,20 @@ private[akka] class ErrorMonitor extends Actor with ActorLogReceive with Logging
}
private[akka] class AkkaRpcEndpointRef(
- @transient defaultAddress: RpcAddress,
- @transient _actorRef: => ActorRef,
- @transient conf: SparkConf,
- @transient initInConstructor: Boolean = true)
+ @transient private val defaultAddress: RpcAddress,
+ @transient private val _actorRef: () => ActorRef,
+ conf: SparkConf,
+ initInConstructor: Boolean)
extends RpcEndpointRef(conf) with Logging {
- lazy val actorRef = _actorRef
+ def this(
+ defaultAddress: RpcAddress,
+ _actorRef: ActorRef,
+ conf: SparkConf) = {
+ this(defaultAddress, () => _actorRef, conf, true)
+ }
+
+ lazy val actorRef = _actorRef()
override lazy val address: RpcAddress = {
val akkaAddress = actorRef.path.address
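
The by-name parameter `_actorRef: => ActorRef` could not be turned into a `@transient private val` (a by-name parameter is not a field), so it becomes an explicit thunk `() => ActorRef`, with an auxiliary constructor preserving the old call sites and a `lazy val` forcing it at most once. A reduced sketch of the same refactoring, with a hypothetical `Ref` class:

    class Ref(@transient private val make: () => String) extends Serializable {
      def this(value: String) = this(() => value)  // convenience ctor, like the one added above
      lazy val target: String = make()             // evaluated once, never serialized
    }
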
diff --git a/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala b/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala
index c4dc080e2b..fb693721a9 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala
@@ -44,7 +44,7 @@ private[spark] class ResultTask[T, U](
stageAttemptId: Int,
taskBinary: Broadcast[Array[Byte]],
partition: Partition,
- @transient locs: Seq[TaskLocation],
+ locs: Seq[TaskLocation],
val outputId: Int,
internalAccumulators: Seq[Accumulator[Long]])
extends Task[U](stageId, stageAttemptId, partition.index, internalAccumulators)
diff --git a/core/src/main/scala/org/apache/spark/shuffle/unsafe/UnsafeShuffleManager.scala b/core/src/main/scala/org/apache/spark/shuffle/unsafe/UnsafeShuffleManager.scala
index df7bbd6424..75f22f642b 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/unsafe/UnsafeShuffleManager.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/unsafe/UnsafeShuffleManager.scala
@@ -159,7 +159,7 @@ private[spark] class UnsafeShuffleManager(conf: SparkConf) extends ShuffleManage
mapId: Int,
context: TaskContext): ShuffleWriter[K, V] = {
handle match {
- case unsafeShuffleHandle: UnsafeShuffleHandle[K, V] =>
+ case unsafeShuffleHandle: UnsafeShuffleHandle[K @unchecked, V @unchecked] =>
numMapsForShufflesThatUsedNewPath.putIfAbsent(handle.shuffleId, unsafeShuffleHandle.numMaps)
val env = SparkEnv.get
new UnsafeShuffleWriter(
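
The `K @unchecked, V @unchecked` ascriptions silence the warning that type arguments in a pattern are eliminated by erasure and cannot be checked at runtime; the match is safe here because the handle's type arguments are fixed by the enclosing method. A generic sketch of the construct:

    // Matching against a parameterized type cannot verify its type
    // arguments at runtime, so they are ascribed @unchecked.
    def describe[K, V](x: Any): String = x match {
      case m: Map[K @unchecked, V @unchecked] => s"a map with ${m.size} entries"
      case _ => "something else"
    }
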
diff --git a/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala b/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala
index 150d82b393..1b49dca9dc 100644
--- a/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala
+++ b/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala
@@ -94,7 +94,7 @@ private[spark] object ClosureCleaner extends Logging {
if (cls.isPrimitive) {
cls match {
case java.lang.Boolean.TYPE => new java.lang.Boolean(false)
- case java.lang.Character.TYPE => new java.lang.Character('\0')
+ case java.lang.Character.TYPE => new java.lang.Character('\u0000')
case java.lang.Void.TYPE =>
// This should not happen because `Foo(void x) {}` does not compile.
throw new IllegalStateException("Unexpected void parameter in constructor")
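
Octal escape sequences such as `'\0'` are deprecated in Scala 2.11 (and later removed), so NUL is spelled as a unicode escape; the same substitution appears in stringExpressions.scala further down. For instance:

    val nul: Char = '\u0000'  // replaces the deprecated octal form '\0'
    assert(nul == 0.toChar)
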
diff --git a/core/src/main/scala/org/apache/spark/util/ShutdownHookManager.scala b/core/src/main/scala/org/apache/spark/util/ShutdownHookManager.scala
index 61ff9b89ec..db4a8b304e 100644
--- a/core/src/main/scala/org/apache/spark/util/ShutdownHookManager.scala
+++ b/core/src/main/scala/org/apache/spark/util/ShutdownHookManager.scala
@@ -217,7 +217,9 @@ private [util] class SparkShutdownHookManager {
}
Try(Utils.classForName("org.apache.hadoop.util.ShutdownHookManager")) match {
case Success(shmClass) =>
- val fsPriority = classOf[FileSystem].getField("SHUTDOWN_HOOK_PRIORITY").get()
+ val fsPriority = classOf[FileSystem]
+ .getField("SHUTDOWN_HOOK_PRIORITY")
+ .get(null) // static field: the receiver argument is ignored
.asInstanceOf[Int]
val shm = shmClass.getMethod("get").invoke(null)
shm.getClass().getMethod("addShutdownHook", classOf[Runnable], classOf[Int])
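
`Field.get` takes the receiver object as its argument, and for a static field that receiver is ignored; passing `null` makes this explicit, where the old `get()` only compiled through the same deprecated empty-argument adaptation seen earlier. For example:

    import java.lang.reflect.Field

    val f: Field = classOf[Integer].getField("MAX_VALUE")
    val max = f.get(null).asInstanceOf[Int]  // receiver ignored for static fields
    assert(max == Int.MaxValue)
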
diff --git a/core/src/main/scala/org/apache/spark/util/logging/RollingFileAppender.scala b/core/src/main/scala/org/apache/spark/util/logging/RollingFileAppender.scala
index 7138b4b8e4..1e8476c4a0 100644
--- a/core/src/main/scala/org/apache/spark/util/logging/RollingFileAppender.scala
+++ b/core/src/main/scala/org/apache/spark/util/logging/RollingFileAppender.scala
@@ -79,32 +79,30 @@ private[spark] class RollingFileAppender(
val rolloverSuffix = rollingPolicy.generateRolledOverFileSuffix()
val rolloverFile = new File(
activeFile.getParentFile, activeFile.getName + rolloverSuffix).getAbsoluteFile
- try {
- logDebug(s"Attempting to rollover file $activeFile to file $rolloverFile")
- if (activeFile.exists) {
- if (!rolloverFile.exists) {
- Files.move(activeFile, rolloverFile)
- logInfo(s"Rolled over $activeFile to $rolloverFile")
- } else {
- // In case the rollover file name clashes, make a unique file name.
- // The resultant file names are long and ugly, so this is used only
- // if there is a name collision. This can be avoided by the using
- // the right pattern such that name collisions do not occur.
- var i = 0
- var altRolloverFile: File = null
- do {
- altRolloverFile = new File(activeFile.getParent,
- s"${activeFile.getName}$rolloverSuffix--$i").getAbsoluteFile
- i += 1
- } while (i < 10000 && altRolloverFile.exists)
-
- logWarning(s"Rollover file $rolloverFile already exists, " +
- s"rolled over $activeFile to file $altRolloverFile")
- Files.move(activeFile, altRolloverFile)
- }
+ logDebug(s"Attempting to rollover file $activeFile to file $rolloverFile")
+ if (activeFile.exists) {
+ if (!rolloverFile.exists) {
+ Files.move(activeFile, rolloverFile)
+ logInfo(s"Rolled over $activeFile to $rolloverFile")
} else {
- logWarning(s"File $activeFile does not exist")
+ // In case the rollover file name clashes, make a unique file name.
+ // The resultant file names are long and ugly, so this is used only
+ // if there is a name collision. This can be avoided by using
+ // the right pattern so that name collisions do not occur.
+ var i = 0
+ var altRolloverFile: File = null
+ do {
+ altRolloverFile = new File(activeFile.getParent,
+ s"${activeFile.getName}$rolloverSuffix--$i").getAbsoluteFile
+ i += 1
+ } while (i < 10000 && altRolloverFile.exists)
+
+ logWarning(s"Rollover file $rolloverFile already exists, " +
+ s"rolled over $activeFile to file $altRolloverFile")
+ Files.move(activeFile, altRolloverFile)
}
+ } else {
+ logWarning(s"File $activeFile does not exist")
}
}
diff --git a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala
index 2bf99cb3cb..c8780aa83b 100644
--- a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala
+++ b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala
@@ -43,7 +43,7 @@ import org.jboss.netty.handler.codec.compression._
private[streaming]
class FlumeInputDStream[T: ClassTag](
- @transient ssc_ : StreamingContext,
+ ssc_ : StreamingContext,
host: String,
port: Int,
storageLevel: StorageLevel,
diff --git a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumePollingInputDStream.scala b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumePollingInputDStream.scala
index 0bc46209b8..3b936d88ab 100644
--- a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumePollingInputDStream.scala
+++ b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumePollingInputDStream.scala
@@ -46,7 +46,7 @@ import org.apache.spark.streaming.flume.sink._
* @tparam T Class type of the object of this stream
*/
private[streaming] class FlumePollingInputDStream[T: ClassTag](
- @transient _ssc: StreamingContext,
+ _ssc: StreamingContext,
val addresses: Seq[InetSocketAddress],
val maxBatchSize: Int,
val parallelism: Int,
diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala
index 1000094e93..8a087474d3 100644
--- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala
+++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala
@@ -58,7 +58,7 @@ class DirectKafkaInputDStream[
U <: Decoder[K]: ClassTag,
T <: Decoder[V]: ClassTag,
R: ClassTag](
- @transient ssc_ : StreamingContext,
+ ssc_ : StreamingContext,
val kafkaParams: Map[String, String],
val fromOffsets: Map[TopicAndPartition, Long],
messageHandler: MessageAndMetadata[K, V] => R
@@ -79,7 +79,7 @@ class DirectKafkaInputDStream[
override protected[streaming] val rateController: Option[RateController] = {
if (RateController.isBackPressureEnabled(ssc.conf)) {
Some(new DirectKafkaRateController(id,
- RateEstimator.create(ssc.conf, ssc_.graph.batchDuration)))
+ RateEstimator.create(ssc.conf, context.graph.batchDuration)))
} else {
None
}
diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala
index 04b2dc10d3..38730fecf3 100644
--- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala
+++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala
@@ -48,7 +48,7 @@ class KafkaInputDStream[
V: ClassTag,
U <: Decoder[_]: ClassTag,
T <: Decoder[_]: ClassTag](
- @transient ssc_ : StreamingContext,
+ ssc_ : StreamingContext,
kafkaParams: Map[String, String],
topics: Map[String, Int],
useReliableReceiver: Boolean,
diff --git a/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTInputDStream.scala b/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTInputDStream.scala
index 7c2f18cb35..116c170489 100644
--- a/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTInputDStream.scala
+++ b/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTInputDStream.scala
@@ -38,7 +38,7 @@ import org.apache.spark.streaming.receiver.Receiver
private[streaming]
class MQTTInputDStream(
- @transient ssc_ : StreamingContext,
+ ssc_ : StreamingContext,
brokerUrl: String,
topic: String,
storageLevel: StorageLevel
diff --git a/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterInputDStream.scala b/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterInputDStream.scala
index 7cf02d85d7..d7de74b350 100644
--- a/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterInputDStream.scala
+++ b/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterInputDStream.scala
@@ -39,7 +39,7 @@ import org.apache.spark.streaming.receiver.Receiver
*/
private[streaming]
class TwitterInputDStream(
- @transient ssc_ : StreamingContext,
+ ssc_ : StreamingContext,
twitterAuth: Option[Authorization],
filters: Seq[String],
storageLevel: StorageLevel
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala b/graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala
index 4611a3ace2..ee7302a1ed 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala
@@ -38,8 +38,8 @@ import org.apache.spark.graphx.impl.EdgeRDDImpl
* `impl.ReplicatedVertexView`.
*/
abstract class EdgeRDD[ED](
- @transient sc: SparkContext,
- @transient deps: Seq[Dependency[_]]) extends RDD[Edge[ED]](sc, deps) {
+ sc: SparkContext,
+ deps: Seq[Dependency[_]]) extends RDD[Edge[ED]](sc, deps) {
// scalastyle:off structural.type
private[graphx] def partitionsRDD: RDD[(PartitionID, EdgePartition[ED, VD])] forSome { type VD }
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala b/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala
index db73a8abc5..869caa340f 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala
@@ -46,7 +46,7 @@ abstract class Graph[VD: ClassTag, ED: ClassTag] protected () extends Serializab
* @note vertex ids are unique.
* @return an RDD containing the vertices in this graph
*/
- @transient val vertices: VertexRDD[VD]
+ val vertices: VertexRDD[VD]
/**
* An RDD containing the edges and their associated attributes. The entries in the RDD contain
@@ -59,7 +59,7 @@ abstract class Graph[VD: ClassTag, ED: ClassTag] protected () extends Serializab
* along with their vertex data.
*
*/
- @transient val edges: EdgeRDD[ED]
+ val edges: EdgeRDD[ED]
/**
* An RDD containing the edge triplets, which are edges along with the vertex data associated with
@@ -77,7 +77,7 @@ abstract class Graph[VD: ClassTag, ED: ClassTag] protected () extends Serializab
* val numInvalid = graph.triplets.map(e => if (e.src.data == e.dst.data) 1 else 0).sum
* }}}
*/
- @transient val triplets: RDD[EdgeTriplet[VD, ED]]
+ val triplets: RDD[EdgeTriplet[VD, ED]]
/**
* Caches the vertices and edges associated with this graph at the specified storage level,
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala b/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala
index a9f04b559c..1ef7a78fbc 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala
@@ -55,8 +55,8 @@ import org.apache.spark.graphx.impl.VertexRDDImpl
* @tparam VD the vertex attribute associated with each vertex in the set.
*/
abstract class VertexRDD[VD](
- @transient sc: SparkContext,
- @transient deps: Seq[Dependency[_]]) extends RDD[(VertexId, VD)](sc, deps) {
+ sc: SparkContext,
+ deps: Seq[Dependency[_]]) extends RDD[(VertexId, VD)](sc, deps) {
implicit protected def vdTag: ClassTag[VD]
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/rdd/RandomRDD.scala b/mllib/src/main/scala/org/apache/spark/mllib/rdd/RandomRDD.scala
index 910eff9540..f8cea7ecea 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/rdd/RandomRDD.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/rdd/RandomRDD.scala
@@ -35,11 +35,11 @@ private[mllib] class RandomRDDPartition[T](override val index: Int,
}
// These two classes are necessary since Range objects in Scala cannot have size > Int.MaxValue
-private[mllib] class RandomRDD[T: ClassTag](@transient sc: SparkContext,
+private[mllib] class RandomRDD[T: ClassTag](sc: SparkContext,
size: Long,
numPartitions: Int,
- @transient rng: RandomDataGenerator[T],
- @transient seed: Long = Utils.random.nextLong) extends RDD[T](sc, Nil) {
+ @transient private val rng: RandomDataGenerator[T],
+ @transient private val seed: Long = Utils.random.nextLong) extends RDD[T](sc, Nil) {
require(size > 0, "Positive RDD size required.")
require(numPartitions > 0, "Positive number of partitions required")
@@ -56,12 +56,12 @@ private[mllib] class RandomRDD[T: ClassTag](@transient sc: SparkContext,
}
}
-private[mllib] class RandomVectorRDD(@transient sc: SparkContext,
+private[mllib] class RandomVectorRDD(sc: SparkContext,
size: Long,
vectorSize: Int,
numPartitions: Int,
- @transient rng: RandomDataGenerator[Double],
- @transient seed: Long = Utils.random.nextLong) extends RDD[Vector](sc, Nil) {
+ @transient private val rng: RandomDataGenerator[Double],
+ @transient private val seed: Long = Utils.random.nextLong) extends RDD[Vector](sc, Nil) {
require(size > 0, "Positive RDD size required.")
require(numPartitions > 0, "Positive number of partitions required")
diff --git a/repl/scala-2.11/src/main/scala/org/apache/spark/repl/SparkILoop.scala b/repl/scala-2.11/src/main/scala/org/apache/spark/repl/SparkILoop.scala
index bf609ff0f6..33d262558b 100644
--- a/repl/scala-2.11/src/main/scala/org/apache/spark/repl/SparkILoop.scala
+++ b/repl/scala-2.11/src/main/scala/org/apache/spark/repl/SparkILoop.scala
@@ -118,5 +118,5 @@ object SparkILoop {
}
}
}
- def run(lines: List[String]): String = run(lines map (_ + "\n") mkString)
+ def run(lines: List[String]): String = run(lines.map(_ + "\n").mkString)
}
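
The `run` one-liner moves from infix/postfix style to dotted calls, since a bare postfix operator (`mkString` trailing with no parentheses) triggers a feature warning in 2.11 unless `scala.language.postfixOps` is imported. The two forms are equivalent:

    val lines = List("a", "b")
    // lines map (_ + "\n") mkString           // postfix: warns without language.postfixOps
    val joined = lines.map(_ + "\n").mkString  // dotted: no warning
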
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringExpressions.scala
index 48d02bb534..a09d5b6e3a 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringExpressions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringExpressions.scala
@@ -255,7 +255,7 @@ object StringTranslate {
val dict = new HashMap[Character, Character]()
var i = 0
while (i < matching.length()) {
- val rep = if (i < replace.length()) replace.charAt(i) else '\0'
+ val rep = if (i < replace.length()) replace.charAt(i) else '\u0000'
if (null == dict.get(matching.charAt(i))) {
dict.put(matching.charAt(i), rep)
}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/AbstractDataType.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/AbstractDataType.scala
index e0667c6294..1d2d007c2b 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/AbstractDataType.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/AbstractDataType.scala
@@ -126,7 +126,7 @@ protected[sql] object AnyDataType extends AbstractDataType {
*/
protected[sql] abstract class AtomicType extends DataType {
private[sql] type InternalType
- @transient private[sql] val tag: TypeTag[InternalType]
+ private[sql] val tag: TypeTag[InternalType]
private[sql] val ordering: Ordering[InternalType]
@transient private[sql] val classTag = ScalaReflectionLock.synchronized {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala
index 879fd69863..9a573db0c0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala
@@ -39,7 +39,7 @@ import org.apache.spark.util.SerializableConfiguration
private[sql] abstract class BaseWriterContainer(
@transient val relation: HadoopFsRelation,
- @transient job: Job,
+ @transient private val job: Job,
isAppend: Boolean)
extends SparkHadoopMapReduceUtil
with Logging
@@ -222,8 +222,8 @@ private[sql] abstract class BaseWriterContainer(
* A writer that writes all of the rows in a partition to a single file.
*/
private[sql] class DefaultWriterContainer(
- @transient relation: HadoopFsRelation,
- @transient job: Job,
+ relation: HadoopFsRelation,
+ job: Job,
isAppend: Boolean)
extends BaseWriterContainer(relation, job, isAppend) {
@@ -286,8 +286,8 @@ private[sql] class DefaultWriterContainer(
* writer externally sorts the remaining rows and then writes them out one file at a time.
*/
private[sql] class DynamicPartitionWriterContainer(
- @transient relation: HadoopFsRelation,
- @transient job: Job,
+ relation: HadoopFsRelation,
+ job: Job,
partitionColumns: Seq[Attribute],
dataColumns: Seq[Attribute],
inputSchema: Seq[Attribute],
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index b8da0840ae..0a5569b0a4 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -767,7 +767,7 @@ private[hive] case class InsertIntoHiveTable(
private[hive] case class MetastoreRelation
(databaseName: String, tableName: String, alias: Option[String])
(val table: HiveTable)
- (@transient sqlContext: SQLContext)
+ (@transient private val sqlContext: SQLContext)
extends LeafNode with MultiInstanceRelation with FileRelation {
override def equals(other: Any): Boolean = other match {
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
index dc35569085..e35468a624 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
@@ -54,10 +54,10 @@ private[hive] sealed trait TableReader {
*/
private[hive]
class HadoopTableReader(
- @transient attributes: Seq[Attribute],
- @transient relation: MetastoreRelation,
- @transient sc: HiveContext,
- @transient hiveExtraConf: HiveConf)
+ @transient private val attributes: Seq[Attribute],
+ @transient private val relation: MetastoreRelation,
+ @transient private val sc: HiveContext,
+ hiveExtraConf: HiveConf)
extends TableReader with Logging {
// Hadoop honors "mapred.map.tasks" as hint, but will ignore when mapred.job.tracker is "local".
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala
index c7651daffe..32bddbaeae 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala
@@ -53,7 +53,7 @@ case class ScriptTransformation(
script: String,
output: Seq[Attribute],
child: SparkPlan,
- ioschema: HiveScriptIOSchema)(@transient sc: HiveContext)
+ ioschema: HiveScriptIOSchema)(@transient private val sc: HiveContext)
extends UnaryNode {
override def otherCopyArgs: Seq[HiveContext] = sc :: Nil
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala
index 8dc796b056..29a6f08f40 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala
@@ -45,7 +45,7 @@ import org.apache.spark.util.SerializableJobConf
* It is based on [[SparkHadoopWriter]].
*/
private[hive] class SparkHiveWriterContainer(
- @transient jobConf: JobConf,
+ jobConf: JobConf,
fileSinkConf: FileSinkDesc)
extends Logging
with SparkHadoopMapRedUtil
@@ -163,7 +163,7 @@ private[spark] object SparkHiveDynamicPartitionWriterContainer {
}
private[spark] class SparkHiveDynamicPartitionWriterContainer(
- @transient jobConf: JobConf,
+ jobConf: JobConf,
fileSinkConf: FileSinkDesc,
dynamicPartColNames: Array[String])
extends SparkHiveWriterContainer(jobConf, fileSinkConf) {
@@ -194,10 +194,10 @@ private[spark] class SparkHiveDynamicPartitionWriterContainer(
// Better solution is to add a step similar to what Hive FileSinkOperator.jobCloseOp does:
// calling something like Utilities.mvFileToFinalPath to cleanup the output directory and then
// load it with loadDynamicPartitions/loadPartition/loadTable.
- val oldMarker = jobConf.getBoolean(SUCCESSFUL_JOB_OUTPUT_DIR_MARKER, true)
- jobConf.setBoolean(SUCCESSFUL_JOB_OUTPUT_DIR_MARKER, false)
+ val oldMarker = conf.value.getBoolean(SUCCESSFUL_JOB_OUTPUT_DIR_MARKER, true)
+ conf.value.setBoolean(SUCCESSFUL_JOB_OUTPUT_DIR_MARKER, false)
super.commitJob()
- jobConf.setBoolean(SUCCESSFUL_JOB_OUTPUT_DIR_MARKER, oldMarker)
+ conf.value.setBoolean(SUCCESSFUL_JOB_OUTPUT_DIR_MARKER, oldMarker)
}
override def getLocalFileWriter(row: InternalRow, schema: StructType)
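
Since `jobConf` is now a plain constructor parameter, code that runs at commit time reads the configuration back through `conf.value`, the `SerializableJobConf` wrapper these containers already carry for shipping to executors. A sketch of what such a wrapper looks like, modeled on Spark's `SerializableConfiguration` (the class body here is illustrative):

    import java.io.{ObjectInputStream, ObjectOutputStream}
    import org.apache.hadoop.conf.Configuration

    class SerializableConfiguration(@transient var value: Configuration) extends Serializable {
      private def writeObject(out: ObjectOutputStream): Unit = {
        out.defaultWriteObject()
        value.write(out)  // Configuration is a Writable, not Serializable
      }
      private def readObject(in: ObjectInputStream): Unit = {
        value = new Configuration(false)
        value.readFields(in)
      }
    }
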
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
index 27024ecfd9..8a6050f522 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
@@ -32,7 +32,7 @@ import org.apache.spark.streaming.scheduler.JobGenerator
private[streaming]
-class Checkpoint(@transient ssc: StreamingContext, val checkpointTime: Time)
+class Checkpoint(ssc: StreamingContext, val checkpointTime: Time)
extends Logging with Serializable {
val master = ssc.sc.master
val framework = ssc.sc.appName
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala
index 2c373640d2..dfc569451d 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala
@@ -170,7 +170,7 @@ private[python] object PythonDStream {
*/
private[python] abstract class PythonDStream(
parent: DStream[_],
- @transient pfunc: PythonTransformFunction)
+ pfunc: PythonTransformFunction)
extends DStream[Array[Byte]] (parent.ssc) {
val func = new TransformFunction(pfunc)
@@ -187,7 +187,7 @@ private[python] abstract class PythonDStream(
*/
private[python] class PythonTransformedDStream (
parent: DStream[_],
- @transient pfunc: PythonTransformFunction)
+ pfunc: PythonTransformFunction)
extends PythonDStream(parent, pfunc) {
override def compute(validTime: Time): Option[RDD[Array[Byte]]] = {
@@ -206,7 +206,7 @@ private[python] class PythonTransformedDStream (
private[python] class PythonTransformed2DStream(
parent: DStream[_],
parent2: DStream[_],
- @transient pfunc: PythonTransformFunction)
+ pfunc: PythonTransformFunction)
extends DStream[Array[Byte]] (parent.ssc) {
val func = new TransformFunction(pfunc)
@@ -230,7 +230,7 @@ private[python] class PythonTransformed2DStream(
*/
private[python] class PythonStateDStream(
parent: DStream[Array[Byte]],
- @transient reduceFunc: PythonTransformFunction)
+ reduceFunc: PythonTransformFunction)
extends PythonDStream(parent, reduceFunc) {
super.persist(StorageLevel.MEMORY_ONLY)
@@ -252,8 +252,8 @@ private[python] class PythonStateDStream(
*/
private[python] class PythonReducedWindowedDStream(
parent: DStream[Array[Byte]],
- @transient preduceFunc: PythonTransformFunction,
- @transient pinvReduceFunc: PythonTransformFunction,
+ preduceFunc: PythonTransformFunction,
+ @transient private val pinvReduceFunc: PythonTransformFunction,
_windowDuration: Duration,
_slideDuration: Duration)
extends PythonDStream(parent, preduceFunc) {
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
index c358f5b5bd..40208a6486 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
@@ -70,7 +70,7 @@ import org.apache.spark.util.{SerializableConfiguration, TimeStampedHashMap, Uti
*/
private[streaming]
class FileInputDStream[K, V, F <: NewInputFormat[K, V]](
- @transient ssc_ : StreamingContext,
+ ssc_ : StreamingContext,
directory: String,
filter: Path => Boolean = FileInputDStream.defaultFilter,
newFilesOnly: Boolean = true,
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala
index a6c4cd220e..95994c983c 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala
@@ -39,7 +39,7 @@ import org.apache.spark.util.Utils
*
* @param ssc_ Streaming context that will execute this input stream
*/
-abstract class InputDStream[T: ClassTag] (@transient ssc_ : StreamingContext)
+abstract class InputDStream[T: ClassTag] (ssc_ : StreamingContext)
extends DStream[T](ssc_) {
private[streaming] var lastValidTime: Time = null
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/PluggableInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/PluggableInputDStream.scala
index 186e1bf03a..002aac9f43 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/PluggableInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/PluggableInputDStream.scala
@@ -23,7 +23,7 @@ import org.apache.spark.streaming.receiver.Receiver
private[streaming]
class PluggableInputDStream[T: ClassTag](
- @transient ssc_ : StreamingContext,
+ ssc_ : StreamingContext,
receiver: Receiver[T]) extends ReceiverInputDStream[T](ssc_) {
def getReceiver(): Receiver[T] = {
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/QueueInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/QueueInputDStream.scala
index bab78a3536..a2685046e0 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/QueueInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/QueueInputDStream.scala
@@ -27,7 +27,7 @@ import org.apache.spark.streaming.{Time, StreamingContext}
private[streaming]
class QueueInputDStream[T: ClassTag](
- @transient ssc: StreamingContext,
+ ssc: StreamingContext,
val queue: Queue[RDD[T]],
oneAtATime: Boolean,
defaultRDD: RDD[T]
@@ -57,7 +57,7 @@ class QueueInputDStream[T: ClassTag](
if (oneAtATime) {
Some(buffer.head)
} else {
- Some(new UnionRDD(ssc.sc, buffer.toSeq))
+ Some(new UnionRDD(context.sc, buffer.toSeq))
}
} else if (defaultRDD != null) {
Some(defaultRDD)
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/RawInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/RawInputDStream.scala
index e2925b9e03..5a9eda7c12 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/RawInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/RawInputDStream.scala
@@ -39,7 +39,7 @@ import org.apache.spark.streaming.receiver.Receiver
*/
private[streaming]
class RawInputDStream[T: ClassTag](
- @transient ssc_ : StreamingContext,
+ ssc_ : StreamingContext,
host: String,
port: Int,
storageLevel: StorageLevel
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala
index 6c139f32da..87c20afd5c 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala
@@ -38,7 +38,7 @@ import org.apache.spark.streaming.{StreamingContext, Time}
* @param ssc_ Streaming context that will execute this input stream
* @tparam T Class type of the object of this stream
*/
-abstract class ReceiverInputDStream[T: ClassTag](@transient ssc_ : StreamingContext)
+abstract class ReceiverInputDStream[T: ClassTag](ssc_ : StreamingContext)
extends InputDStream[T](ssc_) {
/**
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/SocketInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/SocketInputDStream.scala
index 5ce5b7aae6..de84e0c9a4 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/SocketInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/SocketInputDStream.scala
@@ -32,7 +32,7 @@ import org.apache.spark.streaming.receiver.Receiver
private[streaming]
class SocketInputDStream[T: ClassTag](
- @transient ssc_ : StreamingContext,
+ ssc_ : StreamingContext,
host: String,
port: Int,
bytesToObjects: InputStream => Iterator[T],
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala b/streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala
index e081ffe46f..f811784b25 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala
@@ -61,7 +61,7 @@ class WriteAheadLogBackedBlockRDDPartition(
*
*
* @param sc SparkContext
- * @param blockIds Ids of the blocks that contains this RDD's data
+ * @param _blockIds Ids of the blocks that contain this RDD's data
* @param walRecordHandles Record handles in write ahead logs that contain this RDD's data
* @param isBlockIdValid Whether the block Ids are valid (i.e., the blocks are present in the Spark
* executors). If not, then block lookups by the block ids will be skipped.
@@ -73,23 +73,23 @@ class WriteAheadLogBackedBlockRDDPartition(
*/
private[streaming]
class WriteAheadLogBackedBlockRDD[T: ClassTag](
- @transient sc: SparkContext,
- @transient blockIds: Array[BlockId],
+ sc: SparkContext,
+ @transient private val _blockIds: Array[BlockId],
@transient val walRecordHandles: Array[WriteAheadLogRecordHandle],
- @transient isBlockIdValid: Array[Boolean] = Array.empty,
+ @transient private val isBlockIdValid: Array[Boolean] = Array.empty,
storeInBlockManager: Boolean = false,
storageLevel: StorageLevel = StorageLevel.MEMORY_ONLY_SER)
- extends BlockRDD[T](sc, blockIds) {
+ extends BlockRDD[T](sc, _blockIds) {
require(
- blockIds.length == walRecordHandles.length,
- s"Number of block Ids (${blockIds.length}) must be " +
+ _blockIds.length == walRecordHandles.length,
+ s"Number of block Ids (${_blockIds.length}) must be " +
s" same as number of WAL record handles (${walRecordHandles.length})")
require(
- isBlockIdValid.isEmpty || isBlockIdValid.length == blockIds.length,
+ isBlockIdValid.isEmpty || isBlockIdValid.length == _blockIds.length,
s"Number of elements in isBlockIdValid (${isBlockIdValid.length}) must be " +
- s" same as number of block Ids (${blockIds.length})")
+ s" same as number of block Ids (${_blockIds.length})")
// Hadoop configuration is not serializable, so broadcast it in a serializable wrapper.
@transient private val hadoopConfig = sc.hadoopConfiguration
@@ -99,9 +99,9 @@ class WriteAheadLogBackedBlockRDD[T: ClassTag](
override def getPartitions: Array[Partition] = {
assertValid()
- Array.tabulate(blockIds.length) { i =>
+ Array.tabulate(_blockIds.length) { i =>
val isValid = if (isBlockIdValid.length == 0) true else isBlockIdValid(i)
- new WriteAheadLogBackedBlockRDDPartition(i, blockIds(i), isValid, walRecordHandles(i))
+ new WriteAheadLogBackedBlockRDDPartition(i, _blockIds(i), isValid, walRecordHandles(i))
}
}
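
The rename to `_blockIds` avoids a clash with the public `val blockIds` that `BlockRDD` now exposes: a subclass constructor parameter of the same name would have to override the inherited val, while the subclass still needs its own `@transient private val` copy for driver-side methods such as `getPartitions`. A reduced sketch of the shadowing problem, with hypothetical classes:

    abstract class Base(@transient val ids: Array[Int]) extends Serializable

    // Naming the parameter `ids` would collide with the inherited val;
    // the underscore keeps the private transient copy distinct.
    class Child(@transient private val _ids: Array[Int]) extends Base(_ids) {
      def count: Int = _ids.length  // driver-side use of the transient copy
    }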