diff options
author | Reynold Xin <rxin@databricks.com> | 2016-05-20 22:01:55 -0700 |
---|---|---|
committer | Reynold Xin <rxin@databricks.com> | 2016-05-20 22:01:55 -0700 |
commit | 45b7557e61d440612d4ce49c31b5ef242fdefa54 (patch) | |
tree | dcaff3ba6c7231e343b7f146a422934bea628bf3 /sql/hive/src | |
parent | 021c19702c720b4466b016498917d47f99000e13 (diff) | |
download | spark-45b7557e61d440612d4ce49c31b5ef242fdefa54.tar.gz spark-45b7557e61d440612d4ce49c31b5ef242fdefa54.tar.bz2 spark-45b7557e61d440612d4ce49c31b5ef242fdefa54.zip |
[SPARK-15424][SPARK-15437][SPARK-14807][SQL] Revert Create a hivecontext-compatibility module
## What changes were proposed in this pull request?
I initially asked to create a hivecontext-compatibility module to put the HiveContext there. But we are so close to Spark 2.0 release and there is only a single class in it. It seems overkill to have an entire package, which makes it more inconvenient, for a single class.
## How was this patch tested?
Tests were moved.
Author: Reynold Xin <rxin@databricks.com>
Closes #13207 from rxin/SPARK-15424.
Diffstat (limited to 'sql/hive/src')
-rw-r--r-- | sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala | 73 | ||||
-rw-r--r-- | sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveContextCompatibilitySuite.scala | 101 |
2 files changed, 174 insertions, 0 deletions
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala new file mode 100644 index 0000000000..415d4c0049 --- /dev/null +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.hive + +import org.apache.spark.SparkContext +import org.apache.spark.api.java.JavaSparkContext +import org.apache.spark.internal.Logging +import org.apache.spark.sql.{SparkSession, SQLContext} + + +/** + * An instance of the Spark SQL execution engine that integrates with data stored in Hive. + * Configuration for Hive is read from hive-site.xml on the classpath. + */ +@deprecated("Use SparkSession.builder.enableHiveSupport instead", "2.0.0") +class HiveContext private[hive]( + _sparkSession: SparkSession, + isRootContext: Boolean) + extends SQLContext(_sparkSession, isRootContext) with Logging { + + self => + + def this(sc: SparkContext) = { + this(new SparkSession(HiveUtils.withHiveExternalCatalog(sc)), true) + } + + def this(sc: JavaSparkContext) = this(sc.sc) + + /** + * Returns a new HiveContext as new session, which will have separated SQLConf, UDF/UDAF, + * temporary tables and SessionState, but sharing the same CacheManager, IsolatedClientLoader + * and Hive client (both of execution and metadata) with existing HiveContext. + */ + override def newSession(): HiveContext = { + new HiveContext(sparkSession.newSession(), isRootContext = false) + } + + protected[sql] override def sessionState: HiveSessionState = { + sparkSession.sessionState.asInstanceOf[HiveSessionState] + } + + protected[sql] override def sharedState: HiveSharedState = { + sparkSession.sharedState.asInstanceOf[HiveSharedState] + } + + /** + * Invalidate and refresh all the cached the metadata of the given table. For performance reasons, + * Spark SQL or the external data source library it uses might cache certain metadata about a + * table, such as the location of blocks. When those change outside of Spark SQL, users should + * call this function to invalidate the cache. + * + * @since 1.3.0 + */ + def refreshTable(tableName: String): Unit = { + sparkSession.catalog.refreshTable(tableName) + } + +} diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveContextCompatibilitySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveContextCompatibilitySuite.scala new file mode 100644 index 0000000000..3aa8174702 --- /dev/null +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveContextCompatibilitySuite.scala @@ -0,0 +1,101 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one or more +* contributor license agreements. See the NOTICE file distributed with +* this work for additional information regarding copyright ownership. +* The ASF licenses this file to You under the Apache License, Version 2.0 +* (the "License"); you may not use this file except in compliance with +* the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package org.apache.spark.sql.hive + +import org.scalatest.BeforeAndAfterEach + +import org.apache.spark.{SparkConf, SparkContext, SparkFunSuite} + + +class HiveContextCompatibilitySuite extends SparkFunSuite with BeforeAndAfterEach { + + private var sc: SparkContext = null + private var hc: HiveContext = null + + override def beforeAll(): Unit = { + super.beforeAll() + sc = SparkContext.getOrCreate(new SparkConf().setMaster("local").setAppName("test")) + HiveUtils.newTemporaryConfiguration(useInMemoryDerby = true).foreach { case (k, v) => + sc.hadoopConfiguration.set(k, v) + } + hc = new HiveContext(sc) + } + + override def afterEach(): Unit = { + try { + hc.sharedState.cacheManager.clearCache() + hc.sessionState.catalog.reset() + } finally { + super.afterEach() + } + } + + override def afterAll(): Unit = { + try { + sc = null + hc = null + } finally { + super.afterAll() + } + } + + test("basic operations") { + val _hc = hc + import _hc.implicits._ + val df1 = (1 to 20).map { i => (i, i) }.toDF("a", "x") + val df2 = (1 to 100).map { i => (i, i % 10, i % 2 == 0) }.toDF("a", "b", "c") + .select($"a", $"b") + .filter($"a" > 10 && $"b" > 6 && $"c") + val df3 = df1.join(df2, "a") + val res = df3.collect() + val expected = Seq((18, 18, 8)).toDF("a", "x", "b").collect() + assert(res.toSeq == expected.toSeq) + df3.createOrReplaceTempView("mai_table") + val df4 = hc.table("mai_table") + val res2 = df4.collect() + assert(res2.toSeq == expected.toSeq) + } + + test("basic DDLs") { + val _hc = hc + import _hc.implicits._ + val databases = hc.sql("SHOW DATABASES").collect().map(_.getString(0)) + assert(databases.toSeq == Seq("default")) + hc.sql("CREATE DATABASE mee_db") + hc.sql("USE mee_db") + val databases2 = hc.sql("SHOW DATABASES").collect().map(_.getString(0)) + assert(databases2.toSet == Set("default", "mee_db")) + val df = (1 to 10).map { i => ("bob" + i.toString, i) }.toDF("name", "age") + df.createOrReplaceTempView("mee_table") + hc.sql("CREATE TABLE moo_table (name string, age int)") + hc.sql("INSERT INTO moo_table SELECT * FROM mee_table") + assert( + hc.sql("SELECT * FROM moo_table order by name").collect().toSeq == + df.collect().toSeq.sortBy(_.getString(0))) + val tables = hc.sql("SHOW TABLES IN mee_db").collect().map(_.getString(0)) + assert(tables.toSet == Set("moo_table", "mee_table")) + hc.sql("DROP TABLE moo_table") + hc.sql("DROP TABLE mee_table") + val tables2 = hc.sql("SHOW TABLES IN mee_db").collect().map(_.getString(0)) + assert(tables2.isEmpty) + hc.sql("DROP DATABASE mee_db CASCADE") + val databases3 = hc.sql("SHOW DATABASES").collect().map(_.getString(0)) + assert(databases3.toSeq == Seq("default")) + } + +} |