构建从 WebRTC SFU 集群到 Hadoop 的安全可信日志注入架构


当 WebRTC 服务从几百个并发会话扩展到数万甚至数十万时,其运维数据的体量会发生质变。单个会话产生的 ICE 协商、DTLS 握手、SRTP 密钥交换、RTP/RTCP 统计(抖动、丢包、RTT)等日志,在规模化后会形成数据洪流。传统的 ELK 体系在这种场景下,无论是存储成本还是查询分析能力,都开始捉襟见肘。我们需要一个能够进行深度、长期、离线分析的平台来做容量规划、质量归因分析以及更重要的——安全审计与威胁建模。Hadoop HDFS 顺理成章地进入了备选方案。

然而,核心问题随之而来:如何构建一个安全、可靠且高效的数据管道,将分布在全球各地边缘节点上的 SFU (Selective Forwarding Unit) 集群产生的海量日志,安全地注入到数据中心的 Hadoop 集群中?

方案A:SFU 节点直连 HDFS

最直观的方案是让每个 SFU 节点直接通过 HDFS 客户端或 WebHDFS API 写入数据。

  • 优势: 架构简单,链路最短,理论上延迟最低。
  • 劣势:
    1. 安全灾难: 将 Hadoop 集群的访问凭证(例如 Kerberos keytab)分发到成百上千个边缘 SFU 节点上,无异于将数据中心的钥匙散布到各处。任何一个 SFU 节点被攻破,都可能导致整个大数据平台的安全崩溃。
    2. 管理噩梦: 凭证轮换、权限管理、网络策略配置(需要为每个 SFU 节点开放到 NameNode 和 DataNode 的网络端口)将变得异常复杂且脆弱。
    3. 性能瓶颈: 大量并发的短连接和小文件写入会对 NameNode 造成巨大压力,这是 HDFS 的典型反模式。网络抖动也极易导致写入失败和数据不一致。

在真实项目中,这种方案几乎没有可行性,安全风险一票否决。

方案B:引入消息队列(如 Kafka)

一个更成熟的架构是引入消息队列作为缓冲层。SFU 节点将日志发送到 Kafka,再由一个独立的消费端(如 Flume 或 Spark Streaming)从 Kafka 拉取数据并写入 HDFS。

  • 优势:
    1. 解耦: SFU 与 Hadoop 集群完全解耦,SFU 只需与 Kafka 集群交互。
    2. 削峰填谷: Kafka 强大的缓冲能力可以平滑流量高峰,保护下游 HDFS。
    3. 高可用: Kafka 集群本身的高可用和数据复制机制保障了数据的可靠性。
  • 劣势:
    1. 运维成本: 引入并维护一个高可用的 Kafka 集群本身就是一项重大的运维投入。
    2. 安全边界: 虽然隔离了 HDFS,但 Kafka 集群的 Topic 权限、认证和加密(SASL/SSL)依然需要精细配置。安全边界从 HDFS 转移到了 Kafka。
    3. 端到端一致性: 保证从 SFU -> Kafka -> HDFS 的 Exactly-Once 语义,配置复杂,需要仔细处理消费端的位移提交和 HDFS 事务性写入。

这个方案在很多场景下是可行的,但对于我们的场景——日志注入,其复杂度和成本显得有些过高。我们真正需要的是一个更轻量、更聚焦于安全传输和批量写入的解决方案。

最终选择:定制化安全聚合服务

我们决定采用一种混合方案:在 SFU 节点上部署一个轻量级的日志代理(Agent),该代理负责批量收集日志,并通过 mTLS (Mutual TLS) 将数据安全地发送到一个中心化的日志聚合服务(Aggregator)。聚合服务再以最优化的方式将数据写入 HDFS。

这个架构的逻辑如下:

graph TD
    subgraph Edge Zone / PoP
        SFU1[SFU Instance 1] -- local socket --> Agent1[Log Agent]
        SFU2[SFU Instance 2] -- local socket --> Agent2[Log Agent]
        SFU3[...]
    end

    subgraph Secure Tunnel
        Agent1 -- mTLS over gRPC --> Aggregator
        Agent2 -- mTLS over gRPC --> Aggregator
    end

    subgraph Data Center
        Aggregator[Log Aggregator Service HA Cluster] -- Kerberos Auth --> HDFS
        HDFS[Hadoop HDFS Cluster]
    end

    Aggregator -- writes data as Avro/Parquet --> HDFS

