為大型系統和長時間運行的背景任務構建。
圖片來源:Ilias Chebbi 於 Unsplash幾個月前,我承擔了一個需要為媒體(音頻)流建立基礎設施的角色。但除了將音頻作為可流式傳輸的塊提供外,還有長時間運行的媒體處理任務和一個廣泛的 RAG 管道,用於轉錄、轉碼、嵌入和順序媒體更新。以生產思維構建 MVP 使我們不斷迭代,直到實現了無縫系統。我們的方法是整合功能和優先級的底層堆疊。
在構建過程中,每次迭代都是對即時且通常"全面"需求的回應。最初的關注點是排隊任務,Redis 很容易滿足這一需求;我們只需發送並忘記。NEST JS 框架中的 Bull MQ 讓我們對重試、積壓和死信隊列有了更好的控制。在本地和生產環境中使用少量負載,我們正確處理了媒體流。我們很快就被可觀察性的重擔所困擾:
日誌 → 任務記錄(請求、響應、錯誤)。
指標 → 這些任務運行、失敗、完成的頻率/數量等。
追蹤 → 任務在服務間的路徑(流程路徑中調用的函數/方法)。
你可以通過設計 API 並構建自定義儀表板來解決其中一些問題,但可擴展性問題將會出現。事實上,我們確實設計了 API。
面對管理複雜、長時間運行的後端工作流的挑戰,其中失敗必須可恢復,狀態必須持久,Inngest 成為了我們架構上的救星。它從根本上重塑了我們的方法:每個長時間運行的背景任務變成了一個背景函數,由特定事件觸發。
例如,Transcription.request 事件將觸發 TranscribeAudio 函數。這個函數可能包含以下步驟運行:fetch_audio_metadata、deepgram_transcribe、parse_save_trasncription 和 notify_user。
核心持久性原語是步驟運行。背景函數在內部被分解為這些步驟運行,每個步驟包含最小的、原子級的邏輯塊。
Inngest 函數摘要:
import { inngest } from 'inngest-client';
export const createMyFunction = (dependencies) => {
return inngest.createFunction(
{
id: 'my-function',
name: 'My Example Function',
retries: 3, // retry the entire run on failure
concurrency: { limit: 5 },
onFailure: async ({ event, error, step }) => {
// handle errors here
await step.run('handle-error', async () => {
console.error('Error processing event:', error);
});
},
},
{ event: 'my/event.triggered' },
async ({ event, step }) => {
const { payload } = event.data;
// Step 1: Define first step
const step1Result = await step.run('step-1', async () => {
// logic for step 1
return `Processed ${payload}`;
});
// Step 2: Define second step
const step2Result = await step.run('step-2', async () => {
// logic for step 2
return step1Result + ' -> step 2';
});
// Step N: Continue as needed
await step.run('final-step', async () => {
// finalization logic
console.log('Finished processing:', step2Result);
});
return { success: true };
},
);
};
Inngest 的事件驅動模型提供了對每個工作流執行的細粒度洞察:
依賴純事件處理的缺點是,雖然 Inngest 有效地排隊函數執行,但事件本身並不在傳統消息代理意義上內部排隊。在高流量場景中,由於潛在的競爭條件或如果攝取端點不堪重負而導致的事件丟失,這種明確事件隊列的缺失可能會有問題。
為了解決這個問題並強制執行嚴格的事件持久性,我們實施了一個專用的排隊系統作為緩衝。
AWS 簡單隊列系統 (SQS) 是我們的選擇(雖然任何強大的排隊系統都可行),考慮到我們在 AWS 上的現有基礎設施。我們設計了一個雙隊列系統:一個主隊列和一個死信隊列 (DLQ)。
我們建立了一個 Elastic Beanstalk (EB) 工作環境,專門配置為直接從主隊列消費消息。如果主隊列中的消息在 EB 工作者處理一定次數後失敗,主隊列會自動將失敗的消息移至專用的 DLQ。這確保了如果事件無法觸發或被 Inngest 拾取,不會永久丟失。這個工作環境與標準 EB Web 服務器環境不同,因為它的唯一責任是消息消費和處理(在這種情況下,將消費的消息轉發到 Inngest API 端點)。
構建企業級基礎設施的一個被低估且相當重要的部分是它消耗資源,而且這些資源是長時間運行的。微服務架構為每個服務提供可擴展性。存儲、RAM 和資源超時將發揮作用。例如,我們的 AWS 實例類型規格迅速從 t3.micro 移至 t3.small,現在固定在 t3.medium。對於長時間運行、CPU 密集型的背景任務,使用微小實例進行水平擴展會失敗,因為瓶頸是處理單個任務所需的時間,而不是進入隊列的新任務數量。
像轉碼、嵌入這樣的任務或函數通常是CPU 受限和內存受限的。CPU 受限是因為它們需要持續、密集的 CPU 使用,而內存受限是因為它們通常需要大量 RAM 來加載大型模型或有效處理大型文件或負載。
最終,這種增強的架構,將 SQS 的持久性和 EB 工作環境的受控執行直接放在 Inngest API 的上游,提供了基本的彈性。我們實現了嚴格的事件所有權,消除了流量高峰期間的競爭條件,並獲得了非易失性死信機制。我們利用 Inngest 進行工作流編排和調試功能,同時依靠 AWS 原語實現最大消息吞吐量和持久性。由此產生的系統不僅可擴展,而且高度可審計,成功地將複雜、長時間運行的後端任務轉化為安全、可觀察和容錯的微步驟。
為講道構建 Spotify。最初發表於 Medium 的 Coinmonks,人們通過突出顯示和回應這個故事繼續對話。


