author    Josh Rosen <joshrosen@databricks.com>    2015-08-04 14:42:11 -0700
committer Reynold Xin <rxin@databricks.com>    2015-08-04 14:42:11 -0700
commit    ab8ee1a3b93286a62949569615086ef5030e9fae (patch)
tree      88aa364451320f2a303bbeeb4857bcba57896c84 /sql
parent    f4b1ac08a1327e6d0ddc317cdf3997a0f68dec72 (diff)
[SPARK-9452] [SQL] Support records larger than page size in UnsafeExternalSorter
This patch extends UnsafeExternalSorter to support records larger than the page size. The basic strategy is the same as in #7762: store large records in their own overflow pages.

Author: Josh Rosen <joshrosen@databricks.com>

Closes #7891 from JoshRosen/large-records-in-sql-sorter and squashes the following commits:

967580b [Josh Rosen] Merge remote-tracking branch 'origin/master' into large-records-in-sql-sorter
948c344 [Josh Rosen] Add large records tests for KV sorter.
3c17288 [Josh Rosen] Combine memory and disk cleanup into general cleanupResources() method
380f217 [Josh Rosen] Merge remote-tracking branch 'origin/master' into large-records-in-sql-sorter
27eafa0 [Josh Rosen] Fix page size in PackedRecordPointerSuite
a49baef [Josh Rosen] Address initial round of review comments
3edb931 [Josh Rosen] Remove accidentally-committed debug statements.
2b164e2 [Josh Rosen] Support large records in UnsafeExternalSorter.
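To make the overflow-page strategy concrete, here is a minimal sketch in Scala using plain byte arrays in place of Spark's memory pages; PagedBuffer and its members are illustrative names only, not Spark's actual internals.

import scala.collection.mutable.ArrayBuffer

// Sketch of the strategy: normal-sized records are appended to a shared
// current page; a record larger than the page size gets its own dedicated
// "overflow" page sized to fit exactly that one record.
class PagedBuffer(pageSize: Int) {
  private val pages = ArrayBuffer[Array[Byte]]()
  private var currentPage: Array[Byte] = null
  private var cursor = 0

  def insert(record: Array[Byte]): Unit = {
    if (record.length > pageSize) {
      pages += record.clone() // overflow page holding just this record
    } else {
      if (currentPage == null || cursor + record.length > pageSize) {
        currentPage = new Array[Byte](pageSize) // start a fresh shared page
        pages += currentPage
        cursor = 0
      }
      System.arraycopy(record, 0, currentPage, cursor, record.length)
      cursor += record.length
    }
  }
}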
Diffstat (limited to 'sql')
-rw-r--r--  sql/catalyst/src/main/java/org/apache/spark/sql/execution/UnsafeExternalRowSorter.java   |   2
-rw-r--r--  sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeKVExternalSorter.java        |  11
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeKVExternalSorterSuite.scala | 210
3 files changed, 143 insertions, 80 deletions
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/execution/UnsafeExternalRowSorter.java b/sql/catalyst/src/main/java/org/apache/spark/sql/execution/UnsafeExternalRowSorter.java
index 193906d247..a5ae2b9736 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/execution/UnsafeExternalRowSorter.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/execution/UnsafeExternalRowSorter.java
@@ -114,7 +114,7 @@ final class UnsafeExternalRowSorter {
}
private void cleanupResources() {
- sorter.freeMemory();
+ sorter.cleanupResources();
}
@VisibleForTesting
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeKVExternalSorter.java b/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeKVExternalSorter.java
index 312ec8ea0d..86a563df99 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeKVExternalSorter.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeKVExternalSorter.java
@@ -134,6 +134,10 @@ public final class UnsafeKVExternalSorter {
value.getBaseObject(), value.getBaseOffset(), value.getSizeInBytes(), prefix);
}
+ /**
+ * Returns a sorted iterator. It is the caller's responsibility to call `cleanupResources()`
+ * after consuming this iterator.
+ */
public KVSorterIterator sortedIterator() throws IOException {
try {
final UnsafeSorterIterator underlying = sorter.getSortedIterator();
@@ -158,8 +162,11 @@ public final class UnsafeKVExternalSorter {
sorter.closeCurrentPage();
}
- private void cleanupResources() {
- sorter.freeMemory();
+ /**
+ * Frees this sorter's in-memory data structures and cleans up its spill files.
+ */
+ public void cleanupResources() {
+ sorter.cleanupResources();
}
private static final class KVComparator extends RecordComparator {
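A hypothetical caller (not part of this patch) illustrating the contract documented above: fully consume the sorted iterator, then release resources. The drainSorted name and the process callback are assumptions for illustration.

import org.apache.spark.sql.catalyst.expressions.UnsafeRow
import org.apache.spark.sql.execution.UnsafeKVExternalSorter

def drainSorted(sorter: UnsafeKVExternalSorter)(process: (UnsafeRow, UnsafeRow) => Unit): Unit = {
  val iter = sorter.sortedIterator()
  try {
    while (iter.next()) {
      process(iter.getKey, iter.getValue)
    }
  } finally {
    // Per the new contract: frees in-memory structures and deletes spill files.
    sorter.cleanupResources()
  }
}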
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeKVExternalSorterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeKVExternalSorterSuite.scala
index 601a5a07ad..08156f0e39 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeKVExternalSorterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeKVExternalSorterSuite.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.execution
import scala.util.Random
import org.apache.spark._
-import org.apache.spark.sql.RandomDataGenerator
+import org.apache.spark.sql.{RandomDataGenerator, Row}
import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
import org.apache.spark.sql.catalyst.expressions.{UnsafeRow, RowOrdering, UnsafeProjection}
import org.apache.spark.sql.test.TestSQLContext
@@ -46,6 +46,7 @@ class UnsafeKVExternalSorterSuite extends SparkFunSuite {
testKVSorter(keySchema, valueSchema, spill = i > 3)
}
+
/**
* Create a test case using randomly generated data for the given key and value schema.
*
@@ -60,96 +61,151 @@ class UnsafeKVExternalSorterSuite extends SparkFunSuite {
* If spill is set to true, the sorter will spill probabilistically roughly every 100 records.
*/
private def testKVSorter(keySchema: StructType, valueSchema: StructType, spill: Boolean): Unit = {
+ // Create the data converters
+ val kExternalConverter = CatalystTypeConverters.createToCatalystConverter(keySchema)
+ val vExternalConverter = CatalystTypeConverters.createToCatalystConverter(valueSchema)
+ val kConverter = UnsafeProjection.create(keySchema)
+ val vConverter = UnsafeProjection.create(valueSchema)
+
+ val keyDataGen = RandomDataGenerator.forType(keySchema, nullable = false).get
+ val valueDataGen = RandomDataGenerator.forType(valueSchema, nullable = false).get
+
+ val inputData = Seq.fill(1024) {
+ val k = kConverter(kExternalConverter.apply(keyDataGen.apply()).asInstanceOf[InternalRow])
+ val v = vConverter(vExternalConverter.apply(valueDataGen.apply()).asInstanceOf[InternalRow])
+ (k.asInstanceOf[InternalRow].copy(), v.asInstanceOf[InternalRow].copy())
+ }
val keySchemaStr = keySchema.map(_.dataType.simpleString).mkString("[", ",", "]")
val valueSchemaStr = valueSchema.map(_.dataType.simpleString).mkString("[", ",", "]")
test(s"kv sorting key schema $keySchemaStr and value schema $valueSchemaStr") {
- // Calling this make sure we have block manager and everything else setup.
- TestSQLContext
-
- val taskMemMgr = new TaskMemoryManager(new ExecutorMemoryManager(MemoryAllocator.HEAP))
- val shuffleMemMgr = new TestShuffleMemoryManager
- TaskContext.setTaskContext(new TaskContextImpl(
- stageId = 0,
- partitionId = 0,
- taskAttemptId = 98456,
- attemptNumber = 0,
- taskMemoryManager = taskMemMgr,
- metricsSystem = null,
- internalAccumulators = Seq.empty))
-
- // Create the data converters
- val kExternalConverter = CatalystTypeConverters.createToCatalystConverter(keySchema)
- val vExternalConverter = CatalystTypeConverters.createToCatalystConverter(valueSchema)
- val kConverter = UnsafeProjection.create(keySchema)
- val vConverter = UnsafeProjection.create(valueSchema)
-
- val keyDataGen = RandomDataGenerator.forType(keySchema, nullable = false).get
- val valueDataGen = RandomDataGenerator.forType(valueSchema, nullable = false).get
-
- val input = Seq.fill(1024) {
- val k = kConverter(kExternalConverter.apply(keyDataGen.apply()).asInstanceOf[InternalRow])
- val v = vConverter(vExternalConverter.apply(valueDataGen.apply()).asInstanceOf[InternalRow])
- (k.asInstanceOf[InternalRow].copy(), v.asInstanceOf[InternalRow].copy())
- }
-
- val sorter = new UnsafeKVExternalSorter(
- keySchema, valueSchema, SparkEnv.get.blockManager, shuffleMemMgr, 16 * 1024 * 1024)
-
- // Insert generated keys and values into the sorter
- input.foreach { case (k, v) =>
- sorter.insertKV(k.asInstanceOf[UnsafeRow], v.asInstanceOf[UnsafeRow])
- // 1% chance we will spill
- if (rand.nextDouble() < 0.01 && spill) {
- shuffleMemMgr.markAsOutOfMemory()
- sorter.closeCurrentPage()
- }
- }
+ testKVSorter(
+ keySchema,
+ valueSchema,
+ inputData,
+ pageSize = 16 * 1024 * 1024,
+ spill
+ )
+ }
+ }
- // Collect the sorted output
- val out = new scala.collection.mutable.ArrayBuffer[(InternalRow, InternalRow)]
- val iter = sorter.sortedIterator()
- while (iter.next()) {
- out += Tuple2(iter.getKey.copy(), iter.getValue.copy())
+ /**
+ * Create a test case using the given input data for the given key and value schema.
+ *
+ * The approach works as follows:
+ *
+ * - Take the given input data, generated by the caller to match the schemas
+ * - Run [[UnsafeKVExternalSorter]] on the input data
+ * - Collect the output from the sorter, and make sure the keys are sorted in ascending order
+ * - Sort the input by both key and value, and sort the sorter output also by both key and value.
+ * Compare the sorted input and sorted output together to make sure all the key/values match.
+ *
+ * If spill is set to true, the sorter will spill probabilistically roughly every 100 records.
+ */
+ private def testKVSorter(
+ keySchema: StructType,
+ valueSchema: StructType,
+ inputData: Seq[(InternalRow, InternalRow)],
+ pageSize: Long,
+ spill: Boolean): Unit = {
+ // Calling this makes sure the block manager and everything else are set up.
+ TestSQLContext
+
+ val taskMemMgr = new TaskMemoryManager(new ExecutorMemoryManager(MemoryAllocator.HEAP))
+ val shuffleMemMgr = new TestShuffleMemoryManager
+ TaskContext.setTaskContext(new TaskContextImpl(
+ stageId = 0,
+ partitionId = 0,
+ taskAttemptId = 98456,
+ attemptNumber = 0,
+ taskMemoryManager = taskMemMgr,
+ metricsSystem = null,
+ internalAccumulators = Seq.empty))
+
+ val sorter = new UnsafeKVExternalSorter(
+ keySchema, valueSchema, SparkEnv.get.blockManager, shuffleMemMgr, pageSize)
+
+ // Insert the keys and values into the sorter
+ inputData.foreach { case (k, v) =>
+ sorter.insertKV(k.asInstanceOf[UnsafeRow], v.asInstanceOf[UnsafeRow])
+ // 1% chance we will spill
+ if (rand.nextDouble() < 0.01 && spill) {
+ shuffleMemMgr.markAsOutOfMemory()
+ sorter.closeCurrentPage()
}
+ }
- val keyOrdering = RowOrdering.forSchema(keySchema.map(_.dataType))
- val valueOrdering = RowOrdering.forSchema(valueSchema.map(_.dataType))
- val kvOrdering = new Ordering[(InternalRow, InternalRow)] {
- override def compare(x: (InternalRow, InternalRow), y: (InternalRow, InternalRow)): Int = {
- keyOrdering.compare(x._1, y._1) match {
- case 0 => valueOrdering.compare(x._2, y._2)
- case cmp => cmp
- }
+ // Collect the sorted output
+ val out = new scala.collection.mutable.ArrayBuffer[(InternalRow, InternalRow)]
+ val iter = sorter.sortedIterator()
+ while (iter.next()) {
+ out += Tuple2(iter.getKey.copy(), iter.getValue.copy())
+ }
+ sorter.cleanupResources()
+
+ val keyOrdering = RowOrdering.forSchema(keySchema.map(_.dataType))
+ val valueOrdering = RowOrdering.forSchema(valueSchema.map(_.dataType))
+ val kvOrdering = new Ordering[(InternalRow, InternalRow)] {
+ override def compare(x: (InternalRow, InternalRow), y: (InternalRow, InternalRow)): Int = {
+ keyOrdering.compare(x._1, y._1) match {
+ case 0 => valueOrdering.compare(x._2, y._2)
+ case cmp => cmp
}
}
+ }
- // Testing to make sure output from the sorter is sorted by key
- var prevK: InternalRow = null
- out.zipWithIndex.foreach { case ((k, v), i) =>
- if (prevK != null) {
- assert(keyOrdering.compare(prevK, k) <= 0,
- s"""
- |key is not in sorted order:
- |previous key: $prevK
- |current key : $k
- """.stripMargin)
- }
- prevK = k
+ // Testing to make sure output from the sorter is sorted by key
+ var prevK: InternalRow = null
+ out.zipWithIndex.foreach { case ((k, v), i) =>
+ if (prevK != null) {
+ assert(keyOrdering.compare(prevK, k) <= 0,
+ s"""
+ |key is not in sorted order:
+ |previous key: $prevK
+ |current key : $k
+ """.stripMargin)
}
+ prevK = k
+ }
- // Testing to make sure the key/value in output matches input
- assert(out.sorted(kvOrdering) === input.sorted(kvOrdering))
+ // Testing to make sure the key/value in output matches input
+ assert(out.sorted(kvOrdering) === inputData.sorted(kvOrdering))
- // Make sure there is no memory leak
- val leakedUnsafeMemory: Long = taskMemMgr.cleanUpAllAllocatedMemory
- if (shuffleMemMgr != null) {
- val leakedShuffleMemory: Long = shuffleMemMgr.getMemoryConsumptionForThisTask()
- assert(0L === leakedShuffleMemory)
- }
- assert(0 === leakedUnsafeMemory)
- TaskContext.unset()
+ // Make sure there is no memory leak
+ val leakedUnsafeMemory: Long = taskMemMgr.cleanUpAllAllocatedMemory
+ if (shuffleMemMgr != null) {
+ val leakedShuffleMemory: Long = shuffleMemMgr.getMemoryConsumptionForThisTask()
+ assert(0L === leakedShuffleMemory)
}
+ assert(0 === leakedUnsafeMemory)
+ TaskContext.unset()
+ }
+
+ test("kv sorting with records that exceed page size") {
+ val pageSize = 128
+
+ val schema = StructType(StructField("b", BinaryType) :: Nil)
+ val externalConverter = CatalystTypeConverters.createToCatalystConverter(schema)
+ val converter = UnsafeProjection.create(schema)
+
+ val rand = new Random()
+ val inputData = Seq.fill(1024) {
+ val kBytes = new Array[Byte](rand.nextInt(pageSize))
+ val vBytes = new Array[Byte](rand.nextInt(pageSize))
+ rand.nextBytes(kBytes)
+ rand.nextBytes(vBytes)
+ val k = converter(externalConverter.apply(Row(kBytes)).asInstanceOf[InternalRow])
+ val v = converter(externalConverter.apply(Row(vBytes)).asInstanceOf[InternalRow])
+ (k.asInstanceOf[InternalRow].copy(), v.asInstanceOf[InternalRow].copy())
+ }
+
+ testKVSorter(
+ schema,
+ schema,
+ inputData,
+ pageSize,
+ spill = true
+ )
}
}
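A note on why the test above reaches the large-record path even though each raw payload is at most 127 bytes: the UnsafeRow encoding adds per-row overhead (a null-tracking word, a fixed 8-byte slot per field, and 8-byte alignment of variable-length data), so encoded records can exceed the 128-byte page. A rough size estimate under those layout assumptions:

// Rough UnsafeRow size estimate under the layout assumptions stated above;
// illustrative only, not Spark's actual sizing code.
def estimatedUnsafeRowSize(payloadBytes: Int, numFields: Int = 1): Int = {
  val bitsetBytes = ((numFields + 63) / 64) * 8   // null-tracking words
  val fixedSlotBytes = numFields * 8              // one 8-byte slot per field
  val variableBytes = ((payloadBytes + 7) / 8) * 8 // 8-byte aligned payload
  bitsetBytes + fixedSlotBytes + variableBytes
}

// A 127-byte binary payload encodes to roughly 8 + 8 + 128 = 144 bytes,
// which exceeds the 128-byte page size and exercises the overflow path.
assert(estimatedUnsafeRowSize(127) == 144)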