写在前面
最近在使用flink进行业务数据的实时计算以展示报表,在开发自测过程中遇到一个NPE,刚接触flink的我是一头雾水,硬啃了一早上源码,才找到问题,发现是flink序列,特此记录下
正文
问题如下:
分析:
通过debug分析,最初new的实例和最终sink的function不是一个实例。
那么猜测是实例被替换或者拷贝,那么先查阅代码发现sinkMQ线程初始化如下:

- 可以看到这里的实例已经是被”替换“过的了,那么我们继续跟踪源代码,发现org.apache.flink.streaming.runtime.tasks.StreamTask#operatorChain是在这里被初始化的,那么继续debug跟踪该方法:org.apache.flink.streaming.runtime.tasks.StreamTask#beforeInvoke。跟踪步骤如下:



- 可以看到这里是把sinkFunction序列化为二进制数据,然后使用自定义classLoader等来进行了反序列化。
- 看到这里就明白了,flink会把我们定义的各种function序列化保存起来,方便分派给各个slot task,但是可以看到我们前面的自定义sinkFunction内确对charset字段使用了transient关键字,导致该字段在序列化和反序列化后变为了null
解决方案:
- 现在尝试去掉transient,启动直接报错了,原来是flink在准备阶段就做了是否可以被序列化的校验,因为charset该类没有实现Serializable接口,所以直接报错了,如下:
- 所以可以这么改:

- 问题解决。
- 现在尝试去掉transient,启动直接报错了,原来是flink在准备阶段就做了是否可以被序列化的校验,因为charset该类没有实现Serializable接口,所以直接报错了,如下:
思考:



