diff options
author | Cheng Lian <lian@databricks.com> | 2016-04-19 09:37:00 -0700 |
---|---|---|
committer | Yin Huai <yhuai@databricks.com> | 2016-04-19 09:37:00 -0700 |
commit | 5e360c93bed9d4f9250cf79bbcebd8552557f548 (patch) | |
tree | ce3a791360d08ebedd126e91764ca008a304058b /sql | |
parent | 3d46d796a3a2b60b37dc318652eded5e992be1e5 (diff) | |
download | spark-5e360c93bed9d4f9250cf79bbcebd8552557f548.tar.gz spark-5e360c93bed9d4f9250cf79bbcebd8552557f548.tar.bz2 spark-5e360c93bed9d4f9250cf79bbcebd8552557f548.zip |
[SPARK-13681][SPARK-14458][SPARK-14566][SQL] Add back once removed CommitFailureTestRelationSuite and SimpleTextHadoopFsRelationSuite
## What changes were proposed in this pull request?
These test suites were removed while refactoring `HadoopFsRelation` related API. This PR brings them back.
This PR also fixes two regressions:
- SPARK-14458, which causes runtime error when saving partitioned tables using `FileFormat` data sources that are not able to infer their own schemata. This bug wasn't detected by any built-in data sources because all of them happen to have schema inference feature.
- SPARK-14566, which happens to be covered by SPARK-14458 and causes wrong query result or runtime error when
- appending a Dataset `ds` to a persisted partitioned data source relation `t`, and
- partition columns in `ds` don't all appear after data columns
## How was this patch tested?
`CommitFailureTestRelationSuite` uses a testing relation that always fails when committing write tasks to test write job cleanup.
`SimpleTextHadoopFsRelationSuite` uses a testing relation to test general `HadoopFsRelation` and `FileFormat` interfaces.
The two regressions are both covered by existing test cases.
Author: Cheng Lian <lian@databricks.com>
Closes #12179 from liancheng/spark-13681-commit-failure-test.
Diffstat (limited to 'sql')
7 files changed, 403 insertions, 8 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala index 10fde152ab..23a7071086 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala @@ -310,7 +310,17 @@ case class DataSource( val fileCatalog: FileCatalog = new HDFSFileCatalog(sqlContext, options, globbedPaths, partitionSchema) - val dataSchema = userSpecifiedSchema.orElse { + + val dataSchema = userSpecifiedSchema.map { schema => + val equality = + if (sqlContext.conf.caseSensitiveAnalysis) { + org.apache.spark.sql.catalyst.analysis.caseSensitiveResolution + } else { + org.apache.spark.sql.catalyst.analysis.caseInsensitiveResolution + } + + StructType(schema.filterNot(f => partitionColumns.exists(equality(_, f.name)))) + }.orElse { format.inferSchema( sqlContext, caseInsensitiveOptions, @@ -318,7 +328,7 @@ case class DataSource( }.getOrElse { throw new AnalysisException( s"Unable to infer schema for $format at ${allPaths.take(2).mkString(",")}. " + - "It must be specified manually") + "It must be specified manually") } val enrichedOptions = diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala index 697cf719c1..79fe23b258 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala @@ -504,11 +504,12 @@ private[hive] class HiveMetastoreCatalog(val client: HiveClient, hive: HiveConte } } - private def convertToLogicalRelation(metastoreRelation: MetastoreRelation, - options: Map[String, String], - defaultSource: FileFormat, - fileFormatClass: Class[_ <: FileFormat], - fileType: String): LogicalRelation = { + private def convertToLogicalRelation( + metastoreRelation: MetastoreRelation, + options: Map[String, String], + defaultSource: FileFormat, + fileFormatClass: Class[_ <: FileFormat], + fileType: String): LogicalRelation = { val metastoreSchema = StructType.fromAttributes(metastoreRelation.output) val tableIdentifier = QualifiedTableName(metastoreRelation.databaseName, metastoreRelation.tableName) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala index 5ef502afa5..8f7c4e8289 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala @@ -300,7 +300,7 @@ case class CreateMetastoreDataSourceAsSelect( val data = Dataset.ofRows(hiveContext, query) val df = existingSchema match { // If we are inserting into an existing table, just use the existing schema. - case Some(s) => sqlContext.internalCreateDataFrame(data.queryExecution.toRdd, s) + case Some(s) => data.selectExpr(s.fieldNames: _*) case None => data } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/CommitFailureTestRelationSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/CommitFailureTestRelationSuite.scala new file mode 100644 index 0000000000..08e83b7f69 --- /dev/null +++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/CommitFailureTestRelationSuite.scala @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.sources + +import org.apache.hadoop.fs.Path + +import org.apache.spark.SparkException +import org.apache.spark.deploy.SparkHadoopUtil +import org.apache.spark.sql.functions._ +import org.apache.spark.sql.hive.test.TestHiveSingleton +import org.apache.spark.sql.test.SQLTestUtils + +class CommitFailureTestRelationSuite extends SQLTestUtils with TestHiveSingleton { + // When committing a task, `CommitFailureTestSource` throws an exception for testing purpose. + val dataSourceName: String = classOf[CommitFailureTestSource].getCanonicalName + + test("SPARK-7684: commitTask() failure should fallback to abortTask()") { + withTempPath { file => + // Here we coalesce partition number to 1 to ensure that only a single task is issued. This + // prevents race condition happened when FileOutputCommitter tries to remove the `_temporary` + // directory while committing/aborting the job. See SPARK-8513 for more details. + val df = sqlContext.range(0, 10).coalesce(1) + intercept[SparkException] { + df.write.format(dataSourceName).save(file.getCanonicalPath) + } + + val fs = new Path(file.getCanonicalPath).getFileSystem(SparkHadoopUtil.get.conf) + assert(!fs.exists(new Path(file.getCanonicalPath, "_temporary"))) + } + } + + test("call failure callbacks before close writer - default") { + SimpleTextRelation.failCommitter = false + withTempPath { file => + // fail the job in the middle of writing + val divideByZero = udf((x: Int) => { x / (x - 1)}) + val df = sqlContext.range(0, 10).coalesce(1).select(divideByZero(col("id"))) + + SimpleTextRelation.callbackCalled = false + intercept[SparkException] { + df.write.format(dataSourceName).save(file.getCanonicalPath) + } + assert(SimpleTextRelation.callbackCalled, "failure callback should be called") + + val fs = new Path(file.getCanonicalPath).getFileSystem(SparkHadoopUtil.get.conf) + assert(!fs.exists(new Path(file.getCanonicalPath, "_temporary"))) + } + } + + test("call failure callbacks before close writer - partitioned") { + SimpleTextRelation.failCommitter = false + withTempPath { file => + // fail the job in the middle of writing + val df = sqlContext.range(0, 10).coalesce(1).select(col("id").mod(2).as("key"), col("id")) + + SimpleTextRelation.callbackCalled = false + SimpleTextRelation.failWriter = true + intercept[SparkException] { + df.write.format(dataSourceName).partitionBy("key").save(file.getCanonicalPath) + } + assert(SimpleTextRelation.callbackCalled, "failure callback should be called") + + val fs = new Path(file.getCanonicalPath).getFileSystem(SparkHadoopUtil.get.conf) + assert(!fs.exists(new Path(file.getCanonicalPath, "_temporary"))) + } + } +} diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/CommitFailureTestSource.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/CommitFailureTestSource.scala new file mode 100644 index 0000000000..6d7e7b77df --- /dev/null +++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/CommitFailureTestSource.scala @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.sources + +import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext} + +import org.apache.spark.TaskContext +import org.apache.spark.sql.{Row, SQLContext} +import org.apache.spark.sql.types.StructType + +class CommitFailureTestSource extends SimpleTextSource { + /** + * Prepares a write job and returns an [[OutputWriterFactory]]. Client side job preparation can + * be put here. For example, user defined output committer can be configured here + * by setting the output committer class in the conf of spark.sql.sources.outputCommitterClass. + */ + override def prepareWrite( + sqlContext: SQLContext, + job: Job, + options: Map[String, String], + dataSchema: StructType): OutputWriterFactory = + new OutputWriterFactory { + override def newInstance( + path: String, + bucketId: Option[Int], + dataSchema: StructType, + context: TaskAttemptContext): OutputWriter = { + new SimpleTextOutputWriter(path, context) { + var failed = false + TaskContext.get().addTaskFailureListener { (t: TaskContext, e: Throwable) => + failed = true + SimpleTextRelation.callbackCalled = true + } + + override def write(row: Row): Unit = { + if (SimpleTextRelation.failWriter) { + sys.error("Intentional task writer failure for testing purpose.") + + } + super.write(row) + } + + override def close(): Unit = { + super.close() + sys.error("Intentional task commitment failure for testing purpose.") + } + } + } + } + + override def shortName(): String = "commit-failure-test" +} diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextHadoopFsRelationSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextHadoopFsRelationSuite.scala new file mode 100644 index 0000000000..71e3457d25 --- /dev/null +++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextHadoopFsRelationSuite.scala @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.sources + +import org.apache.hadoop.fs.Path + +import org.apache.spark.deploy.SparkHadoopUtil +import org.apache.spark.sql.catalyst.expressions.PredicateHelper +import org.apache.spark.sql.types._ + +class SimpleTextHadoopFsRelationSuite extends HadoopFsRelationTest with PredicateHelper { + override val dataSourceName: String = classOf[SimpleTextSource].getCanonicalName + + // We have a very limited number of supported types at here since it is just for a + // test relation and we do very basic testing at here. + override protected def supportsDataType(dataType: DataType): Boolean = dataType match { + case _: BinaryType => false + // We are using random data generator and the generated strings are not really valid string. + case _: StringType => false + case _: BooleanType => false // see https://issues.apache.org/jira/browse/SPARK-10442 + case _: CalendarIntervalType => false + case _: DateType => false + case _: TimestampType => false + case _: ArrayType => false + case _: MapType => false + case _: StructType => false + case _: UserDefinedType[_] => false + case _ => true + } + + test("save()/load() - partitioned table - simple queries - partition columns in data") { + withTempDir { file => + val basePath = new Path(file.getCanonicalPath) + val fs = basePath.getFileSystem(SparkHadoopUtil.get.conf) + val qualifiedBasePath = fs.makeQualified(basePath) + + for (p1 <- 1 to 2; p2 <- Seq("foo", "bar")) { + val partitionDir = new Path(qualifiedBasePath, s"p1=$p1/p2=$p2") + sparkContext + .parallelize(for (i <- 1 to 3) yield s"$i,val_$i,$p1") + .saveAsTextFile(partitionDir.toString) + } + + val dataSchemaWithPartition = + StructType(dataSchema.fields :+ StructField("p1", IntegerType, nullable = true)) + + checkQueries( + hiveContext.read.format(dataSourceName) + .option("dataSchema", dataSchemaWithPartition.json) + .load(file.getCanonicalPath)) + } + } +} diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala new file mode 100644 index 0000000000..113b124be3 --- /dev/null +++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala @@ -0,0 +1,167 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.sources + +import java.text.NumberFormat + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.{FileStatus, Path} +import org.apache.hadoop.io.{NullWritable, Text} +import org.apache.hadoop.mapreduce.{Job, RecordWriter, TaskAttemptContext} +import org.apache.hadoop.mapreduce.lib.output.{FileOutputFormat, TextOutputFormat} + +import org.apache.spark.sql.{sources, Row, SQLContext} +import org.apache.spark.sql.catalyst.{expressions, InternalRow} +import org.apache.spark.sql.catalyst.expressions.{Cast, Expression, GenericInternalRow, InterpretedPredicate, InterpretedProjection, JoinedRow, Literal} +import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection +import org.apache.spark.sql.execution.datasources.{HadoopFileLinesReader, PartitionedFile} +import org.apache.spark.sql.types.{DataType, StructType} +import org.apache.spark.util.SerializableConfiguration + +class SimpleTextSource extends FileFormat with DataSourceRegister { + override def shortName(): String = "test" + + override def inferSchema( + sqlContext: SQLContext, + options: Map[String, String], + files: Seq[FileStatus]): Option[StructType] = { + Some(DataType.fromJson(options("dataSchema")).asInstanceOf[StructType]) + } + + override def prepareWrite( + sqlContext: SQLContext, + job: Job, + options: Map[String, String], + dataSchema: StructType): OutputWriterFactory = new OutputWriterFactory { + override private[sql] def newInstance( + path: String, + bucketId: Option[Int], + dataSchema: StructType, + context: TaskAttemptContext): OutputWriter = { + new SimpleTextOutputWriter(path, context) + } + } + + override def buildReader( + sqlContext: SQLContext, + dataSchema: StructType, + partitionSchema: StructType, + requiredSchema: StructType, + filters: Seq[Filter], + options: Map[String, String]): (PartitionedFile) => Iterator[InternalRow] = { + + SimpleTextRelation.requiredColumns = requiredSchema.fieldNames + SimpleTextRelation.pushedFilters = filters.toSet + + val fieldTypes = dataSchema.map(_.dataType) + val inputAttributes = dataSchema.toAttributes + val outputAttributes = requiredSchema.flatMap { field => + inputAttributes.find(_.name == field.name) + } + + val conf = new Configuration(sqlContext.sparkContext.hadoopConfiguration) + val broadcastedConf = + sqlContext.sparkContext.broadcast(new SerializableConfiguration(conf)) + + (file: PartitionedFile) => { + val predicate = { + val filterCondition: Expression = filters.collect { + // According to `unhandledFilters`, `SimpleTextRelation` only handles `GreaterThan` filter + case sources.GreaterThan(column, value) => + val dataType = dataSchema(column).dataType + val literal = Literal.create(value, dataType) + val attribute = inputAttributes.find(_.name == column).get + expressions.GreaterThan(attribute, literal) + }.reduceOption(expressions.And).getOrElse(Literal(true)) + InterpretedPredicate.create(filterCondition, inputAttributes) + } + + // Uses a simple projection to simulate column pruning + val projection = new InterpretedProjection(outputAttributes, inputAttributes) + + val unsafeRowIterator = + new HadoopFileLinesReader(file, broadcastedConf.value.value).map { line => + val record = line.toString + new GenericInternalRow(record.split(",", -1).zip(fieldTypes).map { + case (v, dataType) => + val value = if (v == "") null else v + // `Cast`ed values are always of internal types (e.g. UTF8String instead of String) + Cast(Literal(value), dataType).eval() + }) + }.filter(predicate).map(projection) + + // Appends partition values + val fullOutput = requiredSchema.toAttributes ++ partitionSchema.toAttributes + val joinedRow = new JoinedRow() + val appendPartitionColumns = GenerateUnsafeProjection.generate(fullOutput, fullOutput) + + unsafeRowIterator.map { dataRow => + appendPartitionColumns(joinedRow(dataRow, file.partitionValues)) + } + } + } +} + +class SimpleTextOutputWriter(path: String, context: TaskAttemptContext) extends OutputWriter { + private val recordWriter: RecordWriter[NullWritable, Text] = + new AppendingTextOutputFormat(new Path(path)).getRecordWriter(context) + + override def write(row: Row): Unit = { + val serialized = row.toSeq.map { v => + if (v == null) "" else v.toString + }.mkString(",") + recordWriter.write(null, new Text(serialized)) + } + + override def close(): Unit = { + recordWriter.close(context) + } +} + +class AppendingTextOutputFormat(outputFile: Path) extends TextOutputFormat[NullWritable, Text] { + val numberFormat = NumberFormat.getInstance() + + numberFormat.setMinimumIntegerDigits(5) + numberFormat.setGroupingUsed(false) + + override def getDefaultWorkFile(context: TaskAttemptContext, extension: String): Path = { + val configuration = context.getConfiguration + val uniqueWriteJobId = configuration.get("spark.sql.sources.writeJobUUID") + val taskAttemptId = context.getTaskAttemptID + val split = taskAttemptId.getTaskID.getId + val name = FileOutputFormat.getOutputName(context) + new Path(outputFile, s"$name-${numberFormat.format(split)}-$uniqueWriteJobId") + } +} + +object SimpleTextRelation { + // Used to test column pruning + var requiredColumns: Seq[String] = Nil + + // Used to test filter push-down + var pushedFilters: Set[Filter] = Set.empty + + // Used to test failed committer + var failCommitter = false + + // Used to test failed writer + var failWriter = false + + // Used to test failure callback + var callbackCalled = false +} |