114 lines
3.4 KiB
Python
114 lines
3.4 KiB
Python
from __future__ import annotations
|
|
|
|
import argparse
|
|
import json
|
|
from collections import OrderedDict
|
|
|
|
import matplotlib.pyplot as plt
|
|
import mplfinance as mpf
|
|
import pandas as pd
|
|
import redis
|
|
|
|
from data_protocol import KlineMessage
|
|
|
|
|
|
def _positive_int(text: str) -> int:
    """Parse *text* as an int and reject values below 1 (argparse ``type=`` hook)."""
    try:
        value = int(text)
    except ValueError:
        raise argparse.ArgumentTypeError(f"invalid int value: {text!r}") from None
    if value < 1:
        raise argparse.ArgumentTypeError(f"must be a positive integer, got {value}")
    return value


def _non_negative_float(text: str) -> float:
    """Parse *text* as a finite float >= 0 (argparse ``type=`` hook)."""
    try:
        value = float(text)
    except ValueError:
        raise argparse.ArgumentTypeError(f"invalid float value: {text!r}") from None
    # The chained comparison is False for NaN as well as for negatives and infinity.
    if not 0 <= value < float("inf"):
        raise argparse.ArgumentTypeError(f"must be a finite number >= 0, got {text}")
    return value


def parse_args(argv: list[str] | None = None) -> argparse.Namespace:
    """Parse the viewer's command-line options.

    Args:
        argv: Argument strings to parse. ``None`` (the default) makes argparse
            read ``sys.argv[1:]``, which is what the script entry point relies on.

    Returns:
        Namespace with ``redis_host``, ``redis_port``, ``redis_db``,
        ``redis_password``, ``channel``, ``window``, ``only_final`` and
        ``timeout``.

    Raises:
        SystemExit: On invalid arguments (argparse prints usage and exits with
            status 2). This includes a ``--window`` below 1 and a negative or
            non-finite ``--timeout``.
    """
    parser = argparse.ArgumentParser(description="Realtime K-line viewer from Redis")
    parser.add_argument("--redis-host", default="127.0.0.1", help="Redis host")
    parser.add_argument("--redis-port", type=int, default=6379, help="Redis port")
    parser.add_argument("--redis-db", type=int, default=0, help="Redis DB")
    parser.add_argument("--redis-password", default="", help="Redis password")
    parser.add_argument("--channel", default="kline.stream", help="Pub/Sub channel")
    # A window below 1 would leave the candle buffer empty after every insert,
    # so it is rejected here instead of failing later while rendering.
    parser.add_argument("--window", type=_positive_int, default=80, help="Window size to render")
    parser.add_argument("--only-final", action="store_true", help="Render only closed candles")
    parser.add_argument(
        "--timeout",
        type=_non_negative_float,
        default=1.0,
        help="Subscriber timeout (seconds)",
    )
    return parser.parse_args(argv)
|
|
|
|
|
|
def upsert_message(buffer: OrderedDict[int, KlineMessage], message: KlineMessage, max_size: int) -> None:
    """Insert or refresh *message* in *buffer*, then trim the buffer to *max_size*.

    The entry is keyed by ``message.open_time``. A new or refreshed key is moved
    to the end of the buffer, and surplus entries are dropped from the front.
    Eviction therefore removes the least recently inserted or updated keys,
    which need not be the ones with the smallest ``open_time``.

    Args:
        buffer: Ordered mapping of ``open_time`` to message; mutated in place.
        message: Message to store; only its ``open_time`` attribute is read here.
        max_size: Maximum number of entries to keep. ``0`` empties the buffer.

    Raises:
        ValueError: If *max_size* is negative. The check runs before any
            mutation, so *buffer* is left untouched. Previously a negative
            size emptied the buffer and then failed with ``KeyError`` from
            ``popitem`` on an empty mapping.
    """
    if max_size < 0:
        raise ValueError(f"max_size must be non-negative, got {max_size}")

    key = message.open_time
    buffer[key] = message
    buffer.move_to_end(key)
    while len(buffer) > max_size:
        buffer.popitem(last=False)
