Flink window 源码分析4:WindowState

2024-01-03 18:32:55

本文源码为flink 1.18.0版本。
其他相关文章:
Flink window 源码分析1:窗口整体执行流程
Flink window 源码分析2:Window 的主要组件
Flink window 源码分析3:WindowOperator
Flink window 源码分析4:WindowState

reduce、aggregate 等函数中怎么使用 WindowState ?

主要考虑 reduce、aggregate 函数中的托管状态是在什么时候触发和使用的?使用时与WindowState有什么联系?

  1. 状态描述器
    若用户定义了 Evictor,则窗口中创建 ListState 描述器:
ListStateDescriptor<StreamRecord<T>> stateDesc =  
        new ListStateDescriptor<>(WINDOW_STATE_NAME, streamRecordSerializer);

在 process、apply 中创建 ListState 描述器:

ListStateDescriptor<T> stateDesc =  
        new ListStateDescriptor<>(  
                WINDOW_STATE_NAME, inputType.createSerializer(config));

在 reduce 中创建 ReducingState 描述器:

ReducingStateDescriptor<T> stateDesc =  
        new ReducingStateDescriptor<>(  
                WINDOW_STATE_NAME, reduceFunction, inputType.createSerializer(config));

在 aggregate 中创建 AggregatingState 描述器:

AggregatingStateDescriptor<T, ACC, V> stateDesc =  
        new AggregatingStateDescriptor<>(  
                WINDOW_STATE_NAME,  
                aggregateFunction,  
                accumulatorType.createSerializer(config));

在创建ReducingState、AggregatingState时,直接将用户定义的函数添加到状态中。
2. 创建状态
上述描述器会在 WindowOperator.open() 方法中使用。getOrCreateKeyedState() 会从状态后端中创建或检索相应的状态。最终会得到的状态类型是 InternalAppendingState 。
如果是会话窗口,会进一步将 windowState 转换为 windowMergingState, 其类型为 InternalMergingState。

// create (or restore) the state that hold the actual window contents  
// NOTE - the state may be null in the case of the overriding evicting window operator  
if (windowStateDescriptor != null) {  
    windowState =  
            (InternalAppendingState<K, W, IN, ACC, ACC>)  
                    getOrCreateKeyedState(windowSerializer, windowStateDescriptor);  
}
  1. 使用状态
    在 WindowOperator 类中的 processElement()、
windowState.setCurrentNamespace(stateWindow);  
windowState.add(element.getValue());
... # 判断窗口是否触发
if (triggerResult.isFire()) {  
    ACC contents = windowState.get();  
    if (contents == null) {  
        continue;  
    }  
    emitWindowContents(actualWindow, contents);  
}
if (triggerResult.isPurge()) {  
    windowState.clear();  
}

会调用 windowState 的 add() 和 get() 来添加元素或获取元素。这两个方法在接口 AppendingState 中做了定义,前面的那些类都继承该接口。不同类型的 State 对这两个函数的具体实现是不同的。

@PublicEvolving  
public interface AppendingState<IN, OUT> extends State {  
    OUT get() throws Exception;  
  
    void add(IN var1) throws Exception;  
}

为了可以在emitWindowContents()函数中统一调用用户自定义的代码,会将用户自定义的代码转换为 InternalIterableWindowFunction 类型,在该类型的 process() 方法中会执行用户定义的逻辑。若 windowState 是 ReducingState 或 AggregatingState 类型,则会提供“空”的 InternalIterableWindowFunction,因为逻辑已经绑定到 windowState 上了。

  • 若使用的 process()、apply() 方法,在调用add()方法时,则向ListState中添加数据。等待 emitWindowContents() 函数执行时,使用用户定义的Function处理数据。
  • 若使用的 reduce()、aggragate() 函数,在add()时进行聚合。emitWindowContents()函数执行时,直接将状态中聚合的数据进行提交。
  • 若用户定义了 Evictor,在调用add()方法时,则向ListState中添加数据。等待 emitWindowContents() 函数执行时,使用用户定义的Function处理数据。如果是使用的reduce()、aggragate()函数,那么会在这里遍历窗口的所有数据,反复执行用户自定义的函数。在执行用户窗口处理函数前后会执行用户定义的 Evictor 中的方法 evictBefore() 和 evictAfter() ,这两个函数中可能对窗口的历史数据做处理。
