构建基于Cloud Functions的Trino异步查询网关并集成Relay与Headless UI


为内部数据分析团队提供一个稳定、可扩展且成本可控的即席查询(ad-hoc query)平台,是一项普遍但棘手的挑战。核心矛盾在于:分析师需要能够对横跨多个数据源(如BigQuery, GCS, PostgreSQL)的TB级数据执行长耗时查询,而工程团队则需要避免为此部署和维护一套昂贵的、长期运行的后端服务。直接暴露Trino(或Presto)集群的端口会带来严重的安全隐患和访问控制难题。

一个典型的诉求是构建一个Web界面,用户提交SQL,系统在后台执行,完成后返回结果。这看似简单,但对后端架构提出了明确要求:必须是异步的,能够处理可能持续数分钟乃至更久的查询,同时在没有查询时将资源成本降至最低。

定义复杂技术问题

架构设计的核心目标可以分解为以下几点:

  1. 异步执行: 前端提交查询后应立即得到响应,不能阻塞等待查询完成。
  2. 长轮询与状态管理: 前端需要一种机制来轮询查询状态(QUEUED, RUNNING, FINISHED, FAILED),并在完成后获取结果。
  3. 无状态与可扩展: API层必须是无状态的,以便利用云平台的弹性伸缩能力。系统应能平滑处理从零星几个到上百个并发查询的负载。
  4. 成本效益: 在无查询任务时,计算资源消耗应趋近于零。
  5. 安全隔离: 绝对不能将Trino集群的协调器(Coordinator)直接暴露给终端用户或前端应用。所有交互必须通过一个受控的API网关。
  6. 前端解耦: UI组件本身应只关注逻辑和状态,不耦合具体样式,以便在公司内部不同的产品线中复用。这正是Headless UI理念的用武之地。

方案A:传统有状态后端服务

一个直接的方案是部署一个常驻的Node.js或Go服务,例如在Google Kubernetes Engine (GKE)或Cloud Run上。该服务通过WebSocket或长连接管理客户端,接收SQL后,在内部维护一个状态机来跟踪每个查询的生命周期,并通过Trino的JDBC/REST客户端与集群交互。

优势:

  • 状态管理直观: 在服务内存或外部缓存(如Redis)中维护查询状态相对简单。
  • 实时通信: 使用WebSocket可以实现实时状态推送,用户体验更佳。

劣势:

  • 成本高昂: 即便使用Cloud Run并缩容到零,冷启动对于交互式查询来说可能是个问题。对于GKE,则需要维护至少一个常备节点,无论有无流量都会产生费用。
  • 运维负担: 需要配置部署、监控、日志、水平扩展策略(HPA),这些都是不小的运维成本。
  • 状态管理复杂性: 如果服务需要水平扩展,查询状态必须存储在外部共享系统中(如Redis或数据库),这增加了架构的复杂性和潜在的故障点。

方案B:基于Serverless的异步事件驱动网关

此方案规避了常驻服务,转而使用一组Serverless函数来编排整个查询流程。Google Cloud Functions是实现这一模式的理想选择。

优势:

  • 极致的成本效益: 完全按调用次数和执行时间计费。没有查询时,成本为零。
  • 自动伸缩: Google Cloud负责处理所有扩展问题,无需任何手动配置。
  • 无运维负担: 无需管理服务器、容器或操作系统。
  • 天然的无状态: 迫使开发者设计无状态的、可水平扩展的逻辑。

劣势:

  • 异步流程复杂: 管理长耗时任务的状态需要精心设计。传统的请求-响应模式不再适用,必须引入轮询或事件回调机制。
  • 潜在的超时限制: Cloud Functions有最大执行时间限制(尽管最长可达9分钟,对于多数即席查询已足够),需要考虑超长查询的处理策略。

最终选择与理由

在真实项目中,内部工具的流量往往是突发性和不可预测的。可能在某个下午有数十位分析师同时进行密集的探索性查询,而在大部分夜间时间则完全空闲。方案B的成本模型与这种使用模式完美契合。虽然其异步状态管理的实现更为复杂,但这种复杂性可以通过清晰的架构设计来控制,并且一旦建成,其运维成本远低于方案A。

