diff options
Diffstat (limited to 'sql/core/src')
-rw-r--r-- | sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala | 41 | ||||
-rw-r--r-- | sql/core/src/test/scala/org/apache/spark/sql/execution/DataSourceScanExecRedactionSuite.scala | 60 |
2 files changed, 85 insertions, 16 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala index bfe9c8e351..28156b277f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala @@ -41,9 +41,33 @@ trait DataSourceScanExec extends LeafExecNode with CodegenSupport { val relation: BaseRelation val metastoreTableIdentifier: Option[TableIdentifier] + protected val nodeNamePrefix: String = "" + override val nodeName: String = { s"Scan $relation ${metastoreTableIdentifier.map(_.unquotedString).getOrElse("")}" } + + override def simpleString: String = { + val metadataEntries = metadata.toSeq.sorted.map { + case (key, value) => + key + ": " + StringUtils.abbreviate(redact(value), 100) + } + val metadataStr = Utils.truncatedString(metadataEntries, " ", ", ", "") + s"$nodeNamePrefix$nodeName${Utils.truncatedString(output, "[", ",", "]")}$metadataStr" + } + + override def verboseString: String = redact(super.verboseString) + + override def treeString(verbose: Boolean, addSuffix: Boolean): String = { + redact(super.treeString(verbose, addSuffix)) + } + + /** + * Shorthand for calling redactString() without specifying redacting rules + */ + private def redact(text: String): String = { + Utils.redact(SparkSession.getActiveSession.get.sparkContext.conf, text) + } } /** Physical plan node for scanning data from a relation. */ @@ -85,15 +109,6 @@ case class RowDataSourceScanExec( } } - override def simpleString: String = { - val metadataEntries = for ((key, value) <- metadata.toSeq.sorted) yield { - key + ": " + StringUtils.abbreviate(value, 100) - } - - s"$nodeName${Utils.truncatedString(output, "[", ",", "]")}" + - s"${Utils.truncatedString(metadataEntries, " ", ", ", "")}" - } - override def inputRDDs(): Seq[RDD[InternalRow]] = { rdd :: Nil } @@ -307,13 +322,7 @@ case class FileSourceScanExec( } } - override def simpleString: String = { - val metadataEntries = for ((key, value) <- metadata.toSeq.sorted) yield { - key + ": " + StringUtils.abbreviate(value, 100) - } - val metadataStr = Utils.truncatedString(metadataEntries, " ", ", ", "") - s"File$nodeName${Utils.truncatedString(output, "[", ",", "]")}$metadataStr" - } + override val nodeNamePrefix: String = "File" override protected def doProduce(ctx: CodegenContext): String = { if (supportsBatch) { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/DataSourceScanExecRedactionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/DataSourceScanExecRedactionSuite.scala new file mode 100644 index 0000000000..986fa878ee --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/DataSourceScanExecRedactionSuite.scala @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.execution + +import org.apache.hadoop.fs.Path + +import org.apache.spark.sql.QueryTest +import org.apache.spark.sql.test.SharedSQLContext +import org.apache.spark.util.Utils + +/** + * Suite that tests the redaction of DataSourceScanExec + */ +class DataSourceScanExecRedactionSuite extends QueryTest with SharedSQLContext { + + import Utils._ + + override def beforeAll(): Unit = { + sparkConf.set("spark.redaction.string.regex", + "spark-[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}") + super.beforeAll() + } + + test("treeString is redacted") { + withTempDir { dir => + val basePath = dir.getCanonicalPath + spark.range(0, 10).toDF("a").write.parquet(new Path(basePath, "foo=1").toString) + val df = spark.read.parquet(basePath) + + val rootPath = df.queryExecution.sparkPlan.find(_.isInstanceOf[FileSourceScanExec]).get + .asInstanceOf[FileSourceScanExec].relation.location.rootPaths.head + assert(rootPath.toString.contains(basePath.toString)) + + assert(!df.queryExecution.sparkPlan.treeString(verbose = true).contains(rootPath.getName)) + assert(!df.queryExecution.executedPlan.treeString(verbose = true).contains(rootPath.getName)) + assert(!df.queryExecution.toString.contains(rootPath.getName)) + assert(!df.queryExecution.simpleString.contains(rootPath.getName)) + + val replacement = "*********" + assert(df.queryExecution.sparkPlan.treeString(verbose = true).contains(replacement)) + assert(df.queryExecution.executedPlan.treeString(verbose = true).contains(replacement)) + assert(df.queryExecution.toString.contains(replacement)) + assert(df.queryExecution.simpleString.contains(replacement)) + } + } +} |