构建从Solid.js到containerd的跨异步边界分布式追踪系统


我们面临一个典型的分布式系统难题:用户在前端界面(一个基于Solid.js的单页应用)执行了一个关键操作,比如提交一份复杂的表单。这个操作通过API网关触发,向Kafka集群投递了一条消息。后端一个Go语言编写的消费者服务,以containerd容器的形式运行,负责处理这条消息。当后端处理失败时,日志里只记录了一条错误,却无法追溯到是哪一次前端操作导致的。Kafka的异步特性像一道鸿沟,切断了请求的完整上下文。

问题很明确:需要一条线索,将前端的用户点击、API网关的转发、Kafka的消息传递、以及后端容器内的业务处理串联起来。分布式追踪是解决这个问题的标准答案,但魔鬼在细节中。如何在Solid.js这种细粒度响应式框架中优雅地启动追踪?如何确保追踪上下文(Trace Context)无损地跨越HTTP和Kafka这两种完全不同的协议边界?以及,如何让最终的追踪数据不仅包含业务逻辑,还能关联到底层的容器基础设施信息?

初步构想是利用Zipkin作为追踪系统,因为它协议简单(B3 Propagation),社区成熟。我们将手动构建一个完整的追踪链路,覆盖从浏览器到容器的全过程。技术选型决策如下:

  • 追踪系统: Zipkin。轻量、易于部署,B3头部传播规范直观。
  • 前端: Solid.js。展示如何在非主流但性能卓越的框架中集成追踪逻辑。
  • 消息队列: Kafka。它是最常见的异步边界,也是追踪上下文最容易丢失的地方。
  • 后端服务: Go。语言性能优异,且有良好的库支持与containerd进行交互。
  • 容器运行时: containerd。我们不仅要运行服务,还要从服务内部反向获取容器自身的元数据,为追踪信息增加基础设施维度的上下文。

我们的实现将分为三部分:1)具备追踪与容器感知能力的Go消费者;2) 注入并传播追踪上下文的Solid.js前端;3) 串联两者的API网关与整体部署。

第一步:构建感知容器身份的追踪消费者

后端的Go消费者是整个链路的终点,也是信息最丰富的一站。它不仅要处理业务逻辑,还需要完成两项关键任务:从Kafka消息头中提取追踪上下文,以及查询自身运行的containerd容器ID,并将此信息附加到追踪数据中。

在真实项目中,直接从容器内部访问宿主机的containerd守护进程需要特权和正确的挂载。这是一种有意的设计,目的是为了在可观测性数据中建立应用层与基础设施层之间的关联。

// main.go
package main

import (
	"context"
	"fmt"
	"log"
	"os"
	"os/signal"
	"syscall"
	"time"

	"github.com/IBM/sarama"
	"github.com/containerd/containerd"
	"github.com/openzipkin/zipkin-go"
	"github.com/openzipkin/zipkin-go/model"
	"github.com/openzipkin/zipkin-go/reporter"
	httpreporter "github.com/openzipkin/zipkin-go/reporter/http"
	"github.com/openzipkin/zipkin-go/propagation/b3"
)

const (
	kafkaBroker     = "kafka:9092"
	kafkaTopic      = "user-actions"
	zipkinEndpoint  = "http://zipkin:9411/api/v2/spans"
	serviceName     = "go-consumer-service"
	containerdSocket = "/run/containerd/containerd.sock"
)

// containerID 全局缓存容器ID,避免每次处理消息都查询
var containerID string

// setupTracer 初始化并配置Zipkin Tracer
func setupTracer() (*zipkin.Tracer, reporter.Reporter, error) {
	rep := httpreporter.NewReporter(zipkinEndpoint)
	
	// 创建本地服务的端点信息
	localEndpoint := &model.Endpoint{ServiceName: serviceName, Port: 8080}
	
	sampler, err := zipkin.NewCountingSampler(1.0) // 采样率100%用于演示
	if err != nil {
		return nil, nil, fmt.Errorf("failed to create sampler: %w", err)
	}

	tracer, err := zipkin.NewTracer(
		rep,
		zipkin.WithSampler(sampler),
		zipkin.WithLocalEndpoint(localEndpoint),
	)
	if err != nil {
		return nil, nil, fmt.Errorf("failed to create tracer: %w", err)
	}
	return tracer, rep, nil
}

