Diffstat (limited to 'python/pyspark/mllib/tree.py')
-rw-r--r--  python/pyspark/mllib/tree.py  167
1 file changed, 84 insertions(+), 83 deletions(-)
diff --git a/python/pyspark/mllib/tree.py b/python/pyspark/mllib/tree.py
index 5b13ab682b..f59a818a6e 100644
--- a/python/pyspark/mllib/tree.py
+++ b/python/pyspark/mllib/tree.py
@@ -18,13 +18,9 @@
from py4j.java_collections import MapConverter
from pyspark import SparkContext, RDD
-from pyspark.mllib._common import \
-    _get_unmangled_rdd, _get_unmangled_double_vector_rdd, _serialize_double_vector, \
-    _deserialize_labeled_point, _get_unmangled_labeled_point_rdd, \
-    _deserialize_double
+from pyspark.serializers import BatchedSerializer, PickleSerializer
+from pyspark.mllib.linalg import Vector, _convert_to_vector
 from pyspark.mllib.regression import LabeledPoint
-from pyspark.serializers import NoOpSerializer
-
__all__ = ['DecisionTreeModel', 'DecisionTree']
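
[Editor's note] The import swap above replaces the removed pyspark.mllib._common byte-mangling helpers with the generic pickle-based serializers plus _convert_to_vector. A minimal sketch of what _convert_to_vector normalizes, assuming only a local pyspark installation (no SparkContext needed):

    >>> from pyspark.mllib.linalg import SparseVector, _convert_to_vector
    >>> dense = _convert_to_vector([0.0, 1.0])                  # plain list -> DenseVector
    >>> sparse = _convert_to_vector(SparseVector(2, {1: 1.0}))  # Vectors pass through as-is
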
@@ -55,21 +51,24 @@ class DecisionTreeModel(object):
:param x: Data point (feature vector),
or an RDD of data points (feature vectors).
"""
-        pythonAPI = self._sc._jvm.PythonMLLibAPI()
+        SerDe = self._sc._jvm.SerDe
+        ser = PickleSerializer()
         if isinstance(x, RDD):
             # Bulk prediction
-            if x.count() == 0:
+            first = x.take(1)
+            if not first:
                 return self._sc.parallelize([])
-            dataBytes = _get_unmangled_double_vector_rdd(x, cache=False)
-            jSerializedPreds = \
-                pythonAPI.predictDecisionTreeModel(self._java_model,
-                                                   dataBytes._jrdd)
-            serializedPreds = RDD(jSerializedPreds, self._sc, NoOpSerializer())
-            return serializedPreds.map(lambda bytes: _deserialize_double(bytearray(bytes)))
+            if not isinstance(first[0], Vector):
+                x = x.map(_convert_to_vector)
+            jPred = self._java_model.predict(x._to_java_object_rdd()).toJavaRDD()
+            jpyrdd = self._sc._jvm.PythonRDD.javaToPython(jPred)
+            return RDD(jpyrdd, self._sc, BatchedSerializer(ser, 1024))
+
         else:
             # Assume x is a single data point.
-            x_ = _serialize_double_vector(x)
-            return pythonAPI.predictDecisionTreeModel(self._java_model, x_)
+            bytes = bytearray(ser.dumps(_convert_to_vector(x)))
+            vec = self._sc._jvm.SerDe.loads(bytes)
+            return self._java_model.predict(vec)

     def numNodes(self):
         return self._java_model.numNodes()
@@ -77,7 +76,7 @@ class DecisionTreeModel(object):
     def depth(self):
         return self._java_model.depth()

-    def __str__(self):
+    def __repr__(self):
         return self._java_model.toString()
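
[Editor's note] predict() now has two code paths: an RDD argument is converted element-wise with _convert_to_vector if needed, handed to the JVM model's bulk predict, and streamed back through PythonRDD.javaToPython with a batched PickleSerializer, while a single point is pickled, rebuilt as a JVM Vector via SerDe.loads, and predicted directly. A hedged usage sketch in the module's own doctest style, assuming a live SparkContext bound to sc and a trained model as in the doctests further down:

    >>> model.predict([0.0]) == 0                              # single-point path
    True
    >>> preds = model.predict(sc.parallelize([[0.0], [1.0]]))  # bulk (RDD) path
    >>> preds.collect() == [0.0, 1.0]
    True

The __str__ -> __repr__ switch just above also lets the tree summary display in an interactive shell without an explicit print.
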
@@ -90,53 +89,24 @@ class DecisionTree(object):
EXPERIMENTAL: This is an experimental API.
It will probably be modified for Spark v1.2.
- Example usage:
-
- >>> from numpy import array
- >>> import sys
- >>> from pyspark.mllib.regression import LabeledPoint
- >>> from pyspark.mllib.tree import DecisionTree
- >>> from pyspark.mllib.linalg import SparseVector
- >>>
- >>> data = [
- ... LabeledPoint(0.0, [0.0]),
- ... LabeledPoint(1.0, [1.0]),
- ... LabeledPoint(1.0, [2.0]),
- ... LabeledPoint(1.0, [3.0])
- ... ]
- >>> categoricalFeaturesInfo = {} # no categorical features
- >>> model = DecisionTree.trainClassifier(sc.parallelize(data), numClasses=2,
- ... categoricalFeaturesInfo=categoricalFeaturesInfo)
- >>> sys.stdout.write(model)
-    DecisionTreeModel classifier
-      If (feature 0 <= 0.5)
-       Predict: 0.0
-      Else (feature 0 > 0.5)
-       Predict: 1.0
- >>> model.predict(array([1.0])) > 0
- True
- >>> model.predict(array([0.0])) == 0
- True
- >>> sparse_data = [
- ... LabeledPoint(0.0, SparseVector(2, {0: 0.0})),
- ... LabeledPoint(1.0, SparseVector(2, {1: 1.0})),
- ... LabeledPoint(0.0, SparseVector(2, {0: 0.0})),
- ... LabeledPoint(1.0, SparseVector(2, {1: 2.0}))
- ... ]
- >>>
- >>> model = DecisionTree.trainRegressor(sc.parallelize(sparse_data),
- ... categoricalFeaturesInfo=categoricalFeaturesInfo)
- >>> model.predict(array([0.0, 1.0])) == 1
- True
- >>> model.predict(array([0.0, 0.0])) == 0
- True
- >>> model.predict(SparseVector(2, {1: 1.0})) == 1
- True
- >>> model.predict(SparseVector(2, {1: 0.0})) == 0
- True
"""
     @staticmethod
+    def _train(data, type, numClasses, categoricalFeaturesInfo,
+               impurity="gini", maxDepth=5, maxBins=32, minInstancesPerNode=1,
+               minInfoGain=0.0):
+        first = data.first()
+        assert isinstance(first, LabeledPoint), "the data should be an RDD of LabeledPoint"
+        sc = data.context
+        jrdd = data._to_java_object_rdd()
+        cfiMap = MapConverter().convert(categoricalFeaturesInfo,
+                                        sc._gateway._gateway_client)
+        model = sc._jvm.PythonMLLibAPI().trainDecisionTreeModel(
+            jrdd, type, numClasses, cfiMap,
+            impurity, maxDepth, maxBins, minInstancesPerNode, minInfoGain)
+        return DecisionTreeModel(sc, model)
+
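
[Editor's note] The new private _train helper folds the previously duplicated trainClassifier/trainRegressor bodies into one bridge call into PythonMLLibAPI. An illustrative (hypothetical) direct call, mainly to show how categoricalFeaturesInfo is threaded through — the dict maps a feature index to its number of categories; assumes sc as in the doctests:

    >>> from pyspark.mllib.regression import LabeledPoint
    >>> pts = sc.parallelize([LabeledPoint(0.0, [0.0, 1.0]),
    ...                       LabeledPoint(1.0, [1.0, 0.0])])
    >>> m = DecisionTree._train(pts, "classification", 2, {1: 2})  # feature 1 has 2 categories
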
+    @staticmethod
     def trainClassifier(data, numClasses, categoricalFeaturesInfo,
                         impurity="gini", maxDepth=5, maxBins=32, minInstancesPerNode=1,
                         minInfoGain=0.0):
@@ -159,18 +129,34 @@ class DecisionTree(object):
the parent split
:param minInfoGain: Min info gain required to create a split
:return: DecisionTreeModel
+
+ Example usage:
+
+ >>> from numpy import array
+ >>> from pyspark.mllib.regression import LabeledPoint
+ >>> from pyspark.mllib.tree import DecisionTree
+ >>> from pyspark.mllib.linalg import SparseVector
+ >>>
+ >>> data = [
+ ... LabeledPoint(0.0, [0.0]),
+ ... LabeledPoint(1.0, [1.0]),
+ ... LabeledPoint(1.0, [2.0]),
+ ... LabeledPoint(1.0, [3.0])
+ ... ]
+ >>> model = DecisionTree.trainClassifier(sc.parallelize(data), 2, {})
+        >>> print model,  # it already has a newline
+        DecisionTreeModel classifier
+          If (feature 0 <= 0.5)
+           Predict: 0.0
+          Else (feature 0 > 0.5)
+           Predict: 1.0
+ >>> model.predict(array([1.0])) > 0
+ True
+ >>> model.predict(array([0.0])) == 0
+ True
"""
-        sc = data.context
-        dataBytes = _get_unmangled_labeled_point_rdd(data)
-        categoricalFeaturesInfoJMap = \
-            MapConverter().convert(categoricalFeaturesInfo,
-                                   sc._gateway._gateway_client)
-        model = sc._jvm.PythonMLLibAPI().trainDecisionTreeModel(
-            dataBytes._jrdd, "classification",
-            numClasses, categoricalFeaturesInfoJMap,
-            impurity, maxDepth, maxBins, minInstancesPerNode, minInfoGain)
-        dataBytes.unpersist()
-        return DecisionTreeModel(sc, model)
+        return DecisionTree._train(data, "classification", numClasses, categoricalFeaturesInfo,
+                                   impurity, maxDepth, maxBins, minInstancesPerNode, minInfoGain)
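
[Editor's note] Beyond the doctest above, the two new pruning parameters can be exercised the same way; a hedged sketch reusing the doctest's data:

    >>> model = DecisionTree.trainClassifier(sc.parallelize(data), 2, {},
    ...                                      minInstancesPerNode=2,  # each child keeps >= 2 points
    ...                                      minInfoGain=0.01)       # reject near-zero-gain splits
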
     @staticmethod
     def trainRegressor(data, categoricalFeaturesInfo,
@@ -194,18 +180,33 @@ class DecisionTree(object):
the parent split
:param minInfoGain: Min info gain required to create a split
:return: DecisionTreeModel
+
+ Example usage:
+
+ >>> from numpy import array
+ >>> from pyspark.mllib.regression import LabeledPoint
+ >>> from pyspark.mllib.tree import DecisionTree
+ >>> from pyspark.mllib.linalg import SparseVector
+ >>>
+ >>> sparse_data = [
+ ... LabeledPoint(0.0, SparseVector(2, {0: 0.0})),
+ ... LabeledPoint(1.0, SparseVector(2, {1: 1.0})),
+ ... LabeledPoint(0.0, SparseVector(2, {0: 0.0})),
+ ... LabeledPoint(1.0, SparseVector(2, {1: 2.0}))
+ ... ]
+ >>>
+ >>> model = DecisionTree.trainRegressor(sc.parallelize(sparse_data), {})
+ >>> model.predict(array([0.0, 1.0])) == 1
+ True
+ >>> model.predict(array([0.0, 0.0])) == 0
+ True
+ >>> model.predict(SparseVector(2, {1: 1.0})) == 1
+ True
+ >>> model.predict(SparseVector(2, {1: 0.0})) == 0
+ True
"""
-        sc = data.context
-        dataBytes = _get_unmangled_labeled_point_rdd(data)
-        categoricalFeaturesInfoJMap = \
-            MapConverter().convert(categoricalFeaturesInfo,
-                                   sc._gateway._gateway_client)
-        model = sc._jvm.PythonMLLibAPI().trainDecisionTreeModel(
-            dataBytes._jrdd, "regression",
-            0, categoricalFeaturesInfoJMap,
-            impurity, maxDepth, maxBins, minInstancesPerNode, minInfoGain)
-        dataBytes.unpersist()
-        return DecisionTreeModel(sc, model)
+        return DecisionTree._train(data, "regression", 0, categoricalFeaturesInfo,
+                                   impurity, maxDepth, maxBins, minInstancesPerNode, minInfoGain)
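
[Editor's note] The regressor accepts the same tuning knobs (impurity defaults to "variance" for regression); a hedged sketch reusing the doctest's sparse_data and imports:

    >>> model = DecisionTree.trainRegressor(sc.parallelize(sparse_data), {},
    ...                                     maxDepth=3, maxBins=16)
    >>> model.predict(SparseVector(2, {1: 1.0})) == 1
    True
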
def _test():