Diffstat (limited to 'sql')
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala  109
1 file changed, 43 insertions(+), 66 deletions(-)
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
index 8ffcef8566..d7504936d9 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
@@ -100,80 +100,57 @@ abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils with Tes
}
}
- ignore("test all data types") {
- withTempPath { file =>
- // Create the schema.
- val struct =
- StructType(
- StructField("f1", FloatType, true) ::
- StructField("f2", ArrayType(BooleanType), true) :: Nil)
- // TODO: add CalendarIntervalType to here once we can save it out.
- val dataTypes =
- Seq(
- StringType, BinaryType, NullType, BooleanType,
- ByteType, ShortType, IntegerType, LongType,
- FloatType, DoubleType, DecimalType(25, 5), DecimalType(6, 5),
- DateType, TimestampType,
- ArrayType(IntegerType), MapType(StringType, LongType), struct,
- new MyDenseVectorUDT())
- val fields = dataTypes.zipWithIndex.map { case (dataType, index) =>
- StructField(s"col$index", dataType, nullable = true)
- }
- val schema = StructType(fields)
-
- // Generate data at the driver side. We need to materialize the data first and then
- // create RDD.
- val maybeDataGenerator =
- RandomDataGenerator.forType(
- dataType = schema,
+ private val supportedDataTypes = Seq(
+ StringType, BinaryType,
+ NullType, BooleanType,
+ ByteType, ShortType, IntegerType, LongType,
+ FloatType, DoubleType, DecimalType(25, 5), DecimalType(6, 5),
+ DateType, TimestampType,
+ ArrayType(IntegerType),
+ MapType(StringType, LongType),
+ new StructType()
+ .add("f1", FloatType, nullable = true)
+ .add("f2", ArrayType(BooleanType, containsNull = true), nullable = true),
+ new MyDenseVectorUDT()
+ ).filter(supportsDataType)
+
+ for (dataType <- supportedDataTypes) {
+ test(s"test all data types - $dataType") {
+ withTempPath { file =>
+ val path = file.getCanonicalPath
+
+ val dataGenerator = RandomDataGenerator.forType(
+ dataType = dataType,
nullable = true,
- seed = Some(System.nanoTime()))
- val dataGenerator =
- maybeDataGenerator
- .getOrElse(fail(s"Failed to create data generator for schema $schema"))
- val data = (1 to 10).map { i =>
- dataGenerator.apply() match {
- case row: Row => row
- case null => Row.fromSeq(Seq.fill(schema.length)(null))
- case other =>
- fail(s"Row or null is expected to be generated, " +
- s"but a ${other.getClass.getCanonicalName} is generated.")
+ seed = Some(System.nanoTime())
+ ).getOrElse {
+ fail(s"Failed to create data generator for schema $dataType")
}
- }
- // Create a DF for the schema with random data.
- val rdd = sqlContext.sparkContext.parallelize(data, 10)
- val df = sqlContext.createDataFrame(rdd, schema)
+ // Create a DF for the schema with random data. The index field is used to sort the
+ // DataFrame. This is a workaround for SPARK-10591.
+ val schema = new StructType()
+ .add("index", IntegerType, nullable = false)
+ .add("col", dataType, nullable = true)
+ val rdd = sqlContext.sparkContext.parallelize((1 to 10).map(i => Row(i, dataGenerator())))
+ val df = sqlContext.createDataFrame(rdd, schema).orderBy("index").coalesce(1)
- // All columns that have supported data types of this source.
- val supportedColumns = schema.fields.collect {
- case StructField(name, dataType, _, _) if supportsDataType(dataType) => name
- }
- val selectedColumns = util.Random.shuffle(supportedColumns.toSeq)
-
- val dfToBeSaved = df.selectExpr(selectedColumns: _*)
-
- // Save the data out.
- dfToBeSaved
- .write
- .format(dataSourceName)
- .option("dataSchema", dfToBeSaved.schema.json) // This option is just used by tests.
- .save(file.getCanonicalPath)
+ df.write
+ .mode("overwrite")
+ .format(dataSourceName)
+ .option("dataSchema", df.schema.json)
+ .save(path)
- val loadedDF =
- sqlContext
+ val loadedDF = sqlContext
.read
.format(dataSourceName)
- .schema(dfToBeSaved.schema)
- .option("dataSchema", dfToBeSaved.schema.json) // This option is just used by tests.
- .load(file.getCanonicalPath)
- .selectExpr(selectedColumns: _*)
+ .option("dataSchema", df.schema.json)
+ .schema(df.schema)
+ .load(path)
+ .orderBy("index")
- // Read the data back.
- checkAnswer(
- loadedDF,
- dfToBeSaved
- )
+ checkAnswer(loadedDF, df)
+ }
}
}
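
For reference, below is a minimal standalone sketch of the round-trip pattern this diff adopts: one pass per data type, an extra index column, and an orderBy("index") on both the written and the reloaded DataFrame so the comparison does not depend on row order (the SPARK-10591 workaround noted in the diff). It is an illustration only, not part of the commit: it uses the Spark 2.x SparkSession API instead of the suite's sqlContext, Parquet as a stand-in for the abstract dataSourceName, fixed generators instead of RandomDataGenerator, and hypothetical names such as RoundTripSketch.

import java.nio.file.Files

import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types._

object RoundTripSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("round-trip-sketch")
      .getOrCreate()

    // A small subset of the data types exercised by the suite, each paired
    // with a deterministic generator (the suite uses RandomDataGenerator).
    val dataTypes: Seq[(DataType, Int => Any)] = Seq(
      (StringType, i => s"value-$i"),
      (IntegerType, i => i * 10),
      (ArrayType(IntegerType), i => Seq(i, i + 1))
    )

    for ((dataType, gen) <- dataTypes) {
      // The extra "index" column makes row order reproducible after the
      // round trip (the SPARK-10591 workaround used in the diff).
      val schema = new StructType()
        .add("index", IntegerType, nullable = false)
        .add("col", dataType, nullable = true)
      val rows = (1 to 10).map(i => Row(i, gen(i)))
      val df = spark.createDataFrame(spark.sparkContext.parallelize(rows), schema)
        .orderBy("index")
        .coalesce(1)

      // Parquet stands in for the suite's abstract dataSourceName.
      val path = Files.createTempDirectory("round-trip-").toString
      df.write.mode("overwrite").parquet(path)

      // Read back with the same schema and re-sort by the index column
      // before comparing.
      val loaded = spark.read.schema(schema).parquet(path).orderBy("index")
      assert(loaded.collect().toSeq == df.collect().toSeq,
        s"Round trip failed for $dataType")
    }

    spark.stop()
  }
}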