我们将采用方案B,并利用Relay的现代数据获取能力来优雅地处理前端的轮询和状态同步,从而将后端的复杂性对前端开发者屏蔽。

核心实现概览

整体架构分为三层:前端UI、Serverless API层和后端的Trino集群。

graph TD
    subgraph "前端 (Browser)"
        A[Headless UI - React Component] --> B{Relay Environment};
    end

    subgraph "Serverless API (Google Cloud)"
        B -- GraphQL Mutation: submitQuery --> C[API Gateway];
        C --> D[Cloud Function: submit-trino-query];
        B -- GraphQL Query: queryStatus (Polling) --> C;
        C --> E[Cloud Function: check-trino-status];
    end

    subgraph "数据基础设施 (VPC)"
        F[Trino Coordinator];
    end

    D -- "1. POST /v1/statement (async)" --> F;
    F -- "2. Returns { id, stats.state, nextUri }" --> D;
    D -- "3. Returns { queryId }" --> C;
    E -- "4. GET {nextUri}" --> F;
    F -- "5. Returns { id, stats.state, columns, data, nextUri }" --> E;
    E -- "6. Returns { status, results? }" --> C;

    style A fill:#D2E9FF,stroke:#333,stroke-width:2px
    style F fill:#FFDAB9,stroke:#333,stroke-width:2px

这个流程的核心思想是将一个长查询分解为三个独立的、短暂的HTTP交互:

  1. 提交: 前端调用submit-trino-query函数,该函数将SQL代理提交给Trino并立即返回一个唯一的queryId
  2. 轮询: 前端使用获取到的queryId,周期性地调用check-trino-status函数。
  3. 获取结果: check-trino-status函数通过Trino返回的nextUri分页获取状态和最终数据,直到查询完成。

后端实现: Google Cloud Functions (Node.js)

我们将创建两个HTTP触发的Cloud Function。生产环境中,这些函数应通过API Gateway暴露,并配置IAM进行身份验证和授权。

1. submit-trino-query 函数

这个函数负责接收用户的SQL语句,将其发送给Trino,然后立即返回Trino生成的查询ID。

package.json 依赖:

{
  "dependencies": {
    "axios": "^1.6.0",
    "@google-cloud/functions-framework": "^3.0.0",
    "cors": "^2.8.5"
  }
}

index.js - submit-trino-query

const functions = require('@google-cloud/functions-framework');
const axios = require('axios');
const cors = require('cors')({origin: true});

// 生产环境中,这些配置应从Secret Manager中获取
const TRINO_COORDINATOR_URL = process.env.TRINO_COORDINATOR_URL; // e.g., http://trino.internal:8080
const TRINO_USER = process.env.TRINO_USER || 'internal-query-service';

// 创建一个配置好的axios实例,用于与Trino通信
const trinoClient = axios.create({
  baseURL: TRINO_COORDINATOR_URL,
  headers: {
    'X-Trino-User': TRINO_USER,
    'X-Trino-Source': 'cloud-function-submitter',
    'Content-Type': 'text/plain',
  },
  // 关键:为内部VPC中的Trino设置较长的超时
  timeout: 10000, 
});

