 graph/src/main/scala/org/apache/spark/graph/Analytics.scala             |  10
 graph/src/main/scala/org/apache/spark/graph/GraphKryoRegistrator.scala  |   3
 graph/src/main/scala/org/apache/spark/graph/GraphLoader.scala           |  12
 graph/src/main/scala/org/apache/spark/graph/PartitionStrategy.scala     |  94
 graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala        | 215
 5 files changed, 196 insertions(+), 138 deletions(-)
diff --git a/graph/src/main/scala/org/apache/spark/graph/Analytics.scala b/graph/src/main/scala/org/apache/spark/graph/Analytics.scala
index 8320b663a5..f542ec6069 100644
--- a/graph/src/main/scala/org/apache/spark/graph/Analytics.scala
+++ b/graph/src/main/scala/org/apache/spark/graph/Analytics.scala
@@ -241,6 +241,7 @@ object Analytics extends Logging {
var outFname = ""
var numVPart = 4
var numEPart = 4
+ var partitionStrategy: PartitionStrategy = RandomVertexCut()
options.foreach{
case ("numIter", v) => numIter = v.toInt
@@ -249,6 +250,15 @@ object Analytics extends Logging {
case ("output", v) => outFname = v
case ("numVPart", v) => numVPart = v.toInt
case ("numEPart", v) => numEPart = v.toInt
+ case ("partStrategy", v) => {
+ partitionStrategy = v match {
+ case "RandomVertexCut" => RandomVertexCut()
+ case "EdgePartition1D" => EdgePartition1D()
+ case "EdgePartition2D" => EdgePartition2D()
+ case "CanonicalRandomVertexCut" => CanonicalRandomVertexCut()
+ case _ => throw new IllegalArgumentException("Invalid Partition Strategy: " + v)
+ }
+ }
case (opt, _) => throw new IllegalArgumentException("Invalid option: " + opt)
}
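For readers who want the same name-to-strategy mapping outside of Analytics, here is a minimal standalone sketch of it factored into a helper; parsePartitionStrategy is a hypothetical name introduced only for illustration and is not part of this patch:

    import org.apache.spark.graph._

    // Hypothetical helper mirroring the match above; rejects unknown names.
    def parsePartitionStrategy(name: String): PartitionStrategy = name match {
      case "RandomVertexCut"          => RandomVertexCut()
      case "EdgePartition1D"          => EdgePartition1D()
      case "EdgePartition2D"          => EdgePartition2D()
      case "CanonicalRandomVertexCut" => CanonicalRandomVertexCut()
      case other =>
        throw new IllegalArgumentException("Invalid Partition Strategy: " + other)
    }

    parsePartitionStrategy("EdgePartition2D")   // returns EdgePartition2D()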
diff --git a/graph/src/main/scala/org/apache/spark/graph/GraphKryoRegistrator.scala b/graph/src/main/scala/org/apache/spark/graph/GraphKryoRegistrator.scala
index baf8099556..6f18e46ab2 100644
--- a/graph/src/main/scala/org/apache/spark/graph/GraphKryoRegistrator.scala
+++ b/graph/src/main/scala/org/apache/spark/graph/GraphKryoRegistrator.scala
@@ -5,6 +5,7 @@ import com.esotericsoftware.kryo.Kryo
import org.apache.spark.graph.impl._
import org.apache.spark.serializer.KryoRegistrator
import org.apache.spark.util.collection.BitSet
+import org.apache.spark.graph._
class GraphKryoRegistrator extends KryoRegistrator {
@@ -19,6 +20,8 @@ class GraphKryoRegistrator extends KryoRegistrator {
kryo.register(classOf[BitSet])
kryo.register(classOf[VertexIdToIndexMap])
kryo.register(classOf[VertexAttributeBlock[Object]])
+ kryo.register(classOf[PartitionStrategy])
+
// This avoids a large number of hash table lookups.
kryo.setReferences(false)
}
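Registering PartitionStrategy with Kryo only matters when Kryo serialization is actually enabled. A minimal sketch of turning it on in a driver program, assuming the system-property configuration keys "spark.serializer" and "spark.kryo.registrator" used by Spark at this point:

    import org.apache.spark.SparkContext

    // Assumed configuration keys; set before the SparkContext is created.
    System.setProperty("spark.serializer",
      "org.apache.spark.serializer.KryoSerializer")
    System.setProperty("spark.kryo.registrator",
      "org.apache.spark.graph.GraphKryoRegistrator")

    val sc = new SparkContext("local[4]", "graph-kryo-example")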
diff --git a/graph/src/main/scala/org/apache/spark/graph/GraphLoader.scala b/graph/src/main/scala/org/apache/spark/graph/GraphLoader.scala
index 313737fdbe..4dc33a02ce 100644
--- a/graph/src/main/scala/org/apache/spark/graph/GraphLoader.scala
+++ b/graph/src/main/scala/org/apache/spark/graph/GraphLoader.scala
@@ -27,8 +27,8 @@ object GraphLoader {
path: String,
edgeParser: Array[String] => ED,
minEdgePartitions: Int = 1,
- minVertexPartitions: Int = 1)
- : GraphImpl[Int, ED] = {
+ minVertexPartitions: Int = 1,
+ partitionStrategy: PartitionStrategy = RandomVertexCut()): GraphImpl[Int, ED] = {
// Parse the edge data table
val edges = sc.textFile(path, minEdgePartitions).flatMap { line =>
@@ -48,13 +48,15 @@ object GraphLoader {
}
}.cache()
- val graph = fromEdges(edges)
+ val graph = fromEdges(edges, partitionStrategy)
graph
}
- private def fromEdges[ED: ClassManifest](edges: RDD[Edge[ED]]): GraphImpl[Int, ED] = {
+ private def fromEdges[ED: ClassManifest](
+ edges: RDD[Edge[ED]],
+ partitionStrategy: PartitionStrategy): GraphImpl[Int, ED] = {
val vertices = edges.flatMap { edge => List((edge.srcId, 1), (edge.dstId, 1)) }
.reduceByKey(_ + _)
- GraphImpl(vertices, edges, 0)
+ GraphImpl(vertices, edges, 0, (a: Int, b: Int) => a, partitionStrategy)
}
}
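A hedged usage sketch of the extended loader, assuming textFile takes the SparkContext as its first argument (only the trailing parameters are visible in this hunk) and using an illustrative edge-file path:

    import org.apache.spark.graph._

    // sc is an existing SparkContext (see the Kryo sketch above).
    // Load an edge list, attach a unit attribute to every edge, and partition
    // the edges with the 2D scheme instead of the RandomVertexCut default.
    val graph = GraphLoader.textFile(
      sc,
      "hdfs:///data/edges.tsv",
      edgeParser = (fields: Array[String]) => 1,
      minEdgePartitions = 8,
      partitionStrategy = EdgePartition2D())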
diff --git a/graph/src/main/scala/org/apache/spark/graph/PartitionStrategy.scala b/graph/src/main/scala/org/apache/spark/graph/PartitionStrategy.scala
new file mode 100644
index 0000000000..cf65f50657
--- /dev/null
+++ b/graph/src/main/scala/org/apache/spark/graph/PartitionStrategy.scala
@@ -0,0 +1,94 @@
+package org.apache.spark.graph
+
+
+sealed trait PartitionStrategy extends Serializable {
+ def getPartition(src: Vid, dst: Vid, numParts: Pid): Pid
+}
+
+
+/**
+ * This function implements a classic 2D-Partitioning of a sparse matrix.
+ * Suppose we have a graph with 12 vertices that we want to partition
+ * over 9 machines. We can use the following sparse matrix representation:
+ *
+ * __________________________________
+ * v0 | P0 * | P1 | P2 * |
+ * v1 | **** | * | |
+ * v2 | ******* | ** | **** |
+ * v3 | ***** | * * | * |
+ * ----------------------------------
+ * v4 | P3 * | P4 *** | P5 ** * |
+ * v5 | * * | * | |
+ * v6 | * | ** | **** |
+ * v7 | * * * | * * | * |
+ * ----------------------------------
+ * v8 | P6 * | P7 * | P8 * *|
+ * v9 | * | * * | |
+ * v10 | * | ** | * * |
+ * v11 | * <-E | *** | ** |
+ * ----------------------------------
+ *
+ * The edge denoted by E connects v11 with v1 and is assigned to
+ * processor P6. To get the processor number we divide the matrix
+ * into sqrt(numProc) by sqrt(numProc) blocks. Notice that edges
+ * adjacent to v11 can only be in the first column of
+ * blocks (P0, P3, P6) or the last row of blocks (P6, P7, P8).
+ * As a consequence we can guarantee that v11 will need to be
+ * replicated to at most 2 * sqrt(numProc) machines.
+ *
+ * Notice that P0 has many edges and as a consequence this
+ * partitioning would lead to poor work balance. To improve
+ * balance we first multiply each vertex id by a large prime
+ * to effectively shuffle the vertex locations.
+ *
+ * One of the limitations of this approach is that the number of
+ * machines must be a perfect square. We partially address
+ * this limitation by computing the machine assignment to the next
+ * largest perfect square and then mapping back down to the actual
+ * number of machines. Unfortunately, this can also lead to work
+ * imbalance and so it is suggested that a perfect square is used.
+ *
+ *
+ */
+case class EdgePartition2D() extends PartitionStrategy {
+ override def getPartition(src: Vid, dst: Vid, numParts: Pid): Pid = {
+ val ceilSqrtNumParts: Pid = math.ceil(math.sqrt(numParts)).toInt
+ val mixingPrime: Vid = 1125899906842597L
+ val col: Pid = ((math.abs(src) * mixingPrime) % ceilSqrtNumParts).toInt
+ val row: Pid = ((math.abs(dst) * mixingPrime) % ceilSqrtNumParts).toInt
+ (col * ceilSqrtNumParts + row) % numParts
+ }
+}
+
+
+case class EdgePartition1D() extends PartitionStrategy {
+ override def getPartition(src: Vid, dst: Vid, numParts: Pid): Pid = {
+ val mixingPrime: Vid = 1125899906842597L
+ (math.abs(src) * mixingPrime).toInt % numParts
+ }
+}
+
+
+/**
+ * Assign edges to an arbitrary machine corresponding to a
+ * random vertex cut.
+ */
+case class RandomVertexCut() extends PartitionStrategy {
+ override def getPartition(src: Vid, dst: Vid, numParts: Pid): Pid = {
+ math.abs((src, dst).hashCode()) % numParts
+ }
+}
+
+
+/**
+ * Assign edges to an arbitrary machine corresponding to a random vertex cut. This
+ * function ensures that edges of opposite direction between the same two vertices
+ * will end up on the same partition.
+ */
+case class CanonicalRandomVertexCut() extends PartitionStrategy {
+ override def getPartition(src: Vid, dst: Vid, numParts: Pid): Pid = {
+ val lower = math.min(src, dst)
+ val higher = math.max(src, dst)
+ math.abs((lower, higher).hashCode()) % numParts
+ }
+}
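A small sketch of how the four strategies place the same edge, assuming the graph package's usual Vid = Long and Pid = Int type aliases (the Long mixingPrime and .toInt conversions above suggest exactly that):

    import org.apache.spark.graph._

    // Place the edge (src = 11, dst = 1) from the example above across 9 partitions.
    val numParts = 9
    val strategies = Seq(
      RandomVertexCut(), CanonicalRandomVertexCut(),
      EdgePartition1D(), EdgePartition2D())
    strategies.foreach { s =>
      println(s.getClass.getSimpleName + " -> " + s.getPartition(11L, 1L, numParts))
    }

    // Only the canonical cut guarantees that both directions of an edge land together.
    val canonical = CanonicalRandomVertexCut()
    assert(canonical.getPartition(11L, 1L, numParts) ==
           canonical.getPartition(1L, 11L, numParts))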
diff --git a/graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala b/graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala
index d16a81d203..6ad0ce60a7 100644
--- a/graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala
+++ b/graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala
@@ -8,6 +8,7 @@ import scala.collection.mutable.ArrayBuffer
import org.apache.spark.SparkContext._
import org.apache.spark.HashPartitioner
import org.apache.spark.util.ClosureCleaner
+import org.apache.spark.SparkException
import org.apache.spark.Partitioner
import org.apache.spark.graph._
@@ -76,10 +77,11 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected (
@transient val vTable: VertexSetRDD[VD],
@transient val vid2pid: Vid2Pid,
@transient val localVidMap: RDD[(Pid, VertexIdToIndexMap)],
- @transient val eTable: RDD[(Pid, EdgePartition[ED])] )
+ @transient val eTable: RDD[(Pid, EdgePartition[ED])],
+ @transient val partitioner: PartitionStrategy)
extends Graph[VD, ED] {
- def this() = this(null, null, null, null)
+ def this() = this(null, null, null, null, null)
@transient val vTableReplicatedValues: VTableReplicatedValues[VD] =
new VTableReplicatedValues(vTable, vid2pid, localVidMap)
@@ -189,18 +191,18 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected (
override def reverse: Graph[VD, ED] = {
val newEtable = eTable.mapPartitions( _.map{ case (pid, epart) => (pid, epart.reverse) },
preservesPartitioning = true)
- new GraphImpl(vTable, vid2pid, localVidMap, newEtable)
+ new GraphImpl(vTable, vid2pid, localVidMap, newEtable, partitioner)
}
override def mapVertices[VD2: ClassManifest](f: (Vid, VD) => VD2): Graph[VD2, ED] = {
val newVTable = vTable.mapValuesWithKeys((vid, data) => f(vid, data))
- new GraphImpl(newVTable, vid2pid, localVidMap, eTable)
+ new GraphImpl(newVTable, vid2pid, localVidMap, eTable, partitioner)
}
override def mapEdges[ED2: ClassManifest](f: Edge[ED] => ED2): Graph[VD, ED2] = {
val newETable = eTable.mapPartitions(_.map{ case (pid, epart) => (pid, epart.map(f)) },
preservesPartitioning = true)
- new GraphImpl(vTable, vid2pid, localVidMap, newETable)
+ new GraphImpl(vTable, vid2pid, localVidMap, newETable, partitioner)
}
override def mapTriplets[ED2: ClassManifest](f: EdgeTriplet[VD, ED] => ED2): Graph[VD, ED2] =
@@ -234,7 +236,7 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected (
triplets.filter(
t => vpred( t.srcId, t.srcAttr ) && vpred( t.dstId, t.dstAttr ) && epred(t)
)
- .map( t => Edge(t.srcId, t.dstId, t.attr) ))
+ .map( t => Edge(t.srcId, t.dstId, t.attr) ), partitioner)
// Construct the Vid2Pid map. Here we assume that the filter operation
// behaves deterministically.
@@ -242,48 +244,60 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected (
val newVid2Pid = new Vid2Pid(newETable, newVTable.index)
val newVidMap = createLocalVidMap(newETable)
- new GraphImpl(newVTable, newVid2Pid, localVidMap, newETable)
+ new GraphImpl(newVTable, newVid2Pid, localVidMap, newETable, partitioner)
}
override def groupEdgeTriplets[ED2: ClassManifest](
f: Iterator[EdgeTriplet[VD,ED]] => ED2 ): Graph[VD,ED2] = {
- val newEdges: RDD[Edge[ED2]] = triplets.mapPartitions { partIter =>
- partIter
- // TODO(crankshaw) toList requires that the entire edge partition
- // can fit in memory right now.
- .toList
- // groups all ETs in this partition that have the same src and dst
- // Because all ETs with the same src and dst will live on the same
- // partition due to the EdgePartitioner, this guarantees that these
- // ET groups will be complete.
- .groupBy { t: EdgeTriplet[VD, ED] => (t.srcId, t.dstId) }
- .mapValues { ts: List[EdgeTriplet[VD, ED]] => f(ts.toIterator) }
- .toList
- .toIterator
- .map { case ((src, dst), data) => Edge(src, dst, data) }
- .toIterator
- }
+ partitioner match {
+ case _: CanonicalRandomVertexCut => {
+ val newEdges: RDD[Edge[ED2]] = triplets.mapPartitions { partIter =>
+ partIter
+ // TODO(crankshaw) toList requires that the entire edge partition
+ // can fit in memory right now.
+ .toList
+ // groups all ETs in this partition that have the same src and dst
+ // Because all ETs with the same src and dst will live on the same
+ // partition due to the CanonicalRandomVertexCut strategy, this
+ // guarantees that these ET groups will be complete.
+ .groupBy { t: EdgeTriplet[VD, ED] => (t.srcId, t.dstId) }
+ .mapValues { ts: List[EdgeTriplet[VD, ED]] => f(ts.toIterator) }
+ .toList
+ .toIterator
+ .map { case ((src, dst), data) => Edge(src, dst, data) }
+ .toIterator
+ }
+ //TODO(crankshaw) eliminate the need to call createETable
+ val newETable = createETable(newEdges, partitioner)
+ new GraphImpl(vTable, vid2pid, localVidMap, newETable, partitioner)
+ }
- //TODO(crankshaw) eliminate the need to call createETable
- val newETable = createETable(newEdges)
- new GraphImpl(vTable, vid2pid, localVidMap, newETable)
+ case _ => throw new SparkException(partitioner.getClass.getName
+ + " is incompatible with groupEdgeTriplets")
+ }
}
override def groupEdges[ED2: ClassManifest](f: Iterator[Edge[ED]] => ED2 ):
Graph[VD,ED2] = {
+ partitioner match {
+ case _: CanonicalRandomVertexCut => {
+ val newEdges: RDD[Edge[ED2]] = edges.mapPartitions { partIter =>
+ partIter.toList
+ .groupBy { e: Edge[ED] => (e.srcId, e.dstId) }
+ .mapValues { ts => f(ts.toIterator) }
+ .toList
+ .toIterator
+ .map { case ((src, dst), data) => Edge(src, dst, data) }
+ }
+ // TODO(crankshaw) eliminate the need to call createETable
+ val newETable = createETable(newEdges, partitioner)
- val newEdges: RDD[Edge[ED2]] = edges.mapPartitions { partIter =>
- partIter.toList
- .groupBy { e: Edge[ED] => (e.srcId, e.dstId) }
- .mapValues { ts => f(ts.toIterator) }
- .toList
- .toIterator
- .map { case ((src, dst), data) => Edge(src, dst, data) }
- }
- // TODO(crankshaw) eliminate the need to call createETable
- val newETable = createETable(newEdges)
+ new GraphImpl(vTable, vid2pid, localVidMap, newETable, partitioner)
+ }
- new GraphImpl(vTable, vid2pid, localVidMap, newETable)
+ case _ => throw new SparkException(partitioner.getClass.getName
+ + " is incompatible with groupEdges")
+ }
}
//////////////////////////////////////////////////////////////////////////////////////////////////
@@ -301,7 +315,7 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected (
: Graph[VD2, ED] = {
ClosureCleaner.clean(updateF)
val newVTable = vTable.leftJoin(updates)(updateF)
- new GraphImpl(newVTable, vid2pid, localVidMap, eTable)
+ new GraphImpl(newVTable, vid2pid, localVidMap, eTable, partitioner)
}
} // end of class GraphImpl
@@ -309,10 +323,18 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected (
object GraphImpl {
def apply[VD: ClassManifest, ED: ClassManifest](
- vertices: RDD[(Vid, VD)], edges: RDD[Edge[ED]],
- defaultVertexAttr: VD):
- GraphImpl[VD,ED] = {
- apply(vertices, edges, defaultVertexAttr, (a:VD, b:VD) => a)
+ vertices: RDD[(Vid, VD)],
+ edges: RDD[Edge[ED]],
+ defaultVertexAttr: VD): GraphImpl[VD,ED] = {
+ apply(vertices, edges, defaultVertexAttr, (a:VD, b:VD) => a, RandomVertexCut())
+ }
+
+ def apply[VD: ClassManifest, ED: ClassManifest](
+ vertices: RDD[(Vid, VD)],
+ edges: RDD[Edge[ED]],
+ defaultVertexAttr: VD,
+ partitionStrategy: PartitionStrategy): GraphImpl[VD,ED] = {
+ apply(vertices, edges, defaultVertexAttr, (a:VD, b:VD) => a, partitionStrategy)
}
def apply[VD: ClassManifest, ED: ClassManifest](
@@ -320,9 +342,18 @@ object GraphImpl {
edges: RDD[Edge[ED]],
defaultVertexAttr: VD,
mergeFunc: (VD, VD) => VD): GraphImpl[VD,ED] = {
+ apply(vertices, edges, defaultVertexAttr, mergeFunc, RandomVertexCut())
+ }
+
+ def apply[VD: ClassManifest, ED: ClassManifest](
+ vertices: RDD[(Vid, VD)],
+ edges: RDD[Edge[ED]],
+ defaultVertexAttr: VD,
+ mergeFunc: (VD, VD) => VD,
+ partitionStrategy: PartitionStrategy): GraphImpl[VD,ED] = {
vertices.cache
- val etable = createETable(edges).cache
+ val etable = createETable(edges, partitionStrategy).cache
// Get the set of all vids, preserving partitions
val partitioner = Partitioner.defaultPartitioner(vertices)
val implicitVids = etable.flatMap {
@@ -338,9 +369,10 @@ object GraphImpl {
val vid2pid = new Vid2Pid(etable, vtable.index)
val localVidMap = createLocalVidMap(etable)
- new GraphImpl(vtable, vid2pid, localVidMap, etable)
+ new GraphImpl(vtable, vid2pid, localVidMap, etable, partitionStrategy)
}
+
/**
* Create the edge table RDD, which is much more efficient for Java heap storage than the
* normal edges data structure (RDD[(Vid, Vid, ED)]).
@@ -349,17 +381,14 @@ object GraphImpl {
* key-value pair: the key is the partition id, and the value is an EdgePartition object
* containing all the edges in a partition.
*/
- protected def createETable[ED: ClassManifest](edges: RDD[Edge[ED]])
- : RDD[(Pid, EdgePartition[ED])] = {
+ protected def createETable[ED: ClassManifest](
+ edges: RDD[Edge[ED]],
+ partitionStrategy: PartitionStrategy): RDD[(Pid, EdgePartition[ED])] = {
// Get the number of partitions
val numPartitions = edges.partitions.size
- val ceilSqrt: Pid = math.ceil(math.sqrt(numPartitions)).toInt
+
edges.map { e =>
- // Random partitioning based on the source vertex id.
- // val part: Pid = edgePartitionFunction1D(e.srcId, e.dstId, numPartitions)
- // val part: Pid = edgePartitionFunction2D(e.srcId, e.dstId, numPartitions, ceilSqrt)
- val part: Pid = randomVertexCut(e.srcId, e.dstId, numPartitions)
- //val part: Pid = canonicalEdgePartitionFunction2D(e.srcId, e.dstId, numPartitions, ceilSqrt)
+ val part: Pid = partitionStrategy.getPartition(e.srcId, e.dstId, numPartitions)
// Should we be using 3-tuple or an optimized class
new MessageToPartition(part, (e.srcId, e.dstId, e.attr))
@@ -420,7 +449,7 @@ object GraphImpl {
}
Iterator((pid, newEdgePartition))
}
- new GraphImpl(g.vTable, g.vid2pid, g.localVidMap, newETable)
+ new GraphImpl(g.vTable, g.vid2pid, g.localVidMap, newETable, g.partitioner)
}
def mapReduceTriplets[VD: ClassManifest, ED: ClassManifest, A: ClassManifest](
@@ -491,86 +520,6 @@ object GraphImpl {
VertexSetRDD.aggregate(preAgg, g.vTable.index, reduceFunc)
}
- protected def edgePartitionFunction1D(src: Vid, dst: Vid, numParts: Pid): Pid = {
- val mixingPrime: Vid = 1125899906842597L
- (math.abs(src) * mixingPrime).toInt % numParts
- }
-
- /**
- * This function implements a classic 2D-Partitioning of a sparse matrix.
- * Suppose we have a graph with 11 vertices that we want to partition
- * over 9 machines. We can use the following sparse matrix representation:
- *
- * __________________________________
- * v0 | P0 * | P1 | P2 * |
- * v1 | **** | * | |
- * v2 | ******* | ** | **** |
- * v3 | ***** | * * | * |
- * ----------------------------------
- * v4 | P3 * | P4 *** | P5 ** * |
- * v5 | * * | * | |
- * v6 | * | ** | **** |
- * v7 | * * * | * * | * |
- * ----------------------------------
- * v8 | P6 * | P7 * | P8 * *|
- * v9 | * | * * | |
- * v10 | * | ** | * * |
- * v11 | * <-E | *** | ** |
- * ----------------------------------
- *
- * The edge denoted by E connects v11 with v1 and is assigned to
- * processor P6. To get the processor number we divide the matrix
- * into sqrt(numProc) by sqrt(numProc) blocks. Notice that edges
- * adjacent to v11 can only be in the first colum of
- * blocks (P0, P3, P6) or the last row of blocks (P6, P7, P8).
- * As a consequence we can guarantee that v11 will need to be
- * replicated to at most 2 * sqrt(numProc) machines.
- *
- * Notice that P0 has many edges and as a consequence this
- * partitioning would lead to poor work balance. To improve
- * balance we first multiply each vertex id by a large prime
- * to effectively shuffle the vertex locations.
- *
- * One of the limitations of this approach is that the number of
- * machines must either be a perfect square. We partially address
- * this limitation by computing the machine assignment to the next
- * largest perfect square and then mapping back down to the actual
- * number of machines. Unfortunately, this can also lead to work
- * imbalance and so it is suggested that a perfect square is used.
- *
- *
- */
- protected def edgePartitionFunction2D(src: Vid, dst: Vid,
- numParts: Pid, ceilSqrtNumParts: Pid): Pid = {
- val mixingPrime: Vid = 1125899906842597L
- val col: Pid = ((math.abs(src) * mixingPrime) % ceilSqrtNumParts).toInt
- val row: Pid = ((math.abs(dst) * mixingPrime) % ceilSqrtNumParts).toInt
- (col * ceilSqrtNumParts + row) % numParts
- }
-
- /**
- * Assign edges to an aribtrary machine corresponding to a
- * random vertex cut.
- */
- protected def randomVertexCut(src: Vid, dst: Vid, numParts: Pid): Pid = {
- math.abs((src, dst).hashCode()) % numParts
- }
-
- /**
- * @todo This will only partition edges to the upper diagonal
- * of the 2D processor space.
- */
- protected def canonicalEdgePartitionFunction2D(srcOrig: Vid, dstOrig: Vid,
- numParts: Pid, ceilSqrtNumParts: Pid): Pid = {
- val mixingPrime: Vid = 1125899906842597L
- // Partitions by canonical edge direction
- val src = math.min(srcOrig, dstOrig)
- val dst = math.max(srcOrig, dstOrig)
- val col: Pid = ((math.abs(src) * mixingPrime) % ceilSqrtNumParts).toInt
- val row: Pid = ((math.abs(dst) * mixingPrime) % ceilSqrtNumParts).toInt
- (col * ceilSqrtNumParts + row) % numParts
- }
-
private def accessesVertexAttr[VD: ClassManifest, ED: ClassManifest](
closure: AnyRef, attrName: String): Boolean = {
try {
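To close, a hedged sketch of how a caller might build a graph with an explicit strategy through the new GraphImpl.apply overload and then use groupEdges, which this patch restricts to the canonical cut; the toy vertex and edge RDDs are illustrative only:

    import org.apache.spark.SparkContext
    import org.apache.spark.graph._
    import org.apache.spark.graph.impl.GraphImpl

    val sc = new SparkContext("local[4]", "partition-strategy-example")

    // Toy input: vertices carry an Int attribute, edges carry an Int weight.
    val vertices = sc.parallelize(Seq((1L, 0), (2L, 0), (3L, 0)))
    val edges = sc.parallelize(Seq(Edge(1L, 2L, 1), Edge(2L, 1L, 1), Edge(2L, 3L, 1)))

    // New overload: default vertex attribute plus an explicit PartitionStrategy.
    val graph = GraphImpl(vertices, edges, 0, CanonicalRandomVertexCut())

    // Both directions of (1, 2) land in the same partition under the canonical
    // cut, so groupEdges can merge them; with any other strategy this call now
    // throws a SparkException instead of silently producing incomplete groups.
    val grouped = graph.groupEdges(iter => iter.map(_.attr).sum)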