aboutsummaryrefslogtreecommitdiff
path: root/graphx/src
diff options
context:
space:
mode:
authorJoseph K. Bradley <joseph@databricks.com>2015-02-02 17:02:29 -0800
committerXiangrui Meng <meng@databricks.com>2015-02-02 17:02:29 -0800
commitf133dece569f7faedb06a0da6e9f86c5d615e9be (patch)
treec69d65adf9b42f4d9952709d1ec4c3ecebbb5d4c /graphx/src
parent8aa3cfff661753d6d87a8d9a87373d403436dd92 (diff)
downloadspark-f133dece569f7faedb06a0da6e9f86c5d615e9be.tar.gz
spark-f133dece569f7faedb06a0da6e9f86c5d615e9be.tar.bz2
spark-f133dece569f7faedb06a0da6e9f86c5d615e9be.zip
[SPARK-5534] [graphx] Graph getStorageLevel fix
This fixes getStorageLevel for EdgeRDDImpl and VertexRDDImpl (and therefore for Graph). See code example on JIRA which failed before but works with this patch: [https://issues.apache.org/jira/browse/SPARK-5534] (The added unit tests also failed before but work with this fix.) Note: I used partitionsRDD, assuming that getStorageLevel will only be called on the driver. CC: mengxr (related to LDA PR), rxin ankurdave Thanks in advance! Author: Joseph K. Bradley <joseph@databricks.com> Closes #4317 from jkbradley/graphx-storagelevel and squashes the following commits: 1c21e49 [Joseph K. Bradley] made graph getStorageLevel test more robust 18d64ca [Joseph K. Bradley] Added tests for getStorageLevel in VertexRDDSuite, EdgeRDDSuite, GraphSuite 17b488b [Joseph K. Bradley] overrode getStorageLevel in Vertex/EdgeRDDImpl to use partitionsRDD
Diffstat (limited to 'graphx/src')
-rw-r--r--graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeRDDImpl.scala2
-rw-r--r--graphx/src/main/scala/org/apache/spark/graphx/impl/VertexRDDImpl.scala2
-rw-r--r--graphx/src/test/scala/org/apache/spark/graphx/EdgeRDDSuite.scala37
-rw-r--r--graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala15
-rw-r--r--graphx/src/test/scala/org/apache/spark/graphx/VertexRDDSuite.scala19
5 files changed, 71 insertions, 4 deletions
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeRDDImpl.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeRDDImpl.scala
index 6c35d7029e..56cb41661e 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeRDDImpl.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeRDDImpl.scala
@@ -70,6 +70,8 @@ class EdgeRDDImpl[ED: ClassTag, VD: ClassTag] private[graphx] (
this
}
+ override def getStorageLevel = partitionsRDD.getStorageLevel
+
override def checkpoint() = {
partitionsRDD.checkpoint()
}
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexRDDImpl.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexRDDImpl.scala
index 3e4968d6c0..6dad167fa7 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexRDDImpl.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexRDDImpl.scala
@@ -71,6 +71,8 @@ class VertexRDDImpl[VD] private[graphx] (
this
}
+ override def getStorageLevel = partitionsRDD.getStorageLevel
+
override def checkpoint() = {
partitionsRDD.checkpoint()
}
diff --git a/graphx/src/test/scala/org/apache/spark/graphx/EdgeRDDSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/EdgeRDDSuite.scala
new file mode 100644
index 0000000000..eb1dbe52c2
--- /dev/null
+++ b/graphx/src/test/scala/org/apache/spark/graphx/EdgeRDDSuite.scala
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.graphx
+
+import org.scalatest.FunSuite
+
+import org.apache.spark.storage.StorageLevel
+
+class EdgeRDDSuite extends FunSuite with LocalSparkContext {
+
+ test("cache, getStorageLevel") {
+ // test to see if getStorageLevel returns correct value after caching
+ withSpark { sc =>
+ val verts = sc.parallelize(List((0L, 0), (1L, 1), (1L, 2), (2L, 3), (2L, 3), (2L, 3)))
+ val edges = EdgeRDD.fromEdges(sc.parallelize(List.empty[Edge[Int]]))
+ assert(edges.getStorageLevel == StorageLevel.NONE)
+ edges.cache()
+ assert(edges.getStorageLevel == StorageLevel.MEMORY_ONLY)
+ }
+ }
+
+}
diff --git a/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala
index 59a57ba7a3..b61d9f0fbe 100644
--- a/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala
+++ b/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala
@@ -25,6 +25,7 @@ import org.apache.spark.SparkContext
import org.apache.spark.graphx.Graph._
import org.apache.spark.graphx.PartitionStrategy._
import org.apache.spark.rdd._
+import org.apache.spark.storage.StorageLevel
class GraphSuite extends FunSuite with LocalSparkContext {
@@ -390,6 +391,20 @@ class GraphSuite extends FunSuite with LocalSparkContext {
}
}
+ test("cache, getStorageLevel") {
+ // test to see if getStorageLevel returns correct value
+ withSpark { sc =>
+ val verts = sc.parallelize(List((1: VertexId, "a"), (2: VertexId, "b")), 1)
+ val edges = sc.parallelize(List(Edge(1, 2, 0), Edge(2, 1, 0)), 2)
+ val graph = Graph(verts, edges, "", StorageLevel.MEMORY_ONLY, StorageLevel.MEMORY_ONLY)
+ // Note: Before caching, graph.vertices is cached, but graph.edges is not (but graph.edges'
+ // parent RDD is cached).
+ graph.cache()
+ assert(graph.vertices.getStorageLevel == StorageLevel.MEMORY_ONLY)
+ assert(graph.edges.getStorageLevel == StorageLevel.MEMORY_ONLY)
+ }
+ }
+
test("non-default number of edge partitions") {
val n = 10
val defaultParallelism = 3
diff --git a/graphx/src/test/scala/org/apache/spark/graphx/VertexRDDSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/VertexRDDSuite.scala
index 42d3f21dba..131959cea3 100644
--- a/graphx/src/test/scala/org/apache/spark/graphx/VertexRDDSuite.scala
+++ b/graphx/src/test/scala/org/apache/spark/graphx/VertexRDDSuite.scala
@@ -17,12 +17,11 @@
package org.apache.spark.graphx
-import org.apache.spark.SparkContext
-import org.apache.spark.graphx.Graph._
-import org.apache.spark.graphx.impl.EdgePartition
-import org.apache.spark.rdd._
import org.scalatest.FunSuite
+import org.apache.spark.SparkContext
+import org.apache.spark.storage.StorageLevel
+
class VertexRDDSuite extends FunSuite with LocalSparkContext {
def vertices(sc: SparkContext, n: Int) = {
@@ -110,4 +109,16 @@ class VertexRDDSuite extends FunSuite with LocalSparkContext {
}
}
+ test("cache, getStorageLevel") {
+ // test to see if getStorageLevel returns correct value after caching
+ withSpark { sc =>
+ val verts = sc.parallelize(List((0L, 0), (1L, 1), (1L, 2), (2L, 3), (2L, 3), (2L, 3)))
+ val edges = EdgeRDD.fromEdges(sc.parallelize(List.empty[Edge[Int]]))
+ val rdd = VertexRDD(verts, edges, 0, (a: Int, b: Int) => a + b)
+ assert(rdd.getStorageLevel == StorageLevel.NONE)
+ rdd.cache()
+ assert(rdd.getStorageLevel == StorageLevel.MEMORY_ONLY)
+ }
+ }
+
}