author     Kan Zhang <kzhang@apache.org>          2014-08-14 19:03:51 -0700
committer  Matei Zaharia <matei@databricks.com>   2014-08-14 19:03:51 -0700
commit     9422a9b084e3fd5b2b9be2752013588adfb430d0
tree       72d21725ac720cb2b796a42e4803d547a6a4514b /examples/src/main/scala
parent     3a8b68b7353fea50245686903b308fa9eb52cb51
[SPARK-2736] PySpark converter and example script for reading Avro files
JIRA: https://issues.apache.org/jira/browse/SPARK-2736

This patch includes:
1. An Avro converter that converts Avro data types to Python. It handles all 3 Avro data mappings (Generic, Specific and Reflect).
2. An example Python script for reading Avro files using AvroKeyInputFormat and the converter (sketched below).
3. A fix for a classloading issue.

cc @MLnick @JoshRosen @mateiz

Author: Kan Zhang <kzhang@apache.org>

Closes #1916 from kanzhang/SPARK-2736 and squashes the following commits:

02443f8 [Kan Zhang] [SPARK-2736] Adding .avsc files to .rat-excludes
f74e9a9 [Kan Zhang] [SPARK-2736] nit: clazz -> className
82cc505 [Kan Zhang] [SPARK-2736] Update data sample
0be7761 [Kan Zhang] [SPARK-2736] Example pyspark script and data files
c8e5881 [Kan Zhang] [SPARK-2736] Trying to work with all 3 Avro data models
2271a5b [Kan Zhang] [SPARK-2736] Using the right class loader to find Avro classes
536876b [Kan Zhang] [SPARK-2736] Adding Avro to Java converter
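For context, the example script described in item 2 drives this converter from PySpark roughly as follows. This is a hedged sketch rather than the merged script: the input path, app name, and the final iteration over keys are illustrative assumptions.

# Sketch of reading an Avro file from PySpark via AvroWrapperToJavaConverter.
# The input path below is an assumption, not taken from the patch.
from pyspark import SparkContext

sc = SparkContext(appName="AvroRead")

avro_rdd = sc.newAPIHadoopFile(
    "/path/to/users.avro",
    "org.apache.avro.mapreduce.AvroKeyInputFormat",
    "org.apache.avro.mapred.AvroKey",
    "org.apache.hadoop.io.NullWritable",
    keyConverter="org.apache.spark.examples.pythonconverters.AvroWrapperToJavaConverter")

# AvroKeyInputFormat yields (AvroKey, NullWritable) pairs; after conversion
# each key is a Python dict and each value is None, so keep only the keys.
for record in avro_rdd.keys().collect():
    print(record)

Note that the converter class lives in the examples project, so a run like this needs the Spark examples jar on the driver classpath (e.g. via spark-submit's --driver-class-path).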
Diffstat (limited to 'examples/src/main/scala')
-rw-r--r--  examples/src/main/scala/org/apache/spark/examples/pythonconverters/AvroConverters.scala  130
1 file changed, 130 insertions(+), 0 deletions(-)
diff --git a/examples/src/main/scala/org/apache/spark/examples/pythonconverters/AvroConverters.scala b/examples/src/main/scala/org/apache/spark/examples/pythonconverters/AvroConverters.scala
new file mode 100644
index 0000000000..1b25983a38
--- /dev/null
+++ b/examples/src/main/scala/org/apache/spark/examples/pythonconverters/AvroConverters.scala
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.examples.pythonconverters
+
+import java.util.{Collection => JCollection, Map => JMap}
+
+import scala.collection.JavaConversions._
+
+import org.apache.avro.generic.{GenericFixed, IndexedRecord}
+import org.apache.avro.mapred.AvroWrapper
+import org.apache.avro.Schema
+import org.apache.avro.Schema.Type._
+
+import org.apache.spark.api.python.Converter
+import org.apache.spark.SparkException
+
+
+/**
+ * Implementation of [[org.apache.spark.api.python.Converter]] that converts
+ * an Avro Record wrapped in an AvroKey (or AvroValue) to a Java Map. It tries
+ * to work with all 3 Avro data mappings (Generic, Specific and Reflect).
+ */
+class AvroWrapperToJavaConverter extends Converter[Any, Any] {
+ override def convert(obj: Any): Any = {
+ if (obj == null) {
+ return null
+ }
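+    // AvroKey and AvroValue are both subclasses of AvroWrapper.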
+ obj.asInstanceOf[AvroWrapper[_]].datum() match {
+ case null => null
+ case record: IndexedRecord => unpackRecord(record)
+ case other => throw new SparkException(
+ s"Unsupported top-level Avro data type ${other.getClass.getName}")
+ }
+ }
+
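+  /** Unpacks an Avro record into a java.util.Map keyed by field name. */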
+ def unpackRecord(obj: Any): JMap[String, Any] = {
+ val map = new java.util.HashMap[String, Any]
+ obj match {
+ case record: IndexedRecord =>
+ record.getSchema.getFields.zipWithIndex.foreach { case (f, i) =>
+ map.put(f.name, fromAvro(record.get(i), f.schema))
+ }
+ case other => throw new SparkException(
+ s"Unsupported RECORD type ${other.getClass.getName}")
+ }
+ map
+ }
+
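+  /** Unpacks an Avro MAP into a java.util.Map, converting each key to a String. */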
+ def unpackMap(obj: Any, schema: Schema): JMap[String, Any] = {
+ obj.asInstanceOf[JMap[_, _]].map { case (key, value) =>
+ (key.toString, fromAvro(value, schema.getValueType))
+ }
+ }
+
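+  /** Unpacks an Avro FIXED value into a copy of its underlying bytes. */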
+ def unpackFixed(obj: Any, schema: Schema): Array[Byte] = {
+ unpackBytes(obj.asInstanceOf[GenericFixed].bytes())
+ }
+
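+  /** Unpacks an Avro BYTES value into a fresh defensive copy. */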
+  def unpackBytes(obj: Any): Array[Byte] = obj match {
+    case buf: java.nio.ByteBuffer =>
+      // Copy only the readable region; buf.array() would expose the whole
+      // backing array, which Avro may over-allocate and reuse.
+      val bytes = new Array[Byte](buf.remaining())
+      buf.duplicate().get(bytes)
+      bytes
+    case arr: Array[Byte] =>
+      // Defensive copy, since Avro may reuse the source array.
+      java.util.Arrays.copyOf(arr, arr.length)
+    case other => throw new SparkException(
+      s"Unknown BYTES type ${other.getClass.getName}")
+  }
+
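+  /** Unpacks an Avro ARRAY: a Java Collection, or a native array under the Reflect mapping. */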
+ def unpackArray(obj: Any, schema: Schema): JCollection[Any] = obj match {
+ case c: JCollection[_] =>
+ c.map(fromAvro(_, schema.getElementType))
+ case arr: Array[_] if arr.getClass.getComponentType.isPrimitive =>
+ arr.toSeq
+ case arr: Array[_] =>
+ arr.map(fromAvro(_, schema.getElementType)).toSeq
+ case other => throw new SparkException(
+ s"Unknown ARRAY type ${other.getClass.getName}")
+ }
+
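+  /** Unpacks a UNION; only a lone concrete type, optionally paired with NULL, is supported. */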
+ def unpackUnion(obj: Any, schema: Schema): Any = {
+ schema.getTypes.toList match {
+ case List(s) => fromAvro(obj, s)
+ case List(n, s) if n.getType == NULL => fromAvro(obj, s)
+ case List(s, n) if n.getType == NULL => fromAvro(obj, s)
+ case _ => throw new SparkException(
+ "Unions may only consist of a concrete type and null")
+ }
+ }
+
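+  /** Recursively converts a value based on its Avro schema type; primitives pass through. */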
+ def fromAvro(obj: Any, schema: Schema): Any = {
+ if (obj == null) {
+ return null
+ }
+ schema.getType match {
+ case UNION => unpackUnion(obj, schema)
+ case ARRAY => unpackArray(obj, schema)
+ case FIXED => unpackFixed(obj, schema)
+ case MAP => unpackMap(obj, schema)
+ case BYTES => unpackBytes(obj)
+ case RECORD => unpackRecord(obj)
+ case STRING => obj.toString
+ case ENUM => obj.toString
+ case NULL => obj
+ case BOOLEAN => obj
+ case DOUBLE => obj
+ case FLOAT => obj
+ case INT => obj
+ case LONG => obj
+ case other => throw new SparkException(
+ s"Unknown Avro schema type ${other.getName}")
+ }
+ }
+}