author     Sean Owen <sowen@cloudera.com>  2016-03-03 09:54:09 +0000
committer  Sean Owen <sowen@cloudera.com>  2016-03-03 09:54:09 +0000
commit     e97fc7f176f8bf501c9b3afd8410014e3b0e1602 (patch)
tree       23a11a3646b13195aaf50078a0f35fad96190618 /graphx
parent     02b7677e9584f5ccd68869abdb0bf980dc847ce1 (diff)
[SPARK-13423][WIP][CORE][SQL][STREAMING] Static analysis fixes for 2.x
## What changes were proposed in this pull request?

Make some cross-cutting code improvements according to static analysis. These are individually up for discussion, since they exist in separate commits that can be reverted. The changes are broadly:

- Inner classes should be static
- Mismatched hashCode/equals
- Overflow in compareTo
- Misuse of assert vs. JUnit's Assert
- Unchecked warnings
- get(a) + getOrElse(b) -> getOrElse(a, b)
- Array/String .size -> .length (occasionally -> .isEmpty / .nonEmpty) to avoid implicit conversions
- Dead code
- tailrec
- exists(_ == x) -> contains(x)
- find + nonEmpty -> exists
- filter + size -> count
- reduce(_ + _) -> sum
- map + flatten -> flatMap

The most controversial change may be .size -> .length, simply because of the number of call sites it touches. It is intended to avoid implicit conversions that might be expensive in some places.

## How was this patch tested?

Existing Jenkins unit tests.

Author: Sean Owen <sowen@cloudera.com>

Closes #11292 from srowen/SPARK-13423.
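As background for the `.size` -> `.length` item, here is a minimal sketch (added for illustration; `SizeVsLength` and its values are not part of the patch): on arrays in Scala 2.x, `.length` reads the JVM array length directly, while `.size` goes through an implicit conversion that wraps the array in an `ArrayOps` on every call.

```scala
// Illustrative sketch, not from the patch: Array.size vs. Array.length
// in Scala 2.x.
object SizeVsLength {
  def main(args: Array[String]): Unit = {
    val xs = Array(1, 2, 3)

    // Compiles to the `arraylength` bytecode instruction; no allocation.
    val direct = xs.length

    // Expands to Predef.intArrayOps(xs).size, allocating an ArrayOps
    // wrapper around the array before answering the same question.
    val viaImplicit = xs.size

    assert(direct == viaImplicit) // same result, different cost
  }
}
```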
Diffstat (limited to 'graphx')
-rw-r--r--  graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala                      |  2 +-
-rw-r--r--  graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala                     |  6 +++---
-rw-r--r--  graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartition.scala            | 10 +++++-----
-rw-r--r--  graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartitionBuilder.scala     | 16 ++++++++--------
-rw-r--r--  graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeRDDImpl.scala              |  2 +-
-rw-r--r--  graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala                |  5 +++--
-rw-r--r--  graphx/src/main/scala/org/apache/spark/graphx/impl/RoutingTablePartition.scala    |  4 ++--
-rw-r--r--  graphx/src/main/scala/org/apache/spark/graphx/impl/ShippableVertexPartition.scala |  2 +-
-rw-r--r--  graphx/src/main/scala/org/apache/spark/graphx/lib/TriangleCount.scala             |  2 +-
-rw-r--r--  graphx/src/test/scala/org/apache/spark/graphx/VertexRDDSuite.scala                |  2 +-
10 files changed, 26 insertions(+), 25 deletions(-)
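The graphx changes below are almost entirely the `.size` -> `.length` rewrite; the final hunk in `VertexRDDSuite` applies the `filter + size -> count` rewrite. A short self-contained sketch (illustrative only, not taken from the patch) of the collection-idiom equivalences listed in the commit message:

```scala
// Illustrative sketch of the idiom rewrites, shown on a plain collection.
object IdiomRewrites {
  def main(args: Array[String]): Unit = {
    val xs = Seq(1, 2, 3, 4)

    // exists(_ == x) -> contains(x)
    assert(xs.contains(3) == xs.exists(_ == 3))

    // find(p).nonEmpty -> exists(p): both stop at the first match,
    // but exists avoids allocating an Option.
    assert(xs.exists(_ > 3) == xs.find(_ > 3).nonEmpty)

    // filter(p).size -> count(p): avoids building an intermediate collection.
    assert(xs.count(_ % 2 == 0) == xs.filter(_ % 2 == 0).size)

    // reduce(_ + _) -> sum
    assert(xs.sum == xs.reduce(_ + _))

    // map(f).flatten -> flatMap(f)
    assert(xs.map(i => Seq(i, i)).flatten == xs.flatMap(i => Seq(i, i)))
  }
}
```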
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala b/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala
index 3e8c385302..87f3bc31e6 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala
@@ -284,7 +284,7 @@ class GraphOps[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]) extends Seriali
if (selectedVertices.count > 1) {
found = true
val collectedVertices = selectedVertices.collect()
- retVal = collectedVertices(Random.nextInt(collectedVertices.size))
+ retVal = collectedVertices(Random.nextInt(collectedVertices.length))
}
}
retVal
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala b/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala
index 53a9f92b82..5a0c479bb4 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala
@@ -276,7 +276,7 @@ object VertexRDD {
def apply[VD: ClassTag](vertices: RDD[(VertexId, VD)]): VertexRDD[VD] = {
val vPartitioned: RDD[(VertexId, VD)] = vertices.partitioner match {
case Some(p) => vertices
- case None => vertices.partitionBy(new HashPartitioner(vertices.partitions.size))
+ case None => vertices.partitionBy(new HashPartitioner(vertices.partitions.length))
}
val vertexPartitions = vPartitioned.mapPartitions(
iter => Iterator(ShippableVertexPartition(iter)),
@@ -317,7 +317,7 @@ object VertexRDD {
): VertexRDD[VD] = {
val vPartitioned: RDD[(VertexId, VD)] = vertices.partitioner match {
case Some(p) => vertices
- case None => vertices.partitionBy(new HashPartitioner(vertices.partitions.size))
+ case None => vertices.partitionBy(new HashPartitioner(vertices.partitions.length))
}
val routingTables = createRoutingTables(edges, vPartitioned.partitioner.get)
val vertexPartitions = vPartitioned.zipPartitions(routingTables, preservesPartitioning = true) {
@@ -358,7 +358,7 @@ object VertexRDD {
Function.tupled(RoutingTablePartition.edgePartitionToMsgs)))
.setName("VertexRDD.createRoutingTables - vid2pid (aggregation)")
- val numEdgePartitions = edges.partitions.size
+ val numEdgePartitions = edges.partitions.length
vid2pid.partitionBy(vertexPartitioner).mapPartitions(
iter => Iterator(RoutingTablePartition.fromMsgs(numEdgePartitions, iter)),
preservesPartitioning = true)
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartition.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartition.scala
index ab021a252e..b1da781663 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartition.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartition.scala
@@ -151,9 +151,9 @@ class EdgePartition[
* applied to each edge
*/
def map[ED2: ClassTag](f: Edge[ED] => ED2): EdgePartition[ED2, VD] = {
- val newData = new Array[ED2](data.size)
+ val newData = new Array[ED2](data.length)
val edge = new Edge[ED]()
- val size = data.size
+ val size = data.length
var i = 0
while (i < size) {
edge.srcId = srcIds(i)
@@ -179,13 +179,13 @@ class EdgePartition[
*/
def map[ED2: ClassTag](iter: Iterator[ED2]): EdgePartition[ED2, VD] = {
// Faster than iter.toArray, because the expected size is known.
- val newData = new Array[ED2](data.size)
+ val newData = new Array[ED2](data.length)
var i = 0
while (iter.hasNext) {
newData(i) = iter.next()
i += 1
}
- assert(newData.size == i)
+ assert(newData.length == i)
this.withData(newData)
}
@@ -311,7 +311,7 @@ class EdgePartition[
*
* @return size of the partition
*/
- val size: Int = localSrcIds.size
+ val size: Int = localSrcIds.length
/** The number of unique source vertices in the partition. */
def indexSize: Int = index.size
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartitionBuilder.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartitionBuilder.scala
index b122969b81..da3db3c4dc 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartitionBuilder.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartitionBuilder.scala
@@ -38,9 +38,9 @@ class EdgePartitionBuilder[@specialized(Long, Int, Double) ED: ClassTag, VD: Cla
val edgeArray = edges.trim().array
new Sorter(Edge.edgeArraySortDataFormat[ED])
.sort(edgeArray, 0, edgeArray.length, Edge.lexicographicOrdering)
- val localSrcIds = new Array[Int](edgeArray.size)
- val localDstIds = new Array[Int](edgeArray.size)
- val data = new Array[ED](edgeArray.size)
+ val localSrcIds = new Array[Int](edgeArray.length)
+ val localDstIds = new Array[Int](edgeArray.length)
+ val data = new Array[ED](edgeArray.length)
val index = new GraphXPrimitiveKeyOpenHashMap[VertexId, Int]
val global2local = new GraphXPrimitiveKeyOpenHashMap[VertexId, Int]
val local2global = new PrimitiveVector[VertexId]
@@ -52,7 +52,7 @@ class EdgePartitionBuilder[@specialized(Long, Int, Double) ED: ClassTag, VD: Cla
var currSrcId: VertexId = edgeArray(0).srcId
var currLocalId = -1
var i = 0
- while (i < edgeArray.size) {
+ while (i < edgeArray.length) {
val srcId = edgeArray(i).srcId
val dstId = edgeArray(i).dstId
localSrcIds(i) = global2local.changeValue(srcId,
@@ -98,9 +98,9 @@ class ExistingEdgePartitionBuilder[
val edgeArray = edges.trim().array
new Sorter(EdgeWithLocalIds.edgeArraySortDataFormat[ED])
.sort(edgeArray, 0, edgeArray.length, EdgeWithLocalIds.lexicographicOrdering)
- val localSrcIds = new Array[Int](edgeArray.size)
- val localDstIds = new Array[Int](edgeArray.size)
- val data = new Array[ED](edgeArray.size)
+ val localSrcIds = new Array[Int](edgeArray.length)
+ val localDstIds = new Array[Int](edgeArray.length)
+ val data = new Array[ED](edgeArray.length)
val index = new GraphXPrimitiveKeyOpenHashMap[VertexId, Int]
// Copy edges into columnar structures, tracking the beginnings of source vertex id clusters and
// adding them to the index
@@ -108,7 +108,7 @@ class ExistingEdgePartitionBuilder[
index.update(edgeArray(0).srcId, 0)
var currSrcId: VertexId = edgeArray(0).srcId
var i = 0
- while (i < edgeArray.size) {
+ while (i < edgeArray.length) {
localSrcIds(i) = edgeArray(i).localSrcId
localDstIds(i) = edgeArray(i).localDstId
data(i) = edgeArray(i).attr
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeRDDImpl.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeRDDImpl.scala
index 6e153b7e80..98e082cc44 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeRDDImpl.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeRDDImpl.scala
@@ -45,7 +45,7 @@ class EdgeRDDImpl[ED: ClassTag, VD: ClassTag] private[graphx] (
* partitioner that allows co-partitioning with `partitionsRDD`.
*/
override val partitioner =
- partitionsRDD.partitioner.orElse(Some(new HashPartitioner(partitions.size)))
+ partitionsRDD.partitioner.orElse(Some(new HashPartitioner(partitions.length)))
override def collect(): Array[Edge[ED]] = this.map(_.copy()).collect()
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala
index 699731b360..7903caa312 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala
@@ -93,7 +93,7 @@ class GraphImpl[VD: ClassTag, ED: ClassTag] protected (
}
override def partitionBy(partitionStrategy: PartitionStrategy): Graph[VD, ED] = {
- partitionBy(partitionStrategy, edges.partitions.size)
+ partitionBy(partitionStrategy, edges.partitions.length)
}
override def partitionBy(
@@ -352,7 +352,8 @@ object GraphImpl {
edgeStorageLevel: StorageLevel,
vertexStorageLevel: StorageLevel): GraphImpl[VD, ED] = {
val edgesCached = edges.withTargetStorageLevel(edgeStorageLevel).cache()
- val vertices = VertexRDD.fromEdges(edgesCached, edgesCached.partitions.size, defaultVertexAttr)
+ val vertices =
+ VertexRDD.fromEdges(edgesCached, edgesCached.partitions.length, defaultVertexAttr)
.withTargetStorageLevel(vertexStorageLevel)
fromExistingRDDs(vertices, edgesCached)
}
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/RoutingTablePartition.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/RoutingTablePartition.scala
index 3fd76902af..13e25b43f6 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/impl/RoutingTablePartition.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/RoutingTablePartition.scala
@@ -108,10 +108,10 @@ private[graphx]
class RoutingTablePartition(
private val routingTable: Array[(Array[VertexId], BitSet, BitSet)]) extends Serializable {
/** The maximum number of edge partitions this `RoutingTablePartition` is built to join with. */
- val numEdgePartitions: Int = routingTable.size
+ val numEdgePartitions: Int = routingTable.length
/** Returns the number of vertices that will be sent to the specified edge partition. */
- def partitionSize(pid: PartitionID): Int = routingTable(pid)._1.size
+ def partitionSize(pid: PartitionID): Int = routingTable(pid)._1.length
/** Returns an iterator over all vertex ids stored in this `RoutingTablePartition`. */
def iterator: Iterator[VertexId] = routingTable.iterator.flatMap(_._1.iterator)
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/ShippableVertexPartition.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/ShippableVertexPartition.scala
index 96d807f9f9..6dab465fb9 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/impl/ShippableVertexPartition.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/ShippableVertexPartition.scala
@@ -28,7 +28,7 @@ private[graphx]
class VertexAttributeBlock[VD: ClassTag](val vids: Array[VertexId], val attrs: Array[VD])
extends Serializable {
def iterator: Iterator[(VertexId, VD)] =
- (0 until vids.size).iterator.map { i => (vids(i), attrs(i)) }
+ (0 until vids.length).iterator.map { i => (vids(i), attrs(i)) }
}
private[graphx]
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/lib/TriangleCount.scala b/graphx/src/main/scala/org/apache/spark/graphx/lib/TriangleCount.scala
index 51bcdf20de..026fb8bc7b 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/lib/TriangleCount.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/lib/TriangleCount.scala
@@ -70,7 +70,7 @@ object TriangleCount {
graph.collectNeighborIds(EdgeDirection.Either).mapValues { (vid, nbrs) =>
val set = new VertexSet(nbrs.length)
var i = 0
- while (i < nbrs.size) {
+ while (i < nbrs.length) {
// prevent self cycle
if (nbrs(i) != vid) {
set.add(nbrs(i))
diff --git a/graphx/src/test/scala/org/apache/spark/graphx/VertexRDDSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/VertexRDDSuite.scala
index f1aa685a79..0bb9e0a3ea 100644
--- a/graphx/src/test/scala/org/apache/spark/graphx/VertexRDDSuite.scala
+++ b/graphx/src/test/scala/org/apache/spark/graphx/VertexRDDSuite.scala
@@ -32,7 +32,7 @@ class VertexRDDSuite extends SparkFunSuite with LocalSparkContext {
val n = 100
val verts = vertices(sc, n)
val evens = verts.filter(q => ((q._2 % 2) == 0))
- assert(evens.count === (0 to n).filter(_ % 2 == 0).size)
+ assert(evens.count === (0 to n).count(_ % 2 == 0))
}
}