evictorContext.evictBefore(recordsWithTimestamp, Iterables.size(recordsWithTimestamp));
... # 执行用户窗口处理函数
evictorContext.evictAfter(recordsWithTimestamp, Iterables.size(recordsWithTimestamp));

不同滑动窗口的 state 是否会有重叠?

这节的代码在WindowOperator.processElement()中。
滑动窗口与滚动窗口的底层代码相同,区别只是 WindowAssigner 不同。
滑动窗口到达新数据时,该数据可能属于多个窗口,那么 WindowAssigner 会返回窗口集合:

final Collection<W> elementWindows =  
        windowAssigner.assignWindows(  
                element.getValue(), element.getTimestamp(), windowAssignerContext);

进而会访问该集合的每个元素(窗口),在每个窗口中都处理一次该元素:

for (W window : elementWindows) {
	# 判断数据是否迟到
	# 数据添加到 windowState中
	# 判断该数据是否触发窗口操作
	# 如果触发窗口操作,则进一步处理
}

在该循环中,每个窗口访问其状态时,为了区别,需要设置 namespace:

windowState.setCurrentNamespace(window);  
windowState.add(element.getValue());

每个窗口就是一个 namespace。namespace 不同,访问到的状态会有联系吗?状态存储和检索时通过 StateTable 管理,StateTable 具体是使用 StateMap 管理每个 state 。StateMap 的 value 就是 state 。key 和 namespace 会分别序列化成 byte,两个 byte 数组拼接起来作为 StateMap 的 key。所以,namespace 不同,状态也不同。所以每个 window 单独存储其 state。如果一个数据属于多个窗口,那么它会被复制多份存储。
下面是通过key和namespace获得数据存储位置的函数(不重要):

MemorySegment serializeToSegment(K key, N namespace) {  
    outputStream.reset();  
    try {  
        // serialize namespace  
        outputStream.setPosition(Integer.BYTES);  
        namespaceSerializer.serialize(namespace, outputView);  
    } catch (IOException e) {  
        throw new RuntimeException("Failed to serialize namespace", e);  
    }  
  
    int keyStartPos = outputStream.getPosition();  
    try {  
        // serialize key  
        outputStream.setPosition(keyStartPos + Integer.BYTES);  
        keySerializer.serialize(key, outputView);  
    } catch (IOException e) {  
        throw new RuntimeException("Failed to serialize key", e);  
    }  
  
    final byte[] result = outputStream.toByteArray();  
    final MemorySegment segment = MemorySegmentFactory.wrap(result);  
  
    // set length of namespace and key  
    segment.putInt(0, keyStartPos - Integer.BYTES);  
    segment.putInt(keyStartPos, result.length - keyStartPos - Integer.BYTES);  
  
    return segment;  
}

会话窗口比较特殊,会涉及到 windowState 的合并:

// merge the merged state windows into the newly resulting  
// state window  
windowMergingState.mergeNamespaces(  
        stateWindowResult, mergedStateWindows);

WindowState 与 用户自定义 KeyedState

两者创建过程不同(下面有两者创建状态的源码),但创建出的同一类型状态是一样的。因为在 getPartitionedState() 方法中,如果状态在以前没有创建过,则使用的就是 getOrCreateKeyedState() 方法进行创建,因此两方法所得到的状态是一样的。那么状态描述符一样的话,状态使用起来也就没差别

  • WindowState 创建使用 AbstractKeyedStateBackend.getOrCreateKeyedState() 方法 :