这种设计的核心权衡在于:

  • 安全性: 安全边界被清晰地定义在日志聚合服务上。只有这个服务需要 HDFS 的凭证。SFU 节点与聚合服务之间通过双向 TLS 认证建立信任链,即使 SFU 节点被攻破,也无法直接访问数据中心资源。
  • 性能: Agent 在本地批量打包和压缩日志,聚合服务以大文件、流式写入的方式与 HDFS 交互,完全符合 HDFS 的最佳实践,避免了 NameNode 瓶颈。
  • 可控性: 我们可以完全自定义日志格式(例如使用 Protobuf)、压缩算法、重试逻辑和背压机制,对数据管道有百分之百的控制力。

核心实现概览

1. 日志格式定义 (Protocol Buffers)

选择 Protobuf 作为日志的序列化格式,因为它高效、紧凑且强类型,便于后续 Schema 演进。

// acls.proto
syntax = "proto3";

package com.webrtc.logging;

import "google/protobuf/timestamp.proto";

option java_package = "com.webrtc.logging.proto";
option java_outer_classname = "AclsProto";
option go_package = "github.com/your/repo/acls_pb";

// 代表一个 WebRTC 会话的元数据日志
message SessionEvent {
    enum EventType {
        UNKNOWN = 0;
        SESSION_CREATED = 1;
        SESSION_ENDED = 2;
        ICE_SUCCESS = 3;
        ICE_FAILURE = 4;
        DTLS_HANDSHAKE_FAILED = 5;
    }

    string session_id = 1;
    string user_id = 2;
    string sfu_node_id = 3;
    string client_ip = 4;
    EventType event_type = 5;
    google.protobuf.Timestamp timestamp = 6;
    map<string, string> metadata = 7; // 存放额外信息, e.g., error_code
}

// 从 Agent 发送到 Aggregator 的批量数据
message LogBatch {
    repeated SessionEvent events = 1;
    string source_node_id = 2; // 批次来源的SFU节点ID
}

// 定义 gRPC 服务
service LogIngestionService {
    rpc Ingest(LogBatch) returns (IngestResponse);
}

message IngestResponse {
    bool success = 1;
    string message = 2;
}

2. SFU 端的轻量级日志 Agent (Go)

使用 Go 语言实现 Agent,因其出色的并发性能、低内存占用和静态编译的便利性,非常适合作为边缘节点上的系统守护进程。

// main.go - Log Agent Core Logic
package main

import (
	"context"
	"crypto/tls"
	"crypto/x509"
	"fmt"
	"io/ioutil"
	"log"
	"sync"
	"time"

	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials"
	
	pb "github.com/your/repo/acls_pb" //
)

const (
	aggregatorAddr = "aggregator.your-domain.com:9090"
	batchSize      = 1000
	flushInterval  = 5 * time.Second
	nodeID         = "sfu-prod-us-east-1-001" // Should be dynamically configured
)

var (
	buffer      []*pb.SessionEvent
	bufferMutex = &sync.Mutex{}
	grpcClient  pb.LogIngestionServiceClient
)

func main() {
	// 1. 初始化安全的 gRPC 连接
	// 这里的坑在于:必须同时加载 CA 证书、客户端证书和客户端私钥
	// 以实现双向 TLS (mTLS) 认证。
	caCert, err := ioutil.ReadFile("certs/ca.crt")
	if err != nil {
		log.Fatalf("failed to read CA certificate: %v", err)
	}
	certPool := x509.NewCertPool()
	if !certPool.AppendCertsFromPEM(caCert) {
		log.Fatalf("failed to add CA certificate to pool")
	}

	clientCert, err := tls.LoadX509KeyPair("certs/client.crt", "certs/client.key")
	if err != nil {
		log.Fatalf("failed to load client key pair: %v", err)
	}

	tlsConfig := &tls.Config{
		Certificates: []tls.Certificate{clientCert},
		RootCAs:      certPool,
	}

	conn, err := grpc.Dial(aggregatorAddr, grpc.WithTransportCredentials(credentials.NewTLS(tlsConfig)))
	if err != nil {
		log.Fatalf("did not connect: %v", err)
	}
	defer conn.Close()
	grpcClient = pb.NewLogIngestionServiceClient(conn)

	// 2. 启动一个 goroutine 定期刷新缓冲区
	go flushBufferTicker()

	// 3. 模拟从 SFU 接收日志 (在真实项目中,这里会监听一个 Unix Socket 或 UDP 端口)
	simulateLogProduction()
}

