author     Kan Zhang <kzhang@apache.org>       2014-07-30 13:19:05 -0700
committer  Josh Rosen <joshrosen@apache.org>   2014-07-30 13:19:05 -0700
commit     94d1f46fc43c0cb85125f757fb40db9271caf1f4 (patch)
tree       8878443a963ad6ce5ba3af679567d893c8df70cc /examples
parent     437dc8c5b54f0dcf9564c1fb07e8dce9e771c8cd (diff)
[SPARK-2024] Add saveAsSequenceFile to PySpark
JIRA issue: https://issues.apache.org/jira/browse/SPARK-2024

This PR is a follow-up to #455 and adds the ability to save PySpark RDDs using SequenceFile or any Hadoop OutputFormat.

* Added RDD methods ```saveAsSequenceFile```, ```saveAsHadoopFile``` and ```saveAsHadoopDataset```, for both the old and new MapReduce APIs.
* Added a default converter for converting common data types to Writables. Users may specify custom converters to convert to desired data types.
* No out-of-the-box support for reading/writing arrays, since ArrayWritable itself doesn't have a no-arg constructor for creating an empty instance upon reading. Users need to provide ArrayWritable subtypes. Custom converters for converting arrays to suitable ArrayWritable subtypes are also needed when writing. When reading, the default converter converts any custom ArrayWritable subtype to ```Object[]```, which is then pickled to a Python tuple.
* Added HBase and Cassandra output examples to show how custom output formats and converters can be used.

cc MLnick mateiz ahirreddy pwendell

Author: Kan Zhang <kzhang@apache.org>

Closes #1338 from kanzhang/SPARK-2024 and squashes the following commits:

c01e3ef [Kan Zhang] [SPARK-2024] code formatting
6591e37 [Kan Zhang] [SPARK-2024] renaming pickled -> pickledRDD
d998ad6 [Kan Zhang] [SPARK-2024] refectoring to get method params below 10
57a7a5e [Kan Zhang] [SPARK-2024] correcting typo
75ca5bd [Kan Zhang] [SPARK-2024] Better type checking for batch serialized RDD
0bdec55 [Kan Zhang] [SPARK-2024] Refactoring newly added tests
9f39ff4 [Kan Zhang] [SPARK-2024] Adding 2 saveAsHadoopDataset tests
0c134f3 [Kan Zhang] [SPARK-2024] Test refactoring and adding couple unbatched cases
7a176df [Kan Zhang] [SPARK-2024] Add saveAsSequenceFile to PySpark
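For orientation, here is a minimal sketch of the headline API this commit adds (not part of the diff below; the path and data are illustrative):

```
from pyspark import SparkContext

sc = SparkContext(appName="SequenceFileDemo")

# The default converter turns common Python types into Writables
# (here int -> IntWritable, str -> Text).
rdd = sc.parallelize([(1, "a"), (2, "b"), (3, "c")])
rdd.saveAsSequenceFile("/tmp/demo-seqfile")

# Round trip through the read path added in #455.
print(sorted(sc.sequenceFile("/tmp/demo-seqfile").collect()))
```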
Diffstat (limited to 'examples')
-rw-r--r--  examples/src/main/python/cassandra_outputformat.py                                           | 83
-rw-r--r--  examples/src/main/python/hbase_inputformat.py                                                |  3
-rw-r--r--  examples/src/main/python/hbase_outputformat.py                                               | 65
-rw-r--r--  examples/src/main/scala/org/apache/spark/examples/pythonconverters/CassandraConverters.scala | 24
-rw-r--r--  examples/src/main/scala/org/apache/spark/examples/pythonconverters/HBaseConverter.scala      | 33
-rw-r--r--  examples/src/main/scala/org/apache/spark/examples/pythonconverters/HBaseConverters.scala     | 70
6 files changed, 243 insertions(+), 35 deletions(-)
diff --git a/examples/src/main/python/cassandra_outputformat.py b/examples/src/main/python/cassandra_outputformat.py
new file mode 100644
index 0000000000..1dfbf98604
--- /dev/null
+++ b/examples/src/main/python/cassandra_outputformat.py
@@ -0,0 +1,83 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import sys
+
+from pyspark import SparkContext
+
+"""
+Create data in Cassandra first
+(following: https://wiki.apache.org/cassandra/GettingStarted)
+
+cqlsh> CREATE KEYSPACE test
+ ... WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : 1 };
+cqlsh> use test;
+cqlsh:test> CREATE TABLE users (
+ ... user_id int PRIMARY KEY,
+ ... fname text,
+ ... lname text
+ ... );
+
+> cassandra_outputformat <host> test users 1745 john smith
+> cassandra_outputformat <host> test users 1744 john doe
+> cassandra_outputformat <host> test users 1746 john smith
+
+cqlsh:test> SELECT * FROM users;
+
+ user_id | fname | lname
+---------+-------+-------
+ 1745 | john | smith
+ 1744 | john | doe
+ 1746 | john | smith
+"""
+if __name__ == "__main__":
+ if len(sys.argv) != 7:
+ print >> sys.stderr, """
+ Usage: cassandra_outputformat <host> <keyspace> <cf> <user_id> <fname> <lname>
+
+ Run with example jar:
+ ./bin/spark-submit --driver-class-path /path/to/example/jar /path/to/examples/cassandra_outputformat.py <args>
+ Assumes you have created the following table <cf> in Cassandra already,
+ running on <host>, in <keyspace>.
+
+ cqlsh:<keyspace>> CREATE TABLE <cf> (
+ ... user_id int PRIMARY KEY,
+ ... fname text,
+ ... lname text
+ ... );
+ """
+ exit(-1)
+
+ host = sys.argv[1]
+ keyspace = sys.argv[2]
+ cf = sys.argv[3]
+ sc = SparkContext(appName="CassandraOutputFormat")
+
+ conf = {"cassandra.output.thrift.address":host,
+ "cassandra.output.thrift.port":"9160",
+ "cassandra.output.keyspace":keyspace,
+ "cassandra.output.partitioner.class":"Murmur3Partitioner",
+ "cassandra.output.cql":"UPDATE " + keyspace + "." + cf + " SET fname = ?, lname = ?",
+ "mapreduce.output.basename":cf,
+ "mapreduce.outputformat.class":"org.apache.cassandra.hadoop.cql3.CqlOutputFormat",
+ "mapreduce.job.output.key.class":"java.util.Map",
+ "mapreduce.job.output.value.class":"java.util.List"}
+ key = {"user_id" : int(sys.argv[4])}
+ sc.parallelize([(key, sys.argv[5:])]).saveAsNewAPIHadoopDataset(
+ conf=conf,
+ keyConverter="org.apache.spark.examples.pythonconverters.ToCassandraCQLKeyConverter",
+ valueConverter="org.apache.spark.examples.pythonconverters.ToCassandraCQLValueConverter")
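As a reading aid (not part of the patch), this is the element shape the call above saves: the key dict names the primary-key column, and the value list supplies the bound parameters of the CQL statement, in order. Values below are illustrative, mirroring "cassandra_outputformat <host> test users 1745 john smith":

```
key = {"user_id": 1745}    # primary-key columns -> ToCassandraCQLKeyConverter
value = ["john", "smith"]  # binds "SET fname = ?, lname = ?" -> ToCassandraCQLValueConverter
record = (key, value)      # one element of the pair RDD passed to saveAsNewAPIHadoopDataset
```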
diff --git a/examples/src/main/python/hbase_inputformat.py b/examples/src/main/python/hbase_inputformat.py
index 3289d9880a..c9fa8e171c 100644
--- a/examples/src/main/python/hbase_inputformat.py
+++ b/examples/src/main/python/hbase_inputformat.py
@@ -65,7 +65,8 @@ if __name__ == "__main__":
"org.apache.hadoop.hbase.mapreduce.TableInputFormat",
"org.apache.hadoop.hbase.io.ImmutableBytesWritable",
"org.apache.hadoop.hbase.client.Result",
- valueConverter="org.apache.spark.examples.pythonconverters.HBaseConverter",
+ keyConverter="org.apache.spark.examples.pythonconverters.ImmutableBytesWritableToStringConverter",
+ valueConverter="org.apache.spark.examples.pythonconverters.HBaseResultToStringConverter",
conf=conf)
output = hbase_rdd.collect()
for (k, v) in output:
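For context, the renamed converters plug into the read call like this; a hedged sketch of the surrounding hbase_inputformat.py code (host and table values are illustrative):

```
from pyspark import SparkContext

sc = SparkContext(appName="HBaseInputFormat")
conf = {"hbase.zookeeper.quorum": "localhost",
        "hbase.mapreduce.inputtable": "test"}
hbase_rdd = sc.newAPIHadoopRDD(
    "org.apache.hadoop.hbase.mapreduce.TableInputFormat",
    "org.apache.hadoop.hbase.io.ImmutableBytesWritable",
    "org.apache.hadoop.hbase.client.Result",
    keyConverter="org.apache.spark.examples.pythonconverters.ImmutableBytesWritableToStringConverter",
    valueConverter="org.apache.spark.examples.pythonconverters.HBaseResultToStringConverter",
    conf=conf)
```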
diff --git a/examples/src/main/python/hbase_outputformat.py b/examples/src/main/python/hbase_outputformat.py
new file mode 100644
index 0000000000..5e11548fd1
--- /dev/null
+++ b/examples/src/main/python/hbase_outputformat.py
@@ -0,0 +1,65 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import sys
+
+from pyspark import SparkContext
+
+"""
+Create test table in HBase first:
+
+hbase(main):001:0> create 'test', 'f1'
+0 row(s) in 0.7840 seconds
+
+> hbase_outputformat <host> test row1 f1 q1 value1
+> hbase_outputformat <host> test row2 f1 q1 value2
+> hbase_outputformat <host> test row3 f1 q1 value3
+> hbase_outputformat <host> test row4 f1 q1 value4
+
+hbase(main):002:0> scan 'test'
+ROW COLUMN+CELL
+ row1 column=f1:q1, timestamp=1405659615726, value=value1
+ row2 column=f1:q1, timestamp=1405659626803, value=value2
+ row3 column=f1:q1, timestamp=1405659640106, value=value3
+ row4 column=f1:q1, timestamp=1405659650292, value=value4
+4 row(s) in 0.0780 seconds
+"""
+if __name__ == "__main__":
+ if len(sys.argv) != 7:
+ print >> sys.stderr, """
+ Usage: hbase_outputformat <host> <table> <row> <family> <qualifier> <value>
+
+ Run with example jar:
+ ./bin/spark-submit --driver-class-path /path/to/example/jar /path/to/examples/hbase_outputformat.py <args>
+ Assumes you have created <table> with column family <family> in HBase running on <host> already
+ """
+ exit(-1)
+
+ host = sys.argv[1]
+ table = sys.argv[2]
+ sc = SparkContext(appName="HBaseOutputFormat")
+
+ conf = {"hbase.zookeeper.quorum": host,
+ "hbase.mapred.outputtable": table,
+ "mapreduce.outputformat.class" : "org.apache.hadoop.hbase.mapreduce.TableOutputFormat",
+ "mapreduce.job.output.key.class" : "org.apache.hadoop.hbase.io.ImmutableBytesWritable",
+ "mapreduce.job.output.value.class" : "org.apache.hadoop.io.Writable"}
+
+ sc.parallelize([sys.argv[3:]]).map(lambda x: (x[0], x)).saveAsNewAPIHadoopDataset(
+ conf=conf,
+ keyConverter="org.apache.spark.examples.pythonconverters.StringToImmutableBytesWritableConverter",
+ valueConverter="org.apache.spark.examples.pythonconverters.StringListToPutConverter")
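To make the map step above concrete, here is what one invocation produces (plain Python, values illustrative):

```
# hbase_outputformat <host> test row1 f1 q1 value1
argv = ["hbase_outputformat", "<host>", "test", "row1", "f1", "q1", "value1"]
x = argv[3:]      # ['row1', 'f1', 'q1', 'value1']
pair = (x[0], x)  # ('row1', ['row1', 'f1', 'q1', 'value1'])
# x[0] becomes the row key via StringToImmutableBytesWritableConverter;
# the full list becomes a Put(row, family, qualifier, value) via
# StringListToPutConverter.
```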
diff --git a/examples/src/main/scala/org/apache/spark/examples/pythonconverters/CassandraConverters.scala b/examples/src/main/scala/org/apache/spark/examples/pythonconverters/CassandraConverters.scala
index 29a65c7a5f..83feb5703b 100644
--- a/examples/src/main/scala/org/apache/spark/examples/pythonconverters/CassandraConverters.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/pythonconverters/CassandraConverters.scala
@@ -20,7 +20,7 @@ package org.apache.spark.examples.pythonconverters
import org.apache.spark.api.python.Converter
import java.nio.ByteBuffer
import org.apache.cassandra.utils.ByteBufferUtil
-import collection.JavaConversions.{mapAsJavaMap, mapAsScalaMap}
+import collection.JavaConversions._
/**
@@ -44,3 +44,25 @@ class CassandraCQLValueConverter extends Converter[Any, java.util.Map[String, St
mapAsJavaMap(result.mapValues(bb => ByteBufferUtil.string(bb)))
}
}
+
+/**
+ * Implementation of [[org.apache.spark.api.python.Converter]] that converts a
+ * Map[String, Int] to Cassandra key
+ */
+class ToCassandraCQLKeyConverter extends Converter[Any, java.util.Map[String, ByteBuffer]] {
+ override def convert(obj: Any): java.util.Map[String, ByteBuffer] = {
+ val input = obj.asInstanceOf[java.util.Map[String, Int]]
+ mapAsJavaMap(input.mapValues(i => ByteBufferUtil.bytes(i)))
+ }
+}
+
+/**
+ * Implementation of [[org.apache.spark.api.python.Converter]] that converts a
+ * List[String] to Cassandra value
+ */
+class ToCassandraCQLValueConverter extends Converter[Any, java.util.List[ByteBuffer]] {
+ override def convert(obj: Any): java.util.List[ByteBuffer] = {
+ val input = obj.asInstanceOf[java.util.List[String]]
+ seqAsJavaList(input.map(s => ByteBufferUtil.bytes(s)))
+ }
+}
diff --git a/examples/src/main/scala/org/apache/spark/examples/pythonconverters/HBaseConverter.scala b/examples/src/main/scala/org/apache/spark/examples/pythonconverters/HBaseConverter.scala
deleted file mode 100644
index 42ae960bd6..0000000000
--- a/examples/src/main/scala/org/apache/spark/examples/pythonconverters/HBaseConverter.scala
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.examples.pythonconverters
-
-import org.apache.spark.api.python.Converter
-import org.apache.hadoop.hbase.client.Result
-import org.apache.hadoop.hbase.util.Bytes
-
-/**
- * Implementation of [[org.apache.spark.api.python.Converter]] that converts a HBase Result
- * to a String
- */
-class HBaseConverter extends Converter[Any, String] {
- override def convert(obj: Any): String = {
- val result = obj.asInstanceOf[Result]
- Bytes.toStringBinary(result.value())
- }
-}
diff --git a/examples/src/main/scala/org/apache/spark/examples/pythonconverters/HBaseConverters.scala b/examples/src/main/scala/org/apache/spark/examples/pythonconverters/HBaseConverters.scala
new file mode 100644
index 0000000000..273bee0a8b
--- /dev/null
+++ b/examples/src/main/scala/org/apache/spark/examples/pythonconverters/HBaseConverters.scala
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.examples.pythonconverters
+
+import scala.collection.JavaConversions._
+
+import org.apache.spark.api.python.Converter
+import org.apache.hadoop.hbase.client.{Put, Result}
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable
+import org.apache.hadoop.hbase.util.Bytes
+
+/**
+ * Implementation of [[org.apache.spark.api.python.Converter]] that converts an
+ * HBase Result to a String
+ */
+class HBaseResultToStringConverter extends Converter[Any, String] {
+ override def convert(obj: Any): String = {
+ val result = obj.asInstanceOf[Result]
+ Bytes.toStringBinary(result.value())
+ }
+}
+
+/**
+ * Implementation of [[org.apache.spark.api.python.Converter]] that converts an
+ * ImmutableBytesWritable to a String
+ */
+class ImmutableBytesWritableToStringConverter extends Converter[Any, String] {
+ override def convert(obj: Any): String = {
+ val key = obj.asInstanceOf[ImmutableBytesWritable]
+ Bytes.toStringBinary(key.get())
+ }
+}
+
+/**
+ * Implementation of [[org.apache.spark.api.python.Converter]] that converts a
+ * String to an ImmutableBytesWritable
+ */
+class StringToImmutableBytesWritableConverter extends Converter[Any, ImmutableBytesWritable] {
+ override def convert(obj: Any): ImmutableBytesWritable = {
+ val bytes = Bytes.toBytes(obj.asInstanceOf[String])
+ new ImmutableBytesWritable(bytes)
+ }
+}
+
+/**
+ * Implementation of [[org.apache.spark.api.python.Converter]] that converts a
+ * list of Strings to HBase Put
+ */
+class StringListToPutConverter extends Converter[Any, Put] {
+ override def convert(obj: Any): Put = {
+ val output = obj.asInstanceOf[java.util.ArrayList[String]].map(Bytes.toBytes(_)).toArray
+ val put = new Put(output(0))
+ put.add(output(1), output(2), output(3))
+ }
+}
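Since StringListToPutConverter expects exactly a four-element string list (row, family, qualifier, value), writing several cells from PySpark is just a matter of shaping the RDD accordingly. A hedged sketch, assuming sc and conf are defined as in hbase_outputformat.py above:

```
rows = [["row5", "f1", "q1", "value5"],
        ["row6", "f1", "q1", "value6"]]
sc.parallelize(rows).map(lambda x: (x[0], x)).saveAsNewAPIHadoopDataset(
    conf=conf,
    keyConverter="org.apache.spark.examples.pythonconverters.StringToImmutableBytesWritableConverter",
    valueConverter="org.apache.spark.examples.pythonconverters.StringListToPutConverter")
```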