aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--LICENSE20
-rw-r--r--core/src/main/java/org/apache/spark/util/collection/Sorter.java915
-rw-r--r--core/src/main/scala/org/apache/spark/util/collection/AppendOnlyMap.scala15
-rw-r--r--core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala35
-rw-r--r--core/src/main/scala/org/apache/spark/util/collection/SortDataFormat.scala94
-rw-r--r--core/src/test/scala/org/apache/spark/util/collection/AppendOnlyMapSuite.scala8
-rw-r--r--core/src/test/scala/org/apache/spark/util/collection/SorterSuite.scala167
7 files changed, 1222 insertions, 32 deletions
diff --git a/LICENSE b/LICENSE
index 383f079df8..65e1f480d9 100644
--- a/LICENSE
+++ b/LICENSE
@@ -442,7 +442,7 @@ Written by Pavel Binko, Dino Ferrero Merlino, Wolfgang Hoschek, Tony Johnson, An
========================================================================
-Fo SnapTree:
+For SnapTree:
========================================================================
SNAPTREE LICENSE
@@ -483,6 +483,24 @@ SUCH DAMAGE.
========================================================================
+For Timsort (core/src/main/java/org/apache/spark/util/collection/Sorter.java):
+========================================================================
+Copyright (C) 2008 The Android Open Source Project
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+
+========================================================================
BSD-style licenses
========================================================================
diff --git a/core/src/main/java/org/apache/spark/util/collection/Sorter.java b/core/src/main/java/org/apache/spark/util/collection/Sorter.java
new file mode 100644
index 0000000000..64ad18c0e4
--- /dev/null
+++ b/core/src/main/java/org/apache/spark/util/collection/Sorter.java
@@ -0,0 +1,915 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util.collection;
+
+import java.util.Comparator;
+
+/**
+ * A port of the Android Timsort class, which utilizes a "stable, adaptive, iterative mergesort."
+ * See the method comment on sort() for more details.
+ *
+ * This has been kept in Java with the original style in order to match very closely with the
+ * Anroid source code, and thus be easy to verify correctness.
+ *
+ * The purpose of the port is to generalize the interface to the sort to accept input data formats
+ * besides simple arrays where every element is sorted individually. For instance, the AppendOnlyMap
+ * uses this to sort an Array with alternating elements of the form [key, value, key, value].
+ * This generalization comes with minimal overhead -- see SortDataFormat for more information.
+ */
+class Sorter<K, Buffer> {
+
+ /**
+ * This is the minimum sized sequence that will be merged. Shorter
+ * sequences will be lengthened by calling binarySort. If the entire
+ * array is less than this length, no merges will be performed.
+ *
+ * This constant should be a power of two. It was 64 in Tim Peter's C
+ * implementation, but 32 was empirically determined to work better in
+ * this implementation. In the unlikely event that you set this constant
+ * to be a number that's not a power of two, you'll need to change the
+ * minRunLength computation.
+ *
+ * If you decrease this constant, you must change the stackLen
+ * computation in the TimSort constructor, or you risk an
+ * ArrayOutOfBounds exception. See listsort.txt for a discussion
+ * of the minimum stack length required as a function of the length
+ * of the array being sorted and the minimum merge sequence length.
+ */
+ private static final int MIN_MERGE = 32;
+
+ private final SortDataFormat<K, Buffer> s;
+
+ public Sorter(SortDataFormat<K, Buffer> sortDataFormat) {
+ this.s = sortDataFormat;
+ }
+
+ /**
+ * A stable, adaptive, iterative mergesort that requires far fewer than
+ * n lg(n) comparisons when running on partially sorted arrays, while
+ * offering performance comparable to a traditional mergesort when run
+ * on random arrays. Like all proper mergesorts, this sort is stable and
+ * runs O(n log n) time (worst case). In the worst case, this sort requires
+ * temporary storage space for n/2 object references; in the best case,
+ * it requires only a small constant amount of space.
+ *
+ * This implementation was adapted from Tim Peters's list sort for
+ * Python, which is described in detail here:
+ *
+ * http://svn.python.org/projects/python/trunk/Objects/listsort.txt
+ *
+ * Tim's C code may be found here:
+ *
+ * http://svn.python.org/projects/python/trunk/Objects/listobject.c
+ *
+ * The underlying techniques are described in this paper (and may have
+ * even earlier origins):
+ *
+ * "Optimistic Sorting and Information Theoretic Complexity"
+ * Peter McIlroy
+ * SODA (Fourth Annual ACM-SIAM Symposium on Discrete Algorithms),
+ * pp 467-474, Austin, Texas, 25-27 January 1993.
+ *
+ * While the API to this class consists solely of static methods, it is
+ * (privately) instantiable; a TimSort instance holds the state of an ongoing
+ * sort, assuming the input array is large enough to warrant the full-blown
+ * TimSort. Small arrays are sorted in place, using a binary insertion sort.
+ *
+ * @author Josh Bloch
+ */
+ void sort(Buffer a, int lo, int hi, Comparator<? super K> c) {
+ assert c != null;
+
+ int nRemaining = hi - lo;
+ if (nRemaining < 2)
+ return; // Arrays of size 0 and 1 are always sorted
+
+ // If array is small, do a "mini-TimSort" with no merges
+ if (nRemaining < MIN_MERGE) {
+ int initRunLen = countRunAndMakeAscending(a, lo, hi, c);
+ binarySort(a, lo, hi, lo + initRunLen, c);
+ return;
+ }
+
+ /**
+ * March over the array once, left to right, finding natural runs,
+ * extending short natural runs to minRun elements, and merging runs
+ * to maintain stack invariant.
+ */
+ SortState sortState = new SortState(a, c, hi - lo);
+ int minRun = minRunLength(nRemaining);
+ do {
+ // Identify next run
+ int runLen = countRunAndMakeAscending(a, lo, hi, c);
+
+ // If run is short, extend to min(minRun, nRemaining)
+ if (runLen < minRun) {
+ int force = nRemaining <= minRun ? nRemaining : minRun;
+ binarySort(a, lo, lo + force, lo + runLen, c);
+ runLen = force;
+ }
+
+ // Push run onto pending-run stack, and maybe merge
+ sortState.pushRun(lo, runLen);
+ sortState.mergeCollapse();
+
+ // Advance to find next run
+ lo += runLen;
+ nRemaining -= runLen;
+ } while (nRemaining != 0);
+
+ // Merge all remaining runs to complete sort
+ assert lo == hi;
+ sortState.mergeForceCollapse();
+ assert sortState.stackSize == 1;
+ }
+
+ /**
+ * Sorts the specified portion of the specified array using a binary
+ * insertion sort. This is the best method for sorting small numbers
+ * of elements. It requires O(n log n) compares, but O(n^2) data
+ * movement (worst case).
+ *
+ * If the initial part of the specified range is already sorted,
+ * this method can take advantage of it: the method assumes that the
+ * elements from index {@code lo}, inclusive, to {@code start},
+ * exclusive are already sorted.
+ *
+ * @param a the array in which a range is to be sorted
+ * @param lo the index of the first element in the range to be sorted
+ * @param hi the index after the last element in the range to be sorted
+ * @param start the index of the first element in the range that is
+ * not already known to be sorted ({@code lo <= start <= hi})
+ * @param c comparator to used for the sort
+ */
+ @SuppressWarnings("fallthrough")
+ private void binarySort(Buffer a, int lo, int hi, int start, Comparator<? super K> c) {
+ assert lo <= start && start <= hi;
+ if (start == lo)
+ start++;
+
+ Buffer pivotStore = s.allocate(1);
+ for ( ; start < hi; start++) {
+ s.copyElement(a, start, pivotStore, 0);
+ K pivot = s.getKey(pivotStore, 0);
+
+ // Set left (and right) to the index where a[start] (pivot) belongs
+ int left = lo;
+ int right = start;
+ assert left <= right;
+ /*
+ * Invariants:
+ * pivot >= all in [lo, left).
+ * pivot < all in [right, start).
+ */
+ while (left < right) {
+ int mid = (left + right) >>> 1;
+ if (c.compare(pivot, s.getKey(a, mid)) < 0)
+ right = mid;
+ else
+ left = mid + 1;
+ }
+ assert left == right;
+
+ /*
+ * The invariants still hold: pivot >= all in [lo, left) and
+ * pivot < all in [left, start), so pivot belongs at left. Note
+ * that if there are elements equal to pivot, left points to the
+ * first slot after them -- that's why this sort is stable.
+ * Slide elements over to make room for pivot.
+ */
+ int n = start - left; // The number of elements to move
+ // Switch is just an optimization for arraycopy in default case
+ switch (n) {
+ case 2: s.copyElement(a, left + 1, a, left + 2);
+ case 1: s.copyElement(a, left, a, left + 1);
+ break;
+ default: s.copyRange(a, left, a, left + 1, n);
+ }
+ s.copyElement(pivotStore, 0, a, left);
+ }
+ }
+
+ /**
+ * Returns the length of the run beginning at the specified position in
+ * the specified array and reverses the run if it is descending (ensuring
+ * that the run will always be ascending when the method returns).
+ *
+ * A run is the longest ascending sequence with:
+ *
+ * a[lo] <= a[lo + 1] <= a[lo + 2] <= ...
+ *
+ * or the longest descending sequence with:
+ *
+ * a[lo] > a[lo + 1] > a[lo + 2] > ...
+ *
+ * For its intended use in a stable mergesort, the strictness of the
+ * definition of "descending" is needed so that the call can safely
+ * reverse a descending sequence without violating stability.
+ *
+ * @param a the array in which a run is to be counted and possibly reversed
+ * @param lo index of the first element in the run
+ * @param hi index after the last element that may be contained in the run.
+ It is required that {@code lo < hi}.
+ * @param c the comparator to used for the sort
+ * @return the length of the run beginning at the specified position in
+ * the specified array
+ */
+ private int countRunAndMakeAscending(Buffer a, int lo, int hi, Comparator<? super K> c) {
+ assert lo < hi;
+ int runHi = lo + 1;
+ if (runHi == hi)
+ return 1;
+
+ // Find end of run, and reverse range if descending
+ if (c.compare(s.getKey(a, runHi++), s.getKey(a, lo)) < 0) { // Descending
+ while (runHi < hi && c.compare(s.getKey(a, runHi), s.getKey(a, runHi - 1)) < 0)
+ runHi++;
+ reverseRange(a, lo, runHi);
+ } else { // Ascending
+ while (runHi < hi && c.compare(s.getKey(a, runHi), s.getKey(a, runHi - 1)) >= 0)
+ runHi++;
+ }
+
+ return runHi - lo;
+ }
+
+ /**
+ * Reverse the specified range of the specified array.
+ *
+ * @param a the array in which a range is to be reversed
+ * @param lo the index of the first element in the range to be reversed
+ * @param hi the index after the last element in the range to be reversed
+ */
+ private void reverseRange(Buffer a, int lo, int hi) {
+ hi--;
+ while (lo < hi) {
+ s.swap(a, lo, hi);
+ lo++;
+ hi--;
+ }
+ }
+
+ /**
+ * Returns the minimum acceptable run length for an array of the specified
+ * length. Natural runs shorter than this will be extended with
+ * {@link #binarySort}.
+ *
+ * Roughly speaking, the computation is:
+ *
+ * If n < MIN_MERGE, return n (it's too small to bother with fancy stuff).
+ * Else if n is an exact power of 2, return MIN_MERGE/2.
+ * Else return an int k, MIN_MERGE/2 <= k <= MIN_MERGE, such that n/k
+ * is close to, but strictly less than, an exact power of 2.
+ *
+ * For the rationale, see listsort.txt.
+ *
+ * @param n the length of the array to be sorted
+ * @return the length of the minimum run to be merged
+ */
+ private int minRunLength(int n) {
+ assert n >= 0;
+ int r = 0; // Becomes 1 if any 1 bits are shifted off
+ while (n >= MIN_MERGE) {
+ r |= (n & 1);
+ n >>= 1;
+ }
+ return n + r;
+ }
+
+ private class SortState {
+
+ /**
+ * The Buffer being sorted.
+ */
+ private final Buffer a;
+
+ /**
+ * Length of the sort Buffer.
+ */
+ private final int aLength;
+
+ /**
+ * The comparator for this sort.
+ */
+ private final Comparator<? super K> c;
+
+ /**
+ * When we get into galloping mode, we stay there until both runs win less
+ * often than MIN_GALLOP consecutive times.
+ */
+ private static final int MIN_GALLOP = 7;
+
+ /**
+ * This controls when we get *into* galloping mode. It is initialized
+ * to MIN_GALLOP. The mergeLo and mergeHi methods nudge it higher for
+ * random data, and lower for highly structured data.
+ */
+ private int minGallop = MIN_GALLOP;
+
+ /**
+ * Maximum initial size of tmp array, which is used for merging. The array
+ * can grow to accommodate demand.
+ *
+ * Unlike Tim's original C version, we do not allocate this much storage
+ * when sorting smaller arrays. This change was required for performance.
+ */
+ private static final int INITIAL_TMP_STORAGE_LENGTH = 256;
+
+ /**
+ * Temp storage for merges.
+ */
+ private Buffer tmp; // Actual runtime type will be Object[], regardless of T
+
+ /**
+ * Length of the temp storage.
+ */
+ private int tmpLength = 0;
+
+ /**
+ * A stack of pending runs yet to be merged. Run i starts at
+ * address base[i] and extends for len[i] elements. It's always
+ * true (so long as the indices are in bounds) that:
+ *
+ * runBase[i] + runLen[i] == runBase[i + 1]
+ *
+ * so we could cut the storage for this, but it's a minor amount,
+ * and keeping all the info explicit simplifies the code.
+ */
+ private int stackSize = 0; // Number of pending runs on stack
+ private final int[] runBase;
+ private final int[] runLen;
+
+ /**
+ * Creates a TimSort instance to maintain the state of an ongoing sort.
+ *
+ * @param a the array to be sorted
+ * @param c the comparator to determine the order of the sort
+ */
+ private SortState(Buffer a, Comparator<? super K> c, int len) {
+ this.aLength = len;
+ this.a = a;
+ this.c = c;
+
+ // Allocate temp storage (which may be increased later if necessary)
+ tmpLength = len < 2 * INITIAL_TMP_STORAGE_LENGTH ? len >>> 1 : INITIAL_TMP_STORAGE_LENGTH;
+ tmp = s.allocate(tmpLength);
+
+ /*
+ * Allocate runs-to-be-merged stack (which cannot be expanded). The
+ * stack length requirements are described in listsort.txt. The C
+ * version always uses the same stack length (85), but this was
+ * measured to be too expensive when sorting "mid-sized" arrays (e.g.,
+ * 100 elements) in Java. Therefore, we use smaller (but sufficiently
+ * large) stack lengths for smaller arrays. The "magic numbers" in the
+ * computation below must be changed if MIN_MERGE is decreased. See
+ * the MIN_MERGE declaration above for more information.
+ */
+ int stackLen = (len < 120 ? 5 :
+ len < 1542 ? 10 :
+ len < 119151 ? 19 : 40);
+ runBase = new int[stackLen];
+ runLen = new int[stackLen];
+ }
+
+ /**
+ * Pushes the specified run onto the pending-run stack.
+ *
+ * @param runBase index of the first element in the run
+ * @param runLen the number of elements in the run
+ */
+ private void pushRun(int runBase, int runLen) {
+ this.runBase[stackSize] = runBase;
+ this.runLen[stackSize] = runLen;
+ stackSize++;
+ }
+
+ /**
+ * Examines the stack of runs waiting to be merged and merges adjacent runs
+ * until the stack invariants are reestablished:
+ *
+ * 1. runLen[i - 3] > runLen[i - 2] + runLen[i - 1]
+ * 2. runLen[i - 2] > runLen[i - 1]
+ *
+ * This method is called each time a new run is pushed onto the stack,
+ * so the invariants are guaranteed to hold for i < stackSize upon
+ * entry to the method.
+ */
+ private void mergeCollapse() {
+ while (stackSize > 1) {
+ int n = stackSize - 2;
+ if (n > 0 && runLen[n-1] <= runLen[n] + runLen[n+1]) {
+ if (runLen[n - 1] < runLen[n + 1])
+ n--;
+ mergeAt(n);
+ } else if (runLen[n] <= runLen[n + 1]) {
+ mergeAt(n);
+ } else {
+ break; // Invariant is established
+ }
+ }
+ }
+
+ /**
+ * Merges all runs on the stack until only one remains. This method is
+ * called once, to complete the sort.
+ */
+ private void mergeForceCollapse() {
+ while (stackSize > 1) {
+ int n = stackSize - 2;
+ if (n > 0 && runLen[n - 1] < runLen[n + 1])
+ n--;
+ mergeAt(n);
+ }
+ }
+
+ /**
+ * Merges the two runs at stack indices i and i+1. Run i must be
+ * the penultimate or antepenultimate run on the stack. In other words,
+ * i must be equal to stackSize-2 or stackSize-3.
+ *
+ * @param i stack index of the first of the two runs to merge
+ */
+ private void mergeAt(int i) {
+ assert stackSize >= 2;
+ assert i >= 0;
+ assert i == stackSize - 2 || i == stackSize - 3;
+
+ int base1 = runBase[i];
+ int len1 = runLen[i];
+ int base2 = runBase[i + 1];
+ int len2 = runLen[i + 1];
+ assert len1 > 0 && len2 > 0;
+ assert base1 + len1 == base2;
+
+ /*
+ * Record the length of the combined runs; if i is the 3rd-last
+ * run now, also slide over the last run (which isn't involved
+ * in this merge). The current run (i+1) goes away in any case.
+ */
+ runLen[i] = len1 + len2;
+ if (i == stackSize - 3) {
+ runBase[i + 1] = runBase[i + 2];
+ runLen[i + 1] = runLen[i + 2];
+ }
+ stackSize--;
+
+ /*
+ * Find where the first element of run2 goes in run1. Prior elements
+ * in run1 can be ignored (because they're already in place).
+ */
+ int k = gallopRight(s.getKey(a, base2), a, base1, len1, 0, c);
+ assert k >= 0;
+ base1 += k;
+ len1 -= k;
+ if (len1 == 0)
+ return;
+
+ /*
+ * Find where the last element of run1 goes in run2. Subsequent elements
+ * in run2 can be ignored (because they're already in place).
+ */
+ len2 = gallopLeft(s.getKey(a, base1 + len1 - 1), a, base2, len2, len2 - 1, c);
+ assert len2 >= 0;
+ if (len2 == 0)
+ return;
+
+ // Merge remaining runs, using tmp array with min(len1, len2) elements
+ if (len1 <= len2)
+ mergeLo(base1, len1, base2, len2);
+ else
+ mergeHi(base1, len1, base2, len2);
+ }
+
+ /**
+ * Locates the position at which to insert the specified key into the
+ * specified sorted range; if the range contains an element equal to key,
+ * returns the index of the leftmost equal element.
+ *
+ * @param key the key whose insertion point to search for
+ * @param a the array in which to search
+ * @param base the index of the first element in the range
+ * @param len the length of the range; must be > 0
+ * @param hint the index at which to begin the search, 0 <= hint < n.
+ * The closer hint is to the result, the faster this method will run.
+ * @param c the comparator used to order the range, and to search
+ * @return the int k, 0 <= k <= n such that a[b + k - 1] < key <= a[b + k],
+ * pretending that a[b - 1] is minus infinity and a[b + n] is infinity.
+ * In other words, key belongs at index b + k; or in other words,
+ * the first k elements of a should precede key, and the last n - k
+ * should follow it.
+ */
+ private int gallopLeft(K key, Buffer a, int base, int len, int hint, Comparator<? super K> c) {
+ assert len > 0 && hint >= 0 && hint < len;
+ int lastOfs = 0;
+ int ofs = 1;
+ if (c.compare(key, s.getKey(a, base + hint)) > 0) {
+ // Gallop right until a[base+hint+lastOfs] < key <= a[base+hint+ofs]
+ int maxOfs = len - hint;
+ while (ofs < maxOfs && c.compare(key, s.getKey(a, base + hint + ofs)) > 0) {
+ lastOfs = ofs;
+ ofs = (ofs << 1) + 1;
+ if (ofs <= 0) // int overflow
+ ofs = maxOfs;
+ }
+ if (ofs > maxOfs)
+ ofs = maxOfs;
+
+ // Make offsets relative to base
+ lastOfs += hint;
+ ofs += hint;
+ } else { // key <= a[base + hint]
+ // Gallop left until a[base+hint-ofs] < key <= a[base+hint-lastOfs]
+ final int maxOfs = hint + 1;
+ while (ofs < maxOfs && c.compare(key, s.getKey(a, base + hint - ofs)) <= 0) {
+ lastOfs = ofs;
+ ofs = (ofs << 1) + 1;
+ if (ofs <= 0) // int overflow
+ ofs = maxOfs;
+ }
+ if (ofs > maxOfs)
+ ofs = maxOfs;
+
+ // Make offsets relative to base
+ int tmp = lastOfs;
+ lastOfs = hint - ofs;
+ ofs = hint - tmp;
+ }
+ assert -1 <= lastOfs && lastOfs < ofs && ofs <= len;
+
+ /*
+ * Now a[base+lastOfs] < key <= a[base+ofs], so key belongs somewhere
+ * to the right of lastOfs but no farther right than ofs. Do a binary
+ * search, with invariant a[base + lastOfs - 1] < key <= a[base + ofs].
+ */
+ lastOfs++;
+ while (lastOfs < ofs) {
+ int m = lastOfs + ((ofs - lastOfs) >>> 1);
+
+ if (c.compare(key, s.getKey(a, base + m)) > 0)
+ lastOfs = m + 1; // a[base + m] < key
+ else
+ ofs = m; // key <= a[base + m]
+ }
+ assert lastOfs == ofs; // so a[base + ofs - 1] < key <= a[base + ofs]
+ return ofs;
+ }
+
+ /**
+ * Like gallopLeft, except that if the range contains an element equal to
+ * key, gallopRight returns the index after the rightmost equal element.
+ *
+ * @param key the key whose insertion point to search for
+ * @param a the array in which to search
+ * @param base the index of the first element in the range
+ * @param len the length of the range; must be > 0
+ * @param hint the index at which to begin the search, 0 <= hint < n.
+ * The closer hint is to the result, the faster this method will run.
+ * @param c the comparator used to order the range, and to search
+ * @return the int k, 0 <= k <= n such that a[b + k - 1] <= key < a[b + k]
+ */
+ private int gallopRight(K key, Buffer a, int base, int len, int hint, Comparator<? super K> c) {
+ assert len > 0 && hint >= 0 && hint < len;
+
+ int ofs = 1;
+ int lastOfs = 0;
+ if (c.compare(key, s.getKey(a, base + hint)) < 0) {
+ // Gallop left until a[b+hint - ofs] <= key < a[b+hint - lastOfs]
+ int maxOfs = hint + 1;
+ while (ofs < maxOfs && c.compare(key, s.getKey(a, base + hint - ofs)) < 0) {
+ lastOfs = ofs;
+ ofs = (ofs << 1) + 1;
+ if (ofs <= 0) // int overflow
+ ofs = maxOfs;
+ }
+ if (ofs > maxOfs)
+ ofs = maxOfs;
+
+ // Make offsets relative to b
+ int tmp = lastOfs;
+ lastOfs = hint - ofs;
+ ofs = hint - tmp;
+ } else { // a[b + hint] <= key
+ // Gallop right until a[b+hint + lastOfs] <= key < a[b+hint + ofs]
+ int maxOfs = len - hint;
+ while (ofs < maxOfs && c.compare(key, s.getKey(a, base + hint + ofs)) >= 0) {
+ lastOfs = ofs;
+ ofs = (ofs << 1) + 1;
+ if (ofs <= 0) // int overflow
+ ofs = maxOfs;
+ }
+ if (ofs > maxOfs)
+ ofs = maxOfs;
+
+ // Make offsets relative to b
+ lastOfs += hint;
+ ofs += hint;
+ }
+ assert -1 <= lastOfs && lastOfs < ofs && ofs <= len;
+
+ /*
+ * Now a[b + lastOfs] <= key < a[b + ofs], so key belongs somewhere to
+ * the right of lastOfs but no farther right than ofs. Do a binary
+ * search, with invariant a[b + lastOfs - 1] <= key < a[b + ofs].
+ */
+ lastOfs++;
+ while (lastOfs < ofs) {
+ int m = lastOfs + ((ofs - lastOfs) >>> 1);
+
+ if (c.compare(key, s.getKey(a, base + m)) < 0)
+ ofs = m; // key < a[b + m]
+ else
+ lastOfs = m + 1; // a[b + m] <= key
+ }
+ assert lastOfs == ofs; // so a[b + ofs - 1] <= key < a[b + ofs]
+ return ofs;
+ }
+
+ /**
+ * Merges two adjacent runs in place, in a stable fashion. The first
+ * element of the first run must be greater than the first element of the
+ * second run (a[base1] > a[base2]), and the last element of the first run
+ * (a[base1 + len1-1]) must be greater than all elements of the second run.
+ *
+ * For performance, this method should be called only when len1 <= len2;
+ * its twin, mergeHi should be called if len1 >= len2. (Either method
+ * may be called if len1 == len2.)
+ *
+ * @param base1 index of first element in first run to be merged
+ * @param len1 length of first run to be merged (must be > 0)
+ * @param base2 index of first element in second run to be merged
+ * (must be aBase + aLen)
+ * @param len2 length of second run to be merged (must be > 0)
+ */
+ private void mergeLo(int base1, int len1, int base2, int len2) {
+ assert len1 > 0 && len2 > 0 && base1 + len1 == base2;
+
+ // Copy first run into temp array
+ Buffer a = this.a; // For performance
+ Buffer tmp = ensureCapacity(len1);
+ s.copyRange(a, base1, tmp, 0, len1);
+
+ int cursor1 = 0; // Indexes into tmp array
+ int cursor2 = base2; // Indexes int a
+ int dest = base1; // Indexes int a
+
+ // Move first element of second run and deal with degenerate cases
+ s.copyElement(a, cursor2++, a, dest++);
+ if (--len2 == 0) {
+ s.copyRange(tmp, cursor1, a, dest, len1);
+ return;
+ }
+ if (len1 == 1) {
+ s.copyRange(a, cursor2, a, dest, len2);
+ s.copyElement(tmp, cursor1, a, dest + len2); // Last elt of run 1 to end of merge
+ return;
+ }
+
+ Comparator<? super K> c = this.c; // Use local variable for performance
+ int minGallop = this.minGallop; // " " " " "
+ outer:
+ while (true) {
+ int count1 = 0; // Number of times in a row that first run won
+ int count2 = 0; // Number of times in a row that second run won
+
+ /*
+ * Do the straightforward thing until (if ever) one run starts
+ * winning consistently.
+ */
+ do {
+ assert len1 > 1 && len2 > 0;
+ if (c.compare(s.getKey(a, cursor2), s.getKey(tmp, cursor1)) < 0) {
+ s.copyElement(a, cursor2++, a, dest++);
+ count2++;
+ count1 = 0;
+ if (--len2 == 0)
+ break outer;
+ } else {
+ s.copyElement(tmp, cursor1++, a, dest++);
+ count1++;
+ count2 = 0;
+ if (--len1 == 1)
+ break outer;
+ }
+ } while ((count1 | count2) < minGallop);
+
+ /*
+ * One run is winning so consistently that galloping may be a
+ * huge win. So try that, and continue galloping until (if ever)
+ * neither run appears to be winning consistently anymore.
+ */
+ do {
+ assert len1 > 1 && len2 > 0;
+ count1 = gallopRight(s.getKey(a, cursor2), tmp, cursor1, len1, 0, c);
+ if (count1 != 0) {
+ s.copyRange(tmp, cursor1, a, dest, count1);
+ dest += count1;
+ cursor1 += count1;
+ len1 -= count1;
+ if (len1 <= 1) // len1 == 1 || len1 == 0
+ break outer;
+ }
+ s.copyElement(a, cursor2++, a, dest++);
+ if (--len2 == 0)
+ break outer;
+
+ count2 = gallopLeft(s.getKey(tmp, cursor1), a, cursor2, len2, 0, c);
+ if (count2 != 0) {
+ s.copyRange(a, cursor2, a, dest, count2);
+ dest += count2;
+ cursor2 += count2;
+ len2 -= count2;
+ if (len2 == 0)
+ break outer;
+ }
+ s.copyElement(tmp, cursor1++, a, dest++);
+ if (--len1 == 1)
+ break outer;
+ minGallop--;
+ } while (count1 >= MIN_GALLOP | count2 >= MIN_GALLOP);
+ if (minGallop < 0)
+ minGallop = 0;
+ minGallop += 2; // Penalize for leaving gallop mode
+ } // End of "outer" loop
+ this.minGallop = minGallop < 1 ? 1 : minGallop; // Write back to field
+
+ if (len1 == 1) {
+ assert len2 > 0;
+ s.copyRange(a, cursor2, a, dest, len2);
+ s.copyElement(tmp, cursor1, a, dest + len2); // Last elt of run 1 to end of merge
+ } else if (len1 == 0) {
+ throw new IllegalArgumentException(
+ "Comparison method violates its general contract!");
+ } else {
+ assert len2 == 0;
+ assert len1 > 1;
+ s.copyRange(tmp, cursor1, a, dest, len1);
+ }
+ }
+
+ /**
+ * Like mergeLo, except that this method should be called only if
+ * len1 >= len2; mergeLo should be called if len1 <= len2. (Either method
+ * may be called if len1 == len2.)
+ *
+ * @param base1 index of first element in first run to be merged
+ * @param len1 length of first run to be merged (must be > 0)
+ * @param base2 index of first element in second run to be merged
+ * (must be aBase + aLen)
+ * @param len2 length of second run to be merged (must be > 0)
+ */
+ private void mergeHi(int base1, int len1, int base2, int len2) {
+ assert len1 > 0 && len2 > 0 && base1 + len1 == base2;
+
+ // Copy second run into temp array
+ Buffer a = this.a; // For performance
+ Buffer tmp = ensureCapacity(len2);
+ s.copyRange(a, base2, tmp, 0, len2);
+
+ int cursor1 = base1 + len1 - 1; // Indexes into a
+ int cursor2 = len2 - 1; // Indexes into tmp array
+ int dest = base2 + len2 - 1; // Indexes into a
+
+ // Move last element of first run and deal with degenerate cases
+ s.copyElement(a, cursor1--, a, dest--);
+ if (--len1 == 0) {
+ s.copyRange(tmp, 0, a, dest - (len2 - 1), len2);
+ return;
+ }
+ if (len2 == 1) {
+ dest -= len1;
+ cursor1 -= len1;
+ s.copyRange(a, cursor1 + 1, a, dest + 1, len1);
+ s.copyElement(tmp, cursor2, a, dest);
+ return;
+ }
+
+ Comparator<? super K> c = this.c; // Use local variable for performance
+ int minGallop = this.minGallop; // " " " " "
+ outer:
+ while (true) {
+ int count1 = 0; // Number of times in a row that first run won
+ int count2 = 0; // Number of times in a row that second run won
+
+ /*
+ * Do the straightforward thing until (if ever) one run
+ * appears to win consistently.
+ */
+ do {
+ assert len1 > 0 && len2 > 1;
+ if (c.compare(s.getKey(tmp, cursor2), s.getKey(a, cursor1)) < 0) {
+ s.copyElement(a, cursor1--, a, dest--);
+ count1++;
+ count2 = 0;
+ if (--len1 == 0)
+ break outer;
+ } else {
+ s.copyElement(tmp, cursor2--, a, dest--);
+ count2++;
+ count1 = 0;
+ if (--len2 == 1)
+ break outer;
+ }
+ } while ((count1 | count2) < minGallop);
+
+ /*
+ * One run is winning so consistently that galloping may be a
+ * huge win. So try that, and continue galloping until (if ever)
+ * neither run appears to be winning consistently anymore.
+ */
+ do {
+ assert len1 > 0 && len2 > 1;
+ count1 = len1 - gallopRight(s.getKey(tmp, cursor2), a, base1, len1, len1 - 1, c);
+ if (count1 != 0) {
+ dest -= count1;
+ cursor1 -= count1;
+ len1 -= count1;
+ s.copyRange(a, cursor1 + 1, a, dest + 1, count1);
+ if (len1 == 0)
+ break outer;
+ }
+ s.copyElement(tmp, cursor2--, a, dest--);
+ if (--len2 == 1)
+ break outer;
+
+ count2 = len2 - gallopLeft(s.getKey(a, cursor1), tmp, 0, len2, len2 - 1, c);
+ if (count2 != 0) {
+ dest -= count2;
+ cursor2 -= count2;
+ len2 -= count2;
+ s.copyRange(tmp, cursor2 + 1, a, dest + 1, count2);
+ if (len2 <= 1) // len2 == 1 || len2 == 0
+ break outer;
+ }
+ s.copyElement(a, cursor1--, a, dest--);
+ if (--len1 == 0)
+ break outer;
+ minGallop--;
+ } while (count1 >= MIN_GALLOP | count2 >= MIN_GALLOP);
+ if (minGallop < 0)
+ minGallop = 0;
+ minGallop += 2; // Penalize for leaving gallop mode
+ } // End of "outer" loop
+ this.minGallop = minGallop < 1 ? 1 : minGallop; // Write back to field
+
+ if (len2 == 1) {
+ assert len1 > 0;
+ dest -= len1;
+ cursor1 -= len1;
+ s.copyRange(a, cursor1 + 1, a, dest + 1, len1);
+ s.copyElement(tmp, cursor2, a, dest); // Move first elt of run2 to front of merge
+ } else if (len2 == 0) {
+ throw new IllegalArgumentException(
+ "Comparison method violates its general contract!");
+ } else {
+ assert len1 == 0;
+ assert len2 > 0;
+ s.copyRange(tmp, 0, a, dest - (len2 - 1), len2);
+ }
+ }
+
+ /**
+ * Ensures that the external array tmp has at least the specified
+ * number of elements, increasing its size if necessary. The size
+ * increases exponentially to ensure amortized linear time complexity.
+ *
+ * @param minCapacity the minimum required capacity of the tmp array
+ * @return tmp, whether or not it grew
+ */
+ private Buffer ensureCapacity(int minCapacity) {
+ if (tmpLength < minCapacity) {
+ // Compute smallest power of 2 > minCapacity
+ int newSize = minCapacity;
+ newSize |= newSize >> 1;
+ newSize |= newSize >> 2;
+ newSize |= newSize >> 4;
+ newSize |= newSize >> 8;
+ newSize |= newSize >> 16;
+ newSize++;
+
+ if (newSize < 0) // Not bloody likely!
+ newSize = minCapacity;
+ else
+ newSize = Math.min(newSize, aLength >>> 1);
+
+ tmp = s.allocate(newSize);
+ tmpLength = newSize;
+ }
+ return tmp;
+ }
+ }
+}
diff --git a/core/src/main/scala/org/apache/spark/util/collection/AppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/collection/AppendOnlyMap.scala
index 1a6f1c2b55..290282c9c2 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/AppendOnlyMap.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/AppendOnlyMap.scala
@@ -254,26 +254,21 @@ class AppendOnlyMap[K, V](initialCapacity: Int = 64)
* Return an iterator of the map in sorted order. This provides a way to sort the map without
* using additional memory, at the expense of destroying the validity of the map.
*/
- def destructiveSortedIterator(cmp: Comparator[(K, V)]): Iterator[(K, V)] = {
+ def destructiveSortedIterator(keyComparator: Comparator[K]): Iterator[(K, V)] = {
destroyed = true
// Pack KV pairs into the front of the underlying array
var keyIndex, newIndex = 0
while (keyIndex < capacity) {
if (data(2 * keyIndex) != null) {
- data(newIndex) = (data(2 * keyIndex), data(2 * keyIndex + 1))
+ data(2 * newIndex) = data(2 * keyIndex)
+ data(2 * newIndex + 1) = data(2 * keyIndex + 1)
newIndex += 1
}
keyIndex += 1
}
assert(curSize == newIndex + (if (haveNullValue) 1 else 0))
- // Sort by the given ordering
- val rawOrdering = new Comparator[AnyRef] {
- def compare(x: AnyRef, y: AnyRef): Int = {
- cmp.compare(x.asInstanceOf[(K, V)], y.asInstanceOf[(K, V)])
- }
- }
- Arrays.sort(data, 0, newIndex, rawOrdering)
+ new Sorter(new KVArraySortDataFormat[K, AnyRef]).sort(data, 0, newIndex, keyComparator)
new Iterator[(K, V)] {
var i = 0
@@ -284,7 +279,7 @@ class AppendOnlyMap[K, V](initialCapacity: Int = 64)
nullValueReady = false
(null.asInstanceOf[K], nullValue)
} else {
- val item = data(i).asInstanceOf[(K, V)]
+ val item = (data(2 * i).asInstanceOf[K], data(2 * i + 1).asInstanceOf[V])
i += 1
item
}
diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
index 765254bf4c..71ab2a3e3b 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
@@ -30,6 +30,7 @@ import org.apache.spark.{Logging, SparkEnv}
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.serializer.Serializer
import org.apache.spark.storage.{BlockId, BlockManager}
+import org.apache.spark.util.collection.ExternalAppendOnlyMap.HashComparator
/**
* :: DeveloperApi ::
@@ -66,8 +67,6 @@ class ExternalAppendOnlyMap[K, V, C](
blockManager: BlockManager = SparkEnv.get.blockManager)
extends Iterable[(K, C)] with Serializable with Logging {
- import ExternalAppendOnlyMap._
-
private var currentMap = new SizeTrackingAppendOnlyMap[K, C]
private val spilledMaps = new ArrayBuffer[DiskMapIterator]
private val sparkConf = SparkEnv.get.conf
@@ -105,7 +104,7 @@ class ExternalAppendOnlyMap[K, V, C](
private var _diskBytesSpilled = 0L
private val fileBufferSize = sparkConf.getInt("spark.shuffle.file.buffer.kb", 100) * 1024
- private val comparator = new KCComparator[K, C]
+ private val keyComparator = new HashComparator[K]
private val ser = serializer.newInstance()
/**
@@ -173,7 +172,7 @@ class ExternalAppendOnlyMap[K, V, C](
}
try {
- val it = currentMap.destructiveSortedIterator(comparator)
+ val it = currentMap.destructiveSortedIterator(keyComparator)
while (it.hasNext) {
val kv = it.next()
writer.write(kv)
@@ -231,7 +230,7 @@ class ExternalAppendOnlyMap[K, V, C](
// Input streams are derived both from the in-memory map and spilled maps on disk
// The in-memory map is sorted in place, while the spilled maps are already in sorted order
- private val sortedMap = currentMap.destructiveSortedIterator(comparator)
+ private val sortedMap = currentMap.destructiveSortedIterator(keyComparator)
private val inputStreams = (Seq(sortedMap) ++ spilledMaps).map(it => it.buffered)
inputStreams.foreach { it =>
@@ -252,7 +251,7 @@ class ExternalAppendOnlyMap[K, V, C](
if (it.hasNext) {
var kc = it.next()
kcPairs += kc
- val minHash = getKeyHashCode(kc)
+ val minHash = hashKey(kc)
while (it.hasNext && it.head._1.hashCode() == minHash) {
kc = it.next()
kcPairs += kc
@@ -298,7 +297,7 @@ class ExternalAppendOnlyMap[K, V, C](
val minPair = minPairs.remove(0)
val minKey = minPair._1
var minCombiner = minPair._2
- assert(getKeyHashCode(minPair) == minHash)
+ assert(hashKey(minPair) == minHash)
// For all other streams that may have this key (i.e. have the same minimum key hash),
// merge in the corresponding value (if any) from that stream
@@ -339,7 +338,7 @@ class ExternalAppendOnlyMap[K, V, C](
// Invalid if there are no more pairs in this stream
def minKeyHash: Int = {
assert(pairs.length > 0)
- getKeyHashCode(pairs.head)
+ hashKey(pairs.head)
}
override def compareTo(other: StreamBuffer): Int = {
@@ -423,25 +422,27 @@ class ExternalAppendOnlyMap[K, V, C](
file.delete()
}
}
+
+ /** Convenience function to hash the given (K, C) pair by the key. */
+ private def hashKey(kc: (K, C)): Int = ExternalAppendOnlyMap.hash(kc._1)
}
private[spark] object ExternalAppendOnlyMap {
/**
- * Return the key hash code of the given (key, combiner) pair.
- * If the key is null, return a special hash code.
+ * Return the hash code of the given object. If the object is null, return a special hash code.
*/
- private def getKeyHashCode[K, C](kc: (K, C)): Int = {
- if (kc._1 == null) 0 else kc._1.hashCode()
+ private def hash[T](obj: T): Int = {
+ if (obj == null) 0 else obj.hashCode()
}
/**
- * A comparator for (key, combiner) pairs based on their key hash codes.
+ * A comparator which sorts arbitrary keys based on their hash codes.
*/
- private class KCComparator[K, C] extends Comparator[(K, C)] {
- def compare(kc1: (K, C), kc2: (K, C)): Int = {
- val hash1 = getKeyHashCode(kc1)
- val hash2 = getKeyHashCode(kc2)
+ private class HashComparator[K] extends Comparator[K] {
+ def compare(key1: K, key2: K): Int = {
+ val hash1 = hash(key1)
+ val hash2 = hash(key2)
if (hash1 < hash2) -1 else if (hash1 == hash2) 0 else 1
}
}
diff --git a/core/src/main/scala/org/apache/spark/util/collection/SortDataFormat.scala b/core/src/main/scala/org/apache/spark/util/collection/SortDataFormat.scala
new file mode 100644
index 0000000000..ac1528969f
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/util/collection/SortDataFormat.scala
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util.collection
+
+import scala.reflect.ClassTag
+
+/**
+ * Abstraction for sorting an arbitrary input buffer of data. This interface requires determining
+ * the sort key for a given element index, as well as swapping elements and moving data from one
+ * buffer to another.
+ *
+ * Example format: an array of numbers, where each element is also the key.
+ * See [[KVArraySortDataFormat]] for a more exciting format.
+ *
+ * This trait extends Any to ensure it is universal (and thus compiled to a Java interface).
+ *
+ * @tparam K Type of the sort key of each element
+ * @tparam Buffer Internal data structure used by a particular format (e.g., Array[Int]).
+ */
+// TODO: Making Buffer a real trait would be a better abstraction, but adds some complexity.
+private[spark] trait SortDataFormat[K, Buffer] extends Any {
+ /** Return the sort key for the element at the given index. */
+ protected def getKey(data: Buffer, pos: Int): K
+
+ /** Swap two elements. */
+ protected def swap(data: Buffer, pos0: Int, pos1: Int): Unit
+
+ /** Copy a single element from src(srcPos) to dst(dstPos). */
+ protected def copyElement(src: Buffer, srcPos: Int, dst: Buffer, dstPos: Int): Unit
+
+ /**
+ * Copy a range of elements starting at src(srcPos) to dst, starting at dstPos.
+ * Overlapping ranges are allowed.
+ */
+ protected def copyRange(src: Buffer, srcPos: Int, dst: Buffer, dstPos: Int, length: Int): Unit
+
+ /**
+ * Allocates a Buffer that can hold up to 'length' elements.
+ * All elements of the buffer should be considered invalid until data is explicitly copied in.
+ */
+ protected def allocate(length: Int): Buffer
+}
+
+/**
+ * Supports sorting an array of key-value pairs where the elements of the array alternate between
+ * keys and values, as used in [[AppendOnlyMap]].
+ *
+ * @tparam K Type of the sort key of each element
+ * @tparam T Type of the Array we're sorting. Typically this must extend AnyRef, to support cases
+ * when the keys and values are not the same type.
+ */
+private[spark]
+class KVArraySortDataFormat[K, T <: AnyRef : ClassTag] extends SortDataFormat[K, Array[T]] {
+
+ override protected def getKey(data: Array[T], pos: Int): K = data(2 * pos).asInstanceOf[K]
+
+ override protected def swap(data: Array[T], pos0: Int, pos1: Int) {
+ val tmpKey = data(2 * pos0)
+ val tmpVal = data(2 * pos0 + 1)
+ data(2 * pos0) = data(2 * pos1)
+ data(2 * pos0 + 1) = data(2 * pos1 + 1)
+ data(2 * pos1) = tmpKey
+ data(2 * pos1 + 1) = tmpVal
+ }
+
+ override protected def copyElement(src: Array[T], srcPos: Int, dst: Array[T], dstPos: Int) {
+ dst(2 * dstPos) = src(2 * srcPos)
+ dst(2 * dstPos + 1) = src(2 * srcPos + 1)
+ }
+
+ override protected def copyRange(src: Array[T], srcPos: Int,
+ dst: Array[T], dstPos: Int, length: Int) {
+ System.arraycopy(src, 2 * srcPos, dst, 2 * dstPos, 2 * length)
+ }
+
+ override protected def allocate(length: Int): Array[T] = {
+ new Array[T](2 * length)
+ }
+}
diff --git a/core/src/test/scala/org/apache/spark/util/collection/AppendOnlyMapSuite.scala b/core/src/test/scala/org/apache/spark/util/collection/AppendOnlyMapSuite.scala
index 52c7288e18..cb99d14b27 100644
--- a/core/src/test/scala/org/apache/spark/util/collection/AppendOnlyMapSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/collection/AppendOnlyMapSuite.scala
@@ -170,10 +170,10 @@ class AppendOnlyMapSuite extends FunSuite {
case e: IllegalStateException => fail()
}
- val it = map.destructiveSortedIterator(new Comparator[(String, String)] {
- def compare(kv1: (String, String), kv2: (String, String)): Int = {
- val x = if (kv1 != null && kv1._1 != null) kv1._1.toInt else Int.MinValue
- val y = if (kv2 != null && kv2._1 != null) kv2._1.toInt else Int.MinValue
+ val it = map.destructiveSortedIterator(new Comparator[String] {
+ def compare(key1: String, key2: String): Int = {
+ val x = if (key1 != null) key1.toInt else Int.MinValue
+ val y = if (key2 != null) key2.toInt else Int.MinValue
x.compareTo(y)
}
})
diff --git a/core/src/test/scala/org/apache/spark/util/collection/SorterSuite.scala b/core/src/test/scala/org/apache/spark/util/collection/SorterSuite.scala
new file mode 100644
index 0000000000..6fe1079c27
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/util/collection/SorterSuite.scala
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util.collection
+
+import java.lang.{Float => JFloat}
+import java.util.{Arrays, Comparator}
+
+import org.scalatest.FunSuite
+
+import org.apache.spark.util.random.XORShiftRandom
+
+class SorterSuite extends FunSuite {
+
+ test("equivalent to Arrays.sort") {
+ val rand = new XORShiftRandom(123)
+ val data0 = Array.tabulate[Int](10000) { i => rand.nextInt() }
+ val data1 = data0.clone()
+
+ Arrays.sort(data0)
+ new Sorter(new IntArraySortDataFormat).sort(data1, 0, data1.length, Ordering.Int)
+
+ data0.zip(data1).foreach { case (x, y) => assert(x === y) }
+ }
+
+ test("KVArraySorter") {
+ val rand = new XORShiftRandom(456)
+
+ // Construct an array of keys (to Java sort) and an array where the keys and values
+ // alternate. Keys are random doubles, values are ordinals from 0 to length.
+ val keys = Array.tabulate[Double](5000) { i => rand.nextDouble() }
+ val keyValueArray = Array.tabulate[Number](10000) { i =>
+ if (i % 2 == 0) keys(i / 2) else new Integer(i / 2)
+ }
+
+ // Map from generated keys to values, to verify correctness later
+ val kvMap =
+ keyValueArray.grouped(2).map { case Array(k, v) => k.doubleValue() -> v.intValue() }.toMap
+
+ Arrays.sort(keys)
+ new Sorter(new KVArraySortDataFormat[Double, Number])
+ .sort(keyValueArray, 0, keys.length, Ordering.Double)
+
+ keys.zipWithIndex.foreach { case (k, i) =>
+ assert(k === keyValueArray(2 * i))
+ assert(kvMap(k) === keyValueArray(2 * i + 1))
+ }
+ }
+
+ /**
+ * This provides a simple benchmark for comparing the Sorter with Java internal sorting.
+ * Ideally these would be executed one at a time, each in their own JVM, so their listing
+ * here is mainly to have the code.
+ *
+ * The goal of this code is to sort an array of key-value pairs, where the array physically
+ * has the keys and values alternating. The basic Java sorts work only on the keys, so the
+ * real Java solution is to make Tuple2s to store the keys and values and sort an array of
+ * those, while the Sorter approach can work directly on the input data format.
+ *
+ * Note that the Java implementation varies tremendously between Java 6 and Java 7, when
+ * the Java sort changed from merge sort to Timsort.
+ */
+ ignore("Sorter benchmark") {
+
+ /** Runs an experiment several times. */
+ def runExperiment(name: String)(f: => Unit): Unit = {
+ val firstTry = org.apache.spark.util.Utils.timeIt(1)(f)
+ System.gc()
+
+ var i = 0
+ var next10: Long = 0
+ while (i < 10) {
+ val time = org.apache.spark.util.Utils.timeIt(1)(f)
+ next10 += time
+ println(s"$name: Took $time ms")
+ i += 1
+ }
+
+ println(s"$name: ($firstTry ms first try, ${next10 / 10} ms average)")
+ }
+
+ val numElements = 25000000 // 25 mil
+ val rand = new XORShiftRandom(123)
+
+ val keys = Array.tabulate[JFloat](numElements) { i =>
+ new JFloat(rand.nextFloat())
+ }
+
+ // Test our key-value pairs where each element is a Tuple2[Float, Integer)
+ val kvTupleArray = Array.tabulate[AnyRef](numElements) { i =>
+ (keys(i / 2): Float, i / 2: Int)
+ }
+ runExperiment("Tuple-sort using Arrays.sort()") {
+ Arrays.sort(kvTupleArray, new Comparator[AnyRef] {
+ override def compare(x: AnyRef, y: AnyRef): Int =
+ Ordering.Float.compare(x.asInstanceOf[(Float, _)]._1, y.asInstanceOf[(Float, _)]._1)
+ })
+ }
+
+ // Test our Sorter where each element alternates between Float and Integer, non-primitive
+ val keyValueArray = Array.tabulate[AnyRef](numElements * 2) { i =>
+ if (i % 2 == 0) keys(i / 2) else new Integer(i / 2)
+ }
+ val sorter = new Sorter(new KVArraySortDataFormat[JFloat, AnyRef])
+ runExperiment("KV-sort using Sorter") {
+ sorter.sort(keyValueArray, 0, keys.length, new Comparator[JFloat] {
+ override def compare(x: JFloat, y: JFloat): Int = Ordering.Float.compare(x, y)
+ })
+ }
+
+ // Test non-primitive sort on float array
+ runExperiment("Java Arrays.sort()") {
+ Arrays.sort(keys, new Comparator[JFloat] {
+ override def compare(x: JFloat, y: JFloat): Int = Ordering.Float.compare(x, y)
+ })
+ }
+
+ // Test primitive sort on float array
+ val primitiveKeys = Array.tabulate[Float](numElements) { i => rand.nextFloat() }
+ runExperiment("Java Arrays.sort() on primitive keys") {
+ Arrays.sort(primitiveKeys)
+ }
+ }
+}
+
+
+/** Format to sort a simple Array[Int]. Could be easily generified and specialized. */
+class IntArraySortDataFormat extends SortDataFormat[Int, Array[Int]] {
+ override protected def getKey(data: Array[Int], pos: Int): Int = {
+ data(pos)
+ }
+
+ override protected def swap(data: Array[Int], pos0: Int, pos1: Int): Unit = {
+ val tmp = data(pos0)
+ data(pos0) = data(pos1)
+ data(pos1) = tmp
+ }
+
+ override protected def copyElement(src: Array[Int], srcPos: Int, dst: Array[Int], dstPos: Int) {
+ dst(dstPos) = src(srcPos)
+ }
+
+ /** Copy a range of elements starting at src(srcPos) to dest, starting at destPos. */
+ override protected def copyRange(src: Array[Int], srcPos: Int,
+ dst: Array[Int], dstPos: Int, length: Int) {
+ System.arraycopy(src, srcPos, dst, dstPos, length)
+ }
+
+ /** Allocates a new structure that can hold up to 'length' elements. */
+ override protected def allocate(length: Int): Array[Int] = {
+ new Array[Int](length)
+ }
+}