// AddLogToBuffer 是线程安全的,用于接收 SFU 日志并加入批处理缓冲区
func AddLogToBuffer(event *pb.SessionEvent) {
	bufferMutex.Lock()
	defer bufferMutex.Unlock()

	buffer = append(buffer, event)
	if len(buffer) >= batchSize {
		// 复制当前缓冲区并异步发送,避免阻塞日志产生方
		batchToSend := make([]*pb.SessionEvent, len(buffer))
		copy(batchToSend, buffer)
		buffer = buffer[:0] // 清空缓冲区
		go sendBatch(batchToSend)
	}
}

func flushBufferTicker() {
	ticker := time.NewTicker(flushInterval)
	for range ticker.C {
		bufferMutex.Lock()
		if len(buffer) > 0 {
			batchToSend := make([]*pb.SessionEvent, len(buffer))
			copy(batchToSend, buffer)
			buffer = buffer[:0]
			go sendBatch(batchToSend)
		}
		bufferMutex.Unlock()
	}
}

// sendBatch 发送日志批次到聚合器,包含重试逻辑
func sendBatch(events []*pb.SessionEvent) {
	if len(events) == 0 {
		return
	}
	log.Printf("Sending batch of %d events", len(events))
	
	batch := &pb.LogBatch{
		Events:       events,
		SourceNodeId: nodeID,
	}

	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()

	// 这里的错误处理至关重要:必须实现带退避的重试机制,
	// 否则在聚合服务短暂不可用时会丢失大量数据。
	var retryCount = 0
	for retryCount < 3 {
		resp, err := grpcClient.Ingest(ctx, batch)
		if err == nil && resp.Success {
			log.Printf("Successfully ingested batch")
			return
		}
		log.Printf("Failed to ingest batch: %v. Retrying...", err)
		retryCount++
		time.Sleep(time.Duration(retryCount) * 2 * time.Second) // Exponential backoff
	}
	log.Printf("Failed to send batch after multiple retries. Data lost.")
	// 在生产环境中,这里应该将失败的批次写入本地磁盘上的死信队列,以便后续恢复。
}

func simulateLogProduction() {
	// ... 模拟代码
}

3. 日志聚合服务 (Java/Hadoop)

Java 是 Hadoop 生态的 “母语”,使用它可以最直接、高效地与 HDFS API 交互,特别是处理 Kerberos 安全认证。

// LogIngestionServiceImpl.java - gRPC Service Implementation
package com.webrtc.logging.service;

import com.webrtc.logging.proto.AclsProto.LogBatch;
import com.webrtc.logging.proto.AclsProto.IngestResponse;
import com.webrtc.logging.proto.LogIngestionServiceGrpc.LogIngestionServiceImplBase;
import io.grpc.stub.StreamObserver;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.security.UserGroupInformation;

import java.io.IOException;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import java.util.concurrent.ConcurrentHashMap;

public class LogIngestionServiceImpl extends LogIngestionServiceImplBase {

    private final Configuration hdfsConf;
    private final FileSystem fileSystem;
    // 为每个SFU节点维护一个打开的输出流,避免频繁开关文件
    private final ConcurrentHashMap<String, FSDataOutputStream> openStreams = new ConcurrentHashMap<>();

    public LogIngestionServiceImpl() throws IOException {
        this.hdfsConf = new Configuration();
        // 必须正确设置 hdfs-site.xml 和 core-site.xml 的路径
        this.hdfsConf.addResource(new Path("/etc/hadoop/conf/core-site.xml"));
        this.hdfsConf.addResource(new Path("/etc/hadoop/conf/hdfs-site.xml"));

        // 在安全的 Hadoop 集群中,这是最关键的一步。
        // 服务启动前必须已经 kinit 获取了 ticket。
        this.hdfsConf.set("hadoop.security.authentication", "kerberos");
        UserGroupInformation.setConfiguration(this.hdfsConf);
        UserGroupInformation.loginUserFromKeytab("aggregator-svc@YOUR_REALM.COM", "/etc/security/keytabs/aggregator.keytab");

        this.fileSystem = FileSystem.get(URI.create("hdfs://namenode:8020"), this.hdfsConf);
    }

