author     hyukjinkwon <gurwls223@gmail.com>    2016-09-15 01:33:56 +0800
committer  Wenchen Fan <wenchen@databricks.com>    2016-09-15 01:33:56 +0800
commit     a79838bdeeb12cec4d50da3948bd8a33777e53a6 (patch)
tree       d82fc307ba42ef212edc1d50cc945d9b3e6b661f /sql
parent     6d06ff6f7e2dd72ba8fe96cd875e83eda6ebb2a9 (diff)
[MINOR][SQL] Add missing functions for some options in SQLConf and use them where applicable
## What changes were proposed in this pull request?

I first thought these accessors were absent because they are somewhat hidden options, but it seems they are simply missing. For example, `spark.sql.parquet.mergeSchema` is documented in [sql-programming-guide.md](https://github.com/apache/spark/blob/master/docs/sql-programming-guide.md) yet has no accessor function, whereas many options such as `spark.sql.join.preferSortMergeJoin` are not documented but each have their own function. This PR makes them consistent by adding the missing accessor functions for some options in `SQLConf` and using them where applicable, which makes the call sites more readable.

## How was this patch tested?

Existing tests should cover this.

Author: hyukjinkwon <gurwls223@gmail.com>

Closes #14678 from HyukjinKwon/sqlconf-cleanup.
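For readers skimming the diff below, here is a minimal, standalone sketch of the accessor pattern this commit applies. It is not Spark's real `SQLConf`: the `ConfigEntry` stand-in, the map-backed storage, and the defaults are illustrative only. The idea is that each option gets a small typed getter wrapping `getConf`, so call sites become `conf.dataFramePivotMaxValues` instead of `conf.getConf(SQLConf.DATAFRAME_PIVOT_MAX_VALUES)`.

```scala
object ConfSketch {

  // Simplified stand-in for Spark's ConfigEntry[T]; the real one also carries
  // documentation, value converters, and validation.
  final case class ConfigEntry[T](key: String, default: T)

  object SQLConf {
    // Keys mirror real option names; defaults here are illustrative only.
    val DATAFRAME_PIVOT_MAX_VALUES: ConfigEntry[Int] =
      ConfigEntry("spark.sql.pivotMaxValues", 10000)

    val STREAMING_SCHEMA_INFERENCE: ConfigEntry[Boolean] =
      ConfigEntry("spark.sql.streaming.schemaInference", false)
  }

  class SQLConf {
    import SQLConf._

    private val settings = scala.collection.mutable.Map.empty[String, Any]

    def setConf[T](entry: ConfigEntry[T], value: T): Unit =
      settings(entry.key) = value

    def getConf[T](entry: ConfigEntry[T]): T =
      settings.getOrElse(entry.key, entry.default).asInstanceOf[T]

    // The pattern this PR adds: one typed accessor per option, so callers
    // no longer spell out getConf(SQLConf.SOME_OPTION) themselves.
    def dataFramePivotMaxValues: Int = getConf(DATAFRAME_PIVOT_MAX_VALUES)

    def streamingSchemaInference: Boolean = getConf(STREAMING_SCHEMA_INFERENCE)
  }

  def main(args: Array[String]): Unit = {
    val conf = new SQLConf
    println(conf.dataFramePivotMaxValues)   // 10000 (illustrative default)
    conf.setConf(SQLConf.STREAMING_SCHEMA_INFERENCE, value = true)
    println(conf.streamingSchemaInference)  // true
  }
}
```

The benefit is purely at the call site: it stays short, is uniformly typed, and does not need to reference the config entry constant directly.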
Diffstat (limited to 'sql')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala | 2
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala | 2
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala | 2
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala | 2
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileCatalog.scala | 2
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala | 8
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetOptions.scala | 2
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLog.scala | 6
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala | 2
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreConf.scala | 6
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala | 42
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala | 4
12 files changed, 49 insertions, 31 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala
index 53d732403f..6c3fe07709 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala
@@ -313,7 +313,7 @@ class RelationalGroupedDataset protected[sql](
*/
def pivot(pivotColumn: String): RelationalGroupedDataset = {
// This is to prevent unintended OOM errors when the number of distinct values is large
- val maxValues = df.sparkSession.conf.get(SQLConf.DATAFRAME_PIVOT_MAX_VALUES)
+ val maxValues = df.sparkSession.sessionState.conf.dataFramePivotMaxValues
// Get the distinct values of the column and sort them so its consistent
val values = df.select(pivotColumn)
.distinct()
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
index d4845637be..383b3a233f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
@@ -55,7 +55,7 @@ class QueryExecution(val sparkSession: SparkSession, val logical: LogicalPlan) {
}
def assertSupported(): Unit = {
- if (sparkSession.sessionState.conf.getConf(SQLConf.UNSUPPORTED_OPERATION_CHECK_ENABLED)) {
+ if (sparkSession.sessionState.conf.isUnsupportedOperationCheckEnabled) {
UnsupportedOperationChecker.checkForBatch(analyzed)
}
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index 825c01365d..93154bd2ca 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -231,7 +231,7 @@ case class DataSource(
}
}
- val isSchemaInferenceEnabled = sparkSession.conf.get(SQLConf.STREAMING_SCHEMA_INFERENCE)
+ val isSchemaInferenceEnabled = sparkSession.sessionState.conf.streamingSchemaInference
val isTextSource = providingClass == classOf[text.TextFileFormat]
// If the schema inference is disabled, only text sources require schema to be specified
if (!isSchemaInferenceEnabled && !isTextSource && userSpecifiedSchema.isEmpty) {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
index 02ce7fab64..99ca3df673 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
@@ -131,7 +131,7 @@ case class InsertIntoHadoopFsRelationCommand(
dataColumns = dataColumns,
inputSchema = query.output,
PartitioningUtils.DEFAULT_PARTITION_NAME,
- sparkSession.conf.get(SQLConf.PARTITION_MAX_FILES),
+ sparkSession.sessionState.conf.partitionMaxFiles,
isAppend)
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileCatalog.scala
index cef9d4d9c7..d2d5b56c82 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileCatalog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileCatalog.scala
@@ -126,7 +126,7 @@ abstract class PartitioningAwareFileCatalog(
PartitioningUtils.parsePartitions(
leafDirs,
PartitioningUtils.DEFAULT_PARTITION_NAME,
- typeInference = sparkSession.sessionState.conf.partitionColumnTypeInferenceEnabled(),
+ typeInference = sparkSession.sessionState.conf.partitionColumnTypeInferenceEnabled,
basePaths = basePaths)
}
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
index 9208c82179..e7c3545630 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
@@ -151,7 +151,7 @@ class ParquetFileFormat
// Should we merge schemas from all Parquet part-files?
val shouldMergeSchemas = parquetOptions.mergeSchema
- val mergeRespectSummaries = sparkSession.conf.get(SQLConf.PARQUET_SCHEMA_RESPECT_SUMMARIES)
+ val mergeRespectSummaries = sparkSession.sessionState.conf.isParquetSchemaRespectSummaries
val filesByType = splitFiles(files)
@@ -308,14 +308,14 @@ class ParquetFileFormat
// Sets flags for `CatalystSchemaConverter`
hadoopConf.setBoolean(
SQLConf.PARQUET_BINARY_AS_STRING.key,
- sparkSession.conf.get(SQLConf.PARQUET_BINARY_AS_STRING))
+ sparkSession.sessionState.conf.isParquetBinaryAsString)
hadoopConf.setBoolean(
SQLConf.PARQUET_INT96_AS_TIMESTAMP.key,
- sparkSession.conf.get(SQLConf.PARQUET_INT96_AS_TIMESTAMP))
+ sparkSession.sessionState.conf.isParquetINT96AsTimestamp)
// Try to push down filters when filter push-down is enabled.
val pushed =
- if (sparkSession.conf.get(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key).toBoolean) {
+ if (sparkSession.sessionState.conf.parquetFilterPushDown) {
filters
// Collects all converted Parquet filter predicates. Notice that not all predicates can be
// converted (`ParquetFilters.createFilter` returns an `Option`). That's why a `flatMap`
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetOptions.scala
index 3eec582714..615731889d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetOptions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetOptions.scala
@@ -52,7 +52,7 @@ private[parquet] class ParquetOptions(
val mergeSchema: Boolean = parameters
.get(MERGE_SCHEMA)
.map(_.toBoolean)
- .getOrElse(sqlConf.getConf(SQLConf.PARQUET_SCHEMA_MERGING_ENABLED))
+ .getOrElse(sqlConf.isParquetSchemaMergingEnabled)
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLog.scala
index 7520163522..6f9f7c18c4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLog.scala
@@ -93,11 +93,11 @@ class FileStreamSinkLog(sparkSession: SparkSession, path: String)
* a live lock may happen if the compaction happens too frequently: one processing keeps deleting
* old files while another one keeps retrying. Setting a reasonable cleanup delay could avoid it.
*/
- private val fileCleanupDelayMs = sparkSession.conf.get(SQLConf.FILE_SINK_LOG_CLEANUP_DELAY)
+ private val fileCleanupDelayMs = sparkSession.sessionState.conf.fileSinkLogCleanupDelay
- private val isDeletingExpiredLog = sparkSession.conf.get(SQLConf.FILE_SINK_LOG_DELETION)
+ private val isDeletingExpiredLog = sparkSession.sessionState.conf.fileSinkLogDeletion
- private val compactInterval = sparkSession.conf.get(SQLConf.FILE_SINK_LOG_COMPACT_INTERVAL)
+ private val compactInterval = sparkSession.sessionState.conf.fileSinkLogCompatInterval
require(compactInterval > 0,
s"Please set ${SQLConf.FILE_SINK_LOG_COMPACT_INTERVAL.key} (was $compactInterval) " +
"to a positive value.")
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index 5e1e5eeb50..a1aae61107 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -58,7 +58,7 @@ class StreamExecution(
import org.apache.spark.sql.streaming.StreamingQueryListener._
- private val pollingDelayMs = sparkSession.conf.get(SQLConf.STREAMING_POLLING_DELAY)
+ private val pollingDelayMs = sparkSession.sessionState.conf.streamingPollingDelay
/**
* A lock used to wait/notify when batches complete. Use a fair lock to avoid thread starvation.
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreConf.scala
index e55f63a6c8..de72f1cf27 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreConf.scala
@@ -24,11 +24,9 @@ private[streaming] class StateStoreConf(@transient private val conf: SQLConf) ex
def this() = this(new SQLConf)
- import SQLConf._
+ val minDeltasForSnapshot = conf.stateStoreMinDeltasForSnapshot
- val minDeltasForSnapshot = conf.getConf(STATE_STORE_MIN_DELTAS_FOR_SNAPSHOT)
-
- val minVersionsToRetain = conf.getConf(STATE_STORE_MIN_VERSIONS_TO_RETAIN)
+ val minVersionsToRetain = conf.stateStoreMinVersionsToRetain
}
private[streaming] object StateStoreConf {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 1d6ca5a965..428032b1fb 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -338,11 +338,6 @@ object SQLConf {
.intConf
.createWithDefault(4000)
- val PARTITION_DISCOVERY_ENABLED = SQLConfigBuilder("spark.sql.sources.partitionDiscovery.enabled")
- .doc("When true, automatically discover data partitions.")
- .booleanConf
- .createWithDefault(true)
-
val PARTITION_COLUMN_TYPE_INFERENCE =
SQLConfigBuilder("spark.sql.sources.partitionColumnTypeInference.enabled")
.doc("When true, automatically infer the data types for partitioned columns.")
@@ -391,8 +386,10 @@ object SQLConf {
val PARALLEL_PARTITION_DISCOVERY_THRESHOLD =
SQLConfigBuilder("spark.sql.sources.parallelPartitionDiscovery.threshold")
- .doc("The degree of parallelism for schema merging and partition discovery of " +
- "Parquet data sources.")
+ .doc("The maximum number of files allowed for listing files at driver side. If the number " +
+ "of detected files exceeds this value during partition discovery, it tries to list the " +
+ "files with another Spark distributed job. This applies to Parquet, ORC, CSV, JSON and " +
+ "LibSVM data sources.")
.intConf
.createWithDefault(32)
@@ -592,8 +589,24 @@ private[sql] class SQLConf extends Serializable with CatalystConf with Logging {
def optimizerInSetConversionThreshold: Int = getConf(OPTIMIZER_INSET_CONVERSION_THRESHOLD)
+ def stateStoreMinDeltasForSnapshot: Int = getConf(STATE_STORE_MIN_DELTAS_FOR_SNAPSHOT)
+
+ def stateStoreMinVersionsToRetain: Int = getConf(STATE_STORE_MIN_VERSIONS_TO_RETAIN)
+
def checkpointLocation: Option[String] = getConf(CHECKPOINT_LOCATION)
+ def isUnsupportedOperationCheckEnabled: Boolean = getConf(UNSUPPORTED_OPERATION_CHECK_ENABLED)
+
+ def fileSinkLogDeletion: Boolean = getConf(FILE_SINK_LOG_DELETION)
+
+ def fileSinkLogCompatInterval: Int = getConf(FILE_SINK_LOG_COMPACT_INTERVAL)
+
+ def fileSinkLogCleanupDelay: Long = getConf(FILE_SINK_LOG_CLEANUP_DELAY)
+
+ def streamingSchemaInference: Boolean = getConf(STREAMING_SCHEMA_INFERENCE)
+
+ def streamingPollingDelay: Long = getConf(STREAMING_POLLING_DELAY)
+
def filesMaxPartitionBytes: Long = getConf(FILES_MAX_PARTITION_BYTES)
def filesOpenCostInBytes: Long = getConf(FILES_OPEN_COST_IN_BYTES)
@@ -657,6 +670,12 @@ private[sql] class SQLConf extends Serializable with CatalystConf with Logging {
def defaultSizeInBytes: Long = getConf(DEFAULT_SIZE_IN_BYTES, Long.MaxValue)
+ def isParquetSchemaMergingEnabled: Boolean = getConf(PARQUET_SCHEMA_MERGING_ENABLED)
+
+ def isParquetSchemaRespectSummaries: Boolean = getConf(PARQUET_SCHEMA_RESPECT_SUMMARIES)
+
+ def parquetOutputCommitterClass: String = getConf(PARQUET_OUTPUT_COMMITTER_CLASS)
+
def isParquetBinaryAsString: Boolean = getConf(PARQUET_BINARY_AS_STRING)
def isParquetINT96AsTimestamp: Boolean = getConf(PARQUET_INT96_AS_TIMESTAMP)
@@ -673,12 +692,11 @@ private[sql] class SQLConf extends Serializable with CatalystConf with Logging {
def convertCTAS: Boolean = getConf(CONVERT_CTAS)
- def partitionDiscoveryEnabled(): Boolean =
- getConf(SQLConf.PARTITION_DISCOVERY_ENABLED)
-
- def partitionColumnTypeInferenceEnabled(): Boolean =
+ def partitionColumnTypeInferenceEnabled: Boolean =
getConf(SQLConf.PARTITION_COLUMN_TYPE_INFERENCE)
+ def partitionMaxFiles: Int = getConf(PARTITION_MAX_FILES)
+
def parallelPartitionDiscoveryThreshold: Int =
getConf(SQLConf.PARALLEL_PARTITION_DISCOVERY_THRESHOLD)
@@ -695,6 +713,8 @@ private[sql] class SQLConf extends Serializable with CatalystConf with Logging {
def dataFrameRetainGroupColumns: Boolean = getConf(DATAFRAME_RETAIN_GROUP_COLUMNS)
+ def dataFramePivotMaxValues: Int = getConf(DATAFRAME_PIVOT_MAX_VALUES)
+
override def runSQLonFile: Boolean = getConf(RUN_SQL_ON_FILES)
def enableTwoLevelAggMap: Boolean = getConf(ENABLE_TWOLEVEL_AGG_MAP)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
index bae7f56a23..bba7bc753e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
@@ -204,7 +204,7 @@ class StreamingQueryManager private[sql] (sparkSession: SparkSession) {
val checkpointLocation = userSpecifiedCheckpointLocation.map { userSpecified =>
new Path(userSpecified).toUri.toString
}.orElse {
- df.sparkSession.conf.get(SQLConf.CHECKPOINT_LOCATION).map { location =>
+ df.sparkSession.sessionState.conf.checkpointLocation.map { location =>
new Path(location, name).toUri.toString
}
}.getOrElse {
@@ -232,7 +232,7 @@ class StreamingQueryManager private[sql] (sparkSession: SparkSession) {
val analyzedPlan = df.queryExecution.analyzed
df.queryExecution.assertAnalyzed()
- if (sparkSession.conf.get(SQLConf.UNSUPPORTED_OPERATION_CHECK_ENABLED)) {
+ if (sparkSession.sessionState.conf.isUnsupportedOperationCheckEnabled) {
UnsupportedOperationChecker.checkForStreaming(analyzedPlan, outputMode)
}