Arrow

PyArrow

Install:

```bash
pip install pyarrow
```

Load Data

```python
import pyarrow.parquet as pq

table = pq.read_table('/path/to/table')
```

Load data from OSS

```python
import ossfs
import pyarrow.parquet as pq

OSS_ENDPOINT = 'http://oss-cn-wulanchabu-internal.aliyuncs.com'
OSS_BUCKET = 'bucket-name'
OSS_ACCESS_KEY = '***'
OSS_ACCESS_SECRET = '***'

fs = ossfs.OSSFileSystem(endpoint=OSS_ENDPOINT, key=OSS_ACCESS_KEY, secret=OSS_ACCESS_SECRET)
table = pq.read_table(f'{OSS_BUCKET}/path/to/table', filesystem=fs)
```
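A minimal round-trip sketch (not from the original notes): build a table in memory, write it as Parquet, and read selected columns back. The local path and column names are placeholders.

```python
import pyarrow as pa
import pyarrow.parquet as pq

# Build an in-memory table (column names are illustrative)
table = pa.table({'id': [1, 2, 3], 'name': ['a', 'b', 'c']})

# Write it out as Parquet and read back only the 'id' column
pq.write_table(table, '/tmp/example.parquet')
roundtrip = pq.read_table('/tmp/example.parquet', columns=['id'])

# Convert to pandas for inspection (requires pandas to be installed)
print(roundtrip.to_pandas())
```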


ELK-Elasticsearch

Run on Docker

```bash
docker pull docker.elastic.co/elasticsearch/elasticsearch:7.9.1
docker run -d -p 9200:9200 -p 9300:9300 -e "discovery.type=single-node" --name elasticsearch docker.elastic.co/elasticsearch/elasticsearch:7.9.1
```

Common Queries

```bash
# check cluster health
curl "http://localhost:9200/_cat/health?v"
# list nodes
curl "http://localhost:9200/_cat/nodes?v"
# list indices
curl "http://localhost:9200/_cat/indices?v&s=index"
# list indices sorted by size
curl "http://localhost:9200/_cat/indices?v&s=store.size:desc"
# search documents, filtered by the values of field1 and field2
curl "http://localhost:9200/my-index/_search?q=field1:100&q=field2:abc"
# search documents, return only field1 and field2, sorted by field1
curl "http://localhost:9200/my-index/_search?_source=field1,field2&sort=field1:desc"
# POST a query body
curl -XPOST "http://localhost:9200/my-index/_search" -H 'Content-Type: application/json' -d '{"size":3, "query": {"term":{"Name":"Tom"}}}'
# show settings
curl "http://localhost:9200/_cluster/settings?include_defaults=true&pretty"
# show node statistics
curl "http://localhost:9200/_nodes/stats?pretty"
```

APIs

cat APIs

Common

```bash
# verbose, show the header row
curl -XGET "http://{ip}:9200/_cat/master?v"
# help, show the column descriptions
# in Kibana, type a command and press `cmd + /` to open its documentation
curl -XGET "http://{ip}:9200/_cat/master?help"
# output only the specified headers
curl -XGET "http://{ip}:9200/_cat/master?h=ip,port,name"
# sort
curl -XGET "http://{ip}:9200/_cat/indices?v&s=index"
curl -XGET "http://{ip}:9200/_cat/indices?v&s=index:asc"                     # ascending
curl -XGET "http://{ip}:9200/_cat/indices?v&s=index:desc"                    # descending
curl -XGET "http://{ip}:9200/_cat/templates?v&s=order:desc,index_patterns"   # multi-level sort
# output format: format=text|json|yaml|smile|cbor
curl -XGET "http://{ip}:9200/_cat/indices?format=json&pretty"
# output units: bytes=b|kb|mb|gb|tb|pb
curl -XGET "http://{ip}:9200/_cat/indices?s=store.size:desc&bytes=b"
```

APIs ...
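For scripting the same queries from Python, a hedged sketch using the requests package (an assumption, not part of these notes); the index `my-index` and field `Name` are placeholders.

```python
import requests

ES = "http://localhost:9200"

# Cluster health via the cat API, asking for JSON output
health = requests.get(f"{ES}/_cat/health", params={"format": "json"})
print(health.json())

# Term query equivalent to the curl -XPOST example above
body = {"size": 3, "query": {"term": {"Name": "Tom"}}}
resp = requests.post(f"{ES}/my-index/_search", json=body)
for hit in resp.json().get("hits", {}).get("hits", []):
    print(hit["_id"], hit["_source"])
```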


ELK-Filebeat

Run Filebeat on Docker

```bash
docker pull docker.elastic.co/beats/filebeat:7.9.1

docker run -d \
  --name=filebeat \
  --user=root \
  --volume="$(pwd)/filebeat.docker.yml:/usr/share/filebeat/filebeat.yml:ro" \
  --volume="/var/lib/docker/containers:/var/lib/docker/containers:ro" \
  --volume="/var/run/docker.sock:/var/run/docker.sock:ro" \
  --volume="/var/log:/var/log" \
  docker.elastic.co/beats/filebeat:7.9.1 filebeat -e -strict.perms=false
```

The `--volume="/var/log:/var/log"` mount exposes the host log directory to the container.

Filebeat Config

```yaml
# file input
filebeat.inputs:
- type: log
  enabled: true
  paths:
    - /var/log/test.log
  exclude_lines: ['^nohup']

# modules
filebeat.config:
  modules:
    path: ${path.config}/modules.d/*.yml
    reload.enabled: false

# logstash output
output.logstash:
  hosts: ["localhost:5044"]

# elasticsearch output
output.elasticsearch:
  # when ES runs in Docker, use the host IP instead of localhost,
  # because Filebeat and ES are not in the same container
  hosts: ["localhost:9200"]
  username: 'username'
  password: 'password'
```
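To generate test input for the file input above, a hypothetical sketch that appends newline-delimited JSON to the configured path (writing to /var/log usually needs root); adjust LOG_PATH to your own config.

```python
import json
import time
from datetime import datetime, timezone

# Path matching the filebeat.inputs example above (placeholder)
LOG_PATH = "/var/log/test.log"

with open(LOG_PATH, "a", encoding="utf-8") as f:
    for i in range(3):
        record = {
            "ts": datetime.now(timezone.utc).isoformat(),
            "level": "INFO",
            "msg": f"test event {i}",
        }
        f.write(json.dumps(record) + "\n")  # one JSON record per line
        f.flush()
        time.sleep(0.1)
```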


ELK-Logstash

Run Logstash on Docker

```bash
docker pull docker.elastic.co/logstash/logstash-oss:7.9.1

docker run --rm -it \
  -p 5044:5044 \
  -v /data/yzchen/elk/logstash/pipeline/:/usr/share/logstash/pipeline/ \
  docker.elastic.co/logstash/logstash-oss:7.9.1
```

Logstash Config

```ruby
input {
  # Filebeat input
  beats {
    port => "5044"
  }

  # Redis input
  redis {
    host => "host_ip"
    port => port
    password => "password"
    data_type => "channel"
    key => "channel_pattern"
  }
}

filter {
  # drop data
  if [type] != "raw" {
    drop { }
  }

  # parse a JSON string
  json {
    source => "msgdata"
  }

  # drop a field
  prune {
    blacklist_names => ["msgdata"]
  }

  # check whether a field exists
  if ([foo]) { }        # for numeric types
  if ("" in [foo]) { }  # for other types

  # dissect
  dissect {
    mapping => {
      "line" => "%{month} %{day} %{time} %{id} %{?NET_CONNTRACK->}[%{type}]%{rest}"
    }
  }

  # select from an array and a dict
  mutate {
    update => {"new_field" => "%{[array_name][0][key_name]}"}
  }

  # query MySQL and add a field
  jdbc_streaming {
    jdbc_driver_library => "/usr/share/logstash/mysql-connector-java-5.1.34.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://host:3306/database"
    jdbc_user => "username"
    jdbc_password => "secret"
    statement => "SELECT col FROM table WHERE name = :name"
    parameters => { "name" => "name" }
    target => "col"
  }

  # query Elasticsearch and add a field
  # note that the query syntax here is the ES query string syntax, see
  # https://www.elastic.co/guide/en/elasticsearch/reference/7.12/query-dsl-query-string-query.html#query-string-syntax
  elasticsearch {
    hosts => ["host:9200"]
    index => "python-log-auth-*"
    query => 'type:auth AND name:"%{[name]}"'
    fields => { "device" => "device" }
    sort => "@timestamp:desc"
  }

  # ruby
  ruby {
    code => "event.set('date', (event.get('message')[0..-7].to_f + event.get('message')[-6..-1].to_f / 1000000).to_s)"
  }
}

output {
  # debug output
  stdout { codec => rubydebug }
}
```

Monitoring

```bash
curl -XGET 'localhost:9600/_node/stats/<ps|jvm|process|events|pipelines|reloads>'
```

References

Logstash 最佳实践
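A small sketch for polling the monitoring endpoint above from Python, assuming the requests package and Logstash reachable on localhost:9600; the exact response fields may vary by version.

```python
import requests

BASE = "http://localhost:9600"

# Per-pipeline event counts from the node stats API mentioned above
stats = requests.get(f"{BASE}/_node/stats/pipelines").json()
for name, pipeline in stats.get("pipelines", {}).items():
    events = pipeline.get("events", {})
    print(name, "in:", events.get("in"), "out:", events.get("out"))
```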


Hadoop

Install

Find the installed Hadoop:

```bash
env | grep -i hadoop
echo $HADOOP_HOME
echo $HADOOP_CLASSPATH
```

Hadoop directory structure:

```
hadoop-3.2.1-1.3.2
├── bin                        # Hadoop executables, including hadoop, hdfs, yarn, etc.
│   ├── container-executor
│   ├── hadoop
│   ├── hdfs
│   ├── mapred
│   ├── oom-listener
│   └── yarn
├── etc                        # Hadoop configuration files
│   └── hadoop
│       ├── core-site.xml
│       ├── hdfs-site.xml
│       ├── httpfs-site.xml
│       ├── kms-site.xml
│       ├── log4j.properties
│       ├── mapred-site.xml
│       ├── workers
│       └── yarn-site.xml
├── sbin                       # Hadoop startup scripts
│   ├── hadoop-daemon.sh
│   ├── hadoop-daemons.sh
│   ├── httpfs.sh
│   ├── kms.sh
│   ├── mr-jobhistory-daemon.sh
│   ├── refresh-namenodes.sh
│   ├── start-all.sh
│   ├── start-balancer.sh
│   ├── start-dfs.sh
│   ├── start-secure-dns.sh
│   ├── start-yarn.sh
│   ├── stop-all.sh
│   ├── stop-balancer.sh
│   ├── stop-dfs.sh
│   ├── stop-secure-dns.sh
│   ├── stop-yarn.sh
│   ├── workers.sh
│   ├── yarn-daemon.sh
│   └── yarn-daemons.sh
└── share/                     # Hadoop shared jar files
    └── hadoop
        ├── client
        ├── common
        ├── hdfs
        ├── mapreduce
        ├── tools
        └── yarn
```

Commands

Hadoop Commands Guide ...
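A hedged sketch that wraps the hdfs CLI from Python via subprocess; it assumes $HADOOP_HOME/bin is on PATH and the cluster is reachable, and the helper name is made up for illustration.

```python
import subprocess

def hdfs_ls(path="/"):
    """List an HDFS path by shelling out to the hdfs CLI shown above."""
    result = subprocess.run(
        ["hdfs", "dfs", "-ls", path],
        capture_output=True, text=True, check=True,
    )
    return result.stdout.splitlines()

if __name__ == "__main__":
    for line in hdfs_ls("/"):
        print(line)
```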


Hive

Deploy

Deploy Standalone Metastore with Docker

Hive - Quick Start

```bash
HIVE_VERSION=3.1.3
MYSQL_HOST="localhost"
MYSQL_PORT="3306"
MYSQL_USER="hive"
MYSQL_PASSWORD="password"
MYSQL_DATABASE="hive_metastore"

docker run -d -p 9083:9083 --env SERVICE_NAME=metastore --env DB_DRIVER=mysql \
  --env SERVICE_OPTS="-Djavax.jdo.option.ConnectionDriverName=com.mysql.cj.jdbc.Driver -Djavax.jdo.option.ConnectionURL=jdbc:mysql://${MYSQL_HOST}:${MYSQL_PORT}/${MYSQL_DATABASE} -Djavax.jdo.option.ConnectionUserName=${MYSQL_USER} -Djavax.jdo.option.ConnectionPassword=${MYSQL_PASSWORD}" \
  -v $(pwd)/warehouse:/opt/hive/data/warehouse \
  -v $(pwd)/lib/mysql-connector-java-8.0.30.jar:/opt/hive/lib/mysql-connector-java.jar \
  --name hive-metastore apache/hive:${HIVE_VERSION}
```

Deploy Standalone Metastore

Download the Hive tar: https://dlcdn.apache.org/hive/

Extract the tar and add HIVE_HOME to the environment variables:

```bash
echo 'export HIVE_HOME=/opt/hive' >> ~/.bashrc
echo 'export PATH=$HIVE_HOME/bin:$PATH' >> ~/.bashrc
source ~/.bashrc
```

Create the metastore database:

```sql
CREATE DATABASE hive_metastore;
CREATE USER 'hive'@'%' IDENTIFIED BY 'password';
GRANT ALL PRIVILEGES ON hive_metastore.* TO 'hive'@'%';
FLUSH PRIVILEGES;
```

Download the MySQL connector jar:

```bash
wget https://repo1.maven.org/maven2/mysql/mysql-connector-java/8.0.30/mysql-connector-java-8.0.30.jar
mv mysql-connector-java-8.0.30.jar $HIVE_HOME/lib/
```

Edit hive-site.xml:

```xml
<configuration>
  <!-- metastore warehouse location -->
  <property>
    <name>hive.metastore.warehouse.dir</name>
    <value>/user/hive/warehouse</value>
  </property>
  <!-- MySQL connection settings -->
  <property>
    <name>javax.jdo.option.ConnectionURL</name>
    <value>jdbc:mysql://localhost:3306/hive_metastore?createDatabaseIfNotExist=true</value>
  </property>
  <property>
    <name>javax.jdo.option.ConnectionDriverName</name>
    <value>com.mysql.jdbc.Driver</value>
  </property>
  <property>
    <name>javax.jdo.option.ConnectionUserName</name>
    <value>hive</value>
  </property>
  <property>
    <name>javax.jdo.option.ConnectionPassword</name>
    <value>password</value>
  </property>
  <!-- remote mode settings -->
  <property>
    <name>hive.metastore.uris</name>
    <value>thrift://0.0.0.0:9083</value>
  </property>
  <property>
    <name>hive.metastore.local</name>
    <value>false</value>
  </property>
</configuration>
```

Start the metastore:

```bash
$HIVE_HOME/bin/schematool -dbType mysql -initSchema
$HIVE_HOME/bin/hive --service metastore
```

Check that the metastore is running:

```bash
jps
lsof -i :9083
```

CLI

```bash
# start the CLI
hive                 # or hive --service cli

# define a variable
hive --define A=B    # or hive -d A=B

# execute a SQL statement
hive -e 'SELECT * FROM a_table'

# run a file
hive -f filename     # or, inside the CLI: source filename;

# run an init file
# init statements can also be put in $HOME/.hiverc
hive -i filename

# command history is kept in $HOME/.hivehistory
# the config template is $HIVE_HOME/conf/hive-default.xml.template

# configuration
hive --hiveconf option_A=true
hive> set hiveconf:option_A=true;

# run a jar application
hive --service jar

# run a shell command by prefixing it with !
hive> !pwd;

# run a hadoop command by dropping the leading "hadoop"
hive> fs -ls /;

# comments start with '--'
```

Database operations

```sql
-- list databases
SHOW DATABASES;
-- create a database
CREATE DATABASE a_database;
-- show database info
DESCRIBE DATABASE a_database;
-- use a database
USE a_database;
-- drop a database
DROP DATABASE a_database;
-- set properties
ALTER DATABASE a_database SET DBPROPERTIES('edited-by' = 'ychen');
```

Table operations

```sql
-- list tables
SHOW TABLES;
SHOW TABLES IN a_database;

-- show table info
DESCRIBE a_table;
DESCRIBE a_database.a_table;
DESCRIBE a_table.a_column;   -- show a single column

-- create a table
CREATE TABLE IF NOT EXISTS a_table (
  name STRING COMMENT 'the name',
  salary FLOAT COMMENT 'the salary'
)
COMMENT 'Description of the table'
TBLPROPERTIES ('creator'='me')
LOCATION '/user/hive/warehouse/a_database/a_table';

-- drop a table
DROP TABLE IF EXISTS a_table;

-- rename a table
ALTER TABLE a_table RENAME TO b_table;

-- modify a column
ALTER TABLE a_table CHANGE COLUMN old_column new_column INT  -- rename the column
AFTER other_column;                                          -- move its position

-- add columns
ALTER TABLE a_table ADD COLUMNS (
  app_name STRING,
  session_id BIGINT
);

-- remove or replace columns
ALTER TABLE a_table REPLACE COLUMNS (
  a_column INT,
  b_column STRING
);
```

External tables

Data that Hive does not own, but which can still be queried ...
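A sketch of querying the metastore from Spark SQL, assuming PySpark is installed and the standalone metastore above is reachable at thrift://localhost:9083; the URIs and warehouse path are illustrative and mirror the hive-site.xml example.

```python
from pyspark.sql import SparkSession

# Point Spark SQL at the standalone metastore started above
spark = (
    SparkSession.builder
    .appName("hive-metastore-example")
    .config("spark.hadoop.hive.metastore.uris", "thrift://localhost:9083")
    .config("spark.sql.warehouse.dir", "/user/hive/warehouse")
    .enableHiveSupport()
    .getOrCreate()
)

# Browse what the metastore knows about
spark.sql("SHOW DATABASES").show()
spark.sql("SHOW TABLES IN default").show()
```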


Kafka

Docker Install

Install

```bash
docker pull wurstmeister/zookeeper
docker run -d --name zookeeper -p 2181:2181 -t wurstmeister/zookeeper

docker pull wurstmeister/kafka
docker run -d --name kafka -p 9092:9092 \
  -e KAFKA_BROKER_ID=0 \
  -e KAFKA_ZOOKEEPER_CONNECT=${host}:2181 \
  -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://${host}:9092 \
  -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 \
  wurstmeister/kafka
```

Usage

```bash
docker exec -it kafka /bin/bash
cd opt/kafka_2.11-2.0.0/

# producer
./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic mykafka

# consumer
./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic mykafka --from-beginning
```

Configuration

```properties
auto.create.topics.enable=false/true
num.partitions=1
default.replication.factor=1
```

Listeners Configuration

Kafka 2.1 Documentation - 3.1 Broker Configs
kafka的listeners和advertised.listeners,配置内外网分流
Kafka从上手到实践-Kafka集群:Kafka Listeners

If Kafka only needs to be reached from the internal network:

```properties
listeners=PLAINTEXT://inner_ip:9092
```

Or configure SASL ...
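The console producer/consumer flow above, sketched with the kafka-python package (an assumption, not part of these notes); the topic name and broker address follow the Usage example.

```python
from kafka import KafkaConsumer, KafkaProducer

# Produce one message to the "mykafka" topic used in the console examples
producer = KafkaProducer(bootstrap_servers="localhost:9092")
producer.send("mykafka", b"hello from python")
producer.flush()

# Consume from the beginning, stopping after 5s of silence
consumer = KafkaConsumer(
    "mykafka",
    bootstrap_servers="localhost:9092",
    auto_offset_reset="earliest",
    consumer_timeout_ms=5000,
)
for message in consumer:
    print(message.offset, message.value)
```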


Lakehouse

Hudi

PySpark

```python
tableName = "table_name"
basePath = "oss://bucket/user/hive/warehouse"

hudi_options = {
    'hoodie.table.name': tableName,
    'hoodie.datasource.write.partitionpath.field': 'pt'
}

spark_df.write.format("hudi"). \
    options(**hudi_options). \
    mode("overwrite"). \
    save(basePath)
```

Iceberg

TODO

Paimon

Catalog

Catalog is an abstraction to manage the table of contents and metadata:

- filesystem metastore (default), which stores both metadata and table files in filesystems.
- hive metastore, which additionally stores metadata in Hive metastore. Users can directly access the tables from Hive.
- jdbc metastore, which additionally stores metadata in relational databases such as MySQL, Postgres, etc.
- rest metastore, which is designed to provide a lightweight way to access any catalog backend from a single client.

Bucket

- Each bucket contains a separate LSM tree and its changelog files (covering INSERT, UPDATE, and DELETE).
- A bucket is the smallest read/write storage unit, so the number of buckets limits the maximum processing parallelism.
- Roughly 200MB-1GB of data per bucket is recommended.

References: ...
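A hedged sketch of reading the Hudi table back, assuming a Spark session with the Hudi bundle on the classpath and OSS credentials configured; the base path and partition column `pt` follow the write example above.

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("hudi-read-example").getOrCreate()

# Load the table written by the PySpark snippet above
basePath = "oss://bucket/user/hive/warehouse"
df = spark.read.format("hudi").load(basePath)

# Query it through Spark SQL
df.createOrReplaceTempView("hudi_table")
spark.sql("SELECT pt, COUNT(*) AS cnt FROM hudi_table GROUP BY pt").show()
```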


Spark

Quick Start

```bash
cd $SPARK_HOME
./bin/spark-submit examples/src/main/python/pi.py 10
# ---output---
# Pi is roughly 3.145440
```

Running Spark

YARN

cluster mode: the command only submits the job and does not wait for it to finish; the Spark driver runs inside the YARN application master.

```bash
./bin/spark-submit --class path.to.your.Class --master yarn --deploy-mode cluster [options] <app jar> [app options]

./bin/spark-submit --class org.apache.spark.examples.SparkPi \
  --master yarn \
  --deploy-mode cluster \
  --driver-memory 4g \
  --executor-memory 2g \
  --executor-cores 1 \
  --queue default \
  examples/jars/spark-examples*.jar \
  10
```

client mode: the command submits the job and waits for it to finish; the Spark driver runs on the client, and the application master is only used to request resources.

```bash
./bin/spark-submit --class path.to.your.Class --master yarn --deploy-mode client [options] <app jar> [app options]
```

Spark Connect

Spark Doc - Spark Connect Overview ...
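A sketch of the thin-client side of Spark Connect, assuming PySpark 3.4+ and a Spark Connect server already listening on the default port 15002 (e.g. started with ./sbin/start-connect-server.sh).

```python
from pyspark.sql import SparkSession

# Build a remote session against the Spark Connect endpoint
spark = SparkSession.builder.remote("sc://localhost:15002").getOrCreate()

# DataFrame operations are executed on the server, only results come back
df = spark.range(10)
print(df.selectExpr("sum(id) AS total").collect())
```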


Zookeeper

Zookeeper Configuration

```properties
tickTime=2000
initLimit=10
syncLimit=5
dataDir=zookeeper/data
dataLogDir=zookeeper/logs
clientPort=2181
server.1=host1:2888:3888
server.2=host2:2888:3888
server.3=host3:2888:3888

# In server.1, the "1" is the server id (any number works); it identifies which
# server this is, and the same id must be written to the myid file under the
# snapshot directory (dataDir).
# host1 is the server's IP address in the cluster; the first port (default 2888)
# is for communication between the master and the slaves, and the second port
# (default 3888) is for leader election, both when the cluster first starts up
# and whenever the current leader fails.
```

Command Line Tools

```bash
# start server
bin/zkServer.sh start/start-foreground/stop/status

# client
bin/zkCli.sh -server 127.0.0.1:2181
create /MyFirstZNode ZNodeVal
get /MyFirstZNode
set /MyFirstZNode ZNodeValUpdated
```

Maven

```xml
<dependency>
  <groupId>org.apache.zookeeper</groupId>
  <artifactId>zookeeper</artifactId>
  <version>3.4.11</version>
</dependency>
```

Source Code

Zookeeper框架设计及源码解读

References

Getting Started with Java and Zookeeper
Explaining Apache ZooKeeper
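The same create/get/set flow as the zkCli.sh example, sketched with the kazoo client library (an assumption; install with `pip install kazoo`).

```python
from kazoo.client import KazooClient

# Connect to the server on the clientPort configured above
zk = KazooClient(hosts="127.0.0.1:2181")
zk.start()

# Create the znode if it does not exist yet
if not zk.exists("/MyFirstZNode"):
    zk.create("/MyFirstZNode", b"ZNodeVal")

# Read it back, then update its value
data, stat = zk.get("/MyFirstZNode")
print("value:", data.decode(), "version:", stat.version)
zk.set("/MyFirstZNode", b"ZNodeValUpdated")

zk.stop()
```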
