diff options
author | Sandeep Singh <sandeep@techaddict.me> | 2016-05-03 18:02:57 -0700 |
---|---|---|
committer | Andrew Or <andrew@databricks.com> | 2016-05-03 18:02:57 -0700 |
commit | a8d56f538878443da6eae69449858ad4e2274151 (patch) | |
tree | d56164d6ea2511bbfd4704d1691f5ff9ca0d6fde | |
parent | c4e0fde876fff259308d1d58ab51ae2697ae31f1 (diff) | |
download | spark-a8d56f538878443da6eae69449858ad4e2274151.tar.gz spark-a8d56f538878443da6eae69449858ad4e2274151.tar.bz2 spark-a8d56f538878443da6eae69449858ad4e2274151.zip |
[SPARK-14422][SQL] Improve handling of optional configs in SQLConf
## What changes were proposed in this pull request?
Create a new API for handling Optional Configs in SQLConf.
Right now `getConf` for `OptionalConfigEntry[T]` returns a value of type `T` and throws an exception if the key is not set. Add a new method `getOptionalConf` (suggestions on naming are welcome) which returns a value of type `Option[T]` (so if the key is not set, it returns `None`).
## How was this patch tested?
Added a test and ran the tests locally.
Author: Sandeep Singh <sandeep@techaddict.me>
Closes #12846 from techaddict/SPARK-14422.
4 files changed, 25 insertions, 10 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala index a8f96a9b45..0793b62fae 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala @@ -296,7 +296,7 @@ final class DataFrameWriter private[sql](df: DataFrame) { new Path(userSpecified).toUri.toString }.orElse { val checkpointConfig: Option[String] = - df.sparkSession.conf.get(SQLConf.CHECKPOINT_LOCATION, None) + df.sparkSession.conf.get(SQLConf.CHECKPOINT_LOCATION) checkpointConfig.map { location => new Path(location, queryName).toUri.toString @@ -334,9 +334,10 @@ final class DataFrameWriter private[sql](df: DataFrame) { partitionColumns = normalizedParCols.getOrElse(Nil)) val queryName = extraOptions.getOrElse("queryName", StreamExecution.nextName) - val checkpointLocation = extraOptions.getOrElse("checkpointLocation", { - new Path(df.sparkSession.sessionState.conf.checkpointLocation, queryName).toUri.toString - }) + val checkpointLocation = extraOptions.getOrElse("checkpointLocation", + new Path(df.sparkSession.sessionState.conf.checkpointLocation.get, queryName).toUri.toString + ) + df.sparkSession.sessionState.continuousQueryManager.startQuery( queryName, checkpointLocation, diff --git a/sql/core/src/main/scala/org/apache/spark/sql/RuntimeConfig.scala b/sql/core/src/main/scala/org/apache/spark/sql/RuntimeConfig.scala index 670288b234..4fd6e42640 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/RuntimeConfig.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/RuntimeConfig.scala @@ -17,7 +17,7 @@ package org.apache.spark.sql -import org.apache.spark.internal.config.ConfigEntry +import org.apache.spark.internal.config.{ConfigEntry, OptionalConfigEntry} import org.apache.spark.sql.internal.SQLConf @@ -86,6 +86,10 @@ class RuntimeConfig private[sql](sqlConf: SQLConf = new SQLConf) { sqlConf.getConf(entry) } 
+ protected[sql] def get[T](entry: OptionalConfigEntry[T]): Option[T] = { + sqlConf.getConf(entry) + } + /** * Returns the value of Spark runtime configuration property for the given key. */ diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 0bcf0f817a..5e19984deb 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -546,7 +546,7 @@ private[sql] class SQLConf extends Serializable with CatalystConf with Logging { def optimizerInSetConversionThreshold: Int = getConf(OPTIMIZER_INSET_CONVERSION_THRESHOLD) - def checkpointLocation: String = getConf(CHECKPOINT_LOCATION) + def checkpointLocation: Option[String] = getConf(CHECKPOINT_LOCATION) def filesMaxPartitionBytes: Long = getConf(FILES_MAX_PARTITION_BYTES) @@ -717,12 +717,11 @@ private[sql] class SQLConf extends Serializable with CatalystConf with Logging { /** * Return the value of an optional Spark SQL configuration property for the given key. If the key - * is not set yet, throw an exception. + * is not set yet, returns None. */ - def getConf[T](entry: OptionalConfigEntry[T]): T = { + def getConf[T](entry: OptionalConfigEntry[T]): Option[T] = { require(sqlConfEntries.get(entry.key) == entry, s"$entry is not registered") - Option(settings.get(entry.key)).map(entry.rawValueConverter). 
- getOrElse(throw new NoSuchElementException(entry.key)) + Option(settings.get(entry.key)).map(entry.rawValueConverter) } /** diff --git a/sql/core/src/test/scala/org/apache/spark/sql/internal/SQLConfEntrySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/internal/SQLConfEntrySuite.scala index cc69199139..95bfd05c1f 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/internal/SQLConfEntrySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/internal/SQLConfEntrySuite.scala @@ -153,6 +153,17 @@ class SQLConfEntrySuite extends SparkFunSuite { assert(conf.getConf(confEntry, Seq("a", "b", "c")) === Seq("a", "b", "c", "d", "e")) } + test("optionalConf") { + val key = "spark.sql.SQLConfEntrySuite.optional" + val confEntry = SQLConfigBuilder(key) + .stringConf + .createOptional + + assert(conf.getConf(confEntry) === None) + conf.setConfString(key, "a") + assert(conf.getConf(confEntry) === Some("a")) + } + test("duplicate entry") { val key = "spark.sql.SQLConfEntrySuite.duplicate" SQLConfigBuilder(key).stringConf.createOptional |