龙空技术网

ES GET 流程

一米雪碧钱钱 218

前言:

目前你们对“哈弗曼算法压缩图片上传”大致比较看重,大家都需要知道一些“哈弗曼算法压缩图片上传”的相关文章。那么小编在网络上搜集了一些有关“哈弗曼算法压缩图片上传””的相关知识,希望大家能喜欢,我们快快来了解一下吧!

1. 问题描述

上一篇文章中,我们讲到了 ES Nested 是拆开保存的(1拖N 方式),通过 Root Document 的 _id 来做关联的。那这个 ES 直接通过 _id 来读取数据,又是怎么样的呢?有没有做合并、组装的呢?

2. GET API 例子

GET nike/_doc/0{    "_index" : "ike",    "_type" : "_doc",    "_id" : "0",    "_version" : 1,    "found": true,    "_source" : {        "user" : "kimchy",        "date" : "2009-11-15T14:12:12",        "likes": 0,        "message" : "trying out Elasticsearch"    }}接收请求的入口是:org.elasticsearch.rest.action.document.RestGetAction
3. 可选参数

4. GET 基本流程

GET 操作 具体流程如下图所示:

备注:

a. 上面的部署架构也是非常常见的。 P0、P1、P2 都为主分片;R0、R1、R2 为副本分片。大致的请求流程描述如下:

a. 客户端向 NODE1 发起 GET 请求。此时 NODE1 称之为 协调节点。如果协调节点压力大,也是性能损耗点,建议先把节点数量增加,再增大内存、CPU 。

b. NODE1 根据规则,计算出对应的 docId 为 1 的文档再哪个 shard 上面。比如,docId 为 1 的文档在 Node2 上面。然后,发起 TCP 请求,将请求转发给 NODE2 。

c. NODE2 根据规则,获取到具体的文档后。将请求再次转发给 NODE1。如果节点很多,导致网络请求链路过多,异常因素增加。也是 shard 分片数量需要考虑的点。

d. NODE1 获取 NODE2 的响应后,将请求返回给 客户端。

ES 中,写入默认是写入大多数成功即可。默认是写入主分片成功,副本分片大多数成功即可。所以,在一瞬间去查询的话,有可能请求的是 副本分片,客户端会收到 doc 不存在的问题。

5、请求流程图

6、代码流程6.1 协调节点6.1.0 接收 HTTP 请求入口

// org.elasticsearch.rest.action.document.RestGetAction 代码入口 只贴出了部分代码。                /**         * {@link  TransportGetAction 跳转此类 }         */        return channel -> client.get(getRequest, new RestToXContentListener<GetResponse>(channel) {            @Override            protected RestStatus getStatus(final GetResponse response) {                return response.isExists() ? OK : NOT_FOUND;            }        });
6.1.1 总的代码入口

其实,请求应该是:org.elasticsearch.action.get.TransportGetAction。但是 org.elasticsearch.action.get.TransportGetAction 实现了 TransportSingleShardAction 这个抽象类。具体的请求处理先由 TransportSingleShardAction 来处理。具体处理如下:

-- org.elasticsearch.action.support.single.shard.TransportSingleShardAction#doExecute 代码入口/**     * 执行 execute,而且,是异步执行的     * @param request     * @param listener     */    @Override    protected void doExecute(Task task, Request request, ActionListener<Response> listener) {        // 异步操作,AsyncSingleAction 构造方法里面,先计算出正确的 索引,获取具体分片的路由规则。在执行 start()方法。        new AsyncSingleAction(request, listener).start();    }
6.1.2 计算 shard 路由表

代码入口:org.elasticsearch.action.support.single.shard.TransportSingleShardAction.AsyncSingleAction#AsyncSingleAction。

......// 获取正确的索引,比如,你请求的是 nike_xx ,通过正则表达式,计算出一些列规则,最后,这里只返回一个索引。if (resolveIndex(request)) {   concreteSingleIndex = indexNameExpressionResolver.concreteSingleIndex(clusterState, request).getName();} else {  concreteSingleIndex = request.index();}.......// 根据hash和shard数量取余计算 shard 列表,或者走优先级规则this.shardIt = shards(clusterState, internalRequest);
6.1.3 路由算法(`TransportGetAction` 在这里实现)

代码入口:org.elasticsearch.action.get.TransportGetAction#shards

