From d5911d1173fe0872f21cae6c47abf8ff479345a4 Mon Sep 17 00:00:00 2001 From: gatorsmile Date: Thu, 26 May 2016 22:40:57 -0700 Subject: [SPARK-15529][SQL] Replace SQLContext and HiveContext with SparkSession in Test #### What changes were proposed in this pull request? This PR is to use the new entrance `Sparksession` to replace the existing `SQLContext` and `HiveContext` in SQL test suites. No change is made in the following suites: - `ListTablesSuite` is to test the APIs of `SQLContext`. - `SQLContextSuite` is to test `SQLContext` - `HiveContextCompatibilitySuite` is to test `HiveContext` **Update**: Move tests in `ListTableSuite` to `SQLContextSuite` #### How was this patch tested? N/A Author: gatorsmile Author: xiaoli Author: Xiao Li Closes #13337 from gatorsmile/sparkSessionTest. --- .../org/apache/spark/sql/DataFrameReader.scala | 2 +- .../org/apache/spark/sql/DataFrameWriter.scala | 2 +- .../org/apache/spark/sql/DataFrameSuite.scala | 2 +- .../scala/org/apache/spark/sql/DatasetSuite.scala | 6 +- .../org/apache/spark/sql/ListTablesSuite.scala | 90 ---------------------- .../org/apache/spark/sql/SQLContextSuite.scala | 61 +++++++++++++++ .../scala/org/apache/spark/sql/SQLQuerySuite.scala | 8 +- .../sql/execution/ExchangeCoordinatorSuite.scala | 22 +++--- .../apache/spark/sql/execution/PlannerSuite.scala | 2 +- .../org/apache/spark/sql/execution/SortSuite.scala | 2 +- .../sql/execution/TakeOrderedAndProjectSuite.scala | 2 +- .../spark/sql/execution/command/DDLSuite.scala | 18 ++--- .../execution/datasources/FileCatalogSuite.scala | 6 +- .../sql/execution/datasources/csv/CSVSuite.scala | 2 +- .../parquet/ParquetPartitionDiscoverySuite.scala | 10 +-- .../datasources/parquet/ParquetTest.scala | 2 +- .../sql/execution/joins/BroadcastJoinSuite.scala | 2 +- .../sql/execution/joins/ExistenceJoinSuite.scala | 6 +- .../apache/spark/sql/internal/SQLConfSuite.scala | 22 +++--- .../sql/sources/CreateTableAsSelectSuite.scala | 12 +-- .../spark/sql/sources/DDLSourceLoadSuite.scala | 10 +-- .../apache/spark/sql/sources/DDLTestSuite.scala | 2 +- .../apache/spark/sql/sources/DataSourceTest.scala | 9 +-- .../spark/sql/sources/FilteredScanSuite.scala | 8 +- .../org/apache/spark/sql/sources/InsertSuite.scala | 22 +++--- .../apache/spark/sql/sources/PrunedScanSuite.scala | 6 +- .../apache/spark/sql/sources/SaveLoadSuite.scala | 31 ++++---- .../apache/spark/sql/sources/TableScanSuite.scala | 8 +- .../spark/sql/streaming/FileStreamSinkSuite.scala | 2 +- .../apache/spark/sql/hive/CachedTableSuite.scala | 6 +- .../apache/spark/sql/hive/ErrorPositionSuite.scala | 8 +- .../sql/hive/HiveDataFrameAnalyticsSuite.scala | 8 +- .../spark/sql/hive/HiveDataFrameJoinSuite.scala | 2 +- .../apache/spark/sql/hive/HiveDataFrameSuite.scala | 10 +-- .../spark/sql/hive/HiveMetastoreCatalogSuite.scala | 4 +- .../apache/spark/sql/hive/HiveParquetSuite.scala | 4 +- .../spark/sql/hive/InsertIntoHiveTableSuite.scala | 20 ++--- .../spark/sql/hive/MetastoreDataSourcesSuite.scala | 2 +- .../apache/spark/sql/hive/MultiDatabaseSuite.scala | 53 +++++++------ .../spark/sql/hive/QueryPartitionSuite.scala | 2 +- .../apache/spark/sql/hive/StatisticsSuite.scala | 36 ++++----- .../scala/org/apache/spark/sql/hive/UDFSuite.scala | 14 ++-- .../spark/sql/hive/execution/HiveDDLSuite.scala | 24 +++--- .../sql/hive/execution/HiveExplainSuite.scala | 2 +- .../execution/HiveOperatorQueryableSuite.scala | 2 +- .../spark/sql/hive/execution/HivePlanTest.scala | 4 +- .../spark/sql/hive/execution/HiveUDFSuite.scala | 22 +++--- .../spark/sql/hive/execution/SQLQuerySuite.scala | 8 +- .../spark/sql/hive/execution/SQLViewSuite.scala | 2 +- .../hive/execution/SQLWindowFunctionSuite.scala | 2 +- .../hive/execution/ScriptTransformationSuite.scala | 2 +- .../sql/hive/orc/OrcHadoopFsRelationSuite.scala | 2 +- .../sql/hive/orc/OrcPartitionDiscoverySuite.scala | 6 +- .../apache/spark/sql/hive/orc/OrcSourceSuite.scala | 6 +- .../org/apache/spark/sql/hive/orc/OrcTest.scala | 2 +- .../org/apache/spark/sql/hive/parquetSuites.scala | 4 +- .../spark/sql/sources/BucketedReadSuite.scala | 17 ++-- .../spark/sql/sources/BucketedWriteSuite.scala | 2 +- .../sql/sources/JsonHadoopFsRelationSuite.scala | 10 +-- .../sql/sources/ParquetHadoopFsRelationSuite.scala | 8 +- .../sources/SimpleTextHadoopFsRelationSuite.scala | 2 +- 61 files changed, 319 insertions(+), 354 deletions(-) delete mode 100644 sql/core/src/test/scala/org/apache/spark/sql/ListTablesSuite.scala (limited to 'sql') diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala index 0fed9171a8..2057878028 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala @@ -426,7 +426,7 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging { * * @param path input path * @since 1.5.0 - * @note Currently, this method can only be used together with `HiveContext`. + * @note Currently, this method can only be used after enabling Hive support. */ def orc(path: String): DataFrame = format("orc").load(path) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala index c7b887ecd4..f2ba2dfc08 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala @@ -639,7 +639,7 @@ final class DataFrameWriter private[sql](df: DataFrame) { * This will overwrite `orc.compress`. * * @since 1.5.0 - * @note Currently, this method can only be used together with `HiveContext`. + * @note Currently, this method can only be used after enabling Hive support */ def orc(path: String): Unit = { assertNotStreaming("orc() can only be called on non-continuous queries") diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala index 0614747352..e2dc4d8639 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala @@ -515,7 +515,7 @@ class DataFrameSuite extends QueryTest with SharedSQLContext { ) } - test("callUDF in SQLContext") { + test("callUDF without Hive Support") { val df = Seq(("id1", 1), ("id2", 4), ("id3", 5)).toDF("id", "value") df.sparkSession.udf.register("simpleUDF", (v: Int) => v * v) checkAnswer( diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala index 32320a6435..2a6591653e 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala @@ -673,7 +673,7 @@ class DatasetSuite extends QueryTest with SharedSQLContext { test("runtime null check for RowEncoder") { val schema = new StructType().add("i", IntegerType, nullable = false) - val df = sqlContext.range(10).map(l => { + val df = spark.range(10).map(l => { if (l % 5 == 0) { Row(null) } else { @@ -689,9 +689,9 @@ class DatasetSuite extends QueryTest with SharedSQLContext { test("row nullability mismatch") { val schema = new StructType().add("a", StringType, true).add("b", StringType, false) - val rdd = sqlContext.sparkContext.parallelize(Row(null, "123") :: Row("234", null) :: Nil) + val rdd = spark.sparkContext.parallelize(Row(null, "123") :: Row("234", null) :: Nil) val message = intercept[Exception] { - sqlContext.createDataFrame(rdd, schema).collect() + spark.createDataFrame(rdd, schema).collect() }.getMessage assert(message.contains("The 1th field 'b' of input row cannot be null")) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/ListTablesSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/ListTablesSuite.scala deleted file mode 100644 index b447006761..0000000000 --- a/sql/core/src/test/scala/org/apache/spark/sql/ListTablesSuite.scala +++ /dev/null @@ -1,90 +0,0 @@ -/* -* Licensed to the Apache Software Foundation (ASF) under one or more -* contributor license agreements. See the NOTICE file distributed with -* this work for additional information regarding copyright ownership. -* The ASF licenses this file to You under the Apache License, Version 2.0 -* (the "License"); you may not use this file except in compliance with -* the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. -*/ - -package org.apache.spark.sql - -import org.scalatest.BeforeAndAfter - -import org.apache.spark.sql.catalyst.TableIdentifier -import org.apache.spark.sql.test.SharedSQLContext -import org.apache.spark.sql.types.{BooleanType, StringType, StructField, StructType} - -class ListTablesSuite extends QueryTest with BeforeAndAfter with SharedSQLContext { - import testImplicits._ - - private lazy val df = (1 to 10).map(i => (i, s"str$i")).toDF("key", "value") - - before { - df.createOrReplaceTempView("listtablessuitetable") - } - - after { - spark.sessionState.catalog.dropTable( - TableIdentifier("listtablessuitetable"), ignoreIfNotExists = true) - } - - test("get all tables") { - checkAnswer( - spark.sqlContext.tables().filter("tableName = 'listtablessuitetable'"), - Row("listtablessuitetable", true)) - - checkAnswer( - sql("SHOW tables").filter("tableName = 'listtablessuitetable'"), - Row("listtablessuitetable", true)) - - spark.sessionState.catalog.dropTable( - TableIdentifier("listtablessuitetable"), ignoreIfNotExists = true) - assert(spark.sqlContext.tables().filter("tableName = 'listtablessuitetable'").count() === 0) - } - - test("getting all tables with a database name has no impact on returned table names") { - checkAnswer( - spark.sqlContext.tables("default").filter("tableName = 'listtablessuitetable'"), - Row("listtablessuitetable", true)) - - checkAnswer( - sql("show TABLES in default").filter("tableName = 'listtablessuitetable'"), - Row("listtablessuitetable", true)) - - spark.sessionState.catalog.dropTable( - TableIdentifier("listtablessuitetable"), ignoreIfNotExists = true) - assert(spark.sqlContext.tables().filter("tableName = 'listtablessuitetable'").count() === 0) - } - - test("query the returned DataFrame of tables") { - val expectedSchema = StructType( - StructField("tableName", StringType, false) :: - StructField("isTemporary", BooleanType, false) :: Nil) - - Seq(spark.sqlContext.tables(), sql("SHOW TABLes")).foreach { - case tableDF => - assert(expectedSchema === tableDF.schema) - - tableDF.createOrReplaceTempView("tables") - checkAnswer( - sql( - "SELECT isTemporary, tableName from tables WHERE tableName = 'listtablessuitetable'"), - Row(true, "listtablessuitetable") - ) - checkAnswer( - spark.sqlContext.tables() - .filter("tableName = 'tables'").select("tableName", "isTemporary"), - Row("tables", true)) - spark.catalog.dropTempView("tables") - } - } -} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLContextSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLContextSuite.scala index c9594a7e9a..417d09e238 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SQLContextSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLContextSuite.scala @@ -20,7 +20,9 @@ package org.apache.spark.sql import org.apache.spark.{SharedSparkContext, SparkFunSuite} import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.types.{BooleanType, StringType, StructField, StructType} class SQLContextSuite extends SparkFunSuite with SharedSparkContext { @@ -79,4 +81,63 @@ class SQLContextSuite extends SparkFunSuite with SharedSparkContext { assert(sqlContext.sessionState.optimizer.batches.flatMap(_.rules).contains(DummyRule)) } + test("get all tables") { + val sqlContext = SQLContext.getOrCreate(sc) + val df = sqlContext.range(10) + df.createOrReplaceTempView("listtablessuitetable") + assert( + sqlContext.tables().filter("tableName = 'listtablessuitetable'").collect().toSeq == + Row("listtablessuitetable", true) :: Nil) + + assert( + sqlContext.sql("SHOW tables").filter("tableName = 'listtablessuitetable'").collect().toSeq == + Row("listtablessuitetable", true) :: Nil) + + sqlContext.sessionState.catalog.dropTable( + TableIdentifier("listtablessuitetable"), ignoreIfNotExists = true) + assert(sqlContext.tables().filter("tableName = 'listtablessuitetable'").count() === 0) + } + + test("getting all tables with a database name has no impact on returned table names") { + val sqlContext = SQLContext.getOrCreate(sc) + val df = sqlContext.range(10) + df.createOrReplaceTempView("listtablessuitetable") + assert( + sqlContext.tables("default").filter("tableName = 'listtablessuitetable'").collect().toSeq == + Row("listtablessuitetable", true) :: Nil) + + assert( + sqlContext.sql("show TABLES in default").filter("tableName = 'listtablessuitetable'") + .collect().toSeq == Row("listtablessuitetable", true) :: Nil) + + sqlContext.sessionState.catalog.dropTable( + TableIdentifier("listtablessuitetable"), ignoreIfNotExists = true) + assert(sqlContext.tables().filter("tableName = 'listtablessuitetable'").count() === 0) + } + + test("query the returned DataFrame of tables") { + val sqlContext = SQLContext.getOrCreate(sc) + val df = sqlContext.range(10) + df.createOrReplaceTempView("listtablessuitetable") + + val expectedSchema = StructType( + StructField("tableName", StringType, false) :: + StructField("isTemporary", BooleanType, false) :: Nil) + + Seq(sqlContext.tables(), sqlContext.sql("SHOW TABLes")).foreach { + case tableDF => + assert(expectedSchema === tableDF.schema) + + tableDF.createOrReplaceTempView("tables") + assert( + sqlContext.sql( + "SELECT isTemporary, tableName from tables WHERE tableName = 'listtablessuitetable'") + .collect().toSeq == Row(true, "listtablessuitetable") :: Nil) + assert( + sqlContext.tables().filter("tableName = 'tables'").select("tableName", "isTemporary") + .collect().toSeq == Row("tables", true) :: Nil) + sqlContext.dropTempTable("tables") + } + } + } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala index b1f848fdc8..1ddb586d60 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala @@ -1048,7 +1048,7 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext { } test("SET commands semantics using sql()") { - spark.sqlContext.conf.clear() + spark.sessionState.conf.clear() val testKey = "test.key.0" val testVal = "test.val.0" val nonexistentKey = "nonexistent" @@ -1089,17 +1089,17 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext { sql(s"SET $nonexistentKey"), Row(nonexistentKey, "") ) - spark.sqlContext.conf.clear() + spark.sessionState.conf.clear() } test("SET commands with illegal or inappropriate argument") { - spark.sqlContext.conf.clear() + spark.sessionState.conf.clear() // Set negative mapred.reduce.tasks for automatically determining // the number of reducers is not supported intercept[IllegalArgumentException](sql(s"SET mapred.reduce.tasks=-1")) intercept[IllegalArgumentException](sql(s"SET mapred.reduce.tasks=-01")) intercept[IllegalArgumentException](sql(s"SET mapred.reduce.tasks=-2")) - spark.sqlContext.conf.clear() + spark.sessionState.conf.clear() } test("apply schema") { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ExchangeCoordinatorSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ExchangeCoordinatorSuite.scala index 2f45db3925..2803b62462 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/ExchangeCoordinatorSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ExchangeCoordinatorSuite.scala @@ -27,12 +27,12 @@ import org.apache.spark.sql.internal.SQLConf class ExchangeCoordinatorSuite extends SparkFunSuite with BeforeAndAfterAll { - private var originalActiveSQLContext: Option[SparkSession] = _ - private var originalInstantiatedSQLContext: Option[SparkSession] = _ + private var originalActiveSparkSession: Option[SparkSession] = _ + private var originalInstantiatedSparkSession: Option[SparkSession] = _ override protected def beforeAll(): Unit = { - originalActiveSQLContext = SparkSession.getActiveSession - originalInstantiatedSQLContext = SparkSession.getDefaultSession + originalActiveSparkSession = SparkSession.getActiveSession + originalInstantiatedSparkSession = SparkSession.getDefaultSession SparkSession.clearActiveSession() SparkSession.clearDefaultSession() @@ -40,8 +40,8 @@ class ExchangeCoordinatorSuite extends SparkFunSuite with BeforeAndAfterAll { override protected def afterAll(): Unit = { // Set these states back. - originalActiveSQLContext.foreach(ctx => SparkSession.setActiveSession(ctx)) - originalInstantiatedSQLContext.foreach(ctx => SparkSession.setDefaultSession(ctx)) + originalActiveSparkSession.foreach(ctx => SparkSession.setActiveSession(ctx)) + originalInstantiatedSparkSession.foreach(ctx => SparkSession.setDefaultSession(ctx)) } private def checkEstimation( @@ -249,7 +249,7 @@ class ExchangeCoordinatorSuite extends SparkFunSuite with BeforeAndAfterAll { } } - def withSQLContext( + def withSparkSession( f: SparkSession => Unit, targetNumPostShufflePartitions: Int, minNumPostShufflePartitions: Option[Int]): Unit = { @@ -322,7 +322,7 @@ class ExchangeCoordinatorSuite extends SparkFunSuite with BeforeAndAfterAll { } } - withSQLContext(test, 2000, minNumPostShufflePartitions) + withSparkSession(test, 2000, minNumPostShufflePartitions) } test(s"determining the number of reducers: join operator$testNameNote") { @@ -373,7 +373,7 @@ class ExchangeCoordinatorSuite extends SparkFunSuite with BeforeAndAfterAll { } } - withSQLContext(test, 16384, minNumPostShufflePartitions) + withSparkSession(test, 16384, minNumPostShufflePartitions) } test(s"determining the number of reducers: complex query 1$testNameNote") { @@ -425,7 +425,7 @@ class ExchangeCoordinatorSuite extends SparkFunSuite with BeforeAndAfterAll { } } - withSQLContext(test, 6644, minNumPostShufflePartitions) + withSparkSession(test, 6644, minNumPostShufflePartitions) } test(s"determining the number of reducers: complex query 2$testNameNote") { @@ -477,7 +477,7 @@ class ExchangeCoordinatorSuite extends SparkFunSuite with BeforeAndAfterAll { } } - withSQLContext(test, 6144, minNumPostShufflePartitions) + withSparkSession(test, 6144, minNumPostShufflePartitions) } } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala index 8243470b19..c96239e682 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala @@ -155,7 +155,7 @@ class PlannerSuite extends SharedSQLContext { val path = file.getCanonicalPath testData.write.parquet(path) val df = spark.read.parquet(path) - spark.sqlContext.registerDataFrameAsTable(df, "testPushed") + df.createOrReplaceTempView("testPushed") withTempTable("testPushed") { val exp = sql("select * from testPushed where key = 15").queryExecution.sparkPlan diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SortSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SortSuite.scala index ebeb39b690..c3acf29c2d 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SortSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/SortSuite.scala @@ -95,7 +95,7 @@ class SortSuite extends SparkPlanTest with SharedSQLContext { ) { test(s"sorting on $dataType with nullable=$nullable, sortOrder=$sortOrder") { val inputData = Seq.fill(1000)(randomDataGenerator()) - val inputDf = sqlContext.createDataFrame( + val inputDf = spark.createDataFrame( sparkContext.parallelize(Random.shuffle(inputData).map(v => Row(v))), StructType(StructField("a", dataType, nullable = true) :: Nil) ) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/TakeOrderedAndProjectSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/TakeOrderedAndProjectSuite.scala index fba04d0cb2..3217e34bd8 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/TakeOrderedAndProjectSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/TakeOrderedAndProjectSuite.scala @@ -42,7 +42,7 @@ class TakeOrderedAndProjectSuite extends SparkPlanTest with SharedSQLContext { .add("a", IntegerType, nullable = false) .add("b", IntegerType, nullable = false) val inputData = Seq.fill(10000)(Row(rand.nextInt(), rand.nextInt())) - sqlContext.createDataFrame(sparkContext.parallelize(Random.shuffle(inputData), 10), schema) + spark.createDataFrame(sparkContext.parallelize(Random.shuffle(inputData), 10), schema) } /** diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala index ff56749387..e32521aaaf 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala @@ -132,7 +132,7 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach { sql(s"CREATE DATABASE db2") val pathInCatalog = new Path(catalog.getDatabaseMetadata("db2").locationUri).toUri assert("file" === pathInCatalog.getScheme) - val expectedPath = appendTrailingSlash(sqlContext.conf.warehousePath) + "db2.db" + val expectedPath = appendTrailingSlash(spark.sessionState.conf.warehousePath) + "db2.db" assert(expectedPath === pathInCatalog.getPath) } @@ -145,7 +145,7 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach { withTempDir { tmpDir => val path = tmpDir.toString withSQLConf(SQLConf.WAREHOUSE_PATH.key -> path) { - val catalog = sqlContext.sessionState.catalog + val catalog = spark.sessionState.catalog val databaseNames = Seq("db1", "`database`") databaseNames.foreach { dbName => @@ -172,7 +172,7 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach { } test("Create/Drop Database - location") { - val catalog = sqlContext.sessionState.catalog + val catalog = spark.sessionState.catalog val databaseNames = Seq("db1", "`database`") withTempDir { tmpDir => val path = tmpDir.toString @@ -200,7 +200,7 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach { withTempDir { tmpDir => val path = tmpDir.toString withSQLConf(SQLConf.WAREHOUSE_PATH.key -> path) { - val catalog = sqlContext.sessionState.catalog + val catalog = spark.sessionState.catalog val databaseNames = Seq("db1", "`database`") databaseNames.foreach { dbName => @@ -231,7 +231,7 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach { withTempDir { tmpDir => val path = tmpDir.toString withSQLConf(SQLConf.WAREHOUSE_PATH.key -> path) { - val catalog = sqlContext.sessionState.catalog + val catalog = spark.sessionState.catalog val databaseNames = Seq("db1", "`database`") databaseNames.foreach { dbName => @@ -300,7 +300,7 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach { } test("drop non-empty database in restrict mode") { - val catalog = sqlContext.sessionState.catalog + val catalog = spark.sessionState.catalog val dbName = "db1" sql(s"CREATE DATABASE $dbName") @@ -322,7 +322,7 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach { } test("drop non-empty database in cascade mode") { - val catalog = sqlContext.sessionState.catalog + val catalog = spark.sessionState.catalog val dbName = "db1" sql(s"CREATE DATABASE $dbName") @@ -441,7 +441,7 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach { "RENAME TEMPORARY TABLE from '`tab1`' to '`default`.`tab2`': " + "cannot specify database name 'default' in the destination table")) - val catalog = sqlContext.sessionState.catalog + val catalog = spark.sessionState.catalog assert(catalog.listTables("default") == Seq(TableIdentifier("tab1"))) } } @@ -476,7 +476,7 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach { assert(e.getMessage.contains( "RENAME TEMPORARY TABLE from '`tab1`' to '`tab2`': destination table already exists")) - val catalog = sqlContext.sessionState.catalog + val catalog = spark.sessionState.catalog assert(catalog.listTables("default") == Seq(TableIdentifier("tab1"), TableIdentifier("tab2"))) } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileCatalogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileCatalogSuite.scala index dab5c76200..85c2e8ba55 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileCatalogSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileCatalogSuite.scala @@ -32,7 +32,7 @@ class FileCatalogSuite extends SharedSQLContext { stringToFile(file, "text") val path = new Path(file.getCanonicalPath) - val catalog = new ListingFileCatalog(sqlContext.sparkSession, Seq(path), Map.empty, None) { + val catalog = new ListingFileCatalog(spark, Seq(path), Map.empty, None) { def leafFilePaths: Seq[Path] = leafFiles.keys.toSeq def leafDirPaths: Seq[Path] = leafDirToChildrenFiles.keys.toSeq } @@ -56,11 +56,11 @@ class FileCatalogSuite extends SharedSQLContext { require(qualifiedFilePath.toString.startsWith("file:")) val catalog1 = new ListingFileCatalog( - sqlContext.sparkSession, Seq(unqualifiedDirPath), Map.empty, None) + spark, Seq(unqualifiedDirPath), Map.empty, None) assert(catalog1.allFiles.map(_.getPath) === Seq(qualifiedFilePath)) val catalog2 = new ListingFileCatalog( - sqlContext.sparkSession, Seq(unqualifiedFilePath), Map.empty, None) + spark, Seq(unqualifiedFilePath), Map.empty, None) assert(catalog2.allFiles.map(_.getPath) === Seq(qualifiedFilePath)) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala index ad7c05c12e..bc95446387 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala @@ -137,7 +137,7 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils { } test("test inferring decimals") { - val result = sqlContext.read + val result = spark.read .format("csv") .option("comment", "~") .option("header", "true") diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala index 847ea6bd52..e19345529e 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala @@ -445,17 +445,17 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest with Sha (1 to 10).map(i => ParquetData(i, i.toString)), path) // when the input is the base path containing partitioning directories - val baseDf = sqlContext.read.parquet(base.getCanonicalPath) + val baseDf = spark.read.parquet(base.getCanonicalPath) assert(baseDf.schema.map(_.name) === Seq("intField", "stringField", "pi", "ps")) // when the input is a path to the leaf directory containing a parquet file - val partDf = sqlContext.read.parquet(path.getCanonicalPath) + val partDf = spark.read.parquet(path.getCanonicalPath) assert(partDf.schema.map(_.name) === Seq("intField", "stringField")) path.listFiles().foreach { f => if (f.getName.toLowerCase().endsWith(".parquet")) { // when the input is a path to a parquet file - val df = sqlContext.read.parquet(f.getCanonicalPath) + val df = spark.read.parquet(f.getCanonicalPath) assert(df.schema.map(_.name) === Seq("intField", "stringField")) } } @@ -464,7 +464,7 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest with Sha if (f.getName.toLowerCase().endsWith(".parquet")) { // when the input is a path to a parquet file but `basePath` is overridden to // the base path containing partitioning directories - val df = sqlContext + val df = spark .read.option("basePath", base.getCanonicalPath) .parquet(f.getCanonicalPath) assert(df.schema.map(_.name) === Seq("intField", "stringField", "pi", "ps")) @@ -780,7 +780,7 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest with Sha .parquet(dir.getCanonicalPath) def check(path: String, basePath: String, expectedDf: DataFrame): Unit = { - val testDf = sqlContext.read + val testDf = spark.read .option("basePath", basePath) .parquet(path) checkAnswer(testDf, expectedDf) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTest.scala index 1753b84ba6..1953d6fa5a 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTest.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTest.scala @@ -90,7 +90,7 @@ private[sql] trait ParquetTest extends SQLTestUtils { (data: Seq[T], tableName: String, testVectorized: Boolean = true) (f: => Unit): Unit = { withParquetDataFrame(data, testVectorized) { df => - spark.sqlContext.registerDataFrameAsTable(df, tableName) + df.createOrReplaceTempView(tableName) withTempTable(tableName)(f) } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/BroadcastJoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/BroadcastJoinSuite.scala index be4bf5b447..db32b6b6be 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/BroadcastJoinSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/BroadcastJoinSuite.scala @@ -40,7 +40,7 @@ class BroadcastJoinSuite extends QueryTest with SQLTestUtils { protected var spark: SparkSession = null /** - * Create a new [[SQLContext]] running in local-cluster mode with unsafe and codegen enabled. + * Create a new [[SparkSession]] running in local-cluster mode with unsafe and codegen enabled. */ override def beforeAll(): Unit = { super.beforeAll() diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/ExistenceJoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/ExistenceJoinSuite.scala index 8093054b6d..38377164c1 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/ExistenceJoinSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/ExistenceJoinSuite.scala @@ -30,7 +30,7 @@ import org.apache.spark.sql.types.{BooleanType, DoubleType, IntegerType, StructT class ExistenceJoinSuite extends SparkPlanTest with SharedSQLContext { - private lazy val left = sqlContext.createDataFrame( + private lazy val left = spark.createDataFrame( sparkContext.parallelize(Seq( Row(1, 2.0), Row(1, 2.0), @@ -42,7 +42,7 @@ class ExistenceJoinSuite extends SparkPlanTest with SharedSQLContext { Row(6, null) )), new StructType().add("a", IntegerType).add("b", DoubleType)) - private lazy val right = sqlContext.createDataFrame( + private lazy val right = spark.createDataFrame( sparkContext.parallelize(Seq( Row(2, 3.0), Row(2, 3.0), @@ -53,7 +53,7 @@ class ExistenceJoinSuite extends SparkPlanTest with SharedSQLContext { Row(6, null) )), new StructType().add("c", IntegerType).add("d", DoubleType)) - private lazy val rightUniqueKey = sqlContext.createDataFrame( + private lazy val rightUniqueKey = spark.createDataFrame( sparkContext.parallelize(Seq( Row(2, 3.0), Row(3, 2.0), diff --git a/sql/core/src/test/scala/org/apache/spark/sql/internal/SQLConfSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/internal/SQLConfSuite.scala index f8227e3bd6..ad5365a35e 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/internal/SQLConfSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/internal/SQLConfSuite.scala @@ -35,7 +35,7 @@ class SQLConfSuite extends QueryTest with SharedSQLContext { // Set a conf first. spark.conf.set(testKey, testVal) // Clear the conf. - spark.sqlContext.conf.clear() + spark.sessionState.conf.clear() // After clear, only overrideConfs used by unit test should be in the SQLConf. assert(spark.conf.getAll === TestSQLContext.overrideConfs) @@ -50,11 +50,11 @@ class SQLConfSuite extends QueryTest with SharedSQLContext { assert(spark.conf.get(testKey, testVal + "_") === testVal) assert(spark.conf.getAll.contains(testKey)) - spark.sqlContext.conf.clear() + spark.sessionState.conf.clear() } test("parse SQL set commands") { - spark.sqlContext.conf.clear() + spark.sessionState.conf.clear() sql(s"set $testKey=$testVal") assert(spark.conf.get(testKey, testVal + "_") === testVal) assert(spark.conf.get(testKey, testVal + "_") === testVal) @@ -72,7 +72,7 @@ class SQLConfSuite extends QueryTest with SharedSQLContext { sql(s"set $key=") assert(spark.conf.get(key, "0") === "") - spark.sqlContext.conf.clear() + spark.sessionState.conf.clear() } test("set command for display") { @@ -97,7 +97,7 @@ class SQLConfSuite extends QueryTest with SharedSQLContext { } test("deprecated property") { - spark.sqlContext.conf.clear() + spark.sessionState.conf.clear() val original = spark.conf.get(SQLConf.SHUFFLE_PARTITIONS) try { sql(s"set ${SQLConf.Deprecated.MAPRED_REDUCE_TASKS}=10") @@ -108,7 +108,7 @@ class SQLConfSuite extends QueryTest with SharedSQLContext { } test("reset - public conf") { - spark.sqlContext.conf.clear() + spark.sessionState.conf.clear() val original = spark.conf.get(SQLConf.GROUP_BY_ORDINAL) try { assert(spark.conf.get(SQLConf.GROUP_BY_ORDINAL) === true) @@ -124,7 +124,7 @@ class SQLConfSuite extends QueryTest with SharedSQLContext { } test("reset - internal conf") { - spark.sqlContext.conf.clear() + spark.sessionState.conf.clear() val original = spark.conf.get(SQLConf.NATIVE_VIEW) try { assert(spark.conf.get(SQLConf.NATIVE_VIEW) === true) @@ -140,7 +140,7 @@ class SQLConfSuite extends QueryTest with SharedSQLContext { } test("reset - user-defined conf") { - spark.sqlContext.conf.clear() + spark.sessionState.conf.clear() val userDefinedConf = "x.y.z.reset" try { assert(spark.conf.getOption(userDefinedConf).isEmpty) @@ -155,7 +155,7 @@ class SQLConfSuite extends QueryTest with SharedSQLContext { } test("invalid conf value") { - spark.sqlContext.conf.clear() + spark.sessionState.conf.clear() val e = intercept[IllegalArgumentException] { sql(s"set ${SQLConf.CASE_SENSITIVE.key}=10") } @@ -163,7 +163,7 @@ class SQLConfSuite extends QueryTest with SharedSQLContext { } test("Test SHUFFLE_TARGET_POSTSHUFFLE_INPUT_SIZE's method") { - spark.sqlContext.conf.clear() + spark.sessionState.conf.clear() spark.conf.set(SQLConf.SHUFFLE_TARGET_POSTSHUFFLE_INPUT_SIZE.key, "100") assert(spark.conf.get(SQLConf.SHUFFLE_TARGET_POSTSHUFFLE_INPUT_SIZE) === 100) @@ -191,7 +191,7 @@ class SQLConfSuite extends QueryTest with SharedSQLContext { spark.conf.set(SQLConf.SHUFFLE_TARGET_POSTSHUFFLE_INPUT_SIZE.key, "-90000000000g") } - spark.sqlContext.conf.clear() + spark.sessionState.conf.clear() } test("SparkSession can access configs set in SparkConf") { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala index 03c18ad009..cbddb0643b 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala @@ -27,19 +27,19 @@ import org.apache.spark.sql.test.SharedSQLContext import org.apache.spark.util.Utils class CreateTableAsSelectSuite extends DataSourceTest with SharedSQLContext with BeforeAndAfter { - protected override lazy val sql = caseInsensitiveContext.sql _ + protected override lazy val sql = spark.sql _ private var path: File = null override def beforeAll(): Unit = { super.beforeAll() path = Utils.createTempDir() val rdd = sparkContext.parallelize((1 to 10).map(i => s"""{"a":$i, "b":"str${i}"}""")) - caseInsensitiveContext.read.json(rdd).createOrReplaceTempView("jt") + spark.read.json(rdd).createOrReplaceTempView("jt") } override def afterAll(): Unit = { try { - caseInsensitiveContext.dropTempTable("jt") + spark.catalog.dropTempView("jt") } finally { super.afterAll() } @@ -64,7 +64,7 @@ class CreateTableAsSelectSuite extends DataSourceTest with SharedSQLContext with sql("SELECT a, b FROM jsonTable"), sql("SELECT a, b FROM jt").collect()) - caseInsensitiveContext.dropTempTable("jsonTable") + spark.catalog.dropTempView("jsonTable") } test("CREATE TEMPORARY TABLE AS SELECT based on the file without write permission") { @@ -132,7 +132,7 @@ class CreateTableAsSelectSuite extends DataSourceTest with SharedSQLContext with sql("SELECT * FROM jsonTable"), sql("SELECT a * 4 FROM jt").collect()) - caseInsensitiveContext.dropTempTable("jsonTable") + spark.catalog.dropTempView("jsonTable") // Explicitly delete the data. if (path.exists()) Utils.deleteRecursively(path) @@ -150,7 +150,7 @@ class CreateTableAsSelectSuite extends DataSourceTest with SharedSQLContext with sql("SELECT * FROM jsonTable"), sql("SELECT b FROM jt").collect()) - caseInsensitiveContext.dropTempTable("jsonTable") + spark.catalog.dropTempView("jsonTable") } test("CREATE TEMPORARY TABLE AS SELECT with IF NOT EXISTS is not allowed") { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/DDLSourceLoadSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/DDLSourceLoadSuite.scala index 853707c036..f07c33042a 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/DDLSourceLoadSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/DDLSourceLoadSuite.scala @@ -27,23 +27,23 @@ class DDLSourceLoadSuite extends DataSourceTest with SharedSQLContext { test("data sources with the same name") { intercept[RuntimeException] { - caseInsensitiveContext.read.format("Fluet da Bomb").load() + spark.read.format("Fluet da Bomb").load() } } test("load data source from format alias") { - caseInsensitiveContext.read.format("gathering quorum").load().schema == + spark.read.format("gathering quorum").load().schema == StructType(Seq(StructField("stringType", StringType, nullable = false))) } test("specify full classname with duplicate formats") { - caseInsensitiveContext.read.format("org.apache.spark.sql.sources.FakeSourceOne") + spark.read.format("org.apache.spark.sql.sources.FakeSourceOne") .load().schema == StructType(Seq(StructField("stringType", StringType, nullable = false))) } - test("should fail to load ORC without HiveContext") { + test("should fail to load ORC without Hive Support") { intercept[ClassNotFoundException] { - caseInsensitiveContext.read.format("orc").load() + spark.read.format("orc").load() } } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/DDLTestSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/DDLTestSuite.scala index a34f70ed65..5a7a9073fb 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/DDLTestSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/DDLTestSuite.scala @@ -78,7 +78,7 @@ case class SimpleDDLScan( } class DDLTestSuite extends DataSourceTest with SharedSQLContext { - protected override lazy val sql = caseInsensitiveContext.sql _ + protected override lazy val sql = spark.sql _ override def beforeAll(): Unit = { super.beforeAll() diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/DataSourceTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/DataSourceTest.scala index 754aa32a30..206d03ea98 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/DataSourceTest.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/DataSourceTest.scala @@ -22,16 +22,9 @@ import org.apache.spark.sql.internal.SQLConf private[sql] abstract class DataSourceTest extends QueryTest { - // We want to test some edge cases. - protected lazy val caseInsensitiveContext: SQLContext = { - val ctx = new SQLContext(spark.sparkContext) - ctx.setConf(SQLConf.CASE_SENSITIVE, false) - ctx - } - protected def sqlTest(sqlString: String, expectedAnswer: Seq[Row]) { test(sqlString) { - checkAnswer(caseInsensitiveContext.sql(sqlString), expectedAnswer) + checkAnswer(spark.sql(sqlString), expectedAnswer) } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/FilteredScanSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/FilteredScanSuite.scala index f969660ddd..45e737f5ed 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/FilteredScanSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/FilteredScanSuite.scala @@ -133,7 +133,7 @@ object ColumnsRequired { } class FilteredScanSuite extends DataSourceTest with SharedSQLContext with PredicateHelper { - protected override lazy val sql = caseInsensitiveContext.sql _ + protected override lazy val sql = spark.sql _ override def beforeAll(): Unit = { super.beforeAll() @@ -310,7 +310,7 @@ class FilteredScanSuite extends DataSourceTest with SharedSQLContext with Predic test(s"PushDown Returns $expectedCount: $sqlString") { // These tests check a particular plan, disable whole stage codegen. - caseInsensitiveContext.conf.setConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED, false) + spark.conf.set(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key, false) try { val queryExecution = sql(sqlString).queryExecution val rawPlan = queryExecution.executedPlan.collect { @@ -322,7 +322,7 @@ class FilteredScanSuite extends DataSourceTest with SharedSQLContext with Predic val rawCount = rawPlan.execute().count() assert(ColumnsRequired.set === requiredColumnNames) - val table = caseInsensitiveContext.table("oneToTenFiltered") + val table = spark.table("oneToTenFiltered") val relation = table.queryExecution.logical.collectFirst { case LogicalRelation(r, _, _) => r }.get @@ -337,7 +337,7 @@ class FilteredScanSuite extends DataSourceTest with SharedSQLContext with Predic queryExecution) } } finally { - caseInsensitiveContext.conf.setConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED, + spark.conf.set(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key, SQLConf.WHOLESTAGE_CODEGEN_ENABLED.defaultValue.get) } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala index 854fec5b22..4780eb473d 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala @@ -24,14 +24,14 @@ import org.apache.spark.sql.test.SharedSQLContext import org.apache.spark.util.Utils class InsertSuite extends DataSourceTest with SharedSQLContext { - protected override lazy val sql = caseInsensitiveContext.sql _ + protected override lazy val sql = spark.sql _ private var path: File = null override def beforeAll(): Unit = { super.beforeAll() path = Utils.createTempDir() val rdd = sparkContext.parallelize((1 to 10).map(i => s"""{"a":$i, "b":"str$i"}""")) - caseInsensitiveContext.read.json(rdd).createOrReplaceTempView("jt") + spark.read.json(rdd).createOrReplaceTempView("jt") sql( s""" |CREATE TEMPORARY TABLE jsonTable (a int, b string) @@ -44,8 +44,8 @@ class InsertSuite extends DataSourceTest with SharedSQLContext { override def afterAll(): Unit = { try { - caseInsensitiveContext.dropTempTable("jsonTable") - caseInsensitiveContext.dropTempTable("jt") + spark.catalog.dropTempView("jsonTable") + spark.catalog.dropTempView("jt") Utils.deleteRecursively(path) } finally { super.afterAll() @@ -111,7 +111,7 @@ class InsertSuite extends DataSourceTest with SharedSQLContext { // Writing the table to less part files. val rdd1 = sparkContext.parallelize((1 to 10).map(i => s"""{"a":$i, "b":"str$i"}"""), 5) - caseInsensitiveContext.read.json(rdd1).createOrReplaceTempView("jt1") + spark.read.json(rdd1).createOrReplaceTempView("jt1") sql( s""" |INSERT OVERWRITE TABLE jsonTable SELECT a, b FROM jt1 @@ -123,7 +123,7 @@ class InsertSuite extends DataSourceTest with SharedSQLContext { // Writing the table to more part files. val rdd2 = sparkContext.parallelize((1 to 10).map(i => s"""{"a":$i, "b":"str$i"}"""), 10) - caseInsensitiveContext.read.json(rdd2).createOrReplaceTempView("jt2") + spark.read.json(rdd2).createOrReplaceTempView("jt2") sql( s""" |INSERT OVERWRITE TABLE jsonTable SELECT a, b FROM jt2 @@ -142,8 +142,8 @@ class InsertSuite extends DataSourceTest with SharedSQLContext { (1 to 10).map(i => Row(i * 10, s"str$i")) ) - caseInsensitiveContext.dropTempTable("jt1") - caseInsensitiveContext.dropTempTable("jt2") + spark.catalog.dropTempView("jt1") + spark.catalog.dropTempView("jt2") } test("INSERT INTO JSONRelation for now") { @@ -185,7 +185,7 @@ class InsertSuite extends DataSourceTest with SharedSQLContext { |INSERT OVERWRITE TABLE jsonTable SELECT a, b FROM jt """.stripMargin) // Cached Query Execution - caseInsensitiveContext.cacheTable("jsonTable") + spark.catalog.cacheTable("jsonTable") assertCached(sql("SELECT * FROM jsonTable")) checkAnswer( sql("SELECT * FROM jsonTable"), @@ -226,7 +226,7 @@ class InsertSuite extends DataSourceTest with SharedSQLContext { // sql("SELECT a * 2, b FROM jt").collect()) // // // Verify uncaching -// caseInsensitiveContext.uncacheTable("jsonTable") +// spark.catalog.uncacheTable("jsonTable") // assertCached(sql("SELECT * FROM jsonTable"), 0) } @@ -257,6 +257,6 @@ class InsertSuite extends DataSourceTest with SharedSQLContext { "It is not allowed to insert into a table that is not an InsertableRelation." ) - caseInsensitiveContext.dropTempTable("oneToTen") + spark.catalog.dropTempView("oneToTen") } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/PrunedScanSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/PrunedScanSuite.scala index 9cdf7dea76..207f89d3ea 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/PrunedScanSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/PrunedScanSuite.scala @@ -56,7 +56,7 @@ case class SimplePrunedScan(from: Int, to: Int)(@transient val sparkSession: Spa } class PrunedScanSuite extends DataSourceTest with SharedSQLContext { - protected override lazy val sql = caseInsensitiveContext.sql _ + protected override lazy val sql = spark.sql _ override def beforeAll(): Unit = { super.beforeAll() @@ -122,7 +122,7 @@ class PrunedScanSuite extends DataSourceTest with SharedSQLContext { test(s"Columns output ${expectedColumns.mkString(",")}: $sqlString") { // These tests check a particular plan, disable whole stage codegen. - caseInsensitiveContext.conf.setConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED, false) + spark.conf.set(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key, false) try { val queryExecution = sql(sqlString).queryExecution val rawPlan = queryExecution.executedPlan.collect { @@ -145,7 +145,7 @@ class PrunedScanSuite extends DataSourceTest with SharedSQLContext { fail(s"Wrong output row. Got $rawOutput\n$queryExecution") } } finally { - caseInsensitiveContext.conf.setConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED, + spark.conf.set(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key, SQLConf.WHOLESTAGE_CODEGEN_ENABLED.defaultValue.get) } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/SaveLoadSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/SaveLoadSuite.scala index 7738e4107d..b1756c27fa 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/SaveLoadSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/SaveLoadSuite.scala @@ -28,26 +28,26 @@ import org.apache.spark.sql.types._ import org.apache.spark.util.Utils class SaveLoadSuite extends DataSourceTest with SharedSQLContext with BeforeAndAfter { - protected override lazy val sql = caseInsensitiveContext.sql _ + protected override lazy val sql = spark.sql _ private var originalDefaultSource: String = null private var path: File = null private var df: DataFrame = null override def beforeAll(): Unit = { super.beforeAll() - originalDefaultSource = caseInsensitiveContext.conf.defaultDataSourceName + originalDefaultSource = spark.sessionState.conf.defaultDataSourceName path = Utils.createTempDir() path.delete() val rdd = sparkContext.parallelize((1 to 10).map(i => s"""{"a":$i, "b":"str${i}"}""")) - df = caseInsensitiveContext.read.json(rdd) + df = spark.read.json(rdd) df.createOrReplaceTempView("jsonTable") } override def afterAll(): Unit = { try { - caseInsensitiveContext.conf.setConf(SQLConf.DEFAULT_DATA_SOURCE_NAME, originalDefaultSource) + spark.conf.set(SQLConf.DEFAULT_DATA_SOURCE_NAME.key, originalDefaultSource) } finally { super.afterAll() } @@ -58,45 +58,42 @@ class SaveLoadSuite extends DataSourceTest with SharedSQLContext with BeforeAndA } def checkLoad(expectedDF: DataFrame = df, tbl: String = "jsonTable"): Unit = { - caseInsensitiveContext.conf.setConf( - SQLConf.DEFAULT_DATA_SOURCE_NAME, "org.apache.spark.sql.json") - checkAnswer(caseInsensitiveContext.read.load(path.toString), expectedDF.collect()) + spark.conf.set(SQLConf.DEFAULT_DATA_SOURCE_NAME.key, "org.apache.spark.sql.json") + checkAnswer(spark.read.load(path.toString), expectedDF.collect()) // Test if we can pick up the data source name passed in load. - caseInsensitiveContext.conf.setConf(SQLConf.DEFAULT_DATA_SOURCE_NAME, "not a source name") - checkAnswer(caseInsensitiveContext.read.format("json").load(path.toString), + spark.conf.set(SQLConf.DEFAULT_DATA_SOURCE_NAME.key, "not a source name") + checkAnswer(spark.read.format("json").load(path.toString), expectedDF.collect()) - checkAnswer(caseInsensitiveContext.read.format("json").load(path.toString), + checkAnswer(spark.read.format("json").load(path.toString), expectedDF.collect()) val schema = StructType(StructField("b", StringType, true) :: Nil) checkAnswer( - caseInsensitiveContext.read.format("json").schema(schema).load(path.toString), + spark.read.format("json").schema(schema).load(path.toString), sql(s"SELECT b FROM $tbl").collect()) } test("save with path and load") { - caseInsensitiveContext.conf.setConf( - SQLConf.DEFAULT_DATA_SOURCE_NAME, "org.apache.spark.sql.json") + spark.conf.set(SQLConf.DEFAULT_DATA_SOURCE_NAME.key, "org.apache.spark.sql.json") df.write.save(path.toString) checkLoad() } test("save with string mode and path, and load") { - caseInsensitiveContext.conf.setConf( - SQLConf.DEFAULT_DATA_SOURCE_NAME, "org.apache.spark.sql.json") + spark.conf.set(SQLConf.DEFAULT_DATA_SOURCE_NAME.key, "org.apache.spark.sql.json") path.createNewFile() df.write.mode("overwrite").save(path.toString) checkLoad() } test("save with path and datasource, and load") { - caseInsensitiveContext.conf.setConf(SQLConf.DEFAULT_DATA_SOURCE_NAME, "not a source name") + spark.conf.set(SQLConf.DEFAULT_DATA_SOURCE_NAME.key, "not a source name") df.write.json(path.toString) checkLoad() } test("save with data source and options, and load") { - caseInsensitiveContext.conf.setConf(SQLConf.DEFAULT_DATA_SOURCE_NAME, "not a source name") + spark.conf.set(SQLConf.DEFAULT_DATA_SOURCE_NAME.key, "not a source name") df.write.mode(SaveMode.ErrorIfExists).json(path.toString) checkLoad() } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/TableScanSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/TableScanSuite.scala index cddf4a1884..93116d84ce 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/TableScanSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/TableScanSuite.scala @@ -106,7 +106,7 @@ case class AllDataTypesScan( } class TableScanSuite extends DataSourceTest with SharedSQLContext { - protected override lazy val sql = caseInsensitiveContext.sql _ + protected override lazy val sql = spark.sql _ private lazy val tableWithSchemaExpected = (1 to 10).map { i => Row( @@ -241,7 +241,7 @@ class TableScanSuite extends DataSourceTest with SharedSQLContext { Nil ) - assert(expectedSchema == caseInsensitiveContext.table("tableWithSchema").schema) + assert(expectedSchema == spark.table("tableWithSchema").schema) checkAnswer( sql( @@ -297,7 +297,7 @@ class TableScanSuite extends DataSourceTest with SharedSQLContext { test("Caching") { // Cached Query Execution - caseInsensitiveContext.cacheTable("oneToTen") + spark.catalog.cacheTable("oneToTen") assertCached(sql("SELECT * FROM oneToTen")) checkAnswer( sql("SELECT * FROM oneToTen"), @@ -325,7 +325,7 @@ class TableScanSuite extends DataSourceTest with SharedSQLContext { (2 to 10).map(i => Row(i, i - 1)).toSeq) // Verify uncaching - caseInsensitiveContext.uncacheTable("oneToTen") + spark.catalog.uncacheTable("oneToTen") assertCached(sql("SELECT * FROM oneToTen"), 0) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala index f3262f772b..3d8dcaf5a5 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala @@ -173,7 +173,7 @@ class FileStreamSinkSuite extends StreamTest with SharedSQLContext { query.processAllAvailable() } - val outputDf = sqlContext.read.parquet(outputDir) + val outputDf = spark.read.parquet(outputDir) val expectedSchema = new StructType() .add(StructField("value", IntegerType)) .add(StructField("id", IntegerType)) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala index 4ca5619603..52ba90f02c 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala @@ -97,18 +97,18 @@ class CachedTableSuite extends QueryTest with TestHiveSingleton { test("correct error on uncache of non-cached table") { intercept[IllegalArgumentException] { - hiveContext.uncacheTable("src") + spark.catalog.uncacheTable("src") } } test("'CACHE TABLE' and 'UNCACHE TABLE' HiveQL statement") { sql("CACHE TABLE src") assertCached(table("src")) - assert(hiveContext.isCached("src"), "Table 'src' should be cached") + assert(spark.catalog.isCached("src"), "Table 'src' should be cached") sql("UNCACHE TABLE src") assertCached(table("src"), 0) - assert(!hiveContext.isCached("src"), "Table 'src' should not be cached") + assert(!spark.catalog.isCached("src"), "Table 'src' should not be cached") } test("CACHE TABLE tableName AS SELECT * FROM anotherTable") { diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/ErrorPositionSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/ErrorPositionSuite.scala index dc8f374eb1..aa1973de7f 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/ErrorPositionSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/ErrorPositionSuite.scala @@ -26,11 +26,11 @@ import org.apache.spark.sql.catalyst.util.quietly import org.apache.spark.sql.hive.test.TestHiveSingleton class ErrorPositionSuite extends QueryTest with TestHiveSingleton with BeforeAndAfterEach { - import hiveContext.implicits._ + import spark.implicits._ override protected def beforeEach(): Unit = { super.beforeEach() - if (spark.sqlContext.tableNames().contains("src")) { + if (spark.catalog.listTables().collect().map(_.name).contains("src")) { spark.catalog.dropTempView("src") } Seq((1, "")).toDF("key", "value").createOrReplaceTempView("src") @@ -130,12 +130,12 @@ class ErrorPositionSuite extends QueryTest with TestHiveSingleton with BeforeAnd * @param token a unique token in the string that should be indicated by the exception */ def positionTest(name: String, query: String, token: String): Unit = { - def ast = hiveContext.sessionState.sqlParser.parsePlan(query) + def ast = spark.sessionState.sqlParser.parsePlan(query) def parseTree = Try(quietly(ast.treeString)).getOrElse("") test(name) { val error = intercept[AnalysisException] { - quietly(hiveContext.sql(query)) + quietly(spark.sql(query)) } assert(!error.getMessage.contains("Seq(")) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDataFrameAnalyticsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDataFrameAnalyticsSuite.scala index cc41c04c71..6477974fe7 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDataFrameAnalyticsSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDataFrameAnalyticsSuite.scala @@ -27,20 +27,20 @@ import org.apache.spark.sql.hive.test.TestHiveSingleton // `hive` package is optional in compiling, however, `SQLContext.sql` doesn't // support the `cube` or `rollup` yet. class HiveDataFrameAnalyticsSuite extends QueryTest with TestHiveSingleton with BeforeAndAfterAll { - import hiveContext.implicits._ - import hiveContext.sql + import spark.implicits._ + import spark.sql private var testData: DataFrame = _ override def beforeAll() { super.beforeAll() testData = Seq((1, 2), (2, 2), (3, 4)).toDF("a", "b") - hiveContext.registerDataFrameAsTable(testData, "mytable") + testData.createOrReplaceTempView("mytable") } override def afterAll(): Unit = { try { - hiveContext.dropTempTable("mytable") + spark.catalog.dropTempView("mytable") } finally { super.afterAll() } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDataFrameJoinSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDataFrameJoinSuite.scala index 63cf5030ab..cdc259d75b 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDataFrameJoinSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDataFrameJoinSuite.scala @@ -21,7 +21,7 @@ import org.apache.spark.sql.{QueryTest, Row} import org.apache.spark.sql.hive.test.TestHiveSingleton class HiveDataFrameJoinSuite extends QueryTest with TestHiveSingleton { - import hiveContext.implicits._ + import spark.implicits._ // We should move this into SQL package if we make case sensitivity configurable in SQL. test("join - self join auto resolve ambiguity with case insensitivity") { diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDataFrameSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDataFrameSuite.scala index 7fdc5d7193..1b31caa76d 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDataFrameSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDataFrameSuite.scala @@ -23,10 +23,10 @@ import org.apache.spark.sql.QueryTest class HiveDataFrameSuite extends QueryTest with TestHiveSingleton { test("table name with schema") { // regression test for SPARK-11778 - hiveContext.sql("create schema usrdb") - hiveContext.sql("create table usrdb.test(c int)") - hiveContext.read.table("usrdb.test") - hiveContext.sql("drop table usrdb.test") - hiveContext.sql("drop schema usrdb") + spark.sql("create schema usrdb") + spark.sql("create table usrdb.test(c int)") + spark.read.table("usrdb.test") + spark.sql("drop table usrdb.test") + spark.sql("drop schema usrdb") } } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala index b043d291aa..b420781e51 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala @@ -29,7 +29,7 @@ import org.apache.spark.sql.test.{ExamplePointUDT, SQLTestUtils} import org.apache.spark.sql.types.{DecimalType, StringType, StructType} class HiveMetastoreCatalogSuite extends TestHiveSingleton { - import hiveContext.implicits._ + import spark.implicits._ test("struct field should accept underscore in sub-column name") { val hiveTypeStr = "struct" @@ -45,7 +45,7 @@ class HiveMetastoreCatalogSuite extends TestHiveSingleton { } test("duplicated metastore relations") { - val df = hiveContext.sql("SELECT * FROM src") + val df = spark.sql("SELECT * FROM src") logInfo(df.queryExecution.toString) df.as('a).join(df.as('b), $"a.key" === $"b.key") } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveParquetSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveParquetSuite.scala index e2304b5397..33252ad07a 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveParquetSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveParquetSuite.scala @@ -51,7 +51,7 @@ class HiveParquetSuite extends QueryTest with ParquetTest with TestHiveSingleton test("Converting Hive to Parquet Table via saveAsParquetFile") { withTempPath { dir => sql("SELECT * FROM src").write.parquet(dir.getCanonicalPath) - hiveContext.read.parquet(dir.getCanonicalPath).createOrReplaceTempView("p") + spark.read.parquet(dir.getCanonicalPath).createOrReplaceTempView("p") withTempTable("p") { checkAnswer( sql("SELECT * FROM src ORDER BY key"), @@ -65,7 +65,7 @@ class HiveParquetSuite extends QueryTest with ParquetTest with TestHiveSingleton withParquetTable((1 to 10).map(i => (i, s"val_$i")), "t", false) { withTempPath { file => sql("SELECT * FROM t LIMIT 1").write.parquet(file.getCanonicalPath) - hiveContext.read.parquet(file.getCanonicalPath).createOrReplaceTempView("p") + spark.read.parquet(file.getCanonicalPath).createOrReplaceTempView("p") withTempTable("p") { // let's do three overwrites for good measure sql("INSERT OVERWRITE TABLE p SELECT * FROM t") diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala index 4a55bcc3b1..fae59001b9 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala @@ -36,9 +36,9 @@ case class ThreeCloumntable(key: Int, value: String, key1: String) class InsertIntoHiveTableSuite extends QueryTest with TestHiveSingleton with BeforeAndAfter with SQLTestUtils { - import hiveContext.implicits._ + import spark.implicits._ - override lazy val testData = hiveContext.sparkContext.parallelize( + override lazy val testData = spark.sparkContext.parallelize( (1 to 100).map(i => TestData(i, i.toString))).toDF() before { @@ -95,9 +95,9 @@ class InsertIntoHiveTableSuite extends QueryTest with TestHiveSingleton with Bef test("SPARK-4052: scala.collection.Map as value type of MapType") { val schema = StructType(StructField("m", MapType(StringType, StringType), true) :: Nil) - val rowRDD = hiveContext.sparkContext.parallelize( + val rowRDD = spark.sparkContext.parallelize( (1 to 100).map(i => Row(scala.collection.mutable.HashMap(s"key$i" -> s"value$i")))) - val df = hiveContext.createDataFrame(rowRDD, schema) + val df = spark.createDataFrame(rowRDD, schema) df.createOrReplaceTempView("tableWithMapValue") sql("CREATE TABLE hiveTableWithMapValue(m MAP )") sql("INSERT OVERWRITE TABLE hiveTableWithMapValue SELECT m FROM tableWithMapValue") @@ -169,8 +169,8 @@ class InsertIntoHiveTableSuite extends QueryTest with TestHiveSingleton with Bef test("Insert ArrayType.containsNull == false") { val schema = StructType(Seq( StructField("a", ArrayType(StringType, containsNull = false)))) - val rowRDD = hiveContext.sparkContext.parallelize((1 to 100).map(i => Row(Seq(s"value$i")))) - val df = hiveContext.createDataFrame(rowRDD, schema) + val rowRDD = spark.sparkContext.parallelize((1 to 100).map(i => Row(Seq(s"value$i")))) + val df = spark.createDataFrame(rowRDD, schema) df.createOrReplaceTempView("tableWithArrayValue") sql("CREATE TABLE hiveTableWithArrayValue(a Array )") sql("INSERT OVERWRITE TABLE hiveTableWithArrayValue SELECT a FROM tableWithArrayValue") @@ -185,9 +185,9 @@ class InsertIntoHiveTableSuite extends QueryTest with TestHiveSingleton with Bef test("Insert MapType.valueContainsNull == false") { val schema = StructType(Seq( StructField("m", MapType(StringType, StringType, valueContainsNull = false)))) - val rowRDD = hiveContext.sparkContext.parallelize( + val rowRDD = spark.sparkContext.parallelize( (1 to 100).map(i => Row(Map(s"key$i" -> s"value$i")))) - val df = hiveContext.createDataFrame(rowRDD, schema) + val df = spark.createDataFrame(rowRDD, schema) df.createOrReplaceTempView("tableWithMapValue") sql("CREATE TABLE hiveTableWithMapValue(m Map )") sql("INSERT OVERWRITE TABLE hiveTableWithMapValue SELECT m FROM tableWithMapValue") @@ -202,9 +202,9 @@ class InsertIntoHiveTableSuite extends QueryTest with TestHiveSingleton with Bef test("Insert StructType.fields.exists(_.nullable == false)") { val schema = StructType(Seq( StructField("s", StructType(Seq(StructField("f", StringType, nullable = false)))))) - val rowRDD = hiveContext.sparkContext.parallelize( + val rowRDD = spark.sparkContext.parallelize( (1 to 100).map(i => Row(Row(s"value$i")))) - val df = hiveContext.createDataFrame(rowRDD, schema) + val df = spark.createDataFrame(rowRDD, schema) df.createOrReplaceTempView("tableWithStructValue") sql("CREATE TABLE hiveTableWithStructValue(s Struct )") sql("INSERT OVERWRITE TABLE hiveTableWithStructValue SELECT s FROM tableWithStructValue") diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala index 153b0c3c72..1e6de463b3 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala @@ -40,7 +40,7 @@ import org.apache.spark.util.Utils */ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiveSingleton { import hiveContext._ - import hiveContext.implicits._ + import spark.implicits._ var jsonFilePath: String = _ diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MultiDatabaseSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MultiDatabaseSuite.scala index 5b706b0432..83f1b192f7 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MultiDatabaseSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MultiDatabaseSuite.scala @@ -25,22 +25,29 @@ class MultiDatabaseSuite extends QueryTest with SQLTestUtils with TestHiveSingle private lazy val df = spark.range(10).coalesce(1).toDF() private def checkTablePath(dbName: String, tableName: String): Unit = { - val metastoreTable = hiveContext.sharedState.externalCatalog.getTable(dbName, tableName) + val metastoreTable = spark.sharedState.externalCatalog.getTable(dbName, tableName) val expectedPath = - hiveContext.sharedState.externalCatalog.getDatabase(dbName).locationUri + "/" + tableName + spark.sharedState.externalCatalog.getDatabase(dbName).locationUri + "/" + tableName assert(metastoreTable.storage.serdeProperties("path") === expectedPath) } + private def getTableNames(dbName: Option[String] = None): Array[String] = { + dbName match { + case Some(db) => spark.catalog.listTables(db).collect().map(_.name) + case None => spark.catalog.listTables().collect().map(_.name) + } + } + test(s"saveAsTable() to non-default database - with USE - Overwrite") { withTempDatabase { db => activateDatabase(db) { df.write.mode(SaveMode.Overwrite).saveAsTable("t") - assert(spark.sqlContext.tableNames().contains("t")) + assert(getTableNames().contains("t")) checkAnswer(spark.table("t"), df) } - assert(spark.sqlContext.tableNames(db).contains("t")) + assert(getTableNames(Option(db)).contains("t")) checkAnswer(spark.table(s"$db.t"), df) checkTablePath(db, "t") @@ -50,7 +57,7 @@ class MultiDatabaseSuite extends QueryTest with SQLTestUtils with TestHiveSingle test(s"saveAsTable() to non-default database - without USE - Overwrite") { withTempDatabase { db => df.write.mode(SaveMode.Overwrite).saveAsTable(s"$db.t") - assert(spark.sqlContext.tableNames(db).contains("t")) + assert(getTableNames(Option(db)).contains("t")) checkAnswer(spark.table(s"$db.t"), df) checkTablePath(db, "t") @@ -65,7 +72,7 @@ class MultiDatabaseSuite extends QueryTest with SQLTestUtils with TestHiveSingle df.write.format("parquet").mode(SaveMode.Overwrite).save(path) spark.catalog.createExternalTable("t", path, "parquet") - assert(spark.sqlContext.tableNames(db).contains("t")) + assert(getTableNames(Option(db)).contains("t")) checkAnswer(spark.table("t"), df) sql( @@ -76,7 +83,7 @@ class MultiDatabaseSuite extends QueryTest with SQLTestUtils with TestHiveSingle | path '$path' |) """.stripMargin) - assert(spark.sqlContext.tableNames(db).contains("t1")) + assert(getTableNames(Option(db)).contains("t1")) checkAnswer(spark.table("t1"), df) } } @@ -90,7 +97,7 @@ class MultiDatabaseSuite extends QueryTest with SQLTestUtils with TestHiveSingle df.write.format("parquet").mode(SaveMode.Overwrite).save(path) spark.catalog.createExternalTable(s"$db.t", path, "parquet") - assert(spark.sqlContext.tableNames(db).contains("t")) + assert(getTableNames(Option(db)).contains("t")) checkAnswer(spark.table(s"$db.t"), df) sql( @@ -101,7 +108,7 @@ class MultiDatabaseSuite extends QueryTest with SQLTestUtils with TestHiveSingle | path '$path' |) """.stripMargin) - assert(spark.sqlContext.tableNames(db).contains("t1")) + assert(getTableNames(Option(db)).contains("t1")) checkAnswer(spark.table(s"$db.t1"), df) } } @@ -112,11 +119,11 @@ class MultiDatabaseSuite extends QueryTest with SQLTestUtils with TestHiveSingle activateDatabase(db) { df.write.mode(SaveMode.Overwrite).saveAsTable("t") df.write.mode(SaveMode.Append).saveAsTable("t") - assert(spark.sqlContext.tableNames().contains("t")) + assert(getTableNames().contains("t")) checkAnswer(spark.table("t"), df.union(df)) } - assert(spark.sqlContext.tableNames(db).contains("t")) + assert(getTableNames(Option(db)).contains("t")) checkAnswer(spark.table(s"$db.t"), df.union(df)) checkTablePath(db, "t") @@ -127,7 +134,7 @@ class MultiDatabaseSuite extends QueryTest with SQLTestUtils with TestHiveSingle withTempDatabase { db => df.write.mode(SaveMode.Overwrite).saveAsTable(s"$db.t") df.write.mode(SaveMode.Append).saveAsTable(s"$db.t") - assert(spark.sqlContext.tableNames(db).contains("t")) + assert(getTableNames(Option(db)).contains("t")) checkAnswer(spark.table(s"$db.t"), df.union(df)) checkTablePath(db, "t") @@ -138,7 +145,7 @@ class MultiDatabaseSuite extends QueryTest with SQLTestUtils with TestHiveSingle withTempDatabase { db => activateDatabase(db) { df.write.mode(SaveMode.Overwrite).saveAsTable("t") - assert(spark.sqlContext.tableNames().contains("t")) + assert(getTableNames().contains("t")) df.write.insertInto(s"$db.t") checkAnswer(spark.table(s"$db.t"), df.union(df)) @@ -150,10 +157,10 @@ class MultiDatabaseSuite extends QueryTest with SQLTestUtils with TestHiveSingle withTempDatabase { db => activateDatabase(db) { df.write.mode(SaveMode.Overwrite).saveAsTable("t") - assert(spark.sqlContext.tableNames().contains("t")) + assert(getTableNames().contains("t")) } - assert(spark.sqlContext.tableNames(db).contains("t")) + assert(getTableNames(Option(db)).contains("t")) df.write.insertInto(s"$db.t") checkAnswer(spark.table(s"$db.t"), df.union(df)) @@ -175,21 +182,21 @@ class MultiDatabaseSuite extends QueryTest with SQLTestUtils with TestHiveSingle withTempDatabase { db => activateDatabase(db) { sql(s"CREATE TABLE t (key INT)") - assert(spark.sqlContext.tableNames().contains("t")) - assert(!spark.sqlContext.tableNames("default").contains("t")) + assert(getTableNames().contains("t")) + assert(!getTableNames(Option("default")).contains("t")) } - assert(!spark.sqlContext.tableNames().contains("t")) - assert(spark.sqlContext.tableNames(db).contains("t")) + assert(!getTableNames().contains("t")) + assert(getTableNames(Option(db)).contains("t")) activateDatabase(db) { sql(s"DROP TABLE t") - assert(!spark.sqlContext.tableNames().contains("t")) - assert(!spark.sqlContext.tableNames("default").contains("t")) + assert(!getTableNames().contains("t")) + assert(!getTableNames(Option("default")).contains("t")) } - assert(!spark.sqlContext.tableNames().contains("t")) - assert(!spark.sqlContext.tableNames(db).contains("t")) + assert(!getTableNames().contains("t")) + assert(!getTableNames(Option(db)).contains("t")) } } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/QueryPartitionSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/QueryPartitionSuite.scala index 266fdd6c1f..f7650e001a 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/QueryPartitionSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/QueryPartitionSuite.scala @@ -26,7 +26,7 @@ import org.apache.spark.sql.test.SQLTestUtils import org.apache.spark.util.Utils class QueryPartitionSuite extends QueryTest with SQLTestUtils with TestHiveSingleton { - import hiveContext.implicits._ + import spark.implicits._ test("SPARK-5068: query data when path doesn't exist") { withSQLConf((SQLConf.HIVE_VERIFY_PARTITION_PATH.key, "true")) { diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala index 73b1a7850d..666a8da0da 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala @@ -30,11 +30,10 @@ import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.test.SQLTestUtils class StatisticsSuite extends QueryTest with TestHiveSingleton with SQLTestUtils { - import hiveContext.sql test("parse analyze commands") { def assertAnalyzeCommand(analyzeCommand: String, c: Class[_]) { - val parsed = hiveContext.sessionState.sqlParser.parsePlan(analyzeCommand) + val parsed = spark.sessionState.sqlParser.parsePlan(analyzeCommand) val operators = parsed.collect { case a: AnalyzeTableCommand => a case o => o @@ -72,7 +71,7 @@ class StatisticsSuite extends QueryTest with TestHiveSingleton with SQLTestUtils } test("MetastoreRelations fallback to HDFS for size estimation") { - val enableFallBackToHdfsForStats = hiveContext.conf.fallBackToHdfsForStatsEnabled + val enableFallBackToHdfsForStats = spark.sessionState.conf.fallBackToHdfsForStatsEnabled try { withTempDir { tempDir => @@ -98,9 +97,9 @@ class StatisticsSuite extends QueryTest with TestHiveSingleton with SQLTestUtils LOCATION '$tempDir' """) - hiveContext.setConf(SQLConf.ENABLE_FALL_BACK_TO_HDFS_FOR_STATS, true) + spark.conf.set(SQLConf.ENABLE_FALL_BACK_TO_HDFS_FOR_STATS.key, true) - val relation = hiveContext.sessionState.catalog.lookupRelation(TableIdentifier("csv_table")) + val relation = spark.sessionState.catalog.lookupRelation(TableIdentifier("csv_table")) .asInstanceOf[MetastoreRelation] val properties = relation.hiveQlTable.getParameters @@ -111,15 +110,14 @@ class StatisticsSuite extends QueryTest with TestHiveSingleton with SQLTestUtils assert(sizeInBytes === BigInt(file1.length() + file2.length())) } } finally { - hiveContext.setConf(SQLConf.ENABLE_FALL_BACK_TO_HDFS_FOR_STATS, enableFallBackToHdfsForStats) + spark.conf.set(SQLConf.ENABLE_FALL_BACK_TO_HDFS_FOR_STATS.key, enableFallBackToHdfsForStats) sql("DROP TABLE csv_table ") } } ignore("analyze MetastoreRelations") { def queryTotalSize(tableName: String): BigInt = - hiveContext.sessionState.catalog.lookupRelation( - TableIdentifier(tableName)).statistics.sizeInBytes + spark.sessionState.catalog.lookupRelation(TableIdentifier(tableName)).statistics.sizeInBytes // Non-partitioned table sql("CREATE TABLE analyzeTable (key STRING, value STRING)").collect() @@ -153,7 +151,7 @@ class StatisticsSuite extends QueryTest with TestHiveSingleton with SQLTestUtils |SELECT * FROM src """.stripMargin).collect() - assert(queryTotalSize("analyzeTable_part") === hiveContext.conf.defaultSizeInBytes) + assert(queryTotalSize("analyzeTable_part") === spark.sessionState.conf.defaultSizeInBytes) sql("ANALYZE TABLE analyzeTable_part COMPUTE STATISTICS noscan") @@ -165,9 +163,9 @@ class StatisticsSuite extends QueryTest with TestHiveSingleton with SQLTestUtils // Try to analyze a temp table sql("""SELECT * FROM src""").createOrReplaceTempView("tempTable") intercept[UnsupportedOperationException] { - hiveContext.sql("ANALYZE TABLE tempTable COMPUTE STATISTICS") + sql("ANALYZE TABLE tempTable COMPUTE STATISTICS") } - hiveContext.sessionState.catalog.dropTable( + spark.sessionState.catalog.dropTable( TableIdentifier("tempTable"), ignoreIfNotExists = true) } @@ -196,8 +194,8 @@ class StatisticsSuite extends QueryTest with TestHiveSingleton with SQLTestUtils val sizes = df.queryExecution.analyzed.collect { case r if ct.runtimeClass.isAssignableFrom(r.getClass) => r.statistics.sizeInBytes } - assert(sizes.size === 2 && sizes(0) <= hiveContext.conf.autoBroadcastJoinThreshold - && sizes(1) <= hiveContext.conf.autoBroadcastJoinThreshold, + assert(sizes.size === 2 && sizes(0) <= spark.sessionState.conf.autoBroadcastJoinThreshold + && sizes(1) <= spark.sessionState.conf.autoBroadcastJoinThreshold, s"query should contain two relations, each of which has size smaller than autoConvertSize") // Using `sparkPlan` because for relevant patterns in HashJoin to be @@ -208,8 +206,8 @@ class StatisticsSuite extends QueryTest with TestHiveSingleton with SQLTestUtils checkAnswer(df, expectedAnswer) // check correctness of output - hiveContext.conf.settings.synchronized { - val tmp = hiveContext.conf.autoBroadcastJoinThreshold + spark.sessionState.conf.settings.synchronized { + val tmp = spark.sessionState.conf.autoBroadcastJoinThreshold sql(s"""SET ${SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key}=-1""") df = sql(query) @@ -252,8 +250,8 @@ class StatisticsSuite extends QueryTest with TestHiveSingleton with SQLTestUtils .isAssignableFrom(r.getClass) => r.statistics.sizeInBytes } - assert(sizes.size === 2 && sizes(1) <= hiveContext.conf.autoBroadcastJoinThreshold - && sizes(0) <= hiveContext.conf.autoBroadcastJoinThreshold, + assert(sizes.size === 2 && sizes(1) <= spark.sessionState.conf.autoBroadcastJoinThreshold + && sizes(0) <= spark.sessionState.conf.autoBroadcastJoinThreshold, s"query should contain two relations, each of which has size smaller than autoConvertSize") // Using `sparkPlan` because for relevant patterns in HashJoin to be @@ -266,8 +264,8 @@ class StatisticsSuite extends QueryTest with TestHiveSingleton with SQLTestUtils checkAnswer(df, answer) // check correctness of output - hiveContext.conf.settings.synchronized { - val tmp = hiveContext.conf.autoBroadcastJoinThreshold + spark.sessionState.conf.settings.synchronized { + val tmp = spark.sessionState.conf.autoBroadcastJoinThreshold sql(s"SET ${SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key}=-1") df = sql(leftSemiJoinQuery) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/UDFSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/UDFSuite.scala index d121bcbe15..88cc42efd0 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/UDFSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/UDFSuite.scala @@ -36,7 +36,7 @@ class UDFSuite with TestHiveSingleton with BeforeAndAfterEach { - import hiveContext.implicits._ + import spark.implicits._ private[this] val functionName = "myUPper" private[this] val functionNameUpper = "MYUPPER" @@ -64,12 +64,12 @@ class UDFSuite } test("UDF case insensitive") { - hiveContext.udf.register("random0", () => { Math.random() }) - hiveContext.udf.register("RANDOM1", () => { Math.random() }) - hiveContext.udf.register("strlenScala", (_: String).length + (_: Int)) - assert(hiveContext.sql("SELECT RANDOM0() FROM src LIMIT 1").head().getDouble(0) >= 0.0) - assert(hiveContext.sql("SELECT RANDOm1() FROM src LIMIT 1").head().getDouble(0) >= 0.0) - assert(hiveContext.sql("SELECT strlenscala('test', 1) FROM src LIMIT 1").head().getInt(0) === 5) + spark.udf.register("random0", () => { Math.random() }) + spark.udf.register("RANDOM1", () => { Math.random() }) + spark.udf.register("strlenScala", (_: String).length + (_: Int)) + assert(sql("SELECT RANDOM0() FROM src LIMIT 1").head().getDouble(0) >= 0.0) + assert(sql("SELECT RANDOm1() FROM src LIMIT 1").head().getDouble(0) >= 0.0) + assert(sql("SELECT strlenscala('test', 1) FROM src LIMIT 1").head().getInt(0) === 5) } test("temporary function: create and drop") { diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala index 80e6f4ec70..a98d469880 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala @@ -31,7 +31,7 @@ import org.apache.spark.sql.test.SQLTestUtils class HiveDDLSuite extends QueryTest with SQLTestUtils with TestHiveSingleton with BeforeAndAfterEach { - import hiveContext.implicits._ + import spark.implicits._ override def afterEach(): Unit = { try { @@ -52,7 +52,7 @@ class HiveDDLSuite new Path(new Path(dbPath.get), tableIdentifier.table).toString } val filesystemPath = new Path(expectedTablePath) - val fs = filesystemPath.getFileSystem(hiveContext.sessionState.newHadoopConf()) + val fs = filesystemPath.getFileSystem(spark.sessionState.newHadoopConf()) fs.exists(filesystemPath) } @@ -86,8 +86,7 @@ class HiveDDLSuite """.stripMargin) val hiveTable = - hiveContext.sessionState.catalog - .getTableMetadata(TableIdentifier(tabName, Some("default"))) + spark.sessionState.catalog.getTableMetadata(TableIdentifier(tabName, Some("default"))) assert(hiveTable.tableType == CatalogTableType.EXTERNAL) assert(tmpDir.listFiles.nonEmpty) @@ -113,8 +112,7 @@ class HiveDDLSuite } val hiveTable = - hiveContext.sessionState.catalog - .getTableMetadata(TableIdentifier(tabName, Some("default"))) + spark.sessionState.catalog.getTableMetadata(TableIdentifier(tabName, Some("default"))) // This data source table is external table assert(hiveTable.tableType == CatalogTableType.EXTERNAL) @@ -127,7 +125,7 @@ class HiveDDLSuite } test("create table and view with comment") { - val catalog = hiveContext.sessionState.catalog + val catalog = spark.sessionState.catalog val tabName = "tab1" withTable(tabName) { sql(s"CREATE TABLE $tabName(c1 int) COMMENT 'BLABLA'") @@ -143,7 +141,7 @@ class HiveDDLSuite } test("add/drop partitions - external table") { - val catalog = hiveContext.sessionState.catalog + val catalog = spark.sessionState.catalog withTempDir { tmpDir => val basePath = tmpDir.getCanonicalPath val partitionPath_1stCol_part1 = new File(basePath + "/ds=2008-04-08") @@ -242,7 +240,7 @@ class HiveDDLSuite val oldViewName = "view1" val newViewName = "view2" withView(oldViewName, newViewName) { - val catalog = hiveContext.sessionState.catalog + val catalog = spark.sessionState.catalog sql(s"CREATE VIEW $oldViewName AS SELECT * FROM $tabName") assert(catalog.tableExists(TableIdentifier(oldViewName))) @@ -260,7 +258,7 @@ class HiveDDLSuite spark.range(10).write.saveAsTable(tabName) val viewName = "view1" withView(viewName) { - val catalog = hiveContext.sessionState.catalog + val catalog = spark.sessionState.catalog sql(s"CREATE VIEW $viewName AS SELECT * FROM $tabName") assert(catalog.getTableMetadata(TableIdentifier(viewName)) @@ -299,7 +297,7 @@ class HiveDDLSuite val oldViewName = "view1" val newViewName = "view2" withView(oldViewName, newViewName) { - val catalog = hiveContext.sessionState.catalog + val catalog = spark.sessionState.catalog sql(s"CREATE VIEW $oldViewName AS SELECT * FROM $tabName") assert(catalog.tableExists(TableIdentifier(tabName))) @@ -391,7 +389,7 @@ class HiveDDLSuite val catalog = spark.sessionState.catalog val dbName = "db1" val tabName = "tab1" - val fs = new Path(tmpDir.toString).getFileSystem(hiveContext.sessionState.newHadoopConf()) + val fs = new Path(tmpDir.toString).getFileSystem(spark.sessionState.newHadoopConf()) withTable(tabName) { if (dirExists) { assert(tmpDir.listFiles.isEmpty) @@ -441,7 +439,7 @@ class HiveDDLSuite val path = tmpDir.toString withSQLConf(SQLConf.WAREHOUSE_PATH.key -> path) { val dbName = "db1" - val fs = new Path(path).getFileSystem(hiveContext.sessionState.newHadoopConf()) + val fs = new Path(path).getFileSystem(spark.sessionState.newHadoopConf()) val dbPath = new Path(path) // the database directory does not exist assert(!fs.exists(dbPath)) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveExplainSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveExplainSuite.scala index 131b06aec8..0d08f7edc8 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveExplainSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveExplainSuite.scala @@ -80,7 +80,7 @@ class HiveExplainSuite extends QueryTest with SQLTestUtils with TestHiveSingleto test("SPARK-6212: The EXPLAIN output of CTAS only shows the analyzed plan") { withTempTable("jt") { val rdd = sparkContext.parallelize((1 to 10).map(i => s"""{"a":$i, "b":"str$i"}""")) - hiveContext.read.json(rdd).createOrReplaceTempView("jt") + spark.read.json(rdd).createOrReplaceTempView("jt") val outputs = sql( s""" |EXPLAIN EXTENDED diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveOperatorQueryableSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveOperatorQueryableSuite.scala index 4d2f190b8e..0e89e990e5 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveOperatorQueryableSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveOperatorQueryableSuite.scala @@ -24,7 +24,7 @@ import org.apache.spark.sql.hive.test.TestHiveSingleton * A set of tests that validates commands can also be queried by like a table */ class HiveOperatorQueryableSuite extends QueryTest with TestHiveSingleton { - import hiveContext._ + import spark._ test("SPARK-5324 query result of describe command") { hiveContext.loadTestTable("src") diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HivePlanTest.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HivePlanTest.scala index 78c0d1f97e..89e6edb6b1 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HivePlanTest.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HivePlanTest.scala @@ -24,8 +24,8 @@ import org.apache.spark.sql.expressions.Window import org.apache.spark.sql.hive.test.TestHiveSingleton class HivePlanTest extends QueryTest with TestHiveSingleton { - import hiveContext.sql - import hiveContext.implicits._ + import spark.sql + import spark.implicits._ test("udf constant folding") { Seq.empty[Tuple1[Int]].toDF("a").createOrReplaceTempView("t") diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala index 23b7f6c75b..ffeed63695 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala @@ -47,8 +47,8 @@ case class ListStringCaseClass(l: Seq[String]) */ class HiveUDFSuite extends QueryTest with TestHiveSingleton with SQLTestUtils { - import hiveContext.udf - import hiveContext.implicits._ + import spark.udf + import spark.implicits._ test("spark sql udf test that returns a struct") { udf.register("getStruct", (_: Int) => Fields(1, 2, 3, 4, 5)) @@ -151,7 +151,7 @@ class HiveUDFSuite extends QueryTest with TestHiveSingleton with SQLTestUtils { } test("UDFIntegerToString") { - val testData = hiveContext.sparkContext.parallelize( + val testData = spark.sparkContext.parallelize( IntegerCaseClass(1) :: IntegerCaseClass(2) :: Nil).toDF() testData.createOrReplaceTempView("integerTable") @@ -166,7 +166,7 @@ class HiveUDFSuite extends QueryTest with TestHiveSingleton with SQLTestUtils { } test("UDFToListString") { - val testData = hiveContext.sparkContext.parallelize(StringCaseClass("") :: Nil).toDF() + val testData = spark.sparkContext.parallelize(StringCaseClass("") :: Nil).toDF() testData.createOrReplaceTempView("inputTable") sql(s"CREATE TEMPORARY FUNCTION testUDFToListString AS '${classOf[UDFToListString].getName}'") @@ -181,7 +181,7 @@ class HiveUDFSuite extends QueryTest with TestHiveSingleton with SQLTestUtils { } test("UDFToListInt") { - val testData = hiveContext.sparkContext.parallelize(StringCaseClass("") :: Nil).toDF() + val testData = spark.sparkContext.parallelize(StringCaseClass("") :: Nil).toDF() testData.createOrReplaceTempView("inputTable") sql(s"CREATE TEMPORARY FUNCTION testUDFToListInt AS '${classOf[UDFToListInt].getName}'") @@ -196,7 +196,7 @@ class HiveUDFSuite extends QueryTest with TestHiveSingleton with SQLTestUtils { } test("UDFToStringIntMap") { - val testData = hiveContext.sparkContext.parallelize(StringCaseClass("") :: Nil).toDF() + val testData = spark.sparkContext.parallelize(StringCaseClass("") :: Nil).toDF() testData.createOrReplaceTempView("inputTable") sql(s"CREATE TEMPORARY FUNCTION testUDFToStringIntMap " + @@ -212,7 +212,7 @@ class HiveUDFSuite extends QueryTest with TestHiveSingleton with SQLTestUtils { } test("UDFToIntIntMap") { - val testData = hiveContext.sparkContext.parallelize(StringCaseClass("") :: Nil).toDF() + val testData = spark.sparkContext.parallelize(StringCaseClass("") :: Nil).toDF() testData.createOrReplaceTempView("inputTable") sql(s"CREATE TEMPORARY FUNCTION testUDFToIntIntMap " + @@ -228,7 +228,7 @@ class HiveUDFSuite extends QueryTest with TestHiveSingleton with SQLTestUtils { } test("UDFListListInt") { - val testData = hiveContext.sparkContext.parallelize( + val testData = spark.sparkContext.parallelize( ListListIntCaseClass(Nil) :: ListListIntCaseClass(Seq((1, 2, 3))) :: ListListIntCaseClass(Seq((4, 5, 6), (7, 8, 9))) :: Nil).toDF() @@ -244,7 +244,7 @@ class HiveUDFSuite extends QueryTest with TestHiveSingleton with SQLTestUtils { } test("UDFListString") { - val testData = hiveContext.sparkContext.parallelize( + val testData = spark.sparkContext.parallelize( ListStringCaseClass(Seq("a", "b", "c")) :: ListStringCaseClass(Seq("d", "e")) :: Nil).toDF() testData.createOrReplaceTempView("listStringTable") @@ -259,7 +259,7 @@ class HiveUDFSuite extends QueryTest with TestHiveSingleton with SQLTestUtils { } test("UDFStringString") { - val testData = hiveContext.sparkContext.parallelize( + val testData = spark.sparkContext.parallelize( StringCaseClass("world") :: StringCaseClass("goodbye") :: Nil).toDF() testData.createOrReplaceTempView("stringTable") @@ -278,7 +278,7 @@ class HiveUDFSuite extends QueryTest with TestHiveSingleton with SQLTestUtils { } test("UDFTwoListList") { - val testData = hiveContext.sparkContext.parallelize( + val testData = spark.sparkContext.parallelize( ListListIntCaseClass(Nil) :: ListListIntCaseClass(Seq((1, 2, 3))) :: ListListIntCaseClass(Seq((4, 5, 6), (7, 8, 9))) :: diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala index 8a31a49d97..4b51f021bf 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala @@ -58,7 +58,7 @@ case class Order( */ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton { import hiveContext._ - import hiveContext.implicits._ + import spark.implicits._ test("UDTF") { withUserDefinedFunction("udtf_count2" -> true) { @@ -690,7 +690,7 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton { } } - test("SPARK-4699 HiveContext should be case insensitive by default") { + test("SPARK-4699 SparkSession with Hive Support should be case insensitive by default") { checkAnswer( sql("SELECT KEY FROM Src ORDER BY value"), sql("SELECT key FROM src ORDER BY value").collect().toSeq) @@ -707,7 +707,7 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton { val rowRdd = sparkContext.parallelize(row :: Nil) - hiveContext.createDataFrame(rowRdd, schema).createOrReplaceTempView("testTable") + spark.createDataFrame(rowRdd, schema).createOrReplaceTempView("testTable") sql( """CREATE TABLE nullValuesInInnerComplexTypes @@ -1417,7 +1417,7 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton { """.stripMargin) checkAnswer( - spark.sqlContext.tables().select('isTemporary).filter('tableName === "t2"), + spark.sql("SHOW TABLES").select('isTemporary).filter('tableName === "t2"), Row(true) ) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLViewSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLViewSuite.scala index 42dbe188fb..72db3618e0 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLViewSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLViewSuite.scala @@ -26,7 +26,7 @@ import org.apache.spark.sql.test.SQLTestUtils * A suite for testing view related functionality. */ class SQLViewSuite extends QueryTest with SQLTestUtils with TestHiveSingleton { - import hiveContext.implicits._ + import spark.implicits._ override def beforeAll(): Unit = { // Create a simple table with two columns: id and id1 diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLWindowFunctionSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLWindowFunctionSuite.scala index 47ceefb88e..77e97dff8c 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLWindowFunctionSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLWindowFunctionSuite.scala @@ -29,7 +29,7 @@ case class WindowData(month: Int, area: String, product: Int) * Test suite for SQL window functions. */ class SQLWindowFunctionSuite extends QueryTest with SQLTestUtils with TestHiveSingleton { - import hiveContext.implicits._ + import spark.implicits._ test("window function: udaf with aggregate expression") { val data = Seq( diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/ScriptTransformationSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/ScriptTransformationSuite.scala index 19e8025d6b..6f80622407 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/ScriptTransformationSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/ScriptTransformationSuite.scala @@ -29,7 +29,7 @@ import org.apache.spark.sql.hive.test.TestHiveSingleton import org.apache.spark.sql.types.StringType class ScriptTransformationSuite extends SparkPlanTest with TestHiveSingleton { - import hiveContext.implicits._ + import spark.implicits._ private val noSerdeIOSchema = HiveScriptIOSchema( inputRowFormat = Seq.empty, diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcHadoopFsRelationSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcHadoopFsRelationSuite.scala index 5dfa58f673..463c368fc4 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcHadoopFsRelationSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcHadoopFsRelationSuite.scala @@ -59,7 +59,7 @@ class OrcHadoopFsRelationSuite extends HadoopFsRelationTest { StructType(dataSchema.fields :+ StructField("p1", IntegerType, nullable = true)) checkQueries( - hiveContext.read.options(Map( + spark.read.options(Map( "path" -> file.getCanonicalPath, "dataSchema" -> dataSchemaWithPartition.json)).format(dataSourceName).load()) } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcPartitionDiscoverySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcPartitionDiscoverySuite.scala index fed0d11e9d..d1ce3f1e2f 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcPartitionDiscoverySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcPartitionDiscoverySuite.scala @@ -37,8 +37,8 @@ case class OrcParDataWithKey(intField: Int, pi: Int, stringField: String, ps: St // TODO This test suite duplicates ParquetPartitionDiscoverySuite a lot class OrcPartitionDiscoverySuite extends QueryTest with TestHiveSingleton with BeforeAndAfterAll { - import hiveContext._ - import hiveContext.implicits._ + import spark._ + import spark.implicits._ val defaultPartitionName = ConfVars.DEFAULTPARTITIONNAME.defaultStrVal @@ -59,7 +59,7 @@ class OrcPartitionDiscoverySuite extends QueryTest with TestHiveSingleton with B } protected def withTempTable(tableName: String)(f: => Unit): Unit = { - try f finally hiveContext.dropTempTable(tableName) + try f finally spark.catalog.dropTempView(tableName) } protected def makePartitionDir( diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcSourceSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcSourceSuite.scala index 4cac334859..871b9e02eb 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcSourceSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcSourceSuite.scala @@ -30,7 +30,7 @@ import org.apache.spark.util.Utils case class OrcData(intField: Int, stringField: String) abstract class OrcSuite extends QueryTest with TestHiveSingleton with BeforeAndAfterAll { - import hiveContext._ + import spark._ var orcTableDir: File = null var orcTableAsDir: File = null @@ -152,7 +152,7 @@ class OrcSourceSuite extends OrcSuite { override def beforeAll(): Unit = { super.beforeAll() - hiveContext.sql( + spark.sql( s"""CREATE TEMPORARY TABLE normal_orc_source |USING org.apache.spark.sql.hive.orc |OPTIONS ( @@ -160,7 +160,7 @@ class OrcSourceSuite extends OrcSuite { |) """.stripMargin) - hiveContext.sql( + spark.sql( s"""CREATE TEMPORARY TABLE normal_orc_as_source |USING org.apache.spark.sql.hive.orc |OPTIONS ( diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcTest.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcTest.scala index bb351e20c5..2a647115b7 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcTest.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcTest.scala @@ -61,7 +61,7 @@ private[sql] trait OrcTest extends SQLTestUtils with TestHiveSingleton { (data: Seq[T], tableName: String) (f: => Unit): Unit = { withOrcDataFrame(data) { df => - spark.sqlContext.registerDataFrameAsTable(df, tableName) + df.createOrReplaceTempView(tableName) withTempTable(tableName)(f) } } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala index 3e5140fe57..06b74da196 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala @@ -57,7 +57,7 @@ case class ParquetDataWithKeyAndComplexTypes( */ class ParquetMetastoreSuite extends ParquetPartitioningTest { import hiveContext._ - import hiveContext.implicits._ + import spark.implicits._ override def beforeAll(): Unit = { super.beforeAll() @@ -571,7 +571,7 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest { */ class ParquetSourceSuite extends ParquetPartitioningTest { import testImplicits._ - import hiveContext._ + import spark._ override def beforeAll(): Unit = { super.beforeAll() diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala index d271e55467..f9891ac571 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala @@ -51,7 +51,7 @@ class BucketedReadSuite extends QueryTest with SQLTestUtils with TestHiveSinglet .saveAsTable("bucketed_table") for (i <- 0 until 5) { - val table = hiveContext.table("bucketed_table").filter($"i" === i) + val table = spark.table("bucketed_table").filter($"i" === i) val query = table.queryExecution val output = query.analyzed.output val rdd = query.toRdd @@ -80,7 +80,7 @@ class BucketedReadSuite extends QueryTest with SQLTestUtils with TestHiveSinglet originalDataFrame: DataFrame): Unit = { // This test verifies parts of the plan. Disable whole stage codegen. withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "false") { - val bucketedDataFrame = hiveContext.table("bucketed_table").select("i", "j", "k") + val bucketedDataFrame = spark.table("bucketed_table").select("i", "j", "k") val BucketSpec(numBuckets, bucketColumnNames, _) = bucketSpec // Limit: bucket pruning only works when the bucket column has one and only one column assert(bucketColumnNames.length == 1) @@ -252,8 +252,8 @@ class BucketedReadSuite extends QueryTest with SQLTestUtils with TestHiveSinglet withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "0", SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "false") { - val t1 = hiveContext.table("bucketed_table1") - val t2 = hiveContext.table("bucketed_table2") + val t1 = spark.table("bucketed_table1") + val t2 = spark.table("bucketed_table2") val joined = t1.join(t2, joinCondition(t1, t2, joinColumns)) // First check the result is corrected. @@ -321,7 +321,7 @@ class BucketedReadSuite extends QueryTest with SQLTestUtils with TestHiveSinglet test("avoid shuffle when grouping keys are equal to bucket keys") { withTable("bucketed_table") { df1.write.format("parquet").bucketBy(8, "i", "j").saveAsTable("bucketed_table") - val tbl = hiveContext.table("bucketed_table") + val tbl = spark.table("bucketed_table") val agged = tbl.groupBy("i", "j").agg(max("k")) checkAnswer( @@ -335,7 +335,7 @@ class BucketedReadSuite extends QueryTest with SQLTestUtils with TestHiveSinglet test("avoid shuffle when grouping keys are a super-set of bucket keys") { withTable("bucketed_table") { df1.write.format("parquet").bucketBy(8, "i").saveAsTable("bucketed_table") - val tbl = hiveContext.table("bucketed_table") + val tbl = spark.table("bucketed_table") val agged = tbl.groupBy("i", "j").agg(max("k")) checkAnswer( @@ -349,11 +349,12 @@ class BucketedReadSuite extends QueryTest with SQLTestUtils with TestHiveSinglet test("error if there exists any malformed bucket files") { withTable("bucketed_table") { df1.write.format("parquet").bucketBy(8, "i").saveAsTable("bucketed_table") - val tableDir = new File(hiveContext.sparkSession.warehousePath, "bucketed_table") + val tableDir = new File(hiveContext + .sparkSession.warehousePath, "bucketed_table") Utils.deleteRecursively(tableDir) df1.write.parquet(tableDir.getAbsolutePath) - val agged = hiveContext.table("bucketed_table").groupBy("i").count() + val agged = spark.table("bucketed_table").groupBy("i").count() val error = intercept[RuntimeException] { agged.count() } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedWriteSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedWriteSuite.scala index 8bf6f224a4..ff44c6f294 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedWriteSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedWriteSuite.scala @@ -69,7 +69,7 @@ class BucketedWriteSuite extends QueryTest with SQLTestUtils with TestHiveSingle private val df = (0 until 50).map(i => (i % 5, i % 13, i.toString)).toDF("i", "j", "k") def tableDir: File = { - val identifier = hiveContext.sessionState.sqlParser.parseTableIdentifier("bucketed_table") + val identifier = spark.sessionState.sqlParser.parseTableIdentifier("bucketed_table") new File(URI.create(hiveContext.sessionState.catalog.hiveDefaultTableFilePath(identifier))) } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/JsonHadoopFsRelationSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/JsonHadoopFsRelationSuite.scala index ef37787137..d79edee5b1 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/JsonHadoopFsRelationSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/JsonHadoopFsRelationSuite.scala @@ -53,7 +53,7 @@ class JsonHadoopFsRelationSuite extends HadoopFsRelationTest { StructType(dataSchema.fields :+ StructField("p1", IntegerType, nullable = true)) checkQueries( - hiveContext.read.format(dataSourceName) + spark.read.format(dataSourceName) .option("dataSchema", dataSchemaWithPartition.json) .load(file.getCanonicalPath)) } @@ -71,14 +71,14 @@ class JsonHadoopFsRelationSuite extends HadoopFsRelationTest { val data = Row(Seq(1L, 2L, 3L), Map("m1" -> Row(4L))) :: Row(Seq(5L, 6L, 7L), Map("m2" -> Row(10L))) :: Nil - val df = hiveContext.createDataFrame(sparkContext.parallelize(data), schema) + val df = spark.createDataFrame(sparkContext.parallelize(data), schema) // Write the data out. df.write.format(dataSourceName).save(file.getCanonicalPath) // Read it back and check the result. checkAnswer( - hiveContext.read.format(dataSourceName).schema(schema).load(file.getCanonicalPath), + spark.read.format(dataSourceName).schema(schema).load(file.getCanonicalPath), df ) } @@ -96,14 +96,14 @@ class JsonHadoopFsRelationSuite extends HadoopFsRelationTest { Row(new BigDecimal("10.02")) :: Row(new BigDecimal("20000.99")) :: Row(new BigDecimal("10000")) :: Nil - val df = hiveContext.createDataFrame(sparkContext.parallelize(data), schema) + val df = spark.createDataFrame(sparkContext.parallelize(data), schema) // Write the data out. df.write.format(dataSourceName).save(file.getCanonicalPath) // Read it back and check the result. checkAnswer( - hiveContext.read.format(dataSourceName).schema(schema).load(file.getCanonicalPath), + spark.read.format(dataSourceName).schema(schema).load(file.getCanonicalPath), df ) } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/ParquetHadoopFsRelationSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/ParquetHadoopFsRelationSuite.scala index 4b4852c1d7..f9a1d16d90 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/ParquetHadoopFsRelationSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/ParquetHadoopFsRelationSuite.scala @@ -58,7 +58,7 @@ class ParquetHadoopFsRelationSuite extends HadoopFsRelationTest { StructType(dataSchema.fields :+ StructField("p1", IntegerType, nullable = true)) checkQueries( - hiveContext.read.format(dataSourceName) + spark.read.format(dataSourceName) .option("dataSchema", dataSchemaWithPartition.json) .load(file.getCanonicalPath)) } @@ -76,7 +76,7 @@ class ParquetHadoopFsRelationSuite extends HadoopFsRelationTest { .format("parquet") .save(s"${dir.getCanonicalPath}/_temporary") - checkAnswer(hiveContext.read.format("parquet").load(dir.getCanonicalPath), df.collect()) + checkAnswer(spark.read.format("parquet").load(dir.getCanonicalPath), df.collect()) } } @@ -104,7 +104,7 @@ class ParquetHadoopFsRelationSuite extends HadoopFsRelationTest { // This shouldn't throw anything. df.write.format("parquet").mode(SaveMode.Overwrite).save(path) - checkAnswer(hiveContext.read.format("parquet").load(path), df) + checkAnswer(spark.read.format("parquet").load(path), df) } } @@ -114,7 +114,7 @@ class ParquetHadoopFsRelationSuite extends HadoopFsRelationTest { // Parquet doesn't allow field names with spaces. Here we are intentionally making an // exception thrown from the `ParquetRelation2.prepareForWriteJob()` method to trigger // the bug. Please refer to spark-8079 for more details. - hiveContext.range(1, 10) + spark.range(1, 10) .withColumnRenamed("id", "a b") .write .format("parquet") diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextHadoopFsRelationSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextHadoopFsRelationSuite.scala index fa64c7dcfa..a47a2246dd 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextHadoopFsRelationSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextHadoopFsRelationSuite.scala @@ -60,7 +60,7 @@ class SimpleTextHadoopFsRelationSuite extends HadoopFsRelationTest with Predicat StructType(dataSchema.fields :+ StructField("p1", IntegerType, nullable = true)) checkQueries( - hiveContext.read.format(dataSourceName) + spark.read.format(dataSourceName) .option("dataSchema", dataSchemaWithPartition.json) .load(file.getCanonicalPath)) } -- cgit v1.2.3