前些天发现了一个巨牛的人工智能学习网站,通俗易懂,风趣幽默,忍不住分享一下给大家。点击跳转到网站:https://www.captainai.net/dongkelun

前言

接上篇文章:Hudi源码 | Insert源码分析总结(一)(整体流程),继续进行Apache Hudi Insert源码分析总结,本文主要分析上文提到的WorkloadProfile

版本

Hudi 0.9.0

入口

入口在上篇文章中讲到的BaseJavaCommitActionExecutorexecute

    WorkloadProfile profile = null;
    if (isWorkloadProfileNeeded()) { // 始终为true
      // 构建WorkloadProfile,构建WorkloadProfile的目的主要是为给getPartitioner使用
      // WorkloadProfile包含了分区路径对应的insert/upsert数量以及upsert数据对应的文件位置信息
      // 数量信息是为了分桶,或者说是为了分几个文件,这里涉及了小文件合并、文件大小等原理
      // 位置信息是为了获取要更新的文件
      // 对于upsert数据,我们复用原来的fileId
      // 对于insert数据,我们生成新的fileId,如果record数比较多,则分多个文件写
      profile = new WorkloadProfile(buildProfile(inputRecords));
      LOG.info("Workload profile :" + profile);
      try {
        // 将WorkloadProfile元数据信息持久化到.inflight文件中,.commit.request->.commit.inflight.
        // 这一步主要是为了mor表的rollback,rollback时可以从.inflight文件中读取对应的元数据信息
        saveWorkloadProfileMetadataToInflight(profile, instantTime);
      } catch (Exception e) {
        HoodieTableMetaClient metaClient = table.getMetaClient();
        HoodieInstant inflightInstant = new HoodieInstant(HoodieInstant.State.INFLIGHT, metaClient.getCommitActionType(), instantTime);
        try {
          if (!metaClient.getFs().exists(new Path(metaClient.getMetaPath(), inflightInstant.getFileName()))) {
            throw new HoodieCommitException("Failed to commit " + instantTime + " unable to save inflight metadata ", e);
          }
        } catch (IOException ex) {
          LOG.error("Check file exists failed");
          throw new HoodieCommitException("Failed to commit " + instantTime + " unable to save inflight metadata ", ex);
        }
      }
    }

WorkloadProfile

首先看一下WorkloadProfile的构造函数,看看需要哪些参数。它有两个构造函数,一个只有一个参数:Pair<HashMap<String, WorkloadStat>, WorkloadStat> profile,另外一个相比于第一个只是多了一个写操作类型:WriteOperationType。对于profile,它的left是分区统计信息,right是全局统计信息。统计信息是通过WorkloadStat实现的。

  public WorkloadProfile(Pair<HashMap<String, WorkloadStat>, WorkloadStat> profile) {
    this.partitionPathStatMap = profile.getLeft();
    this.globalStat = profile.getRight();
  }

  public WorkloadProfile(Pair<HashMap<String, WorkloadStat>, WorkloadStat> profile, WriteOperationType operationType) {
    this(profile);
    this.operationType = operationType;
  }

buildProfile

