path: root/sql/core
author     Cheng Lian <lian.cs.zju@gmail.com>    2014-07-27 13:03:38 -0700
committer  Patrick Wendell <pwendell@gmail.com>  2014-07-27 13:03:38 -0700
commit     f6ff2a61d00d12481bfb211ae13d6992daacdcc2 (patch)
tree       d3dea56612ddadcdd51f70c10e44f9078fd2aeff /sql/core
parent     2bbf235376f40a4b95d7e6e42e1bed893c124ecb (diff)
[SPARK-2410][SQL] Merging Hive Thrift/JDBC server
(This is a replacement of #1399, trying to fix potential `HiveThriftServer2` port collision between parallel builds. Please refer to [these comments](https://github.com/apache/spark/pull/1399#issuecomment-50212572) for details.)

JIRA issue: [SPARK-2410](https://issues.apache.org/jira/browse/SPARK-2410)

Merging the Hive Thrift/JDBC server from [branch-1.0-jdbc](https://github.com/apache/spark/tree/branch-1.0-jdbc). Thanks chenghao-intel for his initial contribution of the Spark SQL CLI.

Author: Cheng Lian <lian.cs.zju@gmail.com>

Closes #1600 from liancheng/jdbc and squashes the following commits:

ac4618b [Cheng Lian] Uses random port for HiveThriftServer2 to avoid collision with parallel builds
090beea [Cheng Lian] Revert changes related to SPARK-2678, decided to move them to another PR
21c6cf4 [Cheng Lian] Updated Spark SQL programming guide docs
fe0af31 [Cheng Lian] Reordered spark-submit options in spark-shell[.cmd]
199e3fb [Cheng Lian] Disabled MIMA for hive-thriftserver
1083e9d [Cheng Lian] Fixed failed test suites
7db82a1 [Cheng Lian] Fixed spark-submit application options handling logic
9cc0f06 [Cheng Lian] Starts beeline with spark-submit
cfcf461 [Cheng Lian] Updated documents and build scripts for the newly added hive-thriftserver profile
061880f [Cheng Lian] Addressed all comments by @pwendell
7755062 [Cheng Lian] Adapts test suites to spark-submit settings
40bafef [Cheng Lian] Fixed more license header issues
e214aab [Cheng Lian] Added missing license headers
b8905ba [Cheng Lian] Fixed minor issues in spark-sql and start-thriftserver.sh
f975d22 [Cheng Lian] Updated docs for Hive compatibility and Shark migration guide draft
3ad4e75 [Cheng Lian] Starts spark-sql shell with spark-submit
a5310d1 [Cheng Lian] Make HiveThriftServer2 play well with spark-submit
61f39f4 [Cheng Lian] Starts Hive Thrift server via spark-submit
2c4c539 [Cheng Lian] Cherry picked the Hive Thrift server
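The headline fix over #1399 is letting parallel builds start `HiveThriftServer2` on a random free port so they do not collide. The sketch below is a hypothetical illustration of that idea, not code from this PR: `findFreePort` is an assumed helper, and the port is handed to the server through the standard `hive.server2.thrift.port` property.

```scala
import java.net.ServerSocket

// Hypothetical helper: ask the OS for a free ephemeral port so that two
// parallel builds starting their own HiveThriftServer2 never clash.
def findFreePort(): Int = {
  val socket = new ServerSocket(0) // port 0 lets the OS choose
  try socket.getLocalPort finally socket.close()
}

// Hand the chosen port to the Thrift server via the standard Hive property
// before the server is started.
val port = findFreePort()
System.setProperty("hive.server2.thrift.port", port.toString)
```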
Diffstat (limited to 'sql/core')
-rw-r--r--  sql/core/pom.xml                                                          2
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala               20
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala    42
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/SQLConfSuite.scala          13
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala         10
5 files changed, 64 insertions, 23 deletions
diff --git a/sql/core/pom.xml b/sql/core/pom.xml
index c309c43804..3a038a2db6 100644
--- a/sql/core/pom.xml
+++ b/sql/core/pom.xml
@@ -32,7 +32,7 @@
<name>Spark Project SQL</name>
<url>http://spark.apache.org/</url>
<properties>
- <sbt.project.name>sql</sbt.project.name>
+ <sbt.project.name>sql</sbt.project.name>
</properties>
<dependencies>
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
index 2b787e14f3..41920c00b5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
@@ -30,12 +30,13 @@ import scala.collection.JavaConverters._
* SQLConf is thread-safe (internally synchronized so safe to be used in multiple threads).
*/
trait SQLConf {
+ import SQLConf._
/** ************************ Spark SQL Params/Hints ******************* */
// TODO: refactor so that these hints accessors don't pollute the name space of SQLContext?
/** Number of partitions to use for shuffle operators. */
- private[spark] def numShufflePartitions: Int = get("spark.sql.shuffle.partitions", "200").toInt
+ private[spark] def numShufflePartitions: Int = get(SHUFFLE_PARTITIONS, "200").toInt
/**
* Upper bound on the sizes (in bytes) of the tables qualified for the auto conversion to
@@ -43,11 +44,10 @@ trait SQLConf {
* effectively disables auto conversion.
* Hive setting: hive.auto.convert.join.noconditionaltask.size.
*/
- private[spark] def autoConvertJoinSize: Int =
- get("spark.sql.auto.convert.join.size", "10000").toInt
+ private[spark] def autoConvertJoinSize: Int = get(AUTO_CONVERT_JOIN_SIZE, "10000").toInt
/** A comma-separated list of table names marked to be broadcasted during joins. */
- private[spark] def joinBroadcastTables: String = get("spark.sql.join.broadcastTables", "")
+ private[spark] def joinBroadcastTables: String = get(JOIN_BROADCAST_TABLES, "")
/** ********************** SQLConf functionality methods ************ */
@@ -61,7 +61,7 @@ trait SQLConf {
def set(key: String, value: String): Unit = {
require(key != null, "key cannot be null")
- require(value != null, s"value cannot be null for ${key}")
+ require(value != null, s"value cannot be null for $key")
settings.put(key, value)
}
@@ -90,3 +90,13 @@ trait SQLConf {
}
}
+
+object SQLConf {
+ val AUTO_CONVERT_JOIN_SIZE = "spark.sql.auto.convert.join.size"
+ val SHUFFLE_PARTITIONS = "spark.sql.shuffle.partitions"
+ val JOIN_BROADCAST_TABLES = "spark.sql.join.broadcastTables"
+
+ object Deprecated {
+ val MAPRED_REDUCE_TASKS = "mapred.reduce.tasks"
+ }
+}
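The new `SQLConf` companion object centralizes the configuration key strings that were previously scattered as raw literals. A minimal sketch of how the constants are meant to be used, assuming a local `SparkContext` (illustration only, not code from this commit):

```scala
import org.apache.spark.SparkContext
import org.apache.spark.sql.{SQLConf, SQLContext}

// SQLContext mixes in the SQLConf trait, so configuration can be read and
// written through the shared key constants instead of hard-coded strings.
val sc = new SparkContext("local", "sqlconf-example")
val sqlContext = new SQLContext(sc)

sqlContext.set(SQLConf.SHUFFLE_PARTITIONS, "10")
assert(sqlContext.get(SQLConf.SHUFFLE_PARTITIONS, "200") == "10")

// The Deprecated object keeps the old Hive property name around so that
// "SET mapred.reduce.tasks" can be rewritten to spark.sql.shuffle.partitions.
println(SQLConf.Deprecated.MAPRED_REDUCE_TASKS) // prints "mapred.reduce.tasks"
```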
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala
index 98d2f89c8a..9293239131 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala
@@ -17,12 +17,13 @@
package org.apache.spark.sql.execution
+import org.apache.spark.Logging
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.errors.TreeNodeException
import org.apache.spark.sql.catalyst.expressions.{Attribute, GenericRow}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
-import org.apache.spark.sql.{Row, SQLContext}
+import org.apache.spark.sql.{Row, SQLConf, SQLContext}
trait Command {
/**
@@ -44,28 +45,53 @@ trait Command {
case class SetCommand(
key: Option[String], value: Option[String], output: Seq[Attribute])(
@transient context: SQLContext)
- extends LeafNode with Command {
+ extends LeafNode with Command with Logging {
- override protected[sql] lazy val sideEffectResult: Seq[(String, String)] = (key, value) match {
+ override protected[sql] lazy val sideEffectResult: Seq[String] = (key, value) match {
// Set value for key k.
case (Some(k), Some(v)) =>
- context.set(k, v)
- Array(k -> v)
+ if (k == SQLConf.Deprecated.MAPRED_REDUCE_TASKS) {
+ logWarning(s"Property ${SQLConf.Deprecated.MAPRED_REDUCE_TASKS} is deprecated, " +
+ s"automatically converted to ${SQLConf.SHUFFLE_PARTITIONS} instead.")
+ context.set(SQLConf.SHUFFLE_PARTITIONS, v)
+ Array(s"${SQLConf.SHUFFLE_PARTITIONS}=$v")
+ } else {
+ context.set(k, v)
+ Array(s"$k=$v")
+ }
// Query the value bound to key k.
case (Some(k), _) =>
- Array(k -> context.getOption(k).getOrElse("<undefined>"))
+ // TODO (lian) This is just a workaround to make the Simba ODBC driver work.
+ // Should remove this once we get the ODBC driver updated.
+ if (k == "-v") {
+ val hiveJars = Seq(
+ "hive-exec-0.12.0.jar",
+ "hive-service-0.12.0.jar",
+ "hive-common-0.12.0.jar",
+ "hive-hwi-0.12.0.jar",
+ "hive-0.12.0.jar").mkString(":")
+
+ Array(
+ "system:java.class.path=" + hiveJars,
+ "system:sun.java.command=shark.SharkServer2")
+ }
+ else {
+ Array(s"$k=${context.getOption(k).getOrElse("<undefined>")}")
+ }
// Query all key-value pairs that are set in the SQLConf of the context.
case (None, None) =>
- context.getAll
+ context.getAll.map { case (k, v) =>
+ s"$k=$v"
+ }
case _ =>
throw new IllegalArgumentException()
}
def execute(): RDD[Row] = {
- val rows = sideEffectResult.map { case (k, v) => new GenericRow(Array[Any](k, v)) }
+ val rows = sideEffectResult.map { line => new GenericRow(Array[Any](line)) }
context.sparkContext.parallelize(rows, 1)
}
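With this change, SET produces single-column rows of the form `key=value` instead of `(key, value)` pairs, and the deprecated `mapred.reduce.tasks` property is rewritten to `spark.sql.shuffle.partitions` with a logged warning. A rough sketch of the observable behaviour, assuming an existing `SQLContext` named `sqlContext` (the result comments are approximate):

```scala
// Setting a property now echoes a single "key=value" string per row.
sqlContext.sql("SET spark.sql.shuffle.partitions=10").collect()
// roughly: Array([spark.sql.shuffle.partitions=10])

// Querying an unset key still falls back to the <undefined> marker.
sqlContext.sql("SET some.nonexistent.key").collect()
// roughly: Array([some.nonexistent.key=<undefined>])

// The deprecated Hive property is transparently rewritten, and SetCommand
// logs a deprecation warning.
sqlContext.sql("SET mapred.reduce.tasks=20").collect()
// roughly: Array([spark.sql.shuffle.partitions=20])
```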
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLConfSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLConfSuite.scala
index 08293f7f0c..1a58d73d9e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLConfSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLConfSuite.scala
@@ -54,10 +54,10 @@ class SQLConfSuite extends QueryTest {
assert(get(testKey, testVal + "_") == testVal)
assert(TestSQLContext.get(testKey, testVal + "_") == testVal)
- sql("set mapred.reduce.tasks=20")
- assert(get("mapred.reduce.tasks", "0") == "20")
- sql("set mapred.reduce.tasks = 40")
- assert(get("mapred.reduce.tasks", "0") == "40")
+ sql("set some.property=20")
+ assert(get("some.property", "0") == "20")
+ sql("set some.property = 40")
+ assert(get("some.property", "0") == "40")
val key = "spark.sql.key"
val vs = "val0,val_1,val2.3,my_table"
@@ -70,4 +70,9 @@ class SQLConfSuite extends QueryTest {
clear()
}
+ test("deprecated property") {
+ clear()
+ sql(s"set ${SQLConf.Deprecated.MAPRED_REDUCE_TASKS}=10")
+ assert(get(SQLConf.SHUFFLE_PARTITIONS) == "10")
+ }
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index 6736189c96..de9e8aa4f6 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -424,25 +424,25 @@ class SQLQuerySuite extends QueryTest {
sql(s"SET $testKey=$testVal")
checkAnswer(
sql("SET"),
- Seq(Seq(testKey, testVal))
+ Seq(Seq(s"$testKey=$testVal"))
)
sql(s"SET ${testKey + testKey}=${testVal + testVal}")
checkAnswer(
sql("set"),
Seq(
- Seq(testKey, testVal),
- Seq(testKey + testKey, testVal + testVal))
+ Seq(s"$testKey=$testVal"),
+ Seq(s"${testKey + testKey}=${testVal + testVal}"))
)
// "set key"
checkAnswer(
sql(s"SET $testKey"),
- Seq(Seq(testKey, testVal))
+ Seq(Seq(s"$testKey=$testVal"))
)
checkAnswer(
sql(s"SET $nonexistentKey"),
- Seq(Seq(nonexistentKey, "<undefined>"))
+ Seq(Seq(s"$nonexistentKey=<undefined>"))
)
clear()
}