author     Andrew Or <andrew@databricks.com>      2015-08-03 14:22:07 -0700
committer  Josh Rosen <joshrosen@databricks.com>  2015-08-03 14:22:07 -0700
commit     702aa9d7fb16c98a50e046edfd76b8a7861d0391 (patch)
tree       42f3a10ebd1086fda3d7b185b73cb452623964ca /sql
parent     e4765a46833baff1dd7465c4cf50e947de7e8f21 (diff)
[SPARK-8735] [SQL] Expose memory usage for shuffles, joins and aggregations
This patch exposes the memory used by internal data structures on the Spark UI. It tracks memory used by all spilling operations and by SQL operators backed by Tungsten, e.g. `BroadcastHashJoin`, `ExternalSort`, `GeneratedAggregate`, etc. The metric exposed is "peak execution memory", which broadly refers to the peak in-memory size of each of these data structures. A separate patch will extend this by linking the new information to the SQL operators themselves.

Screenshots (peak execution memory shown in the Spark UI):
https://cloud.githubusercontent.com/assets/2133137/8974776/b90fc980-362a-11e5-9e2b-842da75b1641.png
https://cloud.githubusercontent.com/assets/2133137/8974777/baa76492-362a-11e5-9b77-e364a6a6b64e.png

Author: Andrew Or <andrew@databricks.com>

Closes #7770 from andrewor14/expose-memory-metrics and squashes the following commits:

9abecb9 [Andrew Or] Merge branch 'master' of github.com:apache/spark into expose-memory-metrics
f5b0d68 [Andrew Or] Merge branch 'master' of github.com:apache/spark into expose-memory-metrics
d7df332 [Andrew Or] Merge branch 'master' of github.com:apache/spark into expose-memory-metrics
8eefbc5 [Andrew Or] Fix non-failing tests
9de2a12 [Andrew Or] Fix tests due to another logical merge conflict
876bfa4 [Andrew Or] Fix failing test after logical merge conflict
361a359 [Andrew Or] Merge branch 'master' of github.com:apache/spark into expose-memory-metrics
40b4802 [Andrew Or] Fix style?
d0fef87 [Andrew Or] Fix tests?
b3b92f6 [Andrew Or] Address comments
0625d73 [Andrew Or] Merge branch 'master' of github.com:apache/spark into expose-memory-metrics
c00a197 [Andrew Or] Fix potential NPEs
10da1cd [Andrew Or] Fix compile
17f4c2d [Andrew Or] Fix compile?
a87b4d0 [Andrew Or] Fix compile?
d70874d [Andrew Or] Fix test compile + address comments
2840b7d [Andrew Or] Merge branch 'master' of github.com:apache/spark into expose-memory-metrics
6aa2f7a [Andrew Or] Merge branch 'master' of github.com:apache/spark into expose-memory-metrics
b889a68 [Andrew Or] Minor changes: comments, spacing, style
663a303 [Andrew Or] UnsafeShuffleWriter: update peak memory before close
d090a94 [Andrew Or] Fix style
2480d84 [Andrew Or] Expand test coverage
5f1235b [Andrew Or] Merge branch 'master' of github.com:apache/spark into expose-memory-metrics
1ecf678 [Andrew Or] Minor changes: comments, style, unused imports
0b6926c [Andrew Or] Oops
111a05e [Andrew Or] Merge branch 'master' of github.com:apache/spark into expose-memory-metrics
a7a39a5 [Andrew Or] Strengthen presence check for accumulator
a919eb7 [Andrew Or] Add tests for unsafe shuffle writer
23c845d [Andrew Or] Add tests for SQL operators
a757550 [Andrew Or] Address comments
b5c51c1 [Andrew Or] Re-enable test in JavaAPISuite
5107691 [Andrew Or] Add tests for internal accumulators
59231e4 [Andrew Or] Fix tests
9528d09 [Andrew Or] Merge branch 'master' of github.com:apache/spark into expose-memory-metrics
5b5e6f3 [Andrew Or] Add peak execution memory to summary table + tooltip
92b4b6b [Andrew Or] Display peak execution memory on the UI
eee5437 [Andrew Or] Merge branch 'master' of github.com:apache/spark into expose-memory-metrics
d9b9015 [Andrew Or] Track execution memory in unsafe shuffles
770ee54 [Andrew Or] Track execution memory in broadcast joins
9c605a4 [Andrew Or] Track execution memory in GeneratedAggregate
9e824f2 [Andrew Or] Add back execution memory tracking for *ExternalSort
4ef4cb1 [Andrew Or] Merge branch 'master' of github.com:apache/spark into expose-memory-metrics
e6c3e2f [Andrew Or] Move internal accumulators creation to Stage
a417592 [Andrew Or] Expose memory metrics in UnsafeExternalSorter
3c4f042 [Andrew Or] Track memory usage in ExternalAppendOnlyMap / ExternalSorter
bd7ab3f [Andrew Or] Add internal accumulators to TaskContext
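Every operator touched by this patch reports memory the same way: the task looks up the per-stage internal accumulator for peak execution memory through its `TaskContext` and adds the peak in-memory size of whatever data structure it built. Below is a minimal sketch of that pattern, modeled on the `ExternalSort` change in this diff; it relies on Spark-internal APIs (`ExternalSorter`, `internalMetricsToAccumulators`) introduced or used by this patch, so it is for illustration only, not a drop-in snippet.

```scala
import org.apache.spark.{InternalAccumulator, TaskContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.util.collection.ExternalSorter

// Illustrative only: sort each partition with an ExternalSorter and report the
// sorter's peak memory into the per-stage PEAK_EXECUTION_MEMORY accumulator.
def sortAndRecordPeakMemory(
    rows: RDD[InternalRow],
    ordering: Ordering[InternalRow]): RDD[InternalRow] = {
  rows.mapPartitions({ iter =>
    val sorter = new ExternalSorter[InternalRow, Null, InternalRow](ordering = Some(ordering))
    sorter.insertAll(iter.map(r => (r.copy(), null)))
    // The accumulator is created once per stage and surfaced through TaskContext;
    // each task adds its own peak so the UI can show per-task and per-stage values.
    TaskContext.get().internalMetricsToAccumulators(
      InternalAccumulator.PEAK_EXECUTION_MEMORY).add(sorter.peakMemoryUsedBytes)
    // Note: the real ExternalSort wraps this in a CompletionIterator so that
    // sorter.stop() runs once the iterator is exhausted.
    sorter.iterator.map(_._1)
  }, preservesPartitioning = true)
}
```

In the diff below, `ExternalSort`, `TungstenSort`, `GeneratedAggregate`, and the broadcast joins all follow this shape, differing only in which structure's size they read (`sorter.peakMemoryUsedBytes`, `aggregationMap.getMemoryUsage`, or `unsafe.getUnsafeSize`).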
Diffstat (limited to 'sql')
-rw-r--r--  sql/catalyst/src/main/java/org/apache/spark/sql/execution/UnsafeExternalRowSorter.java  7
-rw-r--r--  sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeFixedWidthAggregationMap.java  8
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/GeneratedAggregate.scala  11
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala  10
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashOuterJoin.scala  8
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastLeftSemiJoinHash.scala  10
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala  22
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/sort.scala  12
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala  60
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/TungstenSortSuite.scala  12
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeFixedWidthAggregationMapSuite.scala  3
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeKVExternalSorterSuite.scala  3
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/joins/BroadcastJoinSuite.scala  94
13 files changed, 231 insertions, 29 deletions
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/execution/UnsafeExternalRowSorter.java b/sql/catalyst/src/main/java/org/apache/spark/sql/execution/UnsafeExternalRowSorter.java
index 5e4c6232c9..193906d247 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/execution/UnsafeExternalRowSorter.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/execution/UnsafeExternalRowSorter.java
@@ -106,6 +106,13 @@ final class UnsafeExternalRowSorter {
sorter.spill();
}
+ /**
+ * Return the peak memory used so far, in bytes.
+ */
+ public long getPeakMemoryUsage() {
+ return sorter.getPeakMemoryUsedBytes();
+ }
+
private void cleanupResources() {
sorter.freeMemory();
}
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeFixedWidthAggregationMap.java b/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeFixedWidthAggregationMap.java
index 9e2c9334a7..43d06ce9bd 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeFixedWidthAggregationMap.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeFixedWidthAggregationMap.java
@@ -209,6 +209,14 @@ public final class UnsafeFixedWidthAggregationMap {
}
/**
+ * The memory used by this map's managed structures, in bytes.
+ * Note that this is also the peak memory used by this map, since the map is append-only.
+ */
+ public long getMemoryUsage() {
+ return map.getTotalMemoryConsumption();
+ }
+
+ /**
* Free the memory associated with this map. This is idempotent and can be called multiple times.
*/
public void free() {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/GeneratedAggregate.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/GeneratedAggregate.scala
index cd87b8deba..bf4905dc1e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/GeneratedAggregate.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/GeneratedAggregate.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.execution
import java.io.IOException
-import org.apache.spark.{SparkEnv, TaskContext}
+import org.apache.spark.{InternalAccumulator, SparkEnv, TaskContext}
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
@@ -263,11 +263,12 @@ case class GeneratedAggregate(
assert(iter.hasNext, "There should be at least one row for this path")
log.info("Using Unsafe-based aggregator")
val pageSizeBytes = SparkEnv.get.conf.getSizeAsBytes("spark.buffer.pageSize", "64m")
+ val taskContext = TaskContext.get()
val aggregationMap = new UnsafeFixedWidthAggregationMap(
newAggregationBuffer(EmptyRow),
aggregationBufferSchema,
groupKeySchema,
- TaskContext.get.taskMemoryManager(),
+ taskContext.taskMemoryManager(),
SparkEnv.get.shuffleMemoryManager,
1024 * 16, // initial capacity
pageSizeBytes,
@@ -284,6 +285,10 @@ case class GeneratedAggregate(
updateProjection.target(aggregationBuffer)(joinedRow(aggregationBuffer, currentRow))
}
+ // Record memory used in the process
+ taskContext.internalMetricsToAccumulators(
+ InternalAccumulator.PEAK_EXECUTION_MEMORY).add(aggregationMap.getMemoryUsage)
+
new Iterator[InternalRow] {
private[this] val mapIterator = aggregationMap.iterator()
private[this] val resultProjection = resultProjectionBuilder()
@@ -300,7 +305,7 @@ case class GeneratedAggregate(
} else {
// This is the last element in the iterator, so let's free the buffer. Before we do,
// though, we need to make a defensive copy of the result so that we don't return an
- // object that might contain dangling pointers to the freed memory
+ // object that might contain dangling pointers to the freed memory.
val resultCopy = result.copy()
aggregationMap.free()
resultCopy
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala
index 624efc1b1d..e73e2523a7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.execution.joins
import scala.concurrent._
import scala.concurrent.duration._
+import org.apache.spark.{InternalAccumulator, TaskContext}
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
@@ -70,7 +71,14 @@ case class BroadcastHashJoin(
val broadcastRelation = Await.result(broadcastFuture, timeout)
streamedPlan.execute().mapPartitions { streamedIter =>
- hashJoin(streamedIter, broadcastRelation.value)
+ val hashedRelation = broadcastRelation.value
+ hashedRelation match {
+ case unsafe: UnsafeHashedRelation =>
+ TaskContext.get().internalMetricsToAccumulators(
+ InternalAccumulator.PEAK_EXECUTION_MEMORY).add(unsafe.getUnsafeSize)
+ case _ =>
+ }
+ hashJoin(streamedIter, hashedRelation)
}
}
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashOuterJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashOuterJoin.scala
index 309716a0ef..c35e439cc9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashOuterJoin.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashOuterJoin.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.execution.joins
import scala.concurrent._
import scala.concurrent.duration._
+import org.apache.spark.{InternalAccumulator, TaskContext}
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
@@ -75,6 +76,13 @@ case class BroadcastHashOuterJoin(
val hashTable = broadcastRelation.value
val keyGenerator = streamedKeyGenerator
+ hashTable match {
+ case unsafe: UnsafeHashedRelation =>
+ TaskContext.get().internalMetricsToAccumulators(
+ InternalAccumulator.PEAK_EXECUTION_MEMORY).add(unsafe.getUnsafeSize)
+ case _ =>
+ }
+
joinType match {
case LeftOuter =>
streamedIter.flatMap(currentRow => {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastLeftSemiJoinHash.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastLeftSemiJoinHash.scala
index a60593911f..5bd06fbdca 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastLeftSemiJoinHash.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastLeftSemiJoinHash.scala
@@ -17,6 +17,7 @@
package org.apache.spark.sql.execution.joins
+import org.apache.spark.{InternalAccumulator, TaskContext}
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
@@ -51,7 +52,14 @@ case class BroadcastLeftSemiJoinHash(
val broadcastedRelation = sparkContext.broadcast(hashRelation)
left.execute().mapPartitions { streamIter =>
- hashSemiJoin(streamIter, broadcastedRelation.value)
+ val hashedRelation = broadcastedRelation.value
+ hashedRelation match {
+ case unsafe: UnsafeHashedRelation =>
+ TaskContext.get().internalMetricsToAccumulators(
+ InternalAccumulator.PEAK_EXECUTION_MEMORY).add(unsafe.getUnsafeSize)
+ case _ =>
+ }
+ hashSemiJoin(streamIter, hashedRelation)
}
}
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
index cc8bbfd2f8..58b4236f7b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
@@ -183,8 +183,27 @@ private[joins] final class UnsafeHashedRelation(
private[joins] def this() = this(null) // Needed for serialization
// Use BytesToBytesMap in executors for better performance (it is created during deserialization)
+ // This is used in broadcast joins and distributed mode only
@transient private[this] var binaryMap: BytesToBytesMap = _
+ /**
+ * Return the size of the unsafe map on the executors.
+ *
+ * For broadcast joins, this hashed relation is bigger on the driver because it is
+ * represented as a Java hash map there. While serializing the map to the executors,
+ * however, we rehash the contents in a binary map to reduce the memory footprint on
+ * the executors.
+ *
+ * For non-broadcast joins or in local mode, return 0.
+ */
+ def getUnsafeSize: Long = {
+ if (binaryMap != null) {
+ binaryMap.getTotalMemoryConsumption
+ } else {
+ 0
+ }
+ }
+
override def get(key: InternalRow): Seq[InternalRow] = {
val unsafeKey = key.asInstanceOf[UnsafeRow]
@@ -214,7 +233,7 @@ private[joins] final class UnsafeHashedRelation(
}
} else {
- // Use the JavaHashMap in Local mode or ShuffleHashJoin
+ // Use the Java HashMap in local mode or for non-broadcast joins (e.g. ShuffleHashJoin)
hashTable.get(unsafeKey)
}
}
@@ -316,6 +335,7 @@ private[joins] object UnsafeHashedRelation {
keyGenerator: UnsafeProjection,
sizeEstimate: Int): HashedRelation = {
+ // Use a Java hash table here because unsafe maps expect fixed size records
val hashTable = new JavaHashMap[UnsafeRow, CompactBuffer[UnsafeRow]](sizeEstimate)
// Create a mapping of buildKeys -> rows
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/sort.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/sort.scala
index 92cf328c76..3192b6ebe9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/sort.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/sort.scala
@@ -17,6 +17,7 @@
package org.apache.spark.sql.execution
+import org.apache.spark.{InternalAccumulator, TaskContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.errors._
@@ -76,6 +77,11 @@ case class ExternalSort(
val sorter = new ExternalSorter[InternalRow, Null, InternalRow](ordering = Some(ordering))
sorter.insertAll(iterator.map(r => (r.copy(), null)))
val baseIterator = sorter.iterator.map(_._1)
+ val context = TaskContext.get()
+ context.taskMetrics().incDiskBytesSpilled(sorter.diskBytesSpilled)
+ context.taskMetrics().incMemoryBytesSpilled(sorter.memoryBytesSpilled)
+ context.internalMetricsToAccumulators(
+ InternalAccumulator.PEAK_EXECUTION_MEMORY).add(sorter.peakMemoryUsedBytes)
// TODO(marmbrus): The complex type signature below thwarts inference for no reason.
CompletionIterator[InternalRow, Iterator[InternalRow]](baseIterator, sorter.stop())
}, preservesPartitioning = true)
@@ -137,7 +143,11 @@ case class TungstenSort(
if (testSpillFrequency > 0) {
sorter.setTestSpillFrequency(testSpillFrequency)
}
- sorter.sort(iter.asInstanceOf[Iterator[UnsafeRow]])
+ val sortedIterator = sorter.sort(iter.asInstanceOf[Iterator[UnsafeRow]])
+ val taskContext = TaskContext.get()
+ taskContext.internalMetricsToAccumulators(
+ InternalAccumulator.PEAK_EXECUTION_MEMORY).add(sorter.getPeakMemoryUsage)
+ sortedIterator
}, preservesPartitioning = true)
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index f1abae0720..29dfcf2575 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -21,6 +21,7 @@ import java.sql.Timestamp
import org.scalatest.BeforeAndAfterAll
+import org.apache.spark.AccumulatorSuite
import org.apache.spark.sql.catalyst.analysis.FunctionRegistry
import org.apache.spark.sql.catalyst.DefaultParserDialect
import org.apache.spark.sql.catalyst.errors.DialectException
@@ -258,6 +259,23 @@ class SQLQuerySuite extends QueryTest with BeforeAndAfterAll with SQLTestUtils {
}
}
+ private def testCodeGen(sqlText: String, expectedResults: Seq[Row]): Unit = {
+ val df = sql(sqlText)
+ // First, check if we have GeneratedAggregate.
+ val hasGeneratedAgg = df.queryExecution.executedPlan
+ .collect { case _: GeneratedAggregate | _: aggregate.Aggregate => true }
+ .nonEmpty
+ if (!hasGeneratedAgg) {
+ fail(
+ s"""
+ |Codegen is enabled, but query $sqlText does not have GeneratedAggregate in the plan.
+ |${df.queryExecution.simpleString}
+ """.stripMargin)
+ }
+ // Then, check results.
+ checkAnswer(df, expectedResults)
+ }
+
test("aggregation with codegen") {
val originalValue = sqlContext.conf.codegenEnabled
sqlContext.setConf(SQLConf.CODEGEN_ENABLED, true)
@@ -267,26 +285,6 @@ class SQLQuerySuite extends QueryTest with BeforeAndAfterAll with SQLTestUtils {
.unionAll(sqlContext.table("testData"))
.registerTempTable("testData3x")
- def testCodeGen(sqlText: String, expectedResults: Seq[Row]): Unit = {
- val df = sql(sqlText)
- // First, check if we have GeneratedAggregate.
- var hasGeneratedAgg = false
- df.queryExecution.executedPlan.foreach {
- case generatedAgg: GeneratedAggregate => hasGeneratedAgg = true
- case newAggregate: aggregate.Aggregate => hasGeneratedAgg = true
- case _ =>
- }
- if (!hasGeneratedAgg) {
- fail(
- s"""
- |Codegen is enabled, but query $sqlText does not have GeneratedAggregate in the plan.
- |${df.queryExecution.simpleString}
- """.stripMargin)
- }
- // Then, check results.
- checkAnswer(df, expectedResults)
- }
-
try {
// Just to group rows.
testCodeGen(
@@ -1605,6 +1603,28 @@ class SQLQuerySuite extends QueryTest with BeforeAndAfterAll with SQLTestUtils {
Row(new CalendarInterval(-(12 * 3 - 3), -(7L * MICROS_PER_WEEK + 123))))
}
+ test("aggregation with codegen updates peak execution memory") {
+ withSQLConf(
+ (SQLConf.CODEGEN_ENABLED.key, "true"),
+ (SQLConf.USE_SQL_AGGREGATE2.key, "false")) {
+ val sc = sqlContext.sparkContext
+ AccumulatorSuite.verifyPeakExecutionMemorySet(sc, "aggregation with codegen") {
+ testCodeGen(
+ "SELECT key, count(value) FROM testData GROUP BY key",
+ (1 to 100).map(i => Row(i, 1)))
+ }
+ }
+ }
+
+ test("external sorting updates peak execution memory") {
+ withSQLConf((SQLConf.EXTERNAL_SORT.key, "true")) {
+ val sc = sqlContext.sparkContext
+ AccumulatorSuite.verifyPeakExecutionMemorySet(sc, "external sort") {
+ sortTest()
+ }
+ }
+ }
+
test("SPARK-9511: error with table starting with number") {
val df = sqlContext.sparkContext.parallelize(1 to 10).map(i => (i, i.toString))
.toDF("num", "str")
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/TungstenSortSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/TungstenSortSuite.scala
index c794984851..88bce0e319 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/TungstenSortSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/TungstenSortSuite.scala
@@ -21,6 +21,7 @@ import scala.util.Random
import org.scalatest.BeforeAndAfterAll
+import org.apache.spark.AccumulatorSuite
import org.apache.spark.sql.{RandomDataGenerator, Row, SQLConf}
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.test.TestSQLContext
@@ -59,6 +60,17 @@ class TungstenSortSuite extends SparkPlanTest with BeforeAndAfterAll {
)
}
+ test("sorting updates peak execution memory") {
+ val sc = TestSQLContext.sparkContext
+ AccumulatorSuite.verifyPeakExecutionMemorySet(sc, "unsafe external sort") {
+ checkThatPlansAgree(
+ (1 to 100).map(v => Tuple1(v)).toDF("a"),
+ (child: SparkPlan) => TungstenSort('a.asc :: Nil, true, child),
+ (child: SparkPlan) => Sort('a.asc :: Nil, global = true, child),
+ sortAnswers = false)
+ }
+ }
+
// Test sorting on different data types
for (
dataType <- DataTypeTestUtils.atomicTypes ++ Set(NullType);
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeFixedWidthAggregationMapSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeFixedWidthAggregationMapSuite.scala
index 7c591f6143..ef827b0fe9 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeFixedWidthAggregationMapSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeFixedWidthAggregationMapSuite.scala
@@ -69,7 +69,8 @@ class UnsafeFixedWidthAggregationMapSuite extends SparkFunSuite with Matchers {
taskAttemptId = Random.nextInt(10000),
attemptNumber = 0,
taskMemoryManager = taskMemoryManager,
- metricsSystem = null))
+ metricsSystem = null,
+ internalAccumulators = Seq.empty))
try {
f
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeKVExternalSorterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeKVExternalSorterSuite.scala
index 0282b25b9d..601a5a07ad 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeKVExternalSorterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeKVExternalSorterSuite.scala
@@ -76,7 +76,8 @@ class UnsafeKVExternalSorterSuite extends SparkFunSuite {
taskAttemptId = 98456,
attemptNumber = 0,
taskMemoryManager = taskMemMgr,
- metricsSystem = null))
+ metricsSystem = null,
+ internalAccumulators = Seq.empty))
// Create the data converters
val kExternalConverter = CatalystTypeConverters.createToCatalystConverter(keySchema)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/BroadcastJoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/BroadcastJoinSuite.scala
new file mode 100644
index 0000000000..0554e11d25
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/BroadcastJoinSuite.scala
@@ -0,0 +1,94 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements. See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+// TODO: uncomment the test here! It is currently failing due to
+// bad interaction with org.apache.spark.sql.test.TestSQLContext.
+
+// scalastyle:off
+//package org.apache.spark.sql.execution.joins
+//
+//import scala.reflect.ClassTag
+//
+//import org.scalatest.BeforeAndAfterAll
+//
+//import org.apache.spark.{AccumulatorSuite, SparkConf, SparkContext}
+//import org.apache.spark.sql.functions._
+//import org.apache.spark.sql.{SQLConf, SQLContext, QueryTest}
+//
+///**
+// * Test various broadcast join operators with unsafe enabled.
+// *
+// * This needs to be its own suite because [[org.apache.spark.sql.test.TestSQLContext]] runs
+// * in local mode, but for tests in this suite we need to run Spark in local-cluster mode.
+// * In particular, the use of [[org.apache.spark.unsafe.map.BytesToBytesMap]] in
+// * [[org.apache.spark.sql.execution.joins.UnsafeHashedRelation]] is not triggered without
+// * serializing the hashed relation, which does not happen in local mode.
+// */
+//class BroadcastJoinSuite extends QueryTest with BeforeAndAfterAll {
+// private var sc: SparkContext = null
+// private var sqlContext: SQLContext = null
+//
+// /**
+// * Create a new [[SQLContext]] running in local-cluster mode with unsafe and codegen enabled.
+// */
+// override def beforeAll(): Unit = {
+// super.beforeAll()
+// val conf = new SparkConf()
+// .setMaster("local-cluster[2,1,1024]")
+// .setAppName("testing")
+// sc = new SparkContext(conf)
+// sqlContext = new SQLContext(sc)
+// sqlContext.setConf(SQLConf.UNSAFE_ENABLED, true)
+// sqlContext.setConf(SQLConf.CODEGEN_ENABLED, true)
+// }
+//
+// override def afterAll(): Unit = {
+// sc.stop()
+// sc = null
+// sqlContext = null
+// }
+//
+// /**
+// * Test whether the specified broadcast join updates the peak execution memory accumulator.
+// */
+// private def testBroadcastJoin[T: ClassTag](name: String, joinType: String): Unit = {
+// AccumulatorSuite.verifyPeakExecutionMemorySet(sc, name) {
+// val df1 = sqlContext.createDataFrame(Seq((1, "4"), (2, "2"))).toDF("key", "value")
+// val df2 = sqlContext.createDataFrame(Seq((1, "1"), (2, "2"))).toDF("key", "value")
+// // Comparison at the end is for broadcast left semi join
+// val joinExpression = df1("key") === df2("key") && df1("value") > df2("value")
+// val df3 = df1.join(broadcast(df2), joinExpression, joinType)
+// val plan = df3.queryExecution.executedPlan
+// assert(plan.collect { case p: T => p }.size === 1)
+// plan.executeCollect()
+// }
+// }
+//
+// test("unsafe broadcast hash join updates peak execution memory") {
+// testBroadcastJoin[BroadcastHashJoin]("unsafe broadcast hash join", "inner")
+// }
+//
+// test("unsafe broadcast hash outer join updates peak execution memory") {
+// testBroadcastJoin[BroadcastHashOuterJoin]("unsafe broadcast hash outer join", "left_outer")
+// }
+//
+// test("unsafe broadcast left semi join updates peak execution memory") {
+// testBroadcastJoin[BroadcastLeftSemiJoinHash]("unsafe broadcast left semi join", "leftsemi")
+// }
+//
+//}
+// scalastyle:on