Granite TimeSeries FlowState R1模型数据预处理完整教程:从原始数据到模型输入
你是不是觉得时间序列预测听起来很复杂,尤其是看到那些模型名字就头疼?别担心,今天我们不聊复杂的算法,就聊一件事:怎么把一堆看起来乱七八糟的数据,变成模型能“吃”得下去的“食物”。
想象一下,你要教一个小朋友认水果。你不能直接把一整个果园搬到他面前,说“你自己看吧”。你得先洗干净苹果,剥好香蕉,切成小块,然后告诉他“这是苹果,甜的”、“这是香蕉,软的”。数据预处理,干的差不多就是这个活儿。对于 Granite TimeSeries FlowState R1 这样的模型来说,你喂给它的数据质量,直接决定了它预测得准不准。
很多人模型跑不好,第一反应是“模型不行”或者“参数没调对”,但其实八成问题都出在数据预处理这一步。数据没洗干净,再厉害的模型也白搭。这篇教程,我就带你手把手走一遍从原始数据到模型输入的完整流程,用最直白的语言和能直接运行的代码,让你彻底搞明白时间序列数据该怎么“收拾”。
1. 准备工作:认识你的数据和工具
在开始动手之前,我们得先知道要处理的是什么,以及用什么工具。别急着写代码,这一步想清楚,后面能省一半的力气。
1.1 理解时间序列数据
时间序列数据,说白了就是按时间顺序排列的一串数字。比如,你家每天的电表读数、股票每分钟的价格、商店每小时的销售额。它最大的特点就是“顺序”很重要,明天的数据跟今天、昨天的数据有关系。
我们这次要处理的 Granite TimeSeries FlowState R1 模型,是一个专门用来做时间序列预测的模型。它喜欢“干净”、“规整”的数据。所谓“干净”,就是没有乱七八糟的异常值和缺失;所谓“规整”,就是数据要均匀地按照时间间隔排列好。
1.2 搭建你的Python环境
工欲善其事,必先利其器。我们主要用 Python 的几个经典库,如果你还没安装,打开命令行(Windows 叫 CMD 或 PowerShell,Mac/Linux 叫终端),输入下面的命令安装:
pip install pandas numpy scikit-learn matplotlib简单解释一下这几个库是干嘛的:
- pandas:处理表格数据的“瑞士军刀”,读数据、清洗数据、分析数据都靠它。
- numpy:进行数学计算的“发动机”,速度快,功能强。
- scikit-learn:机器学习的“工具箱”,里面有很多现成的数据预处理和模型工具。
- matplotlib:画图的“画笔”,让我们能直观地看到数据长什么样。
安装好之后,我们就可以在 Python 脚本或 Jupyter Notebook 里导入它们了:
import pandas as pd import numpy as np from sklearn.preprocessing import StandardScaler, MinMaxScaler import matplotlib.pyplot as plt # 为了让图表显示在 Notebook 里(如果你用 Jupyter) %matplotlib inline2. 第一步:把数据“请”进来并看个明白
假设你手头有一个 CSV 文件,名字叫sales_data.csv,里面记录了某个产品每天的销售额。我们的第一步就是把它读进来,然后好好“端详”一番。
2.1 加载数据
用 pandas 读取数据非常简单,一行代码搞定:
# 读取数据 df = pd.read_csv('sales_data.csv') # 看看前5行,长什么样 print("数据的前几行:") print(df.head()) # 看看数据的基本信息 print("\n数据的基本信息:") print(df.info()) # 看看数据的统计摘要(比如平均值、最大最小值) print("\n数据的统计描述:") print(df.describe())运行df.head(),你可能会看到类似这样的表格:
| date | sales |
|---|---|
| 2023-01-01 | 150.2 |
| 2023-01-02 | 162.5 |
| 2023-01-03 | 158.0 |
| ... | ... |
df.info()会告诉你数据有多少行、多少列,每列是什么类型(比如日期是不是被识别成了日期格式,数字是不是被识别成了浮点数),以及有没有缺失值。这是你了解数据状况的第一次“体检”。
2.2 处理时间索引
对于时间序列,我们必须把日期那一列设置成数据的“索引”。索引就像是数据的身份证号,有了它,pandas 才能按照时间顺序去理解和处理数据。
# 假设你的日期列名叫 'date',确保它被转换成真正的日期格式 df['date'] = pd.to_datetime(df['date']) # 将‘date’列设置为索引 df.set_index('date', inplace=True) # 按时间顺序排序,确保数据是按时间排好的 df.sort_index(inplace=True) # 再次查看索引和数据 print("设置时间索引后的数据前几行:") print(df.head()) print(f"\n时间范围:从 {df.index.min()} 到 {df.index.max()}")做完这一步,你的数据框(DataFrame)的左边就不再是普通的0,1,2编号,而是具体的日期了。这是处理时间序列非常关键的一步。
3. 第二步:给数据“洗个澡”——清洗与规整
原始数据往往不那么完美,可能有缺失,可能有异常。这一步就是来做清洁的。
3.1 处理缺失值
数据里偶尔缺了一两天,这很正常。但模型不喜欢“空”的东西。我们有几种办法来补上这些空缺:
- 向前填充:用前一天的数据补上今天的空缺。这适合数据变化比较平缓的情况。
- 向后填充:用后一天的数据补上今天的空缺。
- 插值:根据前后几天的数据,算出一个合理的中间值来填充。比如线性插值,就是假设数据在中间是均匀变化的。
# 检查是否有缺失值 print(f"缺失值数量:\n{df.isnull().sum()}") # 方法1:向前填充(用前一天的值填充) df_filled_ffill = df.fillna(method='ffill') # 方法2:线性插值(更平滑) df_filled_interpolate = df.interpolate(method='linear') # 通常我会先尝试插值,因为它更“聪明”一些。这里我们选择插值的结果继续后续步骤。 df = df_filled_interpolate print("\n填充缺失值后,再次检查:") print(df.isnull().sum())3.2 处理异常值
异常值就是那些“不合群”的、特别高或特别低的数据点。比如销售额平时都是100左右,突然有一天变成了10000,这很可能是个错误(或者发生了特殊事件,比如双十一)。
一个简单的方法是使用“分位数”来检测。我们认为,在99%的数据范围之外的点,可能是异常值。
# 假设我们处理‘sales’这一列 sales_series = df['sales'] # 计算上下限(这里用99%和1%分位数,你可以调整) Q1 = sales_series.quantile(0.01) Q3 = sales_series.quantile(0.99) IQR = Q3 - Q1 lower_bound = Q1 - 1.5 * IQR upper_bound = Q3 + 1.5 * IQR print(f"异常值判定边界:下限 {lower_bound:.2f}, 上限 {upper_bound:.2f}") # 找出异常值 outliers = sales_series[(sales_series < lower_bound) | (sales_series > upper_bound)] print(f"发现 {len(outliers)} 个潜在的异常值。") # 处理异常值:一种常见方法是用上下边界值替换,或者用前后数据的均值替换 df['sales_cleaned'] = sales_series.clip(lower_bound, upper_bound) # 可视化一下,看看清洗前后的对比 plt.figure(figsize=(12, 5)) plt.subplot(1,2,1) plt.plot(sales_series, label='原始数据', alpha=0.7) plt.scatter(outliers.index, outliers, color='red', label='异常值', zorder=5) plt.title('原始数据与异常值') plt.legend() plt.subplot(1,2,2) plt.plot(df['sales_cleaned'], label='清洗后数据', color='orange') plt.title('清洗(缩尾)后数据') plt.legend() plt.tight_layout() plt.show() # 后续我们将使用清洗后的列 df['sales'] = df['sales_cleaned'] df.drop(columns=['sales_cleaned'], inplace=True, errors='ignore')图表能帮你直观地看到哪些点被处理了,确保你的操作是合理的。
3.3 确保时间序列的连续性
有时候数据不是每天都有,可能周末没有记录。但模型训练通常要求等间隔的数据。我们需要生成一个完整的时间索引,并把缺失的日期补上(用我们之前处理缺失值的方法填充数据)。
# 生成一个完整的、按天的日期范围 full_date_range = pd.date_range(start=df.index.min(), end=df.index.max(), freq='D') # 'D'代表天 # 将数据重新索引到这个完整的范围,缺失的日期会自动变成NaN df = df.reindex(full_date_range) # 再次用插值法填充因补全日期而产生的缺失值 df = df.interpolate(method='linear') print(f"规整后,数据总行数:{len(df)}")4. 第三步:把数据“变个样”——特征工程
这是最有趣也最重要的一步。原始数据可能只有一个“销售额”数字,但我们能从中挖掘出更多信息喂给模型,比如“趋势”、“周期性”。
4.1 构建滞后特征
预测明天,最重要的信息往往是今天和昨天。滞后特征就是把过去几天的数据,作为今天的新特征。
# 创建滞后1天、7天(一周)、30天(一个月)的特征 df['sales_lag1'] = df['sales'].shift(1) df['sales_lag7'] = df['sales'].shift(7) df['sales_lag30'] = df['sales'].shift(30) # 由于创建滞后特征会导致前几行出现NaN,我们删除这些行 df.dropna(inplace=True) print(f"创建滞后特征后,数据行数:{len(df)}")4.2 构建滚动统计特征
除了具体的某一天,过去一段时间的“整体情况”也很重要,比如过去7天的平均销售额、最大销售额。
# 滚动窗口统计:过去7天的均值和标准差 df['rolling_mean_7'] = df['sales'].rolling(window=7).mean() df['rolling_std_7'] = df['sales'].rolling(window=7).std() # 同样,滚动特征也会在开头产生NaN,我们用向后填充简单处理一下(或者可以删除) df[['rolling_mean_7', 'rolling_std_7']] = df[['rolling_mean_7', 'rolling_std_7']].fillna(method='bfill')4.3 构建时间周期性特征
很多数据有周期性,比如销售额周末高、工作日低。我们可以把时间信息拆解出来。
# 从日期索引中提取周期性特征 df['day_of_week'] = df.index.dayofweek # 周一=0, 周日=6 df['day_of_month'] = df.index.day df['month'] = df.index.month df['is_weekend'] = df['day_of_week'].apply(lambda x: 1 if x >= 5 else 0) # 周末标记 # 对于周期性特征,更推荐使用正弦余弦编码,能更好地表达“星期一的后面是星期二”这种循环关系 df['day_sin'] = np.sin(2 * np.pi * df['day_of_week']/7) df['day_cos'] = np.cos(2 * np.pi * df['day_of_week']/7) df['month_sin'] = np.sin(2 * np.pi * df['month']/12) df['month_cos'] = np.cos(2 * np.pi * df['month']/12)5. 第四步:把数据“调调口味”——归一化与划分
不同特征的值可能相差巨大(比如销售额是几万,星期几只是1-7),这会让模型学习起来很困难。归一化就是把所有特征都调整到差不多的大小范围。
5.1 数据归一化
这里介绍两种最常用的方法:
- 标准化:让数据变成均值为0,标准差为1的分布。适合数据分布没有明显边界的情况。
- 最小最大缩放:把数据缩放到一个固定的范围,比如[0, 1]。适合你知道数据有明确上下界的情况。
对于时间序列,要特别注意不能在未来数据上训练归一化器!我们必须用训练集的数据来“学习”如何归一化,然后用同样的规则去处理验证集和测试集。
# 首先,我们需要先划分数据集,再分别对它们进行归一化 # 但我们先准备好特征(X)和目标(y)。目标通常就是未来的‘sales’。 # 例如,我们用过去30天的特征预测未来1天的销售额。 # 假设我们的目标是预测下一天的销售额 df['target'] = df['sales'].shift(-1) # 将‘sales’向上移动一天,作为目标 # 删除最后一行(因为它的target是NaN) df.dropna(subset=['target'], inplace=True) # 选择我们构建的所有特征列,排除原始‘sales’和目标‘target’ feature_columns = [col for col in df.columns if col not in ['sales', 'target']] X = df[feature_columns].values # 特征矩阵 y = df['target'].values # 目标向量 print(f"特征矩阵 X 的形状:{X.shape}") print(f"目标向量 y 的形状:{y.shape}")5.2 划分训练集、验证集和测试集
时间序列的划分不能打乱顺序!必须按时间先后划分。
# 确定划分比例,例如 70% 训练,15% 验证,15% 测试 train_size = int(len(X) * 0.7) val_size = int(len(X) * 0.15) X_train, X_val, X_test = X[:train_size], X[train_size:train_size+val_size], X[train_size+val_size:] y_train, y_val, y_test = y[:train_size], y[train_size:train_size+val_size], y[train_size+val_size:] print(f"训练集大小:{X_train.shape}") print(f"验证集大小:{X_val.shape}") print(f"测试集大小:{X_test.shape}")5.3 在划分后的数据集上进行归一化
# 初始化归一化器(这里以标准化为例) scaler_X = StandardScaler() scaler_y = StandardScaler() # 只在训练集上拟合归一化器 X_train_scaled = scaler_X.fit_transform(X_train) y_train_scaled = scaler_y.fit_transform(y_train.reshape(-1, 1)).flatten() # y需要reshape一下 # 用训练集拟合好的归一化器去转换验证集和测试集 X_val_scaled = scaler_X.transform(X_val) X_test_scaled = scaler_X.transform(X_test) y_val_scaled = scaler_y.transform(y_val.reshape(-1, 1)).flatten() y_test_scaled = scaler_y.transform(y_test.reshape(-1, 1)).flatten()重要提示:scaler_X和scaler_y这两个对象需要保存下来。当你用训练好的模型去预测全新的、未来的数据时,你需要用同样的scaler_X去预处理新数据的特征,并用scaler_y把模型的预测结果“反归一化”回原始的销售额数值。
6. 第五步:把数据“摆好盘”——构建模型输入格式
Granite TimeSeries FlowState R1 这类模型通常期望的输入格式是(样本数, 时间步长, 特征数)。这被称为“滑动窗口”法。
- 样本数:你有多少段序列。
- 时间步长:你用过去多少天的数据来预测未来(比如过去30天)。
- 特征数:每一天你有多少个特征(比如销售额、滞后特征、星期几等)。
def create_sequences(features, targets, time_steps=30): """ 将时间序列数据转换为监督学习格式。 参数: features: 缩放后的特征数组 (样本数, 特征数) targets: 缩放后的目标数组 (样本数,) time_steps: 用过去多少时间步的数据 返回: X_seq: 形状为 (样本数 - time_steps, time_steps, 特征数) 的数组 y_seq: 形状为 (样本数 - time_steps,) 的数组 """ X_seq, y_seq = [], [] for i in range(len(features) - time_steps): X_seq.append(features[i:(i + time_steps)]) # 取连续time_steps个时间步的特征 y_seq.append(targets[i + time_steps]) # 取对应时间步的目标值 return np.array(X_seq), np.array(y_seq) TIME_STEPS = 30 # 使用过去30天的数据 # 为训练集、验证集、测试集创建序列 X_train_seq, y_train_seq = create_sequences(X_train_scaled, y_train_scaled, TIME_STEPS) X_val_seq, y_val_seq = create_sequences(X_val_scaled, y_val_scaled, TIME_STEPS) X_test_seq, y_test_seq = create_sequences(X_test_scaled, y_test_scaled, TIME_STEPS) print(f"训练集序列形状:X: {X_train_seq.shape}, y: {y_train_seq.shape}") print(f"验证集序列形状:X: {X_val_seq.shape}, y: {y_val_seq.shape}") print(f"测试集序列形状:X: {X_test_seq.shape}, y: {y_test_seq.shape}")现在,X_train_seq就是一个完美的、三维的模型输入了。它的形状是(样本数, 30, 特征数),可以直接喂给 Granite TimeSeries FlowState R1 模型进行训练。
7. 总结与回顾
走完这一整套流程,你可能觉得步骤不少。但总结起来,核心思想就三个:干净、丰富、规整。
“干净”是说数据不能有缺失和异常值,就像做饭前要把菜洗干净。“丰富”是说我们不能只给模型看一个原始数字,要帮它把趋势、周期这些信息都提取出来,做成更可口的“特征菜”。“规整”是说数据要按照固定的时间间隔排好队,并且转换成模型喜欢的那个三维格式(样本, 时间步, 特征)。
整个过程最需要小心的是数据泄露。记住,任何从数据中学习规律的操作(比如计算归一化的均值标准差、填充缺失值的策略),都只能从训练集里学,然后再用学到的规则去处理验证集和测试集。打乱时间顺序更是大忌。
当你把这些预处理好的X_train_seq和y_train_seq交给 Granite TimeSeries FlowState R1 时,模型的学习效率会高很多。你可以把这份预处理代码保存成一个脚本或者函数,下次遇到新的时间序列数据,基本上改改文件路径和几个参数就能直接跑起来,这才是真正的“一次劳动,多次受益”。
获取更多AI镜像
想探索更多AI镜像和应用场景?访问 CSDN星图镜像广场,提供丰富的预置镜像,覆盖大模型推理、图像生成、视频生成、模型微调等多个领域,支持一键部署。