【elasticsearch】ES数据库重建索引 — Reindex(数据迁移)

1.应用背景:

1、当你的数据量过大,而你的索引最初创建的分片数量不足,导致数据入库较慢的情况,此时需要扩大分片的数量,此时可以尝试使用Reindex。

2、当数据的mapping需要修改,但是大量的数据已经导入到索引中了,重新导入数据到新的索引太耗时;但是在ES中,一个字段的mapping在定义并且导入数据之后是不能再修改的,

所以这种情况下也可以考虑尝试使用Reindex。

ES提供了_reindex这个API。相对于我们重新导入数据肯定会快不少,实测速度大概是bulk导入数据的5-10倍。

数据迁移步骤:

  • 创建新的索引(可以通过java程序也可以直接在head插件上创建)
    • 注意:在创建索引的时候要把表结构也要创建好(也就是mapping)
  • 复制数据

1.1 注意事项

  • 源和目的不能相同, 比如不能将数据流 Reindex 给它自身。
  • 源索引的文档中 _source 字段必须开启。
  • Reindex 不会复制源的 setting 和源所匹配的模板, 因此在调用 _reindex 前, 你需要设置好目的索引 (action.auto_create_index 为 false 或者 -.* 时)。
  • 目标索引的 mapping, 主分片数, 副本数等推荐提前配置。

1.2 前置要求

如果 Elasticsearch 集群配置了安全策略和权限策略, 则进行 Reindex 必须拥有以下权限:

  • 读取源的数据流、 索引、 索引别名等索引级别权限。
  • 对于目的数据流、 索引、 索引别名的写权限。
  • 如果需要使用 Reindex API 自动创建数据流和索引, 则必须拥有对目的数据流、 索引、 索引别名的 auto_configure、 create_index 或者 manage 等索引级别权限。
  • 如果源为远程的集群, 则 source.remote.user 用户必须拥有集群监控权限, 和读取源索引、 源索引别名、 源数据流的权限。
  • 如果 Reindex 的源为远程集群, 必须在当前集群的请求节点 elasticsearch.yml文件配置远程白名单 reindex.remote.whitelist。

创建数据流, 需要提前配置好数据流的匹配索引模板, 详情可参看 Set up a data stream: https://www.elastic.co/guide/en/elasticsearch/reference/7.11/set-up-a-data-stream.html

2.Reindex

2.1 使用

2.1.1 基本使用

最简单、基本的方式:

1)代码请求:

POST _reindex
{
  "source": {
    "index": "old_index"
  },
  "dest": {
    "index": "new_index"
  }
}

2)利用命令:

curl _XPOST 'ES数据库请求地址:9200/_reindex' -d{"source":{"index":"old_index"},"dest":{"index":"new_index"}}

2.1.2 将多个索引reindex到一个目标

POST _reindex
{
  "source": {
    "index": ["twitter", "blog"],
    "type": ["tweet", "post"]
  },
  "dest": {
    "index": "all_together"
  }
}

这里要注意的是,如果twitter和blog中有document的id是一样的,则无法保证最终出现在all_together里面的document是哪个,因为迭代是随机的。(最后一个会覆盖前一个)

2.1.3 只复制特定的field

POST _reindex
{
  "source": {
    "index": "twitter",
    "_source": ["user", "tweet"]
  },
  "dest": {
    "index": "new_twitter"
  }
}

2.1.4 使用script

POST _reindex
{
  "source": {
    "index": "twitter"
  },
  "dest": {
    "index": "new_twitter",
    "version_type": "external"
  },
  "script": {
    "source": "if (ctx._source.foo == 'bar') {ctx._version++; ctx._source.remove('foo')}",
    "lang": "painless"
  }
}

2.1.5 使用Ingest Node

这个功能应该说是最好用的了。当你的source是因为不合理的结构,需要重新结构化所有的数据时,通过ingest node, 可以很方便的在新的index中获得不一眼的mapping和value:

POST _reindex
{
  "source": {
    "index": "source"
  },
  "dest": {
    "index": "dest",
    "pipeline": "some_ingest_pipeline"
  }
}

2.1.6 reindex远程服务器的索引到本地

使用remote属性:

POST _reindex
{
  "source": {
    "remote": {
      "host": "http://otherhost:9200",
      "username": "user",
      "password": "pass"
    },
    "index": "source",
    "query": {
      "match": {
        "test": "data"
      }
    }
  },
  "dest": {
    "index": "dest"
  }
}

对于在其他集群上的index,就不存在本地镜像复制的便利。需要从网络上下载数据再写到本地,默认的,buffer的size是100M。在scroll size是1000的情况下,如果单个document的平均大小超过100Kb,则有可能会报错。因此在在遇到非常大的documents,需要减小batch的size:

POST _reindex
{
  "source": {
    "remote": {
      "host": "http://otherhost:9200"
    },
    "index": "source",
    "size": 100,
    "query": {
      "match": {
        "test": "data"
      }
    }
  },
  "dest": {
    "index": "dest"
  }
}

2.2 参数

2.2.1 version_type

但如果新的index中有数据,并且可能发生冲突,那么可以设置version_type”version_type”: “internal”或者不设置,则Elasticsearch强制性的将文档转储到目标中,覆盖具有相同类型和ID的任何内容:

POST _reindex
{
  "source": {
    "index": "old_index"
  },
  "dest": {
    "index": "new_index",
    "version_type": "internal"
  }
}

就像_update_by_query,_reindex会生成源索引的快照(snapshot),但它的目标必须是一个不同的索引,以便避免版本冲突。dest对象可以像index API一样进行配置,以乐观锁控制并发。像上面那样,不设置version_type或设置它将设置为internal。Elasticsearch将会直接将文档转储到dest中,覆盖任何发生的具有相同类型和id的document:

POST _reindex
{
  "source": {
    "index": "twitter"
  },
  "dest": {
    "index": "new_twitter",
    "version_type": "internal"
  }
}

如果把version_type设置为external.则elasticsearch会从source读取version字段,当遇到具有相同类型和id的documents,只更新newer verion。

POST _reindex
{
  "source": {
    "index": "twitter"
  },
  "dest": {
    "index": "new_twitter",
    "version_type": "external"
  }
}

上面说起来似乎有点不好理解。简单说来,就是在redinex的时候,你的dest index可以不是一个新的index,而是包含有数据的。如果你的source indexh和dest index里面有相同类型和id的document.对于使用internal,是直接覆盖。使用external的话,只有当source的version更加新的时候,才更新。

2.2.2 op_type

把op_type设置为create,_reindex API,只在dest index中添加不不存在的doucments。如果相同的documents已经存在,则会报version confilct的错误。

POST _reindex
{
  "source": {
    "index": "twitter"
  },
  "dest": {
    "index": "new_twitter",
    "op_type": "create"
  }
}

2.2.3 conflicts

默认情况下,当发生version conflict的时候,_reindex会被abort。除非把conflicts设置为“proceed”:

POST _reindex
{
  "conflicts": "proceed",
  "source": {
    "index": "twitter"
  },
  "dest": {
    "index": "new_twitter",
    "op_type": "create"
  }
}

2.2.4 query

我们还可以通过query,把需要_reindex的document限定在一定的范围。下面的例子,就只copy了作者是’kimchy’的twitter到new_twitter:

POST _reindex
{
  "source": {
    "index": "twitter",
    "type": "tweet",
    "query": {
      "term": {
        "user": "kimchy"
      }
    }
  },
  "dest": {
    "index": "new_twitter"
  }
}

2.2.5 size设置

通过size可以控制复制多少内容,下例只复制了一个document:

POST _reindex
{
  "size": 1,
  "source": {
    "index": "twitter"
  },
  "dest": {
    "index": "new_twitter"
  }
}

2.2.6 sort配置

把时间排序前10000个document,复制到目标:

POST _reindex
{
  "size": 10000,
  "source": {
    "index": "twitter",
    "sort": { "date": "desc" }
  },
  "dest": {
    "index": "new_twitter"
  }
}

2.2.7 refresh

可选参数, 枚举类型 (true,false,wait_for), 默认值为 false。
如果设置为 true, Elasticsearch 刷新受当前操作影响的数据, 能够被立即搜索(即立即刷新, 但是会对 Elasticsearch 的性能有一定的影响)。 如果为 wait_for, 则等待刷新以使当前操作对搜索可见, 等待时间为默认为 1s(index.refresh_interval) 。 如果为false, 本次请求不执行刷新

2.2.8 timeout

可选参数, 时间值(time units) , 默认值为 1 分钟; 每个索引周期中等待索引自
动创建、 动态映射更新, 和等待活跃健康分片等的时间。 该参数可以确保 Elasticsearch
在失败之前, 基本等待的超时时间。 实际等待时间可能更长, 特别是在发生多个等待时。

2.2.9 wait_for_active_shards

可选参数, 参数类型 string, 默认值为 1(即只要一个分片处于活跃就可以执行该操作) 。 在执行 Reindex 之前索引必须处于活动状态的分片副本数, 可以设置为 all
或者小于 number_of_replicas+1 的任何正整数, 比如你的索引主分片数目为 3, 副本
设置为 2, 那么可以设置的最大正整数为 3, 即副本份数加 1 (主分片)

