在GKE上利用自定义Kong插件为OpenFaaS函数构建分布式GraphQL聚合层


我们面临一个典型的Serverless架构演进后的困境:几十个细粒度的OpenFaaS函数部署在Google Kubernetes Engine (GKE)上,每个函数负责一个独立的业务领域,例如查询用户信息、获取订单列表、检查库存状态。前端或移动端应用为了渲染一个完整的页面,需要调用多个函数,这导致了客户端的复杂性激增和多次网络往返的性能损耗。

最初的解决方案是引入一个聚合层,本身也是一个Faas函数,它接收前端请求,然后依次或并发地调用其他函数,最后组装数据返回。这个模式很快暴露了其脆弱性:聚合函数成了新的单点瓶颈,其响应时间等于最慢的下游函数调用,并且随着业务逻辑的增加,这个聚合函数会迅速膨胀为“Function-as-a-Monolith”(功能即单体)。

更糟的是,当引入GraphQL作为前端查询语言时,问题变得更加棘手。如果我们部署一个标准的GraphQL服务(例如用Apollo Server编写的Node.js函数),来解析查询并调用下游函数,那么经典的N+1查询问题会在Serverless环境中被无限放大。一个请求user { id name orders { id amount } }会首先调用user函数,然后根据返回的用户ID,循环调用N次orders函数。每一次函数调用都意味着冷启动延迟、网络开销和GCP的计费事件。这在生产环境中是完全不可接受的。

我们的目标是将GraphQL的查询能力与Serverless的分布式特性结合,同时规避其性能陷阱。方案的核心是将GraphQL的解析和数据聚合逻辑前移到API网关层。我们选择Kong,因为它基于Nginx和LuaJIT,性能极高,并且其插件化的架构允许我们注入自定义逻辑。我们将编写一个自定义的Kong Lua插件,它能在网关层面解析GraphQL查询,识别出需要调用的所有后端OpenFaaS函数,并发起并行的上游请求,最终在边缘(Edge)完成数据的拼接。整个过程对客户端透明,且只发生一次网络往返。

环境搭建:GKE上的OpenFaaS与Kong

在深入插件开发之前,一个稳定且配置合理的底层环境是基础。我们在GCP上选择GKE,因为它提供了生产级的托管Kubernetes体验。

  1. 创建GKE集群
    为OpenFaaS和Kong准备一个专用的节点池,实例类型建议选择e2-standard-4或更高,以便应对并发的函数执行和网关处理。

    # 设置GCP项目和区域
    export PROJECT_ID="your-gcp-project-id"
    export REGION="asia-northeast1"
    export CLUSTER_NAME="serverless-graphql-gw"
    
    gcloud config set project ${PROJECT_ID}
    gcloud config set compute/region ${REGION}
    
    # 创建一个具有专用节点池的GKE集群
    gcloud container clusters create ${CLUSTER_NAME} \
        --num-nodes=1 \
        --machine-type=e2-standard-2 \
        --node-locations=${REGION}-a \
        --workload-pool=${PROJECT_ID}.svc.id.goog \
        --release-channel=regular
    
    gcloud container node-pools create openfaas-pool \
        --cluster=${CLUSTER_NAME} \
        --machine-type=e2-standard-4 \
        --num-nodes=3 \
        --enable-autoscaling \
        --min-nodes=2 \
        --max-nodes=6
    
    # 获取集群凭证
    gcloud container clusters get-credentials ${CLUSTER_NAME} --region ${REGION}
  2. 部署OpenFaaS
    使用Helm是部署OpenFaaS最直接的方式。关键配置在于确保gateway服务类型为ClusterIP,因为我们将通过Kong来暴露它,而不是直接暴露给外部。

    # 添加OpenFaaS Helm仓库
    helm repo add openfaas https://openfaas.github.io/faas-netes/
    helm repo update
    
    # 为OpenFaaS创建命名空间
    kubectl apply -f https://raw.githubusercontent.com/openfaas/faas-netes/master/namespaces.yml
    
    # 使用自定义值部署OpenFaaS
    # 注意:generateBasicAuth=true 仅用于演示,生产环境应使用更安全的认证方式
    helm upgrade openfaas openfaas/faas-netes \
        --install \
        --namespace openfaas \
        --set functionNamespace=openfaas-fn \
        --set gateway.serviceType=ClusterIP \
        --set generateBasicAuth=true
  3. 部署带自定义插件能力的Kong
    我们将通过构建一个包含我们自定义插件的Docker镜像来部署Kong。首先部署标准的Kong,后续再替换镜像。

    # 添加Kong Helm仓库
    helm repo add kong https://charts.konghq.com
    helm repo update
    
    # 部署Kong Ingress Controller
    helm install kong kong/kong \
        --namespace kong \
        --create-namespace \
        --set proxy.type=LoadBalancer \
        --set ingressController.installCRDs=true

    部署完成后,记下Kong proxy服务的外部IP,这将是我们的统一入口。

