前些天发现了一个巨牛的人工智能学习网站,通俗易懂,风趣幽默,忍不住分享一下给大家。点击跳转到网站:https://www.captainai.net/dongkelun
前言
接上篇文章:Hudi源码 | Insert源码分析总结(一)(整体流程),继续进行Apache Hudi Insert源码分析总结,本文主要分析上文提到的WorkloadProfile
版本
Hudi 0.9.0
入口
入口在上篇文章中讲到的BaseJavaCommitActionExecutor的execute
WorkloadProfile profile = null;
if (isWorkloadProfileNeeded()) { // 始终为true
// 构建WorkloadProfile,构建WorkloadProfile的目的主要是为给getPartitioner使用
// WorkloadProfile包含了分区路径对应的insert/upsert数量以及upsert数据对应的文件位置信息
// 数量信息是为了分桶,或者说是为了分几个文件,这里涉及了小文件合并、文件大小等原理
// 位置信息是为了获取要更新的文件
// 对于upsert数据,我们复用原来的fileId
// 对于insert数据,我们生成新的fileId,如果record数比较多,则分多个文件写
profile = new WorkloadProfile(buildProfile(inputRecords));
LOG.info("Workload profile :" + profile);
try {
// 将WorkloadProfile元数据信息持久化到.inflight文件中,.commit.request->.commit.inflight.
// 这一步主要是为了mor表的rollback,rollback时可以从.inflight文件中读取对应的元数据信息
saveWorkloadProfileMetadataToInflight(profile, instantTime);
} catch (Exception e) {
HoodieTableMetaClient metaClient = table.getMetaClient();
HoodieInstant inflightInstant = new HoodieInstant(HoodieInstant.State.INFLIGHT, metaClient.getCommitActionType(), instantTime);
try {
if (!metaClient.getFs().exists(new Path(metaClient.getMetaPath(), inflightInstant.getFileName()))) {
throw new HoodieCommitException("Failed to commit " + instantTime + " unable to save inflight metadata ", e);
}
} catch (IOException ex) {
LOG.error("Check file exists failed");
throw new HoodieCommitException("Failed to commit " + instantTime + " unable to save inflight metadata ", ex);
}
}
}
WorkloadProfile
首先看一下WorkloadProfile的构造函数,看看需要哪些参数。它有两个构造函数,一个只有一个参数:Pair<HashMap<String, WorkloadStat>, WorkloadStat> profile,另外一个相比于第一个只是多了一个写操作类型:WriteOperationType。对于profile,它的left是分区统计信息,right是全局统计信息。统计信息是通过WorkloadStat实现的。
public WorkloadProfile(Pair<HashMap<String, WorkloadStat>, WorkloadStat> profile) {
this.partitionPathStatMap = profile.getLeft();
this.globalStat = profile.getRight();
}
public WorkloadProfile(Pair<HashMap<String, WorkloadStat>, WorkloadStat> profile, WriteOperationType operationType) {
this(profile);
this.operationType = operationType;
}
buildProfile
buildProfile就是将inputRecords构造成WorkloadProfile所需要的profile。首先初始化一个分区统计信息partitionPathStatMap和全局统计信息globalStat。然后将inputRecords通过map和groupingBy得到每个分区路径对应的文件位置信息和记录数量:partitionLocationCounts。其中文件位置信息是通过record.getCurrentLocation得到的,保存在HoodieRecordLocation中。而record中位置信息是通过上篇文章提到的tag方法通过读取索引信息得到的。不过tag方法只有upsert/delete等才会调用,对于insert方法是不会触发的,也就是这里的record中的location都为空。然后遍历partitionLocationCounts.entrySet(),其实就是按照分区执行,获取分区路径、记录数、文件位置。如果partitionPathStatMap没有该分区,则将该分区放进去,并且初始化value即WorkloadStat。接着判断文件信息是否存在,如果文件位置信息存在,则代表是update(有对应的历史数据),对于update,对应的分区下的WorkloadStat调用addUpdates,全局WorkloadStat调用addUpdates。如果文件位置信息不存在,则代表是insert数据(新数据),对于insert,对应分区下的WorkloadStat调用addInserts,使insert数加上对应的记录数,全局WorkloadStat中的insert数也加上对应的记录数,最后返回WorkloadProfile所需要的Pair.of(partitionPathStatMap, globalStat)
protected Pair<HashMap<String, WorkloadStat>, WorkloadStat> buildProfile(List<HoodieRecord<T>> inputRecords) {
// 分区路径,WorkloadStat
HashMap<String, WorkloadStat> partitionPathStatMap = new HashMap<>();
// 全局WorkloadStat
WorkloadStat globalStat = new WorkloadStat();
// 返回(分区路径,文件位置信息),记录数
// 也就是partitionLocationCounts:分区路径、文件位置、记录数
Map<Pair<String, Option<HoodieRecordLocation>>, Long> partitionLocationCounts = inputRecords
.stream()
.map(record -> Pair.of(
// (分区路径,文件位置信息),record
Pair.of(record.getPartitionPath(), Option.ofNullable(record.getCurrentLocation())), record))
// 根据分区路径groupBy,统计每个分区对应的数量
.collect(Collectors.groupingBy(Pair::getLeft, Collectors.counting()));
// 遍历partitionLocationCounts(k,v)
for (Map.Entry<Pair<String, Option<HoodieRecordLocation>>, Long> e : partitionLocationCounts.entrySet()) {
// 分区路径
String partitionPath = e.getKey().getLeft();
// 记录数
Long count = e.getValue();
// 文件位置HoodieRecordLocation
Option<HoodieRecordLocation> locOption = e.getKey().getRight();
// 如果partitionPathStatMap没有该分区,则将该分区放进去,并且初始化value WorkloadStat
if (!partitionPathStatMap.containsKey(partitionPath)) {
partitionPathStatMap.put(partitionPath, new WorkloadStat());
}
// 如果文件位置信息存在,则代表是update
// 获取文件位置信息是在前面的tag方法中,对于insert方法,不需要tag
if (locOption.isPresent()) {
// update
// 对应分区下的WorkloadStat调用addUpdates
partitionPathStatMap.get(partitionPath).addUpdates(locOption.get(), count);
// 全局WorkloadStat调用addUpdates
globalStat.addUpdates(locOption.get(), count);
} else {
// 否则是insert
// insert
// 对应分区下的WorkloadStat调用addInserts,使insert数+count
partitionPathStatMap.get(partitionPath).addInserts(count);
// 全局WorkloadStat中的insert数+count
globalStat.addInserts(count);
}
}
// 返回分区统计信息和全局统计信息
return Pair.of(partitionPathStatMap, globalStat);
}
WorkloadStat
主要是看一下它的addInserts和addUpdates方法。addInserts方法很简单,就是将numInsert更新为numInserts加上对应的记录数。addUpdates稍微复杂一点,主要是更新updateLocationToCount,updateLocationToCount保存的是(fileId,(instantTime,记录数))。主要逻辑:如果updateLocationToCount中没有该fileId,则直接将fileId,(instantTime,记录数)放进updateLocationToCount,如果有的话,则更新该fileId对应的value。value为pair,将value的left即instantTime更新为location.getInstantTime()。将value的right即记录数更新为numUpdates + accNumUpdates。其中numUpdates为参数即本次记录数,accNumUpdates是已经存在的累计数量。最后再将numUpdates更新numUpdates+对应的记录数。(numUpdates在后面并没有用到)
总结一下,WorkloadStat的作用主要记录insert累计数和update累计数。不过update需要以fileId为维度进行累计,这是因为update有明确要更新的fileId,而insert是没有的。
private long numInserts = 0L;
private long numUpdates = 0L;
// fileId,(instantTime,记录数)
private HashMap<String, Pair<String, Long>> updateLocationToCount;
/**
* numInserts数初始值0,addInserts将numInserts加上对应的记录数
*/
public long addInserts(long numInserts) {
return this.numInserts += numInserts;
}
/**
* accNumUpdates初始值0
* 如果updateLocationToCount中有对应的fileId,
* 则先获取updateLocationToCount对应的fileId对应的记录数赋值给accNumUpdates
* 最后将updateLocationToCount
*/
public long addUpdates(HoodieRecordLocation location, long numUpdates) {
// 初始值0
long accNumUpdates = 0;
// 如果已经存在了对应的fileId
if (updateLocationToCount.containsKey(location.getFileId())) {
// 将updateLocationToCount中对应的数取出来赋值给accNumUpdates
accNumUpdates = updateLocationToCount.get(location.getFileId()).getRight();
}
// 更新updateLocationToCount中该fileId对应的value。value为pair,将value的left即instantTime更新为location.getInstantTime()
// 将value的right即记录数更新为numUpdates + accNumUpdates
updateLocationToCount.put(
location.getFileId(),
Pair.of(location.getInstantTime(), numUpdates + accNumUpdates));
// numUpdates初始值为0,每次调用addUpdates都将numUpdates加上对应的record数
return this.numUpdates += numUpdates;
}
saveWorkloadProfileMetadataToInflight
上篇文章讲到将WorkloadProfile元数据信息持久化到.inflight文件中,我们来看一下是如何持久化的。主要逻辑就是遍历profile中的分区,获取对应的WorkloadStat,然后将对应的partitionPath、numInserts/numUpdates、fileId、instantTime放到HoodieWriteStat中,再将HoodieWriteStat放到HoodieCommitMetadata中,最后调用activeTimeline.transitionRequestedToInflight将HoodieCommitMetadata转成json持久化到.inflight中
// 将WorkloadProfile元数据信息持久化到.inflight文件中,.commit.request->.commit.inflight.
// 这一步主要是为了mor表的rollback,rollback时可以从.inflight文件中读取对应的元数据信息
saveWorkloadProfileMetadataToInflight(profile, instantTime);
void saveWorkloadProfileMetadataToInflight(WorkloadProfile profile, String instantTime)
throws HoodieCommitException {
try {
HoodieCommitMetadata metadata = new HoodieCommitMetadata();
// 按照分区路径遍历
profile.getPartitionPaths().forEach(path -> {
// 获取对应分区的WorkloadStat
WorkloadStat partitionStat = profile.getWorkloadStat(path);
// 创建一个新的HoodieWriteStat,先进行insert
HoodieWriteStat insertStat = new HoodieWriteStat();
// 将WorkloadStat中的numInserts赋值给insertStat
insertStat.setNumInserts(partitionStat.getNumInserts());
// insertStat的fileId为空
insertStat.setFileId("");
// prevCommit为null
insertStat.setPrevCommit(HoodieWriteStat.NULL_COMMIT);
// 将path和insertStat添加到metadata中
metadata.addWriteStat(path, insertStat);
// 接着进行update的逻辑
partitionStat.getUpdateLocationToCount().forEach((key, value) -> {
// 创建一个新的HoodieWriteStat
HoodieWriteStat writeStat = new HoodieWriteStat();
// 设置fileId
writeStat.setFileId(key);
// TODO : Write baseCommitTime is possible here ?
// prevCommit设为WorkloadStat中的instantTime
writeStat.setPrevCommit(value.getKey());
// 设置更新数
writeStat.setNumUpdateWrites(value.getValue());
// 将path和writeStat添加到metadata中
metadata.addWriteStat(path, writeStat);
});
});
// 设置操作类型
metadata.setOperationType(operationType);
HoodieActiveTimeline activeTimeline = table.getActiveTimeline();
String commitActionType = getCommitActionType();
HoodieInstant requested = new HoodieInstant(State.REQUESTED, commitActionType, instantTime);
// 将.request转为.inflight,其实就是创建一个新的.inflight,将metadata转成json持久化到.inflight
activeTimeline.transitionRequestedToInflight(requested,
Option.of(metadata.toJsonString().getBytes(StandardCharsets.UTF_8)),
config.shouldAllowMultiWriteOnSameInstant());
} catch (IOException io) {
throw new HoodieCommitException("Failed to commit " + instantTime + " unable to save inflight metadata ", io);
}
}
总结
关于WorkloadProfile的分析一共就这么多,主要是统计record中每个分区路径对应的insert/upsert数量以及upsert数据对应的fileId和instantTime,先持久化到.inflight文件中,然后给后面的getPartitioner使用。关于WorkloadProfile统计的这些信息是如何在getPartitioner中使用的,我们放在下篇文章中分析。
注释代码
github: https://github.com/dongkelun/hudi/tree/0.9.0-learning-comments
gitee: https://gitee.com/dongkelun/hudi/tree/0.9.0-learning-comments
相关阅读
- 开源经验分享 | 如何从一名小白成为Apache Hudi Contributor
- Hudi源码 | Insert源码分析总结(一)(整体流程)
- Hudi Java Client总结|读取Hive写Hudi代码示例
- Hudi源码|bootstrap源码分析总结(写Hudi)
- Hudi Clean Policy 清理策略实现分析
- Hudi Clean 清理文件实现分析
- Hudi查询类型/视图总结
- Hudi preCombinedField 总结(二)-源码分析
- Hudi Spark SQL源码学习总结-Create Table
- Hudi Spark SQL源码学习总结-CTAS
- Hudi Spark源码学习总结-df.write.format(“hudi”).save
- Hudi Spark源码学习总结-spark.read.format(“hudi”).load
- Hudi Spark SQL源码学习总结-select(查询)
更多文章请关注《万象专栏》
转载请注明出处:https://www.wanxiangsucai.com/read/cv172621