    @Override
    public void ingest(LogBatch request, StreamObserver<IngestResponse> responseObserver) {
        try {
            FSDataOutputStream out = getOutputStreamForNode(request.getSourceNodeId());
            for (com.webrtc.logging.proto.AclsProto.SessionEvent event : request.getEventsList()) {
                // 在生产级应用中,这里应该使用 Avro 或 Parquet 格式写入,
                // 而不是简单的 JSON/Text,以获得更好的压缩率和查询性能。
                // 为简化示例,我们写入JSON文本。
                String jsonLog = com.google.protobuf.util.JsonFormat.printer().print(event);
                out.write((jsonLog + "\n").getBytes(StandardCharsets.UTF_8));
            }
            // 不要在这里关闭流,hsync() 保证数据被写入 DataNode
            out.hsync();

            IngestResponse response = IngestResponse.newBuilder().setSuccess(true).build();
            responseObserver.onNext(response);
            responseObserver.onCompleted();
        } catch (IOException e) {
            // 严谨的错误处理
            IngestResponse response = IngestResponse.newBuilder()
                    .setSuccess(false)
                    .setMessage("Failed to write to HDFS: " + e.getMessage())
                    .build();
            responseObserver.onNext(response);
            responseObserver.onCompleted();
        }
    }

    private FSDataOutputStream getOutputStreamForNode(String nodeId) throws IOException {
        // 实现每日日志文件滚动
        LocalDate today = LocalDate.now();
        String datePath = today.format(DateTimeFormatter.ofPattern("yyyy/MM/dd"));
        String hdfsPathStr = String.format("/data/webrtc_logs/%s/%s.log", datePath, nodeId);
        Path hdfsPath = new Path(hdfsPathStr);

        return openStreams.compute(hdfsPathStr, (path, stream) -> {
            try {
                if (stream == null || stream.isClosed()) {
                    if (fileSystem.exists(hdfsPath)) {
                        return fileSystem.append(hdfsPath);
                    } else {
                        return fileSystem.create(hdfsPath, true);
                    }
                }
                return stream;
            } catch (IOException e) {
                throw new RuntimeException(e); // Lambda acks checked exceptions
            }
        });
    }
    
    // 需要一个关闭钩子来在服务停止时优雅地关闭所有文件流
    public void closeAllStreams() {
        openStreams.values().forEach(stream -> {
            try {
                if (stream != null) stream.close();
            } catch (IOException e) {
                // Log error
            }
        });
    }
}

架构的扩展性与局限性

此架构的优势在于其清晰的职责划分和安全边界。Agent 专注于可靠的数据收集和传输,Aggregator 专注于高效、安全地与后端存储系统交互。这种模式具备良好的扩展性:

  • 多目的地写入: Aggregator 可以轻松扩展,将同一份数据分发到多个目的地,例如除了 HDFS 外,再实时写入一份到 ClickHouse 用于即时查询。
  • 格式演进: 只要 Agent 和 Aggregator 之间的 gRPC 协议(Protobuf)保持兼容,底层的 HDFS 存储格式(从 Text 升级到 Parquet)可以独立演进,对 SFU 端完全透明。
  • 动态配置: 可以引入一个配置中心,动态调整 Agent 的批处理大小、发送频率等参数,以适应不同的网络条件和负载。

然而,这个方案并非没有局限性:

  • 近实时而非实时: 存在批处理和网络传输延迟,不适用于需要亚秒级响应的实时分析场景。
  • Aggregator 的高可用: Aggregator 服务本身成为一个关键组件。在生产环境中,它必须以高可用的方式部署(例如,在 Kubernetes 上运行多个实例,通过负载均衡器接收流量),并需要处理好 HDFS 文件写入的并发控制问题,避免多个实例同时写同一个文件。
  • 数据恢复: Agent 端的死信队列机制必须足够健壮。如果一个边缘节点的网络长时间中断,本地磁盘可能会被写满,这需要有相应的监控和告警来处理。

最终,没有任何架构是完美的,只有最适合特定场景、在成本、复杂性、安全性和性能之间做出合理权衡的架构。对于大规模 WebRTC 服务的离线数据分析需求,这套定制化的安全注入方案提供了一个坚实且可控的基础。


  目录