functions.http('submitTrinoQuery', (req, res) => {
  // 使用cors中间件处理跨域请求
  cors(req, res, async () => {
    if (req.method !== 'POST') {
      return res.status(405).send('Method Not Allowed');
    }
    
    const sqlQuery = req.body.query;
    if (!sqlQuery || typeof sqlQuery !== 'string') {
      console.warn('Validation Error: Missing or invalid query parameter.');
      return res.status(400).send('Bad Request: "query" field is required and must be a string.');
    }

    try {
      console.log(`Submitting query to Trino for user: ${TRINO_USER}`);
      const response = await trinoClient.post('/v1/statement', sqlQuery);

      // Trino成功接收查询后,会返回一个包含queryId和nextUri的初始响应
      // 这是异步执行的关键
      if (response.status === 200 && response.data.id) {
        console.log(`Successfully submitted query. Trino Query ID: ${response.data.id}`);
        res.status(202).json({
          queryId: response.data.id,
          status: response.data.stats.state,
          nextUri: response.data.nextUri, // 初始的轮询地址
        });
      } else {
        // 捕获Trino直接返回的非200状态码,例如SQL语法错误
        console.error('Trino rejected the query submission.', { status: response.status, data: response.data });
        res.status(502).send('Trino coordinator rejected the query.');
      }
    } catch (error) {
      console.error('Error submitting query to Trino coordinator:', error.message);
      if (error.response) {
        // Trino API返回的详细错误
        console.error('Trino Error Response:', { 
          status: error.response.status, 
          headers: error.response.headers,
          data: error.response.data 
        });
        res.status(502).json({
          message: 'Bad Gateway: Failed to communicate with Trino.',
          trinoError: error.response.data
        });
      } else if (error.request) {
        // 请求已发出但没有收到响应,通常是网络问题或超时
        console.error('No response received from Trino. Check network connectivity and Trino coordinator health.');
        res.status(504).send('Gateway Timeout: No response from Trino coordinator.');
      } else {
        // 其他未知错误
        res.status(500).send('Internal Server Error.');
      }
    }
  });
});

这个函数的单元测试思路:

  • Mock axios.post 来模拟Trino的成功响应 (200 OK)。
  • Mock axios.post 来模拟Trino拒绝请求(例如返回400 Bad Request)。
  • Mock axios.post 来模拟网络超时。
  • 测试输入验证,例如当请求体中没有query字段时。

2. check-trino-status 函数

这个函数是轮询的核心。它接收一个Trino的nextUri,并向其发送GET请求以获取最新状态。

index.js - check-trino-status

const functions = require('@google-cloud/functions-framework');
const axios = require('axios');
const cors = require('cors')({origin: true});

// 这里不需要baseURL,因为我们将使用完整的nextUri
const trinoFetcher = axios.create({
  headers: {
    'X-Trino-User': process.env.TRINO_USER || 'internal-query-service',
    'X-Trino-Source': 'cloud-function-checker',
  },
  timeout: 10000,
});


functions.http('checkTrinoStatus', (req, res) => {
  cors(req, res, async () => {
    if (req.method !== 'POST') {
      return res.status(405).send('Method Not Allowed');
    }

    const { nextUri } = req.body;
    if (!nextUri || !nextUri.startsWith(process.env.TRINO_COORDINATOR_URL)) {
      // 安全性检查:确保请求的URI是我们Trino集群的地址
      console.warn('Validation Error: Invalid or missing nextUri.', { nextUri });
      return res.status(400).send('Bad Request: A valid "nextUri" is required.');
    }
    
    try {
      const response = await trinoFetcher.get(nextUri);

      if (response.status === 200) {
        const { id, stats, columns, data, nextUri: newNextUri, error } = response.data;
        const payload = {
            queryId: id,
            status: stats.state, // QUEUED, RUNNING, FINISHED, FAILED
            stats: { // 剥离出关键统计数据
                elapsedTime: stats.elapsedTime,
                processedRows: stats.processedRows,
                processedBytes: stats.processedBytes,
            },
            columns: columns || null,
            data: data || null,
            nextUri: newNextUri || null, // 如果查询未完成,Trino会提供下一个轮询地址
            error: error || null,
        };

        console.log(`Checked status for query ${id}: ${stats.state}`);
        res.status(200).json(payload);
      } else {
        console.error('Received non-200 response from Trino on status check.', { status: response.status });
        res.status(502).send(`Bad Gateway: Trino returned status ${response.status}`);
      }
    } catch (error) {
      console.error(`Error fetching status from Trino URI ${nextUri}:`, error.message);
      if (error.response) {
        res.status(502).json({
          message: 'Bad Gateway: Failed to fetch query status from Trino.',
          trinoError: error.response.data
        });
      } else {
        res.status(504).send('Gateway Timeout during status check.');
      }
    }
  });
});

前端实现: Headless UI with Relay

前端的核心是管理异步查询的复杂状态。用户提交查询后,UI应显示“正在运行”状态,并禁用提交按钮。查询成功后,显示结果表格;失败则显示错误信息。Relay的数据驱动方法非常适合处理这种声明式UI状态。

