author     Cheng Lian <lian@databricks.com>    2015-09-18 12:19:08 -0700
committer  Yin Huai <yhuai@databricks.com>     2015-09-18 12:19:08 -0700
commit     00a2911c5bea67a1a4796fb1d6fd5d0a8ee79001 (patch)
tree       915ddb6a613f07bcb16c10df5c7a028d157df6c2 /sql
parent     35e8ab939000d4a1a01c1af4015c25ff6f4013a3 (diff)
[SPARK-10540] Fixes flaky all-data-type test
This PR breaks the original test case into multiple ones (one test case per data type), so that test failure output is much more readable.

Within each test case, we build a table with two columns: one holds values of the data type under test, and the other is an "index" column, which is used to sort the DataFrame and work around [SPARK-10591] [1].

[1]: https://issues.apache.org/jira/browse/SPARK-10591

Author: Cheng Lian <lian@databricks.com>

Closes #8768 from liancheng/spark-10540/test-all-data-types.
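For reference, below is a consolidated sketch of the per-data-type test structure the message describes. It simply mirrors the lines added in the diff that follows, with indentation restored; it assumes the surrounding HadoopFsRelationTest suite (helpers withTempPath, checkAnswer, supportsDataType, fail, dataSourceName, the supportedDataTypes sequence defined in the patch, RandomDataGenerator, and the Spark 1.5-era sqlContext API) and is not additional code in the commit.

// Sketch: one test case per supported data type. An extra non-nullable "index" column
// lets both DataFrames be sorted deterministically before comparison (SPARK-10591 workaround).
for (dataType <- supportedDataTypes) {
  test(s"test all data types - $dataType") {
    withTempPath { file =>
      val path = file.getCanonicalPath

      // Random values of the type under test; fail fast if no generator exists for it.
      val dataGenerator = RandomDataGenerator.forType(
        dataType = dataType,
        nullable = true,
        seed = Some(System.nanoTime())
      ).getOrElse {
        fail(s"Failed to create data generator for schema $dataType")
      }

      // Two-column schema: the sortable index plus the column under test.
      val schema = new StructType()
        .add("index", IntegerType, nullable = false)
        .add("col", dataType, nullable = true)
      val rdd = sqlContext.sparkContext.parallelize((1 to 10).map(i => Row(i, dataGenerator())))
      val df = sqlContext.createDataFrame(rdd, schema).orderBy("index").coalesce(1)

      // Round-trip the data through the data source under test.
      df.write
        .mode("overwrite")
        .format(dataSourceName)
        .option("dataSchema", df.schema.json)
        .save(path)

      val loadedDF = sqlContext.read
        .format(dataSourceName)
        .option("dataSchema", df.schema.json)
        .schema(df.schema)
        .load(path)
        .orderBy("index")

      // Compare what was written with what was read back.
      checkAnswer(loadedDF, df)
    }
  }
}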
Diffstat (limited to 'sql')
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala  109
1 file changed, 43 insertions(+), 66 deletions(-)
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
index 8ffcef8566..d7504936d9 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
@@ -100,80 +100,57 @@ abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils with Tes
     }
   }
-  ignore("test all data types") {
-    withTempPath { file =>
-      // Create the schema.
-      val struct =
-        StructType(
-          StructField("f1", FloatType, true) ::
-          StructField("f2", ArrayType(BooleanType), true) :: Nil)
-      // TODO: add CalendarIntervalType to here once we can save it out.
-      val dataTypes =
-        Seq(
-          StringType, BinaryType, NullType, BooleanType,
-          ByteType, ShortType, IntegerType, LongType,
-          FloatType, DoubleType, DecimalType(25, 5), DecimalType(6, 5),
-          DateType, TimestampType,
-          ArrayType(IntegerType), MapType(StringType, LongType), struct,
-          new MyDenseVectorUDT())
-      val fields = dataTypes.zipWithIndex.map { case (dataType, index) =>
-        StructField(s"col$index", dataType, nullable = true)
-      }
-      val schema = StructType(fields)
-
-      // Generate data at the driver side. We need to materialize the data first and then
-      // create RDD.
-      val maybeDataGenerator =
-        RandomDataGenerator.forType(
-          dataType = schema,
+  private val supportedDataTypes = Seq(
+    StringType, BinaryType,
+    NullType, BooleanType,
+    ByteType, ShortType, IntegerType, LongType,
+    FloatType, DoubleType, DecimalType(25, 5), DecimalType(6, 5),
+    DateType, TimestampType,
+    ArrayType(IntegerType),
+    MapType(StringType, LongType),
+    new StructType()
+      .add("f1", FloatType, nullable = true)
+      .add("f2", ArrayType(BooleanType, containsNull = true), nullable = true),
+    new MyDenseVectorUDT()
+  ).filter(supportsDataType)
+
+  for (dataType <- supportedDataTypes) {
+    test(s"test all data types - $dataType") {
+      withTempPath { file =>
+        val path = file.getCanonicalPath
+
+        val dataGenerator = RandomDataGenerator.forType(
+          dataType = dataType,
           nullable = true,
-          seed = Some(System.nanoTime()))
-      val dataGenerator =
-        maybeDataGenerator
-          .getOrElse(fail(s"Failed to create data generator for schema $schema"))
-      val data = (1 to 10).map { i =>
-        dataGenerator.apply() match {
-          case row: Row => row
-          case null => Row.fromSeq(Seq.fill(schema.length)(null))
-          case other =>
-            fail(s"Row or null is expected to be generated, " +
-              s"but a ${other.getClass.getCanonicalName} is generated.")
+          seed = Some(System.nanoTime())
+        ).getOrElse {
+          fail(s"Failed to create data generator for schema $dataType")
         }
-      }
-      // Create a DF for the schema with random data.
-      val rdd = sqlContext.sparkContext.parallelize(data, 10)
-      val df = sqlContext.createDataFrame(rdd, schema)
+        // Create a DF for the schema with random data. The index field is used to sort the
+        // DataFrame. This is a workaround for SPARK-10591.
+        val schema = new StructType()
+          .add("index", IntegerType, nullable = false)
+          .add("col", dataType, nullable = true)
+        val rdd = sqlContext.sparkContext.parallelize((1 to 10).map(i => Row(i, dataGenerator())))
+        val df = sqlContext.createDataFrame(rdd, schema).orderBy("index").coalesce(1)
-      // All columns that have supported data types of this source.
-      val supportedColumns = schema.fields.collect {
-        case StructField(name, dataType, _, _) if supportsDataType(dataType) => name
-      }
-      val selectedColumns = util.Random.shuffle(supportedColumns.toSeq)
-
-      val dfToBeSaved = df.selectExpr(selectedColumns: _*)
-
-      // Save the data out.
-      dfToBeSaved
-        .write
-        .format(dataSourceName)
-        .option("dataSchema", dfToBeSaved.schema.json) // This option is just used by tests.
-        .save(file.getCanonicalPath)
+        df.write
+          .mode("overwrite")
+          .format(dataSourceName)
+          .option("dataSchema", df.schema.json)
+          .save(path)
-      val loadedDF =
-        sqlContext
+        val loadedDF = sqlContext
           .read
           .format(dataSourceName)
-          .schema(dfToBeSaved.schema)
-          .option("dataSchema", dfToBeSaved.schema.json) // This option is just used by tests.
-          .load(file.getCanonicalPath)
-          .selectExpr(selectedColumns: _*)
+          .option("dataSchema", df.schema.json)
+          .schema(df.schema)
+          .load(path)
+          .orderBy("index")
-      // Read the data back.
-      checkAnswer(
-        loadedDF,
-        dfToBeSaved
-      )
+        checkAnswer(loadedDF, df)
+      }
     }
   }