#!/usr/bin/env python3
"""
Self-hosted HTTP proxy server for the current environment
(supports plain HTTP and HTTPS via CONNECT).

Usage:
  # Install dependencies (the backend environment is enough; no extra packages)
  pip install -r backend/requirements.txt   # optional, this script only uses the standard library

  # Start (default 0.0.0.0:8888)
  python scripts/proxy_server.py
  python scripts/proxy_server.py --host 0.0.0.0 --port 8888
  # Or with an environment variable
  PROXY_PORT=9999 python scripts/proxy_server.py

Once a client is pointed at the proxy, its traffic goes through this machine:
  export HTTP_PROXY=http://127.0.0.1:8888
  export HTTPS_PROXY=http://127.0.0.1:8888
  curl -x http://127.0.0.1:8888 https://example.com

PySocks has been added to requirements.txt so that the backend/scripts can act
as a client going through a SOCKS proxy; this proxy server itself only uses the
Python standard library (asyncio) and does not depend on PySocks.

NOTE: the default bind address is 0.0.0.0 and there is no authentication, so
anyone who can reach the port can relay traffic through this host. Bind to
127.0.0.1 (``--host`` / ``PROXY_HOST``) unless remote access is intended.
"""

from __future__ import annotations

import argparse
import asyncio
import logging
import os
from urllib.parse import urlsplit

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(name)s - %(message)s",
)
logger = logging.getLogger("proxy-server")

# Seconds allowed for establishing the upstream TCP connection.
_CONNECT_TIMEOUT = 15.0
# Seconds allowed for the client to send the request line plus all headers.
_HEAD_TIMEOUT = 10.0
# Seconds a plain-HTTP exchange may stay silent before it is abandoned.
_IDLE_TIMEOUT = 60.0
# Read size used when relaying data in either direction.
_CHUNK_SIZE = 65536

_BAD_REQUEST = b"HTTP/1.1 400 Bad Request\r\nConnection: close\r\n\r\n"
_BAD_GATEWAY = b"HTTP/1.1 502 Bad Gateway\r\nConnection: close\r\n\r\n"

# Connection-management headers that must not be forwarded upstream; they are
# replaced by a single "Connection: close" (see handle_http_request).
_HOP_BY_HOP_PREFIXES = (b"connection:", b"proxy-connection:", b"keep-alive:")


def _split_host_port(authority: str, default_port: int) -> tuple[str, int]:
    """Split ``host[:port]`` into ``(host, port)``.

    Bracketed IPv6 literals (``[::1]:443``) are accepted and returned without
    the brackets so they can be passed straight to ``open_connection``. An
    absent or empty port yields *default_port*.

    Raises:
        ValueError: if the host is empty or the port is not a number in
            the range 1-65535.
    """
    authority = authority.strip()
    if authority.startswith("["):
        closing = authority.find("]")
        if closing == -1:
            raise ValueError(f"unterminated IPv6 literal: {authority!r}")
        host = authority[1:closing]
        remainder = authority[closing + 1:]
        if remainder and not remainder.startswith(":"):
            raise ValueError(f"unexpected text after IPv6 literal: {authority!r}")
        port_text = remainder[1:]
    else:
        host, sep, port_text = authority.rpartition(":")
        if not sep:
            # No colon at all: rpartition put everything in the last slot.
            host, port_text = authority, ""

    if not host:
        raise ValueError(f"missing host in {authority!r}")
    if not port_text:
        return host, default_port
    if not (port_text.isascii() and port_text.isdigit()):
        raise ValueError(f"invalid port in {authority!r}")
    port = int(port_text)
    if not 0 < port < 65536:
        raise ValueError(f"port out of range in {authority!r}")
    return host, port


def _header_value(headers: list[bytes], name: bytes) -> bytes | None:
    """Return the stripped value of the first header called *name*, if any."""
    prefix = name.lower() + b":"
    for raw in headers:
        if raw.lower().startswith(prefix):
            return raw[len(prefix):].strip()
    return None


def _resolve_http_target(target: str, headers: list[bytes]) -> tuple[str, int] | None:
    """Work out which origin server a plain-HTTP request should go to.

    The Host header is used when present; otherwise the authority of an
    absolute-form ``http://`` request target is used. Returns ``None`` when
    neither names a host.

    Raises:
        ValueError: if the host/port that was found is malformed.
    """
    host_header = _header_value(headers, b"host")
    if host_header:
        return _split_host_port(host_header.decode("latin-1", errors="replace"), 80)
    parsed = urlsplit(target)
    if parsed.scheme.lower() == "http" and parsed.hostname:
        return parsed.hostname, parsed.port or 80
    return None


