From f971d6cb60d642178d6544217a25fa16ece34889 Mon Sep 17 00:00:00 2001 From: Nick Pentreath Date: Mon, 9 Jun 2014 22:21:03 -0700 Subject: SPARK-1416: PySpark support for SequenceFile and Hadoop InputFormats So I finally resurrected this PR. It seems the old one against the incubator mirror is no longer available, so I cannot reference it. This adds initial support for reading Hadoop ```SequenceFile```s, as well as arbitrary Hadoop ```InputFormat```s, in PySpark. # Overview The basics are as follows: 1. ```PythonRDD``` object contains the relevant methods, that are in turn invoked by ```SparkContext``` in PySpark 2. The SequenceFile or InputFormat is read on the Scala side and converted from ```Writable``` instances to the relevant Scala classes (in the case of primitives) 3. Pyrolite is used to serialize Java objects. If this fails, the fallback is ```toString``` 4. ```PickleSerializer``` on the Python side deserializes. This works "out the box" for simple ```Writable```s: * ```Text``` * ```IntWritable```, ```DoubleWritable```, ```FloatWritable``` * ```NullWritable``` * ```BooleanWritable``` * ```BytesWritable``` * ```MapWritable``` It also works for simple, "struct-like" classes. Due to the way Pyrolite works, this requires that the classes satisfy the JavaBeans convenstions (i.e. with fields and a no-arg constructor and getters/setters). (Perhaps in future some sugar for case classes and reflection could be added). I've tested it out with ```ESInputFormat``` as an example and it works very nicely: ```python conf = {"es.resource" : "index/type" } rdd = sc.newAPIHadoopRDD("org.elasticsearch.hadoop.mr.EsInputFormat", "org.apache.hadoop.io.NullWritable", "org.elasticsearch.hadoop.mr.LinkedMapWritable", conf=conf) rdd.first() ``` I suspect for things like HBase/Cassandra it will be a bit trickier to get it to work out the box. # Some things still outstanding: 1. ~~Requires ```msgpack-python``` and will fail without it. As originally discussed with Josh, add a ```as_strings``` argument that defaults to ```False```, that can be used if ```msgpack-python``` is not available~~ 2. ~~I see from https://github.com/apache/spark/pull/363 that Pyrolite is being used there for SerDe between Scala and Python. @ahirreddy @mateiz what is the plan behind this - is Pyrolite preferred? It seems from a cursory glance that adapting the ```msgpack```-based SerDe here to use Pyrolite wouldn't be too hard~~ 3. ~~Support the key and value "wrapper" that would allow a Scala/Java function to be plugged in that would transform whatever the key/value Writable class is into something that can be serialized (e.g. convert some custom Writable to a JavaBean or ```java.util.Map``` that can be easily serialized)~~ 4. Support ```saveAsSequenceFile``` and ```saveAsHadoopFile``` etc. This would require SerDe in the reverse direction, that can be handled by Pyrolite. Will work on this as a separate PR Author: Nick Pentreath Closes #455 from MLnick/pyspark-inputformats and squashes the following commits: 268df7e [Nick Pentreath] Documentation changes mer @pwendell comments 761269b [Nick Pentreath] Address @pwendell comments, simplify default writable conversions and remove registry. 4c972d8 [Nick Pentreath] Add license headers d150431 [Nick Pentreath] Merge remote-tracking branch 'upstream/master' into pyspark-inputformats cde6af9 [Nick Pentreath] Parameterize converter trait 5ebacfa [Nick Pentreath] Update docs for PySpark input formats a985492 [Nick Pentreath] Move Converter examples to own package 365d0be [Nick Pentreath] Make classes private[python]. Add docs and @Experimental annotation to Converter interface. eeb8205 [Nick Pentreath] Fix path relative to SPARK_HOME in tests 1eaa08b [Nick Pentreath] HBase -> Cassandra app name oversight 3f90c3e [Nick Pentreath] Merge remote-tracking branch 'upstream/master' into pyspark-inputformats 2c18513 [Nick Pentreath] Add examples for reading HBase and Cassandra InputFormats from Python b65606f [Nick Pentreath] Add converter interface 5757f6e [Nick Pentreath] Default key/value classes for sequenceFile asre None 085b55f [Nick Pentreath] Move input format tests to tests.py and clean up docs 43eb728 [Nick Pentreath] PySpark InputFormats docs into programming guide 94beedc [Nick Pentreath] Clean up args in PythonRDD. Set key/value converter defaults to None for PySpark context.py methods 1a4a1d6 [Nick Pentreath] Address @mateiz style comments 01e0813 [Nick Pentreath] Merge remote-tracking branch 'upstream/master' into pyspark-inputformats 15a7d07 [Nick Pentreath] Remove default args for key/value classes. Arg names to camelCase 9fe6bd5 [Nick Pentreath] Merge remote-tracking branch 'upstream/master' into pyspark-inputformats 84fe8e3 [Nick Pentreath] Python programming guide space formatting d0f52b6 [Nick Pentreath] Python programming guide 7caa73a [Nick Pentreath] Merge remote-tracking branch 'upstream/master' into pyspark-inputformats 93ef995 [Nick Pentreath] Add back context.py changes 9ef1896 [Nick Pentreath] Recover earlier changes lost in previous merge for serializers.py 077ecb2 [Nick Pentreath] Recover earlier changes lost in previous merge for context.py 5af4770 [Nick Pentreath] Merge branch 'master' into pyspark-inputformats 35b8e3a [Nick Pentreath] Another fix for test ordering bef3afb [Nick Pentreath] Merge remote-tracking branch 'upstream/master' into pyspark-inputformats e001b94 [Nick Pentreath] Fix test failures due to ordering 78978d9 [Nick Pentreath] Add doc for SequenceFile and InputFormat support to Python programming guide 64eb051 [Nick Pentreath] Scalastyle fix e7552fa [Nick Pentreath] Merge branch 'master' into pyspark-inputformats 44f2857 [Nick Pentreath] Remove msgpack dependency and switch serialization to Pyrolite, plus some clean up and refactoring c0ebfb6 [Nick Pentreath] Change sequencefile test data generator to easily be called from PySpark tests 1d7c17c [Nick Pentreath] Amend tests to auto-generate sequencefile data in temp dir 17a656b [Nick Pentreath] remove binary sequencefile for tests f60959e [Nick Pentreath] Remove msgpack dependency and serializer from PySpark 450e0a2 [Nick Pentreath] Merge branch 'master' into pyspark-inputformats 31a2fff [Nick Pentreath] Scalastyle fixes fc5099e [Nick Pentreath] Add Apache license headers 4e08983 [Nick Pentreath] Clean up docs for PySpark context methods b20ec7e [Nick Pentreath] Clean up merge duplicate dependencies 951c117 [Nick Pentreath] Merge branch 'master' into pyspark-inputformats f6aac55 [Nick Pentreath] Bring back msgpack 9d2256e [Nick Pentreath] Merge branch 'master' into pyspark-inputformats 1bbbfb0 [Nick Pentreath] Clean up SparkBuild from merge a67dfad [Nick Pentreath] Clean up Msgpack serialization and registering 7237263 [Nick Pentreath] Add back msgpack serializer and hadoop file code lost during merging 25da1ca [Nick Pentreath] Add generator for nulls, bools, bytes and maps 65360d5 [Nick Pentreath] Adding test SequenceFiles 0c612e5 [Nick Pentreath] Merge branch 'master' into pyspark-inputformats d72bf18 [Nick Pentreath] msgpack dd57922 [Nick Pentreath] Merge remote-tracking branch 'upstream/master' into pyspark-inputformats e67212a [Nick Pentreath] Add back msgpack dependency f2d76a0 [Nick Pentreath] Merge branch 'master' into pyspark-inputformats 41856a5 [Nick Pentreath] Merge branch 'master' into pyspark-inputformats 97ef708 [Nick Pentreath] Remove old writeToStream 2beeedb [Nick Pentreath] Merge remote-tracking branch 'upstream/master' into pyspark-inputformats 795a763 [Nick Pentreath] Change name to WriteInputFormatTestDataGenerator. Cleanup some var names. Use SPARK_HOME in path for writing test sequencefile data. 174f520 [Nick Pentreath] Add back graphx settings 703ee65 [Nick Pentreath] Add back msgpack 619c0fa [Nick Pentreath] Merge remote-tracking branch 'upstream/master' into pyspark-inputformats 1c8efbc [Nick Pentreath] Merge remote-tracking branch 'upstream/master' into pyspark-inputformats eb40036 [Nick Pentreath] Remove unused comment lines 4d7ef2e [Nick Pentreath] Fix indentation f1d73e3 [Nick Pentreath] mergeConfs returns a copy rather than mutating one of the input arguments 0f5cd84 [Nick Pentreath] Remove unused pair UTF8 class. Add comments to msgpack deserializer 4294cbb [Nick Pentreath] Add old Hadoop api methods. Clean up and expand comments. Clean up argument names 818a1e6 [Nick Pentreath] Add seqencefile and Hadoop InputFormat support to PythonRDD 4e7c9e3 [Nick Pentreath] Merge remote-tracking branch 'upstream/master' into pyspark-inputformats c304cc8 [Nick Pentreath] Adding supporting sequncefiles for tests. Cleaning up 4b0a43f [Nick Pentreath] Refactoring utils into own objects. Cleaning up old commented-out code d86325f [Nick Pentreath] Initial WIP of PySpark support for SequenceFile and arbitrary Hadoop InputFormat --- examples/src/main/python/cassandra_inputformat.py | 79 ++++++++++++++++++++++ examples/src/main/python/hbase_inputformat.py | 72 ++++++++++++++++++++ .../apache/spark/examples/CassandraCQLTest.scala | 1 + .../org/apache/spark/examples/HBaseTest.scala | 2 +- .../pythonconverters/CassandraConverters.scala | 46 +++++++++++++ .../examples/pythonconverters/HBaseConverter.scala | 33 +++++++++ 6 files changed, 232 insertions(+), 1 deletion(-) create mode 100644 examples/src/main/python/cassandra_inputformat.py create mode 100644 examples/src/main/python/hbase_inputformat.py create mode 100644 examples/src/main/scala/org/apache/spark/examples/pythonconverters/CassandraConverters.scala create mode 100644 examples/src/main/scala/org/apache/spark/examples/pythonconverters/HBaseConverter.scala (limited to 'examples/src') diff --git a/examples/src/main/python/cassandra_inputformat.py b/examples/src/main/python/cassandra_inputformat.py new file mode 100644 index 0000000000..39fa6b0d22 --- /dev/null +++ b/examples/src/main/python/cassandra_inputformat.py @@ -0,0 +1,79 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +import sys + +from pyspark import SparkContext + +""" +Create data in Cassandra fist +(following: https://wiki.apache.org/cassandra/GettingStarted) + +cqlsh> CREATE KEYSPACE test + ... WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : 1 }; +cqlsh> use test; +cqlsh:test> CREATE TABLE users ( + ... user_id int PRIMARY KEY, + ... fname text, + ... lname text + ... ); +cqlsh:test> INSERT INTO users (user_id, fname, lname) + ... VALUES (1745, 'john', 'smith'); +cqlsh:test> INSERT INTO users (user_id, fname, lname) + ... VALUES (1744, 'john', 'doe'); +cqlsh:test> INSERT INTO users (user_id, fname, lname) + ... VALUES (1746, 'john', 'smith'); +cqlsh:test> SELECT * FROM users; + + user_id | fname | lname +---------+-------+------- + 1745 | john | smith + 1744 | john | doe + 1746 | john | smith +""" +if __name__ == "__main__": + if len(sys.argv) != 4: + print >> sys.stderr, """ + Usage: cassandra_inputformat + + Run with example jar: + ./bin/spark-submit --driver-class-path /path/to/example/jar /path/to/examples/cassandra_inputformat.py + Assumes you have some data in Cassandra already, running on , in and + """ + exit(-1) + + host = sys.argv[1] + keyspace = sys.argv[2] + cf = sys.argv[3] + sc = SparkContext(appName="CassandraInputFormat") + + conf = {"cassandra.input.thrift.address":host, + "cassandra.input.thrift.port":"9160", + "cassandra.input.keyspace":keyspace, + "cassandra.input.columnfamily":cf, + "cassandra.input.partitioner.class":"Murmur3Partitioner", + "cassandra.input.page.row.size":"3"} + cass_rdd = sc.newAPIHadoopRDD( + "org.apache.cassandra.hadoop.cql3.CqlPagingInputFormat", + "java.util.Map", + "java.util.Map", + keyConverter="org.apache.spark.examples.pythonconverters.CassandraCQLKeyConverter", + valueConverter="org.apache.spark.examples.pythonconverters.CassandraCQLValueConverter", + conf=conf) + output = cass_rdd.collect() + for (k, v) in output: + print (k, v) diff --git a/examples/src/main/python/hbase_inputformat.py b/examples/src/main/python/hbase_inputformat.py new file mode 100644 index 0000000000..3289d9880a --- /dev/null +++ b/examples/src/main/python/hbase_inputformat.py @@ -0,0 +1,72 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +import sys + +from pyspark import SparkContext + +""" +Create test data in HBase first: + +hbase(main):016:0> create 'test', 'f1' +0 row(s) in 1.0430 seconds + +hbase(main):017:0> put 'test', 'row1', 'f1', 'value1' +0 row(s) in 0.0130 seconds + +hbase(main):018:0> put 'test', 'row2', 'f1', 'value2' +0 row(s) in 0.0030 seconds + +hbase(main):019:0> put 'test', 'row3', 'f1', 'value3' +0 row(s) in 0.0050 seconds + +hbase(main):020:0> put 'test', 'row4', 'f1', 'value4' +0 row(s) in 0.0110 seconds + +hbase(main):021:0> scan 'test' +ROW COLUMN+CELL + row1 column=f1:, timestamp=1401883411986, value=value1 + row2 column=f1:, timestamp=1401883415212, value=value2 + row3 column=f1:, timestamp=1401883417858, value=value3 + row4 column=f1:, timestamp=1401883420805, value=value4 +4 row(s) in 0.0240 seconds +""" +if __name__ == "__main__": + if len(sys.argv) != 3: + print >> sys.stderr, """ + Usage: hbase_inputformat + + Run with example jar: + ./bin/spark-submit --driver-class-path /path/to/example/jar /path/to/examples/hbase_inputformat.py
+ Assumes you have some data in HBase already, running on , in
+ """ + exit(-1) + + host = sys.argv[1] + table = sys.argv[2] + sc = SparkContext(appName="HBaseInputFormat") + + conf = {"hbase.zookeeper.quorum": host, "hbase.mapreduce.inputtable": table} + hbase_rdd = sc.newAPIHadoopRDD( + "org.apache.hadoop.hbase.mapreduce.TableInputFormat", + "org.apache.hadoop.hbase.io.ImmutableBytesWritable", + "org.apache.hadoop.hbase.client.Result", + valueConverter="org.apache.spark.examples.pythonconverters.HBaseConverter", + conf=conf) + output = hbase_rdd.collect() + for (k, v) in output: + print (k, v) diff --git a/examples/src/main/scala/org/apache/spark/examples/CassandraCQLTest.scala b/examples/src/main/scala/org/apache/spark/examples/CassandraCQLTest.scala index 9a00701f98..71f53af68f 100644 --- a/examples/src/main/scala/org/apache/spark/examples/CassandraCQLTest.scala +++ b/examples/src/main/scala/org/apache/spark/examples/CassandraCQLTest.scala @@ -33,6 +33,7 @@ import org.apache.hadoop.mapreduce.Job import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.SparkContext._ + /* Need to create following keyspace and column family in cassandra before running this example Start CQL shell using ./bin/cqlsh and execute following commands diff --git a/examples/src/main/scala/org/apache/spark/examples/HBaseTest.scala b/examples/src/main/scala/org/apache/spark/examples/HBaseTest.scala index a8c338480e..4893b017ed 100644 --- a/examples/src/main/scala/org/apache/spark/examples/HBaseTest.scala +++ b/examples/src/main/scala/org/apache/spark/examples/HBaseTest.scala @@ -22,7 +22,7 @@ import org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor} import org.apache.hadoop.hbase.mapreduce.TableInputFormat import org.apache.spark._ -import org.apache.spark.rdd.NewHadoopRDD + object HBaseTest { def main(args: Array[String]) { diff --git a/examples/src/main/scala/org/apache/spark/examples/pythonconverters/CassandraConverters.scala b/examples/src/main/scala/org/apache/spark/examples/pythonconverters/CassandraConverters.scala new file mode 100644 index 0000000000..29a65c7a5f --- /dev/null +++ b/examples/src/main/scala/org/apache/spark/examples/pythonconverters/CassandraConverters.scala @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.examples.pythonconverters + +import org.apache.spark.api.python.Converter +import java.nio.ByteBuffer +import org.apache.cassandra.utils.ByteBufferUtil +import collection.JavaConversions.{mapAsJavaMap, mapAsScalaMap} + + +/** + * Implementation of [[org.apache.spark.api.python.Converter]] that converts Cassandra + * output to a Map[String, Int] + */ +class CassandraCQLKeyConverter extends Converter[Any, java.util.Map[String, Int]] { + override def convert(obj: Any): java.util.Map[String, Int] = { + val result = obj.asInstanceOf[java.util.Map[String, ByteBuffer]] + mapAsJavaMap(result.mapValues(bb => ByteBufferUtil.toInt(bb))) + } +} + +/** + * Implementation of [[org.apache.spark.api.python.Converter]] that converts Cassandra + * output to a Map[String, String] + */ +class CassandraCQLValueConverter extends Converter[Any, java.util.Map[String, String]] { + override def convert(obj: Any): java.util.Map[String, String] = { + val result = obj.asInstanceOf[java.util.Map[String, ByteBuffer]] + mapAsJavaMap(result.mapValues(bb => ByteBufferUtil.string(bb))) + } +} diff --git a/examples/src/main/scala/org/apache/spark/examples/pythonconverters/HBaseConverter.scala b/examples/src/main/scala/org/apache/spark/examples/pythonconverters/HBaseConverter.scala new file mode 100644 index 0000000000..42ae960bd6 --- /dev/null +++ b/examples/src/main/scala/org/apache/spark/examples/pythonconverters/HBaseConverter.scala @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.examples.pythonconverters + +import org.apache.spark.api.python.Converter +import org.apache.hadoop.hbase.client.Result +import org.apache.hadoop.hbase.util.Bytes + +/** + * Implementation of [[org.apache.spark.api.python.Converter]] that converts a HBase Result + * to a String + */ +class HBaseConverter extends Converter[Any, String] { + override def convert(obj: Any): String = { + val result = obj.asInstanceOf[Result] + Bytes.toStringBinary(result.value()) + } +} -- cgit v1.2.3