Microservice ส่งข้อมูลผ่าน Apache Kafka ด้วย Spring Kafka แบบง่ายๆ
Messaging System ตัวหนึ่งที่ได้รับความสนใจเป็นอย่างมากในช่วงนี้ (2018) และทุกคนต่างก็พูดถึงก็คือ Apache Kafka ซึ่งเค้าก็เครมว่าจะมาแทนที่ Message Broker ดั้งเดิม อย่างเช่น ActiveMQ หรือ RabbitMQ
Kafka® คืออะไร
Apache Kafka เป็นซอฟต์แวร์ open source stream-processing platform ที่อยู่ภายใต้การดูแลของ Apache Software Foundation เขียนด้วย Scala และ Java ซึ่งมีเป้าหมายให้ platform ที่มี unified, high-throughput, low-latency สำหรับจัดการกับ real-time data feed ได้อย่างมีประสิทธิภาพ
Steaming platform มี 3 ความสามารถหลัก :
- Publish และ Subscribe กับ Stream ของ records ข้อมูลได้เหมือนกับ message queue หรือ enterprise messaging system.
- เก็บ Stream ของ records ข้อมูลในแนวทางที่ทนต่อความผิดพลาด (fault-tolerance durable)
- ดำเนินการกับ Stream ของ records ข้อมูลทันที่ที่เกิดขึ้น (real-time processing)
Kafka โดยทั้วไปใช้งานกับ
- สร้างระบบ real-time streaming data pipeline ที่มีความน่าเชื่อถือในการรับข้อมูลระหว่าง system หรือ application
- สร้างระบบ real-time streaming application ที่ transform หรือ react กับ stream ของข้อมูล
แนวคิดหลักของ Kafka
- Kafka จะรันเป็นกลุ่ม (Cluster) บน server เดียวหรือมากกว่าที่สามารถขยายได้หลาย datacenter
- Kafka cluster จะเก็บ stream record ข้อมูลเป็นประเภทเรียกว่า topics
- แต่ล่ะ record ของข้อมูลประกอบด้วย key, value และ timestamp
Kafka มี 4 ส่วนหลัก และจะติดต่อด้วย API
- Producer —จะให้ application สามารถ publish ข้อมูลหรือ stream ของ record ไปยัง Kafka topic ได้มากหว่าหนึ่ง ผ่านทาง Producer API
- Consumer — จะให้ application สามารถ subscribe กับ Kafka topic ได้มากหว่าหนึ่ง และสามารถอ่านข้อมูล จาก stream ของ record ที่ Producer ส่งมาให้ ผ่านทาง Customer API
- Stream — จะให้ application ทำตัวได้เหมือนกับ stream processor ที่สามารถ consuming input stream จาก topic มากหว่าหนึ่ง และ producing output stream ไปสู่ topic มากกว่าหนึ่ง โดยการส่งข้อมูลจาก input stream ไป output stream ได้อย่างมีประสิทธิภาพมาก
- Connector — จะเป็นการสร้างและรัน reusable producer และ consumer ที่เชื่อมโยง Kafka topic ไปยัง application ที่มีอยู่หรือ data system
องค์ประกอบของ Kafka
- Producer — เป็นตัวที่ส่งข้อมูลหรือ public data เข้ามายัง Topic ใน Kafka
- Consumer — เป็นตัวที่รับข้อมูลหรือ Subscribe data จาก Partition ภายใน Topic โดยจะกำหนดเป็น Consumer Group
- Topic — เป็นตัวที่ข้อมูลถูกส่งเข้ามาโดย Producer ซึ่งสามารถมีได้หลาย Customer ที่ Subscribe ไว้กับ Topic เดียว และใน Topic จะมี Partition หนึ่งหรือมากกว่า
- Partition — เป็นหน่วยของการเก็บข้อมูลใน Topic ซึ่งจะเก็บ record ของข้อมูลตามลำดับ Offset
Spring for Apache Kafka
เป็น project ที่ประยุกต์ใช้งานแนวคิดหลักของ Spring เพื่อพัฒนา message solution โดยใช้ Kafka โดยมี “template” เป็น high-level abstraction สำหรับส่ง message ซึ่งรองรับ Message-driven POJO (Pain old Java Object) ด้วย @KafkaListener
annotation และ “listener container” ซึ่งตัว Library ใช้วิธี Dependency Injection และ Declarative ที่สามารถใช้งานได้เหมือน Spring AMQP ที่ใช้กับ RabbitMQ โดยใช้ spring-kafka
dependency สำหรับ Spring framework ดังนี้
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.1.10.RELEASE</version>
</dependency>
ติดตั้ง Kafka
สามารถติดตั้ง Kafka ได้ทั้งในเครื่อง (Local) หรือบน Docker เราสามารถเลื่อกอย่างใดอย่างหนึ่งได้
Local
ดาวน์โหลด binary Kafka แล้วก็ unzip ออกมาใว้ในเครื่อง ด้วยคำสั่ง
$ tar -xzf kafka_2.11-2.0.0.tgz
$ cd kafka_2.11-2.0.0
Kafka จะใช้ Zookeeper ดังนั้นจะต้อง start Zookeeper server ก่อนถ้าเรายังไม่มีในเครื่องก็สามารถใช้คำสั่งนี้ได้
$ bin/zookeeper-server-start.sh config/zookeeper.properties
และ start Kafka server ด้วยคำสั่งนี้
$ bin/kafka-server-start.sh config/server.properties
Docker
และสามารถที่จะรัน Kafka บน Docker ดังนั้นจะใช้ wurstmeister/kafka ซึ่งมีไฟล์ docker-compose.yml
ดังนี้
ใช้คำส่ังดังนี้ เพื่อสร้าง image ในเครื่องของเราโดยทำการ download Kafka และ dependency ต่างมาให้ หลังจากเสร็จเรียบร้อยแล้วจะสร้าง Zookeeper และ Kafka container บน Docker ซึ่งเป็นการรัน single-broker เท่านั้น
$ docker-compose up -d
โดยเราสามารถใช้คำสั่ง docker ps
เพื่อตรวจสอบ container ที่รันอยู่ในเครื่องได้ และจะสังเกตุ expose port ดังนี้
- Zookeeper —
0.0.0.0:2181->2181/tcp
- Kafka —
0.0.0.0:9092->9092/tcp
สามารถทดสอบสร้าง Topic ได้ด้วยคำสั่งเรียก shell script kafka-topics.sh
$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
และดู Topic ที่เราสร้างขึ้นด้วยคำสั่งเรียก shell script kafka-topics.sh
$ bin/kafka-topics.sh --list --zookeeper localhost:2181
test
Use Case
ในบทความนี้จะสร้างการสื่อสารระหว่าง Microservice ดังนั้นจะสร้าง 2 Service โดยที่ User จะ POST Customer data มาที่ Customer Service จะบันทึกข้อมูลของ Customer ใน H2 database แล้วก็ส่งข้อมูล Customer มายัง Email Service เพื่อทำการส่ง email ให้ Customer
Customer Service จะเป็น Producer และ Email Service จะเป็นฝั่ง Consumer และส่งข้อมูลผ่าน Kafka อย่างง่ายโดย Topic เป็น customer.topic
Producer Service
สร้าง Customer Service ที่เป็น RESTful Service ด้วย Spring Initializr โดยเลือก dependency เป็น Web
Lombok
JPA
H2
และ Kafka
เพื่อที่จะเป็น CRUD Service เก็บข้อมูลใน H2 local database
เนื่องจากว่าเราจะต้องใช้ JsonSerializer ของ Spring Kafka ดังนั้นเราจะต้องเพิ่ม Jackson
dependency ขึ้นมา เพื่อจัดการการแปลง Object เป็น Json
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
<version>2.9.6</version>
</dependency>
Customer Data/Entity
Entity เป็นข้อมูลของ Service ใน JPA ดังนั้นสร้าง @Entiry Customer และใช้ @Data ของ Lombok มาช่วยสร้าง Getter/Setter
Customer Controller/Service
บทความนี้จะสร้าง Microservice อย่างง่ายดังนั้น สร้าง endpoint /customers
เดียวโดย mapping กับ POST method รับ input เป็น Customer แล้วเรียก createService ของ CustomerService
ใน Service layer ที่จะส่ง message ไปหา Kafka
สร้างคลาส CustomerService
โดยเก็บข้อมูลของ Customer ลง Database และเรียกใช้ sendEmail
ของ MessageProducer
เพื่อส่ง message ไปที่ Kafka
ตั้งค่า/ส่งข้อความไปหา Kafka
สร้าง @Configuration คลาส MessageConfig โดยที่สร้าง ProducerFactory ที่มี config ต่าง สำหรับ Producer ที่ส่งข้อมูลไปที่ Kafka Server
ค่า Config ต่างๆ ของ Producer โดยใช้ตัวแปรจาก ProducerConfig:
- BOOTSTRAP_SERVER_CONFIG — กำหนด Hostname และ Port ของ Zookeeper, ค่าเป็น localhost:9092 ให้เหมือนกับ Consumer
- KEY_SERIALIZER_CLASS_CONFIG — กำหนดคลาสที่ใช้สำหรับ Serialize Key ของข้อมูลใน record (KeyValuePair) ของ Partition ใน Topic ของ Kafka Server, ค่าเป็นคลาส StringSerializer.class สำหรับ String
- VALUE_SERIALIZER_CLASS_CONFIG — กำหนดคลาสที่ใช้สำหรับ Serialize Value ของข้อมูลใน record (KeyValuePair) ของ Partition ใน Topic ของ Kafka Server, ค่าเป็นคลาส JsonSerializer.class สำหรับ Json
ส่ง message ไปที่ Kafka Server โดยใช้ KafkaTemplate ของ Spring Kafka โดยที่ส่งระบุ topic ใน Kafka Server และ ข้อมูล Customer
Consumer Service
สร้าง Email Service ด้วย Spring Initializr โดยเลือก dependency เป็น Lombok
Kafka
และ Mail
เพื่อที่จะเป็น Mail Service ที่จะรับ message และส่ง email หา Customer
เนื่องจากว่าเราจะต้องใช้ JsonDeserializer ของ Spring Kafka ดังนั้นเราจะต้องเพิ่ม Jackson
dependency ขึ้นมา เพื่อจัดการการแปลง Json กลับมาเป็น Object
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.9.6</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
<version>2.9.6</version>
</dependency>
Customer Data
สร้าง Customer type ในฝั่ง Consumer เพื่อรับข้อมูลเป็น Customer และใช้ @Data ของ Lombok มาช่วยสร้าง Getter/Setter
ตั้งค่า/รับข้อความจาก Kafka
สร้าง @Configuration คลาส MessageConfig โดยต้องสร้าง Concurrent Kafka Listener ที่รับข้อมูลเป็น Customer โดยตั้งค่า ConsumerFactory เป็นค่า config ต่างๆ สำหรับ Consumer ที่รับข้อมูลจากี่ Kafka Server
ค่า Config ต่างๆ ของ Consumer โดยใช้ตัวแปรจาก ConsumerConfig:
- BOOTSTRAP_SERVERS_CONFIG —กำหนด Hostname และ Port ของ Zookeeper, ค่าเป็น localhost:9092 ให้เหมือนกับ Producer
- GROUP_ID_CONFIG — ค่า group id ของ Consumer เพื่อรับข้อมูลจาก Kafka Server, ค่าเป็น test-consumer-group เป็น group ที่กำหนดขึ้นมาเอง
- KEY_DESERIALIZER_CLASS_CONFIG — กำหนดคลาสที่ใช้สำหรับ Deserialize Key ของข้อมูลใน record (KeyValuePair) ของ Partition ใน Topic ของ Kafka Server, ค่าเป็นคลาส StringDeserializer.class สำหรับ String
- VALUE_DESERIALIZER_CLASS_CONFIG — กำหนดคลาสที่ใช้สำหรับ Deserialize Value ของข้อมูลใน record (KeyValuePair) ของ Partition ใน Topic ของ Kafka Server, ค่าเป็นคลาส JsonDeserializer.class สำหรับ Json
- TRUSTED_PACKAGES — กำหนด package ที่เราเชื่อใจ (Trusted) ให้กับ Consumer เพราะจะไว้ใจคลาสที่อยู่ในนี้เท่านั้นในการรับข้อมูล ถ้านอกจากนี้จะ error, ค่าเป็น com.example.demo.customer
ใช้ @KafkaListener เพื่อกำหนด topic ที่จะให้ Consumer คอยเฝ้าตรวจ (Listen) ข้อมูลใน Kafka Server หลังจากที่รับข้อมูลเป็น ConsumerReord (Key = String, Value = Customer) แล้วก็เรียก sendMessageToCostomer method เพื่อส่ง email หา Customer
ทดสอบ Kafka
รันทั้ง Customer Service ในฝั่ง Producer และ Email Service ในฝั่ง Consumer และทดลอง POST ข้อมูล Customer ด้วย Postman
สังเกต log ใน Producer (Customer Service) จะมีการ connect กับ Kafka Server เพื่อส่งข้อมูลออก
และสังเกต log ใน console ของ Consumer (Email Service) จะเห็นได้ว่าได้รับข้อมูล ตามที่เราได้ส่งมาจาก Producer ซึ่งก็แสดงให้เห็นว่าสามารถส่งข้อมูลจาก Producer ไปที่ Kafka Server และ Consumer ก็ได้รับข้อมูลนั้น
สรุป
จากการที่ได้ทดลองส่งข้อมูลระหว่าง Microservice อย่างง่ายด้วย Apache Kafka โดยใช้ Spring Kafka นั้นสามาทำได้อย่างดี และ implement ไม่ยาก โดย Config ส่วนใหญ่ใช่เป็น Java Config ที่จะทำให้ทั้งฝั่ง Producer และ Consumer ทำงานได้ดี และ Spring Kafka ก็มีคลาสสำหรับ Serialize Object ไปเป็น JSON และ Deserialize จาก JSON กลับมาเป็น Object ซึ่งก็สะดวกกับการใช้งานเป็นอย่างมาก ซึ่งในบทความนี้ใช้เพียง feature หนึ่งที่เป็น Messaging System ของ Kafka เท่านั้น