diff options
Diffstat (limited to 'sql/hive/src/test/scala/org/apache')
4 files changed, 384 insertions, 0 deletions
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/CommitFailureTestRelationSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/CommitFailureTestRelationSuite.scala new file mode 100644 index 0000000000..08e83b7f69 --- /dev/null +++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/CommitFailureTestRelationSuite.scala @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.sources + +import org.apache.hadoop.fs.Path + +import org.apache.spark.SparkException +import org.apache.spark.deploy.SparkHadoopUtil +import org.apache.spark.sql.functions._ +import org.apache.spark.sql.hive.test.TestHiveSingleton +import org.apache.spark.sql.test.SQLTestUtils + +class CommitFailureTestRelationSuite extends SQLTestUtils with TestHiveSingleton { + // When committing a task, `CommitFailureTestSource` throws an exception for testing purpose. + val dataSourceName: String = classOf[CommitFailureTestSource].getCanonicalName + + test("SPARK-7684: commitTask() failure should fallback to abortTask()") { + withTempPath { file => + // Here we coalesce partition number to 1 to ensure that only a single task is issued. This + // prevents race condition happened when FileOutputCommitter tries to remove the `_temporary` + // directory while committing/aborting the job. See SPARK-8513 for more details. + val df = sqlContext.range(0, 10).coalesce(1) + intercept[SparkException] { + df.write.format(dataSourceName).save(file.getCanonicalPath) + } + + val fs = new Path(file.getCanonicalPath).getFileSystem(SparkHadoopUtil.get.conf) + assert(!fs.exists(new Path(file.getCanonicalPath, "_temporary"))) + } + } + + test("call failure callbacks before close writer - default") { + SimpleTextRelation.failCommitter = false + withTempPath { file => + // fail the job in the middle of writing + val divideByZero = udf((x: Int) => { x / (x - 1)}) + val df = sqlContext.range(0, 10).coalesce(1).select(divideByZero(col("id"))) + + SimpleTextRelation.callbackCalled = false + intercept[SparkException] { + df.write.format(dataSourceName).save(file.getCanonicalPath) + } + assert(SimpleTextRelation.callbackCalled, "failure callback should be called") + + val fs = new Path(file.getCanonicalPath).getFileSystem(SparkHadoopUtil.get.conf) + assert(!fs.exists(new Path(file.getCanonicalPath, "_temporary"))) + } + } + + test("call failure callbacks before close writer - partitioned") { + SimpleTextRelation.failCommitter = false + withTempPath { file => + // fail the job in the middle of writing + val df = sqlContext.range(0, 10).coalesce(1).select(col("id").mod(2).as("key"), col("id")) + + SimpleTextRelation.callbackCalled = false + SimpleTextRelation.failWriter = true + intercept[SparkException] { + df.write.format(dataSourceName).partitionBy("key").save(file.getCanonicalPath) + } + assert(SimpleTextRelation.callbackCalled, "failure callback should be called") + + val fs = new Path(file.getCanonicalPath).getFileSystem(SparkHadoopUtil.get.conf) + assert(!fs.exists(new Path(file.getCanonicalPath, "_temporary"))) + } + } +} diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/CommitFailureTestSource.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/CommitFailureTestSource.scala new file mode 100644 index 0000000000..6d7e7b77df --- /dev/null +++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/CommitFailureTestSource.scala @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.sources + +import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext} + +import org.apache.spark.TaskContext +import org.apache.spark.sql.{Row, SQLContext} +import org.apache.spark.sql.types.StructType + +class CommitFailureTestSource extends SimpleTextSource { + /** + * Prepares a write job and returns an [[OutputWriterFactory]]. Client side job preparation can + * be put here. For example, user defined output committer can be configured here + * by setting the output committer class in the conf of spark.sql.sources.outputCommitterClass. + */ + override def prepareWrite( + sqlContext: SQLContext, + job: Job, + options: Map[String, String], + dataSchema: StructType): OutputWriterFactory = + new OutputWriterFactory { + override def newInstance( + path: String, + bucketId: Option[Int], + dataSchema: StructType, + context: TaskAttemptContext): OutputWriter = { + new SimpleTextOutputWriter(path, context) { + var failed = false + TaskContext.get().addTaskFailureListener { (t: TaskContext, e: Throwable) => + failed = true + SimpleTextRelation.callbackCalled = true + } + + override def write(row: Row): Unit = { + if (SimpleTextRelation.failWriter) { + sys.error("Intentional task writer failure for testing purpose.") + + } + super.write(row) + } + + override def close(): Unit = { + super.close() + sys.error("Intentional task commitment failure for testing purpose.") + } + } + } + } + + override def shortName(): String = "commit-failure-test" +} diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextHadoopFsRelationSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextHadoopFsRelationSuite.scala new file mode 100644 index 0000000000..71e3457d25 --- /dev/null +++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextHadoopFsRelationSuite.scala @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.sources + +import org.apache.hadoop.fs.Path + +import org.apache.spark.deploy.SparkHadoopUtil +import org.apache.spark.sql.catalyst.expressions.PredicateHelper +import org.apache.spark.sql.types._ + +class SimpleTextHadoopFsRelationSuite extends HadoopFsRelationTest with PredicateHelper { + override val dataSourceName: String = classOf[SimpleTextSource].getCanonicalName + + // We have a very limited number of supported types at here since it is just for a + // test relation and we do very basic testing at here. + override protected def supportsDataType(dataType: DataType): Boolean = dataType match { + case _: BinaryType => false + // We are using random data generator and the generated strings are not really valid string. + case _: StringType => false + case _: BooleanType => false // see https://issues.apache.org/jira/browse/SPARK-10442 + case _: CalendarIntervalType => false + case _: DateType => false + case _: TimestampType => false + case _: ArrayType => false + case _: MapType => false + case _: StructType => false + case _: UserDefinedType[_] => false + case _ => true + } + + test("save()/load() - partitioned table - simple queries - partition columns in data") { + withTempDir { file => + val basePath = new Path(file.getCanonicalPath) + val fs = basePath.getFileSystem(SparkHadoopUtil.get.conf) + val qualifiedBasePath = fs.makeQualified(basePath) + + for (p1 <- 1 to 2; p2 <- Seq("foo", "bar")) { + val partitionDir = new Path(qualifiedBasePath, s"p1=$p1/p2=$p2") + sparkContext + .parallelize(for (i <- 1 to 3) yield s"$i,val_$i,$p1") + .saveAsTextFile(partitionDir.toString) + } + + val dataSchemaWithPartition = + StructType(dataSchema.fields :+ StructField("p1", IntegerType, nullable = true)) + + checkQueries( + hiveContext.read.format(dataSourceName) + .option("dataSchema", dataSchemaWithPartition.json) + .load(file.getCanonicalPath)) + } + } +} diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala new file mode 100644 index 0000000000..113b124be3 --- /dev/null +++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala @@ -0,0 +1,167 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.sources + +import java.text.NumberFormat + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.{FileStatus, Path} +import org.apache.hadoop.io.{NullWritable, Text} +import org.apache.hadoop.mapreduce.{Job, RecordWriter, TaskAttemptContext} +import org.apache.hadoop.mapreduce.lib.output.{FileOutputFormat, TextOutputFormat} + +import org.apache.spark.sql.{sources, Row, SQLContext} +import org.apache.spark.sql.catalyst.{expressions, InternalRow} +import org.apache.spark.sql.catalyst.expressions.{Cast, Expression, GenericInternalRow, InterpretedPredicate, InterpretedProjection, JoinedRow, Literal} +import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection +import org.apache.spark.sql.execution.datasources.{HadoopFileLinesReader, PartitionedFile} +import org.apache.spark.sql.types.{DataType, StructType} +import org.apache.spark.util.SerializableConfiguration + +class SimpleTextSource extends FileFormat with DataSourceRegister { + override def shortName(): String = "test" + + override def inferSchema( + sqlContext: SQLContext, + options: Map[String, String], + files: Seq[FileStatus]): Option[StructType] = { + Some(DataType.fromJson(options("dataSchema")).asInstanceOf[StructType]) + } + + override def prepareWrite( + sqlContext: SQLContext, + job: Job, + options: Map[String, String], + dataSchema: StructType): OutputWriterFactory = new OutputWriterFactory { + override private[sql] def newInstance( + path: String, + bucketId: Option[Int], + dataSchema: StructType, + context: TaskAttemptContext): OutputWriter = { + new SimpleTextOutputWriter(path, context) + } + } + + override def buildReader( + sqlContext: SQLContext, + dataSchema: StructType, + partitionSchema: StructType, + requiredSchema: StructType, + filters: Seq[Filter], + options: Map[String, String]): (PartitionedFile) => Iterator[InternalRow] = { + + SimpleTextRelation.requiredColumns = requiredSchema.fieldNames + SimpleTextRelation.pushedFilters = filters.toSet + + val fieldTypes = dataSchema.map(_.dataType) + val inputAttributes = dataSchema.toAttributes + val outputAttributes = requiredSchema.flatMap { field => + inputAttributes.find(_.name == field.name) + } + + val conf = new Configuration(sqlContext.sparkContext.hadoopConfiguration) + val broadcastedConf = + sqlContext.sparkContext.broadcast(new SerializableConfiguration(conf)) + + (file: PartitionedFile) => { + val predicate = { + val filterCondition: Expression = filters.collect { + // According to `unhandledFilters`, `SimpleTextRelation` only handles `GreaterThan` filter + case sources.GreaterThan(column, value) => + val dataType = dataSchema(column).dataType + val literal = Literal.create(value, dataType) + val attribute = inputAttributes.find(_.name == column).get + expressions.GreaterThan(attribute, literal) + }.reduceOption(expressions.And).getOrElse(Literal(true)) + InterpretedPredicate.create(filterCondition, inputAttributes) + } + + // Uses a simple projection to simulate column pruning + val projection = new InterpretedProjection(outputAttributes, inputAttributes) + + val unsafeRowIterator = + new HadoopFileLinesReader(file, broadcastedConf.value.value).map { line => + val record = line.toString + new GenericInternalRow(record.split(",", -1).zip(fieldTypes).map { + case (v, dataType) => + val value = if (v == "") null else v + // `Cast`ed values are always of internal types (e.g. UTF8String instead of String) + Cast(Literal(value), dataType).eval() + }) + }.filter(predicate).map(projection) + + // Appends partition values + val fullOutput = requiredSchema.toAttributes ++ partitionSchema.toAttributes + val joinedRow = new JoinedRow() + val appendPartitionColumns = GenerateUnsafeProjection.generate(fullOutput, fullOutput) + + unsafeRowIterator.map { dataRow => + appendPartitionColumns(joinedRow(dataRow, file.partitionValues)) + } + } + } +} + +class SimpleTextOutputWriter(path: String, context: TaskAttemptContext) extends OutputWriter { + private val recordWriter: RecordWriter[NullWritable, Text] = + new AppendingTextOutputFormat(new Path(path)).getRecordWriter(context) + + override def write(row: Row): Unit = { + val serialized = row.toSeq.map { v => + if (v == null) "" else v.toString + }.mkString(",") + recordWriter.write(null, new Text(serialized)) + } + + override def close(): Unit = { + recordWriter.close(context) + } +} + +class AppendingTextOutputFormat(outputFile: Path) extends TextOutputFormat[NullWritable, Text] { + val numberFormat = NumberFormat.getInstance() + + numberFormat.setMinimumIntegerDigits(5) + numberFormat.setGroupingUsed(false) + + override def getDefaultWorkFile(context: TaskAttemptContext, extension: String): Path = { + val configuration = context.getConfiguration + val uniqueWriteJobId = configuration.get("spark.sql.sources.writeJobUUID") + val taskAttemptId = context.getTaskAttemptID + val split = taskAttemptId.getTaskID.getId + val name = FileOutputFormat.getOutputName(context) + new Path(outputFile, s"$name-${numberFormat.format(split)}-$uniqueWriteJobId") + } +} + +object SimpleTextRelation { + // Used to test column pruning + var requiredColumns: Seq[String] = Nil + + // Used to test filter push-down + var pushedFilters: Set[Filter] = Set.empty + + // Used to test failed committer + var failCommitter = false + + // Used to test failed writer + var failWriter = false + + // Used to test failure callback + var callbackCalled = false +} |