2.3 查看进度

使用Task API查询进度
一般来说,如果你的source index很大,则可能需要比较长的时间来完成_reindex的工作。如果需要查看进度,可以通过_tasks API:

GET _tasks?detailed=true&actions=*reindex

返回结果如下:

{
  "nodes" : {
    "r1A2WoRbTwKZ516z6NEs5A" : {
      "name" : "r1A2WoR",
      "transport_address" : "127.0.0.1:9300",
      "host" : "127.0.0.1",
      "ip" : "127.0.0.1:9300",
      "attributes" : {
        "testattr" : "test",
        "portsfile" : "true"
      },
      "tasks" : {
        "r1A2WoRbTwKZ516z6NEs5A:36619" : {
          "node" : "r1A2WoRbTwKZ516z6NEs5A",
          "id" : 36619,
          "type" : "transport",
          "action" : "indices:data/write/reindex",
          "status" : { "total" : 6154, "updated" : 3500, "created" : 0, "deleted" : 0, "batches" : 4, "version_conflicts" : 0, "noops" : 0, "retries": { "bulk": 0, "search": 0 }, "throttled_millis": 0 },
          "description" : "" }
      }
    }
  }
}

上面表示,一共需要处理6154个document,现在已经update了3500个。

2.4 取消reindex

射出去的箭还是可以收回的,通过_tasks API:

POST _tasks/task_id:1/_cancel

这里的task_id,可以通过上面的tasks API获得。

2.5 补充

在重建索引之前,首先要考虑一下重建索引的必要性,因为重建索引是非常耗时的。

ES 的 reindex api 不会去尝试设置目标索引,不会复制源索引的设置,所以我们应该在运行_reindex 操作之前设置目标索引,包括设置映射(mapping),分片,副本等。

第一步,和创建普通索引一样创建新索引。当数据量很大的时候,需要设置刷新时间间隔,把 refresh_intervals 设置为-1,即不刷新,number_of_replicas 副本数设置为 0(因为副本数可以动态调整,这样有助于提升速度)。

{
    "settings": {

        "number_of_shards": "50",
        "number_of_replicas": "0",
        "index": {
            "refresh_interval": "-1"
        }
    }
    "mappings": {
    }
}

第二步,调用 reindex 接口,建议加上 wait_for_completion=false 的参数条件,这样 reindex 将直接返回 taskId。

POST _reindex?wait_for_completion=false

{
  "source": {
    "index": "old_index",   //原有索引
    "size": 5000            //一个批次处理的数据量
  },
  "dest": {
    "index": "new_index",   //目标索引
  }
}

第三步,等待。可以通过 GET _tasks?detailed=true&actions=*reindex 来查询重建的进度。如果要取消 task 则调用_tasks/node_id:task_id/_cancel。

第四步,删除旧索引,释放磁盘空间。更多细节可以查看 ES 官网的 reindex api。

那么有的同学可能会问,如果我此刻 ES 是实时写入的,那咋办呀?
这个时候,我们就要重建索引的时候,在参数里加上上一次重建索引的时间戳,直白的说就是,比如我们的数据是 100G,这时候我们重建索引了,但是这个 100G 在增加,那么我们重建索引的时候,需要记录好重建索引的时间戳,记录时间戳的目的是下一次重建索引跑任务的时候不用全部重建,只需要在此时间戳之后的重建就可以,如此迭代,直到新老索引数据量基本一致,把数据流向切换到新索引的名字。

POST /_reindex
{
    "conflicts": "proceed",          //意思是冲突以旧索引为准,直接跳过冲突,否则会抛出异常,停止task
    "source": {
        "index": "old_index"         //旧索引
        "query": {
            "constant_score" : {
                "filter" : {
                    "range" : {
                        "data_update_time" : {
                            "gte" : 123456789   //reindex开始时刻前的毫秒时间戳
                            }
                        }
                    }
                }
            }
        },
    "dest": {
        "index": "new_index",       //新索引
        "version_type": "external"  //以旧索引的数据为准
        }
}

3.问题发现:

常规的如果我们只是进行少量的数据迁移利用普通的reindex就可以很好的达到要求,但是当我们发现我们需要迁移的数据量过大时,我们会发现reindex的速度会变得很慢

数据量几十个G的场景下,elasticsearch reindex速度太慢,从旧索引导数据到新索引,当前最佳方案是什么?
原因分析:

