diff options
author | Dongjoon Hyun <dongjoon@apache.org> | 2016-05-04 14:31:36 -0700 |
---|---|---|
committer | Andrew Or <andrew@databricks.com> | 2016-05-04 14:31:36 -0700 |
commit | cdce4e62a5674e2034e5d395578b1a60e3d8c435 (patch) | |
tree | c715f2555dad353683f82820962576f89b2db452 /examples/src/main/python/streaming | |
parent | cf2e9da612397233ae7bca0e9ce57309f16226b5 (diff) | |
download | spark-cdce4e62a5674e2034e5d395578b1a60e3d8c435.tar.gz spark-cdce4e62a5674e2034e5d395578b1a60e3d8c435.tar.bz2 spark-cdce4e62a5674e2034e5d395578b1a60e3d8c435.zip |
[SPARK-15031][EXAMPLE] Use SparkSession in Scala/Python/Java example.
## What changes were proposed in this pull request?
This PR aims to update Scala/Python/Java examples by replacing `SQLContext` with newly added `SparkSession`.
- Use **SparkSession Builder Pattern** in 154(Scala 55, Java 52, Python 47) files.
- Add `getConf` in Python SparkContext class: `python/pyspark/context.py`
- Replace **SQLContext Singleton Pattern** with **SparkSession Singleton Pattern**:
- `SqlNetworkWordCount.scala`
- `JavaSqlNetworkWordCount.java`
- `sql_network_wordcount.py`
Now, `SQLContexts` are used only in R examples and the following two Python examples. The python examples are untouched in this PR since it already fails some unknown issue.
- `simple_params_example.py`
- `aft_survival_regression.py`
## How was this patch tested?
Manual.
Author: Dongjoon Hyun <dongjoon@apache.org>
Closes #12809 from dongjoon-hyun/SPARK-15031.
Diffstat (limited to 'examples/src/main/python/streaming')
-rw-r--r-- | examples/src/main/python/streaming/sql_network_wordcount.py | 19 |
1 files changed, 10 insertions, 9 deletions
diff --git a/examples/src/main/python/streaming/sql_network_wordcount.py b/examples/src/main/python/streaming/sql_network_wordcount.py index 1ba5e9fb78..588cbfee14 100644 --- a/examples/src/main/python/streaming/sql_network_wordcount.py +++ b/examples/src/main/python/streaming/sql_network_wordcount.py @@ -33,13 +33,14 @@ import sys from pyspark import SparkContext from pyspark.streaming import StreamingContext -from pyspark.sql import SQLContext, Row +from pyspark.sql import Row, SparkSession -def getSqlContextInstance(sparkContext): - if ('sqlContextSingletonInstance' not in globals()): - globals()['sqlContextSingletonInstance'] = SQLContext(sparkContext) - return globals()['sqlContextSingletonInstance'] +def getSparkSessionInstance(sparkConf): + if ('sparkSessionSingletonInstance' not in globals()): + globals()['sparkSessionSingletonInstance'] =\ + SparkSession.builder.config(conf=sparkConf).getOrCreate() + return globals()['sparkSessionSingletonInstance'] if __name__ == "__main__": @@ -60,19 +61,19 @@ if __name__ == "__main__": print("========= %s =========" % str(time)) try: - # Get the singleton instance of SQLContext - sqlContext = getSqlContextInstance(rdd.context) + # Get the singleton instance of SparkSession + spark = getSparkSessionInstance(rdd.context.getConf()) # Convert RDD[String] to RDD[Row] to DataFrame rowRdd = rdd.map(lambda w: Row(word=w)) - wordsDataFrame = sqlContext.createDataFrame(rowRdd) + wordsDataFrame = spark.createDataFrame(rowRdd) # Register as table wordsDataFrame.registerTempTable("words") # Do word count on table using SQL and print it wordCountsDataFrame = \ - sqlContext.sql("select word, count(*) as total from words group by word") + spark.sql("select word, count(*) as total from words group by word") wordCountsDataFrame.show() except: pass |