author     Luc Bourlier <luc.bourlier@typesafe.com>    2015-09-09 09:57:58 +0100
committer  Sean Owen <sowen@cloudera.com>              2015-09-09 09:57:58 +0100
commit     c1bc4f439f54625c01a585691e5293cd9961eb0c
tree       4b3688eae83147aa50d2a55524f8eabfaae242d0 /core
parent     91a577d2778ab5946f0c40cb80c89de24e3d10e8
[SPARK-10227] fatal warnings with sbt on Scala 2.11
The bulk of the changes concern the `transient` annotation on class parameters. Often the compiler does not generate a field for such a parameter, so the `transient` annotation would be unnecessary. But if the class parameter is used in methods, a field is created, so it is safer to keep the annotation. The remainder of the changes address some potential bugs and deprecated syntax.

Author: Luc Bourlier <luc.bourlier@typesafe.com>

Closes #8433 from skyluc/issue/sbt-2.11.
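
To illustrate the rule the commit message describes, here is a minimal sketch (hypothetical classes, not taken from the patch): whether `@transient` on a constructor parameter means anything depends on whether the compiler turns that parameter into a field, which it only does when the parameter is referenced outside the constructor.

  // A parameter used only during construction: no field is generated, so a
  // @transient annotation on it would be ignored (a warning in Scala 2.11,
  // fatal under -Xfatal-warnings).
  class NoField(conf: String) extends Serializable {
    val upper = conf.toUpperCase
  }

  // A parameter referenced from a method: the compiler must keep a field for
  // it, so declaring it @transient private val makes the field explicit and
  // keeps it out of the serialized form.
  class WithField(@transient private val conf: String) extends Serializable {
    def describe(): String = s"conf=$conf"
  }
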
Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/org/apache/spark/Accumulators.scala | 2
-rw-r--r--  core/src/main/scala/org/apache/spark/Dependency.scala | 2
-rw-r--r--  core/src/main/scala/org/apache/spark/Partitioner.scala | 4
-rw-r--r--  core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala | 2
-rw-r--r--  core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala | 4
-rw-r--r--  core/src/main/scala/org/apache/spark/input/PortableDataStream.scala | 4
-rw-r--r--  core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala | 2
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/BinaryFileRDD.scala | 6
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/BlockRDD.scala | 4
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/CartesianRDD.scala | 4
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala | 2
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala | 8
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/LocalCheckpointRDD.scala | 2
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/LocalRDDCheckpointData.scala | 2
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala | 18
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/ParallelCollectionRDD.scala | 4
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/PartitionPruningRDD.scala | 6
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/PartitionwiseSampledRDD.scala | 4
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/RDDCheckpointData.scala | 2
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/ReliableCheckpointRDD.scala | 2
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/ReliableRDDCheckpointData.scala | 2
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/SqlNewHadoopRDD.scala | 6
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/UnionRDD.scala | 4
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/ZippedPartitionsRDD.scala | 2
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/ZippedWithIndexRDD.scala | 2
-rw-r--r--  core/src/main/scala/org/apache/spark/rpc/RpcEndpointRef.scala | 2
-rw-r--r--  core/src/main/scala/org/apache/spark/rpc/akka/AkkaRpcEnv.scala | 21
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala | 2
-rw-r--r--  core/src/main/scala/org/apache/spark/shuffle/unsafe/UnsafeShuffleManager.scala | 2
-rw-r--r--  core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala | 2
-rw-r--r--  core/src/main/scala/org/apache/spark/util/ShutdownHookManager.scala | 4
-rw-r--r--  core/src/main/scala/org/apache/spark/util/logging/RollingFileAppender.scala | 46
32 files changed, 93 insertions, 86 deletions
diff --git a/core/src/main/scala/org/apache/spark/Accumulators.scala b/core/src/main/scala/org/apache/spark/Accumulators.scala
index c39c8667d0..5592b75afb 100644
--- a/core/src/main/scala/org/apache/spark/Accumulators.scala
+++ b/core/src/main/scala/org/apache/spark/Accumulators.scala
@@ -47,7 +47,7 @@ import org.apache.spark.util.Utils
* @tparam T partial data that can be added in
*/
class Accumulable[R, T] private[spark] (
- @transient initialValue: R,
+ initialValue: R,
param: AccumulableParam[R, T],
val name: Option[String],
internal: Boolean)
diff --git a/core/src/main/scala/org/apache/spark/Dependency.scala b/core/src/main/scala/org/apache/spark/Dependency.scala
index fc8cdde934..cfeeb3902c 100644
--- a/core/src/main/scala/org/apache/spark/Dependency.scala
+++ b/core/src/main/scala/org/apache/spark/Dependency.scala
@@ -66,7 +66,7 @@ abstract class NarrowDependency[T](_rdd: RDD[T]) extends Dependency[T] {
*/
@DeveloperApi
class ShuffleDependency[K, V, C](
- @transient _rdd: RDD[_ <: Product2[K, V]],
+ @transient private val _rdd: RDD[_ <: Product2[K, V]],
val partitioner: Partitioner,
val serializer: Option[Serializer] = None,
val keyOrdering: Option[Ordering[K]] = None,
diff --git a/core/src/main/scala/org/apache/spark/Partitioner.scala b/core/src/main/scala/org/apache/spark/Partitioner.scala
index 29e581bb57..e4df7af81a 100644
--- a/core/src/main/scala/org/apache/spark/Partitioner.scala
+++ b/core/src/main/scala/org/apache/spark/Partitioner.scala
@@ -104,8 +104,8 @@ class HashPartitioner(partitions: Int) extends Partitioner {
* the value of `partitions`.
*/
class RangePartitioner[K : Ordering : ClassTag, V](
- @transient partitions: Int,
- @transient rdd: RDD[_ <: Product2[K, V]],
+ partitions: Int,
+ rdd: RDD[_ <: Product2[K, V]],
private var ascending: Boolean = true)
extends Partitioner {
diff --git a/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala b/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala
index f5dd36cbcf..ae5926dd53 100644
--- a/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala
+++ b/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala
@@ -37,7 +37,7 @@ import org.apache.spark.util.SerializableJobConf
* a filename to write to, etc, exactly like in a Hadoop MapReduce job.
*/
private[spark]
-class SparkHadoopWriter(@transient jobConf: JobConf)
+class SparkHadoopWriter(jobConf: JobConf)
extends Logging
with SparkHadoopMapRedUtil
with Serializable {
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
index b4d152b336..69da180593 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -41,7 +41,7 @@ import org.apache.spark.util.{SerializableConfiguration, Utils}
import scala.util.control.NonFatal
private[spark] class PythonRDD(
- @transient parent: RDD[_],
+ parent: RDD[_],
command: Array[Byte],
envVars: JMap[String, String],
pythonIncludes: JList[String],
@@ -785,7 +785,7 @@ class BytesToString extends org.apache.spark.api.java.function.Function[Array[By
* Internal class that acts as an `AccumulatorParam` for Python accumulators. Inside, it
* collects a list of pickled strings that we pass to Python through a socket.
*/
-private class PythonAccumulatorParam(@transient serverHost: String, serverPort: Int)
+private class PythonAccumulatorParam(@transient private val serverHost: String, serverPort: Int)
extends AccumulatorParam[JList[Array[Byte]]] {
Utils.checkHost(serverHost, "Expected hostname")
diff --git a/core/src/main/scala/org/apache/spark/input/PortableDataStream.scala b/core/src/main/scala/org/apache/spark/input/PortableDataStream.scala
index a5ad47293f..e2ffc3b64e 100644
--- a/core/src/main/scala/org/apache/spark/input/PortableDataStream.scala
+++ b/core/src/main/scala/org/apache/spark/input/PortableDataStream.scala
@@ -131,8 +131,8 @@ private[spark] class StreamInputFormat extends StreamFileInputFormat[PortableDat
*/
@Experimental
class PortableDataStream(
- @transient isplit: CombineFileSplit,
- @transient context: TaskAttemptContext,
+ isplit: CombineFileSplit,
+ context: TaskAttemptContext,
index: Integer)
extends Serializable {
diff --git a/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala b/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala
index 4b851bcb36..70a42f9045 100644
--- a/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala
+++ b/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala
@@ -137,7 +137,7 @@ class NettyBlockTransferService(conf: SparkConf, securityManager: SecurityManage
new RpcResponseCallback {
override def onSuccess(response: Array[Byte]): Unit = {
logTrace(s"Successfully uploaded block $blockId")
- result.success()
+ result.success((): Unit)
}
override def onFailure(e: Throwable): Unit = {
logError(s"Error while uploading block $blockId", e)
diff --git a/core/src/main/scala/org/apache/spark/rdd/BinaryFileRDD.scala b/core/src/main/scala/org/apache/spark/rdd/BinaryFileRDD.scala
index 1f755db485..6fec00dcd0 100644
--- a/core/src/main/scala/org/apache/spark/rdd/BinaryFileRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/BinaryFileRDD.scala
@@ -28,7 +28,7 @@ private[spark] class BinaryFileRDD[T](
inputFormatClass: Class[_ <: StreamFileInputFormat[T]],
keyClass: Class[String],
valueClass: Class[T],
- @transient conf: Configuration,
+ conf: Configuration,
minPartitions: Int)
extends NewHadoopRDD[String, T](sc, inputFormatClass, keyClass, valueClass, conf) {
@@ -36,10 +36,10 @@ private[spark] class BinaryFileRDD[T](
val inputFormat = inputFormatClass.newInstance
inputFormat match {
case configurable: Configurable =>
- configurable.setConf(conf)
+ configurable.setConf(getConf)
case _ =>
}
- val jobContext = newJobContext(conf, jobId)
+ val jobContext = newJobContext(getConf, jobId)
inputFormat.setMinPartitions(jobContext, minPartitions)
val rawSplits = inputFormat.getSplits(jobContext).toArray
val result = new Array[Partition](rawSplits.size)
diff --git a/core/src/main/scala/org/apache/spark/rdd/BlockRDD.scala b/core/src/main/scala/org/apache/spark/rdd/BlockRDD.scala
index 9220302637..fc1710fbad 100644
--- a/core/src/main/scala/org/apache/spark/rdd/BlockRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/BlockRDD.scala
@@ -28,7 +28,7 @@ private[spark] class BlockRDDPartition(val blockId: BlockId, idx: Int) extends P
}
private[spark]
-class BlockRDD[T: ClassTag](@transient sc: SparkContext, @transient val blockIds: Array[BlockId])
+class BlockRDD[T: ClassTag](sc: SparkContext, @transient val blockIds: Array[BlockId])
extends RDD[T](sc, Nil) {
@transient lazy val _locations = BlockManager.blockIdsToHosts(blockIds, SparkEnv.get)
@@ -64,7 +64,7 @@ class BlockRDD[T: ClassTag](@transient sc: SparkContext, @transient val blockIds
*/
private[spark] def removeBlocks() {
blockIds.foreach { blockId =>
- sc.env.blockManager.master.removeBlock(blockId)
+ sparkContext.env.blockManager.master.removeBlock(blockId)
}
_isValid = false
}
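
The `sc.env` to `sparkContext.env` change above (and the matching rewrites in BinaryFileRDD earlier and in HadoopRDD and WholeTextFileRDD below, where `sc` becomes `sparkContext` and `conf` becomes `getConf`) follows from the same rule: once `@transient` is dropped from the constructor parameter, referencing that parameter from a method body would force the compiler to generate an ordinary, serialized field for it, which is exactly what must not happen for a SparkContext or a Hadoop Configuration. Routing the reference through an accessor the parent class already exposes avoids the field. A rough sketch with placeholder names, not the real RDD API:

  class Driver                                      // stand-in for SparkContext
  abstract class BaseRdd(@transient private val ctx: Driver) extends Serializable {
    def context: Driver = ctx                       // driver-side accessor
  }
  class BlocksRdd(ctx: Driver) extends BaseRdd(ctx) {
    def removeAll(): Unit = println(context)        // uses the accessor, never `ctx`
  }
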
diff --git a/core/src/main/scala/org/apache/spark/rdd/CartesianRDD.scala b/core/src/main/scala/org/apache/spark/rdd/CartesianRDD.scala
index c1d6971787..18e8cddbc4 100644
--- a/core/src/main/scala/org/apache/spark/rdd/CartesianRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/CartesianRDD.scala
@@ -27,8 +27,8 @@ import org.apache.spark.util.Utils
private[spark]
class CartesianPartition(
idx: Int,
- @transient rdd1: RDD[_],
- @transient rdd2: RDD[_],
+ @transient private val rdd1: RDD[_],
+ @transient private val rdd2: RDD[_],
s1Index: Int,
s2Index: Int
) extends Partition {
diff --git a/core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala b/core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala
index 72fe215dae..b0364623af 100644
--- a/core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala
@@ -29,7 +29,7 @@ private[spark] class CheckpointRDDPartition(val index: Int) extends Partition
/**
* An RDD that recovers checkpointed data from storage.
*/
-private[spark] abstract class CheckpointRDD[T: ClassTag](@transient sc: SparkContext)
+private[spark] abstract class CheckpointRDD[T: ClassTag](sc: SparkContext)
extends RDD[T](sc, Nil) {
// CheckpointRDD should not be checkpointed again
diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
index e1f8719eea..8f2655d63b 100644
--- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
@@ -51,7 +51,7 @@ import org.apache.spark.storage.StorageLevel
/**
* A Spark split class that wraps around a Hadoop InputSplit.
*/
-private[spark] class HadoopPartition(rddId: Int, idx: Int, @transient s: InputSplit)
+private[spark] class HadoopPartition(rddId: Int, idx: Int, s: InputSplit)
extends Partition {
val inputSplit = new SerializableWritable[InputSplit](s)
@@ -99,7 +99,7 @@ private[spark] class HadoopPartition(rddId: Int, idx: Int, @transient s: InputSp
*/
@DeveloperApi
class HadoopRDD[K, V](
- @transient sc: SparkContext,
+ sc: SparkContext,
broadcastedConf: Broadcast[SerializableConfiguration],
initLocalJobConfFuncOpt: Option[JobConf => Unit],
inputFormatClass: Class[_ <: InputFormat[K, V]],
@@ -109,7 +109,7 @@ class HadoopRDD[K, V](
extends RDD[(K, V)](sc, Nil) with Logging {
if (initLocalJobConfFuncOpt.isDefined) {
- sc.clean(initLocalJobConfFuncOpt.get)
+ sparkContext.clean(initLocalJobConfFuncOpt.get)
}
def this(
@@ -137,7 +137,7 @@ class HadoopRDD[K, V](
// used to build JobTracker ID
private val createTime = new Date()
- private val shouldCloneJobConf = sc.conf.getBoolean("spark.hadoop.cloneConf", false)
+ private val shouldCloneJobConf = sparkContext.conf.getBoolean("spark.hadoop.cloneConf", false)
// Returns a JobConf that will be used on slaves to obtain input splits for Hadoop reads.
protected def getJobConf(): JobConf = {
diff --git a/core/src/main/scala/org/apache/spark/rdd/LocalCheckpointRDD.scala b/core/src/main/scala/org/apache/spark/rdd/LocalCheckpointRDD.scala
index daa5779d68..bfe19195fc 100644
--- a/core/src/main/scala/org/apache/spark/rdd/LocalCheckpointRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/LocalCheckpointRDD.scala
@@ -35,7 +35,7 @@ import org.apache.spark.storage.RDDBlockId
* @param numPartitions the number of partitions in the checkpointed RDD
*/
private[spark] class LocalCheckpointRDD[T: ClassTag](
- @transient sc: SparkContext,
+ sc: SparkContext,
rddId: Int,
numPartitions: Int)
extends CheckpointRDD[T](sc) {
diff --git a/core/src/main/scala/org/apache/spark/rdd/LocalRDDCheckpointData.scala b/core/src/main/scala/org/apache/spark/rdd/LocalRDDCheckpointData.scala
index d6fad89684..c115e0ff74 100644
--- a/core/src/main/scala/org/apache/spark/rdd/LocalRDDCheckpointData.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/LocalRDDCheckpointData.scala
@@ -31,7 +31,7 @@ import org.apache.spark.util.Utils
* is written to the local, ephemeral block storage that lives in each executor. This is useful
* for use cases where RDDs build up long lineages that need to be truncated often (e.g. GraphX).
*/
-private[spark] class LocalRDDCheckpointData[T: ClassTag](@transient rdd: RDD[T])
+private[spark] class LocalRDDCheckpointData[T: ClassTag](@transient private val rdd: RDD[T])
extends RDDCheckpointData[T](rdd) with Logging {
/**
diff --git a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
index 6a9c004d65..174979aaeb 100644
--- a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
@@ -40,7 +40,7 @@ import org.apache.spark.storage.StorageLevel
private[spark] class NewHadoopPartition(
rddId: Int,
val index: Int,
- @transient rawSplit: InputSplit with Writable)
+ rawSplit: InputSplit with Writable)
extends Partition {
val serializableHadoopSplit = new SerializableWritable(rawSplit)
@@ -68,14 +68,14 @@ class NewHadoopRDD[K, V](
inputFormatClass: Class[_ <: InputFormat[K, V]],
keyClass: Class[K],
valueClass: Class[V],
- @transient conf: Configuration)
+ @transient private val _conf: Configuration)
extends RDD[(K, V)](sc, Nil)
with SparkHadoopMapReduceUtil
with Logging {
// A Hadoop Configuration can be about 10 KB, which is pretty big, so broadcast it
- private val confBroadcast = sc.broadcast(new SerializableConfiguration(conf))
- // private val serializableConf = new SerializableWritable(conf)
+ private val confBroadcast = sc.broadcast(new SerializableConfiguration(_conf))
+ // private val serializableConf = new SerializableWritable(_conf)
private val jobTrackerId: String = {
val formatter = new SimpleDateFormat("yyyyMMddHHmm")
@@ -88,10 +88,10 @@ class NewHadoopRDD[K, V](
val inputFormat = inputFormatClass.newInstance
inputFormat match {
case configurable: Configurable =>
- configurable.setConf(conf)
+ configurable.setConf(_conf)
case _ =>
}
- val jobContext = newJobContext(conf, jobId)
+ val jobContext = newJobContext(_conf, jobId)
val rawSplits = inputFormat.getSplits(jobContext).toArray
val result = new Array[Partition](rawSplits.size)
for (i <- 0 until rawSplits.size) {
@@ -262,7 +262,7 @@ private[spark] class WholeTextFileRDD(
inputFormatClass: Class[_ <: WholeTextFileInputFormat],
keyClass: Class[String],
valueClass: Class[String],
- @transient conf: Configuration,
+ conf: Configuration,
minPartitions: Int)
extends NewHadoopRDD[String, String](sc, inputFormatClass, keyClass, valueClass, conf) {
@@ -270,10 +270,10 @@ private[spark] class WholeTextFileRDD(
val inputFormat = inputFormatClass.newInstance
inputFormat match {
case configurable: Configurable =>
- configurable.setConf(conf)
+ configurable.setConf(getConf)
case _ =>
}
- val jobContext = newJobContext(conf, jobId)
+ val jobContext = newJobContext(getConf, jobId)
inputFormat.setMinPartitions(jobContext, minPartitions)
val rawSplits = inputFormat.getSplits(jobContext).toArray
val result = new Array[Partition](rawSplits.size)
diff --git a/core/src/main/scala/org/apache/spark/rdd/ParallelCollectionRDD.scala b/core/src/main/scala/org/apache/spark/rdd/ParallelCollectionRDD.scala
index e2394e28f8..582fa93afe 100644
--- a/core/src/main/scala/org/apache/spark/rdd/ParallelCollectionRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/ParallelCollectionRDD.scala
@@ -83,8 +83,8 @@ private[spark] class ParallelCollectionPartition[T: ClassTag](
}
private[spark] class ParallelCollectionRDD[T: ClassTag](
- @transient sc: SparkContext,
- @transient data: Seq[T],
+ sc: SparkContext,
+ @transient private val data: Seq[T],
numSlices: Int,
locationPrefs: Map[Int, Seq[String]])
extends RDD[T](sc, Nil) {
diff --git a/core/src/main/scala/org/apache/spark/rdd/PartitionPruningRDD.scala b/core/src/main/scala/org/apache/spark/rdd/PartitionPruningRDD.scala
index a00f4c1cdf..d6a37e8cc5 100644
--- a/core/src/main/scala/org/apache/spark/rdd/PartitionPruningRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/PartitionPruningRDD.scala
@@ -32,7 +32,7 @@ private[spark] class PartitionPruningRDDPartition(idx: Int, val parentSplit: Par
* Represents a dependency between the PartitionPruningRDD and its parent. In this
* case, the child RDD contains a subset of partitions of the parents'.
*/
-private[spark] class PruneDependency[T](rdd: RDD[T], @transient partitionFilterFunc: Int => Boolean)
+private[spark] class PruneDependency[T](rdd: RDD[T], partitionFilterFunc: Int => Boolean)
extends NarrowDependency[T](rdd) {
@transient
@@ -55,8 +55,8 @@ private[spark] class PruneDependency[T](rdd: RDD[T], @transient partitionFilterF
*/
@DeveloperApi
class PartitionPruningRDD[T: ClassTag](
- @transient prev: RDD[T],
- @transient partitionFilterFunc: Int => Boolean)
+ prev: RDD[T],
+ partitionFilterFunc: Int => Boolean)
extends RDD[T](prev.context, List(new PruneDependency(prev, partitionFilterFunc))) {
override def compute(split: Partition, context: TaskContext): Iterator[T] = {
diff --git a/core/src/main/scala/org/apache/spark/rdd/PartitionwiseSampledRDD.scala b/core/src/main/scala/org/apache/spark/rdd/PartitionwiseSampledRDD.scala
index a637d6f15b..3b1acacf40 100644
--- a/core/src/main/scala/org/apache/spark/rdd/PartitionwiseSampledRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/PartitionwiseSampledRDD.scala
@@ -47,8 +47,8 @@ class PartitionwiseSampledRDDPartition(val prev: Partition, val seed: Long)
private[spark] class PartitionwiseSampledRDD[T: ClassTag, U: ClassTag](
prev: RDD[T],
sampler: RandomSampler[T, U],
- @transient preservesPartitioning: Boolean,
- @transient seed: Long = Utils.random.nextLong)
+ preservesPartitioning: Boolean,
+ @transient private val seed: Long = Utils.random.nextLong)
extends RDD[U](prev) {
@transient override val partitioner = if (preservesPartitioning) prev.partitioner else None
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDDCheckpointData.scala b/core/src/main/scala/org/apache/spark/rdd/RDDCheckpointData.scala
index 0e43520870..429514b4f6 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDDCheckpointData.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDDCheckpointData.scala
@@ -36,7 +36,7 @@ private[spark] object CheckpointState extends Enumeration {
* as well as, manages the post-checkpoint state by providing the updated partitions,
* iterator and preferred locations of the checkpointed RDD.
*/
-private[spark] abstract class RDDCheckpointData[T: ClassTag](@transient rdd: RDD[T])
+private[spark] abstract class RDDCheckpointData[T: ClassTag](@transient private val rdd: RDD[T])
extends Serializable {
import CheckpointState._
diff --git a/core/src/main/scala/org/apache/spark/rdd/ReliableCheckpointRDD.scala b/core/src/main/scala/org/apache/spark/rdd/ReliableCheckpointRDD.scala
index 35d8b0bfd1..1c3b5da19c 100644
--- a/core/src/main/scala/org/apache/spark/rdd/ReliableCheckpointRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/ReliableCheckpointRDD.scala
@@ -32,7 +32,7 @@ import org.apache.spark.util.{SerializableConfiguration, Utils}
* An RDD that reads from checkpoint files previously written to reliable storage.
*/
private[spark] class ReliableCheckpointRDD[T: ClassTag](
- @transient sc: SparkContext,
+ sc: SparkContext,
val checkpointPath: String)
extends CheckpointRDD[T](sc) {
diff --git a/core/src/main/scala/org/apache/spark/rdd/ReliableRDDCheckpointData.scala b/core/src/main/scala/org/apache/spark/rdd/ReliableRDDCheckpointData.scala
index 1df8eef5ff..e9f6060301 100644
--- a/core/src/main/scala/org/apache/spark/rdd/ReliableRDDCheckpointData.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/ReliableRDDCheckpointData.scala
@@ -28,7 +28,7 @@ import org.apache.spark.util.SerializableConfiguration
* An implementation of checkpointing that writes the RDD data to reliable storage.
* This allows drivers to be restarted on failure with previously computed state.
*/
-private[spark] class ReliableRDDCheckpointData[T: ClassTag](@transient rdd: RDD[T])
+private[spark] class ReliableRDDCheckpointData[T: ClassTag](@transient private val rdd: RDD[T])
extends RDDCheckpointData[T](rdd) with Logging {
// The directory to which the associated RDD has been checkpointed to
diff --git a/core/src/main/scala/org/apache/spark/rdd/SqlNewHadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/SqlNewHadoopRDD.scala
index fa3fecc80c..9babe56267 100644
--- a/core/src/main/scala/org/apache/spark/rdd/SqlNewHadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/SqlNewHadoopRDD.scala
@@ -39,7 +39,7 @@ import org.apache.spark.util.{SerializableConfiguration, ShutdownHookManager, Ut
private[spark] class SqlNewHadoopPartition(
rddId: Int,
val index: Int,
- @transient rawSplit: InputSplit with Writable)
+ rawSplit: InputSplit with Writable)
extends SparkPartition {
val serializableHadoopSplit = new SerializableWritable(rawSplit)
@@ -61,9 +61,9 @@ private[spark] class SqlNewHadoopPartition(
* changes based on [[org.apache.spark.rdd.HadoopRDD]].
*/
private[spark] class SqlNewHadoopRDD[V: ClassTag](
- @transient sc : SparkContext,
+ sc : SparkContext,
broadcastedConf: Broadcast[SerializableConfiguration],
- @transient initDriverSideJobFuncOpt: Option[Job => Unit],
+ @transient private val initDriverSideJobFuncOpt: Option[Job => Unit],
initLocalJobFuncOpt: Option[Job => Unit],
inputFormatClass: Class[_ <: InputFormat[Void, V]],
valueClass: Class[V])
diff --git a/core/src/main/scala/org/apache/spark/rdd/UnionRDD.scala b/core/src/main/scala/org/apache/spark/rdd/UnionRDD.scala
index 3986645350..66cf4369da 100644
--- a/core/src/main/scala/org/apache/spark/rdd/UnionRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/UnionRDD.scala
@@ -37,9 +37,9 @@ import org.apache.spark.util.Utils
*/
private[spark] class UnionPartition[T: ClassTag](
idx: Int,
- @transient rdd: RDD[T],
+ @transient private val rdd: RDD[T],
val parentRddIndex: Int,
- @transient parentRddPartitionIndex: Int)
+ @transient private val parentRddPartitionIndex: Int)
extends Partition {
var parentPartition: Partition = rdd.partitions(parentRddPartitionIndex)
diff --git a/core/src/main/scala/org/apache/spark/rdd/ZippedPartitionsRDD.scala b/core/src/main/scala/org/apache/spark/rdd/ZippedPartitionsRDD.scala
index b3c64394ab..70bf04de64 100644
--- a/core/src/main/scala/org/apache/spark/rdd/ZippedPartitionsRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/ZippedPartitionsRDD.scala
@@ -26,7 +26,7 @@ import org.apache.spark.util.Utils
private[spark] class ZippedPartitionsPartition(
idx: Int,
- @transient rdds: Seq[RDD[_]],
+ @transient private val rdds: Seq[RDD[_]],
@transient val preferredLocations: Seq[String])
extends Partition {
diff --git a/core/src/main/scala/org/apache/spark/rdd/ZippedWithIndexRDD.scala b/core/src/main/scala/org/apache/spark/rdd/ZippedWithIndexRDD.scala
index e277ae28d5..32931d59ac 100644
--- a/core/src/main/scala/org/apache/spark/rdd/ZippedWithIndexRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/ZippedWithIndexRDD.scala
@@ -37,7 +37,7 @@ class ZippedWithIndexRDDPartition(val prev: Partition, val startIndex: Long)
* @tparam T parent RDD item type
*/
private[spark]
-class ZippedWithIndexRDD[T: ClassTag](@transient prev: RDD[T]) extends RDD[(T, Long)](prev) {
+class ZippedWithIndexRDD[T: ClassTag](prev: RDD[T]) extends RDD[(T, Long)](prev) {
/** The start index of each partition. */
@transient private val startIndices: Array[Long] = {
diff --git a/core/src/main/scala/org/apache/spark/rpc/RpcEndpointRef.scala b/core/src/main/scala/org/apache/spark/rpc/RpcEndpointRef.scala
index 7409ac8859..f25710bb5b 100644
--- a/core/src/main/scala/org/apache/spark/rpc/RpcEndpointRef.scala
+++ b/core/src/main/scala/org/apache/spark/rpc/RpcEndpointRef.scala
@@ -26,7 +26,7 @@ import org.apache.spark.{SparkException, Logging, SparkConf}
/**
* A reference for a remote [[RpcEndpoint]]. [[RpcEndpointRef]] is thread-safe.
*/
-private[spark] abstract class RpcEndpointRef(@transient conf: SparkConf)
+private[spark] abstract class RpcEndpointRef(conf: SparkConf)
extends Serializable with Logging {
private[this] val maxRetries = RpcUtils.numRetries(conf)
diff --git a/core/src/main/scala/org/apache/spark/rpc/akka/AkkaRpcEnv.scala b/core/src/main/scala/org/apache/spark/rpc/akka/AkkaRpcEnv.scala
index fc17542abf..ad67e1c5ad 100644
--- a/core/src/main/scala/org/apache/spark/rpc/akka/AkkaRpcEnv.scala
+++ b/core/src/main/scala/org/apache/spark/rpc/akka/AkkaRpcEnv.scala
@@ -87,9 +87,9 @@ private[spark] class AkkaRpcEnv private[akka] (
override def setupEndpoint(name: String, endpoint: RpcEndpoint): RpcEndpointRef = {
@volatile var endpointRef: AkkaRpcEndpointRef = null
- // Use lazy because the Actor needs to use `endpointRef`.
+ // Use a deferred function because the Actor needs to use `endpointRef`.
// So `actorRef` should be created after assigning `endpointRef`.
- lazy val actorRef = actorSystem.actorOf(Props(new Actor with ActorLogReceive with Logging {
+ val actorRef = () => actorSystem.actorOf(Props(new Actor with ActorLogReceive with Logging {
assert(endpointRef != null)
@@ -272,13 +272,20 @@ private[akka] class ErrorMonitor extends Actor with ActorLogReceive with Logging
}
private[akka] class AkkaRpcEndpointRef(
- @transient defaultAddress: RpcAddress,
- @transient _actorRef: => ActorRef,
- @transient conf: SparkConf,
- @transient initInConstructor: Boolean = true)
+ @transient private val defaultAddress: RpcAddress,
+ @transient private val _actorRef: () => ActorRef,
+ conf: SparkConf,
+ initInConstructor: Boolean)
extends RpcEndpointRef(conf) with Logging {
- lazy val actorRef = _actorRef
+ def this(
+ defaultAddress: RpcAddress,
+ _actorRef: ActorRef,
+ conf: SparkConf) = {
+ this(defaultAddress, () => _actorRef, conf, true)
+ }
+
+ lazy val actorRef = _actorRef()
override lazy val address: RpcAddress = {
val akkaAddress = actorRef.path.address
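
The AkkaRpcEndpointRef change above swaps a by-name constructor parameter (`_actorRef: => ActorRef`) for an explicit thunk (`() => ActorRef`) plus an auxiliary constructor for callers that already hold an `ActorRef`. A by-name parameter that outlives the constructor is captured in a compiler-generated field that an annotation such as `@transient` does not reliably reach (which appears to be what triggered the warning here), whereas an ordinary function value is a field the code controls. A sketch of the two styles with placeholder types:

  class Ref                              // stand-in for akka.actor.ActorRef
  class ByName(make: => Ref) {           // the compiler stores a hidden Function0
    lazy val ref: Ref = make             // field so `make` stays evaluable later
  }
  class Thunk(@transient private val make: () => Ref) extends Serializable {
    lazy val ref: Ref = make()           // explicit field, explicit evaluation
    def this(r: Ref) = this(() => r)     // convenience for an already-built Ref
  }
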
diff --git a/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala b/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala
index c4dc080e2b..fb693721a9 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala
@@ -44,7 +44,7 @@ private[spark] class ResultTask[T, U](
stageAttemptId: Int,
taskBinary: Broadcast[Array[Byte]],
partition: Partition,
- @transient locs: Seq[TaskLocation],
+ locs: Seq[TaskLocation],
val outputId: Int,
internalAccumulators: Seq[Accumulator[Long]])
extends Task[U](stageId, stageAttemptId, partition.index, internalAccumulators)
diff --git a/core/src/main/scala/org/apache/spark/shuffle/unsafe/UnsafeShuffleManager.scala b/core/src/main/scala/org/apache/spark/shuffle/unsafe/UnsafeShuffleManager.scala
index df7bbd6424..75f22f642b 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/unsafe/UnsafeShuffleManager.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/unsafe/UnsafeShuffleManager.scala
@@ -159,7 +159,7 @@ private[spark] class UnsafeShuffleManager(conf: SparkConf) extends ShuffleManage
mapId: Int,
context: TaskContext): ShuffleWriter[K, V] = {
handle match {
- case unsafeShuffleHandle: UnsafeShuffleHandle[K, V] =>
+ case unsafeShuffleHandle: UnsafeShuffleHandle[K @unchecked, V @unchecked] =>
numMapsForShufflesThatUsedNewPath.putIfAbsent(handle.shuffleId, unsafeShuffleHandle.numMaps)
val env = SparkEnv.get
new UnsafeShuffleWriter(
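
The `K @unchecked, V @unchecked` annotations above address the Scala 2.11 warning that type arguments in a pattern cannot be verified at runtime because of erasure; only the runtime class of the handle is actually inspected, so the annotation documents that the unchecked cast is intentional. A standalone example of the same warning and fix:

  class Box[A](val value: A)

  def describe[K, V](x: Any): String = x match {
    // case b: Box[K] =>                               // warns: K is unchecked (erasure)
    case b: Box[K @unchecked] => s"box of ${b.value}"  // intent made explicit
    case _                    => "not a box"
  }
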
diff --git a/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala b/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala
index 150d82b393..1b49dca9dc 100644
--- a/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala
+++ b/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala
@@ -94,7 +94,7 @@ private[spark] object ClosureCleaner extends Logging {
if (cls.isPrimitive) {
cls match {
case java.lang.Boolean.TYPE => new java.lang.Boolean(false)
- case java.lang.Character.TYPE => new java.lang.Character('\0')
+ case java.lang.Character.TYPE => new java.lang.Character('\u0000')
case java.lang.Void.TYPE =>
// This should not happen because `Foo(void x) {}` does not compile.
throw new IllegalStateException("Unexpected void parameter in constructor")
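
The `'\0'` to `'\u0000'` change above is a deprecation fix: octal escape sequences in character and string literals are deprecated in Scala 2.11, so the null character is spelled with its Unicode escape instead.

  val nul: Char = '\u0000'   // preferred spelling
  // val nul: Char = '\0'    // deprecated octal escape, fatal under -Xfatal-warnings
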
diff --git a/core/src/main/scala/org/apache/spark/util/ShutdownHookManager.scala b/core/src/main/scala/org/apache/spark/util/ShutdownHookManager.scala
index 61ff9b89ec..db4a8b304e 100644
--- a/core/src/main/scala/org/apache/spark/util/ShutdownHookManager.scala
+++ b/core/src/main/scala/org/apache/spark/util/ShutdownHookManager.scala
@@ -217,7 +217,9 @@ private [util] class SparkShutdownHookManager {
}
Try(Utils.classForName("org.apache.hadoop.util.ShutdownHookManager")) match {
case Success(shmClass) =>
- val fsPriority = classOf[FileSystem].getField("SHUTDOWN_HOOK_PRIORITY").get()
+ val fsPriority = classOf[FileSystem]
+ .getField("SHUTDOWN_HOOK_PRIORITY")
+ .get(null) // static field, the value is not used
.asInstanceOf[Int]
val shm = shmClass.getMethod("get").invoke(null)
shm.getClass().getMethod("addShutdownHook", classOf[Runnable], classOf[Int])
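
The `SHUTDOWN_HOOK_PRIORITY` change above is one of the "potential bugs": `java.lang.reflect.Field.get` takes the instance to read from, so the old `get()` only compiled because Scala adapted the empty argument list by inserting `()` — the same deprecation warning as above, and a misleading argument. For a static field the receiver is ignored, so passing `null` explicitly is the conventional, warning-free form. A self-contained sketch using `Integer.MAX_VALUE` as a stand-in for `FileSystem.SHUTDOWN_HOOK_PRIORITY`:

  object StaticFieldDemo {
    def main(args: Array[String]): Unit = {
      val max = classOf[java.lang.Integer]
        .getField("MAX_VALUE")
        .get(null)                // static field: the receiver argument is ignored
        .asInstanceOf[Int]
      println(max)                // 2147483647
    }
  }
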
diff --git a/core/src/main/scala/org/apache/spark/util/logging/RollingFileAppender.scala b/core/src/main/scala/org/apache/spark/util/logging/RollingFileAppender.scala
index 7138b4b8e4..1e8476c4a0 100644
--- a/core/src/main/scala/org/apache/spark/util/logging/RollingFileAppender.scala
+++ b/core/src/main/scala/org/apache/spark/util/logging/RollingFileAppender.scala
@@ -79,32 +79,30 @@ private[spark] class RollingFileAppender(
val rolloverSuffix = rollingPolicy.generateRolledOverFileSuffix()
val rolloverFile = new File(
activeFile.getParentFile, activeFile.getName + rolloverSuffix).getAbsoluteFile
- try {
- logDebug(s"Attempting to rollover file $activeFile to file $rolloverFile")
- if (activeFile.exists) {
- if (!rolloverFile.exists) {
- Files.move(activeFile, rolloverFile)
- logInfo(s"Rolled over $activeFile to $rolloverFile")
- } else {
- // In case the rollover file name clashes, make a unique file name.
- // The resultant file names are long and ugly, so this is used only
- // if there is a name collision. This can be avoided by the using
- // the right pattern such that name collisions do not occur.
- var i = 0
- var altRolloverFile: File = null
- do {
- altRolloverFile = new File(activeFile.getParent,
- s"${activeFile.getName}$rolloverSuffix--$i").getAbsoluteFile
- i += 1
- } while (i < 10000 && altRolloverFile.exists)
-
- logWarning(s"Rollover file $rolloverFile already exists, " +
- s"rolled over $activeFile to file $altRolloverFile")
- Files.move(activeFile, altRolloverFile)
- }
+ logDebug(s"Attempting to rollover file $activeFile to file $rolloverFile")
+ if (activeFile.exists) {
+ if (!rolloverFile.exists) {
+ Files.move(activeFile, rolloverFile)
+ logInfo(s"Rolled over $activeFile to $rolloverFile")
} else {
- logWarning(s"File $activeFile does not exist")
+ // In case the rollover file name clashes, make a unique file name.
+ // The resultant file names are long and ugly, so this is used only
+ // if there is a name collision. This can be avoided by the using
+ // the right pattern such that name collisions do not occur.
+ var i = 0
+ var altRolloverFile: File = null
+ do {
+ altRolloverFile = new File(activeFile.getParent,
+ s"${activeFile.getName}$rolloverSuffix--$i").getAbsoluteFile
+ i += 1
+ } while (i < 10000 && altRolloverFile.exists)
+
+ logWarning(s"Rollover file $rolloverFile already exists, " +
+ s"rolled over $activeFile to file $altRolloverFile")
+ Files.move(activeFile, altRolloverFile)
}
+ } else {
+ logWarning(s"File $activeFile does not exist")
}
}
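
The RollingFileAppender hunk above removes a `try` that had no matching `catch` or `finally` and re-indents its body; Scala 2.11 warns that such a `try` is equivalent to a plain block and handles no exceptions, which is presumably the potential bug the commit message alludes to. A hypothetical helper showing the pattern:

  import java.io.File
  import java.nio.file.Files

  def moveFile(src: File, dst: File): Unit = {
    // Wrapping this body in `try { ... }` with no catch or finally draws
    // "A try without a catch or finally is equivalent to putting its body in
    //  a block; no exceptions are handled" -- fatal with -Xfatal-warnings.
    Files.move(src.toPath, dst.toPath)
    ()
  }
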