@Override  
@SuppressWarnings("unchecked")  
public <N, S extends State, V> S getOrCreateKeyedState(  
        final TypeSerializer<N> namespaceSerializer, StateDescriptor<S, V> stateDescriptor)  
        throws Exception {  
    checkNotNull(namespaceSerializer, "Namespace serializer");  
    checkNotNull(  
            keySerializer,  
            "State key serializer has not been configured in the config. "  
                    + "This operation cannot use partitioned state.");  

	# 判断之前是否已经创建过该状态
    InternalKvState<K, ?, ?> kvState = keyValueStatesByName.get(stateDescriptor.getName());  
    if (kvState == null) {  
        if (!stateDescriptor.isSerializerInitialized()) {  
            stateDescriptor.initializeSerializerUnlessSet(executionConfig);  
        }  
        kvState =  
                LatencyTrackingStateFactory.createStateAndWrapWithLatencyTrackingIfEnabled(  
                        TtlStateFactory.createStateAndWrapWithTtlIfEnabled(  
                                namespaceSerializer, stateDescriptor, this, ttlTimeProvider),  
                        stateDescriptor,  
                        latencyTrackingStateConfig);  
        keyValueStatesByName.put(stateDescriptor.getName(), kvState);  
        publishQueryableStateIfEnabled(stateDescriptor, kvState);  
    }  
    return (S) kvState;  
}
  • KeyedState 创建使用 AbstractKeyedStateBackend.getPartitionedState() 方法 :
@SuppressWarnings("unchecked")  
@Override  
public <N, S extends State> S getPartitionedState(  
        final N namespace,  
        final TypeSerializer<N> namespaceSerializer,  
        final StateDescriptor<S, ?> stateDescriptor)  
        throws Exception {  
  
    checkNotNull(namespace, "Namespace");  

	# 如果上一次创建的与这一次是同一个name,则返回上次创建的
    if (lastName != null && lastName.equals(stateDescriptor.getName())) {  
        lastState.setCurrentNamespace(namespace);  
        return (S) lastState;  
    }  

	# 若之前已经创建过该状态,则返回之前创建的
	# 在 getOrCreateKeyedState 函数中也有该判断
    InternalKvState<K, ?, ?> previous = keyValueStatesByName.get(stateDescriptor.getName());  
    if (previous != null) {  
        lastState = previous;  
        lastState.setCurrentNamespace(namespace);  
        lastName = stateDescriptor.getName();  
        return (S) previous;  
    }  

	# 之前没创建过该状态,则创建个新的
	# 此处 getOrCreateKeyedState() 一定是 Create 行为,因为上面的if判断是假
    final S state = getOrCreateKeyedState(namespaceSerializer, stateDescriptor);  
    final InternalKvState<K, N, ?> kvState = (InternalKvState<K, N, ?>) state;  
  
    lastName = stateDescriptor.getName();  
    lastState = kvState;  
    kvState.setCurrentNamespace(namespace);  
  
    return state;  
}

WindowState 的数据存取是怎么实现的?

WindowState 在窗口中主要使用方法是 add()、get()、clear()。
下面主要考虑 Heap-backed。

  1. add()
  • HeapListState.add()
@Override  
public void add(V value) {  
    Preconditions.checkNotNull(value, "You cannot add null to a ListState.");  
  
    final N namespace = currentNamespace;  
  
    final StateTable<K, N, List<V>> map = stateTable;  
    List<V> list = map.get(namespace);  
  
    if (list == null) {  
        list = new ArrayList<>();  
        map.put(namespace, list);  
    }  
    list.add(value);  
}

这个比较简单。ListState 存储数据的数据结构为 ArrayList,这里只需要使用 ArrayList.add()方法添加数据即可。

  • HeapReducingState.add()
@Override  
public void add(V value) throws IOException {  
  
    if (value == null) {  
        clear();  
        return;  
    }  
  
    try {  
        stateTable.transform(currentNamespace, value, reduceTransformation);  
    } catch (Exception e) {  
        throw new IOException("Exception while applying ReduceFunction in reducing state", e);  
    }  
}

调用了 stateTable.transform() 方法修改状态值,reduceTransformation 是用户定义的聚合逻辑。按照下面访问具体逻辑:

stateTable.transform(currentNamespace, value, reduceTransformation)
-> getMapForKeyGroup(keyGroup).transform(key, namespace, value, transformation)
-> CopyOnWriteStateMap.transform(key, namespace, value, transformation)

则看到以下代码:

