diff options
author | Michael Armbrust <michael@databricks.com> | 2014-04-03 15:45:34 -0700 |
---|---|---|
committer | Matei Zaharia <matei@databricks.com> | 2014-04-03 15:45:34 -0700 |
commit | b8f534196f9a8c99f75728a06e62282d139dee28 (patch) | |
tree | 426e03d88695719bc58ff6f0aaa4794bd2fb40a2 /sql/hive | |
parent | c1ea3afb516c204925259f0928dfb17d0fa89621 (diff) | |
download | spark-b8f534196f9a8c99f75728a06e62282d139dee28.tar.gz spark-b8f534196f9a8c99f75728a06e62282d139dee28.tar.bz2 spark-b8f534196f9a8c99f75728a06e62282d139dee28.zip |
[SQL] SPARK-1333 First draft of java API
WIP: Some work remains...
* [x] Hive support
* [x] Tests
* [x] Update docs
Feedback welcome!
Author: Michael Armbrust <michael@databricks.com>
Closes #248 from marmbrus/javaSchemaRDD and squashes the following commits:
b393913 [Michael Armbrust] @srowen 's java style suggestions.
f531eb1 [Michael Armbrust] Address matei's comments.
33a1b1a [Michael Armbrust] Ignore JavaHiveSuite.
822f626 [Michael Armbrust] improve docs.
ab91750 [Michael Armbrust] Improve Java SQL API: * Change JavaRow => Row * Add support for querying RDDs of JavaBeans * Docs * Tests * Hive support
0b859c8 [Michael Armbrust] First draft of java API.
Diffstat (limited to 'sql/hive')
3 files changed, 95 insertions, 0 deletions
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala index 46febbfad0..ff8eaacded 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala @@ -71,6 +71,18 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) { override def executePlan(plan: LogicalPlan): this.QueryExecution = new this.QueryExecution { val logical = plan } + /** + * Executes a query expressed in HiveQL using Spark, returning the result as a SchemaRDD. + */ + def hql(hqlQuery: String): SchemaRDD = { + val result = new SchemaRDD(this, HiveQl.parseSql(hqlQuery)) + // We force query optimization to happen right away instead of letting it happen lazily like + // when using the query DSL. This is so DDL commands behave as expected. This is only + // generates the RDD lineage for DML queries, but do not perform any execution. + result.queryExecution.toRdd + result + } + // Circular buffer to hold what hive prints to STDOUT and ERR. Only printed when failures occur. @transient protected val outputBuffer = new java.io.OutputStream { diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/api/java/JavaHiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/api/java/JavaHiveContext.scala new file mode 100644 index 0000000000..6df76fa825 --- /dev/null +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/api/java/JavaHiveContext.scala @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.hive.api.java + +import org.apache.spark.api.java.JavaSparkContext +import org.apache.spark.sql.api.java.{JavaSQLContext, JavaSchemaRDD} +import org.apache.spark.sql.hive.{HiveContext, HiveQl} + +/** + * The entry point for executing Spark SQL queries from a Java program. + */ +class JavaHiveContext(sparkContext: JavaSparkContext) extends JavaSQLContext(sparkContext) { + + override val sqlContext = new HiveContext(sparkContext) + + /** + * Executes a query expressed in HiveQL, returning the result as a JavaSchemaRDD. + */ + def hql(hqlQuery: String): JavaSchemaRDD = { + val result = new JavaSchemaRDD(sqlContext, HiveQl.parseSql(hqlQuery)) + // We force query optimization to happen right away instead of letting it happen lazily like + // when using the query DSL. This is so DDL commands behave as expected. This is only + // generates the RDD lineage for DML queries, but do not perform any execution. + result.queryExecution.toRdd + result + } +} diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/api/java/JavaHiveSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/api/java/JavaHiveSuite.scala new file mode 100644 index 0000000000..8137f99b22 --- /dev/null +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/api/java/JavaHiveSuite.scala @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.hive.api.java + +import org.scalatest.FunSuite + +import org.apache.spark.api.java.JavaSparkContext +import org.apache.spark.sql.test.TestSQLContext +import org.apache.spark.sql.hive.TestHive + +// Implicits +import scala.collection.JavaConversions._ + +class JavaHiveSQLSuite extends FunSuite { + ignore("SELECT * FROM src") { + val javaCtx = new JavaSparkContext(TestSQLContext.sparkContext) + // There is a little trickery here to avoid instantiating two HiveContexts in the same JVM + val javaSqlCtx = new JavaHiveContext(javaCtx) { + override val sqlContext = TestHive + } + + assert( + javaSqlCtx.hql("SELECT * FROM src").collect().map(_.getInt(0)) === + TestHive.sql("SELECT * FROM src").collect().map(_.getInt(0)).toSeq) + } +} |