summaryrefslogblamecommitdiff
path: root/examples.md
blob: b4ac734603169bdb9c432dc19159df980c7e3f64 (plain) (tree)
1
2
3
4
5
6
7
8
9







                     
                              
 


                                                                                               








                                                                                                                                                            
 
                         
 

                                                                                                                                                     
 
                         

                                                                         
                                                              
     
 
                         












































                                                                                                                                                                                                                                                                                                                                                                                                              
 
                         

                                                                         
                                                              
     
 
                         





                                             
 





                                                            
 











                                                     
 































                                                                                                                      
 
                         

                                                                         
                                                              
     
 
                         



                                             
 

































































                                                                                                       
 
                         

                                                                         
                                                              
     
 
                         































































































                                                                                                                                                           

                         

                                                                         
                                                              
     
 
                         





































































                                                                          
 
                         
                            


                                                    
                                                                                                                                                                                                                                                                                                                                       
                                                                                                                                                                                                                                                                     
---
layout: global
title: Examples
type: "page singular"
navigation:
  weight: 4
  show: true
---
<h2>Apache Spark Examples</h2>

These examples give a quick overview of the Spark API.
Spark is built on the concept of <em>distributed datasets</em>, which contain arbitrary Java or
Python objects. You create a dataset from external data, then apply parallel operations
to it. The building block of the Spark API is its [RDD API](http://spark.apache.org/docs/latest/programming-guide.html#resilient-distributed-datasets-rdds).
In the RDD API,
there are two types of operations: <em>transformations</em>, which define a new dataset based on previous ones,
and <em>actions</em>, which kick off a job to execute on a cluster.
On top of Spark’s RDD API, high level APIs are provided, e.g.
[DataFrame API](http://spark.apache.org/docs/latest/sql-programming-guide.html#dataframes) and
[Machine Learning API](http://spark.apache.org/docs/latest/mllib-guide.html).
These high level APIs provide a concise way to conduct certain data operations.
In this page, we will show examples using RDD API as well as examples using high level APIs.

<h2>RDD API Examples</h2>

<h3>Word Count</h3>
<p>In this example, we use a few transformations to build a dataset of (String, Int) pairs called <code>counts</code> and then save it to a file.</p>

<ul class="nav nav-tabs">
  <li class="lang-tab lang-tab-python active"><a href="#">Python</a></li>
  <li class="lang-tab lang-tab-scala"><a href="#">Scala</a></li>
  <li class="lang-tab lang-tab-java"><a href="#">Java</a></li>
</ul>

<div class="tab-content">
<div class="tab-pane tab-pane-python active">
<div class="code code-tab">
{% highlight python %}
text_file = sc.textFile("hdfs://...")
counts = text_file.flatMap(lambda line: line.split(" ")) \
             .map(lambda word: (word, 1)) \
             .reduceByKey(lambda a, b: a + b)
counts.saveAsTextFile("hdfs://...")
{% endhighlight %}
</div>
</div>

<div class="tab-pane tab-pane-scala">
<div class="code code-tab">
{% highlight scala %}
val textFile = sc.textFile("hdfs://...")
val counts = textFile.flatMap(line => line.split(" "))
                 .map(word => (word, 1))
                 .reduceByKey(_ + _)
counts.saveAsTextFile("hdfs://...")
{% endhighlight %}
</div>
</div>

<div class="tab-pane tab-pane-java">
<div class="code code-tab">
{% highlight java %}
JavaRDD<String> textFile = sc.textFile("hdfs://...");
JavaRDD<String> words = textFile.flatMap(new FlatMapFunction<String, String>() {
  public Iterable<String> call(String s) { return Arrays.asList(s.split(" ")); }
});
JavaPairRDD<String, Integer> pairs = words.mapToPair(new PairFunction<String, String, Integer>() {
  public Tuple2<String, Integer> call(String s) { return new Tuple2<String, Integer>(s, 1); }
});
JavaPairRDD<String, Integer> counts = pairs.reduceByKey(new Function2<Integer, Integer, Integer>() {
  public Integer call(Integer a, Integer b) { return a + b; }
});
counts.saveAsTextFile("hdfs://...");
{% endhighlight %}
</div>
</div>
</div>

<h3>Pi Estimation</h3>
<p>Spark can also be used for compute-intensive tasks. This code estimates <span style="font-family: serif; font-size: 120%;">π</span> by "throwing darts" at a circle. We pick random points in the unit square ((0, 0) to (1,1)) and see how many fall in the unit circle. The fraction should be <span style="font-family: serif; font-size: 120%;">π / 4</span>, so we use this to get our estimate.</p>

<ul class="nav nav-tabs">
  <li class="lang-tab lang-tab-python active"><a href="#">Python</a></li>
  <li class="lang-tab lang-tab-scala"><a href="#">Scala</a></li>
  <li class="lang-tab lang-tab-java"><a href="#">Java</a></li>
</ul>

<div class="tab-content">
<div class="tab-pane tab-pane-python active">
<div class="code code-tab">
{% highlight python %}
def sample(p):
    x, y = random(), random()
    return 1 if x*x + y*y < 1 else 0

count = sc.parallelize(xrange(0, NUM_SAMPLES)).map(sample) \
             .reduce(lambda a, b: a + b)
print "Pi is roughly %f" % (4.0 * count / NUM_SAMPLES)
{% endhighlight %}
</div>
</div>

<div class="tab-pane tab-pane-scala">
<div class="code code-tab">
{% highlight scala %}
val count = sc.parallelize(1 to NUM_SAMPLES).map{i =>
  val x = Math.random()
  val y = Math.random()
  if (x*x + y*y < 1) 1 else 0
}.reduce(_ + _)
println("Pi is roughly " + 4.0 * count / NUM_SAMPLES)
{% endhighlight %}
</div>
</div>

<div class="tab-pane tab-pane-java">
<div class="code code-tab">
{% highlight java %}
List<Integer> l = new ArrayList<Integer>(NUM_SAMPLES);
for (int i = 0; i < NUM_SAMPLES; i++) {
  l.add(i);
}

long count = sc.parallelize(l).filter(new Function<Integer, Boolean>() {
  public Boolean call(Integer i) {
    double x = Math.random();
    double y = Math.random();
    return x*x + y*y < 1;
  }
}).count();
System.out.println("Pi is roughly " + 4.0 * count / NUM_SAMPLES);
{% endhighlight %}
</div>
</div>
</div>

<h2>DataFrame API Examples</h2>
<p>
In Spark, a <a href="http://spark.apache.org/docs/latest/sql-programming-guide.html#dataframes">DataFrame</a>
is a distributed collection of data organized into named columns.
Users can use DataFrame API to perform various relational operations on both external
data sources and Spark’s built-in distributed collections without providing specific procedures for processing data.
Also, programs based on DataFrame API will be automatically optimized by Spark’s built-in optimizer, Catalyst.
</p>

<h3>Text Search</h3>
<p>In this example, we search through the error messages in a log file.</p>

<ul class="nav nav-tabs">
  <li class="lang-tab lang-tab-python active"><a href="#">Python</a></li>
  <li class="lang-tab lang-tab-scala"><a href="#">Scala</a></li>
  <li class="lang-tab lang-tab-java"><a href="#">Java</a></li>
</ul>

<div class="tab-content">
<div class="tab-pane tab-pane-python active">
<div class="code code-tab">
{% highlight python %}
textFile = sc.textFile("hdfs://...")

# Creates a DataFrame having a single column named "line"
df = textFile.map(lambda r: Row(r)).toDF(["line"])
errors = df.filter(col("line").like("%ERROR%"))
# Counts all the errors
errors.count()
# Counts errors mentioning MySQL
errors.filter(col("line").like("%MySQL%")).count()
# Fetches the MySQL errors as an array of strings
errors.filter(col("line").like("%MySQL%")).collect()
{% endhighlight %}
</div>
</div>

<div class="tab-pane tab-pane-scala">
<div class="code code-tab">
{% highlight scala %}
val textFile = sc.textFile("hdfs://...")

// Creates a DataFrame having a single column named "line"
val df = textFile.toDF("line")
val errors = df.filter(col("line").like("%ERROR%"))
// Counts all the errors
errors.count()
// Counts errors mentioning MySQL
errors.filter(col("line").like("%MySQL%")).count()
// Fetches the MySQL errors as an array of strings
errors.filter(col("line").like("%MySQL%")).collect()
{% endhighlight %}
</div>
</div>

<div class="tab-pane tab-pane-java">
<div class="code code-tab">
{% highlight java %}
// Creates a DataFrame having a single column named "line"
JavaRDD<String> textFile = sc.textFile("hdfs://...");
JavaRDD<Row> rowRDD = textFile.map(
  new Function<String, Row>() {
    public Row call(String line) throws Exception {
      return RowFactory.create(line);
    }
  });
List<StructField> fields = new ArrayList<StructField>();
fields.add(DataTypes.createStructField("line", DataTypes.StringType, true));
StructType schema = DataTypes.createStructType(fields);
DataFrame df = sqlContext.createDataFrame(rowRDD, schema);

DataFrame errors = df.filter(col("line").like("%ERROR%"));
// Counts all the errors
errors.count();
// Counts errors mentioning MySQL
errors.filter(col("line").like("%MySQL%")).count();
// Fetches the MySQL errors as an array of strings
errors.filter(col("line").like("%MySQL%")).collect();
{% endhighlight %}
</div>
</div>
</div>

<h3>Simple Data Operations</h3>
<p>
In this example, we read a table stored in a database and calculate the number of people for every age.
Finally, we save the calculated result to S3 in the format of JSON.
A simple MySQL table "people" is used in the example and this table has two columns,
"name" and "age".
</p>

<ul class="nav nav-tabs">
  <li class="lang-tab lang-tab-python active"><a href="#">Python</a></li>
  <li class="lang-tab lang-tab-scala"><a href="#">Scala</a></li>
  <li class="lang-tab lang-tab-java"><a href="#">Java</a></li>
</ul>

<div class="tab-content">
<div class="tab-pane tab-pane-python active">
<div class="code code-tab">
{% highlight python %}
# Creates a DataFrame based on a table named "people"
# stored in a MySQL database.
url = \
  "jdbc:mysql://yourIP:yourPort/test?user=yourUsername;password=yourPassword"
df = sqlContext \
  .read \
  .format("jdbc") \
  .option("url", url) \
  .option("dbtable", "people") \
  .load()

# Looks the schema of this DataFrame.
df.printSchema()

# Counts people by age
countsByAge = df.groupBy("age").count()
countsByAge.show()

# Saves countsByAge to S3 in the JSON format.
countsByAge.write.format("json").save("s3a://...")
{% endhighlight %}
</div>
</div>

<div class="tab-pane tab-pane-scala">
<div class="code code-tab">
{% highlight scala %}
// Creates a DataFrame based on a table named "people"
// stored in a MySQL database.
val url =
  "jdbc:mysql://yourIP:yourPort/test?user=yourUsername;password=yourPassword"
val df = sqlContext
  .read
  .format("jdbc")
  .option("url", url)
  .option("dbtable", "people")
  .load()

// Looks the schema of this DataFrame.
df.printSchema()

// Counts people by age
val countsByAge = df.groupBy("age").count()
countsByAge.show()

// Saves countsByAge to S3 in the JSON format.
countsByAge.write.format("json").save("s3a://...")
{% endhighlight %}
</div>
</div>

<div class="tab-pane tab-pane-java">
<div class="code code-tab">
{% highlight java %}
// Creates a DataFrame based on a table named "people"
// stored in a MySQL database.
String url =
  "jdbc:mysql://yourIP:yourPort/test?user=yourUsername;password=yourPassword";
DataFrame df = sqlContext
  .read()
  .format("jdbc")
  .option("url", url)
  .option("dbtable", "people")
  .load();

// Looks the schema of this DataFrame.
df.printSchema();

// Counts people by age
DataFrame countsByAge = df.groupBy("age").count();
countsByAge.show();

// Saves countsByAge to S3 in the JSON format.
countsByAge.write().format("json").save("s3a://...");
{% endhighlight %}
</div>
</div>
</div>

<h2>Machine Learning Example</h2>
<p>
<a href="http://spark.apache.org/docs/latest/mllib-guide.html">MLlib</a>, Spark’s Machine Learning (ML) library, provides many distributed ML algorithms.
These algorithms cover tasks such as feature extraction, classification, regression, clustering,
recommendation, and more. 
MLlib also provides tools such as ML Pipelines for building workflows, CrossValidator for tuning parameters,
and model persistence for saving and loading models.
</p>

<h3>Prediction with Logistic Regression</h3>
<p>
In this example, we take a dataset of labels and feature vectors.
We learn to predict the labels from feature vectors using the Logistic Regression algorithm.
</p>

<ul class="nav nav-tabs">
  <li class="lang-tab lang-tab-python active"><a href="#">Python</a></li>
  <li class="lang-tab lang-tab-scala"><a href="#">Scala</a></li>
  <li class="lang-tab lang-tab-java"><a href="#">Java</a></li>
</ul>

<div class="tab-content">
<div class="tab-pane tab-pane-python active">
<div class="code code-tab">
{% highlight python %}
# Every record of this DataFrame contains the label and
# features represented by a vector.
df = sqlContext.createDataFrame(data, ["label", "features"])

# Set parameters for the algorithm.
# Here, we limit the number of iterations to 10.
lr = LogisticRegression(maxIter=10)

# Fit the model to the data.
model = lr.fit(df)

# Given a dataset, predict each point's label, and show the results.
model.transform(df).show()
{% endhighlight %}
</div>
</div>

<div class="tab-pane tab-pane-scala">
<div class="code code-tab">
{% highlight scala %}
// Every record of this DataFrame contains the label and
// features represented by a vector.
val df = sqlContext.createDataFrame(data).toDF("label", "features")

// Set parameters for the algorithm.
// Here, we limit the number of iterations to 10.
val lr = new LogisticRegression().setMaxIter(10)

// Fit the model to the data.
val model = lr.fit(df)

// Inspect the model: get the feature weights.
val weights = model.weights

// Given a dataset, predict each point's label, and show the results.
model.transform(df).show()
{% endhighlight %}
</div>
</div>

<div class="tab-pane tab-pane-java">
<div class="code code-tab">
{% highlight java %}
// Every record of this DataFrame contains the label and
// features represented by a vector.
StructType schema = new StructType(new StructField[]{
  new StructField("label", DataTypes.DoubleType, false, Metadata.empty()),
  new StructField("features", new VectorUDT(), false, Metadata.empty()),
});
DataFrame df = jsql.createDataFrame(data, schema);

// Set parameters for the algorithm.
// Here, we limit the number of iterations to 10.
LogisticRegression lr = new LogisticRegression().setMaxIter(10);

// Fit the model to the data.
LogisticRegressionModel model = lr.fit(df);

// Inspect the model: get the feature weights.
Vector weights = model.weights();

// Given a dataset, predict each point's label, and show the results.
model.transform(df).show();
{% endhighlight %}
</div>
</div>
</div>

<a name="additional"></a>
<h1>Additional Examples</h1>

Many additional examples are distributed with Spark:

 * Basic Spark: [Scala examples](https://github.com/apache/spark/tree/master/examples/src/main/scala/org/apache/spark/examples), [Java examples](https://github.com/apache/spark/tree/master/examples/src/main/java/org/apache/spark/examples), [Python examples](https://github.com/apache/spark/tree/master/examples/src/main/python)
 * Spark Streaming: [Scala examples](https://github.com/apache/spark/tree/master/examples/src/main/scala/org/apache/spark/examples/streaming), [Java examples](https://github.com/apache/spark/tree/master/examples/src/main/java/org/apache/spark/examples/streaming)