方案一:
直接套用脚本,需可以看懂一些脚本逻辑

改代码中 类名为 OracleConnector,它可以同时连接多个 Oracle 数据库,并提供执行增删改查操作的方法。

import cx_Oracleclass OracleConnector:def __init__(self, databases):self.connections = {}for db in databases:conn = cx_Oracle.connect(user=db['username'],password=db['password'],dsn=db['dsn'],encoding=db.get('encoding', 'UTF-8'))self.connections[db['name']] = conndef execute_query(self, database_name, query, params=None):with self.connections[database_name].cursor() as cursor:cursor.execute(query, params)result = cursor.fetchall()return resultdef execute_non_query(self, database_name, query, params=None):with self.connections[database_name].cursor() as cursor:cursor.execute(query, params)self.connections[database_name].commit()return cursor.rowcountdef execute_query_all(self, databases, query, params=None):results = {}for db in databases:database_name = db['name']results[database_name] = self.execute_query(database_name, query, params)return resultsdef execute_non_query_all(self, databases, query, params=None):results = {}for db in databases:database_name = db['name']try:res = self.execute_non_query(database_name, query, params)results[database_name] = resexcept cx_Oracle.DatabaseError as e:print(f"Error occurred when running query on {database_name}: {e}")self.connections[database_name].rollback()return results

以下是代码解释:

import cx_Oracle:导入 cx_Oracle 库,用于与 Oracle 数据库进行连接和交互。class OracleConnector::定义一个名为 OracleConnector 的类。def __init__(self, databases)::类的初始化方法,接受一个包含多个数据库信息的列表作为参数。self.connections = {}:初始化一个字典,用于存储数据库连接。for db in databases::遍历数据库列表中的每个数据库信息。conn = cx_Oracle.connect(...):根据数据库信息创建数据库连接对象。user=db['username']:连接数据库的用户名。password=db['password']:连接数据库的密码。dsn=db['dsn']:Oracle 数据库的 DSN(数据源名称)。encoding=db.get('encoding', 'UTF-8'):可选的编码方式,默认为 UTF-8。self.connections[db['name']] = conn:将数据库连接对象添加到字典中,以数据库名称作为键。def execute_query(self, database_name, query, params=None)::执行查询语句的方法。database_name:数据库名称。query:要执行的查询语句。params=None:可选的查询参数。with self.connections[database_name].cursor() as cursor::获取游标对象,用于执行 SQL 语句。cursor.execute(query, params):执行查询语句。result = cursor.fetchall():获取查询结果集。return result:返回查询结果。def execute_non_query(self, database_name, query, params=None)::执行非查询语句的方法,比如插入、更新和删除操作。database_name:数据库名称。query:要执行的非查询语句。params=None:可选的参数。with self.connections[database_name].cursor() as cursor::获取游标对象,用于执行 SQL 语句。cursor.execute(query, params):执行非查询语句,比如插入、更新和删除操作。self.connections[database_name].commit():提交事务,将修改操作永久保存到数据库中。return cursor.rowcount:返回受影响的行数,即执行非查询操作后所影响的行数。def execute_query_all(self, databases, query, params=None)::在多个数据库上执行相同的查询语句,并返回结果。databases:要执行查询的数据库列表。query:要执行的查询语句。params=None:可选的查询参数。results = {}:初始化一个字典,用于存储查询结果。for db in databases::遍历数据库列表中的每个数据库。database_name = db['name']:获取数据库名称。results[database_name] = self.execute_query(database_name, query, params):执行查询,并将结果存储到字典中。return results:返回包含查询结果的字典。def execute_non_query_all(self, databases, query, params=None)::在多个数据库上执行相同的非查询语句,并返回结果。databases:要执行非查询操作的数据库列表。query:要执行的非查询语句。params=None:可选的参数。results = {}:初始化一个字典,用于存储操作结果。for db in databases::遍历数据库列表中的每个数据库。database_name = db['name']:获取数据库名称。try::捕获可能发生的异常。res = self.execute_non_query(database_name, query, params):执行非查询操作。results[database_name] = res:将操作结果存储到字典中。except cx_Oracle.DatabaseError as e::捕获 Oracle 数据库相关的异常。print(f"Error occurred when running query on {database_name}: {e}"):打印出错信息。self.connections[database_name].rollback():执行回滚操作,撤销之前的操作。return results:返回包含操作结果的字典。

这个封装类使得可以同时连接多个 Oracle 数据库,并在它们上执行增删改查操作。而在执行非查询操作时,如果遇到错误,将会进行回滚操作,保证数据的一致性。

方案二:直接调用封装脚本(写用例,执行脚本即可)
脚本实现封装后,只需要在Oracle.yaml文件中写用例即可,此后执行Oracle.py脚本即实现数据库的增删改查操作

