From cf3422b1cad67b0900d24e1b1f32df549c1a072e Mon Sep 17 00:00:00 2001
From: light <978854603@qq.com>
Date: Wed, 18 Mar 2026 20:43:42 +0800
Subject: [PATCH] 1
---
.vscode/launch.json | 5 +-
mv2_simple_crx/src/actions/amazon.js | 73 +++++++++++++++++++
mv2_simple_crx/src/background/index.js | 2 +
mv2_simple_crx/src/ui/index.html | 1 +
server/app.js | 2 +-
server/config/cron_tasks.js | 3 +-
.../amazon_search_detail_reviews_flow.js | 48 ++++++++----
server/services/schedule_loader.js | 66 ++++++++++-------
8 files changed, 155 insertions(+), 45 deletions(-)
diff --git a/.vscode/launch.json b/.vscode/launch.json
index bfa3efc..b0edc21 100644
--- a/.vscode/launch.json
+++ b/.vscode/launch.json
@@ -24,7 +24,8 @@
],
// 工作区根目录
"cwd": "${workspaceFolder}\\server",
- "program": "${workspaceFolder}\\server\\app.js"
+ "program": "${workspaceFolder}\\server\\app.js",
+ "args": ["--run_cron_now"]
}
]
-}
\ No newline at end of file
+}
diff --git a/mv2_simple_crx/src/actions/amazon.js b/mv2_simple_crx/src/actions/amazon.js
index 39689cb..1675bc6 100644
--- a/mv2_simple_crx/src/actions/amazon.js
+++ b/mv2_simple_crx/src/actions/amazon.js
@@ -748,6 +748,56 @@ function run_pdp_action(product_url, injected_fn, inject_args, action_name, send
});
}
+function run_pdp_action_multi(product_url, steps, action_name, sendResponse) {
+ const send_action = (action, payload) => {
+ if (typeof sendResponse === 'function') {
+ sendResponse({ action, data: payload });
+ sendResponse.log && sendResponse.log(payload);
+ }
+ };
+ return new Promise(async (resolve, reject) => {
+ let url = product_url;
+ try {
+ url = normalize_product_url(product_url);
+ } catch (e) {
+ send_action(action_name, { code: 10, status: false, message: e.message, data: null });
+ return reject(e);
+ }
+
+ const tab_task = create_tab_task(url).set_bounds({ top: 20, left: 20, width: 1280, height: 900 });
+ try {
+ const tab = await tab_task.open_async();
+ await wait_tab_complete(tab.id);
+
+ const results = {};
+ for (const step of steps || []) {
+ if (!step || !step.name || typeof step.injected_fn !== 'function') continue;
+ const raw_list = await tab.execute_script(step.injected_fn, step.inject_args || [], 'document_idle');
+ const result = Array.isArray(raw_list) ? raw_list[0] : raw_list;
+ results[step.name] = result;
+ }
+
+ send_action(action_name, {
+ code: 0,
+ status: true,
+ message: 'ok',
+ data: { tab_id: tab.id, product_url: url, result: results },
+ });
+ resolve({ tab_id: tab.id, product_url: url, result: results });
+ tab.remove(0);
+ } catch (err) {
+ send_action(action_name, {
+ code: 30,
+ status: false,
+ message: (err && err.message) || String(err),
+ data: null,
+ documentURI: url,
+ });
+ reject(err);
+ }
+ });
+}
+
export function amazon_product_detail(data, sendResponse) {
return run_pdp_action(data && data.product_url, injected_amazon_product_detail, [], 'amazon_product_detail', sendResponse);
}
@@ -782,3 +832,26 @@ amazon_product_reviews.params = {
},
limit: { type: 'number', desc: '最多条数(默认 50,上限 100)', default: 50 },
};
+
+export function amazon_product_detail_reviews(data, sendResponse) {
+ const limit = data && data.limit != null ? Number(data.limit) : 50;
+ const skip_detail = data && data.skip_detail === true;
+ const steps = [];
+ if (!skip_detail) {
+ steps.push({ name: 'detail', injected_fn: injected_amazon_product_detail, inject_args: [] });
+ }
+ steps.push({ name: 'reviews', injected_fn: injected_amazon_product_reviews, inject_args: [{ limit }] });
+
+ return run_pdp_action_multi(data && data.product_url, steps, 'amazon_product_detail_reviews', sendResponse);
+}
+
+amazon_product_detail_reviews.desc = 'Amazon 商品详情 + 评论(同一详情页,支持 skip_detail=true)';
+amazon_product_detail_reviews.params = {
+ product_url: {
+ type: 'string',
+ desc: '商品详情页完整 URL(含 /dp/ASIN)',
+ default: 'https://www.amazon.com/-/zh/dp/B0B56CHMSC',
+ },
+ limit: { type: 'number', desc: '最多评论条数(默认 50,上限 100)', default: 50 },
+ skip_detail: { type: 'boolean', desc: '当日已拉过详情则跳过详情提取', default: false },
+};
diff --git a/mv2_simple_crx/src/background/index.js b/mv2_simple_crx/src/background/index.js
index b8b3d06..400eae9 100644
--- a/mv2_simple_crx/src/background/index.js
+++ b/mv2_simple_crx/src/background/index.js
@@ -4,6 +4,7 @@ import {
amazon_set_language,
amazon_product_detail,
amazon_product_reviews,
+ amazon_product_detail_reviews,
} from '../actions/amazon.js';
const actions = {
@@ -11,6 +12,7 @@ const actions = {
amazon_set_language,
amazon_product_detail,
amazon_product_reviews,
+ amazon_product_detail_reviews,
};
function list_actions_meta() {
diff --git a/mv2_simple_crx/src/ui/index.html b/mv2_simple_crx/src/ui/index.html
index 05158bb..e9a0da0 100644
--- a/mv2_simple_crx/src/ui/index.html
+++ b/mv2_simple_crx/src/ui/index.html
@@ -28,6 +28,7 @@
+
diff --git a/server/app.js b/server/app.js
index 29c0523..a369e55 100644
--- a/server/app.js
+++ b/server/app.js
@@ -21,7 +21,7 @@ const port = cfg.server.port;
await sequelize.authenticate();
// await sequelize.sync();
-start_all_cron_tasks();
+await start_all_cron_tasks();
app.listen(port);
// eslint-disable-next-line no-console
diff --git a/server/config/cron_tasks.js b/server/config/cron_tasks.js
index d9d8139..daf1435 100644
--- a/server/config/cron_tasks.js
+++ b/server/config/cron_tasks.js
@@ -7,7 +7,7 @@ export const cron_task_list = [
// 任务流:先跑列表,再依赖列表 URL 跑详情+评论
{
name: 'amazon_search_detail_reviews_every_1h',
- cron_expression: '*/15 * * * *', // 15分钟执行一次
+ cron_expression: '* */1 * * *', // 1小时执行一次
type: 'flow',
flow_name: 'amazon_search_detail_reviews',
flow_payload: {
@@ -16,7 +16,6 @@ export const cron_task_list = [
// featured / price_asc / price_desc / review / newest / bestseller
sort_by: 'bestseller',
limit: 100,
-
// flow 自己的参数
reviews_limit: 50,
gap_ms: 500
diff --git a/server/services/flows/amazon/amazon_search_detail_reviews_flow.js b/server/services/flows/amazon/amazon_search_detail_reviews_flow.js
index 71945f4..86e67b2 100644
--- a/server/services/flows/amazon/amazon_search_detail_reviews_flow.js
+++ b/server/services/flows/amazon/amazon_search_detail_reviews_flow.js
@@ -3,6 +3,7 @@ import { sleep_ms } from '../flow_utils.js';
import { amazon_product, amazon_search_item, amazon_review } from '../../../models/index.js';
import { safe_json_stringify } from '../../json_utils.js';
import { close_browser } from '../../puppeteer/puppeteer_runner.js';
+import { Op } from 'sequelize';
function build_batch_key(prefix) {
return `${prefix}_${Date.now()}_${Math.random().toString().slice(2, 8)}`;
@@ -14,6 +15,24 @@ function pick_asin_from_url(url) {
return m && m[1] ? m[1].toUpperCase() : null;
}
+function get_today_start() {
+ const d = new Date();
+ d.setHours(0, 0, 0, 0);
+ return d;
+}
+
+async function has_detail_fetched_today(asin) {
+ if (!asin) return false;
+ const row = await amazon_product.findOne({
+ attributes: ['asin', 'updated_at'],
+ where: {
+ asin,
+ updated_at: { [Op.gte]: get_today_start() }
+ }
+ });
+ return !!row;
+}
+
function unwrap_action_result(res) {
// 插件返回通常是 { ..., result: { stage, items... } }
if (res && typeof res === 'object' && res.result && typeof res.result === 'object') {
@@ -192,26 +211,25 @@ export async function run_amazon_search_detail_reviews_flow(flow_payload) {
if (gap_ms > 0) {
await sleep_ms(gap_ms);
}
- const detail_res = await execute_action_and_record({
- action_name: 'amazon_product_detail',
- action_payload: { product_url: url },
+ const asin = pick_asin_from_url(url);
+ const skip_detail = asin ? await has_detail_fetched_today(asin) : false;
+
+ const res = await execute_action_and_record({
+ action_name: 'amazon_product_detail_reviews',
+ action_payload: { product_url: url, limit: reviews_limit, skip_detail },
source: 'cron'
});
- await persist_detail(detail_res);
+ const r = unwrap_action_result(res);
+ const detail_part = r && r.detail ? r.detail : null;
+ const reviews_part = r && r.reviews ? r.reviews : null;
- if (gap_ms > 0) {
- await sleep_ms(gap_ms);
+ if (detail_part) {
+ await persist_detail(detail_part);
+ }
+ if (reviews_part) {
+ await persist_reviews(reviews_part);
}
-
- const reviews_res = await execute_action_and_record({
- action_name: 'amazon_product_reviews',
- action_payload: { product_url: url, limit: reviews_limit },
- source: 'cron',
- keep_browser_open: true
- });
-
- await persist_reviews(reviews_res);
}
diff --git a/server/services/schedule_loader.js b/server/services/schedule_loader.js
index 594b96b..56d3d1a 100644
--- a/server/services/schedule_loader.js
+++ b/server/services/schedule_loader.js
@@ -1,24 +1,25 @@
import cron from 'node-cron';
import { cron_task_list } from '../config/cron_tasks.js';
-import { execute_action_and_record } from './task_executor.js';
import { get_flow_runner } from './flows/flow_registry.js';
const cron_jobs = [];
const running_task_name_set = new Set();
+function has_argv_flag(flag_name) {
+ const name = String(flag_name || '').trim();
+ if (!name) return false;
+ return process.argv.includes(name);
+}
+
+function should_run_cron_now() {
+ return has_argv_flag('--run_cron_now');
+}
+
async function run_cron_task(task) {
if (!task || !task.type) {
throw new Error('cron_task 缺少 type');
}
- if (task.type === 'action') {
- await execute_action_and_record({
- action_name: task.action_name,
- action_payload: task.action_payload || {},
- source: 'cron'
- });
- return;
- }
if (task.type === 'flow') {
const run_flow = get_flow_runner(task.flow_name);
@@ -29,26 +30,41 @@ async function run_cron_task(task) {
throw new Error(`cron_task type 不支持: ${task.type}`);
}
+async function run_cron_task_with_guard(task_name, task) {
+ if (running_task_name_set.has(task_name)) {
+ // eslint-disable-next-line no-console
+ console.log('[cron] skip (already running)', { name: task_name });
+ return;
+ }
+
+ running_task_name_set.add(task_name);
+ try {
+ await run_cron_task(task);
+ } catch (error) {
+ console.warn('[cron] error', { task_name, error });
+ } finally {
+ running_task_name_set.delete(task_name);
+ }
+}
+
export async function start_all_cron_tasks() {
+ const run_now = should_run_cron_now();
for (const task of cron_task_list) {
- const task_name = task && task.name ? String(task.name) : 'cron_task';
- const job = cron.schedule(task.cron_expression, async () => {
- if (running_task_name_set.has(task_name)) {
- // eslint-disable-next-line no-console
- console.log('[cron] skip (already running)', { name: task_name });
- return;
- }
+ const task_name = task && task.name ? String(task.name) : 'cron_task';
- running_task_name_set.add(task_name);
- try {
- await run_cron_task(task);
- } finally {
- running_task_name_set.delete(task_name);
- }
- });
- console.log('job', { task_name, });
+ if (run_now) {
+ // eslint-disable-next-line no-console
+ console.log('[cron] run_now', { task_name });
+ await run_cron_task_with_guard(task_name, task);
+ }
+ else {
+ const job = cron.schedule(task.cron_expression, async () => {
+ await run_cron_task_with_guard(task_name, task);
+ });
+ console.log('job', { task_name, });
+ cron_jobs.push(job);
- cron_jobs.push(job);
+ }
}
}