import pandas as pdimport numpy as npfrom datetime import datetimeimport reimport warningsfrom sqlalchemy import create_enginefrom sqlalchemy.exc import SQLAlchemyErrorwarnings.filterwarnings('ignore')class ExcelDataCleaner: """ Excel数据清洗与校验工具类 功能:读取Excel文件,进行数据清洗、格式校验,然后导入MySQL数据库 """ def __init__(self, excel_path, sheet_name=0): """ 初始化清洗器 参数: excel_path: Excel文件路径 sheet_name: 工作表名称或索引,默认为第一个工作表 """ self.excel_path = excel_path self.sheet_name = sheet_name self.df = None self.cleaning_log = [] def load_excel(self): """加载Excel文件""" try: self.df = pd.read_excel(self.excel_path, sheet_name=self.sheet_name) self.cleaning_log.append(f"✓ 成功加载Excel文件: {self.excel_path}") self.cleaning_log.append(f"✓ 数据形状: {self.df.shape[0]}行 × {self.df.shape[1]}列") return True except Exception as e: self.cleaning_log.append(f"✗ 加载Excel失败: {str(e)}") return False def clean_column_names(self): """清洗列名:移除空格、特殊字符,统一命名格式""" if self.df is None: return original_columns = self.df.columns.tolist() # 定义列名清洗规则 new_columns = [] for col in original_columns: # 转换为字符串 col_str = str(col) # 移除前后空格 col_str = col_str.strip() # 替换特殊字符和空格为下划线 col_str = re.sub(r'[^\w]', '_', col_str) # 移除连续的下划线 col_str = re.sub(r'_+', '_', col_str) # 移除开头和结尾的下划线 col_str = col_str.strip('_') # 转换为小写(可选,根据数据库规范调整) col_str = col_str.lower() new_columns.append(col_str) self.df.columns = new_columns self.cleaning_log.append(f"✓ 列名已标准化: {dict(zip(original_columns, new_columns))}") def handle_missing_values(self, strategy='fill', fill_value=None, numeric_fill=0, text_fill=''): """ 处理缺失值 参数: strategy: 处理策略 'fill'填充, 'drop'删除, 'ignore'忽略 fill_value: 指定填充值(优先级最高) numeric_fill: 数值型列填充值 text_fill: 文本型列填充值 """ if self.df is None: return initial_rows = len(self.df) if strategy == 'drop': # 删除包含缺失值的行 self.df = self.df.dropna() dropped_rows = initial_rows - len(self.df) self.cleaning_log.append(f"✓ 删除包含缺失值的行: {dropped_rows}行") elif strategy == 'fill': # 按列类型填充缺失值 for col in self.df.columns: if self.df[col].dtype in ['int64', 'float64', 'int32', 'float32']: # 数值型列 fill_val = fill_value if fill_value is not None else numeric_fill missing_count = self.df[col].isnull().sum() if missing_count > 0: self.df[col].fillna(fill_val, inplace=True) self.cleaning_log.append(f"✓ 列 '{col}': 填充{missing_count}个缺失值为{fill_val}") else: # 文本或其他类型列 fill_val = fill_value if fill_value is not None else text_fill missing_count = self.df[col].isnull().sum() if missing_count > 0: self.df[col].fillna(fill_val, inplace=True) self.cleaning_log.append(f"✓ 列 '{col}': 填充{missing_count}个缺失值为'{fill_val}'") def remove_duplicates(self, subset=None, keep='first'): """ 删除重复行 参数: subset: 基于哪些列判断重复,None表示所有列 keep: 'first'保留第一个, 'last'保留最后一个, False删除所有重复 """ if self.df is None: return initial_rows = len(self.df) self.df = self.df.drop_duplicates(subset=subset, keep=keep) removed_rows = initial_rows - len(self.df) if removed_rows > 0: self.cleaning_log.append(f"✓ 删除重复行: {removed_rows}行") else: self.cleaning_log.append(f"✓ 未发现重复行") def trim_string_columns(self): """去除字符串列的前后空格""" if self.df is None: return trimmed_cols = [] for col in self.df.columns: if self.df[col].dtype == 'object': # 统计处理前后的变化 initial_sample = self.df[col].head(5).tolist() if len(self.df) > 0 else [] # 去除前后空格 self.df[col] = self.df[col].astype(str).str.strip() trimmed_cols.append(col) if trimmed_cols: self.cleaning_log.append(f"✓ 去除字符串列空格: {trimmed_cols}") def convert_date_columns(self, date_columns=None, date_format=None): """ 转换日期列格式 参数: date_columns: 指定为日期列的列名列表,None自动检测 date_format: 指定日期格式,如'%Y-%m-%d' """ if self.df is None: return converted_cols = [] # 如果未指定列,尝试自动检测日期列 if date_columns is None: # 通过列名和数据类型检测 date_pattern = re.compile(r'(date|time|生日|日期|时间)', re.IGNORECASE) potential_date_cols = [col for col in self.df.columns if date_pattern.search(col)] else: potential_date_cols = date_columns for col in potential_date_cols: if col in self.df.columns: try: if date_format: # 使用指定格式转换 self.df[col] = pd.to_datetime(self.df[col], format=date_format, errors='coerce') else: # 自动推断格式 self.df[col] = pd.to_datetime(self.df[col], errors='coerce') # 检查转换成功了多少行 success_count = self.df[col].notna().sum() total_count = len(self.df) if success_count > 0: converted_cols.append(col) self.cleaning_log.append(f"✓ 列 '{col}': 成功转换{success_count}/{total_count}行为日期格式") except Exception as e: self.cleaning_log.append(f"✗ 列 '{col}'日期转换失败: {str(e)}") if converted_cols: self.cleaning_log.append(f"✓ 已转换日期列: {converted_cols}") def validate_numeric_columns(self, numeric_columns=None, min_val=None, max_val=None): """ 验证数值列的有效性 参数: numeric_columns: 数值列名列表,None自动检测 min_val: 最小值限制 max_val: 最大值限制 """ if self.df is None: return validation_results = [] # 自动检测数值列 if numeric_columns is None: numeric_cols = self.df.select_dtypes(include=[np.number]).columns.tolist() else: numeric_cols = [col for col in numeric_columns if col in self.df.columns] for col in numeric_cols: # 检查非数值 non_numeric = pd.to_numeric(self.df[col], errors='coerce').isna().sum() # 检查范围 if min_val is not None: below_min = (self.df[col] < min_val).sum() else: below_min = 0 if max_val is not None: above_max = (self.df[col] > max_val).sum() else: above_max = 0 if non_numeric > 0 or below_min > 0 or above_max > 0: validation_results.append({ 'column': col, 'non_numeric': int(non_numeric), 'below_min': int(below_min) if min_val is not None else None, 'above_max': int(above_max) if max_val is not None else None }) if validation_results: self.cleaning_log.append("⚠ 数值列验证发现以下问题:") for result in validation_results: issues = [] if result['non_numeric'] > 0: issues.append(f"{result['non_numeric']}个非数值") if result.get('below_min', 0) > 0: issues.append(f"{result['below_min']}个小于最小值") if result.get('above_max', 0) > 0: issues.append(f"{result['above_max']}个大于最大值") if issues: self.cleaning_log.append(f" 列 '{result['column']}': {', '.join(issues)}") def clean_all(self, cleaning_steps=None): """ 执行完整的清洗流程 参数: cleaning_steps: 指定清洗步骤列表,None执行所有步骤 """ default_steps = [ 'clean_column_names', 'handle_missing_values', 'remove_duplicates', 'trim_string_columns', 'convert_date_columns', 'validate_numeric_columns' ] steps_to_execute = cleaning_steps if cleaning_steps is not None else default_steps self.cleaning_log.append("=" * 50) self.cleaning_log.append("开始数据清洗流程") self.cleaning_log.append("=" * 50) for step in steps_to_execute: if hasattr(self, step): try: # 调用对应的清洗方法 if step == 'handle_missing_values': getattr(self, step)('fill', numeric_fill=0, text_fill='未知') elif step == 'remove_duplicates': getattr(self, step)(keep='first') elif step == 'convert_date_columns': getattr(self, step)(date_format='%Y-%m-%d') elif step == 'validate_numeric_columns': getattr(self, step)(min_val=0, max_val=1000000) else: getattr(self, step)() except Exception as e: self.cleaning_log.append(f"✗ 步骤 '{step}' 执行失败: {str(e)}") self.cleaning_log.append("=" * 50) self.cleaning_log.append(f"清洗完成,剩余数据: {len(self.df)}行 × {len(self.df.columns)}列") self.cleaning_log.append("=" * 50) return self.df def export_to_mysql(self, db_url, table_name, if_exists='replace', index=False): """ 导出清洗后的数据到MySQL 参数: db_url: 数据库连接URL,格式: mysql+pymysql://用户名:密码@主机/数据库名 table_name: 目标表名 if_exists: 'fail', 'replace', 'append' index: 是否导出索引 """ if self.df is None or self.df.empty: self.cleaning_log.append("✗ 无数据可导出") return False try: # 创建数据库引擎 engine = create_engine(db_url) # 导出到MySQL self.df.to_sql( table_name, con=engine, if_exists=if_exists, index=index, chunksize=1000 ) self.cleaning_log.append(f"✓ 成功导出数据到MySQL表: {table_name}") self.cleaning_log.append(f"✓ 导出模式: {if_exists}") return True except SQLAlchemyError as e: self.cleaning_log.append(f"✗ MySQL导出失败: {str(e)}") return False except Exception as e: self.cleaning_log.append(f"✗ 导出过程出错: {str(e)}") return False def save_to_csv(self, output_path): """保存清洗后的数据到CSV文件""" if self.df is None or self.df.empty: return False try: self.df.to_csv(output_path, index=False, encoding='utf-8-sig') self.cleaning_log.append(f"✓ 数据已保存到: {output_path}") return True except Exception as e: self.cleaning_log.append(f"✗ 保存CSV失败: {str(e)}") return False def get_cleaning_log(self): """获取清洗日志""" return "\n".join(self.cleaning_log) def get_data_summary(self): """获取数据摘要""" if self.df is None: return "无数据" summary = [] summary.append("数据摘要:") summary.append(f"- 行数: {len(self.df)}") summary.append(f"- 列数: {len(self.df.columns)}") summary.append(f"- 列名: {list(self.df.columns)}") summary.append("\n数据类型分布:") for dtype, count in self.df.dtypes.value_counts().items(): summary.append(f"- {dtype}: {count}列") return "\n".join(summary)def main(): """主函数:演示数据清洗流程""" print("Excel数据清洗与MySQL导入演示") print("=" * 50) # 1. 创建示例数据(模拟从Excel读取) # 在实际使用中,这里应该从Excel文件读取 sample_data = { ' 用户ID ': ['U001', 'U002', 'U003', 'U004', 'U005', 'U001', None], '用户姓名': ['张三 ', '李四', '王五', '赵六', '孙七', '张三', '周八'], '年龄': [25, 30, 35, 28, 42, 25, 29], '注册日期': ['2023-01-15', '2023-02-20', '无效日期', '2023-03-10', '2023-04-05', '2023-01-15', '2023-05-01'], '消费金额': [1500.50, 2300.0, 1800.75, 950.25, 3200.0, 1500.50, 1200.0], '城市': ['北京', '上海', '广州', '深圳', '杭州', '北京', None], '电话号码': ['13800138000', '13900139000', '无效电话', '13600136000', '13700137000', '13800138000', '13500135000'] } # 创建DataFrame模拟Excel数据 df_sample = pd.DataFrame(sample_data) print("原始数据示例:") print(df_sample) print() # 2. 创建清洗器实例 # 注意:实际使用时,应该从Excel文件加载 # cleaner = ExcelDataCleaner('your_data.xlsx') # 这里我们直接使用示例数据 cleaner = ExcelDataCleaner('示例数据.xlsx') cleaner.df = df_sample.copy() # 3. 执行完整清洗流程 cleaned_df = cleaner.clean_all() # 4. 显示清洗日志 print("清洗日志:") print(cleaner.get_cleaning_log()) print() # 5. 显示清洗后的数据 print("清洗后数据:") print(cleaned_df) print() # 6. 显示数据摘要 print(cleaner.get_data_summary()) print() # 7. 保存到CSV(演示用) csv_path = 'cleaned_data.csv' if cleaner.save_to_csv(csv_path): print(f"数据已保存到: {csv_path}") # 8. 导出到MySQL(需要配置数据库连接) # 取消注释以下代码并配置数据库连接信息以实际导出到MySQL """ db_url = 'mysql+pymysql://username:password@localhost/your_database' table_name = 'cleaned_user_data' if cleaner.export_to_mysql(db_url, table_name, if_exists='replace'): print(f"数据已成功导出到MySQL表: {table_name}") else: print("MySQL导出失败,请检查数据库连接配置") """ print("\n提示: 要导出到MySQL,请取消注释代码中的数据库连接部分并配置正确的连接信息。") return cleanerif __name__ == "__main__": # 运行演示 cleaner = main() # 使用示例:单独调用清洗步骤 print("\n" + "="*50) print("高级用法示例: 自定义清洗流程") print("="*50) # 重新加载示例数据 cleaner2 = ExcelDataCleaner('示例数据.xlsx') cleaner2.df = pd.DataFrame(sample_data) # 自定义清洗步骤 print("\n1. 仅清洗列名和去重:") cleaner2.clean_column_names() cleaner2.remove_duplicates() print(f"清洗后列名: {list(cleaner2.df.columns)}") print(f"剩余行数: {len(cleaner2.df)}") print("\n2. 处理缺失值(指定策略):") cleaner3 = ExcelDataCleaner('示例数据.xlsx') cleaner3.df = pd.DataFrame(sample_data) cleaner3.handle_missing_values(strategy='fill', numeric_fill=-1, text_fill='暂无') print(f"缺失值处理后: {len(cleaner3.df)}行") print("\n3. 日期格式转换(指定格式):") cleaner4 = ExcelDataCleaner('示例数据.xlsx') cleaner4.df = pd.DataFrame(sample_data) cleaner4.convert_date_columns(date_columns=['注册日期'], date_format='%Y-%m-%d') print(f"日期列数据类型: {cleaner4.df['注册日期'].dtype}")