aboutsummaryrefslogtreecommitdiff
path: root/python/pyspark
diff options
context:
space:
mode:
Diffstat (limited to 'python/pyspark')
-rw-r--r--python/pyspark/mllib/tests.py43
-rw-r--r--python/pyspark/mllib/util.py22
2 files changed, 65 insertions, 0 deletions
diff --git a/python/pyspark/mllib/tests.py b/python/pyspark/mllib/tests.py
index f0091d6fac..49ce125de7 100644
--- a/python/pyspark/mllib/tests.py
+++ b/python/pyspark/mllib/tests.py
@@ -54,6 +54,7 @@ from pyspark.mllib.feature import Word2Vec
from pyspark.mllib.feature import IDF
from pyspark.mllib.feature import StandardScaler, ElementwiseProduct
from pyspark.mllib.util import LinearDataGenerator
+from pyspark.mllib.util import MLUtils
from pyspark.serializers import PickleSerializer
from pyspark.streaming import StreamingContext
from pyspark.sql import SQLContext
@@ -1290,6 +1291,48 @@ class StreamingLinearRegressionWithTests(MLLibStreamingTestCase):
self.assertTrue(mean_absolute_errors[1] - mean_absolute_errors[-1] > 2)
+class MLUtilsTests(MLlibTestCase):
+ def test_append_bias(self):
+ data = [2.0, 2.0, 2.0]
+ ret = MLUtils.appendBias(data)
+ self.assertEqual(ret[3], 1.0)
+ self.assertEqual(type(ret), DenseVector)
+
+ def test_append_bias_with_vector(self):
+ data = Vectors.dense([2.0, 2.0, 2.0])
+ ret = MLUtils.appendBias(data)
+ self.assertEqual(ret[3], 1.0)
+ self.assertEqual(type(ret), DenseVector)
+
+ def test_append_bias_with_sp_vector(self):
+ data = Vectors.sparse(3, {0: 2.0, 2: 2.0})
+ expected = Vectors.sparse(4, {0: 2.0, 2: 2.0, 3: 1.0})
+ # Returned value must be SparseVector
+ ret = MLUtils.appendBias(data)
+ self.assertEqual(ret, expected)
+ self.assertEqual(type(ret), SparseVector)
+
+ def test_load_vectors(self):
+ import shutil
+ data = [
+ [1.0, 2.0, 3.0],
+ [1.0, 2.0, 3.0]
+ ]
+ temp_dir = tempfile.mkdtemp()
+ load_vectors_path = os.path.join(temp_dir, "test_load_vectors")
+ try:
+ self.sc.parallelize(data).saveAsTextFile(load_vectors_path)
+ ret_rdd = MLUtils.loadVectors(self.sc, load_vectors_path)
+ ret = ret_rdd.collect()
+ self.assertEqual(len(ret), 2)
+ self.assertEqual(ret[0], DenseVector([1.0, 2.0, 3.0]))
+ self.assertEqual(ret[1], DenseVector([1.0, 2.0, 3.0]))
+ except:
+ self.fail()
+ finally:
+ shutil.rmtree(load_vectors_path)
+
+
if __name__ == "__main__":
if not _have_scipy:
print("NOTE: Skipping SciPy tests as it does not seem to be installed")
diff --git a/python/pyspark/mllib/util.py b/python/pyspark/mllib/util.py
index 348238319e..875d3b2d64 100644
--- a/python/pyspark/mllib/util.py
+++ b/python/pyspark/mllib/util.py
@@ -169,6 +169,28 @@ class MLUtils(object):
minPartitions = minPartitions or min(sc.defaultParallelism, 2)
return callMLlibFunc("loadLabeledPoints", sc, path, minPartitions)
+ @staticmethod
+ def appendBias(data):
+ """
+ Returns a new vector with `1.0` (bias) appended to
+ the end of the input vector.
+ """
+ vec = _convert_to_vector(data)
+ if isinstance(vec, SparseVector):
+ newIndices = np.append(vec.indices, len(vec))
+ newValues = np.append(vec.values, 1.0)
+ return SparseVector(len(vec) + 1, newIndices, newValues)
+ else:
+ return _convert_to_vector(np.append(vec.toArray(), 1.0))
+
+ @staticmethod
+ def loadVectors(sc, path):
+ """
+ Loads vectors saved using `RDD[Vector].saveAsTextFile`
+ with the default number of partitions.
+ """
+ return callMLlibFunc("loadVectors", sc, path)
+
class Saveable(object):
"""