This commit is contained in:
2026-03-18 20:43:42 +08:00
parent 92021eaa41
commit cf3422b1ca
8 changed files with 155 additions and 45 deletions

3
.vscode/launch.json vendored
View File

@@ -24,7 +24,8 @@
], ],
// 工作区根目录 // 工作区根目录
"cwd": "${workspaceFolder}\\server", "cwd": "${workspaceFolder}\\server",
"program": "${workspaceFolder}\\server\\app.js" "program": "${workspaceFolder}\\server\\app.js",
"args": ["--run_cron_now"]
} }
] ]
} }

View File

@@ -748,6 +748,56 @@ function run_pdp_action(product_url, injected_fn, inject_args, action_name, send
}); });
} }
function run_pdp_action_multi(product_url, steps, action_name, sendResponse) {
const send_action = (action, payload) => {
if (typeof sendResponse === 'function') {
sendResponse({ action, data: payload });
sendResponse.log && sendResponse.log(payload);
}
};
return new Promise(async (resolve, reject) => {
let url = product_url;
try {
url = normalize_product_url(product_url);
} catch (e) {
send_action(action_name, { code: 10, status: false, message: e.message, data: null });
return reject(e);
}
const tab_task = create_tab_task(url).set_bounds({ top: 20, left: 20, width: 1280, height: 900 });
try {
const tab = await tab_task.open_async();
await wait_tab_complete(tab.id);
const results = {};
for (const step of steps || []) {
if (!step || !step.name || typeof step.injected_fn !== 'function') continue;
const raw_list = await tab.execute_script(step.injected_fn, step.inject_args || [], 'document_idle');
const result = Array.isArray(raw_list) ? raw_list[0] : raw_list;
results[step.name] = result;
}
send_action(action_name, {
code: 0,
status: true,
message: 'ok',
data: { tab_id: tab.id, product_url: url, result: results },
});
resolve({ tab_id: tab.id, product_url: url, result: results });
tab.remove(0);
} catch (err) {
send_action(action_name, {
code: 30,
status: false,
message: (err && err.message) || String(err),
data: null,
documentURI: url,
});
reject(err);
}
});
}
export function amazon_product_detail(data, sendResponse) { export function amazon_product_detail(data, sendResponse) {
return run_pdp_action(data && data.product_url, injected_amazon_product_detail, [], 'amazon_product_detail', sendResponse); return run_pdp_action(data && data.product_url, injected_amazon_product_detail, [], 'amazon_product_detail', sendResponse);
} }
@@ -782,3 +832,26 @@ amazon_product_reviews.params = {
}, },
limit: { type: 'number', desc: '最多条数(默认 50上限 100', default: 50 }, limit: { type: 'number', desc: '最多条数(默认 50上限 100', default: 50 },
}; };
export function amazon_product_detail_reviews(data, sendResponse) {
const limit = data && data.limit != null ? Number(data.limit) : 50;
const skip_detail = data && data.skip_detail === true;
const steps = [];
if (!skip_detail) {
steps.push({ name: 'detail', injected_fn: injected_amazon_product_detail, inject_args: [] });
}
steps.push({ name: 'reviews', injected_fn: injected_amazon_product_reviews, inject_args: [{ limit }] });
return run_pdp_action_multi(data && data.product_url, steps, 'amazon_product_detail_reviews', sendResponse);
}
amazon_product_detail_reviews.desc = 'Amazon 商品详情 + 评论(同一详情页,支持 skip_detail=true';
amazon_product_detail_reviews.params = {
product_url: {
type: 'string',
desc: '商品详情页完整 URL含 /dp/ASIN',
default: 'https://www.amazon.com/-/zh/dp/B0B56CHMSC',
},
limit: { type: 'number', desc: '最多评论条数(默认 50上限 100', default: 50 },
skip_detail: { type: 'boolean', desc: '当日已拉过详情则跳过详情提取', default: false },
};

View File

