龙空技术网

利用Python优雅地操作Oracle数据库

云中随心而记 243

前言:

如今你们对“oracle删除database”大致比较看重,小伙伴们都想要了解一些“oracle删除database”的相关内容。那么小编同时在网上网罗了一些对于“oracle删除database””的相关资讯,希望朋友们能喜欢,你们快快来学习一下吧!

需要源码可以直接联系

方案一:

直接套用脚本,需可以看懂一些脚本逻辑

改代码中 类名为 OracleConnector,它可以同时连接多个 Oracle 数据库,并提供执行增删改查操作的方法。

import cx_Oracleclass OracleConnector:    def __init__(self, databases):        self.connections = {}        for db in databases:            conn = cx_Oracle.connect(                user=db['username'],                password=db['password'],                dsn=db['dsn'],                encoding=db.get('encoding', 'UTF-8')            )            self.connections[db['name']] = conn    def execute_query(self, database_name, query, params=None):        with self.connections[database_name].cursor() as cursor:            cursor.execute(query, params)            result = cursor.fetchall()        return result        def execute_non_query(self, database_name, query, params=None):        with self.connections[database_name].cursor() as cursor:            cursor.execute(query, params)            self.connections[database_name].commit()            return cursor.rowcount            def execute_query_all(self, databases, query, params=None):        results = {}        for db in databases:            database_name = db['name']            results[database_name] = self.execute_query(database_name, query, params)        return results        def execute_non_query_all(self, databases, query, params=None):        results = {}        for db in databases:            database_name = db['name']            try:                res = self.execute_non_query(database_name, query, params)                results[database_name] = res            except cx_Oracle.DatabaseError as e:                print(f"Error occurred when running query on {database_name}: {e}")                self.connections[database_name].rollback()        return results

以下是代码解释:

import cx_Oracle:导入 cx_Oracle 库,用于与 Oracle 数据库进行连接和交互。class OracleConnector::定义一个名为 OracleConnector 的类。def __init__(self, databases)::类的初始化方法,接受一个包含多个数据库信息的列表作为参数。self.connections = {}:初始化一个字典,用于存储数据库连接。for db in databases::遍历数据库列表中的每个数据库信息。conn = cx_Oracle.connect(...):根据数据库信息创建数据库连接对象。user=db['username']:连接数据库的用户名。password=db['password']:连接数据库的密码。dsn=db['dsn']:Oracle 数据库的 DSN(数据源名称)。encoding=db.get('encoding', 'UTF-8'):可选的编码方式,默认为 UTF-8。self.connections[db['name']] = conn:将数据库连接对象添加到字典中,以数据库名称作为键。def execute_query(self, database_name, query, params=None)::执行查询语句的方法。database_name:数据库名称。query:要执行的查询语句。params=None:可选的查询参数。with self.connections[database_name].cursor() as cursor::获取游标对象,用于执行 SQL 语句。cursor.execute(query, params):执行查询语句。result = cursor.fetchall():获取查询结果集。return result:返回查询结果。def execute_non_query(self, database_name, query, params=None)::执行非查询语句的方法,比如插入、更新和删除操作。database_name:数据库名称。query:要执行的非查询语句。params=None:可选的参数。with self.connections[database_name].cursor() as cursor::获取游标对象,用于执行 SQL 语句。cursor.execute(query, params):执行非查询语句,比如插入、更新和删除操作。self.connections[database_name].commit():提交事务,将修改操作永久保存到数据库中。return cursor.rowcount:返回受影响的行数,即执行非查询操作后所影响的行数。def execute_query_all(self, databases, query, params=None)::在多个数据库上执行相同的查询语句,并返回结果。databases:要执行查询的数据库列表。query:要执行的查询语句。params=None:可选的查询参数。results = {}:初始化一个字典,用于存储查询结果。for db in databases::遍历数据库列表中的每个数据库。database_name = db['name']:获取数据库名称。results[database_name] = self.execute_query(database_name, query, params):执行查询,并将结果存储到字典中。return results:返回包含查询结果的字典。def execute_non_query_all(self, databases, query, params=None)::在多个数据库上执行相同的非查询语句,并返回结果。databases:要执行非查询操作的数据库列表。query:要执行的非查询语句。params=None:可选的参数。results = {}:初始化一个字典,用于存储操作结果。for db in databases::遍历数据库列表中的每个数据库。database_name = db['name']:获取数据库名称。try::捕获可能发生的异常。res = self.execute_non_query(database_name, query, params):执行非查询操作。results[database_name] = res:将操作结果存储到字典中。except cx_Oracle.DatabaseError as e::捕获 Oracle 数据库相关的异常。print(f"Error occurred when running query on {database_name}: {e}"):打印出错信息。self.connections[database_name].rollback():执行回滚操作,撤销之前的操作。return results:返回包含操作结果的字典。这个封装类使得可以同时连接多个 Oracle 数据库,并在它们上执行增删改查操作。而在执行非查询操作时,如果遇到错误,将会进行回滚操作,保证数据的一致性。

方案二:直接调用封装脚本(写用例,执行脚本即可)

