author     Yanbo Liang <ybliang8@gmail.com>            2015-12-11 18:02:24 -0800
committer  Joseph K. Bradley <joseph@databricks.com>   2015-12-11 18:02:24 -0800
commit     a0ff6d16ef4bcc1b6ff7282e82a9b345d8449454 (patch)
tree       d49c5a09b75606047fa890f1222191efa5afba89 /examples/src/main/python
parent     aea676ca2d07c72b1a752e9308c961118e5bfc3c (diff)
[SPARK-11978][ML] Move dataset_example.py to examples/ml and rename to dataframe_example.py
Since ```Dataset``` has a new meaning in Spark 1.6, we should rename the example to avoid confusion. #9873 finished the work for the Scala example; here we focus on the Python one: move dataset_example.py to ```examples/ml``` and rename it to ```dataframe_example.py```. Also fix minor issues missed in #9873. cc mengxr Author: Yanbo Liang <ybliang8@gmail.com> Closes #9957 from yanboliang/SPARK-11978.
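For context, the substantive API change behind the rename: the old mllib example loaded LIBSVM data as an RDD of LabeledPoint and inferred a schema, while the new ml example reads it directly as a DataFrame through the Spark 1.6 libsvm data source. A minimal before/after sketch (not part of the commit), assuming an existing SparkContext `sc` and SQLContext `sqlContext`:

```python
from pyspark.mllib.util import MLUtils

path = "data/mllib/sample_libsvm_data.txt"

# Before (mllib/dataset_example.py): RDD of LabeledPoint, then schema
# inference (the old example used the deprecated sqlContext.inferSchema;
# createDataFrame is its direct replacement).
points = MLUtils.loadLibSVMFile(sc, path)
old_df = sqlContext.createDataFrame(points)

# After (ml/dataframe_example.py): read straight into a DataFrame whose
# "features" column uses the mllib vector UDT.
new_df = sqlContext.read.format("libsvm").load(path)
new_df.printSchema()  # label: double, features: vector
```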
Diffstat (limited to 'examples/src/main/python')
-rw-r--r--  examples/src/main/python/ml/dataframe_example.py (renamed from examples/src/main/python/mllib/dataset_example.py) | 56
1 file changed, 34 insertions, 22 deletions
diff --git a/examples/src/main/python/mllib/dataset_example.py b/examples/src/main/python/ml/dataframe_example.py
index e23ecc0c5d..d2644ca335 100644
--- a/examples/src/main/python/mllib/dataset_example.py
+++ b/examples/src/main/python/ml/dataframe_example.py
@@ -16,8 +16,8 @@
#
"""
-An example of how to use DataFrame as a dataset for ML. Run with::
- bin/spark-submit examples/src/main/python/mllib/dataset_example.py
+An example of how to use DataFrame for ML. Run with::
+ bin/spark-submit examples/src/main/python/ml/dataframe_example.py <input>
"""
from __future__ import print_function
@@ -28,36 +28,48 @@ import shutil
from pyspark import SparkContext
from pyspark.sql import SQLContext
-from pyspark.mllib.util import MLUtils
from pyspark.mllib.stat import Statistics
-
-def summarize(dataset):
- print("schema: %s" % dataset.schema().json())
- labels = dataset.map(lambda r: r.label)
- print("label average: %f" % labels.mean())
- features = dataset.map(lambda r: r.features)
- summary = Statistics.colStats(features)
- print("features average: %r" % summary.mean())
-
if __name__ == "__main__":
if len(sys.argv) > 2:
- print("Usage: dataset_example.py <libsvm file>", file=sys.stderr)
+ print("Usage: dataframe_example.py <libsvm file>", file=sys.stderr)
exit(-1)
- sc = SparkContext(appName="DatasetExample")
+ sc = SparkContext(appName="DataFrameExample")
sqlContext = SQLContext(sc)
if len(sys.argv) == 2:
input = sys.argv[1]
else:
input = "data/mllib/sample_libsvm_data.txt"
- points = MLUtils.loadLibSVMFile(sc, input)
- dataset0 = sqlContext.inferSchema(points).setName("dataset0").cache()
- summarize(dataset0)
+
+ # Load input data
+ print("Loading LIBSVM file with UDT from " + input + ".")
+ df = sqlContext.read.format("libsvm").load(input).cache()
+ print("Schema from LIBSVM:")
+ df.printSchema()
+ print("Loaded training data as a DataFrame with " +
+ str(df.count()) + " records.")
+
+ # Show statistical summary of labels.
+ labelSummary = df.describe("label")
+ labelSummary.show()
+
+ # Convert features column to an RDD of vectors.
+ features = df.select("features").map(lambda r: r.features)
+ summary = Statistics.colStats(features)
+ print("Selected features column with average values:\n" +
+ str(summary.mean()))
+
+ # Save the records in a parquet file.
tempdir = tempfile.NamedTemporaryFile(delete=False).name
os.unlink(tempdir)
- print("Save dataset as a Parquet file to %s." % tempdir)
- dataset0.saveAsParquetFile(tempdir)
- print("Load it back and summarize it again.")
- dataset1 = sqlContext.parquetFile(tempdir).setName("dataset1").cache()
- summarize(dataset1)
+ print("Saving to " + tempdir + " as Parquet file.")
+ df.write.parquet(tempdir)
+
+ # Load the records back.
+ print("Loading Parquet file with UDT from " + tempdir)
+ newDF = sqlContext.read.parquet(tempdir)
+ print("Schema from Parquet:")
+ newDF.printSchema()
shutil.rmtree(tempdir)
+
+ sc.stop()
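One detail worth noting in the Parquet step above: the example creates a NamedTemporaryFile, unlinks it, and reuses its name as the output directory. A simpler sketch of the same round trip (not part of the commit), using tempfile.mkdtemp and assuming a DataFrame `df` loaded as above:

```python
import os
import shutil
import tempfile

# mkdtemp avoids the NamedTemporaryFile-then-unlink trick: what we need
# is a fresh directory path for df.write.parquet, not a file.
tempdir = tempfile.mkdtemp()
out = os.path.join(tempdir, "data.parquet")

df.write.parquet(out)                # the vector UDT survives the write
back = sqlContext.read.parquet(out)
back.printSchema()                   # same schema as before the round trip
shutil.rmtree(tempdir)
```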