buildProfile就是将inputRecords构造成WorkloadProfile所需要的profile。首先初始化一个分区统计信息partitionPathStatMap和全局统计信息globalStat。然后将inputRecords通过map和groupingBy得到每个分区路径对应的文件位置信息和记录数量:partitionLocationCounts。其中文件位置信息是通过record.getCurrentLocation得到的,保存在HoodieRecordLocation中。而record中位置信息是通过上篇文章提到的tag方法通过读取索引信息得到的。不过tag方法只有upsert/delete等才会调用,对于insert方法是不会触发的,也就是这里的record中的location都为空。然后遍历partitionLocationCounts.entrySet(),其实就是按照分区执行,获取分区路径、记录数、文件位置。如果partitionPathStatMap没有该分区,则将该分区放进去,并且初始化value即WorkloadStat。接着判断文件信息是否存在,如果文件位置信息存在,则代表是update(有对应的历史数据),对于update,对应的分区下的WorkloadStat调用addUpdates,全局WorkloadStat调用addUpdates。如果文件位置信息不存在,则代表是insert数据(新数据),对于insert,对应分区下的WorkloadStat调用addInserts,使insert数加上对应的记录数,全局WorkloadStat中的insert数也加上对应的记录数,最后返回WorkloadProfile所需要的Pair.of(partitionPathStatMap, globalStat)

  protected Pair<HashMap<String, WorkloadStat>, WorkloadStat> buildProfile(List<HoodieRecord<T>> inputRecords) {
    // 分区路径,WorkloadStat
    HashMap<String, WorkloadStat> partitionPathStatMap = new HashMap<>();
    // 全局WorkloadStat
    WorkloadStat globalStat = new WorkloadStat();

    // 返回(分区路径,文件位置信息),记录数
    // 也就是partitionLocationCounts:分区路径、文件位置、记录数
    Map<Pair<String, Option<HoodieRecordLocation>>, Long> partitionLocationCounts = inputRecords
        .stream()
        .map(record -> Pair.of(
            // (分区路径,文件位置信息),record
            Pair.of(record.getPartitionPath(), Option.ofNullable(record.getCurrentLocation())), record))
            // 根据分区路径groupBy,统计每个分区对应的数量
        .collect(Collectors.groupingBy(Pair::getLeft, Collectors.counting()));

    // 遍历partitionLocationCounts(k,v)
    for (Map.Entry<Pair<String, Option<HoodieRecordLocation>>, Long> e : partitionLocationCounts.entrySet()) {
      // 分区路径
      String partitionPath = e.getKey().getLeft();
      // 记录数
      Long count = e.getValue();
      // 文件位置HoodieRecordLocation
      Option<HoodieRecordLocation> locOption = e.getKey().getRight();

      // 如果partitionPathStatMap没有该分区,则将该分区放进去,并且初始化value WorkloadStat
      if (!partitionPathStatMap.containsKey(partitionPath)) {
        partitionPathStatMap.put(partitionPath, new WorkloadStat());
      }

      // 如果文件位置信息存在,则代表是update
      // 获取文件位置信息是在前面的tag方法中,对于insert方法,不需要tag
      if (locOption.isPresent()) {
        // update
        // 对应分区下的WorkloadStat调用addUpdates
        partitionPathStatMap.get(partitionPath).addUpdates(locOption.get(), count);
        // 全局WorkloadStat调用addUpdates
        globalStat.addUpdates(locOption.get(), count);
      } else {
        // 否则是insert
        // insert
        // 对应分区下的WorkloadStat调用addInserts,使insert数+count
        partitionPathStatMap.get(partitionPath).addInserts(count);
        // 全局WorkloadStat中的insert数+count
        globalStat.addInserts(count);
      }
    }
    // 返回分区统计信息和全局统计信息
    return Pair.of(partitionPathStatMap, globalStat);
  }

WorkloadStat

主要是看一下它的addInserts和addUpdates方法。addInserts方法很简单,就是将numInsert更新为numInserts加上对应的记录数。addUpdates稍微复杂一点,主要是更新updateLocationToCount,updateLocationToCount保存的是(fileId,(instantTime,记录数))。主要逻辑:如果updateLocationToCount中没有该fileId,则直接将fileId,(instantTime,记录数)放进updateLocationToCount,如果有的话,则更新该fileId对应的value。value为pair,将value的left即instantTime更新为location.getInstantTime()。将value的right即记录数更新为numUpdates + accNumUpdates。其中numUpdates为参数即本次记录数,accNumUpdates是已经存在的累计数量。最后再将numUpdates更新numUpdates+对应的记录数。(numUpdates在后面并没有用到)

总结一下,WorkloadStat的作用主要记录insert累计数和update累计数。不过update需要以fileId为维度进行累计,这是因为update有明确要更新的fileId,而insert是没有的。


  private long numInserts = 0L;

  private long numUpdates = 0L;

    // fileId,(instantTime,记录数)
  private HashMap<String, Pair<String, Long>> updateLocationToCount;

  /**
   * numInserts数初始值0,addInserts将numInserts加上对应的记录数
   */
  public long addInserts(long numInserts) {
    return this.numInserts += numInserts;
  }

  /**
   *  accNumUpdates初始值0
   * 如果updateLocationToCount中有对应的fileId,
   * 则先获取updateLocationToCount对应的fileId对应的记录数赋值给accNumUpdates
   * 最后将updateLocationToCount
   */
  public long addUpdates(HoodieRecordLocation location, long numUpdates) {
    // 初始值0
    long accNumUpdates = 0;
    // 如果已经存在了对应的fileId
    if (updateLocationToCount.containsKey(location.getFileId())) {
      // 将updateLocationToCount中对应的数取出来赋值给accNumUpdates
      accNumUpdates = updateLocationToCount.get(location.getFileId()).getRight();
    }
    // 更新updateLocationToCount中该fileId对应的value。value为pair,将value的left即instantTime更新为location.getInstantTime()
    // 将value的right即记录数更新为numUpdates + accNumUpdates
    updateLocationToCount.put(
        location.getFileId(),
        Pair.of(location.getInstantTime(), numUpdates + accNumUpdates));
    // numUpdates初始值为0,每次调用addUpdates都将numUpdates加上对应的record数
    return this.numUpdates += numUpdates;
  }