@@ -4,6 +4,7 @@ import {
amazon_set_language, amazon_set_language,
amazon_product_detail, amazon_product_detail,
amazon_product_reviews, amazon_product_reviews,
amazon_product_detail_reviews,
} from '../actions/amazon.js'; } from '../actions/amazon.js';
const actions = { const actions = {
@@ -11,6 +12,7 @@ const actions = {
amazon_set_language, amazon_set_language,
amazon_product_detail, amazon_product_detail,
amazon_product_reviews, amazon_product_reviews,
amazon_product_detail_reviews,
}; };
function list_actions_meta() { function list_actions_meta() {

View File

@@ -28,6 +28,7 @@
<option value="amazon_set_language">amazon_set_language</option> <option value="amazon_set_language">amazon_set_language</option>
<option value="amazon_product_detail">amazon_product_detail</option> <option value="amazon_product_detail">amazon_product_detail</option>
<option value="amazon_product_reviews">amazon_product_reviews</option> <option value="amazon_product_reviews">amazon_product_reviews</option>
<option value="amazon_product_detail_reviews">amazon_product_detail_reviews</option>
</select> </select>
<label class="label">参数JSON</label> <label class="label">参数JSON</label>

View File

@@ -21,7 +21,7 @@ const port = cfg.server.port;
await sequelize.authenticate(); await sequelize.authenticate();
// await sequelize.sync(); // await sequelize.sync();
start_all_cron_tasks(); await start_all_cron_tasks();
app.listen(port); app.listen(port);
// eslint-disable-next-line no-console // eslint-disable-next-line no-console

View File

@@ -7,7 +7,7 @@ export const cron_task_list = [
// 任务流:先跑列表,再依赖列表 URL 跑详情+评论 // 任务流:先跑列表,再依赖列表 URL 跑详情+评论
{ {
name: 'amazon_search_detail_reviews_every_1h', name: 'amazon_search_detail_reviews_every_1h',
cron_expression: '*/15 * * * *', // 15分钟执行一次 cron_expression: '* */1 * * *', // 1小时执行一次
type: 'flow', type: 'flow',
flow_name: 'amazon_search_detail_reviews', flow_name: 'amazon_search_detail_reviews',
flow_payload: { flow_payload: {
@@ -16,7 +16,6 @@ export const cron_task_list = [
// featured / price_asc / price_desc / review / newest / bestseller // featured / price_asc / price_desc / review / newest / bestseller
sort_by: 'bestseller', sort_by: 'bestseller',
limit: 100, limit: 100,
// flow 自己的参数 // flow 自己的参数
reviews_limit: 50, reviews_limit: 50,
gap_ms: 500 gap_ms: 500

View File

@@ -3,6 +3,7 @@ import { sleep_ms } from '../flow_utils.js';
import { amazon_product, amazon_search_item, amazon_review } from '../../../models/index.js'; import { amazon_product, amazon_search_item, amazon_review } from '../../../models/index.js';
import { safe_json_stringify } from '../../json_utils.js'; import { safe_json_stringify } from '../../json_utils.js';
import { close_browser } from '../../puppeteer/puppeteer_runner.js'; import { close_browser } from '../../puppeteer/puppeteer_runner.js';
import { Op } from 'sequelize';
function build_batch_key(prefix) { function build_batch_key(prefix) {
return `${prefix}_${Date.now()}_${Math.random().toString().slice(2, 8)}`; return `${prefix}_${Date.now()}_${Math.random().toString().slice(2, 8)}`;
@@ -14,6 +15,24 @@ function pick_asin_from_url(url) {
return m && m[1] ? m[1].toUpperCase() : null; return m && m[1] ? m[1].toUpperCase() : null;
} }
function get_today_start() {
const d = new Date();
d.setHours(0, 0, 0, 0);
return d;
}
async function has_detail_fetched_today(asin) {
if (!asin) return false;
const row = await amazon_product.findOne({
attributes: ['asin', 'updated_at'],
where: {
asin,
updated_at: { [Op.gte]: get_today_start() }
}
});
return !!row;
}
function unwrap_action_result(res) { function unwrap_action_result(res) {
// 插件返回通常是 { ..., result: { stage, items... } } // 插件返回通常是 { ..., result: { stage, items... } }
if (res && typeof res === 'object' && res.result && typeof res.result === 'object') { if (res && typeof res === 'object' && res.result && typeof res.result === 'object') {
@@ -192,26 +211,25 @@ export async function run_amazon_search_detail_reviews_flow(flow_payload) {
if (gap_ms > 0) { if (gap_ms > 0) {
await sleep_ms(gap_ms); await sleep_ms(gap_ms);
} }
const detail_res = await execute_action_and_record({ const asin = pick_asin_from_url(url);
action_name: 'amazon_product_detail', const skip_detail = asin ? await has_detail_fetched_today(asin) : false;
action_payload: { product_url: url },
const res = await execute_action_and_record({
action_name: 'amazon_product_detail_reviews',
action_payload: { product_url: url, limit: reviews_limit, skip_detail },
source: 'cron' source: 'cron'
}); });
await persist_detail(detail_res); const r = unwrap_action_result(res);
const detail_part = r && r.detail ? r.detail : null;
const reviews_part = r && r.reviews ? r.reviews : null;
if (gap_ms > 0) { if (detail_part) {
await sleep_ms(gap_ms); await persist_detail(detail_part);
}
if (reviews_part) {
await persist_reviews(reviews_part);
} }
const reviews_res = await execute_action_and_record({
action_name: 'amazon_product_reviews',
action_payload: { product_url: url, limit: reviews_limit },
source: 'cron',
keep_browser_open: true
});
await persist_reviews(reviews_res);
} }

View File

@@ -1,24 +1,25 @@
import cron from 'node-cron'; import cron from 'node-cron';
import { cron_task_list } from '../config/cron_tasks.js'; import { cron_task_list } from '../config/cron_tasks.js';
import { execute_action_and_record } from './task_executor.js';
import { get_flow_runner } from './flows/flow_registry.js'; import { get_flow_runner } from './flows/flow_registry.js';
const cron_jobs = []; const cron_jobs = [];
const running_task_name_set = new Set(); const running_task_name_set = new Set();
function has_argv_flag(flag_name) {
const name = String(flag_name || '').trim();
if (!name) return false;
return process.argv.includes(name);
}
function should_run_cron_now() {
return has_argv_flag('--run_cron_now');
}
async function run_cron_task(task) { async function run_cron_task(task) {
if (!task || !task.type) { if (!task || !task.type) {
throw new Error('cron_task 缺少 type'); throw new Error('cron_task 缺少 type');
} }
if (task.type === 'action') {
await execute_action_and_record({
action_name: task.action_name,
action_payload: task.action_payload || {},
source: 'cron'
});
return;
}
if (task.type === 'flow') { if (task.type === 'flow') {
const run_flow = get_flow_runner(task.flow_name); const run_flow = get_flow_runner(task.flow_name);
@@ -29,10 +30,7 @@ async function run_cron_task(task) {
throw new Error(`cron_task type 不支持: ${task.type}`); throw new Error(`cron_task type 不支持: ${task.type}`);
} }
export async function start_all_cron_tasks() { async function run_cron_task_with_guard(task_name, task) {
for (const task of cron_task_list) {
const task_name = task && task.name ? String(task.name) : 'cron_task';
const job = cron.schedule(task.cron_expression, async () => {
if (running_task_name_set.has(task_name)) { if (running_task_name_set.has(task_name)) {
// eslint-disable-next-line no-console // eslint-disable-next-line no-console
console.log('[cron] skip (already running)', { name: task_name }); console.log('[cron] skip (already running)', { name: task_name });
@@ -42,13 +40,31 @@ export async function start_all_cron_tasks() {
running_task_name_set.add(task_name); running_task_name_set.add(task_name);
try { try {
await run_cron_task(task); await run_cron_task(task);
} catch (error) {
console.warn('[cron] error', { task_name, error });
} finally { } finally {
running_task_name_set.delete(task_name); running_task_name_set.delete(task_name);
} }
}
export async function start_all_cron_tasks() {
const run_now = should_run_cron_now();
for (const task of cron_task_list) {
const task_name = task && task.name ? String(task.name) : 'cron_task';
if (run_now) {
// eslint-disable-next-line no-console
console.log('[cron] run_now', { task_name });
await run_cron_task_with_guard(task_name, task);
}
else {
const job = cron.schedule(task.cron_expression, async () => {
await run_cron_task_with_guard(task_name, task);
}); });
console.log('job', { task_name, }); console.log('job', { task_name, });
cron_jobs.push(job); cron_jobs.push(job);
}
} }
} }