龙空技术网

大数据工作流DAG源码分享

从今天开始锻炼身体 81

前言:

最近不少朋友比较关注“javadag算法”,想找一些相关文章来研究。小编在网上收集了一些有关“javadag算法”的文章,希望大家喜欢,下面我们一起来学习吧!

话不多说,直接上源码:

package com.sckr.graph;import com.sckr.utils.CollectionUtils;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import java.util.*;import java.util.concurrent.locks.ReadWriteLock;import java.util.concurrent.locks.ReentrantReadWriteLock;/** * analysis of DAG * Node: node 节点主键 * NodeInfo:node description information * EdgeInfo: edge description information */public class DAG<Node, NodeInfo, EdgeInfo> {  private static final Logger logger = LoggerFactory.getLogger(DAG.class);  private final ReadWriteLock lock = new ReentrantReadWriteLock();  /**   * node map, key is node, value is node information   */  private volatile Map<Node, NodeInfo> nodesMap;  /**   * edge map. key is node of origin;value is Map with key for destination node and value for edge   */  private volatile Map<Node, Map<Node, EdgeInfo>> edgesMap;  /**   * reversed edge set,key is node of destination, value is Map with key for origin node and value for edge   */  private volatile Map<Node, Map<Node, EdgeInfo>> reverseEdgesMap;  public DAG() {    nodesMap = new HashMap<>();    edgesMap = new HashMap<>();    reverseEdgesMap = new HashMap<>();  }  /**   * add node information   *   * @param node          node   * @param nodeInfo      node information   */  public void addNode(Node node, NodeInfo nodeInfo) {    lock.writeLock().lock();    try{      nodesMap.put(node, nodeInfo);    }finally {      lock.writeLock().unlock();    }  }  /**   * add edge   * @param fromNode node of origin   * @param toNode   node of destination   * @return The result of adding an edge. returns false if the DAG result is a ring result   */  public boolean addEdge(Node fromNode, Node toNode) {    return addEdge(fromNode, toNode, false);  }  /**   * add edge   * @param fromNode        node of origin   * @param toNode          node of destination   * @param createNode      whether the node needs to be created if it does not exist   * @return The result of adding an edge. 
returns false if the DAG result is a ring result   */  private boolean addEdge(Node fromNode, Node toNode, boolean createNode) {    return addEdge(fromNode, toNode, null, createNode);  }  /**   * add edge   *   * @param fromNode        node of origin   * @param toNode          node of destination   * @param edge            edge description   * @param createNode      whether the node needs to be created if it does not exist   * @return The result of adding an edge. returns false if the DAG result is a ring result   */  public boolean addEdge(Node fromNode, Node toNode, EdgeInfo edge, boolean createNode) {    lock.writeLock().lock();    try{      // Whether an edge can be successfully added(fromNode -> toNode)      if (!isLegalAddEdge(fromNode, toNode, createNode)) {        logger.error("serious error: add edge({} -> {}) is invalid, cause cycle!", fromNode, toNode);        return false;      }      addNodeIfAbsent(fromNode, null);      addNodeIfAbsent(toNode, null);      addEdge(fromNode, toNode, edge, edgesMap);      addEdge(toNode, fromNode, edge, reverseEdgesMap);      return true;    }finally {      lock.writeLock().unlock();    }  }  /**   * whether this node is contained   *   * @param node node   * @return true if contains   */  public boolean containsNode(Node node) {    lock.readLock().lock();    try{      return nodesMap.containsKey(node);    }finally {      lock.readLock().unlock();    }  }  /**   * whether this edge is contained   *   * @param fromNode node of origin   * @param toNode   node of destination   * @return true if contains   */  public boolean containsEdge(Node fromNode, Node toNode) {    lock.readLock().lock();    try{      Map<Node, EdgeInfo> endEdges = edgesMap.get(fromNode);      if (endEdges == null) {        return false;      }     return endEdges.containsKey(toNode);    }finally {      lock.readLock().unlock();    }  }  /**   * get node description   *   * @param node node   * @return node description   */  public NodeInfo getNode(Node 
node) {    lock.readLock().lock();    try{      return nodesMap.get(node);    }finally {      lock.readLock().unlock();    }  }  /**   * Get the number of nodes   *   * @return the number of nodes   */  public int getNodesCount() {    lock.readLock().lock();    try{      return nodesMap.size();    }finally {      lock.readLock().unlock();    }  }  /**   * Get the number of edges   *   * @return the number of edges   */  public int getEdgesCount() {    lock.readLock().lock();    try{      int count = 0;      for (Map.Entry<Node, Map<Node, EdgeInfo>> entry : edgesMap.entrySet()) {        count += entry.getValue().size();      }      return count;    }finally {      lock.readLock().unlock();    }  }  /**   * get the start node of DAG   *   * @return the start node of DAG   */  public Collection<Node> getBeginNode() {    lock.readLock().lock();    try{      return CollectionUtils.subtract(nodesMap.keySet(), reverseEdgesMap.keySet());    }finally {      lock.readLock().unlock();    }  }  /**   * get the end node of DAG   *   * @return the end node of DAG   */  public Collection<Node> getEndNode() {    lock.readLock().lock();    try{      return CollectionUtils.subtract(nodesMap.keySet(), edgesMap.keySet());    }finally {      lock.readLock().unlock();    }  }  /**   * Gets all previous nodes of the node   *   * @param node node id to be calculated   * @return all previous nodes of the node   */  public Set<Node> getPreviousNodes(Node node) {    lock.readLock().lock();    try{      return getNeighborNodes(node, reverseEdgesMap);    }finally {      lock.readLock().unlock();    }  }  /**   * Get all subsequent nodes of the node   *   * @param node node id to be calculated   * @return all subsequent nodes of the node   */  public Set<Node> getSubsequentNodes(Node node) {    lock.readLock().lock();    try{      return getNeighborNodes(node, edgesMap);    }finally {      lock.readLock().unlock();    }  }  /**   * Gets the degree of entry of the node   *   * @param node node 
id   * @return the degree of entry of the node   */  public int getIndegree(Node node) {    lock.readLock().lock();    try{      return getPreviousNodes(node).size();    }finally {      lock.readLock().unlock();    }  }  /**   * whether the graph has a ring   *   * @return true if has cycle, else return false.   */  public boolean hasCycle() {    lock.readLock().lock();    try{        return !topologicalSortImpl().getKey();    }finally {      lock.readLock().unlock();    }  }  /**   * Only DAG has a topological sort   * @return topologically sorted results, returns false if the DAG result is a ring result   * @throws Exception errors   */  public List<Node> topologicalSort() throws Exception {    lock.readLock().lock();    try{      Map.Entry<Boolean, List<Node>> entry = topologicalSortImpl();      if (entry.getKey()) {        return entry.getValue();      }      throw new Exception("serious error: graph has cycle ! ");    }finally {      lock.readLock().unlock();    }  }  /**   *  if tho node does not exist,add this node   *   * @param node    node   * @param nodeInfo node information   */  private void addNodeIfAbsent(Node node, NodeInfo nodeInfo) {    if (!containsNode(node)) {      addNode(node, nodeInfo);    }  }  /**   * add edge   *   * @param fromNode node of origin   * @param toNode   node of destination   * @param edge  edge description   * @param edges edge set   */  private void addEdge(Node fromNode, Node toNode, EdgeInfo edge, Map<Node, Map<Node, EdgeInfo>> edges) {    edges.putIfAbsent(fromNode, new HashMap<>());    Map<Node, EdgeInfo> toNodeEdges = edges.get(fromNode);    toNodeEdges.put(toNode, edge);  }  /**   * Whether an edge can be successfully added(fromNode -> toNode)   * need to determine whether the DAG has cycle   *   * @param fromNode     node of origin   * @param toNode       node of destination   * @param createNode whether to create a node   * @return true if added   */  private boolean isLegalAddEdge(Node fromNode, Node toNode, 
boolean createNode) {      if (fromNode.equals(toNode)) {          logger.error("edge fromNode({}) can't equals toNode({})", fromNode, toNode);          return false;      }      if (!createNode) {          if (!containsNode(fromNode) || !containsNode(toNode)){              logger.error("edge fromNode({}) or toNode({}) is not in vertices map", fromNode, toNode);              return false;          }      }      // Whether an edge can be successfully added(fromNode -> toNode),need to determine whether the DAG has cycle!      int verticesCount = getNodesCount();      Queue<Node> queue = new LinkedList<>();      queue.add(toNode);      // if DAG doesn't find fromNode, it's not has cycle!      while (!queue.isEmpty() && (--verticesCount > 0)) {          Node key = queue.poll();          for (Node subsequentNode : getSubsequentNodes(key)) {              if (subsequentNode.equals(fromNode)) {                  return false;              }              queue.add(subsequentNode);          }      }      return true;  }  /**   * Get all neighbor nodes of the node   *   * @param node   Node id to be calculated   * @param edges neighbor edge information   * @return all neighbor nodes of the node   */  private Set<Node> getNeighborNodes(Node node, final Map<Node, Map<Node, EdgeInfo>> edges) {    final Map<Node, EdgeInfo> neighborEdges = edges.get(node);    if (neighborEdges == null) {      return Collections.EMPTY_MAP.keySet();    }    return neighborEdges.keySet();  }  /**   * Determine whether there are ring and topological sorting results   *   * Directed acyclic graph (DAG) has topological ordering   * Breadth First Search:   *    1、Traversal of all the vertices in the graph, the degree of entry is 0 vertex into the queue   *    2、Poll a vertex in the queue to update its adjacency (minus 1) and queue the adjacency if it is 0 after minus 1   *    3、Do step 2 until the queue is empty   * If you cannot traverse all the nodes, it means that the current graph is not a directed 
acyclic graph.   * There is no topological sort.   *   *   * @return key Returns the state   * if success (acyclic) is true, failure (acyclic) is looped,   * and value (possibly one of the topological sort results)   */  private Map.Entry<Boolean, List<Node>> topologicalSortImpl() {    // node queue with degree of entry 0    Queue<Node> zeroIndegreeNodeQueue = new LinkedList<>();    // save result    List<Node> topoResultList = new ArrayList<>();    // save the node whose degree is not 0    Map<Node, Integer> notZeroIndegreeNodeMap = new HashMap<>();    // Scan all the vertices and push vertexs with an entry degree of 0 to queue    for (Map.Entry<Node, NodeInfo> vertices : nodesMap.entrySet()) {      Node node = vertices.getKey();      int inDegree = getIndegree(node);      if (inDegree == 0) {        zeroIndegreeNodeQueue.add(node);        topoResultList.add(node);      } else {        notZeroIndegreeNodeMap.put(node, inDegree);      }    }    /**     * After scanning, there is no node with 0 degree of entry,     * indicating that there is a ring, and return directly     */    if(zeroIndegreeNodeQueue.isEmpty()){      return new AbstractMap.SimpleEntry(false, topoResultList);    }    // The topology algorithm is used to delete nodes with 0 degree of entry and its associated edges    while (!zeroIndegreeNodeQueue.isEmpty()) {      Node v = zeroIndegreeNodeQueue.poll();      // Get the neighbor node      Set<Node> subsequentNodes = getSubsequentNodes(v);      for (Node subsequentNode : subsequentNodes) {        Integer degree = notZeroIndegreeNodeMap.get(subsequentNode);        if(--degree == 0){          topoResultList.add(subsequentNode);          zeroIndegreeNodeQueue.add(subsequentNode);          notZeroIndegreeNodeMap.remove(subsequentNode);        }else{          notZeroIndegreeNodeMap.put(subsequentNode, degree);        }      }    }    // if notZeroIndegreeNodeMap is empty,there is no ring!    
AbstractMap.SimpleEntry resultMap = new AbstractMap.SimpleEntry(notZeroIndegreeNodeMap.size() == 0 , topoResultList);    return resultMap;  }}

DAG单元测试代码

import org.junit.After;import org.junit.Before;import org.junit.Test;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import java.util.ArrayList;import java.util.Arrays;import java.util.List;import static org.junit.Assert.*;public class DAGTest {  private DAG<Integer, String, String> graph;  private static final Logger logger = LoggerFactory.getLogger(DAGTest.class);  @Before  public void setup() {    graph = new DAG<>();  }  @After  public void tearDown() {    clear();  }  private void clear() {    graph = null;    graph = new DAG<>();    assertEquals(graph.getNodesCount(), 0);  }  private void makeGraph() {    clear();    //         1->2    //         2->5    //         3->5    //         4->6    //         5->6    //         6->7    for (int i = 1; i <= 7; ++i) {      graph.addNode(i, "v(" + i + ")");    }    // construction side    assertTrue(graph.addEdge(1, 2));    assertTrue(graph.addEdge(2, 5));    assertTrue(graph.addEdge(3, 5));    assertTrue(graph.addEdge(4, 6));    assertTrue(graph.addEdge(5, 6));    assertTrue(graph.addEdge(6, 7));    assertEquals(graph.getNodesCount(), 7);    assertEquals(graph.getEdgesCount(), 6);  }  /**   * add node   */  @Test  public void testAddNode() {    clear();    graph.addNode(1, "v(1)");    graph.addNode(2, null);    graph.addNode(5, "v(5)");    assertEquals(graph.getNodesCount(), 3);    assertEquals(graph.getNode(1), "v(1)");    assertTrue(graph.containsNode(1));    assertFalse(graph.containsNode(10));  }  /**   * add edge   */  @Test  public void testAddEdge() {    clear();    assertFalse(graph.addEdge(1, 2, "edge(1 -> 2)", false));    graph.addNode(1, "v(1)");    assertTrue(graph.addEdge(1, 2, "edge(1 -> 2)",true));    graph.addNode(2, "v(2)");    assertTrue(graph.addEdge(1, 2, "edge(1 -> 2)",true));    assertFalse(graph.containsEdge(1, 3));    assertTrue(graph.containsEdge(1, 2));    assertEquals(graph.getEdgesCount(), 1);    int node = 3;    graph.addNode(node, "v(3)");    assertFalse(graph.addEdge(node, node)); 
 }  /**   * add subsequent node   */  @Test  public void testSubsequentNodes() {    makeGraph();    assertEquals(graph.getSubsequentNodes(1).size(), 1);  }  /**   * test indegree   */  @Test  public void testIndegree() {    makeGraph();    assertEquals(graph.getIndegree(1), 0);    assertEquals(graph.getIndegree(2), 1);    assertEquals(graph.getIndegree(3), 0);    assertEquals(graph.getIndegree(4), 0);  }  /**   * test begin node   */  @Test  public void testBeginNode() {    makeGraph();    assertEquals(graph.getBeginNode().size(), 3);    assertTrue(graph.getBeginNode().contains(1));    assertTrue(graph.getBeginNode().contains(3));    assertTrue(graph.getBeginNode().contains(4));  }  /**   * test end node   */  @Test  public void testEndNode() {    makeGraph();    assertEquals(graph.getEndNode().size(), 1);    assertTrue(graph.getEndNode().contains(7));  }  /**   * test cycle   */  @Test  public void testCycle() {    clear();    for (int i = 1; i <= 5; ++i) {      graph.addNode(i, "v(" + i + ")");    }    // construction side    try {      graph.addEdge(1, 2);      graph.addEdge(2, 3);      graph.addEdge(3, 4);      assertFalse(graph.hasCycle());    } catch (Exception e) {      e.printStackTrace();      fail();    }    try {      boolean addResult = graph.addEdge(4, 1);      if(!addResult){        assertTrue(true);      }      graph.addEdge(5, 1);      assertFalse(graph.hasCycle());    } catch (Exception e) {      e.printStackTrace();      fail();    }    clear();    // construction node    for (int i = 1; i <= 5; ++i) {      graph.addNode(i, "v(" + i +")");    }    // construction side, 1->2, 2->3, 3->4    try {      graph.addEdge(1, 2);      graph.addEdge(2, 3);      graph.addEdge(3, 4);      graph.addEdge(4, 5);      graph.addEdge(5, 2);//会失败,添加不进去,所以下一步无环      assertFalse(graph.hasCycle());    } catch (Exception e) {      e.printStackTrace();      fail();    }  }  @Test  public void testTopologicalSort(){    makeGraph();    try {      // topological result is : 
1 3 4 2 5 6 7      List<Integer> topoList = new ArrayList<>();      topoList.add(1);      topoList.add(3);      topoList.add(4);      topoList.add(2);      topoList.add(5);      topoList.add(6);      topoList.add(7);      assertEquals(graph.topologicalSort(),topoList);    } catch (Exception e) {      e.printStackTrace();      fail();    }  }  @Test  public void testTopologicalSort2() {    clear();    graph.addEdge(1, 2, null, true);    graph.addEdge(2, 3, null, true);    graph.addEdge(3, 4, null, true);    graph.addEdge(4, 5, null, true);    graph.addEdge(5, 1, null, false); //The loop will fail to add    try {      List<Integer> topoList = new ArrayList<>();// topological result is : 1 2 3 4 5      topoList.add(1);      topoList.add(2);      topoList.add(3);      topoList.add(4);      topoList.add(5);      assertEquals(graph.topologicalSort(),topoList);    } catch (Exception e) {      e.printStackTrace();      fail();    }  }  @Test  public void testTopologicalSort3() throws Exception {    clear();    //         1->2    //         1->3    //         2->5    //         3->4    //         4->6    //         5->6    //         6->7    //         6->8    for (int i = 1; i <= 8; ++i) {      graph.addNode(i, "v(" + i + ")");    }    // construction node    assertTrue(graph.addEdge(1, 2));    assertTrue(graph.addEdge(1, 3));    assertTrue(graph.addEdge(2, 5));    assertTrue(graph.addEdge(3, 4));    assertTrue(graph.addEdge(4, 6));    assertTrue(graph.addEdge(5, 6));    assertTrue(graph.addEdge(6, 7));    assertTrue(graph.addEdge(6, 8));    assertEquals(graph.getNodesCount(), 8);    logger.info(Arrays.toString(graph.topologicalSort().toArray()));    List<Integer> expectedList = new ArrayList<>();    for (int i = 1; i <= 8; ++i) {      expectedList.add(i);      logger.info(i + " subsequentNodes : " + graph.getSubsequentNodes(i));    }    logger.info(6 + "  previousNodesb: " + graph.getPreviousNodes(6));    assertEquals(5, graph.getSubsequentNodes(2).toArray()[0]);  }  
@Test  public void testTopologicalSort4() {    clear();    try {      graph.topologicalSort();    } catch (Exception e) {      assertTrue(e.getMessage().contains("serious error: graph has cycle"));    }  }}

干货分享,持续关注!

标签: #javadag算法