博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Kafka序列化和反序列化与示例
阅读量:4557 次
发布时间:2019-06-08

本文共 3852 字,大约阅读时间需要 12 分钟。

1.  卡夫卡序列化和反序列化

今天,在这篇Kafka SerDe文章中,我们将学习使用Kafka创建自定义序列化器和反序列化器的概念。此外,我们将了解序列化在Kafka中的工作原理以及为什么需要序列化。与此同时,我们将看到 Kafka序列化器示例和Kafka解串器示例。此外,这个Kafka序列化和反序列化教程为我们提供了 Kafka字符串序列化器和 Kafka  对象序列化器的知识。 

基本上,提供了我们可以轻松发布以及订阅记录流的功能。因此,我们可以灵活地创建自己的自定义序列化器以及解串器,这有助于使用它传输不同的数据类型。 那么,让我们开始Kafka序列化和反序列化 

Kafka-序列化和反序列化(Kafka SerDe)

2. Apache Kafka SerDe

但是,为了传输而将对象转换为字节流的过程就是我们所说的序列化。虽然,Apache Kafka存储以及在队列中传输这些字节数组。

然而,序列化的反面是反序列化。在这里,我们将数组的字节转换为我们想要的数据类型。但是,请确保Kafka仅为少数数据类型提供序列化程序和反序列化程序,例如

  • 整数
  • 字节

3.为什么在Kafka中使用Custom Serializer和Deserializer?

基本上,为了准备从生产者传递到代理的消息,我们使用序列化器。换句话说,在将整个消息传输到代理之前,让生产者知道如何将消息转换为字节数组,我们使用序列化器。类似地,要将字节数组转换回对象,我们使用消费者的反序列化器。

4. Kafka SerDe的实施

实现org.apache.kafka.common.serialization.Serializer接口以创建序列化程序类非常重要。Ans,对于反序列化器类,重要的是实现org.apache.kafka.common.serialization.Deserializer接口。

Kafka序列化和反序列化接口有3种方法:

Kafka序列化和反序列化的实现方法

一个。配置

在配置启动时,我们调用Configure方法。

湾 序列化/反序列化

出于Kafka序列化和反序列化的目的,我们使用此方法。

C。关

在关闭Kafka会话时,我们使用Close方法。

5.与Kafka的串行器接口

  1. public interface Serializer extends Closeable { void configure(Map
    var1, boolean var2); byte[] serialize(String var1, T var2); void close();}

     

6.与Kafka的解串器接口

  1. public interface Deserializer extends Closeable { void configure(Map
    var1, boolean var2); T deserialize(String var1, byte[] var2); void close();}

     

7. Serializer和Deserializer的示例

这里的依赖关系是:

  • 卡夫卡(0.10.1.1)。
  • FasterXML Jackson(2.8.6)。
  1. user.java:public class User { private String firstname; private int age; public User() { } public User(String firstname, int age) {   this.firstname = firstname;   this.age = age; } public String getfirstName() {   return this.firstname; } public int getAge() {   return this.age; } @Override public String toString() {   return "User(" + firstname + ", " + age + ")"; }}

     

  1. userserializer.java:public class UserSerializer implements Serializer { @Override public void configure(Map
    map, boolean b) { } @Override public byte[] serialize(String arg0, User arg1) { byte[] retVal = null; ObjectMapper objectMapper = new ObjectMapper(); try { retVal = objectMapper.writeValueAsString(arg1).getBytes(); } catch (Exception e) { e.printStackTrace(); } return retVal; } @Override public void close() { }}

     

  1. Userdeserializer.java:public class UserDeserializer implements Deserializer { @Override public void close() { } @Override public void configure(Map
    arg0, boolean arg1) { } @Override public User deserialize(String arg0, byte[] arg1) { ObjectMapper mapper = new ObjectMapper(); User user = null; try { user = mapper.readValue(arg1, User.class); } catch (Exception e) { e.printStackTrace(); } return user; }}

     

此外,为了使用上面的序列化程序,我们必须使用此属性进行注册:

  1. props.put("value.serializer", "com.knoldus.serializers.UserSerializer");

     

那么,制作人将是:

  1. try (Producer
    producer = new KafkaProducer<>(props)) { producer.send(new ProducerRecord
    ("MyTopic", user)); System.out.println("Message " + user.toString() + " sent !!");} catch (Exception e) { e.printStackTrace();}

     

现在,我们再次需要为反序列化器注册此属性:

  1. props.put("value.deserializer", "com.knoldus.deserializer.UserDeserializer");

     

因此,消费者将:

  1. try (KafkaConsumer
    consumer = new KafkaConsumer<>(props)) { consumer.subscribe(Collections.singletonList(topic)); while (true) { ConsumerRecords
    messages = consumer.poll(100); for (ConsumerRecord
    message : messages) { System.out.println("Message received " + message.value().toString()); } }} catch (Exception e) { e.printStackTrace();}

     

所以,这就是Kafka序列化和反序列化。希望您喜欢并理解我们对Kafka的自定义序列化器和反序列化器的解释。

8.结论

因此,在这个Kafka序列化和反序列化教程中,我们学会了创建一个自定义的Kafka SerDe示例。此外,我们看到了对Kafka的串行器和解串器的需求。与此同时,我们学习了Kafka序列化和反序列化的实现方法

转载于:https://www.cnblogs.com/a00ium/p/10853005.html

你可能感兴趣的文章
图论其一:图的存储
查看>>
20180923-WebService
查看>>
z变换
查看>>
Python - 静态函数(staticmethod), 类函数(classmethod), 成员函数
查看>>
Spring基础2
查看>>
【灵异短篇】这个夜晚有点凉
查看>>
一点小问题
查看>>
pytest 10 skip跳过测试用例
查看>>
MVC身份验证及权限管理
查看>>
It was not possible to find any compatible framework version
查看>>
关于8.0.15版本的mysql下载与安装
查看>>
Redis主从复制看这篇就够了
查看>>
洛谷 P1202 [USACO1.1]黑色星期五Friday the Thirteenth 题解
查看>>
(4.20)SQL Server数据库启动过程,以及启动不起来的各种问题的分析及解决技巧...
查看>>
基本数据类型(数字和字符串)
查看>>
函数__装饰器
查看>>
linux system函数分析
查看>>
前端优化措施
查看>>
论学习汉语和学习编程的异同点
查看>>
linux img文件压缩及解压
查看>>