Microservice ส่งข้อมูลผ่าน Apache Kafka ด้วย Spring Kafka แบบง่ายๆ

Phayao Boonon
6 min readSep 20, 2018

--

Messaging System ตัวหนึ่งที่ได้รับความสนใจเป็นอย่างมากในช่วงนี้ (2018) และทุกคนต่างก็พูดถึงก็คือ Apache Kafka ซึ่งเค้าก็เครมว่าจะมาแทนที่ Message Broker ดั้งเดิม อย่างเช่น ActiveMQ หรือ RabbitMQ

Kafka® คืออะไร

Apache Kafka เป็นซอฟต์แวร์ open source stream-processing platform ที่อยู่ภายใต้การดูแลของ Apache Software Foundation เขียนด้วย Scala และ Java ซึ่งมีเป้าหมายให้ platform ที่มี unified, high-throughput, low-latency สำหรับจัดการกับ real-time data feed ได้อย่างมีประสิทธิภาพ

Steaming platform มี 3 ความสามารถหลัก :

  • Publish และ Subscribe กับ Stream ของ records ข้อมูลได้เหมือนกับ message queue หรือ enterprise messaging system.
  • เก็บ Stream ของ records ข้อมูลในแนวทางที่ทนต่อความผิดพลาด (fault-tolerance durable)
  • ดำเนินการกับ Stream ของ records ข้อมูลทันที่ที่เกิดขึ้น (real-time processing)

Kafka โดยทั้วไปใช้งานกับ

  • สร้างระบบ real-time streaming data pipeline ที่มีความน่าเชื่อถือในการรับข้อมูลระหว่าง system หรือ application
  • สร้างระบบ real-time streaming application ที่ transform หรือ react กับ stream ของข้อมูล

แนวคิดหลักของ Kafka

  • Kafka จะรันเป็นกลุ่ม (Cluster) บน server เดียวหรือมากกว่าที่สามารถขยายได้หลาย datacenter
  • Kafka cluster จะเก็บ stream record ข้อมูลเป็นประเภทเรียกว่า topics
  • แต่ล่ะ record ของข้อมูลประกอบด้วย key, value และ timestamp

Kafka มี 4 ส่วนหลัก และจะติดต่อด้วย API

  • Producer —จะให้ application สามารถ publish ข้อมูลหรือ stream ของ record ไปยัง Kafka topic ได้มากหว่าหนึ่ง ผ่านทาง Producer API
  • Consumer — จะให้ application สามารถ subscribe กับ Kafka topic ได้มากหว่าหนึ่ง และสามารถอ่านข้อมูล จาก stream ของ record ที่ Producer ส่งมาให้ ผ่านทาง Customer API
  • Stream — จะให้ application ทำตัวได้เหมือนกับ stream processor ที่สามารถ consuming input stream จาก topic มากหว่าหนึ่ง และ producing output stream ไปสู่ topic มากกว่าหนึ่ง โดยการส่งข้อมูลจาก input stream ไป output stream ได้อย่างมีประสิทธิภาพมาก
  • Connector — จะเป็นการสร้างและรัน reusable producer และ consumer ที่เชื่อมโยง Kafka topic ไปยัง application ที่มีอยู่หรือ data system

องค์ประกอบของ Kafka

https://en.wikipedia.org/wiki/Apache_Kafka
  • Producer — เป็นตัวที่ส่งข้อมูลหรือ public data เข้ามายัง Topic ใน Kafka
  • Consumer — เป็นตัวที่รับข้อมูลหรือ Subscribe data จาก Partition ภายใน Topic โดยจะกำหนดเป็น Consumer Group
  • Topic — เป็นตัวที่ข้อมูลถูกส่งเข้ามาโดย Producer ซึ่งสามารถมีได้หลาย Customer ที่ Subscribe ไว้กับ Topic เดียว และใน Topic จะมี Partition หนึ่งหรือมากกว่า
  • Partition — เป็นหน่วยของการเก็บข้อมูลใน Topic ซึ่งจะเก็บ record ของข้อมูลตามลำดับ Offset

Spring for Apache Kafka

เป็น project ที่ประยุกต์ใช้งานแนวคิดหลักของ Spring เพื่อพัฒนา message solution โดยใช้ Kafka โดยมี “template” เป็น high-level abstraction สำหรับส่ง message ซึ่งรองรับ Message-driven POJO (Pain old Java Object) ด้วย @KafkaListener annotation และ “listener container” ซึ่งตัว Library ใช้วิธี Dependency Injection และ Declarative ที่สามารถใช้งานได้เหมือน Spring AMQP ที่ใช้กับ RabbitMQ โดยใช้ spring-kafka dependency สำหรับ Spring framework ดังนี้

<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.1.10.RELEASE</version>
</dependency>

ติดตั้ง Kafka

สามารถติดตั้ง Kafka ได้ทั้งในเครื่อง (Local) หรือบน Docker เราสามารถเลื่อกอย่างใดอย่างหนึ่งได้

