一、概述
1.1 项目背景
假设你是数据管理员,需要从Oracle数据库中提取2025年全省上千家机构的数据,并按以下要求处理:
每个机构生成单独的Excel文件
每个文件包含「问题汇总」和「原始明细」两个工作表
所有文件按所属市州分类存放
1.2 技术栈
Python 3.x
cx_Oracle:Oracle数据库连接
pandas:数据处理与分析
openpyxl:Excel文件操作
pathlib:路径管理
二、环境准备
2.1 安装依赖包
安装Python包
pip install cx_Oracle pandas openpyxl# Oracle Instant Client(需单独下载)# 访问Oracle官网下载对应版本的Instant Client# 将dll文件所在目录添加到系统PATH环境变量
2.2 目录结构
project/├── export_data.py # 主程序├── config.py # 配置文件(可选)├── requirements.txt # 依赖列表├── logs/ # 日志目录└── output/ # 输出目录(自动创建)
三、完整代码实现
从Oracle数据库导出机构自查自纠数据功能:按机构拆分数据,生成多工作表Excel文件,按市州分类存储作者:您的名字日期:2026-02-12"""import osimport cx_Oracleimport pandas as pdfrom pathlib import Pathfrom datetime import datetime# ==================== 配置部分 ====================DB_CONFIG = { 'host': '192.168.1.100', # 数据库服务器IP地址 'port': 1521, # Oracle默认端口 'service_name': 'ORCL', # 数据库服务名 'user': 'med_inspect', # 用户名 'password': 'your_password_123' # 密码}OUTPUT_ROOT = './output' # 输出根目录TABLE_NAME = '全量明细数据' # 源数据表名# ==================== 数据库连接 ====================def get_connection(): """ 建立Oracle数据库连接 Returns: connection: Oracle数据库连接对象 """ try: # 构造数据源名称 dsn = cx_Oracle.makedsn( host=DB_CONFIG['host'], port=DB_CONFIG['port'], service_name=DB_CONFIG['service_name'] ) # 创建连接 conn = cx_Oracle.connect( user=DB_CONFIG['user'], password=DB_CONFIG['password'], dsn=dsn ) return conn except cx_Oracle.DatabaseError as e: print(f"❌ 数据库连接失败: {e}") raise# ==================== 数据获取函数 ====================def get_all_institutions(): """ 获取所有机构的基本信息(市州、编号、名称) Returns: DataFrame: 包含机构信息的表格 """ sql = f''' SELECT DISTINCT "机构归属_市州" AS city, "机构编号" AS ins_code, "机构名称" AS ins_name FROM "{TABLE_NAME}" WHERE "机构编号" IS NOT NULL AND "机构归属_市州" IS NOT NULL ORDER BY "机构归属_市州", "机构编号" ''' try: with get_connection() as conn: # 执行查询并转换为DataFrame df = pd.read_sql(sql, conn) # 统一列名为小写,方便后续使用 df.columns = [col.lower() for col in df.columns] return df except Exception as e: print(f"❌ 获取机构列表失败: {e}") raisedef fetch_detail(ins_code): """ 获取指定机构的原始明细数据 Args: ins_code: 机构编号 Returns: DataFrame: 该机构的所有明细记录 """ sql = f'SELECT * FROM "{TABLE_NAME}" WHERE "机构编号" = :ins_code' with get_connection() as conn: df = pd.read_sql(sql, conn, params={'ins_code': ins_code}) return dfdef fetch_summary(ins_code): """ 获取指定机构的问题汇总统计 Args: ins_code: 机构编号 Returns: DataFrame: 按问题类别分组汇总的数据 """ sql = f''' SELECT "问题类别", "医保目录名称" AS "问题项目", "问题描述", "政策依据", COUNT(DISTINCT "机构编号") AS "违规机构数", COUNT(1) AS "违规数据条数", SUM(CASE WHEN "违规金额" IS NOT NULL THEN TO_NUMBER("违规金额") ELSE 0 END) AS "违规金额" FROM "{TABLE_NAME}" WHERE "机构编号" = :ins_code AND "问题类别" IS NOT NULL GROUP BY "问题类别", "医保目录名称", "问题描述", "政策依据" ORDER BY "违规金额" DESC ''' with get_connection() as conn: df = pd.read_sql(sql, conn, params={'ins_code': ins_code}) return df# ==================== 辅助函数 ====================def sanitize_filename(name): """ 清理文件名,移除不安全的字符 Args: name: 原始文件名 Returns: str: 安全的文件名 """ # 处理空值 if pd.isna(name) or not str(name).strip(): return "未知机构" # 定义允许的字符集合 allowed_chars = (' ', '_', '-', '(', ')', '(', ')', '.', '·') # 过滤字符 safe_chars = [] for char in str(name): if char.isalnum() or char in allowed_chars: safe_chars.append(char) # 组合并清理 safe_name = ''.join(safe_chars) safe_name = safe_name.strip().replace(" ", "_") return safe_name if safe_name else "未知机构"def get_summary_template(): """ 获取汇总表的空模板(当机构无问题时使用) Returns: DataFrame: 包含标准列名的空DataFrame """ columns = [ "问题类别", "问题项目", "问题描述", "政策依据", "违规机构数", "违规数据条数", "违规金额" ] return pd.DataFrame(columns=columns)# ==================== 主处理函数 ====================def export_institution_data(city, ins_code, ins_name, output_root): """ 导出单个机构的数据到Excel Args: city: 市州名称 ins_code: 机构编号 ins_name: 机构名称 output_root: 输出根目录 Returns: bool: 是否成功 str: 输出文件路径(成功时)或错误信息(失败时) """ try: # 1. 创建市州目录 safe_city = sanitize_filename(city) city_dir = output_root / safe_city city_dir.mkdir(parents=True, exist_ok=True) # 2. 构建文件名 safe_name = sanitize_filename(ins_name) filename = f"{ins_code}_{safe_name}.xlsx" filepath = city_dir / filename # 3. 获取数据 print(f" 正在获取明细数据...", end='', flush=True) detail_df = fetch_detail(ins_code) print(f"✅ 获取到 {len(detail_df)} 条记录") if detail_df.empty: return False, f"机构 {ins_code} 无数据" print(f" 正在生成汇总统计...", end='', flush=True) summary_df = fetch_summary(ins_code) if summary_df.empty: summary_df = get_summary_template() print(f"⚠️ 无问题数据") else: print(f"✅ 发现 {len(summary_df)} 类问题") # 4. 写入Excel with pd.ExcelWriter(filepath, engine='openpyxl') as writer: # 写入汇总表 summary_df.to_excel( writer, sheet_name='问题汇总', index=False ) # 写入明细表 detail_df.to_excel( writer, sheet_name='原始明细', index=False ) # 获取工作簿和工作表对象 workbook = writer.book worksheet_summary = workbook['问题汇总'] worksheet_detail = workbook['原始明细'] # 自动调整列宽(基础版) for worksheet in [worksheet_summary, worksheet_detail]: for column in worksheet.columns: max_length = 0 column_letter = column[0].column_letter for cell in column: try: cell_length = len(str(cell.value)) except: cell_length = 0 if cell_length > max_length: max_length = cell_length adjusted_width = min(max_length + 2, 50) # 最大宽度50 worksheet.column_dimensions[column_letter].width = adjusted_width return True, str(filepath.relative_to(output_root)) except Exception as e: return False, f"处理失败: {str(e)}"def main(): """ 主程序:遍历所有机构并导出数据 """ print("=" * 60) print("🚀 开始导出机构自查自纠数据") print(f"📅 执行时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}") print("=" * 60) # 1. 获取机构列表 print("📋 正在获取机构列表...", end='', flush=True) try: inst_df = get_all_institutions() print(f"✅ 成功获取 {len(inst_df)} 个机构") except Exception as e: print(f"❌ 失败: {e}") return if len(inst_df) == 0: print("⚠️ 未找到任何机构数据,程序退出") return # 2. 准备输出目录 output_root = Path(OUTPUT_ROOT) output_root.mkdir(exist_ok=True) print(f"📁 输出目录: {output_root.absolute()}") # 3. 统计变量 success_count = 0 fail_count = 0 no_data_count = 0 # 4. 遍历处理每个机构 print("\n" + "=" * 60) print("开始处理各个机构...") print("=" * 60) for idx, row in inst_df.iterrows(): city = row['city'] ins_code = row['ins_code'] ins_name = row['ins_name'] # 进度显示 progress = f"[{idx + 1}/{len(inst_df)}]" print(f"\n{progress} 处理: {city} - {ins_name} ({ins_code})") # 跳过无效数据 if pd.isna(city) or pd.isna(ins_code): print(f" ⚠️ 跳过:市州或机构编号为空") fail_count += 1 continue # 导出数据 success, result = export_institution_data( city, ins_code, ins_name, output_root ) # 统计结果 if success: print(f" ✅ 已保存: {result}") success_count += 1 else: if "无数据" in result: print(f" ⚠️ 跳过: {result}") no_data_count += 1 else: print(f" ❌ 失败: {result}") fail_count += 1 # 5. 输出汇总报告 print("\n" + "=" * 60) print("🎉 数据导出完成!") print("=" * 60) print(f"📊 处理结果统计:") print(f" 成功导出: {success_count} 个机构") print(f" 无数据: {no_data_count} 个机构") print(f" 处理失败: {fail_count} 个机构") print(f" 总计: {len(inst_df)} 个机构") if success_count > 0: print(f"\n📁 文件保存位置: {output_root.absolute()}") print(" 文件结构:") for city_dir in output_root.iterdir(): if city_dir.is_dir(): excel_files = list(city_dir.glob("*.xlsx")) print(f" ├─ {city_dir.name}/ ({len(excel_files)}个文件)") print(f"\n⏰ 完成时间: {datetime.now().strftime('%H:%M:%S')}")# ==================== 程序入口 ====================if __name__ == "__main__": # 记录开始时间 start_time = datetime.now() try: main() except KeyboardInterrupt: print("\n\n⏹️ 用户中断程序执行") except Exception as e: print(f"\n💥 程序执行出错: {e}") import traceback traceback.print_exc() finally: # 计算运行时间 end_time = datetime.now() run_time = end_time - start_time print(f"\n⏱️ 总运行时间: {run_time}")
四、关键知识点详解
4.1 数据库连接管理
使用上下文管理器自动管理连接
withget_connection() as conn: df = pd.read_sql(query, conn)
退出with块时自动关闭连接,避免资源泄露
4.2 SQL参数化查询
# 安全的方式:使用参数化查询防止SQL注入query = 'SELECT * FROM table WHERE id = :ins_code'df = pd.read_sql(query, conn, params={'ins_code': ins_code})# 危险的方式:字符串拼接(不要使用!)query = f'SELECT * FROM table WHERE id = {ins_code}' # ❌
4.3 数据处理技巧
1. 处理空值
if pd.isna(value): # 判断是否为NaN return "未知"
# 2. 列名处理
df.columns = [col.lower() for col in df.columns] # 统一小写
3. 空DataFrame判断
if df.empty: # 判断是否为空 return pd.DataFrame(columns=columns) # 返回空模板
4.4 文件路径管理
from pathlib import Path# 创建Path对象output_root = Path('./output')# 创建目录(自动创建父目录)city_dir = output_root / "成都市"city_dir.mkdir(parents=True, exist_ok=True) # 目录存在时不报错# 构建文件路径filepath = city_dir / f"{ins_code}_医院.xlsx"# 获取相对路径relative_path = filepath.relative_to(output_root)
4.5 Excel写入优化
with pd.ExcelWriter(filepath, engine='openpyxl') as writer: # 写入多个工作表 summary_df.to_excel(writer, sheet_name='汇总', index=False) detail_df.to_excel(writer, sheet_name='明细', index=False) # 获取工作簿进行高级设置 workbook = writer.book worksheet = workbook['汇总'] # 自动调整列宽 for column in worksheet.columns: max_length = 0 column_letter = column[0].column_letter for cell in column: cell_length = len(str(cell.value)) max_length = max(max_length, cell_length) adjusted_width = min(max_length + 2, 50) worksheet.column_dimensions[column_letter].width = adjusted_width
五、错误处理与日志
5.1 增强版错误处理
import loggingfrom datetime import datetime# 配置日志def setup_logging(): log_dir = Path('./logs') log_dir.mkdir(exist_ok=True) log_file = log_dir / f"export_{datetime.now().strftime('%Y%m%d')}.log" logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s', handlers=[ logging.FileHandler(log_file, encoding='utf-8'), logging.StreamHandler() # 同时输出到控制台 ] ) return logging.getLogger(__name__)# 使用日志logger = setup_logging()logger.info(f"开始处理机构 {ins_code}")try: # 业务代码 passexcept Exception as e: logger.error(f"处理机构 {ins_code} 失败: {e}")
六、配置优化建议
6.1 配置文件分离
创建config.py:# config.pyimport osfrom pathlib import Pathclass Config: # 数据库配置 DB_HOST = os.getenv('DB_HOST', '192.168.1.100') DB_PORT = int(os.getenv('DB_PORT', 1521)) DB_SERVICE = os.getenv('DB_SERVICE', 'ORCL') DB_USER = os.getenv('DB_USER', 'med_inspect') DB_PASSWORD = os.getenv('DB_PASSWORD', '') # 输出配置 OUTPUT_DIR = Path(os.getenv('OUTPUT_DIR', './output')) # 数据表 TABLE_NAME = '自查自纠2025年全量明细数据' # 处理控制 BATCH_SIZE = 100 # 批量处理数量 MAX_RETRIES = 3 # 重试次数
6.2 命令行参数支持
import argparsedef parse_args(): parser = argparse.ArgumentParser(description='导出机构自查自纠数据') parser.add_argument('--output', '-o', default='./output', help='输出目录路径') parser.add_argument('--city', '-c', help='指定市州,不指定则处理所有') parser.add_argument('--test', '-t', action='store_true', help='测试模式,只处理前5个机构') return parser.parse_args()# 在主程序中使用args = parse_args()if args.test: inst_df = inst_df.head(5) # 测试模式下只处理前5个
七、运行与测试
7.1 运行程序
# 基本运行python export_data.py# 带参数运行python export_data.py --output ./2025年自查数据 --test# 指定市州python export_data.py --city "成都市"
7.2 预期输出结构
output/├── 成都市/│ ├── 110101_成都市第一人民医院.xlsx│ ├── 110102_华西医院.xlsx│ └── ...├── 绵阳市/│ ├── 120101_绵阳市中心医院.xlsx│ └── ...└── 泸州市/ └── ...
7.3 生成的Excel文件结构
每个Excel文件包含两个工作表:
问题汇总:按问题类别统计
问题类别、问题项目、问题描述、政策依据
违规机构数、违规数据条数、违规金额
按违规金额降序排列
原始明细:所有原始记录
八、常见问题与解决方案
8.1 数据库连接问题
问题:cx_Oracle.DatabaseError: DPI-1047
解决:确保已安装Oracle Instant Client并正确配置PATH
8.2 内存不足
问题:处理大数据时内存溢出
解决:
# 分批处理batch_size = 100for i in range(0, len(inst_df), batch_size): batch = inst_df.iloc[i:i+batch_size] process_batch(batch)
8.3 文件名乱码
问题:中文字符在文件名中显示异常
解决:使用sanitize_filename函数清理特殊字符
8.4 权限问题
问题:无法创建目录或写入文件
解决:检查输出目录的写入权限,或更换有权限的目录
九、扩展功能建议
9.1 添加邮件通知
import smtplibfrom email.mime.text import MIMETextfrom email.mime.multipart import MIMEMultipartdef send_email_report(success_count, total_count, output_dir): """发送执行结果邮件""" # 邮件配置 msg = MIMEMultipart() msg['Subject'] = f'数据导出完成报告 - {datetime.now().date()}' # 构建邮件内容 content = f""" 数据导出任务已完成! 统计结果: - 成功导出:{success_count} 个机构 - 失败:{total_count - success_count} 个机构 - 总计:{total_count} 个机构 文件位置:{output_dir} 生成时间:{datetime.now()} """ msg.attach(MIMEText(content, 'plain')) # 发送邮件...
9.2 添加进度条
from tqdm import tqdm# 在循环处理时显示进度条for idx, row in tqdm(inst_df.iterrows(), total=len(inst_df)): # 处理每个机构 pass
9.3 配置文件支持
import yamlimport json# 支持YAML或JSON配置文件def load_config(config_file): if config_file.endswith('.yaml') or config_file.endswith('.yml'): with open(config_file, 'r', encoding='utf-8') as f: return yaml.safe_load(f) elif config_file.endswith('.json'): with open(config_file, 'r', encoding='utf-8') as f: return json.load(f)
十、总结
这个项目展示了如何:
安全连接数据库:使用参数化查询,防止SQL注入
高效处理数据:利用pandas进行数据操作
灵活文件管理:按市州分类存储,文件名自动清理
健壮错误处理:单个机构失败不影响整体流程
用户体验优化:进度提示,清晰日志
通过这个项目,您不仅学会了具体的代码实现,更重要的是掌握了处理实际数据导出任务的完整思路和方法论。这种模式可以广泛应用于各种数据导出、报表生成、数据分发等场景。
适用场景扩展:
医疗机构考核数据分发
学校成绩单按班级分类
企业工资条按部门发放
客户账单按地区分类
任何需要"一对多"数据分发的场景
希望这份完整的实现方案和详细解释能帮助您更好地理解和应用Python进行数据处理工作!