前言:
如今你们对“oracle删除database”大致比较看重,小伙伴们都想要了解一些“oracle删除database”的相关内容。那么小编同时在网上网罗了一些对于“oracle删除database””的相关资讯,希望朋友们能喜欢,你们快快来学习一下吧!需要源码可以直接联系
方案一:
直接套用脚本,需可以看懂一些脚本逻辑
改代码中 类名为 OracleConnector,它可以同时连接多个 Oracle 数据库,并提供执行增删改查操作的方法。
import cx_Oracleclass OracleConnector: def __init__(self, databases): self.connections = {} for db in databases: conn = cx_Oracle.connect( user=db['username'], password=db['password'], dsn=db['dsn'], encoding=db.get('encoding', 'UTF-8') ) self.connections[db['name']] = conn def execute_query(self, database_name, query, params=None): with self.connections[database_name].cursor() as cursor: cursor.execute(query, params) result = cursor.fetchall() return result def execute_non_query(self, database_name, query, params=None): with self.connections[database_name].cursor() as cursor: cursor.execute(query, params) self.connections[database_name].commit() return cursor.rowcount def execute_query_all(self, databases, query, params=None): results = {} for db in databases: database_name = db['name'] results[database_name] = self.execute_query(database_name, query, params) return results def execute_non_query_all(self, databases, query, params=None): results = {} for db in databases: database_name = db['name'] try: res = self.execute_non_query(database_name, query, params) results[database_name] = res except cx_Oracle.DatabaseError as e: print(f"Error occurred when running query on {database_name}: {e}") self.connections[database_name].rollback() return results
以下是代码解释:
import cx_Oracle:导入 cx_Oracle 库,用于与 Oracle 数据库进行连接和交互。class OracleConnector::定义一个名为 OracleConnector 的类。def __init__(self, databases)::类的初始化方法,接受一个包含多个数据库信息的列表作为参数。self.connections = {}:初始化一个字典,用于存储数据库连接。for db in databases::遍历数据库列表中的每个数据库信息。conn = cx_Oracle.connect(...):根据数据库信息创建数据库连接对象。user=db['username']:连接数据库的用户名。password=db['password']:连接数据库的密码。dsn=db['dsn']:Oracle 数据库的 DSN(数据源名称)。encoding=db.get('encoding', 'UTF-8'):可选的编码方式,默认为 UTF-8。self.connections[db['name']] = conn:将数据库连接对象添加到字典中,以数据库名称作为键。def execute_query(self, database_name, query, params=None)::执行查询语句的方法。database_name:数据库名称。query:要执行的查询语句。params=None:可选的查询参数。with self.connections[database_name].cursor() as cursor::获取游标对象,用于执行 SQL 语句。cursor.execute(query, params):执行查询语句。result = cursor.fetchall():获取查询结果集。return result:返回查询结果。def execute_non_query(self, database_name, query, params=None)::执行非查询语句的方法,比如插入、更新和删除操作。database_name:数据库名称。query:要执行的非查询语句。params=None:可选的参数。with self.connections[database_name].cursor() as cursor::获取游标对象,用于执行 SQL 语句。cursor.execute(query, params):执行非查询语句,比如插入、更新和删除操作。self.connections[database_name].commit():提交事务,将修改操作永久保存到数据库中。return cursor.rowcount:返回受影响的行数,即执行非查询操作后所影响的行数。def execute_query_all(self, databases, query, params=None)::在多个数据库上执行相同的查询语句,并返回结果。databases:要执行查询的数据库列表。query:要执行的查询语句。params=None:可选的查询参数。results = {}:初始化一个字典,用于存储查询结果。for db in databases::遍历数据库列表中的每个数据库。database_name = db['name']:获取数据库名称。results[database_name] = self.execute_query(database_name, query, params):执行查询,并将结果存储到字典中。return results:返回包含查询结果的字典。def execute_non_query_all(self, databases, query, params=None)::在多个数据库上执行相同的非查询语句,并返回结果。databases:要执行非查询操作的数据库列表。query:要执行的非查询语句。params=None:可选的参数。results = {}:初始化一个字典,用于存储操作结果。for db in databases::遍历数据库列表中的每个数据库。database_name = db['name']:获取数据库名称。try::捕获可能发生的异常。res = self.execute_non_query(database_name, query, params):执行非查询操作。results[database_name] = res:将操作结果存储到字典中。except cx_Oracle.DatabaseError as e::捕获 Oracle 数据库相关的异常。print(f"Error occurred when running query on {database_name}: {e}"):打印出错信息。self.connections[database_name].rollback():执行回滚操作,撤销之前的操作。return results:返回包含操作结果的字典。这个封装类使得可以同时连接多个 Oracle 数据库,并在它们上执行增删改查操作。而在执行非查询操作时,如果遇到错误,将会进行回滚操作,保证数据的一致性。
方案二:直接调用封装脚本(写用例,执行脚本即可)
脚本实现封装后,只需要在Oracle.yaml文件中写用例即可,此后执行Oracle.py脚本即实现数据库的增删改查操作
目录介绍:
Oracle.yaml 编写基本数据库信息
Orcale: username: user password: pwd ip: 127.0.0.1 port: 1521 sid: orcl
PublicConfig.py脚本: 配置读取信息,方便调用
import osfrom Public_Utils.util_yaml import YamlReaderclass YamlPath: def __init__(self): current = os.path.abspath(__file__) self.base_dir = os.path.dirname(os.path.dirname(current)) self._config_path = self.base_dir + os.sep + "Public_Config\Public_yaml" def get_oracle_file(self): self._config_file = self._config_path + os.sep + "Oracle.yaml" return self._config_fileclass ConfigYaml: def __init__(self): #初始yaml读取配置文件 self.oracle_config = YamlReader(YamlPath().get_oracle_file()).yaml_data() def get_oracle_yaml(self): return self.oracle_config['Orcale']if __name__ == '__main__': pass
Oracle.py执行脚本
# coding=utf-8import cx_Oracleimport osimport jsonfrom Public_Config.PublicConfig import ConfigYamlos.environ['NLS_LANG'] = 'SIMPLIFIED CHINESE_CHINA.UTF8'class Oracle: def __init__(self): user = ConfigYaml().get_oracle_yaml()['username'] pwd = ConfigYaml().get_oracle_yaml()['password'] ip = ConfigYaml().get_oracle_yaml()['ip'] port = ConfigYaml().get_oracle_yaml()['port'] sid = ConfigYaml().get_oracle_yaml()['sid'] self.connect = cx_Oracle.connect(str(user) + "/" + str(pwd) + "@" + str(ip) + ":" + str(port) + "/" + str(sid)) # self.connect = cx_Oracle.connect(f'{user}/{pwd}@{ip}:{port}/{sid}') # 这里的顺序是 用户名/密码@oracleserver的ip地址/数据库名字 self.cursor = self.connect.cursor() # 使用cursor()方法获取操作游标 def GetData(self,sql): self.cursor.execute(sql) # 使用Rowfactory更改查询结果,更直观查看数据 columns = [col[0] for col in self.cursor.description] self.cursor.rowfactory = lambda *args: dict(zip(columns, args)) # fetchall()一次取完所有结果 # fetchone()一次取一行结果 data = self.cursor.fetchall() return data def select(self, sql): #查询 list = [] self.cursor.execute(sql) # 使用execute方法执行SQL语句 result = self.cursor.fetchall() # fetchall()一次取完所有结果,fetchone()一次取一行结果 col_name = self.cursor.description for row in result: dict = {} for col in range(len(col_name)): key = col_name[col][0] value = row[col] dict[key] = value list.append(dict) js = json.dumps(list, ensure_ascii=False, indent=2, separators=(',', ':')) '''json.dumps() 是把python对象转换成json对象的一个过程,生成的是字符串 json.dump() 是把python对象转换成json对象生成一个fp的文件流,和文件相关''' return js #将结果返回为一个字符串 def selectlist(self,sql): list = [] self.cursor.execute(sql) result = self.cursor.fetchall() col_name = self.cursor.description for row in result: dict = {} for col in range(len(col_name)): key = col_name[col][0] value = row[col] dict[key] = value list.append(dict) return list #将结果以列表返回 def disconnect(self): #未连接 self.cursor.close() self.connect.close() def insert(self, sql, list_param): #插入 try: self.cursor.executemany(sql, list_param) self.connect.commit() print("插入ok") except Exception as e: print(e) finally: self.disconnect() def update(self, sql): #更新 try: self.cursor.execute(sql) self.connect.commit() except Exception as e: print(e) finally: self.disconnect() def delete(self, sql): #删除 try: self.cursor.execute(sql) self.connect.commit() print("delete ok") except Exception as e: print(e) finally: self.disconnect()if __name__ == '__main__': print(Oracle().select(sql="select * from cifaccount a where a.cif_account ='310400009590' ")) pass
util_yaml.py
import osimport yamlclass YamlReader: #初始化,判断文件是否存在 def __init__(self,yaml_file): if os.path.exists(yaml_file): self.yaml_file = yaml_file else: raise FileNotFoundError("yaml文件不存在") self._data = None self._data_all = None def yaml_data(self): #yaml文件读取 --单个文档读取 #第一次调用data,读取yaml文档,如果不是,直接返回之前保存的数据 if not self._data: with open(self.yaml_file,'rb') as f: self._data = yaml.safe_load(f) return self._data def yaml_data_all(self): #多个文档的读取 if not self._data_all: with open(self.yaml_file,'rb') as f: self._data_all = yaml.safe_load_all(f) return self._data_all
版权声明:
本站文章均来自互联网搜集,如有侵犯您的权益,请联系我们删除,谢谢。
标签: #oracle删除database