author     Sandy Ryza <sandy@cloudera.com>          2015-06-18 19:36:05 -0700
committer  Josh Rosen <joshrosen@databricks.com>    2015-06-18 19:36:05 -0700
commit     43f50decdd20fafc55913c56ffa30f56040090e4
tree       da3cf219841826b9d1bc6c5870994f6d1bfe7d32  /sql
parent     dc413138995b45a7a957acae007dc11622110310
[SPARK-8135] Don't load defaults when reconstituting Hadoop Configurations
Author: Sandy Ryza <sandy@cloudera.com>

Closes #6679 from sryza/sandy-spark-8135 and squashes the following commits:

c5554ff [Sandy Ryza] SPARK-8135. In SerializableWritable, don't load defaults when instantiating Configuration
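The replacement wrapper classes (SerializableConfiguration, SerializableJobConf) are not included in this sql-only diff, but the idea behind the fix can be sketched. The following is a hypothetical, minimal version (class name and details are assumed, not taken from the patch): on deserialization the wrapper rebuilds the Hadoop Configuration with loadDefaults = false, so core-default.xml/core-site.xml are not re-read on every executor and only the explicitly serialized entries are restored.

import java.io.{ObjectInputStream, ObjectOutputStream}
import org.apache.hadoop.conf.Configuration

// Hypothetical sketch only -- not the code added by this patch.
class SerializableConfigurationSketch(@transient var value: Configuration)
  extends Serializable {

  private def writeObject(out: ObjectOutputStream): Unit = {
    out.defaultWriteObject()
    value.write(out)                 // Configuration is a Writable: dump only its set entries
  }

  private def readObject(in: ObjectInputStream): Unit = {
    in.defaultReadObject()
    value = new Configuration(false) // `false`: skip loading default resources when rebuilding
    value.readFields(in)             // restore the serialized entries
  }
}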
Diffstat (limited to 'sql')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala      | 5
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala                  | 7
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/sources/DataSourceStrategy.scala          | 8
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/sources/SqlNewHadoopRDD.scala             | 4
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala                    | 3
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala                  | 6
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala                    | 9
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala  | 7
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala           | 3
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala                | 5
10 files changed, 30 insertions(+), 27 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala
index 65ecad9878..b30fc171c0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala
@@ -49,7 +49,8 @@ import org.apache.spark.sql.SQLConf
import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, InternalRow, _}
import org.apache.spark.sql.execution.{LeafNode, SparkPlan, UnaryNode}
import org.apache.spark.sql.types.StructType
-import org.apache.spark.{Logging, SerializableWritable, TaskContext}
+import org.apache.spark.{Logging, TaskContext}
+import org.apache.spark.util.SerializableConfiguration
/**
* :: DeveloperApi ::
@@ -329,7 +330,7 @@ private[sql] case class InsertIntoParquetTable(
job.setOutputKeyClass(keyType)
job.setOutputValueClass(classOf[InternalRow])
NewFileOutputFormat.setOutputPath(job, new Path(path))
- val wrappedConf = new SerializableWritable(job.getConfiguration)
+ val wrappedConf = new SerializableConfiguration(job.getConfiguration)
val formatter = new SimpleDateFormat("yyyyMMddHHmm")
val jobtrackerID = formatter.format(new Date())
val stageId = sqlContext.sparkContext.newRddId()
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
index 4c702c3b0d..c9de45e0dd 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
@@ -24,7 +24,6 @@ import scala.collection.JavaConversions._
import scala.util.Try
import com.google.common.base.Objects
-import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, Path}
import org.apache.hadoop.io.Writable
import org.apache.hadoop.mapreduce._
@@ -42,8 +41,8 @@ import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.sources._
import org.apache.spark.sql.types.{DataType, StructType}
-import org.apache.spark.util.Utils
-import org.apache.spark.{Logging, SerializableWritable, SparkException, Partition => SparkPartition}
+import org.apache.spark.util.{SerializableConfiguration, Utils}
+import org.apache.spark.{Logging, SparkException, Partition => SparkPartition}
private[sql] class DefaultSource extends HadoopFsRelationProvider {
override def createRelation(
@@ -258,7 +257,7 @@ private[sql] class ParquetRelation2(
requiredColumns: Array[String],
filters: Array[Filter],
inputFiles: Array[FileStatus],
- broadcastedConf: Broadcast[SerializableWritable[Configuration]]): RDD[Row] = {
+ broadcastedConf: Broadcast[SerializableConfiguration]): RDD[Row] = {
val useMetadataCache = sqlContext.getConf(SQLConf.PARQUET_CACHE_METADATA)
val parquetFilterPushDown = sqlContext.conf.parquetFilterPushDown
// Create the function to set variable Parquet confs at both driver and executor side.
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/DataSourceStrategy.scala
index 4cf67439b9..a8f56f4767 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/DataSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/DataSourceStrategy.scala
@@ -17,6 +17,7 @@
package org.apache.spark.sql.sources
+import org.apache.spark.{Logging, TaskContext}
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.rdd.{MapPartitionsRDD, RDD, UnionRDD}
import org.apache.spark.sql._
@@ -27,9 +28,8 @@ import org.apache.spark.sql.catalyst.plans.logical
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.types.{StringType, StructType}
import org.apache.spark.sql.{SaveMode, Strategy, execution, sources}
-import org.apache.spark.util.Utils
+import org.apache.spark.util.{SerializableConfiguration, Utils}
import org.apache.spark.unsafe.types.UTF8String
-import org.apache.spark.{Logging, SerializableWritable, TaskContext}
/**
* A Strategy for planning scans over data sources defined using the sources API.
@@ -91,7 +91,7 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
// broadcast HadoopConf.
val sharedHadoopConf = SparkHadoopUtil.get.conf
val confBroadcast =
- t.sqlContext.sparkContext.broadcast(new SerializableWritable(sharedHadoopConf))
+ t.sqlContext.sparkContext.broadcast(new SerializableConfiguration(sharedHadoopConf))
pruneFilterProject(
l,
projects,
@@ -126,7 +126,7 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
// Otherwise, the cost of broadcasting HadoopConf in every RDD will be high.
val sharedHadoopConf = SparkHadoopUtil.get.conf
val confBroadcast =
- relation.sqlContext.sparkContext.broadcast(new SerializableWritable(sharedHadoopConf))
+ relation.sqlContext.sparkContext.broadcast(new SerializableConfiguration(sharedHadoopConf))
// Builds RDD[Row]s for each selected partition.
val perPartitionRows = partitions.map { case Partition(partitionValues, dir) =>
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/SqlNewHadoopRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/SqlNewHadoopRDD.scala
index ebad0c1564..2bdc341021 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/SqlNewHadoopRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/SqlNewHadoopRDD.scala
@@ -34,7 +34,7 @@ import org.apache.spark.mapreduce.SparkHadoopMapReduceUtil
import org.apache.spark.rdd.{RDD, HadoopRDD}
import org.apache.spark.rdd.NewHadoopRDD.NewHadoopMapPartitionsWithSplitRDD
import org.apache.spark.storage.StorageLevel
-import org.apache.spark.util.Utils
+import org.apache.spark.util.{SerializableConfiguration, Utils}
import scala.reflect.ClassTag
@@ -65,7 +65,7 @@ private[spark] class SqlNewHadoopPartition(
*/
private[sql] class SqlNewHadoopRDD[K, V](
@transient sc : SparkContext,
- broadcastedConf: Broadcast[SerializableWritable[Configuration]],
+ broadcastedConf: Broadcast[SerializableConfiguration],
@transient initDriverSideJobFuncOpt: Option[Job => Unit],
initLocalJobFuncOpt: Option[Job => Unit],
inputFormatClass: Class[_ <: InputFormat[K, V]],
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala
index d39a20b388..c16bd9ae52 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala
@@ -37,6 +37,7 @@ import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}
import org.apache.spark.sql.execution.RunnableCommand
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.{DataFrame, Row, SQLConf, SQLContext, SaveMode}
+import org.apache.spark.util.SerializableConfiguration
private[sql] case class InsertIntoDataSource(
logicalRelation: LogicalRelation,
@@ -260,7 +261,7 @@ private[sql] abstract class BaseWriterContainer(
with Logging
with Serializable {
- protected val serializableConf = new SerializableWritable(ContextUtil.getConfiguration(job))
+ protected val serializableConf = new SerializableConfiguration(ContextUtil.getConfiguration(job))
// This is only used on driver side.
@transient private val jobContext: JobContext = job
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
index 43d3507d7d..7005c7079a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
@@ -27,12 +27,12 @@ import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext}
import org.apache.spark.annotation.{DeveloperApi, Experimental}
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
-import org.apache.spark.SerializableWritable
import org.apache.spark.sql.execution.RDDConversions
import org.apache.spark.sql.{DataFrame, Row, SaveMode, SQLContext}
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.codegen.GenerateMutableProjection
import org.apache.spark.sql.types.StructType
+import org.apache.spark.util.SerializableConfiguration
/**
* ::DeveloperApi::
@@ -518,7 +518,7 @@ abstract class HadoopFsRelation private[sql](maybePartitionSpec: Option[Partitio
requiredColumns: Array[String],
filters: Array[Filter],
inputPaths: Array[String],
- broadcastedConf: Broadcast[SerializableWritable[Configuration]]): RDD[Row] = {
+ broadcastedConf: Broadcast[SerializableConfiguration]): RDD[Row] = {
val inputStatuses = inputPaths.flatMap { input =>
val path = new Path(input)
@@ -648,7 +648,7 @@ abstract class HadoopFsRelation private[sql](maybePartitionSpec: Option[Partitio
requiredColumns: Array[String],
filters: Array[Filter],
inputFiles: Array[FileStatus],
- broadcastedConf: Broadcast[SerializableWritable[Configuration]]): RDD[Row] = {
+ broadcastedConf: Broadcast[SerializableConfiguration]): RDD[Row] = {
buildScan(requiredColumns, filters, inputFiles)
}
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
index 485810320f..439f39bafc 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
@@ -17,7 +17,6 @@
package org.apache.spark.sql.hive
-import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{Path, PathFilter}
import org.apache.hadoop.hive.conf.HiveConf
import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants._
@@ -30,12 +29,12 @@ import org.apache.hadoop.hive.serde2.objectinspector.{ObjectInspectorConverters,
import org.apache.hadoop.io.Writable
import org.apache.hadoop.mapred.{FileInputFormat, InputFormat, JobConf}
-import org.apache.spark.{Logging, SerializableWritable}
+import org.apache.spark.{Logging}
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.{EmptyRDD, HadoopRDD, RDD, UnionRDD}
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.util.DateUtils
-import org.apache.spark.util.Utils
+import org.apache.spark.util.{SerializableConfiguration, Utils}
/**
* A trait for subclasses that handle table scans.
@@ -72,7 +71,7 @@ class HadoopTableReader(
// TODO: set aws s3 credentials.
private val _broadcastedHiveConf =
- sc.sparkContext.broadcast(new SerializableWritable(hiveExtraConf))
+ sc.sparkContext.broadcast(new SerializableConfiguration(hiveExtraConf))
override def makeRDDForTable(hiveTable: HiveTable): RDD[InternalRow] =
makeRDDForTable(
@@ -276,7 +275,7 @@ class HadoopTableReader(
val rdd = new HadoopRDD(
sc.sparkContext,
- _broadcastedHiveConf.asInstanceOf[Broadcast[SerializableWritable[Configuration]]],
+ _broadcastedHiveConf.asInstanceOf[Broadcast[SerializableConfiguration]],
Some(initializeJobConfFunc),
inputFormatClass,
classOf[Writable],
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
index 1d306c5d10..404bb937aa 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
@@ -35,9 +35,10 @@ import org.apache.spark.sql.catalyst.expressions.{Attribute, InternalRow}
import org.apache.spark.sql.execution.{UnaryNode, SparkPlan}
import org.apache.spark.sql.hive.HiveShim.{ShimFileSinkDesc => FileSinkDesc}
import org.apache.spark.sql.hive._
-import org.apache.spark.{SerializableWritable, SparkException, TaskContext}
+import org.apache.spark.{SparkException, TaskContext}
import scala.collection.JavaConversions._
+import org.apache.spark.util.SerializableJobConf
private[hive]
case class InsertIntoHiveTable(
@@ -64,7 +65,7 @@ case class InsertIntoHiveTable(
rdd: RDD[InternalRow],
valueClass: Class[_],
fileSinkConf: FileSinkDesc,
- conf: SerializableWritable[JobConf],
+ conf: SerializableJobConf,
writerContainer: SparkHiveWriterContainer): Unit = {
assert(valueClass != null, "Output value class not set")
conf.value.setOutputValueClass(valueClass)
@@ -172,7 +173,7 @@ case class InsertIntoHiveTable(
}
val jobConf = new JobConf(sc.hiveconf)
- val jobConfSer = new SerializableWritable(jobConf)
+ val jobConfSer = new SerializableJobConf(jobConf)
val writerContainer = if (numDynamicPartitions > 0) {
val dynamicPartColNames = partitionColumnNames.takeRight(numDynamicPartitions)
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala
index ee440e304e..0bc69c00c2 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala
@@ -37,6 +37,7 @@ import org.apache.spark.{Logging, SerializableWritable, SparkHadoopWriter}
import org.apache.spark.sql.catalyst.util.DateUtils
import org.apache.spark.sql.hive.HiveShim.{ShimFileSinkDesc => FileSinkDesc}
import org.apache.spark.sql.types._
+import org.apache.spark.util.SerializableJobConf
/**
* Internal helper class that saves an RDD using a Hive OutputFormat.
@@ -57,7 +58,7 @@ private[hive] class SparkHiveWriterContainer(
PlanUtils.configureOutputJobPropertiesForStorageHandler(tableDesc)
Utilities.copyTableJobPropertiesToConf(tableDesc, jobConf)
}
- protected val conf = new SerializableWritable(jobConf)
+ protected val conf = new SerializableJobConf(jobConf)
private var jobID = 0
private var splitID = 0
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala
index f03c4cd54e..77f1ca9ae0 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala
@@ -39,7 +39,8 @@ import org.apache.spark.sql.hive.{HiveContext, HiveInspectors, HiveMetastoreType
import org.apache.spark.sql.sources.{Filter, _}
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.{Row, SQLContext}
-import org.apache.spark.{Logging, SerializableWritable}
+import org.apache.spark.{Logging}
+import org.apache.spark.util.SerializableConfiguration
/* Implicit conversions */
import scala.collection.JavaConversions._
@@ -283,7 +284,7 @@ private[orc] case class OrcTableScan(
classOf[Writable]
).asInstanceOf[HadoopRDD[NullWritable, Writable]]
- val wrappedConf = new SerializableWritable(conf)
+ val wrappedConf = new SerializableConfiguration(conf)
rdd.mapPartitionsWithInputSplit { case (split: OrcSplit, iterator) =>
val mutableRow = new SpecificMutableRow(attributes.map(_.dataType))