async def _close_writer(writer: asyncio.StreamWriter) -> None:
    """Close *writer* and wait for its transport to shut down (best effort)."""
    try:
        writer.close()
        await writer.wait_closed()
    except Exception as exc:
        # The peer may already be gone; there is nothing useful left to do.
        logger.debug("Error while closing connection: %s", exc)


async def _reject(writer: asyncio.StreamWriter, response: bytes) -> None:
    """Send a canned error *response* to the client and start closing."""
    writer.write(response)
    await writer.drain()
    writer.close()


async def _pipe(source: asyncio.StreamReader, sink: asyncio.StreamWriter, label: str) -> None:
    """Copy bytes from *source* to *sink* until EOF, then close *sink*.

    Connection errors end the copy quietly. Cancellation is deliberately NOT
    swallowed so that shutting the server down still cancels tunnels.
    """
    try:
        while True:
            chunk = await source.read(_CHUNK_SIZE)
            if not chunk:
                break
            sink.write(chunk)
            await sink.drain()
    except OSError as exc:
        logger.debug("Tunnel %s ended: %s", label, exc)
    finally:
        # Closing the sink tears down that whole connection, which in turn
        # makes the opposite direction see EOF and finish as well.
        await _close_writer(sink)


async def _read_request_head(reader: asyncio.StreamReader) -> tuple[bytes, list[bytes]] | None:
    """Read the request line and headers, up to the blank line.

    Returns:
        ``(request_line, header_lines)`` with line endings stripped, or
        ``None`` if the client closed the connection without sending anything.

    Raises:
        ValueError: if the connection ends before the blank line that
            terminates the headers (``readline`` itself also raises
            ValueError when a single line exceeds the stream limit).
    """
    line = await reader.readline()
    if not line:
        return None
    if not line.endswith(b"\n"):
        raise ValueError("connection closed in the middle of the request line")
    first_line = line.rstrip(b"\r\n")

    headers: list[bytes] = []
    while True:
        line = await reader.readline()
        if line in (b"\r\n", b"\n"):
            return first_line, headers
        # At EOF readline() returns b"" (or a partial line) immediately and
        # forever; without this check the loop would spin without yielding.
        if not line.endswith(b"\n"):
            raise ValueError("connection closed before the end of the headers")
        headers.append(line.rstrip(b"\r\n"))


async def handle_connect(reader: asyncio.StreamReader, writer: asyncio.StreamWriter, host: str, port: int) -> None:
    """Open a tunnel to ``host:port`` and relay bytes in both directions.

    Replies ``502 Bad Gateway`` if the upstream connection cannot be
    established within the connect timeout; otherwise replies
    ``200 Connection Established`` and relays until either side closes.
    Both connections are closed before returning.
    """
    try:
        remote_reader, remote_writer = await asyncio.wait_for(
            asyncio.open_connection(host, port),
            timeout=_CONNECT_TIMEOUT,
        )
    except Exception as e:
        logger.warning("Connect to %s:%s failed: %s", host, port, e)
        await _reject(writer, _BAD_GATEWAY)
        return

    try:
        writer.write(b"HTTP/1.1 200 Connection Established\r\n\r\n")
        await writer.drain()
        await asyncio.gather(
            _pipe(reader, remote_writer, "c->r"),
            _pipe(remote_reader, writer, "r->c"),
        )
    except Exception as e:
        logger.debug("Tunnel error: %s", e)
    finally:
        await _close_writer(writer)
        await _close_writer(remote_writer)