/**     * 根据 docId 计算,文档落在哪个分片中。     * @param state     * @param request     * @return     */    @Override    protected ShardIterator shards(ClusterState state, InternalRequest request) {        return clusterService.operationRouting()                .getShards(clusterService.state(), request.concreteIndex(), request.request().id(), request.request().routing(),                    request.request().preference());    }---- 下面是计算 shardId。实现的比较有意思。如果有 docId,那么,就根据 docId 来获取。如果没有,就根据 routing 值来获取具体的shard 规则。/**     * 获取 shardId     * @param indexMetadata     * @param id     * @param routing     * @return     */    public static int generateShardId(IndexMetadata indexMetadata, @Nullable String id, @Nullable String routing) {        final String effectiveRouting;        final int partitionOffset;        // 使用的方法:PUT /index/type/id?routing=user_id,如果没有 routing ,那么,默认是 doc_id 来作为 routing。非常巧妙,不需要维护 docId 与 shard 关系了        if (routing == null) {            assert(indexMetadata.isRoutingPartitionedIndex() == false) : "A routing value is required for gets from a partitioned index";            effectiveRouting = id;        } else {            effectiveRouting = routing;        }        // 如果路由分区的大小 不是  1        if (indexMetadata.isRoutingPartitionedIndex()) {            partitionOffset = Math.floorMod(Murmur3HashFunction.hash(id), indexMetadata.getRoutingPartitionSize());        } else {            // we would have still got 0 above but this check just saves us an unnecessary hash calculation            partitionOffset = 0;        }        // 计算 shardId ,基于 hash 来计算        return calculateScaledShardId(indexMetadata, effectiveRouting, partitionOffset);    }
6.1.4 转发请求
// 本节点作为协调节点,向目标 node 转发请求,或者目标是本地节点,直接通过函数调用读取数据.发送流程封装了对请求的发送,org.elasticsearch.action.support.single.shard.TransportSingleShardAction.AsyncSingleAction#start
6.1.5 发送请求
org.elasticsearch.action.support.single.shard.TransportSingleShardAction.AsyncSingleAction#perform-- 这里由重试的逻辑;判断是否是本地节点;根据 shard 获取某个 节点,发送请求到数据节点请求。// 获取下一个路由final ShardRouting shardRouting = shardIt.nextOrNull();
6.2 数据节点6.2.0 总入口:

将请求转发到数据节点后,由 org.elasticsearch.action.support.single.shard.TransportSingleShardAction.ShardTransportHandler 来接收请求,处理请求。

6.2.1 接收请求请求

/**     * shard 分片的操作     */    private class ShardTransportHandler implements TransportRequestHandler<Request> {        @Override        public void messageReceived(final Request request, final TransportChannel channel, Task task) throws Exception {            if (logger.isTraceEnabled()) {                logger.trace("executing [{}] on shard [{}]", request, request.internalShardId);            }                        // 异步操作            asyncShardOperation(request, request.internalShardId, new ChannelActionListener<>(channel, transportShardAction, request));        }    }
6.2.2 转换 docId
// 代码入口:org.elasticsearch.index.get.ShardGetService#innerGet// docId 其实也是一个 Term,名字是 _id,为什么这里要做 uuid 的转换呢?因为很多id 都是不规整的。UUID 编码后,长度一致,做好了很好的性能优化,比如,可以采用哈夫曼树来优化、压缩。Term uidTerm = new Term(IdFieldMapper.NAME, Uid.encodeId(id));
6.2.3 进入 GET 核心地方

进入 org.elasticsearch.index.engine.InternalEngine#get 这个方法,主要是 判断版本号是否冲突、seqNo 是否冲突。还有一个重点,是获取 Searcher。 虽然 acquireSearcher 这个方法是获取 Searcher 的,但是经过一遍代码查找后,然后各种条件限制,需要继续跳到之前 6.2.4

// acquireSearcher 这个方法是获取 Searcher 的。但是你getFromSearcher(get, acquireSearcher("realtime_get", SearcherScope.INTERNAL, searcherWrapper));
6.2.4 获取 Search 入口(注意下方的 wrapSearcher)
/**     * 读取数据     * @param get     * @return     */    public Engine.GetResult get(Engine.Get get) {        // 校验是否允许读取数据        readAllowed();        // ES Mapper        DocumentMapper mapper = mapperService.documentMapper();        if (mapper == null || mapper.type().equals(mapperService.resolveDocumentType(get.type())) == false) {            return GetResult.NOT_EXISTS;        }        // 读取数据,这个 this:wrapperSearcher 方法最终在:org.elasticsearch.index.engine.Engine.SearcherSupplier.acquireSearcher 这里调用。最终的 search 是:ElasticsearchDirectoryReader        return getEngine().get(get, mapper, this::wrapSearcher);    }// 可能大家看了 wraSearcher 还是一脸懵逼,不知道 这个 searcher 怎么读取数据的。
6.2.5 Searcher 初始化各种跳转、回调

6.2.6 Searcher 初始化

代码入口:org.elasticsearch.index.engine.InternalEngine#createReaderManager

org.elasticsearch.index.engine.InternalEngine#createReaderManager// 下面,我只摘出了重要的部分说明。下面的函数说明final ElasticsearchDirectoryReader directoryReader = ElasticsearchDirectoryReader.wrap(DirectoryReader.open(indexWriter), shardId); // 大家一定要仔细看 DirectoryReader.open(indexWriter) 这个方法。很妖娆。// 
6.2.6 Searcher 初始化2
public static ElasticsearchDirectoryReader wrap(DirectoryReader reader, ShardId shardId) throws IOException {        return new ElasticsearchDirectoryReader(reader, new SubReaderWrapper(shardId), shardId);    }    private static final class SubReaderWrapper extends FilterDirectoryReader.SubReaderWrapper {        private final ShardId shardId;        SubReaderWrapper(ShardId shardId) {            this.shardId = shardId;        }        /**         * ElasticsearchDirectoryReader 在这个类的构造函数,里面的 super 方法,执行了 wrap 回调。         * @param reader         * @return         */        @Override        public LeafReader wrap(LeafReader reader) {            // TODO 仔细看看这里的实现,最终是实现了 FilterLeafReader 这个类。最终还是回归到 segmentReader 这个类上,去读内容            return new ElasticsearchLeafReader(reader, shardId);        }    }
6.3 取数数据6.3.1 代码入口:

在这里,会根据 Searcher 来读取数据。最终这个 searcher 是 segmentReader 这个类。

org.elasticsearch.index.get.ShardGetService#innerGetLoadFromStoredFields
6.3.2 读取数据
// 引用了 Lucene 的包,根据 docId 去找数据,docIdAndVersion.reader.document(docIdAndVersion.docId, fieldVisitor);
6.3.4 数据转换
FieldInfo fieldInfo = new FieldInfo(indexableField.name(), 0, false, false, false, IndexOptions.NONE,                                DocValuesType.NONE, -1, Collections.emptyMap(), 0, 0, 0, false);                            StoredFieldVisitor.Status status = fieldVisitor.needsField(fieldInfo);                            if (status == StoredFieldVisitor.Status.YES) {                                if (indexableField.numericValue() != null) {                                    fieldVisitor.objectField(fieldInfo, indexableField.numericValue());                                } else if (indexableField.binaryValue() != null) {                                    fieldVisitor.binaryField(fieldInfo, indexableField.binaryValue());                                } else if (indexableField.stringValue() != null) {                                    fieldVisitor.objectField(fieldInfo, indexableField.stringValue());                                }                            } else if (status == StoredFieldVisitor.Status.STOP) {                                break;                            }
6.3.5 过滤
sourceAsMap = XContentMapValues.filter(sourceAsMap, fetchSourceContext.includes(), fetchSourceContext.excludes());
7 总结ES GET API 里面,比较难懂的是 Searcher 究竟是怎么实例化的。如果读取失败了,还会继续读取。具体代码位置:org.elasticsearch.action.support.single.shard.TransportSingleShardAction.AsyncSingleAction#onFailure8 附录

很多小伙伴还是想直观看看 ES 是怎么读取数据的,那么下面有一个 demo ,可以 DEBUG 跑个单元测试看看。

            Directory dir =new SimpleFSDirectory(new File("F:/luc_dir").toPath());            IndexWriterConfig iwc = new IndexWriterConfig(null);            iwc.setMaxBufferedDocs(100);            iwc.setMergePolicy(NoMergePolicy.INSTANCE);            IndexWriter iw = new IndexWriter(dir, iwc);            // add two docs, id:0 and id:1            Document doc = new Document();            Field idField = new StringField("id", "", Field.Store.NO);            doc.add(idField);            idField.setStringValue("0");            iw.addDocument(doc);            idField.setStringValue("1");            iw.addDocument(doc);            // open reader            ShardId shardId = new ShardId("fake", "_na_", 1);            DirectoryReader ir = ElasticsearchDirectoryReader.wrap(DirectoryReader.open(iw), shardId);            ir.document(1);            System.out.println("aa");

#elasticsearch#​

标签: #哈弗曼算法压缩图片上传