92 lines
2.6 KiB
Python
92 lines
2.6 KiB
Python
from __future__ import annotations
|
|
|
|
import argparse
|
|
import pandas as pd
|
|
|
|
from market import create_provider, load_market_config
|
|
from market.service import DISPLAY_FIELDS, resolve_candidates
|
|
|
|
|
|
def parse_args() -> argparse.Namespace:
    """Parse the command-line arguments of the stock lookup tool.

    Returns:
        Namespace carrying ``query`` (optional positional, ``""`` when
        omitted) and ``limit`` (int, 8 unless ``--limit`` is given).
    """
    cli = argparse.ArgumentParser(description="Stock quick lookup (A-share)")
    # nargs="?" makes the positional optional, so a missing query yields "".
    cli.add_argument(
        "query",
        nargs="?",
        default="",
        help="stock code/name/abbr",
    )
    cli.add_argument(
        "--limit",
        type=int,
        default=8,
        help="max candidate rows",
    )
    namespace = cli.parse_args()
    return namespace
|
|
|
|
|
|
def safe_str(value: object) -> str:
    """Convert a cell value to a stripped string, blanking missing values.

    Args:
        value: Any cell value, typically taken from a pandas row.

    Returns:
        ``""`` when ``value`` is a scalar that pandas treats as missing
        (``None``, NaN of any float type, ``pd.NA``, ``pd.NaT``); otherwise
        ``str(value)`` with surrounding whitespace removed.
    """
    # pd.isna returns an array for list-likes, so only ask it about scalars;
    # non-scalar values fall through to the plain str() conversion.
    if pd.api.types.is_scalar(value) and pd.isna(value):
        return ""
    return str(value).strip()
|
|
|
|
|
|
def print_row(row: pd.Series) -> None:
    """Print the display fields of one stock row between banner lines.

    Fields are taken from ``DISPLAY_FIELDS`` in order; any field missing
    from the row's index is skipped, and each value is rendered through
    ``safe_str``.
    """
    print("\n===== 股票信息 =====")
    available = row.index
    for name in DISPLAY_FIELDS:
        if name not in available:
            continue
        text = safe_str(row[name])
        print(f"{name}: {text}")
    print("===================\n")
|
|
|
|
|
|
def pick_candidate(candidates: pd.DataFrame) -> pd.Series | None:
    """Let the user choose one row out of the matched candidates.

    A single candidate is returned immediately without prompting. Otherwise
    a numbered list is printed and the user is asked for a 1-based index;
    pressing Enter selects the first row.

    Args:
        candidates: Matched rows. The listing reads the ``代码`` and ``名称``
            columns and, when present, ``最新价``.

    Returns:
        The chosen row, or ``None`` when there are no candidates or the
        input is not a valid in-range number.
    """
    total = len(candidates)
    if total == 0:
        # Nothing to offer; avoids an IndexError from iloc[0] further down.
        return None
    if total == 1:
        return candidates.iloc[0]

    print("\n匹配到多个标的,请选择:")
    for idx, (_, row) in enumerate(candidates.iterrows(), start=1):
        print(f"{idx}. {safe_str(row['代码'])} {safe_str(row['名称'])} 最新价={safe_str(row.get('最新价'))}")

    choice = input("输入序号(直接回车默认 1): ").strip()
    if not choice:
        return candidates.iloc[0]
    # isdecimal(), unlike isdigit(), rejects characters such as "²" that
    # int() cannot parse and would otherwise raise ValueError on.
    if not choice.isdecimal():
        print("输入无效")
        return None
    pos = int(choice)
    if not 1 <= pos <= total:
        print("输入超出范围")
        return None
    return candidates.iloc[pos - 1]
|
|
|
|
|
|
def run_once(query: str, limit: int) -> None:
    """Resolve one query and print the selected stock, if any.

    Builds a provider from the loaded market config, resolves up to
    ``limit`` candidates for ``query``, lets the user pick one when needed,
    and prints it. Prints a not-found message when nothing matches.
    """
    config = load_market_config()
    provider = create_provider(config)
    matches = resolve_candidates(provider, query, limit)
    if matches.empty:
        print(f"未找到匹配股票: {query}")
        return

    chosen = pick_candidate(matches)
    # None means the user's selection was rejected; nothing to show.
    if chosen is not None:
        print_row(chosen)
|
|
|
|
|
|
def interactive_loop(limit: int) -> None:
    """Run the interactive lookup prompt until the user quits.

    Reads queries from standard input. ``q``, ``quit`` or ``exit``
    (case-insensitive) ends the loop, as does end-of-input or Ctrl-C at the
    prompt. Blank lines are ignored. A failure while handling one query is
    reported and the loop continues.

    Args:
        limit: Maximum number of candidate rows passed to ``run_once``.
    """
    print("股票速查已启动,输入代码/名称(输入 q 退出)")
    while True:
        try:
            query = input("查询> ").strip()
        except (EOFError, KeyboardInterrupt):
            # Ctrl-D / closed stdin / Ctrl-C at the prompt: leave cleanly
            # instead of crashing with a traceback. The leading newline
            # moves off the unfinished prompt line.
            print("\n已退出股票速查")
            return
        if query.lower() in {"q", "quit", "exit"}:
            print("已退出股票速查")
            return
        if not query:
            continue
        try:
            run_once(query, limit)
        except Exception as exc:  # noqa: BLE001
            # Top-level boundary: one failed lookup must not end the session.
            print(f"查询失败: {exc}")
|
|
|
|
|
|
def main() -> None:
    """Entry point: one-shot lookup when a query is given, else the prompt."""
    options = parse_args()
    if options.query:
        run_once(options.query, options.limit)
    else:
        # No positional query on the command line: start the interactive mode.
        interactive_loop(options.limit)
|
|
|
|
|
|
# Run the CLI only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()
|