This commit is contained in:
丹尼尔
2026-03-12 19:35:06 +08:00
commit ad96272ab6
40 changed files with 2645 additions and 0 deletions

View File

@@ -0,0 +1 @@
__all__ = []

View File

@@ -0,0 +1,108 @@
import json
import os
from typing import Any, Dict
from openai import AsyncOpenAI
_client: AsyncOpenAI | None = None
def get_ai_client() -> AsyncOpenAI:
"""
Create (or reuse) a singleton AsyncOpenAI client.
The client is configured via:
- AI_API_KEY / OPENAI_API_KEY
- AI_BASE_URL (optional, defaults to official OpenAI endpoint)
- AI_MODEL (optional, defaults to gpt-4.1-mini or a similar capable model)
"""
global _client
if _client is not None:
return _client
api_key = os.getenv("AI_API_KEY") or os.getenv("OPENAI_API_KEY")
if not api_key:
raise RuntimeError("AI_API_KEY or OPENAI_API_KEY must be set in environment.")
base_url = os.getenv("AI_BASE_URL") # can point to OpenAI, DeepSeek, Qwen, etc.
_client = AsyncOpenAI(
api_key=api_key,
base_url=base_url or None,
)
return _client
def _build_requirement_prompt(raw_text: str) -> str:
"""
Build a clear system/user prompt for requirement analysis.
The model must output valid JSON only.
"""
return (
"你是一名资深的系统架构师,请阅读以下来自客户的原始需求文本,"
"提炼出清晰的交付方案,并严格按照指定 JSON 结构输出。\n\n"
"【要求】\n"
"1. 按功能模块拆分需求。\n"
"2. 每个模块给出简要说明和技术实现思路。\n"
"3. 估算建议工时(以人天或人小时为单位,使用数字)。\n"
"4. 可以根据你的经验给出每个模块的单价与小计金额,并给出总金额,"
"方便后续生成报价单。\n\n"
"【返回格式】请只返回 JSON不要包含任何额外说明文字\n"
"{\n"
' "modules": [\n'
" {\n"
' "name": "模块名称",\n'
' "description": "模块说明(可以为 Markdown 格式)",\n'
' "technical_approach": "技术实现思路Markdown 格式)",\n'
' "estimated_hours": 16,\n'
' "unit_price": 800,\n'
' "subtotal": 12800\n'
" }\n"
" ],\n"
' "total_estimated_hours": 40,\n'
' "total_amount": 32000,\n'
' "notes": "整体方案备注可选Markdown 格式)"\n'
"}\n\n"
f"【客户原始需求】\n{raw_text}"
)
async def analyze_requirement(raw_text: str) -> Dict[str, Any]:
"""
Call the AI model to analyze customer requirements.
Returns a Python dict matching the JSON structure described
in `_build_requirement_prompt`.
"""
client = get_ai_client()
model = os.getenv("AI_MODEL", "gpt-4.1-mini")
prompt = _build_requirement_prompt(raw_text)
completion = await client.chat.completions.create(
model=model,
response_format={"type": "json_object"},
messages=[
{
"role": "system",
"content": (
"你是一名严谨的系统架构师,只能输出有效的 JSON不要输出任何解释文字。"
),
},
{
"role": "user",
"content": prompt,
},
],
temperature=0.2,
)
content = completion.choices[0].message.content or "{}"
try:
data: Dict[str, Any] = json.loads(content)
except json.JSONDecodeError as exc:
raise RuntimeError(f"AI 返回的内容不是合法 JSON{content}") from exc
return data

View File

