使用 Kafka 和 Docker 開發事件驅動應用程式
隨著微服務的興起,事件驅動架構越來越受歡迎。 Apache Kafka 這個分散式事件串流平台通常是這些架構的核心。不幸的是,為開發設定和部署您自己的 Kafka 執行個體通常很棘手。幸運的是,Docker 和容器讓這變得更容易。
在本指南中,您將學習如何
- 使用 Docker 啟動 Kafka 叢集
- 將非容器化應用程式連線到叢集
- 將容器化應用程式連線到叢集
- 部署 Kafka-UI 以協助進行故障排除和除錯
先決條件
遵循本操作指南需要以下先決條件
- Docker Desktop 和 yarn
- Kafka 和 Docker 的基本知識
啟動 Kafka
從 Kafka 3.3
提示 本指南將使用 apache/kafka 映像檔,因為它包含許多用於管理和使用 Kafka 的好用腳本。但是,您可能想使用 apache/kafka-native 映像檔,因為它啟動速度更快,所需的資源也更少。
啟動 Kafka
按照以下步驟啟動基本的 Kafka 叢集。此範例將啟動一個叢集,將埠 9092 暴露給主機,讓原生執行的應用程式可以連線到它。
透過執行以下命令來啟動 Kafka 容器
$ docker run -d --name=kafka -p 9092:9092 apache/kafka
映像檔下載完成後,您將在一兩秒鐘內啟動並執行 Kafka 实例。
apache/kafka 映像檔在
/opt/kafka/bin
目錄中附帶了幾個好用腳本。執行以下命令以驗證叢集是否已啟動並執行,並取得其叢集 ID$ docker exec -ti kafka /opt/kafka/bin/kafka-cluster.sh cluster-id --bootstrap-server :9092
執行此操作將產生類似以下的輸出
Cluster ID: 5L6g3nShT-eMCtK--X86sw
透過執行以下命令建立範例主題並產生(或發佈)幾則訊息
$ docker exec -ti kafka /opt/kafka/bin/kafka-console-producer.sh --bootstrap-server :9092 --topic demo
執行後,您可以每行輸入一則訊息。例如,輸入幾則訊息,每行一則。一些範例可能是
First message
以及
Second message
按下
Enter
鍵發送最後一則訊息,然後在完成後按下 Ctrl+c。訊息將發佈到 Kafka。透過使用訊息來確認訊息已發佈到叢集中
$ docker exec -ti kafka /opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server :9092 --topic demo --from-beginning
然後您應該在輸出中看到您的訊息
First message Second message
如果您願意,您可以開啟另一個終端機並發佈更多訊息,並看到它們出現在消費者中。
完成後,按下 Ctrl+c 停止使用訊息。
您有一個本地執行的 Kafka 叢集,並且已驗證您可以連線到它。
從非容器化應用程式連線到 Kafka
現在您已證明可以從命令列連線到 Kafka 实例,接下來就要從應用程式連線到叢集。在此範例中,您將使用一個簡單的 Node 專案,該專案使用 KafkaJS
由於叢集在本地端執行,並暴露在埠 9092 上,因此應用程式可以連線到 localhost:9092 的叢集(因為它目前是在原生環境中執行,而不是在容器中)。連線後,此範例應用程式將記錄它從 demo
主題使用的訊息。此外,當它在開發模式下執行時,如果找不到主題,它也會建立主題。
如果您沒有從上一個步驟執行 Kafka 叢集,請執行以下命令以啟動 Kafka 实例
$ docker run -d --name=kafka -p 9092:9092 apache/kafka
將 GitHub 儲存庫
複製到本地。$ git clone https://github.com/dockersamples/kafka-development-node.git
瀏覽至專案。
cd kafka-development-node/app
使用 yarn 安裝依賴項。
$ yarn install
使用
yarn dev
啟動應用程式。這會將NODE_ENV
環境變數設定為development
,並使用nodemon
監控檔案變更。$ yarn dev
應用程式現在正在執行,它會將接收到的訊息記錄到主控台。在新終端機中,使用以下命令發佈幾則訊息
$ docker exec -ti kafka /opt/kafka/bin/kafka-console-producer.sh --bootstrap-server :9092 --topic demo
然後將訊息發送到叢集
Test message
記住在完成後按下
Ctrl+c
以停止產生訊息。
從容器和原生應用程式連線到 Kafka
現在您有一個應用程式透過其暴露的埠連線到 Kafka,接下來就要探討從另一個容器連線到 Kafka 需要進行哪些變更。為此,您現在將從容器而不是原生環境中執行應用程式。
但在您這樣做之前,務必先了解 Kafka 聆聽器的工作原理,以及這些聆聽器如何協助用戶端連線。
了解 Kafka 監聽器
當用戶端連線到 Kafka 叢集時,它實際上是連線到「經紀人」。雖然經紀人有許多角色,但其中之一是支援用戶端的負載平衡。當用戶端連線時,經紀人會傳回一組連線 URL,用戶端應該使用這些 URL 進行連線,以便產生或使用訊息。這些連線 URL 的設定方式為何?
每個 Kafka 實例都有一組監聽器和已公告的監聽器。「監聽器」是 Kafka 綁定的對象,而「已公告的監聽器」則設定客戶端應如何連線到叢集。客戶端接收的連線 URL 是根據客戶端連線到哪個監聽器而定。
定義監聽器
為了更容易理解,讓我們看看 Kafka 需要如何設定才能支援兩種連線方式
- 主機連線(透過主機映射的連接埠) - 這些連線需要使用 localhost
- Docker 連線(來自 Docker 網路內部的連線) - 這些連線無法使用 localhost,但可以使用 Kafka 服務的網路別名(或 DNS 位址)
由於客戶端需要使用兩種不同的方法進行連線,因此需要兩個不同的監聽器 - HOST
和 DOCKER
。HOST
監聽器會告知客戶端使用 localhost:9092 進行連線,而 DOCKER
監聽器會告知客戶端使用 kafka:9093
進行連線。請注意,這表示 Kafka 正在監聽連接埠 9092 和 9093。但是,只有主機監聽器需要公開給主機。