1. GraphQL Schema 定义

我们不直接用GraphQL查询Trino,而是用它来建模与我们Serverless API层的交互。

# file: schema.graphql

type Mutation {
  """
  提交一个新的Trino查询。立即返回一个初始状态。
  """
  submitTrinoQuery(input: SubmitTrinoQueryInput!): TrinoQuery
}

input SubmitTrinoQueryInput {
  sql: String!
}

type Query {
  """
  根据轮询URI检查Trino查询的最新状态。
  """
  trinoQueryStatus(nextUri: String!): TrinoQuery
}

"""
代表一个Trino查询的执行状态和结果。
"""
type TrinoQuery {
  queryId: ID!
  status: TrinoQueryStatus!
  stats: TrinoQueryStats
  columns: [TrinoColumn]
  data: [[String]] # 数据以字符串数组的数组形式返回
  nextUri: String
  error: TrinoQueryError
}

enum TrinoQueryStatus {
  QUEUED
  RUNNING
  FINISHED
  FAILED
  CANCELED
}

type TrinoQueryStats {
  elapsedTime: String
  processedRows: Int
  processedBytes: Float
}

type TrinoColumn {
  name: String!
  type: String!
}

type TrinoQueryError {
    message: String
    errorCode: Int
    errorName: String
    errorType: String
}

2. Relay Hooks 和组件

我们将创建一个名为useTrinoQueryRunner的自定义Hook,它封装了提交和轮询的逻辑。

// components/TrinoQueryRunner.jsx

import React, { useState, useEffect, useCallback } from 'react';
import { useMutation, useLazyLoadQuery, graphql } from 'react-relay';
import { useQueryClient } from '@tanstack/react-query'; // Or any other way to manage polling interval

// 定义GraphQL操作
const SubmitTrinoQueryMutation = graphql`
  mutation TrinoQueryRunnerSubmitMutation($input: SubmitTrinoQueryInput!) {
    submitTrinoQuery(input: $input) {
      queryId
      status
      nextUri
    }
  }
`;

const CheckTrinoQueryStatusQuery = graphql`
  query TrinoQueryRunnerStatusQuery($nextUri: String!) {
    trinoQueryStatus(nextUri: $nextUri) {
      queryId
      status
      stats {
        elapsedTime
        processedRows
      }
      columns {
        name
        type
      }
      data
      nextUri
      error {
        message
        errorName
      }
    }
  }
`;

// Headless UI Hook: 只提供状态和行为,不渲染任何东西
const useTrinoQueryRunner = () => {
  const [commitMutation, isSubmitting] = useMutation(SubmitTrinoQueryMutation);
  const [queryState, setQueryState] = useState({
    status: 'IDLE', // IDLE, SUBMITTED, POLLING, FINISHED, FAILED
    data: null,
    error: null,
    nextUri: null,
  });

  const [refetch, isFetching] = useLazyLoadQuery(
      CheckTrinoQueryStatusQuery,
      { nextUri: queryState.nextUri },
      {
          fetchPolicy: 'network-only', // 确保每次都是新的请求
          onComplete: (data) => {
              const result = data.trinoQueryStatus;
              if (result.status === 'FINISHED' || result.status === 'FAILED' || result.status === 'CANCELED') {
                  setQueryState({
                      status: result.status,
                      data: result.data ? { columns: result.columns, data: result.data } : null,
                      error: result.error,
                      nextUri: null,
                  });
              } else {
                  // 继续轮询
                  setQueryState(prev => ({ ...prev, status: 'POLLING', nextUri: result.nextUri }));
              }
          },
          onError: (error) => {
              setQueryState({ status: 'FAILED', data: null, error, nextUri: null });
          }
      }
  );

  // 轮询逻辑
  useEffect(() => {
    if (queryState.status === 'POLLING' && queryState.nextUri) {
      const timer = setTimeout(() => {
        // Relay的refetch并不像React Query那样直接可用,
        // 这里我们通过重新触发lazy load query来实现轮询。
        // 一个更健壮的实现会使用`useRefetchableFragment`。
        // 为了简化,我们直接调用 refetch。
        refetch({ nextUri: queryState.nextUri });
      }, 2000); // 轮询间隔2秒

      return () => clearTimeout(timer);
    }
  }, [queryState.status, queryState.nextUri, refetch]);

  const runQuery = useCallback((sql) => {
    setQueryState({ status: 'SUBMITTED', data: null, error: null, nextUri: null });
    commitMutation({
      variables: { input: { sql } },
      onCompleted: (response) => {
        const result = response.submitTrinoQuery;
        if (result && result.nextUri) {
          setQueryState({ status: 'POLLING', data: null, error: null, nextUri: result.nextUri });
        } else {
          setQueryState({ status: 'FAILED', data: null, error: 'Submission failed to return a valid state.', nextUri: null });
        }
      },
      onError: (error) => {
        setQueryState({ status: 'FAILED', data: null, error, nextUri: null });
      },
    });
  }, [commitMutation]);

  return {
    state: queryState,
    isLoading: isSubmitting || queryState.status === 'POLLING' || queryState.status === 'SUBMITTED',
    runQuery,
  };
};