// getContainerID 连接containerd并获取当前容器的ID
// 这是一个关键步骤,将应用追踪与基础设施关联起来
func getContainerID() (string, error) {
	hostname, err := os.Hostname()
	if err != nil {
		return "unknown", fmt.Errorf("could not get hostname: %w", err)
	}

	// 容器的hostname通常就是它的容器ID(短ID)
	// 在生产环境中,可能有更复杂的逻辑,比如通过cgroup路径解析
	// 这里为了演示,我们假设hostname就是容器ID的前缀
	// 一个更健壮的方式是挂载containerd.sock并查询
	client, err := containerd.New(containerdSocket, containerd.WithDefaultNamespace("default"))
	if err != nil {
		log.Printf("Warning: failed to connect to containerd at %s: %v. Using hostname as fallback.", containerdSocket, err)
		return hostname, nil
	}
	defer client.Close()

	containers, err := client.Containers(context.Background())
	if err != nil {
		return hostname, fmt.Errorf("failed to list containers: %w", err)
	}

	for _, container := range containers {
		// 这里的匹配逻辑需要根据实际部署情况调整
		// 比如,检查容器标签或任务信息
		// 此处简化为通过hostname匹配
		task, err := container.Task(context.Background(), nil)
		if err != nil {
			continue // 容器可能没有运行中的任务
		}
		status, err := task.Status(context.Background())
		if err != nil {
			continue
		}
		if status.Status == containerd.Running {
            // 在K8s等环境中,hostname可能直接是pod name,需要更复杂的逻辑
            // 这里我们用一个简化的方式,假设我们能找到与hostname相关的容器
            info, _ := container.Info(context.Background())
            if len(info.Spec.Hostname) > 0 && info.Spec.Hostname == hostname {
                 return container.ID(), nil
            }
		}
	}

	log.Println("Warning: could not find matching running container, using hostname as fallback.")
	return hostname, nil
}


// ConsumerGroupHandler 实现了 sarama.ConsumerGroupHandler 接口
type ConsumerGroupHandler struct {
	tracer *zipkin.Tracer
}

func (h *ConsumerGroupHandler) Setup(_ sarama.ConsumerGroupSession) error { return nil }
func (h *ConsumerGroupHandler) Cleanup(_ sarama.ConsumerGroupSession) error { return nil }

func (h *ConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
	for message := range claim.Messages() {
		// 1. 从Kafka消息头中提取Trace Context
		spanContext := h.tracer.Extract(b3.ExtractKafka(message))
		
		// 2. 创建一个新的子Span
		span := h.tracer.StartSpan("process-kafka-message", zipkin.Parent(spanContext))
		span.Tag("kafka.topic", message.Topic)
		span.Tag("kafka.partition", fmt.Sprintf("%d", message.Partition))
		span.Tag("kafka.offset", fmt.Sprintf("%d", message.Offset))
		
		// 3. 将基础设施信息(containerd ID)作为标签添加
		if containerID != "" {
			span.Tag("container.id", containerID)
		} else {
            span.Tag("container.id", "not_found")
        }

		log.Printf("Processing message: value = %s, traceID = %s, spanID = %s", string(message.Value), span.Context().TraceID, span.Context().ID)
		
		// 模拟业务处理
		time.Sleep(50 * time.Millisecond)
		
		// 标记消息已处理
		session.MarkMessage(message, "")
		
		// 4. 结束Span
		span.Finish()
	}
	return nil
}

