aboutsummaryrefslogtreecommitdiff
path: root/common
diff options
context:
space:
mode:
authorCheng Lian <lian@databricks.com>2016-01-29 12:01:13 -0800
committerCheng Lian <lian@databricks.com>2016-01-29 12:01:13 -0800
commit2b027e9a386fe4009f61ad03b169335af5a9a5c6 (patch)
treedb810e14454273a97bc849fc1f23bd452767433c /common
parent5f686cc8b74ea9e36f56c31f14df90d134fd9343 (diff)
downloadspark-2b027e9a386fe4009f61ad03b169335af5a9a5c6.tar.gz
spark-2b027e9a386fe4009f61ad03b169335af5a9a5c6.tar.bz2
spark-2b027e9a386fe4009f61ad03b169335af5a9a5c6.zip
[SPARK-12818] Polishes spark-sketch module
Fixes various minor code and Javadoc styling issues. Author: Cheng Lian <lian@databricks.com> Closes #10985 from liancheng/sketch-polishing.
Diffstat (limited to 'common')
-rw-r--r--common/sketch/src/main/java/org/apache/spark/util/sketch/BitArray.java2
-rw-r--r--common/sketch/src/main/java/org/apache/spark/util/sketch/BloomFilter.java111
-rw-r--r--common/sketch/src/main/java/org/apache/spark/util/sketch/BloomFilterImpl.java40
-rw-r--r--common/sketch/src/main/java/org/apache/spark/util/sketch/CountMinSketch.java26
-rw-r--r--common/sketch/src/main/java/org/apache/spark/util/sketch/CountMinSketchImpl.java12
-rw-r--r--common/sketch/src/main/java/org/apache/spark/util/sketch/Utils.java2
6 files changed, 110 insertions, 83 deletions
diff --git a/common/sketch/src/main/java/org/apache/spark/util/sketch/BitArray.java b/common/sketch/src/main/java/org/apache/spark/util/sketch/BitArray.java
index 2a0484e324..480a0a79db 100644
--- a/common/sketch/src/main/java/org/apache/spark/util/sketch/BitArray.java
+++ b/common/sketch/src/main/java/org/apache/spark/util/sketch/BitArray.java
@@ -22,7 +22,7 @@ import java.io.DataOutputStream;
import java.io.IOException;
import java.util.Arrays;
-public final class BitArray {
+final class BitArray {
private final long[] data;
private long bitCount;
diff --git a/common/sketch/src/main/java/org/apache/spark/util/sketch/BloomFilter.java b/common/sketch/src/main/java/org/apache/spark/util/sketch/BloomFilter.java
index 81772fcea0..c0b425e729 100644
--- a/common/sketch/src/main/java/org/apache/spark/util/sketch/BloomFilter.java
+++ b/common/sketch/src/main/java/org/apache/spark/util/sketch/BloomFilter.java
@@ -22,16 +22,10 @@ import java.io.InputStream;
import java.io.OutputStream;
/**
- * A Bloom filter is a space-efficient probabilistic data structure, that is used to test whether
- * an element is a member of a set. It returns false when the element is definitely not in the
- * set, returns true when the element is probably in the set.
- *
- * Internally a Bloom filter is initialized with 2 information: how many space to use(number of
- * bits) and how many hash values to calculate for each record. To get as lower false positive
- * probability as possible, user should call {@link BloomFilter#create} to automatically pick a
- * best combination of these 2 parameters.
- *
- * Currently the following data types are supported:
+ * A Bloom filter is a space-efficient probabilistic data structure that offers an approximate
+ * containment test with one-sided error: if it claims that an item is contained in it, this
+ * might be in error, but if it claims that an item is <i>not</i> contained in it, then this is
+ * definitely true. Currently supported data types include:
* <ul>
* <li>{@link Byte}</li>
* <li>{@link Short}</li>
@@ -39,14 +33,17 @@ import java.io.OutputStream;
* <li>{@link Long}</li>
* <li>{@link String}</li>
* </ul>
+ * The false positive probability ({@code FPP}) of a Bloom filter is defined as the probability that
+ * {@linkplain #mightContain(Object)} will erroneously return {@code true} for an object that hasu
+ * not actually been put in the {@code BloomFilter}.
*
- * The implementation is largely based on the {@code BloomFilter} class from guava.
+ * The implementation is largely based on the {@code BloomFilter} class from Guava.
*/
public abstract class BloomFilter {
public enum Version {
/**
- * {@code BloomFilter} binary format version 1 (all values written in big-endian order):
+ * {@code BloomFilter} binary format version 1. All values written in big-endian order:
* <ul>
* <li>Version number, always 1 (32 bit)</li>
* <li>Number of hash functions (32 bit)</li>
@@ -68,14 +65,13 @@ public abstract class BloomFilter {
}
/**
- * Returns the false positive probability, i.e. the probability that
- * {@linkplain #mightContain(Object)} will erroneously return {@code true} for an object that
- * has not actually been put in the {@code BloomFilter}.
+ * Returns the probability that {@linkplain #mightContain(Object)} erroneously return {@code true}
+ * for an object that has not actually been put in the {@code BloomFilter}.
*
- * <p>Ideally, this number should be close to the {@code fpp} parameter
- * passed in to create this bloom filter, or smaller. If it is
- * significantly higher, it is usually the case that too many elements (more than
- * expected) have been put in the {@code BloomFilter}, degenerating it.
+ * Ideally, this number should be close to the {@code fpp} parameter passed in
+ * {@linkplain #create(long, double)}, or smaller. If it is significantly higher, it is usually
+ * the case that too many items (more than expected) have been put in the {@code BloomFilter},
+ * degenerating it.
*/
public abstract double expectedFpp();
@@ -85,8 +81,8 @@ public abstract class BloomFilter {
public abstract long bitSize();
/**
- * Puts an element into this {@code BloomFilter}. Ensures that subsequent invocations of
- * {@link #mightContain(Object)} with the same element will always return {@code true}.
+ * Puts an item into this {@code BloomFilter}. Ensures that subsequent invocations of
+ * {@linkplain #mightContain(Object)} with the same item will always return {@code true}.
*
* @return true if the bloom filter's bits changed as a result of this operation. If the bits
* changed, this is <i>definitely</i> the first time {@code object} has been added to the
@@ -98,19 +94,19 @@ public abstract class BloomFilter {
public abstract boolean put(Object item);
/**
- * A specialized variant of {@link #put(Object)}, that can only be used to put utf-8 string.
+ * A specialized variant of {@link #put(Object)} that only supports {@code String} items.
*/
- public abstract boolean putString(String str);
+ public abstract boolean putString(String item);
/**
- * A specialized variant of {@link #put(Object)}, that can only be used to put long.
+ * A specialized variant of {@link #put(Object)} that only supports {@code long} items.
*/
- public abstract boolean putLong(long l);
+ public abstract boolean putLong(long item);
/**
- * A specialized variant of {@link #put(Object)}, that can only be used to put byte array.
+ * A specialized variant of {@link #put(Object)} that only supports byte array items.
*/
- public abstract boolean putBinary(byte[] bytes);
+ public abstract boolean putBinary(byte[] item);
/**
* Determines whether a given bloom filter is compatible with this bloom filter. For two
@@ -137,38 +133,36 @@ public abstract class BloomFilter {
public abstract boolean mightContain(Object item);
/**
- * A specialized variant of {@link #mightContain(Object)}, that can only be used to test utf-8
- * string.
+ * A specialized variant of {@link #mightContain(Object)} that only tests {@code String} items.
*/
- public abstract boolean mightContainString(String str);
+ public abstract boolean mightContainString(String item);
/**
- * A specialized variant of {@link #mightContain(Object)}, that can only be used to test long.
+ * A specialized variant of {@link #mightContain(Object)} that only tests {@code long} items.
*/
- public abstract boolean mightContainLong(long l);
+ public abstract boolean mightContainLong(long item);
/**
- * A specialized variant of {@link #mightContain(Object)}, that can only be used to test byte
- * array.
+ * A specialized variant of {@link #mightContain(Object)} that only tests byte array items.
*/
- public abstract boolean mightContainBinary(byte[] bytes);
+ public abstract boolean mightContainBinary(byte[] item);
/**
- * Writes out this {@link BloomFilter} to an output stream in binary format.
- * It is the caller's responsibility to close the stream.
+ * Writes out this {@link BloomFilter} to an output stream in binary format. It is the caller's
+ * responsibility to close the stream.
*/
public abstract void writeTo(OutputStream out) throws IOException;
/**
- * Reads in a {@link BloomFilter} from an input stream.
- * It is the caller's responsibility to close the stream.
+ * Reads in a {@link BloomFilter} from an input stream. It is the caller's responsibility to close
+ * the stream.
*/
public static BloomFilter readFrom(InputStream in) throws IOException {
return BloomFilterImpl.readFrom(in);
}
/**
- * Computes the optimal k (number of hashes per element inserted in Bloom filter), given the
+ * Computes the optimal k (number of hashes per item inserted in Bloom filter), given the
* expected insertions and total number of bits in the Bloom filter.
*
* See http://en.wikipedia.org/wiki/File:Bloom_filter_fp_probability.svg for the formula.
@@ -197,21 +191,31 @@ public abstract class BloomFilter {
static final double DEFAULT_FPP = 0.03;
/**
- * Creates a {@link BloomFilter} with given {@code expectedNumItems} and the default {@code fpp}.
+ * Creates a {@link BloomFilter} with the expected number of insertions and a default expected
+ * false positive probability of 3%.
+ *
+ * Note that overflowing a {@code BloomFilter} with significantly more elements than specified,
+ * will result in its saturation, and a sharp deterioration of its false positive probability.
*/
public static BloomFilter create(long expectedNumItems) {
return create(expectedNumItems, DEFAULT_FPP);
}
/**
- * Creates a {@link BloomFilter} with given {@code expectedNumItems} and {@code fpp}, it will pick
- * an optimal {@code numBits} and {@code numHashFunctions} for the bloom filter.
+ * Creates a {@link BloomFilter} with the expected number of insertions and expected false
+ * positive probability.
+ *
+ * Note that overflowing a {@code BloomFilter} with significantly more elements than specified,
+ * will result in its saturation, and a sharp deterioration of its false positive probability.
*/
public static BloomFilter create(long expectedNumItems, double fpp) {
- assert fpp > 0.0 : "False positive probability must be > 0.0";
- assert fpp < 1.0 : "False positive probability must be < 1.0";
- long numBits = optimalNumOfBits(expectedNumItems, fpp);
- return create(expectedNumItems, numBits);
+ if (fpp <= 0D || fpp >= 1D) {
+ throw new IllegalArgumentException(
+ "False positive probability must be within range (0.0, 1.0)"
+ );
+ }
+
+ return create(expectedNumItems, optimalNumOfBits(expectedNumItems, fpp));
}
/**
@@ -219,9 +223,14 @@ public abstract class BloomFilter {
* pick an optimal {@code numHashFunctions} which can minimize {@code fpp} for the bloom filter.
*/
public static BloomFilter create(long expectedNumItems, long numBits) {
- assert expectedNumItems > 0 : "Expected insertions must be > 0";
- assert numBits > 0 : "number of bits must be > 0";
- int numHashFunctions = optimalNumOfHashFunctions(expectedNumItems, numBits);
- return new BloomFilterImpl(numHashFunctions, numBits);
+ if (expectedNumItems <= 0) {
+ throw new IllegalArgumentException("Expected insertions must be positive");
+ }
+
+ if (numBits <= 0) {
+ throw new IllegalArgumentException("Number of bits must be positive");
+ }
+
+ return new BloomFilterImpl(optimalNumOfHashFunctions(expectedNumItems, numBits), numBits);
}
}
diff --git a/common/sketch/src/main/java/org/apache/spark/util/sketch/BloomFilterImpl.java b/common/sketch/src/main/java/org/apache/spark/util/sketch/BloomFilterImpl.java
index 35107e0b38..92c28bcb56 100644
--- a/common/sketch/src/main/java/org/apache/spark/util/sketch/BloomFilterImpl.java
+++ b/common/sketch/src/main/java/org/apache/spark/util/sketch/BloomFilterImpl.java
@@ -19,9 +19,10 @@ package org.apache.spark.util.sketch;
import java.io.*;
-public class BloomFilterImpl extends BloomFilter implements Serializable {
+class BloomFilterImpl extends BloomFilter implements Serializable {
private int numHashFunctions;
+
private BitArray bits;
BloomFilterImpl(int numHashFunctions, long numBits) {
@@ -77,14 +78,14 @@ public class BloomFilterImpl extends BloomFilter implements Serializable {
}
@Override
- public boolean putString(String str) {
- return putBinary(Utils.getBytesFromUTF8String(str));
+ public boolean putString(String item) {
+ return putBinary(Utils.getBytesFromUTF8String(item));
}
@Override
- public boolean putBinary(byte[] bytes) {
- int h1 = Murmur3_x86_32.hashUnsafeBytes(bytes, Platform.BYTE_ARRAY_OFFSET, bytes.length, 0);
- int h2 = Murmur3_x86_32.hashUnsafeBytes(bytes, Platform.BYTE_ARRAY_OFFSET, bytes.length, h1);
+ public boolean putBinary(byte[] item) {
+ int h1 = Murmur3_x86_32.hashUnsafeBytes(item, Platform.BYTE_ARRAY_OFFSET, item.length, 0);
+ int h2 = Murmur3_x86_32.hashUnsafeBytes(item, Platform.BYTE_ARRAY_OFFSET, item.length, h1);
long bitSize = bits.bitSize();
boolean bitsChanged = false;
@@ -100,14 +101,14 @@ public class BloomFilterImpl extends BloomFilter implements Serializable {
}
@Override
- public boolean mightContainString(String str) {
- return mightContainBinary(Utils.getBytesFromUTF8String(str));
+ public boolean mightContainString(String item) {
+ return mightContainBinary(Utils.getBytesFromUTF8String(item));
}
@Override
- public boolean mightContainBinary(byte[] bytes) {
- int h1 = Murmur3_x86_32.hashUnsafeBytes(bytes, Platform.BYTE_ARRAY_OFFSET, bytes.length, 0);
- int h2 = Murmur3_x86_32.hashUnsafeBytes(bytes, Platform.BYTE_ARRAY_OFFSET, bytes.length, h1);
+ public boolean mightContainBinary(byte[] item) {
+ int h1 = Murmur3_x86_32.hashUnsafeBytes(item, Platform.BYTE_ARRAY_OFFSET, item.length, 0);
+ int h2 = Murmur3_x86_32.hashUnsafeBytes(item, Platform.BYTE_ARRAY_OFFSET, item.length, h1);
long bitSize = bits.bitSize();
for (int i = 1; i <= numHashFunctions; i++) {
@@ -124,14 +125,14 @@ public class BloomFilterImpl extends BloomFilter implements Serializable {
}
@Override
- public boolean putLong(long l) {
+ public boolean putLong(long item) {
// Here we first hash the input long element into 2 int hash values, h1 and h2, then produce n
// hash values by `h1 + i * h2` with 1 <= i <= numHashFunctions.
// Note that `CountMinSketch` use a different strategy, it hash the input long element with
// every i to produce n hash values.
// TODO: the strategy of `CountMinSketch` looks more advanced, should we follow it here?
- int h1 = Murmur3_x86_32.hashLong(l, 0);
- int h2 = Murmur3_x86_32.hashLong(l, h1);
+ int h1 = Murmur3_x86_32.hashLong(item, 0);
+ int h2 = Murmur3_x86_32.hashLong(item, h1);
long bitSize = bits.bitSize();
boolean bitsChanged = false;
@@ -147,9 +148,9 @@ public class BloomFilterImpl extends BloomFilter implements Serializable {
}
@Override
- public boolean mightContainLong(long l) {
- int h1 = Murmur3_x86_32.hashLong(l, 0);
- int h2 = Murmur3_x86_32.hashLong(l, h1);
+ public boolean mightContainLong(long item) {
+ int h1 = Murmur3_x86_32.hashLong(item, 0);
+ int h2 = Murmur3_x86_32.hashLong(item, h1);
long bitSize = bits.bitSize();
for (int i = 1; i <= numHashFunctions; i++) {
@@ -197,7 +198,7 @@ public class BloomFilterImpl extends BloomFilter implements Serializable {
throw new IncompatibleMergeException("Cannot merge null bloom filter");
}
- if (!(other instanceof BloomFilter)) {
+ if (!(other instanceof BloomFilterImpl)) {
throw new IncompatibleMergeException(
"Cannot merge bloom filter of class " + other.getClass().getName()
);
@@ -211,7 +212,8 @@ public class BloomFilterImpl extends BloomFilter implements Serializable {
if (this.numHashFunctions != that.numHashFunctions) {
throw new IncompatibleMergeException(
- "Cannot merge bloom filters with different number of hash functions");
+ "Cannot merge bloom filters with different number of hash functions"
+ );
}
this.bits.putAll(that.bits);
diff --git a/common/sketch/src/main/java/org/apache/spark/util/sketch/CountMinSketch.java b/common/sketch/src/main/java/org/apache/spark/util/sketch/CountMinSketch.java
index f0aac5bb00..48f98680f4 100644
--- a/common/sketch/src/main/java/org/apache/spark/util/sketch/CountMinSketch.java
+++ b/common/sketch/src/main/java/org/apache/spark/util/sketch/CountMinSketch.java
@@ -22,7 +22,7 @@ import java.io.InputStream;
import java.io.OutputStream;
/**
- * A Count-Min sketch is a probabilistic data structure used for summarizing streams of data in
+ * A Count-min sketch is a probabilistic data structure used for summarizing streams of data in
* sub-linear space. Currently, supported data types include:
* <ul>
* <li>{@link Byte}</li>
@@ -31,8 +31,7 @@ import java.io.OutputStream;
* <li>{@link Long}</li>
* <li>{@link String}</li>
* </ul>
- * Each {@link CountMinSketch} is initialized with a random seed, and a pair
- * of parameters:
+ * A {@link CountMinSketch} is initialized with a random seed, and a pair of parameters:
* <ol>
* <li>relative error (or {@code eps}), and
* <li>confidence (or {@code delta})
@@ -49,16 +48,13 @@ import java.io.OutputStream;
* <li>{@code w = ceil(-log(1 - confidence) / log(2))}</li>
* </ul>
*
- * See http://www.eecs.harvard.edu/~michaelm/CS222/countmin.pdf for technical details,
- * including proofs of the estimates and error bounds used in this implementation.
- *
* This implementation is largely based on the {@code CountMinSketch} class from stream-lib.
*/
abstract public class CountMinSketch {
public enum Version {
/**
- * {@code CountMinSketch} binary format version 1 (all values written in big-endian order):
+ * {@code CountMinSketch} binary format version 1. All values written in big-endian order:
* <ul>
* <li>Version number, always 1 (32 bit)</li>
* <li>Total count of added items (64 bit)</li>
@@ -172,14 +168,14 @@ abstract public class CountMinSketch {
throws IncompatibleMergeException;
/**
- * Writes out this {@link CountMinSketch} to an output stream in binary format.
- * It is the caller's responsibility to close the stream.
+ * Writes out this {@link CountMinSketch} to an output stream in binary format. It is the caller's
+ * responsibility to close the stream.
*/
public abstract void writeTo(OutputStream out) throws IOException;
/**
- * Reads in a {@link CountMinSketch} from an input stream.
- * It is the caller's responsibility to close the stream.
+ * Reads in a {@link CountMinSketch} from an input stream. It is the caller's responsibility to
+ * close the stream.
*/
public static CountMinSketch readFrom(InputStream in) throws IOException {
return CountMinSketchImpl.readFrom(in);
@@ -188,6 +184,10 @@ abstract public class CountMinSketch {
/**
* Creates a {@link CountMinSketch} with given {@code depth}, {@code width}, and random
* {@code seed}.
+ *
+ * @param depth depth of the Count-min Sketch, must be positive
+ * @param width width of the Count-min Sketch, must be positive
+ * @param seed random seed
*/
public static CountMinSketch create(int depth, int width, int seed) {
return new CountMinSketchImpl(depth, width, seed);
@@ -196,6 +196,10 @@ abstract public class CountMinSketch {
/**
* Creates a {@link CountMinSketch} with given relative error ({@code eps}), {@code confidence},
* and random {@code seed}.
+ *
+ * @param eps relative error, must be positive
+ * @param confidence confidence, must be positive and less than 1.0
+ * @param seed random seed
*/
public static CountMinSketch create(double eps, double confidence, int seed) {
return new CountMinSketchImpl(eps, confidence, seed);
diff --git a/common/sketch/src/main/java/org/apache/spark/util/sketch/CountMinSketchImpl.java b/common/sketch/src/main/java/org/apache/spark/util/sketch/CountMinSketchImpl.java
index c0631c6778..2acbb247b1 100644
--- a/common/sketch/src/main/java/org/apache/spark/util/sketch/CountMinSketchImpl.java
+++ b/common/sketch/src/main/java/org/apache/spark/util/sketch/CountMinSketchImpl.java
@@ -42,6 +42,10 @@ class CountMinSketchImpl extends CountMinSketch implements Serializable {
private CountMinSketchImpl() {}
CountMinSketchImpl(int depth, int width, int seed) {
+ if (depth <= 0 || width <= 0) {
+ throw new IllegalArgumentException("Depth and width must be both positive");
+ }
+
this.depth = depth;
this.width = width;
this.eps = 2.0 / width;
@@ -50,6 +54,14 @@ class CountMinSketchImpl extends CountMinSketch implements Serializable {
}
CountMinSketchImpl(double eps, double confidence, int seed) {
+ if (eps <= 0D) {
+ throw new IllegalArgumentException("Relative error must be positive");
+ }
+
+ if (confidence <= 0D || confidence >= 1D) {
+ throw new IllegalArgumentException("Confidence must be within range (0.0, 1.0)");
+ }
+
// 2/w = eps ; w = 2/eps
// 1/2^depth <= 1-confidence ; depth >= -log2 (1-confidence)
this.eps = eps;
diff --git a/common/sketch/src/main/java/org/apache/spark/util/sketch/Utils.java b/common/sketch/src/main/java/org/apache/spark/util/sketch/Utils.java
index a6b3331303..feb601d44f 100644
--- a/common/sketch/src/main/java/org/apache/spark/util/sketch/Utils.java
+++ b/common/sketch/src/main/java/org/apache/spark/util/sketch/Utils.java
@@ -19,7 +19,7 @@ package org.apache.spark.util.sketch;
import java.io.UnsupportedEncodingException;
-public class Utils {
+class Utils {
public static byte[] getBytesFromUTF8String(String str) {
try {
return str.getBytes("utf-8");