author     Cheng Lian <lian@databricks.com>   2015-05-15 16:20:49 +0800
committer  Cheng Lian <lian@databricks.com>   2015-05-15 16:20:49 +0800
commit     fdf5bba35d201fe0de3901b4d47262c485c76569 (patch)
tree       71bbf6e41c872d3ac16def2faf88383b57c6a7c1 /sql
parent     94761485b207fa1f12a8410a68920300d851bf61 (diff)
[SPARK-7591] [SQL] Partitioning support API tweaks
Please see [SPARK-7591] [1] for the details. /cc rxin marmbrus yhuai

[1]: https://issues.apache.org/jira/browse/SPARK-7591

Author: Cheng Lian <lian@databricks.com>

Closes #6150 from liancheng/spark-7591 and squashes the following commits:

af422e7 [Cheng Lian] Addresses @rxin's comments
37d1738 [Cheng Lian] Fixes HadoopFsRelation partition columns initialization
2fc680a [Cheng Lian] Fixes Scala style issue
189ad23 [Cheng Lian] Removes HadoopFsRelation constructor arguments
522c24e [Cheng Lian] Adds OutputWriterFactory
047d40d [Cheng Lian] Renames FSBased* to HadoopFs*, also renamed FSBasedParquetRelation back to ParquetRelation2
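The heart of this change on the write path: an OutputWriter no longer needs a zero-argument constructor plus an init() call. Instead, the relation returns a serializable OutputWriterFactory from prepareJobForWrite(), and each executor task calls newInstance() for every output file it opens. A minimal sketch of that contract follows; CsvOutputWriter and CsvOutputWriterFactory are hypothetical names used only for illustration, while the interfaces are the ones added or renamed in this patch.

import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce.TaskAttemptContext
import org.apache.spark.sql.Row
import org.apache.spark.sql.sources.{OutputWriter, OutputWriterFactory}
import org.apache.spark.sql.types.StructType

// Hypothetical writer: persists each row as one comma-separated line, in the same
// spirit as SimpleTextOutputWriter further down in this patch. Writers now receive
// `path` and `context` through their constructor instead of a separate init() call.
class CsvOutputWriter(path: String, context: TaskAttemptContext) extends OutputWriter {
  private val stream = {
    val file = new Path(path)
    // Open the output file eagerly; a production writer would typically wrap a RecordWriter.
    file.getFileSystem(context.getConfiguration).create(file, false)
  }

  override def write(row: Row): Unit =
    stream.write((row.toSeq.map(_.toString).mkString(",") + "\n").getBytes("UTF-8"))

  override def close(): Unit = stream.close()
}

// The factory is built once per write job on the driver (inside
// HadoopFsRelation.prepareJobForWrite) and serialized to executors, which call
// newInstance() for every output file they open.
class CsvOutputWriterFactory extends OutputWriterFactory {
  override def newInstance(
      path: String,
      dataSchema: StructType,
      context: TaskAttemptContext): OutputWriter =
    new CsvOutputWriter(path, context)
}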
Diffstat (limited to 'sql')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala | 14
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala (renamed from sql/core/src/main/scala/org/apache/spark/sql/parquet/fsBasedParquet.scala) | 71
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/sources/DataSourceStrategy.scala | 10
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/sources/PartitioningUtils.scala | 4
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala | 23
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala | 8
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala | 140
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/sources/rules.scala | 2
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetFilterSuite.scala | 2
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetSchemaSuite.scala | 12
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala | 12
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala | 2
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala | 6
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala | 8
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala | 20
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala | 47
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala (renamed from sql/hive/src/test/scala/org/apache/spark/sql/sources/fsBasedRelationSuites.scala) | 8
17 files changed, 195 insertions, 194 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index b33a700208..9fb355eb81 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -44,7 +44,7 @@ import org.apache.spark.sql.catalyst.ParserDialect
import org.apache.spark.sql.execution.{Filter, _}
import org.apache.spark.sql.jdbc.{JDBCPartition, JDBCPartitioningInfo, JDBCRelation}
import org.apache.spark.sql.json._
-import org.apache.spark.sql.parquet.FSBasedParquetRelation
+import org.apache.spark.sql.parquet.ParquetRelation2
import org.apache.spark.sql.sources._
import org.apache.spark.sql.types._
import org.apache.spark.util.Utils
@@ -610,7 +610,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
} else if (conf.parquetUseDataSourceApi) {
val globbedPaths = paths.map(new Path(_)).flatMap(SparkHadoopUtil.get.globPath).toArray
baseRelationToDataFrame(
- new FSBasedParquetRelation(
+ new ParquetRelation2(
globbedPaths.map(_.toString), None, None, Map.empty[String, String])(this))
} else {
DataFrame(this, parquet.ParquetRelation(
@@ -989,7 +989,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
def jdbc(url: String, table: String): DataFrame = {
jdbc(url, table, JDBCRelation.columnPartition(null), new Properties())
}
-
+
/**
* :: Experimental ::
* Construct a [[DataFrame]] representing the database table accessible via JDBC URL
@@ -1002,7 +1002,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
def jdbc(url: String, table: String, properties: Properties): DataFrame = {
jdbc(url, table, JDBCRelation.columnPartition(null), properties)
}
-
+
/**
* :: Experimental ::
* Construct a [[DataFrame]] representing the database table accessible via JDBC URL
@@ -1020,7 +1020,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
@Experimental
def jdbc(
url: String,
- table: String,
+ table: String,
columnName: String,
lowerBound: Long,
upperBound: Long,
@@ -1056,7 +1056,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
val parts = JDBCRelation.columnPartition(partitioning)
jdbc(url, table, parts, properties)
}
-
+
/**
* :: Experimental ::
* Construct a [[DataFrame]] representing the database table accessible via JDBC URL
@@ -1093,7 +1093,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
}
jdbc(url, table, parts, properties)
}
-
+
private def jdbc(
url: String,
table: String,
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/fsBasedParquet.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
index c83a9c35db..946062f6ea 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/fsBasedParquet.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
@@ -41,27 +41,23 @@ import org.apache.spark.sql.types.{DataType, StructType}
import org.apache.spark.sql.{Row, SQLConf, SQLContext}
import org.apache.spark.{Logging, Partition => SparkPartition, SparkException}
-private[sql] class DefaultSource extends FSBasedRelationProvider {
+private[sql] class DefaultSource extends HadoopFsRelationProvider {
override def createRelation(
sqlContext: SQLContext,
paths: Array[String],
schema: Option[StructType],
partitionColumns: Option[StructType],
- parameters: Map[String, String]): FSBasedRelation = {
+ parameters: Map[String, String]): HadoopFsRelation = {
val partitionSpec = partitionColumns.map(PartitionSpec(_, Seq.empty))
- new FSBasedParquetRelation(paths, schema, partitionSpec, parameters)(sqlContext)
+ new ParquetRelation2(paths, schema, partitionSpec, parameters)(sqlContext)
}
}
// NOTE: This class is instantiated and used on executor side only, no need to be serializable.
-private[sql] class ParquetOutputWriter extends OutputWriter {
- private var recordWriter: RecordWriter[Void, Row] = _
- private var taskAttemptContext: TaskAttemptContext = _
-
- override def init(
- path: String,
- dataSchema: StructType,
- context: TaskAttemptContext): Unit = {
+private[sql] class ParquetOutputWriter(path: String, context: TaskAttemptContext)
+ extends OutputWriter {
+
+ private val recordWriter: RecordWriter[Void, Row] = {
val conf = context.getConfiguration
val outputFormat = {
// When appending new Parquet files to an existing Parquet file directory, to avoid
@@ -77,7 +73,7 @@ private[sql] class ParquetOutputWriter extends OutputWriter {
if (fs.exists(outputPath)) {
// Pattern used to match task ID in part file names, e.g.:
//
- // part-r-00001.gz.part
+ // part-r-00001.gz.parquet
// ^~~~~
val partFilePattern = """part-.-(\d{1,}).*""".r
@@ -86,9 +82,8 @@ private[sql] class ParquetOutputWriter extends OutputWriter {
case name if name.startsWith("_") => 0
case name if name.startsWith(".") => 0
case name => sys.error(
- s"""Trying to write Parquet files to directory $outputPath,
- |but found items with illegal name "$name"
- """.stripMargin.replace('\n', ' ').trim)
+ s"Trying to write Parquet files to directory $outputPath, " +
+ s"but found items with illegal name '$name'.")
}.reduceOption(_ max _).getOrElse(0)
} else {
0
@@ -111,37 +106,39 @@ private[sql] class ParquetOutputWriter extends OutputWriter {
}
}
- recordWriter = outputFormat.getRecordWriter(context)
- taskAttemptContext = context
+ outputFormat.getRecordWriter(context)
}
override def write(row: Row): Unit = recordWriter.write(null, row)
- override def close(): Unit = recordWriter.close(taskAttemptContext)
+ override def close(): Unit = recordWriter.close(context)
}
-private[sql] class FSBasedParquetRelation(
- paths: Array[String],
+private[sql] class ParquetRelation2(
+ override val paths: Array[String],
private val maybeDataSchema: Option[StructType],
private val maybePartitionSpec: Option[PartitionSpec],
parameters: Map[String, String])(
val sqlContext: SQLContext)
- extends FSBasedRelation(paths, maybePartitionSpec)
+ extends HadoopFsRelation(maybePartitionSpec)
with Logging {
// Should we merge schemas from all Parquet part-files?
private val shouldMergeSchemas =
- parameters.getOrElse(FSBasedParquetRelation.MERGE_SCHEMA, "true").toBoolean
+ parameters.getOrElse(ParquetRelation2.MERGE_SCHEMA, "true").toBoolean
private val maybeMetastoreSchema = parameters
- .get(FSBasedParquetRelation.METASTORE_SCHEMA)
+ .get(ParquetRelation2.METASTORE_SCHEMA)
.map(DataType.fromJson(_).asInstanceOf[StructType])
- private val metadataCache = new MetadataCache
- metadataCache.refresh()
+ private lazy val metadataCache: MetadataCache = {
+ val meta = new MetadataCache
+ meta.refresh()
+ meta
+ }
override def equals(other: scala.Any): Boolean = other match {
- case that: FSBasedParquetRelation =>
+ case that: ParquetRelation2 =>
val schemaEquality = if (shouldMergeSchemas) {
this.shouldMergeSchemas == that.shouldMergeSchemas
} else {
@@ -175,8 +172,6 @@ private[sql] class FSBasedParquetRelation(
}
}
- override def outputWriterClass: Class[_ <: OutputWriter] = classOf[ParquetOutputWriter]
-
override def dataSchema: StructType = metadataCache.dataSchema
override private[sql] def refresh(): Unit = {
@@ -187,9 +182,12 @@ private[sql] class FSBasedParquetRelation(
// Parquet data source always uses Catalyst internal representations.
override val needConversion: Boolean = false
- override val sizeInBytes = metadataCache.dataStatuses.map(_.getLen).sum
+ override def sizeInBytes: Long = metadataCache.dataStatuses.map(_.getLen).sum
+
+ override def userDefinedPartitionColumns: Option[StructType] =
+ maybePartitionSpec.map(_.partitionColumns)
- override def prepareForWrite(job: Job): Unit = {
+ override def prepareJobForWrite(job: Job): OutputWriterFactory = {
val conf = ContextUtil.getConfiguration(job)
val committerClass =
@@ -224,6 +222,13 @@ private[sql] class FSBasedParquetRelation(
.getOrElse(
sqlContext.conf.parquetCompressionCodec.toUpperCase,
CompressionCodecName.UNCOMPRESSED).name())
+
+ new OutputWriterFactory {
+ override def newInstance(
+ path: String, dataSchema: StructType, context: TaskAttemptContext): OutputWriter = {
+ new ParquetOutputWriter(path, context)
+ }
+ }
}
override def buildScan(
@@ -385,7 +390,7 @@ private[sql] class FSBasedParquetRelation(
// case insensitivity issue and possible schema mismatch (probably caused by schema
// evolution).
maybeMetastoreSchema
- .map(FSBasedParquetRelation.mergeMetastoreParquetSchema(_, dataSchema0))
+ .map(ParquetRelation2.mergeMetastoreParquetSchema(_, dataSchema0))
.getOrElse(dataSchema0)
}
}
@@ -439,12 +444,12 @@ private[sql] class FSBasedParquetRelation(
"No schema defined, " +
s"and no Parquet data file or summary file found under ${paths.mkString(", ")}.")
- FSBasedParquetRelation.readSchema(filesToTouch.map(footers.apply), sqlContext)
+ ParquetRelation2.readSchema(filesToTouch.map(footers.apply), sqlContext)
}
}
}
-private[sql] object FSBasedParquetRelation extends Logging {
+private[sql] object ParquetRelation2 extends Logging {
// Whether we should merge schemas collected from all Parquet part-files.
private[sql] val MERGE_SCHEMA = "mergeSchema"
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/DataSourceStrategy.scala
index ee099ab959..e6324b20b3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/DataSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/DataSourceStrategy.scala
@@ -59,7 +59,7 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
(a, _) => t.buildScan(a)) :: Nil
// Scanning partitioned FSBasedRelation
- case PhysicalOperation(projectList, filters, l @ LogicalRelation(t: FSBasedRelation))
+ case PhysicalOperation(projectList, filters, l @ LogicalRelation(t: HadoopFsRelation))
if t.partitionSpec.partitionColumns.nonEmpty =>
val selectedPartitions = prunePartitions(filters, t.partitionSpec).toArray
@@ -87,7 +87,7 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
selectedPartitions) :: Nil
// Scanning non-partitioned FSBasedRelation
- case PhysicalOperation(projectList, filters, l @ LogicalRelation(t: FSBasedRelation)) =>
+ case PhysicalOperation(projectList, filters, l @ LogicalRelation(t: HadoopFsRelation)) =>
val inputPaths = t.paths.map(new Path(_)).flatMap { path =>
val fs = path.getFileSystem(t.sqlContext.sparkContext.hadoopConfiguration)
val qualifiedPath = path.makeQualified(fs.getUri, fs.getWorkingDirectory)
@@ -111,10 +111,10 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
execution.ExecutedCommand(InsertIntoDataSource(l, query, overwrite)) :: Nil
case i @ logical.InsertIntoTable(
- l @ LogicalRelation(t: FSBasedRelation), part, query, overwrite, false) if part.isEmpty =>
+ l @ LogicalRelation(t: HadoopFsRelation), part, query, overwrite, false) if part.isEmpty =>
val mode = if (overwrite) SaveMode.Overwrite else SaveMode.Append
execution.ExecutedCommand(
- InsertIntoFSBasedRelation(t, query, Array.empty[String], mode)) :: Nil
+ InsertIntoHadoopFsRelation(t, query, Array.empty[String], mode)) :: Nil
case _ => Nil
}
@@ -126,7 +126,7 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
partitionColumns: StructType,
partitions: Array[Partition]) = {
val output = projections.map(_.toAttribute)
- val relation = logicalRelation.relation.asInstanceOf[FSBasedRelation]
+ val relation = logicalRelation.relation.asInstanceOf[HadoopFsRelation]
// Builds RDD[Row]s for each selected partition.
val perPartitionRows = partitions.map { case Partition(partitionValues, dir) =>
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/PartitioningUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/PartitioningUtils.scala
index d30f7f65e2..d1f0cdab55 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/PartitioningUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/PartitioningUtils.scala
@@ -35,6 +35,10 @@ private[sql] case class Partition(values: Row, path: String)
private[sql] case class PartitionSpec(partitionColumns: StructType, partitions: Seq[Partition])
private[sql] object PartitioningUtils {
+ // This duplicates default value of Hive `ConfVars.DEFAULTPARTITIONNAME`, since sql/core doesn't
+ // depend on Hive.
+ private[sql] val DEFAULT_PARTITION_NAME = "__HIVE_DEFAULT_PARTITION__"
+
private[sql] case class PartitionValues(columnNames: Seq[String], literals: Seq[Literal]) {
require(columnNames.size == literals.size)
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala
index 7879328bba..a09bb08de7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala
@@ -58,8 +58,8 @@ private[sql] case class InsertIntoDataSource(
}
}
-private[sql] case class InsertIntoFSBasedRelation(
- @transient relation: FSBasedRelation,
+private[sql] case class InsertIntoHadoopFsRelation(
+ @transient relation: HadoopFsRelation,
@transient query: LogicalPlan,
partitionColumns: Array[String],
mode: SaveMode)
@@ -102,7 +102,7 @@ private[sql] case class InsertIntoFSBasedRelation(
insert(new DefaultWriterContainer(relation, job), df)
} else {
val writerContainer = new DynamicPartitionWriterContainer(
- relation, job, partitionColumns, "__HIVE_DEFAULT_PARTITION__")
+ relation, job, partitionColumns, PartitioningUtils.DEFAULT_PARTITION_NAME)
insertWithDynamicPartitions(sqlContext, writerContainer, df, partitionColumns)
}
}
@@ -234,7 +234,7 @@ private[sql] case class InsertIntoFSBasedRelation(
}
private[sql] abstract class BaseWriterContainer(
- @transient val relation: FSBasedRelation,
+ @transient val relation: HadoopFsRelation,
@transient job: Job)
extends SparkHadoopMapReduceUtil
with Logging
@@ -261,7 +261,7 @@ private[sql] abstract class BaseWriterContainer(
protected val dataSchema = relation.dataSchema
- protected val outputWriterClass: Class[_ <: OutputWriter] = relation.outputWriterClass
+ protected var outputWriterFactory: OutputWriterFactory = _
private var outputFormatClass: Class[_ <: OutputFormat[_, _]] = _
@@ -269,7 +269,7 @@ private[sql] abstract class BaseWriterContainer(
setupIDs(0, 0, 0)
setupConf()
taskAttemptContext = newTaskAttemptContext(serializableConf.value, taskAttemptId)
- relation.prepareForWrite(job)
+ outputWriterFactory = relation.prepareJobForWrite(job)
outputFormatClass = job.getOutputFormatClass
outputCommitter = newOutputCommitter(taskAttemptContext)
outputCommitter.setupJob(jobContext)
@@ -346,16 +346,15 @@ private[sql] abstract class BaseWriterContainer(
}
private[sql] class DefaultWriterContainer(
- @transient relation: FSBasedRelation,
+ @transient relation: HadoopFsRelation,
@transient job: Job)
extends BaseWriterContainer(relation, job) {
@transient private var writer: OutputWriter = _
override protected def initWriters(): Unit = {
- writer = outputWriterClass.newInstance()
taskAttemptContext.getConfiguration.set("spark.sql.sources.output.path", outputPath)
- writer.init(getWorkPath, dataSchema, taskAttemptContext)
+ writer = outputWriterFactory.newInstance(getWorkPath, dataSchema, taskAttemptContext)
}
override def outputWriterForRow(row: Row): OutputWriter = writer
@@ -372,7 +371,7 @@ private[sql] class DefaultWriterContainer(
}
private[sql] class DynamicPartitionWriterContainer(
- @transient relation: FSBasedRelation,
+ @transient relation: HadoopFsRelation,
@transient job: Job,
partitionColumns: Array[String],
defaultPartitionName: String)
@@ -398,12 +397,10 @@ private[sql] class DynamicPartitionWriterContainer(
outputWriters.getOrElseUpdate(partitionPath, {
val path = new Path(getWorkPath, partitionPath)
- val writer = outputWriterClass.newInstance()
taskAttemptContext.getConfiguration.set(
"spark.sql.sources.output.path",
new Path(outputPath, partitionPath).toString)
- writer.init(path.toString, dataSchema, taskAttemptContext)
- writer
+ outputWriterFactory.newInstance(path.toString, dataSchema, taskAttemptContext)
})
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala
index 595c5eb40e..37a569db31 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala
@@ -226,7 +226,7 @@ private[sql] object ResolvedDataSource {
case Some(schema: StructType) => clazz.newInstance() match {
case dataSource: SchemaRelationProvider =>
dataSource.createRelation(sqlContext, new CaseInsensitiveMap(options), schema)
- case dataSource: FSBasedRelationProvider =>
+ case dataSource: HadoopFsRelationProvider =>
val maybePartitionsSchema = if (partitionColumns.isEmpty) {
None
} else {
@@ -256,7 +256,7 @@ private[sql] object ResolvedDataSource {
case None => clazz.newInstance() match {
case dataSource: RelationProvider =>
dataSource.createRelation(sqlContext, new CaseInsensitiveMap(options))
- case dataSource: FSBasedRelationProvider =>
+ case dataSource: HadoopFsRelationProvider =>
val caseInsensitiveOptions = new CaseInsensitiveMap(options)
val paths = {
val patternPath = new Path(caseInsensitiveOptions("path"))
@@ -296,7 +296,7 @@ private[sql] object ResolvedDataSource {
val relation = clazz.newInstance() match {
case dataSource: CreatableRelationProvider =>
dataSource.createRelation(sqlContext, mode, options, data)
- case dataSource: FSBasedRelationProvider =>
+ case dataSource: HadoopFsRelationProvider =>
// Don't glob path for the write path. The contracts here are:
// 1. Only one output path can be specified on the write path;
// 2. Output path must be a legal HDFS style file system path;
@@ -315,7 +315,7 @@ private[sql] object ResolvedDataSource {
Some(partitionColumnsSchema(data.schema, partitionColumns)),
caseInsensitiveOptions)
sqlContext.executePlan(
- InsertIntoFSBasedRelation(
+ InsertIntoHadoopFsRelation(
r,
data.logicalPlan,
partitionColumns.toArray,
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
index 6f315305c1..274ab44852 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
@@ -26,7 +26,7 @@ import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext}
import org.apache.spark.annotation.{DeveloperApi, Experimental}
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.{Row, _}
+import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.codegen.GenerateMutableProjection
import org.apache.spark.sql.types.{StructField, StructType}
@@ -94,7 +94,7 @@ trait SchemaRelationProvider {
* ::DeveloperApi::
* Implemented by objects that produce relations for a specific kind of data source
* with a given schema and partitioned columns. When Spark SQL is given a DDL operation with a
- * USING clause specified (to specify the implemented [[FSBasedRelationProvider]]), a user defined
+ * USING clause specified (to specify the implemented [[HadoopFsRelationProvider]]), a user defined
* schema, and an optional list of partition columns, this interface is used to pass in the
* parameters specified by a user.
*
@@ -105,15 +105,15 @@ trait SchemaRelationProvider {
*
* A new instance of this class with be instantiated each time a DDL call is made.
*
- * The difference between a [[RelationProvider]] and a [[FSBasedRelationProvider]] is
+ * The difference between a [[RelationProvider]] and a [[HadoopFsRelationProvider]] is
* that users need to provide a schema and a (possibly empty) list of partition columns when
* using a SchemaRelationProvider. A relation provider can inherits both [[RelationProvider]],
- * and [[FSBasedRelationProvider]] if it can support schema inference, user-specified
+ * and [[HadoopFsRelationProvider]] if it can support schema inference, user-specified
* schemas, and accessing partitioned relations.
*
* @since 1.4.0
*/
-trait FSBasedRelationProvider {
+trait HadoopFsRelationProvider {
/**
* Returns a new base relation with the given parameters, a user defined schema, and a list of
* partition columns. Note: the parameters' keywords are case insensitive and this insensitivity
@@ -124,7 +124,7 @@ trait FSBasedRelationProvider {
paths: Array[String],
schema: Option[StructType],
partitionColumns: Option[StructType],
- parameters: Map[String, String]): FSBasedRelation
+ parameters: Map[String, String]): HadoopFsRelation
}
/**
@@ -280,33 +280,42 @@ trait CatalystScan {
/**
* ::Experimental::
- * [[OutputWriter]] is used together with [[FSBasedRelation]] for persisting rows to the
- * underlying file system. Subclasses of [[OutputWriter]] must provide a zero-argument constructor.
- * An [[OutputWriter]] instance is created and initialized when a new output file is opened on
- * executor side. This instance is used to persist rows to this single output file.
+ * A factory that produces [[OutputWriter]]s. A new [[OutputWriterFactory]] is created on driver
+ * side for each write job issued when writing to a [[HadoopFsRelation]], and then gets serialized
+ * to executor side to create actual [[OutputWriter]]s on the fly.
*
* @since 1.4.0
*/
@Experimental
-abstract class OutputWriter {
+abstract class OutputWriterFactory extends Serializable {
/**
- * Initializes this [[OutputWriter]] before any rows are persisted.
+ * When writing to a [[HadoopFsRelation]], this method gets called by each task on executor side
+ * to instantiate new [[OutputWriter]]s.
*
* @param path Path of the file to which this [[OutputWriter]] is supposed to write. Note that
* this may not point to the final output file. For example, `FileOutputFormat` writes to
* temporary directories and then merge written files back to the final destination. In
* this case, `path` points to a temporary output file under the temporary directory.
* @param dataSchema Schema of the rows to be written. Partition columns are not included in the
- * schema if the corresponding relation is partitioned.
+ * schema if the relation being written is partitioned.
* @param context The Hadoop MapReduce task context.
*
* @since 1.4.0
*/
- def init(
- path: String,
- dataSchema: StructType,
- context: TaskAttemptContext): Unit = ()
+ def newInstance(path: String, dataSchema: StructType, context: TaskAttemptContext): OutputWriter
+}
+/**
+ * ::Experimental::
+ * [[OutputWriter]] is used together with [[HadoopFsRelation]] for persisting rows to the
+ * underlying file system. Subclasses of [[OutputWriter]] must provide a zero-argument constructor.
+ * An [[OutputWriter]] instance is created and initialized when a new output file is opened on
+ * executor side. This instance is used to persist rows to this single output file.
+ *
+ * @since 1.4.0
+ */
+@Experimental
+abstract class OutputWriter {
/**
* Persists a single row. Invoked on the executor side. When writing to dynamically partitioned
* tables, dynamic partition columns are not included in rows to be written.
@@ -333,74 +342,71 @@ abstract class OutputWriter {
* filter using selected predicates before producing an RDD containing all matching tuples as
* [[Row]] objects. In addition, when reading from Hive style partitioned tables stored in file
* systems, it's able to discover partitioning information from the paths of input directories, and
- * perform partition pruning before start reading the data. Subclasses of [[FSBasedRelation()]] must
- * override one of the three `buildScan` methods to implement the read path.
+ * perform partition pruning before start reading the data. Subclasses of [[HadoopFsRelation()]]
+ * must override one of the three `buildScan` methods to implement the read path.
*
* For the write path, it provides the ability to write to both non-partitioned and partitioned
* tables. Directory layout of the partitioned tables is compatible with Hive.
*
* @constructor This constructor is for internal uses only. The [[PartitionSpec]] argument is for
* implementing metastore table conversion.
- * @param paths Base paths of this relation. For partitioned relations, it should be the root
- * directories of all partition directories.
- * @param maybePartitionSpec An [[FSBasedRelation]] can be created with an optional
+ *
+ * @param maybePartitionSpec An [[HadoopFsRelation]] can be created with an optional
* [[PartitionSpec]], so that partition discovery can be skipped.
*
* @since 1.4.0
*/
@Experimental
-abstract class FSBasedRelation private[sql](
- val paths: Array[String],
- maybePartitionSpec: Option[PartitionSpec])
+abstract class HadoopFsRelation private[sql](maybePartitionSpec: Option[PartitionSpec])
extends BaseRelation {
+ def this() = this(None)
+
+ private val hadoopConf = new Configuration(sqlContext.sparkContext.hadoopConfiguration)
+
+ private val codegenEnabled = sqlContext.conf.codegenEnabled
+
+ private var _partitionSpec: PartitionSpec = _
+
+ final private[sql] def partitionSpec: PartitionSpec = {
+ if (_partitionSpec == null) {
+ _partitionSpec = maybePartitionSpec
+ .map(spec => spec.copy(partitionColumns = spec.partitionColumns.asNullable))
+ .orElse(userDefinedPartitionColumns.map(PartitionSpec(_, Array.empty[Partition])))
+ .getOrElse {
+ if (sqlContext.conf.partitionDiscoveryEnabled()) {
+ discoverPartitions()
+ } else {
+ PartitionSpec(StructType(Nil), Array.empty[Partition])
+ }
+ }
+ }
+ _partitionSpec
+ }
+
/**
- * Constructs an [[FSBasedRelation]].
- *
- * @param paths Base paths of this relation. For partitioned relations, it should be either root
- * directories of all partition directories.
- * @param partitionColumns Partition columns of this relation.
+ * Base paths of this relation. For partitioned relations, it should be either root directories
+ * of all partition directories.
*
* @since 1.4.0
*/
- def this(paths: Array[String], partitionColumns: StructType) =
- this(paths, {
- if (partitionColumns.isEmpty) None
- else Some(PartitionSpec(partitionColumns, Array.empty[Partition]))
- })
+ def paths: Array[String]
/**
- * Constructs an [[FSBasedRelation]].
- *
- * @param paths Base paths of this relation. For partitioned relations, it should be root
- * directories of all partition directories.
+ * Partition columns. Can be either defined by [[userDefinedPartitionColumns]] or automatically
+ * discovered. Note that they should always be nullable.
*
* @since 1.4.0
*/
- def this(paths: Array[String]) = this(paths, None)
-
- private val hadoopConf = new Configuration(sqlContext.sparkContext.hadoopConfiguration)
-
- private val codegenEnabled = sqlContext.conf.codegenEnabled
-
- private var _partitionSpec: PartitionSpec = maybePartitionSpec.map { spec =>
- spec.copy(partitionColumns = spec.partitionColumns.asNullable)
- }.getOrElse {
- if (sqlContext.conf.partitionDiscoveryEnabled()) {
- discoverPartitions()
- } else {
- PartitionSpec(StructType(Nil), Array.empty[Partition])
- }
- }
-
- private[sql] def partitionSpec: PartitionSpec = _partitionSpec
+ final def partitionColumns: StructType =
+ userDefinedPartitionColumns.getOrElse(partitionSpec.partitionColumns)
/**
- * Partition columns. Note that they are always nullable.
+ * Optional user defined partition columns.
*
* @since 1.4.0
*/
- def partitionColumns: StructType = partitionSpec.partitionColumns
+ def userDefinedPartitionColumns: Option[StructType] = None
private[sql] def refresh(): Unit = {
if (sqlContext.conf.partitionDiscoveryEnabled()) {
@@ -419,7 +425,7 @@ abstract class FSBasedRelation private[sql](
}.map(_.getPath)
if (leafDirs.nonEmpty) {
- PartitioningUtils.parsePartitions(leafDirs, "__HIVE_DEFAULT_PARTITION__")
+ PartitioningUtils.parsePartitions(leafDirs, PartitioningUtils.DEFAULT_PARTITION_NAME)
} else {
PartitionSpec(StructType(Array.empty[StructField]), Array.empty[Partition])
}
@@ -458,7 +464,7 @@ abstract class FSBasedRelation private[sql](
* @since 1.4.0
*/
def buildScan(inputPaths: Array[String]): RDD[Row] = {
- throw new RuntimeException(
+ throw new UnsupportedOperationException(
"At least one buildScan() method should be overridden to read the relation.")
}
@@ -520,8 +526,8 @@ abstract class FSBasedRelation private[sql](
}
/**
- * Client side preparation for data writing can be put here. For example, user defined output
- * committer can be configured here.
+ * Prepares a write job and returns an [[OutputWriterFactory]]. Client side job preparation can
+ * be put here. For example, user defined output committer can be configured here.
*
* Note that the only side effect expected here is mutating `job` via its setters. Especially,
* Spark SQL caches [[BaseRelation]] instances for performance, mutating relation internal states
@@ -529,13 +535,5 @@ abstract class FSBasedRelation private[sql](
*
* @since 1.4.0
*/
- def prepareForWrite(job: Job): Unit = ()
-
- /**
- * This method is responsible for producing a new [[OutputWriter]] for each newly opened output
- * file on the executor side.
- *
- * @since 1.4.0
- */
- def outputWriterClass: Class[_ <: OutputWriter]
+ def prepareJobForWrite(job: Job): OutputWriterFactory
}
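Putting the reworked interfaces in this file together, a minimal, hypothetical data source could look like the sketch below (DemoSource and DemoRelation are invented names; the overridden members follow the signatures shown in the interfaces.scala diff above). Note that base paths and user-defined partition columns are now supplied by overriding `paths` and `userDefinedPartitionColumns` rather than through constructor arguments, and the write path hands back an OutputWriterFactory; here it reuses the hypothetical CsvOutputWriter sketched near the top of this page.

import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.sources.{HadoopFsRelation, HadoopFsRelationProvider, OutputWriter, OutputWriterFactory}
import org.apache.spark.sql.types.{StringType, StructField, StructType}

// Hypothetical provider: createRelation now returns a HadoopFsRelation and receives
// the user-specified partition columns as an Option[StructType].
class DemoSource extends HadoopFsRelationProvider {
  override def createRelation(
      sqlContext: SQLContext,
      paths: Array[String],
      schema: Option[StructType],
      partitionColumns: Option[StructType],
      parameters: Map[String, String]): HadoopFsRelation =
    new DemoRelation(paths, schema, partitionColumns)(sqlContext)
}

// Hypothetical relation: HadoopFsRelation itself no longer takes constructor arguments;
// subclasses override `paths` and `userDefinedPartitionColumns` instead.
class DemoRelation(
    override val paths: Array[String],
    maybeDataSchema: Option[StructType],
    override val userDefinedPartitionColumns: Option[StructType])(
    @transient val sqlContext: SQLContext)
  extends HadoopFsRelation {

  override def dataSchema: StructType =
    maybeDataSchema.getOrElse(StructType(StructField("value", StringType) :: Nil))

  // Read path: at least one buildScan() overload must be overridden.
  override def buildScan(inputPaths: Array[String]): RDD[Row] =
    sqlContext.sparkContext.textFile(inputPaths.mkString(",")).map(Row(_))

  // Write path: return a factory instead of exposing an OutputWriter class.
  override def prepareJobForWrite(job: Job): OutputWriterFactory =
    new OutputWriterFactory {
      override def newInstance(
          path: String,
          dataSchema: StructType,
          context: TaskAttemptContext): OutputWriter =
        new CsvOutputWriter(path, context)  // hypothetical writer from the earlier sketch
    }
}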
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/rules.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/rules.scala
index aad1d248d0..1eacdde741 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/rules.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/rules.scala
@@ -102,7 +102,7 @@ private[sql] case class PreWriteCheck(catalog: Catalog) extends (LogicalPlan =>
}
case logical.InsertIntoTable(LogicalRelation(_: InsertableRelation), _, _, _, _) => // OK
- case logical.InsertIntoTable(LogicalRelation(_: FSBasedRelation), _, _, _, _) => // OK
+ case logical.InsertIntoTable(LogicalRelation(_: HadoopFsRelation), _, _, _, _) => // OK
case logical.InsertIntoTable(l: LogicalRelation, _, _, _, _) =>
// The relation in l is not an InsertableRelation.
failAnalysis(s"$l does not allow insertion.")
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetFilterSuite.scala
index 3bbc5b0586..5ad4395847 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetFilterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetFilterSuite.scala
@@ -63,7 +63,7 @@ class ParquetFilterSuiteBase extends QueryTest with ParquetTest {
}.flatten.reduceOption(_ && _)
val forParquetDataSource = query.queryExecution.optimizedPlan.collect {
- case PhysicalOperation(_, filters, LogicalRelation(_: FSBasedParquetRelation)) => filters
+ case PhysicalOperation(_, filters, LogicalRelation(_: ParquetRelation2)) => filters
}.flatten.reduceOption(_ && _)
forParquetTableScan.orElse(forParquetDataSource)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetSchemaSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetSchemaSuite.scala
index fc90e3edce..c964b6d984 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetSchemaSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetSchemaSuite.scala
@@ -204,7 +204,7 @@ class ParquetSchemaSuite extends FunSuite with ParquetTest {
StructField("lowerCase", StringType),
StructField("UPPERCase", DoubleType, nullable = false)))) {
- FSBasedParquetRelation.mergeMetastoreParquetSchema(
+ ParquetRelation2.mergeMetastoreParquetSchema(
StructType(Seq(
StructField("lowercase", StringType),
StructField("uppercase", DoubleType, nullable = false))),
@@ -219,7 +219,7 @@ class ParquetSchemaSuite extends FunSuite with ParquetTest {
StructType(Seq(
StructField("UPPERCase", DoubleType, nullable = false)))) {
- FSBasedParquetRelation.mergeMetastoreParquetSchema(
+ ParquetRelation2.mergeMetastoreParquetSchema(
StructType(Seq(
StructField("uppercase", DoubleType, nullable = false))),
@@ -230,7 +230,7 @@ class ParquetSchemaSuite extends FunSuite with ParquetTest {
// Metastore schema contains additional non-nullable fields.
assert(intercept[Throwable] {
- FSBasedParquetRelation.mergeMetastoreParquetSchema(
+ ParquetRelation2.mergeMetastoreParquetSchema(
StructType(Seq(
StructField("uppercase", DoubleType, nullable = false),
StructField("lowerCase", BinaryType, nullable = false))),
@@ -241,7 +241,7 @@ class ParquetSchemaSuite extends FunSuite with ParquetTest {
// Conflicting non-nullable field names
intercept[Throwable] {
- FSBasedParquetRelation.mergeMetastoreParquetSchema(
+ ParquetRelation2.mergeMetastoreParquetSchema(
StructType(Seq(StructField("lower", StringType, nullable = false))),
StructType(Seq(StructField("lowerCase", BinaryType))))
}
@@ -255,7 +255,7 @@ class ParquetSchemaSuite extends FunSuite with ParquetTest {
StructField("firstField", StringType, nullable = true),
StructField("secondField", StringType, nullable = true),
StructField("thirdfield", StringType, nullable = true)))) {
- FSBasedParquetRelation.mergeMetastoreParquetSchema(
+ ParquetRelation2.mergeMetastoreParquetSchema(
StructType(Seq(
StructField("firstfield", StringType, nullable = true),
StructField("secondfield", StringType, nullable = true),
@@ -268,7 +268,7 @@ class ParquetSchemaSuite extends FunSuite with ParquetTest {
// Merge should fail if the Metastore contains any additional fields that are not
// nullable.
assert(intercept[Throwable] {
- FSBasedParquetRelation.mergeMetastoreParquetSchema(
+ ParquetRelation2.mergeMetastoreParquetSchema(
StructType(Seq(
StructField("firstfield", StringType, nullable = true),
StructField("secondfield", StringType, nullable = true),
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index b0e82c8d03..2aa80b47a9 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -33,7 +33,7 @@ import org.apache.spark.sql.catalyst.plans.logical
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules._
import org.apache.spark.sql.hive.client._
-import org.apache.spark.sql.parquet.FSBasedParquetRelation
+import org.apache.spark.sql.parquet.ParquetRelation2
import org.apache.spark.sql.sources.{CreateTableUsingAsSelect, LogicalRelation, Partition => ParquetPartition, PartitionSpec, ResolvedDataSource}
import org.apache.spark.sql.types._
import org.apache.spark.sql.{AnalysisException, SQLContext, SaveMode, sources}
@@ -226,8 +226,8 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive
// serialize the Metastore schema to JSON and pass it as a data source option because of the
// evil case insensitivity issue, which is reconciled within `ParquetRelation2`.
val parquetOptions = Map(
- FSBasedParquetRelation.METASTORE_SCHEMA -> metastoreSchema.json,
- FSBasedParquetRelation.MERGE_SCHEMA -> mergeSchema.toString)
+ ParquetRelation2.METASTORE_SCHEMA -> metastoreSchema.json,
+ ParquetRelation2.MERGE_SCHEMA -> mergeSchema.toString)
val tableIdentifier =
QualifiedTableName(metastoreRelation.databaseName, metastoreRelation.tableName)
@@ -238,7 +238,7 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive
partitionSpecInMetastore: Option[PartitionSpec]): Option[LogicalRelation] = {
cachedDataSourceTables.getIfPresent(tableIdentifier) match {
case null => None // Cache miss
- case logical@LogicalRelation(parquetRelation: FSBasedParquetRelation) =>
+ case logical@LogicalRelation(parquetRelation: ParquetRelation2) =>
// If we have the same paths, same schema, and same partition spec,
// we will use the cached Parquet Relation.
val useCached =
@@ -281,7 +281,7 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive
val cached = getCached(tableIdentifier, paths, metastoreSchema, Some(partitionSpec))
val parquetRelation = cached.getOrElse {
val created = LogicalRelation(
- new FSBasedParquetRelation(
+ new ParquetRelation2(
paths.toArray, None, Some(partitionSpec), parquetOptions)(hive))
cachedDataSourceTables.put(tableIdentifier, created)
created
@@ -294,7 +294,7 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive
val cached = getCached(tableIdentifier, paths, metastoreSchema, None)
val parquetRelation = cached.getOrElse {
val created = LogicalRelation(
- new FSBasedParquetRelation(paths.toArray, None, None, parquetOptions)(hive))
+ new ParquetRelation2(paths.toArray, None, None, parquetOptions)(hive))
cachedDataSourceTables.put(tableIdentifier, created)
created
}
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
index 8e405e0804..6609763343 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
@@ -194,7 +194,7 @@ case class CreateMetastoreDataSourceAsSelect(
sqlContext, Some(query.schema.asNullable), partitionColumns, provider, optionsWithPath)
val createdRelation = LogicalRelation(resolved.relation)
EliminateSubQueries(sqlContext.table(tableName).logicalPlan) match {
- case l @ LogicalRelation(_: InsertableRelation | _: FSBasedRelation) =>
+ case l @ LogicalRelation(_: InsertableRelation | _: HadoopFsRelation) =>
if (l.relation != createdRelation.relation) {
val errorDescription =
s"Cannot append to table $tableName because the resolved relation does not " +
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
index da5d203d9d..1bf1c1be3e 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
@@ -29,7 +29,7 @@ import org.apache.spark.sql._
import org.apache.spark.sql.hive.client.{HiveTable, ManagedTable}
import org.apache.spark.sql.hive.test.TestHive._
import org.apache.spark.sql.hive.test.TestHive.implicits._
-import org.apache.spark.sql.parquet.FSBasedParquetRelation
+import org.apache.spark.sql.parquet.ParquetRelation2
import org.apache.spark.sql.sources.LogicalRelation
import org.apache.spark.sql.types._
import org.apache.spark.util.Utils
@@ -579,11 +579,11 @@ class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach {
)
table("test_parquet_ctas").queryExecution.optimizedPlan match {
- case LogicalRelation(p: FSBasedParquetRelation) => // OK
+ case LogicalRelation(p: ParquetRelation2) => // OK
case _ =>
fail(
"test_parquet_ctas should be converted to " +
- s"${classOf[FSBasedParquetRelation].getCanonicalName}")
+ s"${classOf[ParquetRelation2].getCanonicalName}")
}
// Clenup and reset confs.
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index 5c7152e214..dfe73c62c4 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -25,7 +25,7 @@ import org.apache.spark.sql.hive.test.TestHive
import org.apache.spark.sql.hive.test.TestHive._
import org.apache.spark.sql.hive.test.TestHive.implicits._
import org.apache.spark.sql.hive.{HiveQLDialect, HiveShim, MetastoreRelation}
-import org.apache.spark.sql.parquet.FSBasedParquetRelation
+import org.apache.spark.sql.parquet.ParquetRelation2
import org.apache.spark.sql.sources.LogicalRelation
import org.apache.spark.sql.types._
@@ -175,17 +175,17 @@ class SQLQuerySuite extends QueryTest {
def checkRelation(tableName: String, isDataSourceParquet: Boolean): Unit = {
val relation = EliminateSubQueries(catalog.lookupRelation(Seq(tableName)))
relation match {
- case LogicalRelation(r: FSBasedParquetRelation) =>
+ case LogicalRelation(r: ParquetRelation2) =>
if (!isDataSourceParquet) {
fail(
s"${classOf[MetastoreRelation].getCanonicalName} is expected, but found " +
- s"${FSBasedParquetRelation.getClass.getCanonicalName}.")
+ s"${ParquetRelation2.getClass.getCanonicalName}.")
}
case r: MetastoreRelation =>
if (isDataSourceParquet) {
fail(
- s"${FSBasedParquetRelation.getClass.getCanonicalName} is expected, but found " +
+ s"${ParquetRelation2.getClass.getCanonicalName} is expected, but found " +
s"${classOf[MetastoreRelation].getCanonicalName}.")
}
}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
index 41bcbe84b0..b6be09e2f8 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
@@ -26,8 +26,8 @@ import org.apache.spark.sql.execution.{ExecutedCommand, PhysicalRDD}
import org.apache.spark.sql.hive.execution.HiveTableScan
import org.apache.spark.sql.hive.test.TestHive._
import org.apache.spark.sql.hive.test.TestHive.implicits._
-import org.apache.spark.sql.parquet.{FSBasedParquetRelation, ParquetTableScan}
-import org.apache.spark.sql.sources.{InsertIntoDataSource, InsertIntoFSBasedRelation, LogicalRelation}
+import org.apache.spark.sql.parquet.{ParquetRelation2, ParquetTableScan}
+import org.apache.spark.sql.sources.{InsertIntoDataSource, InsertIntoHadoopFsRelation, LogicalRelation}
import org.apache.spark.sql.types._
import org.apache.spark.sql.{QueryTest, SQLConf, SaveMode}
import org.apache.spark.util.Utils
@@ -291,10 +291,10 @@ class ParquetDataSourceOnMetastoreSuite extends ParquetMetastoreSuiteBase {
)
table("test_parquet_ctas").queryExecution.optimizedPlan match {
- case LogicalRelation(_: FSBasedParquetRelation) => // OK
+ case LogicalRelation(_: ParquetRelation2) => // OK
case _ => fail(
"test_parquet_ctas should be converted to " +
- s"${classOf[FSBasedParquetRelation].getCanonicalName}")
+ s"${classOf[ParquetRelation2].getCanonicalName}")
}
sql("DROP TABLE IF EXISTS test_parquet_ctas")
@@ -315,9 +315,9 @@ class ParquetDataSourceOnMetastoreSuite extends ParquetMetastoreSuiteBase {
val df = sql("INSERT INTO TABLE test_insert_parquet SELECT a FROM jt")
df.queryExecution.executedPlan match {
- case ExecutedCommand(InsertIntoFSBasedRelation(_: FSBasedParquetRelation, _, _, _)) => // OK
+ case ExecutedCommand(InsertIntoHadoopFsRelation(_: ParquetRelation2, _, _, _)) => // OK
case o => fail("test_insert_parquet should be converted to a " +
- s"${classOf[FSBasedParquetRelation].getCanonicalName} and " +
+ s"${classOf[ParquetRelation2].getCanonicalName} and " +
s"${classOf[InsertIntoDataSource].getCanonicalName} is expcted as the SparkPlan. " +
s"However, found a ${o.toString} ")
}
@@ -345,9 +345,9 @@ class ParquetDataSourceOnMetastoreSuite extends ParquetMetastoreSuiteBase {
val df = sql("INSERT INTO TABLE test_insert_parquet SELECT a FROM jt_array")
df.queryExecution.executedPlan match {
- case ExecutedCommand(InsertIntoFSBasedRelation(r: FSBasedParquetRelation, _, _, _)) => // OK
+ case ExecutedCommand(InsertIntoHadoopFsRelation(r: ParquetRelation2, _, _, _)) => // OK
case o => fail("test_insert_parquet should be converted to a " +
- s"${classOf[FSBasedParquetRelation].getCanonicalName} and " +
+ s"${classOf[ParquetRelation2].getCanonicalName} and " +
s"${classOf[InsertIntoDataSource].getCanonicalName} is expcted as the SparkPlan." +
s"However, found a ${o.toString} ")
}
@@ -378,7 +378,7 @@ class ParquetDataSourceOnMetastoreSuite extends ParquetMetastoreSuiteBase {
assertResult(2) {
analyzed.collect {
- case r @ LogicalRelation(_: FSBasedParquetRelation) => r
+ case r @ LogicalRelation(_: ParquetRelation2) => r
}.size
}
@@ -390,7 +390,7 @@ class ParquetDataSourceOnMetastoreSuite extends ParquetMetastoreSuiteBase {
// Converted test_parquet should be cached.
catalog.cachedDataSourceTables.getIfPresent(tableIdentifer) match {
case null => fail("Converted test_parquet should be cached in the cache.")
- case logical @ LogicalRelation(parquetRelation: FSBasedParquetRelation) => // OK
+ case logical @ LogicalRelation(parquetRelation: ParquetRelation2) => // OK
case other =>
fail(
"The cached test_parquet should be a Parquet Relation. " +
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala
index 8801aba2f6..29b21586f9 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala
@@ -24,7 +24,7 @@ import com.google.common.base.Objects
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.{NullWritable, Text}
import org.apache.hadoop.mapreduce.lib.output.{FileOutputFormat, TextOutputFormat}
-import org.apache.hadoop.mapreduce.{RecordWriter, TaskAttemptContext}
+import org.apache.hadoop.mapreduce.{Job, RecordWriter, TaskAttemptContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.expressions.{Cast, Literal}
@@ -32,17 +32,16 @@ import org.apache.spark.sql.types.{DataType, StructField, StructType}
import org.apache.spark.sql.{Row, SQLContext}
/**
- * A simple example [[FSBasedRelationProvider]].
+ * A simple example [[HadoopFsRelationProvider]].
*/
-class SimpleTextSource extends FSBasedRelationProvider {
+class SimpleTextSource extends HadoopFsRelationProvider {
override def createRelation(
sqlContext: SQLContext,
paths: Array[String],
schema: Option[StructType],
partitionColumns: Option[StructType],
- parameters: Map[String, String]): FSBasedRelation = {
- val partitionsSchema = partitionColumns.getOrElse(StructType(Array.empty[StructField]))
- new SimpleTextRelation(paths, schema, partitionsSchema, parameters)(sqlContext)
+ parameters: Map[String, String]): HadoopFsRelation = {
+ new SimpleTextRelation(paths, schema, partitionColumns, parameters)(sqlContext)
}
}
@@ -59,38 +58,30 @@ class AppendingTextOutputFormat(outputFile: Path) extends TextOutputFormat[NullW
}
}
-class SimpleTextOutputWriter extends OutputWriter {
- private var recordWriter: RecordWriter[NullWritable, Text] = _
- private var taskAttemptContext: TaskAttemptContext = _
-
- override def init(
- path: String,
- dataSchema: StructType,
- context: TaskAttemptContext): Unit = {
- recordWriter = new AppendingTextOutputFormat(new Path(path)).getRecordWriter(context)
- taskAttemptContext = context
- }
+class SimpleTextOutputWriter(path: String, context: TaskAttemptContext) extends OutputWriter {
+ private val recordWriter: RecordWriter[NullWritable, Text] =
+ new AppendingTextOutputFormat(new Path(path)).getRecordWriter(context)
override def write(row: Row): Unit = {
val serialized = row.toSeq.map(_.toString).mkString(",")
recordWriter.write(null, new Text(serialized))
}
- override def close(): Unit = recordWriter.close(taskAttemptContext)
+ override def close(): Unit = recordWriter.close(context)
}
/**
- * A simple example [[FSBasedRelation]], used for testing purposes. Data are stored as comma
+ * A simple example [[HadoopFsRelation]], used for testing purposes. Data are stored as comma
* separated string lines. When scanning data, schema must be explicitly provided via data source
* option `"dataSchema"`.
*/
class SimpleTextRelation(
- paths: Array[String],
+ override val paths: Array[String],
val maybeDataSchema: Option[StructType],
- partitionsSchema: StructType,
+ override val userDefinedPartitionColumns: Option[StructType],
parameters: Map[String, String])(
@transient val sqlContext: SQLContext)
- extends FSBasedRelation(paths, partitionsSchema) {
+ extends HadoopFsRelation {
import sqlContext.sparkContext
@@ -110,9 +101,6 @@ class SimpleTextRelation(
override def hashCode(): Int =
Objects.hashCode(paths, maybeDataSchema, dataSchema)
- override def outputWriterClass: Class[_ <: OutputWriter] =
- classOf[SimpleTextOutputWriter]
-
override def buildScan(inputPaths: Array[String]): RDD[Row] = {
val fields = dataSchema.map(_.dataType)
@@ -122,4 +110,13 @@ class SimpleTextRelation(
}: _*)
}
}
+
+ override def prepareJobForWrite(job: Job): OutputWriterFactory = new OutputWriterFactory {
+ override def newInstance(
+ path: String,
+ dataSchema: StructType,
+ context: TaskAttemptContext): OutputWriter = {
+ new SimpleTextOutputWriter(path, context)
+ }
+ }
}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/fsBasedRelationSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
index 394833f229..cf6afd25ae 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/fsBasedRelationSuites.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
@@ -28,7 +28,7 @@ import org.apache.spark.sql.types._
// TODO Don't extend ParquetTest
// This test suite extends ParquetTest for some convenient utility methods. These methods should be
// moved to some more general places, maybe QueryTest.
-class FSBasedRelationTest extends QueryTest with ParquetTest {
+class HadoopFsRelationTest extends QueryTest with ParquetTest {
override val sqlContext: SQLContext = TestHive
import sqlContext._
@@ -487,7 +487,7 @@ class FSBasedRelationTest extends QueryTest with ParquetTest {
}
val actualPaths = df.queryExecution.analyzed.collectFirst {
- case LogicalRelation(relation: FSBasedRelation) =>
+ case LogicalRelation(relation: HadoopFsRelation) =>
relation.paths.toSet
}.getOrElse {
fail("Expect an FSBasedRelation, but none could be found")
@@ -499,7 +499,7 @@ class FSBasedRelationTest extends QueryTest with ParquetTest {
}
}
-class SimpleTextRelationSuite extends FSBasedRelationTest {
+class SimpleTextHadoopFsRelationSuite extends HadoopFsRelationTest {
override val dataSourceName: String = classOf[SimpleTextSource].getCanonicalName
import sqlContext._
@@ -530,7 +530,7 @@ class SimpleTextRelationSuite extends FSBasedRelationTest {
}
}
-class FSBasedParquetRelationSuite extends FSBasedRelationTest {
+class ParquetHadoopFsRelationSuite extends HadoopFsRelationTest {
override val dataSourceName: String = classOf[parquet.DefaultSource].getCanonicalName
import sqlContext._