Kafka
Docker Install
Install
docker pull wurstmeister/zookeeper
docker run -d --name zookeeper -p 2181:2181 -t wurstmeister/zookeeper
docker pull wurstmeister/kafka
docker run -d --name kafka -p 9092:9092 \
-e KAFKA_BROKER_ID=0 \
-e KAFKA_ZOOKEEPER_CONNECT=${host}:2181 \
-e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://${host}:9092 \
-e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 \
wurstmeister/kafka
Usage
docker exec -it kafka /bin/bash
cd /opt/kafka_2.11-2.0.0/
# producer
./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic mykafka
# consumer
./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic mykafka --from-beginning
Configuration
auto.create.topics.enable=false    # whether topics are auto-created on first use (true/false)
num.partitions=1                   # default number of partitions for new topics
default.replication.factor=1       # default replication factor for auto-created topics
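With auto.create.topics.enable=false, topics must be created explicitly. A minimal sketch using the kafka-python admin client (installed below under Python API); the broker address, topic name, and settings are placeholders:
from kafka.admin import KafkaAdminClient, NewTopic
# placeholder broker address and topic settings -- adjust to your cluster
admin = KafkaAdminClient(bootstrap_servers='localhost:9092')
admin.create_topics([NewTopic(name='mykafka', num_partitions=1, replication_factor=1)])
admin.close()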
Listeners Configuration
- Kafka 2.1 Documentation - 3.1 Broker Configs
- Kafka listeners vs. advertised.listeners: splitting internal and external traffic
- Kafka from Getting Started to Practice - Kafka Cluster: Kafka Listeners
- Internal-network access only
listeners=PLAINTEXT://inner_ip:9092
Or, with SASL:
listeners=SASL_PLAINTEXT://inner_ip:9092
security.inter.broker.protocol=SASL_PLAINTEXT
- External access required; all traffic goes over the public network
listeners=SASL_PLAINTEXT://public_ip:9092
security.inter.broker.protocol=SASL_PLAINTEXT
- Split internal and external traffic
listener.security.protocol.map=INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
listeners=INTERNAL://inner_ip:9092,EXTERNAL://public_ip:19092
advertised.listeners=INTERNAL://inner_ip:9092,EXTERNAL://public_ip:19092
inter.broker.listener.name=INTERNAL
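A quick way to verify that the EXTERNAL advertised listener is actually reachable from outside is to fetch metadata through it. This is only a sketch assuming kafka-python and the placeholder address above; if advertised.listeners points at an unreachable address, the metadata fetch will hang or fail.
from kafka import KafkaConsumer
# connect through the EXTERNAL listener (placeholder address) and force a metadata fetch
consumer = KafkaConsumer(bootstrap_servers='public_ip:19092')
print(consumer.topics())  # lists topics if the advertised listener is reachable
consumer.close()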
Shell Tools
Startup
# If the cluster already has ZooKeeper running, there is no need to start it here; fill in zookeeper.properties according to your ZooKeeper configuration
bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
bin/kafka-server-start.sh -daemon config/server.properties
# stop, then clean up the data directories
bin/kafka-server-stop.sh
bin/zookeeper-server-stop.sh
rm -rf /tmp/kafka-logs /tmp/zookeeper
Topic
# list topics
bin/kafka-topics.sh --zookeeper ${host_ip}:2181 --list  # older versions
bin/kafka-topics.sh --bootstrap-server ${host_ip}:9092 --list
bin/kafka-topics.sh --bootstrap-server ${host_ip}:9092 --list --command-config sasl.conf
# describe a topic
bin/kafka-topics.sh --bootstrap-server ${host_ip}:9092 --describe --topic ${topic_name}
# describe all external topics
bin/kafka-topics.sh --bootstrap-server ${host_ip}:9092 --describe --exclude-internal
# create a topic
bin/kafka-topics.sh --bootstrap-server ${host_ip}:9092 --create --topic ${topic_name} --partitions 1 --replication-factor 1
# delete a topic
bin/kafka-topics.sh --bootstrap-server ${host_ip}:9092 --delete --topic ${topic_name}
Consumer Group
# describe all consumer groups
bin/kafka-consumer-groups.sh --bootstrap-server ${host_ip}:9092 --describe --all-groups
bin/kafka-consumer-groups.sh --bootstrap-server ${host_ip}:9092 --describe --all-groups --command-config sasl.conf
# describe a consumer group
bin/kafka-consumer-groups.sh --bootstrap-server ${host_ip}:9092 --describe --group ${group_id}
# describe a consumer group on a topic
bin/kafka-consumer-groups.sh --bootstrap-server ${host_ip}:9092 --describe --group ${group_id} --topic ${topic_name}
# reset offsets
bin/kafka-consumer-groups.sh --bootstrap-server ${host_ip}:9092 --group ${group_id} --topic ${topic_name} --reset-offsets \
[--to-earliest/--to-latest/--to-current/--to-offset <offset_integer>/--to-datetime <datetime_string>/--by-duration <duration_string>/--shift-by <positive_or_negative_integer>] \
--execute
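The LAG column in the describe output is the log end offset minus the group's committed offset; a rough client-side equivalent with kafka-python (group id, topic, and broker address are placeholders):
from kafka import KafkaConsumer, TopicPartition
# placeholders: group id, topic, and bootstrap server
consumer = KafkaConsumer(group_id='my-group', bootstrap_servers='localhost:9092',
                         enable_auto_commit=False)
partitions = [TopicPartition('my-topic', p) for p in consumer.partitions_for_topic('my-topic')]
end_offsets = consumer.end_offsets(partitions)   # latest offset per partition
for tp in partitions:
    committed = consumer.committed(tp) or 0      # last committed offset of the group
    print('partition %d lag = %d' % (tp.partition, end_offsets[tp] - committed))
consumer.close()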
Console Consumer
# consumer
bin/kafka-console-consumer.sh --bootstrap-server ${host_ip}:9092 --topic ${topic_name} --group ${group_id} --consumer.config sasl.conf [--from-beginning]
Console Producer
# producer
bin/kafka-console-producer.sh --bootstrap-server ${host_ip}:9092 --topic ${topic_name} --producer.config sasl.conf # newer versions
bin/kafka-console-producer.sh --broker-list ${host_ip}:9092 --topic ${topic_name} --producer.config sasl.conf # older versions
bin/kafka-console-producer.sh --bootstrap-server ${host_ip}:9092 --topic ${topic_name} \
--property parse.key=true \
--property key.separator=":"
Reset Kafka Streams Offset
bin/kafka-streams-application-reset.sh --bootstrap-servers ${host_ip}:9092 --application-id ${streams-id} --to-latest --config-file sasl.conf
Connector
bin/connect-standalone.sh test/connect-standalone.properties test/connect-file-source.properties test/connect-file-sink.properties
bin/connect-standalone.sh test/connect-standalone.properties test/connect-file-source.properties
Python API
Install
pip install kafka-python
Cluster Metadata
from kafka.cluster import ClusterMetadata
cluster = ClusterMetadata(bootstrap_servers=['localhost:9092'])
# get topics
cluster.topics()
# get brokers
cluster.brokers()
Consumer
import json     # used by the json deserializer example below
import msgpack  # third-party: pip install msgpack
from kafka import KafkaConsumer
# To consume latest messages and auto-commit offsets
consumer = KafkaConsumer('test-topic',
group_id='my-group',
bootstrap_servers=['localhost:9092'])
for message in consumer:
# message value and key are raw bytes -- decode if necessary!
# e.g., for unicode: `message.value.decode('utf-8')`
print ("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
message.offset, message.key,
message.value))
# consume earliest available messages, don't commit offsets
KafkaConsumer(auto_offset_reset='earliest', enable_auto_commit=False)
# consume json messages
KafkaConsumer(value_deserializer=lambda m: json.loads(m.decode('ascii')))
# consume msgpack
KafkaConsumer(value_deserializer=msgpack.unpackb)
# StopIteration if no message after 1sec
KafkaConsumer(consumer_timeout_ms=1000)
# Subscribe to a regex topic pattern
consumer = KafkaConsumer()
consumer.subscribe(pattern='^awesome.*')
# Use multiple consumers in parallel w/ 0.9 kafka brokers
# typically you would run each on a different server / process / CPU
consumer1 = KafkaConsumer('test-topic',
group_id='my-group',
bootstrap_servers='my.server.com')
consumer2 = KafkaConsumer('test-topic',
group_id='my-group',
bootstrap_servers='my.server.com')
Producer
import json     # used by the json serializer example below
import logging
import msgpack  # third-party: pip install msgpack
from kafka import KafkaProducer
from kafka.errors import KafkaError
log = logging.getLogger(__name__)
producer = KafkaProducer(bootstrap_servers=['localhost:9092'])
# Asynchronous by default
future = producer.send('test-topic', b'raw_bytes')
# Block for 'synchronous' sends
try:
record_metadata = future.get(timeout=10)
except KafkaError:
# Decide what to do if produce request failed...
log.exception()
pass
# Successful result returns assigned partition and offset
print (record_metadata.topic)
print (record_metadata.partition)
print (record_metadata.offset)
# produce keyed messages to enable hashed partitioning
producer.send('test-topic', key=b'foo', value=b'bar')
# encode objects via msgpack
producer = KafkaProducer(value_serializer=msgpack.dumps)
producer.send('msgpack-topic', {'key': 'value'})
# produce json messages
producer = KafkaProducer(value_serializer=lambda m: json.dumps(m).encode('ascii'))
producer.send('json-topic', {'key': 'value'})
# produce asynchronously
for _ in range(100):
producer.send('my-topic', b'msg')
def on_send_success(record_metadata):
print(record_metadata.topic)
print(record_metadata.partition)
print(record_metadata.offset)
def on_send_error(excp):
log.error('I am an errback', exc_info=excp)
# handle exception
# produce asynchronously with callbacks
producer.send('test-topic', b'raw_bytes').add_callback(on_send_success).add_errback(on_send_error)
# block until all async messages are sent
producer.flush()
# configure multiple retries
producer = KafkaProducer(retries=5)
Visualization (Kafka Eagle)
Config items that need changing; the rest can stay at their defaults
kafka.eagle.zk.cluster.alias=cluster1
cluster1.zk.list=hadoop01:2181,hadoop02:2181,hadoop05:2181
kafka.eagle.webui.port=8080
# change this to the db folder under your own installation path
kafka.eagle.url=jdbc:sqlite:/data/username/Softwares/kafka-eagle/db/ke.db
Security
Server Configuration
- Edit config/server.properties
listeners=SASL_SSL://host.name:port
security.inter.broker.protocol=SASL_SSL
sasl.mechanism.inter.broker.protocol=PLAIN
sasl.enabled.mechanisms=PLAIN
Or (a company-cluster style with separate internal and external listeners):
listeners=INTERNAL://127.0.0.1:9092,OUTSIDE://127.0.0.1:9093
listener.security.protocol.map = INTERNAL:SASL_PLAINTEXT,OUTSIDE:SASL_PLAINTEXT
advertised.listeners=INTERNAL://127.0.0.1:9092,OUTSIDE://host:9093
inter.broker.listener.name=INTERNAL
sasl.mechanism.inter.broker.protocol=PLAIN
sasl.enabled.mechanisms=PLAIN
- Create the credentials file
kafka_server_jaas.conf
KafkaServer {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="admin"
password="admin-secret"
user_admin="admin-secret"
user_yourusername="your_password";
};
- Add the JAAS config as a JVM option at startup
-Djava.security.auth.login.config=/etc/kafka/kafka_server_jaas.conf
export KAFKA_OPTS="-Djava.security.auth.login.config=/etc/kafka/kafka_server_jaas.conf"
bin/kafka-server-start.sh -daemon sasl/server.properties
Client Configuration
Add the following settings on the client (e.g. in the sasl.conf / sasl/client.conf passed to the shell tools above)
sasl.mechanism=PLAIN
security.protocol=SASL_PLAINTEXT
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
username="yourusername" \
password="your_password";
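The same credentials in kafka-python terms; a sketch assuming SASL/PLAIN as configured above, with placeholder host and topic:
from kafka import KafkaConsumer
# placeholder host/topic; username and password are the ones from kafka_server_jaas.conf
consumer = KafkaConsumer('test-topic',
                         bootstrap_servers='host:9093',
                         security_protocol='SASL_PLAINTEXT',
                         sasl_mechanism='PLAIN',
                         sasl_plain_username='yourusername',
                         sasl_plain_password='your_password')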
Load Testing
Producer load test
# --num-records:   total number of records to send
# --throughput:    target records per second
# --record-size:   size of each record in bytes
# --payload-file:  file to draw record payloads from at random (use either this or --record-size, not both)
# --print-metrics: print metrics when the test finishes
kafka-producer-perf-test \
  --producer-props "bootstrap.servers=127.0.0.1:9092" \
  --producer.config sasl/client.conf \
  --topic test \
  --num-records 500000 \
  --throughput 500 \
  --record-size 10000 \
  --print-metrics
Consumer load test
# --zookeeper is only for the old consumer on old versions; newer versions use --broker-list / --bootstrap-server
kafka-consumer-perf-test \
  --broker-list host1:port1,host2:port2,... \
  --topic TOPIC \
  --group gid \
  --messages 500000 \
  --num-fetch-threads 1 \
  --from-latest
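For a rough client-side sanity check of consumption throughput (not a replacement for kafka-consumer-perf-test), a kafka-python sketch with placeholder topic and broker; add the SASL settings from the Security section if the cluster requires them. It reads from the beginning and stops after 5 seconds without new messages:
import time
from kafka import KafkaConsumer
# placeholder topic/broker; consumer_timeout_ms ends the loop when the topic goes idle
consumer = KafkaConsumer('test', bootstrap_servers='127.0.0.1:9092',
                         auto_offset_reset='earliest', enable_auto_commit=False,
                         consumer_timeout_ms=5000)
count, size, start = 0, 0, time.time()
for msg in consumer:
    count += 1
    size += len(msg.value or b'')
elapsed = time.time() - start
print('%d msgs, %.1f MB, %.0f msgs/s' % (count, size / 1e6, count / elapsed))
consumer.close()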