@Override  
public <T> void transform(  
        K key, N namespace, T value, StateTransformationFunction<S, T> transformation)  
        throws Exception {  
  
    final StateMapEntry<K, N, S> entry = putEntry(key, namespace);  
  
    // copy-on-write check for state  
    entry.state =  
            transformation.apply(  
                    (entry.stateVersion < highestRequiredSnapshotVersion)  
                            ? getStateSerializer().copy(entry.state)  
                            : entry.state,  
                    value);  
    entry.stateVersion = stateMapVersion;  
}

这里 transformation.apply() 将 entry.state 与新到达的数据 value 进行聚合,并写回 entry.state。

  • HeapAggregatingState.add()
    与 HeapReducingState.add() 几乎完全一样,不做赘述。
  1. get()
    三种状态的 get() 方法是相同的,如下面代码所示,调用 getInternal() 实现。ListState 通过 get() 函数获得一个迭代器,ReducingState 和 AggregatingState 则获得聚合值。
    HeapListState、HeapReducingState、HeapAggregatingState的get():
@Override  
public Iterable<V> get() {  
    return getInternal();  
}

这里是封装的 getInternal() 方法,按照下面访问具体的逻辑:

getInternal()
-> stateTable.get(namespace)
-> get(key, keyGroupIndex, namespace)
-> getMapForKeyGroup(keyGroupIndex).get(key, namespace)
-> CopyOnWriteStateMap.get(key, namespace)

则看到以下代码:

@Override  
public S get(K key, N namespace) {  
  
    final int hash = computeHashForOperationAndDoIncrementalRehash(key, namespace);  
    final int requiredVersion = highestRequiredSnapshotVersion;  
    final StateMapEntry<K, N, S>[] tab = selectActiveTable(hash);  
    int index = hash & (tab.length - 1);  
  
    for (StateMapEntry<K, N, S> e = tab[index]; e != null; e = e.next) {  
        final K eKey = e.key;  
        final N eNamespace = e.namespace;  
        if ((e.hash == hash && key.equals(eKey) && namespace.equals(eNamespace))) {  
  
            // copy-on-write check for state  
            if (e.stateVersion < requiredVersion) {  
                // copy-on-write check for entry  
                if (e.entryVersion < requiredVersion) {  
                    e = handleChainedEntryCopyOnWrite(tab, hash & (tab.length - 1), e);  
                }  
                e.stateVersion = stateMapVersion;  
                e.state = getStateSerializer().copy(e.state);  
            }  
  
            return e.state;  
        }  
    }  
  
    return null;  
}

这里就是通过 StateMap 直接获取状态的值。
3. clear()
三种状态的 clear() 方法也是一样的。
AbstractHeapState.clear():

@Override  
public final void clear() {  
    stateTable.remove(currentNamespace);  
}

可按照下面访问具体逻辑:

stateTable.remove(namespace)
-> remove(key, keyGroupIndex(), namespace)
-> getMapForKeyGroup(keyGroupIndex).remove(key, namespace)
-> CopyOnWriteStateMap.remove(key, namespace)
-> removeEntry(key, namespace)

则可看到如下代码:

private StateMapEntry<K, N, S> removeEntry(K key, N namespace) {  
  
    final int hash = computeHashForOperationAndDoIncrementalRehash(key, namespace);  
    final StateMapEntry<K, N, S>[] tab = selectActiveTable(hash);  
    int index = hash & (tab.length - 1);  
  
    for (StateMapEntry<K, N, S> e = tab[index], prev = null; e != null; prev = e, e = e.next) {  
        if (e.hash == hash && key.equals(e.key) && namespace.equals(e.namespace)) {  
            if (prev == null) {  
                tab[index] = e.next;  
            } else {  
                // copy-on-write check for entry  
                if (prev.entryVersion < highestRequiredSnapshotVersion) {  
                    prev = handleChainedEntryCopyOnWrite(tab, index, prev);  
                }  
                prev.next = e.next;  
            }  
            ++modCount;  
            if (tab == primaryTable) {  
                --primaryTableSize;  
            } else {  
                --incrementalRehashTableSize;  
            }  
            return e;  
        }  
    }  
    return null;  
}

文章来源:https://blog.csdn.net/White_Ink_/article/details/135339269
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。