diff options
Diffstat (limited to 'examples/src/main/python/ml/simple_text_classification_pipeline.py')
-rw-r--r-- | examples/src/main/python/ml/simple_text_classification_pipeline.py | 44 |
1 files changed, 17 insertions, 27 deletions
diff --git a/examples/src/main/python/ml/simple_text_classification_pipeline.py b/examples/src/main/python/ml/simple_text_classification_pipeline.py index c7df3d7b74..b4d9355b68 100644 --- a/examples/src/main/python/ml/simple_text_classification_pipeline.py +++ b/examples/src/main/python/ml/simple_text_classification_pipeline.py @@ -36,43 +36,33 @@ if __name__ == "__main__": sqlCtx = SQLContext(sc) # Prepare training documents, which are labeled. - LabeledDocument = Row('id', 'text', 'label') - training = sqlCtx.inferSchema( - sc.parallelize([(0L, "a b c d e spark", 1.0), - (1L, "b d", 0.0), - (2L, "spark f g h", 1.0), - (3L, "hadoop mapreduce", 0.0)]) - .map(lambda x: LabeledDocument(*x))) + LabeledDocument = Row("id", "text", "label") + training = sc.parallelize([(0L, "a b c d e spark", 1.0), + (1L, "b d", 0.0), + (2L, "spark f g h", 1.0), + (3L, "hadoop mapreduce", 0.0)]) \ + .map(lambda x: LabeledDocument(*x)).toDF() # Configure an ML pipeline, which consists of tree stages: tokenizer, hashingTF, and lr. - tokenizer = Tokenizer() \ - .setInputCol("text") \ - .setOutputCol("words") - hashingTF = HashingTF() \ - .setInputCol(tokenizer.getOutputCol()) \ - .setOutputCol("features") - lr = LogisticRegression() \ - .setMaxIter(10) \ - .setRegParam(0.01) - pipeline = Pipeline() \ - .setStages([tokenizer, hashingTF, lr]) + tokenizer = Tokenizer(inputCol="text", outputCol="words") + hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features") + lr = LogisticRegression(maxIter=10, regParam=0.01) + pipeline = Pipeline(stages=[tokenizer, hashingTF, lr]) # Fit the pipeline to training documents. model = pipeline.fit(training) # Prepare test documents, which are unlabeled. - Document = Row('id', 'text') - test = sqlCtx.inferSchema( - sc.parallelize([(4L, "spark i j k"), - (5L, "l m n"), - (6L, "mapreduce spark"), - (7L, "apache hadoop")]) - .map(lambda x: Document(*x))) + Document = Row("id", "text") + test = sc.parallelize([(4L, "spark i j k"), + (5L, "l m n"), + (6L, "mapreduce spark"), + (7L, "apache hadoop")]) \ + .map(lambda x: Document(*x)).toDF() # Make predictions on test documents and print columns of interest. prediction = model.transform(test) - prediction.registerTempTable("prediction") - selected = sqlCtx.sql("SELECT id, text, prediction from prediction") + selected = prediction.select("id", "text", "prediction") for row in selected.collect(): print row |