"""Read LZO-compressed Thrift records from HDFS with PySpark and dump them as text.

The input is loaded through Elephant Bird's ``RawMultiInputFormat``, each
record's value is deserialized into a ``ThriftObj``, and the resulting RDD is
written out with ``saveAsTextFile``.
"""
from pyspark import SparkConf, SparkContext
import thrift
from gen_py.xxx.ttypes import ThriftObj
from thrift.TSerialization import deserialize

conf = SparkConf()
sc = SparkContext(conf=conf)

# Placeholder HDFS location of the LZO-compressed Thrift log; replace with a real path.
logpath = "/youhdfspath/xxx.thrift.lzo"

# Arguments after the path are, in order: the Hadoop InputFormat class,
# the key class, and the value class (all fully qualified Java names).
log = sc.newAPIHadoopFile(
    logpath,
    'com.twitter.elephantbird.mapreduce.input.RawMultiInputFormat',
    'org.apache.hadoop.io.LongWritable',
    'com.twitter.elephantbird.mapreduce.io.BinaryWritable',
)

# Each element of `log` is a (key, value) pair; x[1] is the value.
# NOTE(review): presumably the value arrives as the raw serialized Thrift
# bytes that `deserialize` can consume -- confirm against the PySpark
# Writable conversion in use.
# A fresh ThriftObj is created per record so that records do not share state.
rdd = log.map(lambda x: deserialize(ThriftObj(), x[1]))

# Writes the string form of every deserialized object under "testfile".
rdd.saveAsTextFile("testfile")