From 14e3f114efb906937b2d7b7ac04484b2814a3b48 Mon Sep 17 00:00:00 2001
From: Reynold Xin
Date: Tue, 13 Jan 2015 13:30:35 -0800
Subject: [SPARK-5168] Make SQLConf a field rather than mixin in SQLContext

This change should be binary and source backward compatible since we didn't
change any user facing APIs.

Author: Reynold Xin

Closes #3965 from rxin/SPARK-5168-sqlconf and squashes the following commits:

42eec09 [Reynold Xin] Fix default conf value.
0ef86cc [Reynold Xin] Fix constructor ordering.
4d7f910 [Reynold Xin] Properly override config.
ccc8e6a [Reynold Xin] [SPARK-5168] Make SQLConf a field rather than mixin in SQLContext
---
 .../scala/org/apache/spark/sql/CacheManager.scala  |  4 +--
 .../main/scala/org/apache/spark/sql/SQLConf.scala  |  2 +-
 .../scala/org/apache/spark/sql/SQLContext.scala    | 40 ++++++++++++++++++----
 .../apache/spark/sql/api/java/JavaSQLContext.scala |  6 ++--
 .../sql/columnar/InMemoryColumnarTableScan.scala   |  4 +--
 .../org/apache/spark/sql/execution/Exchange.scala  |  2 +-
 .../apache/spark/sql/execution/ExistingRDD.scala   |  4 +--
 .../org/apache/spark/sql/execution/SparkPlan.scala |  2 +-
 .../spark/sql/execution/SparkStrategies.scala      | 18 +++++-----
 .../org/apache/spark/sql/execution/commands.scala  |  2 +-
 .../sql/execution/joins/BroadcastHashJoin.scala    |  2 +-
 .../org/apache/spark/sql/json/JSONRelation.scala   |  4 +--
 .../apache/spark/sql/parquet/ParquetRelation.scala |  7 ++--
 .../org/apache/spark/sql/parquet/ParquetTest.scala |  2 +-
 .../org/apache/spark/sql/parquet/newParquet.scala  |  4 +--
 .../org/apache/spark/sql/sources/interfaces.scala  |  2 +-
 .../org/apache/spark/sql/test/TestSQLContext.scala |  5 +--
 .../scala/org/apache/spark/sql/JoinSuite.scala     |  2 +-
 .../scala/org/apache/spark/sql/SQLConfSuite.scala  | 10 +++---
 .../scala/org/apache/spark/sql/SQLQuerySuite.scala | 10 +++---
 .../sql/columnar/InMemoryColumnarQuerySuite.scala  |  3 +-
 .../sql/columnar/PartitionBatchPruningSuite.scala  |  4 +--
 .../apache/spark/sql/execution/PlannerSuite.scala  |  4 +--
 .../org/apache/spark/sql/json/JsonSuite.scala      |  2 +-
 .../apache/spark/sql/parquet/ParquetIOSuite.scala  |  4 +--
 .../spark/sql/parquet/ParquetQuerySuite.scala      | 14 ++++----
 .../hive/execution/HiveCompatibilitySuite.scala    |  4 +--
 .../org/apache/spark/sql/hive/HiveContext.scala    | 11 +++---
 .../spark/sql/hive/HiveMetastoreCatalog.scala      |  2 +-
 .../scala/org/apache/spark/sql/hive/TestHive.scala |  6 ++--
 .../spark/sql/hive/api/java/JavaHiveContext.scala  |  6 ++--
 .../apache/spark/sql/hive/StatisticsSuite.scala    | 20 +++++------
 .../spark/sql/hive/execution/HiveQuerySuite.scala  |  4 +--
 33 files changed, 124 insertions(+), 92 deletions(-)

(limited to 'sql')

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/CacheManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/CacheManager.scala
index 3c9439b2e9..e715d9434a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/CacheManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/CacheManager.scala
@@ -91,8 +91,8 @@ private[sql] trait CacheManager {
         CachedData(
           planToCache,
           InMemoryRelation(
-            useCompression,
-            columnBatchSize,
+            conf.useCompression,
+            conf.columnBatchSize,
             storageLevel,
             query.queryExecution.executedPlan,
             tableName))
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
index f5bf935522..206d16f5b3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
@@ -61,7 +61,7 @@ private[spark] object SQLConf {
  *
  * SQLConf is thread-safe (internally synchronized, so safe to be used in multiple threads).
  */
-private[sql] trait SQLConf {
+private[sql] class SQLConf {
   import SQLConf._
 
   /** Only low degree of contention is expected for conf, thus NOT using ConcurrentHashMap. */
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index e7021cc336..d8efce0cb4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -17,6 +17,9 @@
 
 package org.apache.spark.sql
 
+import java.util.Properties
+
+import scala.collection.immutable
 import scala.language.implicitConversions
 import scala.reflect.runtime.universe.TypeTag
 
@@ -49,7 +52,6 @@ import org.apache.spark.sql.sources.{DataSourceStrategy, BaseRelation, DDLParser
 @AlphaComponent
 class SQLContext(@transient val sparkContext: SparkContext)
   extends org.apache.spark.Logging
-  with SQLConf
   with CacheManager
   with ExpressionConversions
   with UDFRegistration
@@ -57,6 +59,30 @@ class SQLContext(@transient val sparkContext: SparkContext)
 
   self =>
 
+  // Note that this is a lazy val so we can override the default value in subclasses.
+  private[sql] lazy val conf: SQLConf = new SQLConf
+
+  /** Set Spark SQL configuration properties. */
+  def setConf(props: Properties): Unit = conf.setConf(props)
+
+  /** Set the given Spark SQL configuration property. */
+  def setConf(key: String, value: String): Unit = conf.setConf(key, value)
+
+  /** Return the value of Spark SQL configuration property for the given key. */
+  def getConf(key: String): String = conf.getConf(key)
+
+  /**
+   * Return the value of Spark SQL configuration property for the given key. If the key is not set
+   * yet, return `defaultValue`.
+   */
+  def getConf(key: String, defaultValue: String): String = conf.getConf(key, defaultValue)
+
+  /**
+   * Return all the configuration properties that have been set (i.e. not the default).
+   * This creates a new copy of the config properties in the form of a Map.
+ */ + def getAllConfs: immutable.Map[String, String] = conf.getAllConfs + @transient protected[sql] lazy val catalog: Catalog = new SimpleCatalog(true) @@ -212,7 +238,7 @@ class SQLContext(@transient val sparkContext: SparkContext) */ @Experimental def jsonRDD(json: RDD[String], schema: StructType): SchemaRDD = { - val columnNameOfCorruptJsonRecord = columnNameOfCorruptRecord + val columnNameOfCorruptJsonRecord = conf.columnNameOfCorruptRecord val appliedSchema = Option(schema).getOrElse( JsonRDD.nullTypeToStringType( @@ -226,7 +252,7 @@ class SQLContext(@transient val sparkContext: SparkContext) */ @Experimental def jsonRDD(json: RDD[String], samplingRatio: Double): SchemaRDD = { - val columnNameOfCorruptJsonRecord = columnNameOfCorruptRecord + val columnNameOfCorruptJsonRecord = conf.columnNameOfCorruptRecord val appliedSchema = JsonRDD.nullTypeToStringType( JsonRDD.inferSchema(json, samplingRatio, columnNameOfCorruptJsonRecord)) @@ -299,10 +325,10 @@ class SQLContext(@transient val sparkContext: SparkContext) * @group userf */ def sql(sqlText: String): SchemaRDD = { - if (dialect == "sql") { + if (conf.dialect == "sql") { new SchemaRDD(this, parseSql(sqlText)) } else { - sys.error(s"Unsupported SQL dialect: $dialect") + sys.error(s"Unsupported SQL dialect: ${conf.dialect}") } } @@ -323,9 +349,9 @@ class SQLContext(@transient val sparkContext: SparkContext) val sqlContext: SQLContext = self - def codegenEnabled = self.codegenEnabled + def codegenEnabled = self.conf.codegenEnabled - def numPartitions = self.numShufflePartitions + def numPartitions = self.conf.numShufflePartitions def strategies: Seq[Strategy] = extraStrategies ++ ( diff --git a/sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSQLContext.scala index 8884204e50..7f868cd4af 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSQLContext.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSQLContext.scala @@ -52,7 +52,7 @@ class JavaSQLContext(val sqlContext: SQLContext) extends UDFRegistration { * @group userf */ def sql(sqlText: String): JavaSchemaRDD = { - if (sqlContext.dialect == "sql") { + if (sqlContext.conf.dialect == "sql") { new JavaSchemaRDD(sqlContext, sqlContext.parseSql(sqlText)) } else { sys.error(s"Unsupported SQL dialect: $sqlContext.dialect") @@ -164,7 +164,7 @@ class JavaSQLContext(val sqlContext: SQLContext) extends UDFRegistration { * It goes through the entire dataset once to determine the schema. 
*/ def jsonRDD(json: JavaRDD[String]): JavaSchemaRDD = { - val columnNameOfCorruptJsonRecord = sqlContext.columnNameOfCorruptRecord + val columnNameOfCorruptJsonRecord = sqlContext.conf.columnNameOfCorruptRecord val appliedScalaSchema = JsonRDD.nullTypeToStringType( JsonRDD.inferSchema(json.rdd, 1.0, columnNameOfCorruptJsonRecord)) @@ -182,7 +182,7 @@ class JavaSQLContext(val sqlContext: SQLContext) extends UDFRegistration { */ @Experimental def jsonRDD(json: JavaRDD[String], schema: StructType): JavaSchemaRDD = { - val columnNameOfCorruptJsonRecord = sqlContext.columnNameOfCorruptRecord + val columnNameOfCorruptJsonRecord = sqlContext.conf.columnNameOfCorruptRecord val appliedScalaSchema = Option(asScalaDataType(schema)).getOrElse( JsonRDD.nullTypeToStringType( diff --git a/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala index 1e432485c4..065fae3c83 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala @@ -82,7 +82,7 @@ private[sql] case class InMemoryRelation( if (batchStats.value.isEmpty) { // Underlying columnar RDD hasn't been materialized, no useful statistics information // available, return the default statistics. - Statistics(sizeInBytes = child.sqlContext.defaultSizeInBytes) + Statistics(sizeInBytes = child.sqlContext.conf.defaultSizeInBytes) } else { // Underlying columnar RDD has been materialized, required information has also been collected // via the `batchStats` accumulator, compute the final statistics, and update `_statistics`. @@ -233,7 +233,7 @@ private[sql] case class InMemoryColumnarTableScan( val readPartitions = sparkContext.accumulator(0) val readBatches = sparkContext.accumulator(0) - private val inMemoryPartitionPruningEnabled = sqlContext.inMemoryPartitionPruning + private val inMemoryPartitionPruningEnabled = sqlContext.conf.inMemoryPartitionPruning override def execute() = { readPartitions.setValue(0) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala index d7c811ca89..7c0b72aab4 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala @@ -123,7 +123,7 @@ case class Exchange(newPartitioning: Partitioning, child: SparkPlan) extends Una */ private[sql] case class AddExchange(sqlContext: SQLContext) extends Rule[SparkPlan] { // TODO: Determine the number of partitions. - def numPartitions = sqlContext.numShufflePartitions + def numPartitions = sqlContext.conf.numShufflePartitions def apply(plan: SparkPlan): SparkPlan = plan.transformUp { case operator: SparkPlan => diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala index d2d8cb1c62..069e950195 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala @@ -69,7 +69,7 @@ case class LogicalRDD(output: Seq[Attribute], rdd: RDD[Row])(sqlContext: SQLCont @transient override lazy val statistics = Statistics( // TODO: Instead of returning a default value here, find a way to return a meaningful size // estimate for RDDs. 
See PR 1238 for more discussions. - sizeInBytes = BigInt(sqlContext.defaultSizeInBytes) + sizeInBytes = BigInt(sqlContext.conf.defaultSizeInBytes) ) } @@ -106,6 +106,6 @@ case class SparkLogicalPlan(alreadyPlanned: SparkPlan)(@transient sqlContext: SQ @transient override lazy val statistics = Statistics( // TODO: Instead of returning a default value here, find a way to return a meaningful size // estimate for RDDs. See PR 1238 for more discussions. - sizeInBytes = BigInt(sqlContext.defaultSizeInBytes) + sizeInBytes = BigInt(sqlContext.conf.defaultSizeInBytes) ) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala index 017c78d2c6..6fecd1ff06 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala @@ -51,7 +51,7 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ // sqlContext will be null when we are being deserialized on the slaves. In this instance // the value of codegenEnabled will be set by the desserializer after the constructor has run. val codegenEnabled: Boolean = if (sqlContext != null) { - sqlContext.codegenEnabled + sqlContext.conf.codegenEnabled } else { false } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala index d91b1fbc69..0652d2ff7c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala @@ -35,8 +35,8 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] { object LeftSemiJoin extends Strategy with PredicateHelper { def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { case ExtractEquiJoinKeys(LeftSemi, leftKeys, rightKeys, condition, left, right) - if sqlContext.autoBroadcastJoinThreshold > 0 && - right.statistics.sizeInBytes <= sqlContext.autoBroadcastJoinThreshold => + if sqlContext.conf.autoBroadcastJoinThreshold > 0 && + right.statistics.sizeInBytes <= sqlContext.conf.autoBroadcastJoinThreshold => val semiJoin = joins.BroadcastLeftSemiJoinHash( leftKeys, rightKeys, planLater(left), planLater(right)) condition.map(Filter(_, semiJoin)).getOrElse(semiJoin) :: Nil @@ -81,13 +81,13 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] { def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { case ExtractEquiJoinKeys(Inner, leftKeys, rightKeys, condition, left, right) - if sqlContext.autoBroadcastJoinThreshold > 0 && - right.statistics.sizeInBytes <= sqlContext.autoBroadcastJoinThreshold => + if sqlContext.conf.autoBroadcastJoinThreshold > 0 && + right.statistics.sizeInBytes <= sqlContext.conf.autoBroadcastJoinThreshold => makeBroadcastHashJoin(leftKeys, rightKeys, left, right, condition, joins.BuildRight) case ExtractEquiJoinKeys(Inner, leftKeys, rightKeys, condition, left, right) - if sqlContext.autoBroadcastJoinThreshold > 0 && - left.statistics.sizeInBytes <= sqlContext.autoBroadcastJoinThreshold => + if sqlContext.conf.autoBroadcastJoinThreshold > 0 && + left.statistics.sizeInBytes <= sqlContext.conf.autoBroadcastJoinThreshold => makeBroadcastHashJoin(leftKeys, rightKeys, left, right, condition, joins.BuildLeft) case ExtractEquiJoinKeys(Inner, leftKeys, rightKeys, condition, left, right) => @@ -215,7 +215,7 @@ private[sql] abstract class 
SparkStrategies extends QueryPlanner[SparkPlan] { InsertIntoParquetTable(table, planLater(child), overwrite) :: Nil case PhysicalOperation(projectList, filters: Seq[Expression], relation: ParquetRelation) => val prunePushedDownFilters = - if (sqlContext.parquetFilterPushDown) { + if (sqlContext.conf.parquetFilterPushDown) { (predicates: Seq[Expression]) => { // Note: filters cannot be pushed down to Parquet if they contain more complex // expressions than simple "Attribute cmp Literal" comparisons. Here we remove all @@ -237,7 +237,7 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] { ParquetTableScan( _, relation, - if (sqlContext.parquetFilterPushDown) filters else Nil)) :: Nil + if (sqlContext.conf.parquetFilterPushDown) filters else Nil)) :: Nil case _ => Nil } @@ -270,7 +270,7 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] { // This sort only sorts tuples within a partition. Its requiredDistribution will be // an UnspecifiedDistribution. execution.Sort(sortExprs, global = false, planLater(child)) :: Nil - case logical.Sort(sortExprs, global, child) if sqlContext.externalSortEnabled => + case logical.Sort(sortExprs, global, child) if sqlContext.conf.externalSortEnabled => execution.ExternalSort(sortExprs, global, planLater(child)):: Nil case logical.Sort(sortExprs, global, child) => execution.Sort(sortExprs, global, planLater(child)):: Nil diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala index df8e616151..af6b07bd6c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala @@ -94,7 +94,7 @@ case class SetCommand( logWarning( s"Property ${SQLConf.Deprecated.MAPRED_REDUCE_TASKS} is deprecated, " + s"showing ${SQLConf.SHUFFLE_PARTITIONS} instead.") - Seq(Row(s"${SQLConf.SHUFFLE_PARTITIONS}=${sqlContext.numShufflePartitions}")) + Seq(Row(s"${SQLConf.SHUFFLE_PARTITIONS}=${sqlContext.conf.numShufflePartitions}")) // Queries a single property. 
case Some((key, None)) => diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala index fbe1d06ed2..2dd22c020e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala @@ -43,7 +43,7 @@ case class BroadcastHashJoin( extends BinaryNode with HashJoin { val timeout = { - val timeoutValue = sqlContext.broadcastTimeout + val timeoutValue = sqlContext.conf.broadcastTimeout if (timeoutValue < 0) { Duration.Inf } else { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/json/JSONRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/json/JSONRelation.scala index a9a6696cb1..f5c02224c8 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/json/JSONRelation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/json/JSONRelation.scala @@ -59,8 +59,8 @@ private[sql] case class JSONRelation( JsonRDD.inferSchema( baseRDD, samplingRatio, - sqlContext.columnNameOfCorruptRecord))) + sqlContext.conf.columnNameOfCorruptRecord))) override def buildScan() = - JsonRDD.jsonStringToRow(baseRDD, schema, sqlContext.columnNameOfCorruptRecord) + JsonRDD.jsonStringToRow(baseRDD, schema, sqlContext.conf.columnNameOfCorruptRecord) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala index 2835dc3408..cde5160149 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala @@ -65,7 +65,7 @@ private[sql] case class ParquetRelation( ParquetTypesConverter.readSchemaFromFile( new Path(path.split(",").head), conf, - sqlContext.isParquetBinaryAsString) + sqlContext.conf.isParquetBinaryAsString) lazy val attributeMap = AttributeMap(output.map(o => o -> o)) @@ -80,7 +80,7 @@ private[sql] case class ParquetRelation( } // TODO: Use data from the footers. 
- override lazy val statistics = Statistics(sizeInBytes = sqlContext.defaultSizeInBytes) + override lazy val statistics = Statistics(sizeInBytes = sqlContext.conf.defaultSizeInBytes) } private[sql] object ParquetRelation { @@ -163,7 +163,8 @@ private[sql] object ParquetRelation { sqlContext: SQLContext): ParquetRelation = { val path = checkPath(pathString, allowExisting, conf) conf.set(ParquetOutputFormat.COMPRESSION, shortParquetCompressionCodecNames.getOrElse( - sqlContext.parquetCompressionCodec.toUpperCase, CompressionCodecName.UNCOMPRESSED).name()) + sqlContext.conf.parquetCompressionCodec.toUpperCase, CompressionCodecName.UNCOMPRESSED) + .name()) ParquetRelation.enableLogForwarding() ParquetTypesConverter.writeMetaData(attributes, path, conf) new ParquetRelation(path.toString, Some(conf), sqlContext) { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTest.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTest.scala index b4d48902fd..02ce1b3e6d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTest.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTest.scala @@ -54,7 +54,7 @@ trait ParquetTest { try f finally { keys.zip(currentValues).foreach { case (key, Some(value)) => setConf(key, value) - case (key, None) => unsetConf(key) + case (key, None) => conf.unsetConf(key) } } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala index 2e0c6c51c0..55a2728a85 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala @@ -137,7 +137,7 @@ case class ParquetRelation2(path: String)(@transient val sqlContext: SQLContext) ParquetTypesConverter.readSchemaFromFile( partitions.head.files.head.getPath, Some(sparkContext.hadoopConfiguration), - sqlContext.isParquetBinaryAsString)) + sqlContext.conf.isParquetBinaryAsString)) val dataIncludesKey = partitionKeys.headOption.map(dataSchema.fieldNames.contains(_)).getOrElse(true) @@ -198,7 +198,7 @@ case class ParquetRelation2(path: String)(@transient val sqlContext: SQLContext) predicates .reduceOption(And) .flatMap(ParquetFilters.createFilter) - .filter(_ => sqlContext.parquetFilterPushDown) + .filter(_ => sqlContext.conf.parquetFilterPushDown) .foreach(ParquetInputFormat.setFilterPredicate(jobConf, _)) def percentRead = selectedPartitions.size.toDouble / partitions.size.toDouble * 100 diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala index 2a7be23e37..7f5564baa0 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala @@ -99,7 +99,7 @@ abstract class BaseRelation { * large to broadcast. This method will be called multiple times during query planning * and thus should not perform expensive operations for each invocation. 
*/ - def sizeInBytes = sqlContext.defaultSizeInBytes + def sizeInBytes = sqlContext.conf.defaultSizeInBytes } /** diff --git a/sql/core/src/main/scala/org/apache/spark/sql/test/TestSQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/test/TestSQLContext.scala index 6bb81c76ed..8c80be106f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/test/TestSQLContext.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/test/TestSQLContext.scala @@ -29,6 +29,7 @@ object TestSQLContext new SparkConf().set("spark.sql.testkey", "true"))) { /** Fewer partitions to speed up testing. */ - override private[spark] def numShufflePartitions: Int = - getConf(SQLConf.SHUFFLE_PARTITIONS, "5").toInt + private[sql] override lazy val conf: SQLConf = new SQLConf { + override def numShufflePartitions: Int = this.getConf(SQLConf.SHUFFLE_PARTITIONS, "5").toInt + } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala index c7e136388f..e5ab16f9dd 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala @@ -387,7 +387,7 @@ class JoinSuite extends QueryTest with BeforeAndAfterEach { test("broadcasted left semi join operator selection") { clearCache() sql("CACHE TABLE testData") - val tmp = autoBroadcastJoinThreshold + val tmp = conf.autoBroadcastJoinThreshold sql(s"SET ${SQLConf.AUTO_BROADCASTJOIN_THRESHOLD}=1000000000") Seq( diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLConfSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLConfSuite.scala index 60701f0e15..bf73d0c707 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SQLConfSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLConfSuite.scala @@ -37,7 +37,7 @@ class SQLConfSuite extends QueryTest with FunSuiteLike { } test("programmatic ways of basic setting and getting") { - clear() + conf.clear() assert(getAllConfs.size === 0) setConf(testKey, testVal) @@ -51,11 +51,11 @@ class SQLConfSuite extends QueryTest with FunSuiteLike { assert(TestSQLContext.getConf(testKey, testVal + "_") == testVal) assert(TestSQLContext.getAllConfs.contains(testKey)) - clear() + conf.clear() } test("parse SQL set commands") { - clear() + conf.clear() sql(s"set $testKey=$testVal") assert(getConf(testKey, testVal + "_") == testVal) assert(TestSQLContext.getConf(testKey, testVal + "_") == testVal) @@ -73,11 +73,11 @@ class SQLConfSuite extends QueryTest with FunSuiteLike { sql(s"set $key=") assert(getConf(key, "0") == "") - clear() + conf.clear() } test("deprecated property") { - clear() + conf.clear() sql(s"set ${SQLConf.Deprecated.MAPRED_REDUCE_TASKS}=10") assert(getConf(SQLConf.SHUFFLE_PARTITIONS) == "10") } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala index d9de5686dc..bc72daf088 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala @@ -79,7 +79,7 @@ class SQLQuerySuite extends QueryTest with BeforeAndAfterAll { } test("aggregation with codegen") { - val originalValue = codegenEnabled + val originalValue = conf.codegenEnabled setConf(SQLConf.CODEGEN_ENABLED, "true") sql("SELECT key FROM testData GROUP BY key").collect() setConf(SQLConf.CODEGEN_ENABLED, originalValue.toString) @@ -245,14 +245,14 @@ class SQLQuerySuite extends QueryTest with BeforeAndAfterAll { } 
test("sorting") { - val before = externalSortEnabled + val before = conf.externalSortEnabled setConf(SQLConf.EXTERNAL_SORT, "false") sortTest() setConf(SQLConf.EXTERNAL_SORT, before.toString) } test("external sorting") { - val before = externalSortEnabled + val before = conf.externalSortEnabled setConf(SQLConf.EXTERNAL_SORT, "true") sortTest() setConf(SQLConf.EXTERNAL_SORT, before.toString) @@ -600,7 +600,7 @@ class SQLQuerySuite extends QueryTest with BeforeAndAfterAll { } test("SET commands semantics using sql()") { - clear() + conf.clear() val testKey = "test.key.0" val testVal = "test.val.0" val nonexistentKey = "nonexistent" @@ -632,7 +632,7 @@ class SQLQuerySuite extends QueryTest with BeforeAndAfterAll { sql(s"SET $nonexistentKey"), Seq(Seq(s"$nonexistentKey=")) ) - clear() + conf.clear() } test("apply schema") { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/columnar/InMemoryColumnarQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/columnar/InMemoryColumnarQuerySuite.scala index fc95dccc74..d94729ba92 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/columnar/InMemoryColumnarQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/columnar/InMemoryColumnarQuerySuite.scala @@ -39,7 +39,8 @@ class InMemoryColumnarQuerySuite extends QueryTest { sparkContext.parallelize(1 to 10).map(i => TestData(i, i.toString)).registerTempTable("sizeTst") cacheTable("sizeTst") assert( - table("sizeTst").queryExecution.logical.statistics.sizeInBytes > autoBroadcastJoinThreshold) + table("sizeTst").queryExecution.logical.statistics.sizeInBytes > + conf.autoBroadcastJoinThreshold) } test("projection") { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/columnar/PartitionBatchPruningSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/columnar/PartitionBatchPruningSuite.scala index 1915c25392..592cafbbdc 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/columnar/PartitionBatchPruningSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/columnar/PartitionBatchPruningSuite.scala @@ -23,8 +23,8 @@ import org.apache.spark.sql._ import org.apache.spark.sql.test.TestSQLContext._ class PartitionBatchPruningSuite extends FunSuite with BeforeAndAfterAll with BeforeAndAfter { - val originalColumnBatchSize = columnBatchSize - val originalInMemoryPartitionPruning = inMemoryPartitionPruning + val originalColumnBatchSize = conf.columnBatchSize + val originalInMemoryPartitionPruning = conf.inMemoryPartitionPruning override protected def beforeAll(): Unit = { // Make a table with 5 partitions, 2 batches per partition, 10 elements per batch diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala index a5af71acfc..c5b6fce5fd 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala @@ -60,7 +60,7 @@ class PlannerSuite extends FunSuite { } test("sizeInBytes estimation of limit operator for broadcast hash join optimization") { - val origThreshold = autoBroadcastJoinThreshold + val origThreshold = conf.autoBroadcastJoinThreshold setConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD, 81920.toString) // Using a threshold that is definitely larger than the small testing table (b) below @@ -78,7 +78,7 @@ class PlannerSuite extends FunSuite { } test("InMemoryRelation statistics propagation") { - val origThreshold = autoBroadcastJoinThreshold + val 
origThreshold = conf.autoBroadcastJoinThreshold setConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD, 81920.toString) testData.limit(3).registerTempTable("tiny") diff --git a/sql/core/src/test/scala/org/apache/spark/sql/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/json/JsonSuite.scala index 8dce3372a8..b09f1ac495 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/json/JsonSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/json/JsonSuite.scala @@ -713,7 +713,7 @@ class JsonSuite extends QueryTest { test("Corrupt records") { // Test if we can query corrupt records. - val oldColumnNameOfCorruptRecord = TestSQLContext.columnNameOfCorruptRecord + val oldColumnNameOfCorruptRecord = TestSQLContext.conf.columnNameOfCorruptRecord TestSQLContext.setConf(SQLConf.COLUMN_NAME_OF_CORRUPT_RECORD, "_unparsed") val jsonSchemaRDD = jsonRDD(corruptRecords) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala index 10a01474e9..6ac67fcafe 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala @@ -213,7 +213,7 @@ class ParquetIOSuite extends QueryTest with ParquetTest { def checkCompressionCodec(codec: CompressionCodecName): Unit = { withSQLConf(SQLConf.PARQUET_COMPRESSION -> codec.name()) { withParquetFile(data) { path => - assertResult(parquetCompressionCodec.toUpperCase) { + assertResult(conf.parquetCompressionCodec.toUpperCase) { compressionCodecFor(path) } } @@ -221,7 +221,7 @@ class ParquetIOSuite extends QueryTest with ParquetTest { } // Checks default compression codec - checkCompressionCodec(CompressionCodecName.fromConf(parquetCompressionCodec)) + checkCompressionCodec(CompressionCodecName.fromConf(conf.parquetCompressionCodec)) checkCompressionCodec(CompressionCodecName.UNCOMPRESSED) checkCompressionCodec(CompressionCodecName.GZIP) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala index a5fe2e8da2..0a92336a3c 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala @@ -88,7 +88,7 @@ class ParquetQuerySuite extends QueryTest with FunSuiteLike with BeforeAndAfterA TestData // Load test data tables. private var testRDD: SchemaRDD = null - private val originalParquetFilterPushdownEnabled = TestSQLContext.parquetFilterPushDown + private val originalParquetFilterPushdownEnabled = TestSQLContext.conf.parquetFilterPushDown override def beforeAll() { ParquetTestData.writeFile() @@ -144,7 +144,7 @@ class ParquetQuerySuite extends QueryTest with FunSuiteLike with BeforeAndAfterA } ignore("Treat binary as string") { - val oldIsParquetBinaryAsString = TestSQLContext.isParquetBinaryAsString + val oldIsParquetBinaryAsString = TestSQLContext.conf.isParquetBinaryAsString // Create the test file. 
val file = getTempFilePath("parquet") @@ -174,7 +174,7 @@ class ParquetQuerySuite extends QueryTest with FunSuiteLike with BeforeAndAfterA } test("Compression options for writing to a Parquetfile") { - val defaultParquetCompressionCodec = TestSQLContext.parquetCompressionCodec + val defaultParquetCompressionCodec = TestSQLContext.conf.parquetCompressionCodec import scala.collection.JavaConversions._ val file = getTempFilePath("parquet") @@ -186,7 +186,7 @@ class ParquetQuerySuite extends QueryTest with FunSuiteLike with BeforeAndAfterA rdd.saveAsParquetFile(path) var actualCodec = ParquetTypesConverter.readMetaData(new Path(path), Some(TestSQLContext.sparkContext.hadoopConfiguration)) .getBlocks.flatMap(block => block.getColumns).map(column => column.getCodec.name()).distinct - assert(actualCodec === TestSQLContext.parquetCompressionCodec.toUpperCase :: Nil) + assert(actualCodec === TestSQLContext.conf.parquetCompressionCodec.toUpperCase :: Nil) parquetFile(path).registerTempTable("tmp") checkAnswer( @@ -202,7 +202,7 @@ class ParquetQuerySuite extends QueryTest with FunSuiteLike with BeforeAndAfterA rdd.saveAsParquetFile(path) actualCodec = ParquetTypesConverter.readMetaData(new Path(path), Some(TestSQLContext.sparkContext.hadoopConfiguration)) .getBlocks.flatMap(block => block.getColumns).map(column => column.getCodec.name()).distinct - assert(actualCodec === TestSQLContext.parquetCompressionCodec.toUpperCase :: Nil) + assert(actualCodec === TestSQLContext.conf.parquetCompressionCodec.toUpperCase :: Nil) parquetFile(path).registerTempTable("tmp") checkAnswer( @@ -234,7 +234,7 @@ class ParquetQuerySuite extends QueryTest with FunSuiteLike with BeforeAndAfterA rdd.saveAsParquetFile(path) actualCodec = ParquetTypesConverter.readMetaData(new Path(path), Some(TestSQLContext.sparkContext.hadoopConfiguration)) .getBlocks.flatMap(block => block.getColumns).map(column => column.getCodec.name()).distinct - assert(actualCodec === TestSQLContext.parquetCompressionCodec.toUpperCase :: Nil) + assert(actualCodec === TestSQLContext.conf.parquetCompressionCodec.toUpperCase :: Nil) parquetFile(path).registerTempTable("tmp") checkAnswer( @@ -250,7 +250,7 @@ class ParquetQuerySuite extends QueryTest with FunSuiteLike with BeforeAndAfterA rdd.saveAsParquetFile(path) actualCodec = ParquetTypesConverter.readMetaData(new Path(path), Some(TestSQLContext.sparkContext.hadoopConfiguration)) .getBlocks.flatMap(block => block.getColumns).map(column => column.getCodec.name()).distinct - assert(actualCodec === TestSQLContext.parquetCompressionCodec.toUpperCase :: Nil) + assert(actualCodec === TestSQLContext.conf.parquetCompressionCodec.toUpperCase :: Nil) parquetFile(path).registerTempTable("tmp") checkAnswer( diff --git a/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala b/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala index 23283fd3fe..0d934620ac 100644 --- a/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala +++ b/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala @@ -36,8 +36,8 @@ class HiveCompatibilitySuite extends HiveQueryFileTest with BeforeAndAfter { private val originalTimeZone = TimeZone.getDefault private val originalLocale = Locale.getDefault - private val originalColumnBatchSize = TestHive.columnBatchSize - private val originalInMemoryPartitionPruning = TestHive.inMemoryPartitionPruning + private 
val originalColumnBatchSize = TestHive.conf.columnBatchSize + private val originalInMemoryPartitionPruning = TestHive.conf.inMemoryPartitionPruning def testCases = hiveQueryDir.listFiles.map(f => f.getName.stripSuffix(".q") -> f) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala index 09ff4cc5ab..9aeebd7e54 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala @@ -71,8 +71,9 @@ class LocalHiveContext(sc: SparkContext) extends HiveContext(sc) { class HiveContext(sc: SparkContext) extends SQLContext(sc) { self => - // Change the default SQL dialect to HiveQL - override private[spark] def dialect: String = getConf(SQLConf.DIALECT, "hiveql") + private[sql] override lazy val conf: SQLConf = new SQLConf { + override def dialect: String = getConf(SQLConf.DIALECT, "hiveql") + } /** * When true, enables an experimental feature where metastore tables that use the parquet SerDe @@ -87,12 +88,12 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) { override def sql(sqlText: String): SchemaRDD = { // TODO: Create a framework for registering parsers instead of just hardcoding if statements. - if (dialect == "sql") { + if (conf.dialect == "sql") { super.sql(sqlText) - } else if (dialect == "hiveql") { + } else if (conf.dialect == "hiveql") { new SchemaRDD(this, ddlParser(sqlText).getOrElse(HiveQl.parseSql(sqlText))) } else { - sys.error(s"Unsupported SQL dialect: $dialect. Try 'sql' or 'hiveql'") + sys.error(s"Unsupported SQL dialect: ${conf.dialect}. Try 'sql' or 'hiveql'") } } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala index daeabb6c8b..785a6a14f4 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala @@ -515,7 +515,7 @@ private[hive] case class MetastoreRelation // if the size is still less than zero, we use default size Option(totalSize).map(_.toLong).filter(_ > 0) .getOrElse(Option(rawDataSize).map(_.toLong).filter(_ > 0) - .getOrElse(sqlContext.defaultSizeInBytes))) + .getOrElse(sqlContext.conf.defaultSizeInBytes))) } ) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala index 31c7ce9639..52e1f0d94f 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala @@ -102,8 +102,10 @@ class TestHiveContext(sc: SparkContext) extends HiveContext(sc) { new this.QueryExecution { val logical = plan } /** Fewer partitions to speed up testing. 
*/ - override private[spark] def numShufflePartitions: Int = - getConf(SQLConf.SHUFFLE_PARTITIONS, "5").toInt + private[sql] override lazy val conf: SQLConf = new SQLConf { + override def numShufflePartitions: Int = getConf(SQLConf.SHUFFLE_PARTITIONS, "5").toInt + override def dialect: String = getConf(SQLConf.DIALECT, "hiveql") + } /** * Returns the value of specified environmental variable as a [[java.io.File]] after checking diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/api/java/JavaHiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/api/java/JavaHiveContext.scala index 1817c78324..038f63f6c7 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/api/java/JavaHiveContext.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/api/java/JavaHiveContext.scala @@ -31,12 +31,12 @@ class JavaHiveContext(sqlContext: SQLContext) extends JavaSQLContext(sqlContext) override def sql(sqlText: String): JavaSchemaRDD = { // TODO: Create a framework for registering parsers instead of just hardcoding if statements. - if (sqlContext.dialect == "sql") { + if (sqlContext.conf.dialect == "sql") { super.sql(sqlText) - } else if (sqlContext.dialect == "hiveql") { + } else if (sqlContext.conf.dialect == "hiveql") { new JavaSchemaRDD(sqlContext, HiveQl.parseSql(sqlText)) } else { - sys.error(s"Unsupported SQL dialect: ${sqlContext.dialect}. Try 'sql' or 'hiveql'") + sys.error(s"Unsupported SQL dialect: ${sqlContext.conf.dialect}. Try 'sql' or 'hiveql'") } } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala index a758f921e0..0b4e76c9d3 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala @@ -81,7 +81,7 @@ class StatisticsSuite extends QueryTest with BeforeAndAfterAll { // TODO: How does it works? needs to add it back for other hive version. 
if (HiveShim.version =="0.12.0") { - assert(queryTotalSize("analyzeTable") === defaultSizeInBytes) + assert(queryTotalSize("analyzeTable") === conf.defaultSizeInBytes) } sql("ANALYZE TABLE analyzeTable COMPUTE STATISTICS noscan") @@ -110,7 +110,7 @@ class StatisticsSuite extends QueryTest with BeforeAndAfterAll { |SELECT * FROM src """.stripMargin).collect() - assert(queryTotalSize("analyzeTable_part") === defaultSizeInBytes) + assert(queryTotalSize("analyzeTable_part") === conf.defaultSizeInBytes) sql("ANALYZE TABLE analyzeTable_part COMPUTE STATISTICS noscan") @@ -151,8 +151,8 @@ class StatisticsSuite extends QueryTest with BeforeAndAfterAll { val sizes = rdd.queryExecution.analyzed.collect { case r if ct.runtimeClass.isAssignableFrom(r.getClass) => r.statistics.sizeInBytes } - assert(sizes.size === 2 && sizes(0) <= autoBroadcastJoinThreshold - && sizes(1) <= autoBroadcastJoinThreshold, + assert(sizes.size === 2 && sizes(0) <= conf.autoBroadcastJoinThreshold + && sizes(1) <= conf.autoBroadcastJoinThreshold, s"query should contain two relations, each of which has size smaller than autoConvertSize") // Using `sparkPlan` because for relevant patterns in HashJoin to be @@ -163,8 +163,8 @@ class StatisticsSuite extends QueryTest with BeforeAndAfterAll { checkAnswer(rdd, expectedAnswer) // check correctness of output - TestHive.settings.synchronized { - val tmp = autoBroadcastJoinThreshold + TestHive.conf.settings.synchronized { + val tmp = conf.autoBroadcastJoinThreshold sql(s"""SET ${SQLConf.AUTO_BROADCASTJOIN_THRESHOLD}=-1""") rdd = sql(query) @@ -207,8 +207,8 @@ class StatisticsSuite extends QueryTest with BeforeAndAfterAll { .isAssignableFrom(r.getClass) => r.statistics.sizeInBytes } - assert(sizes.size === 2 && sizes(1) <= autoBroadcastJoinThreshold - && sizes(0) <= autoBroadcastJoinThreshold, + assert(sizes.size === 2 && sizes(1) <= conf.autoBroadcastJoinThreshold + && sizes(0) <= conf.autoBroadcastJoinThreshold, s"query should contain two relations, each of which has size smaller than autoConvertSize") // Using `sparkPlan` because for relevant patterns in HashJoin to be @@ -221,8 +221,8 @@ class StatisticsSuite extends QueryTest with BeforeAndAfterAll { checkAnswer(rdd, answer) // check correctness of output - TestHive.settings.synchronized { - val tmp = autoBroadcastJoinThreshold + TestHive.conf.settings.synchronized { + val tmp = conf.autoBroadcastJoinThreshold sql(s"SET ${SQLConf.AUTO_BROADCASTJOIN_THRESHOLD}=-1") rdd = sql(leftSemiJoinQuery) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala index 4decd15485..c14f0d24e0 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala @@ -847,7 +847,7 @@ class HiveQuerySuite extends HiveComparisonTest with BeforeAndAfter { case Row(key: String, value: String) => key -> value case Row(KV(key, value)) => key -> value }.toSet - clear() + conf.clear() // "SET" itself returns all config variables currently specified in SQLConf. // TODO: Should we be listing the default here always? probably... @@ -879,7 +879,7 @@ class HiveQuerySuite extends HiveComparisonTest with BeforeAndAfter { collectResults(sql(s"SET $nonexistentKey")) } - clear() + conf.clear() } createQueryTest("select from thrift based table", -- cgit v1.2.3
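Note: the sketch below is an illustrative summary of the pattern this patch applies, not code taken
from the patch itself. It uses simplified, hypothetical names (AppConf, AppContext, TestAppContext)
in place of SQLConf, SQLContext and the TestSQLContext/HiveContext overrides: the config holder
becomes a standalone class, the context owns it as an overridable lazy val, and the old
setConf/getConf entry points survive as thin delegates so existing callers keep compiling.

import scala.collection.mutable

// Standalone config holder (the role SQLConf takes on in this patch).
class AppConf {
  private val settings = mutable.HashMap.empty[String, String]

  def setConf(key: String, value: String): Unit = synchronized { settings(key) = value }

  def getConf(key: String, defaultValue: String): String =
    synchronized { settings.getOrElse(key, defaultValue) }

  // Defaults are defs so that subclasses can override them (e.g. fewer shuffle
  // partitions in tests, "hiveql" as the default dialect in a Hive-style context).
  def numShufflePartitions: Int = getConf("spark.sql.shuffle.partitions", "200").toInt
  def dialect: String = getConf("spark.sql.dialect", "sql")
}

// The context owns a conf field instead of mixing a conf trait into itself.
class AppContext {
  // Lazy so a subclass can substitute a conf with different defaults.
  lazy val conf: AppConf = new AppConf

  // Thin delegates keep the old public API source-compatible.
  def setConf(key: String, value: String): Unit = conf.setConf(key, value)
  def getConf(key: String, defaultValue: String): String = conf.getConf(key, defaultValue)
}

// A specialised context only overrides the conf field, not every accessor.
class TestAppContext extends AppContext {
  override lazy val conf: AppConf = new AppConf {
    override def numShufflePartitions: Int = getConf("spark.sql.shuffle.partitions", "5").toInt
    override def dialect: String = getConf("spark.sql.dialect", "hiveql")
  }
}

object ConfDemo extends App {
  val ctx = new TestAppContext
  println(ctx.conf.numShufflePartitions) // 5, the overridden default
  ctx.setConf("spark.sql.dialect", "sql")
  println(ctx.conf.dialect)              // "sql", an explicit setting wins over the default
}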