author    Marcelo Vanzin <vanzin@cloudera.com>  2016-04-25 10:20:51 -0700
committer Marcelo Vanzin <vanzin@cloudera.com>  2016-04-25 10:20:51 -0700
commit    a680562a6f87a03a00f71bad1c424267ae75c641
tree      fe08025da6437124c283e9450dd3bbc0e99c411d
parent    bfda09991398ce44be91997252cf8e5ddd361737
[SPARK-14744][EXAMPLES] Clean up examples packaging, remove outdated examples.
First, make all dependencies in the examples module provided, and explicitly list a couple that Maven somehow promotes to compile scope. This means that to run the streaming examples, the streaming connector package now needs to be handed to run-example using --packages or --jars, just like a regular application.

Also, remove a couple of outdated examples. HBase has had its own Spark bindings for a while and will even ship them with the HBase distribution in its next version, making these examples obsolete. The same applies to Cassandra, which already has a proper Spark connector library.

I tested the build, which passes, and ran SparkPi. The examples jars directory now contains only two jars:

```
$ ls -1 examples/target/scala-2.11/jars/
scopt_2.11-3.3.0.jar
spark-examples_2.11-2.0.0-SNAPSHOT.jar
```

Author: Marcelo Vanzin <vanzin@cloudera.com>

Closes #12544 from vanzin/SPARK-14744.
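To illustrate the packaging change for users: connector jars no longer ship in the examples jars directory, so a streaming example has to declare its connector at submit time. A minimal sketch, assuming the KafkaWordCount example and the snapshot coordinates from the listing above (the exact artifact name and version depend on the Spark build you are running):

```
# Sketch only: resolve the Kafka connector with --packages (or point --jars at a
# local copy of the connector jar), since it is now "provided" to the examples module.
./bin/spark-submit \
  --packages org.apache.spark:spark-streaming-kafka_2.11:2.0.0-SNAPSHOT \
  --class org.apache.spark.examples.streaming.KafkaWordCount \
  examples/target/scala-2.11/jars/spark-examples_2.11-2.0.0-SNAPSHOT.jar \
  <zkQuorum> <group> <topics> <numThreads>
```

Non-streaming examples such as SparkPi are unaffected, since they depend only on Spark itself.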
-rw-r--r--  examples/pom.xml                                                                              | 229
-rw-r--r--  examples/src/main/python/cassandra_inputformat.py                                             |  84
-rw-r--r--  examples/src/main/python/cassandra_outputformat.py                                            |  88
-rw-r--r--  examples/src/main/python/hbase_inputformat.py                                                 |  90
-rw-r--r--  examples/src/main/python/hbase_outputformat.py                                                |  73
-rw-r--r--  examples/src/main/scala/org/apache/spark/examples/CassandraCQLTest.scala                      | 135
-rw-r--r--  examples/src/main/scala/org/apache/spark/examples/CassandraTest.scala                         | 215
-rw-r--r--  examples/src/main/scala/org/apache/spark/examples/HBaseTest.scala                             |  65
-rw-r--r--  examples/src/main/scala/org/apache/spark/examples/pythonconverters/CassandraConverters.scala  |  70
-rw-r--r--  examples/src/main/scala/org/apache/spark/examples/pythonconverters/HBaseConverters.scala      |  84
-rw-r--r--  pom.xml                                                                                       |   8
11 files changed, 16 insertions(+), 1125 deletions(-)
diff --git a/examples/pom.xml b/examples/pom.xml
index fcd60e3b77..43f3d2e4de 100644
--- a/examples/pom.xml
+++ b/examples/pom.xml
@@ -35,6 +35,10 @@
<sbt.project.name>examples</sbt.project.name>
<build.testJarPhase>none</build.testJarPhase>
<build.copyDependenciesPhase>package</build.copyDependenciesPhase>
+ <flume.deps.scope>provided</flume.deps.scope>
+ <hadoop.deps.scope>provided</hadoop.deps.scope>
+ <hive.deps.scope>provided</hive.deps.scope>
+ <parquet.deps.scope>provided</parquet.deps.scope>
</properties>
<dependencies>
@@ -72,131 +76,13 @@
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-flume_${scala.binary.version}</artifactId>
<version>${project.version}</version>
+ <scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka_${scala.binary.version}</artifactId>
<version>${project.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.hbase</groupId>
- <artifactId>hbase-protocol</artifactId>
- <version>${hbase.version}</version>
- <scope>${hbase.deps.scope}</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.hbase</groupId>
- <artifactId>hbase-common</artifactId>
- <version>${hbase.version}</version>
- <scope>${hbase.deps.scope}</scope>
- <exclusions>
- <exclusion>
- <!-- SPARK-4455 -->
- <groupId>org.apache.hbase</groupId>
- <artifactId>hbase-annotations</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
- <dependency>
- <groupId>org.apache.hbase</groupId>
- <artifactId>hbase-client</artifactId>
- <version>${hbase.version}</version>
- <scope>${hbase.deps.scope}</scope>
- <exclusions>
- <exclusion>
- <!-- SPARK-4455 -->
- <groupId>org.apache.hbase</groupId>
- <artifactId>hbase-annotations</artifactId>
- </exclusion>
- <exclusion>
- <groupId>io.netty</groupId>
- <artifactId>netty</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
- <dependency>
- <groupId>org.apache.hbase</groupId>
- <artifactId>hbase-server</artifactId>
- <version>${hbase.version}</version>
- <scope>${hbase.deps.scope}</scope>
- <exclusions>
- <exclusion>
- <!-- SPARK-4455 -->
- <groupId>org.apache.hbase</groupId>
- <artifactId>hbase-annotations</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.apache.hbase</groupId>
- <artifactId>hbase-common</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-core</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-client</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-mapreduce-client-jobclient</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-mapreduce-client-core</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-auth</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-annotations</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-hdfs</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.apache.hbase</groupId>
- <artifactId>hbase-hadoop1-compat</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.apache.commons</groupId>
- <artifactId>commons-math</artifactId>
- </exclusion>
- <exclusion>
- <groupId>com.sun.jersey</groupId>
- <artifactId>jersey-core</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-api</artifactId>
- </exclusion>
- <exclusion>
- <groupId>com.sun.jersey</groupId>
- <artifactId>jersey-server</artifactId>
- </exclusion>
- <exclusion>
- <groupId>com.sun.jersey</groupId>
- <artifactId>jersey-core</artifactId>
- </exclusion>
- <exclusion>
- <groupId>com.sun.jersey</groupId>
- <artifactId>jersey-json</artifactId>
- </exclusion>
- <exclusion>
- <!-- hbase uses v2.4, which is better, but ...-->
- <groupId>commons-io</groupId>
- <artifactId>commons-io</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
- <dependency>
- <groupId>org.apache.hbase</groupId>
- <artifactId>hbase-hadoop-compat</artifactId>
- <version>${hbase.version}</version>
- <scope>${hbase.deps.scope}</scope>
+ <scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
@@ -204,79 +90,25 @@
<scope>provided</scope>
</dependency>
<dependency>
- <groupId>com.twitter</groupId>
- <artifactId>algebird-core_${scala.binary.version}</artifactId>
- <version>0.11.0</version>
- </dependency>
- <dependency>
<groupId>org.scalacheck</groupId>
<artifactId>scalacheck_${scala.binary.version}</artifactId>
<scope>test</scope>
</dependency>
<dependency>
- <groupId>org.apache.cassandra</groupId>
- <artifactId>cassandra-all</artifactId>
- <version>1.2.19</version>
- <exclusions>
- <exclusion>
- <groupId>com.google.guava</groupId>
- <artifactId>guava</artifactId>
- </exclusion>
- <exclusion>
- <groupId>com.googlecode.concurrentlinkedhashmap</groupId>
- <artifactId>concurrentlinkedhashmap-lru</artifactId>
- </exclusion>
- <exclusion>
- <groupId>com.ning</groupId>
- <artifactId>compress-lzf</artifactId>
- </exclusion>
- <exclusion>
- <groupId>commons-cli</groupId>
- <artifactId>commons-cli</artifactId>
- </exclusion>
- <exclusion>
- <groupId>commons-codec</groupId>
- <artifactId>commons-codec</artifactId>
- </exclusion>
- <exclusion>
- <groupId>commons-lang</groupId>
- <artifactId>commons-lang</artifactId>
- </exclusion>
- <exclusion>
- <groupId>commons-logging</groupId>
- <artifactId>commons-logging</artifactId>
- </exclusion>
- <exclusion>
- <groupId>io.netty</groupId>
- <artifactId>netty</artifactId>
- </exclusion>
- <exclusion>
- <groupId>jline</groupId>
- <artifactId>jline</artifactId>
- </exclusion>
- <exclusion>
- <groupId>net.jpountz.lz4</groupId>
- <artifactId>lz4</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.apache.cassandra.deps</groupId>
- <artifactId>avro</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.apache.commons</groupId>
- <artifactId>commons-math3</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.apache.thrift</groupId>
- <artifactId>libthrift</artifactId>
- </exclusion>
- </exclusions>
+ <groupId>org.scala-lang</groupId>
+ <artifactId>scala-library</artifactId>
+ <scope>provided</scope>
</dependency>
<dependency>
<groupId>com.github.scopt</groupId>
<artifactId>scopt_${scala.binary.version}</artifactId>
<version>3.3.0</version>
</dependency>
+ <dependency>
+ <groupId>com.twitter</groupId>
+ <artifactId>parquet-hadoop-bundle</artifactId>
+ <scope>provided</scope>
+ </dependency>
</dependencies>
<build>
@@ -314,40 +146,9 @@
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kinesis-asl_${scala.binary.version}</artifactId>
<version>${project.version}</version>
+ <scope>provided</scope>
</dependency>
</dependencies>
</profile>
-
- <!-- Profiles that disable inclusion of certain dependencies. -->
- <profile>
- <id>flume-provided</id>
- <properties>
- <flume.deps.scope>provided</flume.deps.scope>
- </properties>
- </profile>
- <profile>
- <id>hadoop-provided</id>
- <properties>
- <hadoop.deps.scope>provided</hadoop.deps.scope>
- </properties>
- </profile>
- <profile>
- <id>hbase-provided</id>
- <properties>
- <hbase.deps.scope>provided</hbase.deps.scope>
- </properties>
- </profile>
- <profile>
- <id>hive-provided</id>
- <properties>
- <hive.deps.scope>provided</hive.deps.scope>
- </properties>
- </profile>
- <profile>
- <id>parquet-provided</id>
- <properties>
- <parquet.deps.scope>provided</parquet.deps.scope>
- </properties>
- </profile>
</profiles>
</project>
diff --git a/examples/src/main/python/cassandra_inputformat.py b/examples/src/main/python/cassandra_inputformat.py
deleted file mode 100644
index 93ca0cfcc9..0000000000
--- a/examples/src/main/python/cassandra_inputformat.py
+++ /dev/null
@@ -1,84 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-from __future__ import print_function
-
-import sys
-
-from pyspark import SparkContext
-
-"""
-Create data in Cassandra first
-(following: https://wiki.apache.org/cassandra/GettingStarted)
-
-cqlsh> CREATE KEYSPACE test
- ... WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : 1 };
-cqlsh> use test;
-cqlsh:test> CREATE TABLE users (
- ... user_id int PRIMARY KEY,
- ... fname text,
- ... lname text
- ... );
-cqlsh:test> INSERT INTO users (user_id, fname, lname)
- ... VALUES (1745, 'john', 'smith');
-cqlsh:test> INSERT INTO users (user_id, fname, lname)
- ... VALUES (1744, 'john', 'doe');
-cqlsh:test> INSERT INTO users (user_id, fname, lname)
- ... VALUES (1746, 'john', 'smith');
-cqlsh:test> SELECT * FROM users;
-
- user_id | fname | lname
----------+-------+-------
- 1745 | john | smith
- 1744 | john | doe
- 1746 | john | smith
-"""
-if __name__ == "__main__":
- if len(sys.argv) != 4:
- print("""
- Usage: cassandra_inputformat <host> <keyspace> <cf>
-
- Run with example jar:
- ./bin/spark-submit --driver-class-path /path/to/example/jar \
- /path/to/examples/cassandra_inputformat.py <host> <keyspace> <cf>
- Assumes you have some data in Cassandra already, running on <host>, in <keyspace> and <cf>
- """, file=sys.stderr)
- exit(-1)
-
- host = sys.argv[1]
- keyspace = sys.argv[2]
- cf = sys.argv[3]
- sc = SparkContext(appName="CassandraInputFormat")
-
- conf = {"cassandra.input.thrift.address": host,
- "cassandra.input.thrift.port": "9160",
- "cassandra.input.keyspace": keyspace,
- "cassandra.input.columnfamily": cf,
- "cassandra.input.partitioner.class": "Murmur3Partitioner",
- "cassandra.input.page.row.size": "3"}
- cass_rdd = sc.newAPIHadoopRDD(
- "org.apache.cassandra.hadoop.cql3.CqlPagingInputFormat",
- "java.util.Map",
- "java.util.Map",
- keyConverter="org.apache.spark.examples.pythonconverters.CassandraCQLKeyConverter",
- valueConverter="org.apache.spark.examples.pythonconverters.CassandraCQLValueConverter",
- conf=conf)
- output = cass_rdd.collect()
- for (k, v) in output:
- print((k, v))
-
- sc.stop()
diff --git a/examples/src/main/python/cassandra_outputformat.py b/examples/src/main/python/cassandra_outputformat.py
deleted file mode 100644
index 5d643eac92..0000000000
--- a/examples/src/main/python/cassandra_outputformat.py
+++ /dev/null
@@ -1,88 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-from __future__ import print_function
-
-import sys
-
-from pyspark import SparkContext
-
-"""
-Create data in Cassandra first
-(following: https://wiki.apache.org/cassandra/GettingStarted)
-
-cqlsh> CREATE KEYSPACE test
- ... WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : 1 };
-cqlsh> use test;
-cqlsh:test> CREATE TABLE users (
- ... user_id int PRIMARY KEY,
- ... fname text,
- ... lname text
- ... );
-
-> cassandra_outputformat <host> test users 1745 john smith
-> cassandra_outputformat <host> test users 1744 john doe
-> cassandra_outputformat <host> test users 1746 john smith
-
-cqlsh:test> SELECT * FROM users;
-
- user_id | fname | lname
----------+-------+-------
- 1745 | john | smith
- 1744 | john | doe
- 1746 | john | smith
-"""
-if __name__ == "__main__":
- if len(sys.argv) != 7:
- print("""
- Usage: cassandra_outputformat <host> <keyspace> <cf> <user_id> <fname> <lname>
-
- Run with example jar:
- ./bin/spark-submit --driver-class-path /path/to/example/jar \
- /path/to/examples/cassandra_outputformat.py <args>
- Assumes you have created the following table <cf> in Cassandra already,
- running on <host>, in <keyspace>.
-
- cqlsh:<keyspace>> CREATE TABLE <cf> (
- ... user_id int PRIMARY KEY,
- ... fname text,
- ... lname text
- ... );
- """, file=sys.stderr)
- exit(-1)
-
- host = sys.argv[1]
- keyspace = sys.argv[2]
- cf = sys.argv[3]
- sc = SparkContext(appName="CassandraOutputFormat")
-
- conf = {"cassandra.output.thrift.address": host,
- "cassandra.output.thrift.port": "9160",
- "cassandra.output.keyspace": keyspace,
- "cassandra.output.partitioner.class": "Murmur3Partitioner",
- "cassandra.output.cql": "UPDATE " + keyspace + "." + cf + " SET fname = ?, lname = ?",
- "mapreduce.output.basename": cf,
- "mapreduce.outputformat.class": "org.apache.cassandra.hadoop.cql3.CqlOutputFormat",
- "mapreduce.job.output.key.class": "java.util.Map",
- "mapreduce.job.output.value.class": "java.util.List"}
- key = {"user_id": int(sys.argv[4])}
- sc.parallelize([(key, sys.argv[5:])]).saveAsNewAPIHadoopDataset(
- conf=conf,
- keyConverter="org.apache.spark.examples.pythonconverters.ToCassandraCQLKeyConverter",
- valueConverter="org.apache.spark.examples.pythonconverters.ToCassandraCQLValueConverter")
-
- sc.stop()
diff --git a/examples/src/main/python/hbase_inputformat.py b/examples/src/main/python/hbase_inputformat.py
deleted file mode 100644
index c5ae5d043b..0000000000
--- a/examples/src/main/python/hbase_inputformat.py
+++ /dev/null
@@ -1,90 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-from __future__ import print_function
-
-import sys
-import json
-
-from pyspark import SparkContext
-
-"""
-Create test data in HBase first:
-
-hbase(main):016:0> create 'test', 'f1'
-0 row(s) in 1.0430 seconds
-
-hbase(main):017:0> put 'test', 'row1', 'f1:a', 'value1'
-0 row(s) in 0.0130 seconds
-
-hbase(main):018:0> put 'test', 'row1', 'f1:b', 'value2'
-0 row(s) in 0.0030 seconds
-
-hbase(main):019:0> put 'test', 'row2', 'f1', 'value3'
-0 row(s) in 0.0050 seconds
-
-hbase(main):020:0> put 'test', 'row3', 'f1', 'value4'
-0 row(s) in 0.0110 seconds
-
-hbase(main):021:0> scan 'test'
-ROW COLUMN+CELL
- row1 column=f1:a, timestamp=1401883411986, value=value1
- row1 column=f1:b, timestamp=1401883415212, value=value2
- row2 column=f1:, timestamp=1401883417858, value=value3
- row3 column=f1:, timestamp=1401883420805, value=value4
-4 row(s) in 0.0240 seconds
-"""
-if __name__ == "__main__":
- if len(sys.argv) != 3:
- print("""
- Usage: hbase_inputformat <host> <table>
-
- Run with example jar:
- ./bin/spark-submit --driver-class-path /path/to/example/jar \
- /path/to/examples/hbase_inputformat.py <host> <table> [<znode>]
- Assumes you have some data in HBase already, running on <host>, in <table>
- optionally, you can specify parent znode for your hbase cluster - <znode>
- """, file=sys.stderr)
- exit(-1)
-
- host = sys.argv[1]
- table = sys.argv[2]
- sc = SparkContext(appName="HBaseInputFormat")
-
- # Other options for configuring scan behavior are available. More information available at
- # https://github.com/apache/hbase/blob/master/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormat.java
- conf = {"hbase.zookeeper.quorum": host, "hbase.mapreduce.inputtable": table}
- if len(sys.argv) > 3:
- conf = {"hbase.zookeeper.quorum": host, "zookeeper.znode.parent": sys.argv[3],
- "hbase.mapreduce.inputtable": table}
- keyConv = "org.apache.spark.examples.pythonconverters.ImmutableBytesWritableToStringConverter"
- valueConv = "org.apache.spark.examples.pythonconverters.HBaseResultToStringConverter"
-
- hbase_rdd = sc.newAPIHadoopRDD(
- "org.apache.hadoop.hbase.mapreduce.TableInputFormat",
- "org.apache.hadoop.hbase.io.ImmutableBytesWritable",
- "org.apache.hadoop.hbase.client.Result",
- keyConverter=keyConv,
- valueConverter=valueConv,
- conf=conf)
- hbase_rdd = hbase_rdd.flatMapValues(lambda v: v.split("\n")).mapValues(json.loads)
-
- output = hbase_rdd.collect()
- for (k, v) in output:
- print((k, v))
-
- sc.stop()
diff --git a/examples/src/main/python/hbase_outputformat.py b/examples/src/main/python/hbase_outputformat.py
deleted file mode 100644
index 9e5641789a..0000000000
--- a/examples/src/main/python/hbase_outputformat.py
+++ /dev/null
@@ -1,73 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-from __future__ import print_function
-
-import sys
-
-from pyspark import SparkContext
-
-"""
-Create test table in HBase first:
-
-hbase(main):001:0> create 'test', 'f1'
-0 row(s) in 0.7840 seconds
-
-> hbase_outputformat <host> test row1 f1 q1 value1
-> hbase_outputformat <host> test row2 f1 q1 value2
-> hbase_outputformat <host> test row3 f1 q1 value3
-> hbase_outputformat <host> test row4 f1 q1 value4
-
-hbase(main):002:0> scan 'test'
-ROW COLUMN+CELL
- row1 column=f1:q1, timestamp=1405659615726, value=value1
- row2 column=f1:q1, timestamp=1405659626803, value=value2
- row3 column=f1:q1, timestamp=1405659640106, value=value3
- row4 column=f1:q1, timestamp=1405659650292, value=value4
-4 row(s) in 0.0780 seconds
-"""
-if __name__ == "__main__":
- if len(sys.argv) != 7:
- print("""
- Usage: hbase_outputformat <host> <table> <row> <family> <qualifier> <value>
-
- Run with example jar:
- ./bin/spark-submit --driver-class-path /path/to/example/jar \
- /path/to/examples/hbase_outputformat.py <args>
- Assumes you have created <table> with column family <family> in HBase
- running on <host> already
- """, file=sys.stderr)
- exit(-1)
-
- host = sys.argv[1]
- table = sys.argv[2]
- sc = SparkContext(appName="HBaseOutputFormat")
-
- conf = {"hbase.zookeeper.quorum": host,
- "hbase.mapred.outputtable": table,
- "mapreduce.outputformat.class": "org.apache.hadoop.hbase.mapreduce.TableOutputFormat",
- "mapreduce.job.output.key.class": "org.apache.hadoop.hbase.io.ImmutableBytesWritable",
- "mapreduce.job.output.value.class": "org.apache.hadoop.io.Writable"}
- keyConv = "org.apache.spark.examples.pythonconverters.StringToImmutableBytesWritableConverter"
- valueConv = "org.apache.spark.examples.pythonconverters.StringListToPutConverter"
-
- sc.parallelize([sys.argv[3:]]).map(lambda x: (x[0], x)).saveAsNewAPIHadoopDataset(
- conf=conf,
- keyConverter=keyConv,
- valueConverter=valueConv)
-
- sc.stop()
diff --git a/examples/src/main/scala/org/apache/spark/examples/CassandraCQLTest.scala b/examples/src/main/scala/org/apache/spark/examples/CassandraCQLTest.scala
deleted file mode 100644
index ca4eea2356..0000000000
--- a/examples/src/main/scala/org/apache/spark/examples/CassandraCQLTest.scala
+++ /dev/null
@@ -1,135 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
- // scalastyle:off println
-package org.apache.spark.examples
-
-import java.nio.ByteBuffer
-import java.util.Collections
-
-import org.apache.cassandra.hadoop.ConfigHelper
-import org.apache.cassandra.hadoop.cql3.CqlConfigHelper
-import org.apache.cassandra.hadoop.cql3.CqlOutputFormat
-import org.apache.cassandra.hadoop.cql3.CqlPagingInputFormat
-import org.apache.cassandra.utils.ByteBufferUtil
-import org.apache.hadoop.mapreduce.Job
-
-import org.apache.spark.{SparkConf, SparkContext}
-
-/*
- Need to create following keyspace and column family in cassandra before running this example
- Start CQL shell using ./bin/cqlsh and execute following commands
- CREATE KEYSPACE retail WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1};
- use retail;
- CREATE TABLE salecount (prod_id text, sale_count int, PRIMARY KEY (prod_id));
- CREATE TABLE ordercf (user_id text,
- time timestamp,
- prod_id text,
- quantity int,
- PRIMARY KEY (user_id, time));
- INSERT INTO ordercf (user_id,
- time,
- prod_id,
- quantity) VALUES ('bob', 1385983646000, 'iphone', 1);
- INSERT INTO ordercf (user_id,
- time,
- prod_id,
- quantity) VALUES ('tom', 1385983647000, 'samsung', 4);
- INSERT INTO ordercf (user_id,
- time,
- prod_id,
- quantity) VALUES ('dora', 1385983648000, 'nokia', 2);
- INSERT INTO ordercf (user_id,
- time,
- prod_id,
- quantity) VALUES ('charlie', 1385983649000, 'iphone', 2);
-*/
-
-/**
- * This example demonstrates how to read and write to cassandra column family created using CQL3
- * using Spark.
- * Parameters : <cassandra_node> <cassandra_port>
- * Usage: ./bin/spark-submit examples.jar \
- * --class org.apache.spark.examples.CassandraCQLTest localhost 9160
- */
-object CassandraCQLTest {
-
- def main(args: Array[String]) {
- val sparkConf = new SparkConf().setAppName("CQLTestApp")
-
- val sc = new SparkContext(sparkConf)
- val cHost: String = args(0)
- val cPort: String = args(1)
- val KeySpace = "retail"
- val InputColumnFamily = "ordercf"
- val OutputColumnFamily = "salecount"
-
- val job = Job.getInstance()
- job.setInputFormatClass(classOf[CqlPagingInputFormat])
- val configuration = job.getConfiguration
- ConfigHelper.setInputInitialAddress(job.getConfiguration(), cHost)
- ConfigHelper.setInputRpcPort(job.getConfiguration(), cPort)
- ConfigHelper.setInputColumnFamily(job.getConfiguration(), KeySpace, InputColumnFamily)
- ConfigHelper.setInputPartitioner(job.getConfiguration(), "Murmur3Partitioner")
- CqlConfigHelper.setInputCQLPageRowSize(job.getConfiguration(), "3")
-
- /** CqlConfigHelper.setInputWhereClauses(job.getConfiguration(), "user_id='bob'") */
-
- /** An UPDATE writes one or more columns to a record in a Cassandra column family */
- val query = "UPDATE " + KeySpace + "." + OutputColumnFamily + " SET sale_count = ? "
- CqlConfigHelper.setOutputCql(job.getConfiguration(), query)
-
- job.setOutputFormatClass(classOf[CqlOutputFormat])
- ConfigHelper.setOutputColumnFamily(job.getConfiguration(), KeySpace, OutputColumnFamily)
- ConfigHelper.setOutputInitialAddress(job.getConfiguration(), cHost)
- ConfigHelper.setOutputRpcPort(job.getConfiguration(), cPort)
- ConfigHelper.setOutputPartitioner(job.getConfiguration(), "Murmur3Partitioner")
-
- val casRdd = sc.newAPIHadoopRDD(job.getConfiguration(),
- classOf[CqlPagingInputFormat],
- classOf[java.util.Map[String, ByteBuffer]],
- classOf[java.util.Map[String, ByteBuffer]])
-
- println("Count: " + casRdd.count)
- val productSaleRDD = casRdd.map {
- case (key, value) =>
- (ByteBufferUtil.string(value.get("prod_id")), ByteBufferUtil.toInt(value.get("quantity")))
- }
- val aggregatedRDD = productSaleRDD.reduceByKey(_ + _)
- aggregatedRDD.collect().foreach {
- case (productId, saleCount) => println(productId + ":" + saleCount)
- }
-
- val casoutputCF = aggregatedRDD.map {
- case (productId, saleCount) =>
- val outKey = Collections.singletonMap("prod_id", ByteBufferUtil.bytes(productId))
- val outVal = Collections.singletonList(ByteBufferUtil.bytes(saleCount))
- (outKey, outVal)
- }
-
- casoutputCF.saveAsNewAPIHadoopFile(
- KeySpace,
- classOf[java.util.Map[String, ByteBuffer]],
- classOf[java.util.List[ByteBuffer]],
- classOf[CqlOutputFormat],
- job.getConfiguration()
- )
-
- sc.stop()
- }
-}
-// scalastyle:on println
diff --git a/examples/src/main/scala/org/apache/spark/examples/CassandraTest.scala b/examples/src/main/scala/org/apache/spark/examples/CassandraTest.scala
deleted file mode 100644
index eff840d36e..0000000000
--- a/examples/src/main/scala/org/apache/spark/examples/CassandraTest.scala
+++ /dev/null
@@ -1,215 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-// scalastyle:off println
-package org.apache.spark.examples
-
-import java.nio.ByteBuffer
-import java.util.Arrays
-import java.util.SortedMap
-
-import org.apache.cassandra.db.IColumn
-import org.apache.cassandra.hadoop.ColumnFamilyInputFormat
-import org.apache.cassandra.hadoop.ColumnFamilyOutputFormat
-import org.apache.cassandra.hadoop.ConfigHelper
-import org.apache.cassandra.thrift._
-import org.apache.cassandra.utils.ByteBufferUtil
-import org.apache.hadoop.mapreduce.Job
-
-import org.apache.spark.{SparkConf, SparkContext}
-
-/*
- * This example demonstrates using Spark with Cassandra with the New Hadoop API and Cassandra
- * support for Hadoop.
- *
- * To run this example, run this file with the following command params -
- * <cassandra_node> <cassandra_port>
- *
- * So if you want to run this on localhost this will be,
- * localhost 9160
- *
- * The example makes some assumptions:
- * 1. You have already created a keyspace called casDemo and it has a column family named Words
- * 2. The Words column family has a column named "para" which contains test content.
- *
- * You can create the content by running the following script at the bottom of this file with
- * cassandra-cli.
- *
- */
-object CassandraTest {
-
- def main(args: Array[String]) {
- val sparkConf = new SparkConf().setAppName("casDemo")
- // Get a SparkContext
- val sc = new SparkContext(sparkConf)
-
- // Build the job configuration with ConfigHelper provided by Cassandra
- val job = Job.getInstance()
- job.setInputFormatClass(classOf[ColumnFamilyInputFormat])
-
- val host: String = args(1)
- val port: String = args(2)
-
- ConfigHelper.setInputInitialAddress(job.getConfiguration(), host)
- ConfigHelper.setInputRpcPort(job.getConfiguration(), port)
- ConfigHelper.setOutputInitialAddress(job.getConfiguration(), host)
- ConfigHelper.setOutputRpcPort(job.getConfiguration(), port)
- ConfigHelper.setInputColumnFamily(job.getConfiguration(), "casDemo", "Words")
- ConfigHelper.setOutputColumnFamily(job.getConfiguration(), "casDemo", "WordCount")
-
- val predicate = new SlicePredicate()
- val sliceRange = new SliceRange()
- sliceRange.setStart(Array.empty[Byte])
- sliceRange.setFinish(Array.empty[Byte])
- predicate.setSlice_range(sliceRange)
- ConfigHelper.setInputSlicePredicate(job.getConfiguration(), predicate)
-
- ConfigHelper.setInputPartitioner(job.getConfiguration(), "Murmur3Partitioner")
- ConfigHelper.setOutputPartitioner(job.getConfiguration(), "Murmur3Partitioner")
-
- // Make a new Hadoop RDD
- val casRdd = sc.newAPIHadoopRDD(
- job.getConfiguration(),
- classOf[ColumnFamilyInputFormat],
- classOf[ByteBuffer],
- classOf[SortedMap[ByteBuffer, IColumn]])
-
- // Let us first get all the paragraphs from the retrieved rows
- val paraRdd = casRdd.map {
- case (key, value) =>
- ByteBufferUtil.string(value.get(ByteBufferUtil.bytes("para")).value())
- }
-
- // Lets get the word count in paras
- val counts = paraRdd.flatMap(p => p.split(" ")).map(word => (word, 1)).reduceByKey(_ + _)
-
- counts.collect().foreach {
- case (word, count) => println(word + ":" + count)
- }
-
- counts.map {
- case (word, count) =>
- val colWord = new org.apache.cassandra.thrift.Column()
- colWord.setName(ByteBufferUtil.bytes("word"))
- colWord.setValue(ByteBufferUtil.bytes(word))
- colWord.setTimestamp(System.currentTimeMillis)
-
- val colCount = new org.apache.cassandra.thrift.Column()
- colCount.setName(ByteBufferUtil.bytes("wcount"))
- colCount.setValue(ByteBufferUtil.bytes(count.toLong))
- colCount.setTimestamp(System.currentTimeMillis)
-
- val outputkey = ByteBufferUtil.bytes(word + "-COUNT-" + System.currentTimeMillis)
-
- val mutations = Arrays.asList(new Mutation(), new Mutation())
- mutations.get(0).setColumn_or_supercolumn(new ColumnOrSuperColumn())
- mutations.get(0).column_or_supercolumn.setColumn(colWord)
- mutations.get(1).setColumn_or_supercolumn(new ColumnOrSuperColumn())
- mutations.get(1).column_or_supercolumn.setColumn(colCount)
- (outputkey, mutations)
- }.saveAsNewAPIHadoopFile("casDemo", classOf[ByteBuffer], classOf[List[Mutation]],
- classOf[ColumnFamilyOutputFormat], job.getConfiguration)
-
- sc.stop()
- }
-}
-// scalastyle:on println
-
-/*
-create keyspace casDemo;
-use casDemo;
-
-create column family WordCount with comparator = UTF8Type;
-update column family WordCount with column_metadata =
- [{column_name: word, validation_class: UTF8Type},
- {column_name: wcount, validation_class: LongType}];
-
-create column family Words with comparator = UTF8Type;
-update column family Words with column_metadata =
- [{column_name: book, validation_class: UTF8Type},
- {column_name: para, validation_class: UTF8Type}];
-
-assume Words keys as utf8;
-
-set Words['3musk001']['book'] = 'The Three Musketeers';
-set Words['3musk001']['para'] = 'On the first Monday of the month of April, 1625, the market
- town of Meung, in which the author of ROMANCE OF THE ROSE was born, appeared to
- be in as perfect a state of revolution as if the Huguenots had just made
- a second La Rochelle of it. Many citizens, seeing the women flying
- toward the High Street, leaving their children crying at the open doors,
- hastened to don the cuirass, and supporting their somewhat uncertain
- courage with a musket or a partisan, directed their steps toward the
- hostelry of the Jolly Miller, before which was gathered, increasing
- every minute, a compact group, vociferous and full of curiosity.';
-
-set Words['3musk002']['book'] = 'The Three Musketeers';
-set Words['3musk002']['para'] = 'In those times panics were common, and few days passed without
- some city or other registering in its archives an event of this kind. There were
- nobles, who made war against each other; there was the king, who made
- war against the cardinal; there was Spain, which made war against the
- king. Then, in addition to these concealed or public, secret or open
- wars, there were robbers, mendicants, Huguenots, wolves, and scoundrels,
- who made war upon everybody. The citizens always took up arms readily
- against thieves, wolves or scoundrels, often against nobles or
- Huguenots, sometimes against the king, but never against cardinal or
- Spain. It resulted, then, from this habit that on the said first Monday
- of April, 1625, the citizens, on hearing the clamor, and seeing neither
- the red-and-yellow standard nor the livery of the Duc de Richelieu,
- rushed toward the hostel of the Jolly Miller. When arrived there, the
- cause of the hubbub was apparent to all';
-
-set Words['3musk003']['book'] = 'The Three Musketeers';
-set Words['3musk003']['para'] = 'You ought, I say, then, to husband the means you have, however
- large the sum may be; but you ought also to endeavor to perfect yourself in
- the exercises becoming a gentleman. I will write a letter today to the
- Director of the Royal Academy, and tomorrow he will admit you without
- any expense to yourself. Do not refuse this little service. Our
- best-born and richest gentlemen sometimes solicit it without being able
- to obtain it. You will learn horsemanship, swordsmanship in all its
- branches, and dancing. You will make some desirable acquaintances; and
- from time to time you can call upon me, just to tell me how you are
- getting on, and to say whether I can be of further service to you.';
-
-
-set Words['thelostworld001']['book'] = 'The Lost World';
-set Words['thelostworld001']['para'] = 'She sat with that proud, delicate profile of hers outlined
- against the red curtain. How beautiful she was! And yet how aloof! We had been
- friends, quite good friends; but never could I get beyond the same
- comradeship which I might have established with one of my
- fellow-reporters upon the Gazette,--perfectly frank, perfectly kindly,
- and perfectly unsexual. My instincts are all against a woman being too
- frank and at her ease with me. It is no compliment to a man. Where
- the real sex feeling begins, timidity and distrust are its companions,
- heritage from old wicked days when love and violence went often hand in
- hand. The bent head, the averted eye, the faltering voice, the wincing
- figure--these, and not the unshrinking gaze and frank reply, are the
- true signals of passion. Even in my short life I had learned as much
- as that--or had inherited it in that race memory which we call instinct.';
-
-set Words['thelostworld002']['book'] = 'The Lost World';
-set Words['thelostworld002']['para'] = 'I always liked McArdle, the crabbed, old, round-backed,
- red-headed news editor, and I rather hoped that he liked me. Of course, Beaumont was
- the real boss; but he lived in the rarefied atmosphere of some Olympian
- height from which he could distinguish nothing smaller than an
- international crisis or a split in the Cabinet. Sometimes we saw him
- passing in lonely majesty to his inner sanctum, with his eyes staring
- vaguely and his mind hovering over the Balkans or the Persian Gulf. He
- was above and beyond us. But McArdle was his first lieutenant, and it
- was he that we knew. The old man nodded as I entered the room, and he
- pushed his spectacles far up on his bald forehead.';
-
-*/
diff --git a/examples/src/main/scala/org/apache/spark/examples/HBaseTest.scala b/examples/src/main/scala/org/apache/spark/examples/HBaseTest.scala
deleted file mode 100644
index 65d7489586..0000000000
--- a/examples/src/main/scala/org/apache/spark/examples/HBaseTest.scala
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-// scalastyle:off println
-package org.apache.spark.examples
-
-import org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor, TableName}
-import org.apache.hadoop.hbase.client.HBaseAdmin
-import org.apache.hadoop.hbase.mapreduce.TableInputFormat
-
-import org.apache.spark._
-
-object HBaseTest {
- def main(args: Array[String]) {
- val sparkConf = new SparkConf().setAppName("HBaseTest")
- val sc = new SparkContext(sparkConf)
-
- // please ensure HBASE_CONF_DIR is on classpath of spark driver
- // e.g: set it through spark.driver.extraClassPath property
- // in spark-defaults.conf or through --driver-class-path
- // command line option of spark-submit
-
- val conf = HBaseConfiguration.create()
-
- if (args.length < 1) {
- System.err.println("Usage: HBaseTest <table_name>")
- System.exit(1)
- }
-
- // Other options for configuring scan behavior are available. More information available at
- // http://hbase.apache.org/apidocs/org/apache/hadoop/hbase/mapreduce/TableInputFormat.html
- conf.set(TableInputFormat.INPUT_TABLE, args(0))
-
- // Initialize hBase table if necessary
- val admin = new HBaseAdmin(conf)
- if (!admin.isTableAvailable(args(0))) {
- val tableDesc = new HTableDescriptor(TableName.valueOf(args(0)))
- admin.createTable(tableDesc)
- }
-
- val hBaseRDD = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat],
- classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
- classOf[org.apache.hadoop.hbase.client.Result])
-
- hBaseRDD.count()
-
- sc.stop()
- admin.close()
- }
-}
-// scalastyle:on println
diff --git a/examples/src/main/scala/org/apache/spark/examples/pythonconverters/CassandraConverters.scala b/examples/src/main/scala/org/apache/spark/examples/pythonconverters/CassandraConverters.scala
deleted file mode 100644
index 00ce47af48..0000000000
--- a/examples/src/main/scala/org/apache/spark/examples/pythonconverters/CassandraConverters.scala
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.examples.pythonconverters
-
-import java.nio.ByteBuffer
-
-import scala.collection.JavaConverters._
-
-import org.apache.cassandra.utils.ByteBufferUtil
-
-import org.apache.spark.api.python.Converter
-
-/**
- * Implementation of [[org.apache.spark.api.python.Converter]] that converts Cassandra
- * output to a Map[String, Int]
- */
-class CassandraCQLKeyConverter extends Converter[Any, java.util.Map[String, Int]] {
- override def convert(obj: Any): java.util.Map[String, Int] = {
- val result = obj.asInstanceOf[java.util.Map[String, ByteBuffer]]
- result.asScala.mapValues(ByteBufferUtil.toInt).asJava
- }
-}
-
-/**
- * Implementation of [[org.apache.spark.api.python.Converter]] that converts Cassandra
- * output to a Map[String, String]
- */
-class CassandraCQLValueConverter extends Converter[Any, java.util.Map[String, String]] {
- override def convert(obj: Any): java.util.Map[String, String] = {
- val result = obj.asInstanceOf[java.util.Map[String, ByteBuffer]]
- result.asScala.mapValues(ByteBufferUtil.string).asJava
- }
-}
-
-/**
- * Implementation of [[org.apache.spark.api.python.Converter]] that converts a
- * Map[String, Int] to Cassandra key
- */
-class ToCassandraCQLKeyConverter extends Converter[Any, java.util.Map[String, ByteBuffer]] {
- override def convert(obj: Any): java.util.Map[String, ByteBuffer] = {
- val input = obj.asInstanceOf[java.util.Map[String, Int]]
- input.asScala.mapValues(ByteBufferUtil.bytes).asJava
- }
-}
-
-/**
- * Implementation of [[org.apache.spark.api.python.Converter]] that converts a
- * List[String] to Cassandra value
- */
-class ToCassandraCQLValueConverter extends Converter[Any, java.util.List[ByteBuffer]] {
- override def convert(obj: Any): java.util.List[ByteBuffer] = {
- val input = obj.asInstanceOf[java.util.List[String]]
- input.asScala.map(ByteBufferUtil.bytes).asJava
- }
-}
diff --git a/examples/src/main/scala/org/apache/spark/examples/pythonconverters/HBaseConverters.scala b/examples/src/main/scala/org/apache/spark/examples/pythonconverters/HBaseConverters.scala
deleted file mode 100644
index e252ca882e..0000000000
--- a/examples/src/main/scala/org/apache/spark/examples/pythonconverters/HBaseConverters.scala
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.examples.pythonconverters
-
-import scala.collection.JavaConverters._
-import scala.util.parsing.json.JSONObject
-
-import org.apache.hadoop.hbase.CellUtil
-import org.apache.hadoop.hbase.KeyValue.Type
-import org.apache.hadoop.hbase.client.{Put, Result}
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable
-import org.apache.hadoop.hbase.util.Bytes
-
-import org.apache.spark.api.python.Converter
-
-/**
- * Implementation of [[org.apache.spark.api.python.Converter]] that converts all
- * the records in an HBase Result to a String
- */
-class HBaseResultToStringConverter extends Converter[Any, String] {
- override def convert(obj: Any): String = {
- val result = obj.asInstanceOf[Result]
- val output = result.listCells.asScala.map(cell =>
- Map(
- "row" -> Bytes.toStringBinary(CellUtil.cloneRow(cell)),
- "columnFamily" -> Bytes.toStringBinary(CellUtil.cloneFamily(cell)),
- "qualifier" -> Bytes.toStringBinary(CellUtil.cloneQualifier(cell)),
- "timestamp" -> cell.getTimestamp.toString,
- "type" -> Type.codeToType(cell.getTypeByte).toString,
- "value" -> Bytes.toStringBinary(CellUtil.cloneValue(cell))
- )
- )
- output.map(JSONObject(_).toString()).mkString("\n")
- }
-}
-
-/**
- * Implementation of [[org.apache.spark.api.python.Converter]] that converts an
- * ImmutableBytesWritable to a String
- */
-class ImmutableBytesWritableToStringConverter extends Converter[Any, String] {
- override def convert(obj: Any): String = {
- val key = obj.asInstanceOf[ImmutableBytesWritable]
- Bytes.toStringBinary(key.get())
- }
-}
-
-/**
- * Implementation of [[org.apache.spark.api.python.Converter]] that converts a
- * String to an ImmutableBytesWritable
- */
-class StringToImmutableBytesWritableConverter extends Converter[Any, ImmutableBytesWritable] {
- override def convert(obj: Any): ImmutableBytesWritable = {
- val bytes = Bytes.toBytes(obj.asInstanceOf[String])
- new ImmutableBytesWritable(bytes)
- }
-}
-
-/**
- * Implementation of [[org.apache.spark.api.python.Converter]] that converts a
- * list of Strings to HBase Put
- */
-class StringListToPutConverter extends Converter[Any, Put] {
- override def convert(obj: Any): Put = {
- val output = obj.asInstanceOf[java.util.ArrayList[String]].asScala.map(Bytes.toBytes).toArray
- val put = new Put(output(0))
- put.add(output(1), output(2), output(3))
- }
-}
diff --git a/pom.xml b/pom.xml
index 73334a852d..d3a69df7f2 100644
--- a/pom.xml
+++ b/pom.xml
@@ -126,8 +126,6 @@
<hadoop.version>2.2.0</hadoop.version>
<protobuf.version>2.5.0</protobuf.version>
<yarn.version>${hadoop.version}</yarn.version>
- <hbase.version>0.98.17-hadoop2</hbase.version>
- <hbase.artifact>hbase</hbase.artifact>
<flume.version>1.6.0</flume.version>
<zookeeper.version>3.4.5</zookeeper.version>
<curator.version>2.4.0</curator.version>
@@ -205,7 +203,6 @@
-->
<flume.deps.scope>compile</flume.deps.scope>
<hadoop.deps.scope>compile</hadoop.deps.scope>
- <hbase.deps.scope>compile</hbase.deps.scope>
<hive.deps.scope>compile</hive.deps.scope>
<parquet.deps.scope>compile</parquet.deps.scope>
<parquet.test.deps.scope>test</parquet.test.deps.scope>
@@ -739,7 +736,7 @@
<exclusion>
<groupId>jline</groupId>
<artifactId>jline</artifactId>
- </exclusion>
+ </exclusion>
</exclusions>
</dependency>
<dependency>
@@ -2573,9 +2570,6 @@
<id>hadoop-provided</id>
</profile>
<profile>
- <id>hbase-provided</id>
- </profile>
- <profile>
<id>hive-provided</id>
</profile>
<profile>