saveWorkloadProfileMetadataToInflight

上篇文章讲到将WorkloadProfile元数据信息持久化到.inflight文件中,我们来看一下是如何持久化的。主要逻辑就是遍历profile中的分区,获取对应的WorkloadStat,然后将对应的partitionPath、numInserts/numUpdates、fileId、instantTime放到HoodieWriteStat中,再将HoodieWriteStat放到HoodieCommitMetadata中,最后调用activeTimeline.transitionRequestedToInflight将HoodieCommitMetadata转成json持久化到.inflight

  // 将WorkloadProfile元数据信息持久化到.inflight文件中,.commit.request->.commit.inflight.
  // 这一步主要是为了mor表的rollback,rollback时可以从.inflight文件中读取对应的元数据信息
  saveWorkloadProfileMetadataToInflight(profile, instantTime);

  void saveWorkloadProfileMetadataToInflight(WorkloadProfile profile, String instantTime)
      throws HoodieCommitException {
    try {
      HoodieCommitMetadata metadata = new HoodieCommitMetadata();
      // 按照分区路径遍历
      profile.getPartitionPaths().forEach(path -> {
        // 获取对应分区的WorkloadStat
        WorkloadStat partitionStat = profile.getWorkloadStat(path);
        // 创建一个新的HoodieWriteStat,先进行insert
        HoodieWriteStat insertStat = new HoodieWriteStat();
        // 将WorkloadStat中的numInserts赋值给insertStat
        insertStat.setNumInserts(partitionStat.getNumInserts());
        // insertStat的fileId为空
        insertStat.setFileId("");
        // prevCommit为null
        insertStat.setPrevCommit(HoodieWriteStat.NULL_COMMIT);
        // 将path和insertStat添加到metadata中
        metadata.addWriteStat(path, insertStat);

        // 接着进行update的逻辑
        partitionStat.getUpdateLocationToCount().forEach((key, value) -> {
          // 创建一个新的HoodieWriteStat
          HoodieWriteStat writeStat = new HoodieWriteStat();
          // 设置fileId
          writeStat.setFileId(key);
          // TODO : Write baseCommitTime is possible here ?
          // prevCommit设为WorkloadStat中的instantTime
          writeStat.setPrevCommit(value.getKey());
          // 设置更新数
          writeStat.setNumUpdateWrites(value.getValue());
          // 将path和writeStat添加到metadata中
          metadata.addWriteStat(path, writeStat);
        });
      });
      // 设置操作类型
      metadata.setOperationType(operationType);

      HoodieActiveTimeline activeTimeline = table.getActiveTimeline();
      String commitActionType = getCommitActionType();
      HoodieInstant requested = new HoodieInstant(State.REQUESTED, commitActionType, instantTime);
      // 将.request转为.inflight,其实就是创建一个新的.inflight,将metadata转成json持久化到.inflight
      activeTimeline.transitionRequestedToInflight(requested,
          Option.of(metadata.toJsonString().getBytes(StandardCharsets.UTF_8)),
          config.shouldAllowMultiWriteOnSameInstant());
    } catch (IOException io) {
      throw new HoodieCommitException("Failed to commit " + instantTime + " unable to save inflight metadata ", io);
    }
  }

总结

关于WorkloadProfile的分析一共就这么多,主要是统计record中每个分区路径对应的insert/upsert数量以及upsert数据对应的fileIdinstantTime,先持久化到.inflight文件中,然后给后面的getPartitioner使用。关于WorkloadProfile统计的这些信息是如何在getPartitioner中使用的,我们放在下篇文章中分析。

注释代码

github: https://github.com/dongkelun/hudi/tree/0.9.0-learning-comments
gitee: https://gitee.com/dongkelun/hudi/tree/0.9.0-learning-comments

相关阅读

更多文章请关注《万象专栏》