From 9c675de992d29327436edf7143dba3e4ef5601a8 Mon Sep 17 00:00:00 2001 From: Cheng Lian Date: Mon, 16 Jun 2014 21:30:29 +0200 Subject: [SQL][SPARK-2094] Follow up of PR #1071 for Java API Updated `JavaSQLContext` and `JavaHiveContext` similar to what we've done to `SQLContext` and `HiveContext` in PR #1071. Added corresponding test case for Spark SQL Java API. Author: Cheng Lian Closes #1085 from liancheng/spark-2094-java and squashes the following commits: 29b8a51 [Cheng Lian] Avoided instantiating JavaSparkContext & JavaHiveContext to workaround test failure 92bb4fb [Cheng Lian] Marked test cases in JavaHiveQLSuite with "ignore" 22aec97 [Cheng Lian] Follow up of PR #1071 for Java API (cherry picked from commit 273afcb254fb5384204c56bdcb3b9b760bcfab3f) Signed-off-by: Reynold Xin --- .../spark/sql/hive/api/java/JavaHiveContext.scala | 10 +- .../spark/sql/hive/api/java/JavaHiveQLSuite.scala | 101 +++++++++++++++++++++ .../spark/sql/hive/api/java/JavaHiveSuite.scala | 41 --------- .../spark/sql/hive/execution/HiveQuerySuite.scala | 30 +++--- 4 files changed, 120 insertions(+), 62 deletions(-) create mode 100644 sql/hive/src/test/scala/org/apache/spark/sql/hive/api/java/JavaHiveQLSuite.scala delete mode 100644 sql/hive/src/test/scala/org/apache/spark/sql/hive/api/java/JavaHiveSuite.scala (limited to 'sql/hive') diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/api/java/JavaHiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/api/java/JavaHiveContext.scala index 6df76fa825..c9ee162191 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/api/java/JavaHiveContext.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/api/java/JavaHiveContext.scala @@ -31,12 +31,6 @@ class JavaHiveContext(sparkContext: JavaSparkContext) extends JavaSQLContext(spa /** * Executes a query expressed in HiveQL, returning the result as a JavaSchemaRDD. */ - def hql(hqlQuery: String): JavaSchemaRDD = { - val result = new JavaSchemaRDD(sqlContext, HiveQl.parseSql(hqlQuery)) - // We force query optimization to happen right away instead of letting it happen lazily like - // when using the query DSL. This is so DDL commands behave as expected. This is only - // generates the RDD lineage for DML queries, but do not perform any execution. - result.queryExecution.toRdd - result - } + def hql(hqlQuery: String): JavaSchemaRDD = + new JavaSchemaRDD(sqlContext, HiveQl.parseSql(hqlQuery)) } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/api/java/JavaHiveQLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/api/java/JavaHiveQLSuite.scala new file mode 100644 index 0000000000..3b9cd8f52d --- /dev/null +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/api/java/JavaHiveQLSuite.scala @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.hive.api.java + +import scala.util.Try + +import org.scalatest.FunSuite + +import org.apache.spark.api.java.JavaSparkContext +import org.apache.spark.sql.api.java.JavaSchemaRDD +import org.apache.spark.sql.execution.ExplainCommand +import org.apache.spark.sql.hive.test.TestHive +import org.apache.spark.sql.test.TestSQLContext + +// Implicits +import scala.collection.JavaConversions._ + +class JavaHiveQLSuite extends FunSuite { + lazy val javaCtx = new JavaSparkContext(TestSQLContext.sparkContext) + + // There is a little trickery here to avoid instantiating two HiveContexts in the same JVM + lazy val javaHiveCtx = new JavaHiveContext(javaCtx) { + override val sqlContext = TestHive + } + + ignore("SELECT * FROM src") { + assert( + javaHiveCtx.hql("SELECT * FROM src").collect().map(_.getInt(0)) === + TestHive.sql("SELECT * FROM src").collect().map(_.getInt(0)).toSeq) + } + + private val explainCommandClassName = + classOf[ExplainCommand].getSimpleName.stripSuffix("$") + + def isExplanation(result: JavaSchemaRDD) = { + val explanation = result.collect().map(_.getString(0)) + explanation.size == 1 && explanation.head.startsWith(explainCommandClassName) + } + + ignore("Query Hive native command execution result") { + val tableName = "test_native_commands" + + assertResult(0) { + javaHiveCtx.hql(s"DROP TABLE IF EXISTS $tableName").count() + } + + assertResult(0) { + javaHiveCtx.hql(s"CREATE TABLE $tableName(key INT, value STRING)").count() + } + + javaHiveCtx.hql("SHOW TABLES").registerAsTable("show_tables") + + assert( + javaHiveCtx + .hql("SELECT result FROM show_tables") + .collect() + .map(_.getString(0)) + .contains(tableName)) + + assertResult(Array(Array("key", "int", "None"), Array("value", "string", "None"))) { + javaHiveCtx.hql(s"DESCRIBE $tableName").registerAsTable("describe_table") + + javaHiveCtx + .hql("SELECT result FROM describe_table") + .collect() + .map(_.getString(0).split("\t").map(_.trim)) + .toArray + } + + assert(isExplanation(javaHiveCtx.hql( + s"EXPLAIN SELECT key, COUNT(*) FROM $tableName GROUP BY key"))) + + TestHive.reset() + } + + ignore("Exactly once semantics for DDL and command statements") { + val tableName = "test_exactly_once" + val q0 = javaHiveCtx.hql(s"CREATE TABLE $tableName(key INT, value STRING)") + + // If the table was not created, the following assertion would fail + assert(Try(TestHive.table(tableName)).isSuccess) + + // If the CREATE TABLE command got executed again, the following assertion would fail + assert(Try(q0.count()).isSuccess) + } +} diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/api/java/JavaHiveSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/api/java/JavaHiveSuite.scala deleted file mode 100644 index 9c5d7c81f7..0000000000 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/api/java/JavaHiveSuite.scala +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.hive.api.java - -import org.scalatest.FunSuite - -import org.apache.spark.api.java.JavaSparkContext -import org.apache.spark.sql.test.TestSQLContext -import org.apache.spark.sql.hive.test.TestHive - -// Implicits -import scala.collection.JavaConversions._ - -class JavaHiveSQLSuite extends FunSuite { - ignore("SELECT * FROM src") { - val javaCtx = new JavaSparkContext(TestSQLContext.sparkContext) - // There is a little trickery here to avoid instantiating two HiveContexts in the same JVM - val javaSqlCtx = new JavaHiveContext(javaCtx) { - override val sqlContext = TestHive - } - - assert( - javaSqlCtx.hql("SELECT * FROM src").collect().map(_.getInt(0)) === - TestHive.sql("SELECT * FROM src").collect().map(_.getInt(0)).toSeq) - } -} diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala index 0d656c5569..6e8d11b8a1 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala @@ -184,25 +184,29 @@ class HiveQuerySuite extends HiveComparisonTest { test("Query Hive native command execution result") { val tableName = "test_native_commands" - val q0 = hql(s"DROP TABLE IF EXISTS $tableName") - assert(q0.count() == 0) + assertResult(0) { + hql(s"DROP TABLE IF EXISTS $tableName").count() + } - val q1 = hql(s"CREATE TABLE $tableName(key INT, value STRING)") - assert(q1.count() == 0) + assertResult(0) { + hql(s"CREATE TABLE $tableName(key INT, value STRING)").count() + } - val q2 = hql("SHOW TABLES") - val tables = q2.select('result).collect().map { case Row(table: String) => table } - assert(tables.contains(tableName)) + assert( + hql("SHOW TABLES") + .select('result) + .collect() + .map(_.getString(0)) + .contains(tableName)) - val q3 = hql(s"DESCRIBE $tableName") assertResult(Array(Array("key", "int", "None"), Array("value", "string", "None"))) { - q3.select('result).collect().map { case Row(fieldDesc: String) => - fieldDesc.split("\t").map(_.trim) - } + hql(s"DESCRIBE $tableName") + .select('result) + .collect() + .map(_.getString(0).split("\t").map(_.trim)) } - val q4 = hql(s"EXPLAIN SELECT key, COUNT(*) FROM $tableName GROUP BY key") - assert(isExplanation(q4)) + assert(isExplanation(hql(s"EXPLAIN SELECT key, COUNT(*) FROM $tableName GROUP BY key"))) TestHive.reset() } -- cgit v1.2.3