@@ -0,0 +1,189 @@
import asyncio
from pathlib import Path
from typing import Any, Dict, List
from docx import Document
from openpyxl import load_workbook
from reportlab.lib.pagesizes import A4
from reportlab.pdfgen import canvas
async def generate_quote_excel(
project_data: Dict[str, Any],
template_path: str,
output_path: str,
) -> str:
"""
Generate an Excel quote based on a template and structured project data.
project_data is expected to have the following structure (from AI JSON):
{
"modules": [
{
"name": "...",
"description": "...",
"technical_approach": "...",
"estimated_hours": 16,
"unit_price": 800,
"subtotal": 12800
},
...
],
"total_estimated_hours": 40,
"total_amount": 32000,
"notes": "..."
}
"""
async def _work() -> str:
template = Path(template_path)
output = Path(output_path)
output.parent.mkdir(parents=True, exist_ok=True)
wb = load_workbook(template)
# Assume the first worksheet is used for the quote.
ws = wb.active
modules: List[Dict[str, Any]] = project_data.get("modules", [])
total_amount = project_data.get("total_amount")
total_hours = project_data.get("total_estimated_hours")
notes = project_data.get("notes")
# Example layout assumptions (adjust cell coordinates to match your template):
# - Starting row for line items: 10
# - Columns:
# A: index, B: module name, C: description,
# D: estimated hours, E: unit price, F: subtotal
start_row = 10
for idx, module in enumerate(modules, start=1):
row = start_row + idx - 1
ws[f"A{row}"] = idx
ws[f"B{row}"] = module.get("name")
ws[f"C{row}"] = module.get("description")
ws[f"D{row}"] = module.get("estimated_hours")
ws[f"E{row}"] = module.get("unit_price")
ws[f"F{row}"] = module.get("subtotal")
# Place total hours and amount in typical footer cells (adjust as needed).
if total_hours is not None:
ws["D5"] = total_hours # e.g., total hours
if total_amount is not None:
ws["F5"] = total_amount # e.g., total amount
if notes:
ws["B6"] = notes
wb.save(output)
return str(output)
return await asyncio.to_thread(_work)
def _replace_in_paragraphs(paragraphs, mapping: Dict[str, str]) -> None:
for paragraph in paragraphs:
for placeholder, value in mapping.items():
if placeholder in paragraph.text:
# Rebuild runs to preserve basic formatting as much as possible.
inline = paragraph.runs
text = paragraph.text.replace(placeholder, value)
# Clear existing runs
for i in range(len(inline) - 1, -1, -1):
paragraph.runs[i].clear()
paragraph.runs[i].text = ""
# Add a single run with replaced text
paragraph.add_run(text)
def _replace_in_tables(tables, mapping: Dict[str, str]) -> None:
for table in tables:
for row in table.rows:
for cell in row.cells:
_replace_in_paragraphs(cell.paragraphs, mapping)
async def generate_contract_word(
contract_data: Dict[str, str],
template_path: str,
output_path: str,
) -> str:
"""
Generate a contract Word document by replacing placeholders.
contract_data is a flat dict like:
{
"{{CUSTOMER_NAME}}": "张三",
"{{TOTAL_PRICE}}": "¥32,000",
"{{DELIVERY_DATE}}": "2026-03-31",
...
}
"""
async def _work() -> str:
template = Path(template_path)
output = Path(output_path)
output.parent.mkdir(parents=True, exist_ok=True)
doc = Document(str(template))
_replace_in_paragraphs(doc.paragraphs, contract_data)
_replace_in_tables(doc.tables, contract_data)
doc.save(str(output))
return str(output)
return await asyncio.to_thread(_work)
async def generate_quote_pdf_from_data(
project_data: Dict[str, Any],
output_pdf_path: str,
) -> str:
"""
Generate a simple PDF quote summary directly from structured data.
This does not render the Excel visually, but provides a clean PDF
that can be sent to customers.
"""
async def _work() -> str:
output = Path(output_pdf_path)
output.parent.mkdir(parents=True, exist_ok=True)
c = canvas.Canvas(str(output), pagesize=A4)
width, height = A4
y = height - 40
c.setFont("Helvetica-Bold", 14)
c.drawString(40, y, "报价单 Quote")
y -= 30
c.setFont("Helvetica", 10)
modules: List[Dict[str, Any]] = project_data.get("modules", [])
for idx, module in enumerate(modules, start=1):
name = module.get("name", "")
hours = module.get("estimated_hours", "")
subtotal = module.get("subtotal", "")
line = f"{idx}. {name} - 工时: {hours}, 小计: {subtotal}"
c.drawString(40, y, line)
y -= 16
if y < 80:
c.showPage()
y = height - 40
c.setFont("Helvetica", 10)
total_amount = project_data.get("total_amount")
total_hours = project_data.get("total_estimated_hours")
y -= 10
c.setFont("Helvetica-Bold", 11)
if total_hours is not None:
c.drawString(40, y, f"总工时 Total Hours: {total_hours}")
y -= 18
if total_amount is not None:
c.drawString(40, y, f"总金额 Total Amount: {total_amount}")
c.showPage()
c.save()
return str(output)
return await asyncio.to_thread(_work)

View File

