前言:
目前兄弟们对“java模拟卷”大致比较注意,同学们都需要了解一些“java模拟卷”的相关文章。那么小编在网上网罗了一些关于“java模拟卷””的相关文章,希望我们能喜欢,看官们快快来了解一下吧!Gossip是什么
Gossip协议是一个通信协议,一种传播消息的方式,灵感来自于:瘟疫、社交网络等。使用Gossip协议的有:Redis Cluster、Consul、Apache Cassandra等。
原理
Gossip协议基本思想就是:一个节点想要分享一些信息给网络中的其他的一些节点。于是,它周期性的随机选择一些节点,并把信息传递给这些节点。这些收到信息的节点接下来会做同样的事情,即把这些信息传递给其他一些随机选择的节点。一般而言,信息会周期性的传递给N个目标节点,而不只是一个。这个N被称为fanout(这个单词的本意是扇出)。
例子
我们这里假设用N个节点,然后其中某一个节点的数据改变后,所有节点都需要改成相同的(最终相同),举个例子,有10个节点
1,2,3,4,5,6,7,8,9,10
假设扇出是4,然后节点1的数据变化
第一次扩散、节点1随机从剩下的节点中取四个通知
假设取到的是2,3,4,5,那么第一次扩散就已经有1,2,3,4,5个节点一致了
第二次扩散、节点2,3,4,5分别从剩下的节点中随机取四个
假设2取到的是6,7,8,9;3取到的是7,8,9,10,也有可能2,3,4,5都取到6,7,8,9
那么剩下的节点10就得第三次扩散才能扩散过去。
这里因为每个节点都是异步的,可以不考虑是否扩散成功,所以有些节点可能会多次扩散到,所以我下面的例子获取的扩散次数其实假设只扩散一次,实际应该会比我下面的例子次数多一点,举个例子,redis有多个节点,节点2和节点3准备扩散的时候,同时选了节点4,此时节点4状态还是为扩散的,所以会被扩散2次,如果节点数多,重复扩散数目会更多。
代码1、主程序GossipTest
public class GossipTest { public static void main(String[] args) { //节点数目 int num = 20; //扇出也就是每次扩散的节点数 int fanout = 4; List<Node> nodes = new ArrayList<Node>(); for (int i = 1; i <=num; i++) { Node node = new Node(); node.setFanout(fanout); node.setIndex(i); node.setNodes(nodes); nodes.add(node); } //表明这个是第一次传播 ConsistentUtil.map.put(nodes.get(0).getIndex(),0); nodes.get(0).setData("大家都设置为1"); try { Thread.sleep(10000); System.out.println("到结束,总共扩散了"+ConsistentUtil.max+"次"+nodes); } catch (InterruptedException e) { } }}
作用就是根据节点的数目和扇出的数目初始化相关设置,然后假设第一个节点数据变化,触发了扩散,当然这里的扩散只能用线程模拟,真实情况可能是RPC调用别的服务器的资源。
2、节点Node
/** * 每一个节点,最终目的是每个节点的数据最终一致 */class Node{ //节点顺序号 private int index; //设置扇出 private int fanout; //节点数据,这里简单的用字符串代替 private String data; //持有所有节点的位置(不需要考虑并发,重复执行,最终一致性即可) private List<Node> nodes; public int getIndex() { return index; } public void setIndex(int index) { this.index = index; } public String getData() { return data; } public void setData(String data) { this.data = data; //这里设置完消息后需要进行传播消息,这里启动一个线程来模拟 //但是这里可能会导致二次调用,所以这里要锁一下 try { //这里捕获异常,允许传播失败 new ConsistentUtil(nodes, this).start(); } catch (Exception e) { e.printStackTrace(); } } public List<Node> getNodes() { return nodes; } public void setNodes(List<Node> nodes) { this.nodes = nodes; } public int getFanout() { return fanout; } public void setFanout(int fanout) { this.fanout = fanout; } @Override public String toString() { return "Node [index=" + index + ", data=" + data + "]"; }}
上面的属性有节点编号,扇出,节点数据,以及所有节点信息,这里就单纯用list来保存所有节点信息了,并且所有节点信息的修改不需要加任何的锁,因为扩散可能是多次的,失败后会有别的节点继续扩散,这样的话就完全解耦合了。,扩散的触发点是如下代码
public void setData(String data) { this.data = data; //这里设置完消息后需要进行传播消息,这里启动一个线程来模拟 //但是这里可能会导致二次调用,所以这里要锁一下 try { //这里捕获异常,允许传播失败 new ConsistentUtil(nodes, this).start(); } catch (Exception e) { e.printStackTrace(); } }
但是世界情况,可能是数据改变后,会有个固定的时间去进行扩散,而不会立马,并且不通的应用场景有不通的需求,这里的话暂时接收到改变就立马处理。扩散的方式也是启动一个线程去处理,并且该线程运行失败也不用处理,因为就算这个节点没有扩散成功,会有别的节点扩散成功的,除非扩散的节点真的一直挂掉了,那就是另一个使用场景了。
3、扩散线程
class ConsistentUtil extends Thread{ public static Map<Integer,Integer> map = new ConcurrentHashMap<Integer,Integer>(); public static int max=0; public static synchronized void compare(int num) { if(num>max) { max=num; //System.out.println("最大循环次数:"+max); } } //持有所有节点的位置(不需要考虑并发,重复执行,最终一致性即可) private List<Node> nodes; //执行的node private Node node; public ConsistentUtil(List<Node> nodes, Node node) { super(); this.nodes = nodes; this.node = node; } @Override public void run() { //1、先检查是否全部都已经一致了 boolean flag = checkConsistent(); if(!flag) { //随机获取fanout个Node List<Node> choice = this.getChoiceNodes(); if(choice.size()>0) { //传播的次数加1 for (Node node : choice) { //System.out.println(this.node.getIndex()+"向"+node.getIndex()+"进行传播"); //获取上一个节点扩散的次数 Integer num = map.get(this.node.getIndex()); //本次扩散次数加1 map.put(node.getIndex(),num+1); node.setData(this.node.getData()); } } }else { //这里表示结束了 System.out.println(this.node.getIndex()+"已经结束了"); count(); } } private boolean checkConsistent() { Node pre = null; for (Node node : nodes) { if(pre==null) { pre=node; }else { if(pre.getData().equals(node.getData())) { pre = node; }else { //有不一致的 return false; } } } return true; } //选择没有被传播的节点 private List<Node> getChoiceNodes() { List<Node> noSets = new ArrayList<Node>(); for (Node n : nodes) { //这个没有传播过,且不是自己,继续传播 if(!this.node.getData().equals(n.getData())&&(this.node.getIndex()!=n.getIndex())) { noSets.add(n); } } List<Node> choice = new ArrayList<Node>(); //1、获取当前节点数目 int num = noSets.size(); //2、随机获取fanout个下标 if(num<=node.getFanout()) { //这里就获取全部 return noSets; }else { int[] indexs = this.getRandomNums(node.getFanout(), num); for (int i : indexs) { choice.add(noSets.get(i)); } } return choice; } /** * 从all里面随机产生size个不重复的下标 * @param size 需要产生的下标数 * @param all 范围 * @return */ private int[] getRandomNums(int size,int all) { if(size>all) { System.err.println("size大于all"+size+">"+all); return null; } SecureRandom random = new SecureRandom(); //把all当成一个list List<Integer> list = new ArrayList<Integer>(); for(int i=0;i<all;i++) { list.add(i); } int[] result = new int[size]; for(int j=0;j<size;j++) { //随机生成一个下标 int index = random.nextInt(list.size()); //根据下标去取list中的值 result[j]=list.get(index); //从list移除该值 list.remove(index); } return result; } //通过map统计次数 private static void count() { int max = 0; for (Map.Entry<Integer, Integer> entry : map.entrySet()) { //Integer mapKey = entry.getKey(); Integer mapValue = entry.getValue(); if(mapValue>max) { max = mapValue; } } compare(max); }}
原理就是,从所有节点中随机获取扇出(fanout)个数据还没有一致的节点,然后修改它们的值。
那么下面就是运行测试数据扇出是4.
节点数目
20
60
180
500
1000
3000
8000
20000
扩散次数
4
6
8
9
10
13
18
18
可以看到,随着节点数的增加,扩散次数其实变化的不大,毕竟扩散是指数级别递增的。
当然我上面的例子,肯定还有很多没考虑的地方,不过大体思路应是这样,只是随手用java模拟一下下而已,有错误麻烦指正下下。
标签: #java模拟卷