Local

ดาวน์โหลด binary Kafka แล้วก็ unzip ออกมาใว้ในเครื่อง ด้วยคำสั่ง

$ tar -xzf kafka_2.11-2.0.0.tgz
$ cd kafka_2.11-2.0.0

Kafka จะใช้ Zookeeper ดังนั้นจะต้อง start Zookeeper server ก่อนถ้าเรายังไม่มีในเครื่องก็สามารถใช้คำสั่งนี้ได้

$ bin/zookeeper-server-start.sh config/zookeeper.properties

และ start Kafka server ด้วยคำสั่งนี้

$ bin/kafka-server-start.sh config/server.properties

Docker

และสามารถที่จะรัน Kafka บน Docker ดังนั้นจะใช้ wurstmeister/kafka ซึ่งมีไฟล์ docker-compose.yml ดังนี้

ใช้คำส่ังดังนี้ เพื่อสร้าง image ในเครื่องของเราโดยทำการ download Kafka และ dependency ต่างมาให้ หลังจากเสร็จเรียบร้อยแล้วจะสร้าง Zookeeper และ Kafka container บน Docker ซึ่งเป็นการรัน single-broker เท่านั้น

$ docker-compose up -d

โดยเราสามารถใช้คำสั่ง docker ps เพื่อตรวจสอบ container ที่รันอยู่ในเครื่องได้ และจะสังเกตุ expose port ดังนี้

  • Zookeeper0.0.0.0:2181->2181/tcp
  • Kafka0.0.0.0:9092->9092/tcp

สามารถทดสอบสร้าง Topic ได้ด้วยคำสั่งเรียก shell script kafka-topics.sh

$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

และดู Topic ที่เราสร้างขึ้นด้วยคำสั่งเรียก shell script kafka-topics.sh

$ bin/kafka-topics.sh --list --zookeeper localhost:2181
test

Use Case

ในบทความนี้จะสร้างการสื่อสารระหว่าง Microservice ดังนั้นจะสร้าง 2 Service โดยที่ User จะ POST Customer data มาที่ Customer Service จะบันทึกข้อมูลของ Customer ใน H2 database แล้วก็ส่งข้อมูล Customer มายัง Email Service เพื่อทำการส่ง email ให้ Customer

Customer Service จะเป็น Producer และ Email Service จะเป็นฝั่ง Consumer และส่งข้อมูลผ่าน Kafka อย่างง่ายโดย Topic เป็น customer.topic

Producer Service

สร้าง Customer Service ที่เป็น RESTful Service ด้วย Spring Initializr โดยเลือก dependency เป็น Web Lombok JPA H2 และ Kafka เพื่อที่จะเป็น CRUD Service เก็บข้อมูลใน H2 local database

เนื่องจากว่าเราจะต้องใช้ JsonSerializer ของ Spring Kafka ดังนั้นเราจะต้องเพิ่ม Jackson dependency ขึ้นมา เพื่อจัดการการแปลง Object เป็น Json

<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
<version>2.9.6</version>
</dependency>

Customer Data/Entity

Entity เป็นข้อมูลของ Service ใน JPA ดังนั้นสร้าง @Entiry Customer และใช้ @Data ของ Lombok มาช่วยสร้าง Getter/Setter

Customer Controller/Service

บทความนี้จะสร้าง Microservice อย่างง่ายดังนั้น สร้าง endpoint /customers เดียวโดย mapping กับ POST method รับ input เป็น Customer แล้วเรียก createService ของ CustomerService ใน Service layer ที่จะส่ง message ไปหา Kafka

สร้างคลาส CustomerService โดยเก็บข้อมูลของ Customer ลง Database และเรียกใช้ sendEmail ของ MessageProducer เพื่อส่ง message ไปที่ Kafka

ตั้งค่า/ส่งข้อความไปหา Kafka

สร้าง @Configuration คลาส MessageConfig โดยที่สร้าง ProducerFactory ที่มี config ต่าง สำหรับ Producer ที่ส่งข้อมูลไปที่ Kafka Server

ค่า Config ต่างๆ ของ Producer โดยใช้ตัวแปรจาก ProducerConfig:

  • BOOTSTRAP_SERVER_CONFIG — กำหนด Hostname และ Port ของ Zookeeper, ค่าเป็น localhost:9092 ให้เหมือนกับ Consumer
  • KEY_SERIALIZER_CLASS_CONFIG — กำหนดคลาสที่ใช้สำหรับ Serialize Key ของข้อมูลใน record (KeyValuePair) ของ Partition ใน Topic ของ Kafka Server, ค่าเป็นคลาส StringSerializer.class สำหรับ String
  • VALUE_SERIALIZER_CLASS_CONFIG — กำหนดคลาสที่ใช้สำหรับ Serialize Value ของข้อมูลใน record (KeyValuePair) ของ Partition ใน Topic ของ Kafka Server, ค่าเป็นคลาส JsonSerializer.class สำหรับ Json

