龙空技术网

手写动态数据源连接池管理

IT技术控 152

前言:

现在咱们对“vbnet复制数据库表”大概比较着重,姐妹们都需要分析一些“vbnet复制数据库表”的相关资讯。那么小编同时在网络上搜集了一些对于“vbnet复制数据库表””的相关内容,希望大家能喜欢,姐妹们一起来学习一下吧!

手写动态数据源连接池管理

对于公司的中台系统而言,采集和告警的作用是很关键的。

在数据治理领域,告警可以帮助程序员对不同表的字段缺失,数量统计做有效的根因定位。对于一些重要指标的监控,我们需要对此限定阈值。

而告警以及多任务执行的源头也就是数据源的管理,一些中大型公司的后台系统,往往藏着多个数据库,有大数据领域的数据库如Doris,关系型数据库如MySQL。

对于采集任务而言,即使是同一类型的数据库,不同的的库也需要进行定位。

我们简化一下场景,现在有10条采集规则,每个采集规则30s执行一次,这10条采集规则分别都来自于不同的库,表,每次采集的时间是不定的,可能是10s,也有可能是30s,短一些的有5s。

如果说,我们对每一条采集SQL,都是先定位数据源,执行完后断开。再定位另一个数据源,执行完后再断开。每一次采集都需要去新建一个线程去创建连接,无法像固定的数据源一样去静态的把连接线程放进线程池里面,这该如何是好?

问题思考

原本我们后端系统一般是一个后端一个库,因此我们方便静态的为这个库创建数据库的连接池。为此我们需要研发一套高性能的,能将新的连接复用到连接池的东西。

结合上图来看,若两个连接,它的(url+username+password)组成的key相同,我们就认为他是可以在同一个连接池中进行复用的。

我们可以对于这个连接池集合做一个限定,防止它来一个数据源创建一个线程池。现在有一个问题,如果连接池集合都占满了,连接池1,2都在有线程在使用,而3,4里面没有线程使用。

而此时我们会想到,把3或者4这些没有使用到的线程池移除不就好了。

没错,问题是,移除3,还是移除4?

其实这也很好想到,也就是通过观测它过去的使用频率来判定哪一个需要被移除。

想完这些我们就可以开始写一个小的Demo了。

代码实战

java复制代码package org.example;import org.apache.commons.dbcp2.BasicDataSource;import java.sql.Connection;import java.sql.PreparedStatement;import java.sql.ResultSet;import java.sql.SQLException;import java.util.*;import java.util.concurrent.ConcurrentHashMap;public class DynamicConnPoolCache {    private static DynamicConnPoolCache cache;    private ConcurrentHashMap<String,MysqlConnPool> connectPoolMap;    private ConcurrentHashMap<String,Integer> useMap;    private ConcurrentHashMap<String,Integer> rateMap;    private ConcurrentHashMap<String,Long> createTimeMap;    private int maxConnectNum;    private int connectNum;    public DynamicConnPoolCache(){    }    public static DynamicConnPoolCache connPoolCache(int maxConnectNum){        if(null == cache){            synchronized (DynamicConnPoolCache.class){                cache = new DynamicConnPoolCache();                cache.connectPoolMap = new ConcurrentHashMap<>(maxConnectNum);                cache.maxConnectNum = maxConnectNum;                cache.connectNum = 0;                cache.rateMap = new ConcurrentHashMap<>(maxConnectNum);                cache.useMap = new ConcurrentHashMap<>(maxConnectNum);                cache.createTimeMap = new ConcurrentHashMap<>(maxConnectNum);            }        }        return cache;    }    public boolean put(String key,MysqlConnPool value){        //删除某个连接如果已达到最大设置连接数,先删除未在使用,并且使用评率最低的连接        if(connectNum > maxConnectNum){            String needRemoveKey = chooseRemoveKey();            if(null == needRemoveKey){                //连接池数已达上限,并且都在使用中                return false;            }            deleteConnPool(needRemoveKey);        }        //保存连接        connectPoolMap.put(key, value);        useMap.put(key, 0);        rateMap.put(key, 0);        createTimeMap.put(key, System.currentTimeMillis());        connectNum++;        return true;    }    public void close(){        for(String key : connectPoolMap.keySet()){            deleteConnPool(key);        }    }    public void deleteConnPool(String remove_key) {        MysqlConnPool remove_value = connectPoolMap.get(remove_key);        //关闭连接        remove_value.close();        //从maps清除key        connectPoolMap.remove(remove_key);        useMap.remove(remove_key);        rateMap.remove(remove_key);        createTimeMap.remove(remove_key);        connectNum--;    }    public MysqlConnPool get(String key){        if(connectPoolMap.containsKey(key)){            //正在使用的用户数+1            useMap.put(key, useMap.get(key)+1);            //历史使用数+1            rateMap.put(key, rateMap.get(key)+1);        }        return connectPoolMap.get(key);    }    public boolean useOver(String key,Connection conn,PreparedStatement st,ResultSet rs){        MysqlConnPool.release(conn, st, rs);        //正在使用的用户数-1        try {            useMap.put(key, useMap.get(key)-1);        } catch (Exception e) {            System.out.println(e.getStackTrace());            return false;        }        return true;    }    public String chooseRemoveKey(){        Map<String,Double> k_r = new HashMap<>();//存储未被使用的连接的使用评率(总使用数/(now-createTime))        useMap.forEach((k,v)->{            if(v<1){                k_r.put(k,rateMap.get(k)/(double)System.currentTimeMillis()-createTimeMap.get(k));            }        });        //取使用率最小的key        if(k_r.size()>0){            List<Map.Entry<String, Double>> entryList = new ArrayList(k_r.entrySet());            Collections.sort(entryList,(e1, e2)->(e1.getValue()>=e2.getValue() ? 1 : -1));            return entryList.get(0).getKey();        }        return null;    }}

对于这段代码,还是会有一定的并发问题的。对于put和get方法会有读写并发问题,put方法会有写写并发问题。

csharp复制代码    private ReentrantLock lock = new ReentrantLock();
scss复制代码    public boolean put(String key,MysqlConnPool value){        //删除某个连接如果已达到最大设置连接数,先删除未在使用,并且使用评率最低的连接        final ReentrantLock currentLock = this.lock;        currentLock.lock();        try {            if(connectNum >= maxConnectNum){                String needRemoveKey = chooseRemoveKey();                if(null == needRemoveKey){                    //连接池数已达上限,并且都在使用中                    return false;                }                deleteConnPool(needRemoveKey);            }            //保存连接            connectPoolMap.put(key, value);            useMap.put(key, 0);            rateMap.put(key, 0);            createTimeMap.put(key, System.currentTimeMillis());            connectNum++;        }finally {            currentLock.unlock();        }        return true;    }
vbnet复制代码    public MysqlConnPool get(String key){        final ReentrantLock currentLock = this.lock;        currentLock.lock();        try {            if(connectPoolMap.containsKey(key)){                //正在使用的用户数+1                useMap.put(key, useMap.get(key)+1);                //历史使用数+1                rateMap.put(key, rateMap.get(key)+1);            }            return connectPoolMap.get(key);        }finally {            lock.unlock();        }    }

标签: #vbnet复制数据库表