author      Reynold Xin <rxin@databricks.com>        2015-01-13 13:30:35 -0800
committer   Michael Armbrust <michael@databricks.com>    2015-01-13 13:30:35 -0800
commit      14e3f114efb906937b2d7b7ac04484b2814a3b48 (patch)
tree        5f20050f930c68d7f9a52a5ee696b0f633fc4ea9 /sql/core
parent      6463e0b9e8067cce70602c5c9006a2546856a9d6 (diff)
[SPARK-5168] Make SQLConf a field rather than mixin in SQLContext
This change should be binary and source backward compatible since we didn't change any user facing APIs.

Author: Reynold Xin <rxin@databricks.com>

Closes #3965 from rxin/SPARK-5168-sqlconf and squashes the following commits:

42eec09 [Reynold Xin] Fix default conf value.
0ef86cc [Reynold Xin] Fix constructor ordering.
4d7f910 [Reynold Xin] Properly override config.
ccc8e6a [Reynold Xin] [SPARK-5168] Make SQLConf a field rather than mixin in SQLContext
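As a rough, self-contained sketch of the "field rather than mixin" pattern this patch applies (Settings and Engine are illustrative stand-ins, not Spark classes):

    class Settings {
      private val props = scala.collection.mutable.HashMap.empty[String, String]
      def set(key: String, value: String): Unit = synchronized { props(key) = value }
      def get(key: String, default: String): String = synchronized { props.getOrElse(key, default) }
    }

    class Engine {
      // a lazy val, so a subclass can swap in a Settings object with different defaults
      protected lazy val settings: Settings = new Settings
      // thin delegating methods keep the old mixin-era surface source-compatible
      def set(key: String, value: String): Unit = settings.set(key, value)
      def get(key: String, default: String): String = settings.get(key, default)
    }

The diff below follows the same shape: SQLContext stops mixing in SQLConf, holds a conf field instead, and forwards its public setConf/getConf methods to it.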
Diffstat (limited to 'sql/core')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/CacheManager.scala | 4
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala | 2
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala | 40
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSQLContext.scala | 6
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala | 4
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala | 2
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala | 4
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala | 2
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala | 18
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala | 2
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala | 2
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/json/JSONRelation.scala | 4
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala | 7
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTest.scala | 2
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala | 4
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala | 2
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/test/TestSQLContext.scala | 5
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala | 2
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/SQLConfSuite.scala | 10
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala | 10
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/columnar/InMemoryColumnarQuerySuite.scala | 3
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/columnar/PartitionBatchPruningSuite.scala | 4
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala | 4
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/json/JsonSuite.scala | 2
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala | 4
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala | 14
26 files changed, 96 insertions, 67 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/CacheManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/CacheManager.scala
index 3c9439b2e9..e715d9434a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/CacheManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/CacheManager.scala
@@ -91,8 +91,8 @@ private[sql] trait CacheManager {
CachedData(
planToCache,
InMemoryRelation(
- useCompression,
- columnBatchSize,
+ conf.useCompression,
+ conf.columnBatchSize,
storageLevel,
query.queryExecution.executedPlan,
tableName))
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
index f5bf935522..206d16f5b3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
@@ -61,7 +61,7 @@ private[spark] object SQLConf {
*
* SQLConf is thread-safe (internally synchronized, so safe to be used in multiple threads).
*/
-private[sql] trait SQLConf {
+private[sql] class SQLConf {
import SQLConf._
/** Only low degree of contention is expected for conf, thus NOT using ConcurrentHashMap. */
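Since SQLConf is now a concrete class rather than a trait, a configuration object can be built and exercised on its own (it stays private[sql], so only from code under org.apache.spark.sql, e.g. tests). A minimal sketch, using only members that appear elsewhere in this patch:

    import org.apache.spark.sql.SQLConf

    val conf = new SQLConf
    conf.setConf(SQLConf.SHUFFLE_PARTITIONS, "10") // same key the SET command maps to
    assert(conf.numShufflePartitions == 10)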
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index e7021cc336..d8efce0cb4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -17,6 +17,9 @@
package org.apache.spark.sql
+import java.util.Properties
+
+import scala.collection.immutable
import scala.language.implicitConversions
import scala.reflect.runtime.universe.TypeTag
@@ -49,7 +52,6 @@ import org.apache.spark.sql.sources.{DataSourceStrategy, BaseRelation, DDLParser
@AlphaComponent
class SQLContext(@transient val sparkContext: SparkContext)
extends org.apache.spark.Logging
- with SQLConf
with CacheManager
with ExpressionConversions
with UDFRegistration
@@ -57,6 +59,30 @@ class SQLContext(@transient val sparkContext: SparkContext)
self =>
+ // Note that this is a lazy val so we can override the default value in subclasses.
+ private[sql] lazy val conf: SQLConf = new SQLConf
+
+ /** Set Spark SQL configuration properties. */
+ def setConf(props: Properties): Unit = conf.setConf(props)
+
+ /** Set the given Spark SQL configuration property. */
+ def setConf(key: String, value: String): Unit = conf.setConf(key, value)
+
+ /** Return the value of Spark SQL configuration property for the given key. */
+ def getConf(key: String): String = conf.getConf(key)
+
+ /**
+ * Return the value of Spark SQL configuration property for the given key. If the key is not set
+ * yet, return `defaultValue`.
+ */
+ def getConf(key: String, defaultValue: String): String = conf.getConf(key, defaultValue)
+
+ /**
+ * Return all the configuration properties that have been set (i.e. not the default).
+ * This creates a new copy of the config properties in the form of a Map.
+ */
+ def getAllConfs: immutable.Map[String, String] = conf.getAllConfs
+
@transient
protected[sql] lazy val catalog: Catalog = new SimpleCatalog(true)
@@ -212,7 +238,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
*/
@Experimental
def jsonRDD(json: RDD[String], schema: StructType): SchemaRDD = {
- val columnNameOfCorruptJsonRecord = columnNameOfCorruptRecord
+ val columnNameOfCorruptJsonRecord = conf.columnNameOfCorruptRecord
val appliedSchema =
Option(schema).getOrElse(
JsonRDD.nullTypeToStringType(
@@ -226,7 +252,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
*/
@Experimental
def jsonRDD(json: RDD[String], samplingRatio: Double): SchemaRDD = {
- val columnNameOfCorruptJsonRecord = columnNameOfCorruptRecord
+ val columnNameOfCorruptJsonRecord = conf.columnNameOfCorruptRecord
val appliedSchema =
JsonRDD.nullTypeToStringType(
JsonRDD.inferSchema(json, samplingRatio, columnNameOfCorruptJsonRecord))
@@ -299,10 +325,10 @@ class SQLContext(@transient val sparkContext: SparkContext)
* @group userf
*/
def sql(sqlText: String): SchemaRDD = {
- if (dialect == "sql") {
+ if (conf.dialect == "sql") {
new SchemaRDD(this, parseSql(sqlText))
} else {
- sys.error(s"Unsupported SQL dialect: $dialect")
+ sys.error(s"Unsupported SQL dialect: ${conf.dialect}")
}
}
@@ -323,9 +349,9 @@ class SQLContext(@transient val sparkContext: SparkContext)
val sqlContext: SQLContext = self
- def codegenEnabled = self.codegenEnabled
+ def codegenEnabled = self.conf.codegenEnabled
- def numPartitions = self.numShufflePartitions
+ def numPartitions = self.conf.numShufflePartitions
def strategies: Seq[Strategy] =
extraStrategies ++ (
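Because the new methods above simply forward to the conf field, user code written against the old mixin-based API keeps compiling and behaving the same. A small usage sketch (sc is an assumed, already-created SparkContext):

    val sqlContext = new org.apache.spark.sql.SQLContext(sc)
    sqlContext.setConf("spark.sql.shuffle.partitions", "8")    // delegates to conf.setConf
    val n = sqlContext.getConf("spark.sql.shuffle.partitions") // "8", via conf.getConf
    val all = sqlContext.getAllConfs                           // copy of every explicitly set key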
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSQLContext.scala
index 8884204e50..7f868cd4af 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSQLContext.scala
@@ -52,7 +52,7 @@ class JavaSQLContext(val sqlContext: SQLContext) extends UDFRegistration {
* @group userf
*/
def sql(sqlText: String): JavaSchemaRDD = {
- if (sqlContext.dialect == "sql") {
+ if (sqlContext.conf.dialect == "sql") {
new JavaSchemaRDD(sqlContext, sqlContext.parseSql(sqlText))
} else {
sys.error(s"Unsupported SQL dialect: $sqlContext.dialect")
@@ -164,7 +164,7 @@ class JavaSQLContext(val sqlContext: SQLContext) extends UDFRegistration {
* It goes through the entire dataset once to determine the schema.
*/
def jsonRDD(json: JavaRDD[String]): JavaSchemaRDD = {
- val columnNameOfCorruptJsonRecord = sqlContext.columnNameOfCorruptRecord
+ val columnNameOfCorruptJsonRecord = sqlContext.conf.columnNameOfCorruptRecord
val appliedScalaSchema =
JsonRDD.nullTypeToStringType(
JsonRDD.inferSchema(json.rdd, 1.0, columnNameOfCorruptJsonRecord))
@@ -182,7 +182,7 @@ class JavaSQLContext(val sqlContext: SQLContext) extends UDFRegistration {
*/
@Experimental
def jsonRDD(json: JavaRDD[String], schema: StructType): JavaSchemaRDD = {
- val columnNameOfCorruptJsonRecord = sqlContext.columnNameOfCorruptRecord
+ val columnNameOfCorruptJsonRecord = sqlContext.conf.columnNameOfCorruptRecord
val appliedScalaSchema =
Option(asScalaDataType(schema)).getOrElse(
JsonRDD.nullTypeToStringType(
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala
index 1e432485c4..065fae3c83 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala
@@ -82,7 +82,7 @@ private[sql] case class InMemoryRelation(
if (batchStats.value.isEmpty) {
// Underlying columnar RDD hasn't been materialized, no useful statistics information
// available, return the default statistics.
- Statistics(sizeInBytes = child.sqlContext.defaultSizeInBytes)
+ Statistics(sizeInBytes = child.sqlContext.conf.defaultSizeInBytes)
} else {
// Underlying columnar RDD has been materialized, required information has also been collected
// via the `batchStats` accumulator, compute the final statistics, and update `_statistics`.
@@ -233,7 +233,7 @@ private[sql] case class InMemoryColumnarTableScan(
val readPartitions = sparkContext.accumulator(0)
val readBatches = sparkContext.accumulator(0)
- private val inMemoryPartitionPruningEnabled = sqlContext.inMemoryPartitionPruning
+ private val inMemoryPartitionPruningEnabled = sqlContext.conf.inMemoryPartitionPruning
override def execute() = {
readPartitions.setValue(0)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
index d7c811ca89..7c0b72aab4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
@@ -123,7 +123,7 @@ case class Exchange(newPartitioning: Partitioning, child: SparkPlan) extends Una
*/
private[sql] case class AddExchange(sqlContext: SQLContext) extends Rule[SparkPlan] {
// TODO: Determine the number of partitions.
- def numPartitions = sqlContext.numShufflePartitions
+ def numPartitions = sqlContext.conf.numShufflePartitions
def apply(plan: SparkPlan): SparkPlan = plan.transformUp {
case operator: SparkPlan =>
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
index d2d8cb1c62..069e950195 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
@@ -69,7 +69,7 @@ case class LogicalRDD(output: Seq[Attribute], rdd: RDD[Row])(sqlContext: SQLCont
@transient override lazy val statistics = Statistics(
// TODO: Instead of returning a default value here, find a way to return a meaningful size
// estimate for RDDs. See PR 1238 for more discussions.
- sizeInBytes = BigInt(sqlContext.defaultSizeInBytes)
+ sizeInBytes = BigInt(sqlContext.conf.defaultSizeInBytes)
)
}
@@ -106,6 +106,6 @@ case class SparkLogicalPlan(alreadyPlanned: SparkPlan)(@transient sqlContext: SQ
@transient override lazy val statistics = Statistics(
// TODO: Instead of returning a default value here, find a way to return a meaningful size
// estimate for RDDs. See PR 1238 for more discussions.
- sizeInBytes = BigInt(sqlContext.defaultSizeInBytes)
+ sizeInBytes = BigInt(sqlContext.conf.defaultSizeInBytes)
)
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
index 017c78d2c6..6fecd1ff06 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
@@ -51,7 +51,7 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
// sqlContext will be null when we are being deserialized on the slaves. In this instance
// the value of codegenEnabled will be set by the deserializer after the constructor has run.
val codegenEnabled: Boolean = if (sqlContext != null) {
- sqlContext.codegenEnabled
+ sqlContext.conf.codegenEnabled
} else {
false
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index d91b1fbc69..0652d2ff7c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -35,8 +35,8 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
object LeftSemiJoin extends Strategy with PredicateHelper {
def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
case ExtractEquiJoinKeys(LeftSemi, leftKeys, rightKeys, condition, left, right)
- if sqlContext.autoBroadcastJoinThreshold > 0 &&
- right.statistics.sizeInBytes <= sqlContext.autoBroadcastJoinThreshold =>
+ if sqlContext.conf.autoBroadcastJoinThreshold > 0 &&
+ right.statistics.sizeInBytes <= sqlContext.conf.autoBroadcastJoinThreshold =>
val semiJoin = joins.BroadcastLeftSemiJoinHash(
leftKeys, rightKeys, planLater(left), planLater(right))
condition.map(Filter(_, semiJoin)).getOrElse(semiJoin) :: Nil
@@ -81,13 +81,13 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
case ExtractEquiJoinKeys(Inner, leftKeys, rightKeys, condition, left, right)
- if sqlContext.autoBroadcastJoinThreshold > 0 &&
- right.statistics.sizeInBytes <= sqlContext.autoBroadcastJoinThreshold =>
+ if sqlContext.conf.autoBroadcastJoinThreshold > 0 &&
+ right.statistics.sizeInBytes <= sqlContext.conf.autoBroadcastJoinThreshold =>
makeBroadcastHashJoin(leftKeys, rightKeys, left, right, condition, joins.BuildRight)
case ExtractEquiJoinKeys(Inner, leftKeys, rightKeys, condition, left, right)
- if sqlContext.autoBroadcastJoinThreshold > 0 &&
- left.statistics.sizeInBytes <= sqlContext.autoBroadcastJoinThreshold =>
+ if sqlContext.conf.autoBroadcastJoinThreshold > 0 &&
+ left.statistics.sizeInBytes <= sqlContext.conf.autoBroadcastJoinThreshold =>
makeBroadcastHashJoin(leftKeys, rightKeys, left, right, condition, joins.BuildLeft)
case ExtractEquiJoinKeys(Inner, leftKeys, rightKeys, condition, left, right) =>
@@ -215,7 +215,7 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
InsertIntoParquetTable(table, planLater(child), overwrite) :: Nil
case PhysicalOperation(projectList, filters: Seq[Expression], relation: ParquetRelation) =>
val prunePushedDownFilters =
- if (sqlContext.parquetFilterPushDown) {
+ if (sqlContext.conf.parquetFilterPushDown) {
(predicates: Seq[Expression]) => {
// Note: filters cannot be pushed down to Parquet if they contain more complex
// expressions than simple "Attribute cmp Literal" comparisons. Here we remove all
@@ -237,7 +237,7 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
ParquetTableScan(
_,
relation,
- if (sqlContext.parquetFilterPushDown) filters else Nil)) :: Nil
+ if (sqlContext.conf.parquetFilterPushDown) filters else Nil)) :: Nil
case _ => Nil
}
@@ -270,7 +270,7 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
// This sort only sorts tuples within a partition. Its requiredDistribution will be
// an UnspecifiedDistribution.
execution.Sort(sortExprs, global = false, planLater(child)) :: Nil
- case logical.Sort(sortExprs, global, child) if sqlContext.externalSortEnabled =>
+ case logical.Sort(sortExprs, global, child) if sqlContext.conf.externalSortEnabled =>
execution.ExternalSort(sortExprs, global, planLater(child)):: Nil
case logical.Sort(sortExprs, global, child) =>
execution.Sort(sortExprs, global, planLater(child)):: Nil
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala
index df8e616151..af6b07bd6c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala
@@ -94,7 +94,7 @@ case class SetCommand(
logWarning(
s"Property ${SQLConf.Deprecated.MAPRED_REDUCE_TASKS} is deprecated, " +
s"showing ${SQLConf.SHUFFLE_PARTITIONS} instead.")
- Seq(Row(s"${SQLConf.SHUFFLE_PARTITIONS}=${sqlContext.numShufflePartitions}"))
+ Seq(Row(s"${SQLConf.SHUFFLE_PARTITIONS}=${sqlContext.conf.numShufflePartitions}"))
// Queries a single property.
case Some((key, None)) =>
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala
index fbe1d06ed2..2dd22c020e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala
@@ -43,7 +43,7 @@ case class BroadcastHashJoin(
extends BinaryNode with HashJoin {
val timeout = {
- val timeoutValue = sqlContext.broadcastTimeout
+ val timeoutValue = sqlContext.conf.broadcastTimeout
if (timeoutValue < 0) {
Duration.Inf
} else {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/json/JSONRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/json/JSONRelation.scala
index a9a6696cb1..f5c02224c8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/json/JSONRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/json/JSONRelation.scala
@@ -59,8 +59,8 @@ private[sql] case class JSONRelation(
JsonRDD.inferSchema(
baseRDD,
samplingRatio,
- sqlContext.columnNameOfCorruptRecord)))
+ sqlContext.conf.columnNameOfCorruptRecord)))
override def buildScan() =
- JsonRDD.jsonStringToRow(baseRDD, schema, sqlContext.columnNameOfCorruptRecord)
+ JsonRDD.jsonStringToRow(baseRDD, schema, sqlContext.conf.columnNameOfCorruptRecord)
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala
index 2835dc3408..cde5160149 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala
@@ -65,7 +65,7 @@ private[sql] case class ParquetRelation(
ParquetTypesConverter.readSchemaFromFile(
new Path(path.split(",").head),
conf,
- sqlContext.isParquetBinaryAsString)
+ sqlContext.conf.isParquetBinaryAsString)
lazy val attributeMap = AttributeMap(output.map(o => o -> o))
@@ -80,7 +80,7 @@ private[sql] case class ParquetRelation(
}
// TODO: Use data from the footers.
- override lazy val statistics = Statistics(sizeInBytes = sqlContext.defaultSizeInBytes)
+ override lazy val statistics = Statistics(sizeInBytes = sqlContext.conf.defaultSizeInBytes)
}
private[sql] object ParquetRelation {
@@ -163,7 +163,8 @@ private[sql] object ParquetRelation {
sqlContext: SQLContext): ParquetRelation = {
val path = checkPath(pathString, allowExisting, conf)
conf.set(ParquetOutputFormat.COMPRESSION, shortParquetCompressionCodecNames.getOrElse(
- sqlContext.parquetCompressionCodec.toUpperCase, CompressionCodecName.UNCOMPRESSED).name())
+ sqlContext.conf.parquetCompressionCodec.toUpperCase, CompressionCodecName.UNCOMPRESSED)
+ .name())
ParquetRelation.enableLogForwarding()
ParquetTypesConverter.writeMetaData(attributes, path, conf)
new ParquetRelation(path.toString, Some(conf), sqlContext) {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTest.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTest.scala
index b4d48902fd..02ce1b3e6d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTest.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTest.scala
@@ -54,7 +54,7 @@ trait ParquetTest {
try f finally {
keys.zip(currentValues).foreach {
case (key, Some(value)) => setConf(key, value)
- case (key, None) => unsetConf(key)
+ case (key, None) => conf.unsetConf(key)
}
}
}
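The helper above temporarily overrides settings and restores (or unsets) them afterwards. Written out by hand with the public SQLContext methods, the same save-and-restore idiom looks roughly like this (sqlContext is assumed in scope, and the fallback default is a placeholder; unlike the helper, this sketch does not distinguish an unset key from one set to its default):

    val previous = sqlContext.getConf(SQLConf.PARQUET_COMPRESSION, "uncompressed")
    sqlContext.setConf(SQLConf.PARQUET_COMPRESSION, "GZIP")
    try {
      // ... code that should observe the GZIP setting ...
    } finally {
      sqlContext.setConf(SQLConf.PARQUET_COMPRESSION, previous)
    }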
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
index 2e0c6c51c0..55a2728a85 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
@@ -137,7 +137,7 @@ case class ParquetRelation2(path: String)(@transient val sqlContext: SQLContext)
ParquetTypesConverter.readSchemaFromFile(
partitions.head.files.head.getPath,
Some(sparkContext.hadoopConfiguration),
- sqlContext.isParquetBinaryAsString))
+ sqlContext.conf.isParquetBinaryAsString))
val dataIncludesKey =
partitionKeys.headOption.map(dataSchema.fieldNames.contains(_)).getOrElse(true)
@@ -198,7 +198,7 @@ case class ParquetRelation2(path: String)(@transient val sqlContext: SQLContext)
predicates
.reduceOption(And)
.flatMap(ParquetFilters.createFilter)
- .filter(_ => sqlContext.parquetFilterPushDown)
+ .filter(_ => sqlContext.conf.parquetFilterPushDown)
.foreach(ParquetInputFormat.setFilterPredicate(jobConf, _))
def percentRead = selectedPartitions.size.toDouble / partitions.size.toDouble * 100
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
index 2a7be23e37..7f5564baa0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
@@ -99,7 +99,7 @@ abstract class BaseRelation {
* large to broadcast. This method will be called multiple times during query planning
* and thus should not perform expensive operations for each invocation.
*/
- def sizeInBytes = sqlContext.defaultSizeInBytes
+ def sizeInBytes = sqlContext.conf.defaultSizeInBytes
}
/**
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/test/TestSQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/test/TestSQLContext.scala
index 6bb81c76ed..8c80be106f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/test/TestSQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/test/TestSQLContext.scala
@@ -29,6 +29,7 @@ object TestSQLContext
new SparkConf().set("spark.sql.testkey", "true"))) {
/** Fewer partitions to speed up testing. */
- override private[spark] def numShufflePartitions: Int =
- getConf(SQLConf.SHUFFLE_PARTITIONS, "5").toInt
+ private[sql] override lazy val conf: SQLConf = new SQLConf {
+ override def numShufflePartitions: Int = this.getConf(SQLConf.SHUFFLE_PARTITIONS, "5").toInt
+ }
}
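The same override hook is available to any SQLContext subclass that needs different defaults; a hedged sketch (MyContext and the chosen setting are illustrative only, and the class must live under org.apache.spark.sql to reach the private[sql] members):

    class MyContext(sc: SparkContext) extends SQLContext(sc) {
      private[sql] override lazy val conf: SQLConf = new SQLConf {
        // enable code generation by default for this context only
        override def codegenEnabled: Boolean =
          getConf(SQLConf.CODEGEN_ENABLED, "true").toBoolean
      }
    }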
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
index c7e136388f..e5ab16f9dd 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
@@ -387,7 +387,7 @@ class JoinSuite extends QueryTest with BeforeAndAfterEach {
test("broadcasted left semi join operator selection") {
clearCache()
sql("CACHE TABLE testData")
- val tmp = autoBroadcastJoinThreshold
+ val tmp = conf.autoBroadcastJoinThreshold
sql(s"SET ${SQLConf.AUTO_BROADCASTJOIN_THRESHOLD}=1000000000")
Seq(
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLConfSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLConfSuite.scala
index 60701f0e15..bf73d0c707 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLConfSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLConfSuite.scala
@@ -37,7 +37,7 @@ class SQLConfSuite extends QueryTest with FunSuiteLike {
}
test("programmatic ways of basic setting and getting") {
- clear()
+ conf.clear()
assert(getAllConfs.size === 0)
setConf(testKey, testVal)
@@ -51,11 +51,11 @@ class SQLConfSuite extends QueryTest with FunSuiteLike {
assert(TestSQLContext.getConf(testKey, testVal + "_") == testVal)
assert(TestSQLContext.getAllConfs.contains(testKey))
- clear()
+ conf.clear()
}
test("parse SQL set commands") {
- clear()
+ conf.clear()
sql(s"set $testKey=$testVal")
assert(getConf(testKey, testVal + "_") == testVal)
assert(TestSQLContext.getConf(testKey, testVal + "_") == testVal)
@@ -73,11 +73,11 @@ class SQLConfSuite extends QueryTest with FunSuiteLike {
sql(s"set $key=")
assert(getConf(key, "0") == "")
- clear()
+ conf.clear()
}
test("deprecated property") {
- clear()
+ conf.clear()
sql(s"set ${SQLConf.Deprecated.MAPRED_REDUCE_TASKS}=10")
assert(getConf(SQLConf.SHUFFLE_PARTITIONS) == "10")
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index d9de5686dc..bc72daf088 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -79,7 +79,7 @@ class SQLQuerySuite extends QueryTest with BeforeAndAfterAll {
}
test("aggregation with codegen") {
- val originalValue = codegenEnabled
+ val originalValue = conf.codegenEnabled
setConf(SQLConf.CODEGEN_ENABLED, "true")
sql("SELECT key FROM testData GROUP BY key").collect()
setConf(SQLConf.CODEGEN_ENABLED, originalValue.toString)
@@ -245,14 +245,14 @@ class SQLQuerySuite extends QueryTest with BeforeAndAfterAll {
}
test("sorting") {
- val before = externalSortEnabled
+ val before = conf.externalSortEnabled
setConf(SQLConf.EXTERNAL_SORT, "false")
sortTest()
setConf(SQLConf.EXTERNAL_SORT, before.toString)
}
test("external sorting") {
- val before = externalSortEnabled
+ val before = conf.externalSortEnabled
setConf(SQLConf.EXTERNAL_SORT, "true")
sortTest()
setConf(SQLConf.EXTERNAL_SORT, before.toString)
@@ -600,7 +600,7 @@ class SQLQuerySuite extends QueryTest with BeforeAndAfterAll {
}
test("SET commands semantics using sql()") {
- clear()
+ conf.clear()
val testKey = "test.key.0"
val testVal = "test.val.0"
val nonexistentKey = "nonexistent"
@@ -632,7 +632,7 @@ class SQLQuerySuite extends QueryTest with BeforeAndAfterAll {
sql(s"SET $nonexistentKey"),
Seq(Seq(s"$nonexistentKey=<undefined>"))
)
- clear()
+ conf.clear()
}
test("apply schema") {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/columnar/InMemoryColumnarQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/columnar/InMemoryColumnarQuerySuite.scala
index fc95dccc74..d94729ba92 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/columnar/InMemoryColumnarQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/columnar/InMemoryColumnarQuerySuite.scala
@@ -39,7 +39,8 @@ class InMemoryColumnarQuerySuite extends QueryTest {
sparkContext.parallelize(1 to 10).map(i => TestData(i, i.toString)).registerTempTable("sizeTst")
cacheTable("sizeTst")
assert(
- table("sizeTst").queryExecution.logical.statistics.sizeInBytes > autoBroadcastJoinThreshold)
+ table("sizeTst").queryExecution.logical.statistics.sizeInBytes >
+ conf.autoBroadcastJoinThreshold)
}
test("projection") {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/columnar/PartitionBatchPruningSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/columnar/PartitionBatchPruningSuite.scala
index 1915c25392..592cafbbdc 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/columnar/PartitionBatchPruningSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/columnar/PartitionBatchPruningSuite.scala
@@ -23,8 +23,8 @@ import org.apache.spark.sql._
import org.apache.spark.sql.test.TestSQLContext._
class PartitionBatchPruningSuite extends FunSuite with BeforeAndAfterAll with BeforeAndAfter {
- val originalColumnBatchSize = columnBatchSize
- val originalInMemoryPartitionPruning = inMemoryPartitionPruning
+ val originalColumnBatchSize = conf.columnBatchSize
+ val originalInMemoryPartitionPruning = conf.inMemoryPartitionPruning
override protected def beforeAll(): Unit = {
// Make a table with 5 partitions, 2 batches per partition, 10 elements per batch
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
index a5af71acfc..c5b6fce5fd 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
@@ -60,7 +60,7 @@ class PlannerSuite extends FunSuite {
}
test("sizeInBytes estimation of limit operator for broadcast hash join optimization") {
- val origThreshold = autoBroadcastJoinThreshold
+ val origThreshold = conf.autoBroadcastJoinThreshold
setConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD, 81920.toString)
// Using a threshold that is definitely larger than the small testing table (b) below
@@ -78,7 +78,7 @@ class PlannerSuite extends FunSuite {
}
test("InMemoryRelation statistics propagation") {
- val origThreshold = autoBroadcastJoinThreshold
+ val origThreshold = conf.autoBroadcastJoinThreshold
setConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD, 81920.toString)
testData.limit(3).registerTempTable("tiny")
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/json/JsonSuite.scala
index 8dce3372a8..b09f1ac495 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/json/JsonSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/json/JsonSuite.scala
@@ -713,7 +713,7 @@ class JsonSuite extends QueryTest {
test("Corrupt records") {
// Test if we can query corrupt records.
- val oldColumnNameOfCorruptRecord = TestSQLContext.columnNameOfCorruptRecord
+ val oldColumnNameOfCorruptRecord = TestSQLContext.conf.columnNameOfCorruptRecord
TestSQLContext.setConf(SQLConf.COLUMN_NAME_OF_CORRUPT_RECORD, "_unparsed")
val jsonSchemaRDD = jsonRDD(corruptRecords)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala
index 10a01474e9..6ac67fcafe 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala
@@ -213,7 +213,7 @@ class ParquetIOSuite extends QueryTest with ParquetTest {
def checkCompressionCodec(codec: CompressionCodecName): Unit = {
withSQLConf(SQLConf.PARQUET_COMPRESSION -> codec.name()) {
withParquetFile(data) { path =>
- assertResult(parquetCompressionCodec.toUpperCase) {
+ assertResult(conf.parquetCompressionCodec.toUpperCase) {
compressionCodecFor(path)
}
}
@@ -221,7 +221,7 @@ class ParquetIOSuite extends QueryTest with ParquetTest {
}
// Checks default compression codec
- checkCompressionCodec(CompressionCodecName.fromConf(parquetCompressionCodec))
+ checkCompressionCodec(CompressionCodecName.fromConf(conf.parquetCompressionCodec))
checkCompressionCodec(CompressionCodecName.UNCOMPRESSED)
checkCompressionCodec(CompressionCodecName.GZIP)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala
index a5fe2e8da2..0a92336a3c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala
@@ -88,7 +88,7 @@ class ParquetQuerySuite extends QueryTest with FunSuiteLike with BeforeAndAfterA
TestData // Load test data tables.
private var testRDD: SchemaRDD = null
- private val originalParquetFilterPushdownEnabled = TestSQLContext.parquetFilterPushDown
+ private val originalParquetFilterPushdownEnabled = TestSQLContext.conf.parquetFilterPushDown
override def beforeAll() {
ParquetTestData.writeFile()
@@ -144,7 +144,7 @@ class ParquetQuerySuite extends QueryTest with FunSuiteLike with BeforeAndAfterA
}
ignore("Treat binary as string") {
- val oldIsParquetBinaryAsString = TestSQLContext.isParquetBinaryAsString
+ val oldIsParquetBinaryAsString = TestSQLContext.conf.isParquetBinaryAsString
// Create the test file.
val file = getTempFilePath("parquet")
@@ -174,7 +174,7 @@ class ParquetQuerySuite extends QueryTest with FunSuiteLike with BeforeAndAfterA
}
test("Compression options for writing to a Parquetfile") {
- val defaultParquetCompressionCodec = TestSQLContext.parquetCompressionCodec
+ val defaultParquetCompressionCodec = TestSQLContext.conf.parquetCompressionCodec
import scala.collection.JavaConversions._
val file = getTempFilePath("parquet")
@@ -186,7 +186,7 @@ class ParquetQuerySuite extends QueryTest with FunSuiteLike with BeforeAndAfterA
rdd.saveAsParquetFile(path)
var actualCodec = ParquetTypesConverter.readMetaData(new Path(path), Some(TestSQLContext.sparkContext.hadoopConfiguration))
.getBlocks.flatMap(block => block.getColumns).map(column => column.getCodec.name()).distinct
- assert(actualCodec === TestSQLContext.parquetCompressionCodec.toUpperCase :: Nil)
+ assert(actualCodec === TestSQLContext.conf.parquetCompressionCodec.toUpperCase :: Nil)
parquetFile(path).registerTempTable("tmp")
checkAnswer(
@@ -202,7 +202,7 @@ class ParquetQuerySuite extends QueryTest with FunSuiteLike with BeforeAndAfterA
rdd.saveAsParquetFile(path)
actualCodec = ParquetTypesConverter.readMetaData(new Path(path), Some(TestSQLContext.sparkContext.hadoopConfiguration))
.getBlocks.flatMap(block => block.getColumns).map(column => column.getCodec.name()).distinct
- assert(actualCodec === TestSQLContext.parquetCompressionCodec.toUpperCase :: Nil)
+ assert(actualCodec === TestSQLContext.conf.parquetCompressionCodec.toUpperCase :: Nil)
parquetFile(path).registerTempTable("tmp")
checkAnswer(
@@ -234,7 +234,7 @@ class ParquetQuerySuite extends QueryTest with FunSuiteLike with BeforeAndAfterA
rdd.saveAsParquetFile(path)
actualCodec = ParquetTypesConverter.readMetaData(new Path(path), Some(TestSQLContext.sparkContext.hadoopConfiguration))
.getBlocks.flatMap(block => block.getColumns).map(column => column.getCodec.name()).distinct
- assert(actualCodec === TestSQLContext.parquetCompressionCodec.toUpperCase :: Nil)
+ assert(actualCodec === TestSQLContext.conf.parquetCompressionCodec.toUpperCase :: Nil)
parquetFile(path).registerTempTable("tmp")
checkAnswer(
@@ -250,7 +250,7 @@ class ParquetQuerySuite extends QueryTest with FunSuiteLike with BeforeAndAfterA
rdd.saveAsParquetFile(path)
actualCodec = ParquetTypesConverter.readMetaData(new Path(path), Some(TestSQLContext.sparkContext.hadoopConfiguration))
.getBlocks.flatMap(block => block.getColumns).map(column => column.getCodec.name()).distinct
- assert(actualCodec === TestSQLContext.parquetCompressionCodec.toUpperCase :: Nil)
+ assert(actualCodec === TestSQLContext.conf.parquetCompressionCodec.toUpperCase :: Nil)
parquetFile(path).registerTempTable("tmp")
checkAnswer(