author    Sandy Ryza <sandy@cloudera.com>  2015-06-18 19:36:05 -0700
committer Josh Rosen <joshrosen@databricks.com>  2015-06-18 19:36:05 -0700
commit    43f50decdd20fafc55913c56ffa30f56040090e4
tree      da3cf219841826b9d1bc6c5870994f6d1bfe7d32 /sql/core
parent    dc413138995b45a7a957acae007dc11622110310
[SPARK-8135] Don't load defaults when reconstituting Hadoop Configurations
Author: Sandy Ryza <sandy@cloudera.com>

Closes #6679 from sryza/sandy-spark-8135 and squashes the following commits:

c5554ff [Sandy Ryza] SPARK-8135. In SerializableWritable, don't load defaults when instantiating Configuration
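
For context, the patch swaps SerializableWritable (whose deserialization rebuilds a Configuration that loads the Hadoop defaults) for the Spark-internal org.apache.spark.util.SerializableConfiguration wrapper. A minimal sketch of the idea, not the exact Spark implementation, is a Serializable wrapper whose readObject path constructs the Configuration with loadDefaults = false before reading the serialized entries back in; the class name below is illustrative:

    import java.io.{ObjectInputStream, ObjectOutputStream}

    import org.apache.hadoop.conf.Configuration

    // Illustrative sketch of a serializable Configuration wrapper; the real
    // class in this patch is org.apache.spark.util.SerializableConfiguration.
    class SerializableConfSketch(@transient var value: Configuration) extends Serializable {

      // Write only the entries explicitly set on the wrapped Configuration.
      private def writeObject(out: ObjectOutputStream): Unit = {
        out.defaultWriteObject()
        value.write(out)
      }

      // Rebuild with loadDefaults = false so deserializing on an executor does
      // not re-read *-default.xml / *-site.xml from its classpath (SPARK-8135).
      private def readObject(in: ObjectInputStream): Unit = {
        in.defaultReadObject()
        value = new Configuration(false)
        value.readFields(in)
      }
    }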
Diffstat (limited to 'sql/core')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala | 5
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala | 7
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/sources/DataSourceStrategy.scala | 8
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/sources/SqlNewHadoopRDD.scala | 4
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala | 3
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala | 6
6 files changed, 17 insertions(+), 16 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala
index 65ecad9878..b30fc171c0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala
@@ -49,7 +49,8 @@ import org.apache.spark.sql.SQLConf
import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, InternalRow, _}
import org.apache.spark.sql.execution.{LeafNode, SparkPlan, UnaryNode}
import org.apache.spark.sql.types.StructType
-import org.apache.spark.{Logging, SerializableWritable, TaskContext}
+import org.apache.spark.{Logging, TaskContext}
+import org.apache.spark.util.SerializableConfiguration
/**
* :: DeveloperApi ::
@@ -329,7 +330,7 @@ private[sql] case class InsertIntoParquetTable(
job.setOutputKeyClass(keyType)
job.setOutputValueClass(classOf[InternalRow])
NewFileOutputFormat.setOutputPath(job, new Path(path))
- val wrappedConf = new SerializableWritable(job.getConfiguration)
+ val wrappedConf = new SerializableConfiguration(job.getConfiguration)
val formatter = new SimpleDateFormat("yyyyMMddHHmm")
val jobtrackerID = formatter.format(new Date())
val stageId = sqlContext.sparkContext.newRddId()
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
index 4c702c3b0d..c9de45e0dd 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
@@ -24,7 +24,6 @@ import scala.collection.JavaConversions._
import scala.util.Try
import com.google.common.base.Objects
-import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, Path}
import org.apache.hadoop.io.Writable
import org.apache.hadoop.mapreduce._
@@ -42,8 +41,8 @@ import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.sources._
import org.apache.spark.sql.types.{DataType, StructType}
-import org.apache.spark.util.Utils
-import org.apache.spark.{Logging, SerializableWritable, SparkException, Partition => SparkPartition}
+import org.apache.spark.util.{SerializableConfiguration, Utils}
+import org.apache.spark.{Logging, SparkException, Partition => SparkPartition}
private[sql] class DefaultSource extends HadoopFsRelationProvider {
override def createRelation(
@@ -258,7 +257,7 @@ private[sql] class ParquetRelation2(
requiredColumns: Array[String],
filters: Array[Filter],
inputFiles: Array[FileStatus],
- broadcastedConf: Broadcast[SerializableWritable[Configuration]]): RDD[Row] = {
+ broadcastedConf: Broadcast[SerializableConfiguration]): RDD[Row] = {
val useMetadataCache = sqlContext.getConf(SQLConf.PARQUET_CACHE_METADATA)
val parquetFilterPushDown = sqlContext.conf.parquetFilterPushDown
// Create the function to set variable Parquet confs at both driver and executor side.
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/DataSourceStrategy.scala
index 4cf67439b9..a8f56f4767 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/DataSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/DataSourceStrategy.scala
@@ -17,6 +17,7 @@
package org.apache.spark.sql.sources
+import org.apache.spark.{Logging, TaskContext}
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.rdd.{MapPartitionsRDD, RDD, UnionRDD}
import org.apache.spark.sql._
@@ -27,9 +28,8 @@ import org.apache.spark.sql.catalyst.plans.logical
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.types.{StringType, StructType}
import org.apache.spark.sql.{SaveMode, Strategy, execution, sources}
-import org.apache.spark.util.Utils
+import org.apache.spark.util.{SerializableConfiguration, Utils}
import org.apache.spark.unsafe.types.UTF8String
-import org.apache.spark.{Logging, SerializableWritable, TaskContext}
/**
* A Strategy for planning scans over data sources defined using the sources API.
@@ -91,7 +91,7 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
// broadcast HadoopConf.
val sharedHadoopConf = SparkHadoopUtil.get.conf
val confBroadcast =
- t.sqlContext.sparkContext.broadcast(new SerializableWritable(sharedHadoopConf))
+ t.sqlContext.sparkContext.broadcast(new SerializableConfiguration(sharedHadoopConf))
pruneFilterProject(
l,
projects,
@@ -126,7 +126,7 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
// Otherwise, the cost of broadcasting HadoopConf in every RDD will be high.
val sharedHadoopConf = SparkHadoopUtil.get.conf
val confBroadcast =
- relation.sqlContext.sparkContext.broadcast(new SerializableWritable(sharedHadoopConf))
+ relation.sqlContext.sparkContext.broadcast(new SerializableConfiguration(sharedHadoopConf))
// Builds RDD[Row]s for each selected partition.
val perPartitionRows = partitions.map { case Partition(partitionValues, dir) =>
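
The DataSourceStrategy hunks above broadcast the wrapped configuration once from the driver; executor-side code then unwraps it from the broadcast. A hedged usage sketch of that round trip (the helper names are illustrative, and SerializableConfiguration itself is Spark-internal):

    import org.apache.hadoop.conf.Configuration
    import org.apache.spark.SparkContext
    import org.apache.spark.broadcast.Broadcast
    import org.apache.spark.util.SerializableConfiguration

    // Driver side: wrap the shared Hadoop conf and broadcast it once per plan.
    def broadcastHadoopConf(sc: SparkContext, conf: Configuration): Broadcast[SerializableConfiguration] =
      sc.broadcast(new SerializableConfiguration(conf))

    // Executor side: two .value hops -- the broadcast, then the wrapper --
    // yield a Configuration deserialized without loading the Hadoop defaults.
    def hadoopConfOf(broadcasted: Broadcast[SerializableConfiguration]): Configuration =
      broadcasted.value.value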
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/SqlNewHadoopRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/SqlNewHadoopRDD.scala
index ebad0c1564..2bdc341021 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/SqlNewHadoopRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/SqlNewHadoopRDD.scala
@@ -34,7 +34,7 @@ import org.apache.spark.mapreduce.SparkHadoopMapReduceUtil
import org.apache.spark.rdd.{RDD, HadoopRDD}
import org.apache.spark.rdd.NewHadoopRDD.NewHadoopMapPartitionsWithSplitRDD
import org.apache.spark.storage.StorageLevel
-import org.apache.spark.util.Utils
+import org.apache.spark.util.{SerializableConfiguration, Utils}
import scala.reflect.ClassTag
@@ -65,7 +65,7 @@ private[spark] class SqlNewHadoopPartition(
*/
private[sql] class SqlNewHadoopRDD[K, V](
@transient sc : SparkContext,
- broadcastedConf: Broadcast[SerializableWritable[Configuration]],
+ broadcastedConf: Broadcast[SerializableConfiguration],
@transient initDriverSideJobFuncOpt: Option[Job => Unit],
initLocalJobFuncOpt: Option[Job => Unit],
inputFormatClass: Class[_ <: InputFormat[K, V]],
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala
index d39a20b388..c16bd9ae52 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala
@@ -37,6 +37,7 @@ import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}
import org.apache.spark.sql.execution.RunnableCommand
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.{DataFrame, Row, SQLConf, SQLContext, SaveMode}
+import org.apache.spark.util.SerializableConfiguration
private[sql] case class InsertIntoDataSource(
logicalRelation: LogicalRelation,
@@ -260,7 +261,7 @@ private[sql] abstract class BaseWriterContainer(
with Logging
with Serializable {
- protected val serializableConf = new SerializableWritable(ContextUtil.getConfiguration(job))
+ protected val serializableConf = new SerializableConfiguration(ContextUtil.getConfiguration(job))
// This is only used on driver side.
@transient private val jobContext: JobContext = job
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
index 43d3507d7d..7005c7079a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
@@ -27,12 +27,12 @@ import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext}
import org.apache.spark.annotation.{DeveloperApi, Experimental}
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
-import org.apache.spark.SerializableWritable
import org.apache.spark.sql.execution.RDDConversions
import org.apache.spark.sql.{DataFrame, Row, SaveMode, SQLContext}
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.codegen.GenerateMutableProjection
import org.apache.spark.sql.types.StructType
+import org.apache.spark.util.SerializableConfiguration
/**
* ::DeveloperApi::
@@ -518,7 +518,7 @@ abstract class HadoopFsRelation private[sql](maybePartitionSpec: Option[Partitio
requiredColumns: Array[String],
filters: Array[Filter],
inputPaths: Array[String],
- broadcastedConf: Broadcast[SerializableWritable[Configuration]]): RDD[Row] = {
+ broadcastedConf: Broadcast[SerializableConfiguration]): RDD[Row] = {
val inputStatuses = inputPaths.flatMap { input =>
val path = new Path(input)
@@ -648,7 +648,7 @@ abstract class HadoopFsRelation private[sql](maybePartitionSpec: Option[Partitio
requiredColumns: Array[String],
filters: Array[Filter],
inputFiles: Array[FileStatus],
- broadcastedConf: Broadcast[SerializableWritable[Configuration]]): RDD[Row] = {
+ broadcastedConf: Broadcast[SerializableConfiguration]): RDD[Row] = {
buildScan(requiredColumns, filters, inputFiles)
}