
ZooKeeper Configuration Center in Practice: How SolrCloud Uses ZooKeeper, with Source Code Analysis

Architect's Notes


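As applications grow, centralized configuration becomes necessary:

As application functionality grows more complex, so does its configuration: feature switches, parameter settings, server addresses, and so on. Expectations for configuration rise as well: changes should take effect in real time, and teams want gray (canary) releases, per-environment and per-cluster configuration management, and solid permission and review mechanisms. With distributed development, projects reference one another, and as the number of services grows, the complexity of the calls between them rises sharply. Every production release or new project launch becomes painful, which is why a configuration center is needed to bring this under control.

Which open-source configuration centers exist

spring-cloud/spring-cloud-config: from Spring; integrates seamlessly with Spring Cloud.

Taobao diamond: no longer maintained.

disconf: written in Java; started by a technical expert at Ant Financial.

ctrip apollo: Apollo is an open-source configuration management center developed by Ctrip's framework department, with well-defined permission and workflow governance features.

A configuration center can be backed by a database such as MySQL, by a cache or data store such as Redis or MongoDB, or by ZooKeeper. ZooKeeper's watcher mechanism, which notifies clients when a node changes, makes it a natural fit for a configuration center.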
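To make the watcher idea concrete, here is a minimal in-memory sketch of one-shot watches, the style of notification ZooKeeper's classic watch API provides. It is not the ZooKeeper client API: WatchedConfigStore and its get and set methods are hypothetical names used only for illustration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// In-memory stand-in for a config store with ZooKeeper-style one-shot watches.
// Hypothetical class; it does not talk to a real ZooKeeper server.
final class WatchedConfigStore {
    private final Map<String, String> data = new HashMap<>();
    private final Map<String, List<Consumer<String>>> watches = new HashMap<>();

    // Read a value and, if a watcher is given, leave a one-shot watch on the path.
    String get(String path, Consumer<String> watcher) {
        if (watcher != null) {
            watches.computeIfAbsent(path, k -> new ArrayList<>()).add(watcher);
        }
        return data.get(path);
    }

    // Write a value, then fire and clear every watch registered on the path.
    void set(String path, String value) {
        data.put(path, value);
        List<Consumer<String>> fired = watches.remove(path);
        if (fired != null) {
            for (Consumer<String> w : fired) {
                w.accept(path);
            }
        }
    }

    public static void main(String[] args) {
        WatchedConfigStore store = new WatchedConfigStore();
        store.set("/app/timeout", "30");
        store.get("/app/timeout", p -> System.out.println("changed: " + p));
        store.set("/app/timeout", "60"); // the watch fires here
        store.set("/app/timeout", "90"); // the watch was consumed, so nothing fires
    }
}
```

Because a watch is consumed when it fires, a client that wants continuous updates has to read the value again and re-register, which is the usual pattern for a configuration listener.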

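1. Setting up a Solr ZooKeeper configuration center (Windows)

1.1 Download Solr

Download the latest Solr release.

At the time of writing, the latest version was solr-7.7.0.zip.

Extract it to a local directory: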

E:\demo\solr-7.7.0

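1.2 Start Solr

Open CMD, change to the bin directory, and run solr.cmd start -e cloud

Follow the prompts to create the SolrCloud nodes, shards, and a collection named techproducts. For details, see the official documentation.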

Return to the E:\demo\solr-7.7.0 directory and run the following command in CMD to index the sample data:

  java -jar -Dc=techproducts -Dauto example\exampledocs\post.jar example\exampledocs\* 

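1.3 Open the Admin UI

Check the status of the embedded ZooKeeper.

A ZooKeeper instance has been created on port 9983.

1.4 Inspect ZooKeeper with ZooInspector

Run this script:

    @echo off
    cd D:\software\zookeeper-3.4.6\ZooInspector\build
    d:
    Java -Dfile.encoding=UTF-8 -jar zookeeper-dev-ZooInspector.jar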

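2. Uploading and downloading configuration files

This article implements only the upload; the download is left to the reader.

2.1 Upload a configuration file: change to the E:\demo\solr-7.7.0\bin directory and run the following in CMD: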

solr.cmd zk cp ../LICENSE.txt zk:/test/LICENSE.txt -z localhost:9983

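This creates a test node in ZooKeeper with a child node named LICENSE.txt, whose data is the content of the LICENSE.txt file.

2.2 Source code analysis

Open solr.cmd in a text editor and find the parse_zk_args label. Once the zk arguments have been parsed, the script jumps to run_zk, which runs the zk command: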

    :run_zk
    IF "!ZK_OP!"=="" (
      set "ERROR_MSG=Invalid command specified for zk sub-command"
      goto zk_short_usage
    )
    IF "!ZK_HOST!"=="" (
      set "ERROR_MSG=Must specify -z zkHost"
      goto zk_short_usage
    )
    IF "!ZK_OP!"=="-upconfig" (
      set ZK_OP="upconfig"
    )
    IF "!ZK_OP!"=="-downconfig" (
      set ZK_OP="downconfig"
    )
    IF "!ZK_OP!"=="upconfig" (
      IF "!CONFIGSET_NAME!"=="" (
        set ERROR_MSG="-n option must be set for upconfig"
        goto zk_short_usage
      )
      IF "!CONFIGSET_DIR!"=="" (
        set ERROR_MSG="The -d option must be set for upconfig."
        goto zk_short_usage
      )
      "%JAVA%" %SOLR_SSL_OPTS% %AUTHC_OPTS% %SOLR_ZK_CREDS_AND_ACLS% -Dsolr.install.dir="%SOLR_TIP%" ^
      -Dlog4j.configurationFile="\resources\log4j2-console.xml" ^
      -classpath "%DEFAULT_SERVER_DIR%\solr-webapp\webapp\WEB-INF\lib\*;%DEFAULT_SERVER_DIR%\lib\ext\*" ^
      org.apache.solr.util.SolrCLI !ZK_OP! -confname !CONFIGSET_NAME! -confdir !CONFIGSET_DIR! -zkHost !ZK_HOST! %ZK_VERBOSE%^
      -configsetsDir "%SOLR_TIP%/server/solr/configsets"
    ) ELSE IF "!ZK_OP!"=="downconfig" (
      IF "!CONFIGSET_NAME!"=="" (
        set ERROR_MSG="-n option must be set for downconfig"
        goto zk_short_usage
      )
      IF "!CONFIGSET_DIR!"=="" (
        set ERROR_MSG="The -d option must be set for downconfig."
        goto zk_short_usage
      )
      "%JAVA%" %SOLR_SSL_OPTS% %AUTHC_OPTS% %SOLR_ZK_CREDS_AND_ACLS% -Dsolr.install.dir="%SOLR_TIP%" ^
      -Dlog4j.configurationFile="\resources\log4j2-console.xml" ^
      -classpath "%DEFAULT_SERVER_DIR%\solr-webapp\webapp\WEB-INF\lib\*;%DEFAULT_SERVER_DIR%\lib\ext\*" ^
      org.apache.solr.util.SolrCLI !ZK_OP! -confname !CONFIGSET_NAME! -confdir !CONFIGSET_DIR! -zkHost !ZK_HOST! %ZK_VERBOSE%
    ) ELSE IF "!ZK_OP!"=="cp" (
      IF "%ZK_SRC%"=="" (
        set ERROR_MSG="<src> must be specified for 'cp' command"
        goto zk_short_usage
      )
      IF "%ZK_DST%"=="" (
        set ERROR_MSG=<dest> must be specified for 'cp' command"
        goto zk_short_usage
      )
      IF NOT "!ZK_SRC:~0,3!"=="zk:" (
        IF NOT "!%ZK_DST:~0,3!"=="zk:" (
          set ERROR_MSG="At least one of src or dst must be prefixed by 'zk:'"
          goto zk_short_usage
        )
      )
      "%JAVA%" %SOLR_SSL_OPTS% %AUTHC_OPTS% %SOLR_ZK_CREDS_AND_ACLS% -Dsolr.install.dir="%SOLR_TIP%" ^
      -Dlog4j.configurationFile="\resources\log4j2-console.xml" ^
      -classpath "%DEFAULT_SERVER_DIR%\solr-webapp\webapp\WEB-INF\lib\*;%DEFAULT_SERVER_DIR%\lib\ext\*" ^
      org.apache.solr.util.SolrCLI !ZK_OP! -zkHost !ZK_HOST! -src !ZK_SRC! -dst !ZK_DST! -recurse !ZK_RECURSE! %ZK_VERBOSE%
    ) ELSE IF "!ZK_OP!"=="mv" (
      IF "%ZK_SRC%"=="" (
        set ERROR_MSG="<src> must be specified for 'mv' command"
        goto zk_short_usage
      )
      IF "%ZK_DST%"=="" (
        set ERROR_MSG="<dest> must be specified for 'mv' command"
        goto zk_short_usage
      )
      "%JAVA%" %SOLR_SSL_OPTS% %AUTHC_OPTS% %SOLR_ZK_CREDS_AND_ACLS% -Dsolr.install.dir="%SOLR_TIP%" ^
      -Dlog4j.configurationFile="\resources\log4j2-console.xml" ^
      -classpath "%DEFAULT_SERVER_DIR%\solr-webapp\webapp\WEB-INF\lib\*;%DEFAULT_SERVER_DIR%\lib\ext\*" ^
      org.apache.solr.util.SolrCLI !ZK_OP! -zkHost !ZK_HOST! -src !ZK_SRC! -dst !ZK_DST! %ZK_VERBOSE%
    ) ELSE IF "!ZK_OP!"=="rm" (
      IF "%ZK_SRC"=="" (
        set ERROR_MSG="Zookeeper path to remove must be specified when using the 'rm' command"
        goto zk_short_usage
      )
      "%JAVA%" %SOLR_SSL_OPTS% %AUTHC_OPTS% %SOLR_ZK_CREDS_AND_ACLS% -Dsolr.install.dir="%SOLR_TIP%" ^
      -Dlog4j.configurationFile="\resources\log4j2-console.xml" ^
      -classpath "%DEFAULT_SERVER_DIR%\solr-webapp\webapp\WEB-INF\lib\*;%DEFAULT_SERVER_DIR%\lib\ext\*" ^
      org.apache.solr.util.SolrCLI !ZK_OP! -zkHost !ZK_HOST! -path !ZK_SRC! -recurse !ZK_RECURSE! %ZK_VERBOSE%
    ) ELSE IF "!ZK_OP!"=="ls" (
      IF "%ZK_SRC"=="" (
        set ERROR_MSG="Zookeeper path to remove must be specified when using the 'ls' command"
        goto zk_short_usage
      )
      "%JAVA%" %SOLR_SSL_OPTS% %AUTHC_OPTS% %SOLR_ZK_CREDS_AND_ACLS% -Dsolr.install.dir="%SOLR_TIP%" ^
      -Dlog4j.configurationFile="\resources\log4j2-console.xml" ^
      -classpath "%DEFAULT_SERVER_DIR%\solr-webapp\webapp\WEB-INF\lib\*;%DEFAULT_SERVER_DIR%\lib\ext\*" ^
      org.apache.solr.util.SolrCLI !ZK_OP! -zkHost !ZK_HOST! -path !ZK_SRC! -recurse !ZK_RECURSE! %ZK_VERBOSE%
    ) ELSE IF "!ZK_OP!"=="mkroot" (
      IF "%ZK_SRC"=="" (
        set ERROR_MSG="Zookeeper path to create must be specified when using the 'mkroot' command"
        goto zk_short_usage
      )
      "%JAVA%" %SOLR_SSL_OPTS% %AUTHC_OPTS% %SOLR_ZK_CREDS_AND_ACLS% -Dsolr.install.dir="%SOLR_TIP%" ^
      -Dlog4j.configurationFile="\resources\log4j2-console.xml" ^
      -classpath "%DEFAULT_SERVER_DIR%\solr-webapp\webapp\WEB-INF\lib\*;%DEFAULT_SERVER_DIR%\lib\ext\*" ^
      org.apache.solr.util.SolrCLI !ZK_OP! -zkHost !ZK_HOST! -path !ZK_SRC! %ZK_VERBOSE%
    ) ELSE (
      set ERROR_MSG="Unknown zk option !ZK_OP!"
      goto zk_short_usage
    )
    goto done

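The branch that starts with ELSE IF "!ZK_OP!"=="cp" is the one executed for the zk cp command. It launches Java with the following main class:

The org.apache.solr.util.SolrCLI class

Each command is handled by a different Tool class: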

    // Creates an instance of the requested tool, using classpath scanning if necessary
    private static Tool newTool(String toolType) throws Exception {
      if ("healthcheck".equals(toolType))
        return new HealthcheckTool();
      else if ("status".equals(toolType))
        return new StatusTool();
      else if ("api".equals(toolType))
        return new ApiTool();
      else if ("create_collection".equals(toolType))
        return new CreateCollectionTool();
      else if ("create_core".equals(toolType))
        return new CreateCoreTool();
      else if ("create".equals(toolType))
        return new CreateTool();
      else if ("delete".equals(toolType))
        return new DeleteTool();
      else if ("config".equals(toolType))
        return new ConfigTool();
      else if ("run_example".equals(toolType))
        return new RunExampleTool();
      else if ("upconfig".equals(toolType))
        return new ConfigSetUploadTool();
      else if ("downconfig".equals(toolType))
        return new ConfigSetDownloadTool();
      else if ("rm".equals(toolType))
        return new ZkRmTool();
      else if ("mv".equals(toolType))
        return new ZkMvTool();
      else if ("cp".equals(toolType))
        return new ZkCpTool();
      else if ("ls".equals(toolType))
        return new ZkLsTool();
      else if ("mkroot".equals(toolType))
        return new ZkMkrootTool();
      else if ("assert".equals(toolType))
        return new AssertTool();
      else if ("utils".equals(toolType))
        return new UtilsTool();
      else if ("auth".equals(toolType))
        return new AuthTool();
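The name-to-Tool lookup in newTool can also be expressed as a table of factories. The sketch below is one alternative way to write that dispatch, not Solr's code: ToolRegistry, its Tool interface, and the describe method are hypothetical and far simpler than Solr's real Tool contract.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Supplier;

// Hypothetical miniature of a command-name-to-tool lookup (not Solr's classes).
final class ToolRegistry {
    // Assumed minimal contract; Solr's real Tool interface is richer.
    interface Tool {
        String describe();
    }

    private static final Map<String, Supplier<Tool>> FACTORIES = new LinkedHashMap<>();

    static {
        register("cp", "copy between local files and znodes");
        register("mv", "move a znode");
        register("rm", "remove a znode");
        register("ls", "list znodes");
    }

    // Store a factory that builds a fresh Tool each time it is asked.
    private static void register(String name, String description) {
        FACTORIES.put(name, () -> () -> name + ": " + description);
    }

    // Look up the factory by command name and create the tool.
    static Tool newTool(String toolType) {
        Supplier<Tool> factory = FACTORIES.get(toolType);
        if (factory == null) {
            throw new IllegalArgumentException("Unknown tool: " + toolType);
        }
        return factory.get();
    }

    public static void main(String[] args) {
        System.out.println(newTool("cp").describe());
    }
}
```

A map keeps the set of commands in one place and turns an unknown name into a single lookup failure, at the cost of a little indirection compared with the if/else chain.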

For cp, the runImpl method of ZkCpTool is executed:

    protected void runImpl(CommandLine cli) throws Exception {
      raiseLogLevelUnlessVerbose(cli);
      String zkHost = getZkHost(cli);
      if (zkHost == null) {
        throw new IllegalStateException("Solr at " + cli.getOptionValue("solrUrl") +
            " is running in standalone server mode, cp can only be used when running in SolrCloud mode.\n");
      }

      try (SolrZkClient zkClient = new SolrZkClient(zkHost, 30000)) {
        echoIfVerbose("\nConnecting to ZooKeeper at " + zkHost + " ...", cli);
        String src = cli.getOptionValue("src");
        String dst = cli.getOptionValue("dst");
        Boolean recurse = Boolean.parseBoolean(cli.getOptionValue("recurse"));
        echo("Copying from '" + src + "' to '" + dst + "'. ZooKeeper at " + zkHost);

        boolean srcIsZk = src.toLowerCase(Locale.ROOT).startsWith("zk:");
        boolean dstIsZk = dst.toLowerCase(Locale.ROOT).startsWith("zk:");

        String srcName = src;
        if (srcIsZk) {
          srcName = src.substring(3);
        } else if (srcName.toLowerCase(Locale.ROOT).startsWith("file:")) {
          srcName = srcName.substring(5);
        }

        String dstName = dst;
        if (dstIsZk) {
          dstName = dst.substring(3);
        } else {
          if (dstName.toLowerCase(Locale.ROOT).startsWith("file:")) {
            dstName = dstName.substring(5);
          }
        }
        zkClient.zkTransfer(srcName, srcIsZk, dstName, dstIsZk, recurse);
      } catch (Exception e) {
        log.error("Could not complete the zk operation for reason: " + e.getMessage());
        throw (e);
      }
    }
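The prefix handling in runImpl is applied twice, once for src and once for dst. As a small sketch of the same rule in isolation, the hypothetical ZkPathArg class below (not part of Solr) strips a case-insensitive zk: or file: prefix and records whether the argument names a znode.

```java
import java.util.Locale;

// Hypothetical helper that mirrors the zk:/file: prefix rule used by runImpl.
final class ZkPathArg {
    final String name;   // the path with any prefix removed
    final boolean isZk;  // true when the argument started with "zk:"

    private ZkPathArg(String name, boolean isZk) {
        this.name = name;
        this.isZk = isZk;
    }

    static ZkPathArg parse(String arg) {
        String lower = arg.toLowerCase(Locale.ROOT);
        if (lower.startsWith("zk:")) {
            return new ZkPathArg(arg.substring(3), true);
        }
        if (lower.startsWith("file:")) {
            return new ZkPathArg(arg.substring(5), false);
        }
        // No prefix: treat the argument as a local path.
        return new ZkPathArg(arg, false);
    }

    public static void main(String[] args) {
        ZkPathArg src = parse("../LICENSE.txt");
        ZkPathArg dst = parse("zk:/test/LICENSE.txt");
        System.out.println(src.name + " isZk=" + src.isZk);
        System.out.println(dst.name + " isZk=" + dst.isZk);
    }
}
```

With the arguments from section 2.1, the source stays a local path and the destination becomes the znode path /test/LICENSE.txt.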

This calls zkTransfer on SolrZkClient:

    public void zkTransfer(String src, Boolean srcIsZk,
                           String dst, Boolean dstIsZk,
                           Boolean recurse) throws SolrServerException, KeeperException, InterruptedException, IOException {
      ZkMaintenanceUtils.zkTransfer(this, src, srcIsZk, dst, dstIsZk, recurse);
    }

The actual work is delegated to ZkMaintenanceUtils:

    /**
     * Copy between local file system and Zookeeper, or from one Zookeeper node to another,
     * optionally copying recursively.
     *
     * @param src     Source to copy from. Both src and dst may be Znodes. However, both may NOT be local
     * @param dst     The place to copy the files too. Both src and dst may be Znodes. However both may NOT be local
     * @param recurse if the source is a directory, reccursively copy the contents iff this is true.
     * @throws SolrServerException  Explanatory exception due to bad params, failed operation, etc.
     * @throws KeeperException      Could not perform the Zookeeper operation.
     * @throws InterruptedException Thread interrupted
     */
    public static void zkTransfer(SolrZkClient zkClient, String src, Boolean srcIsZk,
                                  String dst, Boolean dstIsZk,
                                  Boolean recurse) throws SolrServerException, KeeperException, InterruptedException, IOException {

      if (srcIsZk == false && dstIsZk == false) {
        throw new SolrServerException("One or both of source or destination must specify ZK nodes.");
      }

      // Make sure -recurse is specified if the source has children.
      if (recurse == false) {
        if (srcIsZk) {
          if (zkClient.getChildren(src, null, true).size() != 0) {
            throw new SolrServerException("Zookeeper node " + src + " has children and recurse is false");
          }
        } else if (Files.isDirectory(Paths.get(src))) {
          throw new SolrServerException("Local path " + Paths.get(src).toAbsolutePath() + " is a directory and recurse is false");
        }
      }

      if (dstIsZk && dst.length() == 0) {
        dst = "/"; // for consistency, one can copy from zk: and send to zk:/
      }
      dst = normalizeDest(src, dst, srcIsZk, dstIsZk);

      // ZK -> ZK copy.
      if (srcIsZk && dstIsZk) {
        traverseZkTree(zkClient, src, VISIT_ORDER.VISIT_PRE, new ZkCopier(zkClient, src, dst));
        return;
      }

      //local -> ZK copy
      if (dstIsZk) {
        uploadToZK(zkClient, Paths.get(src), dst, null);
        return;
      }

      // Copying individual files from ZK requires special handling since downloadFromZK assumes the node has children.
      // This is kind of a weak test for the notion of "directory" on Zookeeper.
      // ZK -> local copy where ZK is a parent node
      if (zkClient.getChildren(src, null, true).size() > 0) {
        downloadFromZK(zkClient, src, Paths.get(dst));
        return;
      }

      // Single file ZK -> local copy where ZK is a leaf node
      if (Files.isDirectory(Paths.get(dst))) {
        if (dst.endsWith(File.separator) == false) dst += File.separator;
        dst = normalizeDest(src, dst, srcIsZk, dstIsZk);
      }
      byte[] data = zkClient.getData(src, null, null, true);
      Path filename = Paths.get(dst);
      Files.createDirectories(filename.getParent());
      log.info("Writing file {}", filename);
      Files.write(filename, data);
    }
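The order of checks in zkTransfer decides which of three copy directions runs. The sketch below isolates that decision under simplifying assumptions: TransferPlanner and its Route enum are hypothetical names, it throws IllegalArgumentException instead of SolrServerException, and it omits the check for children on a ZooKeeper source because that needs a live client.

```java
import java.nio.file.Files;
import java.nio.file.Paths;

// Hypothetical sketch of the routing decision made at the top of zkTransfer.
final class TransferPlanner {
    enum Route { ZK_TO_ZK, LOCAL_TO_ZK, ZK_TO_LOCAL }

    static Route plan(String src, boolean srcIsZk, boolean dstIsZk, boolean recurse) {
        // At least one side must be a znode.
        if (!srcIsZk && !dstIsZk) {
            throw new IllegalArgumentException("One or both of source or destination must specify ZK nodes.");
        }
        // A local directory can only be copied recursively.
        // (The equivalent check for a ZooKeeper source with children is omitted here.)
        if (!recurse && !srcIsZk && Files.isDirectory(Paths.get(src))) {
            throw new IllegalArgumentException("Local path " + src + " is a directory and recurse is false");
        }
        if (srcIsZk && dstIsZk) {
            return Route.ZK_TO_ZK;
        }
        if (dstIsZk) {
            return Route.LOCAL_TO_ZK;
        }
        return Route.ZK_TO_LOCAL;
    }

    public static void main(String[] args) {
        // The upload from section 2.1: local file to znode.
        System.out.println(plan("../LICENSE.txt", false, true, false));
    }
}
```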

Note: this copy can run between a local file and a znode, or between two znodes.

This article follows the copy from a local file to a znode:

    public static void uploadToZK(SolrZkClient zkClient, final Path fromPath, final String zkPath,
                                  final Pattern filenameExclusions) throws IOException {

      String path = fromPath.toString();
      if (path.endsWith("*")) {
        path = path.substring(0, path.length() - 1);
      }

      final Path rootPath = Paths.get(path);

      if (!Files.exists(rootPath))
        throw new IOException("Path " + rootPath + " does not exist");

      Files.walkFileTree(rootPath, new SimpleFileVisitor<Path>() {
        @Override
        public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
          String filename = file.getFileName().toString();
          if (filenameExclusions != null && filenameExclusions.matcher(filename).matches()) {
            log.info("uploadToZK skipping '{}' due to filenameExclusions '{}'", filename, filenameExclusions);
            return FileVisitResult.CONTINUE;
          }
          String zkNode = createZkNodeName(zkPath, rootPath, file);
          try {
            // if the path exists (and presumably we're uploading data to it) just set its data
            if (file.toFile().getName().equals(ZKNODE_DATA_FILE) && zkClient.exists(zkNode, true)) {
              zkClient.setData(zkNode, file.toFile(), true);
            } else {
              zkClient.makePath(zkNode, file.toFile(), false, true);
            }
          } catch (KeeperException | InterruptedException e) {
            throw new IOException("Error uploading file " + file.toString() + " to zookeeper path " + zkNode,
                SolrZkClient.checkInterrupted(e));
          }
          return FileVisitResult.CONTINUE;
        }

        @Override
        public FileVisitResult preVisitDirectory(Path dir, BasicFileAttributes attrs) throws IOException {
          if (dir.getFileName().toString().startsWith(".")) return FileVisitResult.SKIP_SUBTREE;
          return FileVisitResult.CONTINUE;
        }
      });
    }
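uploadToZK walks the local tree and turns each file into a znode path. The sketch below reproduces that walk with the JDK alone and collects the result into a map instead of writing to ZooKeeper. LocalTreeToZnodes is a hypothetical class, and its znodeName method is an assumed stand-in for createZkNodeName, whose source is not shown in this article.

```java
import java.io.File;
import java.io.IOException;
import java.nio.file.FileVisitResult;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.SimpleFileVisitor;
import java.nio.file.attribute.BasicFileAttributes;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch: map a local tree to znode paths without touching ZooKeeper.
final class LocalTreeToZnodes {

    // Assumed stand-in for createZkNodeName: join zkPath with the file's
    // path relative to root, always using '/' as the separator.
    static String znodeName(String zkPath, Path root, Path file) {
        String relative = root.relativize(file).toString().replace(File.separatorChar, '/');
        String base = zkPath.endsWith("/") ? zkPath.substring(0, zkPath.length() - 1) : zkPath;
        if (relative.isEmpty()) {
            return base.isEmpty() ? "/" : base; // root itself is the file
        }
        return base + "/" + relative;
    }

    // Walk the tree and return znode path -> file bytes, sorted by path.
    static Map<String, byte[]> collect(Path root, String zkPath) throws IOException {
        Map<String, byte[]> out = new TreeMap<>();
        Files.walkFileTree(root, new SimpleFileVisitor<Path>() {
            @Override
            public FileVisitResult preVisitDirectory(Path dir, BasicFileAttributes attrs) {
                // Skip hidden directories, as the visitor in uploadToZK does.
                Path name = dir.getFileName();
                if (name != null && name.toString().startsWith(".")) {
                    return FileVisitResult.SKIP_SUBTREE;
                }
                return FileVisitResult.CONTINUE;
            }

            @Override
            public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
                out.put(znodeName(zkPath, root, file), Files.readAllBytes(file));
                return FileVisitResult.CONTINUE;
            }
        });
        return out;
    }

    public static void main(String[] args) throws IOException {
        Path root = Files.createTempDirectory("conf");
        Files.write(root.resolve("app.properties"), "timeout=30".getBytes());
        for (String znode : collect(root, "/test").keySet()) {
            System.out.println(znode);
        }
    }
}
```

In the real method each map entry would instead become a makePath or setData call on the ZooKeeper client.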

The file is read into a byte array, which is then written to the znode:

    /**
     * Write file to ZooKeeper - default system encoding used.
     *
     * @param path path to upload file to e.g. /solr/conf/solrconfig.xml
     * @param file path to file to be uploaded
     */
    public Stat setData(String path, File file, boolean retryOnConnLoss) throws IOException,
        KeeperException, InterruptedException {
      log.debug("Write to ZooKeeper: {} to {}", file.getAbsolutePath(), path);
      byte[] data = FileUtils.readFileToByteArray(file);
      return setData(path, data, retryOnConnLoss);
    }

    /**
     * Returns node's state
     */
    public Stat setData(final String path, final byte data[], final int version, boolean retryOnConnLoss)
        throws KeeperException, InterruptedException {
      if (retryOnConnLoss) {
        return zkCmdExecutor.retryOperation(() -> keeper.setData(path, data, version));
      } else {
        return keeper.setData(path, data, version);
      }
    }

The underlying implementation is ZooKeeper's own client, ZooKeeper.java:

    /**
     * Set the data for the node of the given path if such a node exists and the
     * given version matches the version of the node (if the given version is
     * -1, it matches any node's versions). Return the stat of the node.
     * <p>
     * This operation, if successful, will trigger all the watches on the node
     * of the given path left by getData calls.
     * <p>
     * A KeeperException with error code KeeperException.NoNode will be thrown
     * if no node with the given path exists.
     * <p>
     * A KeeperException with error code KeeperException.BadVersion will be
     * thrown if the given version does not match the node's version.
     * <p>
     * The maximum allowable size of the data array is 1 MB (1,048,576 bytes).
     * Arrays larger than this will cause a KeeperException to be thrown.
     *
     * @param path
     *                the path of the node
     * @param data
     *                the data to set
     * @param version
     *                the expected matching version
     * @return the state of the node
     * @throws InterruptedException If the server transaction is interrupted.
     * @throws KeeperException If the server signals an error with a non-zero error code.
     * @throws IllegalArgumentException if an invalid path is specified
     */
    public Stat setData(final String path, byte data[], int version)
        throws KeeperException, InterruptedException {
      final String clientPath = path;
      PathUtils.validatePath(clientPath);

      final String serverPath = prependChroot(clientPath);

      RequestHeader h = new RequestHeader();
      h.setType(ZooDefs.OpCode.setData);
      SetDataRequest request = new SetDataRequest();
      request.setPath(serverPath);
      request.setData(data);
      request.setVersion(version);
      SetDataResponse response = new SetDataResponse();
      ReplyHeader r = cnxn.submitRequest(h, request, response, null);
      if (r.getErr() != 0) {
        throw KeeperException.create(KeeperException.Code.get(r.getErr()), clientPath);
      }
      return response.getStat();
    }
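The Javadoc above describes three rules: the node must exist, the expected version must match unless it is -1, and the data may not exceed 1 MB. The in-memory sketch below illustrates those rules only. VersionedStore is a hypothetical class, it uses standard Java exceptions instead of KeeperException, and the real checks happen on the ZooKeeper server.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical in-memory model of the setData rules described in the Javadoc.
final class VersionedStore {
    static final int MAX_BYTES = 1024 * 1024; // 1 MB, the limit quoted in the Javadoc

    private static final class Node {
        byte[] data;
        int version; // starts at 0 and increases by one on every successful setData
    }

    private final Map<String, Node> nodes = new HashMap<>();

    void create(String path, byte[] data) {
        Node n = new Node();
        n.data = data;
        nodes.put(path, n);
    }

    // Returns the node's new version after a successful write.
    int setData(String path, byte[] data, int expectedVersion) {
        Node n = nodes.get(path);
        if (n == null) {
            throw new IllegalStateException("NoNode: " + path);
        }
        if (expectedVersion != -1 && expectedVersion != n.version) {
            throw new IllegalStateException("BadVersion: " + path);
        }
        if (data != null && data.length > MAX_BYTES) {
            throw new IllegalArgumentException("data larger than " + MAX_BYTES + " bytes");
        }
        n.data = data;
        n.version++;
        return n.version;
    }

    public static void main(String[] args) {
        VersionedStore store = new VersionedStore();
        store.create("/test/LICENSE.txt", new byte[0]);
        System.out.println(store.setData("/test/LICENSE.txt", "v1".getBytes(), -1)); // -1 matches any version
        System.out.println(store.setData("/test/LICENSE.txt", "v2".getBytes(), 1));  // must match the current version
    }
}
```

Passing the version last read gives optimistic concurrency: a writer that lost a race gets a version error instead of silently overwriting a newer configuration.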

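3. Summary

1. SolrCloud uses ZooKeeper as its configuration center. The entry point traced here is SolrCLI.java, which maps each command name to a Tool class in the style of the command pattern. The commands include healthcheck, status, api, config, upconfig, downconfig, mv, cp, ls, and mkroot.

2. The call chain ends in Apache ZooKeeper's own API. A file is first read into a byte array and then stored in a znode.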
