• -------------------------------------------------------------
  • ====================================

使用ElasticSearch6.0快速实现全文搜索功能的示例代码

elasticsearch dewbay 5年前 (2019-04-12) 1815次浏览 已收录 0个评论 扫描二维码

本文不涉及 ElasticSearch 具体原理,只记录如何快速的导入 mysql 中的数据进行全文检索。

工作中需要实现一个搜索功能,并且导入现有数据库数据,组长推荐用 ElasticSearch 实现,网上翻一通教程,都是比较古老的文章了,无奈只能自己摸索,参考 ES 的文档,总算是把服务搭起来了,记录下,希望有同样需求的朋友可以少走弯路,能按照这篇教程快速的搭建一个可用的 ElasticSearch 服务。

ES 的搭建

ES 搭建有直接下载 zip 文件,也有 docker 容器的方式,相对来说,docker 更适合我们跑 ES 服务。可以方便的搭建集群或建立测试环境。这里使用的也是容器方式,首先我们需要一份 Dockerfile:
?

12345678910FROM docker.elastic.co/elasticsearch/elasticsearch-oss:6.0.0# 提交配置 包括新的elasticsearch.yml 和 keystore.jks 文件COPY --chown=elasticsearch:elasticsearch conf/ /usr/share/elasticsearch/config/# 安装 ikRUN ./bin/elasticsearch-plugin install https://github.com/medcl/elasticsearch-analysis-ik/releases/download/v6.0.0/elasticsearch-analysis-ik-6.0.0.zip# 安装 readonlyrestRUN ./bin/elasticsearch-plugin install https://github.com/HYY-yu/BezierCurveDemo/raw/master/readonlyrest-1.16.14_es6.0.0.zip USER elasticsearchCMD ./bin/elasticsearch

这里对上面的操作做一下说明:

  1. 首先在 Dockerfile 下的同级目录中需要建立一个 conf 文件夹,保存 elasticsearch.yml 文件(稍后给出)和 keystore.jks。(jks 是自签名文件,用于 https,如何生成请自行搜索)
  2. ik 是一款很流行的中文分词库,使用它来支持中文搜索。
  3. readonlyrest 是一款开源的 ES 插件,用于用户管理、安全验证,土豪可以使用 ES 自带的 X-pack 包,有更完善的安全功能。

elactic 配置 elasticsearch.yml
?

1234567891011121314151617181920212223242526272829303132333435363738394041cluster.name: "docker-cluster"network.host: 0.0.0.0 # minimum_master_nodes need to be explicitly set when bound on a public IP# set to 1 to allow single node clusters# Details: https://github.com/elastic/elasticsearch/pull/17288discovery.zen.minimum_master_nodes: 1 # 禁止系统对 ES 交换内存bootstrap.memory_lock: true http.type: ssl_netty4 readonlyrest:enable: truessl:enable: truekeystore_file: "server.jks"keystore_pass: serverkey_pass: server access_control_rules: - name: "Block 1 - ROOT"type: allowgroups: ["admin"] - name: "User read only - paper"groups: ["user"]indices: ["paper*"]actions: ["indices:data/read/*"] users: - username: rootauth_key_sha256: cb7c98bae153065db931980a13bd45ee3a77cb8f27a7dfee68f686377acc33f1groups: ["admin"] - username: xiaomingauth_key: xiaoming:xiaominggroups: ["user"]

这里 bootstrap.memory_lock: true 是个坑,禁止交换内存这里文档已经说明了,有的 os 会在运行时把暂时不用的内存交换到硬盘的一块区域,然而这种行为会让 ES 的资源占用率飙升,甚至让系统无法响应。

配置文件里已经很明显了,一个 root 用户属于 admin 组,而 admin 有所有权限,xiaoming 同学因为在 user 组,只能访问 paper 索引,并且只能读取,不能操作。更详细的配置请见:readonlyrest 文档

至此,ES 的准备工作算是做完了,docker build -t ESImage:tag 一下,docker run -p 9200:9200 ESImage:Tag 跑起来。
如果 https://127.0.0.1:9200/返回
?

123456789101112131415{"name": "VaKwrIR","cluster_name": "docker-cluster","cluster_uuid": "YsYdOWKvRh2swz907s2m_w","version": {"number": "6.0.0","build_hash": "8f0685b","build_date": "2017-11-10T18:41:22.859Z","build_snapshot": false,"lucene_version": "7.0.1","minimum_wire_compatibility_version": "5.6.0","minimum_index_compatibility_version": "5.0.0"},"tagline": "You Know, for Search"}

我们本次教程的主角算是出场了,分享几个常用的 API 调戏调试 ES 用:

{{url}}替换成你本地的 ES 地址。

  1. 查看所有插件:{{url}}/_cat/plugins?v
  2. 查看所有索引:{{url}}/_cat/indices?v
  3. 对 ES 进行健康检查:{{url}}/_cat/health?v
  4. 查看当前的磁盘占用率:{{url}}/_cat/allocation?v

