ELK-Elasticsearch
Run on Docker
docker pull docker.elastic.co/elasticsearch/elasticsearch:7.9.1
docker run -d -p 9200:9200 -p 9300:9300 -e "discovery.type=single-node" --name elasticsearch docker.elastic.co/elasticsearch/elasticsearch:7.9.1
Common queries
# cluster health
curl "http://localhost:9200/_cat/health?v"
# list nodes
curl "http://localhost:9200/_cat/nodes?v"
# list indices
curl "http://localhost:9200/_cat/indices?v&s=index"
# list indices sorted by size
curl "http://localhost:9200/_cat/indices?v&s=store.size:desc"
# search, filtering on the values of field1 and field2
curl "http://localhost:9200/my-index/_search?q=field1:100+AND+field2:abc"
# search, returning only field1 and field2, sorted by field1
curl "http://localhost:9200/my-index/_search?_source=field1,field2&sort=field1:desc"
# POST
curl -XPOST "http://localhost:9200/my-index/_search" -H 'Content-Type: application/json' -d '{"size":3, "query": {"term":{"Name":"Tom"}}}'
# show settings
curl "http://localhost:9200/_cluster/settings?include_defaults=true&pretty"
# show statistics
curl "http://localhost:9200/_nodes/stats?pretty"
APIs
cat APIs
Common
# verbose: show column headers
curl -XGET "http://{ip}:9200/_cat/master?v"
# help: describe each column
# (in Kibana, type a command and press `cmd + /` to open its documentation)
curl -XGET "http://{ip}:9200/_cat/master?help"
# select specific headers
curl -XGET "http://{ip}:9200/_cat/master?h=ip,port,name"
# sorting
curl -XGET "http://{ip}:9200/_cat/indices?v&s=index"
curl -XGET "http://{ip}:9200/_cat/indices?v&s=index:asc" # ascending
curl -XGET "http://{ip}:9200/_cat/indices?v&s=index:desc" # descending
curl -XGET "http://{ip}:9200/_cat/templates?v&s=order:desc,index_patterns" # multi-level sort
# output format: format=text|json|yaml|smile|cbor
curl -XGET "http://{ip}:9200/_cat/indices?format=json&pretty"
# output units: bytes=b|kb|mb|gb|tb|pb
curl -XGET "http://{ip}:9200/_cat/indices?s=store.size:desc&bytes=b"
APIs
Alias
Returns information about currently configured aliases to indices, including filter and routing information
curl -XGET "http://{ip}:9200/_cat/aliases?v"
Allocation
Provides a snapshot of the number of shards allocated to each data node and their disk space
curl -XGET "http://{ip}:9200/_cat/allocation?v"
curl -XGET "http://{ip}:9200/_cat/allocation/<node_id>?v"
Count
Provides quick access to a document count of individual indices or all indices in a cluster
curl -XGET "http://{ip}:9200/_cat/count/<index>"
curl -XGET "http://{ip}:9200/_cat/count"
Health
Returns the health status of a cluster
curl -XGET "http://{ip}:9200/_cat/health"
Indices
Returns high-level information about indices in a cluster
curl -XGET "http://{ip}:9200/_cat/indices"
curl -XGET "http://{ip}:9200/_cat/indices/<index>"
Master
Returns information about the master node
curl -XGET "http://{ip}:9200/_cat/master"
Nodes
Returns information about a cluster’s nodes
curl -XGET "http://{ip}:9200/_cat/nodes"
# heap usage
curl -XGET "http://{ip}:9200/_cat/nodes?v&h=name,node*,heap*"
# request/query cache usage
curl -XGET "http://{ip}:9200/_cat/nodes?v&h=ip,heap.percent,request_cache.memory_size,query_cache.memory_size"
Tasks
Returns cluster-level changes that have not yet been executed
curl -XGET "http://{ip}:9200/_cat/pending_tasks"
Returns information about tasks currently executing in the cluster
curl -XGET "http://{ip}:9200/_cat/tasks"
Recovery
Returns information about ongoing and completed shard recoveries
curl -XGET "http://{ip}:9200/_cat/recovery"
curl -XGET "http://{ip}:9200/_cat/recovery/<index>"
Repositories
Returns the snapshot repositories for a cluster
curl -XGET "http://{ip}:9200/_cat/repositories"
Thread pool
Returns thread pool statistics for each node in a cluster
curl -XGET "http://{ip}:9200/_cat/thread_pool/<thread_pool>"
curl -XGET "http://{ip}:9200/_cat/thread_pool"
curl -XGET "http://{ip}:9200/_cat/thread_pool/write?v"
curl -XGET "http://{ip}:9200/_cat/thread_pool/search?v"
Shards
curl -XGET "http://{ip}:9200/_cat/shards/<index>"
curl -XGET "http://{ip}:9200/_cat/shards"
Segments
Returns low-level information about the Lucene segments in index shards
curl -XGET "http://{ip}:9200/_cat/segments"
curl -XGET "http://{ip}:9200/_cat/segments/<index>"
curl -XGET "http://{ip}:9200/_cat/segments?v&bytes=mb&h=index,docs.count,size,size.memory&s=size.memory:desc"
Snapshots
Returns information about the snapshots stored in one or more repositories
curl -XGET "http://{ip}:9200/_cat/snapshots/<repository>"
Templates
Returns information about index templates in a cluster
curl -XGET "http://{ip}:9200/_cat/templates/<template_name>"
curl -XGET "http://{ip}:9200/_cat/templates"
cluster APIs
Allocation explain
Provides explanations for shard allocations in the cluster.
Useful for inspecting why shards are unassigned
curl -XGET "http://{ip}:9200/_cluster/allocation/explain" -H 'Content-Type: application/json' -d'{
  "index": "myindex",
  "shard": 0,
  "primary": false
}'
Settings
Show cluster settings
curl -XGET "http://{ip}:9200/_cluster/settings?flat_settings=true&include_defaults=true&pretty"
Update cluster settings
# persistent
curl -XPUT "http://{ip}:9200/_cluster/settings" -H 'Content-Type: application/json' -d'{
  "persistent" : {
    "indices.recovery.max_bytes_per_sec" : "50mb"
  }
}'
# transient
curl -XPUT "http://{ip}:9200/_cluster/settings" -H 'Content-Type: application/json' -d'{
  "transient" : {
    "indices.recovery.max_bytes_per_sec" : "20mb"
  }
}'
# reset a setting
curl -XPUT "http://{ip}:9200/_cluster/settings" -H 'Content-Type: application/json' -d'{
  "transient" : {
    "indices.recovery.*" : null
  }
}'
Health
Returns the health status of a cluster
curl -XGET "http://{ip}:9200/_cluster/health"
curl -XGET "http://{ip}:9200/_cluster/health/<index>"
curl -XGET "http://{ip}:9200/_cluster/health/<index>?level=shards"
Reroute
Changes the allocation of shards in a cluster
curl -XPOST "localhost:9200/_cluster/reroute?pretty" -H 'Content-Type: application/json' -d '{
"commands" : [
{
"move" : {
"index" : "test", "shard" : 0,
"from_node" : "node1", "to_node" : "node2"
}
},
{
"allocate_replica" : {
"index" : "test", "shard" : 1,
"node" : "node3"
}
}
]
}'
State
Returns metadata about the state of the cluster.
# metric=_all|blocks|master_node|metadata|nodes|routing_nodes|routing_table|version
curl -XGET "http://{ip}:9200/_cluster/state/<metrics>/<index>"
Stats
Returns cluster statistics.
curl -XGET "http://{ip}:9200/_cluster/stats"
curl -XGET "http://{ip}:9200/_cluster/stats/nodes/<node_filter>"
nodes APIs
Info
Returns cluster nodes information.
curl -XGET "http://{ip}:9200/_nodes"
curl -XGET "http://{ip}:9200/_nodes/<node_id>"
curl -XGET "http://{ip}:9200/_nodes/<metric>"
curl -XGET "http://{ip}:9200/_nodes/<node_id>/<metric>"
Usage
Returns information on the usage of features.
curl -XGET "http://{ip}:9200/_nodes/usage"
curl -XGET "http://{ip}:9200/_nodes/<node_id>/usage"
curl -XGET "http://{ip}:9200/_nodes/usage/<metric>"
curl -XGET "http://{ip}:9200/_nodes/<node_id>/usage/<metric>"
Hot thread
Returns the hot threads on each selected node in the cluster.
curl -XGET "http://{ip}:9200/_nodes/hot_threads"
curl -XGET "http://{ip}:9200/_nodes/<node_id>/hot_threads"
Stats
Returns cluster nodes statistics.
# metric=breaker|fs|http|indexing_pressure|indices|jvm|os|process|thread_pool|transport
curl -XGET "http://{ip}:9200/_nodes/stats"
curl -XGET "http://{ip}:9200/_nodes/<node_id>/stats"
curl -XGET "http://{ip}:9200/_nodes/stats/<metric>"
curl -XGET "http://{ip}:9200/_nodes/<node_id>/stats/<metric>"
curl -XGET "http://{ip}:9200/_nodes/stats/<metric>/<index_metric>"
curl -XGET "http://{ip}:9200/_nodes/<node_id>/stats/<metric>/<index_metric>"
CRUD
Search
SQL
GET _sql?format=txt
{
"query": "SELECT * from \"your_index\""
}
Query String
GET /_search
{
"query": {
"query_string": {
"query": "(new york city) OR (big apple)",
"default_field": "content"
}
}
}
curl "http://localhost:9200/your_index/_search?q=content:apple&size=3&pretty"
curl -G "http://localhost:9200/your_index/_search?size=3&pretty" --data-urlencode "q=status:200 AND clientip:172.110.15.0"
curl -G "http://localhost:9200/your_index/_search?size=3&pretty" --data-urlencode 'q=request:"POST *"'
Query
GET /index/_search
{
"size": 10,
"query": {
"bool": {
"must": [
{"term": {"field1": "keyword1"}},
{"term": {"field2": "keyword2"}}
]
}
},
"sort": [{"field1":{ "order":"asc"}}]
}
GET /index/_search
{
"query": {
"bool": {
"must": [
{"term": {"field1": "keyword1"}},
{"term": {"field2": "keyword2"}}
]
}
},
"aggs": {
"name": {"sum": {"field": "field1"}}
}
}
GET /index/_search
{
"query": {
"bool": {
"must": [
{"term": {"field1": "keyword1"}},
{"term": {"field2": "keyword2"}}
]
}
},
"aggs": {
"by_term": {
"terms": {"field": "field3", "size": 10},
"aggs": {"sum_per_field": {"sum":{"field": "field4"}}}
}
}
}
GET /index/_search
{
"query": {
"bool": {
"must": [
{"term": {"field1": "keyword1"}},
{"term": {"field2": "keyword2"}}
]
}
},
"aggs": {
"by_date": {
"date_histogram": {"field": "ds", "interval": "day"},
"aggs": {"sum_per_day": {"sum":{"field": "field3"}}}
}
}
}
# range query
GET /index/_search
{
"query": {
"bool": {
"filter": [
{"range": {"timestamp": {"gt": "now-4d/d", "lte": "now-3d/d"}}}
]
}
}
}
# filter source fields
GET /index/_search
{
"_source": ["term1", "term2.*"]
}
# sort
GET /index/_search
{
"sort": [
{
"timestamp": {
"order": "desc"
}
}
]
}
Create an index
# create index with mappings
PUT /index_name
{
"mappings": {
"_doc": {
"properties": {
"field_keyword": {"type": "keyword"},
"field_text": {"type": "text"},
"field_date1": {"type": "date", "format": "epoch_second"},
"field_date2": {"type": "date", "format": "yyyyMMdd"},
"field_location": {"type": "geo_point"},
"field_int": {"type": "integer"},
"field_bool": {"type": "boolean"},
"field_float": {"type": "float"},
"field_double": {"type": "double"},
"field_alias": {"type": "alias", "path": "poi_id"}
}
}
},
"settings": {
"number_of_replicas": 0
},
"aliases": {"the_alias": {}}
}
# create alias
POST /_aliases
{
"actions":[
{"remove": {
"index": "old_index",
"alias": "index_alias"
}},
{"add": {
"index": "new_index",
"alias": "index_alias"
}}
]
}
# Only one index per alias can be assigned to be the write index at a time
# If no write index is specified and there are multiple indices referenced by an alias, then writes will not be allowed.
POST /_aliases
{
"actions":[
{"add": {
"index": "new_index",
"alias": "index_alias",
"is_write_index": true
}},
{"add": {
"index": "old_index",
"alias": "index_alias",
"is_write_index": false
}}
]
}
Insert documents
POST /index/_doc
{
"field1": "data1",
"field2": "data2"
}
Update
update
# add to value
POST test/_update/1
{
"script" : {
"source": "ctx._source.counter += params.count",
"lang": "painless",
"params" : {
"count" : 4
}
}
}
# add to a list
POST test/_update/1
{
"script": {
"source": "ctx._source.tags.add(params.tag)",
"lang": "painless",
"params": {
"tag": "blue"
}
}
}
# remove from list
POST test/_update/1
{
"script": {
"source": "if (ctx._source.tags.contains(params.tag)) { ctx._source.tags.remove(ctx._source.tags.indexOf(params.tag)) }",
"lang": "painless",
"params": {
"tag": "blue"
}
}
}
update by query
# increment
POST my-index-000001/_update_by_query
{
"script": {
"source": "ctx._source.count++",
"lang": "painless"
},
"query": {
"term": {
"user.id": "kimchy"
}
}
}
# add field
POST my-index-000001/_update_by_query
{
"script": {
"source": "ctx._source.newField='abc'",
"lang": "painless"
}
}
# remove field
POST my-index-000001/_update_by_query
{
"script": {
"source": "ctx._source.remove(\"oldField\")",
"lang": "painless"
}
}
Delete
delete by query
POST /index/_delete_by_query
{
"query": {
"bool": {
"must": [
{"term": {"field1": "keyword1"}},
{"term": {"field2": "keyword2"}}
]
}
}
}
Allow/forbid destructive wildcard deletes
PUT /_cluster/settings
{
"transient" : {
"action.destructive_requires_name" : false
}
}
Index management
Date math in index names
Docs:
Format: <static_name{date_math_expr{date_format|time_zone}}>
Example:
# indices whose names start with last year's year
GET _cat/indices/<ioa-*-{now-1y{yyyy|+08:00}}*>?s=index&v
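Date-math names are resolved on the server at request time; note that when calling through curl (rather than the Kibana console) the `<` and `>` characters must be percent-encoded as `%3C`/`%3E`. A rough client-side sketch of how the `{now-1y{yyyy|+08:00}}` fragment resolves (the helper name is illustrative, not an ES API):

```python
from datetime import datetime, timedelta, timezone

def resolve_year_expr(now: datetime, tz_offset_hours: int = 8) -> str:
    """Resolve the {now-1y{yyyy|+08:00}} fragment: shift 'now' into the
    given time zone, subtract one year, and format as yyyy."""
    local_now = now.astimezone(timezone(timedelta(hours=tz_offset_hours)))
    return str(local_now.year - 1)

# The full name <ioa-*-{now-1y{yyyy|+08:00}}*> would then resolve like:
name = f"ioa-*-{resolve_year_expr(datetime(2024, 6, 1, tzinfo=timezone.utc))}*"
print(name)  # ioa-*-2023*
```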
Update index settings
PUT /my_index/_settings
{
"index" : {
"number_of_replicas": 2,
"max_result_window": 10000000
}
}
Index templates
Setting up mappings by hand for every new index is tedious; an index template automatically applies mappings and other settings to any newly created index whose name matches the template's patterns
GET /_cat/templates?v&s=name
PUT _index_template/template_1
{
"index_patterns": ["test-*", "template-*"],
"template": {
"settings": {
"number_of_shards": 1
},
"mappings": {
"properties": {
"host_name": {
"type": "keyword"
},
"timestamp": {
"type": "date",
"format": "EEE MMM dd HH:mm:ss Z yyyy"
}
}
}
}
}
Lifecycle management
Managing index creation, rollover, and deletion with hand-written crontab scripts is tedious; use ES Index Lifecycle Management (ILM) instead (note: ILM is an x-pack feature and requires at least a basic license)
# create an ILM policy
PUT _ilm/policy/my_policy
{
"policy": {
"phases": {
"hot": {
"actions": {
"rollover": {
"max_size": "25GB"
}
}
},
"delete": {
"min_age": "30d",
"actions": {
"delete": {}
}
}
}
}
}
# show the policy
GET _ilm/policy/my_policy
# attach the ILM policy when creating an index
PUT my-index-1
{
"settings": {
"number_of_shards": 1,
"number_of_replicas": 1,
"index.lifecycle.name": "my_policy"
}
}
# attach the ILM policy in an index template
# policies that use rollover must be applied through a template
PUT _index_template/timeseries_template
{
"index_patterns": ["my-index-*"],
"template": {
"settings": {
"number_of_shards": 1,
"number_of_replicas": 1,
"index.lifecycle.name": "my_policy"
}
}
}
# check policy execution progress
GET my-index-*/_ilm/explain
Force Merge
A Lucene index is stored as multiple segments; ES periodically merges small segments into larger ones in the background, but a merge can also be triggered manually
Once an index no longer receives writes, its segments can be merged into one to reduce resource usage
POST /.ds-logs-000001/_forcemerge?max_num_segments=1
Reindex
Multiple indices can be merged into one, e.g. rolling daily indices up into a monthly index
POST /_reindex?pretty
{
"conflicts": "proceed",
"source": {
"index": "myindex-1-2021.*"
},
"dest": {
"index": "myindex-1-2021",
"op_type": "create"
}
}
Data can also be migrated in from another cluster
POST _reindex
{
"source": {
"remote": {
"host": "http://oldhost:9200",
"username": "user",
"password": "pass"
},
"index": "source"
},
"dest": {
"index": "dest"
}
}
Freeze
Frozen indices use less memory, but searches against them are slower and they are read-only
Creating frozen indices with the Elasticsearch Freeze index API | Elastic Blog
# it’s recommended to run a force merge before freezing
curl -XPOST "http://{ip}:9200/my-index-000001/_forcemerge?max_num_segments=1"
# freeze
curl -XPOST "http://{ip}:9200/my-index-000001/_freeze"
# search frozen indices
curl -XGET "http://{ip}:9200/my-index-000001/_search?ignore_throttled=false"
# `pre_filter_shard_size` specifies a threshold that, when exceeded, will enforce a round-trip to pre-filter search shards that cannot possibly match
# default is 128
curl -XGET "http://{ip}:9200/my-index-000001/_search?ignore_throttled=false&pre_filter_shard_size=1"
# list throttled indices
curl -XGET "http://{ip}:9200/_cat/indices/twitter?v&h=i,sth&s=sth"
# unfreeze
curl -XPOST "http://{ip}:9200/my-index-000001/_unfreeze"
Pipeline
POST _ingest/pipeline/test
{
"description": "splog connect log pipeline",
"processors": [
{
"set": {
"field": "auth.ProcName",
"value": "",
"if": "ctx?.auth?.ProcName == 'moa'"
}
}
]
}
Test a pipeline
POST /_ingest/pipeline/test/_simulate
{
"docs": [
{
"_index": "index",
"_id": "id",
"_source": {
"foo": "bar",
"auth":{
"ProcName": "moa",
"UserName": "abc"
}
}
}
]
}
Roll up
Stores historical data in rolled-up, low-granularity form, keeping only aggregate values of selected fields; saves storage without affecting analytics and reporting
Client APIs
Python API
Install
pip3 install elasticsearch
Usage
from elasticsearch import Elasticsearch, helpers
es = Elasticsearch([f'http://{ES_USER}:{ES_PASS}@{ES_HOST}:{ES_PORT}'])
# create index
index_body = {
    "mappings": {
        "_doc": {
            "properties": properties
        }
    },
    "settings": {"number_of_replicas": 0}
}
es.indices.create(index_name, body=index_body)
# check index existence
es.indices.exists(index=index_name)
# delete index
es.indices.delete(index_name)
# query
query = {
'size': 10,
'query': {'range': {'@timestamp': {'gte': starttime, 'lt': endtime}}}
}
res = es.search(index=index_name, body=query)
# bulk insert
actions = [
{
'_index': index_name,
'_type': '_doc',
'_source': {'field1': 'val1', 'field2': 'val2'}
},
]
helpers.bulk(es, actions)
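`helpers.bulk` accepts any iterable of action dicts, so a generator keeps memory flat for large loads. A small sketch (`to_bulk_actions` is a hypothetical helper, not part of the client); note that the `_type` field is no longer needed on 7.x:

```python
def to_bulk_actions(index_name, rows):
    # Turn plain dicts into the action format expected by helpers.bulk();
    # _type is omitted (deprecated since ES 7.x).
    for row in rows:
        yield {"_index": index_name, "_source": row}

actions = list(to_bulk_actions("my-index", [{"field1": "v1"}, {"field1": "v2"}]))
# helpers.bulk(es, to_bulk_actions("my-index", rows))  # would stream lazily
```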
Java API
The REST API is recommended over the Java API; it is more compatible across versions
Templates for Common Tasks
Create an index in ES and define its field types
PUT /index_name
{
"mappings": {
"_doc": {
"properties": {
"field1": {"type": "keyword"},
"field2": {"type": "integer"}
}
}
},
"settings": {
"number_of_replicas": 0
}
}
Create an external table in Hive
add jar hdfs:///data/script/poseidon/yzchen/elasticsearch-hadoop-6.4.2/dist/elasticsearch-hadoop-6.4.2.jar;
CREATE EXTERNAL TABLE index_name_to_elk (
field1 string,
field2 int
)
STORED BY 'org.elasticsearch.hadoop.hive.EsStorageHandler'
TBLPROPERTIES('es.resource' = 'index_name/_doc',
'es.index.auto.create' = 'false',
'es.nodes' = 'http://9.24.140.147,http://9.24.140.148,http://9.24.140.149,http://9.24.140.150,http://9.24.140.152',
'es.port'='9200'
);
Load the data
insert overwrite table index_name_to_elk
select field1, field2
from index_name;
Monitoring
Enable
curl -XPUT "http://elastic:123456@localhost:9200/_cluster/settings" -H 'Content-Type: application/json' -d '{"persistent":{"xpack.monitoring.collection.enabled": true}}'
curl "http://elastic:123456@localhost:9200/_cluster/settings"
Inspect monitoring metrics
query:
{
"size": 1,
"sort": [
{
"timestamp": {
"order": "desc"
}
}
],
"query":{
"bool": {
"must": [
{"term": {"type": "node_stats"}}
]
}
},
"_source": ["source_node"]
}
search:
curl http://elastic:123456@localhost:9200/.monitoring-es-*/_search \
-H 'Content-Type: application/json' \
-d @query.json \
| json_pp
Backup and restore
Elasticsearch backup and restore
Elasticdump
# install
npm install elasticdump
cd node_modules/elasticdump
./bin/elasticdump \
--input=http://production.es.com:9200/my_index \
--output=http://staging.es.com:9200/my_index \
--limit 10000 \
--type=data
# docker
docker pull elasticdump/elasticsearch-dump
docker run --rm -ti elasticdump/elasticsearch-dump \
--input=http://production.es.com:9200/my_index \
--output=http://staging.es.com:9200/my_index \
--limit 10000 \
--type=data
# back up all indices with multielasticdump
# dump
multielasticdump --direction=dump --input=http://localhost:9200 --output=/home/backup
# load
multielasticdump --direction=load --input=/home/backup --output=http://localhost:9200
Snapshot
Create the backup directory, add it to the config, then restart the ES service. If ES runs in Docker, remember to mount the backup directory into the container
mkdir /home/backup
chown elasticsearch:elasticsearch /home/backup
echo 'path.repo: ["/home/backup"]' >> /etc/elasticsearch/elasticsearch.yml
systemctl restart elasticsearch
Create a local snapshot repository
PUT /_snapshot/my_backup
{
"type": "fs",
"settings": {
"location": "/home/backup"
}
}
# show the repository
GET /_snapshot/my_backup
# show all repositories
GET /_snapshot
# delete the repository
DELETE /_snapshot/my_backup
Create a snapshot
PUT /_snapshot/my_backup/snapshot_1?wait_for_completion=true
restore
POST /_snapshot/my_backup/snapshot_1/_restore
Load testing
Install
sudo yum install pbzip2
conda create --name rally python=3.8.13
conda activate rally
pip3 install --upgrade pip
pip3 install esrally
Usage
# create a track
esrally create-track --track={name} --target-hosts={host}:9200 --client-options=basic_auth_user:"'elastic',basic_auth_password:'{password}'" --indices="index-*" --output-path=./tracks
# race
esrally race --distribution-version=7.6.2 --track=geonames
# race with custom track
esrally race --distribution-version=7.6.2 --track-path=./tracks/mytrack
# race with target host
esrally race --target-hosts={host}:9200 --client-options=basic_auth_user:"'elastic',basic_auth_password:'{password}'" --track=geonames
Third-party integrations
Issues when moving data between Hive and Elasticsearch
Ignore errors
Per the ES-Hadoop Error Handlers docs, add to TBLPROPERTIES:
'es.write.rest.error.handlers'='log'
'es.write.rest.error.handler.log.logger.name'='BulkErrors'
Tune the bulk size
Per the ES-Hadoop Configuration docs, add to TBLPROPERTIES:
'es.batch.size.bytes'='1mb'
'es.batch.size.entries'='200'
'es.batch.write.refresh'='false'
Retry policy
'es.batch.write.retry.count'='3'
'es.batch.write.retry.wait'='10s'
Reduce the number of Hive mappers
set mapred.max.split.size=100000000;
set mapred.min.split.size.per.node=100000000;
set mapred.min.split.size.per.rack=100000000;
set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;
Pagination
- Do you need to paginate over more than 10,000 documents? If so, you should use search_after or scroll.
- Do you need to keep the page contents consistent? If so, use search_after with the point in time API.
- Do you need to keep the page contents consistent but do not have access to ELK version 7.10? If so, use the scroll API.
- Do you need to support multiple searches at once? If so, you should avoid using scroll because of its high memory requirements.
Using the from/size parameters
GET my_index/_search
{
"from": 0,
"size": 10
}
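The offset arithmetic behind from/size paging, as a one-line sketch (the helper name is made up):

```python
def page_offset(page: int, size: int) -> int:
    # Page n (1-based) starts at offset (n - 1) * size; this offset is what
    # goes in the "from" field of the search body.
    return (page - 1) * size

print(page_offset(3, 10))  # from=20 for the third page of 10 hits
```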
No more than 10,000 documents can be paged through this way; to go beyond that, index.max_result_window must be raised
PUT /my_index/_settings
{
  "index" : {
    "number_of_replicas": 2,
    "max_result_window": 10000000
  }
}
Using scroll
Available in 7.6
https://www.elastic.co/guide/en/elasticsearch/reference/7.6/search-request-body.html
GET my_index/_search?scroll=1m
{
"size": 2
}
The response contains a _scroll_id; use it to continue scrolling
POST my_index/_search/scroll
{
"scroll" : "1m",
"scroll_id" : "DXF1ZXJ5QW5kRmV0Y2gBAAAAAAAAAxYWSWFkTnRhZ0pRTnFXZVpXb0hmT1NiZw=="
}
Release the scroll when the search is done
DELETE /_search/scroll
{
  "scroll_id" : "DXF1ZXJ5QW5kRmV0Y2gBAAAAAAAAAxYWSWFkTnRhZ0pRTnFXZVpXb0hmT1NiZw=="
}
# delete all scrolls
DELETE /_search/scroll/_all?pretty
Using search_after
When a query is sorted, each returned hit contains a sort field; pass that value as the search_after parameter of the next query. Combined with size this implements pagination, and it is more efficient than from/size
GET my_index/_search
{
"sort": [
{"@timestamp": {"order": "desc"}}
],
"from": 0,
"size": 10000
}
Response
{
"hits": {
"hits": [
{
"_id": "",
"_source": {},
"sort": [1711077629975]
}
]
}
}
Pass search_after in the next query
GET my_index/_search
{
"sort": [
{"@timestamp": {"order": "desc"}}
],
"size": 10000,
"search_after": [1711077629975]
}
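The search_after loop can be simulated client-side; this sketch pages over an in-memory list sorted descending, using the last hit's sort value as the cursor (no live cluster involved, all names are illustrative):

```python
def search_page(docs, size, search_after=None):
    """docs: (sort_value, doc) pairs pre-sorted descending, like the query above."""
    hits = [d for d in docs if search_after is None or d[0] < search_after]
    return hits[:size]

docs = sorted(((ts, {"id": ts}) for ts in range(100)), reverse=True)

pages, cursor = [], None
while True:
    page = search_page(docs, size=30, search_after=cursor)
    if not page:
        break
    pages.append(page)
    cursor = page[-1][0]  # the "sort" value of the last hit becomes search_after

print(len(pages))  # 4 pages: 30 + 30 + 30 + 10
```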
Using PIT
Use an Elasticsearch point-in-time reader to obtain a view of the data that stays consistent over time
Since version 7.10, a point in time (PIT) can be opened to keep searches consistent
# create a PIT that stays valid for 1 minute
POST /my-index-000001/_pit?keep_alive=1m
# the response contains the pit id
{
"id": "46ToAwM"
}
# pass the PIT when searching
GET /my-index-000001/_search
{
"size":10000,
"pit": {
"id" : "46ToAwM"
},
"sort": [
{"@timestamp": {"order": "asc"}}
]
}
# the response carries a sort cursor
{
"pit_id" : "46ToAw",
"hits" : {
"total" : ...,
"hits" : [
...
{
"sort" : [
"2021-05-20T05:30:04.832Z",
4294967298
]
}
]
}
}
# pass search_after when fetching the next page
GET /my-index-000001/_search
{
"size":10000,
"pit": {
"id" : "46ToAwM"
},
"sort": [
{"@timestamp": {"order": "asc"}}
],
"search_after": [
4294967298
]
}
# delete the PIT
DELETE /_pit
{
"id" : "46ToAwM"
}
Kibana
Run Kibana on Docker
docker pull docker.elastic.co/kibana/kibana:7.9.1
docker run -d --link elasticsearch -p 8080:5601 --name kibana docker.elastic.co/kibana/kibana:7.9.1
Common queries
GET /
GET /_cat/indices?v&s=index
GET /_cat/indices/<ioa-*>,<filebeat-*>?v&s=index
GET MY_INDEX/_search
{
"size": 1,
"sort": [{"@timestamp": {"order": "desc"}}]
}
GET MY_INDEX/_search
{
"query": {
"term": {
"FIELD": {
"value": VALUE
}
}
}
}
GET MY_INDEX/_search
{
"aggs": {
"NAME": {
"terms": {
"field": "AGG_FIELD",
"size": 10
}
}
}
}
GET _cat/templates?v&s=name
Tuning
- Disable swap on the nodes
- Set bootstrap.memory_lock to true
- Set
Troubleshooting
Unassigned shards
# list unassigned shards
curl -XGET "http://{ip}:9200/_cat/shards?v&h=index,shard,prirep,state,node,unassigned.reason&s=state"
# explain why
curl -XGET "http://{ip}:9200/_cluster/allocation/explain?pretty"
# 8.0+
curl -XGET "http://{ip}:9200/_cluster/allocation/explain?pretty" -H 'Content-Type: application/json' -d'{
  "index": "my-index-000001",
  "shard": 0,
  "primary": true
}'
# check disk usage
curl "http://{ip}:9200/_cat/allocation?v"
# delete indices that have unassigned shards
curl -s -XGET "http://{ip}:9200/_cat/shards" | grep UNASSIGNED | awk {'print $1'} | xargs -i curl -XDELETE "http://{ip}:9200/{}"
# reroute
curl -XPOST "http://{ip}:9200/_cluster/reroute" -H 'Content-Type: application/json' -d '{
"commands" : [
{
"allocate_replica" : {
"index" : "index_name",
"shard" : 0,
"node" : "node_id"
}
}
]
}'
Heap size
Recommended heap: half of the machine's memory, and no more than 32 GB
Recommended garbage collector: G1
Less is more
A smaller heap triggers GC more often, which lowers the risk of long GC pauses. It also leaves the OS more room for caching, which both Lucene and ES rely on.
Circuit breakers
- As a rule, 50% of a node's memory goes to the heap for indexing, queries, etc.; the other 50% is left to the operating system for caching
- Heap usage normally oscillates between 30% and 70%, because the JVM keeps consuming heap until garbage collection kicks in; if usage does not drop back to around 30% after a GC, garbage collection can no longer keep up
- Young GC typically runs every 10 s or more, taking ~50 ms per run; Old GC typically runs every 10 min or more, taking ~1 s per run
Show cluster statistics
curl -XGET "http://{ip}:9200/_cluster/stats?pretty"
Show circuit breaker statistics
curl -XGET "http://{ip}:9200/_nodes/stats/breaker"
- Fielddata circuit breaker
  - memory used by fielddata in your indices
  - Solution: avoid fielddata in your indices
- Request circuit breaker
  - memory required based on the request structures
  - triggered by an increase in the number of requests to the cluster
  - aggregations, wildcards, and wide time ranges in your queries
  - Solution: avoid aggregations with a large size value
- In flight requests circuit breaker
  - the size of active transport and HTTP requests for the node, based on the byte size of those requests
  - Solution: reduce the size of bulk requests, particularly if those requests contain large documents
- Script compilation circuit breaker
  - limits the number of times a script can be compiled in a given period
  - Solution: use stored scripts with parameters instead of inline ones
- Parent circuit breaker
  - the sum of all memory being used across the different types of circuit breakers
- Accounting circuit breaker
  - protects the node from overuse of memory by things that persist in memory after a request has completed, such as Lucene segments before they are flushed to disk
Update circuit breaker settings
curl -XPUT "http://{ip}:9200/_cluster/settings" -H 'Content-Type: application/json' -d'{
"transient": {"indices.breaker.total.limit":"5GB" }
}'
# reset to default
curl -XPUT "http://{ip}:9200/_cluster/settings" -H 'Content-Type: application/json' -d'{
"transient": {"indices.breaker.total.limit":null }
}'
References
- Circuit Breaker
- Circuit breaker settings
- Elasticsearch Circuit Breaker Exceptions: How to Handle Circuit Breakers
- Elasticsearch Heap Size Usage and JVM Garbage Collection
- How do I troubleshoot a circuit breaker exception in Amazon OpenSearch Service?
- Notes on ES diagnostics and memory optimization
Slow query
Configure the slow logs (these are index-level settings)
curl -XPUT "http://{ip}:9200/my_index/_settings" -H 'Content-Type: application/json' -d '{
"index.search.slowlog.level": "info",
"index.search.slowlog.threshold.query.warn": "10s",
"index.search.slowlog.threshold.query.info": "5s",
"index.search.slowlog.threshold.query.debug": "2s",
"index.search.slowlog.threshold.query.trace": "500ms",
"index.search.slowlog.threshold.fetch.warn": "1s",
"index.search.slowlog.threshold.fetch.info": "800ms",
"index.search.slowlog.threshold.fetch.debug": "500ms",
"index.search.slowlog.threshold.fetch.trace": "200ms",
"index.indexing.slowlog.threshold.index.warn": "10s",
"index.indexing.slowlog.threshold.index.info": "5s",
"index.indexing.slowlog.threshold.index.debug": "2s",
"index.indexing.slowlog.threshold.index.trace": "500ms"
}'
Enabling slow logs has a small performance cost; it is best to enable them only when JVM memory pressure is below 85%
Oversharding
Because the cluster state is loaded into the heap on every node (including the masters), and the amount of heap used is directly proportional to the number of indices, fields per index, and shards
Too many shards force the cluster to spend resources maintaining the metadata of a large number of small shards
Rule of thumb: at most 20 shards per GB of heap memory (version 7.9)
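The 20-shards-per-GB rule translates directly into a budget check (a back-of-the-envelope sketch; the function name is made up):

```python
def shard_budget(heap_gb: float, shards_per_gb: int = 20) -> int:
    # Upper bound on the shards a node should host under the 7.x rule of thumb.
    return int(heap_gb * shards_per_gb)

print(shard_budget(30))  # a node with a 30 GB heap -> at most 600 shards
```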
Index mapping explosions
- Use strict mode
curl -XPUT "http://{ip}:9200/my_index" -H 'Content-Type: application/json' -d '{
"mappings": {
"dynamic": "strict",
"properties": {
"message": {
"type": "text"
},
"transaction": {
"properties": {
"user": {
"type": "keyword"
},
"amount": {
"type": "long"
}
}
}
}
}
}
- Use "dynamic": "false" to skip type checking for new fields; they are stored but not searchable
curl -XPUT "http://{ip}:9200/my_index" -H 'Content-Type: application/json' -d '{
"mappings": {
"dynamic": "strict",
"properties": {
"message": {
"type": "text"
},
"transaction": {
"dynamic": "false",
"properties": {
"user": {
"type": "keyword"
},
"amount": {
"type": "long"
}
}
}
}
}
}
- Cap the number of fields: index.mapping.total_fields.limit
Max shards per node
curl -XPUT "http://{ip}:9200/_cluster/settings" -H 'Content-Type: application/json' -d '{
"persistent" : {
"cluster.max_shards_per_node" : 2000
}
}'
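The shard limit is enforced cluster-wide as the per-node setting times the number of data nodes, which is easy to sanity-check (a sketch; the function name is made up):

```python
def cluster_shard_limit(max_shards_per_node: int, data_nodes: int) -> int:
    # Total open shards the cluster will accept:
    # cluster.max_shards_per_node * number of data nodes.
    return max_shards_per_node * data_nodes

print(cluster_shard_limit(2000, 5))  # 10000 open shards allowed
```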
Index closed
open all closed indices
curl -s "http://{ip}:9200/_cat/indices" | grep close | awk {'print $3'} | xargs -i curl -XPOST "http://{ip}:9200/{}/_open"
write with refresh
In Elasticsearch, indexed documents are ultimately stored in files called segments. Segments are Elasticsearch's low-level storage unit; they hold the document data and the inverted index used to run searches. Using the ?refresh=true parameter affects segments in the following ways:
- Better real-time visibility: with ?refresh=true a document becomes searchable immediately after indexing, because every write triggers a refresh that flushes the new document data into a segment.
- More segments: frequent refreshes generate more segments, since every refresh creates a new one. If many documents are indexed one after another, each followed by a refresh, a large number of segments accumulate.
- Memory and performance overhead: every segment takes up some memory, so frequent refreshes increase memory usage, because each newly created segment has to be loaded into memory. They also add I/O overhead, since each new segment must be written to disk.
- Merge pressure: Elasticsearch periodically runs background segment merges, combining many small segments into larger ones to improve storage efficiency and search performance. Frequent refreshes increase merge pressure because there are more segments to merge.
- Write performance: since every refresh creates a new segment, write throughput can drop, especially under high load.
- Search performance: although ?refresh=true improves real-time visibility, an excess of segments can hurt search performance, because a search has to look through more segments.
In production it is generally recommended to:
- Use the default refresh interval (normally 1 second)
- Avoid ?refresh=true when bulk-loading data, to reduce the performance impact
- Use ?refresh=true only when immediate visibility is genuinely required
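A toy model (not ES internals) of why per-write refresh inflates segment counts: every refresh flushes whatever is pending into a new segment, and background merging is ignored here:

```python
def count_segments(n_docs: int, refresh_every: int) -> int:
    """Count segments produced if a refresh fires after every
    `refresh_every` indexed documents (toy model, no merging)."""
    segments = 0
    pending = 0
    for _ in range(n_docs):
        pending += 1
        if pending >= refresh_every:
            segments += 1   # refresh flushes pending docs into a new segment
            pending = 0
    return segments + (1 if pending else 0)

print(count_segments(1000, refresh_every=1))    # refresh per write -> 1000 segments
print(count_segments(1000, refresh_every=500))  # batched refresh   -> 2 segments
```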