author     WeichenXu <WeichenXu123@outlook.com>    2016-07-06 10:41:48 -0700
committer  Reynold Xin <rxin@databricks.com>       2016-07-06 10:41:48 -0700
commit     b1310425b30cbd711e4834d65a0accb3c5a8403a (patch)
tree       3af09805f2b1adfc60790c710c5f8f06074890fa /docs/streaming-programming-guide.md
parent     23eff5e512df5710ea6591a3fce321b53eb3fb0b (diff)
[DOC][SQL] update out-of-date code snippets using SQLContext in all documents.
## What changes were proposed in this pull request?

I searched the whole docs directory for SQLContext and updated the following places:

- docs/configuration.md, SparkR code snippets.
- docs/streaming-programming-guide.md, several example code snippets.

## How was this patch tested?

N/A

Author: WeichenXu <WeichenXu123@outlook.com>

Closes #14025 from WeichenXu123/WIP_SQLContext_update.
Diffstat (limited to 'docs/streaming-programming-guide.md')
-rw-r--r--   docs/streaming-programming-guide.md   39
1 file changed, 21 insertions(+), 18 deletions(-)
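In short, the commit swaps the docs' Spark 1.x SQLContext singleton (SQLContext.getOrCreate) for the Spark 2.x SparkSession builder. A minimal sketch of the new pattern, assuming the Spark 2.x API; the object and method names (SessionHelper, sessionFor) are illustrative and not part of the commit:

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession

object SessionHelper {
  // Given the RDD handed to a foreachRDD callback, obtain the singleton
  // SparkSession from the configuration of the SparkContext that backs it.
  // The Spark 1.x snippets did this with SQLContext.getOrCreate(rdd.sparkContext).
  def sessionFor(rdd: RDD[_]): SparkSession =
    SparkSession
      .builder
      .config(rdd.sparkContext.getConf)
      .getOrCreate()
}
```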
diff --git a/docs/streaming-programming-guide.md b/docs/streaming-programming-guide.md
index db06a65b99..2ee3b80185 100644
--- a/docs/streaming-programming-guide.md
+++ b/docs/streaming-programming-guide.md
@@ -1534,7 +1534,7 @@ See the full [source code]({{site.SPARK_GITHUB_URL}}/blob/master/examples/src/ma
***
## DataFrame and SQL Operations
-You can easily use [DataFrames and SQL](sql-programming-guide.html) operations on streaming data. You have to create a SQLContext using the SparkContext that the StreamingContext is using. Furthermore this has to done such that it can be restarted on driver failures. This is done by creating a lazily instantiated singleton instance of SQLContext. This is shown in the following example. It modifies the earlier [word count example](#a-quick-example) to generate word counts using DataFrames and SQL. Each RDD is converted to a DataFrame, registered as a temporary table and then queried using SQL.
+You can easily use [DataFrames and SQL](sql-programming-guide.html) operations on streaming data. You have to create a SparkSession using the SparkContext that the StreamingContext is using. Furthermore, this has to be done such that it can be restarted on driver failures. This is done by creating a lazily instantiated singleton instance of SparkSession. This is shown in the following example. It modifies the earlier [word count example](#a-quick-example) to generate word counts using DataFrames and SQL. Each RDD is converted to a DataFrame, registered as a temporary table and then queried using SQL.
<div class="codetabs">
<div data-lang="scala" markdown="1">
@@ -1546,9 +1546,9 @@ val words: DStream[String] = ...
words.foreachRDD { rdd =>
-  // Get the singleton instance of SQLContext
-  val sqlContext = SQLContext.getOrCreate(rdd.sparkContext)
-  import sqlContext.implicits._
+  // Get the singleton instance of SparkSession
+  val spark = SparkSession.builder.config(rdd.sparkContext.getConf).getOrCreate()
+  import spark.implicits._
  // Convert RDD[String] to DataFrame
  val wordsDataFrame = rdd.toDF("word")
@@ -1558,7 +1558,7 @@ words.foreachRDD { rdd =>
  // Do word count on DataFrame using SQL and print it
  val wordCountsDataFrame =
-    sqlContext.sql("select word, count(*) as total from words group by word")
+    spark.sql("select word, count(*) as total from words group by word")
  wordCountsDataFrame.show()
}
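Pieced together from the two hunks above, a self-contained sketch of the updated Scala example; the object name and the socket source are illustrative assumptions, not part of the diff:

```scala
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.{Seconds, StreamingContext}

object SqlNetworkWordCountSketch {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("SqlNetworkWordCountSketch")
    val ssc = new StreamingContext(sparkConf, Seconds(2))

    // Assumed source: words read from a local socket, as in the quick example
    val words = ssc.socketTextStream("localhost", 9999).flatMap(_.split(" "))

    words.foreachRDD { rdd =>
      // Get (or lazily create) the singleton SparkSession from the RDD's
      // SparkContext, so it is re-created if the driver restarts
      val spark = SparkSession.builder.config(rdd.sparkContext.getConf).getOrCreate()
      import spark.implicits._

      // Convert RDD[String] to a DataFrame and expose it as a temporary view
      val wordsDataFrame = rdd.toDF("word")
      wordsDataFrame.createOrReplaceTempView("words")

      // Count words with SQL and print the result for this batch
      val wordCountsDataFrame =
        spark.sql("select word, count(*) as total from words group by word")
      wordCountsDataFrame.show()
    }

    ssc.start()
    ssc.awaitTermination()
  }
}
```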
@@ -1593,8 +1593,8 @@ words.foreachRDD(
    @Override
    public Void call(JavaRDD<String> rdd, Time time) {
-      // Get the singleton instance of SQLContext
-      SQLContext sqlContext = SQLContext.getOrCreate(rdd.context());
+      // Get the singleton instance of SparkSession
+      SparkSession spark = SparkSession.builder().config(rdd.sparkContext().getConf()).getOrCreate();
      // Convert RDD[String] to RDD[case class] to DataFrame
      JavaRDD<JavaRow> rowRDD = rdd.map(new Function<String, JavaRow>() {
@@ -1604,14 +1604,14 @@ words.foreachRDD(
          return record;
        }
      });
-      DataFrame wordsDataFrame = sqlContext.createDataFrame(rowRDD, JavaRow.class);
+      DataFrame wordsDataFrame = spark.createDataFrame(rowRDD, JavaRow.class);
      // Creates a temporary view using the DataFrame
      wordsDataFrame.createOrReplaceTempView("words");
      // Do word count on table using SQL and print it
      DataFrame wordCountsDataFrame =
-        sqlContext.sql("select word, count(*) as total from words group by word");
+        spark.sql("select word, count(*) as total from words group by word");
      wordCountsDataFrame.show();
      return null;
    }
@@ -1624,11 +1624,14 @@ See the full [source code]({{site.SPARK_GITHUB_URL}}/blob/master/examples/src/ma
<div data-lang="python" markdown="1">
{% highlight python %}
-# Lazily instantiated global instance of SQLContext
-def getSqlContextInstance(sparkContext):
-    if ('sqlContextSingletonInstance' not in globals()):
-        globals()['sqlContextSingletonInstance'] = SQLContext(sparkContext)
-    return globals()['sqlContextSingletonInstance']
+# Lazily instantiated global instance of SparkSession
+def getSparkSessionInstance(sparkConf):
+    if ('sparkSessionSingletonInstance' not in globals()):
+        globals()['sparkSessionSingletonInstance'] = SparkSession\
+            .builder\
+            .config(conf=sparkConf)\
+            .getOrCreate()
+    return globals()['sparkSessionSingletonInstance']
...
@@ -1639,18 +1642,18 @@ words = ... # DStream of strings
def process(time, rdd):
    print("========= %s =========" % str(time))
    try:
-        # Get the singleton instance of SQLContext
-        sqlContext = getSqlContextInstance(rdd.context)
+        # Get the singleton instance of SparkSession
+        spark = getSparkSessionInstance(rdd.context.getConf())
        # Convert RDD[String] to RDD[Row] to DataFrame
        rowRdd = rdd.map(lambda w: Row(word=w))
-        wordsDataFrame = sqlContext.createDataFrame(rowRdd)
+        wordsDataFrame = spark.createDataFrame(rowRdd)
        # Creates a temporary view using the DataFrame
        wordsDataFrame.createOrReplaceTempView("words")
        # Do word count on table using SQL and print it
-        wordCountsDataFrame = sqlContext.sql("select word, count(*) as total from words group by word")
+        wordCountsDataFrame = spark.sql("select word, count(*) as total from words group by word")
        wordCountsDataFrame.show()
    except:
        pass
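For comparison with the Python helper above, a hedged Scala sketch of the same lazily instantiated singleton idea; the object name SparkSessionSingleton is illustrative:

```scala
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

// Lazily instantiated singleton SparkSession; it is rebuilt on demand after a
// driver restart because the instance itself is not part of the checkpoint.
object SparkSessionSingleton {

  @transient private var instance: SparkSession = _

  def getInstance(sparkConf: SparkConf): SparkSession = {
    if (instance == null) {
      instance = SparkSession
        .builder
        .config(sparkConf)
        .getOrCreate()
    }
    instance
  }
}
```

Inside foreachRDD, such a helper would be called as SparkSessionSingleton.getInstance(rdd.sparkContext.getConf).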