author     Cheng Lian <lian.cs.zju@gmail.com>        2014-07-28 12:07:30 -0700
committer  Michael Armbrust <michael@databricks.com>  2014-07-28 12:07:30 -0700
commit     a7a9d14479ea6421513a962ff0f45cb969368bab (patch)
tree       76c4ec208e31cabea38e6e2ec6deaa7667dbf46a /sql/core/src/main
parent     255b56f9f530e8594a7e6055ae07690454c66799 (diff)
[SPARK-2410][SQL] Merging Hive Thrift/JDBC server (with Maven profile fix)
JIRA issue: [SPARK-2410](https://issues.apache.org/jira/browse/SPARK-2410)

Another try for #1399 & #1600. Those two PRs break Jenkins builds because we made a separate profile `hive-thriftserver` in sub-project `assembly`, but the `hive-thriftserver` module is defined outside the `hive-thriftserver` profile. Thus every pull request that doesn't touch SQL code also executes the test suites defined in `hive-thriftserver`, and those tests fail because the related .class files are not included in the assembly jar.

In the most recent commit, the `hive-thriftserver` module is moved into its own profile to fix this problem. All previous commits are squashed for clarity.

Author: Cheng Lian <lian.cs.zju@gmail.com>

Closes #1620 from liancheng/jdbc-with-maven-fix and squashes the following commits:

629988e [Cheng Lian] Moved hive-thriftserver module definition into its own profile
ec3c7a7 [Cheng Lian] Cherry picked the Hive Thrift server
Diffstat (limited to 'sql/core/src/main')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala             | 20
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala  | 42
2 files changed, 49 insertions, 13 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
index 2b787e14f3..41920c00b5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
@@ -30,12 +30,13 @@ import scala.collection.JavaConverters._
* SQLConf is thread-safe (internally synchronized so safe to be used in multiple threads).
*/
trait SQLConf {
+ import SQLConf._
/** ************************ Spark SQL Params/Hints ******************* */
// TODO: refactor so that these hints accessors don't pollute the name space of SQLContext?
/** Number of partitions to use for shuffle operators. */
- private[spark] def numShufflePartitions: Int = get("spark.sql.shuffle.partitions", "200").toInt
+ private[spark] def numShufflePartitions: Int = get(SHUFFLE_PARTITIONS, "200").toInt
/**
* Upper bound on the sizes (in bytes) of the tables qualified for the auto conversion to
@@ -43,11 +44,10 @@ trait SQLConf {
* effectively disables auto conversion.
* Hive setting: hive.auto.convert.join.noconditionaltask.size.
*/
- private[spark] def autoConvertJoinSize: Int =
- get("spark.sql.auto.convert.join.size", "10000").toInt
+ private[spark] def autoConvertJoinSize: Int = get(AUTO_CONVERT_JOIN_SIZE, "10000").toInt
/** A comma-separated list of table names marked to be broadcasted during joins. */
- private[spark] def joinBroadcastTables: String = get("spark.sql.join.broadcastTables", "")
+ private[spark] def joinBroadcastTables: String = get(JOIN_BROADCAST_TABLES, "")
/** ********************** SQLConf functionality methods ************ */
@@ -61,7 +61,7 @@ trait SQLConf {
def set(key: String, value: String): Unit = {
require(key != null, "key cannot be null")
- require(value != null, s"value cannot be null for ${key}")
+ require(value != null, s"value cannot be null for $key")
settings.put(key, value)
}
@@ -90,3 +90,13 @@ trait SQLConf {
}
}
+
+object SQLConf {
+ val AUTO_CONVERT_JOIN_SIZE = "spark.sql.auto.convert.join.size"
+ val SHUFFLE_PARTITIONS = "spark.sql.shuffle.partitions"
+ val JOIN_BROADCAST_TABLES = "spark.sql.join.broadcastTables"
+
+ object Deprecated {
+ val MAPRED_REDUCE_TASKS = "mapred.reduce.tasks"
+ }
+}
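
For context, a minimal usage sketch of the new `SQLConf` key constants (not part of this patch): it assumes a `SQLContext` instance named `sqlContext`, which mixes in the `SQLConf` trait whose `set`/`get` methods appear in the hunk above, and the table names are hypothetical.

```scala
// Illustrative sketch only: `sqlContext` is an assumed existing SQLContext;
// `dim_users` and `dim_items` are hypothetical table names.
import org.apache.spark.sql.SQLConf

// Use the named constants instead of repeating raw string keys.
sqlContext.set(SQLConf.SHUFFLE_PARTITIONS, "10")
sqlContext.set(SQLConf.JOIN_BROADCAST_TABLES, "dim_users,dim_items")

// Reads go through the same constants, so a misspelled key becomes a compile
// error on the constant name rather than a silently ignored setting.
val numShufflePartitions = sqlContext.get(SQLConf.SHUFFLE_PARTITIONS, "200").toInt
```

The `Deprecated.MAPRED_REDUCE_TASKS` constant is consumed by `SetCommand` in the second file of this diff, which translates the legacy Hive property into `spark.sql.shuffle.partitions`.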
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala
index 98d2f89c8a..9293239131 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala
@@ -17,12 +17,13 @@
package org.apache.spark.sql.execution
+import org.apache.spark.Logging
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.errors.TreeNodeException
import org.apache.spark.sql.catalyst.expressions.{Attribute, GenericRow}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
-import org.apache.spark.sql.{Row, SQLContext}
+import org.apache.spark.sql.{Row, SQLConf, SQLContext}
trait Command {
/**
@@ -44,28 +45,53 @@ trait Command {
case class SetCommand(
key: Option[String], value: Option[String], output: Seq[Attribute])(
@transient context: SQLContext)
- extends LeafNode with Command {
+ extends LeafNode with Command with Logging {
- override protected[sql] lazy val sideEffectResult: Seq[(String, String)] = (key, value) match {
+ override protected[sql] lazy val sideEffectResult: Seq[String] = (key, value) match {
// Set value for key k.
case (Some(k), Some(v)) =>
- context.set(k, v)
- Array(k -> v)
+ if (k == SQLConf.Deprecated.MAPRED_REDUCE_TASKS) {
+ logWarning(s"Property ${SQLConf.Deprecated.MAPRED_REDUCE_TASKS} is deprecated, " +
+ s"automatically converted to ${SQLConf.SHUFFLE_PARTITIONS} instead.")
+ context.set(SQLConf.SHUFFLE_PARTITIONS, v)
+ Array(s"${SQLConf.SHUFFLE_PARTITIONS}=$v")
+ } else {
+ context.set(k, v)
+ Array(s"$k=$v")
+ }
// Query the value bound to key k.
case (Some(k), _) =>
- Array(k -> context.getOption(k).getOrElse("<undefined>"))
+ // TODO (lian) This is just a workaround to make the Simba ODBC driver work.
+ // Should remove this once we get the ODBC driver updated.
+ if (k == "-v") {
+ val hiveJars = Seq(
+ "hive-exec-0.12.0.jar",
+ "hive-service-0.12.0.jar",
+ "hive-common-0.12.0.jar",
+ "hive-hwi-0.12.0.jar",
+ "hive-0.12.0.jar").mkString(":")
+
+ Array(
+ "system:java.class.path=" + hiveJars,
+ "system:sun.java.command=shark.SharkServer2")
+ }
+ else {
+ Array(s"$k=${context.getOption(k).getOrElse("<undefined>")}")
+ }
// Query all key-value pairs that are set in the SQLConf of the context.
case (None, None) =>
- context.getAll
+ context.getAll.map { case (k, v) =>
+ s"$k=$v"
+ }
case _ =>
throw new IllegalArgumentException()
}
def execute(): RDD[Row] = {
- val rows = sideEffectResult.map { case (k, v) => new GenericRow(Array[Any](k, v)) }
+ val rows = sideEffectResult.map { line => new GenericRow(Array[Any](line)) }
context.sparkContext.parallelize(rows, 1)
}
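
To summarize the behavioral change, a hedged sketch of how `SET` statements behave after this patch: it assumes a `SQLContext` named `sqlContext` whose SQL parser routes `SET` to the `SetCommand` shown above, and the `key=value` strings mirror the formatting introduced in this diff (the exact row rendering may differ).

```scala
// Sketch only: `sqlContext` is an assumed existing SQLContext whose parser
// dispatches SET statements to SetCommand.

// The deprecated Hive property is redirected to spark.sql.shuffle.partitions,
// with a warning logged, per the SQLConf.Deprecated.MAPRED_REDUCE_TASKS branch.
sqlContext.sql("SET mapred.reduce.tasks=10")
// expected single result row: "spark.sql.shuffle.partitions=10"

// Querying one key now yields a single "key=value" string per row instead of
// the previous (key, value) pair.
sqlContext.sql("SET spark.sql.shuffle.partitions").collect().foreach(println)

// A bare SET lists every entry currently held in the SQLConf, one per row.
sqlContext.sql("SET").collect().foreach(println)
```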