diff options
author | Joseph E. Gonzalez <joseph.e.gonzalez@gmail.com> | 2014-11-01 01:18:07 -0700 |
---|---|---|
committer | Reynold Xin <rxin@databricks.com> | 2014-11-01 01:18:07 -0700 |
commit | f4e0b28c859412ec8bdfdf452b6a1b2e1bee310e (patch) | |
tree | 8d26d32ca612579ab75ddabf0d2eea1fb3e5c12a /graphx | |
parent | 680fd87c65e3e7ef223e6a1573c7afe55bff6324 (diff) | |
download | spark-f4e0b28c859412ec8bdfdf452b6a1b2e1bee310e.tar.gz spark-f4e0b28c859412ec8bdfdf452b6a1b2e1bee310e.tar.bz2 spark-f4e0b28c859412ec8bdfdf452b6a1b2e1bee310e.zip |
[SPARK-4142][GraphX] Default numEdgePartitions
Changing the default number of edge partitions to match Spark's default parallelism.
Author: Joseph E. Gonzalez <joseph.e.gonzalez@gmail.com>
Closes #3006 from jegonzal/default_partitions and squashes the following commits:
a9a5c4f [Joseph E. Gonzalez] Changing the default number of edge partitions to match spark parallelism
Diffstat (limited to 'graphx')
-rw-r--r-- | graphx/src/main/scala/org/apache/spark/graphx/GraphLoader.scala | 12 |
1 files changed, 9 insertions, 3 deletions
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/GraphLoader.scala b/graphx/src/main/scala/org/apache/spark/graphx/GraphLoader.scala index f4c79365b1..4933aecba1 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/GraphLoader.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/GraphLoader.scala @@ -48,7 +48,8 @@ object GraphLoader extends Logging { * @param path the path to the file (e.g., /home/data/file or hdfs://file) * @param canonicalOrientation whether to orient edges in the positive * direction - * @param minEdgePartitions the number of partitions for the edge RDD + * @param numEdgePartitions the number of partitions for the edge RDD + * Setting this value to -1 will use the default parallelism. * @param edgeStorageLevel the desired storage level for the edge partitions * @param vertexStorageLevel the desired storage level for the vertex partitions */ @@ -56,7 +57,7 @@ object GraphLoader extends Logging { sc: SparkContext, path: String, canonicalOrientation: Boolean = false, - minEdgePartitions: Int = 1, + numEdgePartitions: Int = -1, edgeStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY, vertexStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY) : Graph[Int, Int] = @@ -64,7 +65,12 @@ object GraphLoader extends Logging { val startTime = System.currentTimeMillis // Parse the edge data table directly into edge partitions - val lines = sc.textFile(path, minEdgePartitions).coalesce(minEdgePartitions) + val lines = + if (numEdgePartitions > 0) { + sc.textFile(path, numEdgePartitions).coalesce(numEdgePartitions) + } else { + sc.textFile(path) + } val edges = lines.mapPartitionsWithIndex { (pid, iter) => val builder = new EdgePartitionBuilder[Int, Int] iter.foreach { line => |