龙空技术网

用Java模拟一个Gossip协议的简单例子

小林软件工作室 105

前言:

目前兄弟们对“java模拟卷”大致比较注意,同学们都需要了解一些“java模拟卷”的相关文章。那么小编在网上网罗了一些关于“java模拟卷””的相关文章,希望我们能喜欢,看官们快快来了解一下吧!

Gossip是什么

Gossip协议是一个通信协议,一种传播消息的方式,灵感来自于:瘟疫、社交网络等。使用Gossip协议的有:Redis Cluster、Consul、Apache Cassandra等。

原理

Gossip协议基本思想就是:一个节点想要分享一些信息给网络中的其他的一些节点。于是,它周期性的随机选择一些节点,并把信息传递给这些节点。这些收到信息的节点接下来会做同样的事情,即把这些信息传递给其他一些随机选择的节点。一般而言,信息会周期性的传递给N个目标节点,而不只是一个。这个N被称为fanout(这个单词的本意是扇出)。

例子

我们这里假设用N个节点,然后其中某一个节点的数据改变后,所有节点都需要改成相同的(最终相同),举个例子,有10个节点

1,2,3,4,5,6,7,8,9,10

假设扇出是4,然后节点1的数据变化

第一次扩散、节点1随机从剩下的节点中取四个通知

假设取到的是2,3,4,5,那么第一次扩散就已经有1,2,3,4,5个节点一致了

第二次扩散、节点2,3,4,5分别从剩下的节点中随机取四个

假设2取到的是6,7,8,9;3取到的是7,8,9,10,也有可能2,3,4,5都取到6,7,8,9

那么剩下的节点10就得第三次扩散才能扩散过去。

这里因为每个节点都是异步的,可以不考虑是否扩散成功,所以有些节点可能会多次扩散到,所以我下面的例子获取的扩散次数其实假设只扩散一次,实际应该会比我下面的例子次数多一点,举个例子,redis有多个节点,节点2和节点3准备扩散的时候,同时选了节点4,此时节点4状态还是为扩散的,所以会被扩散2次,如果节点数多,重复扩散数目会更多。

代码1、主程序GossipTest

