author     Sandeep Singh <sandeep@techaddict.me>   2016-05-03 18:02:57 -0700
committer  Andrew Or <andrew@databricks.com>       2016-05-03 18:02:57 -0700
commit     a8d56f538878443da6eae69449858ad4e2274151 (patch)
tree       d56164d6ea2511bbfd4704d1691f5ff9ca0d6fde
parent     c4e0fde876fff259308d1d58ab51ae2697ae31f1 (diff)
[SPARK-14422][SQL] Improve handling of optional configs in SQLConf
## What changes were proposed in this pull request?

Create a new API for handling optional configs in SQLConf. Right now `getConf` for an `OptionalConfigEntry[T]` returns a value of type `T` and throws an exception if the key is not set. Add a new method `getOptionalConf` (suggestions on naming welcome) that returns a value of type `Option[T]`, so a missing key yields `None` instead of an exception.

## How was this patch tested?

Added a test and ran the tests locally.

Author: Sandeep Singh <sandeep@techaddict.me>

Closes #12846 from techaddict/SPARK-14422.
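The diff below changes `SQLConf.getConf` for an `OptionalConfigEntry[T]` so that a missing key yields `None` rather than a `NoSuchElementException`. As a rough illustration of that behavior, here is a self-contained sketch in plain Scala; `OptionalEntry`, the `settings` map, and the sample key are stand-ins for illustration, not Spark's actual SQLConf/ConfigEntry classes.

```scala
// Self-contained sketch of the optional-config behavior this patch introduces.
// OptionalEntry, settings, and the sample key below are illustrative stand-ins,
// not Spark's actual SQLConf/ConfigEntry classes.
object OptionalConfSketch {
  final case class OptionalEntry[T](key: String, rawValueConverter: String => T)

  private val settings = scala.collection.mutable.Map.empty[String, String]

  // Before the patch: a missing optional entry threw NoSuchElementException.
  // After the patch: a missing optional entry is simply None.
  def getConf[T](entry: OptionalEntry[T]): Option[T] =
    settings.get(entry.key).map(entry.rawValueConverter)

  def setConfString(key: String, value: String): Unit = settings(key) = value

  def main(args: Array[String]): Unit = {
    val checkpointLocation =
      OptionalEntry[String]("spark.sql.streaming.checkpointLocation", identity)

    assert(getConf(checkpointLocation).isEmpty)              // unset -> None, no exception
    setConfString(checkpointLocation.key, "/tmp/ckpt")
    assert(getConf(checkpointLocation) == Some("/tmp/ckpt")) // set -> Some(value)
  }
}
```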
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala            |  9
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/RuntimeConfig.scala              |  6
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala           |  9
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/internal/SQLConfEntrySuite.scala | 11
4 files changed, 25 insertions(+), 10 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index a8f96a9b45..0793b62fae 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -296,7 +296,7 @@ final class DataFrameWriter private[sql](df: DataFrame) {
new Path(userSpecified).toUri.toString
}.orElse {
val checkpointConfig: Option[String] =
- df.sparkSession.conf.get(SQLConf.CHECKPOINT_LOCATION, None)
+ df.sparkSession.conf.get(SQLConf.CHECKPOINT_LOCATION)
checkpointConfig.map { location =>
new Path(location, queryName).toUri.toString
@@ -334,9 +334,10 @@ final class DataFrameWriter private[sql](df: DataFrame) {
partitionColumns = normalizedParCols.getOrElse(Nil))
val queryName = extraOptions.getOrElse("queryName", StreamExecution.nextName)
- val checkpointLocation = extraOptions.getOrElse("checkpointLocation", {
- new Path(df.sparkSession.sessionState.conf.checkpointLocation, queryName).toUri.toString
- })
+ val checkpointLocation = extraOptions.getOrElse("checkpointLocation",
+ new Path(df.sparkSession.sessionState.conf.checkpointLocation.get, queryName).toUri.toString
+ )
+
df.sparkSession.sessionState.continuousQueryManager.startQuery(
queryName,
checkpointLocation,
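The hunk above now resolves the checkpoint location from an `Option[String]`, falling back with `.get` when the caller supplies no explicit `checkpointLocation` option. A minimal stand-alone sketch of that resolution pattern follows; the `extraOptions` map and conf value here are plain-Scala stand-ins for the writer options and `SQLConf.CHECKPOINT_LOCATION`, not the real session state.

```scala
// Sketch of the checkpoint-location resolution pattern used above; extraOptions
// and confCheckpointLocation are plain-Scala stand-ins used only for illustration.
object CheckpointResolutionSketch {
  def resolve(extraOptions: Map[String, String],
              confCheckpointLocation: Option[String],
              queryName: String): String =
    extraOptions.getOrElse(
      "checkpointLocation",
      // Mirrors `conf.checkpointLocation.get`: fails with NoSuchElementException
      // if neither the writer option nor the SQL conf supplies a location.
      s"${confCheckpointLocation.get}/$queryName")

  def main(args: Array[String]): Unit = {
    println(resolve(Map.empty, Some("/tmp/ckpts"), "q1"))                    // /tmp/ckpts/q1
    println(resolve(Map("checkpointLocation" -> "/data/ckpt"), None, "q1"))  // /data/ckpt
  }
}
```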
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/RuntimeConfig.scala b/sql/core/src/main/scala/org/apache/spark/sql/RuntimeConfig.scala
index 670288b234..4fd6e42640 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/RuntimeConfig.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/RuntimeConfig.scala
@@ -17,7 +17,7 @@
package org.apache.spark.sql
-import org.apache.spark.internal.config.ConfigEntry
+import org.apache.spark.internal.config.{ConfigEntry, OptionalConfigEntry}
import org.apache.spark.sql.internal.SQLConf
@@ -86,6 +86,10 @@ class RuntimeConfig private[sql](sqlConf: SQLConf = new SQLConf) {
sqlConf.getConf(entry)
}
+ protected[sql] def get[T](entry: OptionalConfigEntry[T]): Option[T] = {
+ sqlConf.getConf(entry)
+ }
+
/**
* Returns the value of Spark runtime configuration property for the given key.
*/
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 0bcf0f817a..5e19984deb 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -546,7 +546,7 @@ private[sql] class SQLConf extends Serializable with CatalystConf with Logging {
def optimizerInSetConversionThreshold: Int = getConf(OPTIMIZER_INSET_CONVERSION_THRESHOLD)
- def checkpointLocation: String = getConf(CHECKPOINT_LOCATION)
+ def checkpointLocation: Option[String] = getConf(CHECKPOINT_LOCATION)
def filesMaxPartitionBytes: Long = getConf(FILES_MAX_PARTITION_BYTES)
@@ -717,12 +717,11 @@ private[sql] class SQLConf extends Serializable with CatalystConf with Logging {
/**
* Return the value of an optional Spark SQL configuration property for the given key. If the key
- * is not set yet, throw an exception.
+ * is not set yet, returns None.
*/
- def getConf[T](entry: OptionalConfigEntry[T]): T = {
+ def getConf[T](entry: OptionalConfigEntry[T]): Option[T] = {
require(sqlConfEntries.get(entry.key) == entry, s"$entry is not registered")
- Option(settings.get(entry.key)).map(entry.rawValueConverter).
- getOrElse(throw new NoSuchElementException(entry.key))
+ Option(settings.get(entry.key)).map(entry.rawValueConverter)
}
/**
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/internal/SQLConfEntrySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/internal/SQLConfEntrySuite.scala
index cc69199139..95bfd05c1f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/internal/SQLConfEntrySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/internal/SQLConfEntrySuite.scala
@@ -153,6 +153,17 @@ class SQLConfEntrySuite extends SparkFunSuite {
assert(conf.getConf(confEntry, Seq("a", "b", "c")) === Seq("a", "b", "c", "d", "e"))
}
+ test("optionalConf") {
+ val key = "spark.sql.SQLConfEntrySuite.optional"
+ val confEntry = SQLConfigBuilder(key)
+ .stringConf
+ .createOptional
+
+ assert(conf.getConf(confEntry) === None)
+ conf.setConfString(key, "a")
+ assert(conf.getConf(confEntry) === Some("a"))
+ }
+
test("duplicate entry") {
val key = "spark.sql.SQLConfEntrySuite.duplicate"
SQLConfigBuilder(key).stringConf.createOptional