-rw-r--r--  core/src/main/scala/org/apache/spark/util/Utils.scala | 47
-rw-r--r--  core/src/test/scala/org/apache/spark/util/UtilsSuite.scala | 8
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala | 4
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala | 6
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala | 7
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala | 10
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala | 5
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala | 7
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortAggregateExec.scala | 7
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala | 3
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala | 5
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala | 3
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala | 3
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/WideSchemaBenchmark.scala | 49
14 files changed, 140 insertions, 24 deletions
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 1a9dbcae8c..f9d05409e1 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -26,6 +26,7 @@ import java.nio.charset.StandardCharsets
import java.nio.file.Files
import java.util.{Locale, Properties, Random, UUID}
import java.util.concurrent._
+import java.util.concurrent.atomic.AtomicBoolean
import javax.net.ssl.HttpsURLConnection
import scala.annotation.tailrec
@@ -78,6 +79,52 @@ private[spark] object Utils extends Logging {
private val MAX_DIR_CREATION_ATTEMPTS: Int = 10
@volatile private var localRootDirs: Array[String] = null
+ /**
+ * The performance overhead of creating and logging strings for wide schemas can be large. To
+ * limit the impact, we bound the number of fields to include by default. This can be overriden
+ * by setting the 'spark.debug.maxToStringFields' conf in SparkEnv.
+ */
+ val DEFAULT_MAX_TO_STRING_FIELDS = 25
+
+ private def maxNumToStringFields = {
+ if (SparkEnv.get != null) {
+ SparkEnv.get.conf.getInt("spark.debug.maxToStringFields", DEFAULT_MAX_TO_STRING_FIELDS)
+ } else {
+ DEFAULT_MAX_TO_STRING_FIELDS
+ }
+ }
+
+ /** Whether we have warned about plan string truncation yet. */
+ private val truncationWarningPrinted = new AtomicBoolean(false)
+
+ /**
+ * Format a sequence with semantics similar to calling .mkString(). Any elements beyond
+ * maxNumToStringFields will be dropped and replaced by a "... N more fields" placeholder.
+ *
+ * @return the trimmed and formatted string.
+ */
+ def truncatedString[T](
+ seq: Seq[T],
+ start: String,
+ sep: String,
+ end: String,
+ maxNumFields: Int = maxNumToStringFields): String = {
+ if (seq.length > maxNumFields) {
+ if (truncationWarningPrinted.compareAndSet(false, true)) {
+ logWarning(
+ "Truncated the string representation of a plan since it was too large. This " +
+ "behavior can be adjusted by setting 'spark.debug.maxToStringFields' in SparkEnv.conf.")
+ }
+ val numFields = math.max(0, maxNumFields - 1)
+ seq.take(numFields).mkString(
+ start, sep, sep + "... " + (seq.length - numFields) + " more fields" + end)
+ } else {
+ seq.mkString(start, sep, end)
+ }
+ }
+
+ /** Shorthand for calling truncatedString() without start or end strings. */
+ def truncatedString[T](seq: Seq[T], sep: String): String = truncatedString(seq, "", sep, "")
/** Serialize an object using Java serialization */
def serialize[T](o: T): Array[Byte] = {
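
Usage note (not part of the patch): the limit is read from SparkEnv, so it can be raised or lowered through the ordinary Spark configuration. A minimal sketch, assuming a local deployment; the app name and master are arbitrary:

    import org.apache.spark.sql.SparkSession

    // Raise the truncation threshold from the default of 25 fields to 100.
    // The value is picked up by Utils.maxNumToStringFields via SparkEnv.get.conf.
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("truncation-demo")
      .config("spark.debug.maxToStringFields", "100")
      .getOrCreate()
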
diff --git a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
index 6698749866..a5363f0bfd 100644
--- a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
@@ -40,6 +40,14 @@ import org.apache.spark.network.util.ByteUnit
class UtilsSuite extends SparkFunSuite with ResetSystemProperties with Logging {
+ test("truncatedString") {
+ assert(Utils.truncatedString(Nil, "[", ", ", "]", 2) == "[]")
+ assert(Utils.truncatedString(Seq(1, 2), "[", ", ", "]", 2) == "[1, 2]")
+ assert(Utils.truncatedString(Seq(1, 2, 3), "[", ", ", "]", 2) == "[1, ... 2 more fields]")
+ assert(Utils.truncatedString(Seq(1, 2, 3), "[", ", ", "]", -5) == "[, ... 3 more fields]")
+ assert(Utils.truncatedString(Seq(1, 2, 3), ", ") == "1, 2, 3")
+ }
+
test("timeConversion") {
// Test -1
assert(Utils.timeStringAsSeconds("-1") === -1)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala
index efe592d69d..10a141254f 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala
@@ -22,6 +22,7 @@ import org.apache.spark.sql.catalyst.analysis.TypeCheckResult
import org.apache.spark.sql.catalyst.expressions.codegen._
import org.apache.spark.sql.catalyst.trees.TreeNode
import org.apache.spark.sql.types._
+import org.apache.spark.util.Utils
////////////////////////////////////////////////////////////////////////////////////////////////////
// This file defines the basic expression abstract classes in Catalyst.
@@ -196,7 +197,8 @@ abstract class Expression extends TreeNode[Expression] {
override def simpleString: String = toString
- override def toString: String = prettyName + flatArguments.mkString("(", ", ", ")")
+ override def toString: String = prettyName + Utils.truncatedString(
+ flatArguments.toSeq, "(", ", ", ")")
/**
* Returns SQL representation of this expression. For expressions extending [[NonSQLExpression]],
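
To illustrate the effect on Expression.toString, a hedged sketch using Catalyst's internal API (Coalesce and Literal are internal classes and may change between versions); the exact rendering shown in the comment is an approximation based on the truncatedString format above:

    import org.apache.spark.sql.catalyst.expressions.{Coalesce, Literal}

    // An expression with 100 flattened arguments; with the default limit of 25,
    // only the first 24 are rendered, followed by a "... N more fields" marker.
    val expr = Coalesce((1 to 100).map(i => Literal(i)))
    println(expr.toString)   // e.g. coalesce(1, 2, ..., 24, ... 76 more fields)
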
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
index c67366d240..f924efe6e6 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
@@ -448,10 +448,10 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product {
case tn: TreeNode[_] => tn.simpleString :: Nil
case seq: Seq[Any] if seq.toSet.subsetOf(allChildren.asInstanceOf[Set[Any]]) => Nil
case iter: Iterable[_] if iter.isEmpty => Nil
- case seq: Seq[_] => seq.mkString("[", ", ", "]") :: Nil
- case set: Set[_] => set.mkString("{", ", ", "}") :: Nil
+ case seq: Seq[_] => Utils.truncatedString(seq, "[", ", ", "]") :: Nil
+ case set: Set[_] => Utils.truncatedString(set.toSeq, "{", ", ", "}") :: Nil
case array: Array[_] if array.isEmpty => Nil
- case array: Array[_] => array.mkString("[", ", ", "]") :: Nil
+ case array: Array[_] => Utils.truncatedString(array, "[", ", ", "]") :: Nil
case null => Nil
case None => Nil
case Some(null) => Nil
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala
index 9a92373759..436512ff69 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala
@@ -22,11 +22,12 @@ import scala.util.Try
import org.json4s.JsonDSL._
-import org.apache.spark.SparkException
+import org.apache.spark.{SparkEnv, SparkException}
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, InterpretedOrdering}
import org.apache.spark.sql.catalyst.parser.{CatalystSqlParser, LegacyTypeStringParser}
import org.apache.spark.sql.catalyst.util.quoteIdentifier
+import org.apache.spark.util.Utils
/**
* :: DeveloperApi ::
@@ -293,8 +294,8 @@ case class StructType(fields: Array[StructField]) extends DataType with Seq[Stru
override def defaultSize: Int = fields.map(_.dataType.defaultSize).sum
override def simpleString: String = {
- val fieldTypes = fields.map(field => s"${field.name}:${field.dataType.simpleString}")
- s"struct<${fieldTypes.mkString(",")}>"
+ val fieldTypes = fields.view.map(field => s"${field.name}:${field.dataType.simpleString}")
+ Utils.truncatedString(fieldTypes, "struct<", ",", ">")
}
override def sql: String = {
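
For reference, a small sketch of what the new StructType.simpleString produces for a wide schema; the column names are illustrative and the output in the comment is approximate:

    import org.apache.spark.sql.types.{IntegerType, StructField, StructType}

    // 100 int columns; with the default cap of 25 fields, simpleString keeps the
    // first 24 and appends a placeholder instead of listing every column.
    val wide = StructType((1 to 100).map(i => StructField(s"c$i", IntegerType)))
    println(wide.simpleString)   // struct<c1:int,c2:int,...,c24:int,... 76 more fields>
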
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
index b8b392608d..9ab98fd124 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
@@ -33,6 +33,7 @@ import org.apache.spark.sql.execution.metric.SQLMetrics
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.sources.BaseRelation
import org.apache.spark.sql.types.{DataType, StructType}
+import org.apache.spark.util.Utils
object RDDConversions {
def productToRowRdd[A <: Product](data: RDD[A], outputTypes: Seq[DataType]): RDD[InternalRow] = {
@@ -123,7 +124,7 @@ private[sql] case class RDDScanExec(
}
override def simpleString: String = {
- s"Scan $nodeName${output.mkString("[", ",", "]")}"
+ s"Scan $nodeName${Utils.truncatedString(output, "[", ",", "]")}"
}
}
@@ -186,7 +187,8 @@ private[sql] case class RowDataSourceScanExec(
key + ": " + StringUtils.abbreviate(value, 100)
}
- s"$nodeName${output.mkString("[", ",", "]")}${metadataEntries.mkString(" ", ", ", "")}"
+ s"$nodeName${Utils.truncatedString(output, "[", ",", "]")}" +
+ s"${Utils.truncatedString(metadataEntries, " ", ", ", "")}"
}
override def inputRDDs(): Seq[RDD[InternalRow]] = {
@@ -239,8 +241,8 @@ private[sql] case class BatchedDataSourceScanExec(
val metadataEntries = for ((key, value) <- metadata.toSeq.sorted) yield {
key + ": " + StringUtils.abbreviate(value, 100)
}
- val metadataStr = metadataEntries.mkString(" ", ", ", "")
- s"Batched$nodeName${output.mkString("[", ",", "]")}$metadataStr"
+ val metadataStr = Utils.truncatedString(metadataEntries, " ", ", ", "")
+ s"Batched$nodeName${Utils.truncatedString(output, "[", ",", "]")}$metadataStr"
}
override def inputRDDs(): Seq[RDD[InternalRow]] = {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
index 560214a65e..a2d45026e0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
@@ -31,6 +31,7 @@ import org.apache.spark.sql.execution.command.{DescribeTableCommand, ExecutedCom
import org.apache.spark.sql.execution.exchange.{EnsureRequirements, ReuseExchange}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.{BinaryType, DateType, DecimalType, TimestampType, _}
+import org.apache.spark.util.Utils
/**
* The primary workflow for executing relational queries using Spark. Designed to allow easy
@@ -206,8 +207,8 @@ class QueryExecution(val sparkSession: SparkSession, val logical: LogicalPlan) {
}
override def toString: String = {
- def output =
- analyzed.output.map(o => s"${o.name}: ${o.dataType.simpleString}").mkString(", ")
+ def output = Utils.truncatedString(
+ analyzed.output.map(o => s"${o.name}: ${o.dataType.simpleString}"), ", ")
val analyzedPlan = Seq(
stringOrError(output),
stringOrError(analyzed.treeString(verbose = true))
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala
index b617e26418..caeeba1793 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala
@@ -29,6 +29,7 @@ import org.apache.spark.sql.execution._
import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
import org.apache.spark.sql.types.{DecimalType, StringType, StructType}
import org.apache.spark.unsafe.KVIterator
+import org.apache.spark.util.Utils
/**
* Hash-based aggregate operator that can also fallback to sorting when data exceeds memory size.
@@ -773,9 +774,9 @@ case class HashAggregateExec(
testFallbackStartsAt match {
case None =>
- val keyString = groupingExpressions.mkString("[", ",", "]")
- val functionString = allAggregateExpressions.mkString("[", ",", "]")
- val outputString = output.mkString("[", ",", "]")
+ val keyString = Utils.truncatedString(groupingExpressions, "[", ",", "]")
+ val functionString = Utils.truncatedString(allAggregateExpressions, "[", ",", "]")
+ val outputString = Utils.truncatedString(output, "[", ",", "]")
if (verbose) {
s"HashAggregate(key=$keyString, functions=$functionString, output=$outputString)"
} else {
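
The practical effect shows up in explain() output. A hedged end-to-end sketch, assuming an existing SparkSession named spark; whether the plan uses HashAggregate or SortAggregate depends on the Spark version and the aggregate buffer types:

    import org.apache.spark.sql.functions.col

    // Group by 30 generated columns; the aggregate node in the physical plan
    // now prints a truncated key/output list instead of all 30 expressions.
    val wideDf = spark.range(1000).selectExpr((1 to 30).map(i => s"id as c$i"): _*)
    wideDf.groupBy((1 to 30).map(i => col(s"c$i")): _*).count().explain()
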
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortAggregateExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortAggregateExec.scala
index 41ba9f5b3f..17126519eb 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortAggregateExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortAggregateExec.scala
@@ -25,6 +25,7 @@ import org.apache.spark.sql.catalyst.expressions.aggregate._
import org.apache.spark.sql.catalyst.plans.physical.{AllTuples, ClusteredDistribution, Distribution, UnspecifiedDistribution}
import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode}
import org.apache.spark.sql.execution.metric.SQLMetrics
+import org.apache.spark.util.Utils
/**
* Sort-based aggregate operator.
@@ -110,9 +111,9 @@ case class SortAggregateExec(
private def toString(verbose: Boolean): String = {
val allAggregateExpressions = aggregateExpressions
- val keyString = groupingExpressions.mkString("[", ",", "]")
- val functionString = allAggregateExpressions.mkString("[", ",", "]")
- val outputString = output.mkString("[", ",", "]")
+ val keyString = Utils.truncatedString(groupingExpressions, "[", ",", "]")
+ val functionString = Utils.truncatedString(allAggregateExpressions, "[", ",", "]")
+ val outputString = Utils.truncatedString(output, "[", ",", "]")
if (verbose) {
s"SortAggregate(key=$keyString, functions=$functionString, output=$outputString)"
} else {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala
index 0e0748ff32..a418d02983 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala
@@ -21,6 +21,7 @@ import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, AttributeReference}
import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, Statistics}
import org.apache.spark.sql.sources.BaseRelation
+import org.apache.spark.util.Utils
/**
* Used to link a [[BaseRelation]] in to a logical query plan.
@@ -82,5 +83,5 @@ case class LogicalRelation(
expectedOutputAttributes,
metastoreTableIdentifier).asInstanceOf[this.type]
- override def simpleString: String = s"Relation[${output.mkString(",")}] $relation"
+ override def simpleString: String = s"Relation[${Utils.truncatedString(output, ",")}] $relation"
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala
index b71f3335c9..781c016095 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala
@@ -24,6 +24,7 @@ import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode, LazilyGeneratedOrdering}
import org.apache.spark.sql.catalyst.plans.physical._
import org.apache.spark.sql.execution.exchange.ShuffleExchange
+import org.apache.spark.util.Utils
/**
@@ -159,8 +160,8 @@ case class TakeOrderedAndProjectExec(
override def outputOrdering: Seq[SortOrder] = sortOrder
override def simpleString: String = {
- val orderByString = sortOrder.mkString("[", ",", "]")
- val outputString = output.mkString("[", ",", "]")
+ val orderByString = Utils.truncatedString(sortOrder, "[", ",", "]")
+ val outputString = Utils.truncatedString(output, "[", ",", "]")
s"TakeOrderedAndProject(limit=$limit, orderBy=$orderByString, output=$outputString)"
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index d9800e4afd..954fc33ecc 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -336,7 +336,8 @@ class StreamExecution(
newData.get(source).map { data =>
val newPlan = data.logicalPlan
assert(output.size == newPlan.output.size,
- s"Invalid batch: ${output.mkString(",")} != ${newPlan.output.mkString(",")}")
+ s"Invalid batch: ${Utils.truncatedString(output, ",")} != " +
+ s"${Utils.truncatedString(newPlan.output, ",")}")
replacements ++= output.zip(newPlan.output)
newPlan
}.getOrElse {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala
index 4496f41615..77fd043ef7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala
@@ -30,6 +30,7 @@ import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical.LeafNode
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.types.StructType
+import org.apache.spark.util.Utils
object MemoryStream {
protected val currentBlockId = new AtomicInteger(0)
@@ -81,7 +82,7 @@ case class MemoryStream[A : Encoder](id: Int, sqlContext: SQLContext)
}
}
- override def toString: String = s"MemoryStream[${output.mkString(",")}]"
+ override def toString: String = s"MemoryStream[${Utils.truncatedString(output, ",")}]"
override def getOffset: Option[Offset] = synchronized {
if (batches.isEmpty) {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/WideSchemaBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/WideSchemaBenchmark.scala
index b4811fe27a..06466e629b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/WideSchemaBenchmark.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/WideSchemaBenchmark.scala
@@ -155,6 +155,55 @@ many column field r/w: Best/Avg Time(ms) Rate(M/s) Per Ro
*/
}
+ ignore("wide shallowly nested struct field read and write") {
+ val benchmark = new Benchmark(
+ "wide shallowly nested struct field r/w", scaleFactor)
+ for (width <- widthsToTest) {
+ val numRows = scaleFactor / width
+ var datum: String = "{"
+ for (i <- 1 to width) {
+ if (i == 1) {
+ datum += s""""value_$i": 1"""
+ } else {
+ datum += s""", "value_$i": 1"""
+ }
+ }
+ datum += "}"
+ datum = s"""{"a": {"b": {"c": $datum, "d": $datum}, "e": $datum}}"""
+ val df = sparkSession.read.json(sparkSession.range(numRows).map(_ => datum).rdd).cache()
+ df.count() // force caching
+ addCases(benchmark, df, s"$width wide x $numRows rows", "a.b.c.value_1")
+ }
+ benchmark.run()
+
+/*
+Java HotSpot(TM) 64-Bit Server VM 1.7.0_80-b15 on Linux 4.2.0-36-generic
+Intel(R) Xeon(R) CPU E5-1650 v3 @ 3.50GHz
+wide shallowly nested struct field r/w: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
+------------------------------------------------------------------------------------------------
+1 wide x 100000 rows (read in-mem) 100 / 125 1.0 997.7 1.0X
+1 wide x 100000 rows (write in-mem) 130 / 147 0.8 1302.9 0.8X
+1 wide x 100000 rows (read parquet) 195 / 228 0.5 1951.4 0.5X
+1 wide x 100000 rows (write parquet) 248 / 259 0.4 2479.7 0.4X
+10 wide x 10000 rows (read in-mem) 76 / 89 1.3 757.2 1.3X
+10 wide x 10000 rows (write in-mem) 90 / 116 1.1 900.0 1.1X
+10 wide x 10000 rows (read parquet) 90 / 135 1.1 903.9 1.1X
+10 wide x 10000 rows (write parquet) 222 / 240 0.4 2222.8 0.4X
+100 wide x 1000 rows (read in-mem) 71 / 91 1.4 710.8 1.4X
+100 wide x 1000 rows (write in-mem) 252 / 324 0.4 2522.4 0.4X
+100 wide x 1000 rows (read parquet) 310 / 329 0.3 3095.9 0.3X
+100 wide x 1000 rows (write parquet) 253 / 267 0.4 2525.7 0.4X
+1000 wide x 100 rows (read in-mem) 144 / 160 0.7 1439.5 0.7X
+1000 wide x 100 rows (write in-mem) 2055 / 2326 0.0 20553.9 0.0X
+1000 wide x 100 rows (read parquet) 750 / 925 0.1 7496.8 0.1X
+1000 wide x 100 rows (write parquet) 235 / 317 0.4 2347.5 0.4X
+2500 wide x 40 rows (read in-mem) 219 / 227 0.5 2190.9 0.5X
+2500 wide x 40 rows (write in-mem) 5177 / 5423 0.0 51773.2 0.0X
+2500 wide x 40 rows (read parquet) 1642 / 1714 0.1 16417.7 0.1X
+2500 wide x 40 rows (write parquet) 357 / 381 0.3 3568.2 0.3X
+*/
+ }
+
ignore("wide struct field read and write") {
val benchmark = new Benchmark("wide struct field r/w", scaleFactor)
for (width <- widthsToTest) {