public class GossipTest {    public static void main(String[] args) {        //节点数目        int num = 20;        //扇出也就是每次扩散的节点数        int fanout = 4;        List<Node> nodes = new ArrayList<Node>();        for (int i = 1; i <=num; i++) {            Node node = new Node();            node.setFanout(fanout);            node.setIndex(i);            node.setNodes(nodes);            nodes.add(node);        }        //表明这个是第一次传播        ConsistentUtil.map.put(nodes.get(0).getIndex(),0);        nodes.get(0).setData("大家都设置为1");        try {            Thread.sleep(10000);            System.out.println("到结束,总共扩散了"+ConsistentUtil.max+"次"+nodes);        } catch (InterruptedException e) {        }    }}

作用就是根据节点的数目和扇出的数目初始化相关设置,然后假设第一个节点数据变化,触发了扩散,当然这里的扩散只能用线程模拟,真实情况可能是RPC调用别的服务器的资源。

2、节点Node

/** *     每一个节点,最终目的是每个节点的数据最终一致 */class Node{    //节点顺序号    private int index;    //设置扇出    private int fanout;    //节点数据,这里简单的用字符串代替    private String data;    //持有所有节点的位置(不需要考虑并发,重复执行,最终一致性即可)    private List<Node> nodes;    public int getIndex() {        return index;    }    public void setIndex(int index) {        this.index = index;    }    public String getData() {        return data;    }    public void setData(String data) {        this.data = data;        //这里设置完消息后需要进行传播消息,这里启动一个线程来模拟        //但是这里可能会导致二次调用,所以这里要锁一下        try {            //这里捕获异常,允许传播失败            new ConsistentUtil(nodes, this).start();        } catch (Exception e) {            e.printStackTrace();        }    }    public List<Node> getNodes() {        return nodes;    }    public void setNodes(List<Node> nodes) {        this.nodes = nodes;    }    public int getFanout() {        return fanout;    }    public void setFanout(int fanout) {        this.fanout = fanout;    }    @Override    public String toString() {        return "Node [index=" + index + ", data=" + data + "]";    }}

上面的属性有节点编号,扇出,节点数据,以及所有节点信息,这里就单纯用list来保存所有节点信息了,并且所有节点信息的修改不需要加任何的锁,因为扩散可能是多次的,失败后会有别的节点继续扩散,这样的话就完全解耦合了。,扩散的触发点是如下代码

public void setData(String data) {        this.data = data;        //这里设置完消息后需要进行传播消息,这里启动一个线程来模拟        //但是这里可能会导致二次调用,所以这里要锁一下        try {            //这里捕获异常,允许传播失败            new ConsistentUtil(nodes, this).start();        } catch (Exception e) {            e.printStackTrace();        }    }

但是世界情况,可能是数据改变后,会有个固定的时间去进行扩散,而不会立马,并且不通的应用场景有不通的需求,这里的话暂时接收到改变就立马处理。扩散的方式也是启动一个线程去处理,并且该线程运行失败也不用处理,因为就算这个节点没有扩散成功,会有别的节点扩散成功的,除非扩散的节点真的一直挂掉了,那就是另一个使用场景了。

3、扩散线程

class ConsistentUtil extends Thread{    public static Map<Integer,Integer> map = new ConcurrentHashMap<Integer,Integer>();    public static int max=0;    public static synchronized void compare(int num) {        if(num>max) {            max=num;            //System.out.println("最大循环次数:"+max);        }    }    //持有所有节点的位置(不需要考虑并发,重复执行,最终一致性即可)    private List<Node> nodes;    //执行的node    private Node node;    public ConsistentUtil(List<Node> nodes, Node node) {        super();        this.nodes = nodes;        this.node = node;    }    @Override    public void run() {        //1、先检查是否全部都已经一致了        boolean flag = checkConsistent();        if(!flag) {            //随机获取fanout个Node            List<Node> choice = this.getChoiceNodes();            if(choice.size()>0) {                //传播的次数加1                for (Node node : choice) {                    //System.out.println(this.node.getIndex()+"向"+node.getIndex()+"进行传播");                    //获取上一个节点扩散的次数                    Integer num = map.get(this.node.getIndex());                    //本次扩散次数加1                    map.put(node.getIndex(),num+1);                    node.setData(this.node.getData());                }            }        }else {            //这里表示结束了            System.out.println(this.node.getIndex()+"已经结束了");            count();        }    }    private boolean checkConsistent() {        Node pre = null;        for (Node node : nodes) {            if(pre==null) {                pre=node;            }else {                if(pre.getData().equals(node.getData())) {                    pre = node;                }else {                    //有不一致的                    return false;                }            }        }        return true;    }    //选择没有被传播的节点    private List<Node> getChoiceNodes() {        List<Node> noSets = new ArrayList<Node>();        for (Node n : nodes) {            //这个没有传播过,且不是自己,继续传播            if(!this.node.getData().equals(n.getData())&&(this.node.getIndex()!=n.getIndex())) {                noSets.add(n);            }        }        List<Node> choice = new ArrayList<Node>();        //1、获取当前节点数目        int num = noSets.size();        //2、随机获取fanout个下标        if(num<=node.getFanout()) {            //这里就获取全部            return noSets;        }else {            int[] indexs = this.getRandomNums(node.getFanout(), num);            for (int i : indexs) {                choice.add(noSets.get(i));            }        }        return choice;    }    /**         * 从all里面随机产生size个不重复的下标        * @param size 需要产生的下标数    * @param all 范围    * @return    */    private int[] getRandomNums(int size,int all) {        if(size>all) {            System.err.println("size大于all"+size+">"+all);            return null;        }        SecureRandom random = new SecureRandom();        //把all当成一个list        List<Integer> list = new ArrayList<Integer>();        for(int i=0;i<all;i++) {            list.add(i);        }        int[] result = new int[size];        for(int j=0;j<size;j++) {            //随机生成一个下标            int index = random.nextInt(list.size());            //根据下标去取list中的值            result[j]=list.get(index);            //从list移除该值            list.remove(index);        }        return result;    }    //通过map统计次数    private static void count() {        int max = 0;        for (Map.Entry<Integer, Integer> entry : map.entrySet()) {            //Integer mapKey = entry.getKey();            Integer mapValue = entry.getValue();            if(mapValue>max) {                max = mapValue;            }        }        compare(max);    }}

原理就是,从所有节点中随机获取扇出(fanout)个数据还没有一致的节点,然后修改它们的值。

那么下面就是运行测试数据扇出是4.

节点数目

20

60

180

500

1000

3000

8000

20000

扩散次数

4

6

8

9

10

13

18

18

可以看到,随着节点数的增加,扩散次数其实变化的不大,毕竟扩散是指数级别递增的。

当然我上面的例子,肯定还有很多没考虑的地方,不过大体思路应是这样,只是随手用java模拟一下下而已,有错误麻烦指正下下。

标签: #java模拟卷