脚本实现封装后,只需要在Oracle.yaml文件中写用例即可,此后执行Oracle.py脚本即实现数据库的增删改查操作

目录介绍:

Oracle.yaml 编写基本数据库信息

Orcale:  username: user  password:  pwd  ip: 127.0.0.1  port: 1521  sid: orcl

PublicConfig.py脚本: 配置读取信息,方便调用

import osfrom Public_Utils.util_yaml import YamlReaderclass YamlPath:    def __init__(self):        current = os.path.abspath(__file__)        self.base_dir = os.path.dirname(os.path.dirname(current))        self._config_path = self.base_dir + os.sep + "Public_Config\Public_yaml"    def get_oracle_file(self):        self._config_file = self._config_path + os.sep + "Oracle.yaml"        return self._config_fileclass ConfigYaml:    def __init__(self):   #初始yaml读取配置文件        self.oracle_config = YamlReader(YamlPath().get_oracle_file()).yaml_data()    def get_oracle_yaml(self):        return self.oracle_config['Orcale']if __name__ == '__main__':    pass

Oracle.py执行脚本

#  coding=utf-8import cx_Oracleimport osimport jsonfrom Public_Config.PublicConfig import ConfigYamlos.environ['NLS_LANG'] = 'SIMPLIFIED CHINESE_CHINA.UTF8'class Oracle:    def __init__(self):        user = ConfigYaml().get_oracle_yaml()['username']        pwd = ConfigYaml().get_oracle_yaml()['password']        ip = ConfigYaml().get_oracle_yaml()['ip']        port = ConfigYaml().get_oracle_yaml()['port']        sid = ConfigYaml().get_oracle_yaml()['sid']        self.connect = cx_Oracle.connect(str(user) + "/" + str(pwd) + "@" + str(ip) + ":" + str(port) + "/" + str(sid))        # self.connect = cx_Oracle.connect(f'{user}/{pwd}@{ip}:{port}/{sid}')   # 这里的顺序是 用户名/密码@oracleserver的ip地址/数据库名字        self.cursor = self.connect.cursor()   # 使用cursor()方法获取操作游标    def GetData(self,sql):        self.cursor.execute(sql)        # 使用Rowfactory更改查询结果,更直观查看数据        columns = [col[0] for col in self.cursor.description]        self.cursor.rowfactory = lambda *args: dict(zip(columns, args))        # fetchall()一次取完所有结果        # fetchone()一次取一行结果        data = self.cursor.fetchall()        return data    def select(self, sql):   #查询        list = []        self.cursor.execute(sql)            # 使用execute方法执行SQL语句        result = self.cursor.fetchall()     # fetchall()一次取完所有结果,fetchone()一次取一行结果        col_name = self.cursor.description        for row in result:            dict = {}            for col in range(len(col_name)):                key = col_name[col][0]                value = row[col]                dict[key] = value            list.append(dict)        js = json.dumps(list, ensure_ascii=False, indent=2, separators=(',', ':'))        '''json.dumps() 是把python对象转换成json对象的一个过程,生成的是字符串           json.dump() 是把python对象转换成json对象生成一个fp的文件流,和文件相关'''        return js #将结果返回为一个字符串    def selectlist(self,sql):        list = []        self.cursor.execute(sql)        result = self.cursor.fetchall()        col_name = self.cursor.description        for row in result:            dict = {}            for col in range(len(col_name)):                key = col_name[col][0]                value = row[col]                dict[key] = value            list.append(dict)        return list   #将结果以列表返回    def disconnect(self):   #未连接        self.cursor.close()        self.connect.close()    def insert(self, sql, list_param):   #插入        try:            self.cursor.executemany(sql, list_param)            self.connect.commit()            print("插入ok")        except Exception as e:            print(e)        finally:            self.disconnect()    def update(self, sql):   #更新        try:            self.cursor.execute(sql)            self.connect.commit()        except Exception as e:            print(e)        finally:            self.disconnect()    def delete(self, sql):   #删除        try:            self.cursor.execute(sql)            self.connect.commit()            print("delete ok")        except Exception as e:            print(e)        finally:            self.disconnect()if __name__ == '__main__':    print(Oracle().select(sql="select * from cifaccount a where a.cif_account ='310400009590' "))    pass

util_yaml.py

import osimport yamlclass YamlReader:    #初始化,判断文件是否存在    def __init__(self,yaml_file):        if os.path.exists(yaml_file):            self.yaml_file = yaml_file        else:            raise FileNotFoundError("yaml文件不存在")        self._data = None        self._data_all = None    def yaml_data(self):  #yaml文件读取 --单个文档读取        #第一次调用data,读取yaml文档,如果不是,直接返回之前保存的数据        if not self._data:            with open(self.yaml_file,'rb') as f:                self._data = yaml.safe_load(f)        return self._data    def yaml_data_all(self):  #多个文档的读取        if not self._data_all:            with open(self.yaml_file,'rb') as f:                self._data_all = yaml.safe_load_all(f)        return self._data_all

标签: #oracle删除database