func main() {
	log.Println("Starting Kafka consumer...")

	tracer, reporter, err := setupTracer()
	if err != nil {
		log.Fatalf("Failed to setup tracer: %v", err)
	}
	defer reporter.Close()

	// 启动时获取一次container ID
	cid, err := getContainerID()
	if err != nil {
		log.Printf("Could not determine container ID: %v", err)
        containerID = "error-retrieving"
	} else {
        containerID = cid
        log.Printf("Successfully identified container ID: %s", containerID)
    }


	config := sarama.NewConfig()
	config.Version = sarama.V2_8_0_0 // 使用较新版本以支持头部
	config.Consumer.Offsets.Initial = sarama.OffsetOldest
	config.Consumer.Return.Errors = true

	client, err := sarama.NewConsumerGroup([]string{kafkaBroker}, "action-processor-group", config)
	if err != nil {
		log.Fatalf("Failed to create consumer group client: %v", err)
	}
	defer client.Close()

	handler := &ConsumerGroupHandler{tracer: tracer}
	ctx, cancel := context.WithCancel(context.Background())

	go func() {
		for {
			if err := client.Consume(ctx, []string{kafkaTopic}, handler); err != nil {
				log.Printf("Error from consumer: %v", err)
			}
			if ctx.Err() != nil {
				return
			}
		}
	}()
	
	// 优雅地关闭
	sigterm := make(chan os.Signal, 1)
	signal.Notify(sigterm, syscall.SIGINT, syscall.SIGTERM)
	<-sigterm
	log.Println("Terminating: context cancelled")
	cancel()
}

这里的核心在于 getContainerID 函数和 ConsumeClaim 方法。getContainerID 尝试连接到 containerd.sock 并通过匹配hostname来识别自身ID。这是一个简化实现,在生产级Kubernetes环境中,可能需要读取 Downward API 注入的环境变量或挂载的文件来获得更可靠的Pod和容器信息。ConsumeClaim 中,tracer.Extract(b3.ExtractKafka(message)) 从消息头中恢复追踪上下文,然后创建的子Span被打上了 container.id 标签。这一个小小的标签,在排障时价值千金。

第二步:在Solid.js中发起并传播追踪

前端是追踪链路的起点。我们需要在用户交互时创建一个根Span (Root Span),并确保后续发出的API请求(这里是fetch)都携带了B3格式的HTTP头部。zipkin-js 提供了很好的工具来完成这件事。

我们创建一个Solid.js组件,它包含一个按钮。点击按钮后,会创建一个追踪Span,然后向API网关发送一个请求。

// src/TracedButton.tsx
import { createSignal, Component } from 'solid-js';
import { Tracing, ExplicitContext, jsonEncoder } from 'zipkin';
import { HttpLogger } from 'zipkin-transport-http';
import wrapFetch from 'zipkin-instrumentation-fetch';

// --- 配置Zipkin Tracer ---
// 这里的配置通常会在应用初始化时完成,为了演示放在组件内部。
const ZIPKIN_API_URL = 'http://localhost:9411/api/v2/spans';
const API_GATEWAY_URL = 'http://localhost:3000/api/action';
const SERVICE_NAME = 'solidjs-frontend';

const ctxImpl = new ExplicitContext();
const recorder = new HttpLogger({
  endpoint: ZIPKIN_API_URL,
  jsonEncoder: jsonEncoder.JSON_V2,
});

const tracer = new Tracing({
  localServiceName: SERVICE_NAME,
  recorder,
  traceId128Bit: true, // 使用128位Trace ID
}).tracer;

// 使用zipkin-instrumentation-fetch来包装全局的fetch函数
// 这样所有fetch调用都会自动注入trace headers
const zipkinFetch = wrapFetch(fetch, {
  tracer,
  serviceName: SERVICE_NAME,
});


