-rw-r--r--  core/src/main/scala/org/apache/spark/internal/config/ConfigBuilder.scala                       13
-rw-r--r--  core/src/main/scala/org/apache/spark/internal/config/package.scala                             12
-rw-r--r--  core/src/main/scala/org/apache/spark/util/Utils.scala                                          17
-rw-r--r--  core/src/test/scala/org/apache/spark/internal/config/ConfigEntrySuite.scala                    19
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala                41
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/DataSourceScanExecRedactionSuite.scala  60
6 files changed, 138 insertions(+), 24 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/internal/config/ConfigBuilder.scala b/core/src/main/scala/org/apache/spark/internal/config/ConfigBuilder.scala
index a177e66645..d87619afd3 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/ConfigBuilder.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/ConfigBuilder.scala
@@ -18,6 +18,9 @@
package org.apache.spark.internal.config
import java.util.concurrent.TimeUnit
+import java.util.regex.PatternSyntaxException
+
+import scala.util.matching.Regex
import org.apache.spark.network.util.{ByteUnit, JavaUtils}
@@ -65,6 +68,13 @@ private object ConfigHelpers {
def byteToString(v: Long, unit: ByteUnit): String = unit.convertTo(v, ByteUnit.BYTE) + "b"
+ def regexFromString(str: String, key: String): Regex = {
+ try str.r catch {
+ case e: PatternSyntaxException =>
+ throw new IllegalArgumentException(s"$key should be a regex, but was $str", e)
+ }
+ }
+
}
/**
@@ -214,4 +224,7 @@ private[spark] case class ConfigBuilder(key: String) {
new FallbackConfigEntry(key, _doc, _public, fallback)
}
+ def regexConf: TypedConfigBuilder[Regex] = {
+ new TypedConfigBuilder(this, regexFromString(_, this.key), _.regex)
+ }
}
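For reference, the new regexConf builder plugs into the existing TypedConfigBuilder machinery, so a regex-typed entry is declared like any other typed entry. A minimal sketch follows; the key name and default are invented for illustration, and ConfigBuilder is only visible inside the org.apache.spark packages:

  // Hypothetical entry; "spark.example.filter.regex" is not a real Spark key.
  private[spark] val EXAMPLE_FILTER_REGEX =
    ConfigBuilder("spark.example.filter.regex")
      .doc("Regex used to decide which entries to filter.")
      .regexConf
      .createWithDefault("(?i)token|credential".r)

A malformed user-supplied pattern is rejected by regexFromString with an IllegalArgumentException naming the offending key, rather than surfacing as a raw PatternSyntaxException.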
diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index 223c921810..89aeea4939 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -246,8 +246,16 @@ package object config {
"driver and executor environments contain sensitive information. When this regex matches " +
"a property, its value is redacted from the environment UI and various logs like YARN " +
"and event logs.")
- .stringConf
- .createWithDefault("(?i)secret|password")
+ .regexConf
+ .createWithDefault("(?i)secret|password".r)
+
+ private[spark] val STRING_REDACTION_PATTERN =
+ ConfigBuilder("spark.redaction.string.regex")
+ .doc("Regex to decide which parts of strings produced by Spark contain sensitive " +
+ "information. When this regex matches a string part, that string part is replaced by a " +
+ "dummy value. This is currently used to redact the output of SQL explain commands.")
+ .regexConf
+ .createOptional
private[spark] val NETWORK_AUTH_ENABLED =
ConfigBuilder("spark.authenticate")
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 1af34e3da2..943dde0723 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -2585,13 +2585,26 @@ private[spark] object Utils extends Logging {
}
}
- private[util] val REDACTION_REPLACEMENT_TEXT = "*********(redacted)"
+ private[spark] val REDACTION_REPLACEMENT_TEXT = "*********(redacted)"
+ /**
+ * Redact the sensitive values in the given map. If a map key matches the redaction pattern then
+ * its value is replaced with a dummy text.
+ */
def redact(conf: SparkConf, kvs: Seq[(String, String)]): Seq[(String, String)] = {
- val redactionPattern = conf.get(SECRET_REDACTION_PATTERN).r
+ val redactionPattern = conf.get(SECRET_REDACTION_PATTERN)
redact(redactionPattern, kvs)
}
+ /**
+ * Redact the sensitive information in the given string.
+ */
+ def redact(conf: SparkConf, text: String): String = {
+ if (text == null || text.isEmpty || !conf.contains(STRING_REDACTION_PATTERN)) return text
+ val regex = conf.get(STRING_REDACTION_PATTERN).get
+ regex.replaceAllIn(text, REDACTION_REPLACEMENT_TEXT)
+ }
+
private def redact(redactionPattern: Regex, kvs: Seq[(String, String)]): Seq[(String, String)] = {
kvs.map { kv =>
redactionPattern.findFirstIn(kv._1)
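Taken together, the two redact overloads are used roughly as follows; the pattern and input string are made up for illustration:

  val conf = new SparkConf().set("spark.redaction.string.regex", "secret-[0-9a-f]+")

  // Matching substrings are replaced with REDACTION_REPLACEMENT_TEXT.
  Utils.redact(conf, "token=secret-deadbeef path=/tmp/data")
  // => "token=*********(redacted) path=/tmp/data"

  // With the pattern unset (or null/empty input), the text comes back unchanged.
  Utils.redact(new SparkConf(), "token=secret-deadbeef")
  // => "token=secret-deadbeef"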
diff --git a/core/src/test/scala/org/apache/spark/internal/config/ConfigEntrySuite.scala b/core/src/test/scala/org/apache/spark/internal/config/ConfigEntrySuite.scala
index 71eed46488..f3756b2108 100644
--- a/core/src/test/scala/org/apache/spark/internal/config/ConfigEntrySuite.scala
+++ b/core/src/test/scala/org/apache/spark/internal/config/ConfigEntrySuite.scala
@@ -19,9 +19,6 @@ package org.apache.spark.internal.config
import java.util.concurrent.TimeUnit
-import scala.collection.JavaConverters._
-import scala.collection.mutable.HashMap
-
import org.apache.spark.{SparkConf, SparkFunSuite}
import org.apache.spark.network.util.ByteUnit
import org.apache.spark.util.SparkConfWithEnv
@@ -98,6 +95,21 @@ class ConfigEntrySuite extends SparkFunSuite {
assert(conf.get(bytes) === 1L)
}
+ test("conf entry: regex") {
+ val conf = new SparkConf()
+ val rConf = ConfigBuilder(testKey("regex")).regexConf.createWithDefault(".*".r)
+
+ conf.set(rConf, "[0-9a-f]{8}".r)
+ assert(conf.get(rConf).regex === "[0-9a-f]{8}")
+
+ conf.set(rConf.key, "[0-9a-f]{4}")
+ assert(conf.get(rConf).regex === "[0-9a-f]{4}")
+
+ conf.set(rConf.key, "[.")
+ val e = intercept[IllegalArgumentException](conf.get(rConf))
+ assert(e.getMessage.contains("regex should be a regex, but was"))
+ }
+
test("conf entry: string seq") {
val conf = new SparkConf()
val seq = ConfigBuilder(testKey("seq")).stringConf.toSequence.createWithDefault(Seq())
@@ -239,5 +251,4 @@ class ConfigEntrySuite extends SparkFunSuite {
.createWithDefault(null)
testEntryRef(nullConf, ref(nullConf))
}
-
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
index bfe9c8e351..28156b277f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
@@ -41,9 +41,33 @@ trait DataSourceScanExec extends LeafExecNode with CodegenSupport {
val relation: BaseRelation
val metastoreTableIdentifier: Option[TableIdentifier]
+ protected val nodeNamePrefix: String = ""
+
override val nodeName: String = {
s"Scan $relation ${metastoreTableIdentifier.map(_.unquotedString).getOrElse("")}"
}
+
+ override def simpleString: String = {
+ val metadataEntries = metadata.toSeq.sorted.map {
+ case (key, value) =>
+ key + ": " + StringUtils.abbreviate(redact(value), 100)
+ }
+ val metadataStr = Utils.truncatedString(metadataEntries, " ", ", ", "")
+ s"$nodeNamePrefix$nodeName${Utils.truncatedString(output, "[", ",", "]")}$metadataStr"
+ }
+
+ override def verboseString: String = redact(super.verboseString)
+
+ override def treeString(verbose: Boolean, addSuffix: Boolean): String = {
+ redact(super.treeString(verbose, addSuffix))
+ }
+
+ /**
+ * Shorthand for redacting a string via Utils.redact() with the active session's SparkConf,
+ * so callers don't have to pass the redaction pattern explicitly.
+ */
+ private def redact(text: String): String = {
+ Utils.redact(SparkSession.getActiveSession.get.sparkContext.conf, text)
+ }
}
/** Physical plan node for scanning data from a relation. */
@@ -85,15 +109,6 @@ case class RowDataSourceScanExec(
}
}
- override def simpleString: String = {
- val metadataEntries = for ((key, value) <- metadata.toSeq.sorted) yield {
- key + ": " + StringUtils.abbreviate(value, 100)
- }
-
- s"$nodeName${Utils.truncatedString(output, "[", ",", "]")}" +
- s"${Utils.truncatedString(metadataEntries, " ", ", ", "")}"
- }
-
override def inputRDDs(): Seq[RDD[InternalRow]] = {
rdd :: Nil
}
@@ -307,13 +322,7 @@ case class FileSourceScanExec(
}
}
- override def simpleString: String = {
- val metadataEntries = for ((key, value) <- metadata.toSeq.sorted) yield {
- key + ": " + StringUtils.abbreviate(value, 100)
- }
- val metadataStr = Utils.truncatedString(metadataEntries, " ", ", ", "")
- s"File$nodeName${Utils.truncatedString(output, "[", ",", "]")}$metadataStr"
- }
+ override val nodeNamePrefix: String = "File"
override protected def doProduce(ctx: CodegenContext): String = {
if (supportsBatch) {
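The practical effect of these overrides, sketched below with an invented path and pattern, is that explain/treeString output from scan nodes no longer leaks strings matching spark.redaction.string.regex. Note the pattern must be set on the SparkConf used to build the session, since it is read from sparkContext.conf:

  // Assuming the session was created with
  //   spark.redaction.string.regex = "/private/secure/\\S+"
  val df = spark.read.parquet("/private/secure/customer_data")
  df.explain()
  // The Location metadata of the FileSourceScanExec node shows
  // *********(redacted) in place of the matching part of the path.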
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/DataSourceScanExecRedactionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/DataSourceScanExecRedactionSuite.scala
new file mode 100644
index 0000000000..986fa878ee
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/DataSourceScanExecRedactionSuite.scala
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution
+
+import org.apache.hadoop.fs.Path
+
+import org.apache.spark.sql.QueryTest
+import org.apache.spark.sql.test.SharedSQLContext
+import org.apache.spark.util.Utils
+
+/**
+ * Suite that tests the redaction of DataSourceScanExec
+ */
+class DataSourceScanExecRedactionSuite extends QueryTest with SharedSQLContext {
+
+ import Utils._
+
+ override def beforeAll(): Unit = {
+ sparkConf.set("spark.redaction.string.regex",
+ "spark-[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}")
+ super.beforeAll()
+ }
+
+ test("treeString is redacted") {
+ withTempDir { dir =>
+ val basePath = dir.getCanonicalPath
+ spark.range(0, 10).toDF("a").write.parquet(new Path(basePath, "foo=1").toString)
+ val df = spark.read.parquet(basePath)
+
+ val rootPath = df.queryExecution.sparkPlan.find(_.isInstanceOf[FileSourceScanExec]).get
+ .asInstanceOf[FileSourceScanExec].relation.location.rootPaths.head
+ assert(rootPath.toString.contains(basePath.toString))
+
+ assert(!df.queryExecution.sparkPlan.treeString(verbose = true).contains(rootPath.getName))
+ assert(!df.queryExecution.executedPlan.treeString(verbose = true).contains(rootPath.getName))
+ assert(!df.queryExecution.toString.contains(rootPath.getName))
+ assert(!df.queryExecution.simpleString.contains(rootPath.getName))
+
+ val replacement = "*********"
+ assert(df.queryExecution.sparkPlan.treeString(verbose = true).contains(replacement))
+ assert(df.queryExecution.executedPlan.treeString(verbose = true).contains(replacement))
+ assert(df.queryExecution.toString.contains(replacement))
+ assert(df.queryExecution.simpleString.contains(replacement))
+ }
+ }
+}