-rwxr-xr-x  bin/beeline                                                                                              29
-rwxr-xr-x  bin/spark-sql                                                                                            66
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala                                   39
-rw-r--r--  core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala                                       12
-rwxr-xr-x  sbin/start-thriftserver.sh                                                                               50
-rw-r--r--  sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala       1
-rw-r--r--  sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala               19
-rw-r--r--  sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suite.scala 23
8 files changed, 164 insertions(+), 75 deletions(-)
diff --git a/bin/beeline b/bin/beeline
index 09fe366c60..1bda4dba50 100755
--- a/bin/beeline
+++ b/bin/beeline
@@ -17,29 +17,14 @@
# limitations under the License.
#
-# Figure out where Spark is installed
-FWDIR="$(cd `dirname $0`/..; pwd)"
+#
+# Shell script for starting BeeLine
-# Find the java binary
-if [ -n "${JAVA_HOME}" ]; then
- RUNNER="${JAVA_HOME}/bin/java"
-else
- if [ `command -v java` ]; then
- RUNNER="java"
- else
- echo "JAVA_HOME is not set" >&2
- exit 1
- fi
-fi
+# Enter posix mode for bash
+set -o posix
-# Compute classpath using external script
-classpath_output=$($FWDIR/bin/compute-classpath.sh)
-if [[ "$?" != "0" ]]; then
- echo "$classpath_output"
- exit 1
-else
- CLASSPATH=$classpath_output
-fi
+# Figure out where Spark is installed
+FWDIR="$(cd `dirname $0`/..; pwd)"
CLASS="org.apache.hive.beeline.BeeLine"
-exec "$RUNNER" -cp "$CLASSPATH" $CLASS "$@"
+exec "$FWDIR/bin/spark-class" $CLASS "$@"
diff --git a/bin/spark-sql b/bin/spark-sql
index bba7f897b1..61ebd8ab6d 100755
--- a/bin/spark-sql
+++ b/bin/spark-sql
@@ -23,14 +23,72 @@
# Enter posix mode for bash
set -o posix
+CLASS="org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver"
+
# Figure out where Spark is installed
FWDIR="$(cd `dirname $0`/..; pwd)"
-if [[ "$@" = *--help ]] || [[ "$@" = *-h ]]; then
- echo "Usage: ./sbin/spark-sql [options]"
+function usage {
+ echo "Usage: ./sbin/spark-sql [options] [cli option]"
+ pattern="usage"
+ pattern+="\|Spark assembly has been built with Hive"
+ pattern+="\|NOTE: SPARK_PREPEND_CLASSES is set"
+ pattern+="\|Spark Command: "
+ pattern+="\|--help"
+ pattern+="\|======="
+
$FWDIR/bin/spark-submit --help 2>&1 | grep -v Usage 1>&2
+ echo
+ echo "CLI options:"
+ $FWDIR/bin/spark-class $CLASS --help 2>&1 | grep -v "$pattern" 1>&2
+}
+
+function ensure_arg_number {
+ arg_number=$1
+ at_least=$2
+
+ if [[ $arg_number -lt $at_least ]]; then
+ usage
+ exit 1
+ fi
+}
+
+if [[ "$@" = --help ]] || [[ "$@" = -h ]]; then
+ usage
exit 0
fi
-CLASS="org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver"
-exec "$FWDIR"/bin/spark-submit --class $CLASS spark-internal $@
+CLI_ARGS=()
+SUBMISSION_ARGS=()
+
+while (($#)); do
+ case $1 in
+ -d | --define | --database | -f | -h | --hiveconf | --hivevar | -i | -p)
+ ensure_arg_number $# 2
+ CLI_ARGS+=($1); shift
+ CLI_ARGS+=($1); shift
+ ;;
+
+ -e)
+ ensure_arg_number $# 2
+ CLI_ARGS+=($1); shift
+ CLI_ARGS+=(\"$1\"); shift
+ ;;
+
+ -s | --silent)
+ CLI_ARGS+=($1); shift
+ ;;
+
+ -v | --verbose)
+ # Both SparkSubmit and SparkSQLCLIDriver recognize -v | --verbose
+ CLI_ARGS+=($1)
+ SUBMISSION_ARGS+=($1); shift
+ ;;
+
+ *)
+ SUBMISSION_ARGS+=($1); shift
+ ;;
+ esac
+done
+
+eval exec "$FWDIR"/bin/spark-submit --class $CLASS ${SUBMISSION_ARGS[*]} spark-internal ${CLI_ARGS[*]}
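
The while/case loop above splits a mixed command line into SUBMISSION_ARGS (consumed by spark-submit) and CLI_ARGS (forwarded to SparkSQLCLIDriver after the spark-internal marker). A sketch of how one invocation is routed; the flag values are placeholders:

    ./bin/spark-sql --master local[2] --hiveconf key=value -e "SELECT 1"

    # The loop produces:
    #   SUBMISSION_ARGS = (--master local[2])
    #   CLI_ARGS        = (--hiveconf key=value -e \"SELECT 1\")
    # so the eval'ed command becomes:
    ./bin/spark-submit --class org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver \
      --master local[2] spark-internal --hiveconf key=value -e "SELECT 1"
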
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
index 9391f24e71..087dd4d633 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
@@ -220,6 +220,7 @@ private[spark] class SparkSubmitArguments(args: Seq[String]) {
/** Fill in values by parsing user options. */
private def parseOpts(opts: Seq[String]): Unit = {
var inSparkOpts = true
+ val EQ_SEPARATED_OPT = """(--[^=]+)=(.+)""".r
// Delineates parsing of Spark options from parsing of user options.
parse(opts)
@@ -322,33 +323,21 @@ private[spark] class SparkSubmitArguments(args: Seq[String]) {
verbose = true
parse(tail)
+ case EQ_SEPARATED_OPT(opt, value) :: tail =>
+ parse(opt :: value :: tail)
+
+ case value :: tail if value.startsWith("-") =>
+ SparkSubmit.printErrorAndExit(s"Unrecognized option '$value'.")
+
case value :: tail =>
- if (inSparkOpts) {
- value match {
- // convert --foo=bar to --foo bar
- case v if v.startsWith("--") && v.contains("=") && v.split("=").size == 2 =>
- val parts = v.split("=")
- parse(Seq(parts(0), parts(1)) ++ tail)
- case v if v.startsWith("-") =>
- val errMessage = s"Unrecognized option '$value'."
- SparkSubmit.printErrorAndExit(errMessage)
- case v =>
- primaryResource =
- if (!SparkSubmit.isShell(v) && !SparkSubmit.isInternal(v)) {
- Utils.resolveURI(v).toString
- } else {
- v
- }
- inSparkOpts = false
- isPython = SparkSubmit.isPython(v)
- parse(tail)
+ primaryResource =
+ if (!SparkSubmit.isShell(value) && !SparkSubmit.isInternal(value)) {
+ Utils.resolveURI(value).toString
+ } else {
+ value
}
- } else {
- if (!value.isEmpty) {
- childArgs += value
- }
- parse(tail)
- }
+ isPython = SparkSubmit.isPython(value)
+ childArgs ++= tail
case Nil =>
}
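
Two behaviours fall out of the rewritten parser: EQ_SEPARATED_OPT normalizes --foo=bar into --foo bar before dispatch, and once the primary resource is seen, everything after it is handed to the user program verbatim (childArgs ++= tail) instead of being re-parsed. The new test below exercises exactly this; at the shell level the difference looks like:

    # --name=myApp is rewritten to '--name myApp' by EQ_SEPARATED_OPT.
    # After userjar.jar, '--master local some --weird args' now reaches the
    # user program unchanged, even though --master collides with a
    # spark-submit flag.
    ./bin/spark-submit --name=myApp --class Foo userjar.jar \
      --master local some --weird args
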
diff --git a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
index a5cdcfb5de..7e1ef80c84 100644
--- a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
@@ -106,6 +106,18 @@ class SparkSubmitSuite extends FunSuite with Matchers {
appArgs.childArgs should be (Seq("some", "--weird", "args"))
}
+ test("handles arguments to user program with name collision") {
+ val clArgs = Seq(
+ "--name", "myApp",
+ "--class", "Foo",
+ "userjar.jar",
+ "--master", "local",
+ "some",
+ "--weird", "args")
+ val appArgs = new SparkSubmitArguments(clArgs)
+ appArgs.childArgs should be (Seq("--master", "local", "some", "--weird", "args"))
+ }
+
test("handles YARN cluster mode") {
val clArgs = Seq(
"--deploy-mode", "cluster",
diff --git a/sbin/start-thriftserver.sh b/sbin/start-thriftserver.sh
index 8398e6f19b..603f50ae13 100755
--- a/sbin/start-thriftserver.sh
+++ b/sbin/start-thriftserver.sh
@@ -26,11 +26,53 @@ set -o posix
# Figure out where Spark is installed
FWDIR="$(cd `dirname $0`/..; pwd)"
-if [[ "$@" = *--help ]] || [[ "$@" = *-h ]]; then
- echo "Usage: ./sbin/start-thriftserver [options]"
+CLASS="org.apache.spark.sql.hive.thriftserver.HiveThriftServer2"
+
+function usage {
+ echo "Usage: ./sbin/start-thriftserver [options] [thrift server options]"
+ pattern="usage"
+ pattern+="\|Spark assembly has been built with Hive"
+ pattern+="\|NOTE: SPARK_PREPEND_CLASSES is set"
+ pattern+="\|Spark Command: "
+ pattern+="\|======="
+ pattern+="\|--help"
+
$FWDIR/bin/spark-submit --help 2>&1 | grep -v Usage 1>&2
+ echo
+ echo "Thrift server options:"
+ $FWDIR/bin/spark-class $CLASS --help 2>&1 | grep -v "$pattern" 1>&2
+}
+
+function ensure_arg_number {
+ arg_number=$1
+ at_least=$2
+
+ if [[ $arg_number -lt $at_least ]]; then
+ usage
+ exit 1
+ fi
+}
+
+if [[ "$@" = --help ]] || [[ "$@" = -h ]]; then
+ usage
exit 0
fi
-CLASS="org.apache.spark.sql.hive.thriftserver.HiveThriftServer2"
-exec "$FWDIR"/bin/spark-submit --class $CLASS spark-internal $@
+THRIFT_SERVER_ARGS=()
+SUBMISSION_ARGS=()
+
+while (($#)); do
+ case $1 in
+ --hiveconf)
+ ensure_arg_number $# 2
+ THRIFT_SERVER_ARGS+=($1); shift
+ THRIFT_SERVER_ARGS+=($1); shift
+ ;;
+
+ *)
+ SUBMISSION_ARGS+=($1); shift
+ ;;
+ esac
+done
+
+eval exec "$FWDIR"/bin/spark-submit --class $CLASS ${SUBMISSION_ARGS[*]} spark-internal ${THRIFT_SERVER_ARGS[*]}
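
start-thriftserver.sh uses the same splitting pattern, but only --hiveconf key-value pairs are routed to HiveThriftServer2; every other argument goes to spark-submit. A routing sketch with placeholder values:

    ./sbin/start-thriftserver.sh --master spark://host:7077 \
      --hiveconf hive.server2.thrift.port=10001

    # SUBMISSION_ARGS    = (--master spark://host:7077)
    # THRIFT_SERVER_ARGS = (--hiveconf hive.server2.thrift.port=10001)
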
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala
index 08d3f983d9..6f7942aba3 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala
@@ -40,7 +40,6 @@ private[hive] object HiveThriftServer2 extends Logging {
val optionsProcessor = new ServerOptionsProcessor("HiveThriftServer2")
if (!optionsProcessor.process(args)) {
- logWarning("Error starting HiveThriftServer2 with given arguments")
System.exit(-1)
}
diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala
index 69f19f826a..2bf8cfdcac 100644
--- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala
+++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.hive.thriftserver
import java.io.{BufferedReader, InputStreamReader, PrintWriter}
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars
import org.scalatest.{BeforeAndAfterAll, FunSuite}
class CliSuite extends FunSuite with BeforeAndAfterAll with TestUtils {
@@ -27,15 +28,15 @@ class CliSuite extends FunSuite with BeforeAndAfterAll with TestUtils {
val METASTORE_PATH = TestUtils.getMetastorePath("cli")
override def beforeAll() {
- val pb = new ProcessBuilder(
- "../../bin/spark-sql",
- "--master",
- "local",
- "--hiveconf",
- s"javax.jdo.option.ConnectionURL=jdbc:derby:;databaseName=$METASTORE_PATH;create=true",
- "--hiveconf",
- "hive.metastore.warehouse.dir=" + WAREHOUSE_PATH)
-
+ val jdbcUrl = s"jdbc:derby:;databaseName=$METASTORE_PATH;create=true"
+ val commands =
+ s"""../../bin/spark-sql
+ | --master local
+ | --hiveconf ${ConfVars.METASTORECONNECTURLKEY}="$jdbcUrl"
+ | --hiveconf ${ConfVars.METASTOREWAREHOUSE}=$WAREHOUSE_PATH
+ """.stripMargin.split("\\s+")
+
+ val pb = new ProcessBuilder(commands: _*)
process = pb.start()
outputWriter = new PrintWriter(process.getOutputStream, true)
inputReader = new BufferedReader(new InputStreamReader(process.getInputStream))
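
After stripMargin and split("\\s+") the suite forks, in effect, the command below. The metastore and warehouse paths are placeholders for the suite's generated METASTORE_PATH and WAREHOUSE_PATH; the ConfVars constants expand to the Hive property names shown:

    ../../bin/spark-sql \
      --master local \
      --hiveconf javax.jdo.option.ConnectionURL="jdbc:derby:;databaseName=/tmp/cli-metastore;create=true" \
      --hiveconf hive.metastore.warehouse.dir=/tmp/cli-warehouse
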
diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suite.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suite.scala
index b7b7c9957a..78bffa2607 100644
--- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suite.scala
+++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suite.scala
@@ -25,6 +25,7 @@ import java.io.{BufferedReader, InputStreamReader}
import java.net.ServerSocket
import java.sql.{Connection, DriverManager, Statement}
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars
import org.scalatest.{BeforeAndAfterAll, FunSuite}
import org.apache.spark.Logging
@@ -63,16 +64,18 @@ class HiveThriftServer2Suite extends FunSuite with BeforeAndAfterAll with TestUtils
// Forking a new process to start the Hive Thrift server. The reason to do this is it is
// hard to clean up Hive resources entirely, so we just start a new process and kill
// that process for cleanup.
- val defaultArgs = Seq(
- "../../sbin/start-thriftserver.sh",
- "--master local",
- "--hiveconf",
- "hive.root.logger=INFO,console",
- "--hiveconf",
- s"javax.jdo.option.ConnectionURL=jdbc:derby:;databaseName=$METASTORE_PATH;create=true",
- "--hiveconf",
- s"hive.metastore.warehouse.dir=$WAREHOUSE_PATH")
- val pb = new ProcessBuilder(defaultArgs ++ args)
+ val jdbcUrl = s"jdbc:derby:;databaseName=$METASTORE_PATH;create=true"
+ val command =
+ s"""../../sbin/start-thriftserver.sh
+ | --master local
+ | --hiveconf hive.root.logger=INFO,console
+ | --hiveconf ${ConfVars.METASTORECONNECTURLKEY}="$jdbcUrl"
+ | --hiveconf ${ConfVars.METASTOREWAREHOUSE}=$WAREHOUSE_PATH
+ | --hiveconf ${ConfVars.HIVE_SERVER2_THRIFT_BIND_HOST}=$HOST
+ | --hiveconf ${ConfVars.HIVE_SERVER2_THRIFT_PORT}=$PORT
+ """.stripMargin.split("\\s+")
+
+ val pb = new ProcessBuilder(command ++ args: _*)
val environment = pb.environment()
environment.put("HIVE_SERVER2_THRIFT_PORT", PORT.toString)
environment.put("HIVE_SERVER2_THRIFT_BIND_HOST", HOST)