---
layout: global
displayTitle: SparkR (R on Spark)
title: SparkR (R on Spark)
---

* This will become a table of contents (this text will be scraped).
{:toc}

# Overview

SparkR is an R package that provides a light-weight frontend to use Apache Spark from R. In Spark {{site.SPARK_VERSION}}, SparkR provides a distributed data frame implementation that supports operations like selection, filtering, aggregation etc. (similar to R data frames, [dplyr](https://github.com/hadley/dplyr)) but on large datasets.

# SparkR DataFrames

A DataFrame is a distributed collection of data organized into named columns. It is conceptually equivalent to a table in a relational database or a data frame in R, but with richer optimizations under the hood. DataFrames can be constructed from a wide array of sources such as: structured data files, tables in Hive, external databases, or existing local R data frames.

All of the examples on this page use sample data included in R or the Spark distribution and can be run using the `./bin/sparkR` shell.

## Starting Up: SparkContext, SQLContext

The entry point into SparkR is the `SparkContext`, which connects your R program to a Spark cluster. You can create a `SparkContext` using `sparkR.init` and pass in options such as the application name, any Spark packages depended on, etc. Further, to work with DataFrames we will need a `SQLContext`, which can be created from the `SparkContext`. If you are working from the SparkR shell, the `SQLContext` and `SparkContext` should already be created for you.

{% highlight r %}
sc <- sparkR.init()
sqlContext <- sparkRSQL.init(sc)
{% endhighlight %}
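
For example, a minimal sketch of passing options at initialization; the master URL and application name used here are purely illustrative values, not requirements:

{% highlight r %}
# Illustrative only: run locally with two threads under a custom application name
sc <- sparkR.init(master = "local[2]", appName = "SparkR-example")
sqlContext <- sparkRSQL.init(sc)
{% endhighlight %}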

## Creating DataFrames

With a `SQLContext`, applications can create `DataFrame`s from a local R data frame, from a [Hive table](sql-programming-guide.html#hive-tables), or from other [data sources](sql-programming-guide.html#data-sources).

### From local data frames

The simplest way to create a data frame is to convert a local R data frame into a SparkR DataFrame. Specifically, we can use `createDataFrame` and pass in the local R data frame to create a SparkR DataFrame. As an example, the following creates a `DataFrame` using the `faithful` dataset from R.

{% highlight r %}
df <- createDataFrame(sqlContext, faithful)

# Displays the first few rows of the DataFrame
head(df)
##  eruptions waiting
##1     3.600      79
##2     1.800      54
##3     3.333      74
{% endhighlight %}
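
Conversely, a SparkR DataFrame can be brought back into a local R data frame with `collect`; a small sketch (only advisable when the result fits in driver memory):

{% highlight r %}
# Pull the distributed DataFrame back to the driver as a local R data frame.
# Only do this when the result is small enough to fit in memory.
localDF <- collect(df)
class(localDF)
## [1] "data.frame"
{% endhighlight %}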

### From Data Sources

SparkR supports operating on a variety of data sources through the `DataFrame` interface. This section describes the general methods for loading and saving data using Data Sources. You can check the Spark SQL programming guide for more [specific options](sql-programming-guide.html#manually-specifying-options) that are available for the built-in data sources.

The general method for creating DataFrames from data sources is `read.df`. This method takes in the `SQLContext`, the path of the file to load, and the type of data source. SparkR supports reading JSON and Parquet files natively, and through [Spark Packages](http://spark-packages.org/) you can find data source connectors for popular file formats like [CSV](http://spark-packages.org/package/databricks/spark-csv) and [Avro](http://spark-packages.org/package/databricks/spark-avro). These packages can either be added by specifying `--packages` with `spark-submit` or `sparkR` commands, or, if creating the context through `init`, with the `sparkPackages` argument.

{% highlight r %}
sc <- sparkR.init(sparkPackages = "com.databricks:spark-csv_2.11:1.0.3")
sqlContext <- sparkRSQL.init(sc)
{% endhighlight %}

We can see how to use data sources with an example JSON input file. Note that the file used here is _not_ a typical JSON file: each line in the file must contain a separate, self-contained valid JSON object. As a consequence, a regular multi-line JSON file will most often fail.
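
For reference, the `people.json` file shipped in the Spark examples contains one JSON object per line, along these lines:

{% highlight json %}
{"name":"Michael"}
{"name":"Andy", "age":30}
{"name":"Justin", "age":19}
{% endhighlight %}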

{% highlight r %}
people <- read.df(sqlContext, "./examples/src/main/resources/people.json", "json")
head(people)
##  age    name
##1  NA Michael
##2  30    Andy
##3  19  Justin

# SparkR automatically infers the schema from the JSON file
printSchema(people)
# root
#  |-- age: integer (nullable = true)
#  |-- name: string (nullable = true)
{% endhighlight %}

The data sources API can also be used to save DataFrames out into multiple file formats. For example, we can save the DataFrame from the previous example to a Parquet file using `write.df`.

{% highlight r %}
write.df(people, path = "people.parquet", source = "parquet", mode = "overwrite")
{% endhighlight %}
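
As a quick check, the saved Parquet file can be read back with `read.df` as well; a minimal sketch, using the same path as above:

{% highlight r %}
# Read the Parquet output back into a DataFrame; the schema is preserved
peopleParquet <- read.df(sqlContext, "people.parquet", "parquet")
printSchema(peopleParquet)
{% endhighlight %}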

### From Hive tables

You can also create SparkR DataFrames from Hive tables. To do this, we will need to create a `HiveContext`, which can access tables in the Hive MetaStore. Note that Spark should have been built with [Hive support](building-spark.html#building-with-hive-and-jdbc-support); more details on the difference between `SQLContext` and `HiveContext` can be found in the [SQL programming guide](sql-programming-guide.html#starting-point-sqlcontext).

{% highlight r %}
# sc is an existing SparkContext.
hiveContext <- sparkRHive.init(sc)

sql(hiveContext, "CREATE TABLE IF NOT EXISTS src (key INT, value STRING)")
sql(hiveContext, "LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src")

# Queries can be expressed in HiveQL.
results <- sql(hiveContext, "FROM src SELECT key, value")

# results is now a DataFrame
head(results)
##  key   value
## 1 238 val_238
## 2  86  val_86
## 3 311 val_311
{% endhighlight %}
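
A whole Hive table can also be loaded as a DataFrame without writing a query by using the `table` function; a small sketch based on the table created above:

{% highlight r %}
# Load the Hive table "src" directly as a SparkR DataFrame
src <- table(hiveContext, "src")
head(src)
{% endhighlight %}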

## DataFrame Operations

SparkR DataFrames support a number of functions to do structured data processing. Here we include some basic examples; a complete list can be found in the [API](api/R/index.html) docs.

### Selecting rows, columns

{% highlight r %}
# Create the DataFrame
df <- createDataFrame(sqlContext, faithful)

# Get basic information about the DataFrame
df
## DataFrame[eruptions:double, waiting:double]

# Select only the "eruptions" column
head(select(df, df$eruptions))
##  eruptions
##1     3.600
##2     1.800
##3     3.333

# You can also pass in column names as strings
head(select(df, "eruptions"))

# Filter the DataFrame to only retain rows with wait times shorter than 50 mins
head(filter(df, df$waiting < 50))
##  eruptions waiting
##1     1.750      47
##2     1.750      47
##3     1.867      48
{% endhighlight %}
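
These operations compose in the usual way; for instance, a sketch selecting a single column from the filtered rows:

{% highlight r %}
# Combine filter and select: eruption lengths for waiting times under 50 mins
head(select(filter(df, df$waiting < 50), df$eruptions))
{% endhighlight %}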

### Grouping, Aggregation

SparkR data frames support a number of commonly used functions to aggregate data after grouping. For example, we can compute a histogram of the `waiting` time in the `faithful` dataset as shown below.

{% highlight r %}
# We use the `n` operator to count the number of times each waiting time appears
head(summarize(groupBy(df, df$waiting), count = n(df$waiting)))
##  waiting count
##1      81    13
##2      60     6
##3      68     1

# We can also sort the output from the aggregation to get the most common waiting times
waiting_counts <- summarize(groupBy(df, df$waiting), count = n(df$waiting))
head(arrange(waiting_counts, desc(waiting_counts$count)))
##  waiting count
##1      78    15
##2      83    14
##3      81    13
{% endhighlight %}
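
Other aggregate functions can be applied in the same way. As a sketch, the average eruption length per waiting time could be computed with `avg`, one of the column aggregate functions SparkR provides:

{% highlight r %}
# Average eruption duration for each distinct waiting time
head(summarize(groupBy(df, df$waiting), avg_eruptions = avg(df$eruptions)))
{% endhighlight %}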

### Operating on Columns

SparkR also provides a number of functions that can be directly applied to columns for data processing and during aggregation. The example below shows the use of basic arithmetic functions.

{% highlight r %}
# Convert waiting time from minutes to seconds.
# Note that we can assign this to a new column in the same DataFrame
df$waiting_secs <- df$waiting * 60
head(df)
##  eruptions waiting waiting_secs
##1     3.600      79         4740
##2     1.800      54         3240
##3     3.333      74         4440
{% endhighlight %}

## Running SQL Queries from SparkR

A SparkR DataFrame can also be registered as a temporary table in Spark SQL; registering a DataFrame as a table allows you to run SQL queries over its data. The `sql` function enables applications to run SQL queries programmatically and returns the result as a `DataFrame`.

{% highlight r %}
# Load a JSON file
people <- read.df(sqlContext, "./examples/src/main/resources/people.json", "json")

# Register this DataFrame as a table.
registerTempTable(people, "people")

# SQL statements can be run by using the sql method
teenagers <- sql(sqlContext, "SELECT name FROM people WHERE age >= 13 AND age <= 19")
head(teenagers)
##    name
##1 Justin
{% endhighlight %}