我们面临一个典型的Serverless架构演进后的困境:几十个细粒度的OpenFaaS函数部署在Google Kubernetes Engine (GKE)上,每个函数负责一个独立的业务领域,例如查询用户信息、获取订单列表、检查库存状态。前端或移动端应用为了渲染一个完整的页面,需要调用多个函数,这导致了客户端的复杂性激增和多次网络往返的性能损耗。
最初的解决方案是引入一个聚合层,本身也是一个Faas函数,它接收前端请求,然后依次或并发地调用其他函数,最后组装数据返回。这个模式很快暴露了其脆弱性:聚合函数成了新的单点瓶颈,其响应时间等于最慢的下游函数调用,并且随着业务逻辑的增加,这个聚合函数会迅速膨胀为“Function-as-a-Monolith”(功能即单体)。
更糟的是,当引入GraphQL作为前端查询语言时,问题变得更加棘手。如果我们部署一个标准的GraphQL服务(例如用Apollo Server编写的Node.js函数),来解析查询并调用下游函数,那么经典的N+1
查询问题会在Serverless环境中被无限放大。一个请求user { id name orders { id amount } }
会首先调用user
函数,然后根据返回的用户ID,循环调用N次orders
函数。每一次函数调用都意味着冷启动延迟、网络开销和GCP的计费事件。这在生产环境中是完全不可接受的。
我们的目标是将GraphQL的查询能力与Serverless的分布式特性结合,同时规避其性能陷阱。方案的核心是将GraphQL的解析和数据聚合逻辑前移到API网关层。我们选择Kong,因为它基于Nginx和LuaJIT,性能极高,并且其插件化的架构允许我们注入自定义逻辑。我们将编写一个自定义的Kong Lua插件,它能在网关层面解析GraphQL查询,识别出需要调用的所有后端OpenFaaS函数,并发起并行的上游请求,最终在边缘(Edge)完成数据的拼接。整个过程对客户端透明,且只发生一次网络往返。
环境搭建:GKE上的OpenFaaS与Kong
在深入插件开发之前,一个稳定且配置合理的底层环境是基础。我们在GCP上选择GKE,因为它提供了生产级的托管Kubernetes体验。
创建GKE集群
为OpenFaaS和Kong准备一个专用的节点池,实例类型建议选择e2-standard-4
或更高,以便应对并发的函数执行和网关处理。# 设置GCP项目和区域 export PROJECT_ID="your-gcp-project-id" export REGION="asia-northeast1" export CLUSTER_NAME="serverless-graphql-gw" gcloud config set project ${PROJECT_ID} gcloud config set compute/region ${REGION} # 创建一个具有专用节点池的GKE集群 gcloud container clusters create ${CLUSTER_NAME} \ --num-nodes=1 \ --machine-type=e2-standard-2 \ --node-locations=${REGION}-a \ --workload-pool=${PROJECT_ID}.svc.id.goog \ --release-channel=regular gcloud container node-pools create openfaas-pool \ --cluster=${CLUSTER_NAME} \ --machine-type=e2-standard-4 \ --num-nodes=3 \ --enable-autoscaling \ --min-nodes=2 \ --max-nodes=6 # 获取集群凭证 gcloud container clusters get-credentials ${CLUSTER_NAME} --region ${REGION}
部署OpenFaaS
使用Helm是部署OpenFaaS最直接的方式。关键配置在于确保gateway
服务类型为ClusterIP
,因为我们将通过Kong来暴露它,而不是直接暴露给外部。# 添加OpenFaaS Helm仓库 helm repo add openfaas https://openfaas.github.io/faas-netes/ helm repo update # 为OpenFaaS创建命名空间 kubectl apply -f https://raw.githubusercontent.com/openfaas/faas-netes/master/namespaces.yml # 使用自定义值部署OpenFaaS # 注意:generateBasicAuth=true 仅用于演示,生产环境应使用更安全的认证方式 helm upgrade openfaas openfaas/faas-netes \ --install \ --namespace openfaas \ --set functionNamespace=openfaas-fn \ --set gateway.serviceType=ClusterIP \ --set generateBasicAuth=true
部署带自定义插件能力的Kong
我们将通过构建一个包含我们自定义插件的Docker镜像来部署Kong。首先部署标准的Kong,后续再替换镜像。# 添加Kong Helm仓库 helm repo add kong https://charts.konghq.com helm repo update # 部署Kong Ingress Controller helm install kong kong/kong \ --namespace kong \ --create-namespace \ --set proxy.type=LoadBalancer \ --set ingressController.installCRDs=true
部署完成后,记下Kong
proxy
服务的外部IP,这将是我们的统一入口。
后端函数设计:原子化与契约化
为了让Kong插件能有效地聚合数据,后端函数的接口设计必须遵循一些原则。它们应该是无状态的、原子化的,并且输入输出结构是可预测的。我们用Go编写两个示例函数:user-profile
和user-orders
。
user-profile
函数
该函数接收一个用户ID,返回用户的基本信息。// file: user-profile/handler.go package function import ( "encoding/json" "fmt" "net/http" ) type User struct { ID string `json:"id"` Name string `json:"name"` Email string `json:"email"` } // 模拟数据库查询 var users = map[string]User{ "1": {ID: "1", Name: "Alice", Email: "[email protected]"}, "2": {ID: "2", Name: "Bob", Email: "[email protected]"}, } func Handle(w http.ResponseWriter, r *http.Request) { // 在真实项目中,id会从请求体或查询参数中获取 // 这里为了简化,我们硬编码一个ID userID := r.URL.Query().Get("id") if userID == "" { http.Error(w, "user id is required", http.StatusBadRequest) return } user, ok := users[userID] if !ok { http.Error(w, fmt.Sprintf("user with id %s not found", userID), http.StatusNotFound) return } w.Header().Set("Content-Type", "application/json") json.NewEncoder(w).Encode(user) }
user-orders
函数
该函数接收用户ID,返回该用户的订单列表。// file: user-orders/handler.go package function import ( "encoding/json" "net/http" ) type Order struct { ID string `json:"id"` UserID string `json:"-"` // 在JSON中忽略 Amount float64 `json:"amount"` ItemCount int `json:"itemCount"` } // 模拟订单数据 var orders = map[string][]Order{ "1": { {ID: "order-101", UserID: "1", Amount: 99.99, ItemCount: 2}, {ID: "order-102", UserID: "1", Amount: 19.50, ItemCount: 1}, }, "2": { {ID: "order-201", UserID: "2", Amount: 250.00, ItemCount: 5}, }, } func Handle(w http.ResponseWriter, r *http.Request) { userID := r.URL.Query().Get("id") if userID == "" { http.Error(w, "user id is required", http.StatusBadRequest) return } userOrders := orders[userID] if userOrders == nil { userOrders = []Order{} // 返回空数组而不是null } w.Header().Set("Content-Type", "application/json") json.NewEncoder(w).Encode(userOrders) }
将这两个函数通过
faas-cli
部署到OpenFaaS。# faas-cli build, push, deploy ... faas-cli deploy --image your-docker-repo/user-profile:latest --name user-profile faas-cli deploy --image your-docker-repo/user-orders:latest --name user-orders
核心实现:自定义Kong GraphQL聚合插件
这是整个架构的核心。我们将创建一个名为 graphql-aggregator
的Kong插件。这个插件会在access
阶段运行,拦截发往特定路径的POST请求。
项目结构如下:
graphql-aggregator/
├── kong/plugins/graphql-aggregator/
│ ├── handler.lua
│ └── schema.lua
└── Dockerfile
schema.lua
:定义插件配置和GraphQL Schema
schema.lua
文件定义了插件自身的配置,比如GraphQL的schema定义和类型到上游函数的映射。在真实项目中,这些配置可能来自ConfigMap或外部服务。-- file: kong/plugins/graphql-aggregator/schema.lua local typedefs = require "kong.db.schema.typedefs" return { name = "graphql-aggregator", fields = { -- 插件本身没有运行时配置,所有逻辑硬编码 { consumer = { type = "foreign", reference = "consumers" } }, }, -- 自定义元数据,非Kong标准,用于我们的handler graphql_meta = { schema = [[ type Query { user(id: ID!): User } type User { id: ID! name: String email: String orders: [Order] } type Order { id: ID! amount: Float itemCount: Int } ]], resolvers = { Query = { user = { -- 将Query.user解析委托给user-profile函数 upstream = "http://user-profile.openfaas-fn.svc.cluster.local:8080", -- 参数映射:将GraphQL参数'id'映射到上游请求的查询参数'id' arg_mapping = { id = { type = "query", value = "id" } } } }, User = { orders = { -- 将User.orders解析委托给user-orders函数 upstream = "http://user-orders.openfaas-fn.svc.cluster.local:8080", -- User.orders 需要 user.id 作为参数 -- parent_arg指定了从父对象(User)的哪个字段获取值 parent_arg = { field = "id", maps_to = { type = "query", value = "id" } } } } } } }
handler.lua
:插件的核心处理逻辑
这是最复杂的部分。我们将使用lua-graphql
库来解析查询,并使用Kong的ngx.thread
API来实现并发的上游请求。-- file: kong/plugins/graphql-aggregator/handler.lua local BasePlugin = require "kong.plugins.base_plugin" local cjson = require "cjson.safe" local graphql = require "graphql" -- 假设已安装lua-graphql local GraphQLAggregatorHandler = BasePlugin:extend() GraphQLAggregatorHandler.PRIORITY = 1000 GraphQLAggregatorHandler.VERSION = "0.1.0" function GraphQLAggregatorHandler:new() GraphQLAggregatorHandler.super.new(self, "graphql-aggregator") end -- 核心处理函数 function GraphQLAggregatorHandler:access(conf) GraphQLAggregatorHandler.super.access(self) -- 1. 只处理POST请求 if ngx.req.get_method() ~= "POST" then return end -- 2. 读取并解析请求体 ngx.req.read_body() local body_str = ngx.req.get_body_data() if not body_str then return kong.response.exit(400, { message = "Request body is required for GraphQL." }) end local body, err = cjson.decode(body_str) if not body or err then return kong.response.exit(400, { message = "Invalid JSON body: " .. (err or "") }) end local query = body.query local variables = body.variables or {} if not query then return kong.response.exit(400, { message = "'query' field is missing in request body." }) end -- 3. 获取Schema和Resolver元数据 (从schema.lua中获取) local plugin_schema = kong.db.plugins.schemas["graphql-aggregator"] local meta = plugin_schema.graphql_meta local schema_str = meta.schema local resolvers_map = meta.resolvers -- 4. 解析和验证GraphQL查询 local schema, err = graphql.schema(schema_str) if err then kong.log.err("GraphQL schema error: ", err) return kong.response.exit(500, { message = "Internal Server Error: Invalid GraphQL Schema" }) end local ast, err = graphql.parse(query) if err then return kong.response.exit(400, { errors = { { message = "Query parsing error: " .. err } } }) end -- 验证AST local validation_errors = schema:validate(ast) if #validation_errors > 0 then return kong.response.exit(400, { errors = validation_errors }) end -- 5. 执行查询:这是最关键的步骤 local execution_result, exec_err = self:execute_query(ast, schema, resolvers_map, variables) if exec_err then return kong.response.exit(500, { message = "Error during query execution: " .. exec_err }) end -- 6. 返回结果并终止请求处理 return kong.response.exit(200, execution_result, { ["Content-Type"] = "application/json" }) end -- 查询执行引擎 function GraphQLAggregatorHandler:execute_query(ast, schema, resolvers_map, variables) local results = {} local threads = {} local root_operation = ast.definitions[1] -- 遍历Query下的所有字段,例如 'user' for _, selection in ipairs(root_operation.selection_set.selections) do local field_name = selection.name.value local resolver_info = resolvers_map.Query[field_name] if resolver_info then -- 为每个根查询创建一个协程 local t, err = ngx.thread.spawn(function() local client = require "resty.http" local httpc = client.new() httpc:set_timeout(5000) -- 5秒超时 -- 构造上游请求URL local path = "/" local params = {} for arg_name, arg_info in pairs(resolver_info.arg_mapping) do -- 实际应用中需要处理变量variables for _, ast_arg in ipairs(selection.arguments) do if ast_arg.name.value == arg_name then -- 注意:这里极度简化,只处理了StringValue params[arg_info.value] = ast_arg.value.value end end end local res, err = httpc:request_uri(resolver_info.upstream, { method = "GET", -- OpenFaaS函数通常用GET或POST path = path, query = params, }) if not res or err then ngx.thread.exit({ error = "Upstream call failed: " .. (err or "unknown") }) end local body_str = res:read_body() local body_json, json_err = cjson.decode(body_str) if json_err then ngx.thread.exit({ error = "Upstream response is not valid JSON." }) end if res.status >= 400 then ngx.thread.exit({ error = "Upstream service returned error: " .. res.status, details = body_json }) end -- ** 递归处理子字段 (N+1问题解决点) ** local sub_threads = {} for _, sub_selection in ipairs(selection.selection_set.selections) do local sub_field_name = sub_selection.name.value local sub_resolver_info = resolvers_map[schema:get_type("User").name] and resolvers_map[schema:get_type("User").name][sub_field_name] if sub_resolver_info then -- 为子字段解析创建协程 local sub_t, sub_err = ngx.thread.spawn(function() local sub_client = require "resty.http" local sub_httpc = sub_client.new() sub_httpc:set_timeout(5000) local sub_params = {} -- 从父对象结果(body_json)中获取参数 local parent_val = body_json[sub_resolver_info.parent_arg.field] if parent_val then sub_params[sub_resolver_info.parent_arg.maps_to.value] = parent_val end local sub_res, sub_err = sub_httpc:request_uri(sub_resolver_info.upstream, { method = "GET", path = "/", query = sub_params, }) -- ... 错误处理同上 ... local sub_body_str = sub_res:read_body() local sub_body_json = cjson.decode(sub_body_str) ngx.thread.exit({ field = sub_field_name, data = sub_body_json }) end) table.insert(sub_threads, { thread = sub_t, field = sub_field_name }) end end -- 等待所有子查询完成 for _, st in ipairs(sub_threads) do local ok, sub_result = ngx.thread.wait(st.thread) if ok and not sub_result.error then body_json[sub_result.field] = sub_result.data else -- 错误处理 end end ngx.thread.exit({ field = field_name, data = body_json }) end) if err then kong.log.err("Failed to spawn thread: ", err) else table.insert(threads, { thread = t, field = field_name }) end end end -- 等待所有根查询的协程完成 for _, item in ipairs(threads) do local ok, res_data = ngx.thread.wait(item.thread) if ok and not res_data.error then results[item.field] = res_data.data else -- 处理错误,可以组装到GraphQL的errors字段 results[item.field] = nil -- log error... end end return { data = results }, nil end return GraphQLAggregatorHandler
代码注释:
- 这段Lua代码是一个高度简化的执行引擎。生产级的实现需要更完善的错误处理、变量替换、AST遍历、类型检查和安全性考量。
-
ngx.thread.spawn
是关键,它允许我们在Nginx的事件循环中创建轻量级的协程,实现非阻塞的并行I/O。 -
execute_query
函数的核心思想是:解析AST,为每个可以并行解析的字段(无论是根查询还是嵌套字段)创建一个协程去调用对应的OpenFaaS函数。 - 通过
resolvers_map
这个元数据,我们将GraphQL的字段解析逻辑声明式地映射到了后端的HTTP服务上。
构建自定义Kong镜像
Dockerfile
将官方Kong镜像作为基础,并将我们的插件代码复制进去。# Dockerfile ARG KONG_VERSION=2.8 FROM kong:${KONG_VERSION} USER root RUN luarocks install lua-graphql USER kong COPY kong/plugins/graphql-aggregator /usr/local/kong/plugins/graphql-aggregator ENV KONG_PLUGINS=bundled,graphql-aggregator
构建并推送到镜像仓库。
docker build . -t your-docker-repo/custom-kong:latest && docker push your-docker-repo/custom-kong:latest
部署与联调
更新Kong部署
使用helm upgrade
命令,将Kong的Deployment镜像更新为我们刚刚构建的自定义镜像。helm upgrade kong kong/kong \ --namespace kong \ --set image.repository=your-docker-repo/custom-kong \ --set image.tag=latest
应用Kong配置
我们使用Kubernetes CRD来配置Kong。创建一个Ingress
来定义路由,并通过KongPlugin
CRD来启用我们的插件。# file: kong-graphql-config.yaml apiVersion: networking.k8s.io/v1 kind: Ingress metadata: name: graphql-endpoint namespace: openfaas-fn # Ingress需要和服务在同一个namespace,或者引用不同namespace的服务 annotations: konghq.com/strip-path: 'true' # 关键:启用我们的自定义插件 konghq.com/plugins: graphql-aggregator-plugin spec: ingressClassName: kong rules: - http: paths: - path: /graphql pathType: ImplementationSpecific backend: # 这个backend只是一个占位符,因为我们的插件会拦截并处理请求,不会真正转发到这里 service: name: user-profile # 任意一个存在的服务即可 port: number: 8080 --- apiVersion: configuration.konghq.com/v1 kind: KongPlugin metadata: name: graphql-aggregator-plugin namespace: openfaas-fn plugin: graphql-aggregator # 这个插件目前没有可配置项,所以config是空的 # config: {}
应用这个YAML文件:
kubectl apply -f kong-graphql-config.yaml
客户端测试
现在,一切准备就绪。我们可以使用任何GraphQL客户端或curl
来测试我们的新端点。export KONG_PROXY_IP=$(kubectl get svc -n kong kong-proxy -o jsonpath='{.status.loadBalancer.ingress[0].ip}') curl -X POST \ -H "Content-Type: application/json" \ -d '{ "query": "query GetUserWithOrders($userId: ID!) { user(id: $userId) { id name email orders { id amount } } }", "variables": { "userId": "1" } }' \ http://${KONG_PROXY_IP}/graphql
如果一切正常,返回结果应该是:
{ "data": { "user": { "id": "1", "name": "Alice", "email": "[email protected]", "orders": [ { "id": "order-101", "amount": 99.99 }, { "id": "order-102", "amount": 19.5 } ] } } }
在后台,Kong插件接收到这个请求后,会并行地向
user-profile
和user-orders
函数发起请求,并将结果拼接在一起。我们可以用Mermaid图来清晰地展示请求流程的优化:
sequenceDiagram participant Client participant KongGateway participant GraphQL_Aggregator_Plugin participant UserProfile_Func participant UserOrders_Func Client->>+KongGateway: POST /graphql { user(id:1){... orders{...} } } KongGateway->>+GraphQL_Aggregator_Plugin: access phase GraphQL_Aggregator_Plugin-->>GraphQL_Aggregator_Plugin: Parse & Validate Query par GraphQL_Aggregator_Plugin->>+UserProfile_Func: GET /?id=1 UserProfile_Func-->>-GraphQL_Aggregator_Plugin: {id, name, email} and GraphQL_Aggregator_Plugin->>+UserOrders_Func: GET /?id=1 UserOrders_Func-->>-GraphQL_Aggregator_Plugin: [{id, amount}, ...] end GraphQL_Aggregator_Plugin-->>GraphQL_Aggregator_Plugin: Stitch JSON results GraphQL_Aggregator_Plugin->>-KongGateway: Return final GraphQL JSON KongGateway-->>-Client: 200 OK with aggregated data
这个流程避免了任何中间聚合服务,将并行扇出(fan-out)和数据聚合(fan-in)的操作放在了离客户端最近的网关层,实现了性能的最大化。
当前方案的局限性与未来展望
这个自定义Kong插件的方案虽然性能优越,但并非银弹。在真实项目中,它引入了新的复杂性。维护自定义Lua插件需要专门的技能,并且我们手写的执行引擎远没有Apollo Server或Hot Chocolate等成熟GraphQL框架功能完善。例如,当前实现没有支持GraphQL的Mutation、Subscription,也没有复杂的错误处理、数据加载器(Dataloader)来处理批处理和缓存。
此外,Schema和Resolver的映射逻辑目前是硬编码在schema.lua
中的,这在函数数量和类型定义爆炸式增长时会变得难以维护。一个更健壮的方案是让插件在启动时从一个Schema注册中心(Schema Registry)动态拉取这些配置,实现配置与代码的分离。
对于更复杂的、跨多个团队的大规模微服务场景,GraphQL Federation可能是更合适的架构选择。它提供了一套标准化的方式来组合多个独立的GraphQL子图。然而,对于中小型项目,或者对边缘性能有极致要求的特定场景,在Kong这样的高性能网关上实现一个轻量级的、定制化的GraphQL聚合层,仍然是一个极具吸引力且行之有效的架构决策。