前言:
目前你们对“哈弗曼算法压缩图片上传”大致比较看重,大家都需要知道一些“哈弗曼算法压缩图片上传”的相关文章。那么小编在网络上搜集了一些有关“哈弗曼算法压缩图片上传””的相关知识,希望大家能喜欢,我们快快来了解一下吧!1. 问题描述
上一篇文章中,我们讲到了 ES Nested 是拆开保存的(1拖N 方式),通过 Root Document 的 _id 来做关联的。那这个 ES 直接通过 _id 来读取数据,又是怎么样的呢?有没有做合并、组装的呢?
2. GET API 例子
GET nike/_doc/0{ "_index" : "ike", "_type" : "_doc", "_id" : "0", "_version" : 1, "found": true, "_source" : { "user" : "kimchy", "date" : "2009-11-15T14:12:12", "likes": 0, "message" : "trying out Elasticsearch" }}接收请求的入口是:org.elasticsearch.rest.action.document.RestGetAction3. 可选参数
4. GET 基本流程
GET 操作 具体流程如下图所示:
备注:
a. 上面的部署架构也是非常常见的。 P0、P1、P2 都为主分片;R0、R1、R2 为副本分片。大致的请求流程描述如下:
a. 客户端向 NODE1 发起 GET 请求。此时 NODE1 称之为 协调节点。如果协调节点压力大,也是性能损耗点,建议先把节点数量增加,再增大内存、CPU 。
b. NODE1 根据规则,计算出对应的 docId 为 1 的文档再哪个 shard 上面。比如,docId 为 1 的文档在 Node2 上面。然后,发起 TCP 请求,将请求转发给 NODE2 。
c. NODE2 根据规则,获取到具体的文档后。将请求再次转发给 NODE1。如果节点很多,导致网络请求链路过多,异常因素增加。也是 shard 分片数量需要考虑的点。
d. NODE1 获取 NODE2 的响应后,将请求返回给 客户端。
ES 中,写入默认是写入大多数成功即可。默认是写入主分片成功,副本分片大多数成功即可。所以,在一瞬间去查询的话,有可能请求的是 副本分片,客户端会收到 doc 不存在的问题。
5、请求流程图
6、代码流程6.1 协调节点6.1.0 接收 HTTP 请求入口
// org.elasticsearch.rest.action.document.RestGetAction 代码入口 只贴出了部分代码。 /** * {@link TransportGetAction 跳转此类 } */ return channel -> client.get(getRequest, new RestToXContentListener<GetResponse>(channel) { @Override protected RestStatus getStatus(final GetResponse response) { return response.isExists() ? OK : NOT_FOUND; } });6.1.1 总的代码入口
其实,请求应该是:org.elasticsearch.action.get.TransportGetAction。但是 org.elasticsearch.action.get.TransportGetAction 实现了 TransportSingleShardAction 这个抽象类。具体的请求处理先由 TransportSingleShardAction 来处理。具体处理如下:
-- org.elasticsearch.action.support.single.shard.TransportSingleShardAction#doExecute 代码入口/** * 执行 execute,而且,是异步执行的 * @param request * @param listener */ @Override protected void doExecute(Task task, Request request, ActionListener<Response> listener) { // 异步操作,AsyncSingleAction 构造方法里面,先计算出正确的 索引,获取具体分片的路由规则。在执行 start()方法。 new AsyncSingleAction(request, listener).start(); }6.1.2 计算 shard 路由表
代码入口:org.elasticsearch.action.support.single.shard.TransportSingleShardAction.AsyncSingleAction#AsyncSingleAction。
......// 获取正确的索引,比如,你请求的是 nike_xx ,通过正则表达式,计算出一些列规则,最后,这里只返回一个索引。if (resolveIndex(request)) { concreteSingleIndex = indexNameExpressionResolver.concreteSingleIndex(clusterState, request).getName();} else { concreteSingleIndex = request.index();}.......// 根据hash和shard数量取余计算 shard 列表,或者走优先级规则this.shardIt = shards(clusterState, internalRequest);6.1.3 路由算法(`TransportGetAction` 在这里实现)
代码入口:org.elasticsearch.action.get.TransportGetAction#shards
/** * 根据 docId 计算,文档落在哪个分片中。 * @param state * @param request * @return */ @Override protected ShardIterator shards(ClusterState state, InternalRequest request) { return clusterService.operationRouting() .getShards(clusterService.state(), request.concreteIndex(), request.request().id(), request.request().routing(), request.request().preference()); }---- 下面是计算 shardId。实现的比较有意思。如果有 docId,那么,就根据 docId 来获取。如果没有,就根据 routing 值来获取具体的shard 规则。/** * 获取 shardId * @param indexMetadata * @param id * @param routing * @return */ public static int generateShardId(IndexMetadata indexMetadata, @Nullable String id, @Nullable String routing) { final String effectiveRouting; final int partitionOffset; // 使用的方法:PUT /index/type/id?routing=user_id,如果没有 routing ,那么,默认是 doc_id 来作为 routing。非常巧妙,不需要维护 docId 与 shard 关系了 if (routing == null) { assert(indexMetadata.isRoutingPartitionedIndex() == false) : "A routing value is required for gets from a partitioned index"; effectiveRouting = id; } else { effectiveRouting = routing; } // 如果路由分区的大小 不是 1 if (indexMetadata.isRoutingPartitionedIndex()) { partitionOffset = Math.floorMod(Murmur3HashFunction.hash(id), indexMetadata.getRoutingPartitionSize()); } else { // we would have still got 0 above but this check just saves us an unnecessary hash calculation partitionOffset = 0; } // 计算 shardId ,基于 hash 来计算 return calculateScaledShardId(indexMetadata, effectiveRouting, partitionOffset); }6.1.4 转发请求
// 本节点作为协调节点,向目标 node 转发请求,或者目标是本地节点,直接通过函数调用读取数据.发送流程封装了对请求的发送,org.elasticsearch.action.support.single.shard.TransportSingleShardAction.AsyncSingleAction#start6.1.5 发送请求
org.elasticsearch.action.support.single.shard.TransportSingleShardAction.AsyncSingleAction#perform-- 这里由重试的逻辑;判断是否是本地节点;根据 shard 获取某个 节点,发送请求到数据节点请求。// 获取下一个路由final ShardRouting shardRouting = shardIt.nextOrNull();6.2 数据节点6.2.0 总入口:
将请求转发到数据节点后,由 org.elasticsearch.action.support.single.shard.TransportSingleShardAction.ShardTransportHandler 来接收请求,处理请求。
6.2.1 接收请求请求
/** * shard 分片的操作 */ private class ShardTransportHandler implements TransportRequestHandler<Request> { @Override public void messageReceived(final Request request, final TransportChannel channel, Task task) throws Exception { if (logger.isTraceEnabled()) { logger.trace("executing [{}] on shard [{}]", request, request.internalShardId); } // 异步操作 asyncShardOperation(request, request.internalShardId, new ChannelActionListener<>(channel, transportShardAction, request)); } }6.2.2 转换 docId
// 代码入口:org.elasticsearch.index.get.ShardGetService#innerGet// docId 其实也是一个 Term,名字是 _id,为什么这里要做 uuid 的转换呢?因为很多id 都是不规整的。UUID 编码后,长度一致,做好了很好的性能优化,比如,可以采用哈夫曼树来优化、压缩。Term uidTerm = new Term(IdFieldMapper.NAME, Uid.encodeId(id));6.2.3 进入 GET 核心地方
进入 org.elasticsearch.index.engine.InternalEngine#get 这个方法,主要是 判断版本号是否冲突、seqNo 是否冲突。还有一个重点,是获取 Searcher。 虽然 acquireSearcher 这个方法是获取 Searcher 的,但是经过一遍代码查找后,然后各种条件限制,需要继续跳到之前 6.2.4
// acquireSearcher 这个方法是获取 Searcher 的。但是你getFromSearcher(get, acquireSearcher("realtime_get", SearcherScope.INTERNAL, searcherWrapper));6.2.4 获取 Search 入口(注意下方的 wrapSearcher)
/** * 读取数据 * @param get * @return */ public Engine.GetResult get(Engine.Get get) { // 校验是否允许读取数据 readAllowed(); // ES Mapper DocumentMapper mapper = mapperService.documentMapper(); if (mapper == null || mapper.type().equals(mapperService.resolveDocumentType(get.type())) == false) { return GetResult.NOT_EXISTS; } // 读取数据,这个 this:wrapperSearcher 方法最终在:org.elasticsearch.index.engine.Engine.SearcherSupplier.acquireSearcher 这里调用。最终的 search 是:ElasticsearchDirectoryReader return getEngine().get(get, mapper, this::wrapSearcher); }// 可能大家看了 wraSearcher 还是一脸懵逼,不知道 这个 searcher 怎么读取数据的。6.2.5 Searcher 初始化各种跳转、回调
6.2.6 Searcher 初始化
代码入口:org.elasticsearch.index.engine.InternalEngine#createReaderManager
org.elasticsearch.index.engine.InternalEngine#createReaderManager// 下面,我只摘出了重要的部分说明。下面的函数说明final ElasticsearchDirectoryReader directoryReader = ElasticsearchDirectoryReader.wrap(DirectoryReader.open(indexWriter), shardId); // 大家一定要仔细看 DirectoryReader.open(indexWriter) 这个方法。很妖娆。//6.2.6 Searcher 初始化2
public static ElasticsearchDirectoryReader wrap(DirectoryReader reader, ShardId shardId) throws IOException { return new ElasticsearchDirectoryReader(reader, new SubReaderWrapper(shardId), shardId); } private static final class SubReaderWrapper extends FilterDirectoryReader.SubReaderWrapper { private final ShardId shardId; SubReaderWrapper(ShardId shardId) { this.shardId = shardId; } /** * ElasticsearchDirectoryReader 在这个类的构造函数,里面的 super 方法,执行了 wrap 回调。 * @param reader * @return */ @Override public LeafReader wrap(LeafReader reader) { // TODO 仔细看看这里的实现,最终是实现了 FilterLeafReader 这个类。最终还是回归到 segmentReader 这个类上,去读内容 return new ElasticsearchLeafReader(reader, shardId); } }6.3 取数数据6.3.1 代码入口:
在这里,会根据 Searcher 来读取数据。最终这个 searcher 是 segmentReader 这个类。
org.elasticsearch.index.get.ShardGetService#innerGetLoadFromStoredFields6.3.2 读取数据
// 引用了 Lucene 的包,根据 docId 去找数据,docIdAndVersion.reader.document(docIdAndVersion.docId, fieldVisitor);6.3.4 数据转换
FieldInfo fieldInfo = new FieldInfo(indexableField.name(), 0, false, false, false, IndexOptions.NONE, DocValuesType.NONE, -1, Collections.emptyMap(), 0, 0, 0, false); StoredFieldVisitor.Status status = fieldVisitor.needsField(fieldInfo); if (status == StoredFieldVisitor.Status.YES) { if (indexableField.numericValue() != null) { fieldVisitor.objectField(fieldInfo, indexableField.numericValue()); } else if (indexableField.binaryValue() != null) { fieldVisitor.binaryField(fieldInfo, indexableField.binaryValue()); } else if (indexableField.stringValue() != null) { fieldVisitor.objectField(fieldInfo, indexableField.stringValue()); } } else if (status == StoredFieldVisitor.Status.STOP) { break; }6.3.5 过滤
sourceAsMap = XContentMapValues.filter(sourceAsMap, fetchSourceContext.includes(), fetchSourceContext.excludes());7 总结ES GET API 里面,比较难懂的是 Searcher 究竟是怎么实例化的。如果读取失败了,还会继续读取。具体代码位置:org.elasticsearch.action.support.single.shard.TransportSingleShardAction.AsyncSingleAction#onFailure8 附录
很多小伙伴还是想直观看看 ES 是怎么读取数据的,那么下面有一个 demo ,可以 DEBUG 跑个单元测试看看。
Directory dir =new SimpleFSDirectory(new File("F:/luc_dir").toPath()); IndexWriterConfig iwc = new IndexWriterConfig(null); iwc.setMaxBufferedDocs(100); iwc.setMergePolicy(NoMergePolicy.INSTANCE); IndexWriter iw = new IndexWriter(dir, iwc); // add two docs, id:0 and id:1 Document doc = new Document(); Field idField = new StringField("id", "", Field.Store.NO); doc.add(idField); idField.setStringValue("0"); iw.addDocument(doc); idField.setStringValue("1"); iw.addDocument(doc); // open reader ShardId shardId = new ShardId("fake", "_na_", 1); DirectoryReader ir = ElasticsearchDirectoryReader.wrap(DirectoryReader.open(iw), shardId); ir.document(1); System.out.println("aa");
#elasticsearch#
标签: #哈弗曼算法压缩图片上传