author    Reynold Xin <rxin@databricks.com>  2016-04-29 01:14:02 -0700
committer Reynold Xin <rxin@databricks.com>  2016-04-29 01:14:02 -0700
commit    054f991c4350af1350af7a4109ee77f4a34822f0
tree      ec40f69f6dae5ed63c7247027f47f0b2da9d49c7
parent    2057cbcb0bc9d5a4fb66006c42457a556d0bb277
[SPARK-14994][SQL] Remove execution hive from HiveSessionState
## What changes were proposed in this pull request?

This patch removes executionHive from HiveSessionState and HiveSharedState.

## How was this patch tested?

Updated test cases.

Author: Reynold Xin <rxin@databricks.com>
Author: Yin Huai <yhuai@databricks.com>

Closes #12770 from rxin/SPARK-14994.
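One visible effect of dropping the execution Hive client is that default database locations are now derived from the new `spark.sql.warehouse.dir` setting (with the `${system:user.dir}` token substituted) instead of `java.io.tmpdir` or Hive's `METASTOREWAREHOUSE`. The following standalone sketch restates that logic outside Spark; `resolveWarehousePath` and `defaultDBPath` are illustrative helper names, not APIs from the patch itself.

    import org.apache.hadoop.fs.Path

    object WarehousePathSketch {
      // Mirrors SQLConf.warehousePath: substitute ${system:user.dir} with the JVM working directory.
      def resolveWarehousePath(configured: String): String =
        configured.replace("${system:user.dir}", System.getProperty("user.dir"))

      // Mirrors SessionCatalog.getDefaultDBPath after this patch: <warehouse>/<db>.db
      def defaultDBPath(db: String, warehousePath: String): String =
        new Path(new Path(warehousePath), db + ".db").toString

      def main(args: Array[String]): Unit = {
        val warehouse = resolveWarehousePath("${system:user.dir}/spark-warehouse")
        // e.g. /home/user/project/spark-warehouse/sales.db
        println(defaultDBPath("sales", warehouse))
      }
    }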
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystConf.scala | 5
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala | 2
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala | 9
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala | 22
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala | 18
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala | 152
-rw-r--r--  sql/hive-thriftserver/src/main/scala/org/apache/hive/service/server/HiveServerServerOptionsProcessor.java | 85
-rw-r--r--  sql/hive-thriftserver/src/main/scala/org/apache/hive/service/server/HiveServerServerOptionsProcessor.scala | 37
-rw-r--r--  sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala | 21
-rw-r--r--  sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala | 5
-rw-r--r--  sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLEnv.scala | 6
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala | 10
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala | 34
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSharedState.scala | 7
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala | 12
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala | 67
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala | 27
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveContextSuite.scala | 40
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/SerializationSuite.scala | 32
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala | 45
20 files changed, 327 insertions(+), 309 deletions(-)
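The other half of the patch replaces the execution Hive client's classloader with a shared, non-closable URL classloader that `SessionState.addJar` feeds directly (see the SessionState.scala and SharedState.scala hunks below). The sketch below restates only the jar-URL resolution that `addJar` now performs, using a plain `URLClassLoader` in place of Spark's internal `NonClosableMutableURLClassLoader`; `SketchJarLoader` and `AddJarSketch` are illustrative names, not part of the patch.

    import java.io.File
    import java.net.{URL, URLClassLoader}

    import org.apache.hadoop.fs.Path

    // Stand-in for the shared NonClosableMutableURLClassLoader: exposes addURL publicly.
    class SketchJarLoader(parent: ClassLoader) extends URLClassLoader(Array.empty[URL], parent) {
      override def addURL(url: URL): Unit = super.addURL(url)
    }

    object AddJarSketch {
      val jarClassLoader = new SketchJarLoader(Thread.currentThread().getContextClassLoader)

      // Mirrors the URL resolution in SessionState.addJar after this patch.
      def addJar(path: String): Unit = {
        val uri = new Path(path).toUri
        val jarURL = if (uri.getScheme == null) {
          new File(path).toURI.toURL   // local file path without a URL scheme
        } else {
          uri.toURL                    // already a URL with a scheme
        }
        jarClassLoader.addURL(jarURL)
        Thread.currentThread().setContextClassLoader(jarClassLoader)
      }
    }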
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystConf.scala
index 179dab11a2..4df100c2a8 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystConf.scala
@@ -34,6 +34,8 @@ trait CatalystConf {
def runSQLonFile: Boolean
+ def warehousePath: String
+
/**
* Returns the [[Resolver]] for the current configuration, which can be used to determine if two
* identifiers are equal.
@@ -52,5 +54,6 @@ case class SimpleCatalystConf(
optimizerMaxIterations: Int = 100,
optimizerInSetConversionThreshold: Int = 10,
maxCaseBranchesForCodegen: Int = 20,
- runSQLonFile: Boolean = true)
+ runSQLonFile: Boolean = true,
+ warehousePath: String = "/user/hive/warehouse")
extends CatalystConf
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
index d7fd54308a..b06f24bc48 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
@@ -125,7 +125,7 @@ class SessionCatalog(
}
def getDefaultDBPath(db: String): String = {
- System.getProperty("java.io.tmpdir") + File.separator + db + ".db"
+ new Path(new Path(conf.warehousePath), db + ".db").toString
}
// ----------------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 6fbf32676f..2bfc895678 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -52,6 +52,11 @@ object SQLConf {
}
+ val WAREHOUSE_PATH = SQLConfigBuilder("spark.sql.warehouse.dir")
+ .doc("The default location for managed databases and tables.")
+ .stringConf
+ .createWithDefault("${system:user.dir}/spark-warehouse")
+
val OPTIMIZER_MAX_ITERATIONS = SQLConfigBuilder("spark.sql.optimizer.maxIterations")
.internal()
.doc("The max number of iterations the optimizer and analyzer runs.")
@@ -645,6 +650,10 @@ private[sql] class SQLConf extends Serializable with CatalystConf with Logging {
def variableSubstituteDepth: Int = getConf(VARIABLE_SUBSTITUTE_DEPTH)
+ def warehousePath: String = {
+ getConf(WAREHOUSE_PATH).replace("${system:user.dir}", System.getProperty("user.dir"))
+ }
+
override def orderByOrdinal: Boolean = getConf(ORDER_BY_ORDINAL)
override def groupByOrdinal: Boolean = getConf(GROUP_BY_ORDINAL)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
index c05fe37886..cacf50ec7a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
@@ -17,11 +17,13 @@
package org.apache.spark.sql.internal
+import java.io.File
import java.util.Properties
import scala.collection.JavaConverters._
import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
import org.apache.spark.internal.config.ConfigEntry
import org.apache.spark.sql._
@@ -65,9 +67,6 @@ private[sql] class SessionState(sparkSession: SparkSession) {
hadoopConf
}
- // Automatically extract `spark.sql.*` entries and put it in our SQLConf
- setConf(SQLContext.getSQLProperties(sparkSession.sparkContext.getConf))
-
lazy val experimentalMethods = new ExperimentalMethods
/**
@@ -150,6 +149,12 @@ private[sql] class SessionState(sparkSession: SparkSession) {
new ContinuousQueryManager(sparkSession)
}
+ private val jarClassLoader: NonClosableMutableURLClassLoader =
+ sparkSession.sharedState.jarClassLoader
+
+ // Automatically extract `spark.sql.*` entries and put it in our SQLConf
+ // We need to call it after all of vals have been initialized.
+ setConf(SQLContext.getSQLProperties(sparkSession.sparkContext.getConf))
// ------------------------------------------------------
// Helper methods, partially leftover from pre-2.0 days
@@ -180,6 +185,17 @@ private[sql] class SessionState(sparkSession: SparkSession) {
def addJar(path: String): Unit = {
sparkSession.sparkContext.addJar(path)
+
+ val uri = new Path(path).toUri
+ val jarURL = if (uri.getScheme == null) {
+ // `path` is a local file path without a URL scheme
+ new File(path).toURI.toURL
+ } else {
+ // `path` is a URL with a scheme
+ uri.toURL
+ }
+ jarClassLoader.addURL(jarURL)
+ Thread.currentThread().setContextClassLoader(jarClassLoader)
}
/**
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala
index 9a30c7de1f..ab4af8d142 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala
@@ -22,6 +22,7 @@ import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.catalyst.catalog.{ExternalCatalog, InMemoryCatalog}
import org.apache.spark.sql.execution.CacheManager
import org.apache.spark.sql.execution.ui.SQLListener
+import org.apache.spark.util.MutableURLClassLoader
/**
@@ -44,4 +45,21 @@ private[sql] class SharedState(val sparkContext: SparkContext) {
*/
lazy val externalCatalog: ExternalCatalog = new InMemoryCatalog
+ /**
+ * A classloader used to load all user-added jar.
+ */
+ val jarClassLoader = new NonClosableMutableURLClassLoader(
+ org.apache.spark.util.Utils.getContextOrSparkClassLoader)
+
+}
+
+
+/**
+ * URL class loader that exposes the `addURL` and `getURLs` methods in URLClassLoader.
+ * This class loader cannot be closed (its `close` method is a no-op).
+ */
+private[sql] class NonClosableMutableURLClassLoader(parent: ClassLoader)
+ extends MutableURLClassLoader(Array.empty, parent) {
+
+ override def close(): Unit = {}
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
index a9a9bf76be..4162329d76 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
@@ -27,6 +27,7 @@ import org.apache.spark.sql.catalyst.catalog.{CatalogDatabase, CatalogStorageFor
import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType}
import org.apache.spark.sql.catalyst.catalog.{CatalogTablePartition, SessionCatalog}
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
+import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SharedSQLContext
class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
@@ -83,91 +84,100 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
}
test("Create/Drop Database") {
- val catalog = sqlContext.sessionState.catalog
+ withSQLConf(
+ SQLConf.WAREHOUSE_PATH.key -> (System.getProperty("java.io.tmpdir") + File.separator)) {
+ val catalog = sqlContext.sessionState.catalog
- val databaseNames = Seq("db1", "`database`")
+ val databaseNames = Seq("db1", "`database`")
- databaseNames.foreach { dbName =>
- try {
- val dbNameWithoutBackTicks = cleanIdentifier(dbName)
-
- sql(s"CREATE DATABASE $dbName")
- val db1 = catalog.getDatabaseMetadata(dbNameWithoutBackTicks)
- assert(db1 == CatalogDatabase(
- dbNameWithoutBackTicks,
- "",
- System.getProperty("java.io.tmpdir") + File.separator + s"$dbNameWithoutBackTicks.db",
- Map.empty))
- sql(s"DROP DATABASE $dbName CASCADE")
- assert(!catalog.databaseExists(dbNameWithoutBackTicks))
- } finally {
- catalog.reset()
+ databaseNames.foreach { dbName =>
+ try {
+ val dbNameWithoutBackTicks = cleanIdentifier(dbName)
+
+ sql(s"CREATE DATABASE $dbName")
+ val db1 = catalog.getDatabaseMetadata(dbNameWithoutBackTicks)
+ assert(db1 == CatalogDatabase(
+ dbNameWithoutBackTicks,
+ "",
+ System.getProperty("java.io.tmpdir") + File.separator + s"$dbNameWithoutBackTicks.db",
+ Map.empty))
+ sql(s"DROP DATABASE $dbName CASCADE")
+ assert(!catalog.databaseExists(dbNameWithoutBackTicks))
+ } finally {
+ catalog.reset()
+ }
}
}
}
test("Create Database - database already exists") {
- val catalog = sqlContext.sessionState.catalog
- val databaseNames = Seq("db1", "`database`")
-
- databaseNames.foreach { dbName =>
- try {
- val dbNameWithoutBackTicks = cleanIdentifier(dbName)
- sql(s"CREATE DATABASE $dbName")
- val db1 = catalog.getDatabaseMetadata(dbNameWithoutBackTicks)
- assert(db1 == CatalogDatabase(
- dbNameWithoutBackTicks,
- "",
- System.getProperty("java.io.tmpdir") + File.separator + s"$dbNameWithoutBackTicks.db",
- Map.empty))
-
- val message = intercept[AnalysisException] {
+ withSQLConf(
+ SQLConf.WAREHOUSE_PATH.key -> (System.getProperty("java.io.tmpdir") + File.separator)) {
+ val catalog = sqlContext.sessionState.catalog
+ val databaseNames = Seq("db1", "`database`")
+
+ databaseNames.foreach { dbName =>
+ try {
+ val dbNameWithoutBackTicks = cleanIdentifier(dbName)
sql(s"CREATE DATABASE $dbName")
- }.getMessage
- assert(message.contains(s"Database '$dbNameWithoutBackTicks' already exists."))
- } finally {
- catalog.reset()
+ val db1 = catalog.getDatabaseMetadata(dbNameWithoutBackTicks)
+ assert(db1 == CatalogDatabase(
+ dbNameWithoutBackTicks,
+ "",
+ System.getProperty("java.io.tmpdir") + File.separator + s"$dbNameWithoutBackTicks.db",
+ Map.empty))
+
+ val message = intercept[AnalysisException] {
+ sql(s"CREATE DATABASE $dbName")
+ }.getMessage
+ assert(message.contains(s"Database '$dbNameWithoutBackTicks' already exists."))
+ } finally {
+ catalog.reset()
+ }
}
}
}
test("Alter/Describe Database") {
- val catalog = sqlContext.sessionState.catalog
- val databaseNames = Seq("db1", "`database`")
-
- databaseNames.foreach { dbName =>
- try {
- val dbNameWithoutBackTicks = cleanIdentifier(dbName)
- val location =
- System.getProperty("java.io.tmpdir") + File.separator + s"$dbNameWithoutBackTicks.db"
- sql(s"CREATE DATABASE $dbName")
-
- checkAnswer(
- sql(s"DESCRIBE DATABASE EXTENDED $dbName"),
- Row("Database Name", dbNameWithoutBackTicks) ::
- Row("Description", "") ::
- Row("Location", location) ::
- Row("Properties", "") :: Nil)
-
- sql(s"ALTER DATABASE $dbName SET DBPROPERTIES ('a'='a', 'b'='b', 'c'='c')")
-
- checkAnswer(
- sql(s"DESCRIBE DATABASE EXTENDED $dbName"),
- Row("Database Name", dbNameWithoutBackTicks) ::
- Row("Description", "") ::
- Row("Location", location) ::
- Row("Properties", "((a,a), (b,b), (c,c))") :: Nil)
-
- sql(s"ALTER DATABASE $dbName SET DBPROPERTIES ('d'='d')")
+ withSQLConf(
+ SQLConf.WAREHOUSE_PATH.key -> (System.getProperty("java.io.tmpdir") + File.separator)) {
+ val catalog = sqlContext.sessionState.catalog
+ val databaseNames = Seq("db1", "`database`")
+
+ databaseNames.foreach { dbName =>
+ try {
+ val dbNameWithoutBackTicks = cleanIdentifier(dbName)
+ val location =
+ System.getProperty("java.io.tmpdir") + File.separator + s"$dbNameWithoutBackTicks.db"
+ sql(s"CREATE DATABASE $dbName")
- checkAnswer(
- sql(s"DESCRIBE DATABASE EXTENDED $dbName"),
- Row("Database Name", dbNameWithoutBackTicks) ::
- Row("Description", "") ::
- Row("Location", location) ::
- Row("Properties", "((a,a), (b,b), (c,c), (d,d))") :: Nil)
- } finally {
- catalog.reset()
+ checkAnswer(
+ sql(s"DESCRIBE DATABASE EXTENDED $dbName"),
+ Row("Database Name", dbNameWithoutBackTicks) ::
+ Row("Description", "") ::
+ Row("Location", location) ::
+ Row("Properties", "") :: Nil)
+
+ sql(s"ALTER DATABASE $dbName SET DBPROPERTIES ('a'='a', 'b'='b', 'c'='c')")
+
+ checkAnswer(
+ sql(s"DESCRIBE DATABASE EXTENDED $dbName"),
+ Row("Database Name", dbNameWithoutBackTicks) ::
+ Row("Description", "") ::
+ Row("Location", location) ::
+ Row("Properties", "((a,a), (b,b), (c,c))") :: Nil)
+
+ sql(s"ALTER DATABASE $dbName SET DBPROPERTIES ('d'='d')")
+
+ checkAnswer(
+ sql(s"DESCRIBE DATABASE EXTENDED $dbName"),
+ Row("Database Name", dbNameWithoutBackTicks) ::
+ Row("Description", "") ::
+ Row("Location", location) ::
+ Row("Properties", "((a,a), (b,b), (c,c), (d,d))") :: Nil)
+ } finally {
+ catalog.reset()
+ }
}
}
}
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/hive/service/server/HiveServerServerOptionsProcessor.java b/sql/hive-thriftserver/src/main/scala/org/apache/hive/service/server/HiveServerServerOptionsProcessor.java
new file mode 100644
index 0000000000..0f2683db07
--- /dev/null
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/hive/service/server/HiveServerServerOptionsProcessor.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.service.server;
+
+import java.util.Properties;
+
+import org.apache.commons.cli.*;
+
+public class HiveServerServerOptionsProcessor {
+ private final Options options = new Options();
+ private org.apache.commons.cli.CommandLine commandLine;
+ private final String serverName;
+ private final StringBuilder debugMessage = new StringBuilder();
+
+ @SuppressWarnings("static-access")
+ public HiveServerServerOptionsProcessor(String serverName) {
+ this.serverName = serverName;
+ // -hiveconf x=y
+ options.addOption(OptionBuilder
+ .withValueSeparator()
+ .hasArgs(2)
+ .withArgName("property=value")
+ .withLongOpt("hiveconf")
+ .withDescription("Use value for given property")
+ .create());
+ // -deregister <versionNumber>
+ options.addOption(OptionBuilder
+ .hasArgs(1)
+ .withArgName("versionNumber")
+ .withLongOpt("deregister")
+ .withDescription("Deregister all instances of given version from dynamic service discovery")
+ .create());
+ options.addOption(new Option("H", "help", false, "Print help information"));
+ }
+
+ public HiveServer2.ServerOptionsProcessorResponse parse(String[] argv) {
+ try {
+ commandLine = new GnuParser().parse(options, argv);
+ // Process --hiveconf
+ // Get hiveconf param values and set the System property values
+ Properties confProps = commandLine.getOptionProperties("hiveconf");
+ for (String propKey : confProps.stringPropertyNames()) {
+ // save logging message for log4j output latter after log4j initialize properly
+ debugMessage.append("Setting " + propKey + "=" + confProps.getProperty(propKey) + ";\n");
+ // System.setProperty("hivecli." + propKey, confProps.getProperty(propKey));
+ System.setProperty(propKey, confProps.getProperty(propKey));
+ }
+
+ // Process --help
+ if (commandLine.hasOption('H')) {
+ return new HiveServer2.ServerOptionsProcessorResponse(
+ new HiveServer2.HelpOptionExecutor(serverName, options));
+ }
+
+ // Process --deregister
+ if (commandLine.hasOption("deregister")) {
+ return new HiveServer2.ServerOptionsProcessorResponse(
+ new HiveServer2.DeregisterOptionExecutor(
+ commandLine.getOptionValue("deregister")));
+ }
+ } catch (ParseException e) {
+ // Error out & exit - we were not able to parse the args successfully
+ System.err.println("Error starting HiveServer2 with given arguments: ");
+ System.err.println(e.getMessage());
+ System.exit(-1);
+ }
+ // Default executor, when no option is specified
+ return new HiveServer2.ServerOptionsProcessorResponse(new HiveServer2.StartOptionExecutor());
+ }
+}
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/hive/service/server/HiveServerServerOptionsProcessor.scala b/sql/hive-thriftserver/src/main/scala/org/apache/hive/service/server/HiveServerServerOptionsProcessor.scala
deleted file mode 100644
index 60bb4dc5e7..0000000000
--- a/sql/hive-thriftserver/src/main/scala/org/apache/hive/service/server/HiveServerServerOptionsProcessor.scala
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hive.service.server
-
-import org.apache.hive.service.server.HiveServer2.{ServerOptionsProcessor, StartOptionExecutor}
-
-/**
- * Class to upgrade a package-private class to public, and
- * implement a `process()` operation consistent with
- * the behavior of older Hive versions
- * @param serverName name of the hive server
- */
-private[apache] class HiveServerServerOptionsProcessor(serverName: String)
- extends ServerOptionsProcessor(serverName) {
-
- def process(args: Array[String]): Boolean = {
- // A parse failure automatically triggers a system exit
- val response = super.parse(args)
- val executor = response.getServerOptionsExecutor()
- // return true if the parsed option was to start the service
- executor.isInstanceOf[StartOptionExecutor]
- }
-}
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala
index 24a25023a6..03727b8ab2 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala
@@ -22,8 +22,10 @@ import java.util.concurrent.atomic.AtomicBoolean
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
+import scala.collection.JavaConverters._
import org.apache.commons.logging.LogFactory
+import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hive.conf.HiveConf
import org.apache.hadoop.hive.conf.HiveConf.ConfVars
import org.apache.hive.service.cli.thrift.{ThriftBinaryCLIService, ThriftHttpCLIService}
@@ -34,7 +36,7 @@ import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.internal.Logging
import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd, SparkListenerJobStart}
import org.apache.spark.sql.SQLContext
-import org.apache.spark.sql.hive.HiveSessionState
+import org.apache.spark.sql.hive.{HiveSharedState, HiveUtils}
import org.apache.spark.sql.hive.thriftserver.ReflectionUtils._
import org.apache.spark.sql.hive.thriftserver.ui.ThriftServerTab
import org.apache.spark.sql.internal.SQLConf
@@ -56,7 +58,12 @@ object HiveThriftServer2 extends Logging {
@DeveloperApi
def startWithContext(sqlContext: SQLContext): Unit = {
val server = new HiveThriftServer2(sqlContext)
- server.init(sqlContext.sessionState.asInstanceOf[HiveSessionState].hiveconf)
+
+ val executionHive = HiveUtils.newClientForExecution(
+ sqlContext.sparkContext.conf,
+ sqlContext.sessionState.newHadoopConf())
+
+ server.init(executionHive.conf)
server.start()
listener = new HiveThriftServer2Listener(server, sqlContext.conf)
sqlContext.sparkContext.addSparkListener(listener)
@@ -70,9 +77,7 @@ object HiveThriftServer2 extends Logging {
def main(args: Array[String]) {
Utils.initDaemon(log)
val optionsProcessor = new HiveServerServerOptionsProcessor("HiveThriftServer2")
- if (!optionsProcessor.process(args)) {
- System.exit(-1)
- }
+ optionsProcessor.parse(args)
logInfo("Starting SparkContext")
SparkSQLEnv.init()
@@ -82,9 +87,13 @@ object HiveThriftServer2 extends Logging {
uiTab.foreach(_.detach())
}
+ val executionHive = HiveUtils.newClientForExecution(
+ SparkSQLEnv.sqlContext.sparkContext.conf,
+ SparkSQLEnv.sqlContext.sessionState.newHadoopConf())
+
try {
val server = new HiveThriftServer2(SparkSQLEnv.sqlContext)
- server.init(SparkSQLEnv.sqlContext.sessionState.asInstanceOf[HiveSessionState].hiveconf)
+ server.init(executionHive.conf)
server.start()
logInfo("HiveThriftServer2 started")
listener = new HiveThriftServer2Listener(server, SparkSQLEnv.sqlContext.conf)
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
index 40dc81e02d..e8bcdd76ef 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
@@ -35,7 +35,7 @@ import org.apache.hive.service.cli.session.HiveSession
import org.apache.spark.internal.Logging
import org.apache.spark.sql.{DataFrame, Row => SparkRow, SQLContext}
import org.apache.spark.sql.execution.command.SetCommand
-import org.apache.spark.sql.hive.{HiveSessionState, HiveUtils}
+import org.apache.spark.sql.hive.HiveUtils
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._
import org.apache.spark.util.{Utils => SparkUtils}
@@ -195,9 +195,8 @@ private[hive] class SparkExecuteStatementOperation(
statementId = UUID.randomUUID().toString
logInfo(s"Running query '$statement' with $statementId")
setState(OperationState.RUNNING)
- val sessionState = sqlContext.sessionState.asInstanceOf[HiveSessionState]
// Always use the latest class loader provided by executionHive's state.
- val executionHiveClassLoader = sessionState.executionHive.state.getConf.getClassLoader
+ val executionHiveClassLoader = sqlContext.sharedState.jarClassLoader
Thread.currentThread().setContextClassLoader(executionHiveClassLoader)
HiveThriftServer2.listener.onStatementStart(
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLEnv.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLEnv.scala
index 268ba2f0bc..665a44e51a 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLEnv.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLEnv.scala
@@ -60,13 +60,7 @@ private[hive] object SparkSQLEnv extends Logging {
sessionState.metadataHive.setOut(new PrintStream(System.out, true, "UTF-8"))
sessionState.metadataHive.setInfo(new PrintStream(System.err, true, "UTF-8"))
sessionState.metadataHive.setError(new PrintStream(System.err, true, "UTF-8"))
-
sqlContext.setConf("spark.sql.hive.version", HiveUtils.hiveExecutionVersion)
-
- if (log.isDebugEnabled) {
- sessionState.hiveconf.getAllProperties.asScala.toSeq.sorted
- .foreach { case (k, v) => logDebug(s"HiveConf var: $k=$v") }
- }
}
}
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala
index f70131ec86..456587e0e0 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala
@@ -20,8 +20,6 @@ package org.apache.spark.sql.hive
import scala.util.{Failure, Success, Try}
import scala.util.control.NonFatal
-import org.apache.hadoop.fs.Path
-import org.apache.hadoop.hive.conf.HiveConf
import org.apache.hadoop.hive.ql.exec.{UDAF, UDF}
import org.apache.hadoop.hive.ql.exec.{FunctionRegistry => HiveFunctionRegistry}
import org.apache.hadoop.hive.ql.udf.generic.{AbstractGenericUDAFResolver, GenericUDF, GenericUDTF}
@@ -46,8 +44,7 @@ private[sql] class HiveSessionCatalog(
sparkSession: SparkSession,
functionResourceLoader: FunctionResourceLoader,
functionRegistry: FunctionRegistry,
- conf: SQLConf,
- hiveconf: HiveConf)
+ conf: SQLConf)
extends SessionCatalog(externalCatalog, functionResourceLoader, functionRegistry, conf) {
override def setCurrentDatabase(db: String): Unit = {
@@ -73,11 +70,6 @@ private[sql] class HiveSessionCatalog(
// | Methods and fields for interacting with HiveMetastoreCatalog |
// ----------------------------------------------------------------
- override def getDefaultDBPath(db: String): String = {
- val defaultPath = hiveconf.getVar(HiveConf.ConfVars.METASTOREWAREHOUSE)
- new Path(new Path(defaultPath), db + ".db").toString
- }
-
// Catalog for handling data source tables. TODO: This really doesn't belong here since it is
// essentially a cache for metastore tables. However, it relies on a lot of session-specific
// things so it would be a lot of work to split its functionality between HiveSessionCatalog
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
index e085094383..9608f0b4ef 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
@@ -17,8 +17,6 @@
package org.apache.spark.sql.hive
-import java.util.regex.Pattern
-
import org.apache.hadoop.hive.conf.HiveConf
import org.apache.hadoop.hive.conf.HiveConf.ConfVars
@@ -26,7 +24,7 @@ import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.analysis.Analyzer
import org.apache.spark.sql.execution.SparkPlanner
import org.apache.spark.sql.execution.datasources._
-import org.apache.spark.sql.hive.client.{HiveClient, HiveClientImpl}
+import org.apache.spark.sql.hive.client.HiveClient
import org.apache.spark.sql.internal.SessionState
@@ -43,11 +41,6 @@ private[hive] class HiveSessionState(sparkSession: SparkSession)
}
/**
- * A Hive client used for execution.
- */
- lazy val executionHive: HiveClientImpl = sharedState.executionHive.newSession()
-
- /**
* A Hive client used for interacting with the metastore.
*/
lazy val metadataHive: HiveClient = sharedState.metadataHive.newSession()
@@ -61,9 +54,20 @@ private[hive] class HiveSessionState(sparkSession: SparkSession)
* set in the SQLConf *as well as* in the HiveConf.
*/
lazy val hiveconf: HiveConf = {
- val c = executionHive.conf
- conf.setConf(c.getAllProperties)
- c
+ val initialConf = new HiveConf(
+ sparkSession.sparkContext.hadoopConfiguration,
+ classOf[org.apache.hadoop.hive.ql.session.SessionState])
+
+ // HiveConf is a Hadoop Configuration, which has a field of classLoader and
+ // the initial value will be the current thread's context class loader
+ // (i.e. initClassLoader at here).
+ // We call initialConf.setClassLoader(initClassLoader) at here to make
+ // this action explicit.
+ initialConf.setClassLoader(sparkSession.sharedState.jarClassLoader)
+ sparkSession.sparkContext.conf.getAll.foreach { case (k, v) =>
+ initialConf.set(k, v)
+ }
+ initialConf
}
setDefaultOverrideConfs()
@@ -78,8 +82,7 @@ private[hive] class HiveSessionState(sparkSession: SparkSession)
sparkSession,
functionResourceLoader,
functionRegistry,
- conf,
- hiveconf)
+ conf)
}
/**
@@ -141,16 +144,13 @@ private[hive] class HiveSessionState(sparkSession: SparkSession)
override def setConf(key: String, value: String): Unit = {
super.setConf(key, value)
- executionHive.runSqlHive(s"SET $key=$value")
metadataHive.runSqlHive(s"SET $key=$value")
hiveconf.set(key, value)
}
override def addJar(path: String): Unit = {
- super.addJar(path)
- executionHive.addJar(path)
metadataHive.addJar(path)
- Thread.currentThread().setContextClassLoader(executionHive.clientLoader.classLoader)
+ super.addJar(path)
}
/**
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSharedState.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSharedState.scala
index fb1f59eed3..0ea5ce9196 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSharedState.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSharedState.scala
@@ -32,13 +32,6 @@ private[hive] class HiveSharedState(override val sparkContext: SparkContext)
// TODO: just share the IsolatedClientLoader instead of the client instances themselves
/**
- * A Hive client used for execution.
- */
- val executionHive: HiveClientImpl = {
- HiveUtils.newClientForExecution(sparkContext.conf, sparkContext.hadoopConfiguration)
- }
-
- /**
* A Hive client used to interact with the metastore.
*/
// This needs to be a lazy val at here because TestHiveSharedState is overriding it.
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala
index 0380d2342b..e1950d181d 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala
@@ -33,6 +33,7 @@ import org.apache.spark.deploy.SparkSubmitUtils
import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.util.quietly
import org.apache.spark.sql.hive.HiveUtils
+import org.apache.spark.sql.internal.NonClosableMutableURLClassLoader
import org.apache.spark.util.{MutableURLClassLoader, Utils}
/** Factory for `IsolatedClientLoader` with specific versions of hive. */
@@ -278,14 +279,3 @@ private[hive] class IsolatedClientLoader(
*/
private[hive] var cachedHive: Any = null
}
-
-/**
- * URL class loader that exposes the `addURL` and `getURLs` methods in URLClassLoader.
- * This class loader cannot be closed (its `close` method is a no-op).
- */
-private[sql] class NonClosableMutableURLClassLoader(
- parent: ClassLoader)
- extends MutableURLClassLoader(Array.empty, parent) {
-
- override def close(): Unit = {}
-}
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
index cba10caf98..73ccec2ee0 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
@@ -17,11 +17,19 @@
package org.apache.spark.sql.hive.execution
+import java.io.IOException
+import java.net.URI
+import java.text.SimpleDateFormat
import java.util
+import java.util.{Date, Random}
import scala.collection.JavaConverters._
-import org.apache.hadoop.hive.ql.{Context, ErrorMsg}
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileSystem, Path}
+import org.apache.hadoop.hive.common.FileUtils
+import org.apache.hadoop.hive.ql.exec.TaskRunner
+import org.apache.hadoop.hive.ql.ErrorMsg
import org.apache.hadoop.mapred.{FileOutputFormat, JobConf}
import org.apache.spark.rdd.RDD
@@ -46,6 +54,61 @@ case class InsertIntoHiveTable(
def output: Seq[Attribute] = Seq.empty
+ val stagingDir = sessionState.conf.getConfString("hive.exec.stagingdir", ".hive-staging")
+
+ private def executionId: String = {
+ val rand: Random = new Random
+ val format: SimpleDateFormat = new SimpleDateFormat("yyyy-MM-dd_HH-mm-ss_SSS")
+ val executionId: String = "hive_" + format.format(new Date) + "_" + Math.abs(rand.nextLong)
+ return executionId
+ }
+
+ private def getStagingDir(inputPath: Path, hadoopConf: Configuration): Path = {
+ val inputPathUri: URI = inputPath.toUri
+ val inputPathName: String = inputPathUri.getPath
+ val fs: FileSystem = inputPath.getFileSystem(hadoopConf)
+ val stagingPathName: String =
+ if (inputPathName.indexOf(stagingDir) == -1) {
+ new Path(inputPathName, stagingDir).toString
+ } else {
+ inputPathName.substring(0, inputPathName.indexOf(stagingDir) + stagingDir.length)
+ }
+ val dir: Path =
+ fs.makeQualified(
+ new Path(stagingPathName + "_" + executionId + "-" + TaskRunner.getTaskRunnerID))
+ logDebug("Created staging dir = " + dir + " for path = " + inputPath)
+ try {
+ if (!FileUtils.mkdir(fs, dir, true, hadoopConf)) {
+ throw new IllegalStateException("Cannot create staging directory '" + dir.toString + "'")
+ }
+ fs.deleteOnExit(dir)
+ }
+ catch {
+ case e: IOException =>
+ throw new RuntimeException(
+ "Cannot create staging directory '" + dir.toString + "': " + e.getMessage, e)
+
+ }
+ return dir
+ }
+
+ private def getExternalScratchDir(extURI: URI, hadoopConf: Configuration): Path = {
+ getStagingDir(new Path(extURI.getScheme, extURI.getAuthority, extURI.getPath), hadoopConf)
+ }
+
+ def getExternalTmpPath(path: Path, hadoopConf: Configuration): Path = {
+ val extURI: URI = path.toUri
+ if (extURI.getScheme == "viewfs") {
+ getExtTmpPathRelTo(path.getParent, hadoopConf)
+ } else {
+ new Path(getExternalScratchDir(extURI, hadoopConf), "-ext-10000")
+ }
+ }
+
+ def getExtTmpPathRelTo(path: Path, hadoopConf: Configuration): Path = {
+ new Path(getStagingDir(path, hadoopConf), "-ext-10000") // Hive uses 10000
+ }
+
private def saveAsHiveFile(
rdd: RDD[InternalRow],
valueClass: Class[_],
@@ -81,7 +144,7 @@ case class InsertIntoHiveTable(
val tableDesc = table.tableDesc
val tableLocation = table.hiveQlTable.getDataLocation
val hadoopConf = sessionState.newHadoopConf()
- val tmpLocation = new Context(hadoopConf).getExternalTmpPath(tableLocation)
+ val tmpLocation = getExternalTmpPath(tableLocation, hadoopConf)
val fileSinkConf = new FileSinkDesc(tmpLocation.toString, tableDesc, false)
val isCompressed =
sessionState.conf.getConfString("hive.exec.compress.output", "false").toBoolean
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
index ddb72fb1e1..c4a3a74b9b 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
@@ -136,7 +136,8 @@ private[hive] class TestHiveSparkSession(
}
@transient
- override lazy val sessionState: TestHiveSessionState = new TestHiveSessionState(self)
+ override lazy val sessionState: TestHiveSessionState =
+ new TestHiveSessionState(self, warehousePath)
override def newSession(): TestHiveSparkSession = {
new TestHiveSparkSession(
@@ -156,19 +157,8 @@ private[hive] class TestHiveSparkSession(
sessionState.hiveconf.set("hive.plan.serialization.format", "javaXML")
- // A snapshot of the entries in the starting SQLConf
- // We save this because tests can mutate this singleton object if they want
- // This snapshot is saved when we create this TestHiveSparkSession.
- val initialSQLConf: SQLConf = {
- val snapshot = new SQLConf
- sessionState.conf.getAllConfs.foreach { case (k, v) => snapshot.setConfString(k, v) }
- snapshot
- }
-
- val testTempDir = Utils.createTempDir()
-
// For some hive test case which contain ${system:test.tmp.dir}
- System.setProperty("test.tmp.dir", testTempDir.getCanonicalPath)
+ System.setProperty("test.tmp.dir", Utils.createTempDir().getCanonicalPath)
/** The location of the compiled hive distribution */
lazy val hiveHome = envVarToFile("HIVE_HOME")
@@ -521,8 +511,10 @@ private[hive] class TestHiveSharedState(
}
-private[hive] class TestHiveSessionState(sparkSession: TestHiveSparkSession)
- extends HiveSessionState(sparkSession) {
+private[hive] class TestHiveSessionState(
+ sparkSession: TestHiveSparkSession,
+ warehousePath: File)
+ extends HiveSessionState(sparkSession) { self =>
override lazy val conf: SQLConf = {
new SQLConf {
@@ -530,9 +522,8 @@ private[hive] class TestHiveSessionState(sparkSession: TestHiveSparkSession)
override def caseSensitiveAnalysis: Boolean = getConf(SQLConf.CASE_SENSITIVE, false)
override def clear(): Unit = {
super.clear()
- TestHiveContext.overrideConfs.map {
- case (key, value) => setConfString(key, value)
- }
+ TestHiveContext.overrideConfs.foreach { case (k, v) => setConfString(k, v) }
+ setConfString("hive.metastore.warehouse.dir", self.warehousePath.toURI.toString)
}
}
}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveContextSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveContextSuite.scala
deleted file mode 100644
index b2c0f7e0e5..0000000000
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveContextSuite.scala
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
-* Licensed to the Apache Software Foundation (ASF) under one or more
-* contributor license agreements. See the NOTICE file distributed with
-* this work for additional information regarding copyright ownership.
-* The ASF licenses this file to You under the Apache License, Version 2.0
-* (the "License"); you may not use this file except in compliance with
-* the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-package org.apache.spark.sql.hive
-
-import org.apache.spark.SparkFunSuite
-import org.apache.spark.sql.hive.test.TestHive
-
-
-class HiveContextSuite extends SparkFunSuite {
-
- test("HiveContext can access `spark.sql.*` configs") {
- // Avoid creating another SparkContext in the same JVM
- val sc = TestHive.sparkContext
- require(sc.conf.get("spark.sql.hive.metastore.barrierPrefixes") ==
- "org.apache.spark.sql.hive.execution.PairSerDe")
- assert(TestHive.sparkSession.initialSQLConf.getConfString(
- "spark.sql.hive.metastore.barrierPrefixes") ==
- "org.apache.spark.sql.hive.execution.PairSerDe")
- // This setting should be also set in the hiveconf of the current session.
- assert(TestHive.sessionState.hiveconf.get(
- "spark.sql.hive.metastore.barrierPrefixes", "") ==
- "org.apache.spark.sql.hive.execution.PairSerDe")
- }
-
-}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/SerializationSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/SerializationSuite.scala
deleted file mode 100644
index ac3a65032f..0000000000
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/SerializationSuite.scala
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.hive
-
-import org.apache.spark.{SparkConf, SparkFunSuite}
-import org.apache.spark.serializer.JavaSerializer
-
-class SerializationSuite extends SparkFunSuite {
-
- test("[SPARK-5840] HiveContext should be serializable") {
- val hiveContext = org.apache.spark.sql.hive.test.TestHive
- hiveContext.sessionState.hiveconf
- val serializer = new JavaSerializer(new SparkConf()).newInstance()
- val bytes = serializer.serialize(hiveContext)
- val deSer = serializer.deserialize[AnyRef](bytes)
- }
-}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
index e5a7706cc5..3bf0e84267 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
@@ -1070,51 +1070,6 @@ class HiveQuerySuite extends HiveComparisonTest with BeforeAndAfter {
assert(getConf(testKey, "0") == "")
}
- test("SET commands semantics for a HiveContext") {
- // Adapted from its SQL counterpart.
- val testKey = "spark.sql.key.usedfortestonly"
- val testVal = "test.val.0"
- val nonexistentKey = "nonexistent"
- def collectResults(df: DataFrame): Set[Any] =
- df.collect().map {
- case Row(key: String, value: String) => key -> value
- case Row(key: String, defaultValue: String, doc: String) => (key, defaultValue, doc)
- }.toSet
- conf.clear()
-
- val expectedConfs = conf.getAllDefinedConfs.toSet
- assertResult(expectedConfs)(collectResults(sql("SET -v")))
-
- // "SET" itself returns all config variables currently specified in SQLConf.
- // TODO: Should we be listing the default here always? probably...
- assert(sql("SET").collect().size === TestHiveContext.overrideConfs.size)
-
- val defaults = collectResults(sql("SET"))
- assertResult(Set(testKey -> testVal)) {
- collectResults(sql(s"SET $testKey=$testVal"))
- }
-
- assert(sessionState.hiveconf.get(testKey, "") === testVal)
- assertResult(defaults ++ Set(testKey -> testVal))(collectResults(sql("SET")))
-
- sql(s"SET ${testKey + testKey}=${testVal + testVal}")
- assert(sessionState.hiveconf.get(testKey + testKey, "") == testVal + testVal)
- assertResult(defaults ++ Set(testKey -> testVal, (testKey + testKey) -> (testVal + testVal))) {
- collectResults(sql("SET"))
- }
-
- // "SET key"
- assertResult(Set(testKey -> testVal)) {
- collectResults(sql(s"SET $testKey"))
- }
-
- assertResult(Set(nonexistentKey -> "<undefined>")) {
- collectResults(sql(s"SET $nonexistentKey"))
- }
-
- conf.clear()
- }
-
test("current_database with multiple sessions") {
sql("create database a")
sql("use a")