From ef85a51f85c9720bc091367a0d4f80e7ed6b9778 Mon Sep 17 00:00:00 2001 From: Russell Cardullo Date: Fri, 8 Nov 2013 16:36:03 -0800 Subject: Add graphite sink for metrics This adds a metrics sink for graphite. The sink must be configured with the host and port of a graphite node and optionally may be configured with a prefix that will be prepended to all metrics that are sent to graphite. --- core/pom.xml | 4 ++ .../apache/spark/metrics/sink/GraphiteSink.scala | 82 ++++++++++++++++++++++ 2 files changed, 86 insertions(+) create mode 100644 core/src/main/scala/org/apache/spark/metrics/sink/GraphiteSink.scala (limited to 'core') diff --git a/core/pom.xml b/core/pom.xml index 8621d257e5..6af229c71d 100644 --- a/core/pom.xml +++ b/core/pom.xml @@ -158,6 +158,10 @@ com.codahale.metrics metrics-ganglia + + com.codahale.metrics + metrics-graphite + org.apache.derby derby diff --git a/core/src/main/scala/org/apache/spark/metrics/sink/GraphiteSink.scala b/core/src/main/scala/org/apache/spark/metrics/sink/GraphiteSink.scala new file mode 100644 index 0000000000..eb1315e6de --- /dev/null +++ b/core/src/main/scala/org/apache/spark/metrics/sink/GraphiteSink.scala @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.metrics.sink + +import com.codahale.metrics.MetricRegistry +import com.codahale.metrics.graphite.{GraphiteReporter, Graphite} + +import java.util.Properties +import java.util.concurrent.TimeUnit +import java.net.InetSocketAddress + +import org.apache.spark.metrics.MetricsSystem + +class GraphiteSink(val property: Properties, val registry: MetricRegistry) extends Sink { + val GRAPHITE_DEFAULT_PERIOD = 10 + val GRAPHITE_DEFAULT_UNIT = "SECONDS" + val GRAPHITE_DEFAULT_PREFIX = "" + + val GRAPHITE_KEY_HOST = "host" + val GRAPHITE_KEY_PORT = "port" + val GRAPHITE_KEY_PERIOD = "period" + val GRAPHITE_KEY_UNIT = "unit" + val GRAPHITE_KEY_PREFIX = "prefix" + + def propertyToOption(prop: String) = Option(property.getProperty(prop)) + + if (!propertyToOption(GRAPHITE_KEY_HOST).isDefined) { + throw new Exception("Graphite sink requires 'host' property.") + } + + if (!propertyToOption(GRAPHITE_KEY_PORT).isDefined) { + throw new Exception("Graphite sink requires 'port' property.") + } + + val host = propertyToOption(GRAPHITE_KEY_HOST).get + val port = propertyToOption(GRAPHITE_KEY_PORT).get.toInt + + val pollPeriod = Option(property.getProperty(GRAPHITE_KEY_PERIOD)) match { + case Some(s) => s.toInt + case None => GRAPHITE_DEFAULT_PERIOD + } + + val pollUnit = Option(property.getProperty(GRAPHITE_KEY_UNIT)) match { + case Some(s) => TimeUnit.valueOf(s.toUpperCase()) + case None => TimeUnit.valueOf(GRAPHITE_DEFAULT_UNIT) + } + + val prefix = propertyToOption(GRAPHITE_KEY_PREFIX).getOrElse(GRAPHITE_DEFAULT_PREFIX) + + MetricsSystem.checkMinimalPollingPeriod(pollUnit, pollPeriod) + + val graphite: Graphite = new Graphite(new InetSocketAddress(host, port)) + + val reporter: GraphiteReporter = GraphiteReporter.forRegistry(registry) + .convertDurationsTo(TimeUnit.MILLISECONDS) + .convertRatesTo(TimeUnit.SECONDS) + .prefixedWith(prefix) + .build(graphite) + + override def start() { + reporter.start(pollPeriod, pollUnit) + } + + override def stop() { + reporter.stop() + } +} -- cgit v1.2.3 From 1360f62d15170bd295ceaba85f39401fd8109e51 Mon Sep 17 00:00:00 2001 From: Russell Cardullo Date: Mon, 18 Nov 2013 08:37:09 -0800 Subject: Cleanup GraphiteSink.scala based on feedback * Reorder imports according to the style guide * Consistently use propertyToOption in all places --- .../scala/org/apache/spark/metrics/sink/GraphiteSink.scala | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) (limited to 'core') diff --git a/core/src/main/scala/org/apache/spark/metrics/sink/GraphiteSink.scala b/core/src/main/scala/org/apache/spark/metrics/sink/GraphiteSink.scala index eb1315e6de..cdcfec8ca7 100644 --- a/core/src/main/scala/org/apache/spark/metrics/sink/GraphiteSink.scala +++ b/core/src/main/scala/org/apache/spark/metrics/sink/GraphiteSink.scala @@ -17,13 +17,13 @@ package org.apache.spark.metrics.sink -import com.codahale.metrics.MetricRegistry -import com.codahale.metrics.graphite.{GraphiteReporter, Graphite} - import java.util.Properties import java.util.concurrent.TimeUnit import java.net.InetSocketAddress +import com.codahale.metrics.MetricRegistry +import com.codahale.metrics.graphite.{GraphiteReporter, Graphite} + import org.apache.spark.metrics.MetricsSystem class GraphiteSink(val property: Properties, val registry: MetricRegistry) extends Sink { @@ -50,12 +50,12 @@ class GraphiteSink(val property: Properties, val registry: MetricRegistry) exten val host = propertyToOption(GRAPHITE_KEY_HOST).get val port = propertyToOption(GRAPHITE_KEY_PORT).get.toInt - val pollPeriod = Option(property.getProperty(GRAPHITE_KEY_PERIOD)) match { + val pollPeriod = propertyToOption(GRAPHITE_KEY_PERIOD) match { case Some(s) => s.toInt case None => GRAPHITE_DEFAULT_PERIOD } - val pollUnit = Option(property.getProperty(GRAPHITE_KEY_UNIT)) match { + val pollUnit = propertyToOption(GRAPHITE_KEY_UNIT) match { case Some(s) => TimeUnit.valueOf(s.toUpperCase()) case None => TimeUnit.valueOf(GRAPHITE_DEFAULT_UNIT) } -- cgit v1.2.3 From 09bdfe3b163559fdcf8771b52ffbe2542883c912 Mon Sep 17 00:00:00 2001 From: Marek Kolodziej Date: Mon, 18 Nov 2013 15:21:43 -0500 Subject: XORShift RNG with unit tests and benchmark To run unit test, start SBT console and type: compile test-only org.apache.spark.util.XORShiftRandomSuite To run benchmark, type: project core console Once the Scala console starts, type: org.apache.spark.util.XORShiftRandom.benchmark(100000000) --- core/src/main/scala/org/apache/spark/rdd/RDD.scala | 2 +- .../main/scala/org/apache/spark/util/Utils.scala | 35 +++++++++- .../org/apache/spark/util/XORShiftRandom.scala | 63 ++++++++++++++++++ .../apache/spark/util/XORShiftRandomSuite.scala | 76 ++++++++++++++++++++++ .../org/apache/spark/mllib/clustering/KMeans.scala | 2 +- 5 files changed, 175 insertions(+), 3 deletions(-) create mode 100644 core/src/main/scala/org/apache/spark/util/XORShiftRandom.scala create mode 100644 core/src/test/scala/org/apache/spark/util/XORShiftRandomSuite.scala (limited to 'core') diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala index 6e88be6f6a..dd9c32f253 100644 --- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala @@ -17,7 +17,7 @@ package org.apache.spark.rdd -import java.util.Random +import org.apache.spark.util.{XORShiftRandom => Random} import scala.collection.Map import scala.collection.JavaConversions.mapAsScalaMap diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index fe932d8ede..2df7108d31 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -818,9 +818,42 @@ private[spark] object Utils extends Logging { hashAbs } - /** Returns a copy of the system properties that is thread-safe to iterator over. */ + /* Returns a copy of the system properties that is thread-safe to iterator over. */ def getSystemProperties(): Map[String, String] = { return System.getProperties().clone() .asInstanceOf[java.util.Properties].toMap[String, String] } + + /* Used for performance tersting along with the intToTimesInt() and timeIt methods + * It uses a while loop instead of a for comprehension since the JIT will + * optimize the while loop better than the "for" closure + * e.g. + * import org.apache.spark.util.Utils.{TimesInt, intToTimesInt, timeIt} + * import java.util.Random + * val rand = new Random() + * timeIt(rand.nextDouble, 10000000) + */ + class TimesInt(i: Int) { + def times(f: => Unit) = { + var x = 1 + while (x <= i) { + f + x += 1 + } + } + } + + /* Used in conjunction with TimesInt since it's Scala 2.9.3 + * instead of 2.10 and we don't have implicit classes */ + implicit def intToTimesInt(i: Int) = new TimesInt(i) + + /* See TimesInt for use example */ + def timeIt(f: => Unit, iters: Int): Long = { + + val start = System.currentTimeMillis + iters.times(f) + System.currentTimeMillis - start + + } + } diff --git a/core/src/main/scala/org/apache/spark/util/XORShiftRandom.scala b/core/src/main/scala/org/apache/spark/util/XORShiftRandom.scala new file mode 100644 index 0000000000..3c189c1b69 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/util/XORShiftRandom.scala @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.util + +import java.util.{Random => JavaRandom} +import Utils.{TimesInt, intToTimesInt, timeIt} + +class XORShiftRandom(init: Long) extends JavaRandom(init) { + + def this() = this(System.nanoTime) + + var seed = init + + // we need to just override next - this will be called by nextInt, nextDouble, + // nextGaussian, nextLong, etc. + override protected def next(bits: Int): Int = { + + var nextSeed = seed ^ (seed << 21) + nextSeed ^= (nextSeed >>> 35) + nextSeed ^= (nextSeed << 4) + seed = nextSeed + (nextSeed & ((1L << bits) -1)).asInstanceOf[Int] + } +} + +object XORShiftRandom { + + def benchmark(numIters: Int) = { + + val seed = 1L + val million = 1e6.toInt + val javaRand = new JavaRandom(seed) + val xorRand = new XORShiftRandom(seed) + + // warm up the JIT + million.times { + javaRand.nextInt + xorRand.nextInt + } + + /* Return results as a map instead of just printing to screen + in case the user wants to do something with them */ + Map("javaTime" -> timeIt(javaRand.nextInt, numIters), + "xorTime" -> timeIt(xorRand.nextInt, numIters)) + + } + +} \ No newline at end of file diff --git a/core/src/test/scala/org/apache/spark/util/XORShiftRandomSuite.scala b/core/src/test/scala/org/apache/spark/util/XORShiftRandomSuite.scala new file mode 100644 index 0000000000..1691cb4f01 --- /dev/null +++ b/core/src/test/scala/org/apache/spark/util/XORShiftRandomSuite.scala @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.util + +import java.util.Random +import org.scalatest.FlatSpec +import org.scalatest.FunSuite +import org.scalatest.matchers.ShouldMatchers +import org.apache.spark.util.Utils.{TimesInt, intToTimesInt, timeIt} + +class XORShiftRandomSuite extends FunSuite with ShouldMatchers { + + def fixture = new { + val seed = 1L + val xorRand = new XORShiftRandom(seed) + val hundMil = 1e8.toInt + } + + /* + * This test is based on a chi-squared test for randomness. The values are hard-coded + * so as not to create Spark's dependency on apache.commons.math3 just to call one + * method for calculating the exact p-value for a given number of random numbers + * and bins. In case one would want to move to a full-fledged test based on + * apache.commons.math3, the relevant class is here: + * org.apache.commons.math3.stat.inference.ChiSquareTest + */ + test ("XORShift generates valid random numbers") { + + val f = fixture + + val numBins = 10 + // create 10 bins + val bins = Array.fill(numBins)(0) + + // populate bins based on modulus of the random number + f.hundMil.times(bins(math.abs(f.xorRand.nextInt) % 10) += 1) + + /* since the seed is deterministic, until the algorithm is changed, we know the result will be + * exactly this: Array(10004908, 9993136, 9994600, 10000744, 10000091, 10002474, 10002272, + * 10000790, 10002286, 9998699), so the test will never fail at the prespecified (5%) + * significance level. However, should the RNG implementation change, the test should still + * pass at the same significance level. The chi-squared test done in R gave the following + * results: + * > chisq.test(c(10004908, 9993136, 9994600, 10000744, 10000091, 10002474, 10002272, + * 10000790, 10002286, 9998699)) + * Chi-squared test for given probabilities + * data: c(10004908, 9993136, 9994600, 10000744, 10000091, 10002474, 10002272, 10000790, + * 10002286, 9998699) + * X-squared = 11.975, df = 9, p-value = 0.2147 + * Note that the p-value was ~0.22. The test will fail if alpha < 0.05, which for 100 million + * random numbers + * and 10 bins will happen at X-squared of ~16.9196. So, the test will fail if X-squared + * is greater than or equal to that number. + */ + val binSize = f.hundMil/numBins + val xSquared = bins.map(x => math.pow((binSize - x), 2)/binSize).sum + xSquared should be < (16.9196) + + } + +} \ No newline at end of file diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala index edbf77dbcc..56bcb6c82a 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala @@ -18,7 +18,7 @@ package org.apache.spark.mllib.clustering import scala.collection.mutable.ArrayBuffer -import scala.util.Random +import org.apache.spark.util.{XORShiftRandom => Random} import org.apache.spark.SparkContext import org.apache.spark.SparkContext._ -- cgit v1.2.3 From 99cfe89c688ee1499d2723d8ea909651995abe86 Mon Sep 17 00:00:00 2001 From: Marek Kolodziej Date: Mon, 18 Nov 2013 22:00:36 -0500 Subject: Updates to reflect pull request code review --- core/src/main/scala/org/apache/spark/rdd/RDD.scala | 4 +- .../main/scala/org/apache/spark/util/Utils.scala | 43 +++++++---------- .../org/apache/spark/util/XORShiftRandom.scala | 55 +++++++++++++++++----- .../apache/spark/util/XORShiftRandomSuite.scala | 10 ++-- .../org/apache/spark/mllib/clustering/KMeans.scala | 5 +- 5 files changed, 69 insertions(+), 48 deletions(-) (limited to 'core') diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala index dd9c32f253..e738bfbdc2 100644 --- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala @@ -17,8 +17,6 @@ package org.apache.spark.rdd -import org.apache.spark.util.{XORShiftRandom => Random} - import scala.collection.Map import scala.collection.JavaConversions.mapAsScalaMap import scala.collection.mutable.ArrayBuffer @@ -38,7 +36,7 @@ import org.apache.spark.partial.CountEvaluator import org.apache.spark.partial.GroupedCountEvaluator import org.apache.spark.partial.PartialResult import org.apache.spark.storage.StorageLevel -import org.apache.spark.util.{Utils, BoundedPriorityQueue} +import org.apache.spark.util.{Utils, BoundedPriorityQueue, XORShiftRandom => Random} import org.apache.spark.SparkContext._ import org.apache.spark._ diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index 2df7108d31..b98a81053d 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -818,42 +818,33 @@ private[spark] object Utils extends Logging { hashAbs } - /* Returns a copy of the system properties that is thread-safe to iterator over. */ + /** Returns a copy of the system properties that is thread-safe to iterator over. */ def getSystemProperties(): Map[String, String] = { return System.getProperties().clone() .asInstanceOf[java.util.Properties].toMap[String, String] } - /* Used for performance tersting along with the intToTimesInt() and timeIt methods - * It uses a while loop instead of a for comprehension since the JIT will - * optimize the while loop better than the "for" closure - * e.g. - * import org.apache.spark.util.Utils.{TimesInt, intToTimesInt, timeIt} - * import java.util.Random - * val rand = new Random() - * timeIt(rand.nextDouble, 10000000) + /** + * Method executed for repeating a task for side effects. + * Unlike a for comprehension, it permits JVM JIT optimization */ - class TimesInt(i: Int) { - def times(f: => Unit) = { - var x = 1 - while (x <= i) { - f - x += 1 + def times(numIters: Int)(f: => Unit): Unit = { + var i = 0 + while (i < numIters) { + f + i += 1 } - } } - - /* Used in conjunction with TimesInt since it's Scala 2.9.3 - * instead of 2.10 and we don't have implicit classes */ - implicit def intToTimesInt(i: Int) = new TimesInt(i) - - /* See TimesInt for use example */ - def timeIt(f: => Unit, iters: Int): Long = { + /** + * Timing method based on iterations that permit JVM JIT optimization. + * @param numIters number of iterations + * @param f function to be executed + */ + def timeIt(numIters: Int)(f: => Unit): Long = { val start = System.currentTimeMillis - iters.times(f) + times(numIters)(f) System.currentTimeMillis - start - } - + } diff --git a/core/src/main/scala/org/apache/spark/util/XORShiftRandom.scala b/core/src/main/scala/org/apache/spark/util/XORShiftRandom.scala index 3c189c1b69..d443595c24 100644 --- a/core/src/main/scala/org/apache/spark/util/XORShiftRandom.scala +++ b/core/src/main/scala/org/apache/spark/util/XORShiftRandom.scala @@ -18,18 +18,28 @@ package org.apache.spark.util import java.util.{Random => JavaRandom} -import Utils.{TimesInt, intToTimesInt, timeIt} +import org.apache.spark.util.Utils.timeIt +/** + * This class implements a XORShift random number generator algorithm + * Source: + * Marsaglia, G. (2003). Xorshift RNGs. Journal of Statistical Software, Vol. 8, Issue 14. + * @see Paper + * This implementation is approximately 3.5 times faster than + * {@link java.util.Random java.util.Random}, partly because of the algorithm, but also due + * to renouncing thread safety. JDK's implementation uses an AtomicLong seed, this class + * uses a regular Long. We can forgo thread safety since we use a new instance of the RNG + * for each thread. + */ class XORShiftRandom(init: Long) extends JavaRandom(init) { def this() = this(System.nanoTime) - var seed = init + private var seed = init // we need to just override next - this will be called by nextInt, nextDouble, // nextGaussian, nextLong, etc. - override protected def next(bits: Int): Int = { - + override protected def next(bits: Int): Int = { var nextSeed = seed ^ (seed << 21) nextSeed ^= (nextSeed >>> 35) nextSeed ^= (nextSeed << 4) @@ -38,25 +48,46 @@ class XORShiftRandom(init: Long) extends JavaRandom(init) { } } +/** Contains benchmark method and main method to run benchmark of the RNG */ object XORShiftRandom { + /** + * Main method for running benchmark + * @param args takes one argument - the number of random numbers to generate + */ + def main(args: Array[String]): Unit = { + if (args.length != 1) { + println("Benchmark of XORShiftRandom vis-a-vis java.util.Random") + println("Usage: XORShiftRandom number_of_random_numbers_to_generate") + System.exit(1) + } + println(benchmark(args(0).toInt)) + } + + /** + * @param numIters Number of random numbers to generate while running the benchmark + * @return Map of execution times for {@link java.util.Random java.util.Random} + * and XORShift + */ def benchmark(numIters: Int) = { val seed = 1L val million = 1e6.toInt val javaRand = new JavaRandom(seed) val xorRand = new XORShiftRandom(seed) - - // warm up the JIT - million.times { - javaRand.nextInt - xorRand.nextInt + + // this is just to warm up the JIT - we're not timing anything + timeIt(1e6.toInt) { + javaRand.nextInt() + xorRand.nextInt() } + val iters = timeIt(numIters)(_) + /* Return results as a map instead of just printing to screen - in case the user wants to do something with them */ - Map("javaTime" -> timeIt(javaRand.nextInt, numIters), - "xorTime" -> timeIt(xorRand.nextInt, numIters)) + in case the user wants to do something with them */ + Map("javaTime" -> iters {javaRand.nextInt()}, + "xorTime" -> iters {xorRand.nextInt()}) } diff --git a/core/src/test/scala/org/apache/spark/util/XORShiftRandomSuite.scala b/core/src/test/scala/org/apache/spark/util/XORShiftRandomSuite.scala index 1691cb4f01..b78367b6ca 100644 --- a/core/src/test/scala/org/apache/spark/util/XORShiftRandomSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/XORShiftRandomSuite.scala @@ -21,7 +21,7 @@ import java.util.Random import org.scalatest.FlatSpec import org.scalatest.FunSuite import org.scalatest.matchers.ShouldMatchers -import org.apache.spark.util.Utils.{TimesInt, intToTimesInt, timeIt} +import org.apache.spark.util.Utils.times class XORShiftRandomSuite extends FunSuite with ShouldMatchers { @@ -48,7 +48,7 @@ class XORShiftRandomSuite extends FunSuite with ShouldMatchers { val bins = Array.fill(numBins)(0) // populate bins based on modulus of the random number - f.hundMil.times(bins(math.abs(f.xorRand.nextInt) % 10) += 1) + times(f.hundMil) {bins(math.abs(f.xorRand.nextInt) % 10) += 1} /* since the seed is deterministic, until the algorithm is changed, we know the result will be * exactly this: Array(10004908, 9993136, 9994600, 10000744, 10000091, 10002474, 10002272, @@ -67,9 +67,9 @@ class XORShiftRandomSuite extends FunSuite with ShouldMatchers { * and 10 bins will happen at X-squared of ~16.9196. So, the test will fail if X-squared * is greater than or equal to that number. */ - val binSize = f.hundMil/numBins - val xSquared = bins.map(x => math.pow((binSize - x), 2)/binSize).sum - xSquared should be < (16.9196) + val binSize = f.hundMil/numBins + val xSquared = bins.map(x => math.pow((binSize - x), 2)/binSize).sum + xSquared should be < (16.9196) } diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala index 56bcb6c82a..f09ea9e2f7 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala @@ -18,15 +18,16 @@ package org.apache.spark.mllib.clustering import scala.collection.mutable.ArrayBuffer -import org.apache.spark.util.{XORShiftRandom => Random} + +import org.jblas.DoubleMatrix import org.apache.spark.SparkContext import org.apache.spark.SparkContext._ import org.apache.spark.rdd.RDD import org.apache.spark.Logging import org.apache.spark.mllib.util.MLUtils +import org.apache.spark.util.{XORShiftRandom => Random} -import org.jblas.DoubleMatrix /** -- cgit v1.2.3 From bcc6ed30bf7189ebf0226f212b4e39830b830b6e Mon Sep 17 00:00:00 2001 From: Marek Kolodziej Date: Tue, 19 Nov 2013 20:50:38 -0500 Subject: Formatting and scoping (private[spark]) updates --- core/src/main/scala/org/apache/spark/util/Utils.scala | 2 +- core/src/main/scala/org/apache/spark/util/XORShiftRandom.scala | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) (limited to 'core') diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index b98a81053d..a79e64e810 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -833,7 +833,7 @@ private[spark] object Utils extends Logging { while (i < numIters) { f i += 1 - } + } } /** diff --git a/core/src/main/scala/org/apache/spark/util/XORShiftRandom.scala b/core/src/main/scala/org/apache/spark/util/XORShiftRandom.scala index d443595c24..e9907e6c85 100644 --- a/core/src/main/scala/org/apache/spark/util/XORShiftRandom.scala +++ b/core/src/main/scala/org/apache/spark/util/XORShiftRandom.scala @@ -31,7 +31,7 @@ import org.apache.spark.util.Utils.timeIt * uses a regular Long. We can forgo thread safety since we use a new instance of the RNG * for each thread. */ -class XORShiftRandom(init: Long) extends JavaRandom(init) { +private[spark] class XORShiftRandom(init: Long) extends JavaRandom(init) { def this() = this(System.nanoTime) @@ -49,7 +49,7 @@ class XORShiftRandom(init: Long) extends JavaRandom(init) { } /** Contains benchmark method and main method to run benchmark of the RNG */ -object XORShiftRandom { +private[spark] object XORShiftRandom { /** * Main method for running benchmark -- cgit v1.2.3 From 22724659db8d711492f58c90d530be2f4a5b3de9 Mon Sep 17 00:00:00 2001 From: Marek Kolodziej Date: Wed, 20 Nov 2013 07:03:36 -0500 Subject: Make XORShiftRandom explicit in KMeans and roll it back for RDD --- core/src/main/scala/org/apache/spark/rdd/RDD.scala | 4 +++- .../src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala | 8 ++++---- 2 files changed, 7 insertions(+), 5 deletions(-) (limited to 'core') diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala index e738bfbdc2..6e88be6f6a 100644 --- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala @@ -17,6 +17,8 @@ package org.apache.spark.rdd +import java.util.Random + import scala.collection.Map import scala.collection.JavaConversions.mapAsScalaMap import scala.collection.mutable.ArrayBuffer @@ -36,7 +38,7 @@ import org.apache.spark.partial.CountEvaluator import org.apache.spark.partial.GroupedCountEvaluator import org.apache.spark.partial.PartialResult import org.apache.spark.storage.StorageLevel -import org.apache.spark.util.{Utils, BoundedPriorityQueue, XORShiftRandom => Random} +import org.apache.spark.util.{Utils, BoundedPriorityQueue} import org.apache.spark.SparkContext._ import org.apache.spark._ diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala index f09ea9e2f7..0dee9399a8 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala @@ -26,7 +26,7 @@ import org.apache.spark.SparkContext._ import org.apache.spark.rdd.RDD import org.apache.spark.Logging import org.apache.spark.mllib.util.MLUtils -import org.apache.spark.util.{XORShiftRandom => Random} +import org.apache.spark.util.XORShiftRandom @@ -196,7 +196,7 @@ class KMeans private ( */ private def initRandom(data: RDD[Array[Double]]): Array[ClusterCenters] = { // Sample all the cluster centers in one pass to avoid repeated scans - val sample = data.takeSample(true, runs * k, new Random().nextInt()).toSeq + val sample = data.takeSample(true, runs * k, new XORShiftRandom().nextInt()).toSeq Array.tabulate(runs)(r => sample.slice(r * k, (r + 1) * k).toArray) } @@ -211,7 +211,7 @@ class KMeans private ( */ private def initKMeansParallel(data: RDD[Array[Double]]): Array[ClusterCenters] = { // Initialize each run's center to a random point - val seed = new Random().nextInt() + val seed = new XORShiftRandom().nextInt() val sample = data.takeSample(true, runs, seed).toSeq val centers = Array.tabulate(runs)(r => ArrayBuffer(sample(r))) @@ -223,7 +223,7 @@ class KMeans private ( for (r <- 0 until runs) yield (r, KMeans.pointCost(centerArrays(r), point)) }.reduceByKey(_ + _).collectAsMap() val chosen = data.mapPartitionsWithIndex { (index, points) => - val rand = new Random(seed ^ (step << 16) ^ index) + val rand = new XORShiftRandom(seed ^ (step << 16) ^ index) for { p <- points r <- 0 until runs -- cgit v1.2.3 From 53b94ef2f5179bdbebe70883b2593b569518e77e Mon Sep 17 00:00:00 2001 From: Patrick Wendell Date: Thu, 21 Nov 2013 17:17:06 -0800 Subject: TimeTrackingOutputStream should pass on calls to close() and flush(). Without this fix you get a huge number of open shuffles after running shuffles. --- core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala | 2 ++ 1 file changed, 2 insertions(+) (limited to 'core') diff --git a/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala b/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala index 32d2dd0694..0a32df7c89 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala @@ -101,6 +101,8 @@ class DiskBlockObjectWriter( def write(i: Int): Unit = callWithTiming(out.write(i)) override def write(b: Array[Byte]) = callWithTiming(out.write(b)) override def write(b: Array[Byte], off: Int, len: Int) = callWithTiming(out.write(b, off, len)) + override def close() = out.close() + override def flush() = out.flush() } private val syncWrites = System.getProperty("spark.shuffle.sync", "false").toBoolean -- cgit v1.2.3 From ccea38b759c81abea27bc0a51157a31d369839b5 Mon Sep 17 00:00:00 2001 From: Aaron Davidson Date: Thu, 21 Nov 2013 21:36:08 -0800 Subject: Fix 'timeWriting' stat for shuffle files Due to concurrent git branches, changes from shuffle file consolidation patch caused the shuffle write timing patch to no longer actually measure the time, since it requires time be measured after the stream has been closed. --- .../main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) (limited to 'core') diff --git a/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala b/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala index 1dc71a0428..0f2deb4bcb 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala @@ -167,6 +167,7 @@ private[spark] class ShuffleMapTask( var totalTime = 0L val compressedSizes: Array[Byte] = shuffle.writers.map { writer: BlockObjectWriter => writer.commit() + writer.close() val size = writer.fileSegment().length totalBytes += size totalTime += writer.timeWriting() @@ -184,14 +185,16 @@ private[spark] class ShuffleMapTask( } catch { case e: Exception => // If there is an exception from running the task, revert the partial writes // and throw the exception upstream to Spark. - if (shuffle != null) { - shuffle.writers.foreach(_.revertPartialWrites()) + if (shuffle != null && shuffle.writers != null) { + for (writer <- shuffle.writers) { + writer.revertPartialWrites() + writer.close() + } } throw e } finally { // Release the writers back to the shuffle block manager. if (shuffle != null && shuffle.writers != null) { - shuffle.writers.foreach(_.close()) shuffle.releaseWriters(success) } // Execute the callbacks on task completion. -- cgit v1.2.3 From c1507afc6ca161608a83967cdebe1404051658d3 Mon Sep 17 00:00:00 2001 From: Ankur Dave Date: Sat, 23 Nov 2013 02:32:37 -0800 Subject: Support preservesPartitioning in RDD.zipPartitions --- core/src/main/scala/org/apache/spark/rdd/RDD.scala | 21 ++++++++++++++++++--- .../org/apache/spark/rdd/ZippedPartitionsRDD.scala | 21 ++++++++++++++------- 2 files changed, 32 insertions(+), 10 deletions(-) (limited to 'core') diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala index 6e88be6f6a..7623c44d88 100644 --- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala @@ -545,20 +545,35 @@ abstract class RDD[T: ClassManifest]( * *same number of partitions*, but does *not* require them to have the same number * of elements in each partition. */ + def zipPartitions[B: ClassManifest, V: ClassManifest] + (rdd2: RDD[B], preservesPartitioning: Boolean) + (f: (Iterator[T], Iterator[B]) => Iterator[V]): RDD[V] = + new ZippedPartitionsRDD2(sc, sc.clean(f), this, rdd2, preservesPartitioning) + def zipPartitions[B: ClassManifest, V: ClassManifest] (rdd2: RDD[B]) (f: (Iterator[T], Iterator[B]) => Iterator[V]): RDD[V] = - new ZippedPartitionsRDD2(sc, sc.clean(f), this, rdd2) + new ZippedPartitionsRDD2(sc, sc.clean(f), this, rdd2, false) + + def zipPartitions[B: ClassManifest, C: ClassManifest, V: ClassManifest] + (rdd2: RDD[B], rdd3: RDD[C], preservesPartitioning: Boolean) + (f: (Iterator[T], Iterator[B], Iterator[C]) => Iterator[V]): RDD[V] = + new ZippedPartitionsRDD3(sc, sc.clean(f), this, rdd2, rdd3, preservesPartitioning) def zipPartitions[B: ClassManifest, C: ClassManifest, V: ClassManifest] (rdd2: RDD[B], rdd3: RDD[C]) (f: (Iterator[T], Iterator[B], Iterator[C]) => Iterator[V]): RDD[V] = - new ZippedPartitionsRDD3(sc, sc.clean(f), this, rdd2, rdd3) + new ZippedPartitionsRDD3(sc, sc.clean(f), this, rdd2, rdd3, false) + + def zipPartitions[B: ClassManifest, C: ClassManifest, D: ClassManifest, V: ClassManifest] + (rdd2: RDD[B], rdd3: RDD[C], rdd4: RDD[D], preservesPartitioning: Boolean) + (f: (Iterator[T], Iterator[B], Iterator[C], Iterator[D]) => Iterator[V]): RDD[V] = + new ZippedPartitionsRDD4(sc, sc.clean(f), this, rdd2, rdd3, rdd4, preservesPartitioning) def zipPartitions[B: ClassManifest, C: ClassManifest, D: ClassManifest, V: ClassManifest] (rdd2: RDD[B], rdd3: RDD[C], rdd4: RDD[D]) (f: (Iterator[T], Iterator[B], Iterator[C], Iterator[D]) => Iterator[V]): RDD[V] = - new ZippedPartitionsRDD4(sc, sc.clean(f), this, rdd2, rdd3, rdd4) + new ZippedPartitionsRDD4(sc, sc.clean(f), this, rdd2, rdd3, rdd4, false) // Actions (launch a job to return a value to the user program) diff --git a/core/src/main/scala/org/apache/spark/rdd/ZippedPartitionsRDD.scala b/core/src/main/scala/org/apache/spark/rdd/ZippedPartitionsRDD.scala index 31e6fd519d..faeb316664 100644 --- a/core/src/main/scala/org/apache/spark/rdd/ZippedPartitionsRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/ZippedPartitionsRDD.scala @@ -39,9 +39,13 @@ private[spark] class ZippedPartitionsPartition( abstract class ZippedPartitionsBaseRDD[V: ClassManifest]( sc: SparkContext, - var rdds: Seq[RDD[_]]) + var rdds: Seq[RDD[_]], + preservesPartitioning: Boolean = false) extends RDD[V](sc, rdds.map(x => new OneToOneDependency(x))) { + override val partitioner = + if (preservesPartitioning) firstParent[Any].partitioner else None + override def getPartitions: Array[Partition] = { val sizes = rdds.map(x => x.partitions.size) if (!sizes.forall(x => x == sizes(0))) { @@ -76,8 +80,9 @@ class ZippedPartitionsRDD2[A: ClassManifest, B: ClassManifest, V: ClassManifest] sc: SparkContext, f: (Iterator[A], Iterator[B]) => Iterator[V], var rdd1: RDD[A], - var rdd2: RDD[B]) - extends ZippedPartitionsBaseRDD[V](sc, List(rdd1, rdd2)) { + var rdd2: RDD[B], + preservesPartitioning: Boolean = false) + extends ZippedPartitionsBaseRDD[V](sc, List(rdd1, rdd2), preservesPartitioning) { override def compute(s: Partition, context: TaskContext): Iterator[V] = { val partitions = s.asInstanceOf[ZippedPartitionsPartition].partitions @@ -97,8 +102,9 @@ class ZippedPartitionsRDD3 f: (Iterator[A], Iterator[B], Iterator[C]) => Iterator[V], var rdd1: RDD[A], var rdd2: RDD[B], - var rdd3: RDD[C]) - extends ZippedPartitionsBaseRDD[V](sc, List(rdd1, rdd2, rdd3)) { + var rdd3: RDD[C], + preservesPartitioning: Boolean = false) + extends ZippedPartitionsBaseRDD[V](sc, List(rdd1, rdd2, rdd3), preservesPartitioning) { override def compute(s: Partition, context: TaskContext): Iterator[V] = { val partitions = s.asInstanceOf[ZippedPartitionsPartition].partitions @@ -122,8 +128,9 @@ class ZippedPartitionsRDD4 var rdd1: RDD[A], var rdd2: RDD[B], var rdd3: RDD[C], - var rdd4: RDD[D]) - extends ZippedPartitionsBaseRDD[V](sc, List(rdd1, rdd2, rdd3, rdd4)) { + var rdd4: RDD[D], + preservesPartitioning: Boolean = false) + extends ZippedPartitionsBaseRDD[V](sc, List(rdd1, rdd2, rdd3, rdd4), preservesPartitioning) { override def compute(s: Partition, context: TaskContext): Iterator[V] = { val partitions = s.asInstanceOf[ZippedPartitionsPartition].partitions -- cgit v1.2.3 From 7535d7fbcbe3c0c2515a2d17a806fa523917e398 Mon Sep 17 00:00:00 2001 From: Matei Zaharia Date: Sat, 23 Nov 2013 17:21:37 -0800 Subject: Fixes to AppendOnlyMap: - Use Murmur Hash 3 finalization step to scramble the bits of HashCode instead of the simpler version in java.util.HashMap; the latter one had trouble with ranges of consecutive integers. Murmur Hash 3 is used by fastutil. - Use Object.equals() instead of Scala's == to compare keys, because the latter does extra casts for numeric types (see the equals method in https://github.com/scala/scala/blob/master/src/library/scala/runtime/BoxesRunTime.java) --- .../main/scala/org/apache/spark/util/AppendOnlyMap.scala | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) (limited to 'core') diff --git a/core/src/main/scala/org/apache/spark/util/AppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/AppendOnlyMap.scala index f60deafc6f..8542541fe6 100644 --- a/core/src/main/scala/org/apache/spark/util/AppendOnlyMap.scala +++ b/core/src/main/scala/org/apache/spark/util/AppendOnlyMap.scala @@ -56,7 +56,7 @@ class AppendOnlyMap[K, V](initialCapacity: Int = 64) extends Iterable[(K, V)] wi var i = 1 while (true) { val curKey = data(2 * pos) - if (k.eq(curKey) || k == curKey) { + if (k.eq(curKey) || k.equals(curKey)) { return data(2 * pos + 1).asInstanceOf[V] } else if (curKey.eq(null)) { return null.asInstanceOf[V] @@ -104,7 +104,7 @@ class AppendOnlyMap[K, V](initialCapacity: Int = 64) extends Iterable[(K, V)] wi var i = 1 while (true) { val curKey = data(2 * pos) - if (k.eq(curKey) || k == curKey) { + if (k.eq(curKey) || k.equals(curKey)) { val newValue = updateFunc(true, data(2 * pos + 1).asInstanceOf[V]) data(2 * pos + 1) = newValue.asInstanceOf[AnyRef] return newValue @@ -167,12 +167,11 @@ class AppendOnlyMap[K, V](initialCapacity: Int = 64) extends Iterable[(K, V)] wi } /** - * Re-hash a value to deal better with hash functions that don't differ - * in the lower bits, similar to java.util.HashMap + * Re-hash a value to deal better with hash functions that don't differ in the lower bits. + * We use the Murmur Hash 3 finalization step that's also used in fastutil. */ private def rehash(h: Int): Int = { - val r = h ^ (h >>> 20) ^ (h >>> 12) - r ^ (r >>> 7) ^ (r >>> 4) + it.unimi.dsi.fastutil.HashCommon.murmurHash3(h) } /** @@ -190,7 +189,7 @@ class AppendOnlyMap[K, V](initialCapacity: Int = 64) extends Iterable[(K, V)] wi data(2 * pos) = key data(2 * pos + 1) = value.asInstanceOf[AnyRef] return true - } else if (curKey.eq(key) || curKey == key) { + } else if (curKey.eq(key) || curKey.equals(key)) { data(2 * pos + 1) = value.asInstanceOf[AnyRef] return false } else { -- cgit v1.2.3 From 9837a60234964333916ccbf02c8610909462a7ad Mon Sep 17 00:00:00 2001 From: Matei Zaharia Date: Sat, 23 Nov 2013 17:38:29 -0800 Subject: Some other optimizations to AppendOnlyMap: - Don't check keys for equality when re-inserting due to growing the table; the keys will already be unique - Remember the grow threshold instead of recomputing it on each insert --- .../org/apache/spark/util/AppendOnlyMap.scala | 82 ++++++++++++---------- 1 file changed, 45 insertions(+), 37 deletions(-) (limited to 'core') diff --git a/core/src/main/scala/org/apache/spark/util/AppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/AppendOnlyMap.scala index 8542541fe6..8bb4ee3bfa 100644 --- a/core/src/main/scala/org/apache/spark/util/AppendOnlyMap.scala +++ b/core/src/main/scala/org/apache/spark/util/AppendOnlyMap.scala @@ -35,6 +35,7 @@ class AppendOnlyMap[K, V](initialCapacity: Int = 64) extends Iterable[(K, V)] wi private var capacity = nextPowerOf2(initialCapacity) private var mask = capacity - 1 private var curSize = 0 + private var growThreshold = LOAD_FACTOR * capacity // Holds keys and values in the same array for memory locality; specifically, the order of // elements is key0, value0, key1, value1, key2, value2, etc. @@ -80,9 +81,23 @@ class AppendOnlyMap[K, V](initialCapacity: Int = 64) extends Iterable[(K, V)] wi haveNullValue = true return } - val isNewEntry = putInto(data, k, value.asInstanceOf[AnyRef]) - if (isNewEntry) { - incrementSize() + var pos = rehash(key.hashCode) & mask + var i = 1 + while (true) { + val curKey = data(2 * pos) + if (curKey.eq(null)) { + data(2 * pos) = k + data(2 * pos + 1) = value.asInstanceOf[AnyRef] + incrementSize() // Since we added a new key + return + } else if (k.eq(curKey) || k.equals(curKey)) { + data(2 * pos + 1) = value.asInstanceOf[AnyRef] + return + } else { + val delta = i + pos = (pos + delta) & mask + i += 1 + } } } @@ -161,7 +176,7 @@ class AppendOnlyMap[K, V](initialCapacity: Int = 64) extends Iterable[(K, V)] wi /** Increase table size by 1, rehashing if necessary */ private def incrementSize() { curSize += 1 - if (curSize > LOAD_FACTOR * capacity) { + if (curSize > growThreshold) { growTable() } } @@ -174,33 +189,6 @@ class AppendOnlyMap[K, V](initialCapacity: Int = 64) extends Iterable[(K, V)] wi it.unimi.dsi.fastutil.HashCommon.murmurHash3(h) } - /** - * Put an entry into a table represented by data, returning true if - * this increases the size of the table or false otherwise. Assumes - * that "data" has at least one empty slot. - */ - private def putInto(data: Array[AnyRef], key: AnyRef, value: AnyRef): Boolean = { - val mask = (data.length / 2) - 1 - var pos = rehash(key.hashCode) & mask - var i = 1 - while (true) { - val curKey = data(2 * pos) - if (curKey.eq(null)) { - data(2 * pos) = key - data(2 * pos + 1) = value.asInstanceOf[AnyRef] - return true - } else if (curKey.eq(key) || curKey.equals(key)) { - data(2 * pos + 1) = value.asInstanceOf[AnyRef] - return false - } else { - val delta = i - pos = (pos + delta) & mask - i += 1 - } - } - return false // Never reached but needed to keep compiler happy - } - /** Double the table's size and re-hash everything */ private def growTable() { val newCapacity = capacity * 2 @@ -210,16 +198,36 @@ class AppendOnlyMap[K, V](initialCapacity: Int = 64) extends Iterable[(K, V)] wi throw new Exception("Can't make capacity bigger than 2^29 elements") } val newData = new Array[AnyRef](2 * newCapacity) - var pos = 0 - while (pos < capacity) { - if (!data(2 * pos).eq(null)) { - putInto(newData, data(2 * pos), data(2 * pos + 1)) + val newMask = newCapacity - 1 + // Insert all our old values into the new array. Note that because our old keys are + // unique, there's no need to check for equality here when we insert. + var oldPos = 0 + while (oldPos < capacity) { + if (!data(2 * oldPos).eq(null)) { + val key = data(2 * oldPos) + val value = data(2 * oldPos + 1) + var newPos = rehash(key.hashCode) & newMask + var i = 1 + var keepGoing = true + while (keepGoing) { + val curKey = newData(2 * newPos) + if (curKey.eq(null)) { + newData(2 * newPos) = key + newData(2 * newPos + 1) = value + keepGoing = false + } else { + val delta = i + newPos = (newPos + delta) & newMask + i += 1 + } + } } - pos += 1 + oldPos += 1 } data = newData capacity = newCapacity - mask = newCapacity - 1 + mask = newMask + growThreshold = LOAD_FACTOR * newCapacity } private def nextPowerOf2(n: Int): Int = { -- cgit v1.2.3