aboutsummaryrefslogtreecommitdiff
path: root/examples
diff options
context:
space:
mode:
authorKan Zhang <kzhang@apache.org>2014-08-14 19:03:51 -0700
committerMatei Zaharia <matei@databricks.com>2014-08-14 19:03:51 -0700
commit9422a9b084e3fd5b2b9be2752013588adfb430d0 (patch)
tree72d21725ac720cb2b796a42e4803d547a6a4514b /examples
parent3a8b68b7353fea50245686903b308fa9eb52cb51 (diff)
downloadspark-9422a9b084e3fd5b2b9be2752013588adfb430d0.tar.gz
spark-9422a9b084e3fd5b2b9be2752013588adfb430d0.tar.bz2
spark-9422a9b084e3fd5b2b9be2752013588adfb430d0.zip
[SPARK-2736] PySpark converter and example script for reading Avro files
JIRA: https://issues.apache.org/jira/browse/SPARK-2736 This patch includes: 1. An Avro converter that converts Avro data types to Python. It handles all 3 Avro data mappings (Generic, Specific and Reflect). 2. An example Python script for reading Avro files using AvroKeyInputFormat and the converter. 3. Fixing a classloading issue. cc @MLnick @JoshRosen @mateiz Author: Kan Zhang <kzhang@apache.org> Closes #1916 from kanzhang/SPARK-2736 and squashes the following commits: 02443f8 [Kan Zhang] [SPARK-2736] Adding .avsc files to .rat-excludes f74e9a9 [Kan Zhang] [SPARK-2736] nit: clazz -> className 82cc505 [Kan Zhang] [SPARK-2736] Update data sample 0be7761 [Kan Zhang] [SPARK-2736] Example pyspark script and data files c8e5881 [Kan Zhang] [SPARK-2736] Trying to work with all 3 Avro data models 2271a5b [Kan Zhang] [SPARK-2736] Using the right class loader to find Avro classes 536876b [Kan Zhang] [SPARK-2736] Adding Avro to Java converter
Diffstat (limited to 'examples')
-rw-r--r--examples/src/main/python/avro_inputformat.py75
-rw-r--r--examples/src/main/resources/user.avsc8
-rw-r--r--examples/src/main/resources/users.avrobin0 -> 334 bytes
-rw-r--r--examples/src/main/scala/org/apache/spark/examples/pythonconverters/AvroConverters.scala130
4 files changed, 213 insertions, 0 deletions
diff --git a/examples/src/main/python/avro_inputformat.py b/examples/src/main/python/avro_inputformat.py
new file mode 100644
index 0000000000..e902ae2975
--- /dev/null
+++ b/examples/src/main/python/avro_inputformat.py
@@ -0,0 +1,75 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import sys
+
+from pyspark import SparkContext
+
+"""
+Read data file users.avro in local Spark distro:
+
+$ cd $SPARK_HOME
+$ ./bin/spark-submit --driver-class-path /path/to/example/jar ./examples/src/main/python/avro_inputformat.py \
+> examples/src/main/resources/users.avro
+{u'favorite_color': None, u'name': u'Alyssa', u'favorite_numbers': [3, 9, 15, 20]}
+{u'favorite_color': u'red', u'name': u'Ben', u'favorite_numbers': []}
+
+To read name and favorite_color fields only, specify the following reader schema:
+
+$ cat examples/src/main/resources/user.avsc
+{"namespace": "example.avro",
+ "type": "record",
+ "name": "User",
+ "fields": [
+ {"name": "name", "type": "string"},
+ {"name": "favorite_color", "type": ["string", "null"]}
+ ]
+}
+
+$ ./bin/spark-submit --driver-class-path /path/to/example/jar ./examples/src/main/python/avro_inputformat.py \
+> examples/src/main/resources/users.avro examples/src/main/resources/user.avsc
+{u'favorite_color': None, u'name': u'Alyssa'}
+{u'favorite_color': u'red', u'name': u'Ben'}
+"""
+if __name__ == "__main__":
+ if len(sys.argv) != 2 and len(sys.argv) != 3:
+ print >> sys.stderr, """
+ Usage: avro_inputformat <data_file> [reader_schema_file]
+
+ Run with example jar:
+ ./bin/spark-submit --driver-class-path /path/to/example/jar /path/to/examples/avro_inputformat.py <data_file> [reader_schema_file]
+ Assumes you have Avro data stored in <data_file>. Reader schema can be optionally specified in [reader_schema_file].
+ """
+ exit(-1)
+
+ path = sys.argv[1]
+ sc = SparkContext(appName="AvroKeyInputFormat")
+
+ conf = None
+ if len(sys.argv) == 3:
+ schema_rdd = sc.textFile(sys.argv[2], 1).collect()
+ conf = {"avro.schema.input.key" : reduce(lambda x, y: x+y, schema_rdd)}
+
+ avro_rdd = sc.newAPIHadoopFile(path,
+ "org.apache.avro.mapreduce.AvroKeyInputFormat",
+ "org.apache.avro.mapred.AvroKey",
+ "org.apache.hadoop.io.NullWritable",
+ keyConverter="org.apache.spark.examples.pythonconverters.AvroWrapperToJavaConverter",
+ conf=conf)
+ output = avro_rdd.map(lambda x: x[0]).collect()
+ for k in output:
+ print k
diff --git a/examples/src/main/resources/user.avsc b/examples/src/main/resources/user.avsc
new file mode 100644
index 0000000000..4995357ab3
--- /dev/null
+++ b/examples/src/main/resources/user.avsc
@@ -0,0 +1,8 @@
+{"namespace": "example.avro",
+ "type": "record",
+ "name": "User",
+ "fields": [
+ {"name": "name", "type": "string"},
+ {"name": "favorite_color", "type": ["string", "null"]}
+ ]
+}
diff --git a/examples/src/main/resources/users.avro b/examples/src/main/resources/users.avro
new file mode 100644
index 0000000000..27c526ab11
--- /dev/null
+++ b/examples/src/main/resources/users.avro
Binary files differ
diff --git a/examples/src/main/scala/org/apache/spark/examples/pythonconverters/AvroConverters.scala b/examples/src/main/scala/org/apache/spark/examples/pythonconverters/AvroConverters.scala
new file mode 100644
index 0000000000..1b25983a38
--- /dev/null
+++ b/examples/src/main/scala/org/apache/spark/examples/pythonconverters/AvroConverters.scala
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.examples.pythonconverters
+
+import java.util.{Collection => JCollection, Map => JMap}
+
+import scala.collection.JavaConversions._
+
+import org.apache.avro.generic.{GenericFixed, IndexedRecord}
+import org.apache.avro.mapred.AvroWrapper
+import org.apache.avro.Schema
+import org.apache.avro.Schema.Type._
+
+import org.apache.spark.api.python.Converter
+import org.apache.spark.SparkException
+
+
+/**
+ * Implementation of [[org.apache.spark.api.python.Converter]] that converts
+ * an Avro Record wrapped in an AvroKey (or AvroValue) to a Java Map. It tries
+ * to work with all 3 Avro data mappings (Generic, Specific and Reflect).
+ */
+class AvroWrapperToJavaConverter extends Converter[Any, Any] {
+ override def convert(obj: Any): Any = {
+ if (obj == null) {
+ return null
+ }
+ obj.asInstanceOf[AvroWrapper[_]].datum() match {
+ case null => null
+ case record: IndexedRecord => unpackRecord(record)
+ case other => throw new SparkException(
+ s"Unsupported top-level Avro data type ${other.getClass.getName}")
+ }
+ }
+
+ def unpackRecord(obj: Any): JMap[String, Any] = {
+ val map = new java.util.HashMap[String, Any]
+ obj match {
+ case record: IndexedRecord =>
+ record.getSchema.getFields.zipWithIndex.foreach { case (f, i) =>
+ map.put(f.name, fromAvro(record.get(i), f.schema))
+ }
+ case other => throw new SparkException(
+ s"Unsupported RECORD type ${other.getClass.getName}")
+ }
+ map
+ }
+
+ def unpackMap(obj: Any, schema: Schema): JMap[String, Any] = {
+ obj.asInstanceOf[JMap[_, _]].map { case (key, value) =>
+ (key.toString, fromAvro(value, schema.getValueType))
+ }
+ }
+
+ def unpackFixed(obj: Any, schema: Schema): Array[Byte] = {
+ unpackBytes(obj.asInstanceOf[GenericFixed].bytes())
+ }
+
+ def unpackBytes(obj: Any): Array[Byte] = {
+ val bytes: Array[Byte] = obj match {
+ case buf: java.nio.ByteBuffer => buf.array()
+ case arr: Array[Byte] => arr
+ case other => throw new SparkException(
+ s"Unknown BYTES type ${other.getClass.getName}")
+ }
+ val bytearray = new Array[Byte](bytes.length)
+ System.arraycopy(bytes, 0, bytearray, 0, bytes.length)
+ bytearray
+ }
+
+ def unpackArray(obj: Any, schema: Schema): JCollection[Any] = obj match {
+ case c: JCollection[_] =>
+ c.map(fromAvro(_, schema.getElementType))
+ case arr: Array[_] if arr.getClass.getComponentType.isPrimitive =>
+ arr.toSeq
+ case arr: Array[_] =>
+ arr.map(fromAvro(_, schema.getElementType)).toSeq
+ case other => throw new SparkException(
+ s"Unknown ARRAY type ${other.getClass.getName}")
+ }
+
+ def unpackUnion(obj: Any, schema: Schema): Any = {
+ schema.getTypes.toList match {
+ case List(s) => fromAvro(obj, s)
+ case List(n, s) if n.getType == NULL => fromAvro(obj, s)
+ case List(s, n) if n.getType == NULL => fromAvro(obj, s)
+ case _ => throw new SparkException(
+ "Unions may only consist of a concrete type and null")
+ }
+ }
+
+ def fromAvro(obj: Any, schema: Schema): Any = {
+ if (obj == null) {
+ return null
+ }
+ schema.getType match {
+ case UNION => unpackUnion(obj, schema)
+ case ARRAY => unpackArray(obj, schema)
+ case FIXED => unpackFixed(obj, schema)
+ case MAP => unpackMap(obj, schema)
+ case BYTES => unpackBytes(obj)
+ case RECORD => unpackRecord(obj)
+ case STRING => obj.toString
+ case ENUM => obj.toString
+ case NULL => obj
+ case BOOLEAN => obj
+ case DOUBLE => obj
+ case FLOAT => obj
+ case INT => obj
+ case LONG => obj
+ case other => throw new SparkException(
+ s"Unknown Avro schema type ${other.getName}")
+ }
+ }
+}