Diffstat (limited to 'sql/hive/src/test/scala/org/apache')
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/sources/CommitFailureTestRelationSuite.scala  |  82
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/sources/CommitFailureTestSource.scala         |  67
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextHadoopFsRelationSuite.scala |  68
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala              | 167
4 files changed, 384 insertions(+), 0 deletions(-)
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/CommitFailureTestRelationSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/CommitFailureTestRelationSuite.scala
new file mode 100644
index 0000000000..08e83b7f69
--- /dev/null
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/CommitFailureTestRelationSuite.scala
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources
+
+import org.apache.hadoop.fs.Path
+
+import org.apache.spark.SparkException
+import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.sql.functions._
+import org.apache.spark.sql.hive.test.TestHiveSingleton
+import org.apache.spark.sql.test.SQLTestUtils
+
+class CommitFailureTestRelationSuite extends SQLTestUtils with TestHiveSingleton {
+ // When committing a task, `CommitFailureTestSource` throws an exception for testing purposes.
+ val dataSourceName: String = classOf[CommitFailureTestSource].getCanonicalName
+
+ test("SPARK-7684: commitTask() failure should fallback to abortTask()") {
+ withTempPath { file =>
+ // Here we coalesce the partition number to 1 to ensure that only a single task is issued. This
+ // prevents the race condition that happens when FileOutputCommitter tries to remove the `_temporary`
+ // directory while committing/aborting the job. See SPARK-8513 for more details.
+ val df = sqlContext.range(0, 10).coalesce(1)
+ intercept[SparkException] {
+ df.write.format(dataSourceName).save(file.getCanonicalPath)
+ }
+
+ val fs = new Path(file.getCanonicalPath).getFileSystem(SparkHadoopUtil.get.conf)
+ assert(!fs.exists(new Path(file.getCanonicalPath, "_temporary")))
+ }
+ }
+
+ test("call failure callbacks before close writer - default") {
+ SimpleTextRelation.failCommitter = false
+ withTempPath { file =>
+ // fail the job in the middle of writing
+ val divideByZero = udf((x: Int) => x / (x - 1))
+ val df = sqlContext.range(0, 10).coalesce(1).select(divideByZero(col("id")))
+
+ SimpleTextRelation.callbackCalled = false
+ intercept[SparkException] {
+ df.write.format(dataSourceName).save(file.getCanonicalPath)
+ }
+ assert(SimpleTextRelation.callbackCalled, "failure callback should be called")
+
+ val fs = new Path(file.getCanonicalPath).getFileSystem(SparkHadoopUtil.get.conf)
+ assert(!fs.exists(new Path(file.getCanonicalPath, "_temporary")))
+ }
+ }
+
+ test("call failure callbacks before close writer - partitioned") {
+ SimpleTextRelation.failCommitter = false
+ withTempPath { file =>
+ // fail the job in the middle of writing
+ val df = sqlContext.range(0, 10).coalesce(1).select(col("id").mod(2).as("key"), col("id"))
+
+ SimpleTextRelation.callbackCalled = false
+ SimpleTextRelation.failWriter = true
+ intercept[SparkException] {
+ df.write.format(dataSourceName).partitionBy("key").save(file.getCanonicalPath)
+ }
+ assert(SimpleTextRelation.callbackCalled, "failure callback should be called")
+
+ val fs = new Path(file.getCanonicalPath).getFileSystem(SparkHadoopUtil.get.conf)
+ assert(!fs.exists(new Path(file.getCanonicalPath, "_temporary")))
+ }
+ }
+}
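
The `_temporary` cleanup assertion above is repeated in each of the three tests. As a sketch only (this helper is hypothetical and not part of the patch), the check could be factored out like this:

import java.io.File

import org.apache.hadoop.fs.Path

import org.apache.spark.deploy.SparkHadoopUtil

object TemporaryDirAssertions {
  // Fails if FileOutputCommitter's `_temporary` staging directory is still present under `file`,
  // i.e. if abortTask() did not clean up after the induced failure.
  def assertNoTemporaryDir(file: File): Unit = {
    val outputPath = new Path(file.getCanonicalPath)
    val fs = outputPath.getFileSystem(SparkHadoopUtil.get.conf)
    assert(!fs.exists(new Path(outputPath, "_temporary")), "_temporary was not cleaned up")
  }
}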
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/CommitFailureTestSource.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/CommitFailureTestSource.scala
new file mode 100644
index 0000000000..6d7e7b77df
--- /dev/null
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/CommitFailureTestSource.scala
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources
+
+import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext}
+
+import org.apache.spark.TaskContext
+import org.apache.spark.sql.{Row, SQLContext}
+import org.apache.spark.sql.types.StructType
+
+class CommitFailureTestSource extends SimpleTextSource {
+ /**
+ * Prepares a write job and returns an [[OutputWriterFactory]]. Client-side job preparation can
+ * be done here. For example, a user-defined output committer can be configured here by setting
+ * the output committer class via the spark.sql.sources.outputCommitterClass configuration.
+ */
+ override def prepareWrite(
+ sqlContext: SQLContext,
+ job: Job,
+ options: Map[String, String],
+ dataSchema: StructType): OutputWriterFactory =
+ new OutputWriterFactory {
+ override def newInstance(
+ path: String,
+ bucketId: Option[Int],
+ dataSchema: StructType,
+ context: TaskAttemptContext): OutputWriter = {
+ new SimpleTextOutputWriter(path, context) {
+ var failed = false
+ TaskContext.get().addTaskFailureListener { (t: TaskContext, e: Throwable) =>
+ failed = true
+ SimpleTextRelation.callbackCalled = true
+ }
+
+ override def write(row: Row): Unit = {
+ if (SimpleTextRelation.failWriter) {
+ sys.error("Intentional task writer failure for testing purpose.")
+
+ }
+ super.write(row)
+ }
+
+ override def close(): Unit = {
+ super.close()
+ sys.error("Intentional task commitment failure for testing purpose.")
+ }
+ }
+ }
+ }
+
+ override def shortName(): String = "commit-failure-test"
+}
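
`CommitFailureTestSource` relies on `TaskContext.addTaskFailureListener` so the failure callback runs before the writer is closed. A minimal sketch of that listener pattern, using a hypothetical writer class (not part of the patch), looks like this:

import org.apache.spark.TaskContext

// Hypothetical writer skeleton; only the listener wiring mirrors CommitFailureTestSource above.
class CleanupOnFailureWriter {
  @volatile private var failed = false

  // Must be constructed inside a running Spark task so that TaskContext.get() is non-null.
  TaskContext.get().addTaskFailureListener { (_: TaskContext, _: Throwable) =>
    failed = true // remember the failure so close() can skip the commit path
    // release buffers, delete partially written files, etc.
  }

  def close(): Unit = {
    if (!failed) {
      // flush and commit the output here
    }
  }
}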
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextHadoopFsRelationSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextHadoopFsRelationSuite.scala
new file mode 100644
index 0000000000..71e3457d25
--- /dev/null
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextHadoopFsRelationSuite.scala
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources
+
+import org.apache.hadoop.fs.Path
+
+import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.sql.catalyst.expressions.PredicateHelper
+import org.apache.spark.sql.types._
+
+class SimpleTextHadoopFsRelationSuite extends HadoopFsRelationTest with PredicateHelper {
+ override val dataSourceName: String = classOf[SimpleTextSource].getCanonicalName
+
+ // We support only a very limited set of data types here, since this is just a test relation
+ // and we only do very basic testing here.
+ override protected def supportsDataType(dataType: DataType): Boolean = dataType match {
+ case _: BinaryType => false
+ // We are using a random data generator, and the generated strings are not really valid strings.
+ case _: StringType => false
+ case _: BooleanType => false // see https://issues.apache.org/jira/browse/SPARK-10442
+ case _: CalendarIntervalType => false
+ case _: DateType => false
+ case _: TimestampType => false
+ case _: ArrayType => false
+ case _: MapType => false
+ case _: StructType => false
+ case _: UserDefinedType[_] => false
+ case _ => true
+ }
+
+ test("save()/load() - partitioned table - simple queries - partition columns in data") {
+ withTempDir { file =>
+ val basePath = new Path(file.getCanonicalPath)
+ val fs = basePath.getFileSystem(SparkHadoopUtil.get.conf)
+ val qualifiedBasePath = fs.makeQualified(basePath)
+
+ for (p1 <- 1 to 2; p2 <- Seq("foo", "bar")) {
+ val partitionDir = new Path(qualifiedBasePath, s"p1=$p1/p2=$p2")
+ sparkContext
+ .parallelize(for (i <- 1 to 3) yield s"$i,val_$i,$p1")
+ .saveAsTextFile(partitionDir.toString)
+ }
+
+ val dataSchemaWithPartition =
+ StructType(dataSchema.fields :+ StructField("p1", IntegerType, nullable = true))
+
+ checkQueries(
+ hiveContext.read.format(dataSourceName)
+ .option("dataSchema", dataSchemaWithPartition.json)
+ .load(file.getCanonicalPath))
+ }
+ }
+}
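
The test above lays out partition directories by hand with `saveAsTextFile` so that the partition column `p1` also appears inside the data files. For comparison, here is a hypothetical sketch (not part of the patch) of producing the same `p1=.../p2=...` layout through `DataFrameWriter.partitionBy`:

import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.functions._

object PartitionLayoutExample {
  // Hypothetical helper: produces the same p1=.../p2=... directory layout that the test above
  // builds by hand with saveAsTextFile, but via DataFrameWriter.partitionBy.
  def writePartitionedLayout(sqlContext: SQLContext, basePath: String): Unit = {
    val df = sqlContext.range(0, 12).select(
      col("id"),
      (col("id") % 2).cast("int").as("p1"),
      lit("foo").as("p2"))

    // Writes basePath/p1=0/p2=foo/..., basePath/p1=1/p2=foo/...; on load, partition discovery
    // turns the directory names back into the p1 and p2 columns.
    df.write.format("json").partitionBy("p1", "p2").save(basePath)
  }
}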
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala
new file mode 100644
index 0000000000..113b124be3
--- /dev/null
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources
+
+import java.text.NumberFormat
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileStatus, Path}
+import org.apache.hadoop.io.{NullWritable, Text}
+import org.apache.hadoop.mapreduce.{Job, RecordWriter, TaskAttemptContext}
+import org.apache.hadoop.mapreduce.lib.output.{FileOutputFormat, TextOutputFormat}
+
+import org.apache.spark.sql.{sources, Row, SQLContext}
+import org.apache.spark.sql.catalyst.{expressions, InternalRow}
+import org.apache.spark.sql.catalyst.expressions.{Cast, Expression, GenericInternalRow, InterpretedPredicate, InterpretedProjection, JoinedRow, Literal}
+import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
+import org.apache.spark.sql.execution.datasources.{HadoopFileLinesReader, PartitionedFile}
+import org.apache.spark.sql.types.{DataType, StructType}
+import org.apache.spark.util.SerializableConfiguration
+
+class SimpleTextSource extends FileFormat with DataSourceRegister {
+ override def shortName(): String = "test"
+
+ override def inferSchema(
+ sqlContext: SQLContext,
+ options: Map[String, String],
+ files: Seq[FileStatus]): Option[StructType] = {
+ Some(DataType.fromJson(options("dataSchema")).asInstanceOf[StructType])
+ }
+
+ override def prepareWrite(
+ sqlContext: SQLContext,
+ job: Job,
+ options: Map[String, String],
+ dataSchema: StructType): OutputWriterFactory = new OutputWriterFactory {
+ override private[sql] def newInstance(
+ path: String,
+ bucketId: Option[Int],
+ dataSchema: StructType,
+ context: TaskAttemptContext): OutputWriter = {
+ new SimpleTextOutputWriter(path, context)
+ }
+ }
+
+ override def buildReader(
+ sqlContext: SQLContext,
+ dataSchema: StructType,
+ partitionSchema: StructType,
+ requiredSchema: StructType,
+ filters: Seq[Filter],
+ options: Map[String, String]): (PartitionedFile) => Iterator[InternalRow] = {
+
+ SimpleTextRelation.requiredColumns = requiredSchema.fieldNames
+ SimpleTextRelation.pushedFilters = filters.toSet
+
+ val fieldTypes = dataSchema.map(_.dataType)
+ val inputAttributes = dataSchema.toAttributes
+ val outputAttributes = requiredSchema.flatMap { field =>
+ inputAttributes.find(_.name == field.name)
+ }
+
+ val conf = new Configuration(sqlContext.sparkContext.hadoopConfiguration)
+ val broadcastedConf =
+ sqlContext.sparkContext.broadcast(new SerializableConfiguration(conf))
+
+ (file: PartitionedFile) => {
+ val predicate = {
+ val filterCondition: Expression = filters.collect {
+ // According to `unhandledFilters`, `SimpleTextRelation` only handles the `GreaterThan` filter
+ case sources.GreaterThan(column, value) =>
+ val dataType = dataSchema(column).dataType
+ val literal = Literal.create(value, dataType)
+ val attribute = inputAttributes.find(_.name == column).get
+ expressions.GreaterThan(attribute, literal)
+ }.reduceOption(expressions.And).getOrElse(Literal(true))
+ InterpretedPredicate.create(filterCondition, inputAttributes)
+ }
+
+ // Uses a simple projection to simulate column pruning
+ val projection = new InterpretedProjection(outputAttributes, inputAttributes)
+
+ val unsafeRowIterator =
+ new HadoopFileLinesReader(file, broadcastedConf.value.value).map { line =>
+ val record = line.toString
+ new GenericInternalRow(record.split(",", -1).zip(fieldTypes).map {
+ case (v, dataType) =>
+ val value = if (v == "") null else v
+ // `Cast`ed values are always of internal types (e.g. UTF8String instead of String)
+ Cast(Literal(value), dataType).eval()
+ })
+ }.filter(predicate).map(projection)
+
+ // Appends partition values
+ val fullOutput = requiredSchema.toAttributes ++ partitionSchema.toAttributes
+ val joinedRow = new JoinedRow()
+ val appendPartitionColumns = GenerateUnsafeProjection.generate(fullOutput, fullOutput)
+
+ unsafeRowIterator.map { dataRow =>
+ appendPartitionColumns(joinedRow(dataRow, file.partitionValues))
+ }
+ }
+ }
+}
+
+class SimpleTextOutputWriter(path: String, context: TaskAttemptContext) extends OutputWriter {
+ private val recordWriter: RecordWriter[NullWritable, Text] =
+ new AppendingTextOutputFormat(new Path(path)).getRecordWriter(context)
+
+ override def write(row: Row): Unit = {
+ val serialized = row.toSeq.map { v =>
+ if (v == null) "" else v.toString
+ }.mkString(",")
+ recordWriter.write(null, new Text(serialized))
+ }
+
+ override def close(): Unit = {
+ recordWriter.close(context)
+ }
+}
+
+class AppendingTextOutputFormat(outputFile: Path) extends TextOutputFormat[NullWritable, Text] {
+ val numberFormat = NumberFormat.getInstance()
+
+ numberFormat.setMinimumIntegerDigits(5)
+ numberFormat.setGroupingUsed(false)
+
+ override def getDefaultWorkFile(context: TaskAttemptContext, extension: String): Path = {
+ val configuration = context.getConfiguration
+ val uniqueWriteJobId = configuration.get("spark.sql.sources.writeJobUUID")
+ val taskAttemptId = context.getTaskAttemptID
+ val split = taskAttemptId.getTaskID.getId
+ val name = FileOutputFormat.getOutputName(context)
+ new Path(outputFile, s"$name-${numberFormat.format(split)}-$uniqueWriteJobId")
+ }
+}
+
+object SimpleTextRelation {
+ // Used to test column pruning
+ var requiredColumns: Seq[String] = Nil
+
+ // Used to test filter push-down
+ var pushedFilters: Set[Filter] = Set.empty
+
+ // Used to test failed committer
+ var failCommitter = false
+
+ // Used to test failed writer
+ var failWriter = false
+
+ // Used to test failure callback
+ var callbackCalled = false
+}
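
Taken together, the suites above drive `SimpleTextSource` through an ordinary write/read round trip. The following hypothetical sketch (not part of the patch) shows that usage: rows are written as comma-separated text by `SimpleTextOutputWriter`, and reading back requires the `dataSchema` option that `inferSchema` expects.

import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

object SimpleTextRoundTrip {
  // Hypothetical usage sketch, mirroring how the suites above drive SimpleTextSource.
  def roundTrip(sqlContext: SQLContext, path: String): Unit = {
    val source = classOf[SimpleTextSource].getCanonicalName
    val schema = StructType(Seq(
      StructField("a", IntegerType, nullable = true),
      StructField("b", StringType, nullable = true)))

    val df = sqlContext.range(0, 10).select(
      col("id").cast(IntegerType).as("a"),
      concat(lit("val_"), col("id")).as("b"))

    // SimpleTextOutputWriter serializes each row as comma-separated text, writing nulls as
    // empty strings.
    df.write.format(source).save(path)

    // inferSchema() requires the data schema to be passed as JSON via the "dataSchema" option.
    val loaded = sqlContext.read
      .format(source)
      .option("dataSchema", schema.json)
      .load(path)
    loaded.show()
  }
}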