From b914e1930fd5c5f2808f92d4958ec6fbeddf2e30 Mon Sep 17 00:00:00 2001 From: Eric Liang Date: Thu, 9 Jun 2016 18:05:16 -0700 Subject: [SPARK-15794] Should truncate toString() of very wide plans ## What changes were proposed in this pull request? With very wide tables, e.g. thousands of fields, the plan output is unreadable and often causes OOMs due to inefficient string processing. This truncates all struct and operator field lists to a user configurable threshold to limit performance impact. It would also be nice to optimize string generation to avoid these sort of O(n^2) slowdowns entirely (i.e. use StringBuilder everywhere including expressions), but this is probably too large of a change for 2.0 at this point, and truncation has other benefits for usability. ## How was this patch tested? Added a microbenchmark that covers this case particularly well. I also ran the microbenchmark while varying the truncation threshold. ``` numFields = 5 wide shallowly nested struct field r/w: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------ 2000 wide x 50 rows (write in-mem) 2336 / 2558 0.0 23364.4 0.1X numFields = 25 wide shallowly nested struct field r/w: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------ 2000 wide x 50 rows (write in-mem) 4237 / 4465 0.0 42367.9 0.1X numFields = 100 wide shallowly nested struct field r/w: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------ 2000 wide x 50 rows (write in-mem) 10458 / 11223 0.0 104582.0 0.0X numFields = Infinity wide shallowly nested struct field r/w: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------ [info] java.lang.OutOfMemoryError: Java heap space ``` Author: Eric Liang Author: Eric Liang Closes #13537 from ericl/truncated-string. --- .../main/scala/org/apache/spark/util/Utils.scala | 47 +++++++++++++++++++++ .../scala/org/apache/spark/util/UtilsSuite.scala | 8 ++++ .../sql/catalyst/expressions/Expression.scala | 4 +- .../apache/spark/sql/catalyst/trees/TreeNode.scala | 6 +-- .../org/apache/spark/sql/types/StructType.scala | 7 ++-- .../apache/spark/sql/execution/ExistingRDD.scala | 10 +++-- .../spark/sql/execution/QueryExecution.scala | 5 ++- .../execution/aggregate/HashAggregateExec.scala | 7 ++-- .../execution/aggregate/SortAggregateExec.scala | 7 ++-- .../execution/datasources/LogicalRelation.scala | 3 +- .../org/apache/spark/sql/execution/limit.scala | 5 ++- .../sql/execution/streaming/StreamExecution.scala | 3 +- .../spark/sql/execution/streaming/memory.scala | 3 +- .../execution/benchmark/WideSchemaBenchmark.scala | 49 ++++++++++++++++++++++ 14 files changed, 140 insertions(+), 24 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index 1a9dbcae8c..f9d05409e1 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -26,6 +26,7 @@ import java.nio.charset.StandardCharsets import java.nio.file.Files import java.util.{Locale, Properties, Random, UUID} import java.util.concurrent._ +import java.util.concurrent.atomic.AtomicBoolean import javax.net.ssl.HttpsURLConnection import scala.annotation.tailrec @@ -78,6 +79,52 @@ private[spark] object Utils extends Logging { private val MAX_DIR_CREATION_ATTEMPTS: Int = 10 @volatile private var localRootDirs: Array[String] = null + /** + * The performance overhead of creating and logging strings for wide schemas can be large. To + * limit the impact, we bound the number of fields to include by default. This can be overriden + * by setting the 'spark.debug.maxToStringFields' conf in SparkEnv. + */ + val DEFAULT_MAX_TO_STRING_FIELDS = 25 + + private def maxNumToStringFields = { + if (SparkEnv.get != null) { + SparkEnv.get.conf.getInt("spark.debug.maxToStringFields", DEFAULT_MAX_TO_STRING_FIELDS) + } else { + DEFAULT_MAX_TO_STRING_FIELDS + } + } + + /** Whether we have warned about plan string truncation yet. */ + private val truncationWarningPrinted = new AtomicBoolean(false) + + /** + * Format a sequence with semantics similar to calling .mkString(). Any elements beyond + * maxNumToStringFields will be dropped and replaced by a "... N more fields" placeholder. + * + * @return the trimmed and formatted string. + */ + def truncatedString[T]( + seq: Seq[T], + start: String, + sep: String, + end: String, + maxNumFields: Int = maxNumToStringFields): String = { + if (seq.length > maxNumFields) { + if (truncationWarningPrinted.compareAndSet(false, true)) { + logWarning( + "Truncated the string representation of a plan since it was too large. This " + + "behavior can be adjusted by setting 'spark.debug.maxToStringFields' in SparkEnv.conf.") + } + val numFields = math.max(0, maxNumFields - 1) + seq.take(numFields).mkString( + start, sep, sep + "... " + (seq.length - numFields) + " more fields" + end) + } else { + seq.mkString(start, sep, end) + } + } + + /** Shorthand for calling truncatedString() without start or end strings. */ + def truncatedString[T](seq: Seq[T], sep: String): String = truncatedString(seq, "", sep, "") /** Serialize an object using Java serialization */ def serialize[T](o: T): Array[Byte] = { diff --git a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala index 6698749866..a5363f0bfd 100644 --- a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala @@ -40,6 +40,14 @@ import org.apache.spark.network.util.ByteUnit class UtilsSuite extends SparkFunSuite with ResetSystemProperties with Logging { + test("truncatedString") { + assert(Utils.truncatedString(Nil, "[", ", ", "]", 2) == "[]") + assert(Utils.truncatedString(Seq(1, 2), "[", ", ", "]", 2) == "[1, 2]") + assert(Utils.truncatedString(Seq(1, 2, 3), "[", ", ", "]", 2) == "[1, ... 2 more fields]") + assert(Utils.truncatedString(Seq(1, 2, 3), "[", ", ", "]", -5) == "[, ... 3 more fields]") + assert(Utils.truncatedString(Seq(1, 2, 3), ", ") == "1, 2, 3") + } + test("timeConversion") { // Test -1 assert(Utils.timeStringAsSeconds("-1") === -1) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala index efe592d69d..10a141254f 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala @@ -22,6 +22,7 @@ import org.apache.spark.sql.catalyst.analysis.TypeCheckResult import org.apache.spark.sql.catalyst.expressions.codegen._ import org.apache.spark.sql.catalyst.trees.TreeNode import org.apache.spark.sql.types._ +import org.apache.spark.util.Utils //////////////////////////////////////////////////////////////////////////////////////////////////// // This file defines the basic expression abstract classes in Catalyst. @@ -196,7 +197,8 @@ abstract class Expression extends TreeNode[Expression] { override def simpleString: String = toString - override def toString: String = prettyName + flatArguments.mkString("(", ", ", ")") + override def toString: String = prettyName + Utils.truncatedString( + flatArguments.toSeq, "(", ", ", ")") /** * Returns SQL representation of this expression. For expressions extending [[NonSQLExpression]], diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala index c67366d240..f924efe6e6 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala @@ -448,10 +448,10 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product { case tn: TreeNode[_] => tn.simpleString :: Nil case seq: Seq[Any] if seq.toSet.subsetOf(allChildren.asInstanceOf[Set[Any]]) => Nil case iter: Iterable[_] if iter.isEmpty => Nil - case seq: Seq[_] => seq.mkString("[", ", ", "]") :: Nil - case set: Set[_] => set.mkString("{", ", ", "}") :: Nil + case seq: Seq[_] => Utils.truncatedString(seq, "[", ", ", "]") :: Nil + case set: Set[_] => Utils.truncatedString(set.toSeq, "{", ", ", "}") :: Nil case array: Array[_] if array.isEmpty => Nil - case array: Array[_] => array.mkString("[", ", ", "]") :: Nil + case array: Array[_] => Utils.truncatedString(array, "[", ", ", "]") :: Nil case null => Nil case None => Nil case Some(null) => Nil diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala index 9a92373759..436512ff69 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala @@ -22,11 +22,12 @@ import scala.util.Try import org.json4s.JsonDSL._ -import org.apache.spark.SparkException +import org.apache.spark.{SparkEnv, SparkException} import org.apache.spark.annotation.DeveloperApi import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, InterpretedOrdering} import org.apache.spark.sql.catalyst.parser.{CatalystSqlParser, LegacyTypeStringParser} import org.apache.spark.sql.catalyst.util.quoteIdentifier +import org.apache.spark.util.Utils /** * :: DeveloperApi :: @@ -293,8 +294,8 @@ case class StructType(fields: Array[StructField]) extends DataType with Seq[Stru override def defaultSize: Int = fields.map(_.dataType.defaultSize).sum override def simpleString: String = { - val fieldTypes = fields.map(field => s"${field.name}:${field.dataType.simpleString}") - s"struct<${fieldTypes.mkString(",")}>" + val fieldTypes = fields.view.map(field => s"${field.name}:${field.dataType.simpleString}") + Utils.truncatedString(fieldTypes, "struct<", ",", ">") } override def sql: String = { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala index b8b392608d..9ab98fd124 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala @@ -33,6 +33,7 @@ import org.apache.spark.sql.execution.metric.SQLMetrics import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.sources.BaseRelation import org.apache.spark.sql.types.{DataType, StructType} +import org.apache.spark.util.Utils object RDDConversions { def productToRowRdd[A <: Product](data: RDD[A], outputTypes: Seq[DataType]): RDD[InternalRow] = { @@ -123,7 +124,7 @@ private[sql] case class RDDScanExec( } override def simpleString: String = { - s"Scan $nodeName${output.mkString("[", ",", "]")}" + s"Scan $nodeName${Utils.truncatedString(output, "[", ",", "]")}" } } @@ -186,7 +187,8 @@ private[sql] case class RowDataSourceScanExec( key + ": " + StringUtils.abbreviate(value, 100) } - s"$nodeName${output.mkString("[", ",", "]")}${metadataEntries.mkString(" ", ", ", "")}" + s"$nodeName${Utils.truncatedString(output, "[", ",", "]")}" + + s"${Utils.truncatedString(metadataEntries, " ", ", ", "")}" } override def inputRDDs(): Seq[RDD[InternalRow]] = { @@ -239,8 +241,8 @@ private[sql] case class BatchedDataSourceScanExec( val metadataEntries = for ((key, value) <- metadata.toSeq.sorted) yield { key + ": " + StringUtils.abbreviate(value, 100) } - val metadataStr = metadataEntries.mkString(" ", ", ", "") - s"Batched$nodeName${output.mkString("[", ",", "]")}$metadataStr" + val metadataStr = Utils.truncatedString(metadataEntries, " ", ", ", "") + s"Batched$nodeName${Utils.truncatedString(output, "[", ",", "]")}$metadataStr" } override def inputRDDs(): Seq[RDD[InternalRow]] = { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala index 560214a65e..a2d45026e0 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala @@ -31,6 +31,7 @@ import org.apache.spark.sql.execution.command.{DescribeTableCommand, ExecutedCom import org.apache.spark.sql.execution.exchange.{EnsureRequirements, ReuseExchange} import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types.{BinaryType, DateType, DecimalType, TimestampType, _} +import org.apache.spark.util.Utils /** * The primary workflow for executing relational queries using Spark. Designed to allow easy @@ -206,8 +207,8 @@ class QueryExecution(val sparkSession: SparkSession, val logical: LogicalPlan) { } override def toString: String = { - def output = - analyzed.output.map(o => s"${o.name}: ${o.dataType.simpleString}").mkString(", ") + def output = Utils.truncatedString( + analyzed.output.map(o => s"${o.name}: ${o.dataType.simpleString}"), ", ") val analyzedPlan = Seq( stringOrError(output), stringOrError(analyzed.treeString(verbose = true)) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala index b617e26418..caeeba1793 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala @@ -29,6 +29,7 @@ import org.apache.spark.sql.execution._ import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics} import org.apache.spark.sql.types.{DecimalType, StringType, StructType} import org.apache.spark.unsafe.KVIterator +import org.apache.spark.util.Utils /** * Hash-based aggregate operator that can also fallback to sorting when data exceeds memory size. @@ -773,9 +774,9 @@ case class HashAggregateExec( testFallbackStartsAt match { case None => - val keyString = groupingExpressions.mkString("[", ",", "]") - val functionString = allAggregateExpressions.mkString("[", ",", "]") - val outputString = output.mkString("[", ",", "]") + val keyString = Utils.truncatedString(groupingExpressions, "[", ",", "]") + val functionString = Utils.truncatedString(allAggregateExpressions, "[", ",", "]") + val outputString = Utils.truncatedString(output, "[", ",", "]") if (verbose) { s"HashAggregate(key=$keyString, functions=$functionString, output=$outputString)" } else { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortAggregateExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortAggregateExec.scala index 41ba9f5b3f..17126519eb 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortAggregateExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortAggregateExec.scala @@ -25,6 +25,7 @@ import org.apache.spark.sql.catalyst.expressions.aggregate._ import org.apache.spark.sql.catalyst.plans.physical.{AllTuples, ClusteredDistribution, Distribution, UnspecifiedDistribution} import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode} import org.apache.spark.sql.execution.metric.SQLMetrics +import org.apache.spark.util.Utils /** * Sort-based aggregate operator. @@ -110,9 +111,9 @@ case class SortAggregateExec( private def toString(verbose: Boolean): String = { val allAggregateExpressions = aggregateExpressions - val keyString = groupingExpressions.mkString("[", ",", "]") - val functionString = allAggregateExpressions.mkString("[", ",", "]") - val outputString = output.mkString("[", ",", "]") + val keyString = Utils.truncatedString(groupingExpressions, "[", ",", "]") + val functionString = Utils.truncatedString(allAggregateExpressions, "[", ",", "]") + val outputString = Utils.truncatedString(output, "[", ",", "]") if (verbose) { s"SortAggregate(key=$keyString, functions=$functionString, output=$outputString)" } else { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala index 0e0748ff32..a418d02983 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala @@ -21,6 +21,7 @@ import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, AttributeReference} import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, Statistics} import org.apache.spark.sql.sources.BaseRelation +import org.apache.spark.util.Utils /** * Used to link a [[BaseRelation]] in to a logical query plan. @@ -82,5 +83,5 @@ case class LogicalRelation( expectedOutputAttributes, metastoreTableIdentifier).asInstanceOf[this.type] - override def simpleString: String = s"Relation[${output.mkString(",")}] $relation" + override def simpleString: String = s"Relation[${Utils.truncatedString(output, ",")}] $relation" } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala index b71f3335c9..781c016095 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala @@ -24,6 +24,7 @@ import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode, LazilyGeneratedOrdering} import org.apache.spark.sql.catalyst.plans.physical._ import org.apache.spark.sql.execution.exchange.ShuffleExchange +import org.apache.spark.util.Utils /** @@ -159,8 +160,8 @@ case class TakeOrderedAndProjectExec( override def outputOrdering: Seq[SortOrder] = sortOrder override def simpleString: String = { - val orderByString = sortOrder.mkString("[", ",", "]") - val outputString = output.mkString("[", ",", "]") + val orderByString = Utils.truncatedString(sortOrder, "[", ",", "]") + val outputString = Utils.truncatedString(output, "[", ",", "]") s"TakeOrderedAndProject(limit=$limit, orderBy=$orderByString, output=$outputString)" } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala index d9800e4afd..954fc33ecc 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala @@ -336,7 +336,8 @@ class StreamExecution( newData.get(source).map { data => val newPlan = data.logicalPlan assert(output.size == newPlan.output.size, - s"Invalid batch: ${output.mkString(",")} != ${newPlan.output.mkString(",")}") + s"Invalid batch: ${Utils.truncatedString(output, ",")} != " + + s"${Utils.truncatedString(newPlan.output, ",")}") replacements ++= output.zip(newPlan.output) newPlan }.getOrElse { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala index 4496f41615..77fd043ef7 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala @@ -30,6 +30,7 @@ import org.apache.spark.sql.catalyst.expressions.Attribute import org.apache.spark.sql.catalyst.plans.logical.LeafNode import org.apache.spark.sql.streaming.OutputMode import org.apache.spark.sql.types.StructType +import org.apache.spark.util.Utils object MemoryStream { protected val currentBlockId = new AtomicInteger(0) @@ -81,7 +82,7 @@ case class MemoryStream[A : Encoder](id: Int, sqlContext: SQLContext) } } - override def toString: String = s"MemoryStream[${output.mkString(",")}]" + override def toString: String = s"MemoryStream[${Utils.truncatedString(output, ",")}]" override def getOffset: Option[Offset] = synchronized { if (batches.isEmpty) { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/WideSchemaBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/WideSchemaBenchmark.scala index b4811fe27a..06466e629b 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/WideSchemaBenchmark.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/WideSchemaBenchmark.scala @@ -155,6 +155,55 @@ many column field r/w: Best/Avg Time(ms) Rate(M/s) Per Ro */ } + ignore("wide shallowly nested struct field read and write") { + val benchmark = new Benchmark( + "wide shallowly nested struct field r/w", scaleFactor) + for (width <- widthsToTest) { + val numRows = scaleFactor / width + var datum: String = "{" + for (i <- 1 to width) { + if (i == 1) { + datum += s""""value_$i": 1""" + } else { + datum += s""", "value_$i": 1""" + } + } + datum += "}" + datum = s"""{"a": {"b": {"c": $datum, "d": $datum}, "e": $datum}}""" + val df = sparkSession.read.json(sparkSession.range(numRows).map(_ => datum).rdd).cache() + df.count() // force caching + addCases(benchmark, df, s"$width wide x $numRows rows", "a.b.c.value_1") + } + benchmark.run() + +/* +Java HotSpot(TM) 64-Bit Server VM 1.7.0_80-b15 on Linux 4.2.0-36-generic +Intel(R) Xeon(R) CPU E5-1650 v3 @ 3.50GHz +wide shallowly nested struct field r/w: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative +------------------------------------------------------------------------------------------------ +1 wide x 100000 rows (read in-mem) 100 / 125 1.0 997.7 1.0X +1 wide x 100000 rows (write in-mem) 130 / 147 0.8 1302.9 0.8X +1 wide x 100000 rows (read parquet) 195 / 228 0.5 1951.4 0.5X +1 wide x 100000 rows (write parquet) 248 / 259 0.4 2479.7 0.4X +10 wide x 10000 rows (read in-mem) 76 / 89 1.3 757.2 1.3X +10 wide x 10000 rows (write in-mem) 90 / 116 1.1 900.0 1.1X +10 wide x 10000 rows (read parquet) 90 / 135 1.1 903.9 1.1X +10 wide x 10000 rows (write parquet) 222 / 240 0.4 2222.8 0.4X +100 wide x 1000 rows (read in-mem) 71 / 91 1.4 710.8 1.4X +100 wide x 1000 rows (write in-mem) 252 / 324 0.4 2522.4 0.4X +100 wide x 1000 rows (read parquet) 310 / 329 0.3 3095.9 0.3X +100 wide x 1000 rows (write parquet) 253 / 267 0.4 2525.7 0.4X +1000 wide x 100 rows (read in-mem) 144 / 160 0.7 1439.5 0.7X +1000 wide x 100 rows (write in-mem) 2055 / 2326 0.0 20553.9 0.0X +1000 wide x 100 rows (read parquet) 750 / 925 0.1 7496.8 0.1X +1000 wide x 100 rows (write parquet) 235 / 317 0.4 2347.5 0.4X +2500 wide x 40 rows (read in-mem) 219 / 227 0.5 2190.9 0.5X +2500 wide x 40 rows (write in-mem) 5177 / 5423 0.0 51773.2 0.0X +2500 wide x 40 rows (read parquet) 1642 / 1714 0.1 16417.7 0.1X +2500 wide x 40 rows (write parquet) 357 / 381 0.3 3568.2 0.3X +*/ + } + ignore("wide struct field read and write") { val benchmark = new Benchmark("wide struct field r/w", scaleFactor) for (width <- widthsToTest) { -- cgit v1.2.3