ELK-Elasticsearch

Run on Docker

```
docker pull docker.elastic.co/elasticsearch/elasticsearch:7.9.1
docker run -d -p 9200:9200 -p 9300:9300 -e "discovery.type=single-node" --name elasticsearch docker.elastic.co/elasticsearch/elasticsearch:7.9.1
```

Common queries

```
# cluster health
curl "http://localhost:9200/_cat/health?v"
# list nodes
curl "http://localhost:9200/_cat/nodes?v"
# list indices
curl "http://localhost:9200/_cat/indices?v&s=index"
# list indices, sorted by size
curl "http://localhost:9200/_cat/indices?v&s=store.size:desc"
# search, filtering on the values of field1 and field2
curl "http://localhost:9200/my-index/_search?q=field1:100&q=field2:abc"
# search, returning only field1 and field2, sorted by field1
curl "http://localhost:9200/my-index/_search?_source=field1,field2&sort=field1:desc"
# POST with a query body
curl -XPOST "http://localhost:9200/my-index/_search" -H 'Content-Type: application/json' -d '{"size":3, "query": {"term":{"Name":"Tom"}}}'
# show cluster settings
curl "http://localhost:9200/_cluster/settings?include_defaults=true&pretty"
# show node statistics
curl "http://localhost:9200/_nodes/stats?pretty"
```

APIs

cat APIs

Common

```
# verbose, print the header row
curl -XGET "http://{ip}:9200/_cat/master?v"
# help, print a description of each column
# in Kibana you can type the command and press `cmd + /` to open the docs
curl -XGET "http://{ip}:9200/_cat/master?help"
# print only the given headers
curl -XGET "http://{ip}:9200/_cat/master?h=ip,port,name"
# sorting
curl -XGET "http://{ip}:9200/_cat/indices?v&s=index"
curl -XGET "http://{ip}:9200/_cat/indices?v&s=index:asc"   # ascending
curl -XGET "http://{ip}:9200/_cat/indices?v&s=index:desc"  # descending
curl -XGET "http://{ip}:9200/_cat/templates?v&s=order:desc,index_patterns"  # multi-key sort
# output format: format=text|json|yaml|smile|cbor
curl -XGET "http://{ip}:9200/_cat/indices?format=json&pretty"
# output units: bytes=b|kb|mb|gb|tb|pb
curl -XGET "http://{ip}:9200/_cat/indices?s=store.size:desc&bytes=b"
```

APIs ...
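The same endpoints can be called from Python. Below is a minimal sketch using the third-party `requests` package (not part of the original notes); it assumes the single-node container above is reachable on localhost:9200 and that an index named `my-index` with a `Name` field exists (example names).

```python
# Query Elasticsearch over HTTP with requests (sketch, example index/field names).
import requests

ES = "http://localhost:9200"

# cluster health, same as `_cat/health?v`
print(requests.get(f"{ES}/_cat/health", params={"v": "true"}).text)

# list indices sorted by size, same as `_cat/indices?v&s=store.size:desc`
print(requests.get(f"{ES}/_cat/indices", params={"v": "true", "s": "store.size:desc"}).text)

# search with a JSON body, same as the POST example above
body = {"size": 3, "query": {"term": {"Name": "Tom"}}}
resp = requests.post(f"{ES}/my-index/_search", json=body)
print(resp.json()["hits"]["total"])
```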


ELK-Filebeat

Run Filebeat on Docker

```
docker pull docker.elastic.co/beats/filebeat:7.9.1
docker run -d \
  --name=filebeat \
  --user=root \
  --volume="$(pwd)/filebeat.docker.yml:/usr/share/filebeat/filebeat.yml:ro" \
  --volume="/var/lib/docker/containers:/var/lib/docker/containers:ro" \
  --volume="/var/run/docker.sock:/var/run/docker.sock:ro" \
  --volume="/var/log:/var/log" \
  docker.elastic.co/beats/filebeat:7.9.1 \
  filebeat -e -strict.perms=false
# the /var/log volume mounts the host's log directory into the container
```

Filebeat Config

```
# file input
filebeat.inputs:
- type: log
  enabled: true
  paths:
    - /var/log/test.log
  exclude_lines: ['^nohup']

# modules
filebeat.config:
  modules:
    path: ${path.config}/modules.d/*.yml
    reload.enabled: false

# logstash output
output.logstash:
  hosts: ["localhost:5044"]

# elasticsearch output
# when ES runs in Docker, use the host IP rather than localhost,
# because Filebeat and ES are not in the same container
output.elasticsearch:
  hosts: ["localhost:9200"]
  username: 'username'
  password: 'password'
```
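A quick way to check that the file input above is actually being shipped is to append a marker line to the watched log and then search Elasticsearch for it. This is only a sketch: it assumes the file input plus elasticsearch output config above, write access to /var/log/test.log, ES on localhost:9200, and the default filebeat-* index pattern; the marker string is made up for the test.

```python
# Smoke test for the pipeline: write a unique line, wait, then search for it in ES.
import time
import requests

marker = f"filebeat-smoke-test-{int(time.time())}"
with open("/var/log/test.log", "a") as f:
    f.write(marker + "\n")

time.sleep(30)  # give Filebeat time to pick up and ship the line

resp = requests.get(
    "http://localhost:9200/filebeat-*/_search",
    params={"q": f'message:"{marker}"'},
)
print(resp.json()["hits"]["total"])
```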


ELK-Logstash

Run Logstash on Docker

```
docker pull docker.elastic.co/logstash/logstash-oss:7.9.1
docker run --rm -it \
  -p 5044:5044 \
  -v /data/yzchen/elk/logstash/pipeline/:/usr/share/logstash/pipeline/ \
  docker.elastic.co/logstash/logstash-oss:7.9.1
```

Logstash Config

```
input {
  # Filebeat input
  beats {
    port => "5044"
  }
  # Redis input
  redis {
    host => "host_ip"
    port => port
    password => "password"
    data_type => "channel"
    key => "channel_pattern"
  }
}

filter {
  # drop events
  if [type] != "raw" {
    drop { }
  }
  # parse a JSON string
  json {
    source => "msgdata"
  }
  # drop a field
  prune {
    blacklist_names => ["msgdata"]
  }
  # check whether a field exists
  if ([foo]) { }        # for numeric fields
  if ("" in [foo]) { }  # for other types
  # dissect
  dissect {
    mapping => {
      "line" => "%{month} %{day} %{time} %{id} %{?NET_CONNTRACK->}[%{type}]%{rest}"
    }
  }
  # select from an array or a dict
  mutate {
    update => {"new_field" => "%{[array_name][0][key_name]}"}
  }
  # query MySQL and add a field
  jdbc_streaming {
    jdbc_driver_library => "/usr/share/logstash/mysql-connector-java-5.1.34.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://host:3306/database"
    jdbc_user => "username"
    jdbc_password => "secret"
    statement => "SELECT col FROM table WHERE name = :name"
    parameters => { "name" => "name" }
    target => "col"
  }
  # query Elasticsearch and add a field
  # note: the query uses the ES query string syntax, see
  # https://www.elastic.co/guide/en/elasticsearch/reference/7.12/query-dsl-query-string-query.html#query-string-syntax
  elasticsearch {
    hosts => ["host:9200"]
    index => "python-log-auth-*"
    query => 'type:auth AND name:"%{[name]}"'
    fields => { "device" => "device" }
    sort => "@timestamp:desc"
  }
  # ruby
  ruby {
    code => "event.set('date', (event.get('message')[0..-7].to_f + event.get('message')[-6..-1].to_f / 1000000).to_s)"
  }
}

output {
  # debug output
  stdout { codec => rubydebug }
}
```

Monitoring

```
curl -XGET 'localhost:9600/_node/stats/<ps|jvm|process|events|pipelines|reloads>'
```

References

Logstash 最佳实践
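The monitoring endpoint shown above can also be polled from a script. A minimal sketch using the third-party `requests` package (not part of the original notes), assuming Logstash's HTTP API is listening on localhost:9600:

```python
# Print pipeline event counters from the Logstash node stats API.
import requests

stats = requests.get("http://localhost:9600/_node/stats/events").json()
events = stats.get("events", {})
print("in:", events.get("in"), "filtered:", events.get("filtered"), "out:", events.get("out"))
```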


Hadoop

Basics

```
# browse the HDFS file system
hadoop fs -ls <dir>
# view a text file
hadoop fs -cat <filename>
hadoop fs -cat <filename> | head
# view a compressed file
hadoop fs -text <filename>
# check file sizes
hadoop fs -du <dir>
```

For other commands, see: hadoop shell commands
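These commands can also be driven from Python with subprocess, which is handy for small reports. A minimal sketch, assuming the hadoop CLI is on the PATH; the warehouse path is just an example:

```python
# Run `hadoop fs -du <dir>` and parse the output into (bytes, path) pairs.
import subprocess

def hdfs_du(path):
    out = subprocess.run(
        ["hadoop", "fs", "-du", path],
        check=True, capture_output=True, text=True,
    ).stdout
    rows = []
    for line in out.splitlines():
        parts = line.split()
        if len(parts) >= 2:
            rows.append((int(parts[0]), parts[-1]))  # first column is the size in bytes, last is the path
    return rows

for size, path in hdfs_du("/user/hive/warehouse"):
    print(size, path)
```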


Hive

[TOC]

CLI

```
# start the cli
hive
# or
hive --service cli

# define a variable
hive --define A=B
# or
hive -d A=B

# run a SQL statement
hive -e 'SELECT * FROM a_table'

# run a SQL file
hive -f filename
# or, inside the cli
hive > source filename;

# run an init file
# init statements can also be put into $HOME/.hiverc
hive -i filename

# history is kept in $HOME/.hivehistory
# the config template is $HIVE_HOME/conf/hive-default.xml.template

# configuration
hive --hiveconf option_A=true
hive > set hiveconf:option_A=true;

# run a jar application
hive --service jar

# run a shell command by prefixing it with !
hive > !pwd;

# run a hadoop command by dropping the leading "hadoop"
hive > fs -ls /;

# comments in HiveQL start with '--'
```

Database operations

```
# list databases
SHOW DATABASES;
# create a database
CREATE DATABASE a_database;
# show database info
DESCRIBE DATABASE a_database;
# switch to a database
USE a_database;
# drop a database
DROP DATABASE a_database;
# change properties
ALTER DATABASE a_database SET DBPROPERTIES('edited-by' = 'ychen');
```

Table operations

```
# list tables
SHOW TABLES;
SHOW TABLES IN a_database;
# show table info
DESCRIBE a_table;
DESCRIBE a_database.a_table;
DESCRIBE a_table.a_column;   # show a single column
# create a table
CREATE TABLE IF NOT EXISTS a_table (
  name STRING COMMENT 'the name',
  salary FLOAT COMMENT 'the salary'
)
COMMENT 'Description of the table'
LOCATION '/user/hive/warehouse/a_database/a_table'
TBLPROPERTIES ('creator'='me');
# drop a table
DROP TABLE IF EXISTS a_table;
# rename a table
ALTER TABLE a_table RENAME TO b_table;
# rename a column and move it after another column
ALTER TABLE a_table CHANGE COLUMN old_column new_column INT
AFTER other_column;
# add columns
ALTER TABLE a_table ADD COLUMNS (
  app_name STRING,
  session_id BIGINT
);
# drop and replace columns
ALTER TABLE a_table REPLACE COLUMNS (
  a_column INT,
  b_column STRING
);
```

External tables

Data that Hive does not own but can still query ...
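The statements above can also be run non-interactively from a script via `hive -e`, as in the CLI section. A minimal subprocess sketch, assuming the hive CLI is on the PATH and that `a_database.a_table` from the examples exists:

```python
# Run a HiveQL statement with `hive -e` and return its stdout.
import subprocess

def run_hive(sql):
    result = subprocess.run(
        ["hive", "-e", sql],
        check=True, capture_output=True, text=True,
    )
    return result.stdout

print(run_hive("USE a_database; DESCRIBE a_table;"))
```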


Kafka

Docker Install

Install

```
docker pull wurstmeister/zookeeper
docker run -d --name zookeeper -p 2181:2181 -t wurstmeister/zookeeper

docker pull wurstmeister/kafka
docker run -d --name kafka -p 9092:9092 \
  -e KAFKA_BROKER_ID=0 \
  -e KAFKA_ZOOKEEPER_CONNECT=${host}:2181 \
  -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://${host}:9092 \
  -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 \
  wurstmeister/kafka
```

Usage

```
docker exec -it kafka /bin/bash
cd opt/kafka_2.11-2.0.0/
# producer
./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic mykafka
# consumer
./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic mykafka --from-beginning
```

Configuration

```
auto.create.topics.enable=false/true
num.partitions=1
default.replication.factor=1
```

Listeners configuration

Kafka 2.1 Documentation - 3.1 Broker Configs
kafka的listeners和advertised.listeners,配置内外网分流
Kafka从上手到实践-Kafka集群:Kafka Listeners

If Kafka only needs to be reachable from the internal network:

```
listeners=PLAINTEXT://inner_ip:9092
```

Or configure SASL ...
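The console producer and consumer above have straightforward Python equivalents. A minimal sketch using the third-party kafka-python package (not part of the original notes), assuming the broker started above is reachable on localhost:9092 and the topic is `mykafka`:

```python
# Produce one message, then read the topic from the beginning.
from kafka import KafkaConsumer, KafkaProducer

producer = KafkaProducer(bootstrap_servers="localhost:9092")
producer.send("mykafka", b"hello from python")
producer.flush()

consumer = KafkaConsumer(
    "mykafka",
    bootstrap_servers="localhost:9092",
    auto_offset_reset="earliest",   # same effect as --from-beginning
    consumer_timeout_ms=5000,       # stop iterating when no new messages arrive
)
for msg in consumer:
    print(msg.offset, msg.value.decode())
```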


Spark

[TOC]

PySpark

initialize and stop

```python
# using a SparkContext
from pyspark import SparkContext
sc = SparkContext()
# some code
sc.stop()

# using Spark SQL
from pyspark.sql import SparkSession
spark = SparkSession \
    .builder \
    .appName("test") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()
# some code
spark.stop()
```

read and write

```python
# load a parquet file
data = spark.read.parquet("file_name.parquet")
# show the dataframe
data.show()
# write a json file
data.write.json("filename.json")
```

DataFrame

```python
import pyspark.sql.functions as F
import pyspark.sql.types as types

# from SQL
df = spark.sql('select name, age from table')

# collect/show
df.show()
df.take(5)
df.collect()
df.take(1)[0].asDict()['name']

# rename a column
df.withColumnRenamed('old_name', 'new_name')
# add a column
df.withColumn('new_col', F.lit(1))
# drop a column
df.drop('new_col')

# filter
df.filter(F.col('cnt') > 1)
df.where('cnt > 2')

# join
df1.join(df2, df1['key1'] == df2['key2'], how='inner')

# to RDD
df.rdd
# map with rdd
df.rdd.map(lambda x: (x[0] + 1, )).toDF()

# map with udf
func = lambda x: x + 1                           # define a function
udf_func = F.udf(func, types.FloatType())        # convert it to a UDF
df.withColumn('new_col', udf_func('old_col'))    # apply it to a column

# save to hive
df.createTempView('tmp_table')
spark.sql('''
    INSERT OVERWRITE TABLE hive_table PARTITION(ds=20200825)
    SELECT name, age FROM tmp_table
''')
df.write.insertInto('target_db.target_table', overwrite=False)
```

References

pyspark.sql module
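Putting the DataFrame snippets together, here is a small self-contained sketch: it builds a toy DataFrame, applies a UDF, filters, and shows the result (the column values and app name are made up for the example).

```python
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
import pyspark.sql.types as types

spark = SparkSession.builder.appName("dataframe-demo").getOrCreate()

# a toy DataFrame with two columns
df = spark.createDataFrame([("Tom", 1.0), ("Amy", 2.0)], ["name", "cnt"])

add_one = F.udf(lambda x: x + 1, types.FloatType())   # plain Python function as a UDF
result = (
    df.withColumn("cnt_plus_one", add_one("cnt"))     # add a derived column
      .filter(F.col("cnt") > 1)                       # keep rows with cnt > 1
)
result.show()

spark.stop()
```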


Zookeeper

Configuration

```
tickTime=2000
initLimit=10
syncLimit=5
dataDir=zookeeper/data
dataLogDir=zookeeper/logs
clientPort=2181
server.1=host1:2888:3888
server.2=host2:2888:3888
server.3=host3:2888:3888
# In server.1, the "1" is the server id (any number works); it identifies this server
# and must also be written to the myid file under the snapshot (data) directory.
# host1 is the server's IP address; the first port (default 2888) is used for communication
# between master and slaves, and the second port (default 3888) is used for leader election,
# both when the cluster starts and when a new election is held after the leader dies.
```

Command Line Tools

```
# start the server
bin/zkServer.sh start/start-foreground/stop/status
# client
bin/zkCli.sh -server 127.0.0.1:2181
create /MyFirstZNode ZNodeVal
get /MyFirstZNode
set /MyFirstZNode ZNodeValUpdated
```

Maven

```
<dependency>
    <groupId>org.apache.zookeeper</groupId>
    <artifactId>zookeeper</artifactId>
    <version>3.4.11</version>
</dependency>
```

Source Code

Zookeeper框架设计及源码解读

References

Getting Started with Java and Zookeeper
Explaining Apache ZooKeeper
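The zkCli session above can be reproduced from Python. A minimal sketch using the third-party kazoo client (not part of the original notes), assuming ZooKeeper is reachable on 127.0.0.1:2181:

```python
# Create, read, and update a znode, mirroring the zkCli commands above.
from kazoo.client import KazooClient

zk = KazooClient(hosts="127.0.0.1:2181")
zk.start()

if not zk.exists("/MyFirstZNode"):
    zk.create("/MyFirstZNode", b"ZNodeVal")     # create /MyFirstZNode ZNodeVal

data, stat = zk.get("/MyFirstZNode")            # get /MyFirstZNode
print(data.decode(), stat.version)

zk.set("/MyFirstZNode", b"ZNodeValUpdated")     # set /MyFirstZNode ZNodeValUpdated

zk.stop()
```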