@@ -0,0 +1,215 @@
import asyncio
import email
import imaplib
import os
from datetime import datetime
from email.header import decode_header
from pathlib import Path
from typing import Any, Dict, List, Tuple
from backend.app.db import SessionLocal
from backend.app.models import FinanceRecord
FINANCE_BASE_DIR = Path("data/finance")
def _decode_header_value(value: str | None) -> str:
if not value:
return ""
parts = decode_header(value)
decoded = ""
for text, enc in parts:
if isinstance(text, bytes):
decoded += text.decode(enc or "utf-8", errors="ignore")
else:
decoded += text
return decoded
def _classify_type(subject: str) -> str:
"""
Classify finance document type based on subject keywords.
"""
subject_lower = subject.lower()
if any(k in subject for k in ["发票", "invoice"]):
return "invoices"
if any(k in subject for k in ["流水", "bank", "对账单", "statement"]):
return "bank_records"
if any(k in subject for k in ["回执", "receipt"]):
return "receipts"
return "others"
def _ensure_month_dir(month_str: str, doc_type: str) -> Path:
month_dir = FINANCE_BASE_DIR / month_str / doc_type
month_dir.mkdir(parents=True, exist_ok=True)
return month_dir
def _parse_email_date(msg: email.message.Message) -> datetime:
date_tuple = email.utils.parsedate_tz(msg.get("Date"))
if date_tuple:
dt = datetime.fromtimestamp(email.utils.mktime_tz(date_tuple))
else:
dt = datetime.utcnow()
return dt
def _save_attachment(
msg: email.message.Message,
month_str: str,
doc_type: str,
) -> List[Tuple[str, str]]:
"""
Save PDF/image attachments and return list of (file_name, file_path).
"""
saved: List[Tuple[str, str]] = []
base_dir = _ensure_month_dir(month_str, doc_type)
for part in msg.walk():
content_disposition = part.get("Content-Disposition", "")
if "attachment" not in content_disposition:
continue
filename = part.get_filename()
filename = _decode_header_value(filename)
if not filename:
continue
content_type = part.get_content_type()
maintype = part.get_content_maintype()
# Accept pdf and common images
if maintype not in ("application", "image"):
continue
data = part.get_payload(decode=True)
if not data:
continue
file_path = base_dir / filename
# Ensure unique filename
counter = 1
while file_path.exists():
stem = file_path.stem
suffix = file_path.suffix
file_path = base_dir / f"{stem}_{counter}{suffix}"
counter += 1
with open(file_path, "wb") as f:
f.write(data)
saved.append((filename, str(file_path)))
return saved
async def sync_finance_emails() -> List[Dict[str, Any]]:
"""
Connect to IMAP, fetch unread finance-related emails, download attachments,
save to filesystem and record FinanceRecord entries.
"""
def _sync() -> List[Dict[str, Any]]:
host = os.getenv("IMAP_HOST")
user = os.getenv("IMAP_USER")
password = os.getenv("IMAP_PASSWORD")
port = int(os.getenv("IMAP_PORT", "993"))
mailbox = os.getenv("IMAP_MAILBOX", "INBOX")
if not all([host, user, password]):
raise RuntimeError("IMAP_HOST, IMAP_USER, IMAP_PASSWORD must be set.")
results: List[Dict[str, Any]] = []
with imaplib.IMAP4_SSL(host, port) as imap:
imap.login(user, password)
imap.select(mailbox)
# Search for UNSEEN emails with finance related keywords in subject.
# Note: IMAP SEARCH is limited; here we search UNSEEN first then filter in Python.
status, data = imap.search(None, "UNSEEN")
if status != "OK":
return results
id_list = data[0].split()
db = SessionLocal()
try:
for msg_id in id_list:
status, msg_data = imap.fetch(msg_id, "(RFC822)")
if status != "OK":
continue
raw_email = msg_data[0][1]
msg = email.message_from_bytes(raw_email)
subject = _decode_header_value(msg.get("Subject"))
doc_type = _classify_type(subject)
# Filter by keywords first
if doc_type == "others":
continue
dt = _parse_email_date(msg)
month_str = dt.strftime("%Y-%m")
saved_files = _save_attachment(msg, month_str, doc_type)
for file_name, file_path in saved_files:
record = FinanceRecord(
month=month_str,
type=doc_type,
file_name=file_name,
file_path=file_path,
)
# NOTE: created_at defaults at DB layer
db.add(record)
db.flush()
results.append(
{
"id": record.id,
"month": record.month,
"type": record.type,
"file_name": record.file_name,
"file_path": record.file_path,
}
)
# Mark email as seen and flagged to avoid re-processing
imap.store(msg_id, "+FLAGS", "\\Seen \\Flagged")
db.commit()
finally:
db.close()
return results
return await asyncio.to_thread(_sync)
async def create_monthly_zip(month_str: str) -> str:
"""
Zip the finance folder for a given month (YYYY-MM) and return the zip path.
"""
import zipfile
def _zip() -> str:
month_dir = FINANCE_BASE_DIR / month_str
if not month_dir.exists():
raise FileNotFoundError(f"Finance directory for {month_str} not found.")
FINANCE_BASE_DIR.mkdir(parents=True, exist_ok=True)
zip_path = FINANCE_BASE_DIR / f"{month_str}.zip"
with zipfile.ZipFile(zip_path, "w", compression=zipfile.ZIP_DEFLATED) as zf:
for root, _, files in os.walk(month_dir):
for file in files:
full_path = Path(root) / file
rel_path = full_path.relative_to(FINANCE_BASE_DIR)
zf.write(full_path, arcname=rel_path)
return str(zip_path)
return await asyncio.to_thread(_zip)