学习 Flink(十七):HyperLogLog 去重计数

在需要对数据进行去重计数的场景里,实现方式是将数据明细存储在集合的数据结构中。然而,随着数据随时间的不断累积,明细数据占用了大量的存储空间。使用 HyperLoglog 去重计数,在牺牲非常小准确性的情况下,可以极大的减少数据存储。

依赖

编辑 pom.xml 文件,添加依赖:

<dependency>  
    <groupId>net.agkn</groupId>
    <artifactId>hll</artifactId>
    <version>1.6.0</version>
</dependency>  

使用

定义状态:

private ValueState<Byte[]> hllState;  

初识化状态:

@Override
public void open(Configuration parameters) throws Exception {  
    super.open(parameters);
    ValueStateDescriptor<Byte[]> hllStateDescriptor = new ValueStateDescriptor<>(
        "hll",
        Types.OBJECT_ARRAY(Types.BYTE)
    );

    this.hllState = getRuntimeContext().getState(hllStateDescriptor);
}

处理方法中,由状态获取 HLL:

HLL hll = null;  
if (this.hllState.value() == null) {  
    hll = new HLL(14, 5);
} else {
    hll = HLL.fromBytes(ArrayUtils.toPrimitive(this.hllState.value()));
}

处理方法中,由 HLL 更新状态:

this.hllState.update(ArrayUtils.toObject(hll.toBytes()));