diff options
Diffstat (limited to 'graphx')
10 files changed, 26 insertions, 25 deletions
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala b/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala index 3e8c385302..87f3bc31e6 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala @@ -284,7 +284,7 @@ class GraphOps[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]) extends Seriali if (selectedVertices.count > 1) { found = true val collectedVertices = selectedVertices.collect() - retVal = collectedVertices(Random.nextInt(collectedVertices.size)) + retVal = collectedVertices(Random.nextInt(collectedVertices.length)) } } retVal diff --git a/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala b/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala index 53a9f92b82..5a0c479bb4 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala @@ -276,7 +276,7 @@ object VertexRDD { def apply[VD: ClassTag](vertices: RDD[(VertexId, VD)]): VertexRDD[VD] = { val vPartitioned: RDD[(VertexId, VD)] = vertices.partitioner match { case Some(p) => vertices - case None => vertices.partitionBy(new HashPartitioner(vertices.partitions.size)) + case None => vertices.partitionBy(new HashPartitioner(vertices.partitions.length)) } val vertexPartitions = vPartitioned.mapPartitions( iter => Iterator(ShippableVertexPartition(iter)), @@ -317,7 +317,7 @@ object VertexRDD { ): VertexRDD[VD] = { val vPartitioned: RDD[(VertexId, VD)] = vertices.partitioner match { case Some(p) => vertices - case None => vertices.partitionBy(new HashPartitioner(vertices.partitions.size)) + case None => vertices.partitionBy(new HashPartitioner(vertices.partitions.length)) } val routingTables = createRoutingTables(edges, vPartitioned.partitioner.get) val vertexPartitions = vPartitioned.zipPartitions(routingTables, preservesPartitioning = true) { @@ -358,7 +358,7 @@ object VertexRDD { Function.tupled(RoutingTablePartition.edgePartitionToMsgs))) .setName("VertexRDD.createRoutingTables - vid2pid (aggregation)") - val numEdgePartitions = edges.partitions.size + val numEdgePartitions = edges.partitions.length vid2pid.partitionBy(vertexPartitioner).mapPartitions( iter => Iterator(RoutingTablePartition.fromMsgs(numEdgePartitions, iter)), preservesPartitioning = true) diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartition.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartition.scala index ab021a252e..b1da781663 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartition.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartition.scala @@ -151,9 +151,9 @@ class EdgePartition[ * applied to each edge */ def map[ED2: ClassTag](f: Edge[ED] => ED2): EdgePartition[ED2, VD] = { - val newData = new Array[ED2](data.size) + val newData = new Array[ED2](data.length) val edge = new Edge[ED]() - val size = data.size + val size = data.length var i = 0 while (i < size) { edge.srcId = srcIds(i) @@ -179,13 +179,13 @@ class EdgePartition[ */ def map[ED2: ClassTag](iter: Iterator[ED2]): EdgePartition[ED2, VD] = { // Faster than iter.toArray, because the expected size is known. - val newData = new Array[ED2](data.size) + val newData = new Array[ED2](data.length) var i = 0 while (iter.hasNext) { newData(i) = iter.next() i += 1 } - assert(newData.size == i) + assert(newData.length == i) this.withData(newData) } @@ -311,7 +311,7 @@ class EdgePartition[ * * @return size of the partition */ - val size: Int = localSrcIds.size + val size: Int = localSrcIds.length /** The number of unique source vertices in the partition. */ def indexSize: Int = index.size diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartitionBuilder.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartitionBuilder.scala index b122969b81..da3db3c4dc 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartitionBuilder.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartitionBuilder.scala @@ -38,9 +38,9 @@ class EdgePartitionBuilder[@specialized(Long, Int, Double) ED: ClassTag, VD: Cla val edgeArray = edges.trim().array new Sorter(Edge.edgeArraySortDataFormat[ED]) .sort(edgeArray, 0, edgeArray.length, Edge.lexicographicOrdering) - val localSrcIds = new Array[Int](edgeArray.size) - val localDstIds = new Array[Int](edgeArray.size) - val data = new Array[ED](edgeArray.size) + val localSrcIds = new Array[Int](edgeArray.length) + val localDstIds = new Array[Int](edgeArray.length) + val data = new Array[ED](edgeArray.length) val index = new GraphXPrimitiveKeyOpenHashMap[VertexId, Int] val global2local = new GraphXPrimitiveKeyOpenHashMap[VertexId, Int] val local2global = new PrimitiveVector[VertexId] @@ -52,7 +52,7 @@ class EdgePartitionBuilder[@specialized(Long, Int, Double) ED: ClassTag, VD: Cla var currSrcId: VertexId = edgeArray(0).srcId var currLocalId = -1 var i = 0 - while (i < edgeArray.size) { + while (i < edgeArray.length) { val srcId = edgeArray(i).srcId val dstId = edgeArray(i).dstId localSrcIds(i) = global2local.changeValue(srcId, @@ -98,9 +98,9 @@ class ExistingEdgePartitionBuilder[ val edgeArray = edges.trim().array new Sorter(EdgeWithLocalIds.edgeArraySortDataFormat[ED]) .sort(edgeArray, 0, edgeArray.length, EdgeWithLocalIds.lexicographicOrdering) - val localSrcIds = new Array[Int](edgeArray.size) - val localDstIds = new Array[Int](edgeArray.size) - val data = new Array[ED](edgeArray.size) + val localSrcIds = new Array[Int](edgeArray.length) + val localDstIds = new Array[Int](edgeArray.length) + val data = new Array[ED](edgeArray.length) val index = new GraphXPrimitiveKeyOpenHashMap[VertexId, Int] // Copy edges into columnar structures, tracking the beginnings of source vertex id clusters and // adding them to the index @@ -108,7 +108,7 @@ class ExistingEdgePartitionBuilder[ index.update(edgeArray(0).srcId, 0) var currSrcId: VertexId = edgeArray(0).srcId var i = 0 - while (i < edgeArray.size) { + while (i < edgeArray.length) { localSrcIds(i) = edgeArray(i).localSrcId localDstIds(i) = edgeArray(i).localDstId data(i) = edgeArray(i).attr diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeRDDImpl.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeRDDImpl.scala index 6e153b7e80..98e082cc44 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeRDDImpl.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeRDDImpl.scala @@ -45,7 +45,7 @@ class EdgeRDDImpl[ED: ClassTag, VD: ClassTag] private[graphx] ( * partitioner that allows co-partitioning with `partitionsRDD`. */ override val partitioner = - partitionsRDD.partitioner.orElse(Some(new HashPartitioner(partitions.size))) + partitionsRDD.partitioner.orElse(Some(new HashPartitioner(partitions.length))) override def collect(): Array[Edge[ED]] = this.map(_.copy()).collect() diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala index 699731b360..7903caa312 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala @@ -93,7 +93,7 @@ class GraphImpl[VD: ClassTag, ED: ClassTag] protected ( } override def partitionBy(partitionStrategy: PartitionStrategy): Graph[VD, ED] = { - partitionBy(partitionStrategy, edges.partitions.size) + partitionBy(partitionStrategy, edges.partitions.length) } override def partitionBy( @@ -352,7 +352,8 @@ object GraphImpl { edgeStorageLevel: StorageLevel, vertexStorageLevel: StorageLevel): GraphImpl[VD, ED] = { val edgesCached = edges.withTargetStorageLevel(edgeStorageLevel).cache() - val vertices = VertexRDD.fromEdges(edgesCached, edgesCached.partitions.size, defaultVertexAttr) + val vertices = + VertexRDD.fromEdges(edgesCached, edgesCached.partitions.length, defaultVertexAttr) .withTargetStorageLevel(vertexStorageLevel) fromExistingRDDs(vertices, edgesCached) } diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/RoutingTablePartition.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/RoutingTablePartition.scala index 3fd76902af..13e25b43f6 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/impl/RoutingTablePartition.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/RoutingTablePartition.scala @@ -108,10 +108,10 @@ private[graphx] class RoutingTablePartition( private val routingTable: Array[(Array[VertexId], BitSet, BitSet)]) extends Serializable { /** The maximum number of edge partitions this `RoutingTablePartition` is built to join with. */ - val numEdgePartitions: Int = routingTable.size + val numEdgePartitions: Int = routingTable.length /** Returns the number of vertices that will be sent to the specified edge partition. */ - def partitionSize(pid: PartitionID): Int = routingTable(pid)._1.size + def partitionSize(pid: PartitionID): Int = routingTable(pid)._1.length /** Returns an iterator over all vertex ids stored in this `RoutingTablePartition`. */ def iterator: Iterator[VertexId] = routingTable.iterator.flatMap(_._1.iterator) diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/ShippableVertexPartition.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/ShippableVertexPartition.scala index 96d807f9f9..6dab465fb9 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/impl/ShippableVertexPartition.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/ShippableVertexPartition.scala @@ -28,7 +28,7 @@ private[graphx] class VertexAttributeBlock[VD: ClassTag](val vids: Array[VertexId], val attrs: Array[VD]) extends Serializable { def iterator: Iterator[(VertexId, VD)] = - (0 until vids.size).iterator.map { i => (vids(i), attrs(i)) } + (0 until vids.length).iterator.map { i => (vids(i), attrs(i)) } } private[graphx] diff --git a/graphx/src/main/scala/org/apache/spark/graphx/lib/TriangleCount.scala b/graphx/src/main/scala/org/apache/spark/graphx/lib/TriangleCount.scala index 51bcdf20de..026fb8bc7b 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/lib/TriangleCount.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/lib/TriangleCount.scala @@ -70,7 +70,7 @@ object TriangleCount { graph.collectNeighborIds(EdgeDirection.Either).mapValues { (vid, nbrs) => val set = new VertexSet(nbrs.length) var i = 0 - while (i < nbrs.size) { + while (i < nbrs.length) { // prevent self cycle if (nbrs(i) != vid) { set.add(nbrs(i)) diff --git a/graphx/src/test/scala/org/apache/spark/graphx/VertexRDDSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/VertexRDDSuite.scala index f1aa685a79..0bb9e0a3ea 100644 --- a/graphx/src/test/scala/org/apache/spark/graphx/VertexRDDSuite.scala +++ b/graphx/src/test/scala/org/apache/spark/graphx/VertexRDDSuite.scala @@ -32,7 +32,7 @@ class VertexRDDSuite extends SparkFunSuite with LocalSparkContext { val n = 100 val verts = vertices(sc, n) val evens = verts.filter(q => ((q._2 % 2) == 0)) - assert(evens.count === (0 to n).filter(_ % 2 == 0).size) + assert(evens.count === (0 to n).count(_ % 2 == 0)) } } |