aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorEric Liang <ekl@databricks.com>2016-06-09 18:05:16 -0700
committerJosh Rosen <joshrosen@databricks.com>2016-06-09 18:05:16 -0700
commitb914e1930fd5c5f2808f92d4958ec6fbeddf2e30 (patch)
tree4503a4aeaf068a7e8e5565f8a9a3e1bd7732cb23
parent83070cd1d459101e1189f3b07ea59e22f98e84ce (diff)
downloadspark-b914e1930fd5c5f2808f92d4958ec6fbeddf2e30.tar.gz
spark-b914e1930fd5c5f2808f92d4958ec6fbeddf2e30.tar.bz2
spark-b914e1930fd5c5f2808f92d4958ec6fbeddf2e30.zip
[SPARK-15794] Should truncate toString() of very wide plans
## What changes were proposed in this pull request? With very wide tables, e.g. thousands of fields, the plan output is unreadable and often causes OOMs due to inefficient string processing. This truncates all struct and operator field lists to a user configurable threshold to limit performance impact. It would also be nice to optimize string generation to avoid these sort of O(n^2) slowdowns entirely (i.e. use StringBuilder everywhere including expressions), but this is probably too large of a change for 2.0 at this point, and truncation has other benefits for usability. ## How was this patch tested? Added a microbenchmark that covers this case particularly well. I also ran the microbenchmark while varying the truncation threshold. ``` numFields = 5 wide shallowly nested struct field r/w: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------ 2000 wide x 50 rows (write in-mem) 2336 / 2558 0.0 23364.4 0.1X numFields = 25 wide shallowly nested struct field r/w: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------ 2000 wide x 50 rows (write in-mem) 4237 / 4465 0.0 42367.9 0.1X numFields = 100 wide shallowly nested struct field r/w: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------ 2000 wide x 50 rows (write in-mem) 10458 / 11223 0.0 104582.0 0.0X numFields = Infinity wide shallowly nested struct field r/w: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------ [info] java.lang.OutOfMemoryError: Java heap space ``` Author: Eric Liang <ekl@databricks.com> Author: Eric Liang <ekhliang@gmail.com> Closes #13537 from ericl/truncated-string.
-rw-r--r--core/src/main/scala/org/apache/spark/util/Utils.scala47
-rw-r--r--core/src/test/scala/org/apache/spark/util/UtilsSuite.scala8
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala4
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala6
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala7
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala10
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala5
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala7
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortAggregateExec.scala7
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala3
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala5
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala3
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala3
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/WideSchemaBenchmark.scala49
14 files changed, 140 insertions, 24 deletions
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 1a9dbcae8c..f9d05409e1 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -26,6 +26,7 @@ import java.nio.charset.StandardCharsets
import java.nio.file.Files
import java.util.{Locale, Properties, Random, UUID}
import java.util.concurrent._
+import java.util.concurrent.atomic.AtomicBoolean
import javax.net.ssl.HttpsURLConnection
import scala.annotation.tailrec
@@ -78,6 +79,52 @@ private[spark] object Utils extends Logging {
private val MAX_DIR_CREATION_ATTEMPTS: Int = 10
@volatile private var localRootDirs: Array[String] = null
+ /**
+ * The performance overhead of creating and logging strings for wide schemas can be large. To
+ * limit the impact, we bound the number of fields to include by default. This can be overriden
+ * by setting the 'spark.debug.maxToStringFields' conf in SparkEnv.
+ */
+ val DEFAULT_MAX_TO_STRING_FIELDS = 25
+
+ private def maxNumToStringFields = {
+ if (SparkEnv.get != null) {
+ SparkEnv.get.conf.getInt("spark.debug.maxToStringFields", DEFAULT_MAX_TO_STRING_FIELDS)
+ } else {
+ DEFAULT_MAX_TO_STRING_FIELDS
+ }
+ }
+
+ /** Whether we have warned about plan string truncation yet. */
+ private val truncationWarningPrinted = new AtomicBoolean(false)
+
+ /**
+ * Format a sequence with semantics similar to calling .mkString(). Any elements beyond
+ * maxNumToStringFields will be dropped and replaced by a "... N more fields" placeholder.
+ *
+ * @return the trimmed and formatted string.
+ */
+ def truncatedString[T](
+ seq: Seq[T],
+ start: String,
+ sep: String,
+ end: String,
+ maxNumFields: Int = maxNumToStringFields): String = {
+ if (seq.length > maxNumFields) {
+ if (truncationWarningPrinted.compareAndSet(false, true)) {
+ logWarning(
+ "Truncated the string representation of a plan since it was too large. This " +
+ "behavior can be adjusted by setting 'spark.debug.maxToStringFields' in SparkEnv.conf.")
+ }
+ val numFields = math.max(0, maxNumFields - 1)
+ seq.take(numFields).mkString(
+ start, sep, sep + "... " + (seq.length - numFields) + " more fields" + end)
+ } else {
+ seq.mkString(start, sep, end)
+ }
+ }
+
+ /** Shorthand for calling truncatedString() without start or end strings. */
+ def truncatedString[T](seq: Seq[T], sep: String): String = truncatedString(seq, "", sep, "")
/** Serialize an object using Java serialization */
def serialize[T](o: T): Array[Byte] = {
diff --git a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
index 6698749866..a5363f0bfd 100644
--- a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
@@ -40,6 +40,14 @@ import org.apache.spark.network.util.ByteUnit
class UtilsSuite extends SparkFunSuite with ResetSystemProperties with Logging {
+ test("truncatedString") {
+ assert(Utils.truncatedString(Nil, "[", ", ", "]", 2) == "[]")
+ assert(Utils.truncatedString(Seq(1, 2), "[", ", ", "]", 2) == "[1, 2]")
+ assert(Utils.truncatedString(Seq(1, 2, 3), "[", ", ", "]", 2) == "[1, ... 2 more fields]")
+ assert(Utils.truncatedString(Seq(1, 2, 3), "[", ", ", "]", -5) == "[, ... 3 more fields]")
+ assert(Utils.truncatedString(Seq(1, 2, 3), ", ") == "1, 2, 3")
+ }
+
test("timeConversion") {
// Test -1
assert(Utils.timeStringAsSeconds("-1") === -1)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala
index efe592d69d..10a141254f 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala
@@ -22,6 +22,7 @@ import org.apache.spark.sql.catalyst.analysis.TypeCheckResult
import org.apache.spark.sql.catalyst.expressions.codegen._
import org.apache.spark.sql.catalyst.trees.TreeNode
import org.apache.spark.sql.types._
+import org.apache.spark.util.Utils
////////////////////////////////////////////////////////////////////////////////////////////////////
// This file defines the basic expression abstract classes in Catalyst.
@@ -196,7 +197,8 @@ abstract class Expression extends TreeNode[Expression] {
override def simpleString: String = toString
- override def toString: String = prettyName + flatArguments.mkString("(", ", ", ")")
+ override def toString: String = prettyName + Utils.truncatedString(
+ flatArguments.toSeq, "(", ", ", ")")
/**
* Returns SQL representation of this expression. For expressions extending [[NonSQLExpression]],
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
index c67366d240..f924efe6e6 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
@@ -448,10 +448,10 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product {
case tn: TreeNode[_] => tn.simpleString :: Nil
case seq: Seq[Any] if seq.toSet.subsetOf(allChildren.asInstanceOf[Set[Any]]) => Nil
case iter: Iterable[_] if iter.isEmpty => Nil
- case seq: Seq[_] => seq.mkString("[", ", ", "]") :: Nil
- case set: Set[_] => set.mkString("{", ", ", "}") :: Nil
+ case seq: Seq[_] => Utils.truncatedString(seq, "[", ", ", "]") :: Nil
+ case set: Set[_] => Utils.truncatedString(set.toSeq, "{", ", ", "}") :: Nil
case array: Array[_] if array.isEmpty => Nil
- case array: Array[_] => array.mkString("[", ", ", "]") :: Nil
+ case array: Array[_] => Utils.truncatedString(array, "[", ", ", "]") :: Nil
case null => Nil
case None => Nil
case Some(null) => Nil
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala
index 9a92373759..436512ff69 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala
@@ -22,11 +22,12 @@ import scala.util.Try
import org.json4s.JsonDSL._
-import org.apache.spark.SparkException
+import org.apache.spark.{SparkEnv, SparkException}
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, InterpretedOrdering}
import org.apache.spark.sql.catalyst.parser.{CatalystSqlParser, LegacyTypeStringParser}
import org.apache.spark.sql.catalyst.util.quoteIdentifier
+import org.apache.spark.util.Utils
/**
* :: DeveloperApi ::
@@ -293,8 +294,8 @@ case class StructType(fields: Array[StructField]) extends DataType with Seq[Stru
override def defaultSize: Int = fields.map(_.dataType.defaultSize).sum
override def simpleString: String = {
- val fieldTypes = fields.map(field => s"${field.name}:${field.dataType.simpleString}")
- s"struct<${fieldTypes.mkString(",")}>"
+ val fieldTypes = fields.view.map(field => s"${field.name}:${field.dataType.simpleString}")
+ Utils.truncatedString(fieldTypes, "struct<", ",", ">")
}
override def sql: String = {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
index b8b392608d..9ab98fd124 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
@@ -33,6 +33,7 @@ import org.apache.spark.sql.execution.metric.SQLMetrics
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.sources.BaseRelation
import org.apache.spark.sql.types.{DataType, StructType}
+import org.apache.spark.util.Utils
object RDDConversions {
def productToRowRdd[A <: Product](data: RDD[A], outputTypes: Seq[DataType]): RDD[InternalRow] = {
@@ -123,7 +124,7 @@ private[sql] case class RDDScanExec(
}
override def simpleString: String = {
- s"Scan $nodeName${output.mkString("[", ",", "]")}"
+ s"Scan $nodeName${Utils.truncatedString(output, "[", ",", "]")}"
}
}
@@ -186,7 +187,8 @@ private[sql] case class RowDataSourceScanExec(
key + ": " + StringUtils.abbreviate(value, 100)
}
- s"$nodeName${output.mkString("[", ",", "]")}${metadataEntries.mkString(" ", ", ", "")}"
+ s"$nodeName${Utils.truncatedString(output, "[", ",", "]")}" +
+ s"${Utils.truncatedString(metadataEntries, " ", ", ", "")}"
}
override def inputRDDs(): Seq[RDD[InternalRow]] = {
@@ -239,8 +241,8 @@ private[sql] case class BatchedDataSourceScanExec(
val metadataEntries = for ((key, value) <- metadata.toSeq.sorted) yield {
key + ": " + StringUtils.abbreviate(value, 100)
}
- val metadataStr = metadataEntries.mkString(" ", ", ", "")
- s"Batched$nodeName${output.mkString("[", ",", "]")}$metadataStr"
+ val metadataStr = Utils.truncatedString(metadataEntries, " ", ", ", "")
+ s"Batched$nodeName${Utils.truncatedString(output, "[", ",", "]")}$metadataStr"
}
override def inputRDDs(): Seq[RDD[InternalRow]] = {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
index 560214a65e..a2d45026e0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
@@ -31,6 +31,7 @@ import org.apache.spark.sql.execution.command.{DescribeTableCommand, ExecutedCom
import org.apache.spark.sql.execution.exchange.{EnsureRequirements, ReuseExchange}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.{BinaryType, DateType, DecimalType, TimestampType, _}
+import org.apache.spark.util.Utils
/**
* The primary workflow for executing relational queries using Spark. Designed to allow easy
@@ -206,8 +207,8 @@ class QueryExecution(val sparkSession: SparkSession, val logical: LogicalPlan) {
}
override def toString: String = {
- def output =
- analyzed.output.map(o => s"${o.name}: ${o.dataType.simpleString}").mkString(", ")
+ def output = Utils.truncatedString(
+ analyzed.output.map(o => s"${o.name}: ${o.dataType.simpleString}"), ", ")
val analyzedPlan = Seq(
stringOrError(output),
stringOrError(analyzed.treeString(verbose = true))
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala
index b617e26418..caeeba1793 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala
@@ -29,6 +29,7 @@ import org.apache.spark.sql.execution._
import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
import org.apache.spark.sql.types.{DecimalType, StringType, StructType}
import org.apache.spark.unsafe.KVIterator
+import org.apache.spark.util.Utils
/**
* Hash-based aggregate operator that can also fallback to sorting when data exceeds memory size.
@@ -773,9 +774,9 @@ case class HashAggregateExec(
testFallbackStartsAt match {
case None =>
- val keyString = groupingExpressions.mkString("[", ",", "]")
- val functionString = allAggregateExpressions.mkString("[", ",", "]")
- val outputString = output.mkString("[", ",", "]")
+ val keyString = Utils.truncatedString(groupingExpressions, "[", ",", "]")
+ val functionString = Utils.truncatedString(allAggregateExpressions, "[", ",", "]")
+ val outputString = Utils.truncatedString(output, "[", ",", "]")
if (verbose) {
s"HashAggregate(key=$keyString, functions=$functionString, output=$outputString)"
} else {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortAggregateExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortAggregateExec.scala
index 41ba9f5b3f..17126519eb 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortAggregateExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortAggregateExec.scala
@@ -25,6 +25,7 @@ import org.apache.spark.sql.catalyst.expressions.aggregate._
import org.apache.spark.sql.catalyst.plans.physical.{AllTuples, ClusteredDistribution, Distribution, UnspecifiedDistribution}
import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode}
import org.apache.spark.sql.execution.metric.SQLMetrics
+import org.apache.spark.util.Utils
/**
* Sort-based aggregate operator.
@@ -110,9 +111,9 @@ case class SortAggregateExec(
private def toString(verbose: Boolean): String = {
val allAggregateExpressions = aggregateExpressions
- val keyString = groupingExpressions.mkString("[", ",", "]")
- val functionString = allAggregateExpressions.mkString("[", ",", "]")
- val outputString = output.mkString("[", ",", "]")
+ val keyString = Utils.truncatedString(groupingExpressions, "[", ",", "]")
+ val functionString = Utils.truncatedString(allAggregateExpressions, "[", ",", "]")
+ val outputString = Utils.truncatedString(output, "[", ",", "]")
if (verbose) {
s"SortAggregate(key=$keyString, functions=$functionString, output=$outputString)"
} else {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala
index 0e0748ff32..a418d02983 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala
@@ -21,6 +21,7 @@ import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, AttributeReference}
import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, Statistics}
import org.apache.spark.sql.sources.BaseRelation
+import org.apache.spark.util.Utils
/**
* Used to link a [[BaseRelation]] in to a logical query plan.
@@ -82,5 +83,5 @@ case class LogicalRelation(
expectedOutputAttributes,
metastoreTableIdentifier).asInstanceOf[this.type]
- override def simpleString: String = s"Relation[${output.mkString(",")}] $relation"
+ override def simpleString: String = s"Relation[${Utils.truncatedString(output, ",")}] $relation"
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala
index b71f3335c9..781c016095 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala
@@ -24,6 +24,7 @@ import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode, LazilyGeneratedOrdering}
import org.apache.spark.sql.catalyst.plans.physical._
import org.apache.spark.sql.execution.exchange.ShuffleExchange
+import org.apache.spark.util.Utils
/**
@@ -159,8 +160,8 @@ case class TakeOrderedAndProjectExec(
override def outputOrdering: Seq[SortOrder] = sortOrder
override def simpleString: String = {
- val orderByString = sortOrder.mkString("[", ",", "]")
- val outputString = output.mkString("[", ",", "]")
+ val orderByString = Utils.truncatedString(sortOrder, "[", ",", "]")
+ val outputString = Utils.truncatedString(output, "[", ",", "]")
s"TakeOrderedAndProject(limit=$limit, orderBy=$orderByString, output=$outputString)"
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index d9800e4afd..954fc33ecc 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -336,7 +336,8 @@ class StreamExecution(
newData.get(source).map { data =>
val newPlan = data.logicalPlan
assert(output.size == newPlan.output.size,
- s"Invalid batch: ${output.mkString(",")} != ${newPlan.output.mkString(",")}")
+ s"Invalid batch: ${Utils.truncatedString(output, ",")} != " +
+ s"${Utils.truncatedString(newPlan.output, ",")}")
replacements ++= output.zip(newPlan.output)
newPlan
}.getOrElse {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala
index 4496f41615..77fd043ef7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala
@@ -30,6 +30,7 @@ import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical.LeafNode
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.types.StructType
+import org.apache.spark.util.Utils
object MemoryStream {
protected val currentBlockId = new AtomicInteger(0)
@@ -81,7 +82,7 @@ case class MemoryStream[A : Encoder](id: Int, sqlContext: SQLContext)
}
}
- override def toString: String = s"MemoryStream[${output.mkString(",")}]"
+ override def toString: String = s"MemoryStream[${Utils.truncatedString(output, ",")}]"
override def getOffset: Option[Offset] = synchronized {
if (batches.isEmpty) {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/WideSchemaBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/WideSchemaBenchmark.scala
index b4811fe27a..06466e629b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/WideSchemaBenchmark.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/WideSchemaBenchmark.scala
@@ -155,6 +155,55 @@ many column field r/w: Best/Avg Time(ms) Rate(M/s) Per Ro
*/
}
+ ignore("wide shallowly nested struct field read and write") {
+ val benchmark = new Benchmark(
+ "wide shallowly nested struct field r/w", scaleFactor)
+ for (width <- widthsToTest) {
+ val numRows = scaleFactor / width
+ var datum: String = "{"
+ for (i <- 1 to width) {
+ if (i == 1) {
+ datum += s""""value_$i": 1"""
+ } else {
+ datum += s""", "value_$i": 1"""
+ }
+ }
+ datum += "}"
+ datum = s"""{"a": {"b": {"c": $datum, "d": $datum}, "e": $datum}}"""
+ val df = sparkSession.read.json(sparkSession.range(numRows).map(_ => datum).rdd).cache()
+ df.count() // force caching
+ addCases(benchmark, df, s"$width wide x $numRows rows", "a.b.c.value_1")
+ }
+ benchmark.run()
+
+/*
+Java HotSpot(TM) 64-Bit Server VM 1.7.0_80-b15 on Linux 4.2.0-36-generic
+Intel(R) Xeon(R) CPU E5-1650 v3 @ 3.50GHz
+wide shallowly nested struct field r/w: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
+------------------------------------------------------------------------------------------------
+1 wide x 100000 rows (read in-mem) 100 / 125 1.0 997.7 1.0X
+1 wide x 100000 rows (write in-mem) 130 / 147 0.8 1302.9 0.8X
+1 wide x 100000 rows (read parquet) 195 / 228 0.5 1951.4 0.5X
+1 wide x 100000 rows (write parquet) 248 / 259 0.4 2479.7 0.4X
+10 wide x 10000 rows (read in-mem) 76 / 89 1.3 757.2 1.3X
+10 wide x 10000 rows (write in-mem) 90 / 116 1.1 900.0 1.1X
+10 wide x 10000 rows (read parquet) 90 / 135 1.1 903.9 1.1X
+10 wide x 10000 rows (write parquet) 222 / 240 0.4 2222.8 0.4X
+100 wide x 1000 rows (read in-mem) 71 / 91 1.4 710.8 1.4X
+100 wide x 1000 rows (write in-mem) 252 / 324 0.4 2522.4 0.4X
+100 wide x 1000 rows (read parquet) 310 / 329 0.3 3095.9 0.3X
+100 wide x 1000 rows (write parquet) 253 / 267 0.4 2525.7 0.4X
+1000 wide x 100 rows (read in-mem) 144 / 160 0.7 1439.5 0.7X
+1000 wide x 100 rows (write in-mem) 2055 / 2326 0.0 20553.9 0.0X
+1000 wide x 100 rows (read parquet) 750 / 925 0.1 7496.8 0.1X
+1000 wide x 100 rows (write parquet) 235 / 317 0.4 2347.5 0.4X
+2500 wide x 40 rows (read in-mem) 219 / 227 0.5 2190.9 0.5X
+2500 wide x 40 rows (write in-mem) 5177 / 5423 0.0 51773.2 0.0X
+2500 wide x 40 rows (read parquet) 1642 / 1714 0.1 16417.7 0.1X
+2500 wide x 40 rows (write parquet) 357 / 381 0.3 3568.2 0.3X
+*/
+ }
+
ignore("wide struct field read and write") {
val benchmark = new Benchmark("wide struct field r/w", scaleFactor)
for (width <- widthsToTest) {