aboutsummaryrefslogtreecommitdiff
path: root/examples/src/main/python/ml/simple_text_classification_pipeline.py
diff options
context:
space:
mode:
Diffstat (limited to 'examples/src/main/python/ml/simple_text_classification_pipeline.py')
-rw-r--r--examples/src/main/python/ml/simple_text_classification_pipeline.py44
1 files changed, 17 insertions, 27 deletions
diff --git a/examples/src/main/python/ml/simple_text_classification_pipeline.py b/examples/src/main/python/ml/simple_text_classification_pipeline.py
index c7df3d7b74..b4d9355b68 100644
--- a/examples/src/main/python/ml/simple_text_classification_pipeline.py
+++ b/examples/src/main/python/ml/simple_text_classification_pipeline.py
@@ -36,43 +36,33 @@ if __name__ == "__main__":
sqlCtx = SQLContext(sc)
# Prepare training documents, which are labeled.
- LabeledDocument = Row('id', 'text', 'label')
- training = sqlCtx.inferSchema(
- sc.parallelize([(0L, "a b c d e spark", 1.0),
- (1L, "b d", 0.0),
- (2L, "spark f g h", 1.0),
- (3L, "hadoop mapreduce", 0.0)])
- .map(lambda x: LabeledDocument(*x)))
+ LabeledDocument = Row("id", "text", "label")
+ training = sc.parallelize([(0L, "a b c d e spark", 1.0),
+ (1L, "b d", 0.0),
+ (2L, "spark f g h", 1.0),
+ (3L, "hadoop mapreduce", 0.0)]) \
+ .map(lambda x: LabeledDocument(*x)).toDF()
# Configure an ML pipeline, which consists of tree stages: tokenizer, hashingTF, and lr.
- tokenizer = Tokenizer() \
- .setInputCol("text") \
- .setOutputCol("words")
- hashingTF = HashingTF() \
- .setInputCol(tokenizer.getOutputCol()) \
- .setOutputCol("features")
- lr = LogisticRegression() \
- .setMaxIter(10) \
- .setRegParam(0.01)
- pipeline = Pipeline() \
- .setStages([tokenizer, hashingTF, lr])
+ tokenizer = Tokenizer(inputCol="text", outputCol="words")
+ hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features")
+ lr = LogisticRegression(maxIter=10, regParam=0.01)
+ pipeline = Pipeline(stages=[tokenizer, hashingTF, lr])
# Fit the pipeline to training documents.
model = pipeline.fit(training)
# Prepare test documents, which are unlabeled.
- Document = Row('id', 'text')
- test = sqlCtx.inferSchema(
- sc.parallelize([(4L, "spark i j k"),
- (5L, "l m n"),
- (6L, "mapreduce spark"),
- (7L, "apache hadoop")])
- .map(lambda x: Document(*x)))
+ Document = Row("id", "text")
+ test = sc.parallelize([(4L, "spark i j k"),
+ (5L, "l m n"),
+ (6L, "mapreduce spark"),
+ (7L, "apache hadoop")]) \
+ .map(lambda x: Document(*x)).toDF()
# Make predictions on test documents and print columns of interest.
prediction = model.transform(test)
- prediction.registerTempTable("prediction")
- selected = sqlCtx.sql("SELECT id, text, prediction from prediction")
+ selected = prediction.select("id", "text", "prediction")
for row in selected.collect():
print row