我们的特征工程管道遇到了一个无法回避的物理瓶颈。最初为GB级数据集设计的、基于SciPy和Pandas的信号处理脚本,在面对数据湖中TB级的时序数据时,单机执行时间从几小时飙升到数天。简单地垂直扩展实例(使用内存和CPU更大的机器)很快就触及了成本和可用实例规格的上限。核心问题在于,我们的计算任务本质上是 embarrassingly parallel 的,但整个流程被封装在一个单体的Python脚本中,无法利用Kubernetes集群的分布式能力。
初步的构想是抛弃SciPy,将所有逻辑用PySpark重写。这在技术上可行,但对于一个深度依赖scipy.signal
、scipy.stats
等复杂科学计算库的算法团队来说,迁移成本和潜在的数值计算精度差异是难以接受的。我们的目标变成了:如何在不重写核心算法的前提下,将现有的Python科学计算代码进行分布式改造,并将其无缝集成到现有的Kubeflow MLOps体系中。
我们决定构建一个通用的Kubeflow组件,它能够接收一个数据湖中的数据路径,并自动将计算任务分发到多个并行的Pod中执行,最后将结果聚合。这种模式允许算法工程师继续使用他们熟悉的SciPy工具栈,而平台工程团队则负责提供底层的分布式执行能力。
架构决策与工作流设计
我们排除了直接使用Dask或Ray这类成熟的分布式计算框架,主要原因是为了控制技术栈的复杂性。引入一个全新的分布式框架意味着额外的学习成本和运维负担。取而代之,我们选择了一种更“云原生”的方式:利用Kubeflow Pipelines本身作为任务调度器,将一个大的数据集切分成多个小的数据分片,然后动态地启动一组并行的Kubernetes Pod来处理这些分片。
这种方法的优势在于:
- 轻量级:不需要引入额外的分布式计算集群(如Ray或Dask集群),复用现有的Kubernetes资源。
- 隔离性好:每个数据分片的处理都在一个独立的Pod中进行,资源和故障完全隔离。
- 与Kubeflow原生集成:可以利用Kubeflow的
withParallelism
或DAG扇出(Fan-out)能力来动态控制并行度。
最终确定的工作流如下:
graph TD A[Start Pipeline] --> B(1. Data Splitter Component); B --> C{Data Shards List}; C --> |Shard 1| D1[2. SciPy Worker Pod 1]; C --> |Shard 2| D2[2. SciPy Worker Pod 2]; C --> |Shard N| Dn[2. SciPy Worker Pod N]; D1 --> E(3. Results Merger Component); D2 --> E; Dn --> E; E --> F[Final Output Artifact];
这个流程由三个核心的Kubeflow组件构成:
- Data Splitter: 负责定位数据湖中的原始数据集(例如一个包含数千个Parquet文件的目录),并根据指定的并行度参数,生成一个数据分片清单(Manifest)。这个清单可以是文件路径列表,也可以是数据库查询的偏移量范围。
- SciPy Worker: 这是执行实际计算的核心组件。它接收一个数据分片作为输入,加载数据,执行CPU密集型的SciPy计算,并将结果写回到数据湖的一个临时位置。这个组件会被并行实例化多次。
- Results Merger: 当所有的Worker Pod执行完毕后,这个组件负责收集所有临时结果,将它们合并成最终的输出,并进行必要的清理工作。
核心实现:可并行化的SciPy Worker组件
关键在于SciPy Worker
组件的设计。它必须是无状态的、可重入的,并且能够处理好与数据湖的I/O。下面是这个组件的核心代码结构。
首先,是定义组件接口的component.yaml
文件。在真实的MLOps项目中,将组件定义与实现分离是至关重要的最佳实践。
# scipy_worker_component.yaml
name: Parallel SciPy Feature Processor
description: |
Processes a single data shard from a data lake using SciPy for feature engineering.
This component is designed to be run in parallel.
inputs:
- name: data_shard_path
type: String
description: 'Path to the input data shard (e.g., s3://bucket/raw_data/shard_001.parquet)'
- name: output_path_prefix
type: String
description: 'Path prefix in the data lake to write the processed results (e.g., s3://bucket/processed_data/)'
- name: job_id
type: String
description: 'A unique identifier for this processing job, used for naming output files.'
outputs:
- name: processed_shard_path
type: String
description: 'The full path to the processed output shard.'
implementation:
container:
image: my-registry/scipy-feature-processor:v1.2.0
command: [
python,
/app/process_shard.py,
--data-shard-path,
{inputValue: data_shard_path},
--output-path-prefix,
{inputValue: output_path_prefix},
--job-id,
{inputValue: job_id},
--processed-shard-path-output,
{outputPath: processed_shard_path}
]
这个定义清晰地声明了组件的依赖(输入)、产出(输出)和执行环境(容器镜像)。接下来是容器内的核心执行脚本process_shard.py
。
# /app/process_shard.py
import os
import argparse
import logging
import time
import uuid
import pandas as pd
import numpy as np
from scipy import signal
from scipy.stats import skew, kurtosis
import pyarrow.parquet as pq
import s3fs
# --- Configuration & Setup ---
# 在生产环境中,日志级别应该由环境变量控制
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
# 使用S3FS与S3兼容的数据湖(如MinIO)交互
# 凭证应通过Kubernetes Secrets挂载为环境变量,而不是硬编码
S3_ENDPOINT_URL = os.environ.get("S3_ENDPOINT_URL", "http://minio-service.minio.svc.cluster.local:9000")
s3 = s3fs.S3FileSystem(client_kwargs={'endpoint_url': S3_ENDPOINT_URL})
# --- Core Computation Logic ---
def extract_signal_features(series: pd.Series, fs: int = 100) -> dict:
"""
一个计算密集型的特征提取函数示例。
在真实场景中,这里的逻辑会非常复杂。
Args:
series (pd.Series): 单个通道的时序信号数据.
fs (int): 采样频率.
Returns:
dict: 包含提取特征的字典.
"""
features = {}
try:
# 1. 基本统计特征
features['mean'] = series.mean()
features['std'] = series.std()
features['skewness'] = skew(series)
features['kurtosis'] = kurtosis(series)
# 2. 频域特征 - 使用Welch方法计算功率谱密度
# 这是一个计算密集型操作
freqs, psd = signal.welch(series, fs, nperseg=min(len(series), 256))
features['psd_peak_freq'] = freqs[np.argmax(psd)]
features['psd_total_power'] = np.sum(psd)
# 3. 小波变换特征 - 另一个CPU密集型操作
# 使用'morl'小波在5个不同尺度上进行连续小波变换
widths = np.arange(1, 6)
cwt_matrix = signal.cwt(series, signal.morlet, widths)
features['cwt_energy_mean'] = np.mean(np.abs(cwt_matrix)**2)
except Exception as e:
logger.error(f"Failed to extract features for a series. Error: {e}", exc_info=True)
# 即使部分计算失败,也返回空字典,避免整个任务崩溃
return {}
return features
def process_data_shard(local_input_path: str) -> pd.DataFrame:
"""
加载数据分片,应用特征提取,并返回结果.
"""
logger.info(f"Processing data from local path: {local_input_path}")
df = pd.read_parquet(local_input_path)
# 假设数据包含 'sensor_id' 和 'signal_value' 列
# 我们按 sensor_id 分组,并对每个传感器的信号进行特征提取
if 'sensor_id' not in df.columns or 'signal_value' not in df.columns:
raise ValueError("Input data must contain 'sensor_id' and 'signal_value' columns.")
start_time = time.time()
# 使用groupby().apply()来处理每个传感器的时序数据
# 这是可以被joblib或dask优化的热点
feature_results = df.groupby('sensor_id')['signal_value'].apply(extract_signal_features)
# 将结果字典转换回DataFrame
processed_df = feature_results.apply(pd.Series)
duration = time.time() - start_time
logger.info(f"Shard processing completed in {duration:.2f} seconds for {len(processed_df)} sensors.")
return processed_df.reset_index()
# --- Main Execution Block ---
def main():
parser = argparse.ArgumentParser(description="SciPy Feature Processor for a single data shard.")
parser.add_argument("--data-shard-path", type=str, required=True)
parser.add_argument("--output-path-prefix", type=str, required=True)
parser.add_argument("--job-id", type=str, required=True)
parser.add_argument("--processed-shard-path-output", type=str, required=True)
args = parser.parse_args()
logger.info(f"Starting shard processing job {args.job_id} for shard {args.data_shard_path}")
# 为本地处理创建一个临时目录
local_temp_dir = "/tmp/data"
os.makedirs(local_temp_dir, exist_ok=True)
local_input_path = os.path.join(local_temp_dir, os.path.basename(args.data_shard_path))
try:
# 1. 从数据湖下载数据分片到本地
logger.info(f"Downloading shard from {args.data_shard_path} to {local_input_path}")
s3.get(args.data_shard_path, local_input_path)
# 2. 执行核心计算
processed_df = process_data_shard(local_input_path)
# 3. 将处理结果写回数据湖
# 使用UUID确保输出文件名唯一,防止并发写入冲突
shard_id = os.path.splitext(os.path.basename(args.data_shard_path))[0]
output_filename = f"processed_{shard_id}_{uuid.uuid4().hex}.parquet"
remote_output_path = os.path.join(args.output_path_prefix, args.job_id, output_filename)
logger.info(f"Uploading processed data to {remote_output_path}")
with s3.open(remote_output_path, 'wb') as f:
processed_df.to_parquet(f)
# 4. 将输出路径写入Kubeflow指定的输出文件,以便后续步骤使用
logger.info(f"Writing output artifact path to {args.processed_shard_path_output}")
with open(args.processed_shard_path_output, 'w') as f:
f.write(remote_output_path)
logger.info("Shard processing successful.")
except Exception as e:
logger.error(f"An unhandled exception occurred: {e}", exc_info=True)
# 在生产环境中,这里应该有更复杂的重试或失败处理逻辑
exit(1)
finally:
# 清理本地临时文件
if os.path.exists(local_input_path):
os.remove(local_input_path)
logger.info("Cleanup complete.")
if __name__ == "__main__":
main()
这段代码有几个关键的生产级考量:
- 依赖注入: S3的端点和凭证通过环境变量注入,遵循十二要素应用原则。
- 错误处理:
try...except
块捕获了可能发生的I/O或计算错误,并提供了详细的日志。extract_signal_features
函数内部的错误捕获确保单个序列的失败不会中断整个分片的处理。 - 原子性输出: 输出文件名包含UUID,避免了多个并行Pod尝试写入同一个文件导致的竞争条件。
- 资源管理: I/O操作(下载/上传)与计算操作分离,并且在
finally
块中清理本地临时文件,确保Pod退出时不会留下垃圾数据。 - Kubeflow集成: 脚本通过
argparse
接收参数,并将输出结果的路径写入由Kubeflow指定的outputPath
文件中,这是组件间传递数据的标准方式。
在Kubeflow Pipeline中编排并行任务
有了健壮的Worker组件,我们就可以在Kubeflow Pipeline的Python DSL中进行编排了。这里我们使用kfp.dsl.ParallelFor
来实现扇出(Fan-out)模式。
# pipeline.py
import kfp
from kfp import dsl
from kfp.components import load_component_from_file
# --- 加载预先定义的组件 ---
# Data Splitter和Results Merger组件这里为简化而省略,假设已存在
data_splitter_op = load_component_from_file('data_splitter_component.yaml')
scipy_worker_op = load_component_from_file('scipy_worker_component.yaml')
results_merger_op = load_component_from_file('results_merger_component.yaml')
@dsl.pipeline(
name='Distributed SciPy Feature Engineering Pipeline',
description='A pipeline to parallelize CPU-bound SciPy tasks on data from a data lake.'
)
def distributed_scipy_pipeline(
raw_data_path: str = 's3://raw-data/timeseries/',
output_path_prefix: str = 's3://processed-data/features/',
parallelism: int = 10,
job_id: str = 'job-{{workflow.uid}}' # 使用工作流UID作为唯一ID
):
# --- 1. 数据分片 ---
# 这个组件会扫描raw_data_path,根据parallelism参数生成一个JSON数组字符串,
# 其中每个元素是一个待处理的文件路径。
splitter_task = data_splitter_op(
source_data_path=raw_data_path,
num_shards=parallelism
)
# 为每个Worker Pod设置资源请求,这在生产环境中至关重要
splitter_task.set_cpu_request("1").set_memory_request("2G")
# --- 2. 并行处理 (Fan-out) ---
# 使用with dsl.ParallelFor,Kubeflow会遍历splitter_task.outputs['shards_list']
# 并为列表中的每一项启动一个scipy_worker_op任务。
with dsl.ParallelFor(splitter_task.outputs['shards_list']) as shard_path:
worker_task = scipy_worker_op(
data_shard_path=shard_path,
output_path_prefix=output_path_prefix,
job_id=job_id
)
# 为计算密集型任务请求更多资源
worker_task.set_cpu_request("4").set_memory_request("16G")
# 设置重试策略,应对K8s节点可能出现的临时性问题
worker_task.set_retry(num_retries=3)
# --- 3. 结果聚合 (Fan-in) ---
# 这里需要一个技巧:将所有并行任务的输出收集起来。
# 我们修改merger组件,让它接收一个包含所有分片路径的目录。
# Kubeflow会自动将ParallelFor循环中所有worker_task的输出收集起来。
# 注意:KFP v1中处理聚合输出的方式可能更复杂,这里展示的是一个概念性的设计。
# 一个常见的模式是让worker将输出路径写入一个共享的清单文件或数据库记录中。
# 另一个更简单的方法是,merger组件直接扫描 `output_path_prefix/job_id/` 目录。
# 确保合并任务在所有worker任务完成后执行
merger_task = results_merger_op(
processed_shards_path=f"{output_path_prefix}/{job_id}/",
final_output_path=f"{output_path_prefix}/final_output_{job_id}.parquet"
).after(splitter_task) # 明确依赖关系
merger_task.set_cpu_request("2").set_memory_request("8G")
if __name__ == '__main__':
# 编译Pipeline
kfp.compiler.Compiler().compile(
pipeline_func=distributed_scipy_pipeline,
package_path='distributed_scipy_pipeline.yaml'
)
这个Pipeline定义展示了如何将单个组件串联成一个有向无环图(DAG),并利用ParallelFor
实现强大的并行计算能力。通过调整parallelism
参数,我们可以轻松地控制用于计算的资源量,从而在成本和执行时间之间找到平衡。
局限性与未来迭代方向
尽管此方案有效地解决了我们的燃眉之急,但它并非银弹。当前的实现存在几个局限性:
- 数据分片的粒度:
Data Splitter
目前基于文件进行分片。如果单个文件过大,依然会造成计算瓶颈。更优化的方案应该能够读取大文件的元数据(如Parquet的行组),实现更细粒度的任务切分。 - Shuffle的缺失: 当前架构非常适合Map-only类型的任务。如果特征工程需要全局性的操作(如跨分片的排序或连接,即Shuffle),这个简单的并行模型将不再适用,届时可能真的需要引入Spark或Dask这样的框架。
- 结果聚合的效率:
Results Merger
组件可能会成为新的瓶颈,因为它是一个单点操作。对于海量结果的合并,可能需要设计一个多阶段的树形合并(Tree Reduction)架构来进一步并行化。 - 动态资源分配: 当前每个Worker Pod的资源请求是静态的。对于异构的数据分片(有些分片计算量远大于其他),可能会导致资源浪费。未来的迭代可以考虑集成KEDA(Kubernetes-based Event-Driven Autoscaling),根据实际任务队列的积压情况来动态伸缩Worker数量。