 mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala |  9 +++++++++
 python/pyspark/mllib/tests.py                                               | 43 ++++++++++++++++++++
 python/pyspark/mllib/util.py                                                | 22 ++++++++++++
 3 files changed, 74 insertions(+), 0 deletions(-)
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
index a66a404d5c..458fab48fe 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
@@ -75,6 +75,15 @@ private[python] class PythonMLLibAPI extends Serializable {
       minPartitions: Int): JavaRDD[LabeledPoint] =
     MLUtils.loadLabeledPoints(jsc.sc, path, minPartitions)
 
+  /**
+   * Loads and serializes vectors saved with `RDD#saveAsTextFile`.
+   * @param jsc Java SparkContext
+   * @param path file or directory path in any Hadoop-supported file system URI
+   * @return serialized vectors in an RDD
+   */
+  def loadVectors(jsc: JavaSparkContext, path: String): RDD[Vector] =
+    MLUtils.loadVectors(jsc.sc, path)
+
   private def trainRegressionModel(
       learner: GeneralizedLinearAlgorithm[_ <: GeneralizedLinearModel],
       data: JavaRDD[LabeledPoint],
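
For context, the wrapper added to python/pyspark/mllib/util.py below reaches this entry point through callMLlibFunc. A minimal round-trip sketch, assuming an active SparkContext `sc` and a scratch output path (both assumptions, not part of the patch):

    # Save vectors as text, then load them back through the new API.
    from pyspark.mllib.linalg import Vectors
    from pyspark.mllib.util import MLUtils

    vecs = sc.parallelize([Vectors.dense([1.0, 2.0]), Vectors.dense([3.0, 4.0])])
    vecs.saveAsTextFile("/tmp/vectors_out")  # str(DenseVector) is parseable text
    loaded = MLUtils.loadVectors(sc, "/tmp/vectors_out")
    assert loaded.count() == 2
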
diff --git a/python/pyspark/mllib/tests.py b/python/pyspark/mllib/tests.py
index f0091d6fac..49ce125de7 100644
--- a/python/pyspark/mllib/tests.py
+++ b/python/pyspark/mllib/tests.py
@@ -54,6 +54,7 @@ from pyspark.mllib.feature import Word2Vec
 from pyspark.mllib.feature import IDF
 from pyspark.mllib.feature import StandardScaler, ElementwiseProduct
 from pyspark.mllib.util import LinearDataGenerator
+from pyspark.mllib.util import MLUtils
 from pyspark.serializers import PickleSerializer
 from pyspark.streaming import StreamingContext
 from pyspark.sql import SQLContext
@@ -1290,6 +1291,48 @@ class StreamingLinearRegressionWithTests(MLLibStreamingTestCase):
         self.assertTrue(mean_absolute_errors[1] - mean_absolute_errors[-1] > 2)
 
 
+class MLUtilsTests(MLlibTestCase):
+    def test_append_bias(self):
+        data = [2.0, 2.0, 2.0]
+        ret = MLUtils.appendBias(data)
+        self.assertEqual(ret[3], 1.0)
+        self.assertEqual(type(ret), DenseVector)
+
+    def test_append_bias_with_vector(self):
+        data = Vectors.dense([2.0, 2.0, 2.0])
+        ret = MLUtils.appendBias(data)
+        self.assertEqual(ret[3], 1.0)
+        self.assertEqual(type(ret), DenseVector)
+
+    def test_append_bias_with_sp_vector(self):
+        data = Vectors.sparse(3, {0: 2.0, 2: 2.0})
+        expected = Vectors.sparse(4, {0: 2.0, 2: 2.0, 3: 1.0})
+        # Returned value must be a SparseVector
+        ret = MLUtils.appendBias(data)
+        self.assertEqual(ret, expected)
+        self.assertEqual(type(ret), SparseVector)
+
+    def test_load_vectors(self):
+        import shutil
+        data = [
+            [1.0, 2.0, 3.0],
+            [1.0, 2.0, 3.0]
+        ]
+        temp_dir = tempfile.mkdtemp()
+        load_vectors_path = os.path.join(temp_dir, "test_load_vectors")
+        try:
+            self.sc.parallelize(data).saveAsTextFile(load_vectors_path)
+            ret_rdd = MLUtils.loadVectors(self.sc, load_vectors_path)
+            ret = ret_rdd.collect()
+            self.assertEqual(len(ret), 2)
+            self.assertEqual(ret[0], DenseVector([1.0, 2.0, 3.0]))
+            self.assertEqual(ret[1], DenseVector([1.0, 2.0, 3.0]))
+        except Exception as e:
+            self.fail("Unexpected exception: %s" % e)
+        finally:
+            # Remove the whole temp dir, not just the output path, so the
+            # cleanup also succeeds when saveAsTextFile itself failed.
+            shutil.rmtree(temp_dir, ignore_errors=True)
+
+
 if __name__ == "__main__":
     if not _have_scipy:
         print("NOTE: Skipping SciPy tests as it does not seem to be installed")
diff --git a/python/pyspark/mllib/util.py b/python/pyspark/mllib/util.py
index 348238319e..875d3b2d64 100644
--- a/python/pyspark/mllib/util.py
+++ b/python/pyspark/mllib/util.py
@@ -169,6 +169,28 @@ class MLUtils(object):
         minPartitions = minPartitions or min(sc.defaultParallelism, 2)
         return callMLlibFunc("loadLabeledPoints", sc, path, minPartitions)
 
+    @staticmethod
+    def appendBias(data):
+        """
+        Returns a new vector with `1.0` (bias) appended to
+        the end of the input vector.
+        """
+        vec = _convert_to_vector(data)
+        if isinstance(vec, SparseVector):
+            newIndices = np.append(vec.indices, len(vec))
+            newValues = np.append(vec.values, 1.0)
+            return SparseVector(len(vec) + 1, newIndices, newValues)
+        else:
+            return _convert_to_vector(np.append(vec.toArray(), 1.0))
+
+    @staticmethod
+    def loadVectors(sc, path):
+        """
+        Loads vectors saved using `RDD[Vector].saveAsTextFile`
+        with the default number of partitions.
+        """
+        return callMLlibFunc("loadVectors", sc, path)
+
 
 class Saveable(object):
     """