1. 简介
什么是Kafka?
Apache Kafka是一个分布式流媒体平台,用于构建实时数据管道和流媒体应用。它是水平可扩展的、故障导通的。
什么是Zookeeper?
ZooKeeper是用于维护配置信息、命名、提供分布式同步和提供组服务的集中式服务。
本次文章中 Kafka
Zookeeper
均是基于bitnami制作的镜像进行修改,并部署在Rainbond中。
2.快速安装
Kafka-Zookeeper已发布在开源应用商店中,可从开源应用商店一键部署。
3.如何伸缩Kafka
通过快速复制功能,复制出另一个Kafka组件,依赖于Zookeeper。
4.如何伸缩Zookeeper
首先关闭Zookeeper组件,进入组件中 > 伸缩 > 实例数量,手动进行伸缩实例,修改完启动组件即可。
缩小实例也是如此,减少实例数量。
5. 如何使用
5.1 外部访问
-
进入 Zookeeper 组件中,打开Zookeeper 2181的对外端口(默认已打开,进入组件内查看端口即可)。
-
进入 每个 Kafka 组件中,修改环境变量,IP+PORT从组件的对外端口中获取。
KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://xxx:xxx
-
打开 Kafka 客户端工具,连接 Zookeeper Kafka 的对外端口地址即可访问。
6.应用制作过程分享
基于 bitnami-docker- zookeeper bitnami-docker-kafka 进行的修改
6.1 Zookeeper的改动
Fork bitnami 的仓库进行修改,新增了rainbond-env.sh 文件,修改内容如下:
#!/bin/bash
set -o errexit
set -o nounset
set -o pipefail
# set -o xtrace # Uncomment this line for debugging purposes
## This script works with Rainbond
# define zookeeper id
export ZOO_SERVER_ID=`expr ${HOSTNAME#*-} + 1`
# define zookeeper server list
ZOO_SERVERS_LIST=()
for (( i = 0;i < $SERVICE_POD_NUM;i++)); do
if [ $SERVICE_POD_NUM != 1 ]; then
ZOO_SERVERS_LIST[$i]="$SERVICE_NAME-$i.$SERVICE_NAME:2888:3888"
fi
done
if [ $SERVICE_POD_NUM != 1 ]; then
echo "zookeeper cluster nodes are ${ZOO_SERVERS_LIST[@]}"
export ZOO_SERVERS=$(echo ${ZOO_SERVERS_LIST[@]} | tr ' ' ',')
fi
# set default_java_mem_opts
case ${MEMORY_SIZE} in
"micro")
export ZOO_HEAP_SIZE=128
echo "Optimizing java process for 128M Memory...." >&2
;;
"small")
export ZOO_HEAP_SIZE=256
echo "Optimizing java process for 256M Memory...." >&2
;;
"medium")
export ZOO_HEAP_SIZE=512
echo "Optimizing java process for 512M Memory...." >&2
;;
"large")
export ZOO_HEAP_SIZE=1024
echo "Optimizing java process for 1G Memory...." >&2
;;
"2xlarge")
export ZOO_HEAP_SIZE=2048
echo "Optimizing java process for 2G Memory...." >&2
;;
"4xlarge")
export ZOO_HEAP_SIZE=4096
echo "Optimizing java process for 4G Memory...." >&2
;;
"8xlarge")
export ZOO_HEAP_SIZE=8192
echo "Optimizing java process for 8G Memory...." >&2
;;
16xlarge|32xlarge|64xlarge)
export ZOO_HEAP_SIZE=16384
echo "Optimizing java process for biger Memory...." >&2
;;
esac
以上 rainbond-env.sh 脚本文件包含了:
- 根据系统环境变量
HOSTNAME
自动获取ZOO_SERVER_ID
的值,并且动态伸缩实例时自增。 - 根据系统环境变量
SERVICE_POD_NUM
自动获取当前有几个POD,通过SERVICE_NAME
获取当前POD的访问地址,并写入到变量ZOO_SERVERS
中,达到动态伸缩实例,自动组成集群。 - 根据 动态配置JVM内存 动态调整Zookeeper的JVM内存。
修改完以上内容,可在Rainbond平台上进行源码构建,并可达到动态伸缩实例的效果。
6.2 Kafka的改动
Fork bitnami 的仓库进行修改,新增了rainbond-env.sh 文件,修改内容如下:
#!/bin/bash
set -o errexit
set -o nounset
set -o pipefail
# set -o xtrace # Uncomment this line for debugging purposes
## This script works with Rainbond
# define kafka broker id
KAFKA_CFG_BROKER_ID=${HOSTNAME#*-}
# define zk server Addr
if [ -z $DEPEND_SERVICE ]; then
ZooKeeperServer=$(nslookup ${DEPEND_SERVICE%:*} | grep Address | sed '1d' | awk '{print $2":2181"}')
export KAFKA_CFG_ZOOKEEPER_CONNECT=$(echo $ZooKeeperServer | tr ' ' ',')
fi
# set default_java_mem_opts
case ${MEMORY_SIZE} in
"micro")
export KAFKA_HEAP_OPTS="-Xmx128m -Xms128m"
echo "Optimizing java process for 128M Memory...." >&2
;;
"small")
export KAFKA_HEAP_OPTS="-Xmx256m -Xms256m"
echo "Optimizing java process for 256M Memory...." >&2
;;
"medium")
export KAFKA_HEAP_OPTS="-Xmx512m -Xms512m"
echo "Optimizing java process for 512M Memory...." >&2
;;
"large")
export KAFKA_HEAP_OPTS="-Xmx1024m -Xms1024m"
echo "Optimizing java process for 1G Memory...." >&2
;;
"2xlarge")
export KAFKA_HEAP_OPTS="-Xmx2048m -Xms2048m"
echo "Optimizing java process for 2G Memory...." >&2
;;
"4xlarge")
export KAFKA_HEAP_OPTS="-Xmx4096m -Xms4096m"
echo "Optimizing java process for 4G Memory...." >&2
;;
"8xlarge")
export KAFKA_HEAP_OPTS="-Xmx8192m -Xms8192m"
echo "Optimizing java process for 8G Memory...." >&2
;;
16xlarge|32xlarge|64xlarge)
export KAFKA_HEAP_OPTS="-Xmx16384m -Xms16384m"
echo "Optimizing java process for biger Memory...." >&2
;;
esac
以上 rainbond-env.sh 脚本文件包含了:
- 根据系统环境变量
HOSTNAME
自动获取KAFKA_CFG_BROKER_ID
的值。 - 当依赖了Zookeeper组件后,会自动注入环境变量
DEPEND_SERVICE
,使用 环境变量DEPEND_SERVICE
解析出Zookeeper的IP
将IP
写入到KAFKA_CFG_ZOOKEEPER_CONNECT
环境变量中。 - 根据 动态配置JVM内存 动态调整Kafka的JVM内存。
修改完以上内容,可在Rainbond平台上进行源码构建,并且需依赖Zookeeper组件。