From d5da81cdd1c330b125282a39bbca040fbb6c7dda Mon Sep 17 00:00:00 2001 From: Zongheng Yang Date: Tue, 10 Jun 2014 00:49:09 -0700 Subject: [SPARK-1508][SQL] Add SQLConf to SQLContext. This PR (1) introduces a new class SQLConf that stores key-value properties for a SQLContext (2) clean up the semantics of various forms of SET commands. The SQLConf class unlocks user-controllable optimization opportunities; for example, user can now override the number of partitions used during an Exchange. A SQLConf can be accessed and modified programmatically through its getters and setters. It can also be modified through SET commands executed by `sql()` or `hql()`. Note that users now have the ability to change a particular property for different queries inside the same Spark job, unlike settings configured in SparkConf. For SET commands: "SET" will return all properties currently set in a SQLConf, "SET key" will return the key-value pair (if set) or an undefined message, and "SET key=value" will call the setter on SQLConf, and if a HiveContext is used, it will be executed in Hive as well. Author: Zongheng Yang Closes #956 from concretevitamin/sqlconf and squashes the following commits: 4968c11 [Zongheng Yang] Very minor cleanup. d74dde5 [Zongheng Yang] Remove the redundant mkQueryExecution() method. c129b86 [Zongheng Yang] Merge remote-tracking branch 'upstream/master' into sqlconf 26c40eb [Zongheng Yang] Make SQLConf a trait and have SQLContext mix it in. dd19666 [Zongheng Yang] Update a comment. baa5d29 [Zongheng Yang] Remove default param for shuffle partitions accessor. 5f7e6d8 [Zongheng Yang] Add default num partitions. 22d9ed7 [Zongheng Yang] Fix output() of Set physical. Add SQLConf param accessor method. e9856c4 [Zongheng Yang] Use java.util.Collections.synchronizedMap on a Java HashMap. 88dd0c8 [Zongheng Yang] Remove redundant SET Keyword. 271f0b1 [Zongheng Yang] Minor change. f8983d1 [Zongheng Yang] Minor changes per review comments. 1ce8a5e [Zongheng Yang] Invoke runSqlHive() in SQLConf#get for the HiveContext case. b766af9 [Zongheng Yang] Remove a test. d52e1bd [Zongheng Yang] De-hardcode number of shuffle partitions for BasicOperators (read from SQLConf). 555599c [Zongheng Yang] Bullet-proof (relatively) parsing SET per review comment. c2067e8 [Zongheng Yang] Mark SQLContext transient and put it in a second param list. 2ea8cdc [Zongheng Yang] Wrap long line. 41d7f09 [Zongheng Yang] Fix imports. 13279e6 [Zongheng Yang] Refactor the logic of eagerly processing SET commands. b14b83e [Zongheng Yang] In a HiveContext, make SQLConf a subset of HiveConf. 6983180 [Zongheng Yang] Move a SET test to SQLQuerySuite and make it complete. 5b67985 [Zongheng Yang] New line at EOF. c651797 [Zongheng Yang] Add commands.scala. efd82db [Zongheng Yang] Clean up semantics of several cases of SET. c1017c2 [Zongheng Yang] WIP in changing SetCommand to take two Options (for different semantics of SETs). 0f00d86 [Zongheng Yang] Add a test for singleton set command in SQL. 41acd75 [Zongheng Yang] Add a test for hql() in HiveQuerySuite. 2276929 [Zongheng Yang] Fix default hive result for set commands in HiveComparisonTest. 3b0c71b [Zongheng Yang] Remove Parser for set commands. A few other fixes. d0c4578 [Zongheng Yang] Tmux typo. 0ecea46 [Zongheng Yang] Changes for HiveQl and HiveContext. ce22d80 [Zongheng Yang] Fix parsing issues. cb722c1 [Zongheng Yang] Finish up SQLConf patch. 4ebf362 [Zongheng Yang] First cut at SQLConf inside SQLContext. (cherry picked from commit 08ed9ad81397b71206c4dc903bfb94b6105691ed) Signed-off-by: Michael Armbrust --- .../org/apache/spark/sql/hive/HiveContext.scala | 73 ++++++++++++--------- .../scala/org/apache/spark/sql/hive/HiveQl.scala | 13 +++- .../sql/hive/execution/HiveComparisonTest.scala | 3 + .../spark/sql/hive/execution/HiveQuerySuite.scala | 75 ++++++++++++++++++++++ 4 files changed, 131 insertions(+), 33 deletions(-) (limited to 'sql/hive/src') diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala index 4b97dc25ac..6497821554 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala @@ -18,11 +18,11 @@ package org.apache.spark.sql package hive -import scala.language.implicitConversions - import java.io.{BufferedReader, File, InputStreamReader, PrintStream} import java.util.{ArrayList => JArrayList} +import scala.collection.JavaConversions._ +import scala.language.implicitConversions import scala.reflect.runtime.universe.TypeTag import org.apache.hadoop.hive.conf.HiveConf @@ -30,20 +30,15 @@ import org.apache.hadoop.hive.ql.Driver import org.apache.hadoop.hive.ql.processors._ import org.apache.hadoop.hive.ql.session.SessionState -import org.apache.spark.annotation.DeveloperApi import org.apache.spark.SparkContext import org.apache.spark.rdd.RDD +import org.apache.spark.sql.catalyst.ScalaReflection import org.apache.spark.sql.catalyst.analysis.{Analyzer, OverrideCatalog} import org.apache.spark.sql.catalyst.expressions.GenericRow -import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, LowerCaseSchema} -import org.apache.spark.sql.catalyst.plans.logical.{NativeCommand, ExplainCommand} -import org.apache.spark.sql.catalyst.ScalaReflection +import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.types._ import org.apache.spark.sql.execution._ -/* Implicit conversions */ -import scala.collection.JavaConversions._ - /** * Starts up an instance of hive where metadata is stored locally. An in-process metadata data is * created with data stored in ./metadata. Warehouse data is stored in in ./warehouse. @@ -55,10 +50,9 @@ class LocalHiveContext(sc: SparkContext) extends HiveContext(sc) { /** Sets up the system initially or after a RESET command */ protected def configure() { - // TODO: refactor this so we can work with other databases. - runSqlHive( - s"set javax.jdo.option.ConnectionURL=jdbc:derby:;databaseName=$metastorePath;create=true") - runSqlHive("set hive.metastore.warehouse.dir=" + warehousePath) + set("javax.jdo.option.ConnectionURL", + s"jdbc:derby:;databaseName=$metastorePath;create=true") + set("hive.metastore.warehouse.dir", warehousePath) } configure() // Must be called before initializing the catalog below. @@ -129,12 +123,27 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) { } } + /** + * SQLConf and HiveConf contracts: when the hive session is first initialized, params in + * HiveConf will get picked up by the SQLConf. Additionally, any properties set by + * set() or a SET command inside hql() or sql() will be set in the SQLConf *as well as* + * in the HiveConf. + */ @transient protected[hive] lazy val hiveconf = new HiveConf(classOf[SessionState]) - @transient protected[hive] lazy val sessionState = new SessionState(hiveconf) + @transient protected[hive] lazy val sessionState = { + val ss = new SessionState(hiveconf) + set(hiveconf.getAllProperties) // Have SQLConf pick up the initial set of HiveConf. + ss + } sessionState.err = new PrintStream(outputBuffer, true, "UTF-8") sessionState.out = new PrintStream(outputBuffer, true, "UTF-8") + override def set(key: String, value: String): Unit = { + super.set(key, value) + runSqlHive(s"SET $key=$value") + } + /* A catalyst metadata catalog that points to the Hive Metastore. */ @transient override protected[sql] lazy val catalog = new HiveMetastoreCatalog(this) with OverrideCatalog { @@ -236,30 +245,31 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) { @transient override protected[sql] val planner = hivePlanner - @transient - protected lazy val emptyResult = - sparkContext.parallelize(Seq(new GenericRow(Array[Any]()): Row), 1) - /** Extends QueryExecution with hive specific features. */ protected[sql] abstract class QueryExecution extends super.QueryExecution { // TODO: Create mixin for the analyzer instead of overriding things here. override lazy val optimizedPlan = optimizer(catalog.PreInsertionCasts(catalog.CreateTables(analyzed))) - override lazy val toRdd: RDD[Row] = - analyzed match { - case NativeCommand(cmd) => - val output = runSqlHive(cmd) + override lazy val toRdd: RDD[Row] = { + def processCmd(cmd: String): RDD[Row] = { + val output = runSqlHive(cmd) + if (output.size == 0) { + emptyResult + } else { + val asRows = output.map(r => new GenericRow(r.split("\t").asInstanceOf[Array[Any]])) + sparkContext.parallelize(asRows, 1) + } + } - if (output.size == 0) { - emptyResult - } else { - val asRows = output.map(r => new GenericRow(r.split("\t").asInstanceOf[Array[Any]])) - sparkContext.parallelize(asRows, 1) - } - case _ => - executedPlan.execute().map(_.copy()) + logical match { + case s: SetCommand => eagerlyProcess(s) + case _ => analyzed match { + case NativeCommand(cmd) => processCmd(cmd) + case _ => executedPlan.execute().map(_.copy()) + } } + } protected val primitiveTypes = Seq(StringType, IntegerType, LongType, DoubleType, FloatType, BooleanType, ByteType, @@ -305,7 +315,7 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) { */ def stringResult(): Seq[String] = analyzed match { case NativeCommand(cmd) => runSqlHive(cmd) - case ExplainCommand(plan) => mkQueryExecution(plan).toString.split("\n") + case ExplainCommand(plan) => executePlan(plan).toString.split("\n") case query => val result: Seq[Seq[Any]] = toRdd.collect().toSeq // We need the types so we can output struct field names @@ -318,6 +328,7 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) { override def simpleString: String = logical match { case _: NativeCommand => "" + case _: SetCommand => "" case _ => executedPlan.toString } } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala index cc9e24a057..4e74d9bc90 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala @@ -207,8 +207,17 @@ private[hive] object HiveQl { /** Returns a LogicalPlan for a given HiveQL string. */ def parseSql(sql: String): LogicalPlan = { try { - if (sql.toLowerCase.startsWith("set")) { - NativeCommand(sql) + if (sql.trim.toLowerCase.startsWith("set")) { + // Split in two parts since we treat the part before the first "=" + // as key, and the part after as value, which may contain other "=" signs. + sql.trim.drop(3).split("=", 2).map(_.trim) match { + case Array("") => // "set" + SetCommand(None, None) + case Array(key) => // "set key" + SetCommand(Some(key), None) + case Array(key, value) => // "set key=value" + SetCommand(Some(key), Some(value)) + } } else if (sql.toLowerCase.startsWith("add jar")) { AddJar(sql.drop(8)) } else if (sql.toLowerCase.startsWith("add file")) { diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala index 0f954103a8..357c7e654b 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala @@ -138,6 +138,9 @@ abstract class HiveComparisonTest val orderedAnswer = hiveQuery.logical match { // Clean out non-deterministic time schema info. + // Hack: Hive simply prints the result of a SET command to screen, + // and does not return it as a query answer. + case _: SetCommand => Seq("0") case _: NativeCommand => answer.filterNot(nonDeterministicLine).filterNot(_ == "") case _: ExplainCommand => answer case plan => if (isSorted(plan)) answer else answer.sorted diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala index c56eee2580..6c239b02ed 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala @@ -17,6 +17,7 @@ package org.apache.spark.sql.hive.execution +import org.apache.spark.sql.Row import org.apache.spark.sql.hive.test.TestHive._ import org.apache.spark.sql.hive.test.TestHive @@ -171,4 +172,78 @@ class HiveQuerySuite extends HiveComparisonTest { TestHive.reset() } + test("parse HQL set commands") { + // Adapted from its SQL counterpart. + val testKey = "spark.sql.key.usedfortestonly" + val testVal = "val0,val_1,val2.3,my_table" + + hql(s"set $testKey=$testVal") + assert(get(testKey, testVal + "_") == testVal) + + hql("set mapred.reduce.tasks=20") + assert(get("mapred.reduce.tasks", "0") == "20") + hql("set mapred.reduce.tasks = 40") + assert(get("mapred.reduce.tasks", "0") == "40") + + hql(s"set $testKey=$testVal") + assert(get(testKey, "0") == testVal) + + hql(s"set $testKey=") + assert(get(testKey, "0") == "") + } + + test("SET commands semantics for a HiveContext") { + // Adapted from its SQL counterpart. + val testKey = "spark.sql.key.usedfortestonly" + var testVal = "test.val.0" + val nonexistentKey = "nonexistent" + def fromRows(row: Array[Row]): Array[String] = row.map(_.getString(0)) + + clear() + + // "set" itself returns all config variables currently specified in SQLConf. + assert(hql("set").collect().size == 0) + + // "set key=val" + hql(s"SET $testKey=$testVal") + assert(fromRows(hql("SET").collect()) sameElements Array(s"$testKey=$testVal")) + assert(hiveconf.get(testKey, "") == testVal) + + hql(s"SET ${testKey + testKey}=${testVal + testVal}") + assert(fromRows(hql("SET").collect()) sameElements + Array( + s"$testKey=$testVal", + s"${testKey + testKey}=${testVal + testVal}")) + assert(hiveconf.get(testKey + testKey, "") == testVal + testVal) + + // "set key" + assert(fromRows(hql(s"SET $testKey").collect()) sameElements + Array(s"$testKey=$testVal")) + assert(fromRows(hql(s"SET $nonexistentKey").collect()) sameElements + Array(s"$nonexistentKey is undefined")) + + // Assert that sql() should have the same effects as hql() by repeating the above using sql(). + clear() + assert(sql("set").collect().size == 0) + + sql(s"SET $testKey=$testVal") + assert(fromRows(sql("SET").collect()) sameElements Array(s"$testKey=$testVal")) + assert(hiveconf.get(testKey, "") == testVal) + + sql(s"SET ${testKey + testKey}=${testVal + testVal}") + assert(fromRows(sql("SET").collect()) sameElements + Array( + s"$testKey=$testVal", + s"${testKey + testKey}=${testVal + testVal}")) + assert(hiveconf.get(testKey + testKey, "") == testVal + testVal) + + assert(fromRows(sql(s"SET $testKey").collect()) sameElements + Array(s"$testKey=$testVal")) + assert(fromRows(sql(s"SET $nonexistentKey").collect()) sameElements + Array(s"$nonexistentKey is undefined")) + } + + // Put tests that depend on specific Hive settings before these last two test, + // since they modify /clear stuff. + } -- cgit v1.2.3