Diffstat (limited to 'sql/hive')
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/ParquetHiveCompatibilitySuite.scala  92
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala                   33
2 files changed, 123 insertions(+), 2 deletions(-)
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/ParquetHiveCompatibilitySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/ParquetHiveCompatibilitySuite.scala
new file mode 100644
index 0000000000..bb5f1febe9
--- /dev/null
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/ParquetHiveCompatibilitySuite.scala
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive
+
+import org.apache.spark.sql.hive.test.TestHive
+import org.apache.spark.sql.parquet.ParquetCompatibilityTest
+import org.apache.spark.sql.{Row, SQLConf, SQLContext}
+
+class ParquetHiveCompatibilitySuite extends ParquetCompatibilityTest {
+ import ParquetCompatibilityTest.makeNullable
+
+ override val sqlContext: SQLContext = TestHive
+
+ override protected def beforeAll(): Unit = {
+ super.beforeAll()
+
+ withSQLConf(HiveContext.CONVERT_METASTORE_PARQUET.key -> "false") {
+ withTempTable("data") {
+ sqlContext.sql(
+ s"""CREATE TABLE parquet_compat(
+ | bool_column BOOLEAN,
+ | byte_column TINYINT,
+ | short_column SMALLINT,
+ | int_column INT,
+ | long_column BIGINT,
+ | float_column FLOAT,
+ | double_column DOUBLE,
+ |
+ | strings_column ARRAY<STRING>,
+ | int_to_string_column MAP<INT, STRING>
+ |)
+ |STORED AS PARQUET
+ |LOCATION '${parquetStore.getCanonicalPath}'
+ """.stripMargin)
+
+ val schema = sqlContext.table("parquet_compat").schema
+ val rowRDD = sqlContext.sparkContext.parallelize(makeRows).coalesce(1)
+ sqlContext.createDataFrame(rowRDD, schema).registerTempTable("data")
+ sqlContext.sql("INSERT INTO TABLE parquet_compat SELECT * FROM data")
+ }
+ }
+ }
+
+ override protected def afterAll(): Unit = {
+ sqlContext.sql("DROP TABLE parquet_compat")
+ }
+
+ test("Read Parquet file generated by parquet-hive") {
+ logInfo(
+ s"""Schema of the Parquet file written by parquet-hive:
+ |${readParquetSchema(parquetStore.getCanonicalPath)}
+ """.stripMargin)
+
+ // Unfortunately parquet-hive doesn't add `UTF8` annotation to BINARY when writing strings.
+ // Have to assume all BINARY values are strings here.
+ withSQLConf(SQLConf.PARQUET_BINARY_AS_STRING.key -> "true") {
+ checkAnswer(sqlContext.read.parquet(parquetStore.getCanonicalPath), makeRows)
+ }
+ }
+
+ def makeRows: Seq[Row] = {
+ (0 until 10).map { i =>
+ def nullable[T <: AnyRef]: ( => T) => T = makeNullable[T](i)
+
+ Row(
+ nullable(i % 2 == 0: java.lang.Boolean),
+ nullable(i.toByte: java.lang.Byte),
+ nullable((i + 1).toShort: java.lang.Short),
+ nullable(i + 2: Integer),
+ nullable(i.toLong * 10: java.lang.Long),
+ nullable(i.toFloat + 0.1f: java.lang.Float),
+ nullable(i.toDouble + 0.2d: java.lang.Double),
+ nullable(Seq.tabulate(3)(n => s"arr_${i + n}")),
+ nullable(Seq.tabulate(3)(n => (i + n: Integer) -> s"val_${i + n}").toMap))
+ }
+ }
+}
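
Note: the `makeNullable` helper imported from `ParquetCompatibilityTest` above is what injects nulls into the rows built by `makeRows`; its body is not part of this diff. A minimal sketch of a helper with that curried shape (the every-third-row rule below is an illustrative assumption, not the actual Spark implementation) is:

    // Hedged sketch, not the actual ParquetCompatibilityTest code: a curried helper
    // that evaluates its by-name argument for most row indices and substitutes null
    // for the rest, so generated rows exercise nullable columns.
    def makeNullable[T <: AnyRef](i: Int)(f: => T): T =
      if (i % 3 == 0) null.asInstanceOf[T] else f

With a helper of that shape, `nullable(i + 2: Integer)` in `makeRows` returns null for the rows the helper elects to null out and the computed value otherwise, which is what lets the round-trip check cover both null and non-null cells.
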
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
index c2e0980093..9d79a4b007 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
@@ -21,14 +21,16 @@ import java.io.File
import org.scalatest.BeforeAndAfterAll
+import org.apache.spark.sql._
import org.apache.spark.sql.execution.{ExecutedCommand, PhysicalRDD}
import org.apache.spark.sql.hive.execution.HiveTableScan
+import org.apache.spark.sql.hive.test.TestHive
import org.apache.spark.sql.hive.test.TestHive._
import org.apache.spark.sql.hive.test.TestHive.implicits._
import org.apache.spark.sql.parquet.{ParquetRelation2, ParquetTableScan}
import org.apache.spark.sql.sources.{InsertIntoDataSource, InsertIntoHadoopFsRelation, LogicalRelation}
+import org.apache.spark.sql.test.SQLTestUtils
import org.apache.spark.sql.types._
-import org.apache.spark.sql.{DataFrame, QueryTest, Row, SQLConf, SaveMode}
import org.apache.spark.util.Utils
// The data where the partitioning key exists only in the directory structure.
@@ -685,6 +687,31 @@ class ParquetSourceSuiteBase extends ParquetPartitioningTest {
sql("drop table spark_6016_fix")
}
+
+ test("SPARK-8811: compatibility with array of struct in Hive") {
+ withTempPath { dir =>
+ val path = dir.getCanonicalPath
+
+ withTable("array_of_struct") {
+ val conf = Seq(
+ HiveContext.CONVERT_METASTORE_PARQUET.key -> "false",
+ SQLConf.PARQUET_BINARY_AS_STRING.key -> "true",
+ SQLConf.PARQUET_FOLLOW_PARQUET_FORMAT_SPEC.key -> "true")
+
+ withSQLConf(conf: _*) {
+ sql(
+ s"""CREATE TABLE array_of_struct
+ |STORED AS PARQUET LOCATION '$path'
+ |AS SELECT '1st', '2nd', ARRAY(NAMED_STRUCT('a', 'val_a', 'b', 'val_b'))
+ """.stripMargin)
+
+ checkAnswer(
+ sqlContext.read.parquet(path),
+ Row("1st", "2nd", Seq(Row("val_a", "val_b"))))
+ }
+ }
+ }
+ }
}
class ParquetDataSourceOnSourceSuite extends ParquetSourceSuiteBase {
@@ -762,7 +789,9 @@ class ParquetDataSourceOffSourceSuite extends ParquetSourceSuiteBase {
/**
* A collection of tests for parquet data with various forms of partitioning.
*/
-abstract class ParquetPartitioningTest extends QueryTest with BeforeAndAfterAll {
+abstract class ParquetPartitioningTest extends QueryTest with SQLTestUtils with BeforeAndAfterAll {
+ override def sqlContext: SQLContext = TestHive
+
var partitionedTableDir: File = null
var normalTableDir: File = null
var partitionedTableDirWithKey: File = null
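
Note: the SPARK-8811 test above relies on the `withTempPath`, `withTable`, and `withSQLConf` fixtures that mixing in `SQLTestUtils` gives `ParquetPartitioningTest`. Their implementations are not part of this diff; a minimal sketch of the set-run-restore pattern they follow (names and details here are illustrative assumptions, not the real trait) might look like:

    import org.apache.spark.sql.SQLContext

    // Hedged sketch only: shows the try/finally save-and-restore pattern behind the
    // SQLTestUtils fixtures; these are standalone helpers, not the actual Spark code.
    object SqlTestFixturesSketch {
      // Run `body` with the given SQLConf settings, then restore previous values.
      def withSQLConf(sqlContext: SQLContext)(pairs: (String, String)*)(body: => Unit): Unit = {
        val previous = pairs.map { case (key, _) =>
          key -> scala.util.Try(sqlContext.getConf(key)).toOption
        }
        pairs.foreach { case (key, value) => sqlContext.setConf(key, value) }
        try body finally {
          // Restore keys that had a value before; keys that were previously unset are
          // left with the test value here, which a full implementation would also clear.
          previous.foreach { case (key, old) => old.foreach(sqlContext.setConf(key, _)) }
        }
      }

      // Run `body`, then drop the named tables even if the body throws.
      def withTable(sqlContext: SQLContext)(tableNames: String*)(body: => Unit): Unit = {
        try body finally {
          tableNames.foreach(name => sqlContext.sql(s"DROP TABLE IF EXISTS $name"))
        }
      }
    }

The test wraps all three fixtures so that the temporary directory, the Hive table, and the three SQLConf overrides are cleaned up regardless of whether `checkAnswer` succeeds.
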