diff options
author | Sandy Ryza <sandy@cloudera.com> | 2014-10-21 21:53:09 -0700 |
---|---|---|
committer | Patrick Wendell <pwendell@gmail.com> | 2014-10-21 21:53:09 -0700 |
commit | 6bb56faea8d238ea22c2de33db93b1b39f492b3a (patch) | |
tree | 4f172ee623cd64827b6c78d8459554cd395b1660 /examples | |
parent | 856b081729057f9da31a86e4bfa0dc0013492042 (diff) | |
download | spark-6bb56faea8d238ea22c2de33db93b1b39f492b3a.tar.gz spark-6bb56faea8d238ea22c2de33db93b1b39f492b3a.tar.bz2 spark-6bb56faea8d238ea22c2de33db93b1b39f492b3a.zip |
SPARK-1813. Add a utility to SparkConf that makes using Kryo really easy
Author: Sandy Ryza <sandy@cloudera.com>
Closes #789 from sryza/sandy-spark-1813 and squashes the following commits:
48b05e9 [Sandy Ryza] Simplify
b824932 [Sandy Ryza] Allow both spark.kryo.classesToRegister and spark.kryo.registrator at the same time
6a15bb7 [Sandy Ryza] Small fix
a2278c0 [Sandy Ryza] Respond to review comments
6ef592e [Sandy Ryza] SPARK-1813. Add a utility to SparkConf that makes using Kryo really easy
Diffstat (limited to 'examples')
5 files changed, 6 insertions, 37 deletions
diff --git a/examples/src/main/scala/org/apache/spark/examples/bagel/PageRankUtils.scala b/examples/src/main/scala/org/apache/spark/examples/bagel/PageRankUtils.scala index e06f4dcd54..e322d4ce5a 100644 --- a/examples/src/main/scala/org/apache/spark/examples/bagel/PageRankUtils.scala +++ b/examples/src/main/scala/org/apache/spark/examples/bagel/PageRankUtils.scala @@ -18,17 +18,7 @@ package org.apache.spark.examples.bagel import org.apache.spark._ -import org.apache.spark.SparkContext._ -import org.apache.spark.serializer.KryoRegistrator - import org.apache.spark.bagel._ -import org.apache.spark.bagel.Bagel._ - -import scala.collection.mutable.ArrayBuffer - -import java.io.{InputStream, OutputStream, DataInputStream, DataOutputStream} - -import com.esotericsoftware.kryo._ class PageRankUtils extends Serializable { def computeWithCombiner(numVertices: Long, epsilon: Double)( @@ -99,13 +89,6 @@ class PRMessage() extends Message[String] with Serializable { } } -class PRKryoRegistrator extends KryoRegistrator { - def registerClasses(kryo: Kryo) { - kryo.register(classOf[PRVertex]) - kryo.register(classOf[PRMessage]) - } -} - class CustomPartitioner(partitions: Int) extends Partitioner { def numPartitions = partitions diff --git a/examples/src/main/scala/org/apache/spark/examples/bagel/WikipediaPageRank.scala b/examples/src/main/scala/org/apache/spark/examples/bagel/WikipediaPageRank.scala index e4db3ec513..859abedf2a 100644 --- a/examples/src/main/scala/org/apache/spark/examples/bagel/WikipediaPageRank.scala +++ b/examples/src/main/scala/org/apache/spark/examples/bagel/WikipediaPageRank.scala @@ -38,8 +38,7 @@ object WikipediaPageRank { } val sparkConf = new SparkConf() sparkConf.setAppName("WikipediaPageRank") - sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") - sparkConf.set("spark.kryo.registrator", classOf[PRKryoRegistrator].getName) + sparkConf.registerKryoClasses(Array(classOf[PRVertex], classOf[PRMessage])) val inputFile = args(0) val threshold = args(1).toDouble diff --git a/examples/src/main/scala/org/apache/spark/examples/graphx/Analytics.scala b/examples/src/main/scala/org/apache/spark/examples/graphx/Analytics.scala index 45527d9382..d70d93608a 100644 --- a/examples/src/main/scala/org/apache/spark/examples/graphx/Analytics.scala +++ b/examples/src/main/scala/org/apache/spark/examples/graphx/Analytics.scala @@ -46,10 +46,8 @@ object Analytics extends Logging { } val options = mutable.Map(optionsList: _*) - val conf = new SparkConf() - .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") - .set("spark.kryo.registrator", "org.apache.spark.graphx.GraphKryoRegistrator") - .set("spark.locality.wait", "100000") + val conf = new SparkConf().set("spark.locality.wait", "100000") + GraphXUtils.registerKryoClasses(conf) val numEPart = options.remove("numEPart").map(_.toInt).getOrElse { println("Set the number of edge partitions using --numEPart.") diff --git a/examples/src/main/scala/org/apache/spark/examples/graphx/SynthBenchmark.scala b/examples/src/main/scala/org/apache/spark/examples/graphx/SynthBenchmark.scala index 5f35a58364..0567602171 100644 --- a/examples/src/main/scala/org/apache/spark/examples/graphx/SynthBenchmark.scala +++ b/examples/src/main/scala/org/apache/spark/examples/graphx/SynthBenchmark.scala @@ -18,7 +18,7 @@ package org.apache.spark.examples.graphx import org.apache.spark.SparkContext._ -import org.apache.spark.graphx.PartitionStrategy +import org.apache.spark.graphx.{GraphXUtils, PartitionStrategy} import org.apache.spark.{SparkContext, SparkConf} import org.apache.spark.graphx.util.GraphGenerators import java.io.{PrintWriter, FileOutputStream} @@ -80,8 +80,7 @@ object SynthBenchmark { val conf = new SparkConf() .setAppName(s"GraphX Synth Benchmark (nverts = $numVertices, app = $app)") - .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") - .set("spark.kryo.registrator", "org.apache.spark.graphx.GraphKryoRegistrator") + GraphXUtils.registerKryoClasses(conf) val sc = new SparkContext(conf) diff --git a/examples/src/main/scala/org/apache/spark/examples/mllib/MovieLensALS.scala b/examples/src/main/scala/org/apache/spark/examples/mllib/MovieLensALS.scala index fc6678013b..8796c28db8 100644 --- a/examples/src/main/scala/org/apache/spark/examples/mllib/MovieLensALS.scala +++ b/examples/src/main/scala/org/apache/spark/examples/mllib/MovieLensALS.scala @@ -19,7 +19,6 @@ package org.apache.spark.examples.mllib import scala.collection.mutable -import com.esotericsoftware.kryo.Kryo import org.apache.log4j.{Level, Logger} import scopt.OptionParser @@ -27,7 +26,6 @@ import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.SparkContext._ import org.apache.spark.mllib.recommendation.{ALS, MatrixFactorizationModel, Rating} import org.apache.spark.rdd.RDD -import org.apache.spark.serializer.{KryoSerializer, KryoRegistrator} /** * An example app for ALS on MovieLens data (http://grouplens.org/datasets/movielens/). @@ -40,13 +38,6 @@ import org.apache.spark.serializer.{KryoSerializer, KryoRegistrator} */ object MovieLensALS { - class ALSRegistrator extends KryoRegistrator { - override def registerClasses(kryo: Kryo) { - kryo.register(classOf[Rating]) - kryo.register(classOf[mutable.BitSet]) - } - } - case class Params( input: String = null, kryo: Boolean = false, @@ -108,8 +99,7 @@ object MovieLensALS { def run(params: Params) { val conf = new SparkConf().setAppName(s"MovieLensALS with $params") if (params.kryo) { - conf.set("spark.serializer", classOf[KryoSerializer].getName) - .set("spark.kryo.registrator", classOf[ALSRegistrator].getName) + conf.registerKryoClasses(Array(classOf[mutable.BitSet], classOf[Rating])) .set("spark.kryoserializer.buffer.mb", "8") } val sc = new SparkContext(conf) |