aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--core/src/main/scala/org/apache/spark/SerializableWritable.scala2
-rw-r--r--core/src/main/scala/org/apache/spark/SparkContext.scala2
-rw-r--r--core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala3
-rw-r--r--core/src/main/scala/org/apache/spark/api/python/PythonHadoopUtil.scala6
-rw-r--r--core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala12
-rw-r--r--core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala11
-rw-r--r--core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala8
-rw-r--r--core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala4
-rw-r--r--core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala6
-rw-r--r--core/src/main/scala/org/apache/spark/rdd/RDDCheckpointData.scala3
-rw-r--r--core/src/main/scala/org/apache/spark/util/SerializableConfiguration.scala36
-rw-r--r--core/src/main/scala/org/apache/spark/util/SerializableJobConf.scala37
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala5
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala7
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/sources/DataSourceStrategy.scala8
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/sources/SqlNewHadoopRDD.scala4
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala3
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala6
-rw-r--r--sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala9
-rw-r--r--sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala7
-rw-r--r--sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala3
-rw-r--r--sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala5
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala5
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala7
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala5
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala9
26 files changed, 146 insertions, 67 deletions
diff --git a/core/src/main/scala/org/apache/spark/SerializableWritable.scala b/core/src/main/scala/org/apache/spark/SerializableWritable.scala
index cb2cae1852..beb2e27254 100644
--- a/core/src/main/scala/org/apache/spark/SerializableWritable.scala
+++ b/core/src/main/scala/org/apache/spark/SerializableWritable.scala
@@ -41,7 +41,7 @@ class SerializableWritable[T <: Writable](@transient var t: T) extends Serializa
private def readObject(in: ObjectInputStream): Unit = Utils.tryOrIOException {
in.defaultReadObject()
val ow = new ObjectWritable()
- ow.setConf(new Configuration())
+ ow.setConf(new Configuration(false))
ow.readFields(in)
t = ow.get().asInstanceOf[T]
}
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index a453c9bf48..141276ac90 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -974,7 +974,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
minPartitions: Int = defaultMinPartitions): RDD[(K, V)] = withScope {
assertNotStopped()
// A Hadoop configuration can be about 10 KB, which is pretty big, so broadcast it.
- val confBroadcast = broadcast(new SerializableWritable(hadoopConfiguration))
+ val confBroadcast = broadcast(new SerializableConfiguration(hadoopConfiguration))
val setInputPathsFunc = (jobConf: JobConf) => FileInputFormat.setInputPaths(jobConf, path)
new HadoopRDD(
this,
diff --git a/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala b/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala
index 59ac82ccec..f5dd36cbcf 100644
--- a/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala
+++ b/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala
@@ -28,6 +28,7 @@ import org.apache.hadoop.fs.Path
import org.apache.spark.mapred.SparkHadoopMapRedUtil
import org.apache.spark.rdd.HadoopRDD
+import org.apache.spark.util.SerializableJobConf
/**
* Internal helper class that saves an RDD using a Hadoop OutputFormat.
@@ -42,7 +43,7 @@ class SparkHadoopWriter(@transient jobConf: JobConf)
with Serializable {
private val now = new Date()
- private val conf = new SerializableWritable(jobConf)
+ private val conf = new SerializableJobConf(jobConf)
private var jobID = 0
private var splitID = 0
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonHadoopUtil.scala b/core/src/main/scala/org/apache/spark/api/python/PythonHadoopUtil.scala
index c9181a29d4..b959b683d1 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonHadoopUtil.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonHadoopUtil.scala
@@ -19,8 +19,8 @@ package org.apache.spark.api.python
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
-import org.apache.spark.util.Utils
-import org.apache.spark.{Logging, SerializableWritable, SparkException}
+import org.apache.spark.util.{SerializableConfiguration, Utils}
+import org.apache.spark.{Logging, SparkException}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io._
import scala.util.{Failure, Success, Try}
@@ -61,7 +61,7 @@ private[python] object Converter extends Logging {
* Other objects are passed through without conversion.
*/
private[python] class WritableToJavaConverter(
- conf: Broadcast[SerializableWritable[Configuration]]) extends Converter[Any, Any] {
+ conf: Broadcast[SerializableConfiguration]) extends Converter[Any, Any] {
/**
* Converts a [[org.apache.hadoop.io.Writable]] to the underlying primitive, String or
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
index 55a37f8c94..dc9f62f39e 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -36,7 +36,7 @@ import org.apache.spark.api.java.{JavaPairRDD, JavaRDD, JavaSparkContext}
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.input.PortableDataStream
import org.apache.spark.rdd.RDD
-import org.apache.spark.util.Utils
+import org.apache.spark.util.{SerializableConfiguration, Utils}
import scala.util.control.NonFatal
@@ -445,7 +445,7 @@ private[spark] object PythonRDD extends Logging {
val kc = Utils.classForName(keyClass).asInstanceOf[Class[K]]
val vc = Utils.classForName(valueClass).asInstanceOf[Class[V]]
val rdd = sc.sc.sequenceFile[K, V](path, kc, vc, minSplits)
- val confBroadcasted = sc.sc.broadcast(new SerializableWritable(sc.hadoopConfiguration()))
+ val confBroadcasted = sc.sc.broadcast(new SerializableConfiguration(sc.hadoopConfiguration()))
val converted = convertRDD(rdd, keyConverterClass, valueConverterClass,
new WritableToJavaConverter(confBroadcasted))
JavaRDD.fromRDD(SerDeUtil.pairRDDToPython(converted, batchSize))
@@ -471,7 +471,7 @@ private[spark] object PythonRDD extends Logging {
val rdd =
newAPIHadoopRDDFromClassNames[K, V, F](sc,
Some(path), inputFormatClass, keyClass, valueClass, mergedConf)
- val confBroadcasted = sc.sc.broadcast(new SerializableWritable(mergedConf))
+ val confBroadcasted = sc.sc.broadcast(new SerializableConfiguration(mergedConf))
val converted = convertRDD(rdd, keyConverterClass, valueConverterClass,
new WritableToJavaConverter(confBroadcasted))
JavaRDD.fromRDD(SerDeUtil.pairRDDToPython(converted, batchSize))
@@ -497,7 +497,7 @@ private[spark] object PythonRDD extends Logging {
val rdd =
newAPIHadoopRDDFromClassNames[K, V, F](sc,
None, inputFormatClass, keyClass, valueClass, conf)
- val confBroadcasted = sc.sc.broadcast(new SerializableWritable(conf))
+ val confBroadcasted = sc.sc.broadcast(new SerializableConfiguration(conf))
val converted = convertRDD(rdd, keyConverterClass, valueConverterClass,
new WritableToJavaConverter(confBroadcasted))
JavaRDD.fromRDD(SerDeUtil.pairRDDToPython(converted, batchSize))
@@ -540,7 +540,7 @@ private[spark] object PythonRDD extends Logging {
val rdd =
hadoopRDDFromClassNames[K, V, F](sc,
Some(path), inputFormatClass, keyClass, valueClass, mergedConf)
- val confBroadcasted = sc.sc.broadcast(new SerializableWritable(mergedConf))
+ val confBroadcasted = sc.sc.broadcast(new SerializableConfiguration(mergedConf))
val converted = convertRDD(rdd, keyConverterClass, valueConverterClass,
new WritableToJavaConverter(confBroadcasted))
JavaRDD.fromRDD(SerDeUtil.pairRDDToPython(converted, batchSize))
@@ -566,7 +566,7 @@ private[spark] object PythonRDD extends Logging {
val rdd =
hadoopRDDFromClassNames[K, V, F](sc,
None, inputFormatClass, keyClass, valueClass, conf)
- val confBroadcasted = sc.sc.broadcast(new SerializableWritable(conf))
+ val confBroadcasted = sc.sc.broadcast(new SerializableConfiguration(conf))
val converted = convertRDD(rdd, keyConverterClass, valueConverterClass,
new WritableToJavaConverter(confBroadcasted))
JavaRDD.fromRDD(SerDeUtil.pairRDDToPython(converted, batchSize))
diff --git a/core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala b/core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala
index a4715e3437..33e6998b2c 100644
--- a/core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala
@@ -21,13 +21,12 @@ import java.io.IOException
import scala.reflect.ClassTag
-import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.spark._
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.deploy.SparkHadoopUtil
-import org.apache.spark.util.Utils
+import org.apache.spark.util.{SerializableConfiguration, Utils}
private[spark] class CheckpointRDDPartition(val index: Int) extends Partition {}
@@ -38,7 +37,7 @@ private[spark]
class CheckpointRDD[T: ClassTag](sc: SparkContext, val checkpointPath: String)
extends RDD[T](sc, Nil) {
- val broadcastedConf = sc.broadcast(new SerializableWritable(sc.hadoopConfiguration))
+ val broadcastedConf = sc.broadcast(new SerializableConfiguration(sc.hadoopConfiguration))
@transient val fs = new Path(checkpointPath).getFileSystem(sc.hadoopConfiguration)
@@ -87,7 +86,7 @@ private[spark] object CheckpointRDD extends Logging {
def writeToFile[T: ClassTag](
path: String,
- broadcastedConf: Broadcast[SerializableWritable[Configuration]],
+ broadcastedConf: Broadcast[SerializableConfiguration],
blockSize: Int = -1
)(ctx: TaskContext, iterator: Iterator[T]) {
val env = SparkEnv.get
@@ -135,7 +134,7 @@ private[spark] object CheckpointRDD extends Logging {
def readFromFile[T](
path: Path,
- broadcastedConf: Broadcast[SerializableWritable[Configuration]],
+ broadcastedConf: Broadcast[SerializableConfiguration],
context: TaskContext
): Iterator[T] = {
val env = SparkEnv.get
@@ -164,7 +163,7 @@ private[spark] object CheckpointRDD extends Logging {
val path = new Path(hdfsPath, "temp")
val conf = SparkHadoopUtil.get.newConfiguration(new SparkConf())
val fs = path.getFileSystem(conf)
- val broadcastedConf = sc.broadcast(new SerializableWritable(conf))
+ val broadcastedConf = sc.broadcast(new SerializableConfiguration(conf))
sc.runJob(rdd, CheckpointRDD.writeToFile[Int](path.toString, broadcastedConf, 1024) _)
val cpRDD = new CheckpointRDD[Int](sc, path.toString)
assert(cpRDD.partitions.length == rdd.partitions.length, "Number of partitions is not the same")
diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
index 2cefe63d44..bee59a437f 100644
--- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
@@ -44,7 +44,7 @@ import org.apache.spark.broadcast.Broadcast
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.executor.DataReadMethod
import org.apache.spark.rdd.HadoopRDD.HadoopMapPartitionsWithSplitRDD
-import org.apache.spark.util.{NextIterator, Utils}
+import org.apache.spark.util.{SerializableConfiguration, NextIterator, Utils}
import org.apache.spark.scheduler.{HostTaskLocation, HDFSCacheTaskLocation}
import org.apache.spark.storage.StorageLevel
@@ -100,7 +100,7 @@ private[spark] class HadoopPartition(rddId: Int, idx: Int, @transient s: InputSp
@DeveloperApi
class HadoopRDD[K, V](
@transient sc: SparkContext,
- broadcastedConf: Broadcast[SerializableWritable[Configuration]],
+ broadcastedConf: Broadcast[SerializableConfiguration],
initLocalJobConfFuncOpt: Option[JobConf => Unit],
inputFormatClass: Class[_ <: InputFormat[K, V]],
keyClass: Class[K],
@@ -121,8 +121,8 @@ class HadoopRDD[K, V](
minPartitions: Int) = {
this(
sc,
- sc.broadcast(new SerializableWritable(conf))
- .asInstanceOf[Broadcast[SerializableWritable[Configuration]]],
+ sc.broadcast(new SerializableConfiguration(conf))
+ .asInstanceOf[Broadcast[SerializableConfiguration]],
None /* initLocalJobConfFuncOpt */,
inputFormatClass,
keyClass,
diff --git a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
index 84456d6d86..f827270ee6 100644
--- a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
@@ -33,7 +33,7 @@ import org.apache.spark._
import org.apache.spark.executor.DataReadMethod
import org.apache.spark.mapreduce.SparkHadoopMapReduceUtil
import org.apache.spark.rdd.NewHadoopRDD.NewHadoopMapPartitionsWithSplitRDD
-import org.apache.spark.util.Utils
+import org.apache.spark.util.{SerializableConfiguration, Utils}
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.storage.StorageLevel
@@ -74,7 +74,7 @@ class NewHadoopRDD[K, V](
with Logging {
// A Hadoop Configuration can be about 10 KB, which is pretty big, so broadcast it
- private val confBroadcast = sc.broadcast(new SerializableWritable(conf))
+ private val confBroadcast = sc.broadcast(new SerializableConfiguration(conf))
// private val serializableConf = new SerializableWritable(conf)
private val jobTrackerId: String = {
diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
index cfd3e26faf..91a6a2d039 100644
--- a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
@@ -44,7 +44,7 @@ import org.apache.spark.executor.{DataWriteMethod, OutputMetrics}
import org.apache.spark.mapreduce.SparkHadoopMapReduceUtil
import org.apache.spark.partial.{BoundedDouble, PartialResult}
import org.apache.spark.serializer.Serializer
-import org.apache.spark.util.Utils
+import org.apache.spark.util.{SerializableConfiguration, Utils}
import org.apache.spark.util.collection.CompactBuffer
import org.apache.spark.util.random.StratifiedSamplingUtils
@@ -1002,7 +1002,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
val formatter = new SimpleDateFormat("yyyyMMddHHmm")
val jobtrackerID = formatter.format(new Date())
val stageId = self.id
- val wrappedConf = new SerializableWritable(job.getConfiguration)
+ val wrappedConf = new SerializableConfiguration(job.getConfiguration)
val outfmt = job.getOutputFormatClass
val jobFormat = outfmt.newInstance
@@ -1065,7 +1065,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
def saveAsHadoopDataset(conf: JobConf): Unit = self.withScope {
// Rename this as hadoopConf internally to avoid shadowing (see SPARK-2038).
val hadoopConf = conf
- val wrappedConf = new SerializableWritable(hadoopConf)
+ val wrappedConf = new SerializableConfiguration(hadoopConf)
val outputFormatInstance = hadoopConf.getOutputFormat
val keyClass = hadoopConf.getOutputKeyClass
val valueClass = hadoopConf.getOutputValueClass
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDDCheckpointData.scala b/core/src/main/scala/org/apache/spark/rdd/RDDCheckpointData.scala
index 1722c27e55..acbd31aacd 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDDCheckpointData.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDDCheckpointData.scala
@@ -23,6 +23,7 @@ import org.apache.hadoop.fs.Path
import org.apache.spark._
import org.apache.spark.scheduler.{ResultTask, ShuffleMapTask}
+import org.apache.spark.util.SerializableConfiguration
/**
* Enumeration to manage state transitions of an RDD through checkpointing
@@ -91,7 +92,7 @@ private[spark] class RDDCheckpointData[T: ClassTag](@transient rdd: RDD[T])
// Save to file, and reload it as an RDD
val broadcastedConf = rdd.context.broadcast(
- new SerializableWritable(rdd.context.hadoopConfiguration))
+ new SerializableConfiguration(rdd.context.hadoopConfiguration))
val newRDD = new CheckpointRDD[T](rdd.context, path.toString)
if (rdd.conf.getBoolean("spark.cleaner.referenceTracking.cleanCheckpoints", false)) {
rdd.context.cleaner.foreach { cleaner =>
diff --git a/core/src/main/scala/org/apache/spark/util/SerializableConfiguration.scala b/core/src/main/scala/org/apache/spark/util/SerializableConfiguration.scala
new file mode 100644
index 0000000000..30bcf1d2f2
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/util/SerializableConfiguration.scala
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.util
+
+import java.io.{ObjectInputStream, ObjectOutputStream}
+
+import org.apache.hadoop.conf.Configuration
+
+import org.apache.spark.util.Utils
+
+private[spark]
+class SerializableConfiguration(@transient var value: Configuration) extends Serializable {
+ private def writeObject(out: ObjectOutputStream): Unit = Utils.tryOrIOException {
+ out.defaultWriteObject()
+ value.write(out)
+ }
+
+ private def readObject(in: ObjectInputStream): Unit = Utils.tryOrIOException {
+ value = new Configuration(false)
+ value.readFields(in)
+ }
+}
diff --git a/core/src/main/scala/org/apache/spark/util/SerializableJobConf.scala b/core/src/main/scala/org/apache/spark/util/SerializableJobConf.scala
new file mode 100644
index 0000000000..afbcc6efc8
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/util/SerializableJobConf.scala
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util
+
+import java.io.{ObjectInputStream, ObjectOutputStream}
+
+import org.apache.hadoop.mapred.JobConf
+
+import org.apache.spark.util.Utils
+
+private[spark]
+class SerializableJobConf(@transient var value: JobConf) extends Serializable {
+ private def writeObject(out: ObjectOutputStream): Unit = Utils.tryOrIOException {
+ out.defaultWriteObject()
+ value.write(out)
+ }
+
+ private def readObject(in: ObjectInputStream): Unit = Utils.tryOrIOException {
+ value = new JobConf(false)
+ value.readFields(in)
+ }
+}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala
index 65ecad9878..b30fc171c0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala
@@ -49,7 +49,8 @@ import org.apache.spark.sql.SQLConf
import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, InternalRow, _}
import org.apache.spark.sql.execution.{LeafNode, SparkPlan, UnaryNode}
import org.apache.spark.sql.types.StructType
-import org.apache.spark.{Logging, SerializableWritable, TaskContext}
+import org.apache.spark.{Logging, TaskContext}
+import org.apache.spark.util.SerializableConfiguration
/**
* :: DeveloperApi ::
@@ -329,7 +330,7 @@ private[sql] case class InsertIntoParquetTable(
job.setOutputKeyClass(keyType)
job.setOutputValueClass(classOf[InternalRow])
NewFileOutputFormat.setOutputPath(job, new Path(path))
- val wrappedConf = new SerializableWritable(job.getConfiguration)
+ val wrappedConf = new SerializableConfiguration(job.getConfiguration)
val formatter = new SimpleDateFormat("yyyyMMddHHmm")
val jobtrackerID = formatter.format(new Date())
val stageId = sqlContext.sparkContext.newRddId()
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
index 4c702c3b0d..c9de45e0dd 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
@@ -24,7 +24,6 @@ import scala.collection.JavaConversions._
import scala.util.Try
import com.google.common.base.Objects
-import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, Path}
import org.apache.hadoop.io.Writable
import org.apache.hadoop.mapreduce._
@@ -42,8 +41,8 @@ import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.sources._
import org.apache.spark.sql.types.{DataType, StructType}
-import org.apache.spark.util.Utils
-import org.apache.spark.{Logging, SerializableWritable, SparkException, Partition => SparkPartition}
+import org.apache.spark.util.{SerializableConfiguration, Utils}
+import org.apache.spark.{Logging, SparkException, Partition => SparkPartition}
private[sql] class DefaultSource extends HadoopFsRelationProvider {
override def createRelation(
@@ -258,7 +257,7 @@ private[sql] class ParquetRelation2(
requiredColumns: Array[String],
filters: Array[Filter],
inputFiles: Array[FileStatus],
- broadcastedConf: Broadcast[SerializableWritable[Configuration]]): RDD[Row] = {
+ broadcastedConf: Broadcast[SerializableConfiguration]): RDD[Row] = {
val useMetadataCache = sqlContext.getConf(SQLConf.PARQUET_CACHE_METADATA)
val parquetFilterPushDown = sqlContext.conf.parquetFilterPushDown
// Create the function to set variable Parquet confs at both driver and executor side.
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/DataSourceStrategy.scala
index 4cf67439b9..a8f56f4767 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/DataSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/DataSourceStrategy.scala
@@ -17,6 +17,7 @@
package org.apache.spark.sql.sources
+import org.apache.spark.{Logging, TaskContext}
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.rdd.{MapPartitionsRDD, RDD, UnionRDD}
import org.apache.spark.sql._
@@ -27,9 +28,8 @@ import org.apache.spark.sql.catalyst.plans.logical
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.types.{StringType, StructType}
import org.apache.spark.sql.{SaveMode, Strategy, execution, sources}
-import org.apache.spark.util.Utils
+import org.apache.spark.util.{SerializableConfiguration, Utils}
import org.apache.spark.unsafe.types.UTF8String
-import org.apache.spark.{Logging, SerializableWritable, TaskContext}
/**
* A Strategy for planning scans over data sources defined using the sources API.
@@ -91,7 +91,7 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
// broadcast HadoopConf.
val sharedHadoopConf = SparkHadoopUtil.get.conf
val confBroadcast =
- t.sqlContext.sparkContext.broadcast(new SerializableWritable(sharedHadoopConf))
+ t.sqlContext.sparkContext.broadcast(new SerializableConfiguration(sharedHadoopConf))
pruneFilterProject(
l,
projects,
@@ -126,7 +126,7 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
// Otherwise, the cost of broadcasting HadoopConf in every RDD will be high.
val sharedHadoopConf = SparkHadoopUtil.get.conf
val confBroadcast =
- relation.sqlContext.sparkContext.broadcast(new SerializableWritable(sharedHadoopConf))
+ relation.sqlContext.sparkContext.broadcast(new SerializableConfiguration(sharedHadoopConf))
// Builds RDD[Row]s for each selected partition.
val perPartitionRows = partitions.map { case Partition(partitionValues, dir) =>
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/SqlNewHadoopRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/SqlNewHadoopRDD.scala
index ebad0c1564..2bdc341021 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/SqlNewHadoopRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/SqlNewHadoopRDD.scala
@@ -34,7 +34,7 @@ import org.apache.spark.mapreduce.SparkHadoopMapReduceUtil
import org.apache.spark.rdd.{RDD, HadoopRDD}
import org.apache.spark.rdd.NewHadoopRDD.NewHadoopMapPartitionsWithSplitRDD
import org.apache.spark.storage.StorageLevel
-import org.apache.spark.util.Utils
+import org.apache.spark.util.{SerializableConfiguration, Utils}
import scala.reflect.ClassTag
@@ -65,7 +65,7 @@ private[spark] class SqlNewHadoopPartition(
*/
private[sql] class SqlNewHadoopRDD[K, V](
@transient sc : SparkContext,
- broadcastedConf: Broadcast[SerializableWritable[Configuration]],
+ broadcastedConf: Broadcast[SerializableConfiguration],
@transient initDriverSideJobFuncOpt: Option[Job => Unit],
initLocalJobFuncOpt: Option[Job => Unit],
inputFormatClass: Class[_ <: InputFormat[K, V]],
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala
index d39a20b388..c16bd9ae52 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala
@@ -37,6 +37,7 @@ import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}
import org.apache.spark.sql.execution.RunnableCommand
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.{DataFrame, Row, SQLConf, SQLContext, SaveMode}
+import org.apache.spark.util.SerializableConfiguration
private[sql] case class InsertIntoDataSource(
logicalRelation: LogicalRelation,
@@ -260,7 +261,7 @@ private[sql] abstract class BaseWriterContainer(
with Logging
with Serializable {
- protected val serializableConf = new SerializableWritable(ContextUtil.getConfiguration(job))
+ protected val serializableConf = new SerializableConfiguration(ContextUtil.getConfiguration(job))
// This is only used on driver side.
@transient private val jobContext: JobContext = job
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
index 43d3507d7d..7005c7079a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
@@ -27,12 +27,12 @@ import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext}
import org.apache.spark.annotation.{DeveloperApi, Experimental}
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
-import org.apache.spark.SerializableWritable
import org.apache.spark.sql.execution.RDDConversions
import org.apache.spark.sql.{DataFrame, Row, SaveMode, SQLContext}
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.codegen.GenerateMutableProjection
import org.apache.spark.sql.types.StructType
+import org.apache.spark.util.SerializableConfiguration
/**
* ::DeveloperApi::
@@ -518,7 +518,7 @@ abstract class HadoopFsRelation private[sql](maybePartitionSpec: Option[Partitio
requiredColumns: Array[String],
filters: Array[Filter],
inputPaths: Array[String],
- broadcastedConf: Broadcast[SerializableWritable[Configuration]]): RDD[Row] = {
+ broadcastedConf: Broadcast[SerializableConfiguration]): RDD[Row] = {
val inputStatuses = inputPaths.flatMap { input =>
val path = new Path(input)
@@ -648,7 +648,7 @@ abstract class HadoopFsRelation private[sql](maybePartitionSpec: Option[Partitio
requiredColumns: Array[String],
filters: Array[Filter],
inputFiles: Array[FileStatus],
- broadcastedConf: Broadcast[SerializableWritable[Configuration]]): RDD[Row] = {
+ broadcastedConf: Broadcast[SerializableConfiguration]): RDD[Row] = {
buildScan(requiredColumns, filters, inputFiles)
}
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
index 485810320f..439f39bafc 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
@@ -17,7 +17,6 @@
package org.apache.spark.sql.hive
-import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{Path, PathFilter}
import org.apache.hadoop.hive.conf.HiveConf
import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants._
@@ -30,12 +29,12 @@ import org.apache.hadoop.hive.serde2.objectinspector.{ObjectInspectorConverters,
import org.apache.hadoop.io.Writable
import org.apache.hadoop.mapred.{FileInputFormat, InputFormat, JobConf}
-import org.apache.spark.{Logging, SerializableWritable}
+import org.apache.spark.{Logging}
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.{EmptyRDD, HadoopRDD, RDD, UnionRDD}
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.util.DateUtils
-import org.apache.spark.util.Utils
+import org.apache.spark.util.{SerializableConfiguration, Utils}
/**
* A trait for subclasses that handle table scans.
@@ -72,7 +71,7 @@ class HadoopTableReader(
// TODO: set aws s3 credentials.
private val _broadcastedHiveConf =
- sc.sparkContext.broadcast(new SerializableWritable(hiveExtraConf))
+ sc.sparkContext.broadcast(new SerializableConfiguration(hiveExtraConf))
override def makeRDDForTable(hiveTable: HiveTable): RDD[InternalRow] =
makeRDDForTable(
@@ -276,7 +275,7 @@ class HadoopTableReader(
val rdd = new HadoopRDD(
sc.sparkContext,
- _broadcastedHiveConf.asInstanceOf[Broadcast[SerializableWritable[Configuration]]],
+ _broadcastedHiveConf.asInstanceOf[Broadcast[SerializableConfiguration]],
Some(initializeJobConfFunc),
inputFormatClass,
classOf[Writable],
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
index 1d306c5d10..404bb937aa 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
@@ -35,9 +35,10 @@ import org.apache.spark.sql.catalyst.expressions.{Attribute, InternalRow}
import org.apache.spark.sql.execution.{UnaryNode, SparkPlan}
import org.apache.spark.sql.hive.HiveShim.{ShimFileSinkDesc => FileSinkDesc}
import org.apache.spark.sql.hive._
-import org.apache.spark.{SerializableWritable, SparkException, TaskContext}
+import org.apache.spark.{SparkException, TaskContext}
import scala.collection.JavaConversions._
+import org.apache.spark.util.SerializableJobConf
private[hive]
case class InsertIntoHiveTable(
@@ -64,7 +65,7 @@ case class InsertIntoHiveTable(
rdd: RDD[InternalRow],
valueClass: Class[_],
fileSinkConf: FileSinkDesc,
- conf: SerializableWritable[JobConf],
+ conf: SerializableJobConf,
writerContainer: SparkHiveWriterContainer): Unit = {
assert(valueClass != null, "Output value class not set")
conf.value.setOutputValueClass(valueClass)
@@ -172,7 +173,7 @@ case class InsertIntoHiveTable(
}
val jobConf = new JobConf(sc.hiveconf)
- val jobConfSer = new SerializableWritable(jobConf)
+ val jobConfSer = new SerializableJobConf(jobConf)
val writerContainer = if (numDynamicPartitions > 0) {
val dynamicPartColNames = partitionColumnNames.takeRight(numDynamicPartitions)
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala
index ee440e304e..0bc69c00c2 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala
@@ -37,6 +37,7 @@ import org.apache.spark.{Logging, SerializableWritable, SparkHadoopWriter}
import org.apache.spark.sql.catalyst.util.DateUtils
import org.apache.spark.sql.hive.HiveShim.{ShimFileSinkDesc => FileSinkDesc}
import org.apache.spark.sql.types._
+import org.apache.spark.util.SerializableJobConf
/**
* Internal helper class that saves an RDD using a Hive OutputFormat.
@@ -57,7 +58,7 @@ private[hive] class SparkHiveWriterContainer(
PlanUtils.configureOutputJobPropertiesForStorageHandler(tableDesc)
Utilities.copyTableJobPropertiesToConf(tableDesc, jobConf)
}
- protected val conf = new SerializableWritable(jobConf)
+ protected val conf = new SerializableJobConf(jobConf)
private var jobID = 0
private var splitID = 0
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala
index f03c4cd54e..77f1ca9ae0 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala
@@ -39,7 +39,8 @@ import org.apache.spark.sql.hive.{HiveContext, HiveInspectors, HiveMetastoreType
import org.apache.spark.sql.sources.{Filter, _}
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.{Row, SQLContext}
-import org.apache.spark.{Logging, SerializableWritable}
+import org.apache.spark.{Logging}
+import org.apache.spark.util.SerializableConfiguration
/* Implicit conversions */
import scala.collection.JavaConversions._
@@ -283,7 +284,7 @@ private[orc] case class OrcTableScan(
classOf[Writable]
).asInstanceOf[HadoopRDD[NullWritable, Writable]]
- val wrappedConf = new SerializableWritable(conf)
+ val wrappedConf = new SerializableConfiguration(conf)
rdd.mapPartitionsWithInputSplit { case (split: OrcSplit, iterator) =>
val mutableRow = new SpecificMutableRow(attributes.map(_.dataType))
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
index 6c1fab5674..86a8e2beff 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
@@ -26,10 +26,9 @@ import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path, PathFilter}
import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}
-import org.apache.spark.{SparkConf, SerializableWritable}
import org.apache.spark.rdd.{RDD, UnionRDD}
import org.apache.spark.streaming._
-import org.apache.spark.util.{TimeStampedHashMap, Utils}
+import org.apache.spark.util.{SerializableConfiguration, TimeStampedHashMap, Utils}
/**
* This class represents an input stream that monitors a Hadoop-compatible filesystem for new
@@ -78,7 +77,7 @@ class FileInputDStream[K, V, F <: NewInputFormat[K, V]](
(implicit km: ClassTag[K], vm: ClassTag[V], fm: ClassTag[F])
extends InputDStream[(K, V)](ssc_) {
- private val serializableConfOpt = conf.map(new SerializableWritable(_))
+ private val serializableConfOpt = conf.map(new SerializableConfiguration(_))
/**
* Minimum duration of remembering the information of selected files. Defaults to 60 seconds.
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala
index 358e4c66df..71bec96d46 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala
@@ -24,10 +24,11 @@ import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.mapred.{JobConf, OutputFormat}
import org.apache.hadoop.mapreduce.{OutputFormat => NewOutputFormat}
-import org.apache.spark.{HashPartitioner, Partitioner, SerializableWritable}
+import org.apache.spark.{HashPartitioner, Partitioner}
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Duration, Time}
import org.apache.spark.streaming.StreamingContext.rddToFileName
+import org.apache.spark.util.{SerializableConfiguration, SerializableJobConf}
/**
* Extra functions available on DStream of (key, value) pairs through an implicit conversion.
@@ -688,7 +689,7 @@ class PairDStreamFunctions[K, V](self: DStream[(K, V)])
conf: JobConf = new JobConf(ssc.sparkContext.hadoopConfiguration)
): Unit = ssc.withScope {
// Wrap conf in SerializableWritable so that ForeachDStream can be serialized for checkpoints
- val serializableConf = new SerializableWritable(conf)
+ val serializableConf = new SerializableJobConf(conf)
val saveFunc = (rdd: RDD[(K, V)], time: Time) => {
val file = rddToFileName(prefix, suffix, time)
rdd.saveAsHadoopFile(file, keyClass, valueClass, outputFormatClass, serializableConf.value)
@@ -721,7 +722,7 @@ class PairDStreamFunctions[K, V](self: DStream[(K, V)])
conf: Configuration = ssc.sparkContext.hadoopConfiguration
): Unit = ssc.withScope {
// Wrap conf in SerializableWritable so that ForeachDStream can be serialized for checkpoints
- val serializableConf = new SerializableWritable(conf)
+ val serializableConf = new SerializableConfiguration(conf)
val saveFunc = (rdd: RDD[(K, V)], time: Time) => {
val file = rddToFileName(prefix, suffix, time)
rdd.saveAsNewAPIHadoopFile(
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala b/streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala
index ffce6a4c3c..31ce8e1ec1 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala
@@ -23,12 +23,11 @@ import java.util.UUID
import scala.reflect.ClassTag
import scala.util.control.NonFatal
-import org.apache.commons.io.FileUtils
-
import org.apache.spark._
import org.apache.spark.rdd.BlockRDD
import org.apache.spark.storage.{BlockId, StorageLevel}
import org.apache.spark.streaming.util._
+import org.apache.spark.util.SerializableConfiguration
/**
* Partition class for [[org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD]].
@@ -94,7 +93,7 @@ class WriteAheadLogBackedBlockRDD[T: ClassTag](
// Hadoop configuration is not serializable, so broadcast it as a serializable.
@transient private val hadoopConfig = sc.hadoopConfiguration
- private val broadcastedHadoopConf = new SerializableWritable(hadoopConfig)
+ private val broadcastedHadoopConf = new SerializableConfiguration(hadoopConfig)
override def isValid(): Boolean = true
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
index f1504b09c9..e6cdbec11e 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
@@ -21,10 +21,12 @@ import scala.collection.mutable.{HashMap, SynchronizedMap}
import scala.language.existentials
import org.apache.spark.streaming.util.WriteAheadLogUtils
-import org.apache.spark.{Logging, SerializableWritable, SparkEnv, SparkException}
+import org.apache.spark.{Logging, SparkEnv, SparkException}
import org.apache.spark.rpc._
import org.apache.spark.streaming.{StreamingContext, Time}
-import org.apache.spark.streaming.receiver.{CleanupOldBlocks, Receiver, ReceiverSupervisorImpl, StopReceiver}
+import org.apache.spark.streaming.receiver.{CleanupOldBlocks, Receiver, ReceiverSupervisorImpl,
+ StopReceiver}
+import org.apache.spark.util.SerializableConfiguration
/**
* Messages used by the NetworkReceiver and the ReceiverTracker to communicate
@@ -294,7 +296,8 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
}
val checkpointDirOption = Option(ssc.checkpointDir)
- val serializableHadoopConf = new SerializableWritable(ssc.sparkContext.hadoopConfiguration)
+ val serializableHadoopConf =
+ new SerializableConfiguration(ssc.sparkContext.hadoopConfiguration)
// Function to start the receiver on the worker node
val startReceiver = (iterator: Iterator[Receiver[_]]) => {