aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorTakeshi Yamamuro <linguin.m.s@gmail.com>2014-12-07 19:36:08 -0800
committerAnkur Dave <ankurdave@gmail.com>2014-12-07 19:37:32 -0800
commita4ae7c8b533b3998484879439c0982170c3c38a7 (patch)
tree5d4b1faa16e4899543f50616972cf59fde77aa04
parent27d9f13af2df3bd7af029cf7ac48443ba6f4d6e0 (diff)
downloadspark-a4ae7c8b533b3998484879439c0982170c3c38a7.tar.gz
spark-a4ae7c8b533b3998484879439c0982170c3c38a7.tar.bz2
spark-a4ae7c8b533b3998484879439c0982170c3c38a7.zip
[SPARK-4646] Replace scala.util.Sorting.quickSort with Sorter(TimSort) in Spark
This patch replaces scala.util.Sorting.quickSort with Spark's Sorter (TimSort) for sorting edge arrays in GraphX's EdgePartitionBuilder. In my quick experiments this improved performance by about 8%. Author: Takeshi Yamamuro <linguin.m.s@gmail.com> Closes #3507 from maropu/TimSortInEdgePartitionBuilderSpike and squashes the following commits: 8d4e5d2 [Takeshi Yamamuro] Remove a wildcard import 3527e00 [Takeshi Yamamuro] Replace Scala.util.Sorting.quickSort with Sorter(TimSort) in Spark (cherry picked from commit 2e6b736b0e6e5920d0523533c87832a53211db42) Signed-off-by: Ankur Dave <ankurdave@gmail.com>
-rw-r--r--graphx/src/main/scala/org/apache/spark/graphx/Edge.scala30
-rw-r--r--graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartitionBuilder.scala39
2 files changed, 64 insertions, 5 deletions
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/Edge.scala b/graphx/src/main/scala/org/apache/spark/graphx/Edge.scala
index 7e842ec4cc..ecc37dcaad 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/Edge.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/Edge.scala
@@ -17,6 +17,8 @@
package org.apache.spark.graphx
+import org.apache.spark.util.collection.SortDataFormat
+
/**
* A single directed edge consisting of a source id, target id,
* and the data associated with the edge.
@@ -65,4 +67,32 @@ object Edge {
else 1
}
}
+
+  /**
+   * Creates a [[SortDataFormat]] describing how to read, swap, copy and allocate the elements
+   * of an `Array[Edge[ED]]`. Every array element serves as its own sort key.
+   *
+   * @tparam ED the edge attribute type
+   * @return a new format instance over `Array[Edge[ED]]`
+   */
+  private[graphx] def edgeArraySortDataFormat[ED]: SortDataFormat[Edge[ED], Array[Edge[ED]]] = {
+    new SortDataFormat[Edge[ED], Array[Edge[ED]]] {
+      /** Returns the edge stored at `pos`; the edge itself is the key. */
+      override def getKey(data: Array[Edge[ED]], pos: Int): Edge[ED] = data(pos)
+
+      /** Exchanges the edges stored at `pos0` and `pos1`. */
+      override def swap(data: Array[Edge[ED]], pos0: Int, pos1: Int): Unit = {
+        val held = data(pos0)
+        data(pos0) = data(pos1)
+        data(pos1) = held
+      }
+
+      /** Copies the single edge reference at `src(srcPos)` into `dst(dstPos)`. */
+      override def copyElement(
+          src: Array[Edge[ED]], srcPos: Int,
+          dst: Array[Edge[ED]], dstPos: Int): Unit = {
+        dst(dstPos) = src(srcPos)
+      }
+
+      /** Copies `length` consecutive edge references from `src` to `dst` via `System.arraycopy`. */
+      override def copyRange(
+          src: Array[Edge[ED]], srcPos: Int,
+          dst: Array[Edge[ED]], dstPos: Int, length: Int): Unit = {
+        System.arraycopy(src, srcPos, dst, dstPos, length)
+      }
+
+      /** Allocates a new edge array with `length` slots. */
+      override def allocate(length: Int): Array[Edge[ED]] = new Array[Edge[ED]](length)
+    }
+  }
}
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartitionBuilder.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartitionBuilder.scala
index b0cb0fe47d..409cf60977 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartitionBuilder.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartitionBuilder.scala
@@ -18,12 +18,10 @@
package org.apache.spark.graphx.impl
import scala.reflect.ClassTag
-import scala.util.Sorting
-
-import org.apache.spark.util.collection.{BitSet, OpenHashSet, PrimitiveVector}
import org.apache.spark.graphx._
import org.apache.spark.graphx.util.collection.GraphXPrimitiveKeyOpenHashMap
+import org.apache.spark.util.collection.{SortDataFormat, Sorter, PrimitiveVector}
/** Constructs an EdgePartition from scratch. */
private[graphx]
@@ -38,7 +36,8 @@ class EdgePartitionBuilder[@specialized(Long, Int, Double) ED: ClassTag, VD: Cla
def toEdgePartition: EdgePartition[ED, VD] = {
val edgeArray = edges.trim().array
- Sorting.quickSort(edgeArray)(Edge.lexicographicOrdering)
+ new Sorter(Edge.edgeArraySortDataFormat[ED])
+ .sort(edgeArray, 0, edgeArray.length, Edge.lexicographicOrdering)
val localSrcIds = new Array[Int](edgeArray.size)
val localDstIds = new Array[Int](edgeArray.size)
val data = new Array[ED](edgeArray.size)
@@ -97,7 +96,8 @@ class ExistingEdgePartitionBuilder[
def toEdgePartition: EdgePartition[ED, VD] = {
val edgeArray = edges.trim().array
- Sorting.quickSort(edgeArray)(EdgeWithLocalIds.lexicographicOrdering)
+ new Sorter(EdgeWithLocalIds.edgeArraySortDataFormat[ED])
+ .sort(edgeArray, 0, edgeArray.length, EdgeWithLocalIds.lexicographicOrdering)
val localSrcIds = new Array[Int](edgeArray.size)
val localDstIds = new Array[Int](edgeArray.size)
val data = new Array[ED](edgeArray.size)
@@ -140,4 +140,33 @@ private[impl] object EdgeWithLocalIds {
}
}
+  /**
+   * Creates a [[SortDataFormat]] describing how to read, swap, copy and allocate the elements
+   * of an `Array[EdgeWithLocalIds[ED]]`. Every array element serves as its own sort key.
+   *
+   * @tparam ED the edge attribute type
+   * @return a new format instance over `Array[EdgeWithLocalIds[ED]]`
+   */
+  private[graphx] def edgeArraySortDataFormat[ED]
+    : SortDataFormat[EdgeWithLocalIds[ED], Array[EdgeWithLocalIds[ED]]] = {
+    new SortDataFormat[EdgeWithLocalIds[ED], Array[EdgeWithLocalIds[ED]]] {
+      /** Returns the edge stored at `pos`; the edge itself is the key. */
+      override def getKey(
+          data: Array[EdgeWithLocalIds[ED]], pos: Int): EdgeWithLocalIds[ED] = data(pos)
+
+      /** Exchanges the edges stored at `pos0` and `pos1`. */
+      override def swap(data: Array[EdgeWithLocalIds[ED]], pos0: Int, pos1: Int): Unit = {
+        val held = data(pos0)
+        data(pos0) = data(pos1)
+        data(pos1) = held
+      }
+
+      /** Copies the single edge reference at `src(srcPos)` into `dst(dstPos)`. */
+      override def copyElement(
+          src: Array[EdgeWithLocalIds[ED]], srcPos: Int,
+          dst: Array[EdgeWithLocalIds[ED]], dstPos: Int): Unit = {
+        dst(dstPos) = src(srcPos)
+      }
+
+      /** Copies `length` consecutive edge references from `src` to `dst` via `System.arraycopy`. */
+      override def copyRange(
+          src: Array[EdgeWithLocalIds[ED]], srcPos: Int,
+          dst: Array[EdgeWithLocalIds[ED]], dstPos: Int, length: Int): Unit = {
+        System.arraycopy(src, srcPos, dst, dstPos, length)
+      }
+
+      /** Allocates a new edge array with `length` slots. */
+      override def allocate(length: Int): Array[EdgeWithLocalIds[ED]] = {
+        new Array[EdgeWithLocalIds[ED]](length)
+      }
+    }
+  }
}