ส่ง message ไปที่ Kafka Server โดยใช้ KafkaTemplate ของ Spring Kafka โดยที่ส่งระบุ topic ใน Kafka Server และ ข้อมูล Customer

Consumer Service

สร้าง Email Service ด้วย Spring Initializr โดยเลือก dependency เป็น Lombok Kafka และ Mail เพื่อที่จะเป็น Mail Service ที่จะรับ message และส่ง email หา Customer

เนื่องจากว่าเราจะต้องใช้ JsonDeserializer ของ Spring Kafka ดังนั้นเราจะต้องเพิ่ม Jackson dependency ขึ้นมา เพื่อจัดการการแปลง Json กลับมาเป็น Object

<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.9.6</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
<version>2.9.6</version>
</dependency>

Customer Data

สร้าง Customer type ในฝั่ง Consumer เพื่อรับข้อมูลเป็น Customer และใช้ @Data ของ Lombok มาช่วยสร้าง Getter/Setter

ตั้งค่า/รับข้อความจาก Kafka

สร้าง @Configuration คลาส MessageConfig โดยต้องสร้าง Concurrent Kafka Listener ที่รับข้อมูลเป็น Customer โดยตั้งค่า ConsumerFactory เป็นค่า config ต่างๆ สำหรับ Consumer ที่รับข้อมูลจากี่ Kafka Server

ค่า Config ต่างๆ ของ Consumer โดยใช้ตัวแปรจาก ConsumerConfig:

  • BOOTSTRAP_SERVERS_CONFIG —กำหนด Hostname และ Port ของ Zookeeper, ค่าเป็น localhost:9092 ให้เหมือนกับ Producer
  • GROUP_ID_CONFIG — ค่า group id ของ Consumer เพื่อรับข้อมูลจาก Kafka Server, ค่าเป็น test-consumer-group เป็น group ที่กำหนดขึ้นมาเอง
  • KEY_DESERIALIZER_CLASS_CONFIG — กำหนดคลาสที่ใช้สำหรับ Deserialize Key ของข้อมูลใน record (KeyValuePair) ของ Partition ใน Topic ของ Kafka Server, ค่าเป็นคลาส StringDeserializer.class สำหรับ String
  • VALUE_DESERIALIZER_CLASS_CONFIG — กำหนดคลาสที่ใช้สำหรับ Deserialize Value ของข้อมูลใน record (KeyValuePair) ของ Partition ใน Topic ของ Kafka Server, ค่าเป็นคลาส JsonDeserializer.class สำหรับ Json
  • TRUSTED_PACKAGES — กำหนด package ที่เราเชื่อใจ (Trusted) ให้กับ Consumer เพราะจะไว้ใจคลาสที่อยู่ในนี้เท่านั้นในการรับข้อมูล ถ้านอกจากนี้จะ error, ค่าเป็น com.example.demo.customer

ใช้ @KafkaListener เพื่อกำหนด topic ที่จะให้ Consumer คอยเฝ้าตรวจ (Listen) ข้อมูลใน Kafka Server หลังจากที่รับข้อมูลเป็น ConsumerReord (Key = String, Value = Customer) แล้วก็เรียก sendMessageToCostomer method เพื่อส่ง email หา Customer

ทดสอบ Kafka

รันทั้ง Customer Service ในฝั่ง Producer และ Email Service ในฝั่ง Consumer และทดลอง POST ข้อมูล Customer ด้วย Postman

สังเกต log ใน Producer (Customer Service) จะมีการ connect กับ Kafka Server เพื่อส่งข้อมูลออก

และสังเกต log ใน console ของ Consumer (Email Service) จะเห็นได้ว่าได้รับข้อมูล ตามที่เราได้ส่งมาจาก Producer ซึ่งก็แสดงให้เห็นว่าสามารถส่งข้อมูลจาก Producer ไปที่ Kafka Server และ Consumer ก็ได้รับข้อมูลนั้น

สรุป

จากการที่ได้ทดลองส่งข้อมูลระหว่าง Microservice อย่างง่ายด้วย Apache Kafka โดยใช้ Spring Kafka นั้นสามาทำได้อย่างดี และ implement ไม่ยาก โดย Config ส่วนใหญ่ใช่เป็น Java Config ที่จะทำให้ทั้งฝั่ง Producer และ Consumer ทำงานได้ดี และ Spring Kafka ก็มีคลาสสำหรับ Serialize Object ไปเป็น JSON และ Deserialize จาก JSON กลับมาเป็น Object ซึ่งก็สะดวกกับการใช้งานเป็นอย่างมาก ซึ่งในบทความนี้ใช้เพียง feature หนึ่งที่เป็น Messaging System ของ Kafka เท่านั้น

--

--

Phayao Boonon
Phayao Boonon

Written by Phayao Boonon

Software Engineer 👨🏻‍💻 Stay Hungry Stay Foolish

Responses (2)