|
|
|
|
|
|
def to_dataframe(messages: list[KlineMessage]) -> pd.DataFrame:
    """Convert candle messages into an OHLCV frame indexed by open time.

    Args:
        messages: Objects exposing ``open_time``, ``open``, ``high``, ``low``,
            ``close`` and ``volume``. ``open_time`` is passed to
            ``pd.to_datetime`` with ``unit="ms"``; the other fields are
            converted with ``float()``.

    Returns:
        DataFrame with columns ``Open``, ``High``, ``Low``, ``Close`` and
        ``Volume``, and an index named ``Date`` sorted in ascending order.
        An empty input gives an empty frame with the same columns.

    Raises:
        ValueError, TypeError: If a price or volume field cannot be converted
            by ``float()``.
    """
    columns = ["Open", "High", "Low", "Close", "Volume"]
    if not messages:
        # Without rows there is no "Date" column for set_index to find, so the
        # empty frame is built explicitly instead of raising KeyError.
        return pd.DataFrame(
            columns=columns,
            index=pd.DatetimeIndex([], name="Date"),
            dtype=float,
        )

    rows = [
        {
            "Date": pd.to_datetime(msg.open_time, unit="ms"),
            "Open": float(msg.open),
            "High": float(msg.high),
            "Low": float(msg.low),
            "Close": float(msg.close),
            "Volume": float(msg.volume),
        }
        for msg in messages
    ]
    frame = pd.DataFrame(rows, columns=["Date", *columns]).set_index("Date")
    # The caller may supply candles out of time order (for example after a late
    # update to an older candle), so sort to keep the chart chronological.
    # mergesort is stable, so rows with equal timestamps keep their order.
    return frame.sort_index(kind="mergesort")
|
|
|
|
|
|
def _decode_message(raw: str) -> KlineMessage | None:
    """Build a ``KlineMessage`` from one JSON payload; return ``None`` if it is malformed."""
    try:
        return KlineMessage.from_dict(json.loads(raw))
    except (ValueError, KeyError, TypeError) as exc:
        # json.JSONDecodeError is a ValueError subclass.
        # NOTE(review): assumes from_dict reports bad payloads with
        # KeyError/TypeError/ValueError -- confirm against data_protocol.
        print(f"skipping malformed message: {exc!r}")
        return None


def main() -> None:
    """Subscribe to the Redis channel and redraw a candlestick chart per message.

    Each string payload is decoded into a ``KlineMessage`` and stored in a
    bounded buffer keyed by ``open_time``; the buffer is then re-plotted.
    Payloads that cannot be decoded are reported and skipped, so one bad
    publish does not stop the viewer. The loop runs until Ctrl-C. The Redis
    handles are then closed and the last chart stays open until its window
    is closed.
    """
    args = parse_args()
    plt.ion()
    client = redis.Redis(
        host=args.redis_host,
        port=args.redis_port,
        db=args.redis_db,
        password=args.redis_password or None,  # an empty string means "no password"
        decode_responses=True,
    )
    pubsub = client.pubsub(ignore_subscribe_messages=True)
    pubsub.subscribe(args.channel)

    print(
        f"subscribed redis channel={args.channel} "
        f"at {args.redis_host}:{args.redis_port}/{args.redis_db}"
    )

    candles: OrderedDict[int, KlineMessage] = OrderedDict()
    # The figure is created on the first candle, so quitting before any data
    # arrives does not leave an empty window for the final plt.show().
    price_ax = None
    volume_ax = None

    try:
        while True:
            packet = pubsub.get_message(timeout=args.timeout)
            if not packet:
                # Nothing arrived: let the GUI event loop run briefly.
                plt.pause(0.01)
                continue

            raw = packet.get("data")
            if not isinstance(raw, str):
                continue

            message = _decode_message(raw)
            if message is None:
                continue
            if args.only_final and not message.final:
                continue

            upsert_message(candles, message, args.window)
            if not candles:
                # A zero-sized window evicts everything; nothing to draw.
                continue

            frame = to_dataframe(list(candles.values()))

            # Draw into one persistent pair of axes (mplfinance external-axes
            # mode). Calling mpf.plot without ax= opens a new figure on every
            # update and, by default, blocks until that window is closed.
            # NOTE(review): tight_layout is no longer passed because it is a
            # figure-level option of mpf.plot -- confirm the layout looks right.
            if price_ax is None:
                figure = mpf.figure(style="yahoo")
                price_ax = figure.add_subplot(2, 1, 1)
                volume_ax = figure.add_subplot(3, 1, 3)
            else:
                price_ax.clear()
                volume_ax.clear()
            mpf.plot(
                frame,
                type="candle",
                ax=price_ax,
                volume=volume_ax,
                datetime_format="%H:%M",
            )
            plt.pause(0.001)
    except KeyboardInterrupt:
        print("viewer stopped by user")
    finally:
        pubsub.close()
        client.close()
        # Leave interactive mode so the final show() keeps the chart open.
        plt.ioff()
        plt.show()
|
|
|
|
|
|
# Start the viewer only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()
|