龙空技术网

Spark GraphX: mining maximal cliques in a graph with a pseudo-parallel algorithm

AI科技园


Background:

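Spark GraphX does not provide a maximal clique mining algorithm, and the maximal clique algorithms currently available are all serial algorithms based on the Bron–Kerbosch algorithm.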
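For reference, the basic Bron–Kerbosch recursion fits in a few lines of Java. This is a minimal sketch, not the author's code: the class name `BasicBronKerbosch` is hypothetical, the adjacency map is assumed to be symmetric with no self-loops, and no pivoting or vertex ordering is applied. R is the clique being built, P holds the vertices that can still extend it, and X holds vertices already explored; R is maximal exactly when P and X are both empty.

```java
import java.util.*;

/** Basic Bron-Kerbosch without pivoting: reports every maximal clique of an undirected graph. */
final class BasicBronKerbosch {

    /** adj is assumed symmetric (v in adj.get(u) iff u in adj.get(v)) and free of self-loops. */
    static List<Set<Integer>> maximalCliques(Map<Integer, Set<Integer>> adj) {
        List<Set<Integer>> out = new ArrayList<>();
        expand(new HashSet<>(), new HashSet<>(adj.keySet()), new HashSet<>(), adj, out);
        return out;
    }

    // r = current clique, p = vertices that can still extend r, x = vertices already explored
    private static void expand(Set<Integer> r, Set<Integer> p, Set<Integer> x,
                               Map<Integer, Set<Integer>> adj, List<Set<Integer>> out) {
        if (p.isEmpty() && x.isEmpty()) {
            if (!r.isEmpty()) {
                out.add(new HashSet<>(r)); // nothing can extend r, so it is maximal
            }
            return;
        }
        for (Integer v : new ArrayList<>(p)) { // iterate over a copy because p shrinks below
            Set<Integer> nv = adj.get(v);
            Set<Integer> r2 = new HashSet<>(r);
            r2.add(v);
            Set<Integer> p2 = new HashSet<>(p);
            p2.retainAll(nv);
            Set<Integer> x2 = new HashSet<>(x);
            x2.retainAll(nv);
            expand(r2, p2, x2, adj, out);
            p.remove(v); // all cliques containing v are done ...
            x.add(v);    // ... so v may no longer start a "new" clique
        }
    }
}
```

On the sample graph used later in this article it reports six maximal cliques: the four triangles plus the edges {2, 6} and {6, 7}, which a minimum size of 3 filters out.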

Approach:

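Spark GraphX provides a connected components algorithm. Connected components and maximal cliques are both concepts of undirected graphs, and every maximal clique is a subset of a single connected component. So Spark GraphX is used to find the connected components, and a serial maximal clique algorithm is then run on each component to find its maximal cliques. This is pseudo-parallelism: different components are processed in parallel, but each component is still searched serially.

For densely connected graphs the components are very large, and the serial clique algorithm still takes a long time. Pruning is used here to reduce the amount of input data, but for large graphs the room for optimization is limited; a truly parallel maximal clique algorithm is still needed.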
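Components can be searched independently because a clique is itself connected, so it never spans two components. The grouping step can be replayed serially as a minimal sketch: a union-find structure stands in for GraphX `connectedComponents()` and, like GraphX, labels every component with its smallest vertex id; edges are then grouped by that label. The class `ComponentGrouping` and its methods are hypothetical illustrations, not part of the original job.

```java
import java.util.*;

/** Groups undirected edges by connected component, labelling each component by its smallest vertex id. */
final class ComponentGrouping {

    // Follows parent links to the root, then points every vertex on the path at the root.
    private static long find(Map<Long, Long> parent, long v) {
        long root = v;
        while (parent.get(root) != root) {
            root = parent.get(root);
        }
        while (v != root) {
            long next = parent.get(v);
            parent.put(v, root);
            v = next;
        }
        return root;
    }

    static Map<Long, List<long[]>> groupByComponent(List<long[]> edges) {
        Map<Long, Long> parent = new HashMap<>();
        for (long[] e : edges) {
            parent.putIfAbsent(e[0], e[0]);
            parent.putIfAbsent(e[1], e[1]);
            long a = find(parent, e[0]);
            long b = find(parent, e[1]);
            if (a != b) {
                parent.put(Math.max(a, b), Math.min(a, b)); // smaller id becomes the root
            }
        }
        Map<Long, List<long[]>> groups = new TreeMap<>();
        for (long[] e : edges) {
            groups.computeIfAbsent(find(parent, e[0]), k -> new ArrayList<>()).add(e);
        }
        return groups;
    }
}
```

Because the smaller root always wins a union, each root is the minimum vertex of its set. With the 12 sample edges plus an extra, disconnected edge (8, 9), it returns two groups keyed 0 and 8.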

Configuration file (config.properties):

graph_data_path=hdfs://localhost/graph_data
out_path=hdfs://localhost/clique
ck_path=hdfs://localhost/checkpoint
# number of pruning rounds
numIter=50
# minimum number of vertices in a reported maximal clique
count=3
# maximal clique algorithm, 1: own implementation, 2: jgrapht
algorithm=2
# vertices left after a pruning round, as a percentage of the previous round;
# if 90% of the data is still left after a round, pruning is no longer effective and stops
percent=90
spark.master=local
spark.app.name=graph
spark.serializer=org.apache.spark.serializer.KryoSerializer
spark.yarn.executor.memoryOverhead=20480
spark.yarn.driver.memoryOverhead=20480
spark.driver.extraJavaOptions=-XX:+UseG1GC -XX:+UseCompressedOops -XX:+DisableExplicitGC
spark.executor.extraJavaOptions=-XX:+UseG1GC -XX:+UseCompressedOops -XX:+DisableExplicitGC
spark.driver.maxResultSize=10g
spark.default.parallelism=60
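These settings are read with `java.util.Properties.load`, which has no inline comments: a comment must sit on its own line starting with `#` or `!`, and any text after a value on the same line becomes part of the value, so `Integer.parseInt` would fail on it. A minimal demonstration; the class `PropertiesCommentDemo` is a hypothetical helper.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

final class PropertiesCommentDemo {

    // Parses .properties text held in a string, using the same line and comment rules as a file.
    static Properties parse(String text) {
        Properties p = new Properties();
        try {
            p.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e); // not expected for an in-memory reader
        }
        return p;
    }
}
```

Parsing `"numIter=50\t\tpruning rounds"` yields the value `50\t\tpruning rounds`, while `"# pruning rounds\nnumIter=50"` yields `50`.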

Dependency: jgrapht (used when algorithm=2)

Sample data:

{"src":"0","dst":"1"}
{"src":"0","dst":"2"}
{"src":"0","dst":"3"}
{"src":"1","dst":"0"}
{"src":"2","dst":"1"}
{"src":"3","dst":"5"}
{"src":"4","dst":"6"}
{"src":"5","dst":"4"}
{"src":"6","dst":"5"}
{"src":"3","dst":"2"}
{"src":"2","dst":"3"}
{"src":"6","dst":"4"}
{"src":"3","dst":"4"}
{"src":"4","dst":"3"}
{"src":"2","dst":"6"}
{"src":"6","dst":"2"}
{"src":"6","dst":"7"}
{"src":"7","dst":"6"}
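The Spark job turns these directed records into undirected edges by storing each pair as (smaller id, larger id) and calling `distinct()`. The same normalisation in plain Java, as a sketch: the class `EdgeNormalizer` is hypothetical, the regular expression assumes exactly the field order and quoting of the sample records (a real job would use a JSON parser), and self-loops are dropped because a vertex is never adjacent to itself in a clique.

```java
import java.util.*;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Turns {"src":"a","dst":"b"} records into deduplicated undirected edges [min, max]. */
final class EdgeNormalizer {

    // Assumes the exact layout of the sample records: src first, then dst, both quoted digits.
    private static final Pattern RECORD =
            Pattern.compile("\\{\"src\":\"(\\d+)\",\"dst\":\"(\\d+)\"\\}");

    static Set<List<Long>> normalize(String text) {
        Set<List<Long>> edges = new LinkedHashSet<>(); // set semantics play the role of distinct()
        Matcher m = RECORD.matcher(text);
        while (m.find()) {
            long src = Long.parseLong(m.group(1));
            long dst = Long.parseLong(m.group(2));
            if (src == dst) {
                continue; // self-loops cannot be part of a clique
            }
            edges.add(Arrays.asList(Math.min(src, dst), Math.max(src, dst)));
        }
        return edges;
    }
}
```

For the 18 sample records it yields 12 undirected edges, since most edges appear in both directions.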

Output (one maximal clique per line):

0,1,2
0,2,3
3,4,5
4,5,6
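Each output line can be spot-checked by brute force: a line is correct if its vertices are pairwise adjacent and no other vertex is adjacent to all of them. A minimal sketch of such a checker; the class `CliqueCheck` is hypothetical and assumes the adjacency map is built from the undirected sample edges.

```java
import java.util.*;

/** Brute-force verification of clique output lines such as "0,1,2". */
final class CliqueCheck {

    static boolean isClique(Map<String, Set<String>> adj, Set<String> vs) {
        for (String a : vs) {
            for (String b : vs) {
                if (!a.equals(b) && !adj.getOrDefault(a, Collections.emptySet()).contains(b)) {
                    return false; // a and b are not adjacent
                }
            }
        }
        return true;
    }

    static boolean isMaximalClique(Map<String, Set<String>> adj, Set<String> vs) {
        if (!isClique(adj, vs)) {
            return false;
        }
        for (String v : adj.keySet()) {
            if (vs.contains(v)) {
                continue;
            }
            Set<String> bigger = new HashSet<>(vs);
            bigger.add(v);
            if (isClique(adj, bigger)) {
                return false; // v extends the set, so it is not maximal
            }
        }
        return true;
    }

    static Set<String> parseLine(String line) {
        return new HashSet<>(Arrays.asList(line.trim().split(",")));
    }
}
```

All four lines above pass; `0,1` fails because vertex 2 extends it, and `0,1,3` fails because 1 and 3 are not adjacent.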

Implementation (Scala):

import java.net.URI
import java.util
import java.util.Properties

import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.graphx.{Edge, Graph}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.{SparkConf, SparkContext}
// import path of older jgrapht releases; later versions moved the class to org.jgrapht.alg.clique
import org.jgrapht.alg.BronKerboschCliqueFinder
import org.jgrapht.graph.{DefaultEdge, SimpleGraph}

import scala.collection.JavaConverters._
import scala.collection.mutable

object ApplicationTitan {

  def main(args: Array[String]): Unit = {
    val prop = new Properties()
    prop.load(getClass.getResourceAsStream("/config.properties"))

    val graph_data_path = prop.getProperty("graph_data_path")
    val out_path = prop.getProperty("out_path")
    val ck_path = prop.getProperty("ck_path")
    val count = Integer.parseInt(prop.getProperty("count").trim)         // minimum clique size
    val numIter = Integer.parseInt(prop.getProperty("numIter").trim)     // maximum pruning rounds
    val algorithm = Integer.parseInt(prop.getProperty("algorithm").trim) // 1: CliqueFinder, 2: jgrapht
    val percent = Integer.parseInt(prop.getProperty("percent").trim)

    // copy every spark.* entry of the properties file into the SparkConf
    val conf = new SparkConf()
    prop.stringPropertyNames().asScala.foreach(s => {
      if (s.startsWith("spark")) {
        conf.set(s, prop.getProperty(s))
      }
    })
    conf.registerKryoClasses(Array(getClass))
    val sc = new SparkContext(conf)
    sc.setLogLevel("ERROR")
    sc.setCheckpointDir(ck_path)
    val sqlc = new SQLContext(sc)

    try {
      // delete the previous output synchronously (Runtime.exec("hdfs dfs -rm -r ...") does not wait for the command)
      FileSystem.get(new URI(out_path), sc.hadoopConfiguration).delete(new Path(out_path), true)

      val e_df = sqlc.read
        // .json(graph_data_path)
        .parquet(graph_data_path)

      // select the columns by name instead of relying on their position, and store every edge
      // as (smaller id, larger id) so that a -> b and b -> a become one undirected edge
      var e_rdd = e_df.select("src", "dst").rdd
        .map { case Row(src: String, dst: String) =>
          val src_long = src.toLong
          val dst_long = dst.toLong
          if (src_long < dst_long) (src_long, dst_long) else (dst_long, src_long)
        }
        .filter(x => x._1 != x._2) // drop self-loops: jgrapht's SimpleGraph does not allow them
        .distinct()
      e_rdd.persist(StorageLevel.MEMORY_AND_DISK_SER)

      // pruning: a vertex of a clique with `count` vertices has at least count - 1 neighbours
      var iter = 0
      var bc_size = 0
      var converged = false
      while (iter <= numIter && !converged) {
        val bc_value = e_rdd
          .flatMap(x => List((x._1, 1), (x._2, 1)))
          .reduceByKey(_ + _)
          .filter(x => x._2 >= count - 1)
          .map(x => x._1)
          .collect()
          .toSet
        // a new val every round, so each filter closure keeps the broadcast of its own round
        val bc = sc.broadcast(bc_value)
        e_rdd = e_rdd.filter(x => bc.value.contains(x._1) && bc.value.contains(x._2))
        e_rdd.persist(StorageLevel.MEMORY_AND_DISK_SER)
        iter += 1
        // stop when this round still kept at least percent% of the previous round's vertices
        if (bc_size != 0 && bc_value.size >= bc_size * percent / 100) {
          println("total iter : " + iter)
          converged = true
        }
        bc_size = bc_value.size
      }

      // build the graph
      val edge: RDD[Edge[Long]] = e_rdd.map(x => Edge(x._1, x._2, 1L))
      val graph = Graph.fromEdges(edge, 0, StorageLevel.MEMORY_AND_DISK_SER, StorageLevel.MEMORY_AND_DISK_SER)

      // connected components: each vertex is labelled with the smallest vertex id of its component
      val cc = graph.connectedComponents().vertices
      cc.persist(StorageLevel.MEMORY_AND_DISK_SER)

      // (src, cc_id) join (src, dst) => (src, (cc_id, dst))
      cc.join(e_rdd)
        // stage 1: salt the key with a random digit so a large component is pre-aggregated by up to 10 tasks
        .map(x => ((math.random * 10).toInt.toString.concat(x._2._1.toString), (x._1, x._2._2)))
        .aggregateByKey(List[(Long, Long)]())((list, v) => v :: list, (list1, list2) => list1 ::: list2)
        // stage 2: drop the salt digit and merge the partial edge lists of each component
        .map(x => (x._1.substring(1), x._2))
        .reduceByKey(_ ::: _)
        // a clique with `count` vertices needs count * (count - 1) / 2 edges
        .filter(x => x._2.size >= count * (count - 1) / 2)
        // serial maximal clique search, one component per call
        .flatMap(x => {
          if (algorithm == 1)
            find(x, count)
          else
            find2(x, count)
        })
        .map(set => set.asScala.mkString(","))
        // .coalesce(1)
        .saveAsTextFile(out_path)
    } catch {
      case ex: Exception => ex.printStackTrace(System.out)
    } finally {
      sc.stop()
    }
  }

  // the author's own maximal clique implementation (CliqueFinder, below)
  def find(x: (String, List[(Long, Long)]), count: Int): mutable.Set[util.Set[String]] = {
    println(x._1 + "|s|" + x._2.size)
    println("BKCliqueFinder---" + x._1 + "---" + System.currentTimeMillis())
    // symmetric adjacency map: vertex -> neighbours
    val neighbors = new util.HashMap[String, util.Set[String]]
    x._2.foreach(r => {
      val v1 = r._1.toString
      val v2 = r._2.toString
      if (!neighbors.containsKey(v1)) neighbors.put(v1, new util.HashSet[String]())
      if (!neighbors.containsKey(v2)) neighbors.put(v2, new util.HashSet[String]())
      neighbors.get(v1).add(v2)
      neighbors.get(v2).add(v1)
    })
    val result = new CliqueFinder(neighbors, count).findMaxCliques().asScala
    // printed after the search, so the two timestamps bracket the clique computation itself
    println("BKCliqueFinder---" + x._1 + "---" + System.currentTimeMillis())
    result
  }

  // maximal clique algorithm from jgrapht
  def find2(x: (String, List[(Long, Long)]), count: Int): Set[util.Set[String]] = {
    println(x._1 + "|s|" + x._2.size)
    println("BKCliqueFinder---" + x._1 + "---" + System.currentTimeMillis())
    val to_clique = new SimpleGraph[String, DefaultEdge](classOf[DefaultEdge])
    x._2.foreach(r => {
      val v1 = r._1.toString
      val v2 = r._2.toString
      to_clique.addVertex(v1)
      to_clique.addVertex(v2)
      to_clique.addEdge(v1, v2)
    })
    val finder = new BronKerboschCliqueFinder(to_clique)
    // keep only the maximal cliques with at least `count` vertices
    val result = finder.getAllMaximalCliques.asScala.filter(c => c.size() >= count).toSet
    println("BKCliqueFinder---" + x._1 + "---" + System.currentTimeMillis())
    result
  }
}
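The pruning `while` loop in `main` repeatedly discards vertices with fewer than `count - 1` neighbours (such a vertex cannot belong to a clique of `count` vertices), together with their edges. A serial Java sketch of the same loop with the same stopping rule on `percent`; the class `DegreePruning` is hypothetical, and the early exit on an empty edge list is an addition not present in the Scala code.

```java
import java.util.*;

final class DegreePruning {

    // Edges are {u, v} pairs with u < v; returns the edges that survive pruning.
    static List<long[]> prune(List<long[]> edges, int count, int numIter, int percent) {
        List<long[]> current = new ArrayList<>(edges);
        int previousSize = 0;                         // plays the role of bc_size
        for (int iter = 0; iter <= numIter; iter++) { // same bound as the Scala loop
            // degree of every vertex that still has an edge
            Map<Long, Integer> degree = new HashMap<>();
            for (long[] e : current) {
                degree.merge(e[0], 1, Integer::sum);
                degree.merge(e[1], 1, Integer::sum);
            }
            // a vertex of a clique with `count` vertices needs at least count - 1 neighbours
            Set<Long> keep = new HashSet<>();
            for (Map.Entry<Long, Integer> d : degree.entrySet()) {
                if (d.getValue() >= count - 1) {
                    keep.add(d.getKey());
                }
            }
            // keep only edges whose endpoints both survive
            List<long[]> next = new ArrayList<>();
            for (long[] e : current) {
                if (keep.contains(e[0]) && keep.contains(e[1])) {
                    next.add(e);
                }
            }
            current = next;
            if (current.isEmpty()) {
                break; // extra exit, not in the Scala code
            }
            // stop once a round still keeps at least percent% of the previous round's vertices
            if (previousSize != 0 && keep.size() >= previousSize * percent / 100) {
                break;
            }
            previousSize = keep.size();
        }
        return current;
    }
}
```

On the sample edges with `count=3` it removes vertex 7 and keeps 11 edges. With `count=4` and `percent=90` it stops after the second round with 7 edges left, although the sample has no 4-vertex clique: a high `percent` trades pruning thoroughness for fewer Spark jobs, and the clique search afterwards still finds the right answer on the larger remainder.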

Custom maximal clique implementation (Java):

import java.util.*;

/**
 * Maximal clique finder based on Bron–Kerbosch: the top level visits vertices in degeneracy
 * order (bk3), deeper levels branch around a pivot (bk2).
 *
 * @author mopspecial@gmail.com
 * @date 2017/7/31
 */
public class CliqueFinder {
    private Map<String, Set<String>> neighbors; // adjacency map, must be symmetric
    private Set<String> nodes;
    private Set<Set<String>> maxCliques = new HashSet<>();
    private Integer minSize;                    // smallest clique size that is reported

    public CliqueFinder(Map<String, Set<String>> neighbors, Integer minSize) {
        this.neighbors = neighbors;
        this.nodes = neighbors.keySet();
        this.minSize = minSize;
    }

    // clique = R, candidates = P, excluded = X in the usual Bron–Kerbosch notation
    private void bk3(Set<String> clique, List<String> candidates, List<String> excluded) {
        if (candidates.isEmpty() && excluded.isEmpty()) {
            if (!clique.isEmpty() && clique.size() >= minSize) {
                maxCliques.add(clique);
            }
            return;
        }
        for (String s : degeneracy_order(candidates)) {
            List<String> new_candidates = new ArrayList<>(candidates);
            new_candidates.retainAll(neighbors.get(s));
            List<String> new_excluded = new ArrayList<>(excluded);
            new_excluded.retainAll(neighbors.get(s));
            Set<String> nextClique = new HashSet<>(clique);
            nextClique.add(s);
            bk2(nextClique, new_candidates, new_excluded);
            // every clique containing s has been reported; move s from P to X
            candidates.remove(s);
            excluded.add(s);
        }
    }

    private void bk2(Set<String> clique, List<String> candidates, List<String> excluded) {
        if (candidates.isEmpty() && excluded.isEmpty()) {
            if (!clique.isEmpty() && clique.size() >= minSize) {
                maxCliques.add(clique);
            }
            return;
        }
        String pivot = pick_random(candidates);
        if (pivot == null) {
            pivot = pick_random(excluded);
        }
        // every maximal clique extending R contains the pivot or a non-neighbour of it,
        // so branching on P minus N(pivot) is enough
        List<String> tempc = new ArrayList<>(candidates);
        tempc.removeAll(neighbors.get(pivot));
        for (String s : tempc) {
            List<String> new_candidates = new ArrayList<>(candidates);
            new_candidates.retainAll(neighbors.get(s));
            List<String> new_excluded = new ArrayList<>(excluded);
            new_excluded.retainAll(neighbors.get(s));
            Set<String> nextClique = new HashSet<>(clique);
            nextClique.add(s);
            bk2(nextClique, new_candidates, new_excluded);
            candidates.remove(s);
            excluded.add(s);
        }
    }

    // repeatedly takes a vertex of minimum remaining degree
    private List<String> degeneracy_order(List<String> innerNodes) {
        List<String> result = new ArrayList<>();
        Map<String, Integer> deg = new HashMap<>();
        for (String node : innerNodes) {
            deg.put(node, neighbors.get(node).size());
        }
        while (!deg.isEmpty()) {
            Integer min = Collections.min(deg.values());
            String minKey = null;
            for (String key : deg.keySet()) {
                if (deg.get(key).equals(min)) {
                    minKey = key;
                    break;
                }
            }
            result.add(minKey);
            deg.remove(minKey);
            for (String k : neighbors.get(minKey)) {
                if (deg.containsKey(k)) {
                    deg.put(k, deg.get(k) - 1);
                }
            }
        }
        return result;
    }

    // returns the first element, not a random one; any pivot from P or X keeps the result correct
    private String pick_random(List<String> random) {
        if (random != null && !random.isEmpty()) {
            return random.get(0);
        } else {
            return null;
        }
    }

    public Set<Set<String>> findMaxCliques() {
        this.bk3(new HashSet<>(), new ArrayList<>(nodes), new ArrayList<>());
        return maxCliques;
    }

    public static void main(String[] args) {
        // adjacency of the sample graph
        Map<String, Set<String>> neighbors = new HashMap<>();
        neighbors.put("0", new HashSet<>(Arrays.asList("1", "2", "3")));
        neighbors.put("1", new HashSet<>(Arrays.asList("0", "2")));
        neighbors.put("2", new HashSet<>(Arrays.asList("0", "1", "3", "6")));
        neighbors.put("3", new HashSet<>(Arrays.asList("0", "2", "4", "5")));
        neighbors.put("4", new HashSet<>(Arrays.asList("3", "5", "6")));
        neighbors.put("5", new HashSet<>(Arrays.asList("3", "4", "6")));
        neighbors.put("6", new HashSet<>(Arrays.asList("2", "4", "5", "7"))); // edge 6-7 listed on both sides
        neighbors.put("7", new HashSet<>(Arrays.asList("6")));
        CliqueFinder finder = new CliqueFinder(neighbors, 3);
        System.out.println(finder.findMaxCliques());
    }
}
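In `bk2`, the pivot is simply the first element of `candidates` (or of `excluded` when `candidates` is empty). Any pivot from the union of the two lists keeps the result correct, but the pivot rule of Tomita et al., which picks the vertex with the most neighbours among the candidates, minimises the number of branches at that level. A hypothetical helper implementing that rule, sketched as a possible replacement for the first-element choice:

```java
import java.util.*;

final class PivotChoice {

    // Returns the vertex u from candidates or excluded that maximises |candidates intersect N(u)|;
    // ties go to the first such vertex, and null is returned when both lists are empty.
    static String choosePivot(List<String> candidates, List<String> excluded,
                              Map<String, Set<String>> neighbors) {
        Set<String> p = new HashSet<>(candidates);
        List<String> pool = new ArrayList<>(candidates);
        pool.addAll(excluded);
        String best = null;
        int bestScore = -1;
        for (String u : pool) {
            int score = 0;
            for (String v : neighbors.getOrDefault(u, Collections.emptySet())) {
                if (p.contains(v)) {
                    score++; // v is a candidate adjacent to u
                }
            }
            if (score > bestScore) {
                best = u;
                bestScore = score;
            }
        }
        return best;
    }
}
```

With the sample graph's symmetric adjacency and all eight vertices as candidates, it returns `"2"`, the first of the three vertices of degree 4.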
