Diffstat (limited to 'examples/src')
-rwxr-xr-x  examples/src/main/python/mllib/decision_tree_runner.py  133
-rwxr-xr-x  examples/src/main/python/mllib/logistic_regression.py     4
2 files changed, 136 insertions(+), 1 deletion(-)
diff --git a/examples/src/main/python/mllib/decision_tree_runner.py b/examples/src/main/python/mllib/decision_tree_runner.py
new file mode 100755
index 0000000000..8efadb5223
--- /dev/null
+++ b/examples/src/main/python/mllib/decision_tree_runner.py
@@ -0,0 +1,133 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+Decision tree classification and regression using MLlib.
+"""
+
+import numpy, os, sys
+
+from operator import add
+
+from pyspark import SparkContext
+from pyspark.mllib.regression import LabeledPoint
+from pyspark.mllib.tree import DecisionTree
+from pyspark.mllib.util import MLUtils
+
+
+def getAccuracy(dtModel, data):
+ """
+ Return accuracy of DecisionTreeModel on the given RDD[LabeledPoint].
+ """
+    seqOp = (lambda acc, x: acc + (x[0] == x[1]))
+    predictions = dtModel.predict(data.map(lambda x: x.features))
+    truth = data.map(lambda p: p.label)
+    trainCorrect = predictions.zip(truth).aggregate(0, seqOp, add)
+    if data.count() == 0:
+        return 0
+    return trainCorrect / (0.0 + data.count())
+
+
+def getMSE(dtModel, data):
+ """
+ Return mean squared error (MSE) of DecisionTreeModel on the given
+ RDD[LabeledPoint].
+ """
+    seqOp = (lambda acc, x: acc + numpy.square(x[0] - x[1]))
+    predictions = dtModel.predict(data.map(lambda x: x.features))
+    truth = data.map(lambda p: p.label)
+    trainMSE = predictions.zip(truth).aggregate(0, seqOp, add)
+    if data.count() == 0:
+        return 0
+    return trainMSE / (0.0 + data.count())
+
+
+def reindexClassLabels(data):
+ """
+ Re-index class labels in a dataset to the range {0,...,numClasses-1}.
+ If all labels in that range already appear at least once,
+ then the returned RDD is the same one (without a mapping).
+ Note: If a label simply does not appear in the data,
+ the index will not include it.
+ Be aware of this when reindexing subsampled data.
+ :param data: RDD of LabeledPoint where labels are integer values
+ denoting labels for a classification problem.
+ :return: Pair (reindexedData, origToNewLabels) where
+ reindexedData is an RDD of LabeledPoint with labels in
+ the range {0,...,numClasses-1}, and
+ origToNewLabels is a dictionary mapping original labels
+ to new labels.
+ """
+ # classCounts: class --> # examples in class
+ classCounts = data.map(lambda x: x.label).countByValue()
+ numExamples = sum(classCounts.values())
+ sortedClasses = sorted(classCounts.keys())
+ numClasses = len(classCounts)
+ # origToNewLabels: class --> index in 0,...,numClasses-1
+ if (numClasses < 2):
+ print >> sys.stderr, \
+ "Dataset for classification should have at least 2 classes." + \
+ " The given dataset had only %d classes." % numClasses
+ exit(1)
+ origToNewLabels = dict([(sortedClasses[i], i) for i in range(0, numClasses)])
+
+ print "numClasses = %d" % numClasses
+ print "Per-class example fractions, counts:"
+ print "Class\tFrac\tCount"
+ for c in sortedClasses:
+ frac = classCounts[c] / (numExamples + 0.0)
+ print "%g\t%g\t%d" % (c, frac, classCounts[c])
+
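+    # If the observed labels are already exactly 0,...,numClasses-1 the mapping
+    # is the identity and the data is returned unchanged; otherwise every label
+    # is rewritten through origToNewLabels.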
+    if (sortedClasses[0] == 0 and sortedClasses[-1] == numClasses - 1):
+        return (data, origToNewLabels)
+    else:
+        reindexedData = \
+            data.map(lambda x: LabeledPoint(origToNewLabels[x.label], x.features))
+        return (reindexedData, origToNewLabels)
+
+
+def usage():
+    print >> sys.stderr, \
+        "Usage: decision_tree_runner [libsvm format data filepath]\n" + \
+        " Note: This only supports binary classification."
+    exit(1)
+
+
+if __name__ == "__main__":
+    if len(sys.argv) > 2:
+        usage()
+    sc = SparkContext(appName="PythonDT")
+
+    # Load data.
+    dataPath = 'data/mllib/sample_libsvm_data.txt'
+    if len(sys.argv) == 2:
+        dataPath = sys.argv[1]
+    if not os.path.isfile(dataPath):
+        usage()
+    points = MLUtils.loadLibSVMFile(sc, dataPath)
+
+    # Re-index class labels if needed.
+    (reindexedData, origToNewLabels) = reindexClassLabels(points)
+
+    # Train a classifier.
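+    # numClasses=2 matches the note in usage(): this runner only handles binary
+    # classification, even though reindexClassLabels tolerates more classes.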
+    model = DecisionTree.trainClassifier(reindexedData, numClasses=2)
+    # Print learned tree and stats.
+    print "Trained DecisionTree for classification:"
+    print "  Model numNodes: %d\n" % model.numNodes()
+    print "  Model depth: %d\n" % model.depth()
+    print "  Training accuracy: %g\n" % getAccuracy(model, reindexedData)
+    print model
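
The classifier above is trained on reindexedData, but origToNewLabels is never used after training. A minimal sketch (not part of this patch; newToOrigLabels and predictedOrig are illustrative names only) of how the mapping could be inverted to report predictions in the original label space:

    newToOrigLabels = dict((v, k) for (k, v) in origToNewLabels.items())
    predictedOrig = model.predict(reindexedData.map(lambda x: x.features)) \
        .map(lambda idx: newToOrigLabels[idx])
    print predictedOrig.take(5)
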
diff --git a/examples/src/main/python/mllib/logistic_regression.py b/examples/src/main/python/mllib/logistic_regression.py
index 6e0f7a4ee5..9d547ff77c 100755
--- a/examples/src/main/python/mllib/logistic_regression.py
+++ b/examples/src/main/python/mllib/logistic_regression.py
@@ -30,8 +30,10 @@ from pyspark.mllib.regression import LabeledPoint
 from pyspark.mllib.classification import LogisticRegressionWithSGD
 
 
-# Parse a line of text into an MLlib LabeledPoint object
 def parsePoint(line):
+    """
+    Parse a line of text into an MLlib LabeledPoint object.
+    """
     values = [float(s) for s in line.split(' ')]
     if values[0] == -1:    # Convert -1 labels to 0 for MLlib
         values[0] = 0
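
For illustration (not part of this patch), the parsing added above behaves as follows on two sample input lines:

    values = [float(s) for s in "1 2.5 3.0".split(' ')]     # [1.0, 2.5, 3.0]
    values = [float(s) for s in "-1 4.0 0.5".split(' ')]    # [-1.0, 4.0, 0.5]; the -1 label is then rewritten to 0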