Python 玩转 Excel 案例【第 2 期】
大家好!今天我们来学习一个实用的Python小案例:如何批量处理Excel文件并合并数据。
案例背景
假设我们有这样一个需求:在一个"销售表"文件夹中,存放着多个Excel文件,每个文件可能包含多个工作表。我们需要将所有数据合并到一个文件中,并添加来源信息。
代码实现
1. 导入必要的库
import logging
from pathlib import Path
from typing import List, Optional, Dict
import pandas as pd
代码解释:
- •
logging:Python的日志模块,用于记录程序运行信息 - •
pathlib.Path:面向对象的文件路径处理模块 - •
pandas as pd:数据分析核心库,处理Excel文件的神器
2. 配置日志系统
# 配置日志
logging.basicConfig(level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
代码解释:
- •
level=logging.INFO:设置日志级别为INFO,只记录INFO及以上级别的信息 - •
format:定义日志输出格式,包含时间、级别和消息内容
执行结果:
日志系统配置完成,后续运行时会看到类似信息:
2026-03-05 20:30:25 - INFO - 开始处理文件夹: 销售表
💡 小Tip:使用logging替代print的好处是:
核心函数详解
函数一:读取Excel文件
def read_excel_files(folder_path: str, sheet_name: Optional[str] = None) -> List[pd.DataFrame]:
"""
读取文件夹中的所有Excel文件
Args:
folder_path: 文件夹路径
sheet_name: 指定的工作表名称,如果为None则读取所有工作表
Returns:
包含所有数据的数据框列表
"""
try:
folder = Path(folder_path)
if not folder.exists():
raise FileNotFoundError(f"文件夹不存在: {folder_path}")
dataframes = []
for file_path in folder.glob('*.xlsx*'):
try:
logger.info(f"正在读取文件: {file_path.name}")
# 读取Excel文件
if sheet_name:
df = pd.read_excel(file_path, sheet_name=sheet_name)
# 添加文件名和工作表名信息
df['文件名'] = file_path.stem
df['工作表名'] = sheet_name
dataframes.append(df)
else:
# 读取所有工作表
sheets = pd.read_excel(file_path, sheet_name=None)
for sheet_name_inner, sheet_df in sheets.items():
sheet_df['文件名'] = file_path.stem
sheet_df['工作表名'] = sheet_name_inner
dataframes.append(sheet_df)
except Exception as e:
logger.error(f"读取文件 {file_path.name} 失败: {str(e)}")
continue
return dataframes
except Exception as e:
logger.error(f"读取文件夹失败: {str(e)}")
raise
知识点详解
1. 函数定义与类型注解
- •
folder_path: str:参数类型注解,表示文件夹路径为字符串 - •
sheet_name: Optional[str] = None:这个参数可以接收字符串或None,如果不传值则默认为None - •
Optional的作用:告诉代码阅读者和类型检查工具,这个参数可以是字符串,也可以是None - •
Optional[str] 中的[ ]不是列表的意思,而是告诉Optional"我要包装的是字符串类型" - •
-> List[pd.DataFrame]:返回值类型注解,返回DataFrame列表
2. 文档字符串
3. 异常处理结构
- • 这样即使某个文件出错,也不会影响其他文件的处理
4. 文件路径处理
- •
Path(folder_path):将字符串路径转换为Path对象 - •
folder.exists():检查文件夹是否存在 - •
folder.glob('*.xlsx*'):使用通配符查找所有Excel文件 - •
file_path.name:获取文件名(带扩展名) - •
file_path.stem:获取文件名(不含扩展名)
Path对象属性一览表:
| | |
|---|
stem | 销售数据_01 | |
name | 销售数据_01.xlsx | |
suffix | .xlsx | |
parent | 销售表 | |
5. 读取Excel文件的两种模式
- • 模式1(指定工作表):
pd.read_excel(file_path, sheet_name=sheet_name),返回DataFrame - • 模式2(所有工作表):
pd.read_excel(file_path, sheet_name=None),返回字典 - •
for sheet_name_inner, sheet_df in sheets.items():遍历所有工作表
当sheet_name=None时,返回的sheets的数据结构:
sheets = {
'工作表名1': DataFrame1, # 第一个工作表的数据
'工作表名2': DataFrame2, # 第二个工作表的数据
'工作表名3': DataFrame3, # 第三个工作表的数据
...
}
6. 添加来源信息
- •
df['文件名'] = file_path.stem:新增"文件名"列 - •
df['工作表名'] = sheet_name或sheet_name_inner:新增"工作表名"列
7. 异常处理与继续执行
- •
except Exception as e:捕获所有类型的异常 - •
continue:跳过当前文件,继续处理下一个文件 - •
raise:重新抛出异常,让上层调用者知道程序失败了
函数二:合并数据
def merge_excel_data(dataframes: List[pd.DataFrame],
selected_columns: Optional[List[str]] = None,
new_column_names: Optional[Dict[str, str]] = None) -> pd.DataFrame:
"""
合并多个DataFrame并进行列处理
Args:
dataframes: 要合并的DataFrame列表
selected_columns: 需要保留的列名列表
new_column_names: 列名重命名字典,格式:{原列名: 新列名}
Returns:
合并处理后的DataFrame
"""
try:
if not dataframes:
logger.warning("没有数据可合并")
return pd.DataFrame()
# 合并所有数据
merged_df = pd.concat(dataframes, ignore_index=True)
logger.info(f"合并完成,总行数: {len(merged_df)}")
# 选择指定列
if selected_columns:
available_cols = [col for col in selected_columns if col in merged_df.columns]
merged_df = merged_df[available_cols]
logger.info(f"选择了列: {available_cols}")
# 重命名列
if new_column_names:
merged_df = merged_df.rename(columns=new_column_names)
logger.info(f"重命名列: {new_column_names}")
return merged_df
except Exception as e:
logger.error(f"合并数据失败: {str(e)}")
raise
知识点详解
1. 参数说明
- •
dataframes: List[pd.DataFrame]:要合并的DataFrame列表 - •
selected_columns: Optional[List[str]] = None:可选,需要保留的列名列表 - •
new_column_names: Optional[Dict[str, str]] = None:可选,列名重命名字典
2. 空数据检查
- •
if not dataframes:检查是否有数据 - • 返回空DataFrame而不是None,确保调用者总能得到DataFrame对象
- • 空DataFrame也可以用
df.empty来判断
3. 数据合并
- •
pd.concat(dataframes, ignore_index=True):合并所有DataFrame - •
ignore_index=True:重新生成索引 - •
len(merged_df):获取合并后的总行数
4. 列选择与列表推导式
- •
if selected_columns:如果传入了需要选择的列 - •
available_cols = [col for col in selected_columns if col in merged_df.columns]:这是一个列表推导式 - • 遍历selected_columns中的每个列名
- •
merged_df[available_cols]:只保留选中的列
列表推导式详解:
# 语法:[表达式 for 变量 in 可迭代对象 if 条件]
# 等价写法:
available_cols = []
for col in selected_columns:
if col in merged_df.columns:
available_cols.append(col)
5. 列重命名
- •
rename(columns=new_column_names):重命名列 - •
new_column_names:字典格式,{原列名: 新列名}
函数三:主处理函数
def process_excel_files(input_folder: str,
output_file: str,
sheet_name: Optional[str] = None,
selected_columns: Optional[List[str]] = None,
new_column_names: Optional[Dict[str, str]] = None) -> bool:
"""
主处理函数:读取、合并、保存Excel文件
Args:
input_folder: 输入文件夹路径
output_file: 输出文件路径
sheet_name: 指定的工作表名称
selected_columns: 需要保留的列名列表
new_column_names: 列名重命名字典
Returns:
bool: 处理是否成功
"""
try:
logger.info(f"开始处理文件夹: {input_folder}")
# 1. 读取所有Excel文件
dataframes = read_excel_files(input_folder, sheet_name)
# 2. 合并数据
merged_df = merge_excel_data(dataframes, selected_columns, new_column_names)
if merged_df.empty:
logger.warning("没有数据被处理")
return False
# 3. 保存结果
merged_df.to_excel(output_file, index=False)
logger.info(f"结果已保存到: {output_file}")
logger.info(f"共处理了 {len(dataframes)} 个数据表,最终数据 {len(merged_df)} 行")
return True
except Exception as e:
logger.error(f"处理失败: {str(e)}")
return False
知识点详解
1. 函数定义
- •
input_folder: str:输入文件夹路径 - •
output_file: str:输出文件路径 - •
-> bool:返回值类型是布尔值,表示处理是否成功
2. 三步走流程
- • 第1步:调用
read_excel_files读取所有Excel文件 - • 第2步:调用
merge_excel_data合并数据
3. 结果检查
- •
merged_df.empty:判断DataFrame是否为空
4. 保存文件
- •
to_excel(output_file, index=False):保存为Excel文件 - •
len(dataframes):总共读取了多少个数据表 - •
len(merged_df):合并后总共多少行数据
5. 返回值
4. 使用示例
if __name__ == "__main__":
# 示例1:基本使用 - 读取所有工作表
success = process_excel_files(
input_folder="销售表",
output_file="day2-result.xlsx",
sheet_name=None, # 读取所有工作表
selected_columns=["文件名", "工作表名", "店名", "品牌", "型号", "数量"],
new_column_names={"文件名": "来源文件", "工作表名": "来源工作表"}
)
if success:
logger.info("处理完成")
else:
logger.error("处理失败")
知识点详解
1. if __name__ == "__main__":
2. 函数调用
3. 参数灵活性
运行效果演示
执行代码后,你会看到类似这样的输出:
2026-03-05 20:30:25 - INFO - 开始处理文件夹: 销售表
2026-03-05 20:30:25 - INFO - 正在读取文件: 销售数据_01.xlsx
2026-03-05 20:30:26 - INFO - 正在读取文件: 销售数据_02.xlsx
2026-03-05 20:30:26 - INFO - 正在读取文件: 销售数据_03.xlsx
2026-03-05 20:30:27 - INFO - 合并完成,总行数: 156
2026-03-05 20:30:27 - INFO - 选择了列: ['文件名', '工作表名', '店名', '品牌', '型号', '数量']
2026-03-05 20:30:27 - INFO - 重命名列: {'文件名': '来源文件', '工作表名': '来源工作表'}
2026-03-05 20:30:28 - INFO - 结果已保存到: day2-result.xlsx
2026-03-05 20:30:28 - INFO - 共处理了 8 个数据表,最终数据 156 行
2026-03-05 20:30:28 - INFO - 处理完成
最终生成的Excel文件内容示例:
总结
通过这个案例,我们学会了:
📦 资源获取提示
关注「码农自习室」,后台回复关键词 PythonExcel案例,即可获取本文完整代码及所有示例数据集,动手实践批量处理Excel的技巧!
❤️ 支持我们
如果觉得本文对你有帮助,欢迎点赞 + 关注 + 点亮小红心,您的支持是我们持续创作优质内容的最大动力~