#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""Intelligent sales forecasting system.

Supports several forecasting algorithms, feature engineering and model
evaluation.
"""
import os
import sys
import json
import logging
import warnings
import numpy as np
import pandas as pd
from datetime import datetime, timedelta
from typing import List, Dict, Any, Optional, Tuple, Union
from dataclasses import dataclass, asdict, field
from decimal import Decimal
from pathlib import Path
import pickle
import hashlib
import traceback
from enum import Enum
import copy
import itertools
from collections import defaultdict

# Data visualisation
import matplotlib.pyplot as plt
import seaborn as sns
from matplotlib import rcParams

# Statistical tests
from scipy import stats
from scipy.stats import boxcox, yeojohnson
from scipy.signal import periodogram

# Time-series analysis
import statsmodels.api as sm
from statsmodels.tsa.stattools import adfuller, kpss, acf, pacf
from statsmodels.tsa.seasonal import seasonal_decompose, STL
from statsmodels.tsa.holtwinters import ExponentialSmoothing, SimpleExpSmoothing
from statsmodels.tsa.arima.model import ARIMA
from statsmodels.tsa.statespace.sarimax import SARIMAX
from statsmodels.tsa.vector_ar.var_model import VAR
from statsmodels.graphics.tsaplots import plot_acf, plot_pacf
from statsmodels.tsa.forecasting.stl import STLForecast

# Machine learning
from sklearn.preprocessing import StandardScaler, MinMaxScaler, RobustScaler
from sklearn.linear_model import LinearRegression, Ridge, Lasso, ElasticNet
from sklearn.ensemble import RandomForestRegressor, GradientBoostingRegressor
from sklearn.svm import SVR
from sklearn.neural_network import MLPRegressor
from sklearn.model_selection import TimeSeriesSplit, cross_val_score, GridSearchCV
from sklearn.metrics import mean_absolute_error, mean_squared_error, r2_score
from sklearn.pipeline import Pipeline
from sklearn.compose import ColumnTransformer
from sklearn.feature_selection import RFE, SelectKBest, f_regression

# Deep learning (optional dependency)
try:
    import tensorflow as tf
    from tensorflow.keras.models import Sequential, Model
    from tensorflow.keras.layers import (
        Dense,
        LSTM,
        GRU,
        Dropout,
        Input,
        Conv1D,
        MaxPooling1D,
        Flatten,
    )
    from tensorflow.keras.callbacks import EarlyStopping, ModelCheckpoint, ReduceLROnPlateau
    TF_AVAILABLE = True
except ImportError:
    # The warning is emitted further down, once logging has been configured.
    TF_AVAILABLE = False

# Forecasting / tuning libraries
from pmdarima import auto_arima
from prophet import Prophet
import xgboost as xgb
import lightgbm as lgb
import optuna

# Database
import sqlalchemy
from sqlalchemy import create_engine, text, MetaData, Table, select
from sqlalchemy.orm import sessionmaker, Session

# Configuration
import yaml
from pydantic import BaseModel, Field, validator
import redis

# Logging configuration
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('sales_forecast.log'),
        logging.StreamHandler()
    ]
)
logger = logging.getLogger(__name__)

if not TF_AVAILABLE:
    # Logged only after basicConfig(): a logging.warning() call made earlier
    # would implicitly configure the root logger and turn the basicConfig()
    # call above into a no-op, silently dropping the file handler.
    logging.warning("TensorFlow not available, deep learning models disabled")

warnings.filterwarnings('ignore')

# Fonts able to render the Chinese labels used in plots
plt.rcParams['font.sans-serif'] = ['SimHei', 'Microsoft YaHei']
plt.rcParams['axes.unicode_minus'] = False


# Data models
class ForecastMethod(Enum):
    """Enumeration of the supported forecasting methods."""

    SIMPLE_MA = "simple_moving_average"
    EXPONENTIAL_SMOOTHING = "exponential_smoothing"
    LINEAR_REGRESSION = "linear_regression"
    ARIMA = "arima"
    SARIMA = "sarima"
    PROPHET = "prophet"
    RANDOM_FOREST = "random_forest"
    XGBOOST = "xgboost"
    LSTM = "lstm"
    ENSEMBLE = "ensemble"


def _coerce_forecast_method(value: Union[ForecastMethod, str]) -> ForecastMethod:
    """Convert *value* (enum member, enum value or enum name) to a ForecastMethod.

    Raises:
        ValueError: if *value* matches neither a member value nor a member name.
    """
    if isinstance(value, ForecastMethod):
        return value
    try:
        return ForecastMethod(value)  # e.g. "sarima"
    except ValueError:
        pass
    try:
        return ForecastMethod[str(value).upper()]  # e.g. "SARIMA"
    except KeyError:
        raise ValueError(f"不支持的模型: {value}") from None


@dataclass
class ForecastConfig:
    """Forecast configuration."""

    # Data settings
    data_source: str
    target_column: str = "sales"
    date_column: str = "date"
    frequency: str = "D"  # D = daily, W = weekly, M = monthly, Q = quarterly, Y = yearly

    # Forecast settings
    forecast_horizon: int = 30
    test_size: float = 0.2
    cv_folds: int = 5

    # Feature-engineering settings
    lag_features: List[int] = field(default_factory=lambda: [1, 7, 14, 30])
    rolling_features: List[int] = field(default_factory=lambda: [3, 7, 14, 30])
    seasonal_features: bool = True
    external_features: List[str] = field(default_factory=list)

    # Model settings
    primary_method: ForecastMethod = ForecastMethod.SARIMA
    ensemble_methods: List[ForecastMethod] = field(default_factory=list)
    use_automl: bool = False

    # Evaluation settings
    metrics: List[str] = field(default_factory=lambda: ["MAE", "RMSE", "MAPE", "R2"])
    confidence_level: float = 0.95

    class Config:
        arbitrary_types_allowed = True

    def __post_init__(self) -> None:
        """Normalise method fields so configs loaded from YAML (plain strings) work."""
        self.primary_method = _coerce_forecast_method(self.primary_method)
        self.ensemble_methods = [
            _coerce_forecast_method(method) for method in (self.ensemble_methods or [])
        ]


@dataclass
class ForecastResult:
    """Output of a single forecasting model."""

    method: ForecastMethod
    forecasts: pd.Series
    lower_bounds: pd.Series
    upper_bounds: pd.Series
    metrics: Dict[str, float]
    model: Any
    feature_importance: Optional[pd.DataFrame] = None
    residuals: Optional[pd.Series] = None

    def to_dict(self) -> Dict[str, Any]:
        """Return a plain-dict view of the result (model and residuals are omitted)."""
        return {
            "method": self.method.value,
            "forecasts": self.forecasts.tolist(),
            "lower_bounds": self.lower_bounds.tolist(),
            "upper_bounds": self.upper_bounds.tolist(),
            "metrics": self.metrics,
            "feature_importance": (
                self.feature_importance.to_dict()
                if self.feature_importance is not None
                else None
            ),
        }


class SalesForecastSystem:
    """End-to-end sales forecasting pipeline."""

    # Methods whose model wrappers are fitted on the target series alone
    # (fit(y) / predict(steps)) rather than on a feature matrix.
    # NOTE(review): EXPONENTIAL_SMOOTHING probably belongs here as well, but its
    # wrapper is not visible in this file - confirm its fit() signature first.
    _SERIES_ONLY_METHODS = frozenset({
        ForecastMethod.SIMPLE_MA,
        ForecastMethod.ARIMA,
        ForecastMethod.SARIMA,
        ForecastMethod.PROPHET,
    })

    # Seasonal period per leading frequency letter, used for decomposition.
    _SEASONAL_PERIODS = {"D": 365, "W": 52, "M": 12, "Q": 4}

    def __init__(self, config_path: str = "config/forecast_config.yaml"):
        """Load the configuration and build the pipeline components."""
        self.config = self._load_config(config_path)
        self.data_loader = DataLoader(self.config)
        self.feature_engineer = FeatureEngineer(self.config)
        self.model_factory = ModelFactory(self.config)
        self.evaluator = ForecastEvaluator(self.config)
        self.visualizer = ForecastVisualizer()

        logger.info(f"销售预测系统初始化完成,主方法:{self.config.primary_method.value}")

    def _load_config(self, config_path: str) -> ForecastConfig:
        """Read the YAML file at *config_path* into a ForecastConfig."""
        with open(config_path, 'r', encoding='utf-8') as f:
            # safe_load() returns None for an empty document.
            config_data = yaml.safe_load(f) or {}
        return ForecastConfig(**config_data)

    def _get_dates(self, data: pd.DataFrame) -> pd.Series:
        """Return the observation dates, whether stored as a column or as the index.

        DataLoader moves the date column into the index, so a plain column
        lookup would raise KeyError for data coming from it.
        """
        if self.config.date_column in data.columns:
            return data[self.config.date_column]
        return data.index.to_series()

    def run_forecast_pipeline(self) -> Dict[str, Any]:
        """Run the full pipeline: load, explore, engineer, train, evaluate, report.

        Returns:
            A dict with ``success=True`` plus timing, best model, report path and
            the final forecast, or ``success=False`` plus the error text and
            traceback when any step raises.
        """
        logger.info("开始销售预测流程")
        start_time = datetime.now()

        try:
            # 1. Load data
            logger.info("步骤1: 加载数据")
            data = self.data_loader.load_data()

            if data.empty:
                raise ValueError("数据加载失败或数据为空")

            logger.info(f"数据加载完成,共{len(data)}条记录")

            # 2. Exploratory analysis
            logger.info("步骤2: 数据探索分析")
            exploration_report = self._explore_data(data)

            # 3. Feature engineering
            logger.info("步骤3: 特征工程")
            features, target = self.feature_engineer.transform(data)

            # 4. Train / test split
            logger.info("步骤4: 划分数据集")
            X_train, X_test, y_train, y_test = self._split_time_series(features, target)

            # 5. Model training and prediction
            logger.info("步骤5: 模型训练")
            forecast_results = []

            # Primary model: a failure here aborts the run.
            primary_result = self._train_and_predict(
                self.config.primary_method, X_train, X_test, y_train, y_test
            )
            forecast_results.append(primary_result)

            # Ensemble members: best effort, a failing member is skipped.
            for method in self.config.ensemble_methods:
                try:
                    result = self._train_and_predict(method, X_train, X_test, y_train, y_test)
                    forecast_results.append(result)
                except Exception as e:
                    logger.warning(f"模型{method.value}训练失败: {str(e)}")

            # 6. Model evaluation
            logger.info("步骤6: 模型评估")
            evaluation_report = self.evaluator.evaluate_all(forecast_results, y_test)

            # 7. Forecast report
            logger.info("步骤7: 生成预测报告")
            final_forecast = self._generate_final_forecast(forecast_results, evaluation_report)
            report = self._generate_forecast_report(
                data, features, forecast_results, evaluation_report,
                final_forecast, exploration_report
            )

            # 8. Visualisation
            logger.info("步骤8: 生成可视化")
            self.visualizer.generate_all_plots(
                data, features, forecast_results, evaluation_report,
                final_forecast, report["output_dir"]
            )

            end_time = datetime.now()
            execution_time = (end_time - start_time).total_seconds()

            logger.info(f"预测流程完成,总耗时: {execution_time:.2f}秒")

            return {
                "success": True,
                "execution_time": execution_time,
                "best_model": evaluation_report["best_model"],
                "best_score": evaluation_report["best_score"],
                "report_path": report["report_path"],
                "forecast": final_forecast.to_dict()
            }

        except Exception as e:
            # Top-level boundary: report the failure instead of propagating it.
            trace = traceback.format_exc()
            logger.error(f"预测流程失败: {str(e)}")
            logger.error(trace)

            return {
                "success": False,
                "error": str(e),
                "traceback": trace
            }

    def _explore_data(self, data: pd.DataFrame) -> Dict[str, Any]:
        """Compute summary statistics, stationarity tests and a seasonal decomposition.

        The stationarity and seasonality sections stay empty when the
        corresponding analysis fails or (for seasonality) when fewer than two
        full seasonal cycles are available.
        """
        report = {
            "summary": {},
            "stationarity": {},
            "seasonality": {},
            "trend": {},
            "correlation": {}
        }

        target = data[self.config.target_column]
        dates = self._get_dates(data)

        # Basic statistics
        report["summary"] = {
            "total_records": len(data),
            "date_range": f"{dates.min()} 到 {dates.max()}",
            "mean": target.mean(),
            "std": target.std(),
            "min": target.min(),
            "max": target.max(),
            "missing_values": target.isnull().sum(),
            "zero_values": (target == 0).sum()
        }

        # Stationarity tests (ADF: p < 0.05 => stationary; KPSS: p > 0.05 => stationary)
        try:
            observed = target.dropna()
            adf_result = adfuller(observed)
            kpss_result = kpss(observed)

            report["stationarity"] = {
                "adf_statistic": adf_result[0],
                "adf_pvalue": adf_result[1],
                "adf_stationary": adf_result[1] < 0.05,
                "kpss_statistic": kpss_result[0],
                "kpss_pvalue": kpss_result[1],
                "kpss_stationary": kpss_result[1] > 0.05
            }
        except Exception as e:
            logger.warning(f"平稳性检验失败: {str(e)}")

        # Seasonal decomposition; the period follows the configured frequency
        # (365 for the default daily data). Frequencies without a known period
        # are skipped.
        try:
            period = self._SEASONAL_PERIODS.get(str(self.config.frequency)[:1].upper())
            if period is not None and len(data) >= 2 * period:  # at least two full cycles
                decomposition = seasonal_decompose(
                    target,
                    model='additive',
                    period=period
                )

                resid_var = decomposition.resid.var()
                seasonal_strength = 1 - (resid_var / decomposition.observed.var())
                trend_strength = 1 - (resid_var / decomposition.trend.var())

                report["seasonality"] = {
                    "seasonal_strength": seasonal_strength,
                    "trend_strength": trend_strength,
                    "has_seasonality": seasonal_strength > 0.5
                }
        except Exception as e:
            logger.warning(f"季节性分解失败: {str(e)}")

        return report

    def _split_time_series(
        self, features: pd.DataFrame, target: pd.Series
    ) -> Tuple[pd.DataFrame, pd.DataFrame, pd.Series, pd.Series]:
        """Split chronologically: the last ``test_size`` fraction becomes the test set.

        Raises:
            ValueError: if the split would leave the training or the test set empty.
        """
        n_samples = len(features)
        test_size = int(n_samples * self.config.test_size)

        # A test size of 0 must be rejected explicitly: slicing with ``-0``
        # would yield an empty training set and put everything in the test set.
        if test_size < 1 or test_size >= n_samples:
            raise ValueError(f"无法划分数据集: 样本数{n_samples}, 测试集大小{test_size}")

        split_at = n_samples - test_size
        X_train = features.iloc[:split_at]
        X_test = features.iloc[split_at:]
        y_train = target.iloc[:split_at]
        y_test = target.iloc[split_at:]

        logger.info(f"数据集划分: 训练集{len(X_train)}条, 测试集{len(X_test)}条")

        return X_train, X_test, y_train, y_test

    def _train_and_predict(
        self,
        method: ForecastMethod,
        X_train: pd.DataFrame,
        X_test: pd.DataFrame,
        y_train: pd.Series,
        y_test: pd.Series
    ) -> ForecastResult:
        """Fit the model for *method* and predict over the test period.

        Series-only methods are fitted on ``y_train`` and asked for
        ``len(X_test)`` steps; all other methods are fitted on the feature
        matrix. Forecasts and bounds are always indexed by ``X_test.index``.
        """
        logger.info(f"训练模型: {method.value}")

        model = self.model_factory.get_model(method)

        feature_importance = None
        z_score = stats.norm.ppf((1 + self.config.confidence_level) / 2)

        if method in self._SERIES_ONLY_METHODS:
            # Time-series model
            model.fit(y_train)

            # np.asarray() drops any index the model attached: passing a Series
            # together with ``index=`` would re-index it and produce NaN
            # wherever the labels differ.
            forecast = model.predict(steps=len(X_test))
            forecasts = pd.Series(np.asarray(forecast, dtype=float), index=X_test.index)

            if hasattr(model, 'get_prediction'):
                pred_results = model.get_prediction(
                    start=len(y_train),
                    end=len(y_train) + len(y_test) - 1
                )
                conf_int = np.asarray(
                    pred_results.conf_int(alpha=1 - self.config.confidence_level)
                )
                lower_bounds = pd.Series(conf_int[:, 0], index=X_test.index)
                upper_bounds = pd.Series(conf_int[:, 1], index=X_test.index)
            else:
                # Simple interval from the spread of the forecasts themselves.
                # NOTE(review): this is zero-width for constant forecasts (e.g.
                # a moving average); a residual-based spread may be preferable.
                std = forecasts.std()
                if not np.isfinite(std):  # a single-step forecast has no spread
                    std = 0.0
                margin = z_score * std
                lower_bounds = forecasts - margin
                upper_bounds = forecasts + margin

        else:
            # Machine-learning model
            model.fit(X_train, y_train)

            forecasts = pd.Series(model.predict(X_test), index=X_test.index)

            # Feature importance, when the model exposes it
            if hasattr(model, 'feature_importances_'):
                feature_importance = pd.DataFrame({
                    'feature': X_train.columns,
                    'importance': model.feature_importances_
                }).sort_values('importance', ascending=False)

            # Interval width from the spread of the cross-validated RMSE
            tscv = TimeSeriesSplit(n_splits=self.config.cv_folds)
            cv_scores = cross_val_score(
                model, X_train, y_train, cv=tscv, scoring='neg_mean_squared_error'
            )
            cv_std = np.sqrt(-cv_scores).std()

            margin = z_score * cv_std
            lower_bounds = forecasts - margin
            upper_bounds = forecasts + margin

        residuals = y_test - forecasts
        metrics = self.evaluator.calculate_metrics(y_test, forecasts)

        return ForecastResult(
            method=method,
            forecasts=forecasts,
            lower_bounds=lower_bounds,
            upper_bounds=upper_bounds,
            metrics=metrics,
            model=model,
            feature_importance=feature_importance,
            residuals=residuals
        )

    def _generate_final_forecast(
        self,
        results: List[ForecastResult],
        evaluation_report: Dict[str, Any]
    ) -> ForecastResult:
        """Pick the best model or, when ensembling is configured, blend all results.

        Blend weights are proportional to ``1 / (MAPE + 0.0001)``. Results with
        a missing-from-finite or negative MAPE get weight zero; if no result
        has a usable weight the best single model is returned.

        Raises:
            ValueError: if *results* is empty.
        """
        if not results:
            raise ValueError("没有可用的预测结果")

        # A single result needs no selection.
        if len(results) == 1:
            return results[0]

        best_method = evaluation_report["best_model"]
        best_result = next((r for r in results if r.method == best_method), results[0])

        if not self.config.ensemble_methods:
            return best_result

        # One weight per result (not per method), so that a method appearing
        # twice cannot break the normalisation.
        weights = []
        for result in results:
            mape = result.metrics.get("MAPE", 1.0)
            usable = mape is not None and np.isfinite(mape) and mape >= 0
            weights.append(1.0 / (mape + 0.0001) if usable else 0.0)  # avoid division by zero

        if sum(weights) <= 0:
            return best_result

        index = results[0].forecasts.index

        def _blend(attribute: str) -> pd.Series:
            """Weighted positional average of *attribute* across all results."""
            stacked = np.vstack(
                [np.asarray(getattr(r, attribute), dtype=float) for r in results]
            )
            return pd.Series(np.average(stacked, axis=0, weights=weights), index=index)

        # No actual values are available here to score the blend itself, so the
        # best single model's metrics are reported for the ensemble.
        return ForecastResult(
            method=ForecastMethod.ENSEMBLE,
            forecasts=_blend("forecasts"),
            lower_bounds=_blend("lower_bounds"),
            upper_bounds=_blend("upper_bounds"),
            metrics=best_result.metrics,
            model=None,
            feature_importance=None
        )

    def _generate_forecast_report(
        self,
        data: pd.DataFrame,
        features: pd.DataFrame,
        results: List[ForecastResult],
        evaluation_report: Dict[str, Any],
        final_forecast: ForecastResult,
        exploration_report: Dict[str, Any]
    ) -> Dict[str, Any]:
        """Write the text report, the forecast CSV and (if any) the pickled model.

        Returns:
            A dict with ``report_path``, ``output_dir`` and ``forecast_csv``.
        """
        # Output directory, one per run
        output_dir = Path(f"output/forecast/{datetime.now().strftime('%Y%m%d_%H%M%S')}")
        output_dir.mkdir(parents=True, exist_ok=True)

        summary = exploration_report['summary']
        stationarity = exploration_report["stationarity"]

        # 1. Text report
        report_lines = [
            "=" * 80,
            "销售预测分析报告",
            "=" * 80,
            f"生成时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}",
            f"数据范围: {summary['date_range']}",
            f"总记录数: {summary['total_records']}",
            "",
            "一、数据概览",
            "-" * 40,
            f"平均值: {summary['mean']:.2f}",
            f"标准差: {summary['std']:.2f}",
            f"最小值: {summary['min']:.2f}",
            f"最大值: {summary['max']:.2f}",
            f"缺失值: {summary['missing_values']}",
            "",
            "二、平稳性分析",
            "-" * 40,
        ]

        if stationarity:
            report_lines.extend([
                f"ADF检验p值: {stationarity['adf_pvalue']:.4f}",
                f"平稳性结论: {'平稳' if stationarity['adf_stationary'] else '不平稳'}",
                f"KPSS检验p值: {stationarity['kpss_pvalue']:.4f}",
            ])

        report_lines.extend([
            "",
            "三、模型评估结果",
            "-" * 40,
            f"最佳模型: {evaluation_report['best_model'].value}",
            f"最佳MAPE: {evaluation_report['best_score']:.2%}",
            "",
            "各模型表现:"
        ])

        for method, scores in evaluation_report["model_scores"].items():
            report_lines.append(f" {method}:")
            for metric, value in scores.items():
                if metric == "MAPE":
                    report_lines.append(f" {metric}: {value:.2%}")
                else:
                    report_lines.append(f" {metric}: {value:.2f}")

        report_lines.extend([
            "",
            "四、最终预测结果",
            "-" * 40,
            f"预测方法: {final_forecast.method.value}",
            f"预测期数: {self.config.forecast_horizon}",
            f"置信水平: {self.config.confidence_level:.0%}",
            "",
            "预测值:"
        ])

        # Forecast values.
        # NOTE(review): final_forecast holds the hold-out (test period)
        # predictions; they are listed against future dates here. A true
        # out-of-sample forecast would require refitting on all data.
        forecast_dates = pd.date_range(
            start=self._get_dates(data).max() + timedelta(days=1),
            periods=self.config.forecast_horizon,
            freq=self.config.frequency
        )

        point_values = np.asarray(final_forecast.forecasts, dtype=float)
        n_rows = min(len(forecast_dates), len(point_values))

        def _aligned_bounds(bounds: pd.Series) -> np.ndarray:
            """Return *bounds* as n_rows values, padded with the point forecast."""
            values = np.asarray(bounds, dtype=float)[:n_rows]
            if len(values) < n_rows:
                values = np.concatenate([values, point_values[len(values):n_rows]])
            return values

        report_dates = forecast_dates[:n_rows]
        point_values = point_values[:n_rows]
        lower_values = _aligned_bounds(final_forecast.lower_bounds)
        upper_values = _aligned_bounds(final_forecast.upper_bounds)

        for date, value, lower, upper in zip(
            report_dates, point_values, lower_values, upper_values
        ):
            report_lines.append(
                f" {date.strftime('%Y-%m-%d')}: {value:.2f} "
                f"[{lower:.2f}, {upper:.2f}]"
            )

        report_lines.extend([
            "",
            "五、业务建议",
            "-" * 40
        ])

        # Business advice: compare the mean forecast with the last observation.
        avg_forecast = final_forecast.forecasts.mean()
        last_actual = data[self.config.target_column].iloc[-1]

        if avg_forecast > last_actual * 1.2:
            report_lines.append("1. 预计销售将显著增长,建议增加库存和生产计划")
        elif avg_forecast < last_actual * 0.8:
            report_lines.append("1. 预计销售将下降,建议控制库存,加大促销力度")
        else:
            report_lines.append("1. 预计销售保持平稳,建议维持当前运营策略")

        if exploration_report.get("seasonality", {}).get("has_seasonality", False):
            report_lines.append("2. 数据存在明显季节性,建议制定季节性营销策略")

        report_lines.extend([
            "",
            "=" * 80,
            "报告结束",
            "=" * 80
        ])

        report_path = output_dir / "forecast_report.txt"
        with open(report_path, 'w', encoding='utf-8') as f:
            f.write('\n'.join(report_lines))

        # 2. Forecast data. Plain arrays of equal length are used so that the
        # frame is built positionally; mixing the future dates with Series
        # indexed by the test period would fail or misalign.
        forecast_df = pd.DataFrame({
            'date': report_dates,
            'forecast': point_values,
            'lower_bound': lower_values,
            'upper_bound': upper_values
        })

        forecast_csv_path = output_dir / "forecast_results.csv"
        forecast_df.to_csv(forecast_csv_path, index=False, encoding='utf-8')

        # 3. Model (absent for the ensemble result)
        if final_forecast.model is not None:
            model_path = output_dir / "forecast_model.pkl"
            with open(model_path, 'wb') as f:
                pickle.dump(final_forecast.model, f)

        logger.info(f"预测报告已生成: {report_path}")

        return {
            "report_path": str(report_path),
            "output_dir": str(output_dir),
            "forecast_csv": str(forecast_csv_path)
        }


class DataLoader:
    """Loads the raw sales data from CSV, Excel or a SQL database."""

    def __init__(self, config: ForecastConfig):
        self.config = config

    def load_data(self) -> pd.DataFrame:
        """Load the data source named in the config, dispatching on its form.

        Raises:
            ValueError: if the data source is neither a CSV/Excel path nor a
                ``sql://`` connection string.
        """
        source = self.config.data_source
        suffix = source.lower()  # file extensions are matched case-insensitively

        if suffix.endswith('.csv'):
            return self._load_csv()
        elif suffix.endswith(('.xlsx', '.xls')):
            return self._load_excel()
        elif source.startswith('sql://'):
            return self._load_database()
        else:
            raise ValueError(f"不支持的数据源: {self.config.data_source}")

    def _index_by_date(self, df: pd.DataFrame) -> pd.DataFrame:
        """Move the date column (if present) into a sorted DatetimeIndex."""
        date_column = self.config.date_column
        if date_column in df.columns:
            df[date_column] = pd.to_datetime(df[date_column])
            df = df.set_index(date_column).sort_index()
        return df

    def _load_csv(self) -> pd.DataFrame:
        """Load a CSV file, indexed by date."""
        df = pd.read_csv(self.config.data_source, parse_dates=[self.config.date_column])
        return self._index_by_date(df)

    def _load_excel(self) -> pd.DataFrame:
        """Load an Excel file, indexed by date."""
        df = pd.read_excel(self.config.data_source, parse_dates=[self.config.date_column])
        return self._index_by_date(df)

    def _load_database(self) -> pd.DataFrame:
        """Load the date and target columns from the ``sales_data`` table."""
        # Simplified: the connection string is whatever follows "sql://".
        engine = create_engine(self.config.data_source.replace('sql://', ''))

        # NOTE(review): column names are interpolated into the SQL text because
        # identifiers cannot be bound as parameters; they come from the config
        # file, which must therefore be trusted.
        query = f"""
            SELECT {self.config.date_column}, {self.config.target_column}
            FROM sales_data
            ORDER BY {self.config.date_column}
        """

        try:
            df = pd.read_sql(query, engine, parse_dates=[self.config.date_column])
        finally:
            # Release pooled connections even when the query fails.
            engine.dispose()

        df = df.set_index(self.config.date_column)
        return df


class FeatureEngineer:
    """Builds the model feature matrix from the raw time series."""

    def __init__(self, config: ForecastConfig):
        self.config = config

    def transform(self, data: pd.DataFrame) -> Tuple[pd.DataFrame, pd.Series]:
        """Return ``(features, target)`` with rows containing NaN removed.

        Raises:
            ValueError: if *data* is not indexed by a DatetimeIndex.
        """
        # Work on a copy so the caller's frame is left untouched.
        df = data.copy()

        if not isinstance(df.index, pd.DatetimeIndex):
            raise ValueError("数据索引必须是日期时间类型")

        df = self._handle_missing_values(df)
        df = self._create_lag_features(df)
        df = self._create_rolling_features(df)
        df = self._create_time_features(df)

        if self.config.seasonal_features:
            df = self._create_seasonal_features(df)

        # Separate features and target
        features = df.drop(columns=[self.config.target_column])
        target = df[self.config.target_column]

        # Drop rows with NaN (the warm-up rows of the lag / rolling features)
        valid_idx = features.notna().all(axis=1) & target.notna()
        features = features[valid_idx]
        target = target[valid_idx]

        logger.info(f"特征工程完成,特征数: {len(features.columns)},样本数: {len(features)}")

        return features, target

    def _handle_missing_values(self, df: pd.DataFrame) -> pd.DataFrame:
        """Forward-fill the target; fill any remaining (leading) gaps with the mean."""
        column = self.config.target_column
        df[column] = df[column].ffill()

        if df[column].isnull().any():
            df[column] = df[column].fillna(df[column].mean())

        return df

    def _create_lag_features(self, df: pd.DataFrame) -> pd.DataFrame:
        """Add ``lag_<n>`` columns holding the target shifted by n rows."""
        for lag in self.config.lag_features:
            df[f'lag_{lag}'] = df[self.config.target_column].shift(lag)
        return df

    def _create_rolling_features(self, df: pd.DataFrame) -> pd.DataFrame:
        """Add rolling mean/std/min/max columns computed from past values only."""
        # Shift by one row first: a window that includes the current row would
        # leak the value being predicted into its own features.
        history = df[self.config.target_column].shift(1)

        for window in self.config.rolling_features:
            rolling = history.rolling(window=window)
            df[f'rolling_mean_{window}'] = rolling.mean()
            df[f'rolling_std_{window}'] = rolling.std()
            df[f'rolling_min_{window}'] = rolling.min()
            df[f'rolling_max_{window}'] = rolling.max()
        return df

    def _create_time_features(self, df: pd.DataFrame) -> pd.DataFrame:
        """Add calendar features derived from the DatetimeIndex."""
        df['day_of_week'] = df.index.dayofweek
        df['day_of_month'] = df.index.day
        # isocalendar() yields a nullable UInt32 column; convert it to a plain
        # integer array so the column has an ordinary numeric dtype and is
        # assigned positionally.
        df['week_of_year'] = df.index.isocalendar().week.astype('int64').to_numpy()
        df['month'] = df.index.month
        df['quarter'] = df.index.quarter
        df['year'] = df.index.year
        df['is_weekend'] = df['day_of_week'].isin([5, 6]).astype(int)
        return df

    def _create_seasonal_features(self, df: pd.DataFrame) -> pd.DataFrame:
        """Add sine/cosine encodings of day-of-year, day-of-week and month."""
        df['sin_day'] = np.sin(2 * np.pi * df.index.dayofyear / 365)
        df['cos_day'] = np.cos(2 * np.pi * df.index.dayofyear / 365)
        df['sin_week'] = np.sin(2 * np.pi * df.index.dayofweek / 7)
        df['cos_week'] = np.cos(2 * np.pi * df.index.dayofweek / 7)
        df['sin_month'] = np.sin(2 * np.pi * df.index.month / 12)
        df['cos_month'] = np.cos(2 * np.pi * df.index.month / 12)
        return df


class ModelFactory:
    """Creates model instances for a ForecastMethod."""

    def __init__(self, config: ForecastConfig):
        self.config = config

    def get_model(self, method: ForecastMethod) -> Any:
        """Return a new, unfitted model for *method*.

        Raises:
            ValueError: if *method* is not supported (this includes LSTM when
                TensorFlow is not installed).
        """
        # An if/elif chain (rather than a lookup table) keeps the class names
        # lazily resolved: only the requested model class has to exist.
        if method == ForecastMethod.SIMPLE_MA:
            return SimpleMovingAverageModel(window=7)
        elif method == ForecastMethod.EXPONENTIAL_SMOOTHING:
            return ExponentialSmoothingModel()
        elif method == ForecastMethod.LINEAR_REGRESSION:
            return LinearRegressionModel()
        elif method == ForecastMethod.ARIMA:
            return ARIMAModel()
        elif method == ForecastMethod.SARIMA:
            return SARIMAModel()
        elif method == ForecastMethod.PROPHET:
            return ProphetModel()
        elif method == ForecastMethod.RANDOM_FOREST:
            return RandomForestModel()
        elif method == ForecastMethod.XGBOOST:
            return XGBoostModel()
        elif method == ForecastMethod.LSTM and TF_AVAILABLE:
            return LSTMModel()
        else:
            raise ValueError(f"不支持的模型: {method}")


# Model implementations (simplified example)
class SimpleMovingAverageModel:
    """Forecasts every future step as the mean of the last ``window`` observations."""

    def __init__(self, window=7):
        """Store the window size.

        Raises:
            ValueError: if *window* is smaller than 1 (a window of 0 would
                silently select the whole series).
        """
        if window < 1:
            raise ValueError(f"窗口大小必须为正整数: {window}")
        self.window = window
        self.y = None

    def fit(self, y):
        """Remember the training series *y* and return ``self``."""
        self.y = y
        return self

    def predict(self, steps):
        """Return an array of *steps* values, all equal to the trailing-window mean.

        Raises:
            RuntimeError: if called before ``fit``.
            ValueError: if the fitted series is empty.
        """
        if self.y is None:
            raise RuntimeError("模型尚未训练")

        last_values = np.asarray(self.y, dtype=float)[-self.window:]
        if last_values.size == 0:
            raise ValueError("训练数据为空")

        return np.full(steps, last_values.mean())


# The remaining model classes follow the same pattern and are omitted here for
# brevity; a real deployment needs the complete implementations.

# Entry point
if __name__ == "__main__":
    # Usage example
    forecast_system = SalesForecastSystem("config/forecast_config.yaml")
    result = forecast_system.run_forecast_pipeline()

    if result["success"]:
        print("预测成功!")
        print(f"最佳模型: {result['best_model']}")
        print(f"最佳MAPE: {result['best_score']:.2%}")
        print(f"报告路径: {result['report_path']}")
    else:
        print(f"预测失败: {result['error']}")