const TracedButton: Component = () => {
  const [status, setStatus] = createSignal('Idle');

  const handleClick = () => {
    setStatus('Processing...');
    
    // 1. 创建一个新的根Span,代表用户的一次完整操作
    tracer.scoped(() => {
      const traceId = tracer.id;
      console.log(`Starting trace with ID: ${traceId.traceId}`);
      
      const payload = {
        userId: 'user-123',
        action: 'submit-form',
        timestamp: new Date().toISOString(),
        data: { /* ... some form data ... */ },
      };

      // 2. 使用被包装过的fetch发起请求
      // B3 headers (X-B3-TraceId, X-B3-SpanId, etc.) 会被自动添加
      zipkinFetch(API_GATEWAY_URL, {
        method: 'POST',
        headers: {
          'Content-Type': 'application/json',
        },
        body: JSON.stringify(payload),
      })
      .then(response => {
        if (!response.ok) {
          throw new Error(`HTTP error! status: ${response.status}`);
        }
        return response.json();
      })
      .then(data => {
        console.log('Success:', data);
        setStatus(`Success: Message ID ${data.messageId}`);
      })
      .catch(error => {
        console.error('Error:', error);
        setStatus(`Error: ${error.message}`);
        // 在span中记录错误
        tracer.recordBinary('error', error.message);
      });
    });
  };

  return (
    <div>
      <button onClick={handleClick} disabled={status() === 'Processing...'}>
        Perform Tracked Action
      </button>
      <p>Status: {status()}</p>
    </div>
  );
};

export default TracedButton;

这段代码的关键是 wrapFetch。它劫持了标准的 fetch 函数,使其在每次调用时自动从当前上下文中获取Trace信息,并将其序列化为B3格式的HTTP头部附加到请求上。tracer.scoped 创建了一个作用域,确保在异步回调中也能正确访问到同一个Trace上下文。当用户点击按钮时,一个新的追踪开始,API请求头中会包含如 X-B3-TraceId 等信息,这就是传递给下游的第一棒。

第三步:API网关与整体部署

API网关是连接前端和后端的桥梁。它的职责是接收前端的HTTP请求,解析出B3头部,然后将这些头部信息原封不动地注入到即将发送的Kafka消息的头部中。这是一个纯粹的协议转换和上下文传递过程。

这里我们用一个简单的Node.js/Express应用作为API网关。

// api-gateway/index.js
const express = require('express');
const { Kafka } = require('kafkajs');
const cors = require('cors');

const app = express();
app.use(express.json());
app.use(cors()); // 允许前端跨域请求

const kafka = new Kafka({
  clientId: 'api-gateway',
  brokers: ['kafka:9092'],
});

const producer = kafka.producer();
const KAFKA_TOPIC = 'user-actions';

// B3 Propagation Headers
const B3_HEADERS = [
  'x-b3-traceid',
  'x-b3-spanid',
  'x-b3-parentspanid',
  'x-b3-sampled',
  'x-b3-flags',
];

app.post('/api/action', async (req, res) => {
  try {
    const kafkaHeaders = {};
    
    // 1. 从HTTP请求头中提取所有B3相关的头部
    B3_HEADERS.forEach(header => {
      if (req.headers[header]) {
        // KafkaJS 要求头部的值是 string 或 Buffer,这里确保是string
        kafkaHeaders[header] = String(req.headers[header]);
      }
    });

    console.log('Propagating Kafka headers:', kafkaHeaders);

    // 2. 将B3头部注入Kafka消息
    await producer.send({
      topic: KAFKA_TOPIC,
      messages: [
        {
          value: JSON.stringify(req.body),
          headers: kafkaHeaders,
        },
      ],
    });

    console.log('Message sent to Kafka successfully.');
    res.status(202).json({ messageId: kafkaHeaders['x-b3-traceid'] || 'N/A' });

  } catch (error) {
    console.error('Failed to send message to Kafka', error);
    res.status(500).json({ error: 'Internal Server Error' });
  }
});

const run = async () => {
  await producer.connect();
  app.listen(3000, () => {
    console.log('API Gateway listening on port 3000');
  });
};

run().catch(console.error);

这个网关的核心逻辑非常简单但至关重要:遍历B3头部列表,将HTTP请求中存在的所有相关头部复制到Kafka消息的headers字段。kafkajs库很好地支持了消息头,使得这个传递过程非常直接。

