aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--conf/metrics.properties.template8
-rw-r--r--core/pom.xml4
-rw-r--r--core/src/main/scala/org/apache/spark/metrics/sink/GraphiteSink.scala82
-rw-r--r--core/src/main/scala/org/apache/spark/rdd/RDD.scala16
-rw-r--r--core/src/main/scala/org/apache/spark/rdd/ZippedPartitionsRDD.scala21
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala9
-rw-r--r--core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala2
-rw-r--r--core/src/main/scala/org/apache/spark/util/AppendOnlyMap.scala93
-rw-r--r--core/src/main/scala/org/apache/spark/util/Utils.scala24
-rw-r--r--core/src/main/scala/org/apache/spark/util/XORShiftRandom.scala94
-rw-r--r--core/src/test/scala/org/apache/spark/util/XORShiftRandomSuite.scala76
-rw-r--r--docs/monitoring.md1
-rw-r--r--docs/tuning.md2
-rw-r--r--mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala11
-rw-r--r--pom.xml5
-rw-r--r--project/SparkBuild.scala3
16 files changed, 388 insertions, 63 deletions
diff --git a/conf/metrics.properties.template b/conf/metrics.properties.template
index ae10f615d1..1c3d94e1b0 100644
--- a/conf/metrics.properties.template
+++ b/conf/metrics.properties.template
@@ -80,6 +80,14 @@
# /metrics/aplications/json # App information
# /metrics/master/json # Master information
+# org.apache.spark.metrics.sink.GraphiteSink
+# Name: Default: Description:
+# host NONE Hostname of Graphite server
+# port NONE Port of Graphite server
+# period 10 Poll period
+# unit seconds Units of poll period
+# prefix EMPTY STRING Prefix to prepend to metric name
+
## Examples
# Enable JmxSink for all instances by class name
#*.sink.jmx.class=org.apache.spark.metrics.sink.JmxSink
diff --git a/core/pom.xml b/core/pom.xml
index 468dd71249..e2033c9912 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -151,6 +151,10 @@
<artifactId>metrics-ganglia</artifactId>
</dependency>
<dependency>
+ <groupId>com.codahale.metrics</groupId>
+ <artifactId>metrics-graphite</artifactId>
+ </dependency>
+ <dependency>
<groupId>org.apache.derby</groupId>
<artifactId>derby</artifactId>
<scope>test</scope>
diff --git a/core/src/main/scala/org/apache/spark/metrics/sink/GraphiteSink.scala b/core/src/main/scala/org/apache/spark/metrics/sink/GraphiteSink.scala
new file mode 100644
index 0000000000..cdcfec8ca7
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/metrics/sink/GraphiteSink.scala
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.metrics.sink
+
+import java.util.Properties
+import java.util.concurrent.TimeUnit
+import java.net.InetSocketAddress
+
+import com.codahale.metrics.MetricRegistry
+import com.codahale.metrics.graphite.{GraphiteReporter, Graphite}
+
+import org.apache.spark.metrics.MetricsSystem
+
+class GraphiteSink(val property: Properties, val registry: MetricRegistry) extends Sink {
+ val GRAPHITE_DEFAULT_PERIOD = 10
+ val GRAPHITE_DEFAULT_UNIT = "SECONDS"
+ val GRAPHITE_DEFAULT_PREFIX = ""
+
+ val GRAPHITE_KEY_HOST = "host"
+ val GRAPHITE_KEY_PORT = "port"
+ val GRAPHITE_KEY_PERIOD = "period"
+ val GRAPHITE_KEY_UNIT = "unit"
+ val GRAPHITE_KEY_PREFIX = "prefix"
+
+ def propertyToOption(prop: String) = Option(property.getProperty(prop))
+
+ if (!propertyToOption(GRAPHITE_KEY_HOST).isDefined) {
+ throw new Exception("Graphite sink requires 'host' property.")
+ }
+
+ if (!propertyToOption(GRAPHITE_KEY_PORT).isDefined) {
+ throw new Exception("Graphite sink requires 'port' property.")
+ }
+
+ val host = propertyToOption(GRAPHITE_KEY_HOST).get
+ val port = propertyToOption(GRAPHITE_KEY_PORT).get.toInt
+
+ val pollPeriod = propertyToOption(GRAPHITE_KEY_PERIOD) match {
+ case Some(s) => s.toInt
+ case None => GRAPHITE_DEFAULT_PERIOD
+ }
+
+ val pollUnit = propertyToOption(GRAPHITE_KEY_UNIT) match {
+ case Some(s) => TimeUnit.valueOf(s.toUpperCase())
+ case None => TimeUnit.valueOf(GRAPHITE_DEFAULT_UNIT)
+ }
+
+ val prefix = propertyToOption(GRAPHITE_KEY_PREFIX).getOrElse(GRAPHITE_DEFAULT_PREFIX)
+
+ MetricsSystem.checkMinimalPollingPeriod(pollUnit, pollPeriod)
+
+ val graphite: Graphite = new Graphite(new InetSocketAddress(host, port))
+
+ val reporter: GraphiteReporter = GraphiteReporter.forRegistry(registry)
+ .convertDurationsTo(TimeUnit.MILLISECONDS)
+ .convertRatesTo(TimeUnit.SECONDS)
+ .prefixedWith(prefix)
+ .build(graphite)
+
+ override def start() {
+ reporter.start(pollPeriod, pollUnit)
+ }
+
+ override def stop() {
+ reporter.stop()
+ }
+}
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index 3c237ca20a..5f6407aadc 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -551,17 +551,27 @@ abstract class RDD[T: ClassTag](
def zipPartitions[B: ClassTag, V: ClassTag]
(rdd2: RDD[B])
(f: (Iterator[T], Iterator[B]) => Iterator[V]): RDD[V] =
- new ZippedPartitionsRDD2(sc, sc.clean(f), this, rdd2)
+ new ZippedPartitionsRDD2(sc, sc.clean(f), this, rdd2, false)
+
+ def zipPartitions[B: ClassManifest, C: ClassManifest, V: ClassManifest]
+ (rdd2: RDD[B], rdd3: RDD[C], preservesPartitioning: Boolean)
+ (f: (Iterator[T], Iterator[B], Iterator[C]) => Iterator[V]): RDD[V] =
+ new ZippedPartitionsRDD3(sc, sc.clean(f), this, rdd2, rdd3, preservesPartitioning)
def zipPartitions[B: ClassTag, C: ClassTag, V: ClassTag]
(rdd2: RDD[B], rdd3: RDD[C])
(f: (Iterator[T], Iterator[B], Iterator[C]) => Iterator[V]): RDD[V] =
- new ZippedPartitionsRDD3(sc, sc.clean(f), this, rdd2, rdd3)
+ new ZippedPartitionsRDD3(sc, sc.clean(f), this, rdd2, rdd3, false)
+
+ def zipPartitions[B: ClassManifest, C: ClassManifest, D: ClassManifest, V: ClassManifest]
+ (rdd2: RDD[B], rdd3: RDD[C], rdd4: RDD[D], preservesPartitioning: Boolean)
+ (f: (Iterator[T], Iterator[B], Iterator[C], Iterator[D]) => Iterator[V]): RDD[V] =
+ new ZippedPartitionsRDD4(sc, sc.clean(f), this, rdd2, rdd3, rdd4, preservesPartitioning)
def zipPartitions[B: ClassTag, C: ClassTag, D: ClassTag, V: ClassTag]
(rdd2: RDD[B], rdd3: RDD[C], rdd4: RDD[D])
(f: (Iterator[T], Iterator[B], Iterator[C], Iterator[D]) => Iterator[V]): RDD[V] =
- new ZippedPartitionsRDD4(sc, sc.clean(f), this, rdd2, rdd3, rdd4)
+ new ZippedPartitionsRDD4(sc, sc.clean(f), this, rdd2, rdd3, rdd4, false)
// Actions (launch a job to return a value to the user program)
diff --git a/core/src/main/scala/org/apache/spark/rdd/ZippedPartitionsRDD.scala b/core/src/main/scala/org/apache/spark/rdd/ZippedPartitionsRDD.scala
index e02c17bf45..9313bf87ec 100644
--- a/core/src/main/scala/org/apache/spark/rdd/ZippedPartitionsRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/ZippedPartitionsRDD.scala
@@ -40,9 +40,13 @@ private[spark] class ZippedPartitionsPartition(
abstract class ZippedPartitionsBaseRDD[V: ClassTag](
sc: SparkContext,
- var rdds: Seq[RDD[_]])
+ var rdds: Seq[RDD[_]],
+ preservesPartitioning: Boolean = false)
extends RDD[V](sc, rdds.map(x => new OneToOneDependency(x))) {
+ override val partitioner =
+ if (preservesPartitioning) firstParent[Any].partitioner else None
+
override def getPartitions: Array[Partition] = {
val sizes = rdds.map(x => x.partitions.size)
if (!sizes.forall(x => x == sizes(0))) {
@@ -77,8 +81,9 @@ class ZippedPartitionsRDD2[A: ClassTag, B: ClassTag, V: ClassTag](
sc: SparkContext,
f: (Iterator[A], Iterator[B]) => Iterator[V],
var rdd1: RDD[A],
- var rdd2: RDD[B])
- extends ZippedPartitionsBaseRDD[V](sc, List(rdd1, rdd2)) {
+ var rdd2: RDD[B],
+ preservesPartitioning: Boolean = false)
+ extends ZippedPartitionsBaseRDD[V](sc, List(rdd1, rdd2), preservesPartitioning) {
override def compute(s: Partition, context: TaskContext): Iterator[V] = {
val partitions = s.asInstanceOf[ZippedPartitionsPartition].partitions
@@ -98,8 +103,9 @@ class ZippedPartitionsRDD3
f: (Iterator[A], Iterator[B], Iterator[C]) => Iterator[V],
var rdd1: RDD[A],
var rdd2: RDD[B],
- var rdd3: RDD[C])
- extends ZippedPartitionsBaseRDD[V](sc, List(rdd1, rdd2, rdd3)) {
+ var rdd3: RDD[C],
+ preservesPartitioning: Boolean = false)
+ extends ZippedPartitionsBaseRDD[V](sc, List(rdd1, rdd2, rdd3), preservesPartitioning) {
override def compute(s: Partition, context: TaskContext): Iterator[V] = {
val partitions = s.asInstanceOf[ZippedPartitionsPartition].partitions
@@ -123,8 +129,9 @@ class ZippedPartitionsRDD4
var rdd1: RDD[A],
var rdd2: RDD[B],
var rdd3: RDD[C],
- var rdd4: RDD[D])
- extends ZippedPartitionsBaseRDD[V](sc, List(rdd1, rdd2, rdd3, rdd4)) {
+ var rdd4: RDD[D],
+ preservesPartitioning: Boolean = false)
+ extends ZippedPartitionsBaseRDD[V](sc, List(rdd1, rdd2, rdd3, rdd4), preservesPartitioning) {
override def compute(s: Partition, context: TaskContext): Iterator[V] = {
val partitions = s.asInstanceOf[ZippedPartitionsPartition].partitions
diff --git a/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala b/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala
index 1dc71a0428..0f2deb4bcb 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala
@@ -167,6 +167,7 @@ private[spark] class ShuffleMapTask(
var totalTime = 0L
val compressedSizes: Array[Byte] = shuffle.writers.map { writer: BlockObjectWriter =>
writer.commit()
+ writer.close()
val size = writer.fileSegment().length
totalBytes += size
totalTime += writer.timeWriting()
@@ -184,14 +185,16 @@ private[spark] class ShuffleMapTask(
} catch { case e: Exception =>
// If there is an exception from running the task, revert the partial writes
// and throw the exception upstream to Spark.
- if (shuffle != null) {
- shuffle.writers.foreach(_.revertPartialWrites())
+ if (shuffle != null && shuffle.writers != null) {
+ for (writer <- shuffle.writers) {
+ writer.revertPartialWrites()
+ writer.close()
+ }
}
throw e
} finally {
// Release the writers back to the shuffle block manager.
if (shuffle != null && shuffle.writers != null) {
- shuffle.writers.foreach(_.close())
shuffle.releaseWriters(success)
}
// Execute the callbacks on task completion.
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala b/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala
index 469e68fed7..b4451fc7b8 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala
@@ -93,6 +93,8 @@ class DiskBlockObjectWriter(
def write(i: Int): Unit = callWithTiming(out.write(i))
override def write(b: Array[Byte]) = callWithTiming(out.write(b))
override def write(b: Array[Byte], off: Int, len: Int) = callWithTiming(out.write(b, off, len))
+ override def close() = out.close()
+ override def flush() = out.flush()
}
private val syncWrites = System.getProperty("spark.shuffle.sync", "false").toBoolean
diff --git a/core/src/main/scala/org/apache/spark/util/AppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/AppendOnlyMap.scala
index f60deafc6f..8bb4ee3bfa 100644
--- a/core/src/main/scala/org/apache/spark/util/AppendOnlyMap.scala
+++ b/core/src/main/scala/org/apache/spark/util/AppendOnlyMap.scala
@@ -35,6 +35,7 @@ class AppendOnlyMap[K, V](initialCapacity: Int = 64) extends Iterable[(K, V)] wi
private var capacity = nextPowerOf2(initialCapacity)
private var mask = capacity - 1
private var curSize = 0
+ private var growThreshold = LOAD_FACTOR * capacity
// Holds keys and values in the same array for memory locality; specifically, the order of
// elements is key0, value0, key1, value1, key2, value2, etc.
@@ -56,7 +57,7 @@ class AppendOnlyMap[K, V](initialCapacity: Int = 64) extends Iterable[(K, V)] wi
var i = 1
while (true) {
val curKey = data(2 * pos)
- if (k.eq(curKey) || k == curKey) {
+ if (k.eq(curKey) || k.equals(curKey)) {
return data(2 * pos + 1).asInstanceOf[V]
} else if (curKey.eq(null)) {
return null.asInstanceOf[V]
@@ -80,9 +81,23 @@ class AppendOnlyMap[K, V](initialCapacity: Int = 64) extends Iterable[(K, V)] wi
haveNullValue = true
return
}
- val isNewEntry = putInto(data, k, value.asInstanceOf[AnyRef])
- if (isNewEntry) {
- incrementSize()
+ var pos = rehash(key.hashCode) & mask
+ var i = 1
+ while (true) {
+ val curKey = data(2 * pos)
+ if (curKey.eq(null)) {
+ data(2 * pos) = k
+ data(2 * pos + 1) = value.asInstanceOf[AnyRef]
+ incrementSize() // Since we added a new key
+ return
+ } else if (k.eq(curKey) || k.equals(curKey)) {
+ data(2 * pos + 1) = value.asInstanceOf[AnyRef]
+ return
+ } else {
+ val delta = i
+ pos = (pos + delta) & mask
+ i += 1
+ }
}
}
@@ -104,7 +119,7 @@ class AppendOnlyMap[K, V](initialCapacity: Int = 64) extends Iterable[(K, V)] wi
var i = 1
while (true) {
val curKey = data(2 * pos)
- if (k.eq(curKey) || k == curKey) {
+ if (k.eq(curKey) || k.equals(curKey)) {
val newValue = updateFunc(true, data(2 * pos + 1).asInstanceOf[V])
data(2 * pos + 1) = newValue.asInstanceOf[AnyRef]
return newValue
@@ -161,45 +176,17 @@ class AppendOnlyMap[K, V](initialCapacity: Int = 64) extends Iterable[(K, V)] wi
/** Increase table size by 1, rehashing if necessary */
private def incrementSize() {
curSize += 1
- if (curSize > LOAD_FACTOR * capacity) {
+ if (curSize > growThreshold) {
growTable()
}
}
/**
- * Re-hash a value to deal better with hash functions that don't differ
- * in the lower bits, similar to java.util.HashMap
+ * Re-hash a value to deal better with hash functions that don't differ in the lower bits.
+ * We use the Murmur Hash 3 finalization step that's also used in fastutil.
*/
private def rehash(h: Int): Int = {
- val r = h ^ (h >>> 20) ^ (h >>> 12)
- r ^ (r >>> 7) ^ (r >>> 4)
- }
-
- /**
- * Put an entry into a table represented by data, returning true if
- * this increases the size of the table or false otherwise. Assumes
- * that "data" has at least one empty slot.
- */
- private def putInto(data: Array[AnyRef], key: AnyRef, value: AnyRef): Boolean = {
- val mask = (data.length / 2) - 1
- var pos = rehash(key.hashCode) & mask
- var i = 1
- while (true) {
- val curKey = data(2 * pos)
- if (curKey.eq(null)) {
- data(2 * pos) = key
- data(2 * pos + 1) = value.asInstanceOf[AnyRef]
- return true
- } else if (curKey.eq(key) || curKey == key) {
- data(2 * pos + 1) = value.asInstanceOf[AnyRef]
- return false
- } else {
- val delta = i
- pos = (pos + delta) & mask
- i += 1
- }
- }
- return false // Never reached but needed to keep compiler happy
+ it.unimi.dsi.fastutil.HashCommon.murmurHash3(h)
}
/** Double the table's size and re-hash everything */
@@ -211,16 +198,36 @@ class AppendOnlyMap[K, V](initialCapacity: Int = 64) extends Iterable[(K, V)] wi
throw new Exception("Can't make capacity bigger than 2^29 elements")
}
val newData = new Array[AnyRef](2 * newCapacity)
- var pos = 0
- while (pos < capacity) {
- if (!data(2 * pos).eq(null)) {
- putInto(newData, data(2 * pos), data(2 * pos + 1))
+ val newMask = newCapacity - 1
+ // Insert all our old values into the new array. Note that because our old keys are
+ // unique, there's no need to check for equality here when we insert.
+ var oldPos = 0
+ while (oldPos < capacity) {
+ if (!data(2 * oldPos).eq(null)) {
+ val key = data(2 * oldPos)
+ val value = data(2 * oldPos + 1)
+ var newPos = rehash(key.hashCode) & newMask
+ var i = 1
+ var keepGoing = true
+ while (keepGoing) {
+ val curKey = newData(2 * newPos)
+ if (curKey.eq(null)) {
+ newData(2 * newPos) = key
+ newData(2 * newPos + 1) = value
+ keepGoing = false
+ } else {
+ val delta = i
+ newPos = (newPos + delta) & newMask
+ i += 1
+ }
+ }
}
- pos += 1
+ oldPos += 1
}
data = newData
capacity = newCapacity
- mask = newCapacity - 1
+ mask = newMask
+ growThreshold = LOAD_FACTOR * newCapacity
}
private def nextPowerOf2(n: Int): Int = {
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 02adcb41c6..3f7858d2de 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -824,4 +824,28 @@ private[spark] object Utils extends Logging {
return System.getProperties().clone()
.asInstanceOf[java.util.Properties].toMap[String, String]
}
+
+ /**
+ * Method executed for repeating a task for side effects.
+ * Unlike a for comprehension, it permits JVM JIT optimization
+ */
+ def times(numIters: Int)(f: => Unit): Unit = {
+ var i = 0
+ while (i < numIters) {
+ f
+ i += 1
+ }
+ }
+
+ /**
+ * Timing method based on iterations that permit JVM JIT optimization.
+ * @param numIters number of iterations
+ * @param f function to be executed
+ */
+ def timeIt(numIters: Int)(f: => Unit): Long = {
+ val start = System.currentTimeMillis
+ times(numIters)(f)
+ System.currentTimeMillis - start
+ }
+
}
diff --git a/core/src/main/scala/org/apache/spark/util/XORShiftRandom.scala b/core/src/main/scala/org/apache/spark/util/XORShiftRandom.scala
new file mode 100644
index 0000000000..e9907e6c85
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/util/XORShiftRandom.scala
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util
+
+import java.util.{Random => JavaRandom}
+import org.apache.spark.util.Utils.timeIt
+
+/**
+ * This class implements a XORShift random number generator algorithm
+ * Source:
+ * Marsaglia, G. (2003). Xorshift RNGs. Journal of Statistical Software, Vol. 8, Issue 14.
+ * @see <a href="http://www.jstatsoft.org/v08/i14/paper">Paper</a>
+ * This implementation is approximately 3.5 times faster than
+ * {@link java.util.Random java.util.Random}, partly because of the algorithm, but also due
+ * to renouncing thread safety. JDK's implementation uses an AtomicLong seed, this class
+ * uses a regular Long. We can forgo thread safety since we use a new instance of the RNG
+ * for each thread.
+ */
+private[spark] class XORShiftRandom(init: Long) extends JavaRandom(init) {
+
+ def this() = this(System.nanoTime)
+
+ private var seed = init
+
+ // we need to just override next - this will be called by nextInt, nextDouble,
+ // nextGaussian, nextLong, etc.
+ override protected def next(bits: Int): Int = {
+ var nextSeed = seed ^ (seed << 21)
+ nextSeed ^= (nextSeed >>> 35)
+ nextSeed ^= (nextSeed << 4)
+ seed = nextSeed
+ (nextSeed & ((1L << bits) -1)).asInstanceOf[Int]
+ }
+}
+
+/** Contains benchmark method and main method to run benchmark of the RNG */
+private[spark] object XORShiftRandom {
+
+ /**
+ * Main method for running benchmark
+ * @param args takes one argument - the number of random numbers to generate
+ */
+ def main(args: Array[String]): Unit = {
+ if (args.length != 1) {
+ println("Benchmark of XORShiftRandom vis-a-vis java.util.Random")
+ println("Usage: XORShiftRandom number_of_random_numbers_to_generate")
+ System.exit(1)
+ }
+ println(benchmark(args(0).toInt))
+ }
+
+ /**
+ * @param numIters Number of random numbers to generate while running the benchmark
+ * @return Map of execution times for {@link java.util.Random java.util.Random}
+ * and XORShift
+ */
+ def benchmark(numIters: Int) = {
+
+ val seed = 1L
+ val million = 1e6.toInt
+ val javaRand = new JavaRandom(seed)
+ val xorRand = new XORShiftRandom(seed)
+
+ // this is just to warm up the JIT - we're not timing anything
+ timeIt(1e6.toInt) {
+ javaRand.nextInt()
+ xorRand.nextInt()
+ }
+
+ val iters = timeIt(numIters)(_)
+
+ /* Return results as a map instead of just printing to screen
+ in case the user wants to do something with them */
+ Map("javaTime" -> iters {javaRand.nextInt()},
+ "xorTime" -> iters {xorRand.nextInt()})
+
+ }
+
+} \ No newline at end of file
diff --git a/core/src/test/scala/org/apache/spark/util/XORShiftRandomSuite.scala b/core/src/test/scala/org/apache/spark/util/XORShiftRandomSuite.scala
new file mode 100644
index 0000000000..b78367b6ca
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/util/XORShiftRandomSuite.scala
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util
+
+import java.util.Random
+import org.scalatest.FlatSpec
+import org.scalatest.FunSuite
+import org.scalatest.matchers.ShouldMatchers
+import org.apache.spark.util.Utils.times
+
+class XORShiftRandomSuite extends FunSuite with ShouldMatchers {
+
+ def fixture = new {
+ val seed = 1L
+ val xorRand = new XORShiftRandom(seed)
+ val hundMil = 1e8.toInt
+ }
+
+ /*
+ * This test is based on a chi-squared test for randomness. The values are hard-coded
+ * so as not to create Spark's dependency on apache.commons.math3 just to call one
+ * method for calculating the exact p-value for a given number of random numbers
+ * and bins. In case one would want to move to a full-fledged test based on
+ * apache.commons.math3, the relevant class is here:
+ * org.apache.commons.math3.stat.inference.ChiSquareTest
+ */
+ test ("XORShift generates valid random numbers") {
+
+ val f = fixture
+
+ val numBins = 10
+ // create 10 bins
+ val bins = Array.fill(numBins)(0)
+
+ // populate bins based on modulus of the random number
+ times(f.hundMil) {bins(math.abs(f.xorRand.nextInt) % 10) += 1}
+
+ /* since the seed is deterministic, until the algorithm is changed, we know the result will be
+ * exactly this: Array(10004908, 9993136, 9994600, 10000744, 10000091, 10002474, 10002272,
+ * 10000790, 10002286, 9998699), so the test will never fail at the prespecified (5%)
+ * significance level. However, should the RNG implementation change, the test should still
+ * pass at the same significance level. The chi-squared test done in R gave the following
+ * results:
+ * > chisq.test(c(10004908, 9993136, 9994600, 10000744, 10000091, 10002474, 10002272,
+ * 10000790, 10002286, 9998699))
+ * Chi-squared test for given probabilities
+ * data: c(10004908, 9993136, 9994600, 10000744, 10000091, 10002474, 10002272, 10000790,
+ * 10002286, 9998699)
+ * X-squared = 11.975, df = 9, p-value = 0.2147
+ * Note that the p-value was ~0.22. The test will fail if alpha < 0.05, which for 100 million
+ * random numbers
+ * and 10 bins will happen at X-squared of ~16.9196. So, the test will fail if X-squared
+ * is greater than or equal to that number.
+ */
+ val binSize = f.hundMil/numBins
+ val xSquared = bins.map(x => math.pow((binSize - x), 2)/binSize).sum
+ xSquared should be < (16.9196)
+
+ }
+
+} \ No newline at end of file
diff --git a/docs/monitoring.md b/docs/monitoring.md
index 5f456b999b..5ed0474477 100644
--- a/docs/monitoring.md
+++ b/docs/monitoring.md
@@ -50,6 +50,7 @@ Each instance can report to zero or more _sinks_. Sinks are contained in the
* `GangliaSink`: Sends metrics to a Ganglia node or multicast group.
* `JmxSink`: Registers metrics for viewing in a JXM console.
* `MetricsServlet`: Adds a servlet within the existing Spark UI to serve metrics data as JSON data.
+* `GraphiteSink`: Sends metrics to a Graphite node.
The syntax of the metrics configuration file is defined in an example configuration file,
`$SPARK_HOME/conf/metrics.conf.template`.
diff --git a/docs/tuning.md b/docs/tuning.md
index f491ae9b95..f33fda37eb 100644
--- a/docs/tuning.md
+++ b/docs/tuning.md
@@ -67,7 +67,7 @@ The [Kryo documentation](http://code.google.com/p/kryo/) describes more advanced
registration options, such as adding custom serialization code.
If your objects are large, you may also need to increase the `spark.kryoserializer.buffer.mb`
-system property. The default is 32, but this value needs to be large enough to hold the *largest*
+system property. The default is 2, but this value needs to be large enough to hold the *largest*
object you will serialize.
Finally, if you don't register your classes, Kryo will still work, but it will have to store the
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala
index edbf77dbcc..0dee9399a8 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala
@@ -18,15 +18,16 @@
package org.apache.spark.mllib.clustering
import scala.collection.mutable.ArrayBuffer
-import scala.util.Random
+
+import org.jblas.DoubleMatrix
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.rdd.RDD
import org.apache.spark.Logging
import org.apache.spark.mllib.util.MLUtils
+import org.apache.spark.util.XORShiftRandom
-import org.jblas.DoubleMatrix
/**
@@ -195,7 +196,7 @@ class KMeans private (
*/
private def initRandom(data: RDD[Array[Double]]): Array[ClusterCenters] = {
// Sample all the cluster centers in one pass to avoid repeated scans
- val sample = data.takeSample(true, runs * k, new Random().nextInt()).toSeq
+ val sample = data.takeSample(true, runs * k, new XORShiftRandom().nextInt()).toSeq
Array.tabulate(runs)(r => sample.slice(r * k, (r + 1) * k).toArray)
}
@@ -210,7 +211,7 @@ class KMeans private (
*/
private def initKMeansParallel(data: RDD[Array[Double]]): Array[ClusterCenters] = {
// Initialize each run's center to a random point
- val seed = new Random().nextInt()
+ val seed = new XORShiftRandom().nextInt()
val sample = data.takeSample(true, runs, seed).toSeq
val centers = Array.tabulate(runs)(r => ArrayBuffer(sample(r)))
@@ -222,7 +223,7 @@ class KMeans private (
for (r <- 0 until runs) yield (r, KMeans.pointCost(centerArrays(r), point))
}.reduceByKey(_ + _).collectAsMap()
val chosen = data.mapPartitionsWithIndex { (index, points) =>
- val rand = new Random(seed ^ (step << 16) ^ index)
+ val rand = new XORShiftRandom(seed ^ (step << 16) ^ index)
for {
p <- points
r <- 0 until runs
diff --git a/pom.xml b/pom.xml
index 4be9d3a3d2..8700a4828f 100644
--- a/pom.xml
+++ b/pom.xml
@@ -355,6 +355,11 @@
<version>3.0.0</version>
</dependency>
<dependency>
+ <groupId>com.codahale.metrics</groupId>
+ <artifactId>metrics-graphite</artifactId>
+ <version>3.0.0</version>
+ </dependency>
+ <dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-compiler</artifactId>
<version>${scala.version}</version>
diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index aa5fe70470..3584e884f1 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -205,9 +205,9 @@ object SparkBuild extends Build {
"log4j" % "log4j" % "1.2.17",
"org.slf4j" % "slf4j-api" % slf4jVersion,
"org.slf4j" % "slf4j-log4j12" % slf4jVersion,
+ "commons-daemon" % "commons-daemon" % "1.0.10", // workaround for bug HADOOP-9407
"com.ning" % "compress-lzf" % "0.8.4",
"org.xerial.snappy" % "snappy-java" % "1.0.5",
- "commons-daemon" % "commons-daemon" % "1.0.10", // workaround for bug HADOOP-9407
"org.ow2.asm" % "asm" % "4.0",
"com.google.protobuf" % "protobuf-java" % "2.4.1",
"com.typesafe.akka" %% "akka-remote" % "2.2.3" excludeAll(excludeNetty),
@@ -226,6 +226,7 @@ object SparkBuild extends Build {
"com.codahale.metrics" % "metrics-jvm" % "3.0.0",
"com.codahale.metrics" % "metrics-json" % "3.0.0",
"com.codahale.metrics" % "metrics-ganglia" % "3.0.0",
+ "com.codahale.metrics" % "metrics-graphite" % "3.0.0",
"com.twitter" %% "chill" % "0.3.1",
"com.twitter" % "chill-java" % "0.3.1"
)