aboutsummaryrefslogtreecommitdiff
path: root/examples/src
diff options
context:
space:
mode:
Diffstat (limited to 'examples/src')
-rw-r--r--examples/src/main/python/cassandra_inputformat.py79
-rw-r--r--examples/src/main/python/hbase_inputformat.py72
-rw-r--r--examples/src/main/scala/org/apache/spark/examples/CassandraCQLTest.scala1
-rw-r--r--examples/src/main/scala/org/apache/spark/examples/HBaseTest.scala2
-rw-r--r--examples/src/main/scala/org/apache/spark/examples/pythonconverters/CassandraConverters.scala46
-rw-r--r--examples/src/main/scala/org/apache/spark/examples/pythonconverters/HBaseConverter.scala33
6 files changed, 232 insertions, 1 deletions
diff --git a/examples/src/main/python/cassandra_inputformat.py b/examples/src/main/python/cassandra_inputformat.py
new file mode 100644
index 0000000000..39fa6b0d22
--- /dev/null
+++ b/examples/src/main/python/cassandra_inputformat.py
@@ -0,0 +1,79 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import sys
+
+from pyspark import SparkContext
+
+"""
+Create data in Cassandra fist
+(following: https://wiki.apache.org/cassandra/GettingStarted)
+
+cqlsh> CREATE KEYSPACE test
+ ... WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : 1 };
+cqlsh> use test;
+cqlsh:test> CREATE TABLE users (
+ ... user_id int PRIMARY KEY,
+ ... fname text,
+ ... lname text
+ ... );
+cqlsh:test> INSERT INTO users (user_id, fname, lname)
+ ... VALUES (1745, 'john', 'smith');
+cqlsh:test> INSERT INTO users (user_id, fname, lname)
+ ... VALUES (1744, 'john', 'doe');
+cqlsh:test> INSERT INTO users (user_id, fname, lname)
+ ... VALUES (1746, 'john', 'smith');
+cqlsh:test> SELECT * FROM users;
+
+ user_id | fname | lname
+---------+-------+-------
+ 1745 | john | smith
+ 1744 | john | doe
+ 1746 | john | smith
+"""
+if __name__ == "__main__":
+ if len(sys.argv) != 4:
+ print >> sys.stderr, """
+ Usage: cassandra_inputformat <host> <keyspace> <cf>
+
+ Run with example jar:
+ ./bin/spark-submit --driver-class-path /path/to/example/jar /path/to/examples/cassandra_inputformat.py <host> <keyspace> <cf>
+ Assumes you have some data in Cassandra already, running on <host>, in <keyspace> and <cf>
+ """
+ exit(-1)
+
+ host = sys.argv[1]
+ keyspace = sys.argv[2]
+ cf = sys.argv[3]
+ sc = SparkContext(appName="CassandraInputFormat")
+
+ conf = {"cassandra.input.thrift.address":host,
+ "cassandra.input.thrift.port":"9160",
+ "cassandra.input.keyspace":keyspace,
+ "cassandra.input.columnfamily":cf,
+ "cassandra.input.partitioner.class":"Murmur3Partitioner",
+ "cassandra.input.page.row.size":"3"}
+ cass_rdd = sc.newAPIHadoopRDD(
+ "org.apache.cassandra.hadoop.cql3.CqlPagingInputFormat",
+ "java.util.Map",
+ "java.util.Map",
+ keyConverter="org.apache.spark.examples.pythonconverters.CassandraCQLKeyConverter",
+ valueConverter="org.apache.spark.examples.pythonconverters.CassandraCQLValueConverter",
+ conf=conf)
+ output = cass_rdd.collect()
+ for (k, v) in output:
+ print (k, v)
diff --git a/examples/src/main/python/hbase_inputformat.py b/examples/src/main/python/hbase_inputformat.py
new file mode 100644
index 0000000000..3289d9880a
--- /dev/null
+++ b/examples/src/main/python/hbase_inputformat.py
@@ -0,0 +1,72 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import sys
+
+from pyspark import SparkContext
+
+"""
+Create test data in HBase first:
+
+hbase(main):016:0> create 'test', 'f1'
+0 row(s) in 1.0430 seconds
+
+hbase(main):017:0> put 'test', 'row1', 'f1', 'value1'
+0 row(s) in 0.0130 seconds
+
+hbase(main):018:0> put 'test', 'row2', 'f1', 'value2'
+0 row(s) in 0.0030 seconds
+
+hbase(main):019:0> put 'test', 'row3', 'f1', 'value3'
+0 row(s) in 0.0050 seconds
+
+hbase(main):020:0> put 'test', 'row4', 'f1', 'value4'
+0 row(s) in 0.0110 seconds
+
+hbase(main):021:0> scan 'test'
+ROW COLUMN+CELL
+ row1 column=f1:, timestamp=1401883411986, value=value1
+ row2 column=f1:, timestamp=1401883415212, value=value2
+ row3 column=f1:, timestamp=1401883417858, value=value3
+ row4 column=f1:, timestamp=1401883420805, value=value4
+4 row(s) in 0.0240 seconds
+"""
+if __name__ == "__main__":
+ if len(sys.argv) != 3:
+ print >> sys.stderr, """
+ Usage: hbase_inputformat <host> <table>
+
+ Run with example jar:
+ ./bin/spark-submit --driver-class-path /path/to/example/jar /path/to/examples/hbase_inputformat.py <host> <table>
+ Assumes you have some data in HBase already, running on <host>, in <table>
+ """
+ exit(-1)
+
+ host = sys.argv[1]
+ table = sys.argv[2]
+ sc = SparkContext(appName="HBaseInputFormat")
+
+ conf = {"hbase.zookeeper.quorum": host, "hbase.mapreduce.inputtable": table}
+ hbase_rdd = sc.newAPIHadoopRDD(
+ "org.apache.hadoop.hbase.mapreduce.TableInputFormat",
+ "org.apache.hadoop.hbase.io.ImmutableBytesWritable",
+ "org.apache.hadoop.hbase.client.Result",
+ valueConverter="org.apache.spark.examples.pythonconverters.HBaseConverter",
+ conf=conf)
+ output = hbase_rdd.collect()
+ for (k, v) in output:
+ print (k, v)
diff --git a/examples/src/main/scala/org/apache/spark/examples/CassandraCQLTest.scala b/examples/src/main/scala/org/apache/spark/examples/CassandraCQLTest.scala
index 9a00701f98..71f53af68f 100644
--- a/examples/src/main/scala/org/apache/spark/examples/CassandraCQLTest.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/CassandraCQLTest.scala
@@ -33,6 +33,7 @@ import org.apache.hadoop.mapreduce.Job
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.SparkContext._
+
/*
Need to create following keyspace and column family in cassandra before running this example
Start CQL shell using ./bin/cqlsh and execute following commands
diff --git a/examples/src/main/scala/org/apache/spark/examples/HBaseTest.scala b/examples/src/main/scala/org/apache/spark/examples/HBaseTest.scala
index a8c338480e..4893b017ed 100644
--- a/examples/src/main/scala/org/apache/spark/examples/HBaseTest.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/HBaseTest.scala
@@ -22,7 +22,7 @@ import org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor}
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.spark._
-import org.apache.spark.rdd.NewHadoopRDD
+
object HBaseTest {
def main(args: Array[String]) {
diff --git a/examples/src/main/scala/org/apache/spark/examples/pythonconverters/CassandraConverters.scala b/examples/src/main/scala/org/apache/spark/examples/pythonconverters/CassandraConverters.scala
new file mode 100644
index 0000000000..29a65c7a5f
--- /dev/null
+++ b/examples/src/main/scala/org/apache/spark/examples/pythonconverters/CassandraConverters.scala
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.examples.pythonconverters
+
+import org.apache.spark.api.python.Converter
+import java.nio.ByteBuffer
+import org.apache.cassandra.utils.ByteBufferUtil
+import collection.JavaConversions.{mapAsJavaMap, mapAsScalaMap}
+
+
+/**
+ * Implementation of [[org.apache.spark.api.python.Converter]] that converts Cassandra
+ * output to a Map[String, Int]
+ */
+class CassandraCQLKeyConverter extends Converter[Any, java.util.Map[String, Int]] {
+ override def convert(obj: Any): java.util.Map[String, Int] = {
+ val result = obj.asInstanceOf[java.util.Map[String, ByteBuffer]]
+ mapAsJavaMap(result.mapValues(bb => ByteBufferUtil.toInt(bb)))
+ }
+}
+
+/**
+ * Implementation of [[org.apache.spark.api.python.Converter]] that converts Cassandra
+ * output to a Map[String, String]
+ */
+class CassandraCQLValueConverter extends Converter[Any, java.util.Map[String, String]] {
+ override def convert(obj: Any): java.util.Map[String, String] = {
+ val result = obj.asInstanceOf[java.util.Map[String, ByteBuffer]]
+ mapAsJavaMap(result.mapValues(bb => ByteBufferUtil.string(bb)))
+ }
+}
diff --git a/examples/src/main/scala/org/apache/spark/examples/pythonconverters/HBaseConverter.scala b/examples/src/main/scala/org/apache/spark/examples/pythonconverters/HBaseConverter.scala
new file mode 100644
index 0000000000..42ae960bd6
--- /dev/null
+++ b/examples/src/main/scala/org/apache/spark/examples/pythonconverters/HBaseConverter.scala
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.examples.pythonconverters
+
+import org.apache.spark.api.python.Converter
+import org.apache.hadoop.hbase.client.Result
+import org.apache.hadoop.hbase.util.Bytes
+
+/**
+ * Implementation of [[org.apache.spark.api.python.Converter]] that converts a HBase Result
+ * to a String
+ */
+class HBaseConverter extends Converter[Any, String] {
+ override def convert(obj: Any): String = {
+ val result = obj.asInstanceOf[Result]
+ Bytes.toStringBinary(result.value())
+ }
+}