整体部署与可视化

为了将所有组件运行起来,我们使用docker-compose

# docker-compose.yml
version: '3.8'

services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.3.2
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  kafka:
    image: confluentinc/cp-kafka:7.3.2
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_CREATE_TOPICS: "user-actions:1:1"

  zipkin:
    image: openzipkin/zipkin:latest
    ports:
      - "9411:9411"

  api-gateway:
    build: ./api-gateway
    ports:
      - "3000:3000"
    depends_on:
      - kafka

  go-consumer:
    build: ./go-consumer
    volumes:
      # 这里是关键:将宿主机的containerd socket挂载到容器内部
      - /run/containerd/containerd.sock:/run/containerd/containerd.sock:ro
    depends_on:
      - kafka
      - zipkin
    # 容器的hostname设置为唯一,以便在Go代码中识别
    hostname: '{{.Service.Name}}-{{.Task.Slot}}' 
    deploy:
      replicas: 2 # 运行两个实例以展示不同容器ID

  solid-app:
    build: ./solid-app
    ports:
      - "8080:80" # 假设Solid应用通过nginx等服务在80端口提供
    depends_on:
      - api-gateway

在这个docker-compose配置中,go-consumer服务的volumes配置是核心。它以只读方式将宿主机的containerd套接字文件挂载进容器,授权Go应用查询容器信息。hostname配置确保每个容器实例有可预测的标识。

当整个系统运行起来后,一次完整的交互流程如下:

sequenceDiagram
    participant User
    participant SolidApp as Solid.js Frontend
    participant Gateway as API Gateway
    participant Kafka
    participant Consumer as Go Consumer (in containerd)
    participant Zipkin

    User->>SolidApp: Clicks 'Perform Action' button
    SolidApp->>SolidApp: Create Root Span (Trace T1, Span S1)
    SolidApp->>Gateway: POST /api/action (Headers: T1, S1)
    Gateway->>Gateway: Receive request, extract B3 headers
    Gateway->>Kafka: Produce message (Headers: T1, S1)
    Kafka-->>Consumer: Delivers message
    Consumer->>Consumer: Extract B3 headers (T1, S1)
    Consumer->>Consumer: Create Child Span (Trace T1, Span S2, Parent S1)
    Consumer->>Consumer: Get containerd ID ('abc-123')
    Consumer->>Consumer: Add tag 'container.id=abc-123' to Span S2
    Consumer->>Zipkin: Report Span S2
    SolidApp->>Zipkin: Report Span S1

最终在Zipkin的UI界面,我们会看到一个完整的调用链。这个调用链由两个Span组成,清晰地展示了从前端发起到后端处理完毕的全过程。更重要的是,当我们点开后端消费者的那个Span时,它的标签页会明确显示container.id,直接将这次业务操作与它运行时所在的物理容器实例关联起来。

方案局限性与未来路径

这个手动构建的追踪系统虽然完整,但在生产环境中存在一些局限性。首先,我们大量使用了手动埋点(Manual Instrumentation)。无论是前端的tracer.scoped还是后端的Span创建,都与业务代码紧密耦合。更理想的方案是采用基于OpenTelemetry的自动埋点(Auto-Instrumentation),通过无侵入的方式自动完成大部分追踪上下文的传播。

其次,从容器内部访问containerd.sock需要特定的安全权限配置,这在严格的安全策略下可能不被允许。在Kubernetes环境中,更云原生的方式是通过Downward API将Pod信息作为环境变量或文件注入容器,这解耦了应用与底层CRI的直接通信。

最后,我们只展示了最简单的B3传播,没有涉及采样策略、多级下游调用等更复杂的场景。生产级的可观测性体系还需要考虑日志、指标与追踪的关联,例如在追踪Span中记录关联的日志ID,或者根据追踪数据生成关键业务的RED指标(Rate, Errors, Duration)。当前的实现只是一个起点,一个验证跨异步边界和基础设施感知的追踪可行性的原型。


  目录