diff options
author | WeichenXu <WeichenXu123@outlook.com> | 2016-07-06 10:41:48 -0700 |
---|---|---|
committer | Reynold Xin <rxin@databricks.com> | 2016-07-06 10:41:48 -0700 |
commit | b1310425b30cbd711e4834d65a0accb3c5a8403a (patch) | |
tree | 3af09805f2b1adfc60790c710c5f8f06074890fa /docs/streaming-programming-guide.md | |
parent | 23eff5e512df5710ea6591a3fce321b53eb3fb0b (diff) | |
download | spark-b1310425b30cbd711e4834d65a0accb3c5a8403a.tar.gz spark-b1310425b30cbd711e4834d65a0accb3c5a8403a.tar.bz2 spark-b1310425b30cbd711e4834d65a0accb3c5a8403a.zip |
[DOC][SQL] update out-of-date code snippets using SQLContext in all documents.
## What changes were proposed in this pull request?
I searched the whole documents directory for SQLContext usages, and updated the following places:
- docs/configuration.md, sparkR code snippets.
- docs/streaming-programming-guide.md, several example code snippets.
## How was this patch tested?
N/A
Author: WeichenXu <WeichenXu123@outlook.com>
Closes #14025 from WeichenXu123/WIP_SQLContext_update.
Diffstat (limited to 'docs/streaming-programming-guide.md')
-rw-r--r-- | docs/streaming-programming-guide.md | 39 |
1 files changed, 21 insertions, 18 deletions
diff --git a/docs/streaming-programming-guide.md b/docs/streaming-programming-guide.md index db06a65b99..2ee3b80185 100644 --- a/docs/streaming-programming-guide.md +++ b/docs/streaming-programming-guide.md @@ -1534,7 +1534,7 @@ See the full [source code]({{site.SPARK_GITHUB_URL}}/blob/master/examples/src/ma *** ## DataFrame and SQL Operations -You can easily use [DataFrames and SQL](sql-programming-guide.html) operations on streaming data. You have to create a SQLContext using the SparkContext that the StreamingContext is using. Furthermore this has to done such that it can be restarted on driver failures. This is done by creating a lazily instantiated singleton instance of SQLContext. This is shown in the following example. It modifies the earlier [word count example](#a-quick-example) to generate word counts using DataFrames and SQL. Each RDD is converted to a DataFrame, registered as a temporary table and then queried using SQL. +You can easily use [DataFrames and SQL](sql-programming-guide.html) operations on streaming data. You have to create a SparkSession using the SparkContext that the StreamingContext is using. Furthermore this has to done such that it can be restarted on driver failures. This is done by creating a lazily instantiated singleton instance of SparkSession. This is shown in the following example. It modifies the earlier [word count example](#a-quick-example) to generate word counts using DataFrames and SQL. Each RDD is converted to a DataFrame, registered as a temporary table and then queried using SQL. <div class="codetabs"> <div data-lang="scala" markdown="1"> @@ -1546,9 +1546,9 @@ val words: DStream[String] = ... 
words.foreachRDD { rdd => - // Get the singleton instance of SQLContext - val sqlContext = SQLContext.getOrCreate(rdd.sparkContext) - import sqlContext.implicits._ + // Get the singleton instance of SparkSession + val spark = SparkSession.builder.config(rdd.sparkContext.getConf).getOrCreate() + import spark.implicits._ // Convert RDD[String] to DataFrame val wordsDataFrame = rdd.toDF("word") @@ -1558,7 +1558,7 @@ words.foreachRDD { rdd => // Do word count on DataFrame using SQL and print it val wordCountsDataFrame = - sqlContext.sql("select word, count(*) as total from words group by word") + spark.sql("select word, count(*) as total from words group by word") wordCountsDataFrame.show() } @@ -1593,8 +1593,8 @@ words.foreachRDD( @Override public Void call(JavaRDD<String> rdd, Time time) { - // Get the singleton instance of SQLContext - SQLContext sqlContext = SQLContext.getOrCreate(rdd.context()); + // Get the singleton instance of SparkSession + SparkSession spark = SparkSession.builder().config(rdd.sparkContext().getConf()).getOrCreate(); // Convert RDD[String] to RDD[case class] to DataFrame JavaRDD<JavaRow> rowRDD = rdd.map(new Function<String, JavaRow>() { @@ -1604,14 +1604,14 @@ words.foreachRDD( return record; } }); - DataFrame wordsDataFrame = sqlContext.createDataFrame(rowRDD, JavaRow.class); + DataFrame wordsDataFrame = spark.createDataFrame(rowRDD, JavaRow.class); // Creates a temporary view using the DataFrame wordsDataFrame.createOrReplaceTempView("words"); // Do word count on table using SQL and print it DataFrame wordCountsDataFrame = - sqlContext.sql("select word, count(*) as total from words group by word"); + spark.sql("select word, count(*) as total from words group by word"); wordCountsDataFrame.show(); return null; } @@ -1624,11 +1624,14 @@ See the full [source code]({{site.SPARK_GITHUB_URL}}/blob/master/examples/src/ma <div data-lang="python" markdown="1"> {% highlight python %} -# Lazily instantiated global instance of SQLContext -def 
getSqlContextInstance(sparkContext): - if ('sqlContextSingletonInstance' not in globals()): - globals()['sqlContextSingletonInstance'] = SQLContext(sparkContext) - return globals()['sqlContextSingletonInstance'] +# Lazily instantiated global instance of SparkSession +def getSparkSessionInstance(sparkConf): + if ('sparkSessionSingletonInstance' not in globals()): + globals()['sparkSessionSingletonInstance'] = SparkSession\ + .builder\ + .config(conf=sparkConf)\ + .getOrCreate() + return globals()['sparkSessionSingletonInstance'] ... @@ -1639,18 +1642,18 @@ words = ... # DStream of strings def process(time, rdd): print("========= %s =========" % str(time)) try: - # Get the singleton instance of SQLContext - sqlContext = getSqlContextInstance(rdd.context) + # Get the singleton instance of SparkSession + spark = getSparkSessionInstance(rdd.context.getConf()) # Convert RDD[String] to RDD[Row] to DataFrame rowRdd = rdd.map(lambda w: Row(word=w)) - wordsDataFrame = sqlContext.createDataFrame(rowRdd) + wordsDataFrame = spark.createDataFrame(rowRdd) # Creates a temporary view using the DataFrame wordsDataFrame.createOrReplaceTempView("words") # Do word count on table using SQL and print it - wordCountsDataFrame = sqlContext.sql("select word, count(*) as total from words group by word") + wordCountsDataFrame = spark.sql("select word, count(*) as total from words group by word") wordCountsDataFrame.show() except: pass |