处理 Runway Alpeh 视频生成完成的 webhook 通知
提交生成请求
callBackUrl
参数:{
"prompt": "转换为梦幻水彩画风格,配以柔和流动的运动效果",
"videoUrl": "https://example.com/input-video.mp4",
"callBackUrl": "https://your-app.com/webhook/aleph-callback"
}
接收任务 ID
{
"code": 200,
"msg": "success",
"data": {
"taskId": "ee603959-debb-48d1-98c4-a6d1c717eba6"
}
}
处理回调
{
"code": 200,
"msg": "success",
"data": {
"result_video_url": "https://file.com/k/xxxxxxx.mp4",
"result_image_url": "https://file.com/m/xxxxxxxx.png"
},
"taskId": "ee603959-debb-48d1-98c4-a6d1c717eba6"
}
200
:视频生成成功400
:由于内容政策或技术问题生成失败{
"code": 400,
"msg": "您的提示词被我们的 AI 审查员捕获。请调整后重试!",
"data": null,
"taskId": "ee603959-debb-48d1-98c4-a6d1c717eba6"
}
const express = require('express');
const app = express();
// 解析 JSON 的中间件
app.use(express.json());
// Aleph 视频生成的回调端点
app.post('/webhook/aleph-callback', (req, res) => {
try {
const { code, msg, data } = req.body;
console.log(`收到任务回调:${data.task_id}`);
if (code === 200) {
// 成功 - 视频已生成
console.log('视频生成成功!');
console.log('视频 URL:', data.video_url);
console.log('缩略图 URL:', data.image_url);
// 处理成功的生成
handleSuccessfulGeneration(data);
} else {
// 生成过程中发生错误
console.error('生成失败:', msg);
// 处理错误
handleGenerationError(data.task_id, msg);
}
// 始终响应 200 以确认收到
res.status(200).json({
code: 200,
msg: '回调接收成功'
});
} catch (error) {
console.error('处理回调时出错:', error);
res.status(500).json({
code: 500,
msg: '处理回调时出错'
});
}
});
async function handleSuccessfulGeneration(data) {
try {
// 使用视频信息更新数据库
await updateTaskStatus(data.task_id, 'completed', {
videoUrl: data.video_url,
thumbnailUrl: data.image_url,
videoId: data.video_id
});
// 可选择下载并存储视频
await downloadAndStoreVideo(data.video_url, data.task_id);
// 通知用户或触发工作流程中的下一步
await notifyUser(data.task_id, '视频生成完成!');
} catch (error) {
console.error('处理成功生成时出错:', error);
}
}
async function handleGenerationError(taskId, errorMessage) {
try {
// 使用错误状态更新数据库
await updateTaskStatus(taskId, 'failed', { error: errorMessage });
// 通知用户失败
await notifyUser(taskId, `视频生成失败:${errorMessage}`);
} catch (error) {
console.error('处理生成错误时出错:', error);
}
}
app.listen(3000, () => {
console.log('Webhook 服务器监听端口 3000');
});
from flask import Flask, request, jsonify
import logging
import requests
from datetime import datetime
app = Flask(__name__)
logging.basicConfig(level=logging.INFO)
@app.route('/webhook/aleph-callback', methods=['POST'])
def aleph_callback():
try:
data = request.get_json()
if not data:
return jsonify({'code': 400, 'msg': '无效的 JSON 负载'}), 400
code = data.get('code')
msg = data.get('msg', '')
callback_data = data.get('data', {})
task_id = data.get('taskId')
logging.info(f"收到任务回调:{task_id}")
if code == 200:
# 成功 - 视频已生成
video_url = callback_data.get('result_video_url')
image_url = callback_data.get('result_image_url')
logging.info(f"视频生成成功:{video_url}")
# 处理成功生成
handle_successful_generation(task_id, callback_data)
else:
# 发生错误
logging.error(f"任务 {task_id} 生成失败:{msg}")
handle_generation_error(task_id, msg)
# 始终返回 200 以确认收到
return jsonify({'code': 200, 'msg': '回调接收成功'})
except Exception as e:
logging.error(f"处理回调时出错:{str(e)}")
return jsonify({'code': 500, 'msg': '处理回调时出错'}), 500
def handle_successful_generation(task_id, data):
"""处理成功的视频生成"""
try:
# 更新数据库
update_task_status(task_id, 'completed', {
'video_url': data['result_video_url'],
'image_url': data['result_image_url'],
'completed_at': datetime.utcnow()
})
# 如需要下载视频
# download_video(video_url, task_id)
# 发送通知
notify_user(task_id, '您的 Aleph 视频已准备就绪!')
except Exception as e:
logging.error(f"处理成功生成时出错:{str(e)}")
def handle_generation_error(task_id, error_message):
"""处理生成错误"""
try:
# 更新数据库
update_task_status(task_id, 'failed', {
'error_message': error_message,
'failed_at': datetime.utcnow()
})
# 发送错误通知
notify_user(task_id, f'视频生成失败:{error_message}')
except Exception as e:
logging.error(f"处理生成错误时出错:{str(e)}")
def update_task_status(task_id, status, additional_data=None):
"""在数据库中更新任务状态"""
# 在此处实现您的数据库更新逻辑
logging.info(f"更新任务 {task_id} 状态为 {status}")
def notify_user(task_id, message):
"""发送用户通知"""
# 在此处实现您的通知逻辑
logging.info(f"为任务 {task_id} 通知用户:{message}")
if __name__ == '__main__':
app.run(host='0.0.0.0', port=5000, debug=False)
<?php
header('Content-Type: application/json');
// 启用错误日志记录
error_reporting(E_ALL);
ini_set('log_errors', 1);
ini_set('error_log', 'callback_errors.log');
try {
// 获取 JSON 输入
$input = file_get_contents('php://input');
$data = json_decode($input, true);
if (!$data) {
http_response_code(400);
echo json_encode(['code' => 400, 'msg' => '无效的 JSON 负载']);
exit;
}
$code = $data['code'] ?? null;
$msg = $data['msg'] ?? '';
$callbackData = $data['data'] ?? [];
$taskId = $callbackData['task_id'] ?? null;
error_log("收到任务回调:" . $taskId);
if ($code === 200) {
// 成功 - 视频已生成
$videoUrl = $callbackData['video_url'] ?? '';
$imageUrl = $callbackData['image_url'] ?? '';
$videoId = $callbackData['video_id'] ?? '';
error_log("视频生成成功:" . $videoUrl);
handleSuccessfulGeneration($taskId, $videoUrl, $imageUrl, $videoId);
} else {
// 发生错误
error_log("任务 $taskId 生成失败:" . $msg);
handleGenerationError($taskId, $msg);
}
// 始终返回 200 以确认收到
http_response_code(200);
echo json_encode(['code' => 200, 'msg' => '回调接收成功']);
} catch (Exception $e) {
error_log("处理回调时出错:" . $e->getMessage());
http_response_code(500);
echo json_encode(['code' => 500, 'msg' => '处理回调时出错']);
}
function handleSuccessfulGeneration($taskId, $videoUrl, $imageUrl, $videoId) {
try {
// 更新数据库
updateTaskStatus($taskId, 'completed', [
'video_url' => $videoUrl,
'image_url' => $imageUrl,
'video_id' => $videoId,
'completed_at' => date('Y-m-d H:i:s')
]);
// 发送通知
notifyUser($taskId, '您的 Aleph 视频已准备就绪!');
} catch (Exception $e) {
error_log("处理成功生成时出错:" . $e->getMessage());
}
}
function handleGenerationError($taskId, $errorMessage) {
try {
// 更新数据库
updateTaskStatus($taskId, 'failed', [
'error_message' => $errorMessage,
'failed_at' => date('Y-m-d H:i:s')
]);
// 发送通知
notifyUser($taskId, "视频生成失败:$errorMessage");
} catch (Exception $e) {
error_log("处理生成错误时出错:" . $e->getMessage());
}
}
function updateTaskStatus($taskId, $status, $additionalData = []) {
// 在此处实现您的数据库更新逻辑
error_log("更新任务 $taskId 状态为 $status");
// 使用 PDO 的示例:
/*
$pdo = new PDO($dsn, $username, $password);
$stmt = $pdo->prepare("UPDATE tasks SET status = ?, updated_at = NOW() WHERE task_id = ?");
$stmt->execute([$status, $taskId]);
*/
}
function notifyUser($taskId, $message) {
// 在此处实现您的通知逻辑
error_log("为任务 $taskId 通知用户:$message");
// 示例:发送邮件、推送通知等
}
?>
验证回调来源
User-Agent
头// 示例:基本验证
app.post('/webhook/aleph-callback', (req, res) => {
// 验证必需字段
const { code, data } = req.body;
if (typeof code !== 'number' || !data || !data.task_id) {
return res.status(400).json({
code: 400,
msg: '无效的回调负载'
});
}
// 处理有效回调
// ...
});
处理重复回调
const processedTasks = new Set();
app.post('/webhook/aleph-callback', (req, res) => {
const taskId = req.body.data?.task_id;
if (processedTasks.has(taskId)) {
console.log(`任务 ${taskId} 已处理,跳过`);
return res.status(200).json({ code: 200, msg: '已处理' });
}
// 处理回调
processCallback(req.body);
processedTasks.add(taskId);
res.status(200).json({ code: 200, msg: '处理成功' });
});
错误处理和重试逻辑
app.post('/webhook/aleph-callback', async (req, res) => {
try {
await processCallbackWithRetry(req.body);
res.status(200).json({ code: 200, msg: '成功' });
} catch (error) {
// 记录错误但仍返回 200 以防止重试
console.error('回调处理错误:', error);
res.status(200).json({ code: 200, msg: '已接收' });
}
});
async function processCallbackWithRetry(data, maxRetries = 3) {
for (let attempt = 1; attempt <= maxRetries; attempt++) {
try {
await processCallback(data);
return; // 成功
} catch (error) {
if (attempt === maxRetries) throw error;
const delay = Math.pow(2, attempt) * 1000; // 指数退避
await new Promise(resolve => setTimeout(resolve, delay));
}
}
}
# 安装 ngrok
npm install -g ngrok
# 暴露本地端口 3000
ngrok http 3000
# 使用提供的 HTTPS URL 作为您的回调 URL
# 示例:https://abc123.ngrok.io/webhook/aleph-callback
未收到回调
重复或丢失的回调
回调负载问题
function validateCallback(payload) {
const required = ['code', 'msg', 'data'];
const missing = required.filter(field => !(field in payload));
if (missing.length > 0) {
throw new Error(`缺少必需字段:${missing.join(', ')}`);
}
if (payload.code === 200 && payload.data) {
const dataRequired = ['task_id', 'video_url', 'image_url'];
const dataMissing = dataRequired.filter(field => !(field in payload.data));
if (dataMissing.length > 0) {
throw new Error(`缺少数据字段:${dataMissing.join(', ')}`);
}
}
}