Pig Basic
Pig 官网
Apache Pig 是一个高级过程语言,适合于使用 Hadoop 和 MapReduce 平台来查询大型半结构化数据集。
通过允许对分布式数据集进行类似 SQL 的查询,Pig 可以简化 Hadoop 的使用
pig 语法关键字是不分大小写
Pig启动模式
- local
- mapreduce
- spark
通过 pig -x local xxx.pig 指定
本地测试
本地一个目录先创建一个文件比如data.txt
1 | w,10,6 |
pig -x local 进入交互模式 (下面省略中间执行日志)
1 | grunt> DATA = LOAD 'data.txt' USING PigStorage(',') AS (name: chararray, age:int, num: float); |
执行参数
- Properties
通过-D:
等价设置环境变量PIG_OPTS, export PIG_OPTS=–Dmapreduce.task.profile=true
或者通过文件
pig -P mypig.properties
常见mapreduce参数设置 hadoop相关配置
1 | pig -Dmapreduce.job.cache.files="/user/hadoop/dict.txt#dict.txt" xxx.pig |
- 变量 -p -param 替换pig文件中变量
pig -p INPUT=”data.txt” xx.pig
log4j 配置
默认是INFO 日志特别多,可以通过log4j文件改成ERROR
1 | log4j.rootLogger=ERROR,stdout |
pig -log4jconf ./pig-log4j.properties -x local
- set
主要用于job名称 job并行度 等job相关设置
1 | SET job.name 'my job'; |
PigLatin
pig 文件是使用PigLatin语法描述
注意pig里 字符串都是 单引号
结构
- tuple
有序字段集合
(name, 18) - bag
tuple 的组合 - map
key/value pair
[ key#value <, key#value …> ]
[name#John,phone#5551212]
引入包
1 | REGISTER 'hdfs://hadoop/piglibs/*.jar'; |
加载数据
pig记载数据通过Loader方式
apache内置多种Loader
- TextLoader
load 一行为一个字段
默认TextLoader是文本TSV分割数据(支持gzip,bzip)
TSV: 分割
A = LOAD ‘student’ USING PigStorage(‘\t’) AS (name: chararray, age:int, gpa: float); - JsonLoader 官方这个不是太好用 不支持 深层次json
a = load ‘a.json’ using JsonLoader(‘a0:int,a1:{(a10:int,a11:chararray)},a2:(a20:double,a21:bytearray),a3:[chararray]’);
第三方Loader
twitter elephantbird
- sequenceFileLoader
1 | pairs = LOAD '$INPUT' USING com.twitter.elephantbird.pig.load.SequenceFileLoader ( |
- Thrift Lzo(Block/Base64)
1 | DATA = LOAD '$INPUT' |
- Protobuf
1 | DATA = LOAD '$INPUT' |
- Json (elephant 版本支持嵌套)
1 | DATA = LOAD '$INPUT' |
注意 等号 左右 要有一个空格 不然会语法错误
1 | INPUT_LOG = LOAD '$XXX_LOG' USING org.apache.parquet.pig.ParquetLoader(); |
遍历数据
1 | INPUT_LOG = FOREACH INPUT_LOG GENERATE time, uid; |
过滤数据
1 | FILTERED_LOG = FILTER INPUT_LOG BY isValid == 1; |
join 数据
- inner join
alias = JOIN alias BY {expression|’(‘expression [, expression …]’)’} (, alias BY {expression|’(‘expression [, expression …]’)’} …) [USING ‘replicated’ | ‘bloom’ | ‘skewed’ | ‘merge’ | ‘merge-sparse’] [PARTITION BY partitioner] [PARALLEL n];
inner join 和sql inner join一样只有两边都有 才保留
1 | grunt> DATA = LOAD 'data.txt' USING PigStorage(',') as (name:chararray, age: int); |
- outer join
alias = JOIN left-alias BY left-alias-column [LEFT|RIGHT|FULL] [OUTER], right-alias BY right-alias-column [USING ‘replicated’ | ‘bloom’ | ‘skewed’ | ‘merge’] [PARTITION BY partitioner] [PARALLEL n];
out join和sql outer join概念类似
分LEFT,RIGHT,FULL
分别是保留左边全部,保留右边全部,保留两边全部
LEFT,RIGHT,FULL [OUTER] OUTER 可以省略
1 | grunt> JOIN_DATA = JOIN DATA BY name RIGHT, WEIGHT BY name; |
group 数据
alias = GROUP alias { ALL | BY expression} [, alias ALL | BY expression …] [USING ‘collected’ | ‘merge’] [PARTITION BY partitioner] [PARALLEL n];
- ALL, group ALL to one group
B = GROUP A ALL; - Field
B = GROUP A BY f1; - Tuple
B = GROUP A BY (f1, f2);
1 | A = load 'student' AS (name:chararray,age:int,gpa:float); |
存储
STORE alias INTO ‘directory’ [USING function];
和加载数据类似
分隔符 tsv: PigStorage
AvroStorage
第三方:twitter
elephant bird store
- LzoJsonStorage
- SequenceFileStorage
- LzoThriftBlockPigStorage
- LzoThriftB64LinePigStorage
- LzoProtobufBlockPigStorage
- LzoProtobufB64LinePigStorage
STORE D INTO ‘mysortedcount’ USING PigStorage();
其他
- 查看描述:
DESCRIBE DATA; - 打印(只建议在少数据量时候使用,大数据建议先LIMIT再DUMP)
DUMP DATA; - LIMIT
alis = LIMIT alias n; - 排序
alias = ORDER alias BY { * [ASC|DESC] | field_alias [ASC|DESC] [, field_alias [ASC|DESC] …] } [PARALLEL n];
X = ORDER A BY age DESC; - 采样
SAMPLE alias size;
size 0~1
X = SAMPLE A 0.01;
常见函数
UDF 用户自定义函数
全称 User Defined Functions