aboutsummaryrefslogtreecommitdiff
path: root/graphx
diff options
context:
space:
mode:
authorJoseph E. Gonzalez <joseph.e.gonzalez@gmail.com>2014-11-01 01:18:07 -0700
committerReynold Xin <rxin@databricks.com>2014-11-01 01:18:07 -0700
commitf4e0b28c859412ec8bdfdf452b6a1b2e1bee310e (patch)
tree8d26d32ca612579ab75ddabf0d2eea1fb3e5c12a /graphx
parent680fd87c65e3e7ef223e6a1573c7afe55bff6324 (diff)
downloadspark-f4e0b28c859412ec8bdfdf452b6a1b2e1bee310e.tar.gz
spark-f4e0b28c859412ec8bdfdf452b6a1b2e1bee310e.tar.bz2
spark-f4e0b28c859412ec8bdfdf452b6a1b2e1bee310e.zip
[SPARK-4142][GraphX] Default numEdgePartitions
Changing the default number of edge partitions to match spark parallelism. Author: Joseph E. Gonzalez <joseph.e.gonzalez@gmail.com> Closes #3006 from jegonzal/default_partitions and squashes the following commits: a9a5c4f [Joseph E. Gonzalez] Changing the default number of edge partitions to match spark parallelism
Diffstat (limited to 'graphx')
-rw-r--r--graphx/src/main/scala/org/apache/spark/graphx/GraphLoader.scala12
1 files changed, 9 insertions, 3 deletions
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/GraphLoader.scala b/graphx/src/main/scala/org/apache/spark/graphx/GraphLoader.scala
index f4c79365b1..4933aecba1 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/GraphLoader.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/GraphLoader.scala
@@ -48,7 +48,8 @@ object GraphLoader extends Logging {
* @param path the path to the file (e.g., /home/data/file or hdfs://file)
* @param canonicalOrientation whether to orient edges in the positive
* direction
- * @param minEdgePartitions the number of partitions for the edge RDD
+ * @param numEdgePartitions the number of partitions for the edge RDD
+ * Setting this value to -1 will use the default parallelism.
* @param edgeStorageLevel the desired storage level for the edge partitions
* @param vertexStorageLevel the desired storage level for the vertex partitions
*/
@@ -56,7 +57,7 @@ object GraphLoader extends Logging {
sc: SparkContext,
path: String,
canonicalOrientation: Boolean = false,
- minEdgePartitions: Int = 1,
+ numEdgePartitions: Int = -1,
edgeStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY,
vertexStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY)
: Graph[Int, Int] =
@@ -64,7 +65,12 @@ object GraphLoader extends Logging {
val startTime = System.currentTimeMillis
// Parse the edge data table directly into edge partitions
- val lines = sc.textFile(path, minEdgePartitions).coalesce(minEdgePartitions)
+ val lines =
+ if (numEdgePartitions > 0) {
+ sc.textFile(path, numEdgePartitions).coalesce(numEdgePartitions)
+ } else {
+ sc.textFile(path)
+ }
val edges = lines.mapPartitionsWithIndex { (pid, iter) =>
val builder = new EdgePartitionBuilder[Int, Int]
iter.foreach { line =>