aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala73
-rw-r--r--python/pyspark/mllib/linalg.py73
-rw-r--r--python/pyspark/mllib/tests.py6
3 files changed, 118 insertions, 34 deletions
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
index f04df1c156..9f20cd5d00 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
@@ -18,6 +18,7 @@
package org.apache.spark.mllib.api.python
import java.io.OutputStream
+import java.nio.{ByteBuffer, ByteOrder}
import java.util.{ArrayList => JArrayList, List => JList, Map => JMap}
import scala.collection.JavaConverters._
@@ -684,6 +685,7 @@ class PythonMLLibAPI extends Serializable {
private[spark] object SerDe extends Serializable {
val PYSPARK_PACKAGE = "pyspark.mllib"
+ val LATIN1 = "ISO-8859-1"
/**
* Base class used for pickle
@@ -735,7 +737,16 @@ private[spark] object SerDe extends Serializable {
def saveState(obj: Object, out: OutputStream, pickler: Pickler) = {
val vector: DenseVector = obj.asInstanceOf[DenseVector]
- saveObjects(out, pickler, vector.toArray)
+ val bytes = new Array[Byte](8 * vector.size)
+ val bb = ByteBuffer.wrap(bytes)
+ bb.order(ByteOrder.nativeOrder())
+ val db = bb.asDoubleBuffer()
+ db.put(vector.values)
+
+ out.write(Opcodes.BINSTRING)
+ out.write(PickleUtils.integer_to_bytes(bytes.length))
+ out.write(bytes)
+ out.write(Opcodes.TUPLE1)
}
def construct(args: Array[Object]): Object = {
@@ -743,7 +754,13 @@ private[spark] object SerDe extends Serializable {
if (args.length != 1) {
throw new PickleException("should be 1")
}
- new DenseVector(args(0).asInstanceOf[Array[Double]])
+ val bytes = args(0).asInstanceOf[String].getBytes(LATIN1)
+ val bb = ByteBuffer.wrap(bytes, 0, bytes.length)
+ bb.order(ByteOrder.nativeOrder())
+ val db = bb.asDoubleBuffer()
+ val ans = new Array[Double](bytes.length / 8)
+ db.get(ans)
+ Vectors.dense(ans)
}
}
@@ -752,15 +769,30 @@ private[spark] object SerDe extends Serializable {
def saveState(obj: Object, out: OutputStream, pickler: Pickler) = {
val m: DenseMatrix = obj.asInstanceOf[DenseMatrix]
- saveObjects(out, pickler, m.numRows, m.numCols, m.values)
+ val bytes = new Array[Byte](8 * m.values.size)
+ val order = ByteOrder.nativeOrder()
+ ByteBuffer.wrap(bytes).order(order).asDoubleBuffer().put(m.values)
+
+ out.write(Opcodes.BININT)
+ out.write(PickleUtils.integer_to_bytes(m.numRows))
+ out.write(Opcodes.BININT)
+ out.write(PickleUtils.integer_to_bytes(m.numCols))
+ out.write(Opcodes.BINSTRING)
+ out.write(PickleUtils.integer_to_bytes(bytes.length))
+ out.write(bytes)
+ out.write(Opcodes.TUPLE3)
}
def construct(args: Array[Object]): Object = {
if (args.length != 3) {
throw new PickleException("should be 3")
}
- new DenseMatrix(args(0).asInstanceOf[Int], args(1).asInstanceOf[Int],
- args(2).asInstanceOf[Array[Double]])
+ val bytes = args(2).asInstanceOf[String].getBytes(LATIN1)
+ val n = bytes.length / 8
+ val values = new Array[Double](n)
+ val order = ByteOrder.nativeOrder()
+ ByteBuffer.wrap(bytes).order(order).asDoubleBuffer().get(values)
+ new DenseMatrix(args(0).asInstanceOf[Int], args(1).asInstanceOf[Int], values)
}
}
@@ -769,15 +801,40 @@ private[spark] object SerDe extends Serializable {
def saveState(obj: Object, out: OutputStream, pickler: Pickler) = {
val v: SparseVector = obj.asInstanceOf[SparseVector]
- saveObjects(out, pickler, v.size, v.indices, v.values)
+ val n = v.indices.size
+ val indiceBytes = new Array[Byte](4 * n)
+ val order = ByteOrder.nativeOrder()
+ ByteBuffer.wrap(indiceBytes).order(order).asIntBuffer().put(v.indices)
+ val valueBytes = new Array[Byte](8 * n)
+ ByteBuffer.wrap(valueBytes).order(order).asDoubleBuffer().put(v.values)
+
+ out.write(Opcodes.BININT)
+ out.write(PickleUtils.integer_to_bytes(v.size))
+ out.write(Opcodes.BINSTRING)
+ out.write(PickleUtils.integer_to_bytes(indiceBytes.length))
+ out.write(indiceBytes)
+ out.write(Opcodes.BINSTRING)
+ out.write(PickleUtils.integer_to_bytes(valueBytes.length))
+ out.write(valueBytes)
+ out.write(Opcodes.TUPLE3)
}
def construct(args: Array[Object]): Object = {
if (args.length != 3) {
throw new PickleException("should be 3")
}
- new SparseVector(args(0).asInstanceOf[Int], args(1).asInstanceOf[Array[Int]],
- args(2).asInstanceOf[Array[Double]])
+ val size = args(0).asInstanceOf[Int]
+ val indiceBytes = args(1).asInstanceOf[String].getBytes(LATIN1)
+ val valueBytes = args(2).asInstanceOf[String].getBytes(LATIN1)
+ val n = indiceBytes.length / 4
+ val indices = new Array[Int](n)
+ val values = new Array[Double](n)
+ if (n > 0) {
+ val order = ByteOrder.nativeOrder()
+ ByteBuffer.wrap(indiceBytes).order(order).asIntBuffer().get(indices)
+ ByteBuffer.wrap(valueBytes).order(order).asDoubleBuffer().get(values)
+ }
+ new SparseVector(size, indices, values)
}
}
diff --git a/python/pyspark/mllib/linalg.py b/python/pyspark/mllib/linalg.py
index 537b176578..f7aa2b0cb0 100644
--- a/python/pyspark/mllib/linalg.py
+++ b/python/pyspark/mllib/linalg.py
@@ -30,7 +30,7 @@ import copy_reg
import numpy as np
from pyspark.sql import UserDefinedType, StructField, StructType, ArrayType, DoubleType, \
- IntegerType, ByteType, Row
+ IntegerType, ByteType
__all__ = ['Vector', 'DenseVector', 'SparseVector', 'Vectors', 'DenseMatrix', 'Matrices']
@@ -173,12 +173,16 @@ class DenseVector(Vector):
A dense vector represented by a value array.
"""
def __init__(self, ar):
- if not isinstance(ar, array.array):
- ar = array.array('d', ar)
+ if isinstance(ar, basestring):
+ ar = np.frombuffer(ar, dtype=np.float64)
+ elif not isinstance(ar, np.ndarray):
+ ar = np.array(ar, dtype=np.float64)
+ if ar.dtype != np.float64:
+ ar.astype(np.float64)
self.array = ar
def __reduce__(self):
- return DenseVector, (self.array,)
+ return DenseVector, (self.array.tostring(),)
def dot(self, other):
"""
@@ -207,9 +211,10 @@ class DenseVector(Vector):
...
AssertionError: dimension mismatch
"""
- if type(other) == np.ndarray and other.ndim > 1:
- assert len(self) == other.shape[0], "dimension mismatch"
- return np.dot(self.toArray(), other)
+ if type(other) == np.ndarray:
+ if other.ndim > 1:
+ assert len(self) == other.shape[0], "dimension mismatch"
+ return np.dot(self.array, other)
elif _have_scipy and scipy.sparse.issparse(other):
assert len(self) == other.shape[0], "dimension mismatch"
return other.transpose().dot(self.toArray())
@@ -261,7 +266,7 @@ class DenseVector(Vector):
return np.dot(diff, diff)
def toArray(self):
- return np.array(self.array)
+ return self.array
def __getitem__(self, item):
return self.array[item]
@@ -276,7 +281,7 @@ class DenseVector(Vector):
return "DenseVector([%s])" % (', '.join(_format_float(i) for i in self.array))
def __eq__(self, other):
- return isinstance(other, DenseVector) and self.array == other.array
+ return isinstance(other, DenseVector) and np.array_equal(self.array, other.array)
def __ne__(self, other):
return not self == other
@@ -314,18 +319,28 @@ class SparseVector(Vector):
if type(pairs) == dict:
pairs = pairs.items()
pairs = sorted(pairs)
- self.indices = array.array('i', [p[0] for p in pairs])
- self.values = array.array('d', [p[1] for p in pairs])
+ self.indices = np.array([p[0] for p in pairs], dtype=np.int32)
+ self.values = np.array([p[1] for p in pairs], dtype=np.float64)
else:
- assert len(args[0]) == len(args[1]), "index and value arrays not same length"
- self.indices = array.array('i', args[0])
- self.values = array.array('d', args[1])
+ if isinstance(args[0], basestring):
+ assert isinstance(args[1], str), "values should be string too"
+ if args[0]:
+ self.indices = np.frombuffer(args[0], np.int32)
+ self.values = np.frombuffer(args[1], np.float64)
+ else:
+ # np.frombuffer() doesn't work well with empty string in older version
+ self.indices = np.array([], dtype=np.int32)
+ self.values = np.array([], dtype=np.float64)
+ else:
+ self.indices = np.array(args[0], dtype=np.int32)
+ self.values = np.array(args[1], dtype=np.float64)
+ assert len(self.indices) == len(self.values), "index and value arrays not same length"
for i in xrange(len(self.indices) - 1):
if self.indices[i] >= self.indices[i + 1]:
raise TypeError("indices array must be sorted")
def __reduce__(self):
- return (SparseVector, (self.size, self.indices, self.values))
+ return (SparseVector, (self.size, self.indices.tostring(), self.values.tostring()))
def dot(self, other):
"""
@@ -461,8 +476,7 @@ class SparseVector(Vector):
Returns a copy of this SparseVector as a 1-dimensional NumPy array.
"""
arr = np.zeros((self.size,), dtype=np.float64)
- for i in xrange(len(self.indices)):
- arr[self.indices[i]] = self.values[i]
+ arr[self.indices] = self.values
return arr
def __len__(self):
@@ -493,8 +507,8 @@ class SparseVector(Vector):
"""
return (isinstance(other, self.__class__)
and other.size == self.size
- and other.indices == self.indices
- and other.values == self.values)
+ and np.array_equal(other.indices, self.indices)
+ and np.array_equal(other.values, self.values))
def __ne__(self, other):
return not self.__eq__(other)
@@ -577,25 +591,34 @@ class DenseMatrix(Matrix):
"""
def __init__(self, numRows, numCols, values):
Matrix.__init__(self, numRows, numCols)
+ if isinstance(values, basestring):
+ values = np.frombuffer(values, dtype=np.float64)
+ elif not isinstance(values, np.ndarray):
+ values = np.array(values, dtype=np.float64)
assert len(values) == numRows * numCols
- if not isinstance(values, array.array):
- values = array.array('d', values)
+ if values.dtype != np.float64:
+ values.astype(np.float64)
self.values = values
def __reduce__(self):
- return DenseMatrix, (self.numRows, self.numCols, self.values)
+ return DenseMatrix, (self.numRows, self.numCols, self.values.tostring())
def toArray(self):
"""
Return an numpy.ndarray
- >>> arr = array.array('d', [float(i) for i in range(4)])
- >>> m = DenseMatrix(2, 2, arr)
+ >>> m = DenseMatrix(2, 2, range(4))
>>> m.toArray()
array([[ 0., 2.],
[ 1., 3.]])
"""
- return np.reshape(self.values, (self.numRows, self.numCols), order='F')
+ return self.values.reshape((self.numRows, self.numCols), order='F')
+
+ def __eq__(self, other):
+ return (isinstance(other, DenseMatrix) and
+ self.numRows == other.numRows and
+ self.numCols == other.numCols and
+ all(self.values == other.values))
class Matrices(object):
diff --git a/python/pyspark/mllib/tests.py b/python/pyspark/mllib/tests.py
index 9fa4d6f6a2..8332f8e061 100644
--- a/python/pyspark/mllib/tests.py
+++ b/python/pyspark/mllib/tests.py
@@ -33,7 +33,8 @@ if sys.version_info[:2] <= (2, 6):
else:
import unittest
-from pyspark.mllib.linalg import Vector, SparseVector, DenseVector, VectorUDT, _convert_to_vector
+from pyspark.mllib.linalg import Vector, SparseVector, DenseVector, VectorUDT, _convert_to_vector,\
+ DenseMatrix
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.random import RandomRDDs
from pyspark.mllib.stat import Statistics
@@ -62,6 +63,7 @@ def _squared_distance(a, b):
class VectorTests(PySparkTestCase):
def _test_serialize(self, v):
+ self.assertEqual(v, ser.loads(ser.dumps(v)))
jvec = self.sc._jvm.SerDe.loads(bytearray(ser.dumps(v)))
nv = ser.loads(str(self.sc._jvm.SerDe.dumps(jvec)))
self.assertEqual(v, nv)
@@ -75,6 +77,8 @@ class VectorTests(PySparkTestCase):
self._test_serialize(DenseVector(array([1., 2., 3., 4.])))
self._test_serialize(DenseVector(pyarray.array('d', range(10))))
self._test_serialize(SparseVector(4, {1: 1, 3: 2}))
+ self._test_serialize(SparseVector(3, {}))
+ self._test_serialize(DenseMatrix(2, 3, range(6)))
def test_dot(self):
sv = SparseVector(4, {1: 1, 3: 2})