author      Dongjoon Hyun <dongjoon@apache.org>  2016-05-04 14:31:36 -0700
committer   Andrew Or <andrew@databricks.com>    2016-05-04 14:31:36 -0700
commit      cdce4e62a5674e2034e5d395578b1a60e3d8c435 (patch)
tree        c715f2555dad353683f82820962576f89b2db452 /examples/src/main/python
parent      cf2e9da612397233ae7bca0e9ce57309f16226b5 (diff)
download    spark-cdce4e62a5674e2034e5d395578b1a60e3d8c435.tar.gz
            spark-cdce4e62a5674e2034e5d395578b1a60e3d8c435.tar.bz2
            spark-cdce4e62a5674e2034e5d395578b1a60e3d8c435.zip
[SPARK-15031][EXAMPLE] Use SparkSession in Scala/Python/Java examples.
## What changes were proposed in this pull request?
This PR aims to update the Scala/Python/Java examples by replacing `SQLContext` with the newly added `SparkSession`.
- Use **SparkSession Builder Pattern** in 154 files (Scala 55, Java 52, Python 47).
- Add `getConf` to the Python `SparkContext` class (`python/pyspark/context.py`).
- Replace the **SQLContext Singleton Pattern** with the **SparkSession Singleton Pattern** (a sketch of both patterns follows this list):
- `SqlNetworkWordCount.scala`
- `JavaSqlNetworkWordCount.java`
- `sql_network_wordcount.py`
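A rough sketch of the two patterns in Python (the app name is illustrative, not from any one file; `getSparkSessionInstance` mirrors the helper added to `sql_network_wordcount.py`, which is fed `rdd.context.getConf()`, and that call is why `getConf` was added to the Python `SparkContext`):

```python
from pyspark.sql import SparkSession

# Builder pattern: one call replaces SparkContext(appName=...) + SQLContext(sc).
spark = SparkSession.builder.appName("ExampleApp").getOrCreate()


# Singleton pattern: lazily create (or reuse) a single SparkSession, stored in
# globals() so that streaming foreachRDD callbacks can share one instance.
def getSparkSessionInstance(sparkConf):
    if 'sparkSessionSingletonInstance' not in globals():
        globals()['sparkSessionSingletonInstance'] = \
            SparkSession.builder.config(conf=sparkConf).getOrCreate()
    return globals()['sparkSessionSingletonInstance']
```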
Now, `SQLContext` is used only in the R examples and in the following two Python examples, which are left untouched in this PR because they already fail with an unknown issue.
- `simple_params_example.py`
- `aft_survival_regression.py`
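One subtlety of the migration, visible in `als_example.py` and `bisecting_k_means_example.py` in the diff below: `sc.textFile(path)` returned an RDD of plain strings, while `spark.read.text(path).rdd` returns an RDD of `Row`s with a single `value` column, so parsing lambdas change from `l.split(...)` to `row.value.split(...)`. A minimal sketch, reusing the sample data path from those examples (the app name is illustrative):

```python
from pyspark.mllib.linalg import Vectors
from pyspark.sql import Row, SparkSession

spark = SparkSession.builder.appName("ReadTextSketch").getOrCreate()

# Each element is a Row with a single string column named 'value',
# so the old line-parsing lambda now goes through row.value.
lines = spark.read.text("data/mllib/kmeans_data.txt").rdd
parsed = lines.map(
    lambda row: Row(features=Vectors.dense([float(x) for x in row.value.split(' ')])))

df = spark.createDataFrame(parsed)
df.show(3)

spark.stop()
```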
## How was this patch tested?
Manual.
Author: Dongjoon Hyun <dongjoon@apache.org>
Closes #12809 from dongjoon-hyun/SPARK-15031.
Diffstat (limited to 'examples/src/main/python')
47 files changed, 238 insertions, 313 deletions
diff --git a/examples/src/main/python/ml/als_example.py b/examples/src/main/python/ml/als_example.py
index 0c9ac583b2..e36444f185 100644
--- a/examples/src/main/python/ml/als_example.py
+++ b/examples/src/main/python/ml/als_example.py
@@ -21,8 +21,7 @@ import sys
 if sys.version >= '3':
     long = int
-from pyspark import SparkContext
-from pyspark.sql import SQLContext
+from pyspark.sql import SparkSession
 # $example on$
 from pyspark.ml.evaluation import RegressionEvaluator
@@ -31,15 +30,14 @@ from pyspark.sql import Row
 # $example off$
 if __name__ == "__main__":
-    sc = SparkContext(appName="ALSExample")
-    sqlContext = SQLContext(sc)
+    spark = SparkSession.builder.appName("ALSExample").getOrCreate()
     # $example on$
-    lines = sc.textFile("data/mllib/als/sample_movielens_ratings.txt")
-    parts = lines.map(lambda l: l.split("::"))
+    lines = spark.read.text("data/mllib/als/sample_movielens_ratings.txt").rdd
+    parts = lines.map(lambda row: row.value.split("::"))
     ratingsRDD = parts.map(lambda p: Row(userId=int(p[0]), movieId=int(p[1]),
                                          rating=float(p[2]), timestamp=long(p[3])))
-    ratings = sqlContext.createDataFrame(ratingsRDD)
+    ratings = spark.createDataFrame(ratingsRDD)
     (training, test) = ratings.randomSplit([0.8, 0.2])
     # Build the recommendation model using ALS on the training data
@@ -56,4 +54,4 @@ if __name__ == "__main__":
     rmse = evaluator.evaluate(predictions)
     print("Root-mean-square error = " + str(rmse))
     # $example off$
-    sc.stop()
+    spark.stop()
diff --git a/examples/src/main/python/ml/binarizer_example.py b/examples/src/main/python/ml/binarizer_example.py
index 317cfa638a..072187e645 100644
--- a/examples/src/main/python/ml/binarizer_example.py
+++ b/examples/src/main/python/ml/binarizer_example.py
@@ -17,18 +17,16 @@
 from __future__ import print_function
-from pyspark import SparkContext
-from pyspark.sql import SQLContext
+from pyspark.sql import SparkSession
 # $example on$
 from pyspark.ml.feature import Binarizer
 # $example off$
 if __name__ == "__main__":
-    sc = SparkContext(appName="BinarizerExample")
-    sqlContext = SQLContext(sc)
+    spark = SparkSession.builder.appName("BinarizerExample").getOrCreate()
     # $example on$
-    continuousDataFrame = sqlContext.createDataFrame([
+    continuousDataFrame = spark.createDataFrame([
         (0, 0.1),
         (1, 0.8),
         (2, 0.2)
@@ -40,4 +38,4 @@ if __name__ == "__main__":
         print(binarized_feature)
     # $example off$
-    sc.stop()
+    spark.stop()
diff --git a/examples/src/main/python/ml/bisecting_k_means_example.py b/examples/src/main/python/ml/bisecting_k_means_example.py
index e6f6bfd7e8..836a89cde0 100644
--- a/examples/src/main/python/ml/bisecting_k_means_example.py
+++ b/examples/src/main/python/ml/bisecting_k_means_example.py
@@ -17,28 +17,26 @@
 from __future__ import print_function
-from pyspark import SparkContext
 # $example on$
 from pyspark.ml.clustering import BisectingKMeans, BisectingKMeansModel
 from pyspark.mllib.linalg import VectorUDT, _convert_to_vector, Vectors
 from pyspark.mllib.linalg import Vectors
 from pyspark.sql.types import Row
 # $example off$
-from pyspark.sql import SQLContext
+from pyspark.sql import SparkSession
 """
 A simple example demonstrating a bisecting k-means clustering.
""" if __name__ == "__main__": - - sc = SparkContext(appName="PythonBisectingKMeansExample") - sqlContext = SQLContext(sc) + spark = SparkSession.builder.appName("PythonBisectingKMeansExample").getOrCreate() # $example on$ - data = sc.textFile("data/mllib/kmeans_data.txt") - parsed = data.map(lambda l: Row(features=Vectors.dense([float(x) for x in l.split(' ')]))) - training = sqlContext.createDataFrame(parsed) + data = spark.read.text("data/mllib/kmeans_data.txt").rdd + parsed = data\ + .map(lambda row: Row(features=Vectors.dense([float(x) for x in row.value.split(' ')]))) + training = spark.createDataFrame(parsed) kmeans = BisectingKMeans().setK(2).setSeed(1).setFeaturesCol("features") @@ -54,4 +52,4 @@ if __name__ == "__main__": print(center) # $example off$ - sc.stop() + spark.stop() diff --git a/examples/src/main/python/ml/bucketizer_example.py b/examples/src/main/python/ml/bucketizer_example.py index 4304255f35..288ec62bdf 100644 --- a/examples/src/main/python/ml/bucketizer_example.py +++ b/examples/src/main/python/ml/bucketizer_example.py @@ -17,21 +17,19 @@ from __future__ import print_function -from pyspark import SparkContext -from pyspark.sql import SQLContext +from pyspark.sql import SparkSession # $example on$ from pyspark.ml.feature import Bucketizer # $example off$ if __name__ == "__main__": - sc = SparkContext(appName="BucketizerExample") - sqlContext = SQLContext(sc) + spark = SparkSession.builder.appName("BucketizerExample").getOrCreate() # $example on$ splits = [-float("inf"), -0.5, 0.0, 0.5, float("inf")] data = [(-0.5,), (-0.3,), (0.0,), (0.2,)] - dataFrame = sqlContext.createDataFrame(data, ["features"]) + dataFrame = spark.createDataFrame(data, ["features"]) bucketizer = Bucketizer(splits=splits, inputCol="features", outputCol="bucketedFeatures") @@ -40,4 +38,4 @@ if __name__ == "__main__": bucketedData.show() # $example off$ - sc.stop() + spark.stop() diff --git a/examples/src/main/python/ml/chisq_selector_example.py b/examples/src/main/python/ml/chisq_selector_example.py index 997a504735..8f58fc28de 100644 --- a/examples/src/main/python/ml/chisq_selector_example.py +++ b/examples/src/main/python/ml/chisq_selector_example.py @@ -17,19 +17,17 @@ from __future__ import print_function -from pyspark import SparkContext -from pyspark.sql import SQLContext +from pyspark.sql import SparkSession # $example on$ from pyspark.ml.feature import ChiSqSelector from pyspark.mllib.linalg import Vectors # $example off$ if __name__ == "__main__": - sc = SparkContext(appName="ChiSqSelectorExample") - sqlContext = SQLContext(sc) + spark = SparkSession.builder.appName("ChiSqSelectorExample").getOrCreate() # $example on$ - df = sqlContext.createDataFrame([ + df = spark.createDataFrame([ (7, Vectors.dense([0.0, 0.0, 18.0, 1.0]), 1.0,), (8, Vectors.dense([0.0, 1.0, 12.0, 0.0]), 0.0,), (9, Vectors.dense([1.0, 0.0, 15.0, 0.1]), 0.0,)], ["id", "features", "clicked"]) @@ -41,4 +39,4 @@ if __name__ == "__main__": result.show() # $example off$ - sc.stop() + spark.stop() diff --git a/examples/src/main/python/ml/count_vectorizer_example.py b/examples/src/main/python/ml/count_vectorizer_example.py index e839f645f7..9dbf9959d1 100644 --- a/examples/src/main/python/ml/count_vectorizer_example.py +++ b/examples/src/main/python/ml/count_vectorizer_example.py @@ -17,19 +17,17 @@ from __future__ import print_function -from pyspark import SparkContext -from pyspark.sql import SQLContext +from pyspark.sql import SparkSession # $example on$ from pyspark.ml.feature import CountVectorizer # $example off$ if 
__name__ == "__main__": - sc = SparkContext(appName="CountVectorizerExample") - sqlContext = SQLContext(sc) + spark = SparkSession.builder.appName("CountVectorizerExample").getOrCreate() # $example on$ # Input data: Each row is a bag of words with a ID. - df = sqlContext.createDataFrame([ + df = spark.createDataFrame([ (0, "a b c".split(" ")), (1, "a b b c a".split(" ")) ], ["id", "words"]) @@ -41,4 +39,4 @@ if __name__ == "__main__": result.show() # $example off$ - sc.stop() + spark.stop() diff --git a/examples/src/main/python/ml/cross_validator.py b/examples/src/main/python/ml/cross_validator.py index 5f0ef20218..a61d0f63d2 100644 --- a/examples/src/main/python/ml/cross_validator.py +++ b/examples/src/main/python/ml/cross_validator.py @@ -17,15 +17,14 @@ from __future__ import print_function -from pyspark import SparkContext # $example on$ from pyspark.ml import Pipeline from pyspark.ml.classification import LogisticRegression from pyspark.ml.evaluation import BinaryClassificationEvaluator from pyspark.ml.feature import HashingTF, Tokenizer from pyspark.ml.tuning import CrossValidator, ParamGridBuilder -from pyspark.sql import Row, SQLContext # $example off$ +from pyspark.sql import Row, SparkSession """ A simple example demonstrating model selection using CrossValidator. @@ -36,25 +35,23 @@ Run with: """ if __name__ == "__main__": - sc = SparkContext(appName="CrossValidatorExample") - sqlContext = SQLContext(sc) + spark = SparkSession.builder.appName("CrossValidatorExample").getOrCreate() # $example on$ # Prepare training documents, which are labeled. - LabeledDocument = Row("id", "text", "label") - training = sc.parallelize([(0, "a b c d e spark", 1.0), - (1, "b d", 0.0), - (2, "spark f g h", 1.0), - (3, "hadoop mapreduce", 0.0), - (4, "b spark who", 1.0), - (5, "g d a y", 0.0), - (6, "spark fly", 1.0), - (7, "was mapreduce", 0.0), - (8, "e spark program", 1.0), - (9, "a e c l", 0.0), - (10, "spark compile", 1.0), - (11, "hadoop software", 0.0) - ]) \ - .map(lambda x: LabeledDocument(*x)).toDF() + training = spark.createDataFrame([ + (0, "a b c d e spark", 1.0), + (1, "b d", 0.0), + (2, "spark f g h", 1.0), + (3, "hadoop mapreduce", 0.0), + (4, "b spark who", 1.0), + (5, "g d a y", 0.0), + (6, "spark fly", 1.0), + (7, "was mapreduce", 0.0), + (8, "e spark program", 1.0), + (9, "a e c l", 0.0), + (10, "spark compile", 1.0), + (11, "hadoop software", 0.0) + ], ["id", "text", "label"]) # Configure an ML pipeline, which consists of tree stages: tokenizer, hashingTF, and lr. tokenizer = Tokenizer(inputCol="text", outputCol="words") @@ -82,12 +79,12 @@ if __name__ == "__main__": cvModel = crossval.fit(training) # Prepare test documents, which are unlabeled. - Document = Row("id", "text") - test = sc.parallelize([(4L, "spark i j k"), - (5L, "l m n"), - (6L, "mapreduce spark"), - (7L, "apache hadoop")]) \ - .map(lambda x: Document(*x)).toDF() + test = spark.createDataFrame([ + (4L, "spark i j k"), + (5L, "l m n"), + (6L, "mapreduce spark"), + (7L, "apache hadoop") + ], ["id", "text"]) # Make predictions on test documents. cvModel uses the best model found (lrModel). 
     prediction = cvModel.transform(test)
@@ -96,4 +93,4 @@ if __name__ == "__main__":
         print(row)
     # $example off$
-    sc.stop()
+    spark.stop()
diff --git a/examples/src/main/python/ml/dataframe_example.py b/examples/src/main/python/ml/dataframe_example.py
index d2644ca335..b3e671038e 100644
--- a/examples/src/main/python/ml/dataframe_example.py
+++ b/examples/src/main/python/ml/dataframe_example.py
@@ -26,16 +26,14 @@ import sys
 import tempfile
 import shutil
-from pyspark import SparkContext
-from pyspark.sql import SQLContext
+from pyspark.sql import SparkSession
 from pyspark.mllib.stat import Statistics
 if __name__ == "__main__":
     if len(sys.argv) > 2:
         print("Usage: dataframe_example.py <libsvm file>", file=sys.stderr)
         exit(-1)
-    sc = SparkContext(appName="DataFrameExample")
-    sqlContext = SQLContext(sc)
+    spark = SparkSession.builder.appName("DataFrameExample").getOrCreate()
     if len(sys.argv) == 2:
         input = sys.argv[1]
     else:
@@ -43,7 +41,7 @@ if __name__ == "__main__":
     # Load input data
     print("Loading LIBSVM file with UDT from " + input + ".")
-    df = sqlContext.read.format("libsvm").load(input).cache()
+    df = spark.read.format("libsvm").load(input).cache()
     print("Schema from LIBSVM:")
     df.printSchema()
     print("Loaded training data as a DataFrame with " +
@@ -54,7 +52,7 @@ if __name__ == "__main__":
     labelSummary.show()
     # Convert features column to an RDD of vectors.
-    features = df.select("features").map(lambda r: r.features)
+    features = df.select("features").rdd.map(lambda r: r.features)
     summary = Statistics.colStats(features)
     print("Selected features column with average values:\n" +
           str(summary.mean()))
@@ -67,9 +65,9 @@ if __name__ == "__main__":
     # Load the records back.
     print("Loading Parquet file with UDT from " + tempdir)
-    newDF = sqlContext.read.parquet(tempdir)
+    newDF = spark.read.parquet(tempdir)
     print("Schema from Parquet:")
     newDF.printSchema()
     shutil.rmtree(tempdir)
-    sc.stop()
+    spark.stop()
diff --git a/examples/src/main/python/ml/dct_example.py b/examples/src/main/python/ml/dct_example.py
index 264d47f404..1bf8fc6d14 100644
--- a/examples/src/main/python/ml/dct_example.py
+++ b/examples/src/main/python/ml/dct_example.py
@@ -17,19 +17,17 @@
 from __future__ import print_function
-from pyspark import SparkContext
-from pyspark.sql import SQLContext
 # $example on$
 from pyspark.ml.feature import DCT
 from pyspark.mllib.linalg import Vectors
 # $example off$
+from pyspark.sql import SparkSession
 if __name__ == "__main__":
-    sc = SparkContext(appName="DCTExample")
-    sqlContext = SQLContext(sc)
+    spark = SparkSession.builder.appName("DCTExample").getOrCreate()
     # $example on$
-    df = sqlContext.createDataFrame([
+    df = spark.createDataFrame([
         (Vectors.dense([0.0, 1.0, -2.0, 3.0]),),
         (Vectors.dense([-1.0, 2.0, 4.0, -7.0]),),
         (Vectors.dense([14.0, -2.0, -5.0, 1.0]),)], ["features"])
@@ -42,4 +40,4 @@ if __name__ == "__main__":
         print(dcts)
     # $example off$
-    sc.stop()
+    spark.stop()
diff --git a/examples/src/main/python/ml/decision_tree_classification_example.py b/examples/src/main/python/ml/decision_tree_classification_example.py
index 86bdc65392..d2318e2436 100644
--- a/examples/src/main/python/ml/decision_tree_classification_example.py
+++ b/examples/src/main/python/ml/decision_tree_classification_example.py
@@ -21,20 +21,19 @@ Decision Tree Classification Example.
 from __future__ import print_function
 # $example on$
-from pyspark import SparkContext, SQLContext
 from pyspark.ml import Pipeline
 from pyspark.ml.classification import DecisionTreeClassifier
 from pyspark.ml.feature import StringIndexer, VectorIndexer
 from pyspark.ml.evaluation import MulticlassClassificationEvaluator
 # $example off$
+from pyspark.sql import SparkSession
 if __name__ == "__main__":
-    sc = SparkContext(appName="decision_tree_classification_example")
-    sqlContext = SQLContext(sc)
+    spark = SparkSession.builder.appName("decision_tree_classification_example").getOrCreate()
     # $example on$
     # Load the data stored in LIBSVM format as a DataFrame.
-    data = sqlContext.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")
+    data = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")
     # Index labels, adding metadata to the label column.
     # Fit on whole dataset to include all labels in index.
@@ -72,3 +71,5 @@ if __name__ == "__main__":
     # summary only
     print(treeModel)
     # $example off$
+
+    spark.stop()
diff --git a/examples/src/main/python/ml/decision_tree_regression_example.py b/examples/src/main/python/ml/decision_tree_regression_example.py
index 8e20d5d857..9e8cb382a9 100644
--- a/examples/src/main/python/ml/decision_tree_regression_example.py
+++ b/examples/src/main/python/ml/decision_tree_regression_example.py
@@ -20,21 +20,20 @@ Decision Tree Regression Example.
 """
 from __future__ import print_function
-from pyspark import SparkContext, SQLContext
 # $example on$
 from pyspark.ml import Pipeline
 from pyspark.ml.regression import DecisionTreeRegressor
 from pyspark.ml.feature import VectorIndexer
 from pyspark.ml.evaluation import RegressionEvaluator
 # $example off$
+from pyspark.sql import SparkSession
 if __name__ == "__main__":
-    sc = SparkContext(appName="decision_tree_classification_example")
-    sqlContext = SQLContext(sc)
+    spark = SparkSession.builder.appName("decision_tree_classification_example").getOrCreate()
     # $example on$
     # Load the data stored in LIBSVM format as a DataFrame.
-    data = sqlContext.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")
+    data = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")
     # Automatically identify categorical features, and index them.
     # We specify maxCategories so features with > 4 distinct values are treated as continuous.
@@ -69,3 +68,5 @@ if __name__ == "__main__":
     # summary only
     print(treeModel)
     # $example off$
+
+    spark.stop()
diff --git a/examples/src/main/python/ml/elementwise_product_example.py b/examples/src/main/python/ml/elementwise_product_example.py
index c85cb0d895..6fa641b772 100644
--- a/examples/src/main/python/ml/elementwise_product_example.py
+++ b/examples/src/main/python/ml/elementwise_product_example.py
@@ -17,23 +17,21 @@
 from __future__ import print_function
-from pyspark import SparkContext
-from pyspark.sql import SQLContext
 # $example on$
 from pyspark.ml.feature import ElementwiseProduct
 from pyspark.mllib.linalg import Vectors
 # $example off$
+from pyspark.sql import SparkSession
 if __name__ == "__main__":
-    sc = SparkContext(appName="ElementwiseProductExample")
-    sqlContext = SQLContext(sc)
+    spark = SparkSession.builder.appName("ElementwiseProductExample").getOrCreate()
     # $example on$
     data = [(Vectors.dense([1.0, 2.0, 3.0]),), (Vectors.dense([4.0, 5.0, 6.0]),)]
-    df = sqlContext.createDataFrame(data, ["vector"])
+    df = spark.createDataFrame(data, ["vector"])
     transformer = ElementwiseProduct(scalingVec=Vectors.dense([0.0, 1.0, 2.0]),
                                      inputCol="vector", outputCol="transformedVector")
     transformer.transform(df).show()
     # $example off$
-    sc.stop()
+    spark.stop()
diff --git a/examples/src/main/python/ml/estimator_transformer_param_example.py b/examples/src/main/python/ml/estimator_transformer_param_example.py
index 9a8993dac4..4993b5a984 100644
--- a/examples/src/main/python/ml/estimator_transformer_param_example.py
+++ b/examples/src/main/python/ml/estimator_transformer_param_example.py
@@ -18,20 +18,19 @@
 """
 Estimator Transformer Param Example.
 """
-from pyspark import SparkContext, SQLContext
+
 # $example on$
 from pyspark.mllib.linalg import Vectors
 from pyspark.ml.classification import LogisticRegression
 # $example off$
+from pyspark.sql import SparkSession
 if __name__ == "__main__":
-
-    sc = SparkContext(appName="EstimatorTransformerParamExample")
-    sqlContext = SQLContext(sc)
+    spark = SparkSession.builder.appName("EstimatorTransformerParamExample").getOrCreate()
     # $example on$
     # Prepare training data from a list of (label, features) tuples.
-    training = sqlContext.createDataFrame([
+    training = spark.createDataFrame([
         (1.0, Vectors.dense([0.0, 1.1, 0.1])),
         (0.0, Vectors.dense([2.0, 1.0, -1.0])),
         (0.0, Vectors.dense([2.0, 1.3, 1.0])),
@@ -69,7 +68,7 @@ if __name__ == "__main__":
     print model2.extractParamMap()
     # Prepare test data
-    test = sqlContext.createDataFrame([
+    test = spark.createDataFrame([
         (1.0, Vectors.dense([-1.0, 1.5, 1.3])),
         (0.0, Vectors.dense([3.0, 2.0, -0.1])),
         (1.0, Vectors.dense([0.0, 2.2, -1.5]))], ["label", "features"])
@@ -84,4 +83,4 @@ if __name__ == "__main__":
         print row
     # $example off$
-    sc.stop()
+    spark.stop()
diff --git a/examples/src/main/python/ml/gradient_boosted_tree_classifier_example.py b/examples/src/main/python/ml/gradient_boosted_tree_classifier_example.py
index f7e842f4b3..b09ad41da3 100644
--- a/examples/src/main/python/ml/gradient_boosted_tree_classifier_example.py
+++ b/examples/src/main/python/ml/gradient_boosted_tree_classifier_example.py
@@ -20,21 +20,20 @@ Gradient Boosted Tree Classifier Example.
""" from __future__ import print_function -from pyspark import SparkContext, SQLContext # $example on$ from pyspark.ml import Pipeline from pyspark.ml.classification import GBTClassifier from pyspark.ml.feature import StringIndexer, VectorIndexer from pyspark.ml.evaluation import MulticlassClassificationEvaluator # $example off$ +from pyspark.sql import SparkSession if __name__ == "__main__": - sc = SparkContext(appName="gradient_boosted_tree_classifier_example") - sqlContext = SQLContext(sc) + spark = SparkSession.builder.appName("gradient_boosted_tree_classifier_example").getOrCreate() # $example on$ # Load and parse the data file, converting it to a DataFrame. - data = sqlContext.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt") + data = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt") # Index labels, adding metadata to the label column. # Fit on whole dataset to include all labels in index. @@ -72,4 +71,4 @@ if __name__ == "__main__": print(gbtModel) # summary only # $example off$ - sc.stop() + spark.stop() diff --git a/examples/src/main/python/ml/gradient_boosted_tree_regressor_example.py b/examples/src/main/python/ml/gradient_boosted_tree_regressor_example.py index f8b4de651c..caa7cfc4e1 100644 --- a/examples/src/main/python/ml/gradient_boosted_tree_regressor_example.py +++ b/examples/src/main/python/ml/gradient_boosted_tree_regressor_example.py @@ -20,21 +20,20 @@ Gradient Boosted Tree Regressor Example. """ from __future__ import print_function -from pyspark import SparkContext, SQLContext # $example on$ from pyspark.ml import Pipeline from pyspark.ml.regression import GBTRegressor from pyspark.ml.feature import VectorIndexer from pyspark.ml.evaluation import RegressionEvaluator # $example off$ +from pyspark.sql import SparkSession if __name__ == "__main__": - sc = SparkContext(appName="gradient_boosted_tree_regressor_example") - sqlContext = SQLContext(sc) + spark = SparkSession.builder.appName("gradient_boosted_tree_regressor_example").getOrCreate() # $example on$ # Load and parse the data file, converting it to a DataFrame. - data = sqlContext.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt") + data = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt") # Automatically identify categorical features, and index them. # Set maxCategories so features with > 4 distinct values are treated as continuous. 
@@ -69,4 +68,4 @@ if __name__ == "__main__":
     print(gbtModel)  # summary only
     # $example off$
-    sc.stop()
+    spark.stop()
diff --git a/examples/src/main/python/ml/index_to_string_example.py b/examples/src/main/python/ml/index_to_string_example.py
index fb0ba2950b..dd04b2c4b0 100644
--- a/examples/src/main/python/ml/index_to_string_example.py
+++ b/examples/src/main/python/ml/index_to_string_example.py
@@ -17,18 +17,16 @@
 from __future__ import print_function
-from pyspark import SparkContext
 # $example on$
 from pyspark.ml.feature import IndexToString, StringIndexer
 # $example off$
-from pyspark.sql import SQLContext
+from pyspark.sql import SparkSession
 if __name__ == "__main__":
-    sc = SparkContext(appName="IndexToStringExample")
-    sqlContext = SQLContext(sc)
+    spark = SparkSession.builder.appName("IndexToStringExample").getOrCreate()
     # $example on$
-    df = sqlContext.createDataFrame(
+    df = spark.createDataFrame(
         [(0, "a"), (1, "b"), (2, "c"), (3, "a"), (4, "a"), (5, "c")],
         ["id", "category"])
@@ -42,4 +40,4 @@ if __name__ == "__main__":
     converted.select("id", "originalCategory").show()
     # $example off$
-    sc.stop()
+    spark.stop()
diff --git a/examples/src/main/python/ml/kmeans_example.py b/examples/src/main/python/ml/kmeans_example.py
index fa57a4d3ad..7d9d80e645 100644
--- a/examples/src/main/python/ml/kmeans_example.py
+++ b/examples/src/main/python/ml/kmeans_example.py
@@ -20,10 +20,9 @@ from __future__ import print_function
 import sys
 import numpy as np
-from pyspark import SparkContext
 from pyspark.ml.clustering import KMeans, KMeansModel
 from pyspark.mllib.linalg import VectorUDT, _convert_to_vector
-from pyspark.sql import SQLContext
+from pyspark.sql import SparkSession
 from pyspark.sql.types import Row, StructField, StructType
 """
@@ -35,8 +34,8 @@ This example requires NumPy (http://www.numpy.org/).
""" -def parseVector(line): - array = np.array([float(x) for x in line.split(' ')]) +def parseVector(row): + array = np.array([float(x) for x in row.value.split(' ')]) return _convert_to_vector(array) @@ -50,14 +49,13 @@ if __name__ == "__main__": path = sys.argv[1] k = sys.argv[2] - sc = SparkContext(appName="PythonKMeansExample") - sqlContext = SQLContext(sc) + spark = SparkSession.builder.appName("PythonKMeansExample").getOrCreate() - lines = sc.textFile(path) + lines = spark.read.text(path).rdd data = lines.map(parseVector) row_rdd = data.map(lambda x: Row(x)) schema = StructType([StructField(FEATURES_COL, VectorUDT(), False)]) - df = sqlContext.createDataFrame(row_rdd, schema) + df = spark.createDataFrame(row_rdd, schema) kmeans = KMeans().setK(2).setSeed(1).setFeaturesCol(FEATURES_COL) model = kmeans.fit(df) @@ -67,4 +65,4 @@ if __name__ == "__main__": for center in centers: print(center) - sc.stop() + spark.stop() diff --git a/examples/src/main/python/ml/linear_regression_with_elastic_net.py b/examples/src/main/python/ml/linear_regression_with_elastic_net.py index a4cd40cf26..99b7f7fe99 100644 --- a/examples/src/main/python/ml/linear_regression_with_elastic_net.py +++ b/examples/src/main/python/ml/linear_regression_with_elastic_net.py @@ -17,19 +17,17 @@ from __future__ import print_function -from pyspark import SparkContext -from pyspark.sql import SQLContext # $example on$ from pyspark.ml.regression import LinearRegression # $example off$ +from pyspark.sql import SparkSession if __name__ == "__main__": - sc = SparkContext(appName="LinearRegressionWithElasticNet") - sqlContext = SQLContext(sc) + spark = SparkSession.builder.appName("LinearRegressionWithElasticNet").getOrCreate() # $example on$ # Load training data - training = sqlContext.read.format("libsvm")\ + training = spark.read.format("libsvm")\ .load("data/mllib/sample_linear_regression_data.txt") lr = LinearRegression(maxIter=10, regParam=0.3, elasticNetParam=0.8) @@ -42,4 +40,4 @@ if __name__ == "__main__": print("Intercept: " + str(lrModel.intercept)) # $example off$ - sc.stop() + spark.stop() diff --git a/examples/src/main/python/ml/logistic_regression_with_elastic_net.py b/examples/src/main/python/ml/logistic_regression_with_elastic_net.py index b0b1d27e13..0d7112e723 100644 --- a/examples/src/main/python/ml/logistic_regression_with_elastic_net.py +++ b/examples/src/main/python/ml/logistic_regression_with_elastic_net.py @@ -17,19 +17,17 @@ from __future__ import print_function -from pyspark import SparkContext -from pyspark.sql import SQLContext # $example on$ from pyspark.ml.classification import LogisticRegression # $example off$ +from pyspark.sql import SparkSession if __name__ == "__main__": - sc = SparkContext(appName="LogisticRegressionWithElasticNet") - sqlContext = SQLContext(sc) + spark = SparkSession.builder.appName("LogisticRegressionWithElasticNet").getOrCreate() # $example on$ # Load training data - training = sqlContext.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt") + training = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt") lr = LogisticRegression(maxIter=10, regParam=0.3, elasticNetParam=0.8) @@ -41,4 +39,4 @@ if __name__ == "__main__": print("Intercept: " + str(lrModel.intercept)) # $example off$ - sc.stop() + spark.stop() diff --git a/examples/src/main/python/ml/max_abs_scaler_example.py b/examples/src/main/python/ml/max_abs_scaler_example.py index d9b69eef1c..1cb95a98f0 100644 --- a/examples/src/main/python/ml/max_abs_scaler_example.py +++ 
b/examples/src/main/python/ml/max_abs_scaler_example.py @@ -17,18 +17,16 @@ from __future__ import print_function -from pyspark import SparkContext -from pyspark.sql import SQLContext # $example on$ from pyspark.ml.feature import MaxAbsScaler # $example off$ +from pyspark.sql import SparkSession if __name__ == "__main__": - sc = SparkContext(appName="MaxAbsScalerExample") - sqlContext = SQLContext(sc) + spark = SparkSession.builder.appName("MaxAbsScalerExample").getOrCreate() # $example on$ - dataFrame = sqlContext.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt") + dataFrame = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt") scaler = MaxAbsScaler(inputCol="features", outputCol="scaledFeatures") @@ -40,4 +38,4 @@ if __name__ == "__main__": scaledData.show() # $example off$ - sc.stop() + spark.stop() diff --git a/examples/src/main/python/ml/min_max_scaler_example.py b/examples/src/main/python/ml/min_max_scaler_example.py index 2f8e4ade46..8d91a59e2b 100644 --- a/examples/src/main/python/ml/min_max_scaler_example.py +++ b/examples/src/main/python/ml/min_max_scaler_example.py @@ -17,18 +17,16 @@ from __future__ import print_function -from pyspark import SparkContext -from pyspark.sql import SQLContext # $example on$ from pyspark.ml.feature import MinMaxScaler # $example off$ +from pyspark.sql import SparkSession if __name__ == "__main__": - sc = SparkContext(appName="MinMaxScalerExample") - sqlContext = SQLContext(sc) + spark = SparkSession.builder.appName("MinMaxScalerExample").getOrCreate() # $example on$ - dataFrame = sqlContext.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt") + dataFrame = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt") scaler = MinMaxScaler(inputCol="features", outputCol="scaledFeatures") @@ -40,4 +38,4 @@ if __name__ == "__main__": scaledData.show() # $example off$ - sc.stop() + spark.stop() diff --git a/examples/src/main/python/ml/multilayer_perceptron_classification.py b/examples/src/main/python/ml/multilayer_perceptron_classification.py index f84588f547..8bededc14d 100644 --- a/examples/src/main/python/ml/multilayer_perceptron_classification.py +++ b/examples/src/main/python/ml/multilayer_perceptron_classification.py @@ -17,21 +17,19 @@ from __future__ import print_function -from pyspark import SparkContext -from pyspark.sql import SQLContext # $example on$ from pyspark.ml.classification import MultilayerPerceptronClassifier from pyspark.ml.evaluation import MulticlassClassificationEvaluator # $example off$ +from pyspark.sql import SparkSession if __name__ == "__main__": - - sc = SparkContext(appName="multilayer_perceptron_classification_example") - sqlContext = SQLContext(sc) + spark = SparkSession\ + .builder.appName("multilayer_perceptron_classification_example").getOrCreate() # $example on$ # Load training data - data = sqlContext.read.format("libsvm")\ + data = spark.read.format("libsvm")\ .load("data/mllib/sample_multiclass_classification_data.txt") # Split the data into train and test splits = data.randomSplit([0.6, 0.4], 1234) @@ -52,4 +50,4 @@ if __name__ == "__main__": print("Precision:" + str(evaluator.evaluate(predictionAndLabels))) # $example off$ - sc.stop() + spark.stop() diff --git a/examples/src/main/python/ml/n_gram_example.py b/examples/src/main/python/ml/n_gram_example.py index f2d85f53e7..b7fecf0d68 100644 --- a/examples/src/main/python/ml/n_gram_example.py +++ b/examples/src/main/python/ml/n_gram_example.py @@ -17,18 +17,16 @@ from __future__ import print_function -from 
pyspark import SparkContext -from pyspark.sql import SQLContext # $example on$ from pyspark.ml.feature import NGram # $example off$ +from pyspark.sql import SparkSession if __name__ == "__main__": - sc = SparkContext(appName="NGramExample") - sqlContext = SQLContext(sc) + spark = SparkSession.builder.appName("NGramExample").getOrCreate() # $example on$ - wordDataFrame = sqlContext.createDataFrame([ + wordDataFrame = spark.createDataFrame([ (0, ["Hi", "I", "heard", "about", "Spark"]), (1, ["I", "wish", "Java", "could", "use", "case", "classes"]), (2, ["Logistic", "regression", "models", "are", "neat"]) @@ -39,4 +37,4 @@ if __name__ == "__main__": print(ngrams_label) # $example off$ - sc.stop() + spark.stop() diff --git a/examples/src/main/python/ml/naive_bayes_example.py b/examples/src/main/python/ml/naive_bayes_example.py index db8fbea9bf..e37035542c 100644 --- a/examples/src/main/python/ml/naive_bayes_example.py +++ b/examples/src/main/python/ml/naive_bayes_example.py @@ -17,21 +17,18 @@ from __future__ import print_function -from pyspark import SparkContext -from pyspark.sql import SQLContext # $example on$ from pyspark.ml.classification import NaiveBayes from pyspark.ml.evaluation import MulticlassClassificationEvaluator # $example off$ +from pyspark.sql import SparkSession if __name__ == "__main__": - - sc = SparkContext(appName="naive_bayes_example") - sqlContext = SQLContext(sc) + spark = SparkSession.builder.appName("naive_bayes_example").getOrCreate() # $example on$ # Load training data - data = sqlContext.read.format("libsvm") \ + data = spark.read.format("libsvm") \ .load("data/mllib/sample_libsvm_data.txt") # Split the data into train and test splits = data.randomSplit([0.6, 0.4], 1234) @@ -50,4 +47,4 @@ if __name__ == "__main__": print("Precision:" + str(evaluator.evaluate(predictionAndLabels))) # $example off$ - sc.stop() + spark.stop() diff --git a/examples/src/main/python/ml/normalizer_example.py b/examples/src/main/python/ml/normalizer_example.py index d490221474..ae25537619 100644 --- a/examples/src/main/python/ml/normalizer_example.py +++ b/examples/src/main/python/ml/normalizer_example.py @@ -17,18 +17,16 @@ from __future__ import print_function -from pyspark import SparkContext -from pyspark.sql import SQLContext # $example on$ from pyspark.ml.feature import Normalizer # $example off$ +from pyspark.sql import SparkSession if __name__ == "__main__": - sc = SparkContext(appName="NormalizerExample") - sqlContext = SQLContext(sc) + spark = SparkSession.builder.appName("NormalizerExample").getOrCreate() # $example on$ - dataFrame = sqlContext.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt") + dataFrame = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt") # Normalize each Vector using $L^1$ norm. 
     normalizer = Normalizer(inputCol="features", outputCol="normFeatures", p=1.0)
@@ -40,4 +38,4 @@ if __name__ == "__main__":
     lInfNormData.show()
     # $example off$
-    sc.stop()
+    spark.stop()
diff --git a/examples/src/main/python/ml/onehot_encoder_example.py b/examples/src/main/python/ml/onehot_encoder_example.py
index 0f94c26638..9acc363dc9 100644
--- a/examples/src/main/python/ml/onehot_encoder_example.py
+++ b/examples/src/main/python/ml/onehot_encoder_example.py
@@ -17,18 +17,16 @@
 from __future__ import print_function
-from pyspark import SparkContext
-from pyspark.sql import SQLContext
 # $example on$
 from pyspark.ml.feature import OneHotEncoder, StringIndexer
 # $example off$
+from pyspark.sql import SparkSession
 if __name__ == "__main__":
-    sc = SparkContext(appName="OneHotEncoderExample")
-    sqlContext = SQLContext(sc)
+    spark = SparkSession.builder.appName("OneHotEncoderExample").getOrCreate()
     # $example on$
-    df = sqlContext.createDataFrame([
+    df = spark.createDataFrame([
         (0, "a"),
         (1, "b"),
         (2, "c"),
@@ -45,4 +43,4 @@ if __name__ == "__main__":
     encoded.select("id", "categoryVec").show()
     # $example off$
-    sc.stop()
+    spark.stop()
diff --git a/examples/src/main/python/ml/pca_example.py b/examples/src/main/python/ml/pca_example.py
index a17181f1b8..adab151734 100644
--- a/examples/src/main/python/ml/pca_example.py
+++ b/examples/src/main/python/ml/pca_example.py
@@ -17,26 +17,24 @@
 from __future__ import print_function
-from pyspark import SparkContext
-from pyspark.sql import SQLContext
 # $example on$
 from pyspark.ml.feature import PCA
 from pyspark.mllib.linalg import Vectors
 # $example off$
+from pyspark.sql import SparkSession
 if __name__ == "__main__":
-    sc = SparkContext(appName="PCAExample")
-    sqlContext = SQLContext(sc)
+    spark = SparkSession.builder.appName("PCAExample").getOrCreate()
     # $example on$
     data = [(Vectors.sparse(5, [(1, 1.0), (3, 7.0)]),),
             (Vectors.dense([2.0, 0.0, 3.0, 4.0, 5.0]),),
             (Vectors.dense([4.0, 0.0, 0.0, 6.0, 7.0]),)]
-    df = sqlContext.createDataFrame(data, ["features"])
+    df = spark.createDataFrame(data, ["features"])
     pca = PCA(k=3, inputCol="features", outputCol="pcaFeatures")
     model = pca.fit(df)
     result = model.transform(df).select("pcaFeatures")
     result.show(truncate=False)
     # $example off$
-    sc.stop()
+    spark.stop()
diff --git a/examples/src/main/python/ml/pipeline_example.py b/examples/src/main/python/ml/pipeline_example.py
index 3288568f0c..ed9765d961 100644
--- a/examples/src/main/python/ml/pipeline_example.py
+++ b/examples/src/main/python/ml/pipeline_example.py
@@ -18,21 +18,20 @@
 """
 Pipeline Example.
 """
-from pyspark import SparkContext, SQLContext
+
 # $example on$
 from pyspark.ml import Pipeline
 from pyspark.ml.classification import LogisticRegression
 from pyspark.ml.feature import HashingTF, Tokenizer
 # $example off$
+from pyspark.sql import SparkSession
 if __name__ == "__main__":
-
-    sc = SparkContext(appName="PipelineExample")
-    sqlContext = SQLContext(sc)
+    spark = SparkSession.builder.appName("PipelineExample").getOrCreate()
     # $example on$
     # Prepare training documents from a list of (id, text, label) tuples.
-    training = sqlContext.createDataFrame([
+    training = spark.createDataFrame([
         (0L, "a b c d e spark", 1.0),
         (1L, "b d", 0.0),
         (2L, "spark f g h", 1.0),
@@ -48,7 +47,7 @@ if __name__ == "__main__":
     model = pipeline.fit(training)
     # Prepare test documents, which are unlabeled (id, text) tuples.
-    test = sqlContext.createDataFrame([
+    test = spark.createDataFrame([
         (4L, "spark i j k"),
         (5L, "l m n"),
         (6L, "mapreduce spark"),
@@ -61,4 +60,4 @@ if __name__ == "__main__":
         print(row)
     # $example off$
-    sc.stop()
+    spark.stop()
diff --git a/examples/src/main/python/ml/polynomial_expansion_example.py b/examples/src/main/python/ml/polynomial_expansion_example.py
index 89f5cbe8f2..328b559320 100644
--- a/examples/src/main/python/ml/polynomial_expansion_example.py
+++ b/examples/src/main/python/ml/polynomial_expansion_example.py
@@ -17,19 +17,17 @@
 from __future__ import print_function
-from pyspark import SparkContext
-from pyspark.sql import SQLContext
 # $example on$
 from pyspark.ml.feature import PolynomialExpansion
 from pyspark.mllib.linalg import Vectors
 # $example off$
+from pyspark.sql import SparkSession
 if __name__ == "__main__":
-    sc = SparkContext(appName="PolynomialExpansionExample")
-    sqlContext = SQLContext(sc)
+    spark = SparkSession.builder.appName("PolynomialExpansionExample").getOrCreate()
     # $example on$
-    df = sqlContext\
+    df = spark\
         .createDataFrame([(Vectors.dense([-2.0, 2.3]),),
                           (Vectors.dense([0.0, 0.0]),),
                           (Vectors.dense([0.6, -1.1]),)],
@@ -40,4 +38,4 @@ if __name__ == "__main__":
         print(expanded)
     # $example off$
-    sc.stop()
+    spark.stop()
diff --git a/examples/src/main/python/ml/random_forest_classifier_example.py b/examples/src/main/python/ml/random_forest_classifier_example.py
index c3570438c5..b0a93e050c 100644
--- a/examples/src/main/python/ml/random_forest_classifier_example.py
+++ b/examples/src/main/python/ml/random_forest_classifier_example.py
@@ -20,21 +20,20 @@ Random Forest Classifier Example.
 """
 from __future__ import print_function
-from pyspark import SparkContext, SQLContext
 # $example on$
 from pyspark.ml import Pipeline
 from pyspark.ml.classification import RandomForestClassifier
 from pyspark.ml.feature import StringIndexer, VectorIndexer
 from pyspark.ml.evaluation import MulticlassClassificationEvaluator
 # $example off$
+from pyspark.sql import SparkSession
 if __name__ == "__main__":
-    sc = SparkContext(appName="random_forest_classifier_example")
-    sqlContext = SQLContext(sc)
+    spark = SparkSession.builder.appName("random_forest_classifier_example").getOrCreate()
     # $example on$
     # Load and parse the data file, converting it to a DataFrame.
-    data = sqlContext.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")
+    data = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")
     # Index labels, adding metadata to the label column.
     # Fit on whole dataset to include all labels in index.
@@ -72,4 +71,4 @@ if __name__ == "__main__":
     print(rfModel)  # summary only
     # $example off$
-    sc.stop()
+    spark.stop()
diff --git a/examples/src/main/python/ml/random_forest_regressor_example.py b/examples/src/main/python/ml/random_forest_regressor_example.py
index b77014f379..4bb84f0de8 100644
--- a/examples/src/main/python/ml/random_forest_regressor_example.py
+++ b/examples/src/main/python/ml/random_forest_regressor_example.py
@@ -20,21 +20,20 @@ Random Forest Regressor Example.
""" from __future__ import print_function -from pyspark import SparkContext, SQLContext # $example on$ from pyspark.ml import Pipeline from pyspark.ml.regression import RandomForestRegressor from pyspark.ml.feature import VectorIndexer from pyspark.ml.evaluation import RegressionEvaluator # $example off$ +from pyspark.sql import SparkSession if __name__ == "__main__": - sc = SparkContext(appName="random_forest_regressor_example") - sqlContext = SQLContext(sc) + spark = SparkSession.builder.appName("random_forest_regressor_example").getOrCreate() # $example on$ # Load and parse the data file, converting it to a DataFrame. - data = sqlContext.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt") + data = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt") # Automatically identify categorical features, and index them. # Set maxCategories so features with > 4 distinct values are treated as continuous. @@ -69,4 +68,4 @@ if __name__ == "__main__": print(rfModel) # summary only # $example off$ - sc.stop() + spark.stop() diff --git a/examples/src/main/python/ml/rformula_example.py b/examples/src/main/python/ml/rformula_example.py index b544a14700..45cc116ac2 100644 --- a/examples/src/main/python/ml/rformula_example.py +++ b/examples/src/main/python/ml/rformula_example.py @@ -17,18 +17,16 @@ from __future__ import print_function -from pyspark import SparkContext -from pyspark.sql import SQLContext # $example on$ from pyspark.ml.feature import RFormula # $example off$ +from pyspark.sql import SparkSession if __name__ == "__main__": - sc = SparkContext(appName="RFormulaExample") - sqlContext = SQLContext(sc) + spark = SparkSession.builder.appName("RFormulaExample").getOrCreate() # $example on$ - dataset = sqlContext.createDataFrame( + dataset = spark.createDataFrame( [(7, "US", 18, 1.0), (8, "CA", 12, 0.0), (9, "NZ", 15, 0.0)], @@ -41,4 +39,4 @@ if __name__ == "__main__": output.select("features", "label").show() # $example off$ - sc.stop() + spark.stop() diff --git a/examples/src/main/python/ml/simple_text_classification_pipeline.py b/examples/src/main/python/ml/simple_text_classification_pipeline.py index b4f06bf888..3600c12211 100644 --- a/examples/src/main/python/ml/simple_text_classification_pipeline.py +++ b/examples/src/main/python/ml/simple_text_classification_pipeline.py @@ -17,11 +17,10 @@ from __future__ import print_function -from pyspark import SparkContext from pyspark.ml import Pipeline from pyspark.ml.classification import LogisticRegression from pyspark.ml.feature import HashingTF, Tokenizer -from pyspark.sql import Row, SQLContext +from pyspark.sql import Row, SparkSession """ @@ -34,16 +33,15 @@ pipeline in Python. Run with: if __name__ == "__main__": - sc = SparkContext(appName="SimpleTextClassificationPipeline") - sqlContext = SQLContext(sc) + spark = SparkSession.builder.appName("SimpleTextClassificationPipeline").getOrCreate() # Prepare training documents, which are labeled. - LabeledDocument = Row("id", "text", "label") - training = sc.parallelize([(0, "a b c d e spark", 1.0), - (1, "b d", 0.0), - (2, "spark f g h", 1.0), - (3, "hadoop mapreduce", 0.0)]) \ - .map(lambda x: LabeledDocument(*x)).toDF() + training = spark.createDataFrame([ + (0, "a b c d e spark", 1.0), + (1, "b d", 0.0), + (2, "spark f g h", 1.0), + (3, "hadoop mapreduce", 0.0) + ], ["id", "text", "label"]) # Configure an ML pipeline, which consists of tree stages: tokenizer, hashingTF, and lr. 
     tokenizer = Tokenizer(inputCol="text", outputCol="words")
@@ -55,12 +53,12 @@ if __name__ == "__main__":
     model = pipeline.fit(training)
     # Prepare test documents, which are unlabeled.
-    Document = Row("id", "text")
-    test = sc.parallelize([(4, "spark i j k"),
-                           (5, "l m n"),
-                           (6, "spark hadoop spark"),
-                           (7, "apache hadoop")]) \
-        .map(lambda x: Document(*x)).toDF()
+    test = spark.createDataFrame([
+        (4, "spark i j k"),
+        (5, "l m n"),
+        (6, "spark hadoop spark"),
+        (7, "apache hadoop")
+    ], ["id", "text"])
     # Make predictions on test documents and print columns of interest.
     prediction = model.transform(test)
@@ -68,4 +66,4 @@ if __name__ == "__main__":
     for row in selected.collect():
         print(row)
-    sc.stop()
+    spark.stop()
diff --git a/examples/src/main/python/ml/sql_transformer.py b/examples/src/main/python/ml/sql_transformer.py
index 9575d728d8..26045db4be 100644
--- a/examples/src/main/python/ml/sql_transformer.py
+++ b/examples/src/main/python/ml/sql_transformer.py
@@ -17,18 +17,16 @@
 from __future__ import print_function
-from pyspark import SparkContext
 # $example on$
 from pyspark.ml.feature import SQLTransformer
 # $example off$
-from pyspark.sql import SQLContext
+from pyspark.sql import SparkSession
 if __name__ == "__main__":
-    sc = SparkContext(appName="SQLTransformerExample")
-    sqlContext = SQLContext(sc)
+    spark = SparkSession.builder.appName("SQLTransformerExample").getOrCreate()
     # $example on$
-    df = sqlContext.createDataFrame([
+    df = spark.createDataFrame([
         (0, 1.0, 3.0),
         (2, 2.0, 5.0)
     ], ["id", "v1", "v2"])
@@ -37,4 +35,4 @@ if __name__ == "__main__":
     sqlTrans.transform(df).show()
     # $example off$
-    sc.stop()
+    spark.stop()
diff --git a/examples/src/main/python/ml/standard_scaler_example.py b/examples/src/main/python/ml/standard_scaler_example.py
index ae7aa85005..c50804f6bf 100644
--- a/examples/src/main/python/ml/standard_scaler_example.py
+++ b/examples/src/main/python/ml/standard_scaler_example.py
@@ -17,18 +17,16 @@
 from __future__ import print_function
-from pyspark import SparkContext
-from pyspark.sql import SQLContext
 # $example on$
 from pyspark.ml.feature import StandardScaler
 # $example off$
+from pyspark.sql import SparkSession
 if __name__ == "__main__":
-    sc = SparkContext(appName="StandardScalerExample")
-    sqlContext = SQLContext(sc)
+    spark = SparkSession.builder.appName("StandardScalerExample").getOrCreate()
     # $example on$
-    dataFrame = sqlContext.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")
+    dataFrame = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")
     scaler = StandardScaler(inputCol="features", outputCol="scaledFeatures",
                             withStd=True, withMean=False)
@@ -40,4 +38,4 @@ if __name__ == "__main__":
     scaledData.show()
     # $example off$
-    sc.stop()
+    spark.stop()
diff --git a/examples/src/main/python/ml/stopwords_remover_example.py b/examples/src/main/python/ml/stopwords_remover_example.py
index 01f94af8ca..57362673df 100644
--- a/examples/src/main/python/ml/stopwords_remover_example.py
+++ b/examples/src/main/python/ml/stopwords_remover_example.py
@@ -17,18 +17,16 @@
 from __future__ import print_function
-from pyspark import SparkContext
-from pyspark.sql import SQLContext
 # $example on$
 from pyspark.ml.feature import StopWordsRemover
 # $example off$
+from pyspark.sql import SparkSession
 if __name__ == "__main__":
-    sc = SparkContext(appName="StopWordsRemoverExample")
-    sqlContext = SQLContext(sc)
+    spark = SparkSession.builder.appName("StopWordsRemoverExample").getOrCreate()
     # $example on$
-    sentenceData = sqlContext.createDataFrame([
+    sentenceData = spark.createDataFrame([
         (0, ["I", "saw", "the", "red", "baloon"]),
         (1, ["Mary", "had", "a", "little", "lamb"])
     ], ["label", "raw"])
@@ -37,4 +35,4 @@ if __name__ == "__main__":
     remover.transform(sentenceData).show(truncate=False)
     # $example off$
-    sc.stop()
+    spark.stop()
diff --git a/examples/src/main/python/ml/string_indexer_example.py b/examples/src/main/python/ml/string_indexer_example.py
index 58a8cb5d56..aacd4f999b 100644
--- a/examples/src/main/python/ml/string_indexer_example.py
+++ b/examples/src/main/python/ml/string_indexer_example.py
@@ -17,18 +17,16 @@
 from __future__ import print_function
-from pyspark import SparkContext
-from pyspark.sql import SQLContext
 # $example on$
 from pyspark.ml.feature import StringIndexer
 # $example off$
+from pyspark.sql import SparkSession
 if __name__ == "__main__":
-    sc = SparkContext(appName="StringIndexerExample")
-    sqlContext = SQLContext(sc)
+    spark = SparkSession.builder.appName("StringIndexerExample").getOrCreate()
     # $example on$
-    df = sqlContext.createDataFrame(
+    df = spark.createDataFrame(
         [(0, "a"), (1, "b"), (2, "c"), (3, "a"), (4, "a"), (5, "c")],
         ["id", "category"])
     indexer = StringIndexer(inputCol="category", outputCol="categoryIndex")
@@ -36,4 +34,4 @@ if __name__ == "__main__":
     indexed.show()
     # $example off$
-    sc.stop()
+    spark.stop()
diff --git a/examples/src/main/python/ml/tf_idf_example.py b/examples/src/main/python/ml/tf_idf_example.py
index 141324d458..25df8166ef 100644
--- a/examples/src/main/python/ml/tf_idf_example.py
+++ b/examples/src/main/python/ml/tf_idf_example.py
@@ -17,18 +17,16 @@
 from __future__ import print_function
-from pyspark import SparkContext
 # $example on$
 from pyspark.ml.feature import HashingTF, IDF, Tokenizer
 # $example off$
-from pyspark.sql import SQLContext
+from pyspark.sql import SparkSession
 if __name__ == "__main__":
-    sc = SparkContext(appName="TfIdfExample")
-    sqlContext = SQLContext(sc)
+    spark = SparkSession.builder.appName("TfIdfExample").getOrCreate()
     # $example on$
-    sentenceData = sqlContext.createDataFrame([
+    sentenceData = spark.createDataFrame([
         (0, "Hi I heard about Spark"),
         (0, "I wish Java could use case classes"),
         (1, "Logistic regression models are neat")
@@ -46,4 +44,4 @@ if __name__ == "__main__":
         print(features_label)
     # $example off$
-    sc.stop()
+    spark.stop()
diff --git a/examples/src/main/python/ml/tokenizer_example.py b/examples/src/main/python/ml/tokenizer_example.py
index ce9b225be5..5be4b4cfe3 100644
--- a/examples/src/main/python/ml/tokenizer_example.py
+++ b/examples/src/main/python/ml/tokenizer_example.py
@@ -17,18 +17,16 @@
 from __future__ import print_function
-from pyspark import SparkContext
-from pyspark.sql import SQLContext
 # $example on$
 from pyspark.ml.feature import Tokenizer, RegexTokenizer
 # $example off$
+from pyspark.sql import SparkSession
 if __name__ == "__main__":
-    sc = SparkContext(appName="TokenizerExample")
-    sqlContext = SQLContext(sc)
+    spark = SparkSession.builder.appName("TokenizerExample").getOrCreate()
     # $example on$
-    sentenceDataFrame = sqlContext.createDataFrame([
+    sentenceDataFrame = spark.createDataFrame([
         (0, "Hi I heard about Spark"),
         (1, "I wish Java could use case classes"),
         (2, "Logistic,regression,models,are,neat")
@@ -41,4 +39,4 @@ if __name__ == "__main__":
     # alternatively, pattern="\\w+", gaps(False)
     # $example off$
-    sc.stop()
+    spark.stop()
diff --git a/examples/src/main/python/ml/train_validation_split.py b/examples/src/main/python/ml/train_validation_split.py
index 161a200c61..2e43a0f8ae 100644
--- a/examples/src/main/python/ml/train_validation_split.py
+++ b/examples/src/main/python/ml/train_validation_split.py
@@ -15,13 +15,12 @@
 # limitations under the License.
 #
-from pyspark import SparkContext
 # $example on$
 from pyspark.ml.evaluation import RegressionEvaluator
 from pyspark.ml.regression import LinearRegression
 from pyspark.ml.tuning import ParamGridBuilder, TrainValidationSplit
-from pyspark.sql import SQLContext
 # $example off$
+from pyspark.sql import SparkSession
 """
 This example demonstrates applying TrainValidationSplit to split data
@@ -32,11 +31,10 @@ Run with:
 """
 if __name__ == "__main__":
-    sc = SparkContext(appName="TrainValidationSplit")
-    sqlContext = SQLContext(sc)
+    spark = SparkSession.builder.appName("TrainValidationSplit").getOrCreate()
     # $example on$
     # Prepare training and test data.
-    data = sqlContext.read.format("libsvm")\
+    data = spark.read.format("libsvm")\
         .load("data/mllib/sample_linear_regression_data.txt")
     train, test = data.randomSplit([0.7, 0.3])
     lr = LinearRegression(maxIter=10, regParam=0.1)
@@ -65,4 +63,4 @@ if __name__ == "__main__":
     for row in prediction.take(5):
         print(row)
     # $example off$
-    sc.stop()
+    spark.stop()
diff --git a/examples/src/main/python/ml/vector_assembler_example.py b/examples/src/main/python/ml/vector_assembler_example.py
index 04f64839f1..019a9ea6f7 100644
--- a/examples/src/main/python/ml/vector_assembler_example.py
+++ b/examples/src/main/python/ml/vector_assembler_example.py
@@ -17,19 +17,17 @@
 from __future__ import print_function
-from pyspark import SparkContext
-from pyspark.sql import SQLContext
 # $example on$
 from pyspark.mllib.linalg import Vectors
 from pyspark.ml.feature import VectorAssembler
 # $example off$
+from pyspark.sql import SparkSession
 if __name__ == "__main__":
-    sc = SparkContext(appName="VectorAssemblerExample")
-    sqlContext = SQLContext(sc)
+    spark = SparkSession.builder.appName("VectorAssemblerExample").getOrCreate()
     # $example on$
-    dataset = sqlContext.createDataFrame(
+    dataset = spark.createDataFrame(
        [(0, 18, 1.0, Vectors.dense([0.0, 10.0, 0.5]), 1.0)],
        ["id", "hour", "mobile", "userFeatures", "clicked"])
     assembler = VectorAssembler(
@@ -39,4 +37,4 @@ if __name__ == "__main__":
     print(output.select("features", "clicked").first())
     # $example off$
-    sc.stop()
+    spark.stop()
diff --git a/examples/src/main/python/ml/vector_indexer_example.py b/examples/src/main/python/ml/vector_indexer_example.py
index 146f41c1dd..3cf5b8ebf1 100644
--- a/examples/src/main/python/ml/vector_indexer_example.py
+++ b/examples/src/main/python/ml/vector_indexer_example.py
@@ -17,18 +17,16 @@
 from __future__ import print_function
-from pyspark import SparkContext
-from pyspark.sql import SQLContext
 # $example on$
 from pyspark.ml.feature import VectorIndexer
 # $example off$
+from pyspark.sql import SparkSession
 if __name__ == "__main__":
-    sc = SparkContext(appName="VectorIndexerExample")
-    sqlContext = SQLContext(sc)
+    spark = SparkSession.builder.appName("VectorIndexerExample").getOrCreate()
     # $example on$
-    data = sqlContext.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")
+    data = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")
     indexer = VectorIndexer(inputCol="features", outputCol="indexed", maxCategories=10)
     indexerModel = indexer.fit(data)
@@ -37,4 +35,4 @@ if __name__ == "__main__":
     indexedData.show()
     # $example off$
-    sc.stop()
+    spark.stop()
diff --git a/examples/src/main/python/ml/vector_slicer_example.py b/examples/src/main/python/ml/vector_slicer_example.py
index 31a753073c..0531bcdb06 100644
--- a/examples/src/main/python/ml/vector_slicer_example.py
+++ b/examples/src/main/python/ml/vector_slicer_example.py
@@ -17,20 +17,18 @@
 from __future__ import print_function
-from pyspark import SparkContext
-from pyspark.sql import SQLContext
 # $example on$
 from pyspark.ml.feature import VectorSlicer
 from pyspark.mllib.linalg import Vectors
 from pyspark.sql.types import Row
 # $example off$
+from pyspark.sql import SparkSession
 if __name__ == "__main__":
-    sc = SparkContext(appName="VectorSlicerExample")
-    sqlContext = SQLContext(sc)
+    spark = SparkSession.builder.appName("VectorSlicerExample").getOrCreate()
     # $example on$
-    df = sqlContext.createDataFrame([
+    df = spark.createDataFrame([
         Row(userFeatures=Vectors.sparse(3, {0: -2.0, 1: 2.3}),),
         Row(userFeatures=Vectors.dense([-2.0, 2.3, 0.0]),)])
@@ -41,4 +39,4 @@ if __name__ == "__main__":
     output.select("userFeatures", "features").show()
     # $example off$
-    sc.stop()
+    spark.stop()
diff --git a/examples/src/main/python/ml/word2vec_example.py b/examples/src/main/python/ml/word2vec_example.py
index 53c77feb10..6766a7b6aa 100644
--- a/examples/src/main/python/ml/word2vec_example.py
+++ b/examples/src/main/python/ml/word2vec_example.py
@@ -17,19 +17,17 @@
 from __future__ import print_function
-from pyspark import SparkContext
-from pyspark.sql import SQLContext
 # $example on$
 from pyspark.ml.feature import Word2Vec
 # $example off$
+from pyspark.sql import SparkSession
 if __name__ == "__main__":
-    sc = SparkContext(appName="Word2VecExample")
-    sqlContext = SQLContext(sc)
+    spark = SparkSession.builder.appName("Word2VecExample").getOrCreate()
     # $example on$
     # Input data: Each row is a bag of words from a sentence or document.
-    documentDF = sqlContext.createDataFrame([
+    documentDF = spark.createDataFrame([
         ("Hi I heard about Spark".split(" "), ),
         ("I wish Java could use case classes".split(" "), ),
         ("Logistic regression models are neat".split(" "), )
@@ -42,4 +40,4 @@ if __name__ == "__main__":
         print(feature)
     # $example off$
-    sc.stop()
+    spark.stop()
diff --git a/examples/src/main/python/mllib/binary_classification_metrics_example.py b/examples/src/main/python/mllib/binary_classification_metrics_example.py
index 4e7ea289b2..8f0fc9d45d 100644
--- a/examples/src/main/python/mllib/binary_classification_metrics_example.py
+++ b/examples/src/main/python/mllib/binary_classification_metrics_example.py
@@ -18,7 +18,7 @@
 Binary Classification Metrics Example.
""" from __future__ import print_function -from pyspark import SparkContext, SQLContext +from pyspark import SparkContext # $example on$ from pyspark.mllib.classification import LogisticRegressionWithLBFGS from pyspark.mllib.evaluation import BinaryClassificationMetrics @@ -27,7 +27,7 @@ from pyspark.mllib.util import MLUtils if __name__ == "__main__": sc = SparkContext(appName="BinaryClassificationMetricsExample") - sqlContext = SQLContext(sc) + # $example on$ # Several of the methods available in scala are currently missing from pyspark # Load training data in LIBSVM format @@ -52,3 +52,5 @@ if __name__ == "__main__": # Area under ROC curve print("Area under ROC = %s" % metrics.areaUnderROC) # $example off$ + + sc.stop() diff --git a/examples/src/main/python/sql.py b/examples/src/main/python/sql.py index ea6a22dbfe..59a46cb283 100644 --- a/examples/src/main/python/sql.py +++ b/examples/src/main/python/sql.py @@ -63,7 +63,7 @@ if __name__ == "__main__": # |-- age: long (nullable = true) # |-- name: string (nullable = true) - # Register this DataFrame as a table. + # Register this DataFrame as a temporary table. people.registerTempTable("people") # SQL statements can be run by using the sql methods provided by sqlContext diff --git a/examples/src/main/python/streaming/sql_network_wordcount.py b/examples/src/main/python/streaming/sql_network_wordcount.py index 1ba5e9fb78..588cbfee14 100644 --- a/examples/src/main/python/streaming/sql_network_wordcount.py +++ b/examples/src/main/python/streaming/sql_network_wordcount.py @@ -33,13 +33,14 @@ import sys from pyspark import SparkContext from pyspark.streaming import StreamingContext -from pyspark.sql import SQLContext, Row +from pyspark.sql import Row, SparkSession -def getSqlContextInstance(sparkContext): - if ('sqlContextSingletonInstance' not in globals()): - globals()['sqlContextSingletonInstance'] = SQLContext(sparkContext) - return globals()['sqlContextSingletonInstance'] +def getSparkSessionInstance(sparkConf): + if ('sparkSessionSingletonInstance' not in globals()): + globals()['sparkSessionSingletonInstance'] =\ + SparkSession.builder.config(conf=sparkConf).getOrCreate() + return globals()['sparkSessionSingletonInstance'] if __name__ == "__main__": @@ -60,19 +61,19 @@ if __name__ == "__main__": print("========= %s =========" % str(time)) try: - # Get the singleton instance of SQLContext - sqlContext = getSqlContextInstance(rdd.context) + # Get the singleton instance of SparkSession + spark = getSparkSessionInstance(rdd.context.getConf()) # Convert RDD[String] to RDD[Row] to DataFrame rowRdd = rdd.map(lambda w: Row(word=w)) - wordsDataFrame = sqlContext.createDataFrame(rowRdd) + wordsDataFrame = spark.createDataFrame(rowRdd) # Register as table wordsDataFrame.registerTempTable("words") # Do word count on table using SQL and print it wordCountsDataFrame = \ - sqlContext.sql("select word, count(*) as total from words group by word") + spark.sql("select word, count(*) as total from words group by word") wordCountsDataFrame.show() except: pass |