后端函数设计:原子化与契约化

为了让Kong插件能有效地聚合数据,后端函数的接口设计必须遵循一些原则。它们应该是无状态的、原子化的,并且输入输出结构是可预测的。我们用Go编写两个示例函数:user-profileuser-orders

  1. user-profile 函数
    该函数接收一个用户ID,返回用户的基本信息。

    // file: user-profile/handler.go
    package function
    
    import (
        "encoding/json"
        "fmt"
        "net/http"
    )
    
    type User struct {
        ID    string `json:"id"`
        Name  string `json:"name"`
        Email string `json:"email"`
    }
    
    // 模拟数据库查询
    var users = map[string]User{
        "1": {ID: "1", Name: "Alice", Email: "[email protected]"},
        "2": {ID: "2", Name: "Bob", Email: "[email protected]"},
    }
    
    func Handle(w http.ResponseWriter, r *http.Request) {
        // 在真实项目中,id会从请求体或查询参数中获取
        // 这里为了简化,我们硬编码一个ID
        userID := r.URL.Query().Get("id")
        if userID == "" {
            http.Error(w, "user id is required", http.StatusBadRequest)
            return
        }
    
        user, ok := users[userID]
        if !ok {
            http.Error(w, fmt.Sprintf("user with id %s not found", userID), http.StatusNotFound)
            return
        }
    
        w.Header().Set("Content-Type", "application/json")
        json.NewEncoder(w).Encode(user)
    }
  2. user-orders 函数
    该函数接收用户ID,返回该用户的订单列表。

    // file: user-orders/handler.go
    package function
    
    import (
        "encoding/json"
        "net/http"
    )
    
    type Order struct {
        ID         string  `json:"id"`
        UserID     string  `json:"-"` // 在JSON中忽略
        Amount     float64 `json:"amount"`
        ItemCount  int     `json:"itemCount"`
    }
    
    // 模拟订单数据
    var orders = map[string][]Order{
        "1": {
            {ID: "order-101", UserID: "1", Amount: 99.99, ItemCount: 2},
            {ID: "order-102", UserID: "1", Amount: 19.50, ItemCount: 1},
        },
        "2": {
            {ID: "order-201", UserID: "2", Amount: 250.00, ItemCount: 5},
        },
    }
    
    func Handle(w http.ResponseWriter, r *http.Request) {
        userID := r.URL.Query().Get("id")
        if userID == "" {
            http.Error(w, "user id is required", http.StatusBadRequest)
            return
        }
    
        userOrders := orders[userID]
        if userOrders == nil {
            userOrders = []Order{} // 返回空数组而不是null
        }
    
        w.Header().Set("Content-Type", "application/json")
        json.NewEncoder(w).Encode(userOrders)
    }

    将这两个函数通过faas-cli部署到OpenFaaS。

    # faas-cli build, push, deploy ...
    faas-cli deploy --image your-docker-repo/user-profile:latest --name user-profile
    faas-cli deploy --image your-docker-repo/user-orders:latest --name user-orders

