From fa3c06987e6148975dd54b629bd9094224358175 Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Mon, 25 Apr 2016 20:53:16 -0700 Subject: [SPARK-14904][SQL] Put removed HiveContext in compatibility module ## What changes were proposed in this pull request? This is for users who can't upgrade and need to continue to use HiveContext. ## How was this patch tested? Added some basic tests for sanity check. This is based on #12672 and closes #12672. Author: Andrew Or Author: Reynold Xin Closes #12682 from rxin/add-back-hive-context. --- sql/hivecontext-compatibility/pom.xml | 7 ++ .../org/apache/spark/sql/hive/HiveContext.scala | 61 ++++++++++++ .../sql/hive/HiveContextCompatibilitySuite.scala | 102 +++++++++++++++++++++ 3 files changed, 170 insertions(+) create mode 100644 sql/hivecontext-compatibility/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala create mode 100644 sql/hivecontext-compatibility/src/test/scala/org/apache/spark/sql/hive/HiveContextCompatibilitySuite.scala (limited to 'sql/hivecontext-compatibility') diff --git a/sql/hivecontext-compatibility/pom.xml b/sql/hivecontext-compatibility/pom.xml index 90c6bfdd8d..ed9ef8e279 100644 --- a/sql/hivecontext-compatibility/pom.xml +++ b/sql/hivecontext-compatibility/pom.xml @@ -41,6 +41,13 @@ spark-hive_${scala.binary.version} ${project.version} + + org.apache.spark + spark-core_${scala.binary.version} + ${project.version} + test-jar + test + diff --git a/sql/hivecontext-compatibility/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hivecontext-compatibility/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala new file mode 100644 index 0000000000..65fcba81c4 --- /dev/null +++ b/sql/hivecontext-compatibility/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.hive + +import org.apache.spark.SparkContext +import org.apache.spark.api.java.JavaSparkContext +import org.apache.spark.internal.Logging +import org.apache.spark.sql.{SparkSession, SQLContext} + + +/** + * An instance of the Spark SQL execution engine that integrates with data stored in Hive. + * Configuration for Hive is read from hive-site.xml on the classpath. + */ +@deprecated("Use SparkSession.withHiveSupport instead", "2.0.0") +class HiveContext private[hive]( + @transient private val sparkSession: SparkSession, + isRootContext: Boolean) + extends SQLContext(sparkSession, isRootContext) with Logging { + + self => + + def this(sc: SparkContext) = { + this(new SparkSession(HiveUtils.withHiveExternalCatalog(sc)), true) + } + + def this(sc: JavaSparkContext) = this(sc.sc) + + /** + * Returns a new HiveContext as new session, which will have separated SQLConf, UDF/UDAF, + * temporary tables and SessionState, but sharing the same CacheManager, IsolatedClientLoader + * and Hive client (both of execution and metadata) with existing HiveContext. + */ + override def newSession(): HiveContext = { + new HiveContext(sparkSession.newSession(), isRootContext = false) + } + + protected[sql] override def sessionState: HiveSessionState = { + sparkSession.sessionState.asInstanceOf[HiveSessionState] + } + + protected[sql] override def sharedState: HiveSharedState = { + sparkSession.sharedState.asInstanceOf[HiveSharedState] + } + +} diff --git a/sql/hivecontext-compatibility/src/test/scala/org/apache/spark/sql/hive/HiveContextCompatibilitySuite.scala b/sql/hivecontext-compatibility/src/test/scala/org/apache/spark/sql/hive/HiveContextCompatibilitySuite.scala new file mode 100644 index 0000000000..5df674d60e --- /dev/null +++ b/sql/hivecontext-compatibility/src/test/scala/org/apache/spark/sql/hive/HiveContextCompatibilitySuite.scala @@ -0,0 +1,102 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one or more +* contributor license agreements. See the NOTICE file distributed with +* this work for additional information regarding copyright ownership. +* The ASF licenses this file to You under the Apache License, Version 2.0 +* (the "License"); you may not use this file except in compliance with +* the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package org.apache.spark.sql.hive + +import org.scalatest.BeforeAndAfterEach + +import org.apache.spark.{SparkContext, SparkFunSuite} + + +class HiveContextCompatibilitySuite extends SparkFunSuite with BeforeAndAfterEach { + + private var sc: SparkContext = null + private var hc: HiveContext = null + + override def beforeAll(): Unit = { + super.beforeAll() + sc = new SparkContext("local[4]", "test") + HiveUtils.newTemporaryConfiguration(useInMemoryDerby = true).foreach { case (k, v) => + sc.hadoopConfiguration.set(k, v) + } + hc = new HiveContext(sc) + } + + override def afterEach(): Unit = { + try { + hc.sharedState.cacheManager.clearCache() + hc.sessionState.catalog.reset() + } finally { + super.afterEach() + } + } + + override def afterAll(): Unit = { + try { + sc.stop() + sc = null + hc = null + } finally { + super.afterAll() + } + } + + test("basic operations") { + val _hc = hc + import _hc.implicits._ + val df1 = (1 to 20).map { i => (i, i) }.toDF("a", "x") + val df2 = (1 to 100).map { i => (i, i % 10, i % 2 == 0) }.toDF("a", "b", "c") + .select($"a", $"b") + .filter($"a" > 10 && $"b" > 6 && $"c") + val df3 = df1.join(df2, "a") + val res = df3.collect() + val expected = Seq((18, 18, 8)).toDF("a", "x", "b").collect() + assert(res.toSeq == expected.toSeq) + df3.registerTempTable("mai_table") + val df4 = hc.table("mai_table") + val res2 = df4.collect() + assert(res2.toSeq == expected.toSeq) + } + + test("basic DDLs") { + val _hc = hc + import _hc.implicits._ + val databases = hc.sql("SHOW DATABASES").collect().map(_.getString(0)) + assert(databases.toSeq == Seq("default")) + hc.sql("CREATE DATABASE mee_db") + hc.sql("USE mee_db") + val databases2 = hc.sql("SHOW DATABASES").collect().map(_.getString(0)) + assert(databases2.toSet == Set("default", "mee_db")) + val df = (1 to 10).map { i => ("bob" + i.toString, i) }.toDF("name", "age") + df.registerTempTable("mee_table") + hc.sql("CREATE TABLE moo_table (name string, age int)") + hc.sql("INSERT INTO moo_table SELECT * FROM mee_table") + assert( + hc.sql("SELECT * FROM moo_table order by name").collect().toSeq == + df.collect().toSeq.sortBy(_.getString(0))) + val tables = hc.sql("SHOW TABLES IN mee_db").collect().map(_.getString(0)) + assert(tables.toSet == Set("moo_table", "mee_table")) + hc.sql("DROP TABLE moo_table") + hc.sql("DROP TABLE mee_table") + val tables2 = hc.sql("SHOW TABLES IN mee_db").collect().map(_.getString(0)) + assert(tables2.isEmpty) + hc.sql("DROP DATABASE mee_db CASCADE") + val databases3 = hc.sql("SHOW DATABASES").collect().map(_.getString(0)) + assert(databases3.toSeq == Seq("default")) + } + +} -- cgit v1.2.3