From 9dbd4b864efacd09a8353d00c998be87f9eeacb2 Mon Sep 17 00:00:00 2001 From: David Navas Date: Sat, 17 Sep 2016 16:22:23 +0100 Subject: [SPARK-17529][CORE] Implement BitSet.clearUntil and use it during merge joins ## What changes were proposed in this pull request? Add a clearUntil() method on BitSet (adapted from the pre-existing setUntil() method). Use this method to clear the subset of the BitSet that needs to be used during merge joins. ## How was this patch tested? dev/run-tests, as well as performance tests on skewed data as described in the JIRA ticket. I expect there to be a small local performance hit using BitSet.clearUntil rather than BitSet.clear for normally shaped (unskewed) joins (an additional read of the last long). This is expected to be de minimis and was not specifically tested. Author: David Navas Closes #15084 from davidnavas/bitSet. --- .../org/apache/spark/util/collection/BitSet.scala | 28 ++++++++++++------- .../apache/spark/util/collection/BitSetSuite.scala | 32 ++++++++++++++++++++++ .../sql/execution/joins/SortMergeJoinExec.scala | 4 +-- 3 files changed, 52 insertions(+), 12 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/util/collection/BitSet.scala b/core/src/main/scala/org/apache/spark/util/collection/BitSet.scala index 7ab67fc3a2..e63e0e3e1f 100644 --- a/core/src/main/scala/org/apache/spark/util/collection/BitSet.scala +++ b/core/src/main/scala/org/apache/spark/util/collection/BitSet.scala @@ -17,6 +17,8 @@ package org.apache.spark.util.collection +import java.util.Arrays + /** * A simple, fixed-size bit set implementation. This implementation is fast because it avoids * safety/bound checking. @@ -35,21 +37,14 @@ class BitSet(numBits: Int) extends Serializable { /** * Clear all set bits.
*/ - def clear(): Unit = { - var i = 0 - while (i < numWords) { - words(i) = 0L - i += 1 - } - } + def clear(): Unit = Arrays.fill(words, 0) /** * Set all the bits up to a given index */ - def setUntil(bitIndex: Int) { + def setUntil(bitIndex: Int): Unit = { val wordIndex = bitIndex >> 6 // divide by 64 - var i = 0 - while(i < wordIndex) { words(i) = -1; i += 1 } + Arrays.fill(words, 0, wordIndex, -1) if(wordIndex < words.length) { // Set the remaining bits (note that the mask could still be zero) val mask = ~(-1L << (bitIndex & 0x3f)) @@ -57,6 +52,19 @@ class BitSet(numBits: Int) extends Serializable { } } + /** + * Clear all the bits up to a given index + */ + def clearUntil(bitIndex: Int): Unit = { + val wordIndex = bitIndex >> 6 // divide by 64 + Arrays.fill(words, 0, wordIndex, 0) + if(wordIndex < words.length) { + // Clear the remaining bits + val mask = -1L << (bitIndex & 0x3f) + words(wordIndex) &= mask + } + } + /** * Compute the bit-wise AND of the two sets returning the * result. 
diff --git a/core/src/test/scala/org/apache/spark/util/collection/BitSetSuite.scala b/core/src/test/scala/org/apache/spark/util/collection/BitSetSuite.scala index 69dbfa9cd7..0169c9926e 100644 --- a/core/src/test/scala/org/apache/spark/util/collection/BitSetSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/collection/BitSetSuite.scala @@ -152,4 +152,36 @@ class BitSetSuite extends SparkFunSuite { assert(bitsetDiff.nextSetBit(85) === 85) assert(bitsetDiff.nextSetBit(86) === -1) } + + test( "[gs]etUntil" ) { + val bitSet = new BitSet(100) + + bitSet.setUntil(bitSet.capacity) + + (0 until bitSet.capacity).foreach { i => + assert(bitSet.get(i)) + } + + bitSet.clearUntil(bitSet.capacity) + + (0 until bitSet.capacity).foreach { i => + assert(!bitSet.get(i)) + } + + val setUntil = bitSet.capacity / 2 + bitSet.setUntil(setUntil) + + val clearUntil = setUntil / 2 + bitSet.clearUntil(clearUntil) + + (0 until clearUntil).foreach { i => + assert(!bitSet.get(i)) + } + (clearUntil until setUntil).foreach { i => + assert(bitSet.get(i)) + } + (setUntil until bitSet.capacity).foreach { i => + assert(!bitSet.get(i)) + } + } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala index b46af2a99a..81b3e1d224 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala @@ -954,12 +954,12 @@ private class SortMergeFullOuterJoinScanner( } if (leftMatches.size <= leftMatched.capacity) { - leftMatched.clear() + leftMatched.clearUntil(leftMatches.size) } else { leftMatched = new BitSet(leftMatches.size) } if (rightMatches.size <= rightMatched.capacity) { - rightMatched.clear() + rightMatched.clearUntil(rightMatches.size) } else { rightMatched = new BitSet(rightMatches.size) } -- cgit v1.2.3