From b104c7f5c7e2b173fe1b10035efbc00e43df13ec Mon Sep 17 00:00:00 2001 From: Rohit Rai Date: Mon, 3 Jun 2013 15:15:52 +0530 Subject: Example to write the output to cassandra --- .../main/scala/spark/examples/CassandraTest.scala | 48 +++++++++++++++++++--- 1 file changed, 43 insertions(+), 5 deletions(-) (limited to 'examples') diff --git a/examples/src/main/scala/spark/examples/CassandraTest.scala b/examples/src/main/scala/spark/examples/CassandraTest.scala index 6b9fd502e2..2cc62b9fe9 100644 --- a/examples/src/main/scala/spark/examples/CassandraTest.scala +++ b/examples/src/main/scala/spark/examples/CassandraTest.scala @@ -1,17 +1,16 @@ package spark.examples import org.apache.hadoop.mapreduce.Job -import org.apache.cassandra.hadoop.{ConfigHelper, ColumnFamilyInputFormat} -import org.apache.cassandra.thrift.{IndexExpression, SliceRange, SlicePredicate} +import org.apache.cassandra.hadoop.{ColumnFamilyOutputFormat, ConfigHelper, ColumnFamilyInputFormat} +import org.apache.cassandra.thrift._ import spark.{RDD, SparkContext} -import SparkContext._ +import spark.SparkContext._ import java.nio.ByteBuffer import java.util.SortedMap import org.apache.cassandra.db.IColumn import org.apache.cassandra.utils.ByteBufferUtil import scala.collection.JavaConversions._ -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; + /* @@ -44,8 +43,15 @@ object CassandraTest { ConfigHelper.setInputRpcPort(job.getConfiguration(), args(2)) + ConfigHelper.setOutputInitialAddress(job.getConfiguration(), args(1)) + + ConfigHelper.setOutputRpcPort(job.getConfiguration(), args(2)) + ConfigHelper.setInputColumnFamily(job.getConfiguration(), "casDemo", "Words") + ConfigHelper.setOutputColumnFamily(job.getConfiguration(), "casDemo", "WordCount") + + val predicate = new SlicePredicate() val sliceRange = new SliceRange() sliceRange.setStart(Array.empty[Byte]) @@ -55,6 +61,8 @@ object CassandraTest { ConfigHelper.setInputPartitioner(job.getConfiguration(), "Murmur3Partitioner") + ConfigHelper.setOutputPartitioner(job.getConfiguration(), "Murmur3Partitioner") + //Make a new Hadoop RDD val casRdd = sc.newAPIHadoopRDD(job.getConfiguration(), classOf[ColumnFamilyInputFormat], @@ -74,6 +82,33 @@ object CassandraTest { counts.collect().foreach { case (word, count) => println(word + ":" + count) } + + counts.map { + case (word, count) => { + val colWord = new org.apache.cassandra.thrift.Column() + colWord.setName(ByteBufferUtil.bytes("word")) + colWord.setValue(ByteBufferUtil.bytes(word)) + colWord.setTimestamp(System.currentTimeMillis) + + val colCount = new org.apache.cassandra.thrift.Column() + colCount.setName(ByteBufferUtil.bytes("wcount")) + colCount.setValue(ByteBufferUtil.bytes(count.toLong)) + colCount.setTimestamp(System.currentTimeMillis) + + + val outputkey = ByteBufferUtil.bytes(word + "-COUNT-" + System.currentTimeMillis) + + val mutations: java.util.List[Mutation] = new Mutation() :: new Mutation() :: Nil + mutations.get(0).setColumn_or_supercolumn(new ColumnOrSuperColumn()) + mutations.get(0).column_or_supercolumn.setColumn(colWord) + + mutations.get(1).setColumn_or_supercolumn(new ColumnOrSuperColumn()) + mutations.get(1).column_or_supercolumn.setColumn(colCount) + (outputkey, mutations) + } + }.saveAsNewAPIHadoopFile("casDemo", classOf[ByteBuffer], classOf[List[Mutation]], + classOf[ColumnFamilyOutputFormat], job.getConfiguration) + } } @@ -81,6 +116,9 @@ object CassandraTest { create keyspace casDemo; use casDemo; +create column family WordCount with comparator = UTF8Type; +update column family WordCount with column_metadata = [{column_name: word, validation_class: UTF8Type}, {column_name: wcount, validation_class: LongType}]; + create column family Words with comparator = UTF8Type; update column family Words with column_metadata = [{column_name: book, validation_class: UTF8Type}, {column_name: para, validation_class: UTF8Type}]; -- cgit v1.2.3