aboutsummaryrefslogtreecommitdiff
path: root/sql/hive
diff options
context:
space:
mode:
authorMichael Armbrust <michael@databricks.com>2014-04-03 15:45:34 -0700
committerMatei Zaharia <matei@databricks.com>2014-04-03 15:45:34 -0700
commitb8f534196f9a8c99f75728a06e62282d139dee28 (patch)
tree426e03d88695719bc58ff6f0aaa4794bd2fb40a2 /sql/hive
parentc1ea3afb516c204925259f0928dfb17d0fa89621 (diff)
downloadspark-b8f534196f9a8c99f75728a06e62282d139dee28.tar.gz
spark-b8f534196f9a8c99f75728a06e62282d139dee28.tar.bz2
spark-b8f534196f9a8c99f75728a06e62282d139dee28.zip
[SQL] SPARK-1333 First draft of java API
WIP: Some work remains... * [x] Hive support * [x] Tests * [x] Update docs Feedback welcome! Author: Michael Armbrust <michael@databricks.com> Closes #248 from marmbrus/javaSchemaRDD and squashes the following commits: b393913 [Michael Armbrust] @srowen 's java style suggestions. f531eb1 [Michael Armbrust] Address matei's comments. 33a1b1a [Michael Armbrust] Ignore JavaHiveSuite. 822f626 [Michael Armbrust] improve docs. ab91750 [Michael Armbrust] Improve Java SQL API: * Change JavaRow => Row * Add support for querying RDDs of JavaBeans * Docs * Tests * Hive support 0b859c8 [Michael Armbrust] First draft of java API.
Diffstat (limited to 'sql/hive')
-rw-r--r--sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala12
-rw-r--r--sql/hive/src/main/scala/org/apache/spark/sql/hive/api/java/JavaHiveContext.scala42
-rw-r--r--sql/hive/src/test/scala/org/apache/spark/sql/hive/api/java/JavaHiveSuite.scala41
3 files changed, 95 insertions, 0 deletions
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
index 46febbfad0..ff8eaacded 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
@@ -71,6 +71,18 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
override def executePlan(plan: LogicalPlan): this.QueryExecution =
new this.QueryExecution { val logical = plan }
+ /**
+ * Executes a query expressed in HiveQL using Spark, returning the result as a SchemaRDD.
+ */
+ def hql(hqlQuery: String): SchemaRDD = {
+ val result = new SchemaRDD(this, HiveQl.parseSql(hqlQuery))
+ // We force query optimization to happen right away instead of letting it happen lazily like
+ // when using the query DSL. This is so DDL commands behave as expected. This is only
+ // generates the RDD lineage for DML queries, but do not perform any execution.
+ result.queryExecution.toRdd
+ result
+ }
+
// Circular buffer to hold what hive prints to STDOUT and ERR. Only printed when failures occur.
@transient
protected val outputBuffer = new java.io.OutputStream {
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/api/java/JavaHiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/api/java/JavaHiveContext.scala
new file mode 100644
index 0000000000..6df76fa825
--- /dev/null
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/api/java/JavaHiveContext.scala
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive.api.java
+
+import org.apache.spark.api.java.JavaSparkContext
+import org.apache.spark.sql.api.java.{JavaSQLContext, JavaSchemaRDD}
+import org.apache.spark.sql.hive.{HiveContext, HiveQl}
+
+/**
+ * The entry point for executing Spark SQL queries from a Java program.
+ */
+class JavaHiveContext(sparkContext: JavaSparkContext) extends JavaSQLContext(sparkContext) {
+
+ override val sqlContext = new HiveContext(sparkContext)
+
+ /**
+ * Executes a query expressed in HiveQL, returning the result as a JavaSchemaRDD.
+ */
+ def hql(hqlQuery: String): JavaSchemaRDD = {
+ val result = new JavaSchemaRDD(sqlContext, HiveQl.parseSql(hqlQuery))
+ // We force query optimization to happen right away instead of letting it happen lazily like
+ // when using the query DSL. This is so DDL commands behave as expected. This is only
+ // generates the RDD lineage for DML queries, but do not perform any execution.
+ result.queryExecution.toRdd
+ result
+ }
+}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/api/java/JavaHiveSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/api/java/JavaHiveSuite.scala
new file mode 100644
index 0000000000..8137f99b22
--- /dev/null
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/api/java/JavaHiveSuite.scala
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive.api.java
+
+import org.scalatest.FunSuite
+
+import org.apache.spark.api.java.JavaSparkContext
+import org.apache.spark.sql.test.TestSQLContext
+import org.apache.spark.sql.hive.TestHive
+
+// Implicits
+import scala.collection.JavaConversions._
+
+class JavaHiveSQLSuite extends FunSuite {
+ ignore("SELECT * FROM src") {
+ val javaCtx = new JavaSparkContext(TestSQLContext.sparkContext)
+ // There is a little trickery here to avoid instantiating two HiveContexts in the same JVM
+ val javaSqlCtx = new JavaHiveContext(javaCtx) {
+ override val sqlContext = TestHive
+ }
+
+ assert(
+ javaSqlCtx.hql("SELECT * FROM src").collect().map(_.getInt(0)) ===
+ TestHive.sql("SELECT * FROM src").collect().map(_.getInt(0)).toSeq)
+ }
+}