author     Sandy Ryza <sandy@cloudera.com>        2015-06-18 19:36:05 -0700
committer  Josh Rosen <joshrosen@databricks.com>  2015-06-18 19:36:05 -0700
commit     43f50decdd20fafc55913c56ffa30f56040090e4 (patch)
tree       da3cf219841826b9d1bc6c5870994f6d1bfe7d32 /core
parent     dc413138995b45a7a957acae007dc11622110310 (diff)
[SPARK-8135] Don't load defaults when reconstituting Hadoop Configurations
Author: Sandy Ryza <sandy@cloudera.com>

Closes #6679 from sryza/sandy-spark-8135 and squashes the following commits:

c5554ff [Sandy Ryza] SPARK-8135. In SerializableWritable, don't load defaults when instantiating Configuration
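Note: Hadoop's no-argument Configuration() constructor eagerly loads and parses the default
XML resources (core-default.xml, core-site.xml, and so on) from the classpath, work that is
wasted when the object is about to be repopulated by readFields anyway; passing
loadDefaults = false skips that step. A minimal sketch of the round trip this patch
optimizes, assuming only hadoop-common on the classpath (the config key is purely
illustrative):

    import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}
    import org.apache.hadoop.conf.Configuration

    val driverConf = new Configuration()              // loads core-default.xml, core-site.xml, ...
    driverConf.set("example.illustrative.key", "v")   // hypothetical key, for illustration only

    // Writable serialization captures every resolved entry, defaults included.
    val buf = new ByteArrayOutputStream()
    driverConf.write(new DataOutputStream(buf))

    // Reconstituting with loadDefaults = false avoids re-reading and re-parsing the XML
    // resources on every deserialization; readFields restores the entries serialized above.
    val restored = new Configuration(false)
    restored.readFields(new DataInputStream(new ByteArrayInputStream(buf.toByteArray)))
    assert(restored.get("example.illustrative.key") == "v")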
Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/org/apache/spark/SerializableWritable.scala           |  2
-rw-r--r--  core/src/main/scala/org/apache/spark/SparkContext.scala                   |  2
-rw-r--r--  core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala              |  3
-rw-r--r--  core/src/main/scala/org/apache/spark/api/python/PythonHadoopUtil.scala    |  6
-rw-r--r--  core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala           | 12
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala              | 11
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala                  |  8
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala               |  4
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala           |  6
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/RDDCheckpointData.scala          |  3
-rw-r--r--  core/src/main/scala/org/apache/spark/util/SerializableConfiguration.scala | 36
-rw-r--r--  core/src/main/scala/org/apache/spark/util/SerializableJobConf.scala       | 37
12 files changed, 102 insertions(+), 28 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/SerializableWritable.scala b/core/src/main/scala/org/apache/spark/SerializableWritable.scala
index cb2cae1852..beb2e27254 100644
--- a/core/src/main/scala/org/apache/spark/SerializableWritable.scala
+++ b/core/src/main/scala/org/apache/spark/SerializableWritable.scala
@@ -41,7 +41,7 @@ class SerializableWritable[T <: Writable](@transient var t: T) extends Serializa
private def readObject(in: ObjectInputStream): Unit = Utils.tryOrIOException {
in.defaultReadObject()
val ow = new ObjectWritable()
- ow.setConf(new Configuration())
+ ow.setConf(new Configuration(false))
ow.readFields(in)
t = ow.get().asInstanceOf[T]
}
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index a453c9bf48..141276ac90 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -974,7 +974,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
minPartitions: Int = defaultMinPartitions): RDD[(K, V)] = withScope {
assertNotStopped()
// A Hadoop configuration can be about 10 KB, which is pretty big, so broadcast it.
- val confBroadcast = broadcast(new SerializableWritable(hadoopConfiguration))
+ val confBroadcast = broadcast(new SerializableConfiguration(hadoopConfiguration))
val setInputPathsFunc = (jobConf: JobConf) => FileInputFormat.setInputPaths(jobConf, path)
new HadoopRDD(
this,
diff --git a/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala b/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala
index 59ac82ccec..f5dd36cbcf 100644
--- a/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala
+++ b/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala
@@ -28,6 +28,7 @@ import org.apache.hadoop.fs.Path
import org.apache.spark.mapred.SparkHadoopMapRedUtil
import org.apache.spark.rdd.HadoopRDD
+import org.apache.spark.util.SerializableJobConf
/**
* Internal helper class that saves an RDD using a Hadoop OutputFormat.
@@ -42,7 +43,7 @@ class SparkHadoopWriter(@transient jobConf: JobConf)
with Serializable {
private val now = new Date()
- private val conf = new SerializableWritable(jobConf)
+ private val conf = new SerializableJobConf(jobConf)
private var jobID = 0
private var splitID = 0
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonHadoopUtil.scala b/core/src/main/scala/org/apache/spark/api/python/PythonHadoopUtil.scala
index c9181a29d4..b959b683d1 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonHadoopUtil.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonHadoopUtil.scala
@@ -19,8 +19,8 @@ package org.apache.spark.api.python
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
-import org.apache.spark.util.Utils
-import org.apache.spark.{Logging, SerializableWritable, SparkException}
+import org.apache.spark.util.{SerializableConfiguration, Utils}
+import org.apache.spark.{Logging, SparkException}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io._
import scala.util.{Failure, Success, Try}
@@ -61,7 +61,7 @@ private[python] object Converter extends Logging {
* Other objects are passed through without conversion.
*/
private[python] class WritableToJavaConverter(
- conf: Broadcast[SerializableWritable[Configuration]]) extends Converter[Any, Any] {
+ conf: Broadcast[SerializableConfiguration]) extends Converter[Any, Any] {
/**
* Converts a [[org.apache.hadoop.io.Writable]] to the underlying primitive, String or
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
index 55a37f8c94..dc9f62f39e 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -36,7 +36,7 @@ import org.apache.spark.api.java.{JavaPairRDD, JavaRDD, JavaSparkContext}
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.input.PortableDataStream
import org.apache.spark.rdd.RDD
-import org.apache.spark.util.Utils
+import org.apache.spark.util.{SerializableConfiguration, Utils}
import scala.util.control.NonFatal
@@ -445,7 +445,7 @@ private[spark] object PythonRDD extends Logging {
val kc = Utils.classForName(keyClass).asInstanceOf[Class[K]]
val vc = Utils.classForName(valueClass).asInstanceOf[Class[V]]
val rdd = sc.sc.sequenceFile[K, V](path, kc, vc, minSplits)
- val confBroadcasted = sc.sc.broadcast(new SerializableWritable(sc.hadoopConfiguration()))
+ val confBroadcasted = sc.sc.broadcast(new SerializableConfiguration(sc.hadoopConfiguration()))
val converted = convertRDD(rdd, keyConverterClass, valueConverterClass,
new WritableToJavaConverter(confBroadcasted))
JavaRDD.fromRDD(SerDeUtil.pairRDDToPython(converted, batchSize))
@@ -471,7 +471,7 @@ private[spark] object PythonRDD extends Logging {
val rdd =
newAPIHadoopRDDFromClassNames[K, V, F](sc,
Some(path), inputFormatClass, keyClass, valueClass, mergedConf)
- val confBroadcasted = sc.sc.broadcast(new SerializableWritable(mergedConf))
+ val confBroadcasted = sc.sc.broadcast(new SerializableConfiguration(mergedConf))
val converted = convertRDD(rdd, keyConverterClass, valueConverterClass,
new WritableToJavaConverter(confBroadcasted))
JavaRDD.fromRDD(SerDeUtil.pairRDDToPython(converted, batchSize))
@@ -497,7 +497,7 @@ private[spark] object PythonRDD extends Logging {
val rdd =
newAPIHadoopRDDFromClassNames[K, V, F](sc,
None, inputFormatClass, keyClass, valueClass, conf)
- val confBroadcasted = sc.sc.broadcast(new SerializableWritable(conf))
+ val confBroadcasted = sc.sc.broadcast(new SerializableConfiguration(conf))
val converted = convertRDD(rdd, keyConverterClass, valueConverterClass,
new WritableToJavaConverter(confBroadcasted))
JavaRDD.fromRDD(SerDeUtil.pairRDDToPython(converted, batchSize))
@@ -540,7 +540,7 @@ private[spark] object PythonRDD extends Logging {
val rdd =
hadoopRDDFromClassNames[K, V, F](sc,
Some(path), inputFormatClass, keyClass, valueClass, mergedConf)
- val confBroadcasted = sc.sc.broadcast(new SerializableWritable(mergedConf))
+ val confBroadcasted = sc.sc.broadcast(new SerializableConfiguration(mergedConf))
val converted = convertRDD(rdd, keyConverterClass, valueConverterClass,
new WritableToJavaConverter(confBroadcasted))
JavaRDD.fromRDD(SerDeUtil.pairRDDToPython(converted, batchSize))
@@ -566,7 +566,7 @@ private[spark] object PythonRDD extends Logging {
val rdd =
hadoopRDDFromClassNames[K, V, F](sc,
None, inputFormatClass, keyClass, valueClass, conf)
- val confBroadcasted = sc.sc.broadcast(new SerializableWritable(conf))
+ val confBroadcasted = sc.sc.broadcast(new SerializableConfiguration(conf))
val converted = convertRDD(rdd, keyConverterClass, valueConverterClass,
new WritableToJavaConverter(confBroadcasted))
JavaRDD.fromRDD(SerDeUtil.pairRDDToPython(converted, batchSize))
diff --git a/core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala b/core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala
index a4715e3437..33e6998b2c 100644
--- a/core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala
@@ -21,13 +21,12 @@ import java.io.IOException
import scala.reflect.ClassTag
-import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.spark._
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.deploy.SparkHadoopUtil
-import org.apache.spark.util.Utils
+import org.apache.spark.util.{SerializableConfiguration, Utils}
private[spark] class CheckpointRDDPartition(val index: Int) extends Partition {}
@@ -38,7 +37,7 @@ private[spark]
class CheckpointRDD[T: ClassTag](sc: SparkContext, val checkpointPath: String)
extends RDD[T](sc, Nil) {
- val broadcastedConf = sc.broadcast(new SerializableWritable(sc.hadoopConfiguration))
+ val broadcastedConf = sc.broadcast(new SerializableConfiguration(sc.hadoopConfiguration))
@transient val fs = new Path(checkpointPath).getFileSystem(sc.hadoopConfiguration)
@@ -87,7 +86,7 @@ private[spark] object CheckpointRDD extends Logging {
def writeToFile[T: ClassTag](
path: String,
- broadcastedConf: Broadcast[SerializableWritable[Configuration]],
+ broadcastedConf: Broadcast[SerializableConfiguration],
blockSize: Int = -1
)(ctx: TaskContext, iterator: Iterator[T]) {
val env = SparkEnv.get
@@ -135,7 +134,7 @@ private[spark] object CheckpointRDD extends Logging {
def readFromFile[T](
path: Path,
- broadcastedConf: Broadcast[SerializableWritable[Configuration]],
+ broadcastedConf: Broadcast[SerializableConfiguration],
context: TaskContext
): Iterator[T] = {
val env = SparkEnv.get
@@ -164,7 +163,7 @@ private[spark] object CheckpointRDD extends Logging {
val path = new Path(hdfsPath, "temp")
val conf = SparkHadoopUtil.get.newConfiguration(new SparkConf())
val fs = path.getFileSystem(conf)
- val broadcastedConf = sc.broadcast(new SerializableWritable(conf))
+ val broadcastedConf = sc.broadcast(new SerializableConfiguration(conf))
sc.runJob(rdd, CheckpointRDD.writeToFile[Int](path.toString, broadcastedConf, 1024) _)
val cpRDD = new CheckpointRDD[Int](sc, path.toString)
assert(cpRDD.partitions.length == rdd.partitions.length, "Number of partitions is not the same")
diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
index 2cefe63d44..bee59a437f 100644
--- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
@@ -44,7 +44,7 @@ import org.apache.spark.broadcast.Broadcast
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.executor.DataReadMethod
import org.apache.spark.rdd.HadoopRDD.HadoopMapPartitionsWithSplitRDD
-import org.apache.spark.util.{NextIterator, Utils}
+import org.apache.spark.util.{SerializableConfiguration, NextIterator, Utils}
import org.apache.spark.scheduler.{HostTaskLocation, HDFSCacheTaskLocation}
import org.apache.spark.storage.StorageLevel
@@ -100,7 +100,7 @@ private[spark] class HadoopPartition(rddId: Int, idx: Int, @transient s: InputSp
@DeveloperApi
class HadoopRDD[K, V](
@transient sc: SparkContext,
- broadcastedConf: Broadcast[SerializableWritable[Configuration]],
+ broadcastedConf: Broadcast[SerializableConfiguration],
initLocalJobConfFuncOpt: Option[JobConf => Unit],
inputFormatClass: Class[_ <: InputFormat[K, V]],
keyClass: Class[K],
@@ -121,8 +121,8 @@ class HadoopRDD[K, V](
minPartitions: Int) = {
this(
sc,
- sc.broadcast(new SerializableWritable(conf))
- .asInstanceOf[Broadcast[SerializableWritable[Configuration]]],
+ sc.broadcast(new SerializableConfiguration(conf))
+ .asInstanceOf[Broadcast[SerializableConfiguration]],
None /* initLocalJobConfFuncOpt */,
inputFormatClass,
keyClass,
diff --git a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
index 84456d6d86..f827270ee6 100644
--- a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
@@ -33,7 +33,7 @@ import org.apache.spark._
import org.apache.spark.executor.DataReadMethod
import org.apache.spark.mapreduce.SparkHadoopMapReduceUtil
import org.apache.spark.rdd.NewHadoopRDD.NewHadoopMapPartitionsWithSplitRDD
-import org.apache.spark.util.Utils
+import org.apache.spark.util.{SerializableConfiguration, Utils}
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.storage.StorageLevel
@@ -74,7 +74,7 @@ class NewHadoopRDD[K, V](
with Logging {
// A Hadoop Configuration can be about 10 KB, which is pretty big, so broadcast it
- private val confBroadcast = sc.broadcast(new SerializableWritable(conf))
+ private val confBroadcast = sc.broadcast(new SerializableConfiguration(conf))
// private val serializableConf = new SerializableWritable(conf)
private val jobTrackerId: String = {
diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
index cfd3e26faf..91a6a2d039 100644
--- a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
@@ -44,7 +44,7 @@ import org.apache.spark.executor.{DataWriteMethod, OutputMetrics}
import org.apache.spark.mapreduce.SparkHadoopMapReduceUtil
import org.apache.spark.partial.{BoundedDouble, PartialResult}
import org.apache.spark.serializer.Serializer
-import org.apache.spark.util.Utils
+import org.apache.spark.util.{SerializableConfiguration, Utils}
import org.apache.spark.util.collection.CompactBuffer
import org.apache.spark.util.random.StratifiedSamplingUtils
@@ -1002,7 +1002,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
val formatter = new SimpleDateFormat("yyyyMMddHHmm")
val jobtrackerID = formatter.format(new Date())
val stageId = self.id
- val wrappedConf = new SerializableWritable(job.getConfiguration)
+ val wrappedConf = new SerializableConfiguration(job.getConfiguration)
val outfmt = job.getOutputFormatClass
val jobFormat = outfmt.newInstance
@@ -1065,7 +1065,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
def saveAsHadoopDataset(conf: JobConf): Unit = self.withScope {
// Rename this as hadoopConf internally to avoid shadowing (see SPARK-2038).
val hadoopConf = conf
- val wrappedConf = new SerializableWritable(hadoopConf)
+ val wrappedConf = new SerializableConfiguration(hadoopConf)
val outputFormatInstance = hadoopConf.getOutputFormat
val keyClass = hadoopConf.getOutputKeyClass
val valueClass = hadoopConf.getOutputValueClass
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDDCheckpointData.scala b/core/src/main/scala/org/apache/spark/rdd/RDDCheckpointData.scala
index 1722c27e55..acbd31aacd 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDDCheckpointData.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDDCheckpointData.scala
@@ -23,6 +23,7 @@ import org.apache.hadoop.fs.Path
import org.apache.spark._
import org.apache.spark.scheduler.{ResultTask, ShuffleMapTask}
+import org.apache.spark.util.SerializableConfiguration
/**
* Enumeration to manage state transitions of an RDD through checkpointing
@@ -91,7 +92,7 @@ private[spark] class RDDCheckpointData[T: ClassTag](@transient rdd: RDD[T])
// Save to file, and reload it as an RDD
val broadcastedConf = rdd.context.broadcast(
- new SerializableWritable(rdd.context.hadoopConfiguration))
+ new SerializableConfiguration(rdd.context.hadoopConfiguration))
val newRDD = new CheckpointRDD[T](rdd.context, path.toString)
if (rdd.conf.getBoolean("spark.cleaner.referenceTracking.cleanCheckpoints", false)) {
rdd.context.cleaner.foreach { cleaner =>
diff --git a/core/src/main/scala/org/apache/spark/util/SerializableConfiguration.scala b/core/src/main/scala/org/apache/spark/util/SerializableConfiguration.scala
new file mode 100644
index 0000000000..30bcf1d2f2
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/util/SerializableConfiguration.scala
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.util
+
+import java.io.{ObjectInputStream, ObjectOutputStream}
+
+import org.apache.hadoop.conf.Configuration
+
+import org.apache.spark.util.Utils
+
+private[spark]
+class SerializableConfiguration(@transient var value: Configuration) extends Serializable {
+ private def writeObject(out: ObjectOutputStream): Unit = Utils.tryOrIOException {
+ out.defaultWriteObject()
+ value.write(out)
+ }
+
+ private def readObject(in: ObjectInputStream): Unit = Utils.tryOrIOException {
+ value = new Configuration(false)
+ value.readFields(in)
+ }
+}
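A brief usage sketch of the broadcast pattern the rest of this patch switches to, assuming
an active SparkContext named sc; since the class is private[spark], the snippet only
compiles from code inside the org.apache.spark package and is illustrative rather than
public API:

    import org.apache.spark.SparkContext
    import org.apache.spark.util.SerializableConfiguration

    def defaultFSOnExecutors(sc: SparkContext): Array[String] = {
      // Wrap once on the driver and broadcast; each executor deserializes the wrapper,
      // which rebuilds the Configuration with loadDefaults = false before readFields.
      val confBroadcast = sc.broadcast(new SerializableConfiguration(sc.hadoopConfiguration))
      sc.parallelize(1 to 2).map { _ =>
        confBroadcast.value.value.get("fs.defaultFS", "unset")
      }.collect()
    }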
diff --git a/core/src/main/scala/org/apache/spark/util/SerializableJobConf.scala b/core/src/main/scala/org/apache/spark/util/SerializableJobConf.scala
new file mode 100644
index 0000000000..afbcc6efc8
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/util/SerializableJobConf.scala
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util
+
+import java.io.{ObjectInputStream, ObjectOutputStream}
+
+import org.apache.hadoop.mapred.JobConf
+
+import org.apache.spark.util.Utils
+
+private[spark]
+class SerializableJobConf(@transient var value: JobConf) extends Serializable {
+ private def writeObject(out: ObjectOutputStream): Unit = Utils.tryOrIOException {
+ out.defaultWriteObject()
+ value.write(out)
+ }
+
+ private def readObject(in: ObjectInputStream): Unit = Utils.tryOrIOException {
+ value = new JobConf(false)
+ value.readFields(in)
+ }
+}
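SerializableJobConf exists alongside SerializableConfiguration because the old mapred API
expects a JobConf rather than a plain Configuration, and SparkHadoopWriter carries the
wrapper as a field of a Serializable class instead of as a broadcast variable. A sketch of
that closure-serialization pattern, again assuming code inside the org.apache.spark package
(the class name is hypothetical):

    import org.apache.hadoop.mapred.JobConf
    import org.apache.spark.util.SerializableJobConf

    class ExampleWriter(@transient jobConf: JobConf) extends Serializable {
      // Serialized with the task closure; on the executor, readObject rebuilds the
      // JobConf with new JobConf(false) before readFields restores its entries.
      private val conf = new SerializableJobConf(jobConf)

      def outputDir(): String =
        conf.value.get("mapreduce.output.fileoutputformat.outputdir", "unset")
    }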