diff --git a/.vscode/launch.json b/.vscode/launch.json index bfa3efc..b0edc21 100644 --- a/.vscode/launch.json +++ b/.vscode/launch.json @@ -24,7 +24,8 @@ ], // 工作区根目录 "cwd": "${workspaceFolder}\\server", - "program": "${workspaceFolder}\\server\\app.js" + "program": "${workspaceFolder}\\server\\app.js", + "args": ["--run_cron_now"] } ] -} \ No newline at end of file +} diff --git a/mv2_simple_crx/src/actions/amazon.js b/mv2_simple_crx/src/actions/amazon.js index 39689cb..1675bc6 100644 --- a/mv2_simple_crx/src/actions/amazon.js +++ b/mv2_simple_crx/src/actions/amazon.js @@ -748,6 +748,56 @@ function run_pdp_action(product_url, injected_fn, inject_args, action_name, send }); } +function run_pdp_action_multi(product_url, steps, action_name, sendResponse) { + const send_action = (action, payload) => { + if (typeof sendResponse === 'function') { + sendResponse({ action, data: payload }); + sendResponse.log && sendResponse.log(payload); + } + }; + return new Promise(async (resolve, reject) => { + let url = product_url; + try { + url = normalize_product_url(product_url); + } catch (e) { + send_action(action_name, { code: 10, status: false, message: e.message, data: null }); + return reject(e); + } + + const tab_task = create_tab_task(url).set_bounds({ top: 20, left: 20, width: 1280, height: 900 }); + try { + const tab = await tab_task.open_async(); + await wait_tab_complete(tab.id); + + const results = {}; + for (const step of steps || []) { + if (!step || !step.name || typeof step.injected_fn !== 'function') continue; + const raw_list = await tab.execute_script(step.injected_fn, step.inject_args || [], 'document_idle'); + const result = Array.isArray(raw_list) ? raw_list[0] : raw_list; + results[step.name] = result; + } + + send_action(action_name, { + code: 0, + status: true, + message: 'ok', + data: { tab_id: tab.id, product_url: url, result: results }, + }); + resolve({ tab_id: tab.id, product_url: url, result: results }); + tab.remove(0); + } catch (err) { + send_action(action_name, { + code: 30, + status: false, + message: (err && err.message) || String(err), + data: null, + documentURI: url, + }); + reject(err); + } + }); +} + export function amazon_product_detail(data, sendResponse) { return run_pdp_action(data && data.product_url, injected_amazon_product_detail, [], 'amazon_product_detail', sendResponse); } @@ -782,3 +832,26 @@ amazon_product_reviews.params = { }, limit: { type: 'number', desc: '最多条数(默认 50,上限 100)', default: 50 }, }; + +export function amazon_product_detail_reviews(data, sendResponse) { + const limit = data && data.limit != null ? Number(data.limit) : 50; + const skip_detail = data && data.skip_detail === true; + const steps = []; + if (!skip_detail) { + steps.push({ name: 'detail', injected_fn: injected_amazon_product_detail, inject_args: [] }); + } + steps.push({ name: 'reviews', injected_fn: injected_amazon_product_reviews, inject_args: [{ limit }] }); + + return run_pdp_action_multi(data && data.product_url, steps, 'amazon_product_detail_reviews', sendResponse); +} + +amazon_product_detail_reviews.desc = 'Amazon 商品详情 + 评论(同一详情页,支持 skip_detail=true)'; +amazon_product_detail_reviews.params = { + product_url: { + type: 'string', + desc: '商品详情页完整 URL(含 /dp/ASIN)', + default: 'https://www.amazon.com/-/zh/dp/B0B56CHMSC', + }, + limit: { type: 'number', desc: '最多评论条数(默认 50,上限 100)', default: 50 }, + skip_detail: { type: 'boolean', desc: '当日已拉过详情则跳过详情提取', default: false }, +}; diff --git a/mv2_simple_crx/src/background/index.js b/mv2_simple_crx/src/background/index.js index b8b3d06..400eae9 100644 --- a/mv2_simple_crx/src/background/index.js +++ b/mv2_simple_crx/src/background/index.js @@ -4,6 +4,7 @@ import { amazon_set_language, amazon_product_detail, amazon_product_reviews, + amazon_product_detail_reviews, } from '../actions/amazon.js'; const actions = { @@ -11,6 +12,7 @@ const actions = { amazon_set_language, amazon_product_detail, amazon_product_reviews, + amazon_product_detail_reviews, }; function list_actions_meta() { diff --git a/mv2_simple_crx/src/ui/index.html b/mv2_simple_crx/src/ui/index.html index 05158bb..e9a0da0 100644 --- a/mv2_simple_crx/src/ui/index.html +++ b/mv2_simple_crx/src/ui/index.html @@ -28,6 +28,7 @@ + diff --git a/server/app.js b/server/app.js index 29c0523..a369e55 100644 --- a/server/app.js +++ b/server/app.js @@ -21,7 +21,7 @@ const port = cfg.server.port; await sequelize.authenticate(); // await sequelize.sync(); -start_all_cron_tasks(); +await start_all_cron_tasks(); app.listen(port); // eslint-disable-next-line no-console diff --git a/server/config/cron_tasks.js b/server/config/cron_tasks.js index d9d8139..daf1435 100644 --- a/server/config/cron_tasks.js +++ b/server/config/cron_tasks.js @@ -7,7 +7,7 @@ export const cron_task_list = [ // 任务流:先跑列表,再依赖列表 URL 跑详情+评论 { name: 'amazon_search_detail_reviews_every_1h', - cron_expression: '*/15 * * * *', // 15分钟执行一次 + cron_expression: '* */1 * * *', // 1小时执行一次 type: 'flow', flow_name: 'amazon_search_detail_reviews', flow_payload: { @@ -16,7 +16,6 @@ export const cron_task_list = [ // featured / price_asc / price_desc / review / newest / bestseller sort_by: 'bestseller', limit: 100, - // flow 自己的参数 reviews_limit: 50, gap_ms: 500 diff --git a/server/services/flows/amazon/amazon_search_detail_reviews_flow.js b/server/services/flows/amazon/amazon_search_detail_reviews_flow.js index 71945f4..86e67b2 100644 --- a/server/services/flows/amazon/amazon_search_detail_reviews_flow.js +++ b/server/services/flows/amazon/amazon_search_detail_reviews_flow.js @@ -3,6 +3,7 @@ import { sleep_ms } from '../flow_utils.js'; import { amazon_product, amazon_search_item, amazon_review } from '../../../models/index.js'; import { safe_json_stringify } from '../../json_utils.js'; import { close_browser } from '../../puppeteer/puppeteer_runner.js'; +import { Op } from 'sequelize'; function build_batch_key(prefix) { return `${prefix}_${Date.now()}_${Math.random().toString().slice(2, 8)}`; @@ -14,6 +15,24 @@ function pick_asin_from_url(url) { return m && m[1] ? m[1].toUpperCase() : null; } +function get_today_start() { + const d = new Date(); + d.setHours(0, 0, 0, 0); + return d; +} + +async function has_detail_fetched_today(asin) { + if (!asin) return false; + const row = await amazon_product.findOne({ + attributes: ['asin', 'updated_at'], + where: { + asin, + updated_at: { [Op.gte]: get_today_start() } + } + }); + return !!row; +} + function unwrap_action_result(res) { // 插件返回通常是 { ..., result: { stage, items... } } if (res && typeof res === 'object' && res.result && typeof res.result === 'object') { @@ -192,26 +211,25 @@ export async function run_amazon_search_detail_reviews_flow(flow_payload) { if (gap_ms > 0) { await sleep_ms(gap_ms); } - const detail_res = await execute_action_and_record({ - action_name: 'amazon_product_detail', - action_payload: { product_url: url }, + const asin = pick_asin_from_url(url); + const skip_detail = asin ? await has_detail_fetched_today(asin) : false; + + const res = await execute_action_and_record({ + action_name: 'amazon_product_detail_reviews', + action_payload: { product_url: url, limit: reviews_limit, skip_detail }, source: 'cron' }); - await persist_detail(detail_res); + const r = unwrap_action_result(res); + const detail_part = r && r.detail ? r.detail : null; + const reviews_part = r && r.reviews ? r.reviews : null; - if (gap_ms > 0) { - await sleep_ms(gap_ms); + if (detail_part) { + await persist_detail(detail_part); + } + if (reviews_part) { + await persist_reviews(reviews_part); } - - const reviews_res = await execute_action_and_record({ - action_name: 'amazon_product_reviews', - action_payload: { product_url: url, limit: reviews_limit }, - source: 'cron', - keep_browser_open: true - }); - - await persist_reviews(reviews_res); } diff --git a/server/services/schedule_loader.js b/server/services/schedule_loader.js index 594b96b..56d3d1a 100644 --- a/server/services/schedule_loader.js +++ b/server/services/schedule_loader.js @@ -1,24 +1,25 @@ import cron from 'node-cron'; import { cron_task_list } from '../config/cron_tasks.js'; -import { execute_action_and_record } from './task_executor.js'; import { get_flow_runner } from './flows/flow_registry.js'; const cron_jobs = []; const running_task_name_set = new Set(); +function has_argv_flag(flag_name) { + const name = String(flag_name || '').trim(); + if (!name) return false; + return process.argv.includes(name); +} + +function should_run_cron_now() { + return has_argv_flag('--run_cron_now'); +} + async function run_cron_task(task) { if (!task || !task.type) { throw new Error('cron_task 缺少 type'); } - if (task.type === 'action') { - await execute_action_and_record({ - action_name: task.action_name, - action_payload: task.action_payload || {}, - source: 'cron' - }); - return; - } if (task.type === 'flow') { const run_flow = get_flow_runner(task.flow_name); @@ -29,26 +30,41 @@ async function run_cron_task(task) { throw new Error(`cron_task type 不支持: ${task.type}`); } +async function run_cron_task_with_guard(task_name, task) { + if (running_task_name_set.has(task_name)) { + // eslint-disable-next-line no-console + console.log('[cron] skip (already running)', { name: task_name }); + return; + } + + running_task_name_set.add(task_name); + try { + await run_cron_task(task); + } catch (error) { + console.warn('[cron] error', { task_name, error }); + } finally { + running_task_name_set.delete(task_name); + } +} + export async function start_all_cron_tasks() { + const run_now = should_run_cron_now(); for (const task of cron_task_list) { - const task_name = task && task.name ? String(task.name) : 'cron_task'; - const job = cron.schedule(task.cron_expression, async () => { - if (running_task_name_set.has(task_name)) { - // eslint-disable-next-line no-console - console.log('[cron] skip (already running)', { name: task_name }); - return; - } + const task_name = task && task.name ? String(task.name) : 'cron_task'; - running_task_name_set.add(task_name); - try { - await run_cron_task(task); - } finally { - running_task_name_set.delete(task_name); - } - }); - console.log('job', { task_name, }); + if (run_now) { + // eslint-disable-next-line no-console + console.log('[cron] run_now', { task_name }); + await run_cron_task_with_guard(task_name, task); + } + else { + const job = cron.schedule(task.cron_expression, async () => { + await run_cron_task_with_guard(task_name, task); + }); + console.log('job', { task_name, }); + cron_jobs.push(job); - cron_jobs.push(job); + } } }