首先先下载一个叫"elasticsearch-hadoop-hive"的 JAR 包,放到相应路径下:https://jar-download.com/artifacts/org.elasticsearch/elasticsearch-hadoop-hive

Hive 数据导入 Elasticsearch

1. 在 ES 中建立 index,并定义数据格式和属性

  PUT /index_name
  {
    "mappings": {
      "_doc": {
        "properties": {
          "field1": {"type": "keyword"},
          "field2": {"type": "integer"}
        }
      }
    },
    "settings": {
      "number_of_replicas": 0
    }
  }

建 index 的 API 可以参考https://www.elastic.co/guide/en/elasticsearch/reference/7.2/indices-create-index.html

定义 ES 中数据格式可以参考https://www.elastic.co/guide/en/elasticsearch/reference/7.2/mapping-types.html

2. 在 Hive 中建立定义映射关系外部表

add jar path/to/elasticsearch-hadoop-6.4.2.jar;
CREATE EXTERNAL TABLE index_name_to_es (
    field1 string,
    field2 int
    )
STORED BY 'org.elasticsearch.hadoop.hive.EsStorageHandler'
TBLPROPERTIES('es.resource' = 'index_name/_doc',
    'es.index.auto.create' = 'false',
    'es.nodes' = 'http://XXX.XXX.XXX.XXX',
    'es.port'='9200'
);

也可以不用做第一步提前在 ES 中建 index,让 ES 自动解析数据类型,那么这里建外部表时设置属性'es.index.auto.create' = 'true'

3. 导入数据

add jar path/to/elasticsearch-hadoop-6.4.2.jar;
insert overwrite table index_name_to_es
select field1, field2
from index_name;

Elasticsearch 数据导入 Hive

与上面类似,不过步骤反过来

1. 在 Hive 中建好存数据的管理表

create table index_name (
    field1 string,
    field2 int
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '|'
LINES TERMINATED BY '\n';

2. 在 Hive 中建立定义映射关系外部表

add jar path/to/elasticsearch-hadoop-6.4.2.jar;
CREATE EXTERNAL TABLE index_name_from_es (
    field1 string,
    field2 int
)
STORED BY 'org.elasticsearch.hadoop.hive.EsStorageHandler'
TBLPROPERTIES('es.resource' = 'index_name/_doc',
    'es.query' = '?q=field2:[100 TO 1000]',
    'es.nodes' = 'http://XXX.XXX.XXX.XXX',
    'es.port'='9200'
);

这里可以把 ES 的过滤条件写在es.query的属性中,例如上例中只查询filed2字段在范围 100 到 1000 以内的数据。

query 的写法可以参考https://www.elastic.co/guide/en/elasticsearch/reference/7.2/search-uri-request.html

3. 导入数据

add jar path/to/elasticsearch-hadoop-6.4.2.jar;
insert overwrite table index_name
select filed1, field2
from  index_name_from_es;