diff options
Diffstat (limited to 'sql/hive')
-rw-r--r-- | sql/hive/src/test/scala/org/apache/spark/sql/hive/ParquetHiveCompatibilitySuite.scala | 92 | ||||
-rw-r--r-- | sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala | 33 |
2 files changed, 123 insertions, 2 deletions
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/ParquetHiveCompatibilitySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/ParquetHiveCompatibilitySuite.scala new file mode 100644 index 0000000000..bb5f1febe9 --- /dev/null +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/ParquetHiveCompatibilitySuite.scala @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.hive + +import org.apache.spark.sql.hive.test.TestHive +import org.apache.spark.sql.parquet.ParquetCompatibilityTest +import org.apache.spark.sql.{Row, SQLConf, SQLContext} + +class ParquetHiveCompatibilitySuite extends ParquetCompatibilityTest { + import ParquetCompatibilityTest.makeNullable + + override val sqlContext: SQLContext = TestHive + + override protected def beforeAll(): Unit = { + super.beforeAll() + + withSQLConf(HiveContext.CONVERT_METASTORE_PARQUET.key -> "false") { + withTempTable("data") { + sqlContext.sql( + s"""CREATE TABLE parquet_compat( + | bool_column BOOLEAN, + | byte_column TINYINT, + | short_column SMALLINT, + | int_column INT, + | long_column BIGINT, + | float_column FLOAT, + | double_column DOUBLE, + | + | strings_column ARRAY<STRING>, + | int_to_string_column MAP<INT, STRING> + |) + |STORED AS PARQUET + |LOCATION '${parquetStore.getCanonicalPath}' + """.stripMargin) + + val schema = sqlContext.table("parquet_compat").schema + val rowRDD = sqlContext.sparkContext.parallelize(makeRows).coalesce(1) + sqlContext.createDataFrame(rowRDD, schema).registerTempTable("data") + sqlContext.sql("INSERT INTO TABLE parquet_compat SELECT * FROM data") + } + } + } + + override protected def afterAll(): Unit = { + sqlContext.sql("DROP TABLE parquet_compat") + } + + test("Read Parquet file generated by parquet-hive") { + logInfo( + s"""Schema of the Parquet file written by parquet-hive: + |${readParquetSchema(parquetStore.getCanonicalPath)} + """.stripMargin) + + // Unfortunately parquet-hive doesn't add `UTF8` annotation to BINARY when writing strings. + // Have to assume all BINARY values are strings here. + withSQLConf(SQLConf.PARQUET_BINARY_AS_STRING.key -> "true") { + checkAnswer(sqlContext.read.parquet(parquetStore.getCanonicalPath), makeRows) + } + } + + def makeRows: Seq[Row] = { + (0 until 10).map { i => + def nullable[T <: AnyRef]: ( => T) => T = makeNullable[T](i) + + Row( + nullable(i % 2 == 0: java.lang.Boolean), + nullable(i.toByte: java.lang.Byte), + nullable((i + 1).toShort: java.lang.Short), + nullable(i + 2: Integer), + nullable(i.toLong * 10: java.lang.Long), + nullable(i.toFloat + 0.1f: java.lang.Float), + nullable(i.toDouble + 0.2d: java.lang.Double), + nullable(Seq.tabulate(3)(n => s"arr_${i + n}")), + nullable(Seq.tabulate(3)(n => (i + n: Integer) -> s"val_${i + n}").toMap)) + } + } +} diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala index c2e0980093..9d79a4b007 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala @@ -21,14 +21,16 @@ import java.io.File import org.scalatest.BeforeAndAfterAll +import org.apache.spark.sql._ import org.apache.spark.sql.execution.{ExecutedCommand, PhysicalRDD} import org.apache.spark.sql.hive.execution.HiveTableScan +import org.apache.spark.sql.hive.test.TestHive import org.apache.spark.sql.hive.test.TestHive._ import org.apache.spark.sql.hive.test.TestHive.implicits._ import org.apache.spark.sql.parquet.{ParquetRelation2, ParquetTableScan} import org.apache.spark.sql.sources.{InsertIntoDataSource, InsertIntoHadoopFsRelation, LogicalRelation} +import org.apache.spark.sql.test.SQLTestUtils import org.apache.spark.sql.types._ -import org.apache.spark.sql.{DataFrame, QueryTest, Row, SQLConf, SaveMode} import org.apache.spark.util.Utils // The data where the partitioning key exists only in the directory structure. @@ -685,6 +687,31 @@ class ParquetSourceSuiteBase extends ParquetPartitioningTest { sql("drop table spark_6016_fix") } + + test("SPARK-8811: compatibility with array of struct in Hive") { + withTempPath { dir => + val path = dir.getCanonicalPath + + withTable("array_of_struct") { + val conf = Seq( + HiveContext.CONVERT_METASTORE_PARQUET.key -> "false", + SQLConf.PARQUET_BINARY_AS_STRING.key -> "true", + SQLConf.PARQUET_FOLLOW_PARQUET_FORMAT_SPEC.key -> "true") + + withSQLConf(conf: _*) { + sql( + s"""CREATE TABLE array_of_struct + |STORED AS PARQUET LOCATION '$path' + |AS SELECT '1st', '2nd', ARRAY(NAMED_STRUCT('a', 'val_a', 'b', 'val_b')) + """.stripMargin) + + checkAnswer( + sqlContext.read.parquet(path), + Row("1st", "2nd", Seq(Row("val_a", "val_b")))) + } + } + } + } } class ParquetDataSourceOnSourceSuite extends ParquetSourceSuiteBase { @@ -762,7 +789,9 @@ class ParquetDataSourceOffSourceSuite extends ParquetSourceSuiteBase { /** * A collection of tests for parquet data with various forms of partitioning. */ -abstract class ParquetPartitioningTest extends QueryTest with BeforeAndAfterAll { +abstract class ParquetPartitioningTest extends QueryTest with SQLTestUtils with BeforeAndAfterAll { + override def sqlContext: SQLContext = TestHive + var partitionedTableDir: File = null var normalTableDir: File = null var partitionedTableDirWithKey: File = null |