核心实现:自定义Kong GraphQL聚合插件

这是整个架构的核心。我们将创建一个名为 graphql-aggregator 的Kong插件。这个插件会在access阶段运行,拦截发往特定路径的POST请求。

项目结构如下:

graphql-aggregator/
├── kong/plugins/graphql-aggregator/
│   ├── handler.lua
│   └── schema.lua
└── Dockerfile
  1. schema.lua:定义插件配置和GraphQL Schema
    schema.lua文件定义了插件自身的配置,比如GraphQL的schema定义和类型到上游函数的映射。在真实项目中,这些配置可能来自ConfigMap或外部服务。

    -- file: kong/plugins/graphql-aggregator/schema.lua
    local typedefs = require "kong.db.schema.typedefs"
    
    return {
        name = "graphql-aggregator",
        fields = {
            -- 插件本身没有运行时配置,所有逻辑硬编码
            { consumer = { type = "foreign", reference = "consumers" } },
        },
        -- 自定义元数据,非Kong标准,用于我们的handler
        graphql_meta = {
            schema = [[
                type Query {
                    user(id: ID!): User
                }
    
                type User {
                    id: ID!
                    name: String
                    email: String
                    orders: [Order]
                }
    
                type Order {
                    id: ID!
                    amount: Float
                    itemCount: Int
                }
            ]],
            resolvers = {
                Query = {
                    user = {
                        -- 将Query.user解析委托给user-profile函数
                        upstream = "http://user-profile.openfaas-fn.svc.cluster.local:8080",
                        -- 参数映射:将GraphQL参数'id'映射到上游请求的查询参数'id'
                        arg_mapping = {
                            id = { type = "query", value = "id" }
                        }
                    }
                },
                User = {
                    orders = {
                        -- 将User.orders解析委托给user-orders函数
                        upstream = "http://user-orders.openfaas-fn.svc.cluster.local:8080",
                        -- User.orders 需要 user.id 作为参数
                        -- parent_arg指定了从父对象(User)的哪个字段获取值
                        parent_arg = { field = "id", maps_to = { type = "query", value = "id" } }
                    }
                }
            }
        }
    }
  2. handler.lua:插件的核心处理逻辑
    这是最复杂的部分。我们将使用lua-graphql库来解析查询,并使用Kong的ngx.thread API来实现并发的上游请求。

    -- file: kong/plugins/graphql-aggregator/handler.lua
    local BasePlugin = require "kong.plugins.base_plugin"
    local cjson = require "cjson.safe"
    local graphql = require "graphql" -- 假设已安装lua-graphql
    
    local GraphQLAggregatorHandler = BasePlugin:extend()
    
    GraphQLAggregatorHandler.PRIORITY = 1000
    GraphQLAggregatorHandler.VERSION = "0.1.0"
    
    function GraphQLAggregatorHandler:new()
        GraphQLAggregatorHandler.super.new(self, "graphql-aggregator")
    end
    
    -- 核心处理函数
    function GraphQLAggregatorHandler:access(conf)
        GraphQLAggregatorHandler.super.access(self)
    
        -- 1. 只处理POST请求
        if ngx.req.get_method() ~= "POST" then
            return
        end
    
        -- 2. 读取并解析请求体
        ngx.req.read_body()
        local body_str = ngx.req.get_body_data()
        if not body_str then
            return kong.response.exit(400, { message = "Request body is required for GraphQL." })
        end
    
        local body, err = cjson.decode(body_str)
        if not body or err then
            return kong.response.exit(400, { message = "Invalid JSON body: " .. (err or "") })
        end
    
        local query = body.query
        local variables = body.variables or {}
        if not query then
            return kong.response.exit(400, { message = "'query' field is missing in request body." })
        end
    
        -- 3. 获取Schema和Resolver元数据 (从schema.lua中获取)
        local plugin_schema = kong.db.plugins.schemas["graphql-aggregator"]
        local meta = plugin_schema.graphql_meta
        local schema_str = meta.schema
        local resolvers_map = meta.resolvers
    
        -- 4. 解析和验证GraphQL查询
        local schema, err = graphql.schema(schema_str)
        if err then
            kong.log.err("GraphQL schema error: ", err)
            return kong.response.exit(500, { message = "Internal Server Error: Invalid GraphQL Schema" })
        end
    
        local ast, err = graphql.parse(query)
        if err then
            return kong.response.exit(400, { errors = { { message = "Query parsing error: " .. err } } })
        end
    
        -- 验证AST
        local validation_errors = schema:validate(ast)
        if #validation_errors > 0 then
            return kong.response.exit(400, { errors = validation_errors })
        end
    
        -- 5. 执行查询:这是最关键的步骤
        local execution_result, exec_err = self:execute_query(ast, schema, resolvers_map, variables)
        if exec_err then
            return kong.response.exit(500, { message = "Error during query execution: " .. exec_err })
        end
        
        -- 6. 返回结果并终止请求处理
        return kong.response.exit(200, execution_result, { ["Content-Type"] = "application/json" })
    end
    
    -- 查询执行引擎
    function GraphQLAggregatorHandler:execute_query(ast, schema, resolvers_map, variables)
        local results = {}
        local threads = {}
        local root_operation = ast.definitions[1]
    
        -- 遍历Query下的所有字段,例如 'user'
        for _, selection in ipairs(root_operation.selection_set.selections) do
            local field_name = selection.name.value
            local resolver_info = resolvers_map.Query[field_name]
    
            if resolver_info then
                -- 为每个根查询创建一个协程
                local t, err = ngx.thread.spawn(function()
                    local client = require "resty.http"
                    local httpc = client.new()
                    httpc:set_timeout(5000) -- 5秒超时
    
                    -- 构造上游请求URL
                    local path = "/"
                    local params = {}
                    for arg_name, arg_info in pairs(resolver_info.arg_mapping) do
                        -- 实际应用中需要处理变量variables
                        for _, ast_arg in ipairs(selection.arguments) do
                            if ast_arg.name.value == arg_name then
                                -- 注意:这里极度简化,只处理了StringValue
                                params[arg_info.value] = ast_arg.value.value
                            end
                        end
                    end
    
                    local res, err = httpc:request_uri(resolver_info.upstream, {
                        method = "GET", -- OpenFaaS函数通常用GET或POST
                        path = path,
                        query = params,
                    })
    
                    if not res or err then
                        ngx.thread.exit({ error = "Upstream call failed: " .. (err or "unknown") })
                    end
    
                    local body_str = res:read_body()
                    local body_json, json_err = cjson.decode(body_str)
    
                    if json_err then
                         ngx.thread.exit({ error = "Upstream response is not valid JSON." })
                    end
                    
                    if res.status >= 400 then
                        ngx.thread.exit({ error = "Upstream service returned error: " .. res.status, details = body_json })
                    end
    
                    -- ** 递归处理子字段 (N+1问题解决点) **
                    local sub_threads = {}
                    for _, sub_selection in ipairs(selection.selection_set.selections) do
                         local sub_field_name = sub_selection.name.value
                         local sub_resolver_info = resolvers_map[schema:get_type("User").name] and resolvers_map[schema:get_type("User").name][sub_field_name]
    
                         if sub_resolver_info then
                             -- 为子字段解析创建协程
                             local sub_t, sub_err = ngx.thread.spawn(function()
                                 local sub_client = require "resty.http"
                                 local sub_httpc = sub_client.new()
                                 sub_httpc:set_timeout(5000)
    
                                 local sub_params = {}
                                 -- 从父对象结果(body_json)中获取参数
                                 local parent_val = body_json[sub_resolver_info.parent_arg.field]
                                 if parent_val then
                                     sub_params[sub_resolver_info.parent_arg.maps_to.value] = parent_val
                                 end
    
                                 local sub_res, sub_err = sub_httpc:request_uri(sub_resolver_info.upstream, {
                                     method = "GET",
                                     path = "/",
                                     query = sub_params,
                                 })
    
                                 -- ... 错误处理同上 ...
                                 local sub_body_str = sub_res:read_body()
                                 local sub_body_json = cjson.decode(sub_body_str)
    
                                 ngx.thread.exit({ field = sub_field_name, data = sub_body_json })
                             end)
                             table.insert(sub_threads, { thread = sub_t, field = sub_field_name })
                         end
                    end
                    
                    -- 等待所有子查询完成
                    for _, st in ipairs(sub_threads) do
                        local ok, sub_result = ngx.thread.wait(st.thread)
                        if ok and not sub_result.error then
                            body_json[sub_result.field] = sub_result.data
                        else
                           -- 错误处理
                        end
                    end
    
                    ngx.thread.exit({ field = field_name, data = body_json })
                end)
    
                if err then
                    kong.log.err("Failed to spawn thread: ", err)
                else
                    table.insert(threads, { thread = t, field = field_name })
                end
            end
        end
    
        -- 等待所有根查询的协程完成
        for _, item in ipairs(threads) do
            local ok, res_data = ngx.thread.wait(item.thread)
            if ok and not res_data.error then
                results[item.field] = res_data.data
            else
                -- 处理错误,可以组装到GraphQL的errors字段
                results[item.field] = nil
                -- log error...
            end
        end
    
        return { data = results }, nil
    end
    
    
    return GraphQLAggregatorHandler

    代码注释:

    • 这段Lua代码是一个高度简化的执行引擎。生产级的实现需要更完善的错误处理、变量替换、AST遍历、类型检查和安全性考量。
    • ngx.thread.spawn 是关键,它允许我们在Nginx的事件循环中创建轻量级的协程,实现非阻塞的并行I/O。
    • execute_query 函数的核心思想是:解析AST,为每个可以并行解析的字段(无论是根查询还是嵌套字段)创建一个协程去调用对应的OpenFaaS函数。
    • 通过resolvers_map这个元数据,我们将GraphQL的字段解析逻辑声明式地映射到了后端的HTTP服务上。
  3. 构建自定义Kong镜像
    Dockerfile将官方Kong镜像作为基础,并将我们的插件代码复制进去。

    # Dockerfile
    ARG KONG_VERSION=2.8
    FROM kong:${KONG_VERSION}
    
    USER root
    RUN luarocks install lua-graphql
    
    USER kong
    COPY kong/plugins/graphql-aggregator /usr/local/kong/plugins/graphql-aggregator
    ENV KONG_PLUGINS=bundled,graphql-aggregator

    构建并推送到镜像仓库。
    docker build . -t your-docker-repo/custom-kong:latest && docker push your-docker-repo/custom-kong:latest

