author     Reynold Xin <rxin@databricks.com>    2015-01-27 16:08:24 -0800
committer  Reynold Xin <rxin@databricks.com>    2015-01-27 16:08:24 -0800
commit     119f45d61d7b48d376cca05e1b4f0c7fcf65bfa8
tree       714df6362313e93bee0e9dba2f84b3ba1697e555 /sql/hive
parent     b1b35ca2e440df40b253bf967bb93705d355c1c0
[SPARK-5097][SQL] DataFrame
This pull request redesigns the existing Spark SQL DSL, which already provides data-frame-like functionality.

TODOs: With the exception of Python support, the remaining tasks can be done in separate, follow-up PRs.
- [ ] Audit of the API
- [ ] Documentation
- [ ] More test cases to cover the new API
- [x] Python support
- [ ] Type alias SchemaRDD

Author: Reynold Xin <rxin@databricks.com>
Author: Davies Liu <davies@databricks.com>

Closes #4173 from rxin/df1 and squashes the following commits:

0a1a73b [Reynold Xin] Merge branch 'df1' of github.com:rxin/spark into df1
23b4427 [Reynold Xin] Mima.
828f70d [Reynold Xin] Merge pull request #7 from davies/df
257b9e6 [Davies Liu] add repartition
6bf2b73 [Davies Liu] fix collect with UDT and tests
e971078 [Reynold Xin] Missing quotes.
b9306b4 [Reynold Xin] Remove removeColumn/updateColumn for now.
a728bf2 [Reynold Xin] Example rename.
e8aa3d3 [Reynold Xin] groupby -> groupBy.
9662c9e [Davies Liu] improve DataFrame Python API
4ae51ea [Davies Liu] python API for dataframe
1e5e454 [Reynold Xin] Fixed a bug with symbol conversion.
2ca74db [Reynold Xin] Couple minor fixes.
ea98ea1 [Reynold Xin] Documentation & literal expressions.
2b22684 [Reynold Xin] Got rid of IntelliJ problems.
02bbfbc [Reynold Xin] Tightening imports.
ffbce66 [Reynold Xin] Fixed compilation error.
59b6d8b [Reynold Xin] Style violation.
b85edfb [Reynold Xin] ALS.
8c37f0a [Reynold Xin] Made MLlib and examples compile
6d53134 [Reynold Xin] Hive module.
d35efd5 [Reynold Xin] Fixed compilation error.
ce4a5d2 [Reynold Xin] Fixed test cases in SQL except ParquetIOSuite.
66d5ef1 [Reynold Xin] SQLContext minor patch.
c9bcdc0 [Reynold Xin] Checkpoint: SQL module compiles!
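For orientation, a minimal sketch of the renamed API, based only on calls visible in the diff below (table and column names are illustrative; sql, table, and the dsl import come from a HiveContext/TestHive in scope):

    // Hedged sketch: sql() now returns a DataFrame (was SchemaRDD), columns
    // can be aliased with String names, and head() fetches a single Row.
    import org.apache.spark.sql.dsl._

    val df: DataFrame = sql("SELECT key, value FROM src")   // was: SchemaRDD
    df.select('key.as("k"), 'value.as("v"))                  // Symbol columns, String alias
      .registerTempTable("kv")
    sql("SELECT k FROM kv").head()                           // was: first()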
Diffstat (limited to 'sql/hive')
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala               |  9
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala            | 17
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala                  |  9
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/QueryTest.scala                      | 10
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala          |  4
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala  |  2
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala  |  7
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveTableScanSuite.scala | 11
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUdfSuite.scala    |  2
9 files changed, 37 insertions(+), 34 deletions(-)
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
index 9d2cfd8e0d..b746942cb1 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
@@ -64,15 +64,15 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
getConf("spark.sql.hive.convertMetastoreParquet", "true") == "true"
override protected[sql] def executePlan(plan: LogicalPlan): this.QueryExecution =
- new this.QueryExecution { val logical = plan }
+ new this.QueryExecution(plan)
- override def sql(sqlText: String): SchemaRDD = {
+ override def sql(sqlText: String): DataFrame = {
val substituted = new VariableSubstitution().substitute(hiveconf, sqlText)
// TODO: Create a framework for registering parsers instead of just hardcoding if statements.
if (conf.dialect == "sql") {
super.sql(substituted)
} else if (conf.dialect == "hiveql") {
- new SchemaRDD(this, ddlParser(sqlText, false).getOrElse(HiveQl.parseSql(substituted)))
+ new DataFrame(this, ddlParser(sqlText, false).getOrElse(HiveQl.parseSql(substituted)))
} else {
sys.error(s"Unsupported SQL dialect: ${conf.dialect}. Try 'sql' or 'hiveql'")
}
@@ -352,7 +352,8 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
override protected[sql] val planner = hivePlanner
/** Extends QueryExecution with hive specific features. */
- protected[sql] abstract class QueryExecution extends super.QueryExecution {
+ protected[sql] class QueryExecution(logicalPlan: LogicalPlan)
+ extends super.QueryExecution(logicalPlan) {
/**
* Returns the result as a hive compatible sequence of strings. For native commands, the
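The QueryExecution change above swaps an abstract val for a constructor parameter. A self-contained sketch of that refactoring pattern (the names here are stand-ins, not Spark classes):

    // Old style: the plan is supplied by overriding an abstract member at
    // instantiation time.
    abstract class ExecBefore { val logical: String }
    val before = new ExecBefore { val logical = "plan" }

    // New style: the plan is an ordinary constructor argument, so the class
    // no longer needs to be abstract and subclasses just call super.
    class ExecAfter(val logical: String)
    val after = new ExecAfter("plan")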
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
index 6952b126cf..ace9329cd5 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.hive
import scala.collection.JavaConversions._
import org.apache.spark.annotation.Experimental
-import org.apache.spark.sql.{SQLContext, SchemaRDD, Strategy}
+import org.apache.spark.sql.{Column, DataFrame, SQLContext, Strategy}
import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.codegen.GeneratePredicate
@@ -55,16 +55,15 @@ private[hive] trait HiveStrategies {
*/
@Experimental
object ParquetConversion extends Strategy {
- implicit class LogicalPlanHacks(s: SchemaRDD) {
- def lowerCase =
- new SchemaRDD(s.sqlContext, s.logicalPlan)
+ implicit class LogicalPlanHacks(s: DataFrame) {
+ def lowerCase = new DataFrame(s.sqlContext, s.logicalPlan)
def addPartitioningAttributes(attrs: Seq[Attribute]) = {
// Don't add the partitioning key if its already present in the data.
if (attrs.map(_.name).toSet.subsetOf(s.logicalPlan.output.map(_.name).toSet)) {
s
} else {
- new SchemaRDD(
+ new DataFrame(
s.sqlContext,
s.logicalPlan transform {
case p: ParquetRelation => p.copy(partitioningAttributes = attrs)
@@ -97,13 +96,13 @@ private[hive] trait HiveStrategies {
// We are going to throw the predicates and projection back at the whole optimization
// sequence so lets unresolve all the attributes, allowing them to be rebound to the
// matching parquet attributes.
- val unresolvedOtherPredicates = otherPredicates.map(_ transform {
+ val unresolvedOtherPredicates = new Column(otherPredicates.map(_ transform {
case a: AttributeReference => UnresolvedAttribute(a.name)
- }).reduceOption(And).getOrElse(Literal(true))
+ }).reduceOption(And).getOrElse(Literal(true)))
- val unresolvedProjection = projectList.map(_ transform {
+ val unresolvedProjection: Seq[Column] = projectList.map(_ transform {
case a: AttributeReference => UnresolvedAttribute(a.name)
- })
+ }).map(new Column(_))
try {
if (relation.hiveQlTable.isPartitioned) {
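The last hunk folds a sequence of optional predicates into one expression with reduceOption(And), defaulting to a literal true, before wrapping the result in the new Column type. A minimal sketch of that fold with stand-in case classes (not the real Catalyst types):

    sealed trait Expr
    case class And(left: Expr, right: Expr) extends Expr
    case class Lit(value: Boolean) extends Expr

    // Empty input collapses to Lit(true), mirroring getOrElse(Literal(true)).
    def combine(preds: Seq[Expr]): Expr =
      preds.reduceOption(And(_, _)).getOrElse(Lit(true))

    combine(Nil)                         // Lit(true)
    combine(Seq(Lit(false), Lit(true)))  // And(Lit(false), Lit(true))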
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala
index 47431cef03..8e70ae8f56 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala
@@ -99,7 +99,7 @@ class TestHiveContext(sc: SparkContext) extends HiveContext(sc) {
override def runSqlHive(sql: String): Seq[String] = super.runSqlHive(rewritePaths(sql))
override def executePlan(plan: LogicalPlan): this.QueryExecution =
- new this.QueryExecution { val logical = plan }
+ new this.QueryExecution(plan)
/** Fewer partitions to speed up testing. */
protected[sql] override lazy val conf: SQLConf = new SQLConf {
@@ -150,8 +150,8 @@ class TestHiveContext(sc: SparkContext) extends HiveContext(sc) {
val describedTable = "DESCRIBE (\\w+)".r
- protected[hive] class HiveQLQueryExecution(hql: String) extends this.QueryExecution {
- lazy val logical = HiveQl.parseSql(hql)
+ protected[hive] class HiveQLQueryExecution(hql: String)
+ extends this.QueryExecution(HiveQl.parseSql(hql)) {
def hiveExec() = runSqlHive(hql)
override def toString = hql + "\n" + super.toString
}
@@ -159,7 +159,8 @@ class TestHiveContext(sc: SparkContext) extends HiveContext(sc) {
/**
* Override QueryExecution with special debug workflow.
*/
- abstract class QueryExecution extends super.QueryExecution {
+ class QueryExecution(logicalPlan: LogicalPlan)
+ extends super.QueryExecution(logicalPlan) {
override lazy val analyzed = {
val describedTables = logical match {
case HiveNativeCommand(describedTable(tbl)) => tbl :: Nil
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/QueryTest.scala b/sql/hive/src/test/scala/org/apache/spark/sql/QueryTest.scala
index f320d732fb..ba39129388 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/QueryTest.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/QueryTest.scala
@@ -36,12 +36,12 @@ class QueryTest extends PlanTest {
/**
* Runs the plan and makes sure the answer contains all of the keywords, or the
* none of keywords are listed in the answer
- * @param rdd the [[SchemaRDD]] to be executed
+ * @param rdd the [[DataFrame]] to be executed
* @param exists true for make sure the keywords are listed in the output, otherwise
* to make sure none of the keyword are not listed in the output
* @param keywords keyword in string array
*/
- def checkExistence(rdd: SchemaRDD, exists: Boolean, keywords: String*) {
+ def checkExistence(rdd: DataFrame, exists: Boolean, keywords: String*) {
val outputs = rdd.collect().map(_.mkString).mkString
for (key <- keywords) {
if (exists) {
@@ -54,10 +54,10 @@ class QueryTest extends PlanTest {
/**
* Runs the plan and makes sure the answer matches the expected result.
- * @param rdd the [[SchemaRDD]] to be executed
+ * @param rdd the [[DataFrame]] to be executed
* @param expectedAnswer the expected result, can either be an Any, Seq[Product], or Seq[ Seq[Any] ].
*/
- protected def checkAnswer(rdd: SchemaRDD, expectedAnswer: Seq[Row]): Unit = {
+ protected def checkAnswer(rdd: DataFrame, expectedAnswer: Seq[Row]): Unit = {
val isSorted = rdd.logicalPlan.collect { case s: logical.Sort => s }.nonEmpty
def prepareAnswer(answer: Seq[Row]): Seq[Row] = {
// Converts data to types that we can do equality comparison using Scala collections.
@@ -101,7 +101,7 @@ class QueryTest extends PlanTest {
}
}
- protected def checkAnswer(rdd: SchemaRDD, expectedAnswer: Row): Unit = {
+ protected def checkAnswer(rdd: DataFrame, expectedAnswer: Row): Unit = {
checkAnswer(rdd, Seq(expectedAnswer))
}
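For context, a hedged sketch of how a suite calls the updated helpers; the query against TestHive's src table is illustrative:

    // checkAnswer and checkExistence now accept a DataFrame instead of a
    // SchemaRDD; call sites are otherwise unchanged.
    checkAnswer(
      sql("SELECT key FROM src WHERE key = 86"),
      Row(86))

    checkExistence(sql("EXPLAIN SELECT key FROM src"), true, "== Physical Plan ==")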
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala
index f95a6b43af..61e5117fea 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.hive
import org.apache.spark.sql.columnar.{InMemoryColumnarTableScan, InMemoryRelation}
import org.apache.spark.sql.hive.test.TestHive
import org.apache.spark.sql.hive.test.TestHive._
-import org.apache.spark.sql.{QueryTest, SchemaRDD}
+import org.apache.spark.sql.{DataFrame, QueryTest}
import org.apache.spark.storage.RDDBlockId
class CachedTableSuite extends QueryTest {
@@ -28,7 +28,7 @@ class CachedTableSuite extends QueryTest {
* Throws a test failed exception when the number of cached tables differs from the expected
* number.
*/
- def assertCached(query: SchemaRDD, numCachedTables: Int = 1): Unit = {
+ def assertCached(query: DataFrame, numCachedTables: Int = 1): Unit = {
val planWithCaching = query.queryExecution.withCachedData
val cachedData = planWithCaching collect {
case cached: InMemoryRelation => cached
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala
index 0e6636d38e..5775d83fcb 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala
@@ -52,7 +52,7 @@ class InsertIntoHiveTableSuite extends QueryTest {
// Make sure the table has been updated.
checkAnswer(
sql("SELECT * FROM createAndInsertTest"),
- testData.toSchemaRDD.collect().toSeq ++ testData.toSchemaRDD.collect().toSeq
+ testData.toDF.collect().toSeq ++ testData.toDF.collect().toSeq
)
// Now overwrite.
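The one-line change above renames the conversion from an RDD of case classes: toSchemaRDD becomes toDF. A hedged sketch, with the TestData fixture reconstructed here purely for illustration:

    case class TestData(key: Int, value: String)
    val testData = sparkContext.parallelize(
      (1 to 100).map(i => TestData(i, i.toString)))
    testData.toDF.collect()   // was: testData.toSchemaRDD.collect()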
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
index df72be7746..d67b00bc9d 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
@@ -27,11 +27,12 @@ import scala.util.Try
import org.apache.hadoop.hive.conf.HiveConf.ConfVars
import org.apache.spark.{SparkFiles, SparkException}
+import org.apache.spark.sql.{DataFrame, Row}
import org.apache.spark.sql.catalyst.plans.logical.Project
+import org.apache.spark.sql.dsl._
import org.apache.spark.sql.hive._
import org.apache.spark.sql.hive.test.TestHive
import org.apache.spark.sql.hive.test.TestHive._
-import org.apache.spark.sql.{SQLConf, Row, SchemaRDD}
case class TestData(a: Int, b: String)
@@ -473,7 +474,7 @@ class HiveQuerySuite extends HiveComparisonTest with BeforeAndAfter {
}
}
- def isExplanation(result: SchemaRDD) = {
+ def isExplanation(result: DataFrame) = {
val explanation = result.select('plan).collect().map { case Row(plan: String) => plan }
explanation.contains("== Physical Plan ==")
}
@@ -842,7 +843,7 @@ class HiveQuerySuite extends HiveComparisonTest with BeforeAndAfter {
val testVal = "test.val.0"
val nonexistentKey = "nonexistent"
val KV = "([^=]+)=([^=]*)".r
- def collectResults(rdd: SchemaRDD): Set[(String, String)] =
+ def collectResults(rdd: DataFrame): Set[(String, String)] =
rdd.collect().map {
case Row(key: String, value: String) => key -> value
case Row(KV(key, value)) => key -> value
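The collectResults helper above mixes Row pattern matching with a regex extractor. A minimal self-contained sketch of that extractor pattern (the input string is illustrative):

    // A compiled Regex can serve as a pattern-match extractor; each capture
    // group binds one variable.
    val KV = "([^=]+)=([^=]*)".r
    "spark.sql.dialect=hiveql" match {
      case KV(key, value) => key -> value   // ("spark.sql.dialect", "hiveql")
    }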
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveTableScanSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveTableScanSuite.scala
index 16f77a438e..a081227b4e 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveTableScanSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveTableScanSuite.scala
@@ -17,9 +17,10 @@
package org.apache.spark.sql.hive.execution
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.dsl._
import org.apache.spark.sql.hive.test.TestHive
import org.apache.spark.sql.hive.test.TestHive._
-import org.apache.spark.sql.Row
import org.apache.spark.util.Utils
@@ -82,10 +83,10 @@ class HiveTableScanSuite extends HiveComparisonTest {
sql("create table spark_4959 (col1 string)")
sql("""insert into table spark_4959 select "hi" from src limit 1""")
table("spark_4959").select(
- 'col1.as('CaseSensitiveColName),
- 'col1.as('CaseSensitiveColName2)).registerTempTable("spark_4959_2")
+ 'col1.as("CaseSensitiveColName"),
+ 'col1.as("CaseSensitiveColName2")).registerTempTable("spark_4959_2")
- assert(sql("select CaseSensitiveColName from spark_4959_2").first() === Row("hi"))
- assert(sql("select casesensitivecolname from spark_4959_2").first() === Row("hi"))
+ assert(sql("select CaseSensitiveColName from spark_4959_2").head() === Row("hi"))
+ assert(sql("select casesensitivecolname from spark_4959_2").head() === Row("hi"))
}
}
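Two API shifts show up in this file: Column.as now takes a String alias instead of a Symbol, and head() replaces first() for fetching a single Row. A hedged usage sketch combining both (the table name is taken from the test above):

    import org.apache.spark.sql.dsl._

    table("spark_4959")
      .select('col1.as("AliasName"))   // was: 'col1.as('AliasName)
      .head()                          // was: .first()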
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUdfSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUdfSuite.scala
index f2374a2152..dd0df1a9f6 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUdfSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUdfSuite.scala
@@ -58,7 +58,7 @@ class HiveUdfSuite extends QueryTest {
| getStruct(1).f3,
| getStruct(1).f4,
| getStruct(1).f5 FROM src LIMIT 1
- """.stripMargin).first() === Row(1, 2, 3, 4, 5))
+ """.stripMargin).head() === Row(1, 2, 3, 4, 5))
}
test("SPARK-4785 When called with arguments referring column fields, PMOD throws NPE") {