From a4ae7c8b533b3998484879439c0982170c3c38a7 Mon Sep 17 00:00:00 2001 From: Takeshi Yamamuro Date: Sun, 7 Dec 2014 19:36:08 -0800 Subject: [SPARK-4646] Replace Scala.util.Sorting.quickSort with Sorter(TimSort) in Spark This patch just replaces a native quick sorter with Sorter(TimSort) in Spark. It could get performance gains by ~8% in my quick experiments. Author: Takeshi Yamamuro Closes #3507 from maropu/TimSortInEdgePartitionBuilderSpike and squashes the following commits: 8d4e5d2 [Takeshi Yamamuro] Remove a wildcard import 3527e00 [Takeshi Yamamuro] Replace Scala.util.Sorting.quickSort with Sorter(TimSort) in Spark (cherry picked from commit 2e6b736b0e6e5920d0523533c87832a53211db42) Signed-off-by: Ankur Dave --- .../main/scala/org/apache/spark/graphx/Edge.scala | 30 +++++++++++++++++ .../spark/graphx/impl/EdgePartitionBuilder.scala | 39 +++++++++++++++++++--- 2 files changed, 64 insertions(+), 5 deletions(-) diff --git a/graphx/src/main/scala/org/apache/spark/graphx/Edge.scala b/graphx/src/main/scala/org/apache/spark/graphx/Edge.scala index 7e842ec4cc..ecc37dcaad 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/Edge.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/Edge.scala @@ -17,6 +17,8 @@ package org.apache.spark.graphx +import org.apache.spark.util.collection.SortDataFormat + /** * A single directed edge consisting of a source id, target id, * and the data associated with the edge. @@ -65,4 +67,32 @@ object Edge { else 1 } } + + private[graphx] def edgeArraySortDataFormat[ED] = new SortDataFormat[Edge[ED], Array[Edge[ED]]] { + override def getKey(data: Array[Edge[ED]], pos: Int): Edge[ED] = { + data(pos) + } + + override def swap(data: Array[Edge[ED]], pos0: Int, pos1: Int): Unit = { + val tmp = data(pos0) + data(pos0) = data(pos1) + data(pos1) = tmp + } + + override def copyElement( + src: Array[Edge[ED]], srcPos: Int, + dst: Array[Edge[ED]], dstPos: Int) { + dst(dstPos) = src(srcPos) + } + + override def copyRange( + src: Array[Edge[ED]], srcPos: Int, + dst: Array[Edge[ED]], dstPos: Int, length: Int) { + System.arraycopy(src, srcPos, dst, dstPos, length) + } + + override def allocate(length: Int): Array[Edge[ED]] = { + new Array[Edge[ED]](length) + } + } } diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartitionBuilder.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartitionBuilder.scala index b0cb0fe47d..409cf60977 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartitionBuilder.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartitionBuilder.scala @@ -18,12 +18,10 @@ package org.apache.spark.graphx.impl import scala.reflect.ClassTag -import scala.util.Sorting - -import org.apache.spark.util.collection.{BitSet, OpenHashSet, PrimitiveVector} import org.apache.spark.graphx._ import org.apache.spark.graphx.util.collection.GraphXPrimitiveKeyOpenHashMap +import org.apache.spark.util.collection.{SortDataFormat, Sorter, PrimitiveVector} /** Constructs an EdgePartition from scratch. */ private[graphx] @@ -38,7 +36,8 @@ class EdgePartitionBuilder[@specialized(Long, Int, Double) ED: ClassTag, VD: Cla def toEdgePartition: EdgePartition[ED, VD] = { val edgeArray = edges.trim().array - Sorting.quickSort(edgeArray)(Edge.lexicographicOrdering) + new Sorter(Edge.edgeArraySortDataFormat[ED]) + .sort(edgeArray, 0, edgeArray.length, Edge.lexicographicOrdering) val localSrcIds = new Array[Int](edgeArray.size) val localDstIds = new Array[Int](edgeArray.size) val data = new Array[ED](edgeArray.size) @@ -97,7 +96,8 @@ class ExistingEdgePartitionBuilder[ def toEdgePartition: EdgePartition[ED, VD] = { val edgeArray = edges.trim().array - Sorting.quickSort(edgeArray)(EdgeWithLocalIds.lexicographicOrdering) + new Sorter(EdgeWithLocalIds.edgeArraySortDataFormat[ED]) + .sort(edgeArray, 0, edgeArray.length, EdgeWithLocalIds.lexicographicOrdering) val localSrcIds = new Array[Int](edgeArray.size) val localDstIds = new Array[Int](edgeArray.size) val data = new Array[ED](edgeArray.size) @@ -140,4 +140,33 @@ private[impl] object EdgeWithLocalIds { } } + private[graphx] def edgeArraySortDataFormat[ED] + = new SortDataFormat[EdgeWithLocalIds[ED], Array[EdgeWithLocalIds[ED]]] { + override def getKey( + data: Array[EdgeWithLocalIds[ED]], pos: Int): EdgeWithLocalIds[ED] = { + data(pos) + } + + override def swap(data: Array[EdgeWithLocalIds[ED]], pos0: Int, pos1: Int): Unit = { + val tmp = data(pos0) + data(pos0) = data(pos1) + data(pos1) = tmp + } + + override def copyElement( + src: Array[EdgeWithLocalIds[ED]], srcPos: Int, + dst: Array[EdgeWithLocalIds[ED]], dstPos: Int) { + dst(dstPos) = src(srcPos) + } + + override def copyRange( + src: Array[EdgeWithLocalIds[ED]], srcPos: Int, + dst: Array[EdgeWithLocalIds[ED]], dstPos: Int, length: Int) { + System.arraycopy(src, srcPos, dst, dstPos, length) + } + + override def allocate(length: Int): Array[EdgeWithLocalIds[ED]] = { + new Array[EdgeWithLocalIds[ED]](length) + } + } } -- cgit v1.2.3