// 使用Hook的UI组件
// 注意这个组件几乎没有样式,只负责根据状态渲染不同的子组件
export function TrinoQueryInterface({ children }) {
  const { state, isLoading, runQuery } = useTrinoQueryRunner();
  const [sql, setSql] = useState('SELECT * FROM system.runtime.nodes LIMIT 10;');

  const handleSubmit = (e) => {
    e.preventDefault();
    runQuery(sql);
  };

  // children是一个渲染函数,将状态传递给调用者,由调用者决定如何渲染
  return children({
    sql,
    setSql,
    handleSubmit,
    isLoading,
    state,
  });
}

使用示例:

// my-page.jsx
import { TrinoQueryInterface } from './TrinoQueryRunner';
import { MyTextArea, MyButton, MyTable, MySpinner, MyErrorPanel } from './MyStyledComponents';

function MyDataExplorerPage() {
  return (
    <div>
      <h1>Trino 即席查询</h1>
      <TrinoQueryInterface>
        {({ sql, setSql, handleSubmit, isLoading, state }) => (
          <form onSubmit={handleSubmit}>
            <MyTextArea value={sql} onChange={(e) => setSql(e.target.value)} disabled={isLoading} />
            <MyButton type="submit" disabled={isLoading}>
              {isLoading ? '正在查询...' : '执行查询'}
            </MyButton>

            {isLoading && <MySpinner />}
            
            {state.status === 'FAILED' && <MyErrorPanel error={state.error} />}

            {state.status === 'FINISHED' && state.data && (
              <MyTable columns={state.data.columns} data={state.data.data} />
            )}
          </form>
        )}
      </TrinoQueryInterface>
    </div>
  );
}

这个实现体现了Headless UI的核心思想:TrinoQueryInterfaceuseTrinoQueryRunner提供了所有功能逻辑,但完全不关心MyButton是蓝色还是绿色,MyTable是否有边框。这种关注点分离使得核心查询逻辑可以在多个内部产品中以不同的外观被重用。

架构的扩展性与局限性

此架构的当前实现是一个扎实的起点,但并非没有局限。首先,轮询机制带来了固有的延迟,并且在查询执行期间会产生持续的、小规模的Cloud Function调用成本。对于需要更实时反馈的场景,这可能不是最佳选择。

其次,我们没有实现查询取消的逻辑。在Trino中,可以通过向查询ID发送DELETE请求来取消查询。这可以作为另一个Cloud Function cancel-trino-query来轻松添加。

最后,一个更高级的、事件驱动的架构可以完全消除轮询。例如,可以配置一个Trino事件监听器插件,当查询完成时,将事件(包含查询ID和状态)发布到Google Pub/Sub。然后,一个由Pub/Sub触发的Cloud Function可以将结果推送到一个Firestore文档或通过WebSocket直接通知前端。这种设计更为复杂,但提供了更好的实时性和效率,适合对延迟有更高要求的场景。它将系统的耦合点从客户端轮询转移到了后端的消息总线,是一种典型的架构演进路径。


  目录