diff options
author | Kan Zhang <kzhang@apache.org> | 2014-08-14 19:03:51 -0700 |
---|---|---|
committer | Matei Zaharia <matei@databricks.com> | 2014-08-14 19:03:51 -0700 |
commit | 9422a9b084e3fd5b2b9be2752013588adfb430d0 (patch) | |
tree | 72d21725ac720cb2b796a42e4803d547a6a4514b /examples | |
parent | 3a8b68b7353fea50245686903b308fa9eb52cb51 (diff) | |
download | spark-9422a9b084e3fd5b2b9be2752013588adfb430d0.tar.gz spark-9422a9b084e3fd5b2b9be2752013588adfb430d0.tar.bz2 spark-9422a9b084e3fd5b2b9be2752013588adfb430d0.zip |
[SPARK-2736] PySpark converter and example script for reading Avro files
JIRA: https://issues.apache.org/jira/browse/SPARK-2736
This patch includes:
1. An Avro converter that converts Avro data types to Python. It handles all 3 Avro data mappings (Generic, Specific and Reflect).
2. An example Python script for reading Avro files using AvroKeyInputFormat and the converter.
3. Fixing a classloading issue.
cc @MLnick @JoshRosen @mateiz
Author: Kan Zhang <kzhang@apache.org>
Closes #1916 from kanzhang/SPARK-2736 and squashes the following commits:
02443f8 [Kan Zhang] [SPARK-2736] Adding .avsc files to .rat-excludes
f74e9a9 [Kan Zhang] [SPARK-2736] nit: clazz -> className
82cc505 [Kan Zhang] [SPARK-2736] Update data sample
0be7761 [Kan Zhang] [SPARK-2736] Example pyspark script and data files
c8e5881 [Kan Zhang] [SPARK-2736] Trying to work with all 3 Avro data models
2271a5b [Kan Zhang] [SPARK-2736] Using the right class loader to find Avro classes
536876b [Kan Zhang] [SPARK-2736] Adding Avro to Java converter
Diffstat (limited to 'examples')
-rw-r--r-- | examples/src/main/python/avro_inputformat.py | 75 | ||||
-rw-r--r-- | examples/src/main/resources/user.avsc | 8 | ||||
-rw-r--r-- | examples/src/main/resources/users.avro | bin | 0 -> 334 bytes | |||
-rw-r--r-- | examples/src/main/scala/org/apache/spark/examples/pythonconverters/AvroConverters.scala | 130 |
4 files changed, 213 insertions, 0 deletions
diff --git a/examples/src/main/python/avro_inputformat.py b/examples/src/main/python/avro_inputformat.py new file mode 100644 index 0000000000..e902ae2975 --- /dev/null +++ b/examples/src/main/python/avro_inputformat.py @@ -0,0 +1,75 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +import sys + +from pyspark import SparkContext + +""" +Read data file users.avro in local Spark distro: + +$ cd $SPARK_HOME +$ ./bin/spark-submit --driver-class-path /path/to/example/jar ./examples/src/main/python/avro_inputformat.py \ +> examples/src/main/resources/users.avro +{u'favorite_color': None, u'name': u'Alyssa', u'favorite_numbers': [3, 9, 15, 20]} +{u'favorite_color': u'red', u'name': u'Ben', u'favorite_numbers': []} + +To read name and favorite_color fields only, specify the following reader schema: + +$ cat examples/src/main/resources/user.avsc +{"namespace": "example.avro", + "type": "record", + "name": "User", + "fields": [ + {"name": "name", "type": "string"}, + {"name": "favorite_color", "type": ["string", "null"]} + ] +} + +$ ./bin/spark-submit --driver-class-path /path/to/example/jar ./examples/src/main/python/avro_inputformat.py \ +> examples/src/main/resources/users.avro examples/src/main/resources/user.avsc +{u'favorite_color': None, u'name': u'Alyssa'} +{u'favorite_color': u'red', u'name': u'Ben'} +""" +if __name__ == "__main__": + if len(sys.argv) != 2 and len(sys.argv) != 3: + print >> sys.stderr, """ + Usage: avro_inputformat <data_file> [reader_schema_file] + + Run with example jar: + ./bin/spark-submit --driver-class-path /path/to/example/jar /path/to/examples/avro_inputformat.py <data_file> [reader_schema_file] + Assumes you have Avro data stored in <data_file>. Reader schema can be optionally specified in [reader_schema_file]. + """ + exit(-1) + + path = sys.argv[1] + sc = SparkContext(appName="AvroKeyInputFormat") + + conf = None + if len(sys.argv) == 3: + schema_rdd = sc.textFile(sys.argv[2], 1).collect() + conf = {"avro.schema.input.key" : reduce(lambda x, y: x+y, schema_rdd)} + + avro_rdd = sc.newAPIHadoopFile(path, + "org.apache.avro.mapreduce.AvroKeyInputFormat", + "org.apache.avro.mapred.AvroKey", + "org.apache.hadoop.io.NullWritable", + keyConverter="org.apache.spark.examples.pythonconverters.AvroWrapperToJavaConverter", + conf=conf) + output = avro_rdd.map(lambda x: x[0]).collect() + for k in output: + print k diff --git a/examples/src/main/resources/user.avsc b/examples/src/main/resources/user.avsc new file mode 100644 index 0000000000..4995357ab3 --- /dev/null +++ b/examples/src/main/resources/user.avsc @@ -0,0 +1,8 @@ +{"namespace": "example.avro", + "type": "record", + "name": "User", + "fields": [ + {"name": "name", "type": "string"}, + {"name": "favorite_color", "type": ["string", "null"]} + ] +} diff --git a/examples/src/main/resources/users.avro b/examples/src/main/resources/users.avro Binary files differnew file mode 100644 index 0000000000..27c526ab11 --- /dev/null +++ b/examples/src/main/resources/users.avro diff --git a/examples/src/main/scala/org/apache/spark/examples/pythonconverters/AvroConverters.scala b/examples/src/main/scala/org/apache/spark/examples/pythonconverters/AvroConverters.scala new file mode 100644 index 0000000000..1b25983a38 --- /dev/null +++ b/examples/src/main/scala/org/apache/spark/examples/pythonconverters/AvroConverters.scala @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.examples.pythonconverters + +import java.util.{Collection => JCollection, Map => JMap} + +import scala.collection.JavaConversions._ + +import org.apache.avro.generic.{GenericFixed, IndexedRecord} +import org.apache.avro.mapred.AvroWrapper +import org.apache.avro.Schema +import org.apache.avro.Schema.Type._ + +import org.apache.spark.api.python.Converter +import org.apache.spark.SparkException + + +/** + * Implementation of [[org.apache.spark.api.python.Converter]] that converts + * an Avro Record wrapped in an AvroKey (or AvroValue) to a Java Map. It tries + * to work with all 3 Avro data mappings (Generic, Specific and Reflect). + */ +class AvroWrapperToJavaConverter extends Converter[Any, Any] { + override def convert(obj: Any): Any = { + if (obj == null) { + return null + } + obj.asInstanceOf[AvroWrapper[_]].datum() match { + case null => null + case record: IndexedRecord => unpackRecord(record) + case other => throw new SparkException( + s"Unsupported top-level Avro data type ${other.getClass.getName}") + } + } + + def unpackRecord(obj: Any): JMap[String, Any] = { + val map = new java.util.HashMap[String, Any] + obj match { + case record: IndexedRecord => + record.getSchema.getFields.zipWithIndex.foreach { case (f, i) => + map.put(f.name, fromAvro(record.get(i), f.schema)) + } + case other => throw new SparkException( + s"Unsupported RECORD type ${other.getClass.getName}") + } + map + } + + def unpackMap(obj: Any, schema: Schema): JMap[String, Any] = { + obj.asInstanceOf[JMap[_, _]].map { case (key, value) => + (key.toString, fromAvro(value, schema.getValueType)) + } + } + + def unpackFixed(obj: Any, schema: Schema): Array[Byte] = { + unpackBytes(obj.asInstanceOf[GenericFixed].bytes()) + } + + def unpackBytes(obj: Any): Array[Byte] = { + val bytes: Array[Byte] = obj match { + case buf: java.nio.ByteBuffer => buf.array() + case arr: Array[Byte] => arr + case other => throw new SparkException( + s"Unknown BYTES type ${other.getClass.getName}") + } + val bytearray = new Array[Byte](bytes.length) + System.arraycopy(bytes, 0, bytearray, 0, bytes.length) + bytearray + } + + def unpackArray(obj: Any, schema: Schema): JCollection[Any] = obj match { + case c: JCollection[_] => + c.map(fromAvro(_, schema.getElementType)) + case arr: Array[_] if arr.getClass.getComponentType.isPrimitive => + arr.toSeq + case arr: Array[_] => + arr.map(fromAvro(_, schema.getElementType)).toSeq + case other => throw new SparkException( + s"Unknown ARRAY type ${other.getClass.getName}") + } + + def unpackUnion(obj: Any, schema: Schema): Any = { + schema.getTypes.toList match { + case List(s) => fromAvro(obj, s) + case List(n, s) if n.getType == NULL => fromAvro(obj, s) + case List(s, n) if n.getType == NULL => fromAvro(obj, s) + case _ => throw new SparkException( + "Unions may only consist of a concrete type and null") + } + } + + def fromAvro(obj: Any, schema: Schema): Any = { + if (obj == null) { + return null + } + schema.getType match { + case UNION => unpackUnion(obj, schema) + case ARRAY => unpackArray(obj, schema) + case FIXED => unpackFixed(obj, schema) + case MAP => unpackMap(obj, schema) + case BYTES => unpackBytes(obj) + case RECORD => unpackRecord(obj) + case STRING => obj.toString + case ENUM => obj.toString + case NULL => obj + case BOOLEAN => obj + case DOUBLE => obj + case FLOAT => obj + case INT => obj + case LONG => obj + case other => throw new SparkException( + s"Unknown Avro schema type ${other.getName}") + } + } +} |