From e720dda42e806229ccfd970055c7b8a93eb447bf Mon Sep 17 00:00:00 2001
From: Michael Armbrust
Date: Mon, 7 Mar 2016 15:15:10 -0800
Subject: [SPARK-13665][SQL] Separate the concerns of HadoopFsRelation

`HadoopFsRelation` is used for reading most files into Spark SQL. However, today this class mixes the concerns of file management, schema reconciliation, scan building, bucketing, partitioning, and writing data. As a result, many data sources are forced to reimplement the same functionality, and the various layers have accumulated a fair bit of inefficiency. This PR is a first cut at separating these concerns into several components / interfaces, each described below. Additionally, all implementations inside of Spark (parquet, csv, json, text, orc, libsvm) have been ported to the new API, `FileFormat`. External libraries, such as spark-avro, will also need to be ported to work with Spark 2.0.

### HadoopFsRelation

A simple `case class` that acts as a container for all of the metadata required to read from a datasource. All discovery, resolution and merging logic for schemas and partitions has been removed. This is an internal representation that no longer needs to be exposed to developers.

```scala
case class HadoopFsRelation(
    sqlContext: SQLContext,
    location: FileCatalog,
    partitionSchema: StructType,
    dataSchema: StructType,
    bucketSpec: Option[BucketSpec],
    fileFormat: FileFormat,
    options: Map[String, String]) extends BaseRelation
```

### FileFormat

The primary interface that will be implemented by each different format, including external libraries. Implementers are responsible for reading a given format and converting it into `InternalRow`, as well as for writing out an `InternalRow`. A format can optionally return a schema that is inferred from a set of files.

```scala
trait FileFormat {
  def inferSchema(
      sqlContext: SQLContext,
      options: Map[String, String],
      files: Seq[FileStatus]): Option[StructType]

  def prepareWrite(
      sqlContext: SQLContext,
      job: Job,
      options: Map[String, String],
      dataSchema: StructType): OutputWriterFactory

  def buildInternalScan(
      sqlContext: SQLContext,
      dataSchema: StructType,
      requiredColumns: Array[String],
      filters: Array[Filter],
      bucketSet: Option[BitSet],
      inputFiles: Array[FileStatus],
      broadcastedConf: Broadcast[SerializableConfiguration],
      options: Map[String, String]): RDD[InternalRow]
}
```

The current interface is based on what was required to get all the tests passing again, but it still mixes a couple of concerns (e.g. `bucketSet` is passed down to the scan instead of being resolved by the planner). Additionally, scans still return `RDD`s instead of iterators for single files. In a future PR, bucketing should be removed from this interface and the scan should be isolated to a single file.

### FileCatalog

This interface is used to list the files that make up a given relation, as well as to handle directory-based partitioning.

```scala
trait FileCatalog {
  def paths: Seq[Path]
  def partitionSpec(schema: Option[StructType]): PartitionSpec
  def allFiles(): Seq[FileStatus]
  def getStatus(path: Path): Array[FileStatus]
  def refresh(): Unit
}
```

Currently there are two implementations:
- `HDFSFileCatalog` - based on code from the old `HadoopFsRelation`. Infers partitioning by recursive listing and caches this data for performance.
- `HiveFileCatalog` - based on the above, but it uses the partition spec from the Hive Metastore.
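For illustration only, here is a minimal sketch of what a third-party `FileFormat` might look like under the new API. It is not taken from this patch: the class name `MyLineFormat`, its short name, and its fixed single-column schema are hypothetical. The read path simply turns each line of text into a single-field `InternalRow`; the write path is left unimplemented.

```scala
import org.apache.hadoop.fs.FileStatus
import org.apache.hadoop.mapreduce.Job

import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.sources._
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.unsafe.types.UTF8String
import org.apache.spark.util.SerializableConfiguration
import org.apache.spark.util.collection.BitSet

// Hypothetical single-column, line-oriented format. It mirrors the shape of
// the ported built-in sources (csv, json, text) rather than any real library.
class MyLineFormat extends FileFormat with DataSourceRegister {

  override def shortName(): String = "myline"

  // The schema of this toy format is fixed, so no files need to be opened.
  override def inferSchema(
      sqlContext: SQLContext,
      options: Map[String, String],
      files: Seq[FileStatus]): Option[StructType] =
    Some(StructType(StructField("value", StringType, nullable = true) :: Nil))

  // The write path is out of scope for this sketch.
  override def prepareWrite(
      sqlContext: SQLContext,
      job: Job,
      options: Map[String, String],
      dataSchema: StructType): OutputWriterFactory =
    throw new UnsupportedOperationException("write path not implemented in this sketch")

  // A real implementation would honor requiredColumns, filters and bucketSet;
  // here every input line becomes a single-field InternalRow.
  override def buildInternalScan(
      sqlContext: SQLContext,
      dataSchema: StructType,
      requiredColumns: Array[String],
      filters: Array[Filter],
      bucketSet: Option[BitSet],
      inputFiles: Array[FileStatus],
      broadcastedConf: Broadcast[SerializableConfiguration],
      options: Map[String, String]): RDD[InternalRow] = {
    val paths = inputFiles.map(_.getPath.toString).mkString(",")
    sqlContext.sparkContext
      .textFile(paths)
      .map(line => InternalRow(UTF8String.fromString(line)))
  }
}
```

`ResolvedDataSource` (described next) would pair such a format with a `FileCatalog` to assemble the `HadoopFsRelation` shown above.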
### ResolvedDataSource

Produces a logical plan given the following description of a Data Source (which can come from DataFrameReader or a metastore):
- `paths: Seq[String] = Nil`
- `userSpecifiedSchema: Option[StructType] = None`
- `partitionColumns: Array[String] = Array.empty`
- `bucketSpec: Option[BucketSpec] = None`
- `provider: String`
- `options: Map[String, String]`

This class is responsible for deciding which of the Data Source APIs a given provider is using (including the non-file based ones). All reconciliation of partitions, buckets, and schema (whether from metastores or from inference) is done here.

### DataSourceAnalysis / DataSourceStrategy

Responsible for analyzing and planning reading/writing of data using any of the Data Source APIs, including:
- pruning the files from partitions that will be read based on filters.
- appending partition columns*
- applying additional filters when a data source cannot evaluate them internally.
- constructing an RDD that is bucketed correctly when required*
- sanity checking schema match-up and other analysis when writing.

*In the future we should do the following:
- Break out file handling into its own Strategy, as it's sufficiently complex / isolated.
- Push the appending of partition columns down into `FileFormat` to avoid an extra copy / unvectorization.
- Use a custom RDD for scans instead of `SQLNewNewHadoopRDD2`

Author: Michael Armbrust
Author: Wenchen Fan

Closes #11509 from marmbrus/fileDataSource.
---
 .../org/apache/spark/sql/DataFrameReader.scala | 59 +-
 .../org/apache/spark/sql/DataFrameWriter.scala | 7 -
 .../apache/spark/sql/execution/ExistingRDD.scala | 17 +-
 .../execution/datasources/DataSourceStrategy.scala | 235 +++++--
 .../datasources/InsertIntoHadoopFsRelation.scala | 77 +--
 .../execution/datasources/PartitioningUtils.scala | 16 +-
 .../execution/datasources/ResolvedDataSource.scala | 261 +++++---
 .../execution/datasources/WriterContainer.scala | 24 +-
 .../spark/sql/execution/datasources/bucket.scala | 24 -
 .../execution/datasources/csv/CSVRelation.scala | 136 +---
 .../execution/datasources/csv/DefaultSource.scala | 157 ++++-
 .../spark/sql/execution/datasources/ddl.scala | 5 +-
 .../execution/datasources/json/InferSchema.scala | 2 +-
 .../execution/datasources/json/JSONRelation.scala | 176 +++---
 .../datasources/parquet/ParquetRelation.scala | 503 ++++++---------
 .../spark/sql/execution/datasources/rules.scala | 3 +-
 .../execution/datasources/text/DefaultSource.scala | 122 ++--
 .../apache/spark/sql/internal/SessionState.scala | 7 +-
 .../org/apache/spark/sql/sources/interfaces.scala | 701 +++++----------------
 .../org/apache/spark/sql/DataFrameSuite.scala | 2 -
 .../scala/org/apache/spark/sql/SQLQuerySuite.scala | 2 +-
 .../sql/execution/datasources/json/JsonSuite.scala | 71 ---
 .../datasources/parquet/ParquetFilterSuite.scala | 5 +-
 .../datasources/parquet/ParquetIOSuite.scala | 4 +-
 .../parquet/ParquetPartitionDiscoverySuite.scala | 3 +-
 .../org/apache/spark/sql/sources/InsertSuite.scala | 2 +-
 .../sql/streaming/FileStreamSourceSuite.scala | 16 +-
 .../org/apache/spark/sql/test/SQLTestUtils.scala | 9 +-
 28 files changed, 1102 insertions(+), 1544 deletions(-)

(limited to 'sql/core/src')

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
index 20c861de23..fd92e526e1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
@@ -21,18 +21,14 @@ import
java.util.Properties import scala.collection.JavaConverters._ -import org.apache.hadoop.fs.Path -import org.apache.hadoop.util.StringUtils - import org.apache.spark.{Logging, Partition} import org.apache.spark.annotation.Experimental import org.apache.spark.api.java.JavaRDD -import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.rdd.RDD +import org.apache.spark.sql.execution.LogicalRDD import org.apache.spark.sql.execution.datasources.{LogicalRelation, ResolvedDataSource} import org.apache.spark.sql.execution.datasources.jdbc.{JDBCPartition, JDBCPartitioningInfo, JDBCRelation} -import org.apache.spark.sql.execution.datasources.json.JSONRelation -import org.apache.spark.sql.execution.datasources.parquet.ParquetRelation +import org.apache.spark.sql.execution.datasources.json.{InferSchema, JacksonParser, JSONOptions} import org.apache.spark.sql.execution.streaming.StreamingRelation import org.apache.spark.sql.types.StructType @@ -129,8 +125,6 @@ class DataFrameReader private[sql](sqlContext: SQLContext) extends Logging { val resolved = ResolvedDataSource( sqlContext, userSpecifiedSchema = userSpecifiedSchema, - partitionColumns = Array.empty[String], - bucketSpec = None, provider = source, options = extraOptions.toMap) DataFrame(sqlContext, LogicalRelation(resolved.relation)) @@ -154,7 +148,17 @@ class DataFrameReader private[sql](sqlContext: SQLContext) extends Logging { */ @scala.annotation.varargs def load(paths: String*): DataFrame = { - option("paths", paths.map(StringUtils.escapeString(_, '\\', ',')).mkString(",")).load() + if (paths.isEmpty) { + sqlContext.emptyDataFrame + } else { + sqlContext.baseRelationToDataFrame( + ResolvedDataSource.apply( + sqlContext, + paths = paths, + userSpecifiedSchema = userSpecifiedSchema, + provider = source, + options = extraOptions.toMap).relation) + } } /** @@ -334,14 +338,20 @@ class DataFrameReader private[sql](sqlContext: SQLContext) extends Logging { * @since 1.4.0 */ def json(jsonRDD: RDD[String]): DataFrame = { - sqlContext.baseRelationToDataFrame( - new JSONRelation( - Some(jsonRDD), - maybeDataSchema = userSpecifiedSchema, - maybePartitionSpec = None, - userDefinedPartitionColumns = None, - parameters = extraOptions.toMap)(sqlContext) - ) + val parsedOptions: JSONOptions = new JSONOptions(extraOptions.toMap) + val schema = userSpecifiedSchema.getOrElse { + InferSchema.infer(jsonRDD, sqlContext.conf.columnNameOfCorruptRecord, parsedOptions) + } + + new DataFrame( + sqlContext, + LogicalRDD( + schema.toAttributes, + JacksonParser.parse( + jsonRDD, + schema, + sqlContext.conf.columnNameOfCorruptRecord, + parsedOptions))(sqlContext)) } /** @@ -363,20 +373,7 @@ class DataFrameReader private[sql](sqlContext: SQLContext) extends Logging { */ @scala.annotation.varargs def parquet(paths: String*): DataFrame = { - if (paths.isEmpty) { - sqlContext.emptyDataFrame - } else { - val globbedPaths = paths.flatMap { path => - val hdfsPath = new Path(path) - val fs = hdfsPath.getFileSystem(sqlContext.sparkContext.hadoopConfiguration) - val qualified = hdfsPath.makeQualified(fs.getUri, fs.getWorkingDirectory) - SparkHadoopUtil.get.globPathIfNecessary(qualified) - }.toArray - - sqlContext.baseRelationToDataFrame( - new ParquetRelation( - globbedPaths.map(_.toString), userSpecifiedSchema, None, extraOptions.toMap)(sqlContext)) - } + format("parquet").load(paths: _*) } /** diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala index c373606a2e..6d8c8f6b4f 
100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala @@ -366,13 +366,6 @@ final class DataFrameWriter private[sql](df: DataFrame) { case (true, SaveMode.ErrorIfExists) => throw new AnalysisException(s"Table $tableIdent already exists.") - case (true, SaveMode.Append) => - // If it is Append, we just ask insertInto to handle it. We will not use insertInto - // to handle saveAsTable with Overwrite because saveAsTable can change the schema of - // the table. But, insertInto with Overwrite requires the schema of data be the same - // the schema of the table. - insertInto(tableIdent) - case _ => val cmd = CreateTableUsingAsSelect( diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala index 36e656b8b6..4ad07508ca 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala @@ -25,7 +25,7 @@ import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Statistics} import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning, UnknownPartitioning} -import org.apache.spark.sql.execution.datasources.parquet.ParquetRelation +import org.apache.spark.sql.execution.datasources.parquet.{DefaultSource => ParquetSource} import org.apache.spark.sql.execution.metric.SQLMetrics import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.sources.{BaseRelation, HadoopFsRelation} @@ -226,16 +226,17 @@ private[sql] object PhysicalRDD { rdd: RDD[InternalRow], relation: BaseRelation, metadata: Map[String, String] = Map.empty): PhysicalRDD = { - val outputUnsafeRows = if (relation.isInstanceOf[ParquetRelation]) { - // The vectorized parquet reader does not produce unsafe rows. - !SQLContext.getActive().get.conf.getConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED) - } else { - // All HadoopFsRelations output UnsafeRows - relation.isInstanceOf[HadoopFsRelation] + + val outputUnsafeRows = relation match { + case r: HadoopFsRelation if r.fileFormat.isInstanceOf[ParquetSource] => + !SQLContext.getActive().get.conf.getConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED) + case _: HadoopFsRelation => true + case _ => false } val bucketSpec = relation match { - case r: HadoopFsRelation => r.getBucketSpec + // TODO: this should be closer to bucket planning. 
+ case r: HadoopFsRelation if r.sqlContext.conf.bucketingEnabled() => r.bucketSpec case _ => None } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala index 69a6d23203..2944a8f86f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala @@ -25,12 +25,14 @@ import org.apache.spark.rdd.{MapPartitionsRDD, RDD, UnionRDD} import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow} import org.apache.spark.sql.catalyst.CatalystTypeConverters.convertToScala +import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute import org.apache.spark.sql.catalyst.expressions import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.planning.PhysicalOperation import org.apache.spark.sql.catalyst.plans.logical import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning +import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.execution.PhysicalRDD.{INPUT_PATHS, PUSHED_FILTERS} import org.apache.spark.sql.execution.SparkPlan import org.apache.spark.sql.execution.command.ExecutedCommand @@ -41,6 +43,45 @@ import org.apache.spark.unsafe.types.UTF8String import org.apache.spark.util.{SerializableConfiguration, Utils} import org.apache.spark.util.collection.BitSet +/** + * Replaces generic operations with specific variants that are designed to work with Spark + * SQL Data Sources. + */ +private[sql] object DataSourceAnalysis extends Rule[LogicalPlan] { + override def apply(plan: LogicalPlan): LogicalPlan = plan transform { + case i @ logical.InsertIntoTable( + l @ LogicalRelation(t: HadoopFsRelation, _, _), part, query, overwrite, false) + if query.resolved && t.schema.asNullable == query.schema.asNullable => + + // Sanity checks + if (t.location.paths.size != 1) { + throw new AnalysisException( + "Can only write data to relations with a single path.") + } + + val outputPath = t.location.paths.head + val inputPaths = query.collect { + case LogicalRelation(r: HadoopFsRelation, _, _) => r.location.paths + }.flatten + + val mode = if (overwrite) SaveMode.Overwrite else SaveMode.Append + if (overwrite && inputPaths.contains(outputPath)) { + throw new AnalysisException( + "Cannot overwrite a path that is also being read from.") + } + + InsertIntoHadoopFsRelation( + outputPath, + t.partitionSchema.fields.map(_.name).map(UnresolvedAttribute(_)), + t.bucketSpec, + t.fileFormat, + () => t.refresh(), + t.options, + query, + mode) + } +} + /** * A Strategy for planning scans over data sources defined using the sources API. 
*/ @@ -70,10 +111,10 @@ private[sql] object DataSourceStrategy extends Strategy with Logging { // Scanning partitioned HadoopFsRelation case PhysicalOperation(projects, filters, l @ LogicalRelation(t: HadoopFsRelation, _, _)) - if t.partitionSpec.partitionColumns.nonEmpty => + if t.partitionSchema.nonEmpty => // We divide the filter expressions into 3 parts val partitionColumns = AttributeSet( - t.partitionColumns.map(c => l.output.find(_.name == c.name).get)) + t.partitionSchema.map(c => l.output.find(_.name == c.name).get)) // Only pruning the partition keys val partitionFilters = filters.filter(_.references.subsetOf(partitionColumns)) @@ -104,15 +145,15 @@ private[sql] object DataSourceStrategy extends Strategy with Logging { // Prune the buckets based on the pushed filters that do not contain partitioning key // since the bucketing key is not allowed to use the columns in partitioning key - val bucketSet = getBuckets(pushedFilters, t.getBucketSpec) - + val bucketSet = getBuckets(pushedFilters, t.bucketSpec) val scan = buildPartitionedTableScan( l, partitionAndNormalColumnProjs, pushedFilters, bucketSet, t.partitionSpec.partitionColumns, - selectedPartitions) + selectedPartitions, + t.options) // Add a Projection to guarantee the original projection: // this is because "partitionAndNormalColumnAttrs" may be different @@ -127,6 +168,9 @@ private[sql] object DataSourceStrategy extends Strategy with Logging { } ).getOrElse(scan) :: Nil + // TODO: The code for planning bucketed/unbucketed/partitioned/unpartitioned tables contains + // a lot of duplication and produces overly complicated RDDs. + // Scanning non-partitioned HadoopFsRelation case PhysicalOperation(projects, filters, l @ LogicalRelation(t: HadoopFsRelation, _, _)) => // See buildPartitionedTableScan for the reason that we need to create a shard @@ -134,14 +178,65 @@ private[sql] object DataSourceStrategy extends Strategy with Logging { val sharedHadoopConf = SparkHadoopUtil.get.conf val confBroadcast = t.sqlContext.sparkContext.broadcast(new SerializableConfiguration(sharedHadoopConf)) - // Prune the buckets based on the filters - val bucketSet = getBuckets(filters, t.getBucketSpec) - pruneFilterProject( - l, - projects, - filters, - (a, f) => - t.buildInternalScan(a.map(_.name).toArray, f, bucketSet, t.paths, confBroadcast)) :: Nil + + t.bucketSpec match { + case Some(spec) if t.sqlContext.conf.bucketingEnabled() => + val scanBuilder: (Seq[Attribute], Array[Filter]) => RDD[InternalRow] = { + (requiredColumns: Seq[Attribute], filters: Array[Filter]) => { + val bucketed = + t.location + .allFiles() + .filterNot(_.getPath.getName startsWith "_") + .groupBy { f => + BucketingUtils + .getBucketId(f.getPath.getName) + .getOrElse(sys.error(s"Invalid bucket file ${f.getPath}")) + } + + val bucketedDataMap = bucketed.mapValues { bucketFiles => + t.fileFormat.buildInternalScan( + t.sqlContext, + t.dataSchema, + requiredColumns.map(_.name).toArray, + filters, + None, + bucketFiles.toArray, + confBroadcast, + t.options).coalesce(1) + } + + val bucketedRDD = new UnionRDD(t.sqlContext.sparkContext, + (0 until spec.numBuckets).map { bucketId => + bucketedDataMap.get(bucketId).getOrElse { + t.sqlContext.emptyResult: RDD[InternalRow] + } + }) + bucketedRDD + } + } + + pruneFilterProject( + l, + projects, + filters, + scanBuilder) :: Nil + + case _ => + pruneFilterProject( + l, + projects, + filters, + (a, f) => + t.fileFormat.buildInternalScan( + t.sqlContext, + t.dataSchema, + a.map(_.name).toArray, + f, + None, + 
t.location.allFiles().toArray, + confBroadcast, + t.options)) :: Nil + } case l @ LogicalRelation(baseRelation: TableScan, _, _) => execution.PhysicalRDD.createFromDataSource( @@ -151,11 +246,6 @@ private[sql] object DataSourceStrategy extends Strategy with Logging { part, query, overwrite, false) if part.isEmpty => ExecutedCommand(InsertIntoDataSource(l, query, overwrite)) :: Nil - case i @ logical.InsertIntoTable( - l @ LogicalRelation(t: HadoopFsRelation, _, _), part, query, overwrite, false) => - val mode = if (overwrite) SaveMode.Overwrite else SaveMode.Append - ExecutedCommand(InsertIntoHadoopFsRelation(t, query, mode)) :: Nil - case _ => Nil } @@ -165,7 +255,8 @@ private[sql] object DataSourceStrategy extends Strategy with Logging { filters: Seq[Expression], buckets: Option[BitSet], partitionColumns: StructType, - partitions: Array[Partition]): SparkPlan = { + partitions: Array[Partition], + options: Map[String, String]): SparkPlan = { val relation = logicalRelation.relation.asInstanceOf[HadoopFsRelation] // Because we are creating one RDD per partition, we need to have a shared HadoopConf. @@ -177,36 +268,86 @@ private[sql] object DataSourceStrategy extends Strategy with Logging { // Now, we create a scan builder, which will be used by pruneFilterProject. This scan builder // will union all partitions and attach partition values if needed. - val scanBuilder = { + val scanBuilder: (Seq[Attribute], Array[Filter]) => RDD[InternalRow] = { (requiredColumns: Seq[Attribute], filters: Array[Filter]) => { - val requiredDataColumns = - requiredColumns.filterNot(c => partitionColumnNames.contains(c.name)) - - // Builds RDD[Row]s for each selected partition. - val perPartitionRows = partitions.map { case Partition(partitionValues, dir) => - // Don't scan any partition columns to save I/O. Here we are being optimistic and - // assuming partition columns data stored in data files are always consistent with those - // partition values encoded in partition directory paths. - val dataRows = relation.buildInternalScan( - requiredDataColumns.map(_.name).toArray, filters, buckets, Array(dir), confBroadcast) - - // Merges data values with partition values. - mergeWithPartitionValues( - requiredColumns, - requiredDataColumns, - partitionColumns, - partitionValues, - dataRows) - } - val unionedRows = - if (perPartitionRows.length == 0) { - relation.sqlContext.emptyResult - } else { - new UnionRDD(relation.sqlContext.sparkContext, perPartitionRows) - } + relation.bucketSpec match { + case Some(spec) if relation.sqlContext.conf.bucketingEnabled() => + val requiredDataColumns = + requiredColumns.filterNot(c => partitionColumnNames.contains(c.name)) + + // Builds RDD[Row]s for each selected partition. + val perPartitionRows: Seq[(Int, RDD[InternalRow])] = partitions.flatMap { + case Partition(partitionValues, dir) => + val files = relation.location.getStatus(dir) + val bucketed = files.groupBy { f => + BucketingUtils + .getBucketId(f.getPath.getName) + .getOrElse(sys.error(s"Invalid bucket file ${f.getPath}")) + } + + bucketed.map { bucketFiles => + // Don't scan any partition columns to save I/O. Here we are being optimistic and + // assuming partition columns data stored in data files are always consistent with + // those partition values encoded in partition directory paths. 
+ val dataRows = relation.fileFormat.buildInternalScan( + relation.sqlContext, + relation.dataSchema, + requiredDataColumns.map(_.name).toArray, + filters, + buckets, + bucketFiles._2, + confBroadcast, + options) + + // Merges data values with partition values. + bucketFiles._1 -> mergeWithPartitionValues( + requiredColumns, + requiredDataColumns, + partitionColumns, + partitionValues, + dataRows) + } + } - unionedRows + val bucketedDataMap: Map[Int, Seq[RDD[InternalRow]]] = + perPartitionRows.groupBy(_._1).mapValues(_.map(_._2)) + + val bucketed = new UnionRDD(relation.sqlContext.sparkContext, + (0 until spec.numBuckets).map { bucketId => + bucketedDataMap.get(bucketId).map(i => i.reduce(_ ++ _).coalesce(1)).getOrElse { + relation.sqlContext.emptyResult: RDD[InternalRow] + } + }) + bucketed + + case _ => + val requiredDataColumns = + requiredColumns.filterNot(c => partitionColumnNames.contains(c.name)) + + // Builds RDD[Row]s for each selected partition. + val perPartitionRows = partitions.map { + case Partition(partitionValues, dir) => + val dataRows = relation.fileFormat.buildInternalScan( + relation.sqlContext, + relation.dataSchema, + requiredDataColumns.map(_.name).toArray, + filters, + buckets, + relation.location.getStatus(dir), + confBroadcast, + options) + + // Merges data values with partition values. + mergeWithPartitionValues( + requiredColumns, + requiredDataColumns, + partitionColumns, + partitionValues, + dataRows) + } + new UnionRDD(relation.sqlContext.sparkContext, perPartitionRows) + } } } @@ -477,7 +618,7 @@ private[sql] object DataSourceStrategy extends Strategy with Logging { } relation.relation match { - case r: HadoopFsRelation => pairs += INPUT_PATHS -> r.paths.mkString(", ") + case r: HadoopFsRelation => pairs += INPUT_PATHS -> r.location.paths.mkString(", ") case _ => } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelation.scala index d4cc20b06f..fb52730104 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelation.scala @@ -25,8 +25,8 @@ import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat import org.apache.spark._ import org.apache.spark.sql._ -import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute -import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project} +import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeSet} +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.execution.SQLExecution import org.apache.spark.sql.execution.command.RunnableCommand @@ -34,7 +34,6 @@ import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.sources._ import org.apache.spark.util.Utils - /** * A command for writing data to a [[HadoopFsRelation]]. Supports both overwriting and appending. * Writing to dynamic partitions is also supported. Each [[InsertIntoHadoopFsRelation]] issues a @@ -58,18 +57,29 @@ import org.apache.spark.util.Utils * thrown during job commitment, also aborts the job. 
*/ private[sql] case class InsertIntoHadoopFsRelation( - @transient relation: HadoopFsRelation, + outputPath: Path, + partitionColumns: Seq[Attribute], + bucketSpec: Option[BucketSpec], + fileFormat: FileFormat, + refreshFunction: () => Unit, + options: Map[String, String], @transient query: LogicalPlan, mode: SaveMode) extends RunnableCommand { + override def children: Seq[LogicalPlan] = query :: Nil + override def run(sqlContext: SQLContext): Seq[Row] = { - require( - relation.paths.length == 1, - s"Cannot write to multiple destinations: ${relation.paths.mkString(",")}") + // Most formats don't do well with duplicate columns, so lets not allow that + if (query.schema.fieldNames.length != query.schema.fieldNames.distinct.length) { + val duplicateColumns = query.schema.fieldNames.groupBy(identity).collect { + case (x, ys) if ys.length > 1 => "\"" + x + "\"" + }.mkString(", ") + throw new AnalysisException(s"Duplicate column(s) : $duplicateColumns found, " + + s"cannot save to file.") + } val hadoopConf = sqlContext.sparkContext.hadoopConfiguration - val outputPath = new Path(relation.paths.head) val fs = outputPath.getFileSystem(hadoopConf) val qualifiedOutputPath = outputPath.makeQualified(fs.getUri, fs.getWorkingDirectory) @@ -101,45 +111,28 @@ private[sql] case class InsertIntoHadoopFsRelation( job.setOutputValueClass(classOf[InternalRow]) FileOutputFormat.setOutputPath(job, qualifiedOutputPath) - // A partitioned relation schema's can be different from the input logicalPlan, since - // partition columns are all moved after data column. We Project to adjust the ordering. - // TODO: this belongs in the analyzer. - val project = Project( - relation.schema.map(field => UnresolvedAttribute.quoted(field.name)), query) - val queryExecution = DataFrame(sqlContext, project).queryExecution + val partitionSet = AttributeSet(partitionColumns) + val dataColumns = query.output.filterNot(partitionSet.contains) + val queryExecution = DataFrame(sqlContext, query).queryExecution SQLExecution.withNewExecutionId(sqlContext, queryExecution) { - val df = sqlContext.internalCreateDataFrame(queryExecution.toRdd, relation.schema) - val partitionColumns = relation.partitionColumns.fieldNames - - // Some pre-flight checks. - require( - df.schema == relation.schema, - s"""DataFrame must have the same schema as the relation to which is inserted. - |DataFrame schema: ${df.schema} - |Relation schema: ${relation.schema} - """.stripMargin) - val partitionColumnsInSpec = relation.partitionColumns.fieldNames - require( - partitionColumnsInSpec.sameElements(partitionColumns), - s"""Partition columns mismatch. 
- |Expected: ${partitionColumnsInSpec.mkString(", ")} - |Actual: ${partitionColumns.mkString(", ")} - """.stripMargin) - - val writerContainer = if (partitionColumns.isEmpty && relation.maybeBucketSpec.isEmpty) { + val relation = + WriteRelation( + sqlContext, + dataColumns.toStructType, + qualifiedOutputPath.toString, + fileFormat.prepareWrite(sqlContext, _, options, dataColumns.toStructType), + bucketSpec) + + val writerContainer = if (partitionColumns.isEmpty && bucketSpec.isEmpty) { new DefaultWriterContainer(relation, job, isAppend) } else { - val output = df.queryExecution.executedPlan.output - val (partitionOutput, dataOutput) = - output.partition(a => partitionColumns.contains(a.name)) - new DynamicPartitionWriterContainer( relation, job, - partitionOutput, - dataOutput, - output, + partitionColumns = partitionColumns, + dataColumns = dataColumns, + inputSchema = query.output, PartitioningUtils.DEFAULT_PARTITION_NAME, sqlContext.conf.getConf(SQLConf.PARTITION_MAX_FILES), isAppend) @@ -150,9 +143,9 @@ private[sql] case class InsertIntoHadoopFsRelation( writerContainer.driverSideSetup() try { - sqlContext.sparkContext.runJob(df.queryExecution.toRdd, writerContainer.writeRows _) + sqlContext.sparkContext.runJob(queryExecution.toRdd, writerContainer.writeRows _) writerContainer.commitJob() - relation.refresh() + refreshFunction() } catch { case cause: Throwable => logError("Aborting job.", cause) writerContainer.abortJob() diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala index 65a715caf1..eda3c36674 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala @@ -32,7 +32,12 @@ import org.apache.spark.sql.catalyst.expressions.{Cast, Literal} import org.apache.spark.sql.types._ -private[sql] case class Partition(values: InternalRow, path: String) +object Partition { + def apply(values: InternalRow, path: String): Partition = + apply(values, new Path(path)) +} + +private[sql] case class Partition(values: InternalRow, path: Path) private[sql] case class PartitionSpec(partitionColumns: StructType, partitions: Seq[Partition]) @@ -102,7 +107,8 @@ private[sql] object PartitioningUtils { // It will be recognised as conflicting directory structure: // "hdfs://host:9000/invalidPath" // "hdfs://host:9000/path" - val discoveredBasePaths = optDiscoveredBasePaths.flatMap(x => x) + // TODO: Selective case sensitivity. + val discoveredBasePaths = optDiscoveredBasePaths.flatMap(x => x).map(_.toString.toLowerCase()) assert( discoveredBasePaths.distinct.size == 1, "Conflicting directory structures detected. Suspicious paths:\b" + @@ -127,7 +133,7 @@ private[sql] object PartitioningUtils { // Finally, we create `Partition`s based on paths and resolved partition values. val partitions = resolvedPartitionValues.zip(pathsWithPartitionValues).map { case (PartitionValues(_, literals), (path, _)) => - Partition(InternalRow.fromSeq(literals.map(_.value)), path.toString) + Partition(InternalRow.fromSeq(literals.map(_.value)), path) } PartitionSpec(StructType(fields), partitions) @@ -242,7 +248,9 @@ private[sql] object PartitioningUtils { if (pathsWithPartitionValues.isEmpty) { Seq.empty } else { - val distinctPartColNames = pathsWithPartitionValues.map(_._2.columnNames).distinct + // TODO: Selective case sensitivity. 
+ val distinctPartColNames = + pathsWithPartitionValues.map(_._2.columnNames.map(_.toLowerCase())).distinct assert( distinctPartColNames.size == 1, listConflictingPartitionColumns(pathsWithPartitionValues)) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ResolvedDataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ResolvedDataSource.scala index eec9070bee..8dd975ed41 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ResolvedDataSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ResolvedDataSource.scala @@ -24,19 +24,23 @@ import scala.language.{existentials, implicitConversions} import scala.util.{Failure, Success, Try} import org.apache.hadoop.fs.Path -import org.apache.hadoop.util.StringUtils import org.apache.spark.Logging import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.sql.{AnalysisException, DataFrame, SaveMode, SQLContext} -import org.apache.spark.sql.execution.streaming.{Sink, Source} +import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute +import org.apache.spark.sql.execution.streaming.{FileStreamSource, Sink, Source} import org.apache.spark.sql.sources._ import org.apache.spark.sql.types.{CalendarIntervalType, StructType} import org.apache.spark.util.Utils case class ResolvedDataSource(provider: Class[_], relation: BaseRelation) - +/** + * Responsible for taking a description of a datasource (either from + * [[org.apache.spark.sql.DataFrameReader]], or a metastore) and converting it into a logical + * relation that can be used in a query plan. + */ object ResolvedDataSource extends Logging { /** A map to maintain backward compatibility in case we move data sources around. */ @@ -92,19 +96,61 @@ object ResolvedDataSource extends Logging { } } + // TODO: Combine with apply? def createSource( sqlContext: SQLContext, userSpecifiedSchema: Option[StructType], providerName: String, options: Map[String, String]): Source = { val provider = lookupDataSource(providerName).newInstance() match { - case s: StreamSourceProvider => s + case s: StreamSourceProvider => + s.createSource(sqlContext, userSpecifiedSchema, providerName, options) + + case format: FileFormat => + val caseInsensitiveOptions = new CaseInsensitiveMap(options) + val path = caseInsensitiveOptions.getOrElse("path", { + throw new IllegalArgumentException("'path' is not specified") + }) + val metadataPath = caseInsensitiveOptions.getOrElse("metadataPath", s"$path/_metadata") + + val allPaths = caseInsensitiveOptions.get("path") + val globbedPaths = allPaths.toSeq.flatMap { path => + val hdfsPath = new Path(path) + val fs = hdfsPath.getFileSystem(sqlContext.sparkContext.hadoopConfiguration) + val qualified = hdfsPath.makeQualified(fs.getUri, fs.getWorkingDirectory) + SparkHadoopUtil.get.globPathIfNecessary(qualified) + }.toArray + + val fileCatalog: FileCatalog = new HDFSFileCatalog(sqlContext, options, globbedPaths) + val dataSchema = userSpecifiedSchema.orElse { + format.inferSchema( + sqlContext, + caseInsensitiveOptions, + fileCatalog.allFiles()) + }.getOrElse { + throw new AnalysisException("Unable to infer schema. 
It must be specified manually.") + } + + def dataFrameBuilder(files: Array[String]): DataFrame = { + new DataFrame( + sqlContext, + LogicalRelation( + apply( + sqlContext, + paths = files, + userSpecifiedSchema = Some(dataSchema), + provider = providerName, + options = options.filterKeys(_ != "path")).relation)) + } + + new FileStreamSource( + sqlContext, metadataPath, path, Some(dataSchema), providerName, dataFrameBuilder) case _ => throw new UnsupportedOperationException( s"Data source $providerName does not support streamed reading") } - provider.createSource(sqlContext, userSpecifiedSchema, providerName, options) + provider } def createSink( @@ -125,98 +171,72 @@ object ResolvedDataSource extends Logging { /** Create a [[ResolvedDataSource]] for reading data in. */ def apply( sqlContext: SQLContext, - userSpecifiedSchema: Option[StructType], - partitionColumns: Array[String], - bucketSpec: Option[BucketSpec], + paths: Seq[String] = Nil, + userSpecifiedSchema: Option[StructType] = None, + partitionColumns: Array[String] = Array.empty, + bucketSpec: Option[BucketSpec] = None, provider: String, options: Map[String, String]): ResolvedDataSource = { val clazz: Class[_] = lookupDataSource(provider) def className: String = clazz.getCanonicalName - val relation = userSpecifiedSchema match { - case Some(schema: StructType) => clazz.newInstance() match { - case dataSource: SchemaRelationProvider => - val caseInsensitiveOptions = new CaseInsensitiveMap(options) - if (caseInsensitiveOptions.contains("paths")) { - throw new AnalysisException(s"$className does not support paths option.") - } - dataSource.createRelation(sqlContext, caseInsensitiveOptions, schema) - case dataSource: HadoopFsRelationProvider => - val maybePartitionsSchema = if (partitionColumns.isEmpty) { - None - } else { - Some(partitionColumnsSchema( - schema, partitionColumns, sqlContext.conf.caseSensitiveAnalysis)) - } - val caseInsensitiveOptions = new CaseInsensitiveMap(options) - val paths = { - if (caseInsensitiveOptions.contains("paths") && - caseInsensitiveOptions.contains("path")) { - throw new AnalysisException(s"Both path and paths options are present.") - } - caseInsensitiveOptions.get("paths") - .map(_.split("(? - val hdfsPath = new Path(pathString) - val fs = hdfsPath.getFileSystem(sqlContext.sparkContext.hadoopConfiguration) - val qualified = hdfsPath.makeQualified(fs.getUri, fs.getWorkingDirectory) - SparkHadoopUtil.get.globPathIfNecessary(qualified).map(_.toString) - } - } + val caseInsensitiveOptions = new CaseInsensitiveMap(options) + val relation = (clazz.newInstance(), userSpecifiedSchema) match { + // TODO: Throw when too much is given. 
+ case (dataSource: SchemaRelationProvider, Some(schema)) => + dataSource.createRelation(sqlContext, caseInsensitiveOptions, schema) + case (dataSource: RelationProvider, None) => + dataSource.createRelation(sqlContext, caseInsensitiveOptions) + case (_: SchemaRelationProvider, None) => + throw new AnalysisException(s"A schema needs to be specified when using $className.") + case (_: RelationProvider, Some(_)) => + throw new AnalysisException(s"$className does not allow user-specified schemas.") - val dataSchema = - StructType(schema.filterNot(f => partitionColumns.contains(f.name))).asNullable + case (format: FileFormat, _) => + val allPaths = caseInsensitiveOptions.get("path") ++ paths + val globbedPaths = allPaths.flatMap { path => + val hdfsPath = new Path(path) + val fs = hdfsPath.getFileSystem(sqlContext.sparkContext.hadoopConfiguration) + val qualified = hdfsPath.makeQualified(fs.getUri, fs.getWorkingDirectory) + SparkHadoopUtil.get.globPathIfNecessary(qualified) + }.toArray - dataSource.createRelation( + val fileCatalog: FileCatalog = new HDFSFileCatalog(sqlContext, options, globbedPaths) + val dataSchema = userSpecifiedSchema.orElse { + format.inferSchema( sqlContext, - paths, - Some(dataSchema), - maybePartitionsSchema, - bucketSpec, - caseInsensitiveOptions) - case dataSource: org.apache.spark.sql.sources.RelationProvider => - throw new AnalysisException(s"$className does not allow user-specified schemas.") - case _ => - throw new AnalysisException(s"$className is not a RelationProvider.") - } - - case None => clazz.newInstance() match { - case dataSource: RelationProvider => - val caseInsensitiveOptions = new CaseInsensitiveMap(options) - if (caseInsensitiveOptions.contains("paths")) { - throw new AnalysisException(s"$className does not support paths option.") - } - dataSource.createRelation(sqlContext, caseInsensitiveOptions) - case dataSource: HadoopFsRelationProvider => - val caseInsensitiveOptions = new CaseInsensitiveMap(options) - val paths = { - if (caseInsensitiveOptions.contains("paths") && - caseInsensitiveOptions.contains("path")) { - throw new AnalysisException(s"Both path and paths options are present.") - } - caseInsensitiveOptions.get("paths") - .map(_.split("(? - val hdfsPath = new Path(pathString) - val fs = hdfsPath.getFileSystem(sqlContext.sparkContext.hadoopConfiguration) - val qualified = hdfsPath.makeQualified(fs.getUri, fs.getWorkingDirectory) - SparkHadoopUtil.get.globPathIfNecessary(qualified).map(_.toString) - } - } - dataSource.createRelation(sqlContext, paths, None, None, None, caseInsensitiveOptions) - case dataSource: org.apache.spark.sql.sources.SchemaRelationProvider => + caseInsensitiveOptions, + fileCatalog.allFiles()) + }.getOrElse { throw new AnalysisException( - s"A schema needs to be specified when using $className.") - case _ => - throw new AnalysisException( - s"$className is neither a RelationProvider nor a FSBasedRelationProvider.") - } + s"Unable to infer schema for $format at ${allPaths.take(2).mkString(",")}. " + + "It must be specified manually") + } + + // If they gave a schema, then we try and figure out the types of the partition columns + // from that schema. + val partitionSchema = userSpecifiedSchema.map { schema => + StructType( + partitionColumns.map { c => + // TODO: Case sensitivity. 
+ schema + .find(_.name.toLowerCase() == c.toLowerCase()) + .getOrElse(throw new AnalysisException(s"Invalid partition column '$c'")) + }) + }.getOrElse(fileCatalog.partitionSpec(None).partitionColumns) + + HadoopFsRelation( + sqlContext, + fileCatalog, + partitionSchema = partitionSchema, + dataSchema = dataSchema.asNullable, + bucketSpec = bucketSpec, + format, + options) + + case _ => + throw new AnalysisException( + s"$className is not a valid Spark SQL Data Source.") } new ResolvedDataSource(clazz, relation) } @@ -254,10 +274,10 @@ object ResolvedDataSource extends Logging { throw new AnalysisException("Cannot save interval data type into external storage.") } val clazz: Class[_] = lookupDataSource(provider) - val relation = clazz.newInstance() match { + clazz.newInstance() match { case dataSource: CreatableRelationProvider => dataSource.createRelation(sqlContext, mode, options, data) - case dataSource: HadoopFsRelationProvider => + case format: FileFormat => // Don't glob path for the write path. The contracts here are: // 1. Only one output path can be specified on the write path; // 2. Output path must be a legal HDFS style file system path; @@ -278,26 +298,63 @@ object ResolvedDataSource extends Logging { val equality = columnNameEquality(caseSensitive) val dataSchema = StructType( data.schema.filterNot(f => partitionColumns.exists(equality(_, f.name)))) - val r = dataSource.createRelation( - sqlContext, - Array(outputPath.toString), - Some(dataSchema.asNullable), - Some(partitionColumnsSchema(data.schema, partitionColumns, caseSensitive)), - bucketSpec, - caseInsensitiveOptions) + + // If we are appending to a table that already exists, make sure the partitioning matches + // up. If we fail to load the table for whatever reason, ignore the check. + if (mode == SaveMode.Append) { + val existingPartitionColumnSet = try { + val resolved = apply( + sqlContext, + userSpecifiedSchema = Some(data.schema.asNullable), + provider = provider, + options = options) + + Some(resolved.relation + .asInstanceOf[HadoopFsRelation] + .location + .partitionSpec(None) + .partitionColumns + .fieldNames + .toSet) + } catch { + case e: Exception => + None + } + + existingPartitionColumnSet.foreach { ex => + if (ex.map(_.toLowerCase) != partitionColumns.map(_.toLowerCase()).toSet) { + throw new AnalysisException( + s"Requested partitioning does not equal existing partitioning: " + + s"$ex != ${partitionColumns.toSet}.") + } + } + } // For partitioned relation r, r.schema's column ordering can be different from the column // ordering of data.logicalPlan (partition columns are all moved after data column). This // will be adjusted within InsertIntoHadoopFsRelation. - sqlContext.executePlan( + val plan = InsertIntoHadoopFsRelation( - r, + outputPath, + partitionColumns.map(UnresolvedAttribute.quoted), + bucketSpec, + format, + () => Unit, // No existing table needs to be refreshed. 
+ options, data.logicalPlan, - mode)).toRdd - r + mode) + sqlContext.executePlan(plan).toRdd + case _ => sys.error(s"${clazz.getCanonicalName} does not allow create table as select.") } - ResolvedDataSource(clazz, relation) + + apply( + sqlContext, + userSpecifiedSchema = Some(data.schema.asNullable), + partitionColumns = partitionColumns, + bucketSpec = bucketSpec, + provider = provider, + options = options) } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala index 3653aca994..d8aad5efe3 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala @@ -26,6 +26,7 @@ import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl import org.apache.spark._ import org.apache.spark.mapred.SparkHadoopMapRedUtil +import org.apache.spark.sql.SQLContext import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning import org.apache.spark.sql.catalyst.InternalRow @@ -35,9 +36,16 @@ import org.apache.spark.sql.sources.{HadoopFsRelation, OutputWriter, OutputWrite import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType} import org.apache.spark.util.SerializableConfiguration +/** A container for all the details required when writing to a table. */ +case class WriteRelation( + sqlContext: SQLContext, + dataSchema: StructType, + path: String, + prepareJobForWrite: Job => OutputWriterFactory, + bucketSpec: Option[BucketSpec]) private[sql] abstract class BaseWriterContainer( - @transient val relation: HadoopFsRelation, + @transient val relation: WriteRelation, @transient private val job: Job, isAppend: Boolean) extends Logging with Serializable { @@ -67,12 +75,7 @@ private[sql] abstract class BaseWriterContainer( @transient private var taskAttemptId: TaskAttemptID = _ @transient protected var taskAttemptContext: TaskAttemptContext = _ - protected val outputPath: String = { - assert( - relation.paths.length == 1, - s"Cannot write to multiple destinations: ${relation.paths.mkString(",")}") - relation.paths.head - } + protected val outputPath: String = relation.path protected var outputWriterFactory: OutputWriterFactory = _ @@ -237,7 +240,7 @@ private[sql] abstract class BaseWriterContainer( * A writer that writes all of the rows in a partition to a single file. */ private[sql] class DefaultWriterContainer( - relation: HadoopFsRelation, + relation: WriteRelation, job: Job, isAppend: Boolean) extends BaseWriterContainer(relation, job, isAppend) { @@ -299,7 +302,7 @@ private[sql] class DefaultWriterContainer( * writer externally sorts the remaining rows and then writes out them out one file at a time. 
*/ private[sql] class DynamicPartitionWriterContainer( - relation: HadoopFsRelation, + relation: WriteRelation, job: Job, partitionColumns: Seq[Attribute], dataColumns: Seq[Attribute], @@ -309,7 +312,7 @@ private[sql] class DynamicPartitionWriterContainer( isAppend: Boolean) extends BaseWriterContainer(relation, job, isAppend) { - private val bucketSpec = relation.maybeBucketSpec + private val bucketSpec = relation.bucketSpec private val bucketColumns: Seq[Attribute] = bucketSpec.toSeq.flatMap { spec => spec.bucketColumnNames.map(c => inputSchema.find(_.name == c).get) @@ -374,7 +377,6 @@ private[sql] class DynamicPartitionWriterContainer( // We should first sort by partition columns, then bucket id, and finally sorting columns. val sortingExpressions: Seq[Expression] = partitionColumns ++ bucketIdExpression ++ sortColumns - val getSortingKey = UnsafeProjection.create(sortingExpressions, inputSchema) val sortingKeySchema = StructType(sortingExpressions.map { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/bucket.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/bucket.scala index 3e0d484b74..6008d73717 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/bucket.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/bucket.scala @@ -17,12 +17,6 @@ package org.apache.spark.sql.execution.datasources -import org.apache.hadoop.mapreduce.TaskAttemptContext - -import org.apache.spark.sql.SQLContext -import org.apache.spark.sql.sources.{HadoopFsRelation, HadoopFsRelationProvider, OutputWriter, OutputWriterFactory} -import org.apache.spark.sql.types.StructType - /** * A container for bucketing information. * Bucketing is a technology for decomposing data sets into more manageable parts, and the number @@ -37,24 +31,6 @@ private[sql] case class BucketSpec( bucketColumnNames: Seq[String], sortColumnNames: Seq[String]) -private[sql] trait BucketedHadoopFsRelationProvider extends HadoopFsRelationProvider { - final override def createRelation( - sqlContext: SQLContext, - paths: Array[String], - dataSchema: Option[StructType], - partitionColumns: Option[StructType], - parameters: Map[String, String]): HadoopFsRelation = - throw new UnsupportedOperationException("use the overload version with bucketSpec parameter") -} - -private[sql] abstract class BucketedOutputWriterFactory extends OutputWriterFactory { - final override def newInstance( - path: String, - dataSchema: StructType, - context: TaskAttemptContext): OutputWriter = - throw new UnsupportedOperationException("use the overload version with bucketSpec parameter") -} - private[sql] object BucketingUtils { // The file name of bucketed data should have 3 parts: // 1. 
some other information in the head of file name diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVRelation.scala index d2d7996f56..d7ce9a0ce8 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVRelation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVRelation.scala @@ -17,151 +17,21 @@ package org.apache.spark.sql.execution.datasources.csv -import java.nio.charset.Charset - import scala.util.control.NonFatal -import com.google.common.base.Objects import org.apache.hadoop.fs.{FileStatus, Path} -import org.apache.hadoop.io.{LongWritable, NullWritable, Text} -import org.apache.hadoop.mapred.TextInputFormat -import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext} +import org.apache.hadoop.io.{NullWritable, Text} import org.apache.hadoop.mapreduce.RecordWriter +import org.apache.hadoop.mapreduce.TaskAttemptContext import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat import org.apache.spark.Logging import org.apache.spark.rdd.RDD import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.execution.datasources.CompressionCodecs import org.apache.spark.sql.sources._ import org.apache.spark.sql.types._ -private[sql] class CSVRelation( - private val inputRDD: Option[RDD[String]], - override val paths: Array[String] = Array.empty[String], - private val maybeDataSchema: Option[StructType], - override val userDefinedPartitionColumns: Option[StructType], - private val parameters: Map[String, String]) - (@transient val sqlContext: SQLContext) extends HadoopFsRelation { - - override lazy val dataSchema: StructType = maybeDataSchema match { - case Some(structType) => structType - case None => inferSchema(paths) - } - - private val options = new CSVOptions(parameters) - - @transient - private var cachedRDD: Option[RDD[String]] = None - - private def readText(location: String): RDD[String] = { - if (Charset.forName(options.charset) == Charset.forName("UTF-8")) { - sqlContext.sparkContext.textFile(location) - } else { - val charset = options.charset - sqlContext.sparkContext.hadoopFile[LongWritable, Text, TextInputFormat](location) - .mapPartitions { _.map { pair => - new String(pair._2.getBytes, 0, pair._2.getLength, charset) - } - } - } - } - - private def baseRdd(inputPaths: Array[String]): RDD[String] = { - inputRDD.getOrElse { - cachedRDD.getOrElse { - val rdd = readText(inputPaths.mkString(",")) - cachedRDD = Some(rdd) - rdd - } - } - } - - private def tokenRdd(header: Array[String], inputPaths: Array[String]): RDD[Array[String]] = { - val rdd = baseRdd(inputPaths) - // Make sure firstLine is materialized before sending to executors - val firstLine = if (options.headerFlag) findFirstLine(rdd) else null - CSVRelation.univocityTokenizer(rdd, header, firstLine, options) - } - - /** - * This supports to eliminate unneeded columns before producing an RDD - * containing all of its tuples as Row objects. This reads all the tokens of each line - * and then drop unneeded tokens without casting and type-checking by mapping - * both the indices produced by `requiredColumns` and the ones of tokens. 
- * TODO: Switch to using buildInternalScan - */ - override def buildScan(requiredColumns: Array[String], inputs: Array[FileStatus]): RDD[Row] = { - val pathsString = inputs.map(_.getPath.toUri.toString) - val header = schema.fields.map(_.name) - val tokenizedRdd = tokenRdd(header, pathsString) - CSVRelation.parseCsv(tokenizedRdd, schema, requiredColumns, inputs, sqlContext, options) - } - - override def prepareJobForWrite(job: Job): OutputWriterFactory = { - val conf = job.getConfiguration - options.compressionCodec.foreach { codec => - CompressionCodecs.setCodecConfiguration(conf, codec) - } - - new CSVOutputWriterFactory(options) - } - - override def hashCode(): Int = Objects.hashCode(paths.toSet, dataSchema, schema, partitionColumns) - - override def equals(other: Any): Boolean = other match { - case that: CSVRelation => { - val equalPath = paths.toSet == that.paths.toSet - val equalDataSchema = dataSchema == that.dataSchema - val equalSchema = schema == that.schema - val equalPartitionColums = partitionColumns == that.partitionColumns - - equalPath && equalDataSchema && equalSchema && equalPartitionColums - } - case _ => false - } - - private def inferSchema(paths: Array[String]): StructType = { - val rdd = baseRdd(paths) - val firstLine = findFirstLine(rdd) - val firstRow = new LineCsvReader(options).parseLine(firstLine) - - val header = if (options.headerFlag) { - firstRow - } else { - firstRow.zipWithIndex.map { case (value, index) => s"C$index" } - } - - val parsedRdd = tokenRdd(header, paths) - if (options.inferSchemaFlag) { - CSVInferSchema.infer(parsedRdd, header, options.nullValue) - } else { - // By default fields are assumed to be StringType - val schemaFields = header.map { fieldName => - StructField(fieldName.toString, StringType, nullable = true) - } - StructType(schemaFields) - } - } - - /** - * Returns the first line of the first non-empty file in path - */ - private def findFirstLine(rdd: RDD[String]): String = { - if (options.isCommentSet) { - val comment = options.comment.toString - rdd.filter { line => - line.trim.nonEmpty && !line.startsWith(comment) - }.first() - } else { - rdd.filter { line => - line.trim.nonEmpty - }.first() - } - } -} - object CSVRelation extends Logging { def univocityTokenizer( @@ -246,8 +116,10 @@ object CSVRelation extends Logging { private[sql] class CSVOutputWriterFactory(params: CSVOptions) extends OutputWriterFactory { override def newInstance( path: String, + bucketId: Option[Int], dataSchema: StructType, context: TaskAttemptContext): OutputWriter = { + if (bucketId.isDefined) sys.error("csv doesn't support bucketing") new CsvOutputWriter(path, dataSchema, context, params) } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/DefaultSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/DefaultSource.scala index 2fffae452c..aff672281d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/DefaultSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/DefaultSource.scala @@ -17,32 +17,157 @@ package org.apache.spark.sql.execution.datasources.csv +import java.nio.charset.Charset + +import org.apache.hadoop.fs.FileStatus +import org.apache.hadoop.io.{LongWritable, Text} +import org.apache.hadoop.mapred.TextInputFormat +import org.apache.hadoop.mapreduce.Job + +import org.apache.spark.broadcast.Broadcast +import org.apache.spark.rdd.RDD import org.apache.spark.sql.SQLContext +import 
org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.encoders.RowEncoder +import org.apache.spark.sql.execution.datasources.CompressionCodecs import org.apache.spark.sql.sources._ -import org.apache.spark.sql.types.StructType +import org.apache.spark.sql.types.{StringType, StructField, StructType} +import org.apache.spark.util.SerializableConfiguration +import org.apache.spark.util.collection.BitSet /** * Provides access to CSV data from pure SQL statements. */ -class DefaultSource extends HadoopFsRelationProvider with DataSourceRegister { +class DefaultSource extends FileFormat with DataSourceRegister { override def shortName(): String = "csv" + override def toString: String = "CSV" + + override def equals(other: Any): Boolean = other.isInstanceOf[DefaultSource] + + override def inferSchema( + sqlContext: SQLContext, + options: Map[String, String], + files: Seq[FileStatus]): Option[StructType] = { + val csvOptions = new CSVOptions(options) + + // TODO: Move filtering. + val paths = files.filterNot(_.getPath.getName startsWith "_").map(_.getPath.toString) + val rdd = baseRdd(sqlContext, csvOptions, paths) + val firstLine = findFirstLine(csvOptions, rdd) + val firstRow = new LineCsvReader(csvOptions).parseLine(firstLine) + + val header = if (csvOptions.headerFlag) { + firstRow + } else { + firstRow.zipWithIndex.map { case (value, index) => s"C$index" } + } + + val parsedRdd = tokenRdd(sqlContext, csvOptions, header, paths) + val schema = if (csvOptions.inferSchemaFlag) { + CSVInferSchema.infer(parsedRdd, header, csvOptions.nullValue) + } else { + // By default fields are assumed to be StringType + val schemaFields = header.map { fieldName => + StructField(fieldName.toString, StringType, nullable = true) + } + StructType(schemaFields) + } + Some(schema) + } + + override def prepareWrite( + sqlContext: SQLContext, + job: Job, + options: Map[String, String], + dataSchema: StructType): OutputWriterFactory = { + val conf = job.getConfiguration + val csvOptions = new CSVOptions(options) + csvOptions.compressionCodec.foreach { codec => + CompressionCodecs.setCodecConfiguration(conf, codec) + } + + new CSVOutputWriterFactory(csvOptions) + } + /** - * Creates a new relation for data store in CSV given parameters and user supported schema. - */ - override def createRelation( + * This supports to eliminate unneeded columns before producing an RDD + * containing all of its tuples as Row objects. This reads all the tokens of each line + * and then drop unneeded tokens without casting and type-checking by mapping + * both the indices produced by `requiredColumns` and the ones of tokens. + */ + override def buildInternalScan( + sqlContext: SQLContext, + dataSchema: StructType, + requiredColumns: Array[String], + filters: Array[Filter], + bucketSet: Option[BitSet], + inputFiles: Array[FileStatus], + broadcastedConf: Broadcast[SerializableConfiguration], + options: Map[String, String]): RDD[InternalRow] = { + // TODO: Filter before calling buildInternalScan. 
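As a usage-level aside (not part of this patch), the CSV `FileFormat` above is normally reached through the public reader API; the path below is hypothetical and the `header` / `inferSchema` option keys are assumed to be the ones parsed into `CSVOptions`:

```scala
// Minimal sketch; assumes an existing SQLContext and a CSV file at the hypothetical path.
val df = sqlContext.read
  .format("csv")                  // resolved via DataSourceRegister.shortName()
  .option("header", "true")       // headerFlag: take column names from the first line
  .option("inferSchema", "true")  // inferSchemaFlag: run CSVInferSchema over the tokenized RDD
  .load("/tmp/example/people.csv")
df.printSchema()
```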
+ val csvFiles = inputFiles.filterNot(_.getPath.getName startsWith "_") + + val csvOptions = new CSVOptions(options) + val pathsString = csvFiles.map(_.getPath.toUri.toString) + val header = dataSchema.fields.map(_.name) + val tokenizedRdd = tokenRdd(sqlContext, csvOptions, header, pathsString) + val external = CSVRelation.parseCsv( + tokenizedRdd, dataSchema, requiredColumns, csvFiles, sqlContext, csvOptions) + + // TODO: Generate InternalRow in parseCsv + val outputSchema = StructType(requiredColumns.map(c => dataSchema.find(_.name == c).get)) + val encoder = RowEncoder(outputSchema) + external.map(encoder.toRow) + } + + + private def baseRdd( + sqlContext: SQLContext, + options: CSVOptions, + inputPaths: Seq[String]): RDD[String] = { + readText(sqlContext, options, inputPaths.mkString(",")) + } + + private def tokenRdd( + sqlContext: SQLContext, + options: CSVOptions, + header: Array[String], + inputPaths: Seq[String]): RDD[Array[String]] = { + val rdd = baseRdd(sqlContext, options, inputPaths) + // Make sure firstLine is materialized before sending to executors + val firstLine = if (options.headerFlag) findFirstLine(options, rdd) else null + CSVRelation.univocityTokenizer(rdd, header, firstLine, options) + } + + /** + * Returns the first line of the first non-empty file in path + */ + private def findFirstLine(options: CSVOptions, rdd: RDD[String]): String = { + if (options.isCommentSet) { + val comment = options.comment.toString + rdd.filter { line => + line.trim.nonEmpty && !line.startsWith(comment) + }.first() + } else { + rdd.filter { line => + line.trim.nonEmpty + }.first() + } + } + + private def readText( sqlContext: SQLContext, - paths: Array[String], - dataSchema: Option[StructType], - partitionColumns: Option[StructType], - parameters: Map[String, String]): HadoopFsRelation = { - - new CSVRelation( - None, - paths, - dataSchema, - partitionColumns, - parameters)(sqlContext) + options: CSVOptions, + location: String): RDD[String] = { + if (Charset.forName(options.charset) == Charset.forName("UTF-8")) { + sqlContext.sparkContext.textFile(location) + } else { + val charset = options.charset + sqlContext.sparkContext + .hadoopFile[LongWritable, Text, TextInputFormat](location) + .mapPartitions(_.map(pair => new String(pair._2.getBytes, 0, pair._2.getLength, charset))) + } } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala index fb9618804d..3d7c6a6a5e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala @@ -92,7 +92,10 @@ case class CreateTempTableUsing( def run(sqlContext: SQLContext): Seq[Row] = { val resolved = ResolvedDataSource( - sqlContext, userSpecifiedSchema, Array.empty[String], bucketSpec = None, provider, options) + sqlContext, + userSpecifiedSchema = userSpecifiedSchema, + provider = provider, + options = options) sqlContext.catalog.registerTable( tableIdent, DataFrame(sqlContext, LogicalRelation(resolved.relation)).logicalPlan) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/InferSchema.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/InferSchema.scala index 8b773ddfcb..0937a213c9 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/InferSchema.scala +++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/InferSchema.scala @@ -26,7 +26,7 @@ import org.apache.spark.sql.types._ import org.apache.spark.util.Utils -private[json] object InferSchema { +private[sql] object InferSchema { /** * Infer the type of a collection of json records in three stages: diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala index 2eba52f326..497e3c59e9 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala @@ -38,101 +38,76 @@ import org.apache.spark.sql.execution.datasources._ import org.apache.spark.sql.sources._ import org.apache.spark.sql.types.StructType import org.apache.spark.util.SerializableConfiguration +import org.apache.spark.util.collection.BitSet - -class DefaultSource extends BucketedHadoopFsRelationProvider with DataSourceRegister { +class DefaultSource extends FileFormat with DataSourceRegister { override def shortName(): String = "json" - override def createRelation( + override def inferSchema( sqlContext: SQLContext, - paths: Array[String], - dataSchema: Option[StructType], - partitionColumns: Option[StructType], - bucketSpec: Option[BucketSpec], - parameters: Map[String, String]): HadoopFsRelation = { - - new JSONRelation( - inputRDD = None, - maybeDataSchema = dataSchema, - maybePartitionSpec = None, - userDefinedPartitionColumns = partitionColumns, - maybeBucketSpec = bucketSpec, - paths = paths, - parameters = parameters)(sqlContext) - } -} - -private[sql] class JSONRelation( - val inputRDD: Option[RDD[String]], - val maybeDataSchema: Option[StructType], - val maybePartitionSpec: Option[PartitionSpec], - override val userDefinedPartitionColumns: Option[StructType], - override val maybeBucketSpec: Option[BucketSpec] = None, - override val paths: Array[String] = Array.empty[String], - parameters: Map[String, String] = Map.empty[String, String]) - (@transient val sqlContext: SQLContext) - extends HadoopFsRelation(maybePartitionSpec, parameters) { + options: Map[String, String], + files: Seq[FileStatus]): Option[StructType] = { + if (files.isEmpty) { + None + } else { + val parsedOptions: JSONOptions = new JSONOptions(options) + val jsonFiles = files.filterNot { status => + val name = status.getPath.getName + name.startsWith("_") || name.startsWith(".") + }.toArray - val options: JSONOptions = new JSONOptions(parameters) + val jsonSchema = InferSchema.infer( + createBaseRdd(sqlContext, jsonFiles), + sqlContext.conf.columnNameOfCorruptRecord, + parsedOptions) + checkConstraints(jsonSchema) - /** Constraints to be imposed on schema to be stored. 
*/ - private def checkConstraints(schema: StructType): Unit = { - if (schema.fieldNames.length != schema.fieldNames.distinct.length) { - val duplicateColumns = schema.fieldNames.groupBy(identity).collect { - case (x, ys) if ys.length > 1 => "\"" + x + "\"" - }.mkString(", ") - throw new AnalysisException(s"Duplicate column(s) : $duplicateColumns found, " + - s"cannot save to JSON format") + Some(jsonSchema) } } - override val needConversion: Boolean = false - - private def createBaseRdd(inputPaths: Array[FileStatus]): RDD[String] = { - val job = Job.getInstance(sqlContext.sparkContext.hadoopConfiguration) + override def prepareWrite( + sqlContext: SQLContext, + job: Job, + options: Map[String, String], + dataSchema: StructType): OutputWriterFactory = { val conf = job.getConfiguration - - val paths = inputPaths.map(_.getPath) - - if (paths.nonEmpty) { - FileInputFormat.setInputPaths(job, paths: _*) + val parsedOptions: JSONOptions = new JSONOptions(options) + parsedOptions.compressionCodec.foreach { codec => + CompressionCodecs.setCodecConfiguration(conf, codec) } - sqlContext.sparkContext.hadoopRDD( - conf.asInstanceOf[JobConf], - classOf[TextInputFormat], - classOf[LongWritable], - classOf[Text]).map(_._2.toString) // get the text line - } - - override lazy val dataSchema: StructType = { - val jsonSchema = maybeDataSchema.getOrElse { - val files = cachedLeafStatuses().filterNot { status => - val name = status.getPath.getName - name.startsWith("_") || name.startsWith(".") - }.toArray - InferSchema.infer( - inputRDD.getOrElse(createBaseRdd(files)), - sqlContext.conf.columnNameOfCorruptRecord, - options) + new OutputWriterFactory { + override def newInstance( + path: String, + bucketId: Option[Int], + dataSchema: StructType, + context: TaskAttemptContext): OutputWriter = { + new JsonOutputWriter(path, bucketId, dataSchema, context) + } } - checkConstraints(jsonSchema) - - jsonSchema } - override private[sql] def buildInternalScan( + override def buildInternalScan( + sqlContext: SQLContext, + dataSchema: StructType, requiredColumns: Array[String], filters: Array[Filter], - inputPaths: Array[FileStatus], - broadcastedConf: Broadcast[SerializableConfiguration]): RDD[InternalRow] = { + bucketSet: Option[BitSet], + inputFiles: Array[FileStatus], + broadcastedConf: Broadcast[SerializableConfiguration], + options: Map[String, String]): RDD[InternalRow] = { + // TODO: Filter files for all formats before calling buildInternalScan. 
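As an illustration only (the helper name is mine, not part of the patch), the duplicate-column rule that `checkConstraints` applies to the inferred JSON schema boils down to:

```scala
// Field names that occur more than once; e.g. duplicateColumns(Seq("a", "b", "a")) == Seq("a").
def duplicateColumns(fieldNames: Seq[String]): Seq[String] =
  fieldNames.groupBy(identity).collect { case (name, occurrences) if occurrences.length > 1 => name }.toSeq
```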
+ val jsonFiles = inputFiles.filterNot(_.getPath.getName startsWith "_") + + val parsedOptions: JSONOptions = new JSONOptions(options) val requiredDataSchema = StructType(requiredColumns.map(dataSchema(_))) val rows = JacksonParser.parse( - inputRDD.getOrElse(createBaseRdd(inputPaths)), + createBaseRdd(sqlContext, jsonFiles), requiredDataSchema, sqlContext.conf.columnNameOfCorruptRecord, - options) + parsedOptions) rows.mapPartitions { iterator => val unsafeProjection = UnsafeProjection.create(requiredDataSchema) @@ -140,43 +115,36 @@ private[sql] class JSONRelation( } } - override def equals(other: Any): Boolean = other match { - case that: JSONRelation => - ((inputRDD, that.inputRDD) match { - case (Some(thizRdd), Some(thatRdd)) => thizRdd eq thatRdd - case (None, None) => true - case _ => false - }) && paths.toSet == that.paths.toSet && - dataSchema == that.dataSchema && - schema == that.schema - case _ => false - } + private def createBaseRdd(sqlContext: SQLContext, inputPaths: Array[FileStatus]): RDD[String] = { + val job = Job.getInstance(sqlContext.sparkContext.hadoopConfiguration) + val conf = job.getConfiguration - override def hashCode(): Int = { - Objects.hashCode( - inputRDD, - paths.toSet, - dataSchema, - schema, - partitionColumns) - } + val paths = inputPaths.map(_.getPath) - override def prepareJobForWrite(job: Job): BucketedOutputWriterFactory = { - val conf = job.getConfiguration - options.compressionCodec.foreach { codec => - CompressionCodecs.setCodecConfiguration(conf, codec) + if (paths.nonEmpty) { + FileInputFormat.setInputPaths(job, paths: _*) } - new BucketedOutputWriterFactory { - override def newInstance( - path: String, - bucketId: Option[Int], - dataSchema: StructType, - context: TaskAttemptContext): OutputWriter = { - new JsonOutputWriter(path, bucketId, dataSchema, context) - } + sqlContext.sparkContext.hadoopRDD( + conf.asInstanceOf[JobConf], + classOf[TextInputFormat], + classOf[LongWritable], + classOf[Text]).map(_._2.toString) // get the text line + } + + /** Constraints to be imposed on schema to be stored. 
*/ + private def checkConstraints(schema: StructType): Unit = { + if (schema.fieldNames.length != schema.fieldNames.distinct.length) { + val duplicateColumns = schema.fieldNames.groupBy(identity).collect { + case (x, ys) if ys.length > 1 => "\"" + x + "\"" + }.mkString(", ") + throw new AnalysisException(s"Duplicate column(s) : $duplicateColumns found, " + + s"cannot save to JSON format") } } + + override def toString: String = "JSON" + override def equals(other: Any): Boolean = other.isInstanceOf[DefaultSource] } private[json] class JsonOutputWriter( diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala index b8af832861..82404b8499 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala @@ -25,7 +25,6 @@ import scala.collection.JavaConverters._ import scala.collection.mutable import scala.util.{Failure, Try} -import com.google.common.base.Objects import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FileStatus, Path} import org.apache.hadoop.io.Writable @@ -51,193 +50,23 @@ import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.sources._ import org.apache.spark.sql.types.{DataType, StructType} import org.apache.spark.util.{SerializableConfiguration, Utils} +import org.apache.spark.util.collection.BitSet -private[sql] class DefaultSource extends BucketedHadoopFsRelationProvider with DataSourceRegister { - override def shortName(): String = "parquet" - - override def createRelation( - sqlContext: SQLContext, - paths: Array[String], - schema: Option[StructType], - partitionColumns: Option[StructType], - bucketSpec: Option[BucketSpec], - parameters: Map[String, String]): HadoopFsRelation = { - new ParquetRelation(paths, schema, None, partitionColumns, bucketSpec, parameters)(sqlContext) - } -} - -// NOTE: This class is instantiated and used on executor side only, no need to be serializable. -private[sql] class ParquetOutputWriter( - path: String, - bucketId: Option[Int], - context: TaskAttemptContext) - extends OutputWriter { - - private val recordWriter: RecordWriter[Void, InternalRow] = { - val outputFormat = { - new ParquetOutputFormat[InternalRow]() { - // Here we override `getDefaultWorkFile` for two reasons: - // - // 1. To allow appending. We need to generate unique output file names to avoid - // overwriting existing files (either exist before the write job, or are just written - // by other tasks within the same write job). - // - // 2. To allow dynamic partitioning. Default `getDefaultWorkFile` uses - // `FileOutputCommitter.getWorkPath()`, which points to the base directory of all - // partitions in the case of dynamic partitioning. 
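To make the output file naming used by the `getDefaultWorkFile` override (shown just below, and re-added unchanged later in this diff) concrete, a small standalone illustration with made-up values:

```scala
// All values are hypothetical: `split` comes from the task ID, the job UUID from the
// "spark.sql.sources.writeJobUUID" config, and `bucketString` is empty unless bucketing is used.
val split = 3
val uniqueWriteJobId = "d9c0f4a2-example"
val bucketString = ""                       // e.g. "_00002" when writing bucket 2
val extension = ".gz.parquet"
val fileName = f"part-r-$split%05d-$uniqueWriteJobId$bucketString$extension"
// fileName == "part-r-00003-d9c0f4a2-example.gz.parquet"
```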
- override def getDefaultWorkFile(context: TaskAttemptContext, extension: String): Path = { - val configuration = context.getConfiguration - val uniqueWriteJobId = configuration.get("spark.sql.sources.writeJobUUID") - val taskAttemptId = context.getTaskAttemptID - val split = taskAttemptId.getTaskID.getId - val bucketString = bucketId.map(BucketingUtils.bucketIdToString).getOrElse("") - new Path(path, f"part-r-$split%05d-$uniqueWriteJobId$bucketString$extension") - } - } - } - - outputFormat.getRecordWriter(context) - } +private[sql] class DefaultSource extends FileFormat with DataSourceRegister with Logging { - override def write(row: Row): Unit = throw new UnsupportedOperationException("call writeInternal") - - override protected[sql] def writeInternal(row: InternalRow): Unit = recordWriter.write(null, row) - - override def close(): Unit = recordWriter.close(context) -} - -private[sql] class ParquetRelation( - override val paths: Array[String], - private val maybeDataSchema: Option[StructType], - // This is for metastore conversion. - private val maybePartitionSpec: Option[PartitionSpec], - override val userDefinedPartitionColumns: Option[StructType], - override val maybeBucketSpec: Option[BucketSpec], - parameters: Map[String, String])( - val sqlContext: SQLContext) - extends HadoopFsRelation(maybePartitionSpec, parameters) - with Logging { - - private[sql] def this( - paths: Array[String], - maybeDataSchema: Option[StructType], - maybePartitionSpec: Option[PartitionSpec], - parameters: Map[String, String])( - sqlContext: SQLContext) = { - this( - paths, - maybeDataSchema, - maybePartitionSpec, - maybePartitionSpec.map(_.partitionColumns), - None, - parameters)(sqlContext) - } - - // Should we merge schemas from all Parquet part-files? - private val shouldMergeSchemas = - parameters - .get(ParquetRelation.MERGE_SCHEMA) - .map(_.toBoolean) - .getOrElse(sqlContext.conf.getConf(SQLConf.PARQUET_SCHEMA_MERGING_ENABLED)) - - private val mergeRespectSummaries = - sqlContext.conf.getConf(SQLConf.PARQUET_SCHEMA_RESPECT_SUMMARIES) - - private val maybeMetastoreSchema = parameters - .get(ParquetRelation.METASTORE_SCHEMA) - .map(DataType.fromJson(_).asInstanceOf[StructType]) - - private val compressionCodec: Option[String] = parameters - .get("compression") - .map { codecName => - // Validate if given compression codec is supported or not. - val shortParquetCompressionCodecNames = ParquetRelation.shortParquetCompressionCodecNames - if (!shortParquetCompressionCodecNames.contains(codecName.toLowerCase)) { - val availableCodecs = shortParquetCompressionCodecNames.keys.map(_.toLowerCase) - throw new IllegalArgumentException(s"Codec [$codecName] " + - s"is not available. 
Available codecs are ${availableCodecs.mkString(", ")}.") - } - codecName.toLowerCase - } - - private lazy val metadataCache: MetadataCache = { - val meta = new MetadataCache - meta.refresh() - meta - } - - override def toString: String = { - parameters.get(ParquetRelation.METASTORE_TABLE_NAME).map { tableName => - s"${getClass.getSimpleName}: $tableName" - }.getOrElse(super.toString) - } - - override def equals(other: Any): Boolean = other match { - case that: ParquetRelation => - val schemaEquality = if (shouldMergeSchemas) { - this.shouldMergeSchemas == that.shouldMergeSchemas - } else { - this.dataSchema == that.dataSchema && - this.schema == that.schema - } - - this.paths.toSet == that.paths.toSet && - schemaEquality && - this.maybeDataSchema == that.maybeDataSchema && - this.partitionColumns == that.partitionColumns - - case _ => false - } - - override def hashCode(): Int = { - if (shouldMergeSchemas) { - Objects.hashCode( - Boolean.box(shouldMergeSchemas), - paths.toSet, - maybeDataSchema, - partitionColumns) - } else { - Objects.hashCode( - Boolean.box(shouldMergeSchemas), - paths.toSet, - dataSchema, - schema, - maybeDataSchema, - partitionColumns) - } - } - - /** Constraints on schema of dataframe to be stored. */ - private def checkConstraints(schema: StructType): Unit = { - if (schema.fieldNames.length != schema.fieldNames.distinct.length) { - val duplicateColumns = schema.fieldNames.groupBy(identity).collect { - case (x, ys) if ys.length > 1 => "\"" + x + "\"" - }.mkString(", ") - throw new AnalysisException(s"Duplicate column(s) : $duplicateColumns found, " + - s"cannot save to parquet format") - } - } + override def shortName(): String = "parquet" - override def dataSchema: StructType = { - val schema = maybeDataSchema.getOrElse(metadataCache.dataSchema) - // check if schema satisfies the constraints - // before moving forward - checkConstraints(schema) - schema - } + override def toString: String = "ParquetFormat" - override private[sql] def refresh(): Unit = { - super.refresh() - metadataCache.refresh() - } + override def equals(other: Any): Boolean = other.isInstanceOf[DefaultSource] - // Parquet data source always uses Catalyst internal representations. - override val needConversion: Boolean = false - - override def sizeInBytes: Long = metadataCache.dataStatuses.map(_.getLen).sum + override def prepareWrite( + sqlContext: SQLContext, + job: Job, + options: Map[String, String], + dataSchema: StructType): OutputWriterFactory = { - override def prepareJobForWrite(job: Job): BucketedOutputWriterFactory = { val conf = ContextUtil.getConfiguration(job) // SPARK-9849 DirectParquetOutputCommitter qualified name should be backward compatible @@ -255,11 +84,24 @@ private[sql] class ParquetRelation( if (conf.get(SQLConf.PARQUET_OUTPUT_COMMITTER_CLASS.key) == null) { logInfo("Using default output committer for Parquet: " + - classOf[ParquetOutputCommitter].getCanonicalName) + classOf[ParquetOutputCommitter].getCanonicalName) } else { logInfo("Using user defined output committer for Parquet: " + committerClass.getCanonicalName) } + val compressionCodec: Option[String] = options + .get("compression") + .map { codecName => + // Validate if given compression codec is supported or not. 
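For orientation (not part of the patch), the `compression` option validated in the code that follows typically arrives from a write like the one below; `df` and the output path are hypothetical, and when the option is unset the codec falls back to `spark.sql.parquet.compression.codec`:

```scala
// "snappy" must be a key of ParquetRelation.shortParquetCompressionCodecNames,
// otherwise the validation below throws IllegalArgumentException.
df.write
  .format("parquet")
  .option("compression", "snappy")
  .save("/tmp/example/parquet-out")
```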
+ val shortParquetCompressionCodecNames = ParquetRelation.shortParquetCompressionCodecNames + if (!shortParquetCompressionCodecNames.contains(codecName.toLowerCase)) { + val availableCodecs = shortParquetCompressionCodecNames.keys.map(_.toLowerCase) + throw new IllegalArgumentException(s"Codec [$codecName] " + + s"is not available. Available codecs are ${availableCodecs.mkString(", ")}.") + } + codecName.toLowerCase + } + conf.setClass( SQLConf.OUTPUT_COMMITTER_CLASS.key, committerClass, @@ -303,7 +145,7 @@ private[sql] class ParquetRelation( .getOrElse(sqlContext.conf.parquetCompressionCodec.toLowerCase), CompressionCodecName.UNCOMPRESSED).name()) - new BucketedOutputWriterFactory { + new OutputWriterFactory { override def newInstance( path: String, bucketId: Option[Int], @@ -314,11 +156,127 @@ private[sql] class ParquetRelation( } } + def inferSchema( + sqlContext: SQLContext, + parameters: Map[String, String], + files: Seq[FileStatus]): Option[StructType] = { + // Should we merge schemas from all Parquet part-files? + val shouldMergeSchemas = + parameters + .get(ParquetRelation.MERGE_SCHEMA) + .map(_.toBoolean) + .getOrElse(sqlContext.conf.getConf(SQLConf.PARQUET_SCHEMA_MERGING_ENABLED)) + + val mergeRespectSummaries = + sqlContext.conf.getConf(SQLConf.PARQUET_SCHEMA_RESPECT_SUMMARIES) + + val filesByType = splitFiles(files) + + // Sees which file(s) we need to touch in order to figure out the schema. + // + // Always tries the summary files first if users don't require a merged schema. In this case, + // "_common_metadata" is more preferable than "_metadata" because it doesn't contain row + // groups information, and could be much smaller for large Parquet files with lots of row + // groups. If no summary file is available, falls back to some random part-file. + // + // NOTE: Metadata stored in the summary files are merged from all part-files. However, for + // user defined key-value metadata (in which we store Spark SQL schema), Parquet doesn't know + // how to merge them correctly if some key is associated with different values in different + // part-files. When this happens, Parquet simply gives up generating the summary file. This + // implies that if a summary file presents, then: + // + // 1. Either all part-files have exactly the same Spark SQL schema, or + // 2. Some part-files don't contain Spark SQL schema in the key-value metadata at all (thus + // their schemas may differ from each other). + // + // Here we tend to be pessimistic and take the second case into account. Basically this means + // we can't trust the summary files if users require a merged schema, and must touch all part- + // files to do the merge. + val filesToTouch = + if (shouldMergeSchemas) { + // Also includes summary files, 'cause there might be empty partition directories. + + // If mergeRespectSummaries config is true, we assume that all part-files are the same for + // their schema with summary files, so we ignore them when merging schema. + // If the config is disabled, which is the default setting, we merge all part-files. + // In this mode, we only need to merge schemas contained in all those summary files. + // You should enable this configuration only if you are very sure that for the parquet + // part-files to read there are corresponding summary files containing correct schema. + + // As filed in SPARK-11500, the order of files to touch is a matter, which might affect + // the ordering of the output columns. There are several things to mention here. + // + // 1. 
If mergeRespectSummaries config is false, then it merges schemas by reducing from + // the first part-file so that the columns of the lexicographically first file show + // first. + // + // 2. If mergeRespectSummaries config is true, then there should be, at least, + // "_metadata"s for all given files, so that we can ensure the columns of + // the lexicographically first file show first. + // + // 3. If shouldMergeSchemas is false, but when multiple files are given, there is + // no guarantee of the output order, since there might not be a summary file for the + // lexicographically first file, which ends up putting ahead the columns of + // the other files. However, this should be okay since not enabling + // shouldMergeSchemas means (assumes) all the files have the same schemas. + + val needMerged: Seq[FileStatus] = + if (mergeRespectSummaries) { + Seq() + } else { + filesByType.data + } + needMerged ++ filesByType.metadata ++ filesByType.commonMetadata + } else { + // Tries any "_common_metadata" first. Parquet files written by old versions or Parquet + // don't have this. + filesByType.commonMetadata.headOption + // Falls back to "_metadata" + .orElse(filesByType.metadata.headOption) + // Summary file(s) not found, the Parquet file is either corrupted, or different part- + // files contain conflicting user defined metadata (two or more values are associated + // with a same key in different files). In either case, we fall back to any of the + // first part-file, and just assume all schemas are consistent. + .orElse(filesByType.data.headOption) + .toSeq + } + ParquetRelation.mergeSchemasInParallel(filesToTouch, sqlContext) + } + + case class FileTypes( + data: Seq[FileStatus], + metadata: Seq[FileStatus], + commonMetadata: Seq[FileStatus]) + + private def splitFiles(allFiles: Seq[FileStatus]): FileTypes = { + // Lists `FileStatus`es of all leaf nodes (files) under all base directories. + val leaves = allFiles.filter { f => + isSummaryFile(f.getPath) || + !(f.getPath.getName.startsWith("_") || f.getPath.getName.startsWith(".")) + }.toArray.sortBy(_.getPath.toString) + + FileTypes( + data = leaves.filterNot(f => isSummaryFile(f.getPath)), + metadata = + leaves.filter(_.getPath.getName == ParquetFileWriter.PARQUET_METADATA_FILE), + commonMetadata = + leaves.filter(_.getPath.getName == ParquetFileWriter.PARQUET_COMMON_METADATA_FILE)) + } + + private def isSummaryFile(file: Path): Boolean = { + file.getName == ParquetFileWriter.PARQUET_COMMON_METADATA_FILE || + file.getName == ParquetFileWriter.PARQUET_METADATA_FILE + } + override def buildInternalScan( + sqlContext: SQLContext, + dataSchema: StructType, requiredColumns: Array[String], filters: Array[Filter], - inputFiles: Array[FileStatus], - broadcastedConf: Broadcast[SerializableConfiguration]): RDD[InternalRow] = { + bucketSet: Option[BitSet], + allFiles: Array[FileStatus], + broadcastedConf: Broadcast[SerializableConfiguration], + options: Map[String, String]): RDD[InternalRow] = { val useMetadataCache = sqlContext.getConf(SQLConf.PARQUET_CACHE_METADATA) val parquetFilterPushDown = sqlContext.conf.parquetFilterPushDown val assumeBinaryIsString = sqlContext.conf.isParquetBinaryAsString @@ -341,6 +299,8 @@ private[sql] class ParquetRelation( assumeBinaryIsString, assumeInt96IsTimestamp) _ + val inputFiles = splitFiles(allFiles).data.toArray + // Create the function to set input paths at the driver side. 
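As a usage-level aside (not part of this patch), the schema merging selected above can be requested per read via the `mergeSchema` option (`ParquetRelation.MERGE_SCHEMA`) or globally via the SQL conf consulted by the `getOrElse` fallback; the path is hypothetical:

```scala
// Per-read: merge the Spark SQL schemas found across part-files (and summary files).
val merged = sqlContext.read
  .option("mergeSchema", "true")
  .parquet("/tmp/example/partitioned-table")

// Globally, assuming the usual conf key behind PARQUET_SCHEMA_MERGING_ENABLED.
sqlContext.setConf("spark.sql.parquet.mergeSchema", "true")
```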
val setInputPaths = ParquetRelation.initializeDriverSideJobFunc(inputFiles, parquetBlockSize) _ @@ -392,153 +352,46 @@ private[sql] class ParquetRelation( } } } +} - private class MetadataCache { - // `FileStatus` objects of all "_metadata" files. - private var metadataStatuses: Array[FileStatus] = _ - - // `FileStatus` objects of all "_common_metadata" files. - private var commonMetadataStatuses: Array[FileStatus] = _ - - // `FileStatus` objects of all data files (Parquet part-files). - var dataStatuses: Array[FileStatus] = _ - - // Schema of the actual Parquet files, without partition columns discovered from partition - // directory paths. - var dataSchema: StructType = null - - // Schema of the whole table, including partition columns. - var schema: StructType = _ - - // Cached leaves - var cachedLeaves: mutable.LinkedHashSet[FileStatus] = null - - /** - * Refreshes `FileStatus`es, footers, partition spec, and table schema. - */ - def refresh(): Unit = { - val currentLeafStatuses = cachedLeafStatuses() - - // Check if cachedLeafStatuses is changed or not - val leafStatusesChanged = (cachedLeaves == null) || - !cachedLeaves.equals(currentLeafStatuses) - - if (leafStatusesChanged) { - cachedLeaves = currentLeafStatuses - - // Lists `FileStatus`es of all leaf nodes (files) under all base directories. - val leaves = currentLeafStatuses.filter { f => - isSummaryFile(f.getPath) || - !(f.getPath.getName.startsWith("_") || f.getPath.getName.startsWith(".")) - }.toArray.sortBy(_.getPath.toString) - - dataStatuses = leaves.filterNot(f => isSummaryFile(f.getPath)) - metadataStatuses = - leaves.filter(_.getPath.getName == ParquetFileWriter.PARQUET_METADATA_FILE) - commonMetadataStatuses = - leaves.filter(_.getPath.getName == ParquetFileWriter.PARQUET_COMMON_METADATA_FILE) - - dataSchema = { - val dataSchema0 = maybeDataSchema - .orElse(readSchema()) - .orElse(maybeMetastoreSchema) - .getOrElse(throw new AnalysisException( - s"Failed to discover schema of Parquet file(s) in the following location(s):\n" + - paths.mkString("\n\t"))) - - // If this Parquet relation is converted from a Hive Metastore table, must reconcile case - // case insensitivity issue and possible schema mismatch (probably caused by schema - // evolution). - maybeMetastoreSchema - .map(ParquetRelation.mergeMetastoreParquetSchema(_, dataSchema0)) - .getOrElse(dataSchema0) +// NOTE: This class is instantiated and used on executor side only, no need to be serializable. +private[sql] class ParquetOutputWriter( + path: String, + bucketId: Option[Int], + context: TaskAttemptContext) + extends OutputWriter { + + private val recordWriter: RecordWriter[Void, InternalRow] = { + val outputFormat = { + new ParquetOutputFormat[InternalRow]() { + // Here we override `getDefaultWorkFile` for two reasons: + // + // 1. To allow appending. We need to generate unique output file names to avoid + // overwriting existing files (either exist before the write job, or are just written + // by other tasks within the same write job). + // + // 2. To allow dynamic partitioning. Default `getDefaultWorkFile` uses + // `FileOutputCommitter.getWorkPath()`, which points to the base directory of all + // partitions in the case of dynamic partitioning. 
+ override def getDefaultWorkFile(context: TaskAttemptContext, extension: String): Path = { + val configuration = context.getConfiguration + val uniqueWriteJobId = configuration.get("spark.sql.sources.writeJobUUID") + val taskAttemptId = context.getTaskAttemptID + val split = taskAttemptId.getTaskID.getId + val bucketString = bucketId.map(BucketingUtils.bucketIdToString).getOrElse("") + new Path(path, f"part-r-$split%05d-$uniqueWriteJobId$bucketString$extension") } } } - private def isSummaryFile(file: Path): Boolean = { - file.getName == ParquetFileWriter.PARQUET_COMMON_METADATA_FILE || - file.getName == ParquetFileWriter.PARQUET_METADATA_FILE - } + outputFormat.getRecordWriter(context) + } - private def readSchema(): Option[StructType] = { - // Sees which file(s) we need to touch in order to figure out the schema. - // - // Always tries the summary files first if users don't require a merged schema. In this case, - // "_common_metadata" is more preferable than "_metadata" because it doesn't contain row - // groups information, and could be much smaller for large Parquet files with lots of row - // groups. If no summary file is available, falls back to some random part-file. - // - // NOTE: Metadata stored in the summary files are merged from all part-files. However, for - // user defined key-value metadata (in which we store Spark SQL schema), Parquet doesn't know - // how to merge them correctly if some key is associated with different values in different - // part-files. When this happens, Parquet simply gives up generating the summary file. This - // implies that if a summary file presents, then: - // - // 1. Either all part-files have exactly the same Spark SQL schema, or - // 2. Some part-files don't contain Spark SQL schema in the key-value metadata at all (thus - // their schemas may differ from each other). - // - // Here we tend to be pessimistic and take the second case into account. Basically this means - // we can't trust the summary files if users require a merged schema, and must touch all part- - // files to do the merge. - val filesToTouch = - if (shouldMergeSchemas) { - // Also includes summary files, 'cause there might be empty partition directories. - - // If mergeRespectSummaries config is true, we assume that all part-files are the same for - // their schema with summary files, so we ignore them when merging schema. - // If the config is disabled, which is the default setting, we merge all part-files. - // In this mode, we only need to merge schemas contained in all those summary files. - // You should enable this configuration only if you are very sure that for the parquet - // part-files to read there are corresponding summary files containing correct schema. - - // As filed in SPARK-11500, the order of files to touch is a matter, which might affect - // the ordering of the output columns. There are several things to mention here. - // - // 1. If mergeRespectSummaries config is false, then it merges schemas by reducing from - // the first part-file so that the columns of the lexicographically first file show - // first. - // - // 2. If mergeRespectSummaries config is true, then there should be, at least, - // "_metadata"s for all given files, so that we can ensure the columns of - // the lexicographically first file show first. - // - // 3. 
If shouldMergeSchemas is false, but when multiple files are given, there is - // no guarantee of the output order, since there might not be a summary file for the - // lexicographically first file, which ends up putting ahead the columns of - // the other files. However, this should be okay since not enabling - // shouldMergeSchemas means (assumes) all the files have the same schemas. - - val needMerged: Seq[FileStatus] = - if (mergeRespectSummaries) { - Seq() - } else { - dataStatuses - } - needMerged ++ metadataStatuses ++ commonMetadataStatuses - } else { - // Tries any "_common_metadata" first. Parquet files written by old versions or Parquet - // don't have this. - commonMetadataStatuses.headOption - // Falls back to "_metadata" - .orElse(metadataStatuses.headOption) - // Summary file(s) not found, the Parquet file is either corrupted, or different part- - // files contain conflicting user defined metadata (two or more values are associated - // with a same key in different files). In either case, we fall back to any of the - // first part-file, and just assume all schemas are consistent. - .orElse(dataStatuses.headOption) - .toSeq - } + override def write(row: Row): Unit = throw new UnsupportedOperationException("call writeInternal") - assert( - filesToTouch.nonEmpty || maybeDataSchema.isDefined || maybeMetastoreSchema.isDefined, - "No predefined schema found, " + - s"and no Parquet data files or summary files found under ${paths.mkString(", ")}.") + override protected[sql] def writeInternal(row: InternalRow): Unit = recordWriter.write(null, row) - ParquetRelation.mergeSchemasInParallel(filesToTouch, sqlContext) - } - } + override def close(): Unit = recordWriter.close(context) } private[sql] object ParquetRelation extends Logging { @@ -699,7 +552,7 @@ private[sql] object ParquetRelation extends Logging { * distinguish binary and string). This method generates a correct schema by merging Metastore * schema data types and Parquet schema field names. */ - private[parquet] def mergeMetastoreParquetSchema( + private[sql] def mergeMetastoreParquetSchema( metastoreSchema: StructType, parquetSchema: StructType): StructType = { def schemaConflictMessage: String = diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala index 2e41e88392..0eae34614c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala @@ -34,6 +34,7 @@ private[sql] class ResolveDataSource(sqlContext: SQLContext) extends Rule[Logica try { val resolved = ResolvedDataSource( sqlContext, + paths = Seq.empty, userSpecifiedSchema = None, partitionColumns = Array(), bucketSpec = None, @@ -130,7 +131,7 @@ private[sql] case class PreWriteCheck(catalog: Catalog) extends (LogicalPlan => LogicalRelation(r: HadoopFsRelation, _, _), part, query, overwrite, _) => // We need to make sure the partition columns specified by users do match partition // columns of the relation. 
- val existingPartitionColumns = r.partitionColumns.fieldNames.toSet + val existingPartitionColumns = r.partitionSchema.fieldNames.toSet val specifiedPartitionColumns = part.keySet if (existingPartitionColumns != specifiedPartitionColumns) { failAnalysis(s"Specified partition columns " + diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/DefaultSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/DefaultSource.scala index 8f3f6335e4..b3297254cb 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/DefaultSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/DefaultSource.scala @@ -31,25 +31,16 @@ import org.apache.spark.sql.{AnalysisException, Row, SQLContext} import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.UnsafeRow import org.apache.spark.sql.catalyst.expressions.codegen.{BufferHolder, UnsafeRowWriter} -import org.apache.spark.sql.execution.datasources.{CompressionCodecs, PartitionSpec} +import org.apache.spark.sql.execution.datasources.CompressionCodecs import org.apache.spark.sql.sources._ import org.apache.spark.sql.types.{StringType, StructType} import org.apache.spark.util.SerializableConfiguration +import org.apache.spark.util.collection.BitSet /** * A data source for reading text files. */ -class DefaultSource extends HadoopFsRelationProvider with DataSourceRegister { - - override def createRelation( - sqlContext: SQLContext, - paths: Array[String], - dataSchema: Option[StructType], - partitionColumns: Option[StructType], - parameters: Map[String, String]): HadoopFsRelation = { - dataSchema.foreach(verifySchema) - new TextRelation(None, dataSchema, partitionColumns, paths, parameters)(sqlContext) - } +class DefaultSource extends FileFormat with DataSourceRegister { override def shortName(): String = "text" @@ -64,58 +55,21 @@ class DefaultSource extends HadoopFsRelationProvider with DataSourceRegister { s"Text data source supports only a string column, but you have ${tpe.simpleString}.") } } -} - -private[sql] class TextRelation( - val maybePartitionSpec: Option[PartitionSpec], - val textSchema: Option[StructType], - override val userDefinedPartitionColumns: Option[StructType], - override val paths: Array[String] = Array.empty[String], - parameters: Map[String, String] = Map.empty[String, String]) - (@transient val sqlContext: SQLContext) - extends HadoopFsRelation(maybePartitionSpec, parameters) { - /** Data schema is always a single column, named "value" if original Data source has no schema. */ - override def dataSchema: StructType = - textSchema.getOrElse(new StructType().add("value", StringType)) - /** This is an internal data source that outputs internal row format. 
*/ - override val needConversion: Boolean = false - - - override private[sql] def buildInternalScan( - requiredColumns: Array[String], - filters: Array[Filter], - inputPaths: Array[FileStatus], - broadcastedConf: Broadcast[SerializableConfiguration]): RDD[InternalRow] = { - val job = Job.getInstance(sqlContext.sparkContext.hadoopConfiguration) - val conf = job.getConfiguration - val paths = inputPaths.map(_.getPath).sortBy(_.toUri) - - if (paths.nonEmpty) { - FileInputFormat.setInputPaths(job, paths: _*) - } + override def inferSchema( + sqlContext: SQLContext, + options: Map[String, String], + files: Seq[FileStatus]): Option[StructType] = Some(new StructType().add("value", StringType)) - sqlContext.sparkContext.hadoopRDD( - conf.asInstanceOf[JobConf], classOf[TextInputFormat], classOf[LongWritable], classOf[Text]) - .mapPartitions { iter => - val unsafeRow = new UnsafeRow(1) - val bufferHolder = new BufferHolder(unsafeRow) - val unsafeRowWriter = new UnsafeRowWriter(bufferHolder, 1) - - iter.map { case (_, line) => - // Writes to an UnsafeRow directly - bufferHolder.reset() - unsafeRowWriter.write(0, line.getBytes, 0, line.getLength) - unsafeRow.setTotalSize(bufferHolder.totalSize()) - unsafeRow - } - } - } + override def prepareWrite( + sqlContext: SQLContext, + job: Job, + options: Map[String, String], + dataSchema: StructType): OutputWriterFactory = { + verifySchema(dataSchema) - /** Write path. */ - override def prepareJobForWrite(job: Job): OutputWriterFactory = { val conf = job.getConfiguration - val compressionCodec = parameters.get("compression").map(CompressionCodecs.getCodecClassName) + val compressionCodec = options.get("compression").map(CompressionCodecs.getCodecClassName) compressionCodec.foreach { codec => CompressionCodecs.setCodecConfiguration(conf, codec) } @@ -123,21 +77,54 @@ private[sql] class TextRelation( new OutputWriterFactory { override def newInstance( path: String, + bucketId: Option[Int], dataSchema: StructType, context: TaskAttemptContext): OutputWriter = { + if (bucketId.isDefined) { + throw new AnalysisException("Text doesn't support bucketing") + } new TextOutputWriter(path, dataSchema, context) } } } - override def equals(other: Any): Boolean = other match { - case that: TextRelation => - paths.toSet == that.paths.toSet && partitionColumns == that.partitionColumns - case _ => false - } + override def buildInternalScan( + sqlContext: SQLContext, + dataSchema: StructType, + requiredColumns: Array[String], + filters: Array[Filter], + bucketSet: Option[BitSet], + inputFiles: Array[FileStatus], + broadcastedConf: Broadcast[SerializableConfiguration], + options: Map[String, String]): RDD[InternalRow] = { + verifySchema(dataSchema) - override def hashCode(): Int = { - Objects.hashCode(paths.toSet, partitionColumns) + val job = Job.getInstance(sqlContext.sparkContext.hadoopConfiguration) + val conf = job.getConfiguration + val paths = inputFiles + .filterNot(_.getPath.getName startsWith "_") + .map(_.getPath) + .sortBy(_.toUri) + + if (paths.nonEmpty) { + FileInputFormat.setInputPaths(job, paths: _*) + } + + sqlContext.sparkContext.hadoopRDD( + conf.asInstanceOf[JobConf], classOf[TextInputFormat], classOf[LongWritable], classOf[Text]) + .mapPartitions { iter => + val unsafeRow = new UnsafeRow(1) + val bufferHolder = new BufferHolder(unsafeRow) + val unsafeRowWriter = new UnsafeRowWriter(bufferHolder, 1) + + iter.map { case (_, line) => + // Writes to an UnsafeRow directly + bufferHolder.reset() + unsafeRowWriter.write(0, line.getBytes, 0, line.getLength) + 
unsafeRow.setTotalSize(bufferHolder.totalSize()) + unsafeRow + } + } } } @@ -170,3 +157,4 @@ class TextOutputWriter(path: String, dataSchema: StructType, context: TaskAttemp recordWriter.close(context) } } + diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala index f5f36544a7..6f81794b29 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala @@ -23,7 +23,7 @@ import org.apache.spark.sql.catalyst.optimizer.Optimizer import org.apache.spark.sql.catalyst.parser.ParserInterface import org.apache.spark.sql.catalyst.rules.RuleExecutor import org.apache.spark.sql.execution._ -import org.apache.spark.sql.execution.datasources.{PreInsertCastAndRename, ResolveDataSource} +import org.apache.spark.sql.execution.datasources.{DataSourceAnalysis, PreInsertCastAndRename, ResolveDataSource} import org.apache.spark.sql.execution.exchange.EnsureRequirements import org.apache.spark.sql.util.ExecutionListenerManager @@ -63,8 +63,9 @@ private[sql] class SessionState(ctx: SQLContext) { new Analyzer(catalog, functionRegistry, conf) { override val extendedResolutionRules = python.ExtractPythonUDFs :: - PreInsertCastAndRename :: - (if (conf.runSQLOnFile) new ResolveDataSource(ctx) :: Nil else Nil) + PreInsertCastAndRename :: + DataSourceAnalysis :: + (if (conf.runSQLOnFile) new ResolveDataSource(ctx) :: Nil else Nil) override val extendedCheckRules = Seq(datasources.PreWriteCheck(catalog)) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala index 87ea7f510e..12512a8312 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala @@ -28,12 +28,11 @@ import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext} import org.apache.spark.{Logging, SparkContext} import org.apache.spark.annotation.{DeveloperApi, Experimental} import org.apache.spark.broadcast.Broadcast -import org.apache.spark.rdd.{RDD, UnionRDD} +import org.apache.spark.rdd.RDD import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow} import org.apache.spark.sql.catalyst.expressions._ -import org.apache.spark.sql.catalyst.expressions.codegen.GenerateMutableProjection -import org.apache.spark.sql.execution.{FileRelation, RDDConversions} +import org.apache.spark.sql.execution.FileRelation import org.apache.spark.sql.execution.datasources._ import org.apache.spark.sql.execution.streaming.{FileStreamSource, Sink, Source} import org.apache.spark.sql.types.{StringType, StructType} @@ -146,84 +145,6 @@ trait StreamSinkProvider { partitionColumns: Seq[String]): Sink } -/** - * ::Experimental:: - * Implemented by objects that produce relations for a specific kind of data source - * with a given schema and partitioned columns. When Spark SQL is given a DDL operation with a - * USING clause specified (to specify the implemented [[HadoopFsRelationProvider]]), a user defined - * schema, and an optional list of partition columns, this interface is used to pass in the - * parameters specified by a user. - * - * Users may specify the fully qualified class name of a given data source. 
When that class is - * not found Spark SQL will append the class name `DefaultSource` to the path, allowing for - * less verbose invocation. For example, 'org.apache.spark.sql.json' would resolve to the - * data source 'org.apache.spark.sql.json.DefaultSource' - * - * A new instance of this class will be instantiated each time a DDL call is made. - * - * The difference between a [[RelationProvider]] and a [[HadoopFsRelationProvider]] is - * that users need to provide a schema and a (possibly empty) list of partition columns when - * using a [[HadoopFsRelationProvider]]. A relation provider can inherits both [[RelationProvider]], - * and [[HadoopFsRelationProvider]] if it can support schema inference, user-specified - * schemas, and accessing partitioned relations. - * - * @since 1.4.0 - */ -@Experimental -trait HadoopFsRelationProvider extends StreamSourceProvider { - /** - * Returns a new base relation with the given parameters, a user defined schema, and a list of - * partition columns. Note: the parameters' keywords are case insensitive and this insensitivity - * is enforced by the Map that is passed to the function. - * - * @param dataSchema Schema of data columns (i.e., columns that are not partition columns). - */ - def createRelation( - sqlContext: SQLContext, - paths: Array[String], - dataSchema: Option[StructType], - partitionColumns: Option[StructType], - parameters: Map[String, String]): HadoopFsRelation - - private[sql] def createRelation( - sqlContext: SQLContext, - paths: Array[String], - dataSchema: Option[StructType], - partitionColumns: Option[StructType], - bucketSpec: Option[BucketSpec], - parameters: Map[String, String]): HadoopFsRelation = { - if (bucketSpec.isDefined) { - throw new AnalysisException("Currently we don't support bucketing for this data source.") - } - createRelation(sqlContext, paths, dataSchema, partitionColumns, parameters) - } - - override def createSource( - sqlContext: SQLContext, - schema: Option[StructType], - providerName: String, - parameters: Map[String, String]): Source = { - val caseInsensitiveOptions = new CaseInsensitiveMap(parameters) - val path = caseInsensitiveOptions.getOrElse("path", { - throw new IllegalArgumentException("'path' is not specified") - }) - val metadataPath = caseInsensitiveOptions.getOrElse("metadataPath", s"$path/_metadata") - - def dataFrameBuilder(files: Array[String]): DataFrame = { - val relation = createRelation( - sqlContext, - files, - schema, - partitionColumns = None, - bucketSpec = None, - parameters) - DataFrame(sqlContext, LogicalRelation(relation)) - } - - new FileStreamSource(sqlContext, metadataPath, path, schema, providerName, dataFrameBuilder) - } -} - /** * @since 1.3.0 */ @@ -409,20 +330,13 @@ abstract class OutputWriterFactory extends Serializable { * @param dataSchema Schema of the rows to be written. Partition columns are not included in the * schema if the relation being written is partitioned. * @param context The Hadoop MapReduce task context. - * * @since 1.4.0 */ - def newInstance( - path: String, - dataSchema: StructType, - context: TaskAttemptContext): OutputWriter - private[sql] def newInstance( path: String, - bucketId: Option[Int], + bucketId: Option[Int], // TODO: This doesn't belong here... 
dataSchema: StructType, - context: TaskAttemptContext): OutputWriter = - newInstance(path, dataSchema, context) + context: TaskAttemptContext): OutputWriter } /** @@ -465,214 +379,165 @@ abstract class OutputWriter { } /** - * ::Experimental:: - * A [[BaseRelation]] that provides much of the common code required for relations that store their - * data to an HDFS compatible filesystem. - * - * For the read path, similar to [[PrunedFilteredScan]], it can eliminate unneeded columns and - * filter using selected predicates before producing an RDD containing all matching tuples as - * [[Row]] objects. In addition, when reading from Hive style partitioned tables stored in file - * systems, it's able to discover partitioning information from the paths of input directories, and - * perform partition pruning before start reading the data. Subclasses of [[HadoopFsRelation()]] - * must override one of the four `buildScan` methods to implement the read path. - * - * For the write path, it provides the ability to write to both non-partitioned and partitioned - * tables. Directory layout of the partitioned tables is compatible with Hive. - * - * @constructor This constructor is for internal uses only. The [[PartitionSpec]] argument is for - * implementing metastore table conversion. - * - * @param maybePartitionSpec An [[HadoopFsRelation]] can be created with an optional - * [[PartitionSpec]], so that partition discovery can be skipped. - * - * @since 1.4.0 + * Acts as a container for all of the metadata required to read from a datasource. All discovery, + * resolution and merging logic for schemas and partitions has been removed. + * + * @param location A [[FileCatalog]] that can enumerate the locations of all the files that comprise + * this relation. + * @param partitionSchema The schmea of the columns (if any) that are used to partition the relation + * @param dataSchema The schema of any remaining columns. Note that if any partition columns are + * present in the actual data files as well, they are removed. + * @param bucketSpec Describes the bucketing (hash-partitioning of the files by some column values). + * @param fileFormat A file format that can be used to read and write the data in files. + * @param options Configuration used when reading / writing data. 
*/ -@Experimental -abstract class HadoopFsRelation private[sql]( - maybePartitionSpec: Option[PartitionSpec], - parameters: Map[String, String]) - extends BaseRelation with FileRelation with Logging { - - override def toString: String = getClass.getSimpleName +case class HadoopFsRelation( + sqlContext: SQLContext, + location: FileCatalog, + partitionSchema: StructType, + dataSchema: StructType, + bucketSpec: Option[BucketSpec], + fileFormat: FileFormat, + options: Map[String, String]) extends BaseRelation with FileRelation { + + val schema: StructType = { + val dataSchemaColumnNames = dataSchema.map(_.name.toLowerCase).toSet + StructType(dataSchema ++ partitionSchema.filterNot { column => + dataSchemaColumnNames.contains(column.name.toLowerCase) + }) + } - def this() = this(None, Map.empty[String, String]) + def partitionSchemaOption: Option[StructType] = + if (partitionSchema.isEmpty) None else Some(partitionSchema) + def partitionSpec: PartitionSpec = location.partitionSpec(partitionSchemaOption) - def this(parameters: Map[String, String]) = this(None, parameters) + def refresh(): Unit = location.refresh() - private[sql] def this(maybePartitionSpec: Option[PartitionSpec]) = - this(maybePartitionSpec, Map.empty[String, String]) + override def toString: String = + s"$fileFormat part: ${partitionSchema.simpleString}, data: ${dataSchema.simpleString}" - private val hadoopConf = new Configuration(sqlContext.sparkContext.hadoopConfiguration) + /** Returns the list of files that will be read when scanning this relation. */ + override def inputFiles: Array[String] = + location.allFiles().map(_.getPath.toUri.toString).toArray +} - private var _partitionSpec: PartitionSpec = _ +/** + * Used to read a write data in files to [[InternalRow]] format. + */ +trait FileFormat { + /** + * When possible, this method should return the schema of the given `files`. When the format + * does not support inference, or no valid files are given should return None. In these cases + * Spark will require that user specify the schema manually. + */ + def inferSchema( + sqlContext: SQLContext, + options: Map[String, String], + files: Seq[FileStatus]): Option[StructType] - private[this] var malformedBucketFile = false + /** + * Prepares a write job and returns an [[OutputWriterFactory]]. Client side job preparation can + * be put here. For example, user defined output committer can be configured here + * by setting the output committer class in the conf of spark.sql.sources.outputCommitterClass. + */ + def prepareWrite( + sqlContext: SQLContext, + job: Job, + options: Map[String, String], + dataSchema: StructType): OutputWriterFactory - private[sql] def maybeBucketSpec: Option[BucketSpec] = None + def buildInternalScan( + sqlContext: SQLContext, + dataSchema: StructType, + requiredColumns: Array[String], + filters: Array[Filter], + bucketSet: Option[BitSet], + inputFiles: Array[FileStatus], + broadcastedConf: Broadcast[SerializableConfiguration], + options: Map[String, String]): RDD[InternalRow] +} - final private[sql] def getBucketSpec: Option[BucketSpec] = - maybeBucketSpec.filter(_ => sqlContext.conf.bucketingEnabled() && !malformedBucketFile) +/** + * An interface for objects capable of enumerating the files that comprise a relation as well + * as the partitioning characteristics of those files. 
+ */ +trait FileCatalog { + def paths: Seq[Path] - private class FileStatusCache { - var leafFiles = mutable.LinkedHashMap.empty[Path, FileStatus] + def partitionSpec(schema: Option[StructType]): PartitionSpec - var leafDirToChildrenFiles = mutable.Map.empty[Path, Array[FileStatus]] + def allFiles(): Seq[FileStatus] - private def listLeafFiles(paths: Array[String]): mutable.LinkedHashSet[FileStatus] = { - if (paths.length >= sqlContext.conf.parallelPartitionDiscoveryThreshold) { - HadoopFsRelation.listLeafFilesInParallel(paths, hadoopConf, sqlContext.sparkContext) - } else { - val statuses = paths.flatMap { path => - val hdfsPath = new Path(path) - val fs = hdfsPath.getFileSystem(hadoopConf) - val qualified = hdfsPath.makeQualified(fs.getUri, fs.getWorkingDirectory) - logInfo(s"Listing $qualified on driver") - // Dummy jobconf to get to the pathFilter defined in configuration - val jobConf = new JobConf(hadoopConf, this.getClass()) - val pathFilter = FileInputFormat.getInputPathFilter(jobConf) - if (pathFilter != null) { - Try(fs.listStatus(qualified, pathFilter)).getOrElse(Array.empty) - } else { - Try(fs.listStatus(qualified)).getOrElse(Array.empty) - } - }.filterNot { status => - val name = status.getPath.getName - name.toLowerCase == "_temporary" || name.startsWith(".") - } + def getStatus(path: Path): Array[FileStatus] - val (dirs, files) = statuses.partition(_.isDirectory) + def refresh(): Unit +} - // It uses [[LinkedHashSet]] since the order of files can affect the results. (SPARK-11500) - if (dirs.isEmpty) { - mutable.LinkedHashSet(files: _*) - } else { - mutable.LinkedHashSet(files: _*) ++ listLeafFiles(dirs.map(_.getPath.toString)) - } - } - } +/** + * A file catalog that caches metadata gathered by scanning all the files present in `paths` + * recursively. + */ +class HDFSFileCatalog( + val sqlContext: SQLContext, + val parameters: Map[String, String], + val paths: Seq[Path]) + extends FileCatalog with Logging { - def refresh(): Unit = { - val files = listLeafFiles(paths) + private val hadoopConf = new Configuration(sqlContext.sparkContext.hadoopConfiguration) - leafFiles.clear() - leafDirToChildrenFiles.clear() + var leafFiles = mutable.LinkedHashMap.empty[Path, FileStatus] + var leafDirToChildrenFiles = mutable.Map.empty[Path, Array[FileStatus]] + var cachedPartitionSpec: PartitionSpec = _ - leafFiles ++= files.map(f => f.getPath -> f) - leafDirToChildrenFiles ++= files.toArray.groupBy(_.getPath.getParent) + def partitionSpec(schema: Option[StructType]): PartitionSpec = { + if (cachedPartitionSpec == null) { + cachedPartitionSpec = inferPartitioning(schema) } - } - private lazy val fileStatusCache = { - val cache = new FileStatusCache - cache.refresh() - cache + cachedPartitionSpec } - protected def cachedLeafStatuses(): mutable.LinkedHashSet[FileStatus] = { - mutable.LinkedHashSet(fileStatusCache.leafFiles.values.toArray: _*) - } + refresh() - final private[sql] def partitionSpec: PartitionSpec = { - if (_partitionSpec == null) { - _partitionSpec = maybePartitionSpec - .flatMap { - case spec if spec.partitions.nonEmpty => - Some(spec.copy(partitionColumns = spec.partitionColumns.asNullable)) - case _ => - None - } - .orElse { - // We only know the partition columns and their data types. We need to discover - // partition values. 
- userDefinedPartitionColumns.map { partitionSchema => - val spec = discoverPartitions() - val partitionColumnTypes = spec.partitionColumns.map(_.dataType) - val castedPartitions = spec.partitions.map { case p @ Partition(values, path) => - val literals = partitionColumnTypes.zipWithIndex.map { case (dt, i) => - Literal.create(values.get(i, dt), dt) - } - val castedValues = partitionSchema.zip(literals).map { case (field, literal) => - Cast(literal, field.dataType).eval() - } - p.copy(values = InternalRow.fromSeq(castedValues)) - } - PartitionSpec(partitionSchema, castedPartitions) - } - } - .getOrElse { - if (sqlContext.conf.partitionDiscoveryEnabled()) { - discoverPartitions() - } else { - PartitionSpec(StructType(Nil), Array.empty[Partition]) - } - } - } - _partitionSpec - } + def allFiles(): Seq[FileStatus] = leafFiles.values.toSeq - /** - * Paths of this relation. For partitioned relations, it should be root directories - * of all partition directories. - * - * @since 1.4.0 - */ - def paths: Array[String] - - /** - * Contains a set of paths that are considered as the base dirs of the input datasets. - * The partitioning discovery logic will make sure it will stop when it reaches any - * base path. By default, the paths of the dataset provided by users will be base paths. - * For example, if a user uses `sqlContext.read.parquet("/path/something=true/")`, the base path - * will be `/path/something=true/`, and the returned DataFrame will not contain a column of - * `something`. If users want to override the basePath. They can set `basePath` in the options - * to pass the new base path to the data source. - * For the above example, if the user-provided base path is `/path/`, the returned - * DataFrame will have the column of `something`. - */ - private def basePaths: Set[Path] = { - val userDefinedBasePath = parameters.get("basePath").map(basePath => Set(new Path(basePath))) - userDefinedBasePath.getOrElse { - // If the user does not provide basePath, we will just use paths. - val pathSet = paths.toSet - pathSet.map(p => new Path(p)) - }.map { hdfsPath => - // Make the path qualified (consistent with listLeafFiles and listLeafFilesInParallel). - val fs = hdfsPath.getFileSystem(hadoopConf) - hdfsPath.makeQualified(fs.getUri, fs.getWorkingDirectory) - } - } - - override def inputFiles: Array[String] = cachedLeafStatuses().map(_.getPath.toString).toArray + def getStatus(path: Path): Array[FileStatus] = leafDirToChildrenFiles(path) - override def sizeInBytes: Long = cachedLeafStatuses().map(_.getLen).sum - - /** - * Partition columns. Can be either defined by [[userDefinedPartitionColumns]] or automatically - * discovered. Note that they should always be nullable. 
- * - * @since 1.4.0 - */ - final def partitionColumns: StructType = - userDefinedPartitionColumns.getOrElse(partitionSpec.partitionColumns) + private def listLeafFiles(paths: Seq[Path]): mutable.LinkedHashSet[FileStatus] = { + if (paths.length >= sqlContext.conf.parallelPartitionDiscoveryThreshold) { + HadoopFsRelation.listLeafFilesInParallel(paths, hadoopConf, sqlContext.sparkContext) + } else { + val statuses = paths.flatMap { path => + val fs = path.getFileSystem(hadoopConf) + logInfo(s"Listing $path on driver") + // Dummy jobconf to get to the pathFilter defined in configuration + val jobConf = new JobConf(hadoopConf, this.getClass()) + val pathFilter = FileInputFormat.getInputPathFilter(jobConf) + if (pathFilter != null) { + Try(fs.listStatus(path, pathFilter)).getOrElse(Array.empty) + } else { + Try(fs.listStatus(path)).getOrElse(Array.empty) + } + }.filterNot { status => + val name = status.getPath.getName + name.toLowerCase == "_temporary" || name.startsWith(".") + } - /** - * Optional user defined partition columns. - * - * @since 1.4.0 - */ - def userDefinedPartitionColumns: Option[StructType] = None + val (dirs, files) = statuses.partition(_.isDirectory) - private[sql] def refresh(): Unit = { - fileStatusCache.refresh() - if (sqlContext.conf.partitionDiscoveryEnabled()) { - _partitionSpec = discoverPartitions() + // It uses [[LinkedHashSet]] since the order of files can affect the results. (SPARK-11500) + if (dirs.isEmpty) { + mutable.LinkedHashSet(files: _*) + } else { + mutable.LinkedHashSet(files: _*) ++ listLeafFiles(dirs.map(_.getPath)) + } } } - private def discoverPartitions(): PartitionSpec = { + def inferPartitioning(schema: Option[StructType]): PartitionSpec = { // We use leaf dirs containing data files to discover the schema. - val leafDirs = fileStatusCache.leafDirToChildrenFiles.keys.toSeq - userDefinedPartitionColumns match { + val leafDirs = leafDirToChildrenFiles.keys.toSeq + schema match { case Some(userProvidedSchema) if userProvidedSchema.nonEmpty => val spec = PartitioningUtils.parsePartitions( leafDirs, @@ -693,9 +558,7 @@ abstract class HadoopFsRelation private[sql]( PartitionSpec(userProvidedSchema, spec.partitions.map { part => part.copy(values = castPartitionValuesToUserSchema(part.values)) }) - - case _ => - // user did not provide a partitioning schema + case None => PartitioningUtils.parsePartitions( leafDirs, PartitioningUtils.DEFAULT_PARTITION_NAME, @@ -705,271 +568,51 @@ abstract class HadoopFsRelation private[sql]( } /** - * Schema of this relation. It consists of columns appearing in [[dataSchema]] and all partition - * columns not appearing in [[dataSchema]]. - * - * @since 1.4.0 - */ - override lazy val schema: StructType = { - val dataSchemaColumnNames = dataSchema.map(_.name.toLowerCase).toSet - StructType(dataSchema ++ partitionColumns.filterNot { column => - dataSchemaColumnNames.contains(column.name.toLowerCase) - }) - } - - /** - * Groups the input files by bucket id, if bucketing is enabled and this data source is bucketed. - * Returns None if there exists any malformed bucket files. + * Contains a set of paths that are considered as the base dirs of the input datasets. + * The partitioning discovery logic will make sure it will stop when it reaches any + * base path. By default, the paths of the dataset provided by users will be base paths. 
+ * For example, if a user uses `sqlContext.read.parquet("/path/something=true/")`, the base path + * will be `/path/something=true/`, and the returned DataFrame will not contain a column of + * `something`. If users want to override the basePath. They can set `basePath` in the options + * to pass the new base path to the data source. + * For the above example, if the user-provided base path is `/path/`, the returned + * DataFrame will have the column of `something`. */ - private def groupBucketFiles( - files: Array[FileStatus]): Option[scala.collection.Map[Int, Array[FileStatus]]] = { - malformedBucketFile = false - if (getBucketSpec.isDefined) { - val groupedBucketFiles = mutable.HashMap.empty[Int, mutable.ArrayBuffer[FileStatus]] - var i = 0 - while (!malformedBucketFile && i < files.length) { - val bucketId = BucketingUtils.getBucketId(files(i).getPath.getName) - if (bucketId.isEmpty) { - logError(s"File ${files(i).getPath} is expected to be a bucket file, but there is no " + - "bucket id information in file name. Fall back to non-bucketing mode.") - malformedBucketFile = true - } else { - val bucketFiles = - groupedBucketFiles.getOrElseUpdate(bucketId.get, mutable.ArrayBuffer.empty) - bucketFiles += files(i) - } - i += 1 - } - if (malformedBucketFile) None else Some(groupedBucketFiles.mapValues(_.toArray)) - } else { - None - } - } - - final private[sql] def buildInternalScan( - requiredColumns: Array[String], - filters: Array[Filter], - bucketSet: Option[BitSet], - inputPaths: Array[String], - broadcastedConf: Broadcast[SerializableConfiguration]): RDD[InternalRow] = { - val inputStatuses = inputPaths.flatMap { input => - val path = new Path(input) - - // First assumes `input` is a directory path, and tries to get all files contained in it. - fileStatusCache.leafDirToChildrenFiles.getOrElse( - path, - // Otherwise, `input` might be a file path - fileStatusCache.leafFiles.get(path).toArray - ).filter { status => - val name = status.getPath.getName - !name.startsWith("_") && !name.startsWith(".") - } - } - - groupBucketFiles(inputStatuses).map { groupedBucketFiles => - // For each bucket id, firstly we get all files belong to this bucket, by detecting bucket - // id from file name. Then read these files into a RDD(use one-partition empty RDD for empty - // bucket), and coalesce it to one partition. Finally union all bucket RDDs to one result. - val perBucketRows = (0 until maybeBucketSpec.get.numBuckets).map { bucketId => - // If the current bucketId is not set in the bucket bitSet, skip scanning it. - if (bucketSet.nonEmpty && !bucketSet.get.get(bucketId)){ - sqlContext.emptyResult - } else { - // When all the buckets need a scan (i.e., bucketSet is equal to None) - // or when the current bucket need a scan (i.e., the bit of bucketId is set to true) - groupedBucketFiles.get(bucketId).map { inputStatuses => - buildInternalScan(requiredColumns, filters, inputStatuses, broadcastedConf).coalesce(1) - }.getOrElse(sqlContext.emptyResult) - } - } - - new UnionRDD(sqlContext.sparkContext, perBucketRows) - }.getOrElse { - buildInternalScan(requiredColumns, filters, inputStatuses, broadcastedConf) + private def basePaths: Set[Path] = { + val userDefinedBasePath = parameters.get("basePath").map(basePath => Set(new Path(basePath))) + userDefinedBasePath.getOrElse { + // If the user does not provide basePath, we will just use paths. + paths.toSet + }.map { hdfsPath => + // Make the path qualified (consistent with listLeafFiles and listLeafFilesInParallel). 
+ val fs = hdfsPath.getFileSystem(hadoopConf) + hdfsPath.makeQualified(fs.getUri, fs.getWorkingDirectory) } } - /** - * Specifies schema of actual data files. For partitioned relations, if one or more partitioned - * columns are contained in the data files, they should also appear in `dataSchema`. - * - * @since 1.4.0 - */ - def dataSchema: StructType - - /** - * For a non-partitioned relation, this method builds an `RDD[Row]` containing all rows within - * this relation. For partitioned relations, this method is called for each selected partition, - * and builds an `RDD[Row]` containing all rows within that single partition. - * - * @param inputFiles For a non-partitioned relation, it contains paths of all data files in the - * relation. For a partitioned relation, it contains paths of all data files in a single - * selected partition. - * - * @since 1.4.0 - */ - def buildScan(inputFiles: Array[FileStatus]): RDD[Row] = { - throw new UnsupportedOperationException( - "At least one buildScan() method should be overridden to read the relation.") - } - - /** - * For a non-partitioned relation, this method builds an `RDD[Row]` containing all rows within - * this relation. For partitioned relations, this method is called for each selected partition, - * and builds an `RDD[Row]` containing all rows within that single partition. - * - * @param requiredColumns Required columns. - * @param inputFiles For a non-partitioned relation, it contains paths of all data files in the - * relation. For a partitioned relation, it contains paths of all data files in a single - * selected partition. - * - * @since 1.4.0 - */ - // TODO Tries to eliminate the extra Catalyst-to-Scala conversion when `needConversion` is true - // - // PR #7626 separated `Row` and `InternalRow` completely. One of the consequences is that we can - // no longer treat an `InternalRow` containing Catalyst values as a `Row`. Thus we have to - // introduce another row value conversion for data sources whose `needConversion` is true. - def buildScan(requiredColumns: Array[String], inputFiles: Array[FileStatus]): RDD[Row] = { - // Yeah, to workaround serialization... - val dataSchema = this.dataSchema - val needConversion = this.needConversion - - val requiredOutput = requiredColumns.map { col => - val field = dataSchema(col) - BoundReference(dataSchema.fieldIndex(col), field.dataType, field.nullable) - }.toSeq - - val rdd: RDD[Row] = buildScan(inputFiles) - val converted: RDD[InternalRow] = - if (needConversion) { - RDDConversions.rowToRowRdd(rdd, dataSchema.fields.map(_.dataType)) - } else { - rdd.asInstanceOf[RDD[InternalRow]] - } + def refresh(): Unit = { + val files = listLeafFiles(paths) - converted.mapPartitions { rows => - val buildProjection = - GenerateMutableProjection.generate(requiredOutput, dataSchema.toAttributes) + leafFiles.clear() + leafDirToChildrenFiles.clear() - val projectedRows = { - val mutableProjection = buildProjection() - rows.map(r => mutableProjection(r)) - } + leafFiles ++= files.map(f => f.getPath -> f) + leafDirToChildrenFiles ++= files.toArray.groupBy(_.getPath.getParent) - if (needConversion) { - val requiredSchema = StructType(requiredColumns.map(dataSchema(_))) - val toScala = CatalystTypeConverters.createToScalaConverter(requiredSchema) - projectedRows.map(toScala(_).asInstanceOf[Row]) - } else { - projectedRows - } - }.asInstanceOf[RDD[Row]] + cachedPartitionSpec = null } - /** - * For a non-partitioned relation, this method builds an `RDD[Row]` containing all rows within - * this relation. 
For partitioned relations, this method is called for each selected partition, - * and builds an `RDD[Row]` containing all rows within that single partition. - * - * @param requiredColumns Required columns. - * @param filters Candidate filters to be pushed down. The actual filter should be the conjunction - * of all `filters`. The pushed down filters are currently purely an optimization as they - * will all be evaluated again. This means it is safe to use them with methods that produce - * false positives such as filtering partitions based on a bloom filter. - * @param inputFiles For a non-partitioned relation, it contains paths of all data files in the - * relation. For a partitioned relation, it contains paths of all data files in a single - * selected partition. - * - * @since 1.4.0 - */ - def buildScan( - requiredColumns: Array[String], - filters: Array[Filter], - inputFiles: Array[FileStatus]): RDD[Row] = { - buildScan(requiredColumns, inputFiles) + override def equals(other: Any): Boolean = other match { + case hdfs: HDFSFileCatalog => paths.toSet == hdfs.paths.toSet + case _ => false } - /** - * For a non-partitioned relation, this method builds an `RDD[Row]` containing all rows within - * this relation. For partitioned relations, this method is called for each selected partition, - * and builds an `RDD[Row]` containing all rows within that single partition. - * - * Note: This interface is subject to change in future. - * - * @param requiredColumns Required columns. - * @param filters Candidate filters to be pushed down. The actual filter should be the conjunction - * of all `filters`. The pushed down filters are currently purely an optimization as they - * will all be evaluated again. This means it is safe to use them with methods that produce - * false positives such as filtering partitions based on a bloom filter. - * @param inputFiles For a non-partitioned relation, it contains paths of all data files in the - * relation. For a partitioned relation, it contains paths of all data files in a single - * selected partition. - * @param broadcastedConf A shared broadcast Hadoop Configuration, which can be used to reduce the - * overhead of broadcasting the Configuration for every Hadoop RDD. - * - * @since 1.4.0 - */ - private[sql] def buildScan( - requiredColumns: Array[String], - filters: Array[Filter], - inputFiles: Array[FileStatus], - broadcastedConf: Broadcast[SerializableConfiguration]): RDD[Row] = { - buildScan(requiredColumns, filters, inputFiles) - } - - /** - * For a non-partitioned relation, this method builds an `RDD[InternalRow]` containing all rows - * within this relation. For partitioned relations, this method is called for each selected - * partition, and builds an `RDD[InternalRow]` containing all rows within that single partition. - * - * Note: - * - * 1. Rows contained in the returned `RDD[InternalRow]` are assumed to be `UnsafeRow`s. - * 2. This interface is subject to change in future. - * - * @param requiredColumns Required columns. - * @param filters Candidate filters to be pushed down. The actual filter should be the conjunction - * of all `filters`. The pushed down filters are currently purely an optimization as they - * will all be evaluated again. This means it is safe to use them with methods that produce - * false positives such as filtering partitions based on a bloom filter. - * @param inputFiles For a non-partitioned relation, it contains paths of all data files in the - * relation. 
For a partitioned relation, it contains paths of all data files in a single - * selected partition. - * @param broadcastedConf A shared broadcast Hadoop Configuration, which can be used to reduce the - * overhead of broadcasting the Configuration for every Hadoop RDD. - */ - private[sql] def buildInternalScan( - requiredColumns: Array[String], - filters: Array[Filter], - inputFiles: Array[FileStatus], - broadcastedConf: Broadcast[SerializableConfiguration]): RDD[InternalRow] = { - val requiredSchema = StructType(requiredColumns.map(dataSchema.apply)) - val internalRows = { - val externalRows = buildScan(requiredColumns, filters, inputFiles, broadcastedConf) - execution.RDDConversions.rowToRowRdd(externalRows, requiredSchema.map(_.dataType)) - } - - internalRows.mapPartitions { iterator => - val unsafeProjection = UnsafeProjection.create(requiredSchema) - iterator.map(unsafeProjection) - } - } - - /** - * Prepares a write job and returns an [[OutputWriterFactory]]. Client side job preparation can - * be put here. For example, user defined output committer can be configured here - * by setting the output committer class in the conf of spark.sql.sources.outputCommitterClass. - * - * Note that the only side effect expected here is mutating `job` via its setters. Especially, - * Spark SQL caches [[BaseRelation]] instances for performance, mutating relation internal states - * may cause unexpected behaviors. - * - * @since 1.4.0 - */ - def prepareJobForWrite(job: Job): OutputWriterFactory + override def hashCode(): Int = paths.toSet.hashCode() } +/** + * Helper methods for gathering metadata from HDFS. + */ private[sql] object HadoopFsRelation extends Logging { // We don't filter files/directories whose name start with "_" except "_temporary" here, as // specific data sources may take advantages over them (e.g. 
Parquet _metadata and @@ -1009,17 +652,17 @@ private[sql] object HadoopFsRelation extends Logging { accessTime: Long) def listLeafFilesInParallel( - paths: Array[String], + paths: Seq[Path], hadoopConf: Configuration, sparkContext: SparkContext): mutable.LinkedHashSet[FileStatus] = { logInfo(s"Listing leaf files and directories in parallel under: ${paths.mkString(", ")}") val serializableConfiguration = new SerializableConfiguration(hadoopConf) - val fakeStatuses = sparkContext.parallelize(paths).flatMap { path => - val hdfsPath = new Path(path) - val fs = hdfsPath.getFileSystem(serializableConfiguration.value) - val qualified = hdfsPath.makeQualified(fs.getUri, fs.getWorkingDirectory) - Try(listLeafFiles(fs, fs.getFileStatus(qualified))).getOrElse(Array.empty) + val serializedPaths = paths.map(_.toString) + + val fakeStatuses = sparkContext.parallelize(serializedPaths).map(new Path(_)).flatMap { path => + val fs = path.getFileSystem(serializableConfiguration.value) + Try(listLeafFiles(fs, fs.getFileStatus(path))).getOrElse(Array.empty) }.map { status => FakeFileStatus( status.getPath.toString, diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala index a824759cb8..55153cda31 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala @@ -889,7 +889,6 @@ class DataFrameSuite extends QueryTest with SharedSQLContext { .write.format("parquet").save("temp") } assert(e.getMessage.contains("Duplicate column(s)")) - assert(e.getMessage.contains("parquet")) assert(e.getMessage.contains("column1")) assert(!e.getMessage.contains("column2")) @@ -900,7 +899,6 @@ class DataFrameSuite extends QueryTest with SharedSQLContext { .write.format("json").save("temp") } assert(f.getMessage.contains("Duplicate column(s)")) - assert(f.getMessage.contains("JSON")) assert(f.getMessage.contains("column1")) assert(f.getMessage.contains("column3")) assert(!f.getMessage.contains("column2")) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala index f59faa0dc2..182f287dd0 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala @@ -1741,7 +1741,7 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext { val e3 = intercept[AnalysisException] { sql("select * from json.invalid_file") } - assert(e3.message.contains("No input paths specified")) + assert(e3.message.contains("Unable to infer schema")) } test("SortMergeJoin returns wrong results when using UnsafeRows") { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala index 3a33554143..2f17037a58 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala @@ -582,35 +582,6 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData { jsonDF.registerTempTable("jsonTable") } - test("jsonFile should be based on JSONRelation") { - val dir = Utils.createTempDir() - dir.delete() - val path = dir.getCanonicalFile.toURI.toString - sparkContext.parallelize(1 to 100) - .map(i => s"""{"a": 1, "b": "str$i"}""").saveAsTextFile(path) 
- val jsonDF = sqlContext.read.option("samplingRatio", "0.49").json(path) - - val analyzed = jsonDF.queryExecution.analyzed - assert( - analyzed.isInstanceOf[LogicalRelation], - "The DataFrame returned by jsonFile should be based on LogicalRelation.") - val relation = analyzed.asInstanceOf[LogicalRelation].relation - assert( - relation.isInstanceOf[JSONRelation], - "The DataFrame returned by jsonFile should be based on JSONRelation.") - assert(relation.asInstanceOf[JSONRelation].paths === Array(path)) - assert(relation.asInstanceOf[JSONRelation].options.samplingRatio === (0.49 +- 0.001)) - - val schema = StructType(StructField("a", LongType, true) :: Nil) - val logicalRelation = - sqlContext.read.schema(schema).json(path) - .queryExecution.analyzed.asInstanceOf[LogicalRelation] - val relationWithSchema = logicalRelation.relation.asInstanceOf[JSONRelation] - assert(relationWithSchema.paths === Array(path)) - assert(relationWithSchema.schema === schema) - assert(relationWithSchema.options.samplingRatio > 0.99) - } - test("Loading a JSON dataset from a text file") { val dir = Utils.createTempDir() dir.delete() @@ -1202,48 +1173,6 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData { } test("JSONRelation equality test") { - val relation0 = new JSONRelation( - Some(empty), - Some(StructType(StructField("a", IntegerType, true) :: Nil)), - None, - None)(sqlContext) - val logicalRelation0 = LogicalRelation(relation0) - val relation1 = new JSONRelation( - Some(singleRow), - Some(StructType(StructField("a", IntegerType, true) :: Nil)), - None, - None)(sqlContext) - val logicalRelation1 = LogicalRelation(relation1) - val relation2 = new JSONRelation( - Some(singleRow), - Some(StructType(StructField("a", IntegerType, true) :: Nil)), - None, - None, - parameters = Map("samplingRatio" -> "0.5"))(sqlContext) - val logicalRelation2 = LogicalRelation(relation2) - val relation3 = new JSONRelation( - Some(singleRow), - Some(StructType(StructField("b", IntegerType, true) :: Nil)), - None, - None)(sqlContext) - val logicalRelation3 = LogicalRelation(relation3) - - assert(relation0 !== relation1) - assert(!logicalRelation0.sameResult(logicalRelation1), - s"$logicalRelation0 and $logicalRelation1 should be considered not having the same result.") - - assert(relation1 === relation2) - assert(logicalRelation1.sameResult(logicalRelation2), - s"$logicalRelation1 and $logicalRelation2 should be considered having the same result.") - - assert(relation1 !== relation3) - assert(!logicalRelation1.sameResult(logicalRelation3), - s"$logicalRelation1 and $logicalRelation3 should be considered not having the same result.") - - assert(relation2 !== relation3) - assert(!logicalRelation2.sameResult(logicalRelation3), - s"$logicalRelation2 and $logicalRelation3 should be considered not having the same result.") - withTempPath(dir => { val path = dir.getCanonicalFile.toURI.toString sparkContext.parallelize(1 to 100) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala index d2947676a0..e32616fb5c 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala @@ -28,6 +28,7 @@ import org.apache.spark.sql.catalyst.planning.PhysicalOperation import 
org.apache.spark.sql.execution.datasources.{DataSourceStrategy, LogicalRelation} import org.apache.spark.sql.functions._ import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.sources.HadoopFsRelation import org.apache.spark.sql.test.SharedSQLContext import org.apache.spark.sql.types._ @@ -59,9 +60,9 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex .select(output.map(e => Column(e)): _*) .where(Column(predicate)) - var maybeRelation: Option[ParquetRelation] = None + var maybeRelation: Option[HadoopFsRelation] = None val maybeAnalyzedPredicate = query.queryExecution.optimizedPlan.collect { - case PhysicalOperation(_, filters, LogicalRelation(relation: ParquetRelation, _, _)) => + case PhysicalOperation(_, filters, LogicalRelation(relation: HadoopFsRelation, _, _)) => maybeRelation = Some(relation) filters }.flatten.reduceLeftOption(_ && _) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala index cf8a9fdd46..34e914cb1e 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala @@ -437,8 +437,8 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext { readParquetFile(path.toString) { df => assertResult(df.schema) { StructType( - StructField("a", BooleanType, nullable = false) :: - StructField("b", IntegerType, nullable = false) :: + StructField("a", BooleanType, nullable = true) :: + StructField("b", IntegerType, nullable = true) :: Nil) } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala index 8bc5c89959..b74b9d3f3b 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala @@ -31,6 +31,7 @@ import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.Literal import org.apache.spark.sql.execution.datasources.{LogicalRelation, Partition, PartitioningUtils, PartitionSpec} import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.sources.HadoopFsRelation import org.apache.spark.sql.test.SharedSQLContext import org.apache.spark.sql.types._ import org.apache.spark.unsafe.types.UTF8String @@ -564,7 +565,7 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest with Sha (1 to 10).map(i => (i, i.toString)).toDF("a", "b").write.parquet(dir.getCanonicalPath) val queryExecution = sqlContext.read.parquet(dir.getCanonicalPath).queryExecution queryExecution.analyzed.collectFirst { - case LogicalRelation(relation: ParquetRelation, _, _) => + case LogicalRelation(relation: HadoopFsRelation, _, _) => assert(relation.partitionSpec === PartitionSpec.emptySpec) }.getOrElse { fail(s"Expecting a ParquetRelation2, but got:\n$queryExecution") diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala index 5b70d258d6..5ac39f54b9 100644 --- 
a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala @@ -174,7 +174,7 @@ class InsertSuite extends DataSourceTest with SharedSQLContext { """.stripMargin) }.getMessage assert( - message.contains("Cannot insert overwrite into table that is also being read from."), + message.contains("Cannot overwrite a path that is also being read from."), "INSERT OVERWRITE to a table while querying it should not be allowed.") } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala index 7a4ee0ef26..e9d77abb8c 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala @@ -21,7 +21,7 @@ import java.io.{ByteArrayInputStream, File, FileNotFoundException, InputStream} import com.google.common.base.Charsets.UTF_8 -import org.apache.spark.sql.StreamTest +import org.apache.spark.sql.{AnalysisException, StreamTest} import org.apache.spark.sql.catalyst.util._ import org.apache.spark.sql.execution.streaming._ import org.apache.spark.sql.execution.streaming.FileStreamSource._ @@ -112,7 +112,7 @@ class FileStreamSourceSuite extends FileStreamSourceTest with SharedSQLContext { } test("FileStreamSource schema: path doesn't exist") { - intercept[FileNotFoundException] { + intercept[AnalysisException] { createFileStreamSourceAndGetSchema(format = None, path = Some("/a/b/c"), schema = None) } } @@ -146,11 +146,11 @@ class FileStreamSourceSuite extends FileStreamSourceTest with SharedSQLContext { test("FileStreamSource schema: parquet, no existing files, no schema") { withTempDir { src => - val e = intercept[IllegalArgumentException] { + val e = intercept[AnalysisException] { createFileStreamSourceAndGetSchema( format = Some("parquet"), path = Some(new File(src, "1").getCanonicalPath), schema = None) } - assert("No schema specified" === e.getMessage) + assert("Unable to infer schema. It must be specified manually.;" === e.getMessage) } } @@ -177,11 +177,11 @@ class FileStreamSourceSuite extends FileStreamSourceTest with SharedSQLContext { test("FileStreamSource schema: json, no existing files, no schema") { withTempDir { src => - val e = intercept[IllegalArgumentException] { + val e = intercept[AnalysisException] { createFileStreamSourceAndGetSchema( format = Some("json"), path = Some(src.getCanonicalPath), schema = None) } - assert("No schema specified" === e.getMessage) + assert("Unable to infer schema. 
It must be specified manually.;" === e.getMessage) } } @@ -310,10 +310,10 @@ class FileStreamSourceSuite extends FileStreamSourceTest with SharedSQLContext { createFileStreamSource("text", src.getCanonicalPath) // Both "json" and "parquet" require a schema if no existing file to infer - intercept[IllegalArgumentException] { + intercept[AnalysisException] { createFileStreamSource("json", src.getCanonicalPath) } - intercept[IllegalArgumentException] { + intercept[AnalysisException] { createFileStreamSource("parquet", src.getCanonicalPath) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala index 83ea311eb2..a7592e5d8d 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala @@ -28,6 +28,7 @@ import org.scalatest.BeforeAndAfterAll import org.apache.spark.SparkFunSuite import org.apache.spark.sql._ +import org.apache.spark.sql.catalyst.analysis.NoSuchTableException import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.catalyst.util._ import org.apache.spark.util.Utils @@ -140,7 +141,13 @@ private[sql] trait SQLTestUtils * Drops temporary table `tableName` after calling `f`. */ protected def withTempTable(tableNames: String*)(f: => Unit): Unit = { - try f finally tableNames.foreach(sqlContext.dropTempTable) + try f finally { + // If the test failed part way, we don't want to mask the failure by failing to remove + // temp tables that never got created. + try tableNames.foreach(sqlContext.dropTempTable) catch { + case _: NoSuchTableException => + } + } } /** -- cgit v1.2.3
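
To make the new `FileFormat` contract concrete, the following is a minimal, hypothetical sketch of a line-oriented implementation written against the `inferSchema` / `prepareWrite` / `buildInternalScan` signatures in `interfaces.scala`. It is not part of this patch: the class name `SimpleTextFileFormat` is invented, the write path is stubbed out, filters and bucketing are ignored (pushed-down filters are purely an optimization and are re-evaluated by Spark anyway), and it borrows the `UnsafeProjection` pattern from the removed `HadoopFsRelation.buildInternalScan`.

```scala
import org.apache.hadoop.fs.FileStatus
import org.apache.hadoop.mapreduce.Job

import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.UnsafeProjection
import org.apache.spark.sql.sources.{FileFormat, Filter, OutputWriterFactory}
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.unsafe.types.UTF8String
import org.apache.spark.util.SerializableConfiguration
import org.apache.spark.util.collection.BitSet

// Hypothetical, minimal line-oriented format. Because SerializableConfiguration and
// BitSet are private[spark], a real version of this sketch would have to live inside
// Spark's own namespace (as the built-in csv/json/text/parquet formats do).
class SimpleTextFileFormat extends FileFormat {

  // A plain text file always has a single string column, so "inference" is trivial.
  override def inferSchema(
      sqlContext: SQLContext,
      options: Map[String, String],
      files: Seq[FileStatus]): Option[StructType] =
    Some(StructType(StructField("value", StringType, nullable = true) :: Nil))

  // The write path is out of scope for this sketch.
  override def prepareWrite(
      sqlContext: SQLContext,
      job: Job,
      options: Map[String, String],
      dataSchema: StructType): OutputWriterFactory =
    throw new UnsupportedOperationException("write path not implemented in this sketch")

  override def buildInternalScan(
      sqlContext: SQLContext,
      dataSchema: StructType,
      requiredColumns: Array[String],
      filters: Array[Filter],
      bucketSet: Option[BitSet],
      inputFiles: Array[FileStatus],
      broadcastedConf: Broadcast[SerializableConfiguration],
      options: Map[String, String]): RDD[InternalRow] = {
    // With a single possible column, pruning reduces to keeping or dropping it.
    val requiredSchema = StructType(requiredColumns.map(dataSchema.apply))
    val keepValue = requiredSchema.nonEmpty
    if (inputFiles.isEmpty) {
      sqlContext.sparkContext.emptyRDD[InternalRow]
    } else {
      val paths = inputFiles.map(_.getPath.toString).mkString(",")
      sqlContext.sparkContext.textFile(paths).mapPartitions { lines =>
        // Same pattern as the removed HadoopFsRelation.buildInternalScan:
        // hand back UnsafeRows produced by an UnsafeProjection.
        val toUnsafe = UnsafeProjection.create(requiredSchema)
        lines.map { line =>
          val row =
            if (keepValue) InternalRow(UTF8String.fromString(line)) else InternalRow.empty
          toUnsafe(row)
        }
      }
    }
  }
}
```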
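
In practice Spark wires these pieces together internally, but the sketch below (again hypothetical, and limited to a simple non-bucketed relation) shows how an `HDFSFileCatalog` and a `FileFormat` combine into the `HadoopFsRelation` container defined above. The helper name `buildTextRelation` is made up, and like the previous sketch it assumes access to Spark-internal types.

```scala
import org.apache.hadoop.fs.Path

import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.sources.{HadoopFsRelation, HDFSFileCatalog}

def buildTextRelation(sqlContext: SQLContext, path: String): HadoopFsRelation = {
  val options = Map.empty[String, String]
  // Lists (and caches) the leaf files under `path` eagerly; the partition spec is
  // only inferred the first time partitionSpec() is consulted.
  val catalog = new HDFSFileCatalog(sqlContext, options, Seq(new Path(path)))
  val format = new SimpleTextFileFormat // the hypothetical format sketched above

  // Let the format infer the data schema from the files the catalog just listed.
  val dataSchema = format
    .inferSchema(sqlContext, options, catalog.allFiles())
    .getOrElse(sys.error("Unable to infer schema. It must be specified manually."))

  HadoopFsRelation(
    sqlContext,
    location = catalog,
    partitionSchema = catalog.partitionSpec(None).partitionColumns,
    dataSchema = dataSchema,
    bucketSpec = None,
    fileFormat = format,
    options = options)
}
```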
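
The `basePath` behaviour documented on `HDFSFileCatalog.basePaths` is also reachable from the public reader API. A small usage sketch, with `sqlContext` assumed to be an existing `SQLContext` and the paths mirroring the example in that comment:

```scala
// Reading a single Hive-style partition directory directly: the base path defaults
// to the path that was passed in, so `something` is not part of the result schema.
val pruned = sqlContext.read.parquet("/path/something=true/")

// Overriding the base path keeps partition discovery walking up to `/path/`,
// so the returned DataFrame does contain the `something` column.
val withPartitionColumn = sqlContext.read
  .option("basePath", "/path/")
  .parquet("/path/something=true/")
```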