导入 MYSQL 数据

这里我使用的是 MYSQL 数据,其实其它的数据库也是一样,关键在于如何导入,网上教程会推荐 Logstash、Beat、ES 的 mysql 插件进行导入,我也都实验过,配置繁琐,文档稀少,要是数据库结构复杂一点,导入是个劳心劳神的活计,所以并不推荐。其实 ES 在各个语言都有对应的 API 库,你在语言层面把数据组装成 json,通过 API 库发送到 ES 即可。流程大致如下:

我使用的是 Golang 的 ES 库 elastic,其它语言可以去 github 上自行搜索,操作的方式都是一样的。

接下来使用一个简单的数据库做介绍:

Paper 表

idname
1北京第一小学模拟卷
2江西北京通用高考真题

Province 表

idname
1北京
2江西

Paper_Province 表

paper_idprovince_id
11
21
22

如上,Paper 和 Province 是多对多关系,现在把 Paper 数据打入 ES,,可以按 Paper 名称模糊搜索,也可通过 Province 进行筛选。json 数据格式如下:
?

12345678910{"id":1,"name": "北京第一小学模拟卷","provinces":[{"id":1,"name":"北京"}]}

首先准备一份 mapping.json 文件,这是在 ES 中数据的存储结构定义,
?

1234567891011121314151617181920212223242526272829303132{"mappings":{"docs":{"include_in_all": false, "properties":{"id":{"type":"long"},"name":{"type":"text","analyzer":"ik_max_word" // 使用最大词分词器},"provinces":{"type":"nested","properties":{"id":{"type":"integer"},"name":{"type":"text","index":"false" // 不索引}}}}}},"settings":{"number_of_shards":1,"number_of_replicas":0}}

需要注意的是取消 _all 字段,这个默认的 _all 会收集所有的存储字段,实现无条件限制的搜索,缺点是空间占用大。

shard(分片)数我设置为了 1,没有设置 replicas(副本),毕竟这不是一个集群,处理的数据也不是很多,如果有大量数据需要处理可以自行设置分片和副本的数量。

首先与 ES 建立连接,ca.crt 与 jks 自签名有关。当然,在这里我使用 InsecureSkipVerify 忽略了证书文件的验证。
?

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253func InitElasticSearch() {pool := x509.NewCertPool()crt, err0 := ioutil.ReadFile("conf/ca.crt")if err0 != nil {cannotOpenES(err0, "read crt file err")return} pool.AppendCertsFromPEM(crt)tr := &http.Transport{TLSClientConfig: &tls.Config{RootCAs: pool, InsecureSkipVerify: true},}httpClient := &http.Client{Transport: tr} //后台构造 elasticClientvar err errorelasticClient, err = elastic.NewClient(elastic.SetURL(MyConfig.ElasticUrl),elastic.SetErrorLog(GetLogger()),elastic.SetGzip(true),elastic.SetHttpClient(httpClient),elastic.SetSniff(false), // 集群嗅探,单节点记得关闭。elastic.SetScheme("https"),elastic.SetBasicAuth(MyConfig.ElasticUsername, MyConfig.ElasticPassword))if err != nil {cannotOpenES(err, "search_client_error")return}//elasticClient 构造完成 //查询是否有 paper 索引exist, err := elasticClient.IndexExists(MyConfig.ElasticIndexName).Do(context.Background())if err != nil {cannotOpenES(err, "exist_paper_index_check")return} //索引存在且通过完整性检查则不发送任何数据if exist {if !isIndexIntegrity(elasticClient) {//删除当前索引  准备重建deleteResponse, err := elasticClient.DeleteIndex(MyConfig.ElasticIndexName).Do(context.Background())if err != nil || !deleteResponse.Acknowledged {cannotOpenES(err, "delete_index_error")return}} else {return}} //后台查询数据库,发送数据到 elasticsearch 中go fetchDBGetAllPaperAndSendToES()}

?

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061type PaperSearch struct {PaperId  int64   `gorm:"primary_key;column:F_paper_id;type:BIGINT(20)" json:"id"`Name    string  `gorm:"column:F_name;size:80" json:"name"`Provinces []Province `gorm:"many2many:t_paper_province;" json:"provinces"`    // 试卷适用的省份} func fetchDBGetAllPaperAndSendToES() {//fetch papervar allPaper []PaperSearch GetDb().Table("t_papers").Find(&allPaper) //provincefor i := range allPaper {var allPro []ProvinceGetDb().Table("t_provinces").Joins("INNER JOIN `t_paper_province` ON `t_paper_province`.`province_F_province_id` = `t_provinces`.`F_province_id`").Where("t_paper_province.paper_F_paper_id = ?", allPaper[i].PaperId).Find(&allPro)allPaper[i].Provinces = allPro} if len(allPaper) > 0 {//send to es - create indexcreateService := GetElasticSearch().CreateIndex(MyConfig.ElasticIndexName)// 此处的 index_default_setting 就是上面 mapping.json 中的内容。createService.Body(index_default_setting)createResult, err := createService.Do(context.Background())if err != nil {cannotOpenES(err, "create_paper_index")return} if !createResult.Acknowledged || !createResult.ShardsAcknowledged {cannotOpenES(err, "create_paper_index_fail")} // - send all paperbulkRequest := GetElasticSearch().Bulk() for i := range allPaper {indexReq := elastic.NewBulkIndexRequest().OpType("create").Index(MyConfig.ElasticIndexName).Type("docs").Id(helper.Int64ToString(allPaper[i].PaperId)).Doc(allPaper[i]) bulkRequest.Add(indexReq)} // Do sends the bulk requests to ElasticsearchbulkResponse, err := bulkRequest.Do(context.Background())if err != nil {cannotOpenES(err, "insert_docs_error")return} // Bulk request actions get clearedif len(bulkResponse.Created()) != len(allPaper) {cannotOpenES(err, "insert_docs_nums_error")return}//send success}}

跑通上面的代码后,使用{{url}}/_cat/indices?v 看看 ES 中是否出现了新创建的索引,使用{{url}}/papers/_search 看看命中了多少文档,如果文档数等于你发送过去的数据量,搜索服务就算跑起来了。

搜索

现在就可以通过 ProvinceID 和 q 来搜索试卷,默认按照相关度评分排序。
?

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798//q 搜索字符串 provinceID 限定省份 id limit page 分页参数func SearchPaper(q string, provinceId uint, limit int, page int) (list []PaperSearch, totalPage int, currentPage int, pageIsEnd int, returnErr error) {//不满足条件,使用数据库搜索if !CanUseElasticSearch && !MyConfig.UseElasticSearch {return SearchPaperLocal(q, courseId, gradeId, provinceId, paperTypeId, limit, page)} list = make([]PaperSimple, 0)totalPage = 0currentPage = pagepageIsEnd = 0returnErr = nil client := GetElasticSearch()if client == nil {return SearchPaperLocal(q, courseId, gradeId, provinceId, paperTypeId, limit, page)} //ElasticSearch 有问题,使用数据库搜索if !isIndexIntegrity(client) {return SearchPaperLocal(q, courseId, gradeId, provinceId, paperTypeId, limit, page)} if !client.IsRunning() {client.Start()}defer client.Stop() q = html.EscapeString(q)boolQuery := elastic.NewBoolQuery()// Paper.namematchQuery := elastic.NewMatchQuery("name", q) //省份if provinceId > 0 && provinceId != DEFAULT_PROVINCE_ALL {proBool := elastic.NewBoolQuery()tpro := elastic.NewTermQuery("provinces.id", provinceId)proNest := elastic.NewNestedQuery("provinces", proBool.Must(tpro))boolQuery.Must(proNest)} boolQuery.Must(matchQuery) for _, e := range termQuerys {boolQuery.Must(e)} highligt := elastic.NewHighlight()highligt.Field(ELASTIC_SEARCH_SEARCH_FIELD_NAME)highligt.PreTags(ELASTIC_SEARCH_SEARCH_FIELD_TAG_START)highligt.PostTags(ELASTIC_SEARCH_SEARCH_FIELD_TAG_END)searchResult, err2 := client.Search(MyConfig.ElasticIndexName).Highlight(highligt).Query(boolQuery).From((page - 1) * limit).Size(limit).Do(context.Background()) if err2 != nil {// Handle errorGetLogger().LogErr("搜索时出错 "+err2.Error(), "search_error")// Handle errorreturnErr = errors.New("搜索时出错")} else {if searchResult.Hits.TotalHits > 0 {// Iterate through resultsfor _, hit := range searchResult.Hits.Hits {var p PaperSearcherr := json.Unmarshal(*hit.Source, &p)if err != nil {// Deserialization failedGetLogger().LogErr("搜索时出错 "+err.Error(), "search_deserialization_error")returnErr = errors.New("搜索时出错")return} if len(hit.Highlight[ELASTIC_SEARCH_SEARCH_FIELD_NAME]) > 0 {p.Name = hit.Highlight[ELASTIC_SEARCH_SEARCH_FIELD_NAME][0]} list = append(list, p)} count := searchResult.TotalHits() currentPage = pageif count > 0 {totalPage = int(math.Ceil(float64(count) / float64(limit)))}if currentPage >= totalPage {pageIsEnd = 1}} else {// No hits}}return}

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持脚本之家。


露水湾 , 版权所有丨如未注明 , 均为原创丨本网站采用BY-NC-SA协议进行授权
转载请注明原文链接:使用ElasticSearch6.0快速实现全文搜索功能的示例代码
喜欢 (0)
[]
分享 (0)
关于作者:
发表我的评论
取消评论

表情 贴图 加粗 删除线 居中 斜体 签到

Hi,您需要填写昵称和邮箱!

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址