author     Andrew Or <andrew@databricks.com>        2016-04-25 20:53:16 -0700
committer  Reynold Xin <rxin@databricks.com>        2016-04-25 20:53:16 -0700
commit     fa3c06987e6148975dd54b629bd9094224358175 (patch)
tree       78b96e83d16fa9e922dacedd096a425bf4dcbcee
parent     c71c6853fcecd3b41de0e8273329ea83a9779320 (diff)
[SPARK-14904][SQL] Put removed HiveContext in compatibility module
## What changes were proposed in this pull request?

This is for users who can't upgrade and need to continue to use HiveContext.

## How was this patch tested?

Added some basic tests as a sanity check.

This is based on #12672 and closes #12672.

Author: Andrew Or <andrew@databricks.com>
Author: Reynold Xin <rxin@databricks.com>

Closes #12682 from rxin/add-back-hive-context.
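To illustrate the compatibility path this module preserves, here is a minimal, hedged sketch of the legacy 1.x-style usage that keeps compiling against it. The `HiveContext(sc)` constructor is the one restored by this patch; the table name is hypothetical:

```scala
import org.apache.spark.SparkContext
import org.apache.spark.sql.hive.HiveContext

// Legacy entry point kept alive by the compatibility module;
// emits a deprecation warning on 2.0 but otherwise works as before.
val sc = new SparkContext("local[4]", "legacy-app")
val hc = new HiveContext(sc)

// "my_table" is a hypothetical Hive table, used purely for illustration.
hc.sql("SELECT COUNT(*) FROM my_table").show()
```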
-rw-r--r--  sql/hivecontext-compatibility/pom.xml                                                                      |   7
-rw-r--r--  sql/hivecontext-compatibility/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala                   |  61
-rw-r--r--  sql/hivecontext-compatibility/src/test/scala/org/apache/spark/sql/hive/HiveContextCompatibilitySuite.scala | 102
3 files changed, 170 insertions(+), 0 deletions(-)
diff --git a/sql/hivecontext-compatibility/pom.xml b/sql/hivecontext-compatibility/pom.xml
index 90c6bfdd8d..ed9ef8e279 100644
--- a/sql/hivecontext-compatibility/pom.xml
+++ b/sql/hivecontext-compatibility/pom.xml
@@ -41,6 +41,13 @@
<artifactId>spark-hive_${scala.binary.version}</artifactId>
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-core_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<build>
diff --git a/sql/hivecontext-compatibility/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hivecontext-compatibility/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
new file mode 100644
index 0000000000..65fcba81c4
--- /dev/null
+++ b/sql/hivecontext-compatibility/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive
+
+import org.apache.spark.SparkContext
+import org.apache.spark.api.java.JavaSparkContext
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.{SparkSession, SQLContext}
+
+
+/**
+ * An instance of the Spark SQL execution engine that integrates with data stored in Hive.
+ * Configuration for Hive is read from hive-site.xml on the classpath.
+ */
+@deprecated("Use SparkSession.withHiveSupport instead", "2.0.0")
+class HiveContext private[hive](
+ @transient private val sparkSession: SparkSession,
+ isRootContext: Boolean)
+ extends SQLContext(sparkSession, isRootContext) with Logging {
+
+ self =>
+
+ def this(sc: SparkContext) = {
+ this(new SparkSession(HiveUtils.withHiveExternalCatalog(sc)), true)
+ }
+
+ def this(sc: JavaSparkContext) = this(sc.sc)
+
+ /**
+ * Returns a new HiveContext as a new session, which will have a separate SQLConf, UDF/UDAFs,
+ * temporary tables and SessionState, but will share the same CacheManager, IsolatedClientLoader
+ * and Hive client (for both execution and metadata) with the existing HiveContext.
+ */
+ override def newSession(): HiveContext = {
+ new HiveContext(sparkSession.newSession(), isRootContext = false)
+ }
+
+ protected[sql] override def sessionState: HiveSessionState = {
+ sparkSession.sessionState.asInstanceOf[HiveSessionState]
+ }
+
+ protected[sql] override def sharedState: HiveSharedState = {
+ sparkSession.sharedState.asInstanceOf[HiveSharedState]
+ }
+
+}
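The newSession() contract above (per-session temporary tables and SessionState; shared CacheManager and Hive client) can be made concrete with a short, hedged sketch; `hc` is assumed to be an existing HiveContext and the names are hypothetical:

```scala
// Two sibling sessions derived from the same HiveContext.
val session1 = hc.newSession()
val session2 = hc.newSession()

// Temporary tables are per-session state: "tmp" is visible only in session1.
session1.range(10).registerTempTable("tmp")
// session2.table("tmp")  // would fail to resolve: not registered in session2

// Cached data, by contrast, lives in the shared CacheManager, so a table
// cached through one session is served from cache in the other as well.
```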
diff --git a/sql/hivecontext-compatibility/src/test/scala/org/apache/spark/sql/hive/HiveContextCompatibilitySuite.scala b/sql/hivecontext-compatibility/src/test/scala/org/apache/spark/sql/hive/HiveContextCompatibilitySuite.scala
new file mode 100644
index 0000000000..5df674d60e
--- /dev/null
+++ b/sql/hivecontext-compatibility/src/test/scala/org/apache/spark/sql/hive/HiveContextCompatibilitySuite.scala
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive
+
+import org.scalatest.BeforeAndAfterEach
+
+import org.apache.spark.{SparkContext, SparkFunSuite}
+
+
+class HiveContextCompatibilitySuite extends SparkFunSuite with BeforeAndAfterEach {
+
+ private var sc: SparkContext = null
+ private var hc: HiveContext = null
+
+ override def beforeAll(): Unit = {
+ super.beforeAll()
+ sc = new SparkContext("local[4]", "test")
+ HiveUtils.newTemporaryConfiguration(useInMemoryDerby = true).foreach { case (k, v) =>
+ sc.hadoopConfiguration.set(k, v)
+ }
+ hc = new HiveContext(sc)
+ }
+
+ override def afterEach(): Unit = {
+ try {
+ hc.sharedState.cacheManager.clearCache()
+ hc.sessionState.catalog.reset()
+ } finally {
+ super.afterEach()
+ }
+ }
+
+ override def afterAll(): Unit = {
+ try {
+ sc.stop()
+ sc = null
+ hc = null
+ } finally {
+ super.afterAll()
+ }
+ }
+
+ test("basic operations") {
+ val _hc = hc
+ import _hc.implicits._
+ val df1 = (1 to 20).map { i => (i, i) }.toDF("a", "x")
+ val df2 = (1 to 100).map { i => (i, i % 10, i % 2 == 0) }.toDF("a", "b", "c")
+ .select($"a", $"b")
+ .filter($"a" > 10 && $"b" > 6 && $"c")
+ val df3 = df1.join(df2, "a")
+ val res = df3.collect()
+ val expected = Seq((18, 18, 8)).toDF("a", "x", "b").collect()
+ assert(res.toSeq == expected.toSeq)
+ df3.registerTempTable("mai_table")
+ val df4 = hc.table("mai_table")
+ val res2 = df4.collect()
+ assert(res2.toSeq == expected.toSeq)
+ }
+
+ test("basic DDLs") {
+ val _hc = hc
+ import _hc.implicits._
+ val databases = hc.sql("SHOW DATABASES").collect().map(_.getString(0))
+ assert(databases.toSeq == Seq("default"))
+ hc.sql("CREATE DATABASE mee_db")
+ hc.sql("USE mee_db")
+ val databases2 = hc.sql("SHOW DATABASES").collect().map(_.getString(0))
+ assert(databases2.toSet == Set("default", "mee_db"))
+ val df = (1 to 10).map { i => ("bob" + i.toString, i) }.toDF("name", "age")
+ df.registerTempTable("mee_table")
+ hc.sql("CREATE TABLE moo_table (name string, age int)")
+ hc.sql("INSERT INTO moo_table SELECT * FROM mee_table")
+ assert(
+ hc.sql("SELECT * FROM moo_table order by name").collect().toSeq ==
+ df.collect().toSeq.sortBy(_.getString(0)))
+ val tables = hc.sql("SHOW TABLES IN mee_db").collect().map(_.getString(0))
+ assert(tables.toSet == Set("moo_table", "mee_table"))
+ hc.sql("DROP TABLE moo_table")
+ hc.sql("DROP TABLE mee_table")
+ val tables2 = hc.sql("SHOW TABLES IN mee_db").collect().map(_.getString(0))
+ assert(tables2.isEmpty)
+ hc.sql("DROP DATABASE mee_db CASCADE")
+ val databases3 = hc.sql("SHOW DATABASES").collect().map(_.getString(0))
+ assert(databases3.toSeq == Seq("default"))
+ }
+
+}
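For reference, the @deprecated message in HiveContext.scala points users at SparkSession. A hedged sketch of the replacement path follows; note that in the released 2.0 API this surfaced as SparkSession.builder().enableHiveSupport() rather than the withHiveSupport helper named in this patch, and the application name here is hypothetical:

```scala
import org.apache.spark.sql.SparkSession

// Builder-based replacement for `new HiveContext(sc)` in the stable 2.0 API.
val spark = SparkSession.builder()
  .appName("migrated-app")      // hypothetical app name
  .enableHiveSupport()          // reads hive-site.xml, as HiveContext did
  .getOrCreate()

spark.sql("SHOW DATABASES").show()
```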