aboutsummaryrefslogtreecommitdiff
path: root/core/src
diff options
context:
space:
mode:
Diffstat (limited to 'core/src')
-rw-r--r--core/src/main/scala/org/apache/spark/Accumulators.scala2
-rw-r--r--core/src/main/scala/org/apache/spark/Dependency.scala2
-rw-r--r--core/src/main/scala/org/apache/spark/Partitioner.scala4
-rw-r--r--core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala2
-rw-r--r--core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala4
-rw-r--r--core/src/main/scala/org/apache/spark/input/PortableDataStream.scala4
-rw-r--r--core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala2
-rw-r--r--core/src/main/scala/org/apache/spark/rdd/BinaryFileRDD.scala6
-rw-r--r--core/src/main/scala/org/apache/spark/rdd/BlockRDD.scala4
-rw-r--r--core/src/main/scala/org/apache/spark/rdd/CartesianRDD.scala4
-rw-r--r--core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala2
-rw-r--r--core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala8
-rw-r--r--core/src/main/scala/org/apache/spark/rdd/LocalCheckpointRDD.scala2
-rw-r--r--core/src/main/scala/org/apache/spark/rdd/LocalRDDCheckpointData.scala2
-rw-r--r--core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala18
-rw-r--r--core/src/main/scala/org/apache/spark/rdd/ParallelCollectionRDD.scala4
-rw-r--r--core/src/main/scala/org/apache/spark/rdd/PartitionPruningRDD.scala6
-rw-r--r--core/src/main/scala/org/apache/spark/rdd/PartitionwiseSampledRDD.scala4
-rw-r--r--core/src/main/scala/org/apache/spark/rdd/RDDCheckpointData.scala2
-rw-r--r--core/src/main/scala/org/apache/spark/rdd/ReliableCheckpointRDD.scala2
-rw-r--r--core/src/main/scala/org/apache/spark/rdd/ReliableRDDCheckpointData.scala2
-rw-r--r--core/src/main/scala/org/apache/spark/rdd/SqlNewHadoopRDD.scala6
-rw-r--r--core/src/main/scala/org/apache/spark/rdd/UnionRDD.scala4
-rw-r--r--core/src/main/scala/org/apache/spark/rdd/ZippedPartitionsRDD.scala2
-rw-r--r--core/src/main/scala/org/apache/spark/rdd/ZippedWithIndexRDD.scala2
-rw-r--r--core/src/main/scala/org/apache/spark/rpc/RpcEndpointRef.scala2
-rw-r--r--core/src/main/scala/org/apache/spark/rpc/akka/AkkaRpcEnv.scala21
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala2
-rw-r--r--core/src/main/scala/org/apache/spark/shuffle/unsafe/UnsafeShuffleManager.scala2
-rw-r--r--core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala2
-rw-r--r--core/src/main/scala/org/apache/spark/util/ShutdownHookManager.scala4
-rw-r--r--core/src/main/scala/org/apache/spark/util/logging/RollingFileAppender.scala46
32 files changed, 93 insertions, 86 deletions
diff --git a/core/src/main/scala/org/apache/spark/Accumulators.scala b/core/src/main/scala/org/apache/spark/Accumulators.scala
index c39c8667d0..5592b75afb 100644
--- a/core/src/main/scala/org/apache/spark/Accumulators.scala
+++ b/core/src/main/scala/org/apache/spark/Accumulators.scala
@@ -47,7 +47,7 @@ import org.apache.spark.util.Utils
* @tparam T partial data that can be added in
*/
class Accumulable[R, T] private[spark] (
- @transient initialValue: R,
+ initialValue: R,
param: AccumulableParam[R, T],
val name: Option[String],
internal: Boolean)
diff --git a/core/src/main/scala/org/apache/spark/Dependency.scala b/core/src/main/scala/org/apache/spark/Dependency.scala
index fc8cdde934..cfeeb3902c 100644
--- a/core/src/main/scala/org/apache/spark/Dependency.scala
+++ b/core/src/main/scala/org/apache/spark/Dependency.scala
@@ -66,7 +66,7 @@ abstract class NarrowDependency[T](_rdd: RDD[T]) extends Dependency[T] {
*/
@DeveloperApi
class ShuffleDependency[K, V, C](
- @transient _rdd: RDD[_ <: Product2[K, V]],
+ @transient private val _rdd: RDD[_ <: Product2[K, V]],
val partitioner: Partitioner,
val serializer: Option[Serializer] = None,
val keyOrdering: Option[Ordering[K]] = None,
diff --git a/core/src/main/scala/org/apache/spark/Partitioner.scala b/core/src/main/scala/org/apache/spark/Partitioner.scala
index 29e581bb57..e4df7af81a 100644
--- a/core/src/main/scala/org/apache/spark/Partitioner.scala
+++ b/core/src/main/scala/org/apache/spark/Partitioner.scala
@@ -104,8 +104,8 @@ class HashPartitioner(partitions: Int) extends Partitioner {
* the value of `partitions`.
*/
class RangePartitioner[K : Ordering : ClassTag, V](
- @transient partitions: Int,
- @transient rdd: RDD[_ <: Product2[K, V]],
+ partitions: Int,
+ rdd: RDD[_ <: Product2[K, V]],
private var ascending: Boolean = true)
extends Partitioner {
diff --git a/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala b/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala
index f5dd36cbcf..ae5926dd53 100644
--- a/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala
+++ b/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala
@@ -37,7 +37,7 @@ import org.apache.spark.util.SerializableJobConf
* a filename to write to, etc, exactly like in a Hadoop MapReduce job.
*/
private[spark]
-class SparkHadoopWriter(@transient jobConf: JobConf)
+class SparkHadoopWriter(jobConf: JobConf)
extends Logging
with SparkHadoopMapRedUtil
with Serializable {
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
index b4d152b336..69da180593 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -41,7 +41,7 @@ import org.apache.spark.util.{SerializableConfiguration, Utils}
import scala.util.control.NonFatal
private[spark] class PythonRDD(
- @transient parent: RDD[_],
+ parent: RDD[_],
command: Array[Byte],
envVars: JMap[String, String],
pythonIncludes: JList[String],
@@ -785,7 +785,7 @@ class BytesToString extends org.apache.spark.api.java.function.Function[Array[By
* Internal class that acts as an `AccumulatorParam` for Python accumulators. Inside, it
* collects a list of pickled strings that we pass to Python through a socket.
*/
-private class PythonAccumulatorParam(@transient serverHost: String, serverPort: Int)
+private class PythonAccumulatorParam(@transient private val serverHost: String, serverPort: Int)
extends AccumulatorParam[JList[Array[Byte]]] {
Utils.checkHost(serverHost, "Expected hostname")
diff --git a/core/src/main/scala/org/apache/spark/input/PortableDataStream.scala b/core/src/main/scala/org/apache/spark/input/PortableDataStream.scala
index a5ad47293f..e2ffc3b64e 100644
--- a/core/src/main/scala/org/apache/spark/input/PortableDataStream.scala
+++ b/core/src/main/scala/org/apache/spark/input/PortableDataStream.scala
@@ -131,8 +131,8 @@ private[spark] class StreamInputFormat extends StreamFileInputFormat[PortableDat
*/
@Experimental
class PortableDataStream(
- @transient isplit: CombineFileSplit,
- @transient context: TaskAttemptContext,
+ isplit: CombineFileSplit,
+ context: TaskAttemptContext,
index: Integer)
extends Serializable {
diff --git a/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala b/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala
index 4b851bcb36..70a42f9045 100644
--- a/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala
+++ b/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala
@@ -137,7 +137,7 @@ class NettyBlockTransferService(conf: SparkConf, securityManager: SecurityManage
new RpcResponseCallback {
override def onSuccess(response: Array[Byte]): Unit = {
logTrace(s"Successfully uploaded block $blockId")
- result.success()
+ result.success((): Unit)
}
override def onFailure(e: Throwable): Unit = {
logError(s"Error while uploading block $blockId", e)
diff --git a/core/src/main/scala/org/apache/spark/rdd/BinaryFileRDD.scala b/core/src/main/scala/org/apache/spark/rdd/BinaryFileRDD.scala
index 1f755db485..6fec00dcd0 100644
--- a/core/src/main/scala/org/apache/spark/rdd/BinaryFileRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/BinaryFileRDD.scala
@@ -28,7 +28,7 @@ private[spark] class BinaryFileRDD[T](
inputFormatClass: Class[_ <: StreamFileInputFormat[T]],
keyClass: Class[String],
valueClass: Class[T],
- @transient conf: Configuration,
+ conf: Configuration,
minPartitions: Int)
extends NewHadoopRDD[String, T](sc, inputFormatClass, keyClass, valueClass, conf) {
@@ -36,10 +36,10 @@ private[spark] class BinaryFileRDD[T](
val inputFormat = inputFormatClass.newInstance
inputFormat match {
case configurable: Configurable =>
- configurable.setConf(conf)
+ configurable.setConf(getConf)
case _ =>
}
- val jobContext = newJobContext(conf, jobId)
+ val jobContext = newJobContext(getConf, jobId)
inputFormat.setMinPartitions(jobContext, minPartitions)
val rawSplits = inputFormat.getSplits(jobContext).toArray
val result = new Array[Partition](rawSplits.size)
diff --git a/core/src/main/scala/org/apache/spark/rdd/BlockRDD.scala b/core/src/main/scala/org/apache/spark/rdd/BlockRDD.scala
index 9220302637..fc1710fbad 100644
--- a/core/src/main/scala/org/apache/spark/rdd/BlockRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/BlockRDD.scala
@@ -28,7 +28,7 @@ private[spark] class BlockRDDPartition(val blockId: BlockId, idx: Int) extends P
}
private[spark]
-class BlockRDD[T: ClassTag](@transient sc: SparkContext, @transient val blockIds: Array[BlockId])
+class BlockRDD[T: ClassTag](sc: SparkContext, @transient val blockIds: Array[BlockId])
extends RDD[T](sc, Nil) {
@transient lazy val _locations = BlockManager.blockIdsToHosts(blockIds, SparkEnv.get)
@@ -64,7 +64,7 @@ class BlockRDD[T: ClassTag](@transient sc: SparkContext, @transient val blockIds
*/
private[spark] def removeBlocks() {
blockIds.foreach { blockId =>
- sc.env.blockManager.master.removeBlock(blockId)
+ sparkContext.env.blockManager.master.removeBlock(blockId)
}
_isValid = false
}
diff --git a/core/src/main/scala/org/apache/spark/rdd/CartesianRDD.scala b/core/src/main/scala/org/apache/spark/rdd/CartesianRDD.scala
index c1d6971787..18e8cddbc4 100644
--- a/core/src/main/scala/org/apache/spark/rdd/CartesianRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/CartesianRDD.scala
@@ -27,8 +27,8 @@ import org.apache.spark.util.Utils
private[spark]
class CartesianPartition(
idx: Int,
- @transient rdd1: RDD[_],
- @transient rdd2: RDD[_],
+ @transient private val rdd1: RDD[_],
+ @transient private val rdd2: RDD[_],
s1Index: Int,
s2Index: Int
) extends Partition {
diff --git a/core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala b/core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala
index 72fe215dae..b0364623af 100644
--- a/core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala
@@ -29,7 +29,7 @@ private[spark] class CheckpointRDDPartition(val index: Int) extends Partition
/**
* An RDD that recovers checkpointed data from storage.
*/
-private[spark] abstract class CheckpointRDD[T: ClassTag](@transient sc: SparkContext)
+private[spark] abstract class CheckpointRDD[T: ClassTag](sc: SparkContext)
extends RDD[T](sc, Nil) {
// CheckpointRDD should not be checkpointed again
diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
index e1f8719eea..8f2655d63b 100644
--- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
@@ -51,7 +51,7 @@ import org.apache.spark.storage.StorageLevel
/**
* A Spark split class that wraps around a Hadoop InputSplit.
*/
-private[spark] class HadoopPartition(rddId: Int, idx: Int, @transient s: InputSplit)
+private[spark] class HadoopPartition(rddId: Int, idx: Int, s: InputSplit)
extends Partition {
val inputSplit = new SerializableWritable[InputSplit](s)
@@ -99,7 +99,7 @@ private[spark] class HadoopPartition(rddId: Int, idx: Int, @transient s: InputSp
*/
@DeveloperApi
class HadoopRDD[K, V](
- @transient sc: SparkContext,
+ sc: SparkContext,
broadcastedConf: Broadcast[SerializableConfiguration],
initLocalJobConfFuncOpt: Option[JobConf => Unit],
inputFormatClass: Class[_ <: InputFormat[K, V]],
@@ -109,7 +109,7 @@ class HadoopRDD[K, V](
extends RDD[(K, V)](sc, Nil) with Logging {
if (initLocalJobConfFuncOpt.isDefined) {
- sc.clean(initLocalJobConfFuncOpt.get)
+ sparkContext.clean(initLocalJobConfFuncOpt.get)
}
def this(
@@ -137,7 +137,7 @@ class HadoopRDD[K, V](
// used to build JobTracker ID
private val createTime = new Date()
- private val shouldCloneJobConf = sc.conf.getBoolean("spark.hadoop.cloneConf", false)
+ private val shouldCloneJobConf = sparkContext.conf.getBoolean("spark.hadoop.cloneConf", false)
// Returns a JobConf that will be used on slaves to obtain input splits for Hadoop reads.
protected def getJobConf(): JobConf = {
diff --git a/core/src/main/scala/org/apache/spark/rdd/LocalCheckpointRDD.scala b/core/src/main/scala/org/apache/spark/rdd/LocalCheckpointRDD.scala
index daa5779d68..bfe19195fc 100644
--- a/core/src/main/scala/org/apache/spark/rdd/LocalCheckpointRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/LocalCheckpointRDD.scala
@@ -35,7 +35,7 @@ import org.apache.spark.storage.RDDBlockId
* @param numPartitions the number of partitions in the checkpointed RDD
*/
private[spark] class LocalCheckpointRDD[T: ClassTag](
- @transient sc: SparkContext,
+ sc: SparkContext,
rddId: Int,
numPartitions: Int)
extends CheckpointRDD[T](sc) {
diff --git a/core/src/main/scala/org/apache/spark/rdd/LocalRDDCheckpointData.scala b/core/src/main/scala/org/apache/spark/rdd/LocalRDDCheckpointData.scala
index d6fad89684..c115e0ff74 100644
--- a/core/src/main/scala/org/apache/spark/rdd/LocalRDDCheckpointData.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/LocalRDDCheckpointData.scala
@@ -31,7 +31,7 @@ import org.apache.spark.util.Utils
* is written to the local, ephemeral block storage that lives in each executor. This is useful
* for use cases where RDDs build up long lineages that need to be truncated often (e.g. GraphX).
*/
-private[spark] class LocalRDDCheckpointData[T: ClassTag](@transient rdd: RDD[T])
+private[spark] class LocalRDDCheckpointData[T: ClassTag](@transient private val rdd: RDD[T])
extends RDDCheckpointData[T](rdd) with Logging {
/**
diff --git a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
index 6a9c004d65..174979aaeb 100644
--- a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
@@ -40,7 +40,7 @@ import org.apache.spark.storage.StorageLevel
private[spark] class NewHadoopPartition(
rddId: Int,
val index: Int,
- @transient rawSplit: InputSplit with Writable)
+ rawSplit: InputSplit with Writable)
extends Partition {
val serializableHadoopSplit = new SerializableWritable(rawSplit)
@@ -68,14 +68,14 @@ class NewHadoopRDD[K, V](
inputFormatClass: Class[_ <: InputFormat[K, V]],
keyClass: Class[K],
valueClass: Class[V],
- @transient conf: Configuration)
+ @transient private val _conf: Configuration)
extends RDD[(K, V)](sc, Nil)
with SparkHadoopMapReduceUtil
with Logging {
// A Hadoop Configuration can be about 10 KB, which is pretty big, so broadcast it
- private val confBroadcast = sc.broadcast(new SerializableConfiguration(conf))
- // private val serializableConf = new SerializableWritable(conf)
+ private val confBroadcast = sc.broadcast(new SerializableConfiguration(_conf))
+ // private val serializableConf = new SerializableWritable(_conf)
private val jobTrackerId: String = {
val formatter = new SimpleDateFormat("yyyyMMddHHmm")
@@ -88,10 +88,10 @@ class NewHadoopRDD[K, V](
val inputFormat = inputFormatClass.newInstance
inputFormat match {
case configurable: Configurable =>
- configurable.setConf(conf)
+ configurable.setConf(_conf)
case _ =>
}
- val jobContext = newJobContext(conf, jobId)
+ val jobContext = newJobContext(_conf, jobId)
val rawSplits = inputFormat.getSplits(jobContext).toArray
val result = new Array[Partition](rawSplits.size)
for (i <- 0 until rawSplits.size) {
@@ -262,7 +262,7 @@ private[spark] class WholeTextFileRDD(
inputFormatClass: Class[_ <: WholeTextFileInputFormat],
keyClass: Class[String],
valueClass: Class[String],
- @transient conf: Configuration,
+ conf: Configuration,
minPartitions: Int)
extends NewHadoopRDD[String, String](sc, inputFormatClass, keyClass, valueClass, conf) {
@@ -270,10 +270,10 @@ private[spark] class WholeTextFileRDD(
val inputFormat = inputFormatClass.newInstance
inputFormat match {
case configurable: Configurable =>
- configurable.setConf(conf)
+ configurable.setConf(getConf)
case _ =>
}
- val jobContext = newJobContext(conf, jobId)
+ val jobContext = newJobContext(getConf, jobId)
inputFormat.setMinPartitions(jobContext, minPartitions)
val rawSplits = inputFormat.getSplits(jobContext).toArray
val result = new Array[Partition](rawSplits.size)
diff --git a/core/src/main/scala/org/apache/spark/rdd/ParallelCollectionRDD.scala b/core/src/main/scala/org/apache/spark/rdd/ParallelCollectionRDD.scala
index e2394e28f8..582fa93afe 100644
--- a/core/src/main/scala/org/apache/spark/rdd/ParallelCollectionRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/ParallelCollectionRDD.scala
@@ -83,8 +83,8 @@ private[spark] class ParallelCollectionPartition[T: ClassTag](
}
private[spark] class ParallelCollectionRDD[T: ClassTag](
- @transient sc: SparkContext,
- @transient data: Seq[T],
+ sc: SparkContext,
+ @transient private val data: Seq[T],
numSlices: Int,
locationPrefs: Map[Int, Seq[String]])
extends RDD[T](sc, Nil) {
diff --git a/core/src/main/scala/org/apache/spark/rdd/PartitionPruningRDD.scala b/core/src/main/scala/org/apache/spark/rdd/PartitionPruningRDD.scala
index a00f4c1cdf..d6a37e8cc5 100644
--- a/core/src/main/scala/org/apache/spark/rdd/PartitionPruningRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/PartitionPruningRDD.scala
@@ -32,7 +32,7 @@ private[spark] class PartitionPruningRDDPartition(idx: Int, val parentSplit: Par
* Represents a dependency between the PartitionPruningRDD and its parent. In this
* case, the child RDD contains a subset of partitions of the parents'.
*/
-private[spark] class PruneDependency[T](rdd: RDD[T], @transient partitionFilterFunc: Int => Boolean)
+private[spark] class PruneDependency[T](rdd: RDD[T], partitionFilterFunc: Int => Boolean)
extends NarrowDependency[T](rdd) {
@transient
@@ -55,8 +55,8 @@ private[spark] class PruneDependency[T](rdd: RDD[T], @transient partitionFilterF
*/
@DeveloperApi
class PartitionPruningRDD[T: ClassTag](
- @transient prev: RDD[T],
- @transient partitionFilterFunc: Int => Boolean)
+ prev: RDD[T],
+ partitionFilterFunc: Int => Boolean)
extends RDD[T](prev.context, List(new PruneDependency(prev, partitionFilterFunc))) {
override def compute(split: Partition, context: TaskContext): Iterator[T] = {
diff --git a/core/src/main/scala/org/apache/spark/rdd/PartitionwiseSampledRDD.scala b/core/src/main/scala/org/apache/spark/rdd/PartitionwiseSampledRDD.scala
index a637d6f15b..3b1acacf40 100644
--- a/core/src/main/scala/org/apache/spark/rdd/PartitionwiseSampledRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/PartitionwiseSampledRDD.scala
@@ -47,8 +47,8 @@ class PartitionwiseSampledRDDPartition(val prev: Partition, val seed: Long)
private[spark] class PartitionwiseSampledRDD[T: ClassTag, U: ClassTag](
prev: RDD[T],
sampler: RandomSampler[T, U],
- @transient preservesPartitioning: Boolean,
- @transient seed: Long = Utils.random.nextLong)
+ preservesPartitioning: Boolean,
+ @transient private val seed: Long = Utils.random.nextLong)
extends RDD[U](prev) {
@transient override val partitioner = if (preservesPartitioning) prev.partitioner else None
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDDCheckpointData.scala b/core/src/main/scala/org/apache/spark/rdd/RDDCheckpointData.scala
index 0e43520870..429514b4f6 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDDCheckpointData.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDDCheckpointData.scala
@@ -36,7 +36,7 @@ private[spark] object CheckpointState extends Enumeration {
* as well as, manages the post-checkpoint state by providing the updated partitions,
* iterator and preferred locations of the checkpointed RDD.
*/
-private[spark] abstract class RDDCheckpointData[T: ClassTag](@transient rdd: RDD[T])
+private[spark] abstract class RDDCheckpointData[T: ClassTag](@transient private val rdd: RDD[T])
extends Serializable {
import CheckpointState._
diff --git a/core/src/main/scala/org/apache/spark/rdd/ReliableCheckpointRDD.scala b/core/src/main/scala/org/apache/spark/rdd/ReliableCheckpointRDD.scala
index 35d8b0bfd1..1c3b5da19c 100644
--- a/core/src/main/scala/org/apache/spark/rdd/ReliableCheckpointRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/ReliableCheckpointRDD.scala
@@ -32,7 +32,7 @@ import org.apache.spark.util.{SerializableConfiguration, Utils}
* An RDD that reads from checkpoint files previously written to reliable storage.
*/
private[spark] class ReliableCheckpointRDD[T: ClassTag](
- @transient sc: SparkContext,
+ sc: SparkContext,
val checkpointPath: String)
extends CheckpointRDD[T](sc) {
diff --git a/core/src/main/scala/org/apache/spark/rdd/ReliableRDDCheckpointData.scala b/core/src/main/scala/org/apache/spark/rdd/ReliableRDDCheckpointData.scala
index 1df8eef5ff..e9f6060301 100644
--- a/core/src/main/scala/org/apache/spark/rdd/ReliableRDDCheckpointData.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/ReliableRDDCheckpointData.scala
@@ -28,7 +28,7 @@ import org.apache.spark.util.SerializableConfiguration
* An implementation of checkpointing that writes the RDD data to reliable storage.
* This allows drivers to be restarted on failure with previously computed state.
*/
-private[spark] class ReliableRDDCheckpointData[T: ClassTag](@transient rdd: RDD[T])
+private[spark] class ReliableRDDCheckpointData[T: ClassTag](@transient private val rdd: RDD[T])
extends RDDCheckpointData[T](rdd) with Logging {
// The directory to which the associated RDD has been checkpointed to
diff --git a/core/src/main/scala/org/apache/spark/rdd/SqlNewHadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/SqlNewHadoopRDD.scala
index fa3fecc80c..9babe56267 100644
--- a/core/src/main/scala/org/apache/spark/rdd/SqlNewHadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/SqlNewHadoopRDD.scala
@@ -39,7 +39,7 @@ import org.apache.spark.util.{SerializableConfiguration, ShutdownHookManager, Ut
private[spark] class SqlNewHadoopPartition(
rddId: Int,
val index: Int,
- @transient rawSplit: InputSplit with Writable)
+ rawSplit: InputSplit with Writable)
extends SparkPartition {
val serializableHadoopSplit = new SerializableWritable(rawSplit)
@@ -61,9 +61,9 @@ private[spark] class SqlNewHadoopPartition(
* changes based on [[org.apache.spark.rdd.HadoopRDD]].
*/
private[spark] class SqlNewHadoopRDD[V: ClassTag](
- @transient sc : SparkContext,
+ sc : SparkContext,
broadcastedConf: Broadcast[SerializableConfiguration],
- @transient initDriverSideJobFuncOpt: Option[Job => Unit],
+ @transient private val initDriverSideJobFuncOpt: Option[Job => Unit],
initLocalJobFuncOpt: Option[Job => Unit],
inputFormatClass: Class[_ <: InputFormat[Void, V]],
valueClass: Class[V])
diff --git a/core/src/main/scala/org/apache/spark/rdd/UnionRDD.scala b/core/src/main/scala/org/apache/spark/rdd/UnionRDD.scala
index 3986645350..66cf4369da 100644
--- a/core/src/main/scala/org/apache/spark/rdd/UnionRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/UnionRDD.scala
@@ -37,9 +37,9 @@ import org.apache.spark.util.Utils
*/
private[spark] class UnionPartition[T: ClassTag](
idx: Int,
- @transient rdd: RDD[T],
+ @transient private val rdd: RDD[T],
val parentRddIndex: Int,
- @transient parentRddPartitionIndex: Int)
+ @transient private val parentRddPartitionIndex: Int)
extends Partition {
var parentPartition: Partition = rdd.partitions(parentRddPartitionIndex)
diff --git a/core/src/main/scala/org/apache/spark/rdd/ZippedPartitionsRDD.scala b/core/src/main/scala/org/apache/spark/rdd/ZippedPartitionsRDD.scala
index b3c64394ab..70bf04de64 100644
--- a/core/src/main/scala/org/apache/spark/rdd/ZippedPartitionsRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/ZippedPartitionsRDD.scala
@@ -26,7 +26,7 @@ import org.apache.spark.util.Utils
private[spark] class ZippedPartitionsPartition(
idx: Int,
- @transient rdds: Seq[RDD[_]],
+ @transient private val rdds: Seq[RDD[_]],
@transient val preferredLocations: Seq[String])
extends Partition {
diff --git a/core/src/main/scala/org/apache/spark/rdd/ZippedWithIndexRDD.scala b/core/src/main/scala/org/apache/spark/rdd/ZippedWithIndexRDD.scala
index e277ae28d5..32931d59ac 100644
--- a/core/src/main/scala/org/apache/spark/rdd/ZippedWithIndexRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/ZippedWithIndexRDD.scala
@@ -37,7 +37,7 @@ class ZippedWithIndexRDDPartition(val prev: Partition, val startIndex: Long)
* @tparam T parent RDD item type
*/
private[spark]
-class ZippedWithIndexRDD[T: ClassTag](@transient prev: RDD[T]) extends RDD[(T, Long)](prev) {
+class ZippedWithIndexRDD[T: ClassTag](prev: RDD[T]) extends RDD[(T, Long)](prev) {
/** The start index of each partition. */
@transient private val startIndices: Array[Long] = {
diff --git a/core/src/main/scala/org/apache/spark/rpc/RpcEndpointRef.scala b/core/src/main/scala/org/apache/spark/rpc/RpcEndpointRef.scala
index 7409ac8859..f25710bb5b 100644
--- a/core/src/main/scala/org/apache/spark/rpc/RpcEndpointRef.scala
+++ b/core/src/main/scala/org/apache/spark/rpc/RpcEndpointRef.scala
@@ -26,7 +26,7 @@ import org.apache.spark.{SparkException, Logging, SparkConf}
/**
* A reference for a remote [[RpcEndpoint]]. [[RpcEndpointRef]] is thread-safe.
*/
-private[spark] abstract class RpcEndpointRef(@transient conf: SparkConf)
+private[spark] abstract class RpcEndpointRef(conf: SparkConf)
extends Serializable with Logging {
private[this] val maxRetries = RpcUtils.numRetries(conf)
diff --git a/core/src/main/scala/org/apache/spark/rpc/akka/AkkaRpcEnv.scala b/core/src/main/scala/org/apache/spark/rpc/akka/AkkaRpcEnv.scala
index fc17542abf..ad67e1c5ad 100644
--- a/core/src/main/scala/org/apache/spark/rpc/akka/AkkaRpcEnv.scala
+++ b/core/src/main/scala/org/apache/spark/rpc/akka/AkkaRpcEnv.scala
@@ -87,9 +87,9 @@ private[spark] class AkkaRpcEnv private[akka] (
override def setupEndpoint(name: String, endpoint: RpcEndpoint): RpcEndpointRef = {
@volatile var endpointRef: AkkaRpcEndpointRef = null
- // Use lazy because the Actor needs to use `endpointRef`.
+ // Use defered function because the Actor needs to use `endpointRef`.
// So `actorRef` should be created after assigning `endpointRef`.
- lazy val actorRef = actorSystem.actorOf(Props(new Actor with ActorLogReceive with Logging {
+ val actorRef = () => actorSystem.actorOf(Props(new Actor with ActorLogReceive with Logging {
assert(endpointRef != null)
@@ -272,13 +272,20 @@ private[akka] class ErrorMonitor extends Actor with ActorLogReceive with Logging
}
private[akka] class AkkaRpcEndpointRef(
- @transient defaultAddress: RpcAddress,
- @transient _actorRef: => ActorRef,
- @transient conf: SparkConf,
- @transient initInConstructor: Boolean = true)
+ @transient private val defaultAddress: RpcAddress,
+ @transient private val _actorRef: () => ActorRef,
+ conf: SparkConf,
+ initInConstructor: Boolean)
extends RpcEndpointRef(conf) with Logging {
- lazy val actorRef = _actorRef
+ def this(
+ defaultAddress: RpcAddress,
+ _actorRef: ActorRef,
+ conf: SparkConf) = {
+ this(defaultAddress, () => _actorRef, conf, true)
+ }
+
+ lazy val actorRef = _actorRef()
override lazy val address: RpcAddress = {
val akkaAddress = actorRef.path.address
diff --git a/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala b/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala
index c4dc080e2b..fb693721a9 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala
@@ -44,7 +44,7 @@ private[spark] class ResultTask[T, U](
stageAttemptId: Int,
taskBinary: Broadcast[Array[Byte]],
partition: Partition,
- @transient locs: Seq[TaskLocation],
+ locs: Seq[TaskLocation],
val outputId: Int,
internalAccumulators: Seq[Accumulator[Long]])
extends Task[U](stageId, stageAttemptId, partition.index, internalAccumulators)
diff --git a/core/src/main/scala/org/apache/spark/shuffle/unsafe/UnsafeShuffleManager.scala b/core/src/main/scala/org/apache/spark/shuffle/unsafe/UnsafeShuffleManager.scala
index df7bbd6424..75f22f642b 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/unsafe/UnsafeShuffleManager.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/unsafe/UnsafeShuffleManager.scala
@@ -159,7 +159,7 @@ private[spark] class UnsafeShuffleManager(conf: SparkConf) extends ShuffleManage
mapId: Int,
context: TaskContext): ShuffleWriter[K, V] = {
handle match {
- case unsafeShuffleHandle: UnsafeShuffleHandle[K, V] =>
+ case unsafeShuffleHandle: UnsafeShuffleHandle[K @unchecked, V @unchecked] =>
numMapsForShufflesThatUsedNewPath.putIfAbsent(handle.shuffleId, unsafeShuffleHandle.numMaps)
val env = SparkEnv.get
new UnsafeShuffleWriter(
diff --git a/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala b/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala
index 150d82b393..1b49dca9dc 100644
--- a/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala
+++ b/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala
@@ -94,7 +94,7 @@ private[spark] object ClosureCleaner extends Logging {
if (cls.isPrimitive) {
cls match {
case java.lang.Boolean.TYPE => new java.lang.Boolean(false)
- case java.lang.Character.TYPE => new java.lang.Character('\0')
+ case java.lang.Character.TYPE => new java.lang.Character('\u0000')
case java.lang.Void.TYPE =>
// This should not happen because `Foo(void x) {}` does not compile.
throw new IllegalStateException("Unexpected void parameter in constructor")
diff --git a/core/src/main/scala/org/apache/spark/util/ShutdownHookManager.scala b/core/src/main/scala/org/apache/spark/util/ShutdownHookManager.scala
index 61ff9b89ec..db4a8b304e 100644
--- a/core/src/main/scala/org/apache/spark/util/ShutdownHookManager.scala
+++ b/core/src/main/scala/org/apache/spark/util/ShutdownHookManager.scala
@@ -217,7 +217,9 @@ private [util] class SparkShutdownHookManager {
}
Try(Utils.classForName("org.apache.hadoop.util.ShutdownHookManager")) match {
case Success(shmClass) =>
- val fsPriority = classOf[FileSystem].getField("SHUTDOWN_HOOK_PRIORITY").get()
+ val fsPriority = classOf[FileSystem]
+ .getField("SHUTDOWN_HOOK_PRIORITY")
+ .get(null) // static field, the value is not used
.asInstanceOf[Int]
val shm = shmClass.getMethod("get").invoke(null)
shm.getClass().getMethod("addShutdownHook", classOf[Runnable], classOf[Int])
diff --git a/core/src/main/scala/org/apache/spark/util/logging/RollingFileAppender.scala b/core/src/main/scala/org/apache/spark/util/logging/RollingFileAppender.scala
index 7138b4b8e4..1e8476c4a0 100644
--- a/core/src/main/scala/org/apache/spark/util/logging/RollingFileAppender.scala
+++ b/core/src/main/scala/org/apache/spark/util/logging/RollingFileAppender.scala
@@ -79,32 +79,30 @@ private[spark] class RollingFileAppender(
val rolloverSuffix = rollingPolicy.generateRolledOverFileSuffix()
val rolloverFile = new File(
activeFile.getParentFile, activeFile.getName + rolloverSuffix).getAbsoluteFile
- try {
- logDebug(s"Attempting to rollover file $activeFile to file $rolloverFile")
- if (activeFile.exists) {
- if (!rolloverFile.exists) {
- Files.move(activeFile, rolloverFile)
- logInfo(s"Rolled over $activeFile to $rolloverFile")
- } else {
- // In case the rollover file name clashes, make a unique file name.
- // The resultant file names are long and ugly, so this is used only
- // if there is a name collision. This can be avoided by the using
- // the right pattern such that name collisions do not occur.
- var i = 0
- var altRolloverFile: File = null
- do {
- altRolloverFile = new File(activeFile.getParent,
- s"${activeFile.getName}$rolloverSuffix--$i").getAbsoluteFile
- i += 1
- } while (i < 10000 && altRolloverFile.exists)
-
- logWarning(s"Rollover file $rolloverFile already exists, " +
- s"rolled over $activeFile to file $altRolloverFile")
- Files.move(activeFile, altRolloverFile)
- }
+ logDebug(s"Attempting to rollover file $activeFile to file $rolloverFile")
+ if (activeFile.exists) {
+ if (!rolloverFile.exists) {
+ Files.move(activeFile, rolloverFile)
+ logInfo(s"Rolled over $activeFile to $rolloverFile")
} else {
- logWarning(s"File $activeFile does not exist")
+ // In case the rollover file name clashes, make a unique file name.
+ // The resultant file names are long and ugly, so this is used only
+ // if there is a name collision. This can be avoided by the using
+ // the right pattern such that name collisions do not occur.
+ var i = 0
+ var altRolloverFile: File = null
+ do {
+ altRolloverFile = new File(activeFile.getParent,
+ s"${activeFile.getName}$rolloverSuffix--$i").getAbsoluteFile
+ i += 1
+ } while (i < 10000 && altRolloverFile.exists)
+
+ logWarning(s"Rollover file $rolloverFile already exists, " +
+ s"rolled over $activeFile to file $altRolloverFile")
+ Files.move(activeFile, altRolloverFile)
}
+ } else {
+ logWarning(s"File $activeFile does not exist")
}
}