diff options
author | Davies Liu <davies@databricks.com> | 2014-11-24 16:37:14 -0800 |
---|---|---|
committer | Xiangrui Meng <meng@databricks.com> | 2014-11-24 16:37:14 -0800 |
commit | b660de7a9cbdea3df4a37fbcf60c1c33c71782b8 (patch) | |
tree | 34bec7b4e6789d63846f7519e1c9d51351033b61 | |
parent | cb0e9b0980f38befe88bf52aa037fe33262730f7 (diff) | |
download | spark-b660de7a9cbdea3df4a37fbcf60c1c33c71782b8.tar.gz spark-b660de7a9cbdea3df4a37fbcf60c1c33c71782b8.tar.bz2 spark-b660de7a9cbdea3df4a37fbcf60c1c33c71782b8.zip |
[SPARK-4562] [MLlib] speedup vector
This PR change the underline array of DenseVector to numpy.ndarray to avoid the conversion, because most of the users will using numpy.array.
It also improve the serialization of DenseVector.
Before this change:
trial | trainingTime | testTime
-------|--------|--------
0 | 5.126 | 1.786
1 |2.698 |1.693
After the change:
trial | trainingTime | testTime
-------|--------|--------
0 |4.692 |0.554
1 |2.307 |0.525
This could partially fix the performance regression during test.
Author: Davies Liu <davies@databricks.com>
Closes #3420 from davies/ser2 and squashes the following commits:
0e1e6f3 [Davies Liu] fix tests
426f5db [Davies Liu] impove toArray()
44707ec [Davies Liu] add name for ISO-8859-1
fa7d791 [Davies Liu] address comments
1cfb137 [Davies Liu] handle zero sparse vector
2548ee2 [Davies Liu] fix tests
9e6389d [Davies Liu] bugfix
470f702 [Davies Liu] speed up DenseMatrix
f0d3c40 [Davies Liu] speedup SparseVector
ef6ce70 [Davies Liu] speed up dense vector
-rw-r--r-- | mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala | 73 | ||||
-rw-r--r-- | python/pyspark/mllib/linalg.py | 73 | ||||
-rw-r--r-- | python/pyspark/mllib/tests.py | 6 |
3 files changed, 118 insertions, 34 deletions
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala index f04df1c156..9f20cd5d00 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala @@ -18,6 +18,7 @@ package org.apache.spark.mllib.api.python import java.io.OutputStream +import java.nio.{ByteBuffer, ByteOrder} import java.util.{ArrayList => JArrayList, List => JList, Map => JMap} import scala.collection.JavaConverters._ @@ -684,6 +685,7 @@ class PythonMLLibAPI extends Serializable { private[spark] object SerDe extends Serializable { val PYSPARK_PACKAGE = "pyspark.mllib" + val LATIN1 = "ISO-8859-1" /** * Base class used for pickle @@ -735,7 +737,16 @@ private[spark] object SerDe extends Serializable { def saveState(obj: Object, out: OutputStream, pickler: Pickler) = { val vector: DenseVector = obj.asInstanceOf[DenseVector] - saveObjects(out, pickler, vector.toArray) + val bytes = new Array[Byte](8 * vector.size) + val bb = ByteBuffer.wrap(bytes) + bb.order(ByteOrder.nativeOrder()) + val db = bb.asDoubleBuffer() + db.put(vector.values) + + out.write(Opcodes.BINSTRING) + out.write(PickleUtils.integer_to_bytes(bytes.length)) + out.write(bytes) + out.write(Opcodes.TUPLE1) } def construct(args: Array[Object]): Object = { @@ -743,7 +754,13 @@ private[spark] object SerDe extends Serializable { if (args.length != 1) { throw new PickleException("should be 1") } - new DenseVector(args(0).asInstanceOf[Array[Double]]) + val bytes = args(0).asInstanceOf[String].getBytes(LATIN1) + val bb = ByteBuffer.wrap(bytes, 0, bytes.length) + bb.order(ByteOrder.nativeOrder()) + val db = bb.asDoubleBuffer() + val ans = new Array[Double](bytes.length / 8) + db.get(ans) + Vectors.dense(ans) } } @@ -752,15 +769,30 @@ private[spark] object SerDe extends Serializable { def saveState(obj: Object, out: OutputStream, pickler: Pickler) = { val m: DenseMatrix = obj.asInstanceOf[DenseMatrix] - saveObjects(out, pickler, m.numRows, m.numCols, m.values) + val bytes = new Array[Byte](8 * m.values.size) + val order = ByteOrder.nativeOrder() + ByteBuffer.wrap(bytes).order(order).asDoubleBuffer().put(m.values) + + out.write(Opcodes.BININT) + out.write(PickleUtils.integer_to_bytes(m.numRows)) + out.write(Opcodes.BININT) + out.write(PickleUtils.integer_to_bytes(m.numCols)) + out.write(Opcodes.BINSTRING) + out.write(PickleUtils.integer_to_bytes(bytes.length)) + out.write(bytes) + out.write(Opcodes.TUPLE3) } def construct(args: Array[Object]): Object = { if (args.length != 3) { throw new PickleException("should be 3") } - new DenseMatrix(args(0).asInstanceOf[Int], args(1).asInstanceOf[Int], - args(2).asInstanceOf[Array[Double]]) + val bytes = args(2).asInstanceOf[String].getBytes(LATIN1) + val n = bytes.length / 8 + val values = new Array[Double](n) + val order = ByteOrder.nativeOrder() + ByteBuffer.wrap(bytes).order(order).asDoubleBuffer().get(values) + new DenseMatrix(args(0).asInstanceOf[Int], args(1).asInstanceOf[Int], values) } } @@ -769,15 +801,40 @@ private[spark] object SerDe extends Serializable { def saveState(obj: Object, out: OutputStream, pickler: Pickler) = { val v: SparseVector = obj.asInstanceOf[SparseVector] - saveObjects(out, pickler, v.size, v.indices, v.values) + val n = v.indices.size + val indiceBytes = new Array[Byte](4 * n) + val order = ByteOrder.nativeOrder() + ByteBuffer.wrap(indiceBytes).order(order).asIntBuffer().put(v.indices) + val valueBytes = new Array[Byte](8 * n) + ByteBuffer.wrap(valueBytes).order(order).asDoubleBuffer().put(v.values) + + out.write(Opcodes.BININT) + out.write(PickleUtils.integer_to_bytes(v.size)) + out.write(Opcodes.BINSTRING) + out.write(PickleUtils.integer_to_bytes(indiceBytes.length)) + out.write(indiceBytes) + out.write(Opcodes.BINSTRING) + out.write(PickleUtils.integer_to_bytes(valueBytes.length)) + out.write(valueBytes) + out.write(Opcodes.TUPLE3) } def construct(args: Array[Object]): Object = { if (args.length != 3) { throw new PickleException("should be 3") } - new SparseVector(args(0).asInstanceOf[Int], args(1).asInstanceOf[Array[Int]], - args(2).asInstanceOf[Array[Double]]) + val size = args(0).asInstanceOf[Int] + val indiceBytes = args(1).asInstanceOf[String].getBytes(LATIN1) + val valueBytes = args(2).asInstanceOf[String].getBytes(LATIN1) + val n = indiceBytes.length / 4 + val indices = new Array[Int](n) + val values = new Array[Double](n) + if (n > 0) { + val order = ByteOrder.nativeOrder() + ByteBuffer.wrap(indiceBytes).order(order).asIntBuffer().get(indices) + ByteBuffer.wrap(valueBytes).order(order).asDoubleBuffer().get(values) + } + new SparseVector(size, indices, values) } } diff --git a/python/pyspark/mllib/linalg.py b/python/pyspark/mllib/linalg.py index 537b176578..f7aa2b0cb0 100644 --- a/python/pyspark/mllib/linalg.py +++ b/python/pyspark/mllib/linalg.py @@ -30,7 +30,7 @@ import copy_reg import numpy as np from pyspark.sql import UserDefinedType, StructField, StructType, ArrayType, DoubleType, \ - IntegerType, ByteType, Row + IntegerType, ByteType __all__ = ['Vector', 'DenseVector', 'SparseVector', 'Vectors', 'DenseMatrix', 'Matrices'] @@ -173,12 +173,16 @@ class DenseVector(Vector): A dense vector represented by a value array. """ def __init__(self, ar): - if not isinstance(ar, array.array): - ar = array.array('d', ar) + if isinstance(ar, basestring): + ar = np.frombuffer(ar, dtype=np.float64) + elif not isinstance(ar, np.ndarray): + ar = np.array(ar, dtype=np.float64) + if ar.dtype != np.float64: + ar.astype(np.float64) self.array = ar def __reduce__(self): - return DenseVector, (self.array,) + return DenseVector, (self.array.tostring(),) def dot(self, other): """ @@ -207,9 +211,10 @@ class DenseVector(Vector): ... AssertionError: dimension mismatch """ - if type(other) == np.ndarray and other.ndim > 1: - assert len(self) == other.shape[0], "dimension mismatch" - return np.dot(self.toArray(), other) + if type(other) == np.ndarray: + if other.ndim > 1: + assert len(self) == other.shape[0], "dimension mismatch" + return np.dot(self.array, other) elif _have_scipy and scipy.sparse.issparse(other): assert len(self) == other.shape[0], "dimension mismatch" return other.transpose().dot(self.toArray()) @@ -261,7 +266,7 @@ class DenseVector(Vector): return np.dot(diff, diff) def toArray(self): - return np.array(self.array) + return self.array def __getitem__(self, item): return self.array[item] @@ -276,7 +281,7 @@ class DenseVector(Vector): return "DenseVector([%s])" % (', '.join(_format_float(i) for i in self.array)) def __eq__(self, other): - return isinstance(other, DenseVector) and self.array == other.array + return isinstance(other, DenseVector) and np.array_equal(self.array, other.array) def __ne__(self, other): return not self == other @@ -314,18 +319,28 @@ class SparseVector(Vector): if type(pairs) == dict: pairs = pairs.items() pairs = sorted(pairs) - self.indices = array.array('i', [p[0] for p in pairs]) - self.values = array.array('d', [p[1] for p in pairs]) + self.indices = np.array([p[0] for p in pairs], dtype=np.int32) + self.values = np.array([p[1] for p in pairs], dtype=np.float64) else: - assert len(args[0]) == len(args[1]), "index and value arrays not same length" - self.indices = array.array('i', args[0]) - self.values = array.array('d', args[1]) + if isinstance(args[0], basestring): + assert isinstance(args[1], str), "values should be string too" + if args[0]: + self.indices = np.frombuffer(args[0], np.int32) + self.values = np.frombuffer(args[1], np.float64) + else: + # np.frombuffer() doesn't work well with empty string in older version + self.indices = np.array([], dtype=np.int32) + self.values = np.array([], dtype=np.float64) + else: + self.indices = np.array(args[0], dtype=np.int32) + self.values = np.array(args[1], dtype=np.float64) + assert len(self.indices) == len(self.values), "index and value arrays not same length" for i in xrange(len(self.indices) - 1): if self.indices[i] >= self.indices[i + 1]: raise TypeError("indices array must be sorted") def __reduce__(self): - return (SparseVector, (self.size, self.indices, self.values)) + return (SparseVector, (self.size, self.indices.tostring(), self.values.tostring())) def dot(self, other): """ @@ -461,8 +476,7 @@ class SparseVector(Vector): Returns a copy of this SparseVector as a 1-dimensional NumPy array. """ arr = np.zeros((self.size,), dtype=np.float64) - for i in xrange(len(self.indices)): - arr[self.indices[i]] = self.values[i] + arr[self.indices] = self.values return arr def __len__(self): @@ -493,8 +507,8 @@ class SparseVector(Vector): """ return (isinstance(other, self.__class__) and other.size == self.size - and other.indices == self.indices - and other.values == self.values) + and np.array_equal(other.indices, self.indices) + and np.array_equal(other.values, self.values)) def __ne__(self, other): return not self.__eq__(other) @@ -577,25 +591,34 @@ class DenseMatrix(Matrix): """ def __init__(self, numRows, numCols, values): Matrix.__init__(self, numRows, numCols) + if isinstance(values, basestring): + values = np.frombuffer(values, dtype=np.float64) + elif not isinstance(values, np.ndarray): + values = np.array(values, dtype=np.float64) assert len(values) == numRows * numCols - if not isinstance(values, array.array): - values = array.array('d', values) + if values.dtype != np.float64: + values.astype(np.float64) self.values = values def __reduce__(self): - return DenseMatrix, (self.numRows, self.numCols, self.values) + return DenseMatrix, (self.numRows, self.numCols, self.values.tostring()) def toArray(self): """ Return an numpy.ndarray - >>> arr = array.array('d', [float(i) for i in range(4)]) - >>> m = DenseMatrix(2, 2, arr) + >>> m = DenseMatrix(2, 2, range(4)) >>> m.toArray() array([[ 0., 2.], [ 1., 3.]]) """ - return np.reshape(self.values, (self.numRows, self.numCols), order='F') + return self.values.reshape((self.numRows, self.numCols), order='F') + + def __eq__(self, other): + return (isinstance(other, DenseMatrix) and + self.numRows == other.numRows and + self.numCols == other.numCols and + all(self.values == other.values)) class Matrices(object): diff --git a/python/pyspark/mllib/tests.py b/python/pyspark/mllib/tests.py index 9fa4d6f6a2..8332f8e061 100644 --- a/python/pyspark/mllib/tests.py +++ b/python/pyspark/mllib/tests.py @@ -33,7 +33,8 @@ if sys.version_info[:2] <= (2, 6): else: import unittest -from pyspark.mllib.linalg import Vector, SparseVector, DenseVector, VectorUDT, _convert_to_vector +from pyspark.mllib.linalg import Vector, SparseVector, DenseVector, VectorUDT, _convert_to_vector,\ + DenseMatrix from pyspark.mllib.regression import LabeledPoint from pyspark.mllib.random import RandomRDDs from pyspark.mllib.stat import Statistics @@ -62,6 +63,7 @@ def _squared_distance(a, b): class VectorTests(PySparkTestCase): def _test_serialize(self, v): + self.assertEqual(v, ser.loads(ser.dumps(v))) jvec = self.sc._jvm.SerDe.loads(bytearray(ser.dumps(v))) nv = ser.loads(str(self.sc._jvm.SerDe.dumps(jvec))) self.assertEqual(v, nv) @@ -75,6 +77,8 @@ class VectorTests(PySparkTestCase): self._test_serialize(DenseVector(array([1., 2., 3., 4.]))) self._test_serialize(DenseVector(pyarray.array('d', range(10)))) self._test_serialize(SparseVector(4, {1: 1, 3: 2})) + self._test_serialize(SparseVector(3, {})) + self._test_serialize(DenseMatrix(2, 3, range(6))) def test_dot(self): sv = SparseVector(4, {1: 1, 3: 2}) |