Diffstat (limited to 'sql/hive/src/test')
-rw-r--r--  sql/hive/src/test/resources/log4j.properties                                                 47
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/BigDataBenchmarkSuite.scala     126
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/ConcurrentHiveSuite.scala        38
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala        379
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala    708
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQueryFileTest.scala          70
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala            144
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveResolutionSuite.scala        65
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveSerDeSuite.scala             33
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveTypeCoercionSuite.scala      32
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruningSuite.scala              164
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/parquet/HiveParquetSuite.scala                 161
12 files changed, 1967 insertions, 0 deletions
diff --git a/sql/hive/src/test/resources/log4j.properties b/sql/hive/src/test/resources/log4j.properties
new file mode 100644
index 0000000000..5e17e3b596
--- /dev/null
+++ b/sql/hive/src/test/resources/log4j.properties
@@ -0,0 +1,47 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Set everything to be logged to the file target/unit-tests.log
+log4j.rootLogger=DEBUG, CA, FA
+
+# Console Appender
+log4j.appender.CA=org.apache.log4j.ConsoleAppender
+log4j.appender.CA.layout=org.apache.log4j.PatternLayout
+log4j.appender.CA.layout.ConversionPattern=%d{HH:mm:ss.SSS} %p %c: %m%n
+log4j.appender.CA.Threshold = WARN
+
+
+# File Appender
+log4j.appender.FA=org.apache.log4j.FileAppender
+log4j.appender.FA.append=false
+log4j.appender.FA.file=target/unit-tests.log
+log4j.appender.FA.layout=org.apache.log4j.PatternLayout
+log4j.appender.FA.layout.ConversionPattern=%d{HH:mm:ss.SSS} %p %c{1}: %m%n
+
+# Set the threshold of the file appender to INFO
+log4j.appender.FA.Threshold = INFO
+
+# Some packages are noisy for no good reason.
+log4j.additivity.org.apache.hadoop.hive.serde2.lazy.LazyStruct=false
+log4j.logger.org.apache.hadoop.hive.serde2.lazy.LazyStruct=OFF
+
+log4j.additivity.org.apache.hadoop.hive.metastore.RetryingHMSHandler=false
+log4j.logger.org.apache.hadoop.hive.metastore.RetryingHMSHandler=OFF
+
+log4j.additivity.hive.ql.metadata.Hive=false
+log4j.logger.hive.ql.metadata.Hive=OFF
+
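
The root logger is DEBUG, but each appender applies its own threshold on top of that: messages below WARN never reach the console (CA), and messages below INFO never reach target/unit-tests.log (FA). A minimal Scala sketch of the resulting behavior, using the log4j 1.x API configured above:

    import org.apache.log4j.Logger

    object LogLevelDemo {
      def main(args: Array[String]): Unit = {
        val log = Logger.getLogger(getClass)
        log.debug("dropped by both appenders: below the CA (WARN) and FA (INFO) thresholds")
        log.info("written only to target/unit-tests.log")
        log.warn("written to both the console and the log file")
      }
    }
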
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/BigDataBenchmarkSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/BigDataBenchmarkSuite.scala
new file mode 100644
index 0000000000..4b45e69860
--- /dev/null
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/BigDataBenchmarkSuite.scala
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+package hive
+package execution
+
+import java.io.File
+
+/**
+ * A set of test cases based on the big-data-benchmark.
+ * https://amplab.cs.berkeley.edu/benchmark/
+ */
+class BigDataBenchmarkSuite extends HiveComparisonTest {
+ import TestHive._
+
+ val testDataDirectory = new File("target/big-data-benchmark-testdata")
+
+ val testTables = Seq(
+ TestTable(
+ "rankings",
+ s"""
+ |CREATE EXTERNAL TABLE rankings (
+ | pageURL STRING,
+ | pageRank INT,
+ | avgDuration INT)
+ | ROW FORMAT DELIMITED FIELDS TERMINATED BY ","
+ | STORED AS TEXTFILE LOCATION "${new File(testDataDirectory, "rankings").getCanonicalPath}"
+ """.stripMargin.cmd),
+ TestTable(
+ "scratch",
+ s"""
+ |CREATE EXTERNAL TABLE scratch (
+ | pageURL STRING,
+ | pageRank INT,
+ | avgDuration INT)
+ | ROW FORMAT DELIMITED FIELDS TERMINATED BY ","
+ | STORED AS TEXTFILE LOCATION "${new File(testDataDirectory, "scratch").getCanonicalPath}"
+ """.stripMargin.cmd),
+ TestTable(
+ "uservisits",
+ s"""
+ |CREATE EXTERNAL TABLE uservisits (
+ | sourceIP STRING,
+ | destURL STRING,
+ | visitDate STRING,
+ | adRevenue DOUBLE,
+ | userAgent STRING,
+ | countryCode STRING,
+ | languageCode STRING,
+ | searchWord STRING,
+ | duration INT)
+ | ROW FORMAT DELIMITED FIELDS TERMINATED BY ","
+ | STORED AS TEXTFILE LOCATION "${new File(testDataDirectory, "uservisits").getCanonicalPath}"
+ """.stripMargin.cmd),
+ TestTable(
+ "documents",
+ s"""
+ |CREATE EXTERNAL TABLE documents (line STRING)
+ |STORED AS TEXTFILE
+ |LOCATION "${new File(testDataDirectory, "crawl").getCanonicalPath}"
+ """.stripMargin.cmd))
+
+ testTables.foreach(registerTestTable)
+
+ if (!testDataDirectory.exists()) {
+ // TODO: Auto download the files on demand.
+ ignore("No data files found for BigDataBenchmark tests.") {}
+ } else {
+ createQueryTest("query1",
+ "SELECT pageURL, pageRank FROM rankings WHERE pageRank > 1")
+
+ createQueryTest("query2",
+ "SELECT SUBSTR(sourceIP, 1, 10), SUM(adRevenue) FROM uservisits GROUP BY SUBSTR(sourceIP, 1, 10)")
+
+ createQueryTest("query3",
+ """
+ |SELECT sourceIP,
+ | sum(adRevenue) as totalRevenue,
+ | avg(pageRank) as pageRank
+ |FROM
+ | rankings R JOIN
+ | (SELECT sourceIP, destURL, adRevenue
+ | FROM uservisits UV
+ | WHERE UV.visitDate > "1980-01-01"
+ | AND UV.visitDate < "1980-04-01")
+ | NUV ON (R.pageURL = NUV.destURL)
+ |GROUP BY sourceIP
+ |ORDER BY totalRevenue DESC
+ |LIMIT 1
+ """.stripMargin)
+
+ createQueryTest("query4",
+ """
+ |DROP TABLE IF EXISTS url_counts_partial;
+ |CREATE TABLE url_counts_partial AS
+ | SELECT TRANSFORM (line)
+ | USING 'python target/url_count.py' as (sourcePage,
+ | destPage, count) from documents;
+ |DROP TABLE IF EXISTS url_counts_total;
+ |CREATE TABLE url_counts_total AS
+ | SELECT SUM(count) AS totalCount, destpage
+ | FROM url_counts_partial GROUP BY destpage
+      |-- The following queries run, but generate different results in Hive, likely because the UDF is not deterministic
+ |-- given different input splits.
+ |-- SELECT CAST(SUM(count) AS INT) FROM url_counts_partial
+ |-- SELECT COUNT(*) FROM url_counts_partial
+ |-- SELECT * FROM url_counts_partial
+ |-- SELECT * FROM url_counts_total
+ """.stripMargin)
+ }
+}
\ No newline at end of file
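
Each TestTable above pairs a table name with the HiveQL that materializes it, and registerTestTable makes the table available to queries run through TestHive. A minimal sketch of registering one more external table (the visits_sample name is hypothetical; .cmd is assumed to lift the string into a runnable command, exactly as in the tables above):

    // Hypothetical extra table, following the same pattern as the tables above.
    val extraTable = TestTable(
      "visits_sample",
      s"""
        |CREATE EXTERNAL TABLE visits_sample (sourceIP STRING, adRevenue DOUBLE)
        |ROW FORMAT DELIMITED FIELDS TERMINATED BY ","
        |STORED AS TEXTFILE LOCATION "${new File(testDataDirectory, "visits_sample").getCanonicalPath}"
      """.stripMargin.cmd)
    registerTestTable(extraTable)
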
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/ConcurrentHiveSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/ConcurrentHiveSuite.scala
new file mode 100644
index 0000000000..a12ab23946
--- /dev/null
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/ConcurrentHiveSuite.scala
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark
+package sql
+package hive
+package execution
+
+
+import org.scalatest.{FunSuite, BeforeAndAfterAll}
+
+class ConcurrentHiveSuite extends FunSuite with BeforeAndAfterAll {
+ ignore("multiple instances not supported") {
+ test("Multiple Hive Instances") {
+ (1 to 10).map { i =>
+ val ts =
+ new TestHiveContext(new SparkContext("local", s"TestSQLContext$i", new SparkConf()))
+ ts.executeSql("SHOW TABLES").toRdd.collect()
+ ts.executeSql("SELECT * FROM src").toRdd.collect()
+ ts.executeSql("SHOW TABLES").toRdd.collect()
+ }
+ }
+ }
+}
\ No newline at end of file
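
The suite is ignored because multiple concurrently active Hive clients are not supported. A minimal sketch of the intended single-instance usage, reusing only the calls that appear above (local mode; the app name is arbitrary):

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.sql.hive.TestHiveContext

    val sc = new SparkContext("local", "SingleHiveInstance", new SparkConf())
    val hive = new TestHiveContext(sc)
    // Same execute-and-collect pattern as the ignored test above.
    hive.executeSql("SHOW TABLES").toRdd.collect()
    sc.stop()
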
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala
new file mode 100644
index 0000000000..8a5b97b7a0
--- /dev/null
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala
@@ -0,0 +1,379 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+package hive
+package execution
+
+import java.io._
+import org.scalatest.{BeforeAndAfterAll, FunSuite, GivenWhenThen}
+
+import catalyst.plans.logical.{ExplainCommand, NativeCommand}
+import catalyst.plans._
+import catalyst.util._
+
+import org.apache.spark.sql.execution.Sort
+
+/**
+ * Allows the creation of tests that execute the same query against both Hive
+ * and Catalyst, comparing the results.
+ *
+ * The "golden" results from Hive are cached in and retrieved from both the classpath and
+ * [[answerCache]] to speed up testing.
+ *
+ * See the documentation of public vals in this class for information on how test execution can be
+ * configured using system properties.
+ */
+abstract class HiveComparisonTest extends FunSuite with BeforeAndAfterAll with GivenWhenThen with Logging {
+
+ /**
+ * When set, any cache files that result in test failures will be deleted. Used when the test
+ * harness or hive have been updated thus requiring new golden answers to be computed for some
+ * tests. Also prevents the classpath being used when looking for golden answers as these are
+ * usually stale.
+ */
+ val recomputeCache = System.getProperty("spark.hive.recomputeCache") != null
+
+ protected val shardRegEx = "(\\d+):(\\d+)".r
+ /**
+   * Allows multiple JVMs to be run in parallel, each responsible for a portion of all test cases.
+   * Format `shardId:numShards`. Shard ids should be zero indexed. E.g. -Dspark.hive.shard=0:4.
+ */
+ val shardInfo = Option(System.getProperty("spark.hive.shard")).map {
+ case shardRegEx(id, total) => (id.toInt, total.toInt)
+ }
+
+ protected val targetDir = new File("target")
+
+ /**
+   * When set, this comma-separated list defines directories that contain the names of test cases
+   * that should be skipped.
+   *
+   * For example, when `-Dspark.hive.skiptests=passed,hiveFailed` is specified, test cases listed
+   * in [[passedDirectory]] or [[hiveFailedDirectory]] will be skipped.
+ */
+ val skipDirectories =
+ Option(System.getProperty("spark.hive.skiptests"))
+ .toSeq
+ .flatMap(_.split(","))
+ .map(name => new File(targetDir, s"$suiteName.$name"))
+
+ val runOnlyDirectories =
+ Option(System.getProperty("spark.hive.runonlytests"))
+ .toSeq
+ .flatMap(_.split(","))
+ .map(name => new File(targetDir, s"$suiteName.$name"))
+
+  /** The local directory where cached golden answers are stored. */
+ protected val answerCache = new File("src/test/resources/golden")
+ if (!answerCache.exists) {
+ answerCache.mkdir()
+ }
+
+ /** The [[ClassLoader]] that contains test dependencies. Used to look for golden answers. */
+ protected val testClassLoader = this.getClass.getClassLoader
+
+ /** Directory containing a file for each test case that passes. */
+ val passedDirectory = new File(targetDir, s"$suiteName.passed")
+ if (!passedDirectory.exists()) {
+ passedDirectory.mkdir() // Not atomic!
+ }
+
+ /** Directory containing output of tests that fail to execute with Catalyst. */
+ val failedDirectory = new File(targetDir, s"$suiteName.failed")
+ if (!failedDirectory.exists()) {
+ failedDirectory.mkdir() // Not atomic!
+ }
+
+ /** Directory containing output of tests where catalyst produces the wrong answer. */
+ val wrongDirectory = new File(targetDir, s"$suiteName.wrong")
+ if (!wrongDirectory.exists()) {
+ wrongDirectory.mkdir() // Not atomic!
+ }
+
+ /** Directory containing output of tests where we fail to generate golden output with Hive. */
+ val hiveFailedDirectory = new File(targetDir, s"$suiteName.hiveFailed")
+ if (!hiveFailedDirectory.exists()) {
+ hiveFailedDirectory.mkdir() // Not atomic!
+ }
+
+ /** All directories that contain per-query output files */
+ val outputDirectories = Seq(
+ passedDirectory,
+ failedDirectory,
+ wrongDirectory,
+ hiveFailedDirectory)
+
+ protected val cacheDigest = java.security.MessageDigest.getInstance("MD5")
+ protected def getMd5(str: String): String = {
+ val digest = java.security.MessageDigest.getInstance("MD5")
+ digest.update(str.getBytes)
+ new java.math.BigInteger(1, digest.digest).toString(16)
+ }
+
+ protected def prepareAnswer(
+ hiveQuery: TestHive.type#SqlQueryExecution,
+ answer: Seq[String]): Seq[String] = {
+ val orderedAnswer = hiveQuery.logical match {
+ // Clean out non-deterministic time schema info.
+ case _: NativeCommand => answer.filterNot(nonDeterministicLine).filterNot(_ == "")
+ case _: ExplainCommand => answer
+ case _ =>
+ // TODO: Really we only care about the final total ordering here...
+ val isOrdered = hiveQuery.executedPlan.collect {
+ case s @ Sort(_, global, _) if global => s
+ }.nonEmpty
+ // If the query results aren't sorted, then sort them to ensure deterministic answers.
+ if (!isOrdered) answer.sorted else answer
+ }
+ orderedAnswer.map(cleanPaths)
+ }
+
+ // TODO: Instead of filtering we should clean to avoid accidentally ignoring actual results.
+ lazy val nonDeterministicLineIndicators = Seq(
+ "CreateTime",
+ "transient_lastDdlTime",
+ "grantTime",
+ "lastUpdateTime",
+ "last_modified_time",
+ "Owner:",
+ // The following are hive specific schema parameters which we do not need to match exactly.
+ "numFiles",
+ "numRows",
+ "rawDataSize",
+ "totalSize",
+ "totalNumberFiles",
+ "maxFileSize",
+ "minFileSize"
+ )
+ protected def nonDeterministicLine(line: String) =
+    nonDeterministicLineIndicators.exists(line contains _)
+
+ /**
+ * Removes non-deterministic paths from `str` so cached answers will compare correctly.
+ */
+ protected def cleanPaths(str: String): String = {
+ str.replaceAll("file:\\/.*\\/", "<PATH>")
+ }
+
+ val installHooksCommand = "(?i)SET.*hooks".r
+ def createQueryTest(testCaseName: String, sql: String) {
+    // If test sharding is enabled, skip tests that are not in the correct shard.
+ shardInfo.foreach {
+ case (shardId, numShards) if testCaseName.hashCode % numShards != shardId => return
+ case (shardId, _) => logger.debug(s"Shard $shardId includes test '$testCaseName'")
+ }
+
+ // Skip tests found in directories specified by user.
+ skipDirectories
+ .map(new File(_, testCaseName))
+ .filter(_.exists)
+ .foreach(_ => return)
+
+ // If runonlytests is set, skip this test unless we find a file in one of the specified
+ // directories.
+ val runIndicators =
+ runOnlyDirectories
+ .map(new File(_, testCaseName))
+ .filter(_.exists)
+ if (runOnlyDirectories.nonEmpty && runIndicators.isEmpty) {
+ logger.debug(
+ s"Skipping test '$testCaseName' not found in ${runOnlyDirectories.map(_.getCanonicalPath)}")
+ return
+ }
+
+ test(testCaseName) {
+ logger.debug(s"=== HIVE TEST: $testCaseName ===")
+
+ // Clear old output for this testcase.
+ outputDirectories.map(new File(_, testCaseName)).filter(_.exists()).foreach(_.delete())
+
+ val allQueries = sql.split("(?<=[^\\\\]);").map(_.trim).filterNot(q => q == "").toSeq
+
+ // TODO: DOCUMENT UNSUPPORTED
+ val queryList =
+ allQueries
+ // In hive, setting the hive.outerjoin.supports.filters flag to "false" essentially tells
+ // the system to return the wrong answer. Since we have no intention of mirroring their
+ // previously broken behavior we simply filter out changes to this setting.
+ .filterNot(_ contains "hive.outerjoin.supports.filters")
+
+ if (allQueries != queryList)
+ logger.warn(s"Simplifications made on unsupported operations for test $testCaseName")
+
+ lazy val consoleTestCase = {
+ val quotes = "\"\"\""
+ queryList.zipWithIndex.map {
+ case (query, i) =>
+ s"""
+ |val q$i = $quotes$query$quotes.q
+ |q$i.stringResult()
+ """.stripMargin
+ }.mkString("\n== Console version of this test ==\n", "\n", "\n")
+ }
+
+ try {
+ // MINOR HACK: You must run a query before calling reset the first time.
+ TestHive.sql("SHOW TABLES")
+ TestHive.reset()
+
+ val hiveCacheFiles = queryList.zipWithIndex.map {
+ case (queryString, i) =>
+ val cachedAnswerName = s"$testCaseName-$i-${getMd5(queryString)}"
+ new File(answerCache, cachedAnswerName)
+ }
+
+ val hiveCachedResults = hiveCacheFiles.flatMap { cachedAnswerFile =>
+ logger.debug(s"Looking for cached answer file $cachedAnswerFile.")
+ if (cachedAnswerFile.exists) {
+ Some(fileToString(cachedAnswerFile))
+ } else {
+ logger.debug(s"File $cachedAnswerFile not found")
+ None
+ }
+ }.map {
+ case "" => Nil
+ case "\n" => Seq("")
+ case other => other.split("\n").toSeq
+ }
+
+ val hiveResults: Seq[Seq[String]] =
+ if (hiveCachedResults.size == queryList.size) {
+ logger.info(s"Using answer cache for test: $testCaseName")
+ hiveCachedResults
+ } else {
+
+ val hiveQueries = queryList.map(new TestHive.SqlQueryExecution(_))
+ // Make sure we can at least parse everything before attempting hive execution.
+ hiveQueries.foreach(_.logical)
+ val computedResults = (queryList.zipWithIndex, hiveQueries, hiveCacheFiles).zipped.map {
+          case ((queryString, i), hiveQuery, cachedAnswerFile) =>
+ try {
+              // Hooks often break the harness and don't really affect our tests anyway, so don't
+              // even try running them.
+ if (installHooksCommand.findAllMatchIn(queryString).nonEmpty)
+ sys.error("hive exec hooks not supported for tests.")
+
+ logger.warn(s"Running query ${i+1}/${queryList.size} with hive.")
+ // Analyze the query with catalyst to ensure test tables are loaded.
+ val answer = hiveQuery.analyzed match {
+ case _: ExplainCommand => Nil // No need to execute EXPLAIN queries as we don't check the output.
+ case _ => TestHive.runSqlHive(queryString)
+ }
+
+ // We need to add a new line to non-empty answers so we can differentiate Seq()
+ // from Seq("").
+ stringToFile(
+ cachedAnswerFile, answer.mkString("\n") + (if (answer.nonEmpty) "\n" else ""))
+ answer
+ } catch {
+ case e: Exception =>
+ val errorMessage =
+ s"""
+ |Failed to generate golden answer for query:
+ |Error: ${e.getMessage}
+ |${stackTraceToString(e)}
+ |$queryString
+ |$consoleTestCase
+ """.stripMargin
+ stringToFile(
+ new File(hiveFailedDirectory, testCaseName),
+ errorMessage + consoleTestCase)
+ fail(errorMessage)
+ }
+ }.toSeq
+ TestHive.reset()
+
+ computedResults
+ }
+
+ // Run w/ catalyst
+ val catalystResults = queryList.zip(hiveResults).map { case (queryString, hive) =>
+ val query = new TestHive.SqlQueryExecution(queryString)
+ try { (query, prepareAnswer(query, query.stringResult())) } catch {
+ case e: Exception =>
+ val errorMessage =
+ s"""
+ |Failed to execute query using catalyst:
+ |Error: ${e.getMessage}
+ |${stackTraceToString(e)}
+ |$query
+ |== HIVE - ${hive.size} row(s) ==
+ |${hive.mkString("\n")}
+ |
+ |$consoleTestCase
+ """.stripMargin
+ stringToFile(new File(failedDirectory, testCaseName), errorMessage + consoleTestCase)
+ fail(errorMessage)
+ }
+ }.toSeq
+
+ (queryList, hiveResults, catalystResults).zipped.foreach {
+ case (query, hive, (hiveQuery, catalyst)) =>
+          // Check that the results match unless it's an EXPLAIN query.
+          val preparedHive = prepareAnswer(hiveQuery, hive)
+
+ if ((!hiveQuery.logical.isInstanceOf[ExplainCommand]) && preparedHive != catalyst) {
+
+ val hivePrintOut = s"== HIVE - ${hive.size} row(s) ==" +: preparedHive
+ val catalystPrintOut = s"== CATALYST - ${catalyst.size} row(s) ==" +: catalyst
+
+ val resultComparison = sideBySide(hivePrintOut, catalystPrintOut).mkString("\n")
+
+ if (recomputeCache) {
+ logger.warn(s"Clearing cache files for failed test $testCaseName")
+ hiveCacheFiles.foreach(_.delete())
+ }
+
+ val errorMessage =
+ s"""
+ |Results do not match for $testCaseName:
+ |$hiveQuery\n${hiveQuery.analyzed.output.map(_.name).mkString("\t")}
+ |$resultComparison
+ """.stripMargin
+
+ stringToFile(new File(wrongDirectory, testCaseName), errorMessage + consoleTestCase)
+ fail(errorMessage)
+ }
+ }
+
+ // Touch passed file.
+ new FileOutputStream(new File(passedDirectory, testCaseName)).close()
+ } catch {
+ case tf: org.scalatest.exceptions.TestFailedException => throw tf
+ case originalException: Exception =>
+ if (System.getProperty("spark.hive.canarytest") != null) {
+ // When we encounter an error we check to see if the environment is still okay by running a simple query.
+ // If this fails then we halt testing since something must have gone seriously wrong.
+ try {
+ new TestHive.SqlQueryExecution("SELECT key FROM src").stringResult()
+ TestHive.runSqlHive("SELECT key FROM src")
+ } catch {
+ case e: Exception =>
+                logger.error(s"FATAL ERROR: Canary query threw $e. This implies that the testing environment has likely been corrupted.")
+                // The testing setup traps exits, so wait here for a long time so the developer can
+                // see when things started to go wrong.
+ Thread.sleep(1000000)
+ }
+ }
+
+ // If the canary query didn't fail then the environment is still okay, so just throw the original exception.
+ throw originalException
+ }
+ }
+ }
+}
\ No newline at end of file
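
Concretely, a suite built on this harness only needs to call createQueryTest; the harness then generates (or loads) the Hive golden answer, runs the same query through Catalyst, and compares the two. A minimal sketch (the suite name and query are hypothetical):

    class ExampleComparisonSuite extends HiveComparisonTest {
      // Runs against Hive once to produce a cached golden answer, then compares
      // Catalyst's output against it on every subsequent run.
      createQueryTest("simple count", "SELECT COUNT(*) FROM src")
    }

Execution is then shaped by the system properties documented above, e.g. -Dspark.hive.shard=0:4 to run the first of four shards, or -Dspark.hive.skiptests=passed to skip previously passing tests.
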
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala
new file mode 100644
index 0000000000..d010023f78
--- /dev/null
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala
@@ -0,0 +1,708 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+package hive
+package execution
+
+
+import java.io._
+
+import util._
+
+/**
+ * Runs the test cases that are included in the hive distribution.
+ */
+class HiveCompatibilitySuite extends HiveQueryFileTest {
+ // TODO: bundle in jar files... get from classpath
+ lazy val hiveQueryDir = TestHive.getHiveFile("ql/src/test/queries/clientpositive")
+ def testCases = hiveQueryDir.listFiles.map(f => f.getName.stripSuffix(".q") -> f)
+
+ /** A list of tests deemed out of scope currently and thus completely disregarded. */
+ override def blackList = Seq(
+ // These tests use hooks that are not on the classpath and thus break all subsequent execution.
+ "hook_order",
+ "hook_context",
+ "mapjoin_hook",
+ "multi_sahooks",
+ "overridden_confs",
+ "query_properties",
+ "sample10",
+ "updateAccessTime",
+ "index_compact_binary_search",
+ "bucket_num_reducers",
+ "column_access_stats",
+ "concatenate_inherit_table_location",
+
+ // Setting a default property does not seem to get reset and thus changes the answer for many
+ // subsequent tests.
+ "create_default_prop",
+
+ // User/machine specific test answers, breaks the caching mechanism.
+ "authorization_3",
+ "authorization_5",
+ "keyword_1",
+ "misc_json",
+ "create_like_tbl_props",
+ "load_overwrite",
+ "alter_table_serde2",
+ "alter_table_not_sorted",
+ "alter_skewed_table",
+ "alter_partition_clusterby_sortby",
+ "alter_merge",
+ "alter_concatenate_indexed_table",
+ "protectmode2",
+ "describe_table",
+ "describe_comment_nonascii",
+ "udf5",
+ "udf_java_method",
+
+ // Weird DDL differences result in failures on jenkins.
+ "create_like2",
+ "create_view_translate",
+ "partitions_json",
+
+ // Timezone specific test answers.
+ "udf_unix_timestamp",
+ "udf_to_unix_timestamp",
+
+    // Can't run without local map/reduce.
+ "index_auto_update",
+ "index_auto_self_join",
+ "index_stale.*",
+ "type_cast_1",
+ "index_compression",
+ "index_bitmap_compression",
+ "index_auto_multiple",
+ "index_auto_mult_tables_compact",
+ "index_auto_mult_tables",
+ "index_auto_file_format",
+ "index_auth",
+ "index_auto_empty",
+ "index_auto_partitioned",
+ "index_auto_unused",
+ "index_bitmap_auto_partitioned",
+ "ql_rewrite_gbtoidx",
+ "stats1.*",
+ "stats20",
+ "alter_merge_stats",
+
+ // Hive seems to think 1.0 > NaN = true && 1.0 < NaN = false... which is wrong.
+ // http://stackoverflow.com/a/1573715
+ "ops_comparison",
+
+    // Tests that seem to never complete on Hive...
+ "skewjoin",
+ "database",
+
+    // These tests fail and exit the JVM.
+ "auto_join18_multi_distinct",
+ "join18_multi_distinct",
+ "input44",
+ "input42",
+ "input_dfs",
+ "metadata_export_drop",
+ "repair",
+
+ // Uses a serde that isn't on the classpath... breaks other tests.
+ "bucketizedhiveinputformat",
+
+    // Avro tests seem to change the output format permanently, breaking the answer cache; until
+    // we figure out why this is the case, let's just ignore all Avro-related tests.
+ ".*avro.*",
+
+ // Unique joins are weird and will require a lot of hacks (see comments in hive parser).
+ "uniquejoin",
+
+ // Hive seems to get the wrong answer on some outer joins. MySQL agrees with catalyst.
+ "auto_join29",
+
+ // No support for multi-alias i.e. udf as (e1, e2, e3).
+ "allcolref_in_udf",
+
+ // No support for TestSerDe (not published afaik)
+ "alter1",
+ "input16",
+
+ // No support for unpublished test udfs.
+ "autogen_colalias",
+
+    // No support for bucketed tables yet.
+ ".*bucket.*",
+
+ // No window support yet
+ ".*window.*",
+
+ // Fails in hive with authorization errors.
+ "alter_rename_partition_authorization",
+ "authorization.*",
+
+ // Hadoop version specific tests
+ "archive_corrupt",
+
+    // No support for case-sensitivity resolution using Hive properties at the moment.
+ "case_sensitivity"
+ )
+
+ /**
+ * The set of tests that are believed to be working in catalyst. Tests not on whiteList or
+ * blacklist are implicitly marked as ignored.
+ */
+ override def whiteList = Seq(
+ "add_part_exist",
+ "add_partition_no_whitelist",
+ "add_partition_with_whitelist",
+ "alias_casted_column",
+ "alter2",
+ "alter4",
+ "alter5",
+ "alter_index",
+ "alter_merge_2",
+ "alter_partition_format_loc",
+ "alter_partition_protect_mode",
+ "alter_partition_with_whitelist",
+ "alter_table_serde",
+ "alter_varchar2",
+ "alter_view_as_select",
+ "ambiguous_col",
+ "auto_join0",
+ "auto_join1",
+ "auto_join10",
+ "auto_join11",
+ "auto_join12",
+ "auto_join13",
+ "auto_join14",
+ "auto_join14_hadoop20",
+ "auto_join15",
+ "auto_join17",
+ "auto_join18",
+ "auto_join19",
+ "auto_join2",
+ "auto_join20",
+ "auto_join21",
+ "auto_join22",
+ "auto_join23",
+ "auto_join24",
+ "auto_join25",
+ "auto_join26",
+ "auto_join27",
+ "auto_join28",
+ "auto_join3",
+ "auto_join30",
+ "auto_join31",
+ "auto_join32",
+ "auto_join4",
+ "auto_join5",
+ "auto_join6",
+ "auto_join7",
+ "auto_join8",
+ "auto_join9",
+ "auto_join_filters",
+ "auto_join_nulls",
+ "auto_join_reordering_values",
+ "auto_sortmerge_join_1",
+ "auto_sortmerge_join_10",
+ "auto_sortmerge_join_11",
+ "auto_sortmerge_join_12",
+ "auto_sortmerge_join_15",
+ "auto_sortmerge_join_2",
+ "auto_sortmerge_join_3",
+ "auto_sortmerge_join_4",
+ "auto_sortmerge_join_5",
+ "auto_sortmerge_join_6",
+ "auto_sortmerge_join_7",
+ "auto_sortmerge_join_8",
+ "auto_sortmerge_join_9",
+ "binary_constant",
+ "binarysortable_1",
+ "combine1",
+ "compute_stats_binary",
+ "compute_stats_boolean",
+ "compute_stats_double",
+ "compute_stats_table",
+ "compute_stats_long",
+ "compute_stats_string",
+ "convert_enum_to_string",
+ "correlationoptimizer11",
+ "correlationoptimizer15",
+ "correlationoptimizer2",
+ "correlationoptimizer3",
+ "correlationoptimizer4",
+ "correlationoptimizer6",
+ "correlationoptimizer7",
+ "correlationoptimizer8",
+ "count",
+ "create_like_view",
+ "create_nested_type",
+ "create_skewed_table1",
+ "create_struct_table",
+ "ct_case_insensitive",
+ "database_location",
+ "database_properties",
+ "decimal_join",
+ "default_partition_name",
+ "delimiter",
+ "desc_non_existent_tbl",
+ "describe_comment_indent",
+ "describe_database_json",
+ "describe_pretty",
+ "describe_syntax",
+ "describe_table_json",
+ "diff_part_input_formats",
+ "disable_file_format_check",
+ "drop_function",
+ "drop_index",
+ "drop_partitions_filter",
+ "drop_partitions_filter2",
+ "drop_partitions_filter3",
+ "drop_partitions_ignore_protection",
+ "drop_table",
+ "drop_table2",
+ "drop_view",
+ "escape_clusterby1",
+ "escape_distributeby1",
+ "escape_orderby1",
+ "escape_sortby1",
+ "fetch_aggregation",
+ "filter_join_breaktask",
+ "filter_join_breaktask2",
+ "groupby1",
+ "groupby11",
+ "groupby1_map",
+ "groupby1_map_nomap",
+ "groupby1_map_skew",
+ "groupby1_noskew",
+ "groupby4",
+ "groupby4_map",
+ "groupby4_map_skew",
+ "groupby4_noskew",
+ "groupby5",
+ "groupby5_map",
+ "groupby5_map_skew",
+ "groupby5_noskew",
+ "groupby6",
+ "groupby6_map",
+ "groupby6_map_skew",
+ "groupby6_noskew",
+ "groupby7",
+ "groupby7_map",
+ "groupby7_map_multi_single_reducer",
+ "groupby7_map_skew",
+ "groupby7_noskew",
+ "groupby8_map",
+ "groupby8_map_skew",
+ "groupby8_noskew",
+ "groupby_distinct_samekey",
+ "groupby_multi_single_reducer2",
+ "groupby_mutli_insert_common_distinct",
+ "groupby_neg_float",
+ "groupby_sort_10",
+ "groupby_sort_6",
+ "groupby_sort_8",
+ "groupby_sort_test_1",
+ "implicit_cast1",
+ "innerjoin",
+ "inoutdriver",
+ "input",
+ "input0",
+ "input11",
+ "input11_limit",
+ "input12",
+ "input12_hadoop20",
+ "input19",
+ "input1_limit",
+ "input22",
+ "input23",
+ "input24",
+ "input25",
+ "input26",
+ "input28",
+ "input2_limit",
+ "input40",
+ "input41",
+ "input4_cb_delim",
+ "input6",
+ "input7",
+ "input8",
+ "input9",
+ "input_limit",
+ "input_part0",
+ "input_part1",
+ "input_part10",
+ "input_part10_win",
+ "input_part2",
+ "input_part3",
+ "input_part4",
+ "input_part5",
+ "input_part6",
+ "input_part7",
+ "input_part8",
+ "input_part9",
+ "inputddl4",
+ "inputddl7",
+ "inputddl8",
+ "insert_compressed",
+ "join0",
+ "join1",
+ "join10",
+ "join11",
+ "join12",
+ "join13",
+ "join14",
+ "join14_hadoop20",
+ "join15",
+ "join16",
+ "join17",
+ "join18",
+ "join19",
+ "join2",
+ "join20",
+ "join21",
+ "join22",
+ "join23",
+ "join24",
+ "join25",
+ "join26",
+ "join27",
+ "join28",
+ "join29",
+ "join3",
+ "join30",
+ "join31",
+ "join32",
+ "join33",
+ "join34",
+ "join35",
+ "join36",
+ "join37",
+ "join38",
+ "join39",
+ "join4",
+ "join40",
+ "join41",
+ "join5",
+ "join6",
+ "join7",
+ "join8",
+ "join9",
+ "join_1to1",
+ "join_array",
+ "join_casesensitive",
+ "join_empty",
+ "join_filters",
+ "join_hive_626",
+ "join_nulls",
+ "join_reorder2",
+ "join_reorder3",
+ "join_reorder4",
+ "join_star",
+ "join_view",
+ "lateral_view_cp",
+ "lateral_view_ppd",
+ "lineage1",
+ "literal_double",
+ "literal_ints",
+ "literal_string",
+ "load_dyn_part7",
+ "load_file_with_space_in_the_name",
+ "louter_join_ppr",
+ "mapjoin_distinct",
+ "mapjoin_mapjoin",
+ "mapjoin_subquery",
+ "mapjoin_subquery2",
+ "mapjoin_test_outer",
+ "mapreduce3",
+ "mapreduce7",
+ "merge1",
+ "merge2",
+ "mergejoins",
+ "mergejoins_mixed",
+ "multiMapJoin1",
+ "multiMapJoin2",
+ "multi_join_union",
+ "multigroupby_singlemr",
+ "noalias_subq1",
+ "nomore_ambiguous_table_col",
+ "nonblock_op_deduplicate",
+ "notable_alias1",
+ "notable_alias2",
+ "nullgroup",
+ "nullgroup2",
+ "nullgroup3",
+ "nullgroup4",
+ "nullgroup4_multi_distinct",
+ "nullgroup5",
+ "nullinput",
+ "nullinput2",
+ "nullscript",
+ "optional_outer",
+ "order",
+ "order2",
+ "outer_join_ppr",
+ "part_inherit_tbl_props",
+ "part_inherit_tbl_props_empty",
+ "part_inherit_tbl_props_with_star",
+ "partition_schema1",
+ "partition_varchar1",
+ "plan_json",
+ "ppd1",
+ "ppd_constant_where",
+ "ppd_gby",
+ "ppd_gby2",
+ "ppd_gby_join",
+ "ppd_join",
+ "ppd_join2",
+ "ppd_join3",
+ "ppd_join_filter",
+ "ppd_outer_join1",
+ "ppd_outer_join2",
+ "ppd_outer_join3",
+ "ppd_outer_join4",
+ "ppd_outer_join5",
+ "ppd_random",
+ "ppd_repeated_alias",
+ "ppd_udf_col",
+ "ppd_union",
+ "ppr_allchildsarenull",
+ "ppr_pushdown",
+ "ppr_pushdown2",
+ "ppr_pushdown3",
+ "progress_1",
+ "protectmode",
+ "push_or",
+ "query_with_semi",
+ "quote1",
+ "quote2",
+ "reduce_deduplicate_exclude_join",
+ "rename_column",
+ "router_join_ppr",
+ "select_as_omitted",
+ "select_unquote_and",
+ "select_unquote_not",
+ "select_unquote_or",
+ "serde_reported_schema",
+ "set_variable_sub",
+ "show_describe_func_quotes",
+ "show_functions",
+ "show_partitions",
+ "skewjoinopt13",
+ "skewjoinopt18",
+ "skewjoinopt9",
+ "smb_mapjoin_1",
+ "smb_mapjoin_10",
+ "smb_mapjoin_13",
+ "smb_mapjoin_14",
+ "smb_mapjoin_15",
+ "smb_mapjoin_16",
+ "smb_mapjoin_17",
+ "smb_mapjoin_2",
+ "smb_mapjoin_21",
+ "smb_mapjoin_25",
+ "smb_mapjoin_3",
+ "smb_mapjoin_4",
+ "smb_mapjoin_5",
+ "smb_mapjoin_8",
+ "sort",
+ "sort_merge_join_desc_1",
+ "sort_merge_join_desc_2",
+ "sort_merge_join_desc_3",
+ "sort_merge_join_desc_4",
+ "sort_merge_join_desc_5",
+ "sort_merge_join_desc_6",
+ "sort_merge_join_desc_7",
+ "stats0",
+ "stats_empty_partition",
+ "subq2",
+ "tablename_with_select",
+ "touch",
+ "type_widening",
+ "udaf_collect_set",
+ "udaf_corr",
+ "udaf_covar_pop",
+ "udaf_covar_samp",
+ "udf2",
+ "udf6",
+ "udf9",
+ "udf_10_trims",
+ "udf_E",
+ "udf_PI",
+ "udf_abs",
+ "udf_acos",
+ "udf_add",
+ "udf_array",
+ "udf_array_contains",
+ "udf_ascii",
+ "udf_asin",
+ "udf_atan",
+ "udf_avg",
+ "udf_bigint",
+ "udf_bin",
+ "udf_bitmap_and",
+ "udf_bitmap_empty",
+ "udf_bitmap_or",
+ "udf_bitwise_and",
+ "udf_bitwise_not",
+ "udf_bitwise_or",
+ "udf_bitwise_xor",
+ "udf_boolean",
+ "udf_case",
+ "udf_ceil",
+ "udf_ceiling",
+ "udf_concat",
+ "udf_concat_insert2",
+ "udf_concat_ws",
+ "udf_conv",
+ "udf_cos",
+ "udf_count",
+ "udf_date_add",
+ "udf_date_sub",
+ "udf_datediff",
+ "udf_day",
+ "udf_dayofmonth",
+ "udf_degrees",
+ "udf_div",
+ "udf_double",
+ "udf_exp",
+ "udf_field",
+ "udf_find_in_set",
+ "udf_float",
+ "udf_floor",
+ "udf_format_number",
+ "udf_from_unixtime",
+ "udf_greaterthan",
+ "udf_greaterthanorequal",
+ "udf_hex",
+ "udf_if",
+ "udf_index",
+ "udf_int",
+ "udf_isnotnull",
+ "udf_isnull",
+ "udf_java_method",
+ "udf_lcase",
+ "udf_length",
+ "udf_lessthan",
+ "udf_lessthanorequal",
+ "udf_like",
+ "udf_ln",
+ "udf_log",
+ "udf_log10",
+ "udf_log2",
+ "udf_lower",
+ "udf_lpad",
+ "udf_ltrim",
+ "udf_map",
+ "udf_minute",
+ "udf_modulo",
+ "udf_month",
+ "udf_negative",
+ "udf_not",
+ "udf_notequal",
+ "udf_notop",
+ "udf_nvl",
+ "udf_or",
+ "udf_parse_url",
+ "udf_positive",
+ "udf_pow",
+ "udf_power",
+ "udf_radians",
+ "udf_rand",
+ "udf_regexp",
+ "udf_regexp_extract",
+ "udf_regexp_replace",
+ "udf_repeat",
+ "udf_rlike",
+ "udf_round",
+ "udf_round_3",
+ "udf_rpad",
+ "udf_rtrim",
+ "udf_second",
+ "udf_sign",
+ "udf_sin",
+ "udf_smallint",
+ "udf_space",
+ "udf_sqrt",
+ "udf_std",
+ "udf_stddev",
+ "udf_stddev_pop",
+ "udf_stddev_samp",
+ "udf_string",
+ "udf_substring",
+ "udf_subtract",
+ "udf_sum",
+ "udf_tan",
+ "udf_tinyint",
+ "udf_to_byte",
+ "udf_to_date",
+ "udf_to_double",
+ "udf_to_float",
+ "udf_to_long",
+ "udf_to_short",
+ "udf_translate",
+ "udf_trim",
+ "udf_ucase",
+ "udf_upper",
+ "udf_var_pop",
+ "udf_var_samp",
+ "udf_variance",
+ "udf_weekofyear",
+ "udf_when",
+ "udf_xpath",
+ "udf_xpath_boolean",
+ "udf_xpath_double",
+ "udf_xpath_float",
+ "udf_xpath_int",
+ "udf_xpath_long",
+ "udf_xpath_short",
+ "udf_xpath_string",
+ "unicode_notation",
+ "union10",
+ "union11",
+ "union13",
+ "union14",
+ "union15",
+ "union16",
+ "union17",
+ "union18",
+ "union19",
+ "union2",
+ "union20",
+ "union22",
+ "union23",
+ "union24",
+ "union26",
+ "union27",
+ "union28",
+ "union29",
+ "union30",
+ "union31",
+ "union34",
+ "union4",
+ "union5",
+ "union6",
+ "union7",
+ "union8",
+ "union9",
+ "union_lateralview",
+ "union_ppr",
+ "union_remove_3",
+ "union_remove_6",
+ "union_script",
+ "varchar_2",
+ "varchar_join1",
+ "varchar_union1"
+ )
+}
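
Both lists are interpreted as regular expressions that must match the whole test-case name (see HiveQueryFileTest below), which is why entries such as ".*avro.*" exclude every Avro test. A small self-contained sketch of the matching rule:

    val blackListed = Seq("hook_order", ".*avro.*")
    def isBlackListed(name: String): Boolean =
      blackListed.exists(_.r.pattern.matcher(name).matches())

    assert(isBlackListed("hook_order"))       // exact-name entry
    assert(isBlackListed("avro_compression")) // matched by ".*avro.*"
    assert(!isBlackListed("auto_join0"))      // not matched by either entry
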
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQueryFileTest.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQueryFileTest.scala
new file mode 100644
index 0000000000..f0a4ec3c02
--- /dev/null
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQueryFileTest.scala
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+package hive
+package execution
+
+import java.io._
+
+import catalyst.util._
+
+/**
+ * A framework for running the query tests that are listed as a set of text files.
+ *
+ * TestSuites that derive from this class must provide a map of testCaseName -> testCaseFiles that should be included.
+ * Additionally, there is support for whitelisting and blacklisting tests as development progresses.
+ */
+abstract class HiveQueryFileTest extends HiveComparisonTest {
+ /** A list of tests deemed out of scope and thus completely disregarded */
+ def blackList: Seq[String] = Nil
+
+ /**
+   * The set of tests that are believed to be working in catalyst. Tests not in the whiteList or
+   * blackList are implicitly marked as ignored.
+ */
+ def whiteList: Seq[String] = ".*" :: Nil
+
+ def testCases: Seq[(String, File)]
+
+ val runAll =
+    System.getProperty("spark.hive.alltests") != null ||
+ runOnlyDirectories.nonEmpty ||
+ skipDirectories.nonEmpty
+
+ val whiteListProperty = "spark.hive.whitelist"
+ // Allow the whiteList to be overridden by a system property
+ val realWhiteList =
+ Option(System.getProperty(whiteListProperty)).map(_.split(",").toSeq).getOrElse(whiteList)
+
+ // Go through all the test cases and add them to scala test.
+ testCases.sorted.foreach {
+ case (testCaseName, testCaseFile) =>
+      if (blackList.exists(_.r.pattern.matcher(testCaseName).matches())) {
+ logger.debug(s"Blacklisted test skipped $testCaseName")
+      } else if (realWhiteList.exists(_.r.pattern.matcher(testCaseName).matches()) || runAll) {
+ // Build a test case and submit it to scala test framework...
+ val queriesString = fileToString(testCaseFile)
+ createQueryTest(testCaseName, queriesString)
+ } else {
+      // Only output warnings for the built-in whitelist, as this clutters the output when the
+      // user is trying to execute a single test from the command line.
+      if (System.getProperty(whiteListProperty) == null && !runAll)
+ ignore(testCaseName) {}
+ }
+ }
+}
\ No newline at end of file
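
HiveCompatibilitySuite above is the canonical subclass; a minimal sketch of deriving another file-based suite (the query directory here is hypothetical):

    class ExampleQueryFileSuite extends HiveQueryFileTest {
      private val queryDir = new java.io.File("src/test/queries")
      // One test case per .q file, named after the file.
      def testCases = queryDir.listFiles.map(f => f.getName.stripSuffix(".q") -> f).toSeq
    }
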
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
new file mode 100644
index 0000000000..28a5d260b3
--- /dev/null
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+package hive
+package execution
+
+
+/**
+ * A set of test cases expressed in Hive QL that are not covered by the tests included in the hive distribution.
+ */
+class HiveQuerySuite extends HiveComparisonTest {
+ import TestHive._
+
+ createQueryTest("Simple Average",
+ "SELECT AVG(key) FROM src")
+
+ createQueryTest("Simple Average + 1",
+ "SELECT AVG(key) + 1.0 FROM src")
+
+ createQueryTest("Simple Average + 1 with group",
+ "SELECT AVG(key) + 1.0, value FROM src group by value")
+
+ createQueryTest("string literal",
+ "SELECT 'test' FROM src")
+
+ createQueryTest("Escape sequences",
+ """SELECT key, '\\\t\\' FROM src WHERE key = 86""")
+
+ createQueryTest("IgnoreExplain",
+ """EXPLAIN SELECT key FROM src""")
+
+ createQueryTest("trivial join where clause",
+ "SELECT * FROM src a JOIN src b WHERE a.key = b.key")
+
+ createQueryTest("trivial join ON clause",
+ "SELECT * FROM src a JOIN src b ON a.key = b.key")
+
+ createQueryTest("small.cartesian",
+ "SELECT a.key, b.key FROM (SELECT key FROM src WHERE key < 1) a JOIN (SELECT key FROM src WHERE key = 2) b")
+
+ createQueryTest("length.udf",
+ "SELECT length(\"test\") FROM src LIMIT 1")
+
+ ignore("partitioned table scan") {
+ createQueryTest("partitioned table scan",
+ "SELECT ds, hr, key, value FROM srcpart")
+ }
+
+ createQueryTest("hash",
+ "SELECT hash('test') FROM src LIMIT 1")
+
+ createQueryTest("create table as",
+ """
+ |CREATE TABLE createdtable AS SELECT * FROM src;
+ |SELECT * FROM createdtable
+ """.stripMargin)
+
+ createQueryTest("create table as with db name",
+ """
+ |CREATE DATABASE IF NOT EXISTS testdb;
+ |CREATE TABLE testdb.createdtable AS SELECT * FROM default.src;
+ |SELECT * FROM testdb.createdtable;
+ |DROP DATABASE IF EXISTS testdb CASCADE
+ """.stripMargin)
+
+ createQueryTest("insert table with db name",
+ """
+ |CREATE DATABASE IF NOT EXISTS testdb;
+ |CREATE TABLE testdb.createdtable like default.src;
+ |INSERT INTO TABLE testdb.createdtable SELECT * FROM default.src;
+ |SELECT * FROM testdb.createdtable;
+ |DROP DATABASE IF EXISTS testdb CASCADE
+ """.stripMargin)
+
+ createQueryTest("insert into and insert overwrite",
+ """
+ |CREATE TABLE createdtable like src;
+ |INSERT INTO TABLE createdtable SELECT * FROM src;
+ |INSERT INTO TABLE createdtable SELECT * FROM src1;
+ |SELECT * FROM createdtable;
+ |INSERT OVERWRITE TABLE createdtable SELECT * FROM src WHERE key = 86;
+ |SELECT * FROM createdtable;
+ """.stripMargin)
+
+ createQueryTest("transform",
+ "SELECT TRANSFORM (key) USING 'cat' AS (tKey) FROM src")
+
+ createQueryTest("LIKE",
+ "SELECT * FROM src WHERE value LIKE '%1%'")
+
+ createQueryTest("DISTINCT",
+ "SELECT DISTINCT key, value FROM src")
+
+ ignore("empty aggregate input") {
+ createQueryTest("empty aggregate input",
+ "SELECT SUM(key) FROM (SELECT * FROM src LIMIT 0) a")
+ }
+
+ createQueryTest("lateral view1",
+ "SELECT tbl.* FROM src LATERAL VIEW explode(array(1,2)) tbl as a")
+
+ createQueryTest("lateral view2",
+ "SELECT * FROM src LATERAL VIEW explode(array(1,2)) tbl")
+
+
+ createQueryTest("lateral view3",
+ "FROM src SELECT key, D.* lateral view explode(array(key+3, key+4)) D as CX")
+
+ createQueryTest("lateral view4",
+ """
+ |create table src_lv1 (key string, value string);
+ |create table src_lv2 (key string, value string);
+ |
+ |FROM src
+ |insert overwrite table src_lv1 SELECT key, D.* lateral view explode(array(key+3, key+4)) D as CX
+ |insert overwrite table src_lv2 SELECT key, D.* lateral view explode(array(key+3, key+4)) D as CX
+ """.stripMargin)
+
+ createQueryTest("lateral view5",
+ "FROM src SELECT explode(array(key+3, key+4))")
+
+ createQueryTest("lateral view6",
+ "SELECT * FROM src LATERAL VIEW explode(map(key+3,key+4)) D as k, v")
+
+ test("sampling") {
+ sql("SELECT * FROM src TABLESAMPLE(0.1 PERCENT) s")
+ }
+
+}
\ No newline at end of file
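
Note that "sampling" is a plain ScalaTest test rather than a createQueryTest: TABLESAMPLE output is non-deterministic, so there is no stable golden answer to compare against. A sketch of the same pattern with the result explicitly materialized (assuming the relation returned by sql can be collected like any RDD):

    test("non-deterministic sampling executes") {
      sql("SELECT * FROM src TABLESAMPLE(0.1 PERCENT) s").collect()
    }
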
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveResolutionSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveResolutionSuite.scala
new file mode 100644
index 0000000000..0dd79faa15
--- /dev/null
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveResolutionSuite.scala
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+package hive
+package execution
+
+/**
+ * A set of test cases expressed in Hive QL that are not covered by the tests included in the hive distribution.
+ */
+class HiveResolutionSuite extends HiveComparisonTest {
+ import TestHive._
+
+ createQueryTest("table.attr",
+ "SELECT src.key FROM src ORDER BY key LIMIT 1")
+
+ createQueryTest("database.table",
+ "SELECT key FROM default.src ORDER BY key LIMIT 1")
+
+ createQueryTest("database.table table.attr",
+ "SELECT src.key FROM default.src ORDER BY key LIMIT 1")
+
+ createQueryTest("alias.attr",
+ "SELECT a.key FROM src a ORDER BY key LIMIT 1")
+
+ createQueryTest("subquery-alias.attr",
+ "SELECT a.key FROM (SELECT * FROM src ORDER BY key LIMIT 1) a")
+
+ createQueryTest("quoted alias.attr",
+ "SELECT `a`.`key` FROM src a ORDER BY key LIMIT 1")
+
+ createQueryTest("attr",
+ "SELECT key FROM src a ORDER BY key LIMIT 1")
+
+ createQueryTest("alias.*",
+ "SELECT a.* FROM src a ORDER BY key LIMIT 1")
+
+ /**
+ * Negative examples. Currently only left here for documentation purposes.
+ * TODO(marmbrus): Test that catalyst fails on these queries.
+ */
+
+ /* SemanticException [Error 10009]: Line 1:7 Invalid table alias 'src'
+ createQueryTest("table.*",
+ "SELECT src.* FROM src a ORDER BY key LIMIT 1") */
+
+ /* Invalid table alias or column reference 'src': (possible column names are: key, value)
+ createQueryTest("tableName.attr from aliased subquery",
+ "SELECT src.key FROM (SELECT * FROM src ORDER BY key LIMIT 1) a") */
+
+}
\ No newline at end of file
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveSerDeSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveSerDeSuite.scala
new file mode 100644
index 0000000000..c2264926f4
--- /dev/null
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveSerDeSuite.scala
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+package hive
+package execution
+
+/**
+ * A set of tests that validates support for Hive SerDe.
+ */
+class HiveSerDeSuite extends HiveComparisonTest {
+ createQueryTest(
+ "Read and write with LazySimpleSerDe (tab separated)",
+ "SELECT * from serdeins")
+
+ createQueryTest("Read with RegexSerDe", "SELECT * FROM sales")
+
+ createQueryTest("Read with AvroSerDe", "SELECT * FROM episodes")
+}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveTypeCoercionSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveTypeCoercionSuite.scala
new file mode 100644
index 0000000000..bb33583e5f
--- /dev/null
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveTypeCoercionSuite.scala
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive.execution
+
+/**
+ * A set of tests that validate type promotion rules.
+ */
+class HiveTypeCoercionSuite extends HiveComparisonTest {
+
+ val baseTypes = Seq("1", "1.0", "1L", "1S", "1Y", "'1'")
+
+ baseTypes.foreach { i =>
+ baseTypes.foreach { j =>
+ createQueryTest(s"$i + $j", s"SELECT $i + $j FROM src LIMIT 1")
+ }
+ }
+}
\ No newline at end of file
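
The nested loops above generate the full 6 x 6 = 36 pairwise combinations, one query test per pair. The first few expand to:

    createQueryTest("1 + 1",   "SELECT 1 + 1 FROM src LIMIT 1")
    createQueryTest("1 + 1.0", "SELECT 1 + 1.0 FROM src LIMIT 1")
    createQueryTest("1 + 1L",  "SELECT 1 + 1L FROM src LIMIT 1")
    // ... and so on through 1S, 1Y and '1' for each left operand.
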
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruningSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruningSuite.scala
new file mode 100644
index 0000000000..8542f42aa9
--- /dev/null
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruningSuite.scala
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+package hive
+package execution
+
+import scala.collection.JavaConversions._
+
+import org.apache.spark.sql.hive.TestHive
+
+/**
+ * A set of test cases that validate partition and column pruning.
+ */
+class PruningSuite extends HiveComparisonTest {
+ // Column pruning tests
+
+ createPruningTest("Column pruning: with partitioned table",
+ "SELECT key FROM srcpart WHERE ds = '2008-04-08' LIMIT 3",
+ Seq("key"),
+ Seq("key", "ds"),
+ Seq(
+ Seq("2008-04-08", "11"),
+ Seq("2008-04-08", "12")))
+
+ createPruningTest("Column pruning: with non-partitioned table",
+ "SELECT key FROM src WHERE key > 10 LIMIT 3",
+ Seq("key"),
+ Seq("key"),
+ Seq.empty)
+
+ createPruningTest("Column pruning: with multiple projects",
+ "SELECT c1 FROM (SELECT key AS c1 FROM src WHERE key > 10) t1 LIMIT 3",
+ Seq("c1"),
+ Seq("key"),
+ Seq.empty)
+
+ createPruningTest("Column pruning: projects alias substituting",
+ "SELECT c1 AS c2 FROM (SELECT key AS c1 FROM src WHERE key > 10) t1 LIMIT 3",
+ Seq("c2"),
+ Seq("key"),
+ Seq.empty)
+
+ createPruningTest("Column pruning: filter alias in-lining",
+ "SELECT c1 FROM (SELECT key AS c1 FROM src WHERE key > 10) t1 WHERE c1 < 100 LIMIT 3",
+ Seq("c1"),
+ Seq("key"),
+ Seq.empty)
+
+ createPruningTest("Column pruning: without filters",
+ "SELECT c1 FROM (SELECT key AS c1 FROM src) t1 LIMIT 3",
+ Seq("c1"),
+ Seq("key"),
+ Seq.empty)
+
+ createPruningTest("Column pruning: simple top project without aliases",
+ "SELECT key FROM (SELECT key FROM src WHERE key > 10) t1 WHERE key < 100 LIMIT 3",
+ Seq("key"),
+ Seq("key"),
+ Seq.empty)
+
+ createPruningTest("Column pruning: non-trivial top project with aliases",
+ "SELECT c1 * 2 AS double FROM (SELECT key AS c1 FROM src WHERE key > 10) t1 LIMIT 3",
+ Seq("double"),
+ Seq("key"),
+ Seq.empty)
+
+ // Partition pruning tests
+
+ createPruningTest("Partition pruning: non-partitioned, non-trivial project",
+ "SELECT key * 2 AS double FROM src WHERE value IS NOT NULL",
+ Seq("double"),
+ Seq("key", "value"),
+ Seq.empty)
+
+ createPruningTest("Partiton pruning: non-partitioned table",
+ "SELECT value FROM src WHERE key IS NOT NULL",
+ Seq("value"),
+ Seq("value", "key"),
+ Seq.empty)
+
+ createPruningTest("Partition pruning: with filter on string partition key",
+ "SELECT value, hr FROM srcpart1 WHERE ds = '2008-04-08'",
+ Seq("value", "hr"),
+ Seq("value", "hr", "ds"),
+ Seq(
+ Seq("2008-04-08", "11"),
+ Seq("2008-04-08", "12")))
+
+ createPruningTest("Partition pruning: with filter on int partition key",
+ "SELECT value, hr FROM srcpart1 WHERE hr < 12",
+ Seq("value", "hr"),
+ Seq("value", "hr"),
+ Seq(
+ Seq("2008-04-08", "11"),
+ Seq("2008-04-09", "11")))
+
+ createPruningTest("Partition pruning: left only 1 partition",
+ "SELECT value, hr FROM srcpart1 WHERE ds = '2008-04-08' AND hr < 12",
+ Seq("value", "hr"),
+ Seq("value", "hr", "ds"),
+ Seq(
+ Seq("2008-04-08", "11")))
+
+ createPruningTest("Partition pruning: all partitions pruned",
+ "SELECT value, hr FROM srcpart1 WHERE ds = '2014-01-27' AND hr = 11",
+ Seq("value", "hr"),
+ Seq("value", "hr", "ds"),
+ Seq.empty)
+
+ createPruningTest("Partition pruning: pruning with both column key and partition key",
+ "SELECT value, hr FROM srcpart1 WHERE value IS NOT NULL AND hr < 12",
+ Seq("value", "hr"),
+ Seq("value", "hr"),
+ Seq(
+ Seq("2008-04-08", "11"),
+ Seq("2008-04-09", "11")))
+
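+  /**
+   * Creates two test cases for the given query: a pruning test that checks that the executed
+   * plan outputs exactly `expectedOutputColumns`, that the underlying [[HiveTableScan]] reads
+   * only `expectedScannedColumns`, and that only the partitions whose (ds, hr) values appear
+   * in `expectedPartValues` survive pruning; and a Hive comparison test that checks the query
+   * result itself. For example:
+   *
+   * {{{
+   * createPruningTest("Column pruning: with partitioned table",
+   *   "SELECT key FROM srcpart WHERE ds = '2008-04-08' LIMIT 3",
+   *   Seq("key"),        // expected output columns
+   *   Seq("key", "ds"),  // expected scanned columns
+   *   Seq(               // expected partition values remaining after pruning
+   *     Seq("2008-04-08", "11"),
+   *     Seq("2008-04-08", "12")))
+   * }}}
+   */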
+ def createPruningTest(
+ testCaseName: String,
+ sql: String,
+ expectedOutputColumns: Seq[String],
+ expectedScannedColumns: Seq[String],
+      expectedPartValues: Seq[Seq[String]]): Unit = {
+ test(s"$testCaseName - pruning test") {
+ val plan = new TestHive.SqlQueryExecution(sql).executedPlan
+ val actualOutputColumns = plan.output.map(_.name)
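+      // Find the first HiveTableScan in the physical plan and record which columns it scans
+      // and which partitions survive pruning.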
+ val (actualScannedColumns, actualPartValues) = plan.collect {
+ case p @ HiveTableScan(columns, relation, _) =>
+ val columnNames = columns.map(_.name)
+ val partValues = p.prunePartitions(relation.hiveQlPartitions).map(_.getValues)
+ (columnNames, partValues)
+ }.head
+
+ assert(actualOutputColumns sameElements expectedOutputColumns, "Output columns mismatch")
+ assert(actualScannedColumns sameElements expectedScannedColumns, "Scanned columns mismatch")
+
+ assert(
+ actualPartValues.length === expectedPartValues.length,
+ "Partition value count mismatches")
+
+ for ((actual, expected) <- actualPartValues.zip(expectedPartValues)) {
+ assert(actual sameElements expected, "Partition values mismatch")
+ }
+ }
+
+ // Creates a query test to compare query results generated by Hive and Catalyst.
+ createQueryTest(s"$testCaseName - query test", sql)
+ }
+}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/parquet/HiveParquetSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/parquet/HiveParquetSuite.scala
new file mode 100644
index 0000000000..ee90061c7c
--- /dev/null
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/parquet/HiveParquetSuite.scala
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.parquet
+
+import java.io.File
+
+import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, FunSuite}
+
+import org.apache.spark.sql.catalyst.expressions.Row
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
+import org.apache.spark.sql.catalyst.util.getTempFilePath
+import org.apache.spark.sql.hive.TestHive
+
+class HiveParquetSuite extends FunSuite with BeforeAndAfterAll with BeforeAndAfterEach {
+
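+  // URI of a temporary file that the tests below use as their Parquet output location.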
+ val filename = getTempFilePath("parquettest").getCanonicalFile.toURI.toString
+
+  // Runs a SQL query, optionally resolving the named Parquet table in the plan first.
+  def runQuery(
+      querystr: String,
+      tableName: Option[String] = None,
+      filename: Option[String] = None): Array[Row] = {
+    // Resolve table references up front so that CREATE TABLE AS SELECT works.
+ val query = TestHive
+ .parseSql(querystr)
+ val finalQuery =
+ if (tableName.nonEmpty && filename.nonEmpty)
+ resolveParquetTable(tableName.get, filename.get, query)
+ else
+ query
+ TestHive.executePlan(finalQuery)
+ .toRdd
+ .collect()
+ }
+
+  // Stores the output of the given query in the Parquet file at `filename`.
+ def storeQuery(querystr: String, filename: String): Unit = {
+ val query = WriteToFile(
+ filename,
+ TestHive.parseSql(querystr))
+ TestHive
+ .executePlan(query)
+ .stringResult()
+ }
+
+ /**
+ * TODO: This function is necessary as long as there is no notion of a Catalog for
+   * Parquet tables. Once such a catalog exists, this functionality should move there.
+ */
+ def resolveParquetTable(tableName: String, filename: String, plan: LogicalPlan): LogicalPlan = {
+ TestHive.loadTestTable("src") // may not be loaded now
+ plan.transform {
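+      // Substitute a ParquetRelation for the unresolved reference to `tableName`.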
+ case relation @ UnresolvedRelation(databaseName, name, alias) =>
+ if (name == tableName)
+ ParquetRelation(tableName, filename)
+ else
+ relation
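+      // Rewrite CREATE TABLE AS SELECT on `tableName` into an insert into a newly created
+      // ParquetRelation.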
+ case op @ InsertIntoCreatedTable(databaseName, name, child) =>
+ if (name == tableName) {
+          // Note: at this stage the plan is not yet analyzed, but Parquet needs to know the
+          // schema, and for that the child must be resolved.
+ val relation = ParquetRelation.create(
+ filename,
+ TestHive.analyzer(child),
+ TestHive.sparkContext.hadoopConfiguration,
+ Some(tableName))
+ InsertIntoTable(
+ relation.asInstanceOf[BaseRelation],
+ Map.empty,
+ child,
+ overwrite = false)
+ } else
+ op
+ }
+ }
+
+ override def beforeAll() {
+    // Write the shared Parquet test data.
+ ParquetTestData.writeFile
+ // Override initial Parquet test table
+    TestHive.catalog.registerTable(Some("parquet"), "testsource", ParquetTestData.testData)
+ }
+
+ override def afterAll() {
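+    // Clean up the shared test data file written in beforeAll.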
+ ParquetTestData.testFile.delete()
+ }
+
+  override def beforeEach() {
+    // `filename` is a URI string, so convert it back to a File via URI before deleting.
+    new File(new java.net.URI(filename)).delete()
+  }
+
+  override def afterEach() {
+    new File(new java.net.URI(filename)).delete()
+  }
+
+ test("SELECT on Parquet table") {
+ val rdd = runQuery("SELECT * FROM parquet.testsource")
+ assert(rdd != null)
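+    // Each row of the testsource table registered in beforeAll has six fields.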
+ assert(rdd.forall(_.size == 6))
+ }
+
+ test("Simple column projection + filter on Parquet table") {
+ val rdd = runQuery("SELECT myboolean, mylong FROM parquet.testsource WHERE myboolean=true")
+ assert(rdd.size === 5, "Filter returned incorrect number of rows")
+ assert(rdd.forall(_.getBoolean(0)), "Filter returned incorrect Boolean field value")
+ }
+
+ test("Converting Hive to Parquet Table via WriteToFile") {
+ storeQuery("SELECT * FROM src", filename)
+ val rddOne = runQuery("SELECT * FROM src").sortBy(_.getInt(0))
+    val rddTwo = runQuery("SELECT * FROM ptable", Some("ptable"), Some(filename))
+      .sortBy(_.getInt(0))
+ compareRDDs(rddOne, rddTwo, "src (Hive)", Seq("key:Int", "value:String"))
+ }
+
+ test("INSERT OVERWRITE TABLE Parquet table") {
+ storeQuery("SELECT * FROM parquet.testsource", filename)
+ runQuery("INSERT OVERWRITE TABLE ptable SELECT * FROM parquet.testsource", Some("ptable"), Some(filename))
+ runQuery("INSERT OVERWRITE TABLE ptable SELECT * FROM parquet.testsource", Some("ptable"), Some(filename))
+ val rddCopy = runQuery("SELECT * FROM ptable", Some("ptable"), Some(filename))
+ val rddOrig = runQuery("SELECT * FROM parquet.testsource")
+ compareRDDs(rddOrig, rddCopy, "parquet.testsource", ParquetTestData.testSchemaFieldNames)
+ }
+
+ test("CREATE TABLE AS Parquet table") {
+ runQuery("CREATE TABLE ptable AS SELECT * FROM src", Some("ptable"), Some(filename))
+ val rddCopy = runQuery("SELECT * FROM ptable", Some("ptable"), Some(filename))
+ .sortBy[Int](_.apply(0) match {
+ case x: Int => x
+ case _ => 0
+ })
+ val rddOrig = runQuery("SELECT * FROM src").sortBy(_.getInt(0))
+ compareRDDs(rddOrig, rddCopy, "src (Hive)", Seq("key:Int", "value:String"))
+ }
+
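+  // Compares two query results field by field. Byte array fields are compared by their
+  // string contents, since array equality in Scala/Java is reference equality. A typical
+  // call (as in the tests above) looks like:
+  //
+  //   compareRDDs(rddOrig, rddCopy, "src (Hive)", Seq("key:Int", "value:String"))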
+  private def compareRDDs(
+      rddOne: Array[Row],
+      rddTwo: Array[Row],
+      tableName: String,
+      fieldNames: Seq[String]): Unit = {
+    var counter = 0
+    (rddOne, rddTwo).zipped.foreach { (a, b) =>
+      (a, b).zipped.toArray.zipWithIndex.foreach {
+        case ((value1: Array[Byte], value2: Array[Byte]), index) =>
+          assert(new String(value1) === new String(value2),
+            s"table $tableName row $counter field ${fieldNames(index)} does not match")
+        case ((value1, value2), index) =>
+          assert(value1 === value2,
+            s"table $tableName row $counter field ${fieldNames(index)} does not match")
+      }
+      counter = counter + 1
+    }
+ }
+}