Kafka

Docker Install

Install

docker pull wurstmeister/zookeeper  
docker run -d --name zookeeper -p 2181:2181 -t wurstmeister/zookeeper

docker pull wurstmeister/kafka
docker run  -d --name kafka -p 9092:9092 \
  -e KAFKA_BROKER_ID=0 \
  -e KAFKA_ZOOKEEPER_CONNECT=${host}:2181 \
  -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://${host}:9092 \
  -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 \
  wurstmeister/kafka

Usage

docker exec -it kafka /bin/bash
cd /opt/kafka_2.11-2.0.0/

# producer
./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic mykafka

# consumer
./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic mykafka --from-beginning

Configuration

auto.create.topics.enable=false/true
num.partitions=1
default.replication.factor=1
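
With auto.create.topics.enable=false, topics have to be created explicitly. Besides kafka-topics.sh (see Shell Tools below), this can also be done programmatically; a minimal kafka-python sketch, assuming a broker on localhost:9092 and the placeholder topic name mykafka:

from kafka.admin import KafkaAdminClient, NewTopic

# assumes a broker reachable at localhost:9092; adjust to your cluster
admin = KafkaAdminClient(bootstrap_servers='localhost:9092')

# create the topic explicitly instead of relying on auto-creation,
# mirroring num.partitions=1 and default.replication.factor=1 above
admin.create_topics(new_topics=[
    NewTopic(name='mykafka', num_partitions=1, replication_factor=1)
])
admin.close()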

Listeners Configuration

  1. Internal (intranet) access to Kafka only
listeners=PLAINTEXT://inner_ip:9092

Or configure SASL:

listeners=SASL_PLAINTEXT://inner_ip:9092
security.inter.broker.protocol=SASL_PLAINTEXT
  2. External access is required and all traffic goes over the public network
listeners=SASL_PLAINTEXT://public_ip:9092
security.inter.broker.protocol=SASL_PLAINTEXT
  3. Split internal and external traffic (see the client sketch after this list)
listener.security.protocol.map=INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
listeners=INTERNAL://inner_ip:9092,EXTERNAL://public_ip:19092
advertised.listeners=INTERNAL://inner_ip:9092,EXTERNAL://public_ip:19092
inter.broker.listener.name=INTERNAL
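
With the split-listener layout above, internal clients bootstrap against inner_ip:9092 and external clients against public_ip:19092. A quick kafka-python sketch to check that an advertised listener is actually reachable (inner_ip/public_ip are the placeholders from the config above):

from kafka import KafkaConsumer

# replace the placeholders with the addresses from advertised.listeners
internal = KafkaConsumer(bootstrap_servers='inner_ip:9092')
external = KafkaConsumer(bootstrap_servers='public_ip:19092')

# topics() forces a metadata fetch, so it fails quickly if the listener is unreachable
print(internal.topics())
print(external.topics())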

Shell Tools

Startup

# If a ZooKeeper ensemble is already running in the cluster, there is no need to start one; fill in zookeeper.properties according to your ZooKeeper setup
bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
bin/kafka-server-start.sh -daemon config/server.properties

# clean up local data (after stopping the brokers)
rm -rf /tmp/kafka-logs /tmp/zookeeper

Topic

# list topics
bin/kafka-topics.sh --zookeeper ${host_ip}:2181 --list   # older, ZooKeeper-based versions
bin/kafka-topics.sh --bootstrap-server ${host_ip}:9092 --list
bin/kafka-topics.sh --bootstrap-server ${host_ip}:9092 --list --command-config sasl.conf

# describe a topic
bin/kafka-topics.sh --bootstrap-server ${host_ip}:9092 --describe --topic ${topic_name}

# describe all external topics
bin/kafka-topics.sh --bootstrap-server ${host_ip}:9092 --describe --exclude-internal

# create a topic
bin/kafka-topics.sh --bootstrap-server ${host_ip}:9092 --create --topic ${topic_name} --partitions 1 --replication-factor 1

# delete a topic
bin/kafka-topics.sh --bootstrap-server ${host_ip}:9092 --delete --topic ${topic_name}

Consumer Group

# describe all consumer groups
bin/kafka-consumer-groups.sh --bootstrap-server ${host_ip}:9092 --describe --all-groups
bin/kafka-consumer-groups.sh --bootstrap-server ${host_ip}:9092 --describe --all-groups --command-config sasl.conf

# describe a consumer group
bin/kafka-consumer-groups.sh --bootstrap-server ${host_ip}:9092 --describe --group ${group_id}

# describe a consumer group on a topic
bin/kafka-consumer-groups.sh --bootstrap-server ${host_ip}:9092 --describe --group ${group_id} --topic ${topic_name}

# reset offsets
bin/kafka-consumer-groups.sh --bootstrap-server ${host_ip}:9092 --group ${group_id} --topic ${topic_name} --reset-offsets \
    [--to-earliest/--to-latest/--to-current/--to-offset <offset_integer>/--to-datetime <datetime_string>/--by-duration <duration_string>/--shift-by <positive_or_negative_integer>] \
    --execute
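
A client can achieve a similar reset by seeking; a rough kafka-python sketch (topic, group, and offset are placeholders), loosely equivalent to --to-earliest / --to-offset:

from kafka import KafkaConsumer, TopicPartition

consumer = KafkaConsumer(bootstrap_servers='localhost:9092',
                         group_id='my-group',
                         enable_auto_commit=False)

tp = TopicPartition('test-topic', 0)
consumer.assign([tp])

# jump to the earliest available offset (like --to-earliest) ...
consumer.seek_to_beginning(tp)

# ... or to an absolute offset (like --to-offset 42)
consumer.seek(tp, 42)
print(consumer.position(tp))

# committing persists the new position for the group
consumer.commit()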

Console Consumer

# consumer
bin/kafka-console-consumer.sh --bootstrap-server ${host_ip}:9092 --topic ${topic_name} --group ${group_id} --consumer.config sasl.conf [--from-beginning]

Console Producer

# producer
bin/kafka-console-producer.sh --bootstrap-server ${host_ip}:9092 --topic ${topic_name} --producer.config sasl.conf # newer versions
bin/kafka-console-producer.sh --broker-list ${host_ip}:9092 --topic ${topic_name} --producer.config sasl.conf      # older versions

bin/kafka-console-producer.sh --bootstrap-server ${host_ip}:9092 --topic ${topic_name} \
  --property parse.key=true \
  --property key.separator=":"

Reset Kafka Streams Offset

bin/kafka-streams-application-reset.sh --bootstrap-servers ${host_ip}:9092 --application-id ${streams-id} --to-latest --config-file sasl.conf

Connector

bin/connect-standalone.sh test/connect-standalone.properties test/connect-file-source.properties test/connect-file-sink.properties

bin/connect-standalone.sh test/connect-standalone.properties test/connect-file-source.properties

Python API

Install

pip install kafka-python

Cluster Metadata

from kafka.cluster import ClusterMetadata

cluster = ClusterMetadata(bootstrap_servers=['localhost:9092'])

# get topics
cluster.topics()

# get brokers
cluster.brokers()
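
A ClusterMetadata built directly like this is essentially the client-side metadata cache and does not fetch anything from the brokers on its own, so for a quick look at live metadata it is usually simpler to go through a connected client; a small sketch assuming a broker on localhost:9092:

from kafka import KafkaConsumer

consumer = KafkaConsumer(bootstrap_servers=['localhost:9092'])

# set of topic names known to the cluster
print(consumer.topics())

# partition ids of one topic (None if the topic does not exist)
print(consumer.partitions_for_topic('test-topic'))

consumer.close()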

Consumer

import json

import msgpack  # pip install msgpack
from kafka import KafkaConsumer

# To consume latest messages and auto-commit offsets
consumer = KafkaConsumer('test-topic',
                         group_id='my-group',
                         bootstrap_servers=['localhost:9092'])
for message in consumer:
    # message value and key are raw bytes -- decode if necessary!
    # e.g., for unicode: `message.value.decode('utf-8')`
    print ("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
                                          message.offset, message.key,
                                          message.value))

# consume earliest available messages, don't commit offsets
KafkaConsumer(auto_offset_reset='earliest', enable_auto_commit=False)

# consume json messages
KafkaConsumer(value_deserializer=lambda m: json.loads(m.decode('ascii')))

# consume msgpack
KafkaConsumer(value_deserializer=msgpack.unpackb)

# StopIteration if no message after 1sec
KafkaConsumer(consumer_timeout_ms=1000)

# Subscribe to a regex topic pattern
consumer = KafkaConsumer()
consumer.subscribe(pattern='^awesome.*')

# Use multiple consumers in parallel w/ 0.9 kafka brokers
# typically you would run each on a different server / process / CPU
consumer1 = KafkaConsumer('test-topic',
                          group_id='my-group',
                          bootstrap_servers='my.server.com')
consumer2 = KafkaConsumer('test-topic',
                          group_id='my-group',
                          bootstrap_servers='my.server.com')
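
Besides subscribing by topic name or pattern, a consumer can be pinned to specific partitions, which skips group rebalancing entirely; a short sketch with placeholder topic and partition:

from kafka import KafkaConsumer, TopicPartition

consumer = KafkaConsumer(bootstrap_servers='localhost:9092')

# manual assignment: no consumer-group coordination or rebalancing
tp = TopicPartition('test-topic', 0)
consumer.assign([tp])

# read a bounded batch instead of iterating forever
batch = consumer.poll(timeout_ms=1000, max_records=10)
for records in batch.values():
    for record in records:
        print(record.offset, record.key, record.value)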

Producer

import json
import logging

import msgpack  # pip install msgpack
from kafka import KafkaProducer
from kafka.errors import KafkaError

log = logging.getLogger(__name__)

producer = KafkaProducer(bootstrap_servers=['localhost:9092'])

# Asynchronous by default
future = producer.send('test-topic', b'raw_bytes')

# Block for 'synchronous' sends
try:
    record_metadata = future.get(timeout=10)
except KafkaError:
    # Decide what to do if produce request failed...
    log.exception('produce request failed')
    pass

# Successful result returns assigned partition and offset
print (record_metadata.topic)
print (record_metadata.partition)
print (record_metadata.offset)

# produce keyed messages to enable hashed partitioning
producer.send('test-topic', key=b'foo', value=b'bar')

# encode objects via msgpack
producer = KafkaProducer(value_serializer=msgpack.dumps)
producer.send('msgpack-topic', {'key': 'value'})

# produce json messages
producer = KafkaProducer(value_serializer=lambda m: json.dumps(m).encode('ascii'))
producer.send('json-topic', {'key': 'value'})

# produce asynchronously
for _ in range(100):
    producer.send('my-topic', b'msg')

def on_send_success(record_metadata):
    print(record_metadata.topic)
    print(record_metadata.partition)
    print(record_metadata.offset)

def on_send_error(excp):
    log.error('I am an errback', exc_info=excp)
    # handle exception

# produce asynchronously with callbacks
producer.send('test-topic', b'raw_bytes').add_callback(on_send_success).add_errback(on_send_error)

# block until all async messages are sent
producer.flush()

# configure multiple retries
producer = KafkaProducer(retries=5)
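
A few other commonly tuned producer settings, shown here with illustrative values rather than recommendations:

producer = KafkaProducer(bootstrap_servers='localhost:9092',
                         acks='all',              # wait for all in-sync replicas
                         retries=5,
                         linger_ms=20,            # allow up to 20 ms of batching
                         compression_type='gzip') # compress batches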

Visualization

kafka-eagle

Configuration items that need to be changed; everything else can stay at the defaults.

kafka.eagle.zk.cluster.alias=cluster1
cluster1.zk.list=hadoop01:2181,hadoop02:2181,hadoop05:2181

kafka.eagle.webui.port=8080

# change to the db folder under your own installation path
kafka.eagle.url=jdbc:sqlite:/data/username/Softwares/kafka-eagle/db/ke.db

Security

Server Configuration

  1. Edit config/server.properties
listeners=SASL_SSL://host.name:port
security.inter.broker.protocol=SASL_SSL
sasl.mechanism.inter.broker.protocol=PLAIN
sasl.enabled.mechanisms=PLAIN

Or (the layout used on the company cluster):

listeners=INTERNAL://127.0.0.1:9092,OUTSIDE://127.0.0.1:9093
listener.security.protocol.map = INTERNAL:SASL_PLAINTEXT,OUTSIDE:SASL_PLAINTEXT
advertised.listeners=INTERNAL://127.0.0.1:9092,OUTSIDE://host:9093
inter.broker.listener.name=INTERNAL

sasl.mechanism.inter.broker.protocol=PLAIN
sasl.enabled.mechanisms=PLAIN
  2. Create the credentials file kafka_server_jaas.conf
KafkaServer {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="admin"
    password="admin-secret"
    user_admin="admin-secret"
    user_yourusername="your_password";
};
  3. Add -Djava.security.auth.login.config=/etc/kafka/kafka_server_jaas.conf to the startup options
export KAFKA_OPTS="-Djava.security.auth.login.config=/etc/kafka/kafka_server_jaas.conf"
bin/kafka-server-start.sh -daemon sasl/server.properties

Client Configuration

Add the following configuration:

sasl.mechanism=PLAIN
security.protocol=SASL_PLAINTEXT

sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
          username="yourusername" \
          password="your_password";

Load Testing

Producer Load Test

# --num-records   total number of records
# --throughput    target records per second
# --record-size   size of each record in bytes
# --payload-file  file of payloads; records are drawn from it at random (use either this or --record-size, not both)
# --print-metrics print metrics at the end
kafka-producer-perf-test \
  --producer-props "bootstrap.servers=127.0.0.1:9092" \
  --producer.config sasl/client.conf \
  --topic test \
  --num-records 500000 \
  --throughput 500 \
  --record-size 10000 \
  --print-metrics

Consumer Load Test

# --zookeeper zk1:port1,... applies only to the legacy consumer; newer versions use --broker-list / --bootstrap-server
kafka-consumer-perf-test \
  --broker-list host1:port1,host2:port2,... \
  --topic TOPIC \
  --group gid \
  --messages 500000 \
  --num-fetch-threads 1 \
  --from-latest
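
For a rough ad-hoc measurement without the bundled tool, a kafka-python sketch that times how long it takes to read a fixed number of records (topic name and record count are placeholders):

import time

from kafka import KafkaConsumer

NUM_RECORDS = 100000  # placeholder target

consumer = KafkaConsumer('test-topic',
                         bootstrap_servers='localhost:9092',
                         auto_offset_reset='earliest',
                         enable_auto_commit=False)

count = 0
total_bytes = 0
start = time.time()
for message in consumer:
    count += 1
    total_bytes += len(message.value or b'')
    if count >= NUM_RECORDS:
        break

elapsed = time.time() - start
print('%d records, %.0f records/s, %.2f MB/s'
      % (count, count / elapsed, total_bytes / elapsed / 1e6))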

Kafka Streams

Joins

Crossing the Streams – Joins in Apache Kafka

References