aboutsummaryrefslogtreecommitdiff
path: root/examples/src/main/python/pagerank.py
diff options
context:
space:
mode:
authorZheng RuiFeng <ruifengz@foxmail.com>2016-05-20 16:40:33 -0700
committerAndrew Or <andrew@databricks.com>2016-05-20 16:40:33 -0700
commit127bf1bb07967e2e4f99ad7abaa7f6fab3b3f407 (patch)
treea127031cd361df2f1d895cb11489f8e183c76f73 /examples/src/main/python/pagerank.py
parent06c9f520714e07259c6f8ce6f9ea5a230a278cb5 (diff)
downloadspark-127bf1bb07967e2e4f99ad7abaa7f6fab3b3f407.tar.gz
spark-127bf1bb07967e2e4f99ad7abaa7f6fab3b3f407.tar.bz2
spark-127bf1bb07967e2e4f99ad7abaa7f6fab3b3f407.zip
[SPARK-15031][EXAMPLE] Use SparkSession in examples
## What changes were proposed in this pull request? Use `SparkSession` according to [SPARK-15031](https://issues.apache.org/jira/browse/SPARK-15031) `MLLLIB` is not recommended to use now, so examples in `MLLIB` are ignored in this PR. `StreamingContext` can not be directly obtained from `SparkSession`, so example in `Streaming` are ignored too. cc andrewor14 ## How was this patch tested? manual tests with spark-submit Author: Zheng RuiFeng <ruifengz@foxmail.com> Closes #13164 from zhengruifeng/use_sparksession_ii.
Diffstat (limited to 'examples/src/main/python/pagerank.py')
-rwxr-xr-xexamples/src/main/python/pagerank.py11
1 files changed, 7 insertions, 4 deletions
diff --git a/examples/src/main/python/pagerank.py b/examples/src/main/python/pagerank.py
index 2fdc9773d4..a399a9c37c 100755
--- a/examples/src/main/python/pagerank.py
+++ b/examples/src/main/python/pagerank.py
@@ -25,7 +25,7 @@ import re
import sys
from operator import add
-from pyspark import SparkContext
+from pyspark.sql import SparkSession
def computeContribs(urls, rank):
@@ -51,14 +51,17 @@ if __name__ == "__main__":
file=sys.stderr)
# Initialize the spark context.
- sc = SparkContext(appName="PythonPageRank")
+ spark = SparkSession\
+ .builder\
+ .appName("PythonPageRank")\
+ .getOrCreate()
# Loads in input file. It should be in format of:
# URL neighbor URL
# URL neighbor URL
# URL neighbor URL
# ...
- lines = sc.textFile(sys.argv[1], 1)
+ lines = spark.read.text(sys.argv[1]).rdd.map(lambda r: r[0])
# Loads all URLs from input file and initialize their neighbors.
links = lines.map(lambda urls: parseNeighbors(urls)).distinct().groupByKey().cache()
@@ -79,4 +82,4 @@ if __name__ == "__main__":
for (link, rank) in ranks.collect():
print("%s has rank: %s." % (link, rank))
- sc.stop()
+ spark.stop()