The Hive Execution Plan
How Hive SQL is turned into MapReduce
On the Driver side:
- the HQL statement is converted into an AST;
  - this is done via ParseDriver: the statement is first turned into tokens, and the tokens are then parsed into the AST;
- the AST is converted into a TaskTree;
  - SemanticAnalyzer converts the AST into QueryBlocks (QB);
  - the QBs are converted into an OperatorTree;
  - the OperatorTree is logically optimized and then compiled into a TaskTree;
  - the TaskTree is physically optimized;
- the tasks are submitted for execution (ExecDriver); the Hive classes behind each of these stages are listed in the sketch below.
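For reference, each intermediate representation in the list above corresponds to a concrete Hive class. The sketch below only prints those class names; the package and class names are the ones found in Hive 1.x/2.x sources (hive-exec must be on the classpath) and may differ in other versions.

// PipelineClasses.java -- maps the pipeline stages above to Hive classes
import org.apache.hadoop.hive.ql.parse.ASTNode;      // the AST produced by the ANTLR parser
import org.apache.hadoop.hive.ql.parse.QB;           // QueryBlock, built from the AST
import org.apache.hadoop.hive.ql.exec.Operator;      // node type of the OperatorTree
import org.apache.hadoop.hive.ql.exec.Task;          // node type of the TaskTree
import org.apache.hadoop.hive.ql.exec.mr.ExecDriver; // the MapReduce task that is finally submitted

public class PipelineClasses {
  public static void main(String[] args) {
    Class<?>[] stages = {ASTNode.class, QB.class, Operator.class, Task.class, ExecDriver.class};
    for (Class<?> c : stages) {
      System.out.println(c.getName());
    }
  }
}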
Hive's execution entry classes
- the main class CliDriver, whose run() method calls
- executeDriver
- processCmd
- processLocalCmd
- qp.run(cmd)
- runInternal
Here qp is a Driver object; on the Driver side, runInternal calls compile, which performs the parsing and compilation of the statement.
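The same chain can be driven programmatically, which makes the role of qp concrete. This is only a sketch: it assumes an embedded Hive 1.x/2.x setup with hive-exec on the classpath and the logs table used in the Explain examples below already present in the metastore; constructors and return types of Driver changed in later versions.

// RunHql.java -- embedded-mode sketch of the CliDriver -> Driver path
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.Driver;
import org.apache.hadoop.hive.ql.session.SessionState;

public class RunHql {
  public static void main(String[] args) throws Exception {
    HiveConf conf = new HiveConf();
    SessionState.start(conf);          // CliDriver also starts a SessionState before dispatching commands
    Driver qp = new Driver(conf);      // the qp object referred to above
    // run() -> runInternal() -> compileInternal()/compile() for parsing and planning, then execute()
    int code = qp.run("explain select * from logs").getResponseCode();
    System.out.println("response code: " + code);
    qp.close();
  }
}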
compileInternal: compilation
The parser, the compiler and the optimizer all run in this step;
the AST is generated here.
Inside the compile method, ParseUtils.parse is used to parse the command and build an AST; the code looks like this:
// Driver.java, compile(): parse the command into an AST
ASTNode tree; // the AST
tree = ParseUtils.parse(command, ctx);
...
// inside ParseDriver.parse(): ANTLR-based lexing of the statement
HiveLexerX lexer = new HiveLexerX(new ANTLRNoCaseStringStream(command));
TokenRewriteStream tokens = new TokenRewriteStream(lexer);
Hive uses ANTLR for the lexical and syntactic analysis of the SQL. In older versions of Hive the grammar was simple enough to fit in a single file, Hive.g; as the rules grew more complex it was split into five files (a small parsing example follows the list):
- lexer rules: HiveLexer.g
- parser rules, split across four files:
- SelectClauseParser.g
- FromClauseParser.g
- IdentifiersParser.g
- HiveParser.g
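The generated parser can be exercised directly, which is a convenient way to look at the AST that compile produces. A minimal sketch, assuming the Hive 1.x/2.x API where ParseDriver and ASTNode live in org.apache.hadoop.hive.ql.parse:

// ParseDemo.java -- parse an HQL string and dump the AST
import org.apache.hadoop.hive.ql.parse.ASTNode;
import org.apache.hadoop.hive.ql.parse.ParseDriver;

public class ParseDemo {
  public static void main(String[] args) throws Exception {
    ParseDriver pd = new ParseDriver();
    ASTNode ast = pd.parse("select ip, count(1) from logs group by ip");
    // dump() prints nodes such as TOK_QUERY, TOK_FROM, TOK_SELECT and TOK_GROUPBY
    System.out.println(ast.dump());
  }
}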
From AST to Operator Tree
The work is done by SemanticAnalyzer; its class javadoc summarizes the role (the Operator subclasses it produces are listed in the sketch after the quote):
"Implementation of the semantic analyzer. It generates the query plan. There are other specific semantic analyzers for some hive operations such as DDLSemanticAnalyzer for ddl operations."
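The nodes of the OperatorTree are subclasses of Operator. The sketch below lists the subclasses that appear in the group-by plan analysed in the Explain section further down, together with the labels they show up under in Explain output (class names from Hive 1.x/2.x):

// OperatorTreeNodes.java -- Operator subclasses behind the Explain labels below
import org.apache.hadoop.hive.ql.exec.TableScanOperator;   // "TableScan"
import org.apache.hadoop.hive.ql.exec.SelectOperator;      // "Select Operator"
import org.apache.hadoop.hive.ql.exec.GroupByOperator;     // "Group By Operator"
import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;  // "Reduce Output Operator"
import org.apache.hadoop.hive.ql.exec.FileSinkOperator;    // "File Output Operator"

public class OperatorTreeNodes {
  public static void main(String[] args) {
    Class<?>[] chain = {TableScanOperator.class, SelectOperator.class, GroupByOperator.class,
                        ReduceSinkOperator.class, FileSinkOperator.class};
    for (Class<?> c : chain) {
      System.out.println(c.getSimpleName());
    }
  }
}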
The Optimizer
// SemanticAnalyzer.java
Optimizer optm = new Optimizer();
The Optimizer then applies each registered optimization rule in turn:
// Optimizer.java
// Invoke all the transformations one-by-one, and alter the query plan.
public ParseContext optimize() throws SemanticException {
  for (Transform t : transformations) {
    t.beginPerfLogging();
    pctx = t.transform(pctx);
    t.endPerfLogging(t.toString());
  }
  return pctx;
}
Transform is the abstract base class of these optimization rules.
One example is GroupByOptimizer, which conditionally performs the aggregation on the map side to reduce shuffle I/O; its javadoc states the condition (a minimal Transform sketch follows the quote):
This transformation does group by optimization. If the grouping key is a superset of the bucketing and sorting keys of the underlying table in the same order, the group by can be performed on the map-side completely.
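To show the contract that the optimize() loop relies on, here is a minimal, do-nothing rule. It assumes the Hive 2.x Transform abstract class in org.apache.hadoop.hive.ql.optimizer (matching the optimize() code quoted above, which calls beginPerfLogging; in older versions Transform was an interface), where only transform(ParseContext) has to be implemented. Real rules such as GroupByOptimizer rewrite the operator DAG held inside the ParseContext.

// NoopTransform.java -- the shape of an optimization rule
import org.apache.hadoop.hive.ql.optimizer.Transform;
import org.apache.hadoop.hive.ql.parse.ParseContext;
import org.apache.hadoop.hive.ql.parse.SemanticException;

public class NoopTransform extends Transform {
  @Override
  public ParseContext transform(ParseContext pctx) throws SemanticException {
    // a real rule would inspect and rewrite the operator tree, e.g. starting from pctx.getTopOps()
    return pctx;   // returning pctx unchanged leaves the plan as it was
  }
}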
execute(): execution
After compilation succeeds, runInternal calls execute(), which submits the tasks of the plan; this is the executor side of the Driver.
Hive Explain
The simplest Explain plan example
There is only one stage, Stage-0, and it is a Fetch Operator; limit: -1 means the query has no LIMIT clause.
Under its Processor Tree there is a single TableScan, which operates on the table logs.
The statistics show the total row count (Num rows) and the data size (Data size). Below that, the Select Operator lists the columns being read; since the query is select *, the schema of every column is shown.
A slightly more complex example
Next, a query with grouped aggregation (count + count distinct).
The statement is:
hive> explain extended select ip , count(distinct id) uv, count( doop) pv from logs group by ip;
The plan is now split into two stages: Stage-1 is the root stage, and Stage-0 depends on Stage-1.
Stage-1 is a MapReduce stage, divided into a Map Operator Tree and a Reduce Operator Tree.
The map side again starts with a TableScan of the logs table (with its row and size statistics), followed by a Select Operator that reads only the three columns ip, id and doop, and then a Group By Operator (mode: hash) that does the partial, map-side aggregation before the Reduce Output Operator shuffles the rows by ip.
STAGE DEPENDENCIES:
Stage-1 is a root stage
Stage-0 depends on stages: Stage-1
STAGE PLANS:
Stage: Stage-1
Map Reduce
Map Operator Tree:
TableScan
alias: logs
Statistics: Num rows: 2762962 Data size: 828888768 Basic stats: COMPLETE Column stats: NONE
GatherStats: false
Select Operator
expressions: ip (type: string), id (type: string), doop (type: string)
outputColumnNames: ip, id, doop
Statistics: Num rows: 2762962 Data size: 828888768 Basic stats: COMPLETE Column stats: NONE
Group By Operator
aggregations: count(DISTINCT id), count(doop)
keys: ip (type: string), id (type: string)
mode: hash
outputColumnNames: _col0, _col1, _col2, _col3
Statistics: Num rows: 2762962 Data size: 828888768 Basic stats: COMPLETE Column stats: NONE
Reduce Output Operator
key expressions: _col0 (type: string), _col1 (type: string)
null sort order: aa
sort order: ++
Map-reduce partition columns: _col0 (type: string)
Statistics: Num rows: 2762962 Data size: 828888768 Basic stats: COMPLETE Column stats: NONE
tag: -1
value expressions: _col3 (type: bigint)
auto parallelism: false
Path -> Alias:
file:/Users/staticor/tmp/hive/warehouse/shopee.db/logs [logs]
Path -> Partition:
file:/Users/staticor/tmp/hive/warehouse/shopee.db/logs
Partition
base file name: logs
input format: org.apache.hadoop.mapred.TextInputFormat
output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
properties:
bucket_count -1
column.name.delimiter ,
columns id,user_id,ip,doop,ttr
columns.comments
columns.types bigint:bigint:string:string:string
file.inputformat org.apache.hadoop.mapred.TextInputFormat
file.outputformat org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
location file:/Users/staticor/tmp/hive/warehouse/shopee.db/logs
name shopee.logs
numFiles 1
numRows 0
rawDataSize 0
separatorChar
serialization.ddl struct logs { i64 id, i64 user_id, string ip, string doop, string ttr}
serialization.format 1
serialization.lib org.apache.hadoop.hive.serde2.OpenCSVSerde
totalSize 828888758
transient_lastDdlTime 1653360838
serde: org.apache.hadoop.hive.serde2.OpenCSVSerde
input format: org.apache.hadoop.mapred.TextInputFormat
output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
properties:
bucket_count -1
column.name.delimiter ,
columns id,user_id,ip,doop,ttr
columns.comments
columns.types bigint:bigint:string:string:string
file.inputformat org.apache.hadoop.mapred.TextInputFormat
file.outputformat org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
location file:/Users/staticor/tmp/hive/warehouse/shopee.db/logs
name shopee.logs
numFiles 1
numRows 0
rawDataSize 0
separatorChar
serialization.ddl struct logs { i64 id, i64 user_id, string ip, string doop, string ttr}
serialization.format 1
serialization.lib org.apache.hadoop.hive.serde2.OpenCSVSerde
totalSize 828888758
transient_lastDdlTime 1653360838
serde: org.apache.hadoop.hive.serde2.OpenCSVSerde
name: shopee.logs
name: shopee.logs
Truncated Path -> Alias:
/shopee.db/logs [logs]
Needs Tagging: false
Reduce Operator Tree:
Group By Operator
aggregations: count(DISTINCT KEY._col1:0._col0), count(VALUE._col1)
keys: KEY._col0 (type: string)
mode: mergepartial
outputColumnNames: _col0, _col1, _col2
Statistics: Num rows: 1381481 Data size: 414444384 Basic stats: COMPLETE Column stats: NONE
File Output Operator
compressed: false
GlobalTableId: 0
directory: file:/Users/staticor/tmp/hive/9ae192bf-f90d-4427-ba16-54d05ac3c8a3/hive_2022-05-29_16-51-04_737_1249285988661867765-1/-mr-10001/.hive-staging_hive_2022-05-29_16-51-04_737_1249285988661867765-1/-ext-10002
NumFilesPerFileSink: 1
Statistics: Num rows: 1381481 Data size: 414444384 Basic stats: COMPLETE Column stats: NONE
Stats Publishing Key Prefix: file:/Users/staticor/tmp/hive/9ae192bf-f90d-4427-ba16-54d05ac3c8a3/hive_2022-05-29_16-51-04_737_1249285988661867765-1/-mr-10001/.hive-staging_hive_2022-05-29_16-51-04_737_1249285988661867765-1/-ext-10002/
table:
input format: org.apache.hadoop.mapred.SequenceFileInputFormat
output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
properties:
columns _col0,_col1,_col2
columns.types string:bigint:bigint
escape.delim \
hive.serialization.extend.additional.nesting.levels true
serialization.escape.crlf true
serialization.format 1
serialization.lib org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
TotalFiles: 1
GatherStats: false
MultiFileSpray: false
Stage: Stage-0
Fetch Operator
limit: -1
Processor Tree:
ListSink
Time taken: 0.159 seconds, Fetched: 123 row(s)
hive.fetch.task.conversion=more
This property controls which queries can skip MapReduce and be answered by a simple fetch task (a sketch of toggling it programmatically follows the list):
- none: the conversion is disabled and every query goes through MR;
- more (the default): SELECT of arbitrary columns, FILTER, LIMIT (including TABLESAMPLE and virtual columns) are served by a fetch task without MR;
- minimal: only SELECT *, FILTER on partition columns, and LIMIT avoid MR.
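To see the difference, flip the property before compiling the same query. A minimal sketch, reusing the embedded Driver assumptions from the earlier sketch (hive-exec on the classpath, the logs table present); the property name hive.fetch.task.conversion is the documented one, everything else is illustrative:

// FetchConversionDemo.java -- with "none", even a plain select * ... limit compiles into a MapReduce stage
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.Driver;
import org.apache.hadoop.hive.ql.session.SessionState;

public class FetchConversionDemo {
  public static void main(String[] args) throws Exception {
    HiveConf conf = new HiveConf();
    conf.set("hive.fetch.task.conversion", "none");   // also try "minimal" and "more"
    SessionState.start(conf);
    Driver driver = new Driver(conf);
    driver.run("explain select * from logs limit 10");
    List<String> plan = new ArrayList<>();
    driver.getResults(plan);               // the plan text, one line per list entry
    plan.forEach(System.out::println);
    driver.close();
  }
}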
A simple single-table execution plan with count(distinct) + sum:
- Stage-0 depends on Stage-1;
- Stage-1 runs MapReduce;
- the map phase performs:
  - a table scan,
  - column projection (Select Operator),
  - aggregation (Group By Operator).