path: root/sql/core
author    zsxwing <zsxwing@gmail.com>    2015-06-17 23:22:54 -0700
committer Reynold Xin <rxin@databricks.com>    2015-06-17 23:22:54 -0700
commit    78a430ea4d2aef58a8bf38ce488553ca6acea428 (patch)
tree      467ff63fa1838be86432de2c505e22880c937293 /sql/core
parent    9db73ec12412f6809030546cf69dcb32d2c8e0fe (diff)
[SPARK-7961][SQL] Refactor SQLConf to display better error message
1. Add `SQLConfEntry` to store the information about a configuration. For those configurations that cannot be found in `sql-programming-guide.md`, I left the doc as `<TODO>`.
2. Verify the value when setting a configuration if this is in SQLConf.
3. Use `SET -v` to display all public configurations.

Author: zsxwing <zsxwing@gmail.com>

Closes #6747 from zsxwing/sqlconf and squashes the following commits:

7d09bad [zsxwing] Use SQLConfEntry in HiveContext
49f6213 [zsxwing] Add getConf, setConf to SQLContext and HiveContext
e014f53 [zsxwing] Merge branch 'master' into sqlconf
93dad8e [zsxwing] Fix the unit tests
cf950c1 [zsxwing] Fix the code style and tests
3c5f03e [zsxwing] Add unsetConf(SQLConfEntry) and fix the code style
a2f4add [zsxwing] getConf will return the default value if a config is not set
037b1db [zsxwing] Add schema to SetCommand
0520c3c [zsxwing] Merge branch 'master' into sqlconf
7afb0ec [zsxwing] Fix the configurations about HiveThriftServer
7e728e3 [zsxwing] Add doc for SQLConfEntry and fix 'toString'
5e95b10 [zsxwing] Add enumConf
c6ba76d [zsxwing] setRawString => setConfString, getRawString => getConfString
4abd807 [zsxwing] Fix the test for 'set -v'
6e47e56 [zsxwing] Fix the compilation error
8973ced [zsxwing] Remove floatConf
1fc3a8b [zsxwing] Remove the 'conf' command and use 'set -v' instead
99c9c16 [zsxwing] Fix tests that use SQLConfEntry as a string
88a03cc [zsxwing] Add new lines between confs and return types
ce7c6c8 [zsxwing] Remove seqConf
f3c1b33 [zsxwing] Refactor SQLConf to display better error message
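Before the diff, a rough sketch of the pattern this patch introduces: configuration keys become typed `SQLConfEntry` objects that carry a default value, a parser, and a doc string, so invalid values fail with a message naming the key. The entry below uses a hypothetical key (spark.sql.example.maxRetries) purely for illustration; the real entries are defined in the SQLConf.scala diff that follows, and SQLConf is private[sql], so the sketch assumes it lives in the org.apache.spark.sql package.

    package org.apache.spark.sql

    import org.apache.spark.sql.SQLConf.SQLConfEntry

    object SQLConfEntrySketch {
      // Hypothetical entry, defined with the intConf factory added by this patch.
      val MAX_RETRIES = SQLConfEntry.intConf(
        "spark.sql.example.maxRetries",   // hypothetical key, illustration only
        defaultValue = Some(3),
        doc = "Maximum number of retries for the example operation.")

      def main(args: Array[String]): Unit = {
        val conf = new SQLConf
        // An unset entry falls back to its typed default instead of throwing.
        assert(conf.getConf(MAX_RETRIES) == 3)
        // Setting an unparsable value fails with a message that names the key:
        //   conf.setConfString(MAX_RETRIES.key, "ten")
        //   => IllegalArgumentException: spark.sql.example.maxRetries should be int, but was ten
      }
    }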
Diffstat (limited to 'sql/core')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala | 493
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala | 25
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/SparkSQLParser.scala | 4
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala | 98
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala | 2
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala | 8
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala | 6
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala | 2
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/test/TestSQLContext.scala | 2
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala | 4
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala | 14
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala | 14
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/SQLConfEntrySuite.scala | 150
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/SQLConfSuite.scala | 10
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala | 42
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/columnar/PartitionBatchPruningSuite.scala | 8
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala | 8
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/json/JsonSuite.scala | 4
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetFilterSuite.scala | 14
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala | 16
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala | 8
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/sources/DataSourceTest.scala | 2
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala | 6
23 files changed, 733 insertions, 207 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
index 55ab6b3358..16493c3d7c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
@@ -25,74 +25,333 @@ import scala.collection.JavaConversions._
import org.apache.spark.sql.catalyst.CatalystConf
private[spark] object SQLConf {
- val COMPRESS_CACHED = "spark.sql.inMemoryColumnarStorage.compressed"
- val COLUMN_BATCH_SIZE = "spark.sql.inMemoryColumnarStorage.batchSize"
- val IN_MEMORY_PARTITION_PRUNING = "spark.sql.inMemoryColumnarStorage.partitionPruning"
- val AUTO_BROADCASTJOIN_THRESHOLD = "spark.sql.autoBroadcastJoinThreshold"
- val DEFAULT_SIZE_IN_BYTES = "spark.sql.defaultSizeInBytes"
- val SHUFFLE_PARTITIONS = "spark.sql.shuffle.partitions"
- val CODEGEN_ENABLED = "spark.sql.codegen"
- val UNSAFE_ENABLED = "spark.sql.unsafe.enabled"
- val DIALECT = "spark.sql.dialect"
- val CASE_SENSITIVE = "spark.sql.caseSensitive"
-
- val PARQUET_BINARY_AS_STRING = "spark.sql.parquet.binaryAsString"
- val PARQUET_INT96_AS_TIMESTAMP = "spark.sql.parquet.int96AsTimestamp"
- val PARQUET_CACHE_METADATA = "spark.sql.parquet.cacheMetadata"
- val PARQUET_COMPRESSION = "spark.sql.parquet.compression.codec"
- val PARQUET_FILTER_PUSHDOWN_ENABLED = "spark.sql.parquet.filterPushdown"
- val PARQUET_USE_DATA_SOURCE_API = "spark.sql.parquet.useDataSourceApi"
-
- val ORC_FILTER_PUSHDOWN_ENABLED = "spark.sql.orc.filterPushdown"
-
- val HIVE_VERIFY_PARTITIONPATH = "spark.sql.hive.verifyPartitionPath"
-
- val COLUMN_NAME_OF_CORRUPT_RECORD = "spark.sql.columnNameOfCorruptRecord"
- val BROADCAST_TIMEOUT = "spark.sql.broadcastTimeout"
+
+ private val sqlConfEntries = java.util.Collections.synchronizedMap(
+ new java.util.HashMap[String, SQLConfEntry[_]]())
+
+ /**
+ * An entry contains all meta information for a configuration.
+ *
+ * @param key the key for the configuration
+ * @param defaultValue the default value for the configuration
+ * @param valueConverter how to convert a string to the value. It should throw an exception if the
+ * string does not have the required format.
+ * @param stringConverter how to convert a value to a string that the user can use it as a valid
+ * string value. It's usually `toString`. But sometimes, a custom converter
+ * is necessary. E.g., if T is List[String], `a, b, c` is better than
+ * `List(a, b, c)`.
+ * @param doc the document for the configuration
+ * @param isPublic if this configuration is public to the user. If it's `false`, this
+ * configuration is only used internally and we should not expose it to the user.
+ * @tparam T the value type
+ */
+ private[sql] class SQLConfEntry[T] private(
+ val key: String,
+ val defaultValue: Option[T],
+ val valueConverter: String => T,
+ val stringConverter: T => String,
+ val doc: String,
+ val isPublic: Boolean) {
+
+ def defaultValueString: String = defaultValue.map(stringConverter).getOrElse("<undefined>")
+
+ override def toString: String = {
+ s"SQLConfEntry(key = $key, defaultValue=$defaultValueString, doc=$doc, isPublic = $isPublic)"
+ }
+ }
+
+ private[sql] object SQLConfEntry {
+
+ private def apply[T](
+ key: String,
+ defaultValue: Option[T],
+ valueConverter: String => T,
+ stringConverter: T => String,
+ doc: String,
+ isPublic: Boolean): SQLConfEntry[T] =
+ sqlConfEntries.synchronized {
+ if (sqlConfEntries.containsKey(key)) {
+ throw new IllegalArgumentException(s"Duplicate SQLConfEntry. $key has been registered")
+ }
+ val entry =
+ new SQLConfEntry[T](key, defaultValue, valueConverter, stringConverter, doc, isPublic)
+ sqlConfEntries.put(key, entry)
+ entry
+ }
+
+ def intConf(
+ key: String,
+ defaultValue: Option[Int] = None,
+ doc: String = "",
+ isPublic: Boolean = true): SQLConfEntry[Int] =
+ SQLConfEntry(key, defaultValue, { v =>
+ try {
+ v.toInt
+ } catch {
+ case _: NumberFormatException =>
+ throw new IllegalArgumentException(s"$key should be int, but was $v")
+ }
+ }, _.toString, doc, isPublic)
+
+ def longConf(
+ key: String,
+ defaultValue: Option[Long] = None,
+ doc: String = "",
+ isPublic: Boolean = true): SQLConfEntry[Long] =
+ SQLConfEntry(key, defaultValue, { v =>
+ try {
+ v.toLong
+ } catch {
+ case _: NumberFormatException =>
+ throw new IllegalArgumentException(s"$key should be long, but was $v")
+ }
+ }, _.toString, doc, isPublic)
+
+ def doubleConf(
+ key: String,
+ defaultValue: Option[Double] = None,
+ doc: String = "",
+ isPublic: Boolean = true): SQLConfEntry[Double] =
+ SQLConfEntry(key, defaultValue, { v =>
+ try {
+ v.toDouble
+ } catch {
+ case _: NumberFormatException =>
+ throw new IllegalArgumentException(s"$key should be double, but was $v")
+ }
+ }, _.toString, doc, isPublic)
+
+ def booleanConf(
+ key: String,
+ defaultValue: Option[Boolean] = None,
+ doc: String = "",
+ isPublic: Boolean = true): SQLConfEntry[Boolean] =
+ SQLConfEntry(key, defaultValue, { v =>
+ try {
+ v.toBoolean
+ } catch {
+ case _: IllegalArgumentException =>
+ throw new IllegalArgumentException(s"$key should be boolean, but was $v")
+ }
+ }, _.toString, doc, isPublic)
+
+ def stringConf(
+ key: String,
+ defaultValue: Option[String] = None,
+ doc: String = "",
+ isPublic: Boolean = true): SQLConfEntry[String] =
+ SQLConfEntry(key, defaultValue, v => v, v => v, doc, isPublic)
+
+ def enumConf[T](
+ key: String,
+ valueConverter: String => T,
+ validValues: Set[T],
+ defaultValue: Option[T] = None,
+ doc: String = "",
+ isPublic: Boolean = true): SQLConfEntry[T] =
+ SQLConfEntry(key, defaultValue, v => {
+ val _v = valueConverter(v)
+ if (!validValues.contains(_v)) {
+ throw new IllegalArgumentException(
+ s"The value of $key should be one of ${validValues.mkString(", ")}, but was $v")
+ }
+ _v
+ }, _.toString, doc, isPublic)
+
+ def seqConf[T](
+ key: String,
+ valueConverter: String => T,
+ defaultValue: Option[Seq[T]] = None,
+ doc: String = "",
+ isPublic: Boolean = true): SQLConfEntry[Seq[T]] = {
+ SQLConfEntry(
+ key, defaultValue, _.split(",").map(valueConverter), _.mkString(","), doc, isPublic)
+ }
+
+ def stringSeqConf(
+ key: String,
+ defaultValue: Option[Seq[String]] = None,
+ doc: String = "",
+ isPublic: Boolean = true): SQLConfEntry[Seq[String]] = {
+ seqConf(key, s => s, defaultValue, doc, isPublic)
+ }
+ }
+
+ import SQLConfEntry._
+
+ val COMPRESS_CACHED = booleanConf("spark.sql.inMemoryColumnarStorage.compressed",
+ defaultValue = Some(true),
+ doc = "When set to true Spark SQL will automatically select a compression codec for each " +
+ "column based on statistics of the data.")
+
+ val COLUMN_BATCH_SIZE = intConf("spark.sql.inMemoryColumnarStorage.batchSize",
+ defaultValue = Some(10000),
+ doc = "Controls the size of batches for columnar caching. Larger batch sizes can improve " +
+ "memory utilization and compression, but risk OOMs when caching data.")
+
+ val IN_MEMORY_PARTITION_PRUNING =
+ booleanConf("spark.sql.inMemoryColumnarStorage.partitionPruning",
+ defaultValue = Some(false),
+ doc = "<TODO>")
+
+ val AUTO_BROADCASTJOIN_THRESHOLD = intConf("spark.sql.autoBroadcastJoinThreshold",
+ defaultValue = Some(10 * 1024 * 1024),
+ doc = "Configures the maximum size in bytes for a table that will be broadcast to all worker " +
+ "nodes when performing a join. By setting this value to -1 broadcasting can be disabled. " +
+ "Note that currently statistics are only supported for Hive Metastore tables where the " +
+ "command<code>ANALYZE TABLE &lt;tableName&gt; COMPUTE STATISTICS noscan</code> has been run.")
+
+ val DEFAULT_SIZE_IN_BYTES = longConf("spark.sql.defaultSizeInBytes", isPublic = false)
+
+ val SHUFFLE_PARTITIONS = intConf("spark.sql.shuffle.partitions",
+ defaultValue = Some(200),
+ doc = "Configures the number of partitions to use when shuffling data for joins or " +
+ "aggregations.")
+
+ val CODEGEN_ENABLED = booleanConf("spark.sql.codegen",
+ defaultValue = Some(true),
+ doc = "When true, code will be dynamically generated at runtime for expression evaluation in" +
+ " a specific query. For some queries with complicated expression this option can lead to " +
+ "significant speed-ups. However, for simple queries this can actually slow down query " +
+ "execution.")
+
+ val UNSAFE_ENABLED = booleanConf("spark.sql.unsafe.enabled",
+ defaultValue = Some(false),
+    doc = "<TODO>")
+
+ val DIALECT = stringConf("spark.sql.dialect", defaultValue = Some("sql"), doc = "<TODO>")
+
+ val CASE_SENSITIVE = booleanConf("spark.sql.caseSensitive",
+ defaultValue = Some(true),
+ doc = "<TODO>")
+
+ val PARQUET_BINARY_AS_STRING = booleanConf("spark.sql.parquet.binaryAsString",
+ defaultValue = Some(false),
+ doc = "Some other Parquet-producing systems, in particular Impala and older versions of " +
+ "Spark SQL, do not differentiate between binary data and strings when writing out the " +
+ "Parquet schema. This flag tells Spark SQL to interpret binary data as a string to provide " +
+ "compatibility with these systems.")
+
+ val PARQUET_INT96_AS_TIMESTAMP = booleanConf("spark.sql.parquet.int96AsTimestamp",
+ defaultValue = Some(true),
+ doc = "Some Parquet-producing systems, in particular Impala, store Timestamp into INT96. " +
+    "Spark would also store Timestamp as INT96 because we need to avoid precision loss of the " +
+ "nanoseconds field. This flag tells Spark SQL to interpret INT96 data as a timestamp to " +
+ "provide compatibility with these systems.")
+
+ val PARQUET_CACHE_METADATA = booleanConf("spark.sql.parquet.cacheMetadata",
+ defaultValue = Some(true),
+ doc = "Turns on caching of Parquet schema metadata. Can speed up querying of static data.")
+
+ val PARQUET_COMPRESSION = enumConf("spark.sql.parquet.compression.codec",
+ valueConverter = v => v.toLowerCase,
+ validValues = Set("uncompressed", "snappy", "gzip", "lzo"),
+ defaultValue = Some("gzip"),
+    doc = "Sets the compression codec used when writing Parquet files. Acceptable values include: " +
+ "uncompressed, snappy, gzip, lzo.")
+
+ val PARQUET_FILTER_PUSHDOWN_ENABLED = booleanConf("spark.sql.parquet.filterPushdown",
+ defaultValue = Some(false),
+ doc = "Turn on Parquet filter pushdown optimization. This feature is turned off by default" +
+      " because of a known bug in Parquet 1.6.0rc3 " +
+ "(<a href=\"https://issues.apache.org/jira/browse/PARQUET-136\">PARQUET-136</a>). However, " +
+ "if your table doesn't contain any nullable string or binary columns, it's still safe to " +
+ "turn this feature on.")
+
+ val PARQUET_USE_DATA_SOURCE_API = booleanConf("spark.sql.parquet.useDataSourceApi",
+ defaultValue = Some(true),
+ doc = "<TODO>")
+
+ val ORC_FILTER_PUSHDOWN_ENABLED = booleanConf("spark.sql.orc.filterPushdown",
+ defaultValue = Some(false),
+ doc = "<TODO>")
+
+ val HIVE_VERIFY_PARTITIONPATH = booleanConf("spark.sql.hive.verifyPartitionPath",
+ defaultValue = Some(true),
+ doc = "<TODO>")
+
+ val COLUMN_NAME_OF_CORRUPT_RECORD = stringConf("spark.sql.columnNameOfCorruptRecord",
+ defaultValue = Some("_corrupt_record"),
+ doc = "<TODO>")
+
+ val BROADCAST_TIMEOUT = intConf("spark.sql.broadcastTimeout",
+ defaultValue = Some(5 * 60),
+ doc = "<TODO>")
// Options that control which operators can be chosen by the query planner. These should be
// considered hints and may be ignored by future versions of Spark SQL.
- val EXTERNAL_SORT = "spark.sql.planner.externalSort"
- val SORTMERGE_JOIN = "spark.sql.planner.sortMergeJoin"
+ val EXTERNAL_SORT = booleanConf("spark.sql.planner.externalSort",
+ defaultValue = Some(true),
+    doc = "When true, performs sorts spilling to disk as needed; otherwise sorts each partition " +
+      "in memory.")
+
+ val SORTMERGE_JOIN = booleanConf("spark.sql.planner.sortMergeJoin",
+ defaultValue = Some(false),
+ doc = "<TODO>")
// This is only used for the thriftserver
- val THRIFTSERVER_POOL = "spark.sql.thriftserver.scheduler.pool"
- val THRIFTSERVER_UI_STATEMENT_LIMIT = "spark.sql.thriftserver.ui.retainedStatements"
- val THRIFTSERVER_UI_SESSION_LIMIT = "spark.sql.thriftserver.ui.retainedSessions"
+ val THRIFTSERVER_POOL = stringConf("spark.sql.thriftserver.scheduler.pool",
+ doc = "Set a Fair Scheduler pool for a JDBC client session")
+
+ val THRIFTSERVER_UI_STATEMENT_LIMIT = intConf("spark.sql.thriftserver.ui.retainedStatements",
+ defaultValue = Some(200),
+ doc = "<TODO>")
+
+ val THRIFTSERVER_UI_SESSION_LIMIT = intConf("spark.sql.thriftserver.ui.retainedSessions",
+ defaultValue = Some(200),
+ doc = "<TODO>")
// This is used to set the default data source
- val DEFAULT_DATA_SOURCE_NAME = "spark.sql.sources.default"
+ val DEFAULT_DATA_SOURCE_NAME = stringConf("spark.sql.sources.default",
+ defaultValue = Some("org.apache.spark.sql.parquet"),
+ doc = "<TODO>")
+
// This is used to control when we will split a schema's JSON string into multiple pieces
// in order to fit the JSON string in the metastore's table property (by default, the value has
// a length restriction of 4000 characters). We will split the JSON string of a schema
// if its length exceeds the threshold.
- val SCHEMA_STRING_LENGTH_THRESHOLD = "spark.sql.sources.schemaStringLengthThreshold"
+ val SCHEMA_STRING_LENGTH_THRESHOLD = intConf("spark.sql.sources.schemaStringLengthThreshold",
+ defaultValue = Some(4000),
+ doc = "<TODO>")
// Whether to perform partition discovery when loading external data sources. Default to true.
- val PARTITION_DISCOVERY_ENABLED = "spark.sql.sources.partitionDiscovery.enabled"
+ val PARTITION_DISCOVERY_ENABLED = booleanConf("spark.sql.sources.partitionDiscovery.enabled",
+ defaultValue = Some(true),
+ doc = "<TODO>")
// Whether to perform partition column type inference. Default to true.
- val PARTITION_COLUMN_TYPE_INFERENCE = "spark.sql.sources.partitionColumnTypeInference.enabled"
+ val PARTITION_COLUMN_TYPE_INFERENCE =
+ booleanConf("spark.sql.sources.partitionColumnTypeInference.enabled",
+ defaultValue = Some(true),
+ doc = "<TODO>")
// The output committer class used by FSBasedRelation. The specified class needs to be a
// subclass of org.apache.hadoop.mapreduce.OutputCommitter.
// NOTE: This property should be set in Hadoop `Configuration` rather than Spark `SQLConf`
- val OUTPUT_COMMITTER_CLASS = "spark.sql.sources.outputCommitterClass"
+ val OUTPUT_COMMITTER_CLASS =
+ stringConf("spark.sql.sources.outputCommitterClass", isPublic = false)
// Whether to perform eager analysis when constructing a dataframe.
// Set to false when debugging requires the ability to look at invalid query plans.
- val DATAFRAME_EAGER_ANALYSIS = "spark.sql.eagerAnalysis"
+ val DATAFRAME_EAGER_ANALYSIS = booleanConf("spark.sql.eagerAnalysis",
+ defaultValue = Some(true),
+ doc = "<TODO>")
// Whether to automatically resolve ambiguity in join conditions for self-joins.
// See SPARK-6231.
- val DATAFRAME_SELF_JOIN_AUTO_RESOLVE_AMBIGUITY = "spark.sql.selfJoinAutoResolveAmbiguity"
+ val DATAFRAME_SELF_JOIN_AUTO_RESOLVE_AMBIGUITY =
+ booleanConf("spark.sql.selfJoinAutoResolveAmbiguity", defaultValue = Some(true), doc = "<TODO>")
// Whether to retain group by columns or not in GroupedData.agg.
- val DATAFRAME_RETAIN_GROUP_COLUMNS = "spark.sql.retainGroupColumns"
+ val DATAFRAME_RETAIN_GROUP_COLUMNS = booleanConf("spark.sql.retainGroupColumns",
+ defaultValue = Some(true),
+ doc = "<TODO>")
- val USE_SQL_SERIALIZER2 = "spark.sql.useSerializer2"
+ val USE_SQL_SERIALIZER2 = booleanConf("spark.sql.useSerializer2",
+ defaultValue = Some(true), doc = "<TODO>")
- val USE_JACKSON_STREAMING_API = "spark.sql.json.useJacksonStreamingAPI"
+ val USE_JACKSON_STREAMING_API = booleanConf("spark.sql.json.useJacksonStreamingAPI",
+ defaultValue = Some(true), doc = "<TODO>")
object Deprecated {
val MAPRED_REDUCE_TASKS = "mapred.reduce.tasks"
@@ -131,56 +390,54 @@ private[sql] class SQLConf extends Serializable with CatalystConf {
* Note that the choice of dialect does not affect things like what tables are available or
* how query execution is performed.
*/
- private[spark] def dialect: String = getConf(DIALECT, "sql")
+ private[spark] def dialect: String = getConf(DIALECT)
/** When true tables cached using the in-memory columnar caching will be compressed. */
- private[spark] def useCompression: Boolean = getConf(COMPRESS_CACHED, "true").toBoolean
+ private[spark] def useCompression: Boolean = getConf(COMPRESS_CACHED)
/** The compression codec for writing to a Parquet file */
- private[spark] def parquetCompressionCodec: String = getConf(PARQUET_COMPRESSION, "gzip")
+ private[spark] def parquetCompressionCodec: String = getConf(PARQUET_COMPRESSION)
+
+ private[spark] def parquetCacheMetadata: Boolean = getConf(PARQUET_CACHE_METADATA)
/** The number of rows that will be batched together for in-memory columnar caching */
- private[spark] def columnBatchSize: Int = getConf(COLUMN_BATCH_SIZE, "10000").toInt
+ private[spark] def columnBatchSize: Int = getConf(COLUMN_BATCH_SIZE)
/** Number of partitions to use for shuffle operators. */
- private[spark] def numShufflePartitions: Int = getConf(SHUFFLE_PARTITIONS, "200").toInt
+ private[spark] def numShufflePartitions: Int = getConf(SHUFFLE_PARTITIONS)
/** When true predicates will be passed to the parquet record reader when possible. */
- private[spark] def parquetFilterPushDown =
- getConf(PARQUET_FILTER_PUSHDOWN_ENABLED, "false").toBoolean
+ private[spark] def parquetFilterPushDown: Boolean = getConf(PARQUET_FILTER_PUSHDOWN_ENABLED)
/** When true uses Parquet implementation based on data source API */
- private[spark] def parquetUseDataSourceApi =
- getConf(PARQUET_USE_DATA_SOURCE_API, "true").toBoolean
+ private[spark] def parquetUseDataSourceApi: Boolean = getConf(PARQUET_USE_DATA_SOURCE_API)
- private[spark] def orcFilterPushDown =
- getConf(ORC_FILTER_PUSHDOWN_ENABLED, "false").toBoolean
+ private[spark] def orcFilterPushDown: Boolean = getConf(ORC_FILTER_PUSHDOWN_ENABLED)
/** When true, uses verifyPartitionPath to prune paths that do not exist. */
- private[spark] def verifyPartitionPath =
- getConf(HIVE_VERIFY_PARTITIONPATH, "true").toBoolean
+ private[spark] def verifyPartitionPath: Boolean = getConf(HIVE_VERIFY_PARTITIONPATH)
/** When true the planner will use the external sort, which may spill to disk. */
- private[spark] def externalSortEnabled: Boolean = getConf(EXTERNAL_SORT, "true").toBoolean
+ private[spark] def externalSortEnabled: Boolean = getConf(EXTERNAL_SORT)
/**
* Sort merge join would sort the two sides of the join first, and then iterate both sides together
* only once to get all matches. Using sort merge join can save a lot of memory compared
* to HashJoin.
*/
- private[spark] def sortMergeJoinEnabled: Boolean = getConf(SORTMERGE_JOIN, "false").toBoolean
+ private[spark] def sortMergeJoinEnabled: Boolean = getConf(SORTMERGE_JOIN)
/**
* When set to true, Spark SQL will use the Janino at runtime to generate custom bytecode
* that evaluates expressions found in queries. In general this custom code runs much faster
* than interpreted evaluation, but there are some start-up costs (5-10ms) due to compilation.
*/
- private[spark] def codegenEnabled: Boolean = getConf(CODEGEN_ENABLED, "true").toBoolean
+ private[spark] def codegenEnabled: Boolean = getConf(CODEGEN_ENABLED)
/**
* Whether the analysis is case sensitive (true by default).
*/
- def caseSensitiveAnalysis: Boolean = getConf(SQLConf.CASE_SENSITIVE, "true").toBoolean
+ def caseSensitiveAnalysis: Boolean = getConf(SQLConf.CASE_SENSITIVE)
/**
* When set to true, Spark SQL will use managed memory for certain operations. This option only
@@ -188,15 +445,14 @@ private[sql] class SQLConf extends Serializable with CatalystConf {
*
* Defaults to false as this feature is currently experimental.
*/
- private[spark] def unsafeEnabled: Boolean = getConf(UNSAFE_ENABLED, "false").toBoolean
+ private[spark] def unsafeEnabled: Boolean = getConf(UNSAFE_ENABLED)
- private[spark] def useSqlSerializer2: Boolean = getConf(USE_SQL_SERIALIZER2, "true").toBoolean
+ private[spark] def useSqlSerializer2: Boolean = getConf(USE_SQL_SERIALIZER2)
/**
* Selects between the new (true) and old (false) JSON handlers, to be removed in Spark 1.5.0
*/
- private[spark] def useJacksonStreamingAPI: Boolean =
- getConf(USE_JACKSON_STREAMING_API, "true").toBoolean
+ private[spark] def useJacksonStreamingAPI: Boolean = getConf(USE_JACKSON_STREAMING_API)
/**
* Upper bound on the sizes (in bytes) of the tables qualified for the auto conversion to
@@ -205,8 +461,7 @@ private[sql] class SQLConf extends Serializable with CatalystConf {
*
* Hive setting: hive.auto.convert.join.noconditionaltask.size, whose default value is 10000.
*/
- private[spark] def autoBroadcastJoinThreshold: Int =
- getConf(AUTO_BROADCASTJOIN_THRESHOLD, (10 * 1024 * 1024).toString).toInt
+ private[spark] def autoBroadcastJoinThreshold: Int = getConf(AUTO_BROADCASTJOIN_THRESHOLD)
/**
* The default size in bytes to assign to a logical operator's estimation statistics. By default,
@@ -215,82 +470,116 @@ private[sql] class SQLConf extends Serializable with CatalystConf {
* in joins.
*/
private[spark] def defaultSizeInBytes: Long =
- getConf(DEFAULT_SIZE_IN_BYTES, (autoBroadcastJoinThreshold + 1).toString).toLong
+ getConf(DEFAULT_SIZE_IN_BYTES, autoBroadcastJoinThreshold + 1L)
/**
* When set to true, we always treat byte arrays in Parquet files as strings.
*/
- private[spark] def isParquetBinaryAsString: Boolean =
- getConf(PARQUET_BINARY_AS_STRING, "false").toBoolean
+ private[spark] def isParquetBinaryAsString: Boolean = getConf(PARQUET_BINARY_AS_STRING)
/**
* When set to true, we always treat INT96Values in Parquet files as timestamp.
*/
- private[spark] def isParquetINT96AsTimestamp: Boolean =
- getConf(PARQUET_INT96_AS_TIMESTAMP, "true").toBoolean
+ private[spark] def isParquetINT96AsTimestamp: Boolean = getConf(PARQUET_INT96_AS_TIMESTAMP)
/**
* When set to true, partition pruning for in-memory columnar tables is enabled.
*/
- private[spark] def inMemoryPartitionPruning: Boolean =
- getConf(IN_MEMORY_PARTITION_PRUNING, "false").toBoolean
+ private[spark] def inMemoryPartitionPruning: Boolean = getConf(IN_MEMORY_PARTITION_PRUNING)
- private[spark] def columnNameOfCorruptRecord: String =
- getConf(COLUMN_NAME_OF_CORRUPT_RECORD, "_corrupt_record")
+ private[spark] def columnNameOfCorruptRecord: String = getConf(COLUMN_NAME_OF_CORRUPT_RECORD)
/**
* Timeout in seconds for the broadcast wait time in hash join
*/
- private[spark] def broadcastTimeout: Int =
- getConf(BROADCAST_TIMEOUT, (5 * 60).toString).toInt
+ private[spark] def broadcastTimeout: Int = getConf(BROADCAST_TIMEOUT)
- private[spark] def defaultDataSourceName: String =
- getConf(DEFAULT_DATA_SOURCE_NAME, "org.apache.spark.sql.parquet")
+ private[spark] def defaultDataSourceName: String = getConf(DEFAULT_DATA_SOURCE_NAME)
- private[spark] def partitionDiscoveryEnabled() =
- getConf(SQLConf.PARTITION_DISCOVERY_ENABLED, "true").toBoolean
+ private[spark] def partitionDiscoveryEnabled(): Boolean =
+ getConf(SQLConf.PARTITION_DISCOVERY_ENABLED)
- private[spark] def partitionColumnTypeInferenceEnabled() =
- getConf(SQLConf.PARTITION_COLUMN_TYPE_INFERENCE, "true").toBoolean
+ private[spark] def partitionColumnTypeInferenceEnabled(): Boolean =
+ getConf(SQLConf.PARTITION_COLUMN_TYPE_INFERENCE)
// Do not use a value larger than 4000 as the default value of this property.
// See the comments of SCHEMA_STRING_LENGTH_THRESHOLD above for more information.
- private[spark] def schemaStringLengthThreshold: Int =
- getConf(SCHEMA_STRING_LENGTH_THRESHOLD, "4000").toInt
+ private[spark] def schemaStringLengthThreshold: Int = getConf(SCHEMA_STRING_LENGTH_THRESHOLD)
- private[spark] def dataFrameEagerAnalysis: Boolean =
- getConf(DATAFRAME_EAGER_ANALYSIS, "true").toBoolean
+ private[spark] def dataFrameEagerAnalysis: Boolean = getConf(DATAFRAME_EAGER_ANALYSIS)
private[spark] def dataFrameSelfJoinAutoResolveAmbiguity: Boolean =
- getConf(DATAFRAME_SELF_JOIN_AUTO_RESOLVE_AMBIGUITY, "true").toBoolean
+ getConf(DATAFRAME_SELF_JOIN_AUTO_RESOLVE_AMBIGUITY)
- private[spark] def dataFrameRetainGroupColumns: Boolean =
- getConf(DATAFRAME_RETAIN_GROUP_COLUMNS, "true").toBoolean
+ private[spark] def dataFrameRetainGroupColumns: Boolean = getConf(DATAFRAME_RETAIN_GROUP_COLUMNS)
/** ********************** SQLConf functionality methods ************ */
/** Set Spark SQL configuration properties. */
def setConf(props: Properties): Unit = settings.synchronized {
- props.foreach { case (k, v) => settings.put(k, v) }
+ props.foreach { case (k, v) => setConfString(k, v) }
}
- /** Set the given Spark SQL configuration property. */
- def setConf(key: String, value: String): Unit = {
+ /** Set the given Spark SQL configuration property using a `string` value. */
+ def setConfString(key: String, value: String): Unit = {
require(key != null, "key cannot be null")
require(value != null, s"value cannot be null for key: $key")
+ val entry = sqlConfEntries.get(key)
+ if (entry != null) {
+ // Only verify configs in the SQLConf object
+ entry.valueConverter(value)
+ }
settings.put(key, value)
}
+ /** Set the given Spark SQL configuration property. */
+ def setConf[T](entry: SQLConfEntry[T], value: T): Unit = {
+ require(entry != null, "entry cannot be null")
+ require(value != null, s"value cannot be null for key: ${entry.key}")
+ require(sqlConfEntries.get(entry.key) == entry, s"$entry is not registered")
+ settings.put(entry.key, entry.stringConverter(value))
+ }
+
/** Return the value of Spark SQL configuration property for the given key. */
- def getConf(key: String): String = {
- Option(settings.get(key)).getOrElse(throw new NoSuchElementException(key))
+ def getConfString(key: String): String = {
+ Option(settings.get(key)).
+ orElse {
+ // Try to use the default value
+ Option(sqlConfEntries.get(key)).map(_.defaultValueString)
+ }.
+ getOrElse(throw new NoSuchElementException(key))
+ }
+
+ /**
+ * Return the value of Spark SQL configuration property for the given key. If the key is not set
+ * yet, return `defaultValue`. This is useful when `defaultValue` in SQLConfEntry is not the
+ * desired one.
+ */
+ def getConf[T](entry: SQLConfEntry[T], defaultValue: T): T = {
+ require(sqlConfEntries.get(entry.key) == entry, s"$entry is not registered")
+ Option(settings.get(entry.key)).map(entry.valueConverter).getOrElse(defaultValue)
}
/**
* Return the value of Spark SQL configuration property for the given key. If the key is not set
- * yet, return `defaultValue`.
+ * yet, return `defaultValue` in [[SQLConfEntry]].
+ */
+ def getConf[T](entry: SQLConfEntry[T]): T = {
+ require(sqlConfEntries.get(entry.key) == entry, s"$entry is not registered")
+ Option(settings.get(entry.key)).map(entry.valueConverter).orElse(entry.defaultValue).
+ getOrElse(throw new NoSuchElementException(entry.key))
+ }
+
+ /**
+ * Return the `string` value of Spark SQL configuration property for the given key. If the key is
+ * not set yet, return `defaultValue`.
*/
- def getConf(key: String, defaultValue: String): String = {
+ def getConfString(key: String, defaultValue: String): String = {
+ val entry = sqlConfEntries.get(key)
+ if (entry != null && defaultValue != "<undefined>") {
+ // Only verify configs in the SQLConf object
+ entry.valueConverter(defaultValue)
+ }
Option(settings.get(key)).getOrElse(defaultValue)
}
@@ -300,11 +589,25 @@ private[sql] class SQLConf extends Serializable with CatalystConf {
*/
def getAllConfs: immutable.Map[String, String] = settings.synchronized { settings.toMap }
- private[spark] def unsetConf(key: String) {
+ /**
+ * Return all the configuration definitions that have been defined in [[SQLConf]]. Each
+ * definition contains key, defaultValue and doc.
+ */
+ def getAllDefinedConfs: Seq[(String, String, String)] = sqlConfEntries.synchronized {
+ sqlConfEntries.values.filter(_.isPublic).map { entry =>
+ (entry.key, entry.defaultValueString, entry.doc)
+ }.toSeq
+ }
+
+ private[spark] def unsetConf(key: String): Unit = {
settings -= key
}
- private[spark] def clear() {
+ private[spark] def unsetConf(entry: SQLConfEntry[_]): Unit = {
+ settings -= entry.key
+ }
+
+ private[spark] def clear(): Unit = {
settings.clear()
}
}
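For reference, a short usage sketch of the accessors defined above, using entries from this file. SQLConf is private[sql], so the sketch assumes it is compiled inside the org.apache.spark.sql package; nothing here is part of the patch itself.

    package org.apache.spark.sql

    object SQLConfAccessSketch {
      def main(args: Array[String]): Unit = {
        val conf = new SQLConf

        // An unset entry resolves to the default declared on its SQLConfEntry,
        // through both the typed getter and the string getter.
        assert(conf.getConf(SQLConf.SHUFFLE_PARTITIONS) == 200)
        assert(conf.getConfString(SQLConf.SHUFFLE_PARTITIONS.key) == "200")

        // getConf(entry, overrideDefault) applies only while the key is unset,
        // e.g. TestSQLContext uses it to shrink the shuffle partition count.
        assert(conf.getConf(SQLConf.SHUFFLE_PARTITIONS, 5) == 5)

        // Typed setters avoid the old value.toString / getConf(...).toInt round-trips.
        conf.setConf(SQLConf.CASE_SENSITIVE, false)
        assert(!conf.getConf(SQLConf.CASE_SENSITIVE))

        // String writes are still accepted but are validated against the registered entry:
        //   conf.setConfString(SQLConf.SHUFFLE_PARTITIONS.key, "many")
        //   => IllegalArgumentException: spark.sql.shuffle.partitions should be int, but was many
      }
    }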
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index 6b605f7130..04fc798bf3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -31,6 +31,7 @@ import org.apache.spark.SparkContext
import org.apache.spark.annotation.{DeveloperApi, Experimental}
import org.apache.spark.api.java.{JavaRDD, JavaSparkContext}
import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.SQLConf.SQLConfEntry
import org.apache.spark.sql.catalyst.analysis._
import org.apache.spark.sql.catalyst.errors.DialectException
import org.apache.spark.sql.catalyst.expressions._
@@ -79,13 +80,16 @@ class SQLContext(@transient val sparkContext: SparkContext)
*/
def setConf(props: Properties): Unit = conf.setConf(props)
+ /** Set the given Spark SQL configuration property. */
+ private[sql] def setConf[T](entry: SQLConfEntry[T], value: T): Unit = conf.setConf(entry, value)
+
/**
* Set the given Spark SQL configuration property.
*
* @group config
* @since 1.0.0
*/
- def setConf(key: String, value: String): Unit = conf.setConf(key, value)
+ def setConf(key: String, value: String): Unit = conf.setConfString(key, value)
/**
* Return the value of Spark SQL configuration property for the given key.
@@ -93,7 +97,22 @@ class SQLContext(@transient val sparkContext: SparkContext)
* @group config
* @since 1.0.0
*/
- def getConf(key: String): String = conf.getConf(key)
+ def getConf(key: String): String = conf.getConfString(key)
+
+ /**
+ * Return the value of Spark SQL configuration property for the given key. If the key is not set
+ * yet, return `defaultValue` in [[SQLConfEntry]].
+ */
+ private[sql] def getConf[T](entry: SQLConfEntry[T]): T = conf.getConf(entry)
+
+ /**
+ * Return the value of Spark SQL configuration property for the given key. If the key is not set
+ * yet, return `defaultValue`. This is useful when `defaultValue` in SQLConfEntry is not the
+ * desired one.
+ */
+ private[sql] def getConf[T](entry: SQLConfEntry[T], defaultValue: T): T = {
+ conf.getConf(entry, defaultValue)
+ }
/**
* Return the value of Spark SQL configuration property for the given key. If the key is not set
@@ -102,7 +121,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
* @group config
* @since 1.0.0
*/
- def getConf(key: String, defaultValue: String): String = conf.getConf(key, defaultValue)
+ def getConf(key: String, defaultValue: String): String = conf.getConfString(key, defaultValue)
/**
* Return all the configuration properties that have been set (i.e. not the default).
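The public SQLContext API stays string-based, but setConf now routes through setConfString and is therefore validated against any registered entry. A minimal sketch, assuming a local SparkContext (the app name is arbitrary); not part of the patch:

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.sql.SQLContext

    object SQLContextConfSketch {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setMaster("local").setAppName("sqlconf-sketch"))
        val sqlContext = new SQLContext(sc)

        // Public API unchanged: string in, string out.
        sqlContext.setConf("spark.sql.shuffle.partitions", "50")
        assert(sqlContext.getConf("spark.sql.shuffle.partitions") == "50")

        // An unparsable value for a registered conf now fails fast:
        //   sqlContext.setConf("spark.sql.shuffle.partitions", "fifty")
        //   => IllegalArgumentException: spark.sql.shuffle.partitions should be int, but was fifty

        sc.stop()
      }
    }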
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SparkSQLParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/SparkSQLParser.scala
index 305b306a79..e59fa6e162 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SparkSQLParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SparkSQLParser.scala
@@ -44,8 +44,8 @@ private[sql] class SparkSQLParser(fallback: String => LogicalPlan) extends Abstr
private val pair: Parser[LogicalPlan] =
(key ~ ("=".r ~> value).?).? ^^ {
- case None => SetCommand(None, output)
- case Some(k ~ v) => SetCommand(Some(k.trim -> v.map(_.trim)), output)
+ case None => SetCommand(None)
+ case Some(k ~ v) => SetCommand(Some(k.trim -> v.map(_.trim)))
}
def apply(input: String): LogicalPlan = parseAll(pair, input) match {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala
index c9dfcea5d0..5e9951f248 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala
@@ -17,6 +17,8 @@
package org.apache.spark.sql.execution
+import java.util.NoSuchElementException
+
import org.apache.spark.Logging
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.rdd.RDD
@@ -75,48 +77,92 @@ private[sql] case class ExecutedCommand(cmd: RunnableCommand) extends SparkPlan
* :: DeveloperApi ::
*/
@DeveloperApi
-case class SetCommand(
- kv: Option[(String, Option[String])],
- override val output: Seq[Attribute])
- extends RunnableCommand with Logging {
+case class SetCommand(kv: Option[(String, Option[String])]) extends RunnableCommand with Logging {
+
+ private def keyValueOutput: Seq[Attribute] = {
+ val schema = StructType(
+ StructField("key", StringType, false) ::
+ StructField("value", StringType, false) :: Nil)
+ schema.toAttributes
+ }
- override def run(sqlContext: SQLContext): Seq[Row] = kv match {
+ private val (_output, runFunc): (Seq[Attribute], SQLContext => Seq[Row]) = kv match {
// Configures the deprecated "mapred.reduce.tasks" property.
case Some((SQLConf.Deprecated.MAPRED_REDUCE_TASKS, Some(value))) =>
- logWarning(
- s"Property ${SQLConf.Deprecated.MAPRED_REDUCE_TASKS} is deprecated, " +
- s"automatically converted to ${SQLConf.SHUFFLE_PARTITIONS} instead.")
- if (value.toInt < 1) {
- val msg = s"Setting negative ${SQLConf.Deprecated.MAPRED_REDUCE_TASKS} for automatically " +
- "determining the number of reducers is not supported."
- throw new IllegalArgumentException(msg)
- } else {
- sqlContext.setConf(SQLConf.SHUFFLE_PARTITIONS, value)
- Seq(Row(s"${SQLConf.SHUFFLE_PARTITIONS}=$value"))
+ val runFunc = (sqlContext: SQLContext) => {
+ logWarning(
+ s"Property ${SQLConf.Deprecated.MAPRED_REDUCE_TASKS} is deprecated, " +
+ s"automatically converted to ${SQLConf.SHUFFLE_PARTITIONS.key} instead.")
+ if (value.toInt < 1) {
+ val msg =
+ s"Setting negative ${SQLConf.Deprecated.MAPRED_REDUCE_TASKS} for automatically " +
+ "determining the number of reducers is not supported."
+ throw new IllegalArgumentException(msg)
+ } else {
+ sqlContext.setConf(SQLConf.SHUFFLE_PARTITIONS.key, value)
+ Seq(Row(SQLConf.SHUFFLE_PARTITIONS.key, value))
+ }
}
+ (keyValueOutput, runFunc)
// Configures a single property.
case Some((key, Some(value))) =>
- sqlContext.setConf(key, value)
- Seq(Row(s"$key=$value"))
+ val runFunc = (sqlContext: SQLContext) => {
+ sqlContext.setConf(key, value)
+ Seq(Row(key, value))
+ }
+ (keyValueOutput, runFunc)
- // Queries all key-value pairs that are set in the SQLConf of the sqlContext.
- // Notice that different from Hive, here "SET -v" is an alias of "SET".
// (In Hive, "SET" returns all changed properties while "SET -v" returns all properties.)
- case Some(("-v", None)) | None =>
- sqlContext.getAllConfs.map { case (k, v) => Row(s"$k=$v") }.toSeq
+ // Queries all key-value pairs that are set in the SQLConf of the sqlContext.
+ case None =>
+ val runFunc = (sqlContext: SQLContext) => {
+ sqlContext.getAllConfs.map { case (k, v) => Row(k, v) }.toSeq
+ }
+ (keyValueOutput, runFunc)
+
+ // Queries all properties along with their default values and docs that are defined in the
+ // SQLConf of the sqlContext.
+ case Some(("-v", None)) =>
+ val runFunc = (sqlContext: SQLContext) => {
+ sqlContext.conf.getAllDefinedConfs.map { case (key, defaultValue, doc) =>
+ Row(key, defaultValue, doc)
+ }
+ }
+ val schema = StructType(
+ StructField("key", StringType, false) ::
+ StructField("default", StringType, false) ::
+ StructField("meaning", StringType, false) :: Nil)
+ (schema.toAttributes, runFunc)
// Queries the deprecated "mapred.reduce.tasks" property.
case Some((SQLConf.Deprecated.MAPRED_REDUCE_TASKS, None)) =>
- logWarning(
- s"Property ${SQLConf.Deprecated.MAPRED_REDUCE_TASKS} is deprecated, " +
- s"showing ${SQLConf.SHUFFLE_PARTITIONS} instead.")
- Seq(Row(s"${SQLConf.SHUFFLE_PARTITIONS}=${sqlContext.conf.numShufflePartitions}"))
+ val runFunc = (sqlContext: SQLContext) => {
+ logWarning(
+ s"Property ${SQLConf.Deprecated.MAPRED_REDUCE_TASKS} is deprecated, " +
+ s"showing ${SQLConf.SHUFFLE_PARTITIONS.key} instead.")
+ Seq(Row(SQLConf.SHUFFLE_PARTITIONS.key, sqlContext.conf.numShufflePartitions.toString))
+ }
+ (keyValueOutput, runFunc)
// Queries a single property.
case Some((key, None)) =>
- Seq(Row(s"$key=${sqlContext.getConf(key, "<undefined>")}"))
+ val runFunc = (sqlContext: SQLContext) => {
+ val value =
+ try {
+ sqlContext.getConf(key)
+ } catch {
+ case _: NoSuchElementException => "<undefined>"
+ }
+ Seq(Row(key, value))
+ }
+ (keyValueOutput, runFunc)
}
+
+ override val output: Seq[Attribute] = _output
+
+ override def run(sqlContext: SQLContext): Seq[Row] = runFunc(sqlContext)
+
}
/**
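To make the schema change above concrete: SET and SET key keep the two-column (key, value) output, while SET -v now returns (key, default, meaning) rows built from getAllDefinedConfs. A sketch continuing the SQLContext example earlier (not part of the patch):

    import org.apache.spark.sql.Row

    // Plain SET still yields (key, value) rows.
    sqlContext.sql("SET spark.sql.shuffle.partitions=50")
    sqlContext.sql("SET spark.sql.shuffle.partitions").collect()
    //   => Array(Row(spark.sql.shuffle.partitions, 50))

    // SET -v lists every public SQLConfEntry with its default value and doc.
    sqlContext.sql("SET -v").collect().foreach {
      case Row(key: String, default: String, meaning: String) =>
        println(s"$key (default: $default) - $meaning")
    }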
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala
index 3ee4033bae..2964edac1a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala
@@ -48,7 +48,7 @@ package object debug {
*/
implicit class DebugSQLContext(sqlContext: SQLContext) {
def debug(): Unit = {
- sqlContext.setConf(SQLConf.DATAFRAME_EAGER_ANALYSIS, "false")
+ sqlContext.setConf(SQLConf.DATAFRAME_EAGER_ANALYSIS, false)
}
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala
index 39360e1331..65ecad9878 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala
@@ -113,12 +113,12 @@ private[sql] case class ParquetTableScan(
.foreach(ParquetInputFormat.setFilterPredicate(conf, _))
// Tell FilteringParquetRowInputFormat whether it's okay to cache Parquet and FS metadata
- conf.set(
- SQLConf.PARQUET_CACHE_METADATA,
- sqlContext.getConf(SQLConf.PARQUET_CACHE_METADATA, "true"))
+ conf.setBoolean(
+ SQLConf.PARQUET_CACHE_METADATA.key,
+ sqlContext.getConf(SQLConf.PARQUET_CACHE_METADATA, true))
// Use task side metadata in parquet
- conf.setBoolean(ParquetInputFormat.TASK_SIDE_METADATA, true);
+ conf.setBoolean(ParquetInputFormat.TASK_SIDE_METADATA, true)
val baseRDD =
new org.apache.spark.rdd.NewHadoopRDD(
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
index bba6f1ec96..4c702c3b0d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
@@ -220,7 +220,7 @@ private[sql] class ParquetRelation2(
}
conf.setClass(
- SQLConf.OUTPUT_COMMITTER_CLASS,
+ SQLConf.OUTPUT_COMMITTER_CLASS.key,
committerClass,
classOf[ParquetOutputCommitter])
@@ -259,7 +259,7 @@ private[sql] class ParquetRelation2(
filters: Array[Filter],
inputFiles: Array[FileStatus],
broadcastedConf: Broadcast[SerializableWritable[Configuration]]): RDD[Row] = {
- val useMetadataCache = sqlContext.getConf(SQLConf.PARQUET_CACHE_METADATA, "true").toBoolean
+ val useMetadataCache = sqlContext.getConf(SQLConf.PARQUET_CACHE_METADATA)
val parquetFilterPushDown = sqlContext.conf.parquetFilterPushDown
// Create the function to set variable Parquet confs at both driver and executor side.
val initLocalJobFuncOpt =
@@ -498,7 +498,7 @@ private[sql] object ParquetRelation2 extends Logging {
ParquetTypesConverter.convertToString(dataSchema.toAttributes))
// Tell FilteringParquetRowInputFormat whether it's okay to cache Parquet and FS metadata
- conf.set(SQLConf.PARQUET_CACHE_METADATA, useMetadataCache.toString)
+ conf.setBoolean(SQLConf.PARQUET_CACHE_METADATA.key, useMetadataCache)
}
/** This closure sets input paths at the driver side. */
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala
index 3dbe6faabf..d39a20b388 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala
@@ -323,7 +323,7 @@ private[sql] abstract class BaseWriterContainer(
private def newOutputCommitter(context: TaskAttemptContext): OutputCommitter = {
val committerClass = context.getConfiguration.getClass(
- SQLConf.OUTPUT_COMMITTER_CLASS, null, classOf[OutputCommitter])
+ SQLConf.OUTPUT_COMMITTER_CLASS.key, null, classOf[OutputCommitter])
Option(committerClass).map { clazz =>
logInfo(s"Using user defined output committer class ${clazz.getCanonicalName}")
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/test/TestSQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/test/TestSQLContext.scala
index 356a6100d2..9fa394525d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/test/TestSQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/test/TestSQLContext.scala
@@ -38,7 +38,7 @@ class LocalSQLContext
protected[sql] class SQLSession extends super.SQLSession {
protected[sql] override lazy val conf: SQLConf = new SQLConf {
/** Fewer partitions to speed up testing. */
- override def numShufflePartitions: Int = this.getConf(SQLConf.SHUFFLE_PARTITIONS, "5").toInt
+ override def numShufflePartitions: Int = this.getConf(SQLConf.SHUFFLE_PARTITIONS, 5)
}
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala
index 790b405c72..b26d3ab253 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala
@@ -68,12 +68,12 @@ class DataFrameAggregateSuite extends QueryTest {
Seq(Row(1, 3), Row(2, 3), Row(3, 3))
)
- ctx.conf.setConf("spark.sql.retainGroupColumns", "false")
+ ctx.conf.setConf(SQLConf.DATAFRAME_RETAIN_GROUP_COLUMNS, false)
checkAnswer(
testData2.groupBy("a").agg(sum($"b")),
Seq(Row(3), Row(3), Row(3))
)
- ctx.conf.setConf("spark.sql.retainGroupColumns", "true")
+ ctx.conf.setConf(SQLConf.DATAFRAME_RETAIN_GROUP_COLUMNS, true)
}
test("agg without groups") {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
index fa98e23e3d..ba1d020f22 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
@@ -33,7 +33,7 @@ class DataFrameSuite extends QueryTest {
test("analysis error should be eagerly reported") {
val oldSetting = ctx.conf.dataFrameEagerAnalysis
// Eager analysis.
- ctx.setConf(SQLConf.DATAFRAME_EAGER_ANALYSIS, "true")
+ ctx.setConf(SQLConf.DATAFRAME_EAGER_ANALYSIS, true)
intercept[Exception] { testData.select('nonExistentName) }
intercept[Exception] {
@@ -47,11 +47,11 @@ class DataFrameSuite extends QueryTest {
}
// No more eager analysis once the flag is turned off
- ctx.setConf(SQLConf.DATAFRAME_EAGER_ANALYSIS, "false")
+ ctx.setConf(SQLConf.DATAFRAME_EAGER_ANALYSIS, false)
testData.select('nonExistentName)
// Set the flag back to original value before this test.
- ctx.setConf(SQLConf.DATAFRAME_EAGER_ANALYSIS, oldSetting.toString)
+ ctx.setConf(SQLConf.DATAFRAME_EAGER_ANALYSIS, oldSetting)
}
test("dataframe toString") {
@@ -70,7 +70,7 @@ class DataFrameSuite extends QueryTest {
test("invalid plan toString, debug mode") {
val oldSetting = ctx.conf.dataFrameEagerAnalysis
- ctx.setConf(SQLConf.DATAFRAME_EAGER_ANALYSIS, "true")
+ ctx.setConf(SQLConf.DATAFRAME_EAGER_ANALYSIS, true)
// Turn on debug mode so we can see invalid query plans.
import org.apache.spark.sql.execution.debug._
@@ -83,7 +83,7 @@ class DataFrameSuite extends QueryTest {
badPlan.toString)
// Set the flag back to original value before this test.
- ctx.setConf(SQLConf.DATAFRAME_EAGER_ANALYSIS, oldSetting.toString)
+ ctx.setConf(SQLConf.DATAFRAME_EAGER_ANALYSIS, oldSetting)
}
test("access complex data") {
@@ -556,13 +556,13 @@ class DataFrameSuite extends QueryTest {
test("SPARK-6899") {
val originalValue = ctx.conf.codegenEnabled
- ctx.setConf(SQLConf.CODEGEN_ENABLED, "true")
+ ctx.setConf(SQLConf.CODEGEN_ENABLED, true)
try{
checkAnswer(
decimalData.agg(avg('a)),
Row(new java.math.BigDecimal(2.0)))
} finally {
- ctx.setConf(SQLConf.CODEGEN_ENABLED, originalValue.toString)
+ ctx.setConf(SQLConf.CODEGEN_ENABLED, originalValue)
}
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
index ffd26c4f5a..20390a5544 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
@@ -95,14 +95,14 @@ class JoinSuite extends QueryTest with BeforeAndAfterEach {
classOf[BroadcastNestedLoopJoin])
).foreach { case (query, joinClass) => assertJoin(query, joinClass) }
try {
- ctx.conf.setConf("spark.sql.planner.sortMergeJoin", "true")
+ ctx.conf.setConf(SQLConf.SORTMERGE_JOIN, true)
Seq(
("SELECT * FROM testData JOIN testData2 ON key = a", classOf[SortMergeJoin]),
("SELECT * FROM testData JOIN testData2 ON key = a and key = 2", classOf[SortMergeJoin]),
("SELECT * FROM testData JOIN testData2 ON key = a where key = 2", classOf[SortMergeJoin])
).foreach { case (query, joinClass) => assertJoin(query, joinClass) }
} finally {
- ctx.conf.setConf("spark.sql.planner.sortMergeJoin", SORTMERGEJOIN_ENABLED.toString)
+ ctx.conf.setConf(SQLConf.SORTMERGE_JOIN, SORTMERGEJOIN_ENABLED)
}
}
@@ -118,7 +118,7 @@ class JoinSuite extends QueryTest with BeforeAndAfterEach {
classOf[BroadcastHashJoin])
).foreach { case (query, joinClass) => assertJoin(query, joinClass) }
try {
- ctx.conf.setConf("spark.sql.planner.sortMergeJoin", "true")
+ ctx.conf.setConf(SQLConf.SORTMERGE_JOIN, true)
Seq(
("SELECT * FROM testData join testData2 ON key = a", classOf[BroadcastHashJoin]),
("SELECT * FROM testData join testData2 ON key = a and key = 2",
@@ -127,7 +127,7 @@ class JoinSuite extends QueryTest with BeforeAndAfterEach {
classOf[BroadcastHashJoin])
).foreach { case (query, joinClass) => assertJoin(query, joinClass) }
} finally {
- ctx.conf.setConf("spark.sql.planner.sortMergeJoin", SORTMERGEJOIN_ENABLED.toString)
+ ctx.conf.setConf(SQLConf.SORTMERGE_JOIN, SORTMERGEJOIN_ENABLED)
}
ctx.sql("UNCACHE TABLE testData")
@@ -416,7 +416,7 @@ class JoinSuite extends QueryTest with BeforeAndAfterEach {
ctx.sql("CACHE TABLE testData")
val tmp = ctx.conf.autoBroadcastJoinThreshold
- ctx.sql(s"SET ${SQLConf.AUTO_BROADCASTJOIN_THRESHOLD}=1000000000")
+ ctx.sql(s"SET ${SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key}=1000000000")
Seq(
("SELECT * FROM testData LEFT SEMI JOIN testData2 ON key = a",
classOf[BroadcastLeftSemiJoinHash])
@@ -424,7 +424,7 @@ class JoinSuite extends QueryTest with BeforeAndAfterEach {
case (query, joinClass) => assertJoin(query, joinClass)
}
- ctx.sql(s"SET ${SQLConf.AUTO_BROADCASTJOIN_THRESHOLD}=-1")
+ ctx.sql(s"SET ${SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key}=-1")
Seq(
("SELECT * FROM testData LEFT SEMI JOIN testData2 ON key = a", classOf[LeftSemiJoinHash])
@@ -432,7 +432,7 @@ class JoinSuite extends QueryTest with BeforeAndAfterEach {
case (query, joinClass) => assertJoin(query, joinClass)
}
- ctx.setConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD, tmp.toString)
+ ctx.setConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD, tmp)
ctx.sql("UNCACHE TABLE testData")
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLConfEntrySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLConfEntrySuite.scala
new file mode 100644
index 0000000000..2e33777f14
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLConfEntrySuite.scala
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.SQLConf._
+
+class SQLConfEntrySuite extends SparkFunSuite {
+
+ val conf = new SQLConf
+
+ test("intConf") {
+ val key = "spark.sql.SQLConfEntrySuite.int"
+ val confEntry = SQLConfEntry.intConf(key)
+ assert(conf.getConf(confEntry, 5) === 5)
+
+ conf.setConf(confEntry, 10)
+ assert(conf.getConf(confEntry, 5) === 10)
+
+ conf.setConfString(key, "20")
+ assert(conf.getConfString(key, "5") === "20")
+ assert(conf.getConfString(key) === "20")
+ assert(conf.getConf(confEntry, 5) === 20)
+
+ val e = intercept[IllegalArgumentException] {
+ conf.setConfString(key, "abc")
+ }
+ assert(e.getMessage === s"$key should be int, but was abc")
+ }
+
+ test("longConf") {
+ val key = "spark.sql.SQLConfEntrySuite.long"
+ val confEntry = SQLConfEntry.longConf(key)
+ assert(conf.getConf(confEntry, 5L) === 5L)
+
+ conf.setConf(confEntry, 10L)
+ assert(conf.getConf(confEntry, 5L) === 10L)
+
+ conf.setConfString(key, "20")
+ assert(conf.getConfString(key, "5") === "20")
+ assert(conf.getConfString(key) === "20")
+ assert(conf.getConf(confEntry, 5L) === 20L)
+
+ val e = intercept[IllegalArgumentException] {
+ conf.setConfString(key, "abc")
+ }
+ assert(e.getMessage === s"$key should be long, but was abc")
+ }
+
+ test("booleanConf") {
+ val key = "spark.sql.SQLConfEntrySuite.boolean"
+ val confEntry = SQLConfEntry.booleanConf(key)
+ assert(conf.getConf(confEntry, false) === false)
+
+ conf.setConf(confEntry, true)
+ assert(conf.getConf(confEntry, false) === true)
+
+ conf.setConfString(key, "true")
+ assert(conf.getConfString(key, "false") === "true")
+ assert(conf.getConfString(key) === "true")
+ assert(conf.getConf(confEntry, false) === true)
+
+ val e = intercept[IllegalArgumentException] {
+ conf.setConfString(key, "abc")
+ }
+ assert(e.getMessage === s"$key should be boolean, but was abc")
+ }
+
+ test("doubleConf") {
+ val key = "spark.sql.SQLConfEntrySuite.double"
+ val confEntry = SQLConfEntry.doubleConf(key)
+ assert(conf.getConf(confEntry, 5.0) === 5.0)
+
+ conf.setConf(confEntry, 10.0)
+ assert(conf.getConf(confEntry, 5.0) === 10.0)
+
+ conf.setConfString(key, "20.0")
+ assert(conf.getConfString(key, "5.0") === "20.0")
+ assert(conf.getConfString(key) === "20.0")
+ assert(conf.getConf(confEntry, 5.0) === 20.0)
+
+ val e = intercept[IllegalArgumentException] {
+ conf.setConfString(key, "abc")
+ }
+ assert(e.getMessage === s"$key should be double, but was abc")
+ }
+
+ test("stringConf") {
+ val key = "spark.sql.SQLConfEntrySuite.string"
+ val confEntry = SQLConfEntry.stringConf(key)
+ assert(conf.getConf(confEntry, "abc") === "abc")
+
+ conf.setConf(confEntry, "abcd")
+ assert(conf.getConf(confEntry, "abc") === "abcd")
+
+ conf.setConfString(key, "abcde")
+ assert(conf.getConfString(key, "abc") === "abcde")
+ assert(conf.getConfString(key) === "abcde")
+ assert(conf.getConf(confEntry, "abc") === "abcde")
+ }
+
+ test("enumConf") {
+ val key = "spark.sql.SQLConfEntrySuite.enum"
+ val confEntry = SQLConfEntry.enumConf(key, v => v, Set("a", "b", "c"), defaultValue = Some("a"))
+ assert(conf.getConf(confEntry) === "a")
+
+ conf.setConf(confEntry, "b")
+ assert(conf.getConf(confEntry) === "b")
+
+ conf.setConfString(key, "c")
+ assert(conf.getConfString(key, "a") === "c")
+ assert(conf.getConfString(key) === "c")
+ assert(conf.getConf(confEntry) === "c")
+
+ val e = intercept[IllegalArgumentException] {
+ conf.setConfString(key, "d")
+ }
+ assert(e.getMessage === s"The value of $key should be one of a, b, c, but was d")
+ }
+
+ test("stringSeqConf") {
+ val key = "spark.sql.SQLConfEntrySuite.stringSeq"
+ val confEntry = SQLConfEntry.stringSeqConf("spark.sql.SQLConfEntrySuite.stringSeq",
+ defaultValue = Some(Nil))
+ assert(conf.getConf(confEntry, Seq("a", "b", "c")) === Seq("a", "b", "c"))
+
+ conf.setConf(confEntry, Seq("a", "b", "c", "d"))
+ assert(conf.getConf(confEntry, Seq("a", "b", "c")) === Seq("a", "b", "c", "d"))
+
+ conf.setConfString(key, "a,b,c,d,e")
+ assert(conf.getConfString(key, "a,b,c") === "a,b,c,d,e")
+ assert(conf.getConfString(key) === "a,b,c,d,e")
+ assert(conf.getConf(confEntry, Seq("a", "b", "c")) === Seq("a", "b", "c", "d", "e"))
+ }
+}
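One registry behavior the new suite does not cover: the SQLConfEntry factories register each key globally, so defining two entries with the same key is rejected. A sketch of an additional case in the same style as the suite above, with a hypothetical key (not part of the patch):

    test("duplicate registration is rejected") {
      val key = "spark.sql.SQLConfEntrySuite.duplicate"  // hypothetical key
      SQLConfEntry.intConf(key, defaultValue = Some(1))
      val e = intercept[IllegalArgumentException] {
        SQLConfEntry.intConf(key, defaultValue = Some(2))
      }
      assert(e.getMessage === s"Duplicate SQLConfEntry. $key has been registered")
    }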
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLConfSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLConfSuite.scala
index 76d0dd1744..75791e9d53 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLConfSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLConfSuite.scala
@@ -75,6 +75,14 @@ class SQLConfSuite extends QueryTest {
test("deprecated property") {
ctx.conf.clear()
ctx.sql(s"set ${SQLConf.Deprecated.MAPRED_REDUCE_TASKS}=10")
- assert(ctx.getConf(SQLConf.SHUFFLE_PARTITIONS) === "10")
+ assert(ctx.conf.numShufflePartitions === 10)
+ }
+
+ test("invalid conf value") {
+ ctx.conf.clear()
+ val e = intercept[IllegalArgumentException] {
+ ctx.sql(s"set ${SQLConf.CASE_SENSITIVE.key}=10")
+ }
+ assert(e.getMessage === s"${SQLConf.CASE_SENSITIVE.key} should be boolean, but was 10")
}
}
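
For reference, a hedged sketch (not part of the patch) of how calling code might guard a user-supplied SET statement now that invalid values fail fast; ctx and the query string mirror the test above:

  // Sketch only: an invalid value for a boolean entry surfaces as IllegalArgumentException.
  try {
    ctx.sql(s"SET ${SQLConf.CASE_SENSITIVE.key}=10")
  } catch {
    case e: IllegalArgumentException =>
      // e.getMessage reads "<key> should be boolean, but was 10"
      println(e.getMessage)
  }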
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index 30db840166..82f3fdb48b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -190,7 +190,7 @@ class SQLQuerySuite extends QueryTest with BeforeAndAfterAll with SQLTestUtils {
test("aggregation with codegen") {
val originalValue = sqlContext.conf.codegenEnabled
- sqlContext.setConf(SQLConf.CODEGEN_ENABLED, "true")
+ sqlContext.setConf(SQLConf.CODEGEN_ENABLED, true)
// Prepare a table that we can group some rows.
sqlContext.table("testData")
.unionAll(sqlContext.table("testData"))
@@ -287,7 +287,7 @@ class SQLQuerySuite extends QueryTest with BeforeAndAfterAll with SQLTestUtils {
Row(0, null, 0) :: Nil)
} finally {
sqlContext.dropTempTable("testData3x")
- sqlContext.setConf(SQLConf.CODEGEN_ENABLED, originalValue.toString)
+ sqlContext.setConf(SQLConf.CODEGEN_ENABLED, originalValue)
}
}
@@ -480,41 +480,41 @@ class SQLQuerySuite extends QueryTest with BeforeAndAfterAll with SQLTestUtils {
test("sorting") {
val before = sqlContext.conf.externalSortEnabled
- sqlContext.setConf(SQLConf.EXTERNAL_SORT, "false")
+ sqlContext.setConf(SQLConf.EXTERNAL_SORT, false)
sortTest()
- sqlContext.setConf(SQLConf.EXTERNAL_SORT, before.toString)
+ sqlContext.setConf(SQLConf.EXTERNAL_SORT, before)
}
test("external sorting") {
val before = sqlContext.conf.externalSortEnabled
- sqlContext.setConf(SQLConf.EXTERNAL_SORT, "true")
+ sqlContext.setConf(SQLConf.EXTERNAL_SORT, true)
sortTest()
- sqlContext.setConf(SQLConf.EXTERNAL_SORT, before.toString)
+ sqlContext.setConf(SQLConf.EXTERNAL_SORT, before)
}
test("SPARK-6927 sorting with codegen on") {
val externalbefore = sqlContext.conf.externalSortEnabled
val codegenbefore = sqlContext.conf.codegenEnabled
- sqlContext.setConf(SQLConf.EXTERNAL_SORT, "false")
- sqlContext.setConf(SQLConf.CODEGEN_ENABLED, "true")
+ sqlContext.setConf(SQLConf.EXTERNAL_SORT, false)
+ sqlContext.setConf(SQLConf.CODEGEN_ENABLED, true)
try{
sortTest()
} finally {
- sqlContext.setConf(SQLConf.EXTERNAL_SORT, externalbefore.toString)
- sqlContext.setConf(SQLConf.CODEGEN_ENABLED, codegenbefore.toString)
+ sqlContext.setConf(SQLConf.EXTERNAL_SORT, externalbefore)
+ sqlContext.setConf(SQLConf.CODEGEN_ENABLED, codegenbefore)
}
}
test("SPARK-6927 external sorting with codegen on") {
val externalbefore = sqlContext.conf.externalSortEnabled
val codegenbefore = sqlContext.conf.codegenEnabled
- sqlContext.setConf(SQLConf.CODEGEN_ENABLED, "true")
- sqlContext.setConf(SQLConf.EXTERNAL_SORT, "true")
+ sqlContext.setConf(SQLConf.CODEGEN_ENABLED, true)
+ sqlContext.setConf(SQLConf.EXTERNAL_SORT, true)
try {
sortTest()
} finally {
- sqlContext.setConf(SQLConf.EXTERNAL_SORT, externalbefore.toString)
- sqlContext.setConf(SQLConf.CODEGEN_ENABLED, codegenbefore.toString)
+ sqlContext.setConf(SQLConf.EXTERNAL_SORT, externalbefore)
+ sqlContext.setConf(SQLConf.CODEGEN_ENABLED, codegenbefore)
}
}
@@ -908,25 +908,25 @@ class SQLQuerySuite extends QueryTest with BeforeAndAfterAll with SQLTestUtils {
sql(s"SET $testKey=$testVal")
checkAnswer(
sql("SET"),
- Row(s"$testKey=$testVal")
+ Row(testKey, testVal)
)
sql(s"SET ${testKey + testKey}=${testVal + testVal}")
checkAnswer(
sql("set"),
Seq(
- Row(s"$testKey=$testVal"),
- Row(s"${testKey + testKey}=${testVal + testVal}"))
+ Row(testKey, testVal),
+ Row(testKey + testKey, testVal + testVal))
)
// "set key"
checkAnswer(
sql(s"SET $testKey"),
- Row(s"$testKey=$testVal")
+ Row(testKey, testVal)
)
checkAnswer(
sql(s"SET $nonexistentKey"),
- Row(s"$nonexistentKey=<undefined>")
+ Row(nonexistentKey, "<undefined>")
)
sqlContext.conf.clear()
}
@@ -1340,12 +1340,12 @@ class SQLQuerySuite extends QueryTest with BeforeAndAfterAll with SQLTestUtils {
}
test("SPARK-4699 case sensitivity SQL query") {
- sqlContext.setConf(SQLConf.CASE_SENSITIVE, "false")
+ sqlContext.setConf(SQLConf.CASE_SENSITIVE, false)
val data = TestData(1, "val_1") :: TestData(2, "val_2") :: Nil
val rdd = sqlContext.sparkContext.parallelize((0 to 1).map(i => data(i)))
rdd.toDF().registerTempTable("testTable1")
checkAnswer(sql("SELECT VALUE FROM TESTTABLE1 where KEY = 1"), Row("val_1"))
- sqlContext.setConf(SQLConf.CASE_SENSITIVE, "true")
+ sqlContext.setConf(SQLConf.CASE_SENSITIVE, true)
}
test("SPARK-6145: ORDER BY test for nested fields") {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/columnar/PartitionBatchPruningSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/columnar/PartitionBatchPruningSuite.scala
index 6545c6b314..2c0879927a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/columnar/PartitionBatchPruningSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/columnar/PartitionBatchPruningSuite.scala
@@ -32,7 +32,7 @@ class PartitionBatchPruningSuite extends SparkFunSuite with BeforeAndAfterAll wi
override protected def beforeAll(): Unit = {
// Make a table with 5 partitions, 2 batches per partition, 10 elements per batch
- ctx.setConf(SQLConf.COLUMN_BATCH_SIZE, "10")
+ ctx.setConf(SQLConf.COLUMN_BATCH_SIZE, 10)
val pruningData = ctx.sparkContext.makeRDD((1 to 100).map { key =>
val string = if (((key - 1) / 10) % 2 == 0) null else key.toString
@@ -41,14 +41,14 @@ class PartitionBatchPruningSuite extends SparkFunSuite with BeforeAndAfterAll wi
pruningData.registerTempTable("pruningData")
// Enable in-memory partition pruning
- ctx.setConf(SQLConf.IN_MEMORY_PARTITION_PRUNING, "true")
+ ctx.setConf(SQLConf.IN_MEMORY_PARTITION_PRUNING, true)
// Enable in-memory table scan accumulators
ctx.setConf("spark.sql.inMemoryTableScanStatistics.enable", "true")
}
override protected def afterAll(): Unit = {
- ctx.setConf(SQLConf.COLUMN_BATCH_SIZE, originalColumnBatchSize.toString)
- ctx.setConf(SQLConf.IN_MEMORY_PARTITION_PRUNING, originalInMemoryPartitionPruning.toString)
+ ctx.setConf(SQLConf.COLUMN_BATCH_SIZE, originalColumnBatchSize)
+ ctx.setConf(SQLConf.IN_MEMORY_PARTITION_PRUNING, originalInMemoryPartitionPruning)
}
before {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
index 3e27f58a92..5854ab48db 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
@@ -63,7 +63,7 @@ class PlannerSuite extends SparkFunSuite {
test("sizeInBytes estimation of limit operator for broadcast hash join optimization") {
def checkPlan(fieldTypes: Seq[DataType], newThreshold: Int): Unit = {
- setConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD, newThreshold.toString)
+ setConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD, newThreshold)
val fields = fieldTypes.zipWithIndex.map {
case (dataType, index) => StructField(s"c${index}", dataType, true)
} :+ StructField("key", IntegerType, true)
@@ -119,12 +119,12 @@ class PlannerSuite extends SparkFunSuite {
checkPlan(complexTypes, newThreshold = 901617)
- setConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD, origThreshold.toString)
+ setConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD, origThreshold)
}
test("InMemoryRelation statistics propagation") {
val origThreshold = conf.autoBroadcastJoinThreshold
- setConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD, 81920.toString)
+ setConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD, 81920)
testData.limit(3).registerTempTable("tiny")
sql("CACHE TABLE tiny")
@@ -139,6 +139,6 @@ class PlannerSuite extends SparkFunSuite {
assert(broadcastHashJoins.size === 1, "Should use broadcast hash join")
assert(shuffledHashJoins.isEmpty, "Should not use shuffled hash join")
- setConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD, origThreshold.toString)
+ setConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD, origThreshold)
}
}
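
For reference, a sketch (not part of the patch) of the save/override/restore pattern with a typed integer entry, as used throughout this suite; the threshold value below is illustrative:

  // Sketch only: integer entries take an Int directly, and the previous value is read back
  // through the typed accessor and restored afterwards.
  val savedThreshold = conf.autoBroadcastJoinThreshold
  setConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD, 10 * 1024 * 1024)
  try {
    // planner decisions made here see the new threshold
  } finally {
    setConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD, savedThreshold)
  }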
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/json/JsonSuite.scala
index fca24364fe..945d437503 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/json/JsonSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/json/JsonSuite.scala
@@ -1077,14 +1077,14 @@ class JsonSuite extends QueryTest with TestJsonData {
}
test("SPARK-7565 MapType in JsonRDD") {
- val useStreaming = ctx.getConf(SQLConf.USE_JACKSON_STREAMING_API, "true")
+ val useStreaming = ctx.conf.useJacksonStreamingAPI
val oldColumnNameOfCorruptRecord = ctx.conf.columnNameOfCorruptRecord
ctx.setConf(SQLConf.COLUMN_NAME_OF_CORRUPT_RECORD, "_unparsed")
val schemaWithSimpleMap = StructType(
StructField("map", MapType(StringType, IntegerType, true), false) :: Nil)
try{
- for (useStreaming <- List("true", "false")) {
+ for (useStreaming <- List(true, false)) {
ctx.setConf(SQLConf.USE_JACKSON_STREAMING_API, useStreaming)
val temp = Utils.createTempDir().getPath
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetFilterSuite.scala
index fa5d4eca05..a2763c78b6 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetFilterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetFilterSuite.scala
@@ -51,7 +51,7 @@ class ParquetFilterSuiteBase extends QueryTest with ParquetTest {
expected: Seq[Row]): Unit = {
val output = predicate.collect { case a: Attribute => a }.distinct
- withSQLConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED -> "true") {
+ withSQLConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key -> "true") {
val query = df
.select(output.map(e => Column(e)): _*)
.where(Column(predicate))
@@ -314,17 +314,17 @@ class ParquetDataSourceOnFilterSuite extends ParquetFilterSuiteBase with BeforeA
lazy val originalConf = sqlContext.conf.parquetUseDataSourceApi
override protected def beforeAll(): Unit = {
- sqlContext.conf.setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, "true")
+ sqlContext.conf.setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, true)
}
override protected def afterAll(): Unit = {
- sqlContext.setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, originalConf.toString)
+ sqlContext.setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, originalConf)
}
test("SPARK-6554: don't push down predicates which reference partition columns") {
import sqlContext.implicits._
- withSQLConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED -> "true") {
+ withSQLConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key -> "true") {
withTempPath { dir =>
val path = s"${dir.getCanonicalPath}/part=1"
(1 to 3).map(i => (i, i.toString)).toDF("a", "b").write.parquet(path)
@@ -343,17 +343,17 @@ class ParquetDataSourceOffFilterSuite extends ParquetFilterSuiteBase with Before
lazy val originalConf = sqlContext.conf.parquetUseDataSourceApi
override protected def beforeAll(): Unit = {
- sqlContext.conf.setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, "false")
+ sqlContext.conf.setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, false)
}
override protected def afterAll(): Unit = {
- sqlContext.setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, originalConf.toString)
+ sqlContext.setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, originalConf)
}
test("SPARK-6742: don't push down predicates which reference partition columns") {
import sqlContext.implicits._
- withSQLConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED -> "true") {
+ withSQLConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key -> "true") {
withTempPath { dir =>
val path = s"${dir.getCanonicalPath}/part=1"
(1 to 3).map(i => (i, i.toString)).toDF("a", "b").write.parquet(path)
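
For reference, a sketch (not part of the patch) of passing several typed entries to withSQLConf through their .key, since the helper still takes plain string pairs:

  // Sketch only: values are validated against the registered entries when they are applied.
  withSQLConf(
      SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key -> "true",
      SQLConf.PARQUET_USE_DATA_SOURCE_API.key -> "false") {
    // code here sees pushdown enabled and the old Parquet path; both settings are restored
    // (or unset) when the block exits
  }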
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala
index fc827bc4ca..284d99d493 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala
@@ -94,8 +94,8 @@ class ParquetIOSuiteBase extends QueryTest with ParquetTest {
val data = (1 to 4).map(i => Tuple1(i.toString))
// Property spark.sql.parquet.binaryAsString shouldn't affect Parquet files written by Spark SQL
// as we store Spark SQL schema in the extra metadata.
- withSQLConf(SQLConf.PARQUET_BINARY_AS_STRING -> "false")(checkParquetFile(data))
- withSQLConf(SQLConf.PARQUET_BINARY_AS_STRING -> "true")(checkParquetFile(data))
+ withSQLConf(SQLConf.PARQUET_BINARY_AS_STRING.key -> "false")(checkParquetFile(data))
+ withSQLConf(SQLConf.PARQUET_BINARY_AS_STRING.key -> "true")(checkParquetFile(data))
}
test("fixed-length decimals") {
@@ -231,7 +231,7 @@ class ParquetIOSuiteBase extends QueryTest with ParquetTest {
val data = (0 until 10).map(i => (i, i.toString))
def checkCompressionCodec(codec: CompressionCodecName): Unit = {
- withSQLConf(SQLConf.PARQUET_COMPRESSION -> codec.name()) {
+ withSQLConf(SQLConf.PARQUET_COMPRESSION.key -> codec.name()) {
withParquetFile(data) { path =>
assertResult(sqlContext.conf.parquetCompressionCodec.toUpperCase) {
compressionCodecFor(path)
@@ -408,7 +408,7 @@ class ParquetIOSuiteBase extends QueryTest with ParquetTest {
val clonedConf = new Configuration(configuration)
configuration.set(
- SQLConf.OUTPUT_COMMITTER_CLASS, classOf[ParquetOutputCommitter].getCanonicalName)
+ SQLConf.OUTPUT_COMMITTER_CLASS.key, classOf[ParquetOutputCommitter].getCanonicalName)
configuration.set(
"spark.sql.parquet.output.committer.class",
@@ -440,11 +440,11 @@ class ParquetDataSourceOnIOSuite extends ParquetIOSuiteBase with BeforeAndAfterA
private lazy val originalConf = sqlContext.conf.parquetUseDataSourceApi
override protected def beforeAll(): Unit = {
- sqlContext.conf.setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, "true")
+ sqlContext.conf.setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, true)
}
override protected def afterAll(): Unit = {
- sqlContext.setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, originalConf.toString)
+ sqlContext.setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API.key, originalConf.toString)
}
test("SPARK-6330 regression test") {
@@ -464,10 +464,10 @@ class ParquetDataSourceOffIOSuite extends ParquetIOSuiteBase with BeforeAndAfter
private lazy val originalConf = sqlContext.conf.parquetUseDataSourceApi
override protected def beforeAll(): Unit = {
- sqlContext.conf.setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, "false")
+ sqlContext.conf.setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, false)
}
override protected def afterAll(): Unit = {
- sqlContext.setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, originalConf.toString)
+ sqlContext.setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, originalConf)
}
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala
index be3b34d5b9..fafad67dde 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala
@@ -128,11 +128,11 @@ class ParquetDataSourceOnQuerySuite extends ParquetQuerySuiteBase with BeforeAnd
private lazy val originalConf = sqlContext.conf.parquetUseDataSourceApi
override protected def beforeAll(): Unit = {
- sqlContext.conf.setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, "true")
+ sqlContext.conf.setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, true)
}
override protected def afterAll(): Unit = {
- sqlContext.setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, originalConf.toString)
+ sqlContext.setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, originalConf)
}
}
@@ -140,10 +140,10 @@ class ParquetDataSourceOffQuerySuite extends ParquetQuerySuiteBase with BeforeAn
private lazy val originalConf = sqlContext.conf.parquetUseDataSourceApi
override protected def beforeAll(): Unit = {
- sqlContext.conf.setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, "false")
+ sqlContext.conf.setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, false)
}
override protected def afterAll(): Unit = {
- sqlContext.setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, originalConf.toString)
+ sqlContext.setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, originalConf)
}
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/DataSourceTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/DataSourceTest.scala
index 3f77960d09..00cc7d5ea5 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/DataSourceTest.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/DataSourceTest.scala
@@ -27,7 +27,7 @@ abstract class DataSourceTest extends QueryTest with BeforeAndAfter {
// We want to test some edge cases.
protected implicit lazy val caseInsensitiveContext = {
val ctx = new SQLContext(TestSQLContext.sparkContext)
- ctx.setConf(SQLConf.CASE_SENSITIVE, "false")
+ ctx.setConf(SQLConf.CASE_SENSITIVE, false)
ctx
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala
index ac4a00a6f3..fa01823e94 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala
@@ -37,11 +37,11 @@ trait SQLTestUtils {
*/
protected def withSQLConf(pairs: (String, String)*)(f: => Unit): Unit = {
val (keys, values) = pairs.unzip
- val currentValues = keys.map(key => Try(sqlContext.conf.getConf(key)).toOption)
- (keys, values).zipped.foreach(sqlContext.conf.setConf)
+ val currentValues = keys.map(key => Try(sqlContext.conf.getConfString(key)).toOption)
+ (keys, values).zipped.foreach(sqlContext.conf.setConfString)
try f finally {
keys.zip(currentValues).foreach {
- case (key, Some(value)) => sqlContext.conf.setConf(key, value)
+ case (key, Some(value)) => sqlContext.conf.setConfString(key, value)
case (key, None) => sqlContext.conf.unsetConf(key)
}
}
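
For reference, a sketch (not part of the patch) showing that withSQLConf also covers keys with no registered SQLConfEntry, since it goes through the raw string accessors (getConfString / setConfString / unsetConf); the key reuses the accumulator flag from PartitionBatchPruningSuite:

  // Sketch only: a plain string key with no SQLConfEntry behind it.
  withSQLConf("spark.sql.inMemoryTableScanStatistics.enable" -> "true") {
    // code under test sees the raw string setting
  }
  // Afterwards the key is restored to its previous value, or unset if it had none.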