diff options
author | Ahir Reddy <ahirreddy@gmail.com> | 2014-04-15 00:07:55 -0700 |
---|---|---|
committer | Patrick Wendell <pwendell@gmail.com> | 2014-04-15 00:07:55 -0700 |
commit | c99bcb7feaa761c5826f2e1d844d0502a3b79538 (patch) | |
tree | cb136b0fbeaac6268eea2782f5ac9d615aafdb5b /sql | |
parent | 0247b5c5467ca1b0d03ba929a78fa4d805582d84 (diff) | |
download | spark-c99bcb7feaa761c5826f2e1d844d0502a3b79538.tar.gz spark-c99bcb7feaa761c5826f2e1d844d0502a3b79538.tar.bz2 spark-c99bcb7feaa761c5826f2e1d844d0502a3b79538.zip |
SPARK-1374: PySpark API for SparkSQL
An initial API that exposes SparkSQL functionality in PySpark. A PythonRDD composed of dictionaries, with string keys and primitive values (boolean, float, int, long, string) can be converted into a SchemaRDD that supports sql queries.
```
from pyspark.context import SQLContext
sqlCtx = SQLContext(sc)
rdd = sc.parallelize([{"field1" : 1, "field2" : "row1"}, {"field1" : 2, "field2": "row2"}, {"field1" : 3, "field2": "row3"}])
srdd = sqlCtx.applySchema(rdd)
sqlCtx.registerRDDAsTable(srdd, "table1")
srdd2 = sqlCtx.sql("SELECT field1 AS f1, field2 as f2 from table1")
srdd2.collect()
```
The last line yields ```[{"f1" : 1, "f2" : "row1"}, {"f1" : 2, "f2": "row2"}, {"f1" : 3, "f2": "row3"}]```
Author: Ahir Reddy <ahirreddy@gmail.com>
Author: Michael Armbrust <michael@databricks.com>
Closes #363 from ahirreddy/pysql and squashes the following commits:
0294497 [Ahir Reddy] Updated log4j properties to supress Hive Warns
307d6e0 [Ahir Reddy] Style fix
6f7b8f6 [Ahir Reddy] Temporary fix MIMA checker. Since we now assemble Spark jar with Hive, we don't want to check the interfaces of all of our hive dependencies
3ef074a [Ahir Reddy] Updated documentation because classes moved to sql.py
29245bf [Ahir Reddy] Cache underlying SchemaRDD instead of generating and caching PythonRDD
f2312c7 [Ahir Reddy] Moved everything into sql.py
a19afe4 [Ahir Reddy] Doc fixes
6d658ba [Ahir Reddy] Remove the metastore directory created by the HiveContext tests in SparkSQL
521ff6d [Ahir Reddy] Trying to get spark to build with hive
ab95eba [Ahir Reddy] Set SPARK_HIVE=true on jenkins
ded03e7 [Ahir Reddy] Added doc test for HiveContext
22de1d4 [Ahir Reddy] Fixed maven pyrolite dependency
e4da06c [Ahir Reddy] Display message if hive is not built into spark
227a0be [Michael Armbrust] Update API links. Fix Hive example.
58e2aa9 [Michael Armbrust] Build Docs for pyspark SQL Api. Minor fixes.
4285340 [Michael Armbrust] Fix building of Hive API Docs.
38a92b0 [Michael Armbrust] Add note to future non-python developers about python docs.
337b201 [Ahir Reddy] Changed com.clearspring.analytics stream version from 2.4.0 to 2.5.1 to match SBT build, and added pyrolite to maven build
40491c9 [Ahir Reddy] PR Changes + Method Visibility
1836944 [Michael Armbrust] Fix comments.
e00980f [Michael Armbrust] First draft of python sql programming guide.
b0192d3 [Ahir Reddy] Added Long, Double and Boolean as usable types + unit test
f98a422 [Ahir Reddy] HiveContexts
79621cf [Ahir Reddy] cleaning up cruft
b406ba0 [Ahir Reddy] doctest formatting
20936a5 [Ahir Reddy] Added tests and documentation
e4d21b4 [Ahir Reddy] Added pyrolite dependency
79f739d [Ahir Reddy] added more tests
7515ba0 [Ahir Reddy] added more tests :)
d26ec5e [Ahir Reddy] added test
e9f5b8d [Ahir Reddy] adding tests
906d180 [Ahir Reddy] added todo explaining cost of creating Row object in python
251f99d [Ahir Reddy] for now only allow dictionaries as input
09b9980 [Ahir Reddy] made jrdd explicitly lazy
c608947 [Ahir Reddy] SchemaRDD now has all RDD operations
725c91e [Ahir Reddy] awesome row objects
55d1c76 [Ahir Reddy] return row objects
4fe1319 [Ahir Reddy] output dictionaries correctly
be079de [Ahir Reddy] returning dictionaries works
cd5f79f [Ahir Reddy] Switched to using Scala SQLContext
e948bd9 [Ahir Reddy] yippie
4886052 [Ahir Reddy] even better
c0fb1c6 [Ahir Reddy] more working
043ca85 [Ahir Reddy] working
5496f9f [Ahir Reddy] doesn't crash
b8b904b [Ahir Reddy] Added schema rdd class
67ba875 [Ahir Reddy] java to python, and python to java
bcc0f23 [Ahir Reddy] Java to python
ab6025d [Ahir Reddy] compiling
Diffstat (limited to 'sql')
4 files changed, 55 insertions, 1 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala index d3d4c56baf..24d60ea074 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala @@ -26,6 +26,7 @@ import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.analysis._ import org.apache.spark.sql.catalyst.dsl import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.types._ import org.apache.spark.sql.catalyst.optimizer.Optimizer import org.apache.spark.sql.catalyst.plans.logical.{Subquery, LogicalPlan} import org.apache.spark.sql.catalyst.rules.RuleExecutor @@ -241,4 +242,30 @@ class SQLContext(@transient val sparkContext: SparkContext) */ def debugExec() = DebugQuery(executedPlan).execute().collect() } + + /** + * Peek at the first row of the RDD and infer its schema. + * TODO: We only support primitive types, add support for nested types. + */ + private[sql] def inferSchema(rdd: RDD[Map[String, _]]): SchemaRDD = { + val schema = rdd.first.map { case (fieldName, obj) => + val dataType = obj.getClass match { + case c: Class[_] if c == classOf[java.lang.String] => StringType + case c: Class[_] if c == classOf[java.lang.Integer] => IntegerType + case c: Class[_] if c == classOf[java.lang.Long] => LongType + case c: Class[_] if c == classOf[java.lang.Double] => DoubleType + case c: Class[_] if c == classOf[java.lang.Boolean] => BooleanType + case c => throw new Exception(s"Object of type $c cannot be used") + } + AttributeReference(fieldName, dataType, true)() + }.toSeq + + val rowRdd = rdd.mapPartitions { iter => + iter.map { map => + new GenericRow(map.values.toArray.asInstanceOf[Array[Any]]): Row + } + } + new SchemaRDD(this, SparkLogicalPlan(ExistingRdd(schema, rowRdd))) + } + } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala index 91500416ee..a771147f90 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala @@ -17,6 +17,8 @@ package org.apache.spark.sql +import net.razorvine.pickle.Pickler + import org.apache.spark.{Dependency, OneToOneDependency, Partition, TaskContext} import org.apache.spark.annotation.{AlphaComponent, Experimental} import org.apache.spark.rdd.RDD @@ -25,6 +27,8 @@ import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.plans.{Inner, JoinType} import org.apache.spark.sql.catalyst.types.BooleanType +import org.apache.spark.api.java.JavaRDD +import java.util.{Map => JMap} /** * :: AlphaComponent :: @@ -308,4 +312,23 @@ class SchemaRDD( /** FOR INTERNAL USE ONLY */ def analyze = sqlContext.analyzer(logicalPlan) + + private[sql] def javaToPython: JavaRDD[Array[Byte]] = { + val fieldNames: Seq[String] = this.queryExecution.analyzed.output.map(_.name) + this.mapPartitions { iter => + val pickle = new Pickler + iter.map { row => + val map: JMap[String, Any] = new java.util.HashMap + // TODO: We place the map in an ArrayList so that the object is pickled to a List[Dict]. + // Ideally we should be able to pickle an object directly into a Python collection so we + // don't have to create an ArrayList every time. + val arr: java.util.ArrayList[Any] = new java.util.ArrayList + row.zip(fieldNames).foreach { case (obj, name) => + map.put(name, obj) + } + arr.add(map) + pickle.dumps(arr) + } + } + } } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala index 465e5f146f..444bbfb4dd 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala @@ -261,8 +261,9 @@ class TestHiveContext(sc: SparkContext) extends LocalHiveContext(sc) { testTables.get(name).map(_.commands).getOrElse(sys.error(s"Unknown test table $name")) createCmds.foreach(_()) - if (cacheTables) + if (cacheTables) { cacheTable(name) + } } } diff --git a/sql/hive/src/test/resources/log4j.properties b/sql/hive/src/test/resources/log4j.properties index 5e17e3b596..c07d8fedf1 100644 --- a/sql/hive/src/test/resources/log4j.properties +++ b/sql/hive/src/test/resources/log4j.properties @@ -45,3 +45,6 @@ log4j.logger.org.apache.hadoop.hive.metastore.RetryingHMSHandler=OFF log4j.additivity.hive.ql.metadata.Hive=false log4j.logger.hive.ql.metadata.Hive=OFF +log4j.additivity.org.apache.hadoop.hive.ql.io.RCFile=false +log4j.logger.org.apache.hadoop.hive.ql.io.RCFile=ERROR + |