前言:
今天大家对“java读取sftp文件”大体比较关心,同学们都想要了解一些“java读取sftp文件”的相关文章。那么小编在网摘上网罗了一些有关“java读取sftp文件””的相关知识,希望姐妹们能喜欢,咱们快快来学习一下吧!写在前面有这样一个需求以文件的方式定期给集团同步增量数据,我想把所有的静态数据抽离出来,通过配置文件的方式需求比较简单,所以用选择pythoh配置文件用yaml,写了一个小模块 实现配置文件读入内存为配置字典实现配置文件的动态加载读入内存为配置字典实现配置字典由内存导出静态文件理解错误的地方请小伙伴批评指正
「 我只是怕某天死了,我的生命却一无所有。----《奇幻之旅》」
这里需要说明的是,常说的动态加载配置,一般基于观察者设计模式实现的发布/订阅系统,一般有两种模式,分别是推(Push)模式和拉(Pull)模式。
推模式:服务端主动将数据更新发送给所有订阅的客户端,拉模式:由客户端主动发起请求来获取最新数据,通常客户端都采用定时进行轮询拉取的方式。
我们这里只是提供了一个可以动态加载配置文件刷新配置对象的方法,把配置对象定义为单例,刷新的时候把当前存在的配置对象干掉,然后从新加载配置文件生成新的配置对象。即通过拉(Pull)的方式实现。这里配置对象为主题,使用配置对象的多个代码为观察者。
先来看一下脚本yaml_util.py
#!/usr/bin/env python3# -*- encoding: utf-8 -*-"""@File : yaml_util.py@Time : 2022/03/22 14:10:46@Author : Li Ruilong@Version : 1.0@Contact : 1224965096@qq.com@Desc : 加载配置文件pip install pyyaml """# here put the import libimport osimport timeimport yamlimport loggingimport jsonlogging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(filename)s[line:%(lineno)d] - %(levelname)s: %(message)s')class Yaml: _config = None def __new__(cls, *args, **kw): # hasattr函数用于判断对象是否包含对应的属性。 if not hasattr(cls, '_instance'): cls._instance = object.__new__(cls) return cls._instance def __init__(self, file_name="config.yaml"): config_temp = None try: # 获取当前脚本所在文件夹路径 cur_path = os.path.dirname(os.path.realpath(__file__)) # 获取yaml文件路径 yaml_path = os.path.join(cur_path, file_name) f = open(yaml_path, 'r', encoding='utf-8') config_temp = f.read() except Exception as e: logging.info("配置文件加载失败", e) finally: f.close() self._config = yaml.safe_load(config_temp) # 用load方法转化 def __str__(self): return json.dumps(self._config) def __del__(self): self._config = None self = None @staticmethod def get_config(file_name="config.yaml"): return Yaml(file_name)._config @staticmethod def refresh_config(cls, file_name="config.yaml"): del cls return Yaml(file_name)._configdef set_config(contain, file_name="config_.yaml"): # 配置字典由内存导入静态文件 cur_path = os.path.dirname(os.path.realpath(__file__)) yaml_path = os.path.join(cur_path, file_name) with open(yaml_path, 'w', encoding='utf-8') as f: yaml.dump(contain, f)def get_yaml_config(file_name="config.yaml"): # 配置文件读入内存为配置字典 return Yaml.get_config(file_name)def refresh_yaml_config(cls, file_name="config.yaml"): # 配置文件的动态加载读入内存为字典 return Yaml.refresh_config(cls,file_name)if __name__ == '__main__': my_yaml_1 = Yaml() my_yaml_2 = Yaml() #id关键字可用来查看对象在内存中的存放位置 print(id(my_yaml_1) == id(my_yaml_2)) time.sleep(10) # 修改配置文件后从新加载配置字典会刷新 refresh_yaml_config(my_yaml_1)
上面是写好加载配置类模块,下面为定义的配置文件。
# mysql数据库相关配置mysql: db_host: "127.0.0.1" db_port: 3306 db_user: "root" db_password: "root" db_name: "uamdb"# ssh相关配置ssh: ssh_hostname: "192.168.26.55" ssh_username: "root" ssh_password: 'redhat' ssh_port: 22#文件模式UAG+省份标识+年月日+序号.txtfile_name_template: "UAG07{0}{1}.txt"# 本地文件存储位置local_file_path: "./uam/sftp/"# 集团服务器上传文件位置remote_path: "/app/sftp/ftpuam07/increment/"# 上传周期period: date: YEAR #MINUTE,HOUR ,DAY,WEEK,MONTH,YEAR offset: 1 #几天,几周,几月 前的数据# 增量数据模板: 文件内容格式:账号,账号类型,账号密码,客户ID,密码加密类型,省份编码,账号状态template: - line: "{unified_code}^{unified_type}^{unified_pwd}^{cust_codes}^Pwd007^07^{unified_state}" sql: "SELECT a.unified_code as unified_code ,a.unified_type as unified_type ,a.unified_pwd as unified_pwd \ ,a.unified_state as unified_state , b.cust_code as cust_codes FROM AU_UNIFIED a INNER JOIN AU_PRODUCT b \ ON a.unified_code = b.product_code AND a.chg_date > SUBDATE( NOW( ), INTERVAL {0} {1} )" - line: "{product_code}^{prod_type}^{product_pwd}^{cust_code}^{pwd_type}^07^St001" sql: "SELECT product_code, product_pwd,cust_code,prod_type, \ (CASE WHEN a.prod_type='2000004' then 'Pwd001' WHEN a.prod_type='2000001' then 'Pwd001' WHEN a.prod_type='2000002' then 'Pwd001' \ WHEN a.prod_type='2110008' then 'Pwd001' WHEN a.prod_type='2110011' then 'Pwd001' ELSE 'Pwd001' END) as pwd_type \ from AU_PRODUCT a WHERE a.chg_date > SUBDATE( NOW( ), INTERVAL {0} {1} )"
「如何使用」
下面为一个数据库连接池工具类的使用。
.....import yaml_utilfrom dbutils.pooled_db import PooledDBclass MysqlPool: def __init__(self): mysql = yaml_util.get_config()["mysql"] print(mysql) self.POOL = PooledDB( creator=pymysql, # 使用链接数据库的模块 maxconnections=6, # 连接池允许的最大连接数,0和None表示不限制连接数 mincached=2, # 初始化时,链接池中至少创建的空闲的链接,0表示不创建 maxcached=5, # 链接池中最多闲置的链接,0和None不限制 maxshared=3, blocking=True, # 连接池中如果没有可用连接后,是否阻塞等待。True,等待;False,不等待然后报错 maxusage=None, # 一个链接最多被重复使用的次数,None表示无限制 setsession=[], # 开始会话前执行的命令列表。如:["set datestyle to ...", "set time zone ..."] ping=0, host=mysql["db_host"], port=mysql["db_port"], user=mysql["db_user"], password=mysql["db_password"], database=mysql["db_name"], charset='utf8' )...................
「关于如何触发刷新配置文件方法」
我们这里修改完配置文件通过UI界面主动调用函数加载。其他项目场景个人觉得可以通过心跳或者探针的机制传递文件摘要信息串(通过MD5,SHA等信息摘要算法生成)进行比对,具体的手段可以通过类似脏值轮询检查或者数据劫持等方式
「关于观察者设计模式,是一个很常用的设计模式」
基于MVVM模式的前端框架中,双向数据绑定特性,如Vue.js都是基于此,在系统运行过程中,一旦系统中的数据模型发生了变化,观察者 Observer的setter访问器属性就会被触发,此时消息订阅中心会遍历它所维护的所有订阅者,对于每一个订阅了该数据的对象,向它发出一个更新通知,订阅者收到通知后就会对视图进行相应的更新。以上过程不断往复循环,这就是MVVM模式在Vue.js中的运行原理。
后端分布式一致性解决方案中,使用ZooKeeper做配置中心时,也是基于观察者模式,采用的是推拉相结合的方式,客户端向服务端注册自己需要关注的节点,一旦该节点的数据发生变更,那么服务端就会向相应的客户端发送Watcher事件通知,客户端接收到这个消息通知之后,需要主动到服务端获取最新的数据。将配置信息存放到ZooKeeper上进行集中管理,应用在启动的时候都会主动到ZooKeeper服务端上进行一次配置信息的获取,同时,在指定节点上注册一个Watcher监听,这样一来,但凡配置信息发生变更,服务端都会实时通知到所有订阅的客户端,从而达到实时获取最新配置信息的目的。
包括JDK很早的版本就有观察者模式的实现,java.util包内包含最基本的Observer接口与Observable类,不过主题类(Observable)定义成了一个基本类不是接口,所以只能通过继承的方式实现,考虑的继承的可维护性太差,一般不怎么使用。
「关于单例模式」
单例模式是很常用的一种设计模式,即在整个生命周期中,对于该一个类生产的对象始终都是一个,不曾变化。保证了一个类仅有一个实例,并提供一个访问它的全局访问点。
单例的优点有很多,GOF中这样描述:
对唯一实例的受控访问,缩小名空间,Singleton模式是对全局变量的一种改进。它避免了那些存储唯一实例的全局变量污染名空间。允许对操作和表示的精化,Singleton类可以有子类,而且用这个扩展类的实例来配置一个应用是很容易的。你可以用你所需要的类的实例在运行时刻配置应用等等,感兴趣小伙伴可以去看看《设计模式_可复用面向对象软件的基础》