From 6bb56faea8d238ea22c2de33db93b1b39f492b3a Mon Sep 17 00:00:00 2001 From: Sandy Ryza Date: Tue, 21 Oct 2014 21:53:09 -0700 Subject: SPARK-1813. Add a utility to SparkConf that makes using Kryo really easy Author: Sandy Ryza Closes #789 from sryza/sandy-spark-1813 and squashes the following commits: 48b05e9 [Sandy Ryza] Simplify b824932 [Sandy Ryza] Allow both spark.kryo.classesToRegister and spark.kryo.registrator at the same time 6a15bb7 [Sandy Ryza] Small fix a2278c0 [Sandy Ryza] Respond to review comments 6ef592e [Sandy Ryza] SPARK-1813. Add a utility to SparkConf that makes using Kryo really easy --- .../apache/spark/graphx/GraphKryoRegistrator.scala | 2 +- .../org/apache/spark/graphx/GraphXUtils.scala | 47 ++++++++++++++++++++++ .../apache/spark/graphx/LocalSparkContext.scala | 6 +-- .../spark/graphx/impl/EdgePartitionSuite.scala | 6 +-- .../spark/graphx/impl/VertexPartitionSuite.scala | 6 +-- 5 files changed, 55 insertions(+), 12 deletions(-) create mode 100644 graphx/src/main/scala/org/apache/spark/graphx/GraphXUtils.scala (limited to 'graphx') diff --git a/graphx/src/main/scala/org/apache/spark/graphx/GraphKryoRegistrator.scala b/graphx/src/main/scala/org/apache/spark/graphx/GraphKryoRegistrator.scala index 1948c978c3..563c948957 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/GraphKryoRegistrator.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/GraphKryoRegistrator.scala @@ -27,10 +27,10 @@ import org.apache.spark.graphx.impl._ import org.apache.spark.graphx.util.collection.GraphXPrimitiveKeyOpenHashMap import org.apache.spark.util.collection.OpenHashSet - /** * Registers GraphX classes with Kryo for improved performance. */ +@deprecated("Register GraphX classes with Kryo using GraphXUtils.registerKryoClasses", "1.2.0") class GraphKryoRegistrator extends KryoRegistrator { def registerClasses(kryo: Kryo) { diff --git a/graphx/src/main/scala/org/apache/spark/graphx/GraphXUtils.scala b/graphx/src/main/scala/org/apache/spark/graphx/GraphXUtils.scala new file mode 100644 index 0000000000..2cb07937ea --- /dev/null +++ b/graphx/src/main/scala/org/apache/spark/graphx/GraphXUtils.scala @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.graphx + +import org.apache.spark.SparkConf + +import org.apache.spark.graphx.impl._ +import org.apache.spark.graphx.util.collection.GraphXPrimitiveKeyOpenHashMap + +import org.apache.spark.util.collection.{OpenHashSet, BitSet} +import org.apache.spark.util.BoundedPriorityQueue + +object GraphXUtils { + /** + * Registers classes that GraphX uses with Kryo. + */ + def registerKryoClasses(conf: SparkConf) { + conf.registerKryoClasses(Array( + classOf[Edge[Object]], + classOf[(VertexId, Object)], + classOf[EdgePartition[Object, Object]], + classOf[BitSet], + classOf[VertexIdToIndexMap], + classOf[VertexAttributeBlock[Object]], + classOf[PartitionStrategy], + classOf[BoundedPriorityQueue[Object]], + classOf[EdgeDirection], + classOf[GraphXPrimitiveKeyOpenHashMap[VertexId, Int]], + classOf[OpenHashSet[Int]], + classOf[OpenHashSet[Long]])) + } +} diff --git a/graphx/src/test/scala/org/apache/spark/graphx/LocalSparkContext.scala b/graphx/src/test/scala/org/apache/spark/graphx/LocalSparkContext.scala index 47594a800a..a3e28efc75 100644 --- a/graphx/src/test/scala/org/apache/spark/graphx/LocalSparkContext.scala +++ b/graphx/src/test/scala/org/apache/spark/graphx/LocalSparkContext.scala @@ -17,9 +17,6 @@ package org.apache.spark.graphx -import org.scalatest.Suite -import org.scalatest.BeforeAndAfterEach - import org.apache.spark.SparkConf import org.apache.spark.SparkContext @@ -31,8 +28,7 @@ trait LocalSparkContext { /** Runs `f` on a new SparkContext and ensures that it is stopped afterwards. */ def withSpark[T](f: SparkContext => T) = { val conf = new SparkConf() - .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") - .set("spark.kryo.registrator", "org.apache.spark.graphx.GraphKryoRegistrator") + GraphXUtils.registerKryoClasses(conf) val sc = new SparkContext("local", "test", conf) try { f(sc) diff --git a/graphx/src/test/scala/org/apache/spark/graphx/impl/EdgePartitionSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/impl/EdgePartitionSuite.scala index 9d00f76327..db1dac6160 100644 --- a/graphx/src/test/scala/org/apache/spark/graphx/impl/EdgePartitionSuite.scala +++ b/graphx/src/test/scala/org/apache/spark/graphx/impl/EdgePartitionSuite.scala @@ -129,9 +129,9 @@ class EdgePartitionSuite extends FunSuite { val aList = List((0, 1, 0), (1, 0, 0), (1, 2, 0), (5, 4, 0), (5, 5, 0)) val a: EdgePartition[Int, Int] = makeEdgePartition(aList) val javaSer = new JavaSerializer(new SparkConf()) - val kryoSer = new KryoSerializer(new SparkConf() - .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") - .set("spark.kryo.registrator", "org.apache.spark.graphx.GraphKryoRegistrator")) + val conf = new SparkConf() + GraphXUtils.registerKryoClasses(conf) + val kryoSer = new KryoSerializer(conf) for (ser <- List(javaSer, kryoSer); s = ser.newInstance()) { val aSer: EdgePartition[Int, Int] = s.deserialize(s.serialize(a)) diff --git a/graphx/src/test/scala/org/apache/spark/graphx/impl/VertexPartitionSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/impl/VertexPartitionSuite.scala index f9e771a900..fe8304c1cd 100644 --- a/graphx/src/test/scala/org/apache/spark/graphx/impl/VertexPartitionSuite.scala +++ b/graphx/src/test/scala/org/apache/spark/graphx/impl/VertexPartitionSuite.scala @@ -125,9 +125,9 @@ class VertexPartitionSuite extends FunSuite { val verts = Set((0L, 1), (1L, 1), (2L, 1)) val vp = VertexPartition(verts.iterator) val javaSer = new JavaSerializer(new SparkConf()) - val kryoSer = new KryoSerializer(new SparkConf() - .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") - .set("spark.kryo.registrator", "org.apache.spark.graphx.GraphKryoRegistrator")) + val conf = new SparkConf() + GraphXUtils.registerKryoClasses(conf) + val kryoSer = new KryoSerializer(conf) for (ser <- List(javaSer, kryoSer); s = ser.newInstance()) { val vpSer: VertexPartition[Int] = s.deserialize(s.serialize(vp)) -- cgit v1.2.3