diff --git a/server/config/cron_tasks.js b/server/config/cron_tasks.js index 5782d9f..ab02cf9 100644 --- a/server/config/cron_tasks.js +++ b/server/config/cron_tasks.js @@ -11,11 +11,14 @@ export const cron_task_list = [ type: 'flow', flow_name: 'amazon_search_detail_reviews', flow_payload: { - keyword: '野餐包', + // 插件参数:category_keyword / sort_by / limit + category_keyword: '野餐包', + // featured / price_asc / price_desc / review / newest / bestseller + sort_by: 'bestseller', limit: 100, - max_products: 30, + + // flow 自己的参数 reviews_limit: 50, - concurrency: 1, gap_ms: 500 } } diff --git a/server/services/flows/amazon/amazon_search_detail_reviews_flow.js b/server/services/flows/amazon/amazon_search_detail_reviews_flow.js index eabc839..8bf04b8 100644 --- a/server/services/flows/amazon/amazon_search_detail_reviews_flow.js +++ b/server/services/flows/amazon/amazon_search_detail_reviews_flow.js @@ -2,6 +2,7 @@ import { execute_action_and_record } from '../../task_executor.js'; import { map_limit, sleep_ms } from '../flow_utils.js'; import { amazon_product, amazon_search_item, amazon_review } from '../../../models/index.js'; import { safe_json_stringify } from '../../json_utils.js'; +import { close_browser } from '../../puppeteer/puppeteer_runner.js'; function build_batch_key(prefix) { return `${prefix}_${Date.now()}_${Math.random().toString().slice(2, 8)}`; @@ -13,7 +14,34 @@ function pick_asin_from_url(url) { return m && m[1] ? m[1].toUpperCase() : null; } -async function persist_detail(detail_res) { +function unwrap_action_result(res) { + // 插件返回通常是 { ..., result: { stage, items... } } + if (res && typeof res === 'object' && res.result && typeof res.result === 'object') { + return res.result; + } + return res; +} + +function normalize_sort_by(sort_by) { + if (sort_by === undefined || sort_by === null || sort_by === '') { + return null; + } + + const s = String(sort_by).trim(); + if (!s) { + return null; + } + + const allow = new Set(['featured', 'price_asc', 'price_desc', 'review', 'newest', 'bestseller']); + if (!allow.has(s)) { + throw new Error(`sort_by 不支持: ${s}`); + } + + return s; +} + +async function persist_detail(detail_res_raw) { + const detail_res = unwrap_action_result(detail_res_raw); if (!detail_res || detail_res.stage !== 'detail') { return; } @@ -48,7 +76,8 @@ async function persist_detail(detail_res) { }); } -async function persist_list(list_res) { +async function persist_list(list_res_raw) { + const list_res = unwrap_action_result(list_res_raw); if (!list_res || list_res.stage !== 'list') { return; } @@ -77,7 +106,8 @@ async function persist_list(list_res) { } } -async function persist_reviews(reviews_res) { +async function persist_reviews(reviews_res_raw) { + const reviews_res = unwrap_action_result(reviews_res_raw); if (!reviews_res || reviews_res.stage !== 'reviews') { return; } @@ -110,10 +140,10 @@ async function persist_reviews(reviews_res) { } function must_string(v, name) { - if (!v || typeof v !== 'string') { + if (typeof v !== 'string' || !v.trim()) { throw new Error(`flow 参数 ${name} 必须是字符串`); } - return v; + return v.trim(); } function get_int(v, default_value) { @@ -123,40 +153,47 @@ function get_int(v, default_value) { } export async function run_amazon_search_detail_reviews_flow(flow_payload) { - const keyword = must_string(flow_payload.keyword, 'keyword'); + const category_keyword = must_string(flow_payload.category_keyword, 'category_keyword'); + const sort_by = normalize_sort_by(flow_payload.sort_by); const limit = get_int(flow_payload.limit, 100); - const max_products = get_int(flow_payload.max_products, 30); const reviews_limit = get_int(flow_payload.reviews_limit, 50); - const concurrency = get_int(flow_payload.concurrency, 1); const gap_ms = get_int(flow_payload.gap_ms, 0); + + + const list_payload = { category_keyword, limit }; + if (sort_by) { + list_payload.sort_by = sort_by; + } + const list_res = await execute_action_and_record({ action_name: 'amazon_search_list', - action_payload: { keyword, limit }, + action_payload: list_payload, source: 'cron' }); await persist_list(list_res); - if (!list_res || list_res.stage !== 'list') { + const list_result = unwrap_action_result(list_res); + if (!list_result || list_result.stage !== 'list') { throw new Error('amazon_search_list 返回非 list stage'); } - const items = Array.isArray(list_res.items) ? list_res.items : []; + const items = Array.isArray(list_result.items) ? list_result.items : []; const urls = items .map((it) => (it && it.url ? String(it.url) : '')) .filter((u) => u); - const picked_urls = urls.slice(0, Math.max(0, max_products)); + const picked_urls = urls; - await map_limit(picked_urls, concurrency, async (product_url) => { + + for (const url of picked_urls) { if (gap_ms > 0) { await sleep_ms(gap_ms); } - const detail_res = await execute_action_and_record({ action_name: 'amazon_product_detail', - action_payload: { product_url }, + action_payload: { product_url: url }, source: 'cron' }); @@ -168,23 +205,27 @@ export async function run_amazon_search_detail_reviews_flow(flow_payload) { const reviews_res = await execute_action_and_record({ action_name: 'amazon_product_reviews', - action_payload: { product_url, limit: reviews_limit }, - source: 'cron' + action_payload: { product_url: url, limit: reviews_limit }, + source: 'cron', + keep_browser_open: true }); await persist_reviews(reviews_res); + } - return null; - }); - return { + const summary = { stage: 'flow', flow_name: 'amazon_search_detail_reviews', - keyword, + category_keyword, + sort_by: sort_by || 'featured', limit, - max_products, reviews_limit, total_urls: urls.length, picked: picked_urls.length }; + + await close_browser(); + + return summary; } diff --git a/server/services/flows/flow_utils.js b/server/services/flows/flow_utils.js index 9979e67..8b1790b 100644 --- a/server/services/flows/flow_utils.js +++ b/server/services/flows/flow_utils.js @@ -2,9 +2,8 @@ export async function sleep_ms(ms) { await new Promise((resolve) => setTimeout(resolve, ms)); } -export async function map_limit(items, limit, worker) { +export async function map_limit(items, worker) { const list = Array.isArray(items) ? items : []; - const n = Math.max(1, Number(limit || 1)); const res = new Array(list.length); let idx = 0; @@ -18,7 +17,7 @@ export async function map_limit(items, limit, worker) { } const runners = []; - for (let i = 0; i < Math.min(n, list.length); i += 1) { + for (let i = 0; i < list.length; i += 1) { runners.push(run_one()); } diff --git a/server/services/puppeteer/puppeteer_runner.js b/server/services/puppeteer/puppeteer_runner.js index b55e0e6..49fa1b1 100644 --- a/server/services/puppeteer/puppeteer_runner.js +++ b/server/services/puppeteer/puppeteer_runner.js @@ -6,6 +6,18 @@ import { apply_page_stealth_defaults, get_stealth_puppeteer } from './puppeteer_ let browser_singleton = null; +export async function close_browser() { + if (!browser_singleton) { + return; + } + try { + await browser_singleton.close(); + } catch (err) { + // ignore + } + browser_singleton = null; +} + function get_action_timeout_ms() { const cfg = get_app_config(); return cfg.crawler.action_timeout_ms; @@ -94,7 +106,7 @@ export async function get_or_create_browser() { return browser_singleton; } -export async function invoke_extension_action(action_name, action_payload) { +export async function invoke_extension_action(action_name, action_payload, options) { const cfg = get_app_config(); const browser = await get_or_create_browser(); @@ -189,13 +201,9 @@ export async function invoke_extension_action(action_name, action_payload) { } } - if (cfg.crawler.auto_close_browser) { - try { - await browser.close(); - } catch (err) { - // ignore - } - browser_singleton = null; + const keep_browser_open = options && options.keep_browser_open === true; + if (cfg.crawler.auto_close_browser && !keep_browser_open) { + await close_browser(); } } } diff --git a/server/services/schedule_loader.js b/server/services/schedule_loader.js index b49cc83..b06b1c7 100644 --- a/server/services/schedule_loader.js +++ b/server/services/schedule_loader.js @@ -30,15 +30,15 @@ async function run_cron_task(task) { export async function start_all_cron_tasks() { for (const task of cron_task_list) { - const job = cron.schedule(task.cron_expression, async () => { + // const job = cron.schedule(task.cron_expression, async () => { try { await run_cron_task(task); } catch (err) { // action 内部已记录 crawl_run_record;flow 内部 action 也会记录 } - }); + // }); - cron_jobs.push(job); + // cron_jobs.push(job); } } diff --git a/server/services/task_executor.js b/server/services/task_executor.js index cf0cbe4..4251356 100644 --- a/server/services/task_executor.js +++ b/server/services/task_executor.js @@ -3,7 +3,7 @@ import { safe_json_stringify } from './json_utils.js'; import { invoke_extension_action } from './puppeteer/puppeteer_runner.js'; export async function execute_action_and_record(params) { - const { action_name, action_payload, source } = params; + const { action_name, action_payload, source, keep_browser_open } = params; const request_payload = safe_json_stringify(action_payload || {}); @@ -12,12 +12,14 @@ export async function execute_action_and_record(params) { let error_message = null; try { - const result = await invoke_extension_action(action_name, action_payload || {}); + const result_obj = await invoke_extension_action(action_name, action_payload || {}, { + keep_browser_open: keep_browser_open === true + }); ok = true; - result_payload = safe_json_stringify(result); + result_payload = safe_json_stringify(result_obj); - return result; + return result_obj.result; } catch (err) { ok = false; error_message = (err && err.message) || String(err);