部署与联调

  1. 更新Kong部署
    使用helm upgrade命令,将Kong的Deployment镜像更新为我们刚刚构建的自定义镜像。

    helm upgrade kong kong/kong \
        --namespace kong \
        --set image.repository=your-docker-repo/custom-kong \
        --set image.tag=latest
  2. 应用Kong配置
    我们使用Kubernetes CRD来配置Kong。创建一个Ingress来定义路由,并通过KongPlugin CRD来启用我们的插件。

    # file: kong-graphql-config.yaml
    apiVersion: networking.k8s.io/v1
    kind: Ingress
    metadata:
      name: graphql-endpoint
      namespace: openfaas-fn # Ingress需要和服务在同一个namespace,或者引用不同namespace的服务
      annotations:
        konghq.com/strip-path: 'true'
        # 关键:启用我们的自定义插件
        konghq.com/plugins: graphql-aggregator-plugin
    spec:
      ingressClassName: kong
      rules:
      - http:
          paths:
          - path: /graphql
            pathType: ImplementationSpecific
            backend:
              # 这个backend只是一个占位符,因为我们的插件会拦截并处理请求,不会真正转发到这里
              service:
                name: user-profile # 任意一个存在的服务即可
                port:
                  number: 8080
    ---
    apiVersion: configuration.konghq.com/v1
    kind: KongPlugin
    metadata:
      name: graphql-aggregator-plugin
      namespace: openfaas-fn
    plugin: graphql-aggregator
    # 这个插件目前没有可配置项,所以config是空的
    # config: {}

    应用这个YAML文件:kubectl apply -f kong-graphql-config.yaml

  3. 客户端测试
    现在,一切准备就绪。我们可以使用任何GraphQL客户端或curl来测试我们的新端点。

    export KONG_PROXY_IP=$(kubectl get svc -n kong kong-proxy -o jsonpath='{.status.loadBalancer.ingress[0].ip}')
    
    curl -X POST \
      -H "Content-Type: application/json" \
      -d '{
        "query": "query GetUserWithOrders($userId: ID!) { user(id: $userId) { id name email orders { id amount } } }",
        "variables": {
          "userId": "1"
        }
      }' \
      http://${KONG_PROXY_IP}/graphql

    如果一切正常,返回结果应该是:

    {
      "data": {
        "user": {
          "id": "1",
          "name": "Alice",
          "email": "[email protected]",
          "orders": [
            {
              "id": "order-101",
              "amount": 99.99
            },
            {
              "id": "order-102",
              "amount": 19.5
            }
          ]
        }
      }
    }

    在后台,Kong插件接收到这个请求后,会并行地向user-profileuser-orders函数发起请求,并将结果拼接在一起。

    我们可以用Mermaid图来清晰地展示请求流程的优化:

        sequenceDiagram
         participant Client
         participant KongGateway
         participant GraphQL_Aggregator_Plugin
         participant UserProfile_Func
         participant UserOrders_Func
    
         Client->>+KongGateway: POST /graphql { user(id:1){... orders{...} } }
         KongGateway->>+GraphQL_Aggregator_Plugin: access phase
         GraphQL_Aggregator_Plugin-->>GraphQL_Aggregator_Plugin: Parse & Validate Query
         
         par
             GraphQL_Aggregator_Plugin->>+UserProfile_Func: GET /?id=1
             UserProfile_Func-->>-GraphQL_Aggregator_Plugin: {id, name, email}
         and
             GraphQL_Aggregator_Plugin->>+UserOrders_Func: GET /?id=1
             UserOrders_Func-->>-GraphQL_Aggregator_Plugin: [{id, amount}, ...]
         end
    
         GraphQL_Aggregator_Plugin-->>GraphQL_Aggregator_Plugin: Stitch JSON results
         GraphQL_Aggregator_Plugin->>-KongGateway: Return final GraphQL JSON
         KongGateway-->>-Client: 200 OK with aggregated data

    这个流程避免了任何中间聚合服务,将并行扇出(fan-out)和数据聚合(fan-in)的操作放在了离客户端最近的网关层,实现了性能的最大化。

当前方案的局限性与未来展望

这个自定义Kong插件的方案虽然性能优越,但并非银弹。在真实项目中,它引入了新的复杂性。维护自定义Lua插件需要专门的技能,并且我们手写的执行引擎远没有Apollo Server或Hot Chocolate等成熟GraphQL框架功能完善。例如,当前实现没有支持GraphQL的Mutation、Subscription,也没有复杂的错误处理、数据加载器(Dataloader)来处理批处理和缓存。

此外,Schema和Resolver的映射逻辑目前是硬编码在schema.lua中的,这在函数数量和类型定义爆炸式增长时会变得难以维护。一个更健壮的方案是让插件在启动时从一个Schema注册中心(Schema Registry)动态拉取这些配置,实现配置与代码的分离。

对于更复杂的、跨多个团队的大规模微服务场景,GraphQL Federation可能是更合适的架构选择。它提供了一套标准化的方式来组合多个独立的GraphQL子图。然而,对于中小型项目,或者对边缘性能有极致要求的特定场景,在Kong这样的高性能网关上实现一个轻量级的、定制化的GraphQL聚合层,仍然是一个极具吸引力且行之有效的架构决策。


  目录