From a79838bdeeb12cec4d50da3948bd8a33777e53a6 Mon Sep 17 00:00:00 2001
From: hyukjinkwon
Date: Thu, 15 Sep 2016 01:33:56 +0800
Subject: [MINOR][SQL] Add missing functions for some options in SQLConf and use them where applicable

## What changes were proposed in this pull request?

Several options in `SQLConf` lack the dedicated accessor functions that most other options have. At first these looked like intentionally hidden options, but they appear to be simply missing. For example, `spark.sql.parquet.mergeSchema` is documented in [sql-programming-guide.md](https://github.com/apache/spark/blob/master/docs/sql-programming-guide.md) yet has no accessor, whereas many undocumented options such as `spark.sql.join.preferSortMergeJoin` each have one. This PR makes them consistent by adding the missing accessor functions to `SQLConf` and using them where applicable, so that call sites become more readable.

## How was this patch tested?

Existing tests should cover this.

Author: hyukjinkwon

Closes #14678 from HyukjinKwon/sqlconf-cleanup.
---
 .../spark/sql/RelationalGroupedDataset.scala        |  2 +-
 .../spark/sql/execution/QueryExecution.scala        |  2 +-
 .../sql/execution/datasources/DataSource.scala      |  2 +-
 .../InsertIntoHadoopFsRelationCommand.scala         |  2 +-
 .../datasources/PartitioningAwareFileCatalog.scala  |  2 +-
 .../datasources/parquet/ParquetFileFormat.scala     |  8 ++---
 .../datasources/parquet/ParquetOptions.scala        |  2 +-
 .../execution/streaming/FileStreamSinkLog.scala     |  6 ++--
 .../sql/execution/streaming/StreamExecution.scala   |  2 +-
 .../execution/streaming/state/StateStoreConf.scala  |  6 ++--
 .../org/apache/spark/sql/internal/SQLConf.scala     | 42 ++++++++++++++++------
 .../sql/streaming/StreamingQueryManager.scala       |  4 +--
 12 files changed, 49 insertions(+), 31 deletions(-)

(limited to 'sql/core')

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala
index 53d732403f..6c3fe07709 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala
@@ -313,7 +313,7 @@ class RelationalGroupedDataset protected[sql](
    */
   def pivot(pivotColumn: String): RelationalGroupedDataset = {
     // This is to prevent unintended OOM errors when the number of distinct values is large
-    val maxValues = df.sparkSession.conf.get(SQLConf.DATAFRAME_PIVOT_MAX_VALUES)
+    val maxValues = df.sparkSession.sessionState.conf.dataFramePivotMaxValues
     // Get the distinct values of the column and sort them so its consistent
     val values = df.select(pivotColumn)
       .distinct()
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
index d4845637be..383b3a233f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
@@ -55,7 +55,7 @@ class QueryExecution(val sparkSession: SparkSession, val logical: LogicalPlan) {
   }
 
   def assertSupported(): Unit = {
-    if (sparkSession.sessionState.conf.getConf(SQLConf.UNSUPPORTED_OPERATION_CHECK_ENABLED)) {
+    if (sparkSession.sessionState.conf.isUnsupportedOperationCheckEnabled) {
       UnsupportedOperationChecker.checkForBatch(analyzed)
     }
   }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index 825c01365d..93154bd2ca 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -231,7 +231,7 @@ case class DataSource(
           }
         }
 
-        val isSchemaInferenceEnabled = sparkSession.conf.get(SQLConf.STREAMING_SCHEMA_INFERENCE)
+        val isSchemaInferenceEnabled = sparkSession.sessionState.conf.streamingSchemaInference
         val isTextSource = providingClass == classOf[text.TextFileFormat]
         // If the schema inference is disabled, only text sources require schema to be specified
         if (!isSchemaInferenceEnabled && !isTextSource && userSpecifiedSchema.isEmpty) {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
index 02ce7fab64..99ca3df673 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
@@ -131,7 +131,7 @@ case class InsertIntoHadoopFsRelationCommand(
           dataColumns = dataColumns,
           inputSchema = query.output,
           PartitioningUtils.DEFAULT_PARTITION_NAME,
-          sparkSession.conf.get(SQLConf.PARTITION_MAX_FILES),
+          sparkSession.sessionState.conf.partitionMaxFiles,
           isAppend)
       }
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileCatalog.scala
index cef9d4d9c7..d2d5b56c82 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileCatalog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileCatalog.scala
@@ -126,7 +126,7 @@ abstract class PartitioningAwareFileCatalog(
         PartitioningUtils.parsePartitions(
           leafDirs,
           PartitioningUtils.DEFAULT_PARTITION_NAME,
-          typeInference = sparkSession.sessionState.conf.partitionColumnTypeInferenceEnabled(),
+          typeInference = sparkSession.sessionState.conf.partitionColumnTypeInferenceEnabled,
           basePaths = basePaths)
     }
   }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
index 9208c82179..e7c3545630 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
@@ -151,7 +151,7 @@ class ParquetFileFormat
     // Should we merge schemas from all Parquet part-files?
     val shouldMergeSchemas = parquetOptions.mergeSchema
 
-    val mergeRespectSummaries = sparkSession.conf.get(SQLConf.PARQUET_SCHEMA_RESPECT_SUMMARIES)
+    val mergeRespectSummaries = sparkSession.sessionState.conf.isParquetSchemaRespectSummaries
 
     val filesByType = splitFiles(files)
 
@@ -308,14 +308,14 @@ class ParquetFileFormat
     // Sets flags for `CatalystSchemaConverter`
     hadoopConf.setBoolean(
       SQLConf.PARQUET_BINARY_AS_STRING.key,
-      sparkSession.conf.get(SQLConf.PARQUET_BINARY_AS_STRING))
+      sparkSession.sessionState.conf.isParquetBinaryAsString)
     hadoopConf.setBoolean(
       SQLConf.PARQUET_INT96_AS_TIMESTAMP.key,
-      sparkSession.conf.get(SQLConf.PARQUET_INT96_AS_TIMESTAMP))
+      sparkSession.sessionState.conf.isParquetINT96AsTimestamp)
 
     // Try to push down filters when filter push-down is enabled.
     val pushed =
-      if (sparkSession.conf.get(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key).toBoolean) {
+      if (sparkSession.sessionState.conf.parquetFilterPushDown) {
         filters
           // Collects all converted Parquet filter predicates. Notice that not all predicates can be
           // converted (`ParquetFilters.createFilter` returns an `Option`). That's why a `flatMap`
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetOptions.scala
index 3eec582714..615731889d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetOptions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetOptions.scala
@@ -52,7 +52,7 @@ private[parquet] class ParquetOptions(
   val mergeSchema: Boolean = parameters
     .get(MERGE_SCHEMA)
     .map(_.toBoolean)
-    .getOrElse(sqlConf.getConf(SQLConf.PARQUET_SCHEMA_MERGING_ENABLED))
+    .getOrElse(sqlConf.isParquetSchemaMergingEnabled)
 }
 
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLog.scala
index 7520163522..6f9f7c18c4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLog.scala
@@ -93,11 +93,11 @@ class FileStreamSinkLog(sparkSession: SparkSession, path: String)
    * a live lock may happen if the compaction happens too frequently: one processing keeps deleting
    * old files while another one keeps retrying. Setting a reasonable cleanup delay could avoid it.
    */
-  private val fileCleanupDelayMs = sparkSession.conf.get(SQLConf.FILE_SINK_LOG_CLEANUP_DELAY)
+  private val fileCleanupDelayMs = sparkSession.sessionState.conf.fileSinkLogCleanupDelay
 
-  private val isDeletingExpiredLog = sparkSession.conf.get(SQLConf.FILE_SINK_LOG_DELETION)
+  private val isDeletingExpiredLog = sparkSession.sessionState.conf.fileSinkLogDeletion
 
-  private val compactInterval = sparkSession.conf.get(SQLConf.FILE_SINK_LOG_COMPACT_INTERVAL)
+  private val compactInterval = sparkSession.sessionState.conf.fileSinkLogCompatInterval
   require(compactInterval > 0,
     s"Please set ${SQLConf.FILE_SINK_LOG_COMPACT_INTERVAL.key} (was $compactInterval) " +
       "to a positive value.")
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index 5e1e5eeb50..a1aae61107 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -58,7 +58,7 @@ class StreamExecution(
 
   import org.apache.spark.sql.streaming.StreamingQueryListener._
 
-  private val pollingDelayMs = sparkSession.conf.get(SQLConf.STREAMING_POLLING_DELAY)
+  private val pollingDelayMs = sparkSession.sessionState.conf.streamingPollingDelay
 
   /**
    * A lock used to wait/notify when batches complete. Use a fair lock to avoid thread starvation.
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreConf.scala
index e55f63a6c8..de72f1cf27 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreConf.scala
@@ -24,11 +24,9 @@ private[streaming] class StateStoreConf(@transient private val conf: SQLConf) ex
 
   def this() = this(new SQLConf)
 
-  import SQLConf._
+  val minDeltasForSnapshot = conf.stateStoreMinDeltasForSnapshot
 
-  val minDeltasForSnapshot = conf.getConf(STATE_STORE_MIN_DELTAS_FOR_SNAPSHOT)
-
-  val minVersionsToRetain = conf.getConf(STATE_STORE_MIN_VERSIONS_TO_RETAIN)
+  val minVersionsToRetain = conf.stateStoreMinVersionsToRetain
 }
 
 private[streaming] object StateStoreConf {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 1d6ca5a965..428032b1fb 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -338,11 +338,6 @@ object SQLConf {
     .intConf
     .createWithDefault(4000)
 
-  val PARTITION_DISCOVERY_ENABLED = SQLConfigBuilder("spark.sql.sources.partitionDiscovery.enabled")
-    .doc("When true, automatically discover data partitions.")
-    .booleanConf
-    .createWithDefault(true)
-
   val PARTITION_COLUMN_TYPE_INFERENCE =
     SQLConfigBuilder("spark.sql.sources.partitionColumnTypeInference.enabled")
       .doc("When true, automatically infer the data types for partitioned columns.")
@@ -391,8 +386,10 @@ object SQLConf {
 
   val PARALLEL_PARTITION_DISCOVERY_THRESHOLD =
     SQLConfigBuilder("spark.sql.sources.parallelPartitionDiscovery.threshold")
-      .doc("The degree of parallelism for schema merging and partition discovery of " +
-        "Parquet data sources.")
+      .doc("The maximum number of files allowed for listing files at driver side. If the number " +
If the number " + + "of detected files exceeds this value during partition discovery, it tries to list the " + + "files with another Spark distributed job. This applies to Parquet, ORC, CSV, JSON and " + + "LibSVM data sources.") .intConf .createWithDefault(32) @@ -592,8 +589,24 @@ private[sql] class SQLConf extends Serializable with CatalystConf with Logging { def optimizerInSetConversionThreshold: Int = getConf(OPTIMIZER_INSET_CONVERSION_THRESHOLD) + def stateStoreMinDeltasForSnapshot: Int = getConf(STATE_STORE_MIN_DELTAS_FOR_SNAPSHOT) + + def stateStoreMinVersionsToRetain: Int = getConf(STATE_STORE_MIN_VERSIONS_TO_RETAIN) + def checkpointLocation: Option[String] = getConf(CHECKPOINT_LOCATION) + def isUnsupportedOperationCheckEnabled: Boolean = getConf(UNSUPPORTED_OPERATION_CHECK_ENABLED) + + def fileSinkLogDeletion: Boolean = getConf(FILE_SINK_LOG_DELETION) + + def fileSinkLogCompatInterval: Int = getConf(FILE_SINK_LOG_COMPACT_INTERVAL) + + def fileSinkLogCleanupDelay: Long = getConf(FILE_SINK_LOG_CLEANUP_DELAY) + + def streamingSchemaInference: Boolean = getConf(STREAMING_SCHEMA_INFERENCE) + + def streamingPollingDelay: Long = getConf(STREAMING_POLLING_DELAY) + def filesMaxPartitionBytes: Long = getConf(FILES_MAX_PARTITION_BYTES) def filesOpenCostInBytes: Long = getConf(FILES_OPEN_COST_IN_BYTES) @@ -657,6 +670,12 @@ private[sql] class SQLConf extends Serializable with CatalystConf with Logging { def defaultSizeInBytes: Long = getConf(DEFAULT_SIZE_IN_BYTES, Long.MaxValue) + def isParquetSchemaMergingEnabled: Boolean = getConf(PARQUET_SCHEMA_MERGING_ENABLED) + + def isParquetSchemaRespectSummaries: Boolean = getConf(PARQUET_SCHEMA_RESPECT_SUMMARIES) + + def parquetOutputCommitterClass: String = getConf(PARQUET_OUTPUT_COMMITTER_CLASS) + def isParquetBinaryAsString: Boolean = getConf(PARQUET_BINARY_AS_STRING) def isParquetINT96AsTimestamp: Boolean = getConf(PARQUET_INT96_AS_TIMESTAMP) @@ -673,12 +692,11 @@ private[sql] class SQLConf extends Serializable with CatalystConf with Logging { def convertCTAS: Boolean = getConf(CONVERT_CTAS) - def partitionDiscoveryEnabled(): Boolean = - getConf(SQLConf.PARTITION_DISCOVERY_ENABLED) - - def partitionColumnTypeInferenceEnabled(): Boolean = + def partitionColumnTypeInferenceEnabled: Boolean = getConf(SQLConf.PARTITION_COLUMN_TYPE_INFERENCE) + def partitionMaxFiles: Int = getConf(PARTITION_MAX_FILES) + def parallelPartitionDiscoveryThreshold: Int = getConf(SQLConf.PARALLEL_PARTITION_DISCOVERY_THRESHOLD) @@ -695,6 +713,8 @@ private[sql] class SQLConf extends Serializable with CatalystConf with Logging { def dataFrameRetainGroupColumns: Boolean = getConf(DATAFRAME_RETAIN_GROUP_COLUMNS) + def dataFramePivotMaxValues: Int = getConf(DATAFRAME_PIVOT_MAX_VALUES) + override def runSQLonFile: Boolean = getConf(RUN_SQL_ON_FILES) def enableTwoLevelAggMap: Boolean = getConf(ENABLE_TWOLEVEL_AGG_MAP) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala index bae7f56a23..bba7bc753e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala @@ -204,7 +204,7 @@ class StreamingQueryManager private[sql] (sparkSession: SparkSession) { val checkpointLocation = userSpecifiedCheckpointLocation.map { userSpecified => new Path(userSpecified).toUri.toString }.orElse { - df.sparkSession.conf.get(SQLConf.CHECKPOINT_LOCATION).map 
+      df.sparkSession.sessionState.conf.checkpointLocation.map { location =>
         new Path(location, name).toUri.toString
       }
     }.getOrElse {
@@ -232,7 +232,7 @@ class StreamingQueryManager private[sql] (sparkSession: SparkSession) {
     val analyzedPlan = df.queryExecution.analyzed
     df.queryExecution.assertAnalyzed()
 
-    if (sparkSession.conf.get(SQLConf.UNSUPPORTED_OPERATION_CHECK_ENABLED)) {
+    if (sparkSession.sessionState.conf.isUnsupportedOperationCheckEnabled) {
      UnsupportedOperationChecker.checkForStreaming(analyzedPlan, outputMode)
     }
 
--
cgit v1.2.3
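
For context beyond the patch itself, the sketch below models the accessor pattern the change applies: a generic `getConf` call on a typed config entry versus a dedicated, named accessor per option. `MiniSQLConf`, `ConfigEntry`, and the simplified string-based parsing are illustrative stand-ins, not Spark's real `SQLConf` API; only the option keys and (approximate) defaults are taken from Spark.

```scala
// Simplified, self-contained model of the SQLConf accessor pattern; names and
// structure are hypothetical, not Spark's actual classes.
object SQLConfAccessorSketch {

  // A typed config entry: a key, a default value, and a parser from the raw string form.
  final case class ConfigEntry[T](key: String, default: T, parse: String => T)

  // Assumed approximate Spark defaults, for illustration only.
  val DATAFRAME_PIVOT_MAX_VALUES: ConfigEntry[Int] =
    ConfigEntry("spark.sql.pivotMaxValues", 10000, _.toInt)

  val PARQUET_SCHEMA_MERGING_ENABLED: ConfigEntry[Boolean] =
    ConfigEntry("spark.sql.parquet.mergeSchema", false, _.toBoolean)

  // A tiny SQLConf-like holder keyed by the raw string value of each option.
  final class MiniSQLConf {
    private val settings = scala.collection.mutable.Map.empty[String, String]

    def set(key: String, value: String): Unit = settings(key) = value

    // Generic lookup, analogous in spirit to SQLConf.getConf(entry).
    def getConf[T](entry: ConfigEntry[T]): T =
      settings.get(entry.key).map(entry.parse).getOrElse(entry.default)

    // Dedicated accessors -- the style this patch adds for options that lacked them.
    def dataFramePivotMaxValues: Int = getConf(DATAFRAME_PIVOT_MAX_VALUES)
    def isParquetSchemaMergingEnabled: Boolean = getConf(PARQUET_SCHEMA_MERGING_ENABLED)
  }

  def main(args: Array[String]): Unit = {
    val conf = new MiniSQLConf
    conf.set("spark.sql.parquet.mergeSchema", "true")

    // Before: call sites spell out the entry; after: they read through the named accessor.
    println(conf.getConf(PARQUET_SCHEMA_MERGING_ENABLED)) // true
    println(conf.isParquetSchemaMergingEnabled)           // true
    println(conf.dataFramePivotMaxValues)                 // 10000 (default)
  }
}
```

The accessor hides both the entry object and the value type at the call site, which is why the patch can shorten `sparkSession.conf.get(SQLConf.DATAFRAME_PIVOT_MAX_VALUES)` to `sparkSession.sessionState.conf.dataFramePivotMaxValues` without changing behavior.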