程序地带

记一次Flink addsink报错org.apache.kafka.common.serialization.StringSerializer


public static Properties pro = new Properties();
pro.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
pro.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");

由于之前一次疏忽便将原本的StringSerializer改成了ByteArraySerializer 使用flink的addsink 在kafka 中充当消费者 使用自带的方法


new FlinkKafkaConsumer<T>(topics, deserializer, props)

之后发现问题改为以上配置 就开始报错。 经过两天的修改发现原因 原来我没有将key序列化key一直为null而现在要将key进行序列化就报错 于是就重新自定义addsink


// 输出函数addSink
public static class ToKafka extends RichSinkFunction<String>{
/**
*
*/
private static final long serialVersionUID = 1L;
KafkaProducer<String, String> producer = null;
@Override
public void open(Configuration parameters) throws Exception {
producer = new KafkaProducer<String,String>(toFisPro);
}
@Override
public void close() throws Exception {
if (producer != null) {
producer.flush();
producer.close();
}
}
@Override
public void invoke(String value) throws Exception {
if(null != value) {
String key = String.format("%d", (int) (Math.random() * 6));
ProducerRecord<String,String> producerRecord = new ProducerRecord<String,String>("topic", key, value);
producer.send(producerRecord);
producer.flush();
}
}
DataStream<T>.addSink(new Kafka())

版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://blog.csdn.net/weixin_45872156/article/details/111246040

随机推荐

OSPF动态路由协议的单区和多区

OSPF动态路由协议的单区和多区一、OSPF路由协议(一)内部网关协议和外部网关协议(二)ospf工作过程(三)os...

gde大冬天 阅读(850)