reindex的核心做跨索引、跨集群的数据迁移。
慢的原因及优化思路无非包括:
1)批量大小值可能太小。需要结合堆内存、线程池调整大小;
2)reindex的底层是scroll实现,借助scroll并行优化方式,提升效率;
3)跨索引、跨集群的核心是写入数据,考虑写入优化角度提升效率。

可行方案:

1)提升批量写入大小值

默认情况下,_reindex使用1000进行批量操作,您可以在source中调整batch_size。

POST _reindex
{
  "source": {
    "index": "source",
    "size": 5000
  },
  "dest": {
    "index": "dest",
    "routing": "=cat"
  }
}

批量大小设置的依据:

1、使用批量索引请求以获得最佳性能。

批量大小取决于数据、分析和集群配置,但一个好的起点是每批处理5-15 MB。

注意,这是物理大小。文档数量不是度量批量大小的好指标。例如,如果每批索引1000个文档:

1)每个1kb的1000个文档是1mb。

2)每个100kb的1000个文档是100 MB。

这些是完全不同的体积大小。

2、逐步递增文档容量大小的方式调优。

1)从大约5-15 MB的大容量开始,慢慢增加,直到你看不到性能的提升。然后开始增加批量写入的并发性(多线程等等)。

2)使用kibana、cerebro或iostat、top和ps等工具监视节点,以查看资源何时开始出现瓶颈。如果您开始接收EsRejectedExecutionException,您的集群就不能再跟上了:至少有一个资源达到了容量。

要么减少并发性,或者提供更多有限的资源(例如从机械硬盘切换到ssd固态硬盘),要么添加更多节点。

2)借助scroll的sliced提升写入效率

Reindex支持Sliced Scroll以并行化重建索引过程。 这种并行化可以提高效率,并提供一种方便的方法将请求分解为更小的部分。

4.sliced原理(from medcl)

1)用过Scroll接口吧,很慢?如果你数据量很大,用Scroll遍历数据那确实是接受不了,现在Scroll接口可以并发来进行数据遍历了。
2)每个Scroll请求,可以分成多个Slice请求,可以理解为切片,各Slice独立并行,利用Scroll重建或者遍历要快很多倍。

slicing使用举例

slicing的设定分为两种方式:手动设置分片、自动设置分片。
手动设置分片参见官网。
自动设置分片如下:

POST _reindex?slices=5&refresh
{
  "source": {
    "index": "twitter"
  },
  "dest": {
    "index": "new_twitter"
  }
}

slices大小设置注意事项:

1)slices大小的设置可以手动指定,或者设置slices设置为auto,auto的含义是:针对单索引,slices大小=分片数;针对多索引,slices=分片的最小值。
2)当slices的数量等于索引中的分片数量时,查询性能最高效。slices大小大于分片数,非但不会提升效率,反而会增加开销。
3)如果这个slices数字很大(例如500),建议选择一个较低的数字,因为过大的slices 会影响性能。

效果

实践证明,比默认设置reindex速度能提升10倍+。

4.3 ES副本数设置为0

如果要进行大量批量导入,请考虑通过设置index.number_of_replicas来禁用副本:0

主要原因在于:复制文档时,将整个文档发送到副本节点,并逐字重复索引过程。 这意味着每个副本都将执行分析,索引和潜在合并过程。

相反,如果您使用零副本进行索引,然后在提取完成时启用副本,则恢复过程本质上是逐字节的网络传输。 这比复制索引过程更有效。

PUT /my_logs/_settings
{
    "number_of_replicas": 1
}

4.4 增加refresh间隔

如果你的搜索结果不需要接近实时的准确性,考虑先不要急于索引刷新refresh。可以将每个索引的refresh_interval到30s。

如果正在进行大量数据导入,可以通过在导入期间将此值设置为-1来禁用刷新。完成后不要忘记重新启用它!

设置方法:

PUT /my_logs/_settings
{ "refresh_interval": -1 }

参考:https://blog.csdn.net/goxingman/article/details/103734747

Reindex API 详解

M.扩展阅读

Elastic Stack 实战手册(早鸟版).pdf 有更详细的介绍。

文章来源于互联网:【elasticsearch】ES数据库重建索引 — Reindex(数据迁移)

由于版权原因,本站共享资源只供云盘资源,版权均属于影片公司所有,请在下载后24小时删除,切勿用于商业用途。本站所有资源信息均从互联网搜索而来,本站不对显示的内容承担责任,如您认为本站页面信息侵犯了您的权益,请附上版权证明邮件并发送到[email protected]告知,我们会在收到邮件后72小时内删除。
想开点 » 【elasticsearch】ES数据库重建索引 — Reindex(数据迁移)