文章目录
- 前言
- 一、客户端API
- 二、API操作索引库
- 1. mapping映射分析
- 2.初始化RestClient
- 3. 索引库CRUD
- 3.1 创建索引库
- 3.2 删除索引库
- 3.3 查询索引库
- 三、API操作文档
- 1. 初始化RestClient
- 2. 文档CRUD
- 2.0 批量导入文档
- 2.1 批量新增文档
- 2.2 查询文档
- 2.3 批量删除文档
- 2.4 批量修改文档
前言
RestAPI
ES官方提供了各种不同语言的客户端,用来操作ES。这些客户端的本质就是组装DSL语句,通过http请求发送给ES。
一、客户端API
ES官方提供了各种不同语言的客户端,用来操作ES。这些客户端的本质就是组装DSL语句,通过http请求发送给ES。
官方文档地址:https://www.elastic.co/guide/en/Elasticsearch/client/index.html
其中的Java Rest Client又包括两种:
Java Low Level Rest ClientJava High Level Rest Client
我们使用的是Java HighLevel Rest Client客户端API
二、API操作索引库
JavaRestClient操作Elasticsearch的流程基本类似。核心是client.indices()方法来获取索引库的操作对象。
索引库操作的基本步骤:【可以根据发送请求那步的第一个参数,发过来判断需要创建什么XXXXRequest】
- 初始化RestHighLevelClient
- 创建XxxIndexRequest。XXX是Create、Get、Delete
- 准备DSL( Create时需要,其它是无参)
- 发送请求。调用RestHighLevelClient#indices().xxx()方法,xxx是create、exists、delete
1. mapping映射分析
根据MySQL数据库表结构(建表语句),去写索引库结构JSON。表和索引库一一对应
注意:地理坐标、组合字段。索引库里的地理坐标是一个字段:
坐标:维度,精度。copy_to组合字段作用是供用户查询(输入关键字可以查询多个字段)
创建索引库,最关键的是mapping映射,而mapping映射要考虑的信息包括:
- 字段名
- 字段数据类型
- 是否参与搜索
- 是否需要分词
- 如果分词,分词器是什么?
其中:
- 字段名、字段数据类型,可以参考数据表结构的名称和类型
- 是否参与搜索要分析业务来判断,例如图片地址,就无需参与搜索
- 是否分词呢要看内容,内容如果是一个整体就无需分词,反之则要分词
- 分词器,我们可以统一使用ik_max_word
来看下酒店数据的索引库结构:
PUT/hotel{"mappings":{"properties":{"id":{"type":"keyword"},"name":{"type":"text","analyzer":"ik_max_word","copy_to":"all"},"address":{"type":"keyword","index":false},"price":{"type":"integer"},"score":{"type":"integer"},"brand":{"type":"keyword","copy_to":"all"},"city":{"type":"keyword","copy_to":"all"},"starName":{"type":"keyword"},"business":{"type":"keyword"},"location":{"type":"geo_point"},"pic":{"type":"keyword","index":false},"all":{"type":"text","analyzer":"ik_max_word"}}}}几个特殊字段说明:
- location:地理坐标,里面包含精度、纬度
- all:一个组合字段,其目的是将多字段的值 利用copy_to合并,提供给用户搜索
地理坐标说明:
copy_to说明:
2.初始化RestClient
在Elasticsearch提供的API中,与Elasticsearch一切交互都封装在一个名为RestHighLevelClient的类中,必须先完成这个对象的初始化,建立与Elasticsearch的连接。
分为三步:
1)引入es的RestHighLevelClient依赖:
<dependency><groupId>org.Elasticsearch.client</groupId><artifactId>Elasticsearch-rest-high-level-client</artifactId></dependency>2)因为SpringBoot默认的ES版本是7.6.2,所以我们需要覆盖默认的ES版本:
<properties><java.version>1.8</java.version><Elasticsearch.version>7.12.1</Elasticsearch.version></properties>3)初始化RestHighLevelClient:这里一般在启动类或者配置类里注入该Bean,用于告诉Java 访问ES的ip地址
初始化的代码如下:
@BeanpublicRestHighLevelClientclient(){returnnewRestHighLevelClient(RestClient.builder(HttpHost.create("http://192.168.150.101:9200")));}这里为了单元测试方便,我们创建一个测试类HotelIndexTest,然后将初始化的代码编写在@BeforeEach方法中:
packagecn.itcast.hotel;importorg.apache.http.HttpHost;importorg.Elasticsearch.client.RestHighLevelClient;importorg.junit.jupiter.api.AfterEach;importorg.junit.jupiter.api.BeforeEach;importorg.junit.jupiter.api.Test;importjava.io.IOException;publicclassHotelIndexTest{privateRestHighLevelClientclient;@BeforeEachvoidsetUp(){this.client=newRestHighLevelClient(RestClient.builder(HttpHost.create("http://192.168.150.101:9200")));}@AfterEachvoidtearDown()throwsIOException{this.client.close();}}3. 索引库CRUD
3.1 创建索引库
代码分为三步:
- 1)创建Request对象。因为是创建索引库的操作,因此Request是CreateIndexRequest。
- 2)添加请求参数,其实就是DSL的JSON参数部分。因为json字符串很长,这里是定义了静态字符串常量MAPPING_TEMPLATE,让代码看起来更加优雅。
- 3)发送请求,client.indices()方法的返回值是IndicesClient类型,封装了所有与索引库操作有关的方法。
创建索引库的API如下:
代码:
在hotel-demo的cn.itcast.hotel.constants包下,创建一个类,定义mapping映射的JSON字符串常量:
packagecn.itcast.hotel.constants;publicclassHotelConstants{publicstaticfinalStringMAPPING_TEMPLATE="{\n"+" \"mappings\": {\n"+" \"properties\": {\n"+" \"id\": {\n"+" \"type\": \"keyword\"\n"+" },\n"+" \"name\":{\n"+" \"type\": \"text\",\n"+" \"analyzer\": \"ik_max_word\",\n"+" \"copy_to\": \"all\"\n"+" },\n"+" \"address\":{\n"+" \"type\": \"keyword\",\n"+" \"index\": false\n"+" },\n"+" \"price\":{\n"+" \"type\": \"integer\"\n"+" },\n"+" \"score\":{\n"+" \"type\": \"integer\"\n"+" },\n"+" \"brand\":{\n"+" \"type\": \"keyword\",\n"+" \"copy_to\": \"all\"\n"+" },\n"+" \"city\":{\n"+" \"type\": \"keyword\",\n"+" \"copy_to\": \"all\"\n"+" },\n"+" \"starName\":{\n"+" \"type\": \"keyword\"\n"+" },\n"+" \"business\":{\n"+" \"type\": \"keyword\"\n"+" },\n"+" \"location\":{\n"+" \"type\": \"geo_point\"\n"+" },\n"+" \"pic\":{\n"+" \"type\": \"keyword\",\n"+" \"index\": false\n"+" },\n"+" \"all\":{\n"+" \"type\": \"text\",\n"+" \"analyzer\": \"ik_max_word\"\n"+" }\n"+" }\n"+" }\n"+"}";}在hotel-demo中的HotelIndexTest测试类中,编写单元测试,实现创建索引:
@TestvoidcreateHotelIndex()throwsIOException{// 1.创建Request对象CreateIndexRequestrequest=newCreateIndexRequest("hotel");// 2.准备请求的参数:DSL语句request.source(MAPPING_TEMPLATE,XContentType.JSON);// 3.发送请求client.indices().create(request,RequestOptions.DEFAULT);}3.2 删除索引库
三步走:
- 1)创建Request对象。这次是DeleteIndexRequest对象
- 2)准备参数。这里是无参
- 3)发送请求。改用delete方法
删除索引库的DSL语句非常简单:
DELETE/hotel在hotel-demo中的HotelIndexTest测试类中,编写单元测试,实现删除索引:
@TestvoidtestDeleteHotelIndex()throwsIOException{// 1.创建Request对象DeleteIndexRequestrequest=newDeleteIndexRequest("hotel");// 2.发送请求client.indices().delete(request,RequestOptions.DEFAULT);}3.3 查询索引库
三步走:
- 1)创建Request对象。这次是GetIndexRequest对象
- 2)准备参数。这里是无参
- 3)发送请求。改用exists方法
判断索引库是否存在,本质就是查询,对应的DSL是:
GET/hotel@TestvoidtestExistsHotelIndex()throwsIOException{// 1.创建Request对象GetIndexRequestrequest=newGetIndexRequest("hotel");// 2.发送请求booleanexists=client.indices().exists(request,RequestOptions.DEFAULT);// 3.输出System.err.println(exists?"索引库已经存在!":"索引库不存在!");}三、API操作文档
这里更多的是先读取Mysql中的数据,然后再存进ES中。
文档操作的基本步骤:【可以根据发送请求那步的第一个参数,发过来判断需要创建什么XXXXRequest】
- 初始化RestHighLevelClient
- 创建XxxRequest。XXX是Index、Get、Update、Delete、Bulk
- 准备参数(Index、Update、Bulk时需要)
- 发送请求。调用RestHighLevelClient#.xxx()方法,xxx是index、get、update、delete、bulk
- 解析结果(Get时需要)
1. 初始化RestClient
在Elasticsearch提供的API中,与Elasticsearch一切交互都封装在一个名为RestHighLevelClient的类中,必须先完成这个对象的初始化,建立与Elasticsearch的连接。
分为三步:
1)引入es的RestHighLevelClient依赖:
<dependency><groupId>org.Elasticsearch.client</groupId><artifactId>Elasticsearch-rest-high-level-client</artifactId></dependency>2)因为SpringBoot默认的ES版本是7.6.2,所以我们需要覆盖默认的ES版本:
<properties><java.version>1.8</java.version><Elasticsearch.version>7.12.1</Elasticsearch.version></properties>3)初始化RestHighLevelClient:这里一般写在最前面,用于告诉Java 访问ES的ip地址
初始化的代码如下:
RestHighLevelClientclient=newRestHighLevelClient(RestClient.builder(HttpHost.create("http://192.168.150.101:9200")));这里为了单元测试方便,我们创建一个测试类HotelIndexTest,然后将初始化的代码编写在@BeforeEach方法中:
packagecn.itcast.hotel;importorg.apache.http.HttpHost;importorg.Elasticsearch.client.RestHighLevelClient;importorg.junit.jupiter.api.AfterEach;importorg.junit.jupiter.api.BeforeEach;importorg.junit.jupiter.api.Test;importjava.io.IOException;publicclassHotelIndexTest{privateRestHighLevelClientclient;@BeforeEachvoidsetUp(){this.client=newRestHighLevelClient(RestClient.builder(HttpHost.create("http://192.168.150.101:9200")));}@AfterEachvoidtearDown()throwsIOException{this.client.close();}}2. 文档CRUD
2.0 批量导入文档
三步走:
- 1)创建Request对象。这里是BulkRequest
- 2)准备参数。批处理的参数,就是其它Request对象,这里就是多个IndexRequest
- 3)发起请求。这里是批处理,调用的方法为client.bulk()方法
案例需求:利用BulkRequest批量将数据库数据导入到索引库中。
步骤如下:
利用mybatis-plus查询酒店数据
将查询到的酒店数据(Hotel)转换为文档类型数据(HotelDoc)
利用JavaRestClient中的BulkRequest批处理,实现批量新增文档
语法说明:
批量处理BulkRequest,其本质就是将多个普通的CRUD请求组合在一起发送。
其中提供了一个add方法,用来添加其他请求:
可以看到,能添加的请求包括:
- IndexRequest,也就是新增
- UpdateRequest,也就是修改
- DeleteRequest,也就是删除
因此Bulk中添加了多个IndexRequest,就是批量新增功能了。示例:
我们在导入酒店数据时,将上述代码改造成for循环处理即可。
在hotel-demo的HotelDocumentTest测试类中,编写单元测试:
@TestvoidtestBulkRequest()throwsIOException{// 批量查询酒店数据List<Hotel>hotels=hotelService.list();// 1.创建RequestBulkRequestrequest=newBulkRequest();// 2.准备参数,添加多个新增的Requestfor(Hotelhotel:hotels){// 2.1.转换为文档类型HotelDocHotelDochotelDoc=newHotelDoc(hotel);// 2.2.创建新增文档的Request对象request.add(newIndexRequest("hotel").id(hotelDoc.getId().toString()).source(JSON.toJSONString(hotelDoc),XContentType.JSON));}// 3.发送请求client.bulk(request,RequestOptions.DEFAULT);}2.1 批量新增文档
四步走:
- 0)创建索引库实体类
- 1)创建Request对象
- 2)准备请求参数,也就是DSL中的JSON文档
- 3)发送请求 (注意:这里直接使用client.xxx()的API,不再需要client.indices()了)
我们要将数据库的酒店数据查询出来,写入Elasticsearch中。
1)创建索引库实体类
一般实体类里包含经纬度都需要创建一个新的实体类,将经纬度拼成一个字段
数据库查询后的结果是一个Hotel类型的对象。结构如下:
@Data@TableName("tb_hotel")publicclassHotel{@TableId(type=IdType.INPUT)privateLongid;privateStringname;privateStringaddress;privateIntegerprice;privateIntegerscore;privateStringbrand;privateStringcity;privateStringstarName;privateStringbusiness;privateStringlongitude;privateStringlatitude;privateStringpic;}与我们的索引库结构存在差异:
- longitude和latitude需要合并为location
因此,我们需要定义一个新的类型,与索引库结构吻合:
packagecn.itcast.hotel.pojo;importlombok.Data;importlombok.NoArgsConstructor;@Data@NoArgsConstructorpublicclassHotelDoc{privateLongid;privateStringname;privateStringaddress;privateIntegerprice;privateIntegerscore;privateStringbrand;privateStringcity;privateStringstarName;privateStringbusiness;privateStringlocation;privateStringpic;publicHotelDoc(Hotelhotel){this.id=hotel.getId();this.name=hotel.getName();this.address=hotel.getAddress();this.price=hotel.getPrice();this.score=hotel.getScore();this.brand=hotel.getBrand();this.city=hotel.getCity();this.starName=hotel.getStarName();this.business=hotel.getBusiness();this.location=hotel.getLatitude()+", "+hotel.getLongitude();this.pic=hotel.getPic();}}2)新增代码
新增文档的DSL语句如下:
POST/{索引库名}/_doc/1{"name":"Jack","age":21}对应的java代码如图:
我们导入酒店数据,基本流程一致,但是需要考虑几点变化:
- 酒店数据来自于数据库,我们需要先查询出来,得到hotel对象
- hotel对象需要转为HotelDoc对象
- HotelDoc需要序列化为json格式
在hotel-demo的HotelDocumentTest测试类中,编写单元测试:
@TestvoidtestAddDocument()throwsIOException{// 批量查询酒店数据List<Hotel>hotels=hotelService.list();// 1.创建RequestBulkRequestrequest=newBulkRequest();// 2.准备参数,添加多个新增的Requestfor(Hotelhotel:hotels){// 2.1.转换为文档类型HotelDocHotelDochotelDoc=newHotelDoc(hotel);// 2.2.创建新增文档的Request对象request.add(newIndexRequest("hotel").id(hotelDoc.getId().toString()).source(JSON.toJSONString(hotelDoc),XContentType.JSON));//实体类转JSON,指定JSON格式request.add(newIndexRequest("xxx")...)}// 3.发送请求client.bulk(request,RequestOptions.DEFAULT);}2.2 查询文档
查询文档是根据id查询的,所以没有批量查询
三步走:
- 1)准备Request对象。这次是查询,所以是GetRequest
- 2)发送请求,得到结果。因为是查询,这里调用client.get()方法
- 3)解析结果,就是对JSON做反序列化
查询的DSL语句如下:
GET/hotel/_doc/{id}非常简单,因此代码大概分两步:
- 准备Request对象
- 发送请求
不过查询的目的是得到结果,解析为HotelDoc,因此难点是结果的解析。完整代码如下:
可以看到,结果是一个JSON,其中文档放在一个_source属性中,因此解析就是拿到_source,使用工具反序列化为Java对象即可。
在hotel-demo的HotelDocumentTest测试类中,编写单元测试:
@TestvoidtestGetDocumentById()throwsIOException{// 1.准备RequestGetRequestrequest=newGetRequest("hotel","61082");// 2.发送请求,得到响应GetResponseresponse=client.get(request,RequestOptions.DEFAULT);// 3.解析响应结果Stringjson=response.getSourceAsString();HotelDochotelDoc=JSON.parseObject(json,HotelDoc.class);System.out.println(hotelDoc);}2.3 批量删除文档
三步走:
- 1)准备Request对象,因为是删除,这次是DeleteRequest对象。要指定索引库名和id
- 2)准备参数,无参
- 3)发送请求。因为是删除,所以是client.delete()方法
删除的DSL为是这样的:
DELETE/hotel/_doc/{id}在hotel-demo的HotelDocumentTest测试类中,编写单元测试:
@TestvoidtestDeleteDocument()throwsIOException{//0.查询数据库中的数据List<Hotel>list=hotelService.list();// 1.创建RequestBulkRequestrequest=newBulkRequest();//2.批量转换实体类,顺便写入到ES中for(Hotelhotel:list){//2.1转换实体类HotelDochotelDoc=newHotelDoc(hotel);//2.2写入ESrequest.add(newDeleteRequest("hotel").id(hotel.getId().toString()));}//3.发送请求client.bulk(request,RequestOptions.DEFAULT);}2.4 批量修改文档
三步走:
- 1)准备Request对象。这次是修改,所以是UpdateRequest
- 2)准备参数。也就是JSON文档,里面包含要修改的字段
- 3)更新文档。这里调用client.update()方法
修改有两种方式:
- 全量修改:本质是先根据id删除,再新增
- 增量修改:修改文档中的指定字段值
在RestClient的API中,全量修改与新增的API完全一致,判断依据是ID:
- 如果新增时,ID已经存在,则修改
- 如果新增时,ID不存在,则新增
只演示增量修改:
代码示例如图:
在hotel-demo的HotelDocumentTest测试类中,编写单元测试:
@TestvoidtestUpdateDocument()throwsIOException{//0.查询数据库中的数据List<Hotel>list=hotelService.list();// 1.创建RequestBulkRequestrequest=newBulkRequest();//2.批量转换实体类,顺便写入到ES中for(Hotelhotel:list){//2.1转换实体类HotelDochotelDoc=newHotelDoc(hotel);//2.2写入ESrequest.add(newUpdateRequest("hotel",hotel.getId().toString()).doc("price","952","starName","四钻"));}//3.发送请求client.bulk(request,RequestOptions.DEFAULT);}本文的引用仅限自我学习如有侵权,请联系作者删除。
参考知识
ElasticSearch (ES从入门到精通一篇就够了)
ELK介绍