diff options
author | Josh Rosen <joshrosen@databricks.com> | 2015-08-04 14:42:11 -0700 |
---|---|---|
committer | Reynold Xin <rxin@databricks.com> | 2015-08-04 14:42:11 -0700 |
commit | ab8ee1a3b93286a62949569615086ef5030e9fae (patch) | |
tree | 88aa364451320f2a303bbeeb4857bcba57896c84 /sql | |
parent | f4b1ac08a1327e6d0ddc317cdf3997a0f68dec72 (diff) | |
download | spark-ab8ee1a3b93286a62949569615086ef5030e9fae.tar.gz spark-ab8ee1a3b93286a62949569615086ef5030e9fae.tar.bz2 spark-ab8ee1a3b93286a62949569615086ef5030e9fae.zip |
[SPARK-9452] [SQL] Support records larger than page size in UnsafeExternalSorter
This patch extends UnsafeExternalSorter to support records larger than the page size. The basic strategy is the same as in #7762: store large records in their own overflow pages.
Author: Josh Rosen <joshrosen@databricks.com>
Closes #7891 from JoshRosen/large-records-in-sql-sorter and squashes the following commits:
967580b [Josh Rosen] Merge remote-tracking branch 'origin/master' into large-records-in-sql-sorter
948c344 [Josh Rosen] Add large records tests for KV sorter.
3c17288 [Josh Rosen] Combine memory and disk cleanup into general cleanupResources() method
380f217 [Josh Rosen] Merge remote-tracking branch 'origin/master' into large-records-in-sql-sorter
27eafa0 [Josh Rosen] Fix page size in PackedRecordPointerSuite
a49baef [Josh Rosen] Address initial round of review comments
3edb931 [Josh Rosen] Remove accidentally-committed debug statements.
2b164e2 [Josh Rosen] Support large records in UnsafeExternalSorter.
Diffstat (limited to 'sql')
3 files changed, 143 insertions, 80 deletions
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/execution/UnsafeExternalRowSorter.java b/sql/catalyst/src/main/java/org/apache/spark/sql/execution/UnsafeExternalRowSorter.java index 193906d247..a5ae2b9736 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/execution/UnsafeExternalRowSorter.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/execution/UnsafeExternalRowSorter.java @@ -114,7 +114,7 @@ final class UnsafeExternalRowSorter { } private void cleanupResources() { - sorter.freeMemory(); + sorter.cleanupResources(); } @VisibleForTesting diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeKVExternalSorter.java b/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeKVExternalSorter.java index 312ec8ea0d..86a563df99 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeKVExternalSorter.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeKVExternalSorter.java @@ -134,6 +134,10 @@ public final class UnsafeKVExternalSorter { value.getBaseObject(), value.getBaseOffset(), value.getSizeInBytes(), prefix); } + /** + * Returns a sorted iterator. It is the caller's responsibility to call `cleanupResources()` + * after consuming this iterator. + */ public KVSorterIterator sortedIterator() throws IOException { try { final UnsafeSorterIterator underlying = sorter.getSortedIterator(); @@ -158,8 +162,11 @@ public final class UnsafeKVExternalSorter { sorter.closeCurrentPage(); } - private void cleanupResources() { - sorter.freeMemory(); + /** + * Frees this sorter's in-memory data structures and cleans up its spill files. + */ + public void cleanupResources() { + sorter.cleanupResources(); } private static final class KVComparator extends RecordComparator { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeKVExternalSorterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeKVExternalSorterSuite.scala index 601a5a07ad..08156f0e39 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeKVExternalSorterSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeKVExternalSorterSuite.scala @@ -20,7 +20,7 @@ package org.apache.spark.sql.execution import scala.util.Random import org.apache.spark._ -import org.apache.spark.sql.RandomDataGenerator +import org.apache.spark.sql.{RandomDataGenerator, Row} import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow} import org.apache.spark.sql.catalyst.expressions.{UnsafeRow, RowOrdering, UnsafeProjection} import org.apache.spark.sql.test.TestSQLContext @@ -46,6 +46,7 @@ class UnsafeKVExternalSorterSuite extends SparkFunSuite { testKVSorter(keySchema, valueSchema, spill = i > 3) } + /** * Create a test case using randomly generated data for the given key and value schema. * @@ -60,96 +61,151 @@ class UnsafeKVExternalSorterSuite extends SparkFunSuite { * If spill is set to true, the sorter will spill probabilistically roughly every 100 records. */ private def testKVSorter(keySchema: StructType, valueSchema: StructType, spill: Boolean): Unit = { + // Create the data converters + val kExternalConverter = CatalystTypeConverters.createToCatalystConverter(keySchema) + val vExternalConverter = CatalystTypeConverters.createToCatalystConverter(valueSchema) + val kConverter = UnsafeProjection.create(keySchema) + val vConverter = UnsafeProjection.create(valueSchema) + + val keyDataGen = RandomDataGenerator.forType(keySchema, nullable = false).get + val valueDataGen = RandomDataGenerator.forType(valueSchema, nullable = false).get + + val inputData = Seq.fill(1024) { + val k = kConverter(kExternalConverter.apply(keyDataGen.apply()).asInstanceOf[InternalRow]) + val v = vConverter(vExternalConverter.apply(valueDataGen.apply()).asInstanceOf[InternalRow]) + (k.asInstanceOf[InternalRow].copy(), v.asInstanceOf[InternalRow].copy()) + } val keySchemaStr = keySchema.map(_.dataType.simpleString).mkString("[", ",", "]") val valueSchemaStr = valueSchema.map(_.dataType.simpleString).mkString("[", ",", "]") test(s"kv sorting key schema $keySchemaStr and value schema $valueSchemaStr") { - // Calling this make sure we have block manager and everything else setup. - TestSQLContext - - val taskMemMgr = new TaskMemoryManager(new ExecutorMemoryManager(MemoryAllocator.HEAP)) - val shuffleMemMgr = new TestShuffleMemoryManager - TaskContext.setTaskContext(new TaskContextImpl( - stageId = 0, - partitionId = 0, - taskAttemptId = 98456, - attemptNumber = 0, - taskMemoryManager = taskMemMgr, - metricsSystem = null, - internalAccumulators = Seq.empty)) - - // Create the data converters - val kExternalConverter = CatalystTypeConverters.createToCatalystConverter(keySchema) - val vExternalConverter = CatalystTypeConverters.createToCatalystConverter(valueSchema) - val kConverter = UnsafeProjection.create(keySchema) - val vConverter = UnsafeProjection.create(valueSchema) - - val keyDataGen = RandomDataGenerator.forType(keySchema, nullable = false).get - val valueDataGen = RandomDataGenerator.forType(valueSchema, nullable = false).get - - val input = Seq.fill(1024) { - val k = kConverter(kExternalConverter.apply(keyDataGen.apply()).asInstanceOf[InternalRow]) - val v = vConverter(vExternalConverter.apply(valueDataGen.apply()).asInstanceOf[InternalRow]) - (k.asInstanceOf[InternalRow].copy(), v.asInstanceOf[InternalRow].copy()) - } - - val sorter = new UnsafeKVExternalSorter( - keySchema, valueSchema, SparkEnv.get.blockManager, shuffleMemMgr, 16 * 1024 * 1024) - - // Insert generated keys and values into the sorter - input.foreach { case (k, v) => - sorter.insertKV(k.asInstanceOf[UnsafeRow], v.asInstanceOf[UnsafeRow]) - // 1% chance we will spill - if (rand.nextDouble() < 0.01 && spill) { - shuffleMemMgr.markAsOutOfMemory() - sorter.closeCurrentPage() - } - } + testKVSorter( + keySchema, + valueSchema, + inputData, + pageSize = 16 * 1024 * 1024, + spill + ) + } + } - // Collect the sorted output - val out = new scala.collection.mutable.ArrayBuffer[(InternalRow, InternalRow)] - val iter = sorter.sortedIterator() - while (iter.next()) { - out += Tuple2(iter.getKey.copy(), iter.getValue.copy()) + /** + * Create a test case using the given input data for the given key and value schema. + * + * The approach works as follows: + * + * - Create input by randomly generating data based on the given schema + * - Run [[UnsafeKVExternalSorter]] on the input data + * - Collect the output from the sorter, and make sure the keys are sorted in ascending order + * - Sort the input by both key and value, and sort the sorter output also by both key and value. + * Compare the sorted input and sorted output together to make sure all the key/values match. + * + * If spill is set to true, the sorter will spill probabilistically roughly every 100 records. + */ + private def testKVSorter( + keySchema: StructType, + valueSchema: StructType, + inputData: Seq[(InternalRow, InternalRow)], + pageSize: Long, + spill: Boolean): Unit = { + // Calling this make sure we have block manager and everything else setup. + TestSQLContext + + val taskMemMgr = new TaskMemoryManager(new ExecutorMemoryManager(MemoryAllocator.HEAP)) + val shuffleMemMgr = new TestShuffleMemoryManager + TaskContext.setTaskContext(new TaskContextImpl( + stageId = 0, + partitionId = 0, + taskAttemptId = 98456, + attemptNumber = 0, + taskMemoryManager = taskMemMgr, + metricsSystem = null, + internalAccumulators = Seq.empty)) + + val sorter = new UnsafeKVExternalSorter( + keySchema, valueSchema, SparkEnv.get.blockManager, shuffleMemMgr, pageSize) + + // Insert the keys and values into the sorter + inputData.foreach { case (k, v) => + sorter.insertKV(k.asInstanceOf[UnsafeRow], v.asInstanceOf[UnsafeRow]) + // 1% chance we will spill + if (rand.nextDouble() < 0.01 && spill) { + shuffleMemMgr.markAsOutOfMemory() + sorter.closeCurrentPage() } + } - val keyOrdering = RowOrdering.forSchema(keySchema.map(_.dataType)) - val valueOrdering = RowOrdering.forSchema(valueSchema.map(_.dataType)) - val kvOrdering = new Ordering[(InternalRow, InternalRow)] { - override def compare(x: (InternalRow, InternalRow), y: (InternalRow, InternalRow)): Int = { - keyOrdering.compare(x._1, y._1) match { - case 0 => valueOrdering.compare(x._2, y._2) - case cmp => cmp - } + // Collect the sorted output + val out = new scala.collection.mutable.ArrayBuffer[(InternalRow, InternalRow)] + val iter = sorter.sortedIterator() + while (iter.next()) { + out += Tuple2(iter.getKey.copy(), iter.getValue.copy()) + } + sorter.cleanupResources() + + val keyOrdering = RowOrdering.forSchema(keySchema.map(_.dataType)) + val valueOrdering = RowOrdering.forSchema(valueSchema.map(_.dataType)) + val kvOrdering = new Ordering[(InternalRow, InternalRow)] { + override def compare(x: (InternalRow, InternalRow), y: (InternalRow, InternalRow)): Int = { + keyOrdering.compare(x._1, y._1) match { + case 0 => valueOrdering.compare(x._2, y._2) + case cmp => cmp } } + } - // Testing to make sure output from the sorter is sorted by key - var prevK: InternalRow = null - out.zipWithIndex.foreach { case ((k, v), i) => - if (prevK != null) { - assert(keyOrdering.compare(prevK, k) <= 0, - s""" - |key is not in sorted order: - |previous key: $prevK - |current key : $k - """.stripMargin) - } - prevK = k + // Testing to make sure output from the sorter is sorted by key + var prevK: InternalRow = null + out.zipWithIndex.foreach { case ((k, v), i) => + if (prevK != null) { + assert(keyOrdering.compare(prevK, k) <= 0, + s""" + |key is not in sorted order: + |previous key: $prevK + |current key : $k + """.stripMargin) } + prevK = k + } - // Testing to make sure the key/value in output matches input - assert(out.sorted(kvOrdering) === input.sorted(kvOrdering)) + // Testing to make sure the key/value in output matches input + assert(out.sorted(kvOrdering) === inputData.sorted(kvOrdering)) - // Make sure there is no memory leak - val leakedUnsafeMemory: Long = taskMemMgr.cleanUpAllAllocatedMemory - if (shuffleMemMgr != null) { - val leakedShuffleMemory: Long = shuffleMemMgr.getMemoryConsumptionForThisTask() - assert(0L === leakedShuffleMemory) - } - assert(0 === leakedUnsafeMemory) - TaskContext.unset() + // Make sure there is no memory leak + val leakedUnsafeMemory: Long = taskMemMgr.cleanUpAllAllocatedMemory + if (shuffleMemMgr != null) { + val leakedShuffleMemory: Long = shuffleMemMgr.getMemoryConsumptionForThisTask() + assert(0L === leakedShuffleMemory) } + assert(0 === leakedUnsafeMemory) + TaskContext.unset() + } + + test("kv sorting with records that exceed page size") { + val pageSize = 128 + + val schema = StructType(StructField("b", BinaryType) :: Nil) + val externalConverter = CatalystTypeConverters.createToCatalystConverter(schema) + val converter = UnsafeProjection.create(schema) + + val rand = new Random() + val inputData = Seq.fill(1024) { + val kBytes = new Array[Byte](rand.nextInt(pageSize)) + val vBytes = new Array[Byte](rand.nextInt(pageSize)) + rand.nextBytes(kBytes) + rand.nextBytes(vBytes) + val k = converter(externalConverter.apply(Row(kBytes)).asInstanceOf[InternalRow]) + val v = converter(externalConverter.apply(Row(vBytes)).asInstanceOf[InternalRow]) + (k.asInstanceOf[InternalRow].copy(), v.asInstanceOf[InternalRow].copy()) + } + + testKVSorter( + schema, + schema, + inputData, + pageSize, + spill = true + ) } } |