| @ -0,0 +1,432 @@ | |||||
| # -*- coding: utf-8 -*- | |||||
| # zq | |||||
| import os | |||||
| import sys | |||||
| import time | |||||
| import json | |||||
| import threading | |||||
| import multiprocessing | |||||
| from pprint import pprint | |||||
| from datetime import datetime, timedelta | |||||
| BASE_DIR = os.path.abspath('/tz_crypto') | |||||
| print(BASE_DIR) | |||||
| sys.path.append(BASE_DIR) | |||||
| from tz_ctastrategy import ( | |||||
| ArrayManager, | |||||
| BarData, | |||||
| BarGenerator, | |||||
| OrderData, | |||||
| StopOrder, | |||||
| TickData, | |||||
| TradeData, | |||||
| ) | |||||
| from tz_ctastrategy.backtesting_mul import BacktestingEngine | |||||
| from tz_ctastrategy.base import EngineType, BacktestingMode | |||||
| from tz_ctastrategy.template import CtaTemplate | |||||
| from tzquant.market import zmq_client | |||||
| from tzquant.market.log_model import write_file_by_line, log_on_init | |||||
| from tzquant.trader.object import * | |||||
| from tzquant.market.keep_alive import ping | |||||
| from tz_riskmanager.risk_engine import save_setting | |||||
| from tzquant.market.market_engine import get_account, private_run | |||||
| from tzquant.market.dingtalker import dingmessage | |||||
| class AtrRsiStrategy(CtaTemplate): | |||||
| """""" | |||||
| parameters = {} | |||||
| def __init__( | |||||
| self, cta_engine, strategy_name, vt_symbol, setting, rolling_info=None | |||||
| ): | |||||
| """""" | |||||
| super().__init__(cta_engine, strategy_name, vt_symbol, setting, rolling_info) | |||||
| if self.cta_engine.engine_type == EngineType.BACKTESTING: | |||||
| self.bg = BarGenerator(self.on_bar) | |||||
| else: | |||||
| self.bg = BarGenerator(self.on_bar, 60, interval=Interval.SECOND) | |||||
| self.bg10s = BarGenerator(self.on_bar, 10, self.on_10s_bar, interval=Interval.SECOND) | |||||
| self.am = ArrayManager(size=2000) | |||||
| # 初始化变量 | |||||
| self.atr_value = 0 | |||||
| self.rsi_value = 0 | |||||
| self.ma_value = 0 | |||||
| self.martingale_level = 0 # 马丁策略层级 | |||||
| self.max_cash = 0.0 | |||||
| self.max_down = 0.0 | |||||
| self.max_pos = 0.0 | |||||
| if self.cta_engine.engine_type == EngineType.BACKTESTING: | |||||
| self.place_order_flag = True | |||||
| else: | |||||
| self.place_order_flag = False | |||||
| # ----------------------- log ---------------------------- | |||||
| self.log = None | |||||
| self.log_time = datetime.now().day - 1 | |||||
| self.zmq = zmq_client.ZMQClient('XX.XXX.XXX.XX') # 实例化zmq客户端 | |||||
| self.log_message_id = 0 | |||||
| self.s_file_name = f"{self.strategy_name}" | |||||
| self.sub_id: str = "0" | |||||
| self.last_tradeid = 0 | |||||
| def on_init(self): | |||||
| """ | |||||
| Callback when strategy is inited. | |||||
| """ | |||||
| self.write_log("策略初始化") | |||||
| # 这个是日志输出的函数 | |||||
| def logger(self): | |||||
| d_now = datetime.now() | |||||
| # 每天做一次日志名称更改 | |||||
| if self.log is None or d_now.day != self.log_time: | |||||
| self.log = log_on_init( | |||||
| log_dir_name="/tzquant_logs", | |||||
| log_name=f"{self.strategy_name}-{self.sub_id}_{d_now.year}-{d_now.month:02d}-{d_now.day:02d}.log", | |||||
| log_out_name=self.strategy_name, | |||||
| write_mode="a", | |||||
| line=True, | |||||
| ) | |||||
| self.log_time = d_now.day | |||||
| else: | |||||
| while self.cta_engine.logs: | |||||
| msg = self.cta_engine.logs.pop() | |||||
| self.log.info(msg) | |||||
| result = { | |||||
| "id": self.log_message_id, | |||||
| "datetime": d_now.strftime('%Y-%m-%d %H:%M:%S.%f')[:-4], | |||||
| "timestamp_ms": int(d_now.timestamp() * 1000), | |||||
| "strategy_id": self.strategy_name, | |||||
| "level": 'info', | |||||
| "log": msg | |||||
| } | |||||
| self.zmq.send_message(result) | |||||
| self.log_message_id += 1 | |||||
| def on_start(self): | |||||
| """ | |||||
| Callback when strategy is started. | |||||
| """ | |||||
| self.write_log("策略启动") | |||||
| def on_stop(self): | |||||
| """ | |||||
| Callback when strategy is stopped. | |||||
| """ | |||||
| self.write_log("策略停止") | |||||
| def on_tick(self, tick: TickData): | |||||
| """ | |||||
| Callback of new tick data update. | |||||
| """ | |||||
| if tick.name == 'trade': | |||||
| self.bg.tick_to_bar(tick=tick) | |||||
| if self.last_tradeid: | |||||
| if tick.id - self.last_tradeid != 1: | |||||
| print(f'成交数据有缺失: last:{self.last_tick} now:{tick}') | |||||
| self.last_tradeid = tick.id | |||||
| self.last_tick = tick | |||||
| elif tick.name == 'depth': | |||||
| pass | |||||
| def on_bar(self, bar: BarData): | |||||
| """ | |||||
| Callback of new bar data update. | |||||
| """ | |||||
| if self.cta_engine.engine_type == EngineType.BACKTESTING: | |||||
| self.on_10s_bar(bar) | |||||
| else: | |||||
| self.bg10s.update_bar(bar) | |||||
| def on_10s_bar(self, bar: BarData): | |||||
| try: | |||||
| self.am.update_bar(bar) | |||||
| # print(bar) | |||||
| if time.time() - bar.closetime / 1000 < 30: | |||||
| # 柜台下买单 | |||||
| res = self.buy( | |||||
| price=bar.close * (1 - 0.05), | |||||
| volume=10 / bar.close, | |||||
| closetime=bar.closetime / 1000, | |||||
| order_type=OrderType.POC, | |||||
| label=f't-{self.strategy_name}-{self.sub_id}-0' | |||||
| ) | |||||
| print('place order', res) | |||||
| # res1 = self.cancel_all(closetime=bar.closetime / 1000) | |||||
| # print('cancel all orders', res1) | |||||
| if res: | |||||
| # 柜台撤销单个订单 | |||||
| res = self.cancel_order(vt_orderid=res[0], | |||||
| closetime=bar.closetime / 1000) | |||||
| print('cancel order', res) | |||||
| self.sell( | |||||
| price=bar.close * (1 + 0.05), | |||||
| volume=10 / bar.close, | |||||
| closetime=bar.closetime / 1000, | |||||
| order_type=OrderType.POC, | |||||
| label=f't-{self.strategy_name}-{self.sub_id}-1' | |||||
| ) | |||||
| if self.cta_engine.engine_type == EngineType.BACKTESTING: | |||||
| # 持仓均价 | |||||
| self.price = self.cta_engine.ca_balance_now["price"] | |||||
| if not self.am.inited: | |||||
| # 初始化未完成之前 撤单到成功为止 | |||||
| if self.cta_engine.active_limit_orders: | |||||
| while True: | |||||
| res = self.cancel_all(closetime=datetime.now().timestamp(), mode=1, | |||||
| order_id_list=list(self.cta_engine.active_limit_orders.keys())) | |||||
| time.sleep(0.3) | |||||
| if res: | |||||
| break | |||||
| return | |||||
| if self.cta_engine.active_limit_orders: | |||||
| # 柜台撤所有订单 mode 要为 1 order_id_list 要传入当前所有的挂单id | |||||
| while True: | |||||
| res = self.cancel_all(closetime=bar.closetime / 1000, | |||||
| mode=1, | |||||
| order_id_list=list(self.cta_engine.active_limit_orders.keys())) | |||||
| if res: | |||||
| print('cancel success') | |||||
| # for index, order_id in enumerate(self.cta_engine.active_limit_orders): | |||||
| # print(order_id, type(order_id)) | |||||
| # time_diff = time.time() - self.cta_engine.active_limit_orders[order_id].datetime.timestamp() | |||||
| # if time_diff > 30: | |||||
| # msg = f'-----------挂单更新超时-----------:{time_diff}' | |||||
| # dingmessage(webhook_key='', | |||||
| # secret_name=f'{self.strategy_name}-{self.sub_id}', | |||||
| # sub_id=self.sub_id, | |||||
| # symbol=self.cta_engine.symbol, | |||||
| # msg=msg, | |||||
| # async_req=True | |||||
| # ) | |||||
| # print(self.cta_engine.active_limit_orders[order_id].datetime.timestamp()) | |||||
| break | |||||
| time.sleep(0.5) | |||||
| print('cancel failed') | |||||
| except (Exception, BaseException) as e: | |||||
| self.cta_engine.output( | |||||
| "on_10s_bar error:%s-----line:%s" % (repr(e), e.__traceback__.tb_lineno) | |||||
| ) | |||||
| def on_order(self, order: OrderData): | |||||
| """ | |||||
| Callback of new order data update. | |||||
| """ | |||||
| pass | |||||
| def on_trade(self, trade: TradeData): | |||||
| """ | |||||
| Callback of new trade data update. | |||||
| """ | |||||
| # self.cta_engine.output(f'成交记录:{trade}---持有仓位:{self.pos}') | |||||
| # self.put_event() | |||||
| def on_stop_order(self, stop_order: StopOrder): | |||||
| """ | |||||
| Callback of stop order update. | |||||
| """ | |||||
| pass | |||||
| def run_single( | |||||
| platform: str, symbol: str, capital: int, config: dict, risk_setting: dict, queue | |||||
| ): | |||||
| engine = BacktestingEngine() | |||||
| now_datetime = datetime.now().replace(second=0, microsecond=0) | |||||
| # ---------------- 实盘 ----------------------- | |||||
| engine.engine_type = EngineType.REAL | |||||
| engine.gateway_name = EngineType.REAL | |||||
| start = now_datetime - timedelta(days=334 / 1440) | |||||
| # start = now_datetime - timedelta(days=1800 / 1440) | |||||
| engine.set_parameters( | |||||
| vt_symbol=f"{symbol}.{platform}", # 下单平台 | |||||
| source_exchanges=[platform], # 接收多源平台list | |||||
| start=start, # 形如:datetime(2022,1,1) | |||||
| end=now_datetime, # 形如:datetime(2022,1,1) | |||||
| maker_fee=-0.5 / 10000, # 挂单手续费 | |||||
| taker_fee=3 / 10000, # 吃单手续费 | |||||
| slippage=2 / 10000, # 滑点 | |||||
| size=1, # 杠杆倍数 默认为1 | |||||
| pricetick=0.1, # 价格精度 | |||||
| capital=capital, # 本金 | |||||
| annual_days=365, # 一年的连续交易天数 | |||||
| interval=Interval.SECOND, | |||||
| mode=BacktestingMode.TICK # tick级别实盘 | |||||
| ) | |||||
| # 将策略添加进引擎 | |||||
| engine.add_strategy(AtrRsiStrategy, {}) | |||||
| # --------------- 加载json 文件的配置信息 --------------- | |||||
| action_config = config[symbol] | |||||
| engine.strategy.place_order_flag = action_config["place_order"] | |||||
| engine.strategy.strategy_name = risk_setting["secret_name"] | |||||
| engine.strategy.parameters = action_config | |||||
| engine.strategy.sub_id = risk_setting["sub_id"] | |||||
| # engine.strategy.on_init() | |||||
| # 实盘私有信息通过这个队列传输 | |||||
| engine.queue_pri = queue | |||||
| print("engine.queue_pri:", engine.queue_pri) | |||||
| # 启动风控 实盘不管大资金还是小资金都要启动风控 | |||||
| engine.strategy.add_risk_manager( | |||||
| symbol_list=[symbol_ for symbol_ in config], | |||||
| key=risk_setting["key"], | |||||
| secret=risk_setting["secret"], | |||||
| sub_id=risk_setting["sub_id"] | |||||
| ) | |||||
| # engine.dma_depth_data(platform, int(start.timestamp())) | |||||
| threading.Thread(target=engine.dma_depth_data, args=(platform, int(start.timestamp()))).start() | |||||
| engine.receive_private_queue_data() # 接收实盘私有信息 | |||||
| def run(): | |||||
| # 当前跑的平台 | |||||
| platform = "gate_swap_u" | |||||
| secret_name = "403" # 500_mm_ziying 200 | |||||
| symbol = "bnbusdt" # btcusdt | |||||
| sub_id = "0" # 子账户id str | |||||
| settle = 'usdt' | |||||
| # platform = "binance_swap_u_uni" | |||||
| # secret_name = "200" # 500_mm_ziying 200 | |||||
| # symbol = "dogeusdt" # btcusdt | |||||
| # sub_id = "1" # 子账户id str | |||||
| # settle = 'usdt' | |||||
| # secret_name = sys.argv[1] # 500_mm_ziying 200 | |||||
| # symbol = sys.argv[2] # btcusdt | |||||
| # platform = sys.argv[3] # binance_spot_uni | |||||
| # sub_id = sys.argv[4] # 子账户id str | |||||
| # settle = 'usdt' if 'usdt' in symbol else 'fdusd' | |||||
| print(secret_name, symbol, settle) | |||||
| # 从配置文件中批量读取参数 | |||||
| file_name = "data.json" | |||||
| with open(file_name, "rb") as json_file: | |||||
| json_data = json.loads(json_file.read()) | |||||
| pprint(json_data[secret_name][sub_id][symbol]) | |||||
| # 当前测试账户虚拟本金1000U place_rate_open 开仓让位 place_rate_close 平仓让位 | |||||
| config = { | |||||
| symbol: json_data[secret_name][sub_id][symbol] | |||||
| } | |||||
| # ------------------ 修改风控的配置信息 密钥 --------------------------- | |||||
| risk_setting = { | |||||
| "key": json_data[secret_name]["key"], | |||||
| "secret_name": secret_name, | |||||
| "sub_id": sub_id, | |||||
| "secret": json_data[secret_name]["secret"], | |||||
| "sub_key": json_data[secret_name][sub_id]["key"], | |||||
| "sub_secret": json_data[secret_name][sub_id]["secret"], | |||||
| "active": True, # True 风控启动 | |||||
| "order_flow_limit_1m": 15, # 根据下单的标签来做1m的下单次数限制 | |||||
| "net_max_down_level_1": 20, # 实时最大回撤值百分比 超过会提醒 | |||||
| "net_max_down_level_2": 15, | |||||
| "net_max_down_level_3": 10, | |||||
| # 单币种的持仓限制 | |||||
| "position_limit": {symbol_: config[symbol_]["capital"] * config[symbol_]["max_pos_rate"] * 1.04 | |||||
| for symbol_ in config}, | |||||
| "position_time_out": 45, # 持仓信息超过45秒没来会提醒 | |||||
| "webhook_key": json_data[secret_name]["webhook_key"] # 钉钉发送的接收用户 | |||||
| } | |||||
| # 这个是实际的本金 | |||||
| get_account_ = get_account(risk_setting=risk_setting, platform=platform, settle=settle) | |||||
| balance = get_account_.balance + get_account_.unrealised_pnl | |||||
| print(get_account_.__dict__, balance) | |||||
| # 更新净值风控 | |||||
| risk_setting["net_level_1"] = ( | |||||
| balance * (100 - risk_setting["net_max_down_level_1"]) / 100 | |||||
| ) | |||||
| risk_setting["net_level_2"] = ( | |||||
| balance * (100 - risk_setting["net_max_down_level_2"]) / 100 | |||||
| ) | |||||
| risk_setting["net_level_3"] = ( | |||||
| balance * (100 - risk_setting["net_max_down_level_3"]) / 100 | |||||
| ) | |||||
| total_capital = sum([config[symbol_]["capital"] for symbol_ in config]) | |||||
| print("当前实际杠杆倍数:", total_capital / balance) | |||||
| print(config) | |||||
| print(risk_setting) | |||||
| directory_name = "json_hub" | |||||
| # 获取当前工作目录 | |||||
| current_directory = os.getcwd() | |||||
| # 构造 json_hub 目录的路径 | |||||
| json_hub_path = os.path.join(current_directory, directory_name) | |||||
| # 检查目录是否存在 | |||||
| if not os.path.exists(json_hub_path): | |||||
| # 如果不存在,则创建目录 | |||||
| os.makedirs(json_hub_path) | |||||
| print(f"Directory '{directory_name}' created.") | |||||
| else: | |||||
| print(f"Directory '{directory_name}' already exists.") | |||||
| save_setting(setting=config, setting_filename=f'/json_hub/{secret_name}_config.json') | |||||
| save_setting(setting=risk_setting) # 将risk_setting 的信息写入配置文件 | |||||
| process = [] | |||||
| # 创建一个进程间内存共享的队列 | |||||
| queue_dict = {} | |||||
| for index, key in enumerate(config): | |||||
| q = multiprocessing.Queue() | |||||
| queue_dict[key] = q | |||||
| # 先启动私有信息的发送进程 确保初始化有数据 | |||||
| p = multiprocessing.Process( | |||||
| target=private_run, args=(queue_dict, config, risk_setting, platform, settle) | |||||
| ) | |||||
| p.start() | |||||
| time.sleep(3) | |||||
| # 然后为每个品种启动一个进程来做接收 | |||||
| for index, key in enumerate(config): | |||||
| p1 = multiprocessing.Process( | |||||
| target=run_single, | |||||
| args=( | |||||
| platform, | |||||
| key, | |||||
| config[key]["capital"], | |||||
| config, | |||||
| risk_setting, | |||||
| queue_dict[key], | |||||
| ), | |||||
| ) | |||||
| p1.start() | |||||
| process.append(p1) | |||||
| time.sleep(60) | |||||
| process.append(p) | |||||
| for p2 in process: | |||||
| p2.join() | |||||
| if __name__ == "__main__": | |||||
| run() | |||||