aboutsummaryrefslogtreecommitdiff
path: root/examples/src/main/python/avro_inputformat.py
diff options
context:
space:
mode:
authorKan Zhang <kzhang@apache.org>2014-08-14 19:03:51 -0700
committerMatei Zaharia <matei@databricks.com>2014-08-14 19:03:51 -0700
commit9422a9b084e3fd5b2b9be2752013588adfb430d0 (patch)
tree72d21725ac720cb2b796a42e4803d547a6a4514b /examples/src/main/python/avro_inputformat.py
parent3a8b68b7353fea50245686903b308fa9eb52cb51 (diff)
downloadspark-9422a9b084e3fd5b2b9be2752013588adfb430d0.tar.gz
spark-9422a9b084e3fd5b2b9be2752013588adfb430d0.tar.bz2
spark-9422a9b084e3fd5b2b9be2752013588adfb430d0.zip
[SPARK-2736] PySpark converter and example script for reading Avro files
JIRA: https://issues.apache.org/jira/browse/SPARK-2736 This patch includes: 1. An Avro converter that converts Avro data types to Python. It handles all 3 Avro data mappings (Generic, Specific and Reflect). 2. An example Python script for reading Avro files using AvroKeyInputFormat and the converter. 3. Fixing a classloading issue. cc @MLnick @JoshRosen @mateiz Author: Kan Zhang <kzhang@apache.org> Closes #1916 from kanzhang/SPARK-2736 and squashes the following commits: 02443f8 [Kan Zhang] [SPARK-2736] Adding .avsc files to .rat-excludes f74e9a9 [Kan Zhang] [SPARK-2736] nit: clazz -> className 82cc505 [Kan Zhang] [SPARK-2736] Update data sample 0be7761 [Kan Zhang] [SPARK-2736] Example pyspark script and data files c8e5881 [Kan Zhang] [SPARK-2736] Trying to work with all 3 Avro data models 2271a5b [Kan Zhang] [SPARK-2736] Using the right class loader to find Avro classes 536876b [Kan Zhang] [SPARK-2736] Adding Avro to Java converter
Diffstat (limited to 'examples/src/main/python/avro_inputformat.py')
-rw-r--r--examples/src/main/python/avro_inputformat.py75
1 files changed, 75 insertions, 0 deletions
diff --git a/examples/src/main/python/avro_inputformat.py b/examples/src/main/python/avro_inputformat.py
new file mode 100644
index 0000000000..e902ae2975
--- /dev/null
+++ b/examples/src/main/python/avro_inputformat.py
@@ -0,0 +1,75 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import sys
+
+from pyspark import SparkContext
+
+"""
+Read data file users.avro in local Spark distro:
+
+$ cd $SPARK_HOME
+$ ./bin/spark-submit --driver-class-path /path/to/example/jar ./examples/src/main/python/avro_inputformat.py \
+> examples/src/main/resources/users.avro
+{u'favorite_color': None, u'name': u'Alyssa', u'favorite_numbers': [3, 9, 15, 20]}
+{u'favorite_color': u'red', u'name': u'Ben', u'favorite_numbers': []}
+
+To read name and favorite_color fields only, specify the following reader schema:
+
+$ cat examples/src/main/resources/user.avsc
+{"namespace": "example.avro",
+ "type": "record",
+ "name": "User",
+ "fields": [
+ {"name": "name", "type": "string"},
+ {"name": "favorite_color", "type": ["string", "null"]}
+ ]
+}
+
+$ ./bin/spark-submit --driver-class-path /path/to/example/jar ./examples/src/main/python/avro_inputformat.py \
+> examples/src/main/resources/users.avro examples/src/main/resources/user.avsc
+{u'favorite_color': None, u'name': u'Alyssa'}
+{u'favorite_color': u'red', u'name': u'Ben'}
+"""
+if __name__ == "__main__":
+ if len(sys.argv) != 2 and len(sys.argv) != 3:
+ print >> sys.stderr, """
+ Usage: avro_inputformat <data_file> [reader_schema_file]
+
+ Run with example jar:
+ ./bin/spark-submit --driver-class-path /path/to/example/jar /path/to/examples/avro_inputformat.py <data_file> [reader_schema_file]
+ Assumes you have Avro data stored in <data_file>. Reader schema can be optionally specified in [reader_schema_file].
+ """
+ exit(-1)
+
+ path = sys.argv[1]
+ sc = SparkContext(appName="AvroKeyInputFormat")
+
+ conf = None
+ if len(sys.argv) == 3:
+ schema_rdd = sc.textFile(sys.argv[2], 1).collect()
+ conf = {"avro.schema.input.key" : reduce(lambda x, y: x+y, schema_rdd)}
+
+ avro_rdd = sc.newAPIHadoopFile(path,
+ "org.apache.avro.mapreduce.AvroKeyInputFormat",
+ "org.apache.avro.mapred.AvroKey",
+ "org.apache.hadoop.io.NullWritable",
+ keyConverter="org.apache.spark.examples.pythonconverters.AvroWrapperToJavaConverter",
+ conf=conf)
+ output = avro_rdd.map(lambda x: x[0]).collect()
+ for k in output:
+ print k