async def handle_http_request(reader: asyncio.StreamReader, writer: asyncio.StreamWriter, first_line: bytes, headers: list[bytes]) -> None:
    """Forward one plain-HTTP request upstream and relay the response back.

    Args:
        reader: client stream, positioned just after the request headers.
        writer: client stream used for the response.
        first_line: the request line, without its line ending.
        headers: the raw header lines, without line endings.

    The upstream request always carries ``Connection: close`` so the end of
    the response is signalled by the origin closing the connection; the
    response is then streamed to the client until EOF. One request is served
    per client connection. Request bodies are forwarded according to
    Content-Length only (chunked request bodies are not supported).

    Replies ``400 Bad Request`` for a malformed request line or target and
    ``502 Bad Gateway`` when the origin cannot be reached.
    """
    parts = first_line.decode("latin-1", errors="replace").split()
    if len(parts) < 3:
        await _reject(writer, _BAD_REQUEST)
        return

    try:
        endpoint = _resolve_http_target(parts[1], headers)
    except ValueError as exc:
        logger.warning("Bad request target %r: %s", parts[1], exc)
        await _reject(writer, _BAD_REQUEST)
        return
    if endpoint is None:
        await _reject(writer, _BAD_REQUEST + b"No Host header\r\n")
        return
    host, port = endpoint

    try:
        remote_reader, remote_writer = await asyncio.wait_for(
            asyncio.open_connection(host, port),
            timeout=_CONNECT_TIMEOUT,
        )
    except Exception as e:
        logger.warning("Forward to %s:%s failed: %s", host, port, e)
        await _reject(writer, _BAD_GATEWAY)
        return

    try:
        # Replace the client's connection-management headers with an explicit
        # "Connection: close": the response is relayed until EOF, so the
        # origin must close the connection when it is done.
        forwarded = [h for h in headers if not h.lower().startswith(_HOP_BY_HOP_PREFIXES)]
        forwarded.append(b"Connection: close")
        remote_writer.write(first_line + b"\r\n" + b"\r\n".join(forwarded) + b"\r\n\r\n")
        await remote_writer.drain()

        # Stream the request body instead of buffering all of it in memory.
        length_raw = _header_value(headers, b"content-length")
        remaining = int(length_raw) if length_raw else 0
        while remaining > 0:
            chunk = await asyncio.wait_for(
                reader.read(min(remaining, _CHUNK_SIZE)),
                timeout=_IDLE_TIMEOUT,
            )
            if not chunk:
                raise ConnectionError("client closed before sending the full request body")
            remote_writer.write(chunk)
            await remote_writer.drain()
            remaining -= len(chunk)

        # A single read() returns as soon as *any* data is available, so the
        # response has to be relayed in a loop until the origin closes.
        while True:
            chunk = await asyncio.wait_for(
                remote_reader.read(_CHUNK_SIZE),
                timeout=_IDLE_TIMEOUT,
            )
            if not chunk:
                break
            writer.write(chunk)
            await writer.drain()
    except Exception as e:
        logger.debug("HTTP forward error: %s", e)
    finally:
        await _close_writer(remote_writer)
        await _close_writer(writer)


async def handle_client(reader: asyncio.StreamReader, writer: asyncio.StreamWriter) -> None:
    """Serve one client connection.

    Reads the request head, then dispatches to :func:`handle_connect` for
    ``CONNECT`` requests and to :func:`handle_http_request` for everything
    else. A truncated head or an invalid CONNECT target gets a
    ``400 Bad Request``. The client connection is always closed on return.
    """
    peername = writer.get_extra_info("peername", ("?", "?"))
    try:
        try:
            head = await asyncio.wait_for(_read_request_head(reader), timeout=_HEAD_TIMEOUT)
        except ValueError as exc:
            logger.warning("Malformed request from %s: %s", peername, exc)
            await _reject(writer, _BAD_REQUEST)
            return
        if head is None:
            return
        first_line, headers = head

        parts = first_line.split()
        if len(parts) >= 2 and parts[0].upper() == b"CONNECT":
            try:
                host, port = _split_host_port(parts[1].decode("latin-1", errors="replace"), 443)
            except ValueError as exc:
                logger.warning("Bad CONNECT target from %s: %s", peername, exc)
                await _reject(writer, _BAD_REQUEST)
                return
            logger.info("CONNECT %s:%s from %s", host, port, peername)
            await handle_connect(reader, writer, host, port)
        else:
            logger.info("HTTP %s from %s", first_line[:60], peername)
            await handle_http_request(reader, writer, first_line, headers)
    except asyncio.TimeoutError:
        logger.warning("Timeout from %s", peername)
    except Exception as e:
        logger.debug("Client error %s: %s", peername, e)
    finally:
        await _close_writer(writer)


async def main() -> None:
    """Parse command-line/environment options and run the proxy until stopped.

    ``--host`` defaults to ``$PROXY_HOST`` or ``0.0.0.0``; ``--port`` defaults
    to ``$PROXY_PORT`` or ``8888``.
    """
    parser = argparse.ArgumentParser(description="HTTP/HTTPS proxy server (CONNECT support)")
    parser.add_argument("--host", default=os.environ.get("PROXY_HOST", "0.0.0.0"), help="Bind host")
    parser.add_argument("--port", type=int, default=int(os.environ.get("PROXY_PORT", "8888")), help="Bind port")
    args = parser.parse_args()

    server = await asyncio.start_server(handle_client, args.host, args.port)
    addr = server.sockets[0].getsockname() if server.sockets else (args.host, args.port)
    logger.info("Proxy server listening on http://%s:%s", addr[0], addr[1])
    async with server:
        await server.serve_forever()


if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        # Ctrl-C is the normal way to stop the server; exit without a traceback.
        logger.info("Proxy server stopped")