目录介绍:

Oracle.yaml 编写基本数据库信息

Orcale:username: userpassword:pwdip: 127.0.0.1port: 1521sid: orcl

PublicConfig.py脚本: 配置读取信息,方便调用

import osfrom Public_Utils.util_yaml import YamlReaderclass YamlPath:def __init__(self):current = os.path.abspath(__file__)self.base_dir = os.path.dirname(os.path.dirname(current))self._config_path = self.base_dir + os.sep + "Public_Config\Public_yaml"def get_oracle_file(self):self._config_file = self._config_path + os.sep + "Oracle.yaml"return self._config_fileclass ConfigYaml:def __init__(self): #初始yaml读取配置文件self.oracle_config = YamlReader(YamlPath().get_oracle_file()).yaml_data()def get_oracle_yaml(self):return self.oracle_config['Orcale']if __name__ == '__main__':pass

Oracle.py执行脚本

#coding=utf-8import cx_Oracleimport osimport jsonfrom Public_Config.PublicConfig import ConfigYamlos.environ['NLS_LANG'] = 'SIMPLIFIED CHINESE_CHINA.UTF8'class Oracle:def __init__(self):user = ConfigYaml().get_oracle_yaml()['username']pwd = ConfigYaml().get_oracle_yaml()['password']ip = ConfigYaml().get_oracle_yaml()['ip']port = ConfigYaml().get_oracle_yaml()['port']sid = ConfigYaml().get_oracle_yaml()['sid']self.connect = cx_Oracle.connect(str(user) + "/" + str(pwd) + "@" + str(ip) + ":" + str(port) + "/" + str(sid))# self.connect = cx_Oracle.connect(f'{user}/{pwd}@{ip}:{port}/{sid}') # 这里的顺序是 用户名/密码@oracleserver的ip地址/数据库名字self.cursor = self.connect.cursor() # 使用cursor()方法获取操作游标def GetData(self,sql):self.cursor.execute(sql)# 使用Rowfactory更改查询结果,更直观查看数据columns = [col[0] for col in self.cursor.description]self.cursor.rowfactory = lambda *args: dict(zip(columns, args))# fetchall()一次取完所有结果# fetchone()一次取一行结果data = self.cursor.fetchall()return datadef select(self, sql): #查询list = []self.cursor.execute(sql)# 使用execute方法执行SQL语句result = self.cursor.fetchall() # fetchall()一次取完所有结果,fetchone()一次取一行结果col_name = self.cursor.descriptionfor row in result:dict = {}for col in range(len(col_name)):key = col_name[col][0]value = row[col]dict[key] = valuelist.append(dict)js = json.dumps(list, ensure_ascii=False, indent=2, separators=(',', ':'))'''json.dumps() 是把python对象转换成json对象的一个过程,生成的是字符串 json.dump() 是把python对象转换成json对象生成一个fp的文件流,和文件相关'''return js #将结果返回为一个字符串def selectlist(self,sql):list = []self.cursor.execute(sql)result = self.cursor.fetchall()col_name = self.cursor.descriptionfor row in result:dict = {}for col in range(len(col_name)):key = col_name[col][0]value = row[col]dict[key] = valuelist.append(dict)return list #将结果以列表返回def disconnect(self): #未连接self.cursor.close()self.connect.close()def insert(self, sql, list_param): #插入try:self.cursor.executemany(sql, list_param)self.connect.commit()print("插入ok")except Exception as e:print(e)finally:self.disconnect()def update(self, sql): #更新try:self.cursor.execute(sql)self.connect.commit()except Exception as e:print(e)finally:self.disconnect()def delete(self, sql): #删除try:self.cursor.execute(sql)self.connect.commit()print("delete ok")except Exception as e:print(e)finally:self.disconnect()if __name__ == '__main__':print(Oracle().select(sql="select * from cifaccount a where a.cif_account ='310400009590' "))pass

util_yaml.py

import osimport yamlclass YamlReader:#初始化,判断文件是否存在def __init__(self,yaml_file):if os.path.exists(yaml_file):self.yaml_file = yaml_fileelse:raise FileNotFoundError("yaml文件不存在")self._data = Noneself._data_all = Nonedef yaml_data(self):#yaml文件读取 --单个文档读取#第一次调用data,读取yaml文档,如果不是,直接返回之前保存的数据if not self._data:with open(self.yaml_file,'rb') as f:self._data = yaml.safe_load(f)return self._datadef yaml_data_all(self):#多个文档的读取if not self._data_all:with open(self.yaml_file,'rb') as f:self._data_all = yaml.safe_load_all(f)return self._data_all