ELK-Elasticsearch

Run on Docker

docker pull docker.elastic.co/elasticsearch/elasticsearch:7.9.1
docker run -d -p 9200:9200 -p 9300:9300 -e "discovery.type=single-node" --name elasticsearch docker.elastic.co/elasticsearch/elasticsearch:7.9.1

Common queries

# check cluster health
curl "http://localhost:9200/_cat/health?v"

# list nodes
curl "http://localhost:9200/_cat/nodes?v"

# list indices
curl "http://localhost:9200/_cat/indices?v&s=index"

# list indices sorted by size
curl "http://localhost:9200/_cat/indices?v&s=store.size:desc"

# search documents, filtering on the values of field1 and field2
curl -G "http://localhost:9200/my-index/_search" --data-urlencode "q=field1:100 AND field2:abc"

# search documents, returning only field1 and field2, sorted by field1 descending
curl "http://localhost:9200/my-index/_search?_source=field1,field2&sort=field1:desc"

# search via POST with a JSON body
curl -XPOST "http://localhost:9200/my-index/_search" -H 'Content-Type: application/json' -d '{"size":3, "query": {"term":{"Name":"Tom"}}}'

# view cluster settings (including defaults)
curl "http://localhost:9200/_cluster/settings?include_defaults=true&pretty"

# view node stats
curl "http://localhost:9200/_nodes/stats?pretty"

APIs

cat APIs

Common

# verbose: show the header row
curl -XGET "http://{ip}:9200/_cat/master?v"

# help: show column descriptions
# in Kibana, type a command and press `cmd + /` to open its documentation
curl -XGET "http://{ip}:9200/_cat/master?help"

# output only the specified headers
curl -XGET "http://{ip}:9200/_cat/master?h=ip,port,name"

# sorting
curl -XGET "http://{ip}:9200/_cat/indices?v&s=index"
curl -XGET "http://{ip}:9200/_cat/indices?v&s=index:asc" # ascending
curl -XGET "http://{ip}:9200/_cat/indices?v&s=index:desc" # descending
curl -XGET "http://{ip}:9200/_cat/templates?v&s=order:desc,index_patterns" # multi-level sort

# output format: format=text|json|yaml|smile|cbor
curl -XGET "http://{ip}:9200/_cat/indices?format=json&pretty"

# size unit: bytes=b|kb|mb|gb|tb|pb
curl -XGET "http://{ip}:9200/_cat/indices?s=store.size:desc&bytes=b"

APIs

Alias

Returns information about currently configured aliases to indices, including filter and routing information

curl -XGET "http://{ip}:9200/_cat/aliases?v"

Allocation

Provides a snapshot of the number of shards allocated to each data node and their disk space

curl -XGET "http://{ip}:9200/_cat/allocation?v"
curl -XGET "http://{ip}:9200/_cat/allocation/<node_id>?v"

Count

Provides quick access to a document count of individual indices or all indices in a cluster

curl -XGET "http://{ip}:9200/_cat/count/<index>"
curl -XGET "http://{ip}:9200/_cat/count"

Health

Returns the health status of a cluster

curl -XGET "http://{ip}:9200/_cat/health"

Indices

Returns high-level information about indices in a cluster

curl -XGET "http://{ip}:9200/_cat/indices"
curl -XGET "http://{ip}:9200/_cat/indices/<index>"

Master

Returns information about the master node

curl -XGET "http://{ip}:9200/_cat/master"

Nodes

Returns information about a cluster’s nodes

curl -XGET "http://{ip}:9200/_cat/nodes"

# check heap usage
curl -XGET "http://{ip}:9200/_cat/nodes?v&h=name,node*,heap*"

# check request/query cache usage
curl -XGET "http://{ip}:9200/_cat/nodes?v&h=ip,heap.percent,request_cache.memory_size,query_cache.memory_size"

Tasks

Returns cluster-level changes that have not yet been executed

curl -XGET "http://{ip}:9200/_cat/pending_tasks"

Returns information about tasks currently executing in the cluster

curl -XGET "http://{ip}:9200/_cat/tasks"

Recovery

Returns information about ongoing and completed shard recoveries

curl -XGET "http://{ip}:9200/_cat/recovery"
curl -XGET "http://{ip}:9200/_cat/recovery/<index>"

Repositories

Returns the snapshot repositories for a cluster

curl -XGET "http://{ip}:9200/_cat/repositories"

Thread pool

Returns thread pool statistics for each node in a cluster

curl -XGET "http://{ip}:9200/_cat/thread_pool/<thread_pool>"
curl -XGET "http://{ip}:9200/_cat/thread_pool"
curl -XGET "http://{ip}:9200/_cat/thread_pool/write?v"
curl -XGET "http://{ip}:9200/_cat/thread_pool/search?v"

Shards

curl -XGET "http://{ip}:9200/_cat/shards/<index>"
curl -XGET "http://{ip}:9200/_cat/shards"

Segments

Returns low-level information about the Lucene segments in index shards

curl -XGET "http://{ip}:9200/_cat/segments"
curl -XGET "http://{ip}:9200/_cat/segments/<index>"

curl -XGET "http://{ip}:9200/_cat/segments?v&bytes=mb&h=index,docs.count,size,size.memory&s=size.memory:desc"

Snapshots

Returns information about the snapshots stored in one or more repositories

curl -XGET "http://{ip}:9200/_cat/snapshots/<repository>"

Templates

Returns information about index templates in a cluster

curl -XGET "http://{ip}:9200/_cat/templates/<template_name>"
curl -XGET "http://{ip}:9200/_cat/templates"

cluster APIs

Cluster APIs

Allocation explain

Provides explanations for shard allocations in the cluster.

Useful for finding out why shards are unassigned

curl -XGET "http://{ip}:9200/_cluster/allocation/explain"
{
  "index": "myindex",
  "shard": 0,
  "primary": false
}

Settings

View cluster settings

curl -XGET "http://{ip}:9200/_cluster/settings?flat_settings=true&include_defaults=true&pretty"

Update cluster settings

# persistent
curl -XPUT "http://{ip}:9200/_cluster/settings" -H 'Content-Type: application/json' -d '{
  "persistent" : {
    "indices.recovery.max_bytes_per_sec" : "50mb"
  }
}'

# transient
curl -XPUT "http://{ip}:9200/_cluster/settings" -H 'Content-Type: application/json' -d '{
  "transient" : {
    "indices.recovery.max_bytes_per_sec" : "20mb"
  }
}'

# reset settings
curl -XPUT "http://{ip}:9200/_cluster/settings" -H 'Content-Type: application/json' -d '{
  "transient" : {
    "indices.recovery.*" : null
  }
}'

Health

Returns the health status of a cluster

curl -XGET "http://{ip}:9200/_cluster/health
curl -XGET "http://{ip}:9200/_cluster/health/<index>
curl -XGET "http://{ip}:9200/_cluster/health/<index>?level=shard

Reroute

Changes the allocation of shards in a cluster

curl -XPOST "localhost:9200/_cluster/reroute?pretty" -H 'Content-Type: application/json' -d '{
    "commands" : [
        {
            "move" : {
                "index" : "test", "shard" : 0,
                "from_node" : "node1", "to_node" : "node2"
            }
        },
        {
          "allocate_replica" : {
                "index" : "test", "shard" : 1,
                "node" : "node3"
          }
        }
    ]
}'

State

Returns metadata about the state of the cluster.

# metric=_all|blocks|master_node|metadata|nodes|routing_nodes|routing_table|version
curl -XGET "http://{ip}:9200/_cluster/state/<metrics>/<index>"

Stats

Returns cluster statistics.

curl -XGET "http://{ip}:9200/_cluster/stats
curl -XGET "http://{ip}:9200/_cluster/stats/nodes/<node_filter>

nodes APIs

Info

Returns cluster nodes information.

curl -XGET "http://{ip}:9200/_nodes"
curl -XGET "http://{ip}:9200/_nodes/<node_id>"
curl -XGET "http://{ip}:9200/_nodes/<metric>"
curl -XGET "http://{ip}:9200/_nodes/<node_id>/<metric>"

Usage

Returns information on the usage of features.

curl -XGET "http://{ip}:9200/_nodes/usage"
curl -XGET "http://{ip}:9200/_nodes/<node_id>/usage"
curl -XGET "http://{ip}:9200/_nodes/usage/<metric>"
curl -XGET "http://{ip}:9200/_nodes/<node_id>/usage/<metric>"

Hot thread

Returns the hot threads on each selected node in the cluster.

curl -XGET "http://{ip}:9200/_nodes/hot_threads"
curl -XGET "http://{ip}:9200/_nodes/<node_id>/hot_threads"

Stats

Returns cluster nodes statistics.

# metric=breaker|fs|http|indexing_pressure|indices|jvm|os|process|thread_pool|transport
curl -XGET "http://{ip}:9200/_nodes/stats"
curl -XGET "http://{ip}:9200/_nodes/<node_id>/stats"
curl -XGET "http://{ip}:9200/_nodes/stats/<metric>"
curl -XGET "http://{ip}:9200/_nodes/<node_id>/stats/<metric>"
curl -XGET "http://{ip}:9200/_nodes/stats/<metric>/<index_metric>"
curl -XGET "http://{ip}:9200/_nodes/<node_id>/stats/<metric>/<index_metric>"

CRUD

Search

SQL

GET _sql?format=txt
{
  "query": "SELECT * from \"your_index\""
}

Query String

GET /_search
{
  "query": {
    "query_string": {
      "query": "(new york city) OR (big apple)",
      "default_field": "content"
    }
  }
}
curl "http://localhost:9200/your_index?q=content:apple&size=3&pretty"

curl -G "http://localhost:9200/your_index/_search?size=3&pretty" --data-urlencode "q=status:200 AND clientip:172.110.15.0"

curl -G "http://localhost:9200/your_index/_search?size=3&pretty" --data-urlencode 'q=request:"POST *"'

query-string-syntax

Query

GET /index/_search
{
  "size": 10,
  "query": {
    "bool": {
      "must": [
        {"term": {"field1": "keyword1"}},
        {"term": {"field2": "keyword2"}}
      ]
    }
  },
  "sort": [{"field1":{ "order":"asc"}}]
}

GET /index/_search
{
  "query": {
    "bool": {
      "must": [
        {"term": {"field1": "keyword1"}},
        {"term": {"field2": "keyword2"}}
      ]
    }
  },
  "aggs": {
    "name": {"sum": {"field": "field1"}}
  }
}

GET /index/_search
{
  "query": {
    "bool": {
      "must": [
        {"term": {"field1": "keyword1"}},
        {"term": {"field2": "keyword2"}}
      ]
    }
  },
  "aggs": {
    "by_term": {
      "terms": {"field": "filed3", "size": "10"},
      "aggs": {"sum_per_field": {"sum":{"field": "field4"}}}
    }
  }
}

GET /index/_search
{
  "query": {
    "bool": {
      "must": [
        {"term": {"field1": "keyword1"}},
        {"term": {"field2": "keyword2"}}
      ]
    }
  },
  "aggs": {
    "by_date": {
      "date_histogram": {"field": "ds", "interval": "day"},
      "aggs": {"sum_per_day": {"sum":{"field": "field3"}}}
    }
  }
}

# range query
GET /index/_search
{
  "query": {
    "bool": {
      "filter": [
        {"range": {"timestamp": {"gt": "now-4d/d", "lte": "now-3d/d"}}}
      ]
    }
  }
}

# filter source fields
GET /index/_search
{
  "_source": ["term1", "term2.*"]
}

# sort
GET /index/_search
{
  "sort": [
    {
      "timestamp": {
        "order": "desc"
      }
    }
  ]
}

创建Index

# create index with mappings
# (the "_doc" type level is 6.x syntax; on 7.x drop it or pass ?include_type_name=true)
PUT /index_name
{
  "mappings": {
    "_doc": {
      "properties": {
        "field_keyword": {"type": "keyword"},
        "field_text": {"type": "text"},
        "field_date1": {"type": "date", "format": "epoch_second"},
        "field_date2": {"type": "date", "format": "yyyyMMdd"},
        "field_location": {"type": "geo_point"},
        "field_int": {"type": "integer"},
        "field_bool": {"type": "boolean"},
        "field_float": {"type": "float"},
        "field_double": {"type": "double"},
        "field_alias": {"type": "alias", "path": "poi_id"}
      }
    }
  },
  "settings": {
    "number_of_replicas": 0
  },
  "aliases": {"the_alias": {}}
}

# atomically switch an alias from old_index to new_index
POST /_aliases
{
  "actions":[
    {"remove": {
      "index": "old_index",
      "alias": "index_alias"
    }},
    {"add": {
      "index": "new_index",
      "alias": "index_alias"
    }}
  ]
}

# Only one index per alias can be assigned to be the write index at a time
# If no write index is specified and there are multiple indices referenced by an alias, then writes will not be allowed.

POST /_aliases
{
  "actions":[
    {"add": {
      "index": "new_index",
      "alias": "index_alias",
      "is_write_index": true
    }},
    {"add": {
      "index": "old_index",
      "alias": "index_alias",
      "is_write_index": false
    }}
  ]
}

Insert data

POST /index/_doc
{
  "field1": "data1",
  "field2": "data2"
}

Update

update

# increment a numeric field
POST test/_update/1
{
  "script" : {
    "source": "ctx._source.counter += params.count",
    "lang": "painless",
    "params" : {
      "count" : 4
    }
  }
}

# add to a list
POST test/_update/1
{
  "script": {
    "source": "ctx._source.tags.add(params.tag)",
    "lang": "painless",
    "params": {
      "tag": "blue"
    }
  }
}

# remove from list
POST test/_update/1
{
  "script": {
    "source": "if (ctx._source.tags.contains(params.tag)) { ctx._source.tags.remove(ctx._source.tags.indexOf(params.tag)) }",
    "lang": "painless",
    "params": {
      "tag": "blue"
    }
  }
}

update by query

# increment
POST my-index-000001/_update_by_query
{
  "script": {
    "source": "ctx._source.count++",
    "lang": "painless"
  },
  "query": {
    "term": {
      "user.id": "kimchy"
    }
  }
}

# add field
POST my-index-000001/_update_by_query
{
  "script": {
    "source": "ctx._source.newField='abc'",
    "lang": "painless"
  }
}

# remove field
POST my-index-000001/_update_by_query
{
  "script": {
    "source": "ctx._source.remove(\"oldField\")",
    "lang": "painless"
  }
}

Delete

delete by query

POST /index/_delete_by_query
{
  "query": {
    "bool": {
      "must": [
        {"term": {"field1": "keyword1"}},
        {"term": {"field2": "keyword2"}}
      ]
    }
  }
}

Allow/forbid bulk deletion by wildcard

PUT /_cluster/settings
{
  "transient" : {
    "action.destructive_requires_name" : false
  }
}

Index management

Index name date math

Format: <static_name{date_math_expr{date_format|time_zone}}>

Example:

# indices whose names start with last year's year
GET _cat/indices/<ioa-*-{now-1y{yyyy|+08:00}}*>?s=index&v
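
Outside the Kibana console these names must be percent-encoded in the URL, since `<`, `>`, `{`, `}`, `/`, and `|` are special characters. For example, with curl:

# <logstash-{now/d}>, percent-encoded
curl -XGET "http://localhost:9200/%3Clogstash-%7Bnow%2Fd%7D%3E/_search"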

Update index settings

PUT /my_index/_settings
{
  "index" : {
    "number_of_replicas": 2,
    "max_result_window": 10000000
  }
}

Create an index template

Setting up mappings by hand for every new index is tedious; an index template automatically applies mappings and other settings to any newly created index whose name matches one of the patterns.

GET /_cat/templates?v&s=name

PUT _index_template/template_1
{
  "index_patterns": ["test-*", "template-*"],
  "template": {
    "settings": {
      "number_of_shards": 1
    },
    "mappings": {
      "properties": {
        "host_name": {
          "type": "keyword"
        },
        "timestamp": {
          "type": "date",
          "format": "EEE MMM dd HH:mm:ss Z yyyy"
        }
      }
    }
  }
}

Lifecycle management

Managing index creation, rollover, and deletion with hand-written crontab scripts is tedious; ES's Index Lifecycle Management (ILM) can handle it instead (note: ILM is an x-pack feature and requires at least a basic license)

# create an ILM policy
PUT _ilm/policy/my_policy
{
  "policy": {
    "phases": {
      "hot": {
        "actions": {
          "rollover": {
            "max_size": "25GB"
          }
        }
      },
      "delete": {
        "min_age": "30d",
        "actions": {
          "delete": {}
        }
      }
    }
  }
}

# view the policy
GET _ilm/policy/my_policy

# attach the ILM policy when creating an index
PUT my-index-1
{
  "settings": {
    "number_of_shards": 1,
    "number_of_replicas": 1,
    "index.lifecycle.name": "my_policy"
  }
}

# attach the ILM policy in an index template
# (policies that use rollover must be applied via a template, and need a rollover alias)
PUT _index_template/timeseries_template
{
  "index_patterns": ["my-index-*"],
  "template": {
    "settings": {
      "number_of_shards": 1,
      "number_of_replicas": 1,
      "index.lifecycle.name": "my_policy"
    }
  }
}

# check policy execution progress
GET my-index-*/_ilm/explain

Force Merge

In Lucene an index is stored as multiple segments. ES normally merges small segments into larger ones automatically and periodically, but a merge can also be triggered by hand.

Once an index no longer receives writes, merging it down to a single segment reduces resource usage.

POST /.ds-logs-000001/_forcemerge?max_num_segments=1

Reindex

Multiple indices can be merged into one, e.g. daily indices into a monthly index.

POST /_reindex?pretty
{
    "conflicts": "proceed",
    "source": {
        "index": "myindex-1-2021.*"
    },
    "dest": {
        "index": "myindex-1-2021",
        "op_type": "create"
    }
}

Data can also be migrated from another cluster:

POST _reindex
{
  "source": {
    "remote": {
      "host": "http://oldhost:9200",
      "username": "user",
      "password": "pass"
    },
    "index": "source",
  },
  "dest": {
    "index": "dest"
  }
}
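
Note that remote reindex requires the source host to be whitelisted in elasticsearch.yml on the cluster coordinating the reindex (a node restart is needed after changing it):

reindex.remote.whitelist: "oldhost:9200"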

Freeze

A frozen index uses less heap, but searching it is slower and it is read-only

Frozen indices

Searching a frozen index

Creating frozen indices with the Elasticsearch Freeze index API | Elastic Blog

# it’s recommended to first run a force merge before freeze
curl -XPOST "http://{ip}:9200/my-index-000001/_forcemerge?max_num_segments=1

# freeze
curl -XPOST "http://{ip}:9200/my-index-000001/_freeze"

# search frozen indices
curl -XGET "http://{ip}:9200/my-index-000001/_search?ignore_throttled=false"

# `pre_filter_shard_size` specifies a threshold that, when exceeded, will enforce a round-trip to pre-filter search shards that cannot possibly match
# default is 128
curl -XGET "http://{ip}:9200/my-index-000001/_search?ignore_throttled=false&pre_filter_shard_size=1"

# get throttled indices
curl -XGET "http://{ip}:9200/_cat/indices/twitter?v&h=i,sth&s=sth"

# unfreeze
curl -XPOST "http://{ip}:9200/my-index-000001/_unfreeze"

Pipeline

PUT _ingest/pipeline/test
{
  "description": "splog connnect log pipeline",
  "processors": [
    {
      "set": {
        "field": "auth.ProcName",
        "value": "",
        "if": "ctx?.auth?.ProcName == 'moa'"
      }
    }
  ]
}

Test a pipeline

POST /_ingest/pipeline/test/_simulate
{
  "docs": [
    {
      "_index": "index",
      "_id": "id",
      "_source": {
        "foo": "bar",
        "auth":{
          "ProcName": "moa",
          "UserName": "abc"
        }
      }
    }
  ]
}

Roll up

Roll up or transform your data

Historical data can be kept in a condensed, low-dimensional form that stores only summary statistics for selected fields, saving storage without affecting analysis and reporting.
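
A minimal sketch of a rollup job (x-pack), assuming a metrics index with a @timestamp field and a numeric bytes field; all names here are placeholders:

# create the job: roll metrics-* up into metrics_rollup at hourly granularity
PUT _rollup/job/my_rollup_job
{
  "index_pattern": "metrics-*",
  "rollup_index": "metrics_rollup",
  "cron": "0 0 * * * ?",
  "page_size": 1000,
  "groups": {
    "date_histogram": {"field": "@timestamp", "fixed_interval": "1h"}
  },
  "metrics": [
    {"field": "bytes", "metrics": ["min", "max", "avg", "sum"]}
  ]
}

# the job starts in a stopped state; start it explicitly
POST _rollup/job/my_rollup_job/_start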

Client APIs

Python API

Python Elasticsearch Client

Install

pip3 install elasticsearch

Usage

from elasticsearch import Elasticsearch, helpers

es = Elasticsearch([f'http://{ES_USER}:{ES_PASS}@{ES_HOST}:{ES_PORT}'])

# create index
index_body = {
    "mappings":{
        "_doc":{
            "properties": properties
        }
    },
    "settings": {"number_of_replicas": 0}
}
es.indices.create(index=index_name, body=index_body)

# check index existence
es.indices.exists(index=index_name)

# delete index
es.indices.delete(index=index_name)

# query
query = {
    'size': 10,
    'query': {'range': {'@timestamp': {'gte': starttime, 'lt': endtime}}}
}
res = es.search(index=index_name, body=query)

# bulk insert
actions = [
    {
        '_index': index_name,
        '_type': '_doc',
        '_source': {'field1': 'val1', 'field2': 'val2'}
    },
]
helpers.bulk(es, actions)

Java API

Java REST Client

The REST API is recommended; it has better compatibility across versions than the native Java (transport) API

Common task: exporting Hive data to Elasticsearch

Create the index and mapping in ES

PUT /index_name
{
    "mappings": {
        "_doc": {
            "properties": {
                "field1": {"type": "keyword"},
                "field2": {"type": "integer"}
            }
        }
    },
    "settings": {
        "number_of_replicas": 0
    }
}

Create an external table in Hive

add jar hdfs:///data/script/poseidon/yzchen/elasticsearch-hadoop-6.4.2/dist/elasticsearch-hadoop-6.4.2.jar;
CREATE EXTERNAL TABLE index_name_to_elk (
    field1 string,
    field2 int
)
STORED BY 'org.elasticsearch.hadoop.hive.EsStorageHandler'
TBLPROPERTIES('es.resource' = 'index_name/_doc',
    'es.index.auto.create' = 'false',
    'es.nodes' = 'http://9.24.140.147,http://9.24.140.148,http://9.24.140.149,http://9.24.140.150,http://9.24.140.152',
    'es.port'='9200'
);

Load the data

insert overwrite table index_name_to_elk
select field1, field2
from index_name;

Monitoring

Enable collection

curl -XPUT http://elastic:123456@localhost:9200/_cluster/settings -d '{"persistent":{"xpack.monitoring.collection.enabled": true}}' -H 'Content-Type: application/json'

curl http://elastic:123456@localhost:9200/_cluster/settings

View monitoring metrics

query:

{
  "size": 1,
  "sort": [
    {
      "timestamp": {
        "order": "desc"
      }
    }
  ],
  "query":{
    "bool": {
      "must": [
        {"term": {"type": "node_stats"}}
      ]
    }
  },
  "_source": ["source_node"]
}

search:

curl http://elastic:123456@localhost:9200/.monitoring-es-*/_search \
    -H 'Content-Type: application/json' \
    -d @query.json \
    | json_pp

Backup and restore

Elasticsearch backup and restore

Elasticdump

GitHub - elasticsearch-dump

# install
npm install elasticdump
cd node_modules/elasticdump
./bin/elasticdump \
  --input=http://production.es.com:9200/my_index \
  --output=http://staging.es.com:9200/my_index \
  --limit 10000 \
  --type=data

# docker
docker pull elasticdump/elasticsearch-dump
docker run --rm -ti elasticdump/elasticsearch-dump \
  --input=http://production.es.com:9200/my_index \
  --output=http://staging.es.com:9200/my_index \
  --limit 10000 \
  --type=data
  
# back up all indices with multielasticdump
# dump
multielasticdump --direction=dump --input=http://localhost:9200 --output=/home/backup
# load
multielasticdump --direction=load --input=/home/backup --output=http://localhost:9200

Snapshot

Create a backup directory, add it to the config, then restart the ES service. If ES runs in Docker, remember to mount the backup directory into the container (see the sketch below).

mkdir /home/backup
chown elasticsearch:elasticsearch /home/backup
echo 'path.repo: ["/home/backup"]'>>/etc/elasticsearch/elasticsearch.yml
systemctl restart elasticsearch
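
For the Docker case, a sketch based on the run command at the top of this page; path.repo is passed as an environment variable, which the official image maps to an ES setting:

docker run -d -p 9200:9200 -p 9300:9300 \
  -e "discovery.type=single-node" \
  -e "path.repo=/home/backup" \
  -v /home/backup:/home/backup \
  --name elasticsearch docker.elastic.co/elasticsearch/elasticsearch:7.9.1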

Create a local snapshot repository

PUT /_snapshot/my_backup
{
  "type": "fs",
  "settings": {
    "location": "/home/backup"
  }
}

# view the repository
GET /_snapshot/my_backup

# view all repositories
GET /_snapshot

# delete the repository
DELETE /_snapshot/my_backup

Create a snapshot

PUT /_snapshot/my_backup/snapshot_1?wait_for_completion=true

Restore

POST /_snapshot/my_backup/snapshot_1/_restore

Load testing

Install

sudo yum install pbzip2

conda create --name rally python=3.8.13
conda activate rally
pip3 install --upgrade pip
pip3 install esrally

Usage

# create a track
esrally create-track --track={name} --target-hosts={host}:9200 --client-options=basic_auth_user:"'elastic',basic_auth_password:'{password}'" --indices="index-*" --output-path=./tracks

# race 
esrally race --distribution-version=7.6.2 --track=geonames

# race with custom track
esrally race --distribution-version=7.6.2 --track-path=./tracks/mytrack

# race with target host
esrally race --target-hosts={host}:9200 --client-options=basic_auth_user:"'elastic',basic_auth_password:'{password}'" --track=geonames

Third-party integration

Issues when transferring data between Hive and Elasticsearch

Ignore errors

Following ES-Hadoop's Error Handlers, add to TBLPROPERTIES:

'es.write.rest.error.handlers'='log'
'es.write.rest.error.handler.log.logger.name'='BulkErrors'

Tune bulk size

Following ES-Hadoop's Configuration docs, add to TBLPROPERTIES:

'es.batch.size.bytes'='1mb'
'es.batch.size.entries'='200'
'es.batch.write.refresh'='false'

Retry policy

'es.batch.write.retry.count'='3'
'es.batch.write.retry.wait'='10s'

Reduce the number of Hive mappers

set mapred.max.split.size=100000000;
set mapred.min.split.size.per.node=100000000;
set mapred.min.split.size.per.rack=100000000;
set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;

Pagination

  • Do you need to paginate over more than 10,000 documents? If so, you should use search_after or scroll.

  • Do you need to keep the page contents consistent? If so, use search_after with the point in time API.

  • Do you need to keep the page contents consistent but are on a version earlier than 7.10? If so, use the scroll API.

  • Do you need to support multiple searches at once? If so, you should avoid using scroll because of its high memory requirements.

Using from/size

GET my_index/_search
{
    "from": 0,
    "size": 10
}

This cannot go past 10,000 documents; paging deeper requires raising the index.max_result_window setting:

PUT /my_index/_settings
{
  "index" : {
    "number_of_replicas": 2,
    "max_result_window": 10000000
  }
}

Using scroll

Available in version 7.6:

https://www.elastic.co/guide/en/elasticsearch/reference/7.6/search-request-body.html

GET my_index/_search?scroll=1m
{
  "size": 2
}

The response contains a _scroll_id; use it to continue the scroll:

POST /_search/scroll
{
    "scroll" : "1m", 
    "scroll_id" : "DXF1ZXJ5QW5kRmV0Y2gBAAAAAAAAAxYWSWFkTnRhZ0pRTnFXZVpXb0hmT1NiZw==" 
}

Release the scroll when the search is finished:

DELETE /_search/scroll
{
    "scroll_id" : "DXF1ZXJ5QW5kRmV0Y2gBAAAAAAAAAxYWSWFkTnRhZ0pRTnFXZVpXb0hmT1NiZw=="
}

# delete all scrolls
DELETE /_search/scroll/_all?pretty

Using search_after

With a sorted query, each returned hit carries a sort field; pass its value as the search_after parameter of the next query. Combined with the size parameter this implements pagination, and it is more efficient than from/size.

GET my_index/_search
{
    "sort": [
        {"@timestamp": {"order": "desc"}}
    ],
    "from": 0,
    "size": 10000
}

The response:

{
    "hits": {
        "hits": [
            {
                "_id": "",
                "_source": {},
                "sort": [1711077629975]
            }
        ]
    }
}

Pass search_after in the next query:

GET my_index/_search
{
    "sort": [
        {"@timestamp": {"order": "desc"}}
    ],
    "size": 10000,
    "search_after": [1711077629975]
}

Using PIT

Use an Elasticsearch point-in-time reader to get a consistent view of the data over time

Since version 7.10, a point in time (PIT) can be opened to keep searches consistent.

# create a PIT that stays alive for 1 minute
POST /my-index-000001/_pit?keep_alive=1m

# the response contains the PIT id
{
    "id": "46ToAwM"
}

# pass the PIT when searching (the request path must not include an index)
GET /_search
{
    "size":10000,
    "pit": {
        "id" : "46ToAwM"
    },
    "sort": [ 
        {"@timestamp": {"order": "asc"}}
    ]
}

# the response carries a sort cursor
{
  "pit_id" : "46ToAw", 
  "hits" : {
    "total" : ...,
    "hits" : [
      ...
      {
        "sort" : [                                
          "2021-05-20T05:30:04.832Z",
          4294967298                              
        ]
      }
    ]
  }
}

# fetch the next page by passing search_after
GET /_search
{
    "size":10000,
    "pit": {
        "id" : "46ToAwM"
    },
    "sort": [ 
        {"@timestamp": {"order": "asc"}}
    ],
    "search_after": [
        4294967298
    ],
}

# delete the PIT
DELETE /_pit
{
    "id" : "46ToAwM"
}

Kibana

Run Kibana on Docker

docker pull docker.elastic.co/kibana/kibana:7.9.1
docker run -d --link elasticsearch -p 8080:5601 --name kibana docker.elastic.co/kibana/kibana:7.9.1

Common queries

GET /

GET /_cat/indices?v&s=index

GET /_cat/indices/<ioa-*>,<filebeat-*>?v&s=index

GET MY_INDEX/_search
{
  "size": 1,
  "sort": [{"@timestamp": {"order": "desc"}}]
}

GET MY_INDEX/_search
{
  "query": {
    "term": {
      "FIELD": {
        "value": VALUE
      }
    }
  }
}

GET MY_INDEX/_search
{
  "aggs": {
    "NAME": {
      "terms": {
        "field": "AGG_FIELD",
        "size": 10
      }
    }
  }
}

GET _cat/templates?v&s=name

Tuning

  • Disable swap on the nodes
    • Set bootstrap.memory_lock: true (see the sketch below)
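
A minimal sketch; the OS must also allow the lock (ulimit -l unlimited, or LimitMEMLOCK=infinity under systemd):

# elasticsearch.yml
bootstrap.memory_lock: true

# verify after restart: mlockall should be true on every node
curl -XGET "http://{ip}:9200/_nodes?filter_path=**.mlockall&pretty"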

Troubleshooting

Managing and troubleshooting Elasticsearch memory

Unassigned shards

# list unassigned shards
curl -XGET "http://{ip}:9200/_cat/shards?v&h=index,shard,prirep,state,node,unassigned.reason&s=state"

# explain why
curl -XGET "http://{ip}:9200/_cluster/allocation/explain?pretty"
# 8.0+
curl -XGET "http://{ip}:9200/_cluster/allocation/explain?pretty" -H 'Content-Type: application/json' -d'{
  "index": "my-index-000001", 
  "shard": 0, 
  "primary": true 
}'

# check disk usage
curl "http://{ip}:9200/_cat/allocation?v"

# delete all indices that have unassigned shards (destructive!)
curl -XGET "http://{ip}:9200/_cat/shards" | grep UNASSIGNED | awk {'print $1'} | xargs -i curl -XDELETE "http://{ip}:9200/{}"

# reroute
curl -XPOST "http://{ip}:9200/_cluster/reroute" -H 'Content-Type: application/json' -d '{
    "commands" : [
        {
          "allocate_replica" : {
                "index" : "index_name",
                "shard" : 0,
                "node" : "node_id"
          }
        }
    ]
}'

Heap size

A Heap of Trouble: Managing Elasticsearch’s Managed Heap

Recommended heap: half of the machine's memory, and no more than 32 GB

The recommended garbage collector is G1
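
Roughly, in config/jvm.options (the 16g value is a placeholder; keep Xms equal to Xmx):

-Xms16g
-Xmx16g

# to switch from the default CMS collector to G1, comment out the CMS flags and add:
-XX:+UseG1GC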

Less is more

A smaller heap triggers GC more often, so the risk of long GC pauses is lower. It also leaves the system more room for caching, which both Lucene and ES rely on.

Circuit breakers

  • In general, 50% of a node's memory goes to the heap for indexing, querying, etc.; the other 50% is left to the operating system for caching
  • Heap usage normally oscillates between roughly 30% and 70%: the JVM keeps allocating until garbage collection kicks in. If heap usage no longer drops back to around 30% after a GC, the GC can no longer keep up
  • Young GC should typically run at most once every ~10 s at ~50 ms per run; old GC at most once every ~10 min at ~1 s per run

View cluster stats

curl -XGET "http://{ip}:9200/_cluster/stats?pretty"

View circuit breaker stats

curl -XGET "http://{ip}:9200/_nodes/stats/breaker"
  • Fielddata circuit breaker
    • memory used by fielddata in your indices
    • Solution: index 中避免使用 fielddata
  • Request circuit breaker
    • memory required based on the request structures
      • Increase in the number of requests to the cluster
      • Aggregation, wildcards, and using wide time ranges in your queries
    • Solution: avoid the use of aggregations with a large size value
  • Inflight requests circuit breaker
    • the size of active transport and http requests for the node based on the byte size of those requests
    • Solution: reduce the size of bulk requests, particularly if those requests contain large documents
  • Script compilation circuit breaker
    • it limits the number of times a script can be compiled in a given period
    • Solution: use stored scripts with parameters instead of inline ones (see the sketch after this list)
  • Parent circuit breakers
    • the sum of all memory being used across the different types of circuit breakers
  • Accounting circuit breakers
    • to protect the node from over usage of memory due to things that persist in memory after a request has completed, such as lucene segments before they are flushed to disk.
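
A sketch of a stored script; the script id and index name are placeholders:

# store the script once; it is compiled once and reused
PUT _scripts/increment-count
{
  "script": {
    "lang": "painless",
    "source": "ctx._source.count += params.count"
  }
}

# reference it by id; only the params change between calls
POST my-index-000001/_update/1
{
  "script": {
    "id": "increment-count",
    "params": {"count": 4}
  }
}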

Update circuit breaker settings

curl -XPUT "http://{ip}:9200/_cluster/settings" -H 'Content-Type: application/json' -d'{
  "transient": {"indices.breaker.total.limit":"5GB" }
}'

# restore the default
curl -XPUT "http://{ip}:9200/_cluster/settings" -H 'Content-Type: application/json' -d'{
  "transient": {"indices.breaker.total.limit":null }
}'

Slow query

Configure the slow log (these are index-level settings, not cluster settings)

curl -XPUT "http://{ip}:9200/_cluster/settings" -H 'Content-Type: application/json' -d '{
    "index.search.slowlog.level": "info",
    "index.search.slowlog.threshold.query.warn": "10s",
    "index.search.slowlog.threshold.query.info": "5s",
    "index.search.slowlog.threshold.query.debug": "2s",
    "index.search.slowlog.threshold.query.trace": "500ms",
    "index.search.slowlog.threshold.fetch.warn": "1s",
    "index.search.slowlog.threshold.fetch.info": "800ms",
    "index.search.slowlog.threshold.fetch.debug": "500ms",
    "index.search.slowlog.threshold.fetch.trace": "200ms",
    "index.indexing.slowlog.threshold.index.warn": "10s",
    "index.indexing.slowlog.threshold.index.info": "5s",
    "index.indexing.slowlog.threshold.index.debug": "2s",
    "index.indexing.slowlog.threshold.index.trace": "500ms"
}'

Enabling the slow log has some performance cost; it is best enabled only while JVM memory pressure stays below 85%

Oversharding

The cluster state is loaded into the heap on every node (including the masters), and the heap it consumes is directly proportional to the number of indices, fields per index, and shards.

With too many shards, the cluster wastes resources maintaining the metadata of a large number of small shards.

Rule of thumb: at most 20 shards per GB of heap memory (as of version 7.9). One remedy for an over-sharded index is the shrink API; a sketch follows.
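
A sketch of shrinking an index, assuming my_index can be made read-only and its primary shard count is a multiple of the target; node and index names are placeholders:

# first relocate a copy of every shard to one node and block writes
PUT /my_index/_settings
{
  "settings": {
    "index.routing.allocation.require._name": "node-1",
    "index.blocks.write": true
  }
}

# then shrink into a new index with fewer primary shards, clearing the temporary settings
POST /my_index/_shrink/my_index_shrunk
{
  "settings": {
    "index.number_of_shards": 1,
    "index.routing.allocation.require._name": null,
    "index.blocks.write": null
  }
}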

Index mapping explosions

  • Use strict mode (mappings are per-index, not cluster settings):
curl -XPUT "http://{ip}:9200/my_index" -H 'Content-Type: application/json' -d '{
  "mappings": {
    "dynamic": "strict",
    "properties": {
      "message": {
        "type": "text"
      },
      "transaction": {
        "properties": {
          "user": {
            "type": "keyword"
          },
          "amount": {
            "type": "long"
          }
        }
      }
    }
  }
}'
  • 通过 "dynamic": "false" 使字段不检查类型,但不可查
curl -XPUT "http://{ip}:9200/_cluster/settings" -H 'Content-Type: application/json' -d '{
  "mappings": {
    "dynamic": "strict",
    "properties": {
      "message": {
        "type": "text"
      },
      "transaction": {
        "dynamic": "false",
        "properties": {
          "user": {
            "type": "keyword"
          },
          "amount": {
            "type": "long"
          }
        }
      }
    }
  }
}
  • Cap the number of fields with index.mapping.total_fields.limit, e.g.:
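
# raise the field cap (the default is 1000; 2000 here is a placeholder)
PUT /my_index/_settings
{
  "index.mapping.total_fields.limit": 2000
}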

Max shards per node

curl -XPUT "http://{ip}:9200/_cluster/settings" -H 'Content-Type: application/json' -d '{
  "persistent" : {
    "cluster.max_shards_per_node" : 2000 
  }
}'

Index closed

open all closed indices

curl -s "http://{ip}:9200/_cat/indices" | grep close | awk {'print $3'} | xargs -i curl -XPOST "http://{ip}:9200/{}/_open"

Write with refresh

refresh

In Elasticsearch, indexed documents are ultimately stored in files called segments. Segments are the low-level storage unit: they hold the document data and the inverted index used for search. Using the ?refresh=true parameter can affect segments in the following ways:

  1. Better real-time visibility: with ?refresh=true a document becomes searchable immediately after indexing, because every write triggers a refresh that flushes the new document into a segment.
  2. More segments: frequent refreshes generate more segments, since each refresh creates a new one. If many documents are indexed back to back, each followed by a refresh, a large number of segments accumulate.
  3. Memory and performance overhead: every segment occupies some memory, so frequent refreshes increase memory usage (each new segment is loaded into memory) and I/O (each new segment is written to disk).
  4. Merge pressure: ES periodically merges small segments into larger ones in the background to optimize storage and search performance. Frequent refreshes increase merge pressure, because there are more segments to merge.
  5. Indexing throughput: since every refresh creates a new segment, write performance may drop, especially under high load.
  6. Search performance: ?refresh=true improves real-time visibility, but too many segments can hurt search performance, because a search has to visit more segments.

In production, the usual recommendations are:

  • Use the default refresh interval (normally 1 second)
  • Avoid ?refresh=true when writing large amounts of data, to limit the performance impact
  • Use ?refresh=true only when immediate visibility is genuinely required (see the sketch below)
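
A sketch of the gentler alternatives (my_index is a placeholder): refresh=wait_for blocks until the next scheduled refresh instead of forcing one, and a longer refresh_interval reduces segment churn during bulk loads:

# wait for the next periodic refresh instead of forcing a new segment
POST /my_index/_doc?refresh=wait_for
{
  "field1": "data1"
}

# relax the refresh interval during heavy indexing; restore it afterwards
PUT /my_index/_settings
{
  "index": {"refresh_interval": "30s"}
}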