為了設定這個功能,Kafka 的 compose.yaml
需要一些額外的設定。一旦您開始覆寫某些預設值,您還需要指定其他一些選項才能讓 KRaft 模式運作。
services:
kafka:
image: apache/kafka-native
ports:
- "9092:9092"
environment:
# Configure listeners for both docker and host communication
KAFKA_LISTENERS: CONTROLLER://localhost:9091,HOST://0.0.0.0:9092,DOCKER://0.0.0.0:9093
KAFKA_ADVERTISED_LISTENERS: HOST://localhost:9092,DOCKER://kafka:9093
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,DOCKER:PLAINTEXT,HOST:PLAINTEXT
# Settings required for KRaft mode
KAFKA_NODE_ID: 1
KAFKA_PROCESS_ROLES: broker,controller
KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
KAFKA_CONTROLLER_QUORUM_VOTERS: 1@localhost:9091
# Listener to use for broker-to-broker communication
KAFKA_INTER_BROKER_LISTENER_NAME: DOCKER
# Required for a single node cluster
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
請嘗試使用以下步驟。
如果您正在執行上一步驟中的 Node 應用程式,請在終端機中按下
ctrl+c
將其停止。如果您正在執行上一節中的 Kafka 叢集,請使用以下指令停止該容器
$ docker rm -f kafka
在複製的專案目錄的根目錄執行以下指令,以啟動 Compose 堆疊
$ docker compose up
稍後,應用程式就會啟動並執行。
堆疊中還有另一個服務可以用來發佈訊息。您可以前往 http://localhost:3000
新增叢集視覺化 一旦您開始在開發環境中使用容器,您就會開始意識到新增僅專注於協助開發的額外服務(例如視覺化器和其他支援服務)是多麼容易。由於您正在執行 Kafka,因此將 Kafka 叢集中發生的事情視覺化可能會有幫助。為此,您可以執行 Kafbat UI 網頁應用程式
services: kafka-ui: image: ghcr.io/kafbat/kafka-ui:latest ports: - 8080:8080 environment: DYNAMIC_CONFIG_ENABLED: "true" KAFKA_CLUSTERS_0_NAME: local KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:9093 depends_on: - kafka