diff --git a/counter_demo.py b/counter_demo.py new file mode 100644 index 0000000..c160b82 --- /dev/null +++ b/counter_demo.py @@ -0,0 +1,432 @@ +# -*- coding: utf-8 -*- +# zq + +import os +import sys +import time +import json +import threading +import multiprocessing +from pprint import pprint + +from datetime import datetime, timedelta + +BASE_DIR = os.path.abspath('/tz_crypto') +print(BASE_DIR) +sys.path.append(BASE_DIR) +from tz_ctastrategy import ( + ArrayManager, + BarData, + BarGenerator, + OrderData, + StopOrder, + TickData, + TradeData, +) +from tz_ctastrategy.backtesting_mul import BacktestingEngine +from tz_ctastrategy.base import EngineType, BacktestingMode +from tz_ctastrategy.template import CtaTemplate +from tzquant.market import zmq_client + +from tzquant.market.log_model import write_file_by_line, log_on_init +from tzquant.trader.object import * +from tzquant.market.keep_alive import ping +from tz_riskmanager.risk_engine import save_setting +from tzquant.market.market_engine import get_account, private_run +from tzquant.market.dingtalker import dingmessage + + +class AtrRsiStrategy(CtaTemplate): + """""" + parameters = {} + + def __init__( + self, cta_engine, strategy_name, vt_symbol, setting, rolling_info=None + ): + """""" + super().__init__(cta_engine, strategy_name, vt_symbol, setting, rolling_info) + if self.cta_engine.engine_type == EngineType.BACKTESTING: + self.bg = BarGenerator(self.on_bar) + else: + self.bg = BarGenerator(self.on_bar, 60, interval=Interval.SECOND) + self.bg10s = BarGenerator(self.on_bar, 10, self.on_10s_bar, interval=Interval.SECOND) + + self.am = ArrayManager(size=2000) + + # 初始化变量 + self.atr_value = 0 + self.rsi_value = 0 + self.ma_value = 0 + self.martingale_level = 0 # 马丁策略层级 + self.max_cash = 0.0 + self.max_down = 0.0 + self.max_pos = 0.0 + + if self.cta_engine.engine_type == EngineType.BACKTESTING: + self.place_order_flag = True + else: + self.place_order_flag = False + + # ----------------------- log ---------------------------- + self.log = None + self.log_time = datetime.now().day - 1 + self.zmq = zmq_client.ZMQClient('XX.XXX.XXX.XX') # 实例化zmq客户端 + self.log_message_id = 0 + self.s_file_name = f"{self.strategy_name}" + self.sub_id: str = "0" + + self.last_tradeid = 0 + + def on_init(self): + """ + Callback when strategy is inited. + """ + self.write_log("策略初始化") + + # 这个是日志输出的函数 + def logger(self): + d_now = datetime.now() + # 每天做一次日志名称更改 + if self.log is None or d_now.day != self.log_time: + self.log = log_on_init( + log_dir_name="/tzquant_logs", + log_name=f"{self.strategy_name}-{self.sub_id}_{d_now.year}-{d_now.month:02d}-{d_now.day:02d}.log", + log_out_name=self.strategy_name, + write_mode="a", + line=True, + ) + self.log_time = d_now.day + else: + while self.cta_engine.logs: + msg = self.cta_engine.logs.pop() + self.log.info(msg) + + result = { + "id": self.log_message_id, + "datetime": d_now.strftime('%Y-%m-%d %H:%M:%S.%f')[:-4], + "timestamp_ms": int(d_now.timestamp() * 1000), + "strategy_id": self.strategy_name, + "level": 'info', + "log": msg + } + self.zmq.send_message(result) + self.log_message_id += 1 + + def on_start(self): + """ + Callback when strategy is started. + """ + self.write_log("策略启动") + + def on_stop(self): + """ + Callback when strategy is stopped. + """ + self.write_log("策略停止") + + def on_tick(self, tick: TickData): + """ + Callback of new tick data update. + """ + if tick.name == 'trade': + self.bg.tick_to_bar(tick=tick) + if self.last_tradeid: + if tick.id - self.last_tradeid != 1: + print(f'成交数据有缺失: last:{self.last_tick} now:{tick}') + self.last_tradeid = tick.id + self.last_tick = tick + + elif tick.name == 'depth': + pass + + def on_bar(self, bar: BarData): + """ + Callback of new bar data update. + """ + if self.cta_engine.engine_type == EngineType.BACKTESTING: + self.on_10s_bar(bar) + else: + self.bg10s.update_bar(bar) + + def on_10s_bar(self, bar: BarData): + try: + self.am.update_bar(bar) + # print(bar) + if time.time() - bar.closetime / 1000 < 30: + # 柜台下买单 + res = self.buy( + price=bar.close * (1 - 0.05), + volume=10 / bar.close, + closetime=bar.closetime / 1000, + order_type=OrderType.POC, + label=f't-{self.strategy_name}-{self.sub_id}-0' + ) + print('place order', res) + # res1 = self.cancel_all(closetime=bar.closetime / 1000) + # print('cancel all orders', res1) + if res: + # 柜台撤销单个订单 + res = self.cancel_order(vt_orderid=res[0], + closetime=bar.closetime / 1000) + print('cancel order', res) + + self.sell( + price=bar.close * (1 + 0.05), + volume=10 / bar.close, + closetime=bar.closetime / 1000, + order_type=OrderType.POC, + label=f't-{self.strategy_name}-{self.sub_id}-1' + ) + + if self.cta_engine.engine_type == EngineType.BACKTESTING: + # 持仓均价 + self.price = self.cta_engine.ca_balance_now["price"] + + if not self.am.inited: + # 初始化未完成之前 撤单到成功为止 + if self.cta_engine.active_limit_orders: + while True: + res = self.cancel_all(closetime=datetime.now().timestamp(), mode=1, + order_id_list=list(self.cta_engine.active_limit_orders.keys())) + time.sleep(0.3) + if res: + break + return + if self.cta_engine.active_limit_orders: + # 柜台撤所有订单 mode 要为 1 order_id_list 要传入当前所有的挂单id + while True: + res = self.cancel_all(closetime=bar.closetime / 1000, + mode=1, + order_id_list=list(self.cta_engine.active_limit_orders.keys())) + if res: + print('cancel success') + # for index, order_id in enumerate(self.cta_engine.active_limit_orders): + # print(order_id, type(order_id)) + # time_diff = time.time() - self.cta_engine.active_limit_orders[order_id].datetime.timestamp() + # if time_diff > 30: + # msg = f'-----------挂单更新超时-----------:{time_diff}' + # dingmessage(webhook_key='', + # secret_name=f'{self.strategy_name}-{self.sub_id}', + # sub_id=self.sub_id, + # symbol=self.cta_engine.symbol, + # msg=msg, + # async_req=True + # ) + # print(self.cta_engine.active_limit_orders[order_id].datetime.timestamp()) + break + time.sleep(0.5) + print('cancel failed') + + except (Exception, BaseException) as e: + self.cta_engine.output( + "on_10s_bar error:%s-----line:%s" % (repr(e), e.__traceback__.tb_lineno) + ) + + def on_order(self, order: OrderData): + """ + Callback of new order data update. + """ + pass + + def on_trade(self, trade: TradeData): + """ + Callback of new trade data update. + """ + + # self.cta_engine.output(f'成交记录:{trade}---持有仓位:{self.pos}') + # self.put_event() + + def on_stop_order(self, stop_order: StopOrder): + """ + Callback of stop order update. + """ + pass + + +def run_single( + platform: str, symbol: str, capital: int, config: dict, risk_setting: dict, queue +): + engine = BacktestingEngine() + + now_datetime = datetime.now().replace(second=0, microsecond=0) + # ---------------- 实盘 ----------------------- + engine.engine_type = EngineType.REAL + engine.gateway_name = EngineType.REAL + + start = now_datetime - timedelta(days=334 / 1440) + # start = now_datetime - timedelta(days=1800 / 1440) + + engine.set_parameters( + vt_symbol=f"{symbol}.{platform}", # 下单平台 + source_exchanges=[platform], # 接收多源平台list + start=start, # 形如:datetime(2022,1,1) + end=now_datetime, # 形如:datetime(2022,1,1) + maker_fee=-0.5 / 10000, # 挂单手续费 + taker_fee=3 / 10000, # 吃单手续费 + slippage=2 / 10000, # 滑点 + size=1, # 杠杆倍数 默认为1 + pricetick=0.1, # 价格精度 + capital=capital, # 本金 + annual_days=365, # 一年的连续交易天数 + interval=Interval.SECOND, + mode=BacktestingMode.TICK # tick级别实盘 + ) + # 将策略添加进引擎 + engine.add_strategy(AtrRsiStrategy, {}) + + # --------------- 加载json 文件的配置信息 --------------- + action_config = config[symbol] + engine.strategy.place_order_flag = action_config["place_order"] + engine.strategy.strategy_name = risk_setting["secret_name"] + engine.strategy.parameters = action_config + engine.strategy.sub_id = risk_setting["sub_id"] + + # engine.strategy.on_init() + + # 实盘私有信息通过这个队列传输 + engine.queue_pri = queue + print("engine.queue_pri:", engine.queue_pri) + # 启动风控 实盘不管大资金还是小资金都要启动风控 + engine.strategy.add_risk_manager( + symbol_list=[symbol_ for symbol_ in config], + key=risk_setting["key"], + secret=risk_setting["secret"], + sub_id=risk_setting["sub_id"] + ) + # engine.dma_depth_data(platform, int(start.timestamp())) + threading.Thread(target=engine.dma_depth_data, args=(platform, int(start.timestamp()))).start() + engine.receive_private_queue_data() # 接收实盘私有信息 + + +def run(): + # 当前跑的平台 + platform = "gate_swap_u" + secret_name = "403" # 500_mm_ziying 200 + symbol = "bnbusdt" # btcusdt + sub_id = "0" # 子账户id str + settle = 'usdt' + + # platform = "binance_swap_u_uni" + # secret_name = "200" # 500_mm_ziying 200 + # symbol = "dogeusdt" # btcusdt + # sub_id = "1" # 子账户id str + # settle = 'usdt' + + # secret_name = sys.argv[1] # 500_mm_ziying 200 + # symbol = sys.argv[2] # btcusdt + # platform = sys.argv[3] # binance_spot_uni + # sub_id = sys.argv[4] # 子账户id str + # settle = 'usdt' if 'usdt' in symbol else 'fdusd' + print(secret_name, symbol, settle) + + # 从配置文件中批量读取参数 + file_name = "data.json" + + with open(file_name, "rb") as json_file: + json_data = json.loads(json_file.read()) + + pprint(json_data[secret_name][sub_id][symbol]) + + # 当前测试账户虚拟本金1000U place_rate_open 开仓让位 place_rate_close 平仓让位 + config = { + symbol: json_data[secret_name][sub_id][symbol] + } + + # ------------------ 修改风控的配置信息 密钥 --------------------------- + risk_setting = { + "key": json_data[secret_name]["key"], + "secret_name": secret_name, + "sub_id": sub_id, + "secret": json_data[secret_name]["secret"], + "sub_key": json_data[secret_name][sub_id]["key"], + "sub_secret": json_data[secret_name][sub_id]["secret"], + "active": True, # True 风控启动 + "order_flow_limit_1m": 15, # 根据下单的标签来做1m的下单次数限制 + "net_max_down_level_1": 20, # 实时最大回撤值百分比 超过会提醒 + "net_max_down_level_2": 15, + "net_max_down_level_3": 10, + # 单币种的持仓限制 + "position_limit": {symbol_: config[symbol_]["capital"] * config[symbol_]["max_pos_rate"] * 1.04 + for symbol_ in config}, + "position_time_out": 45, # 持仓信息超过45秒没来会提醒 + "webhook_key": json_data[secret_name]["webhook_key"] # 钉钉发送的接收用户 + } + + # 这个是实际的本金 + get_account_ = get_account(risk_setting=risk_setting, platform=platform, settle=settle) + balance = get_account_.balance + get_account_.unrealised_pnl + + print(get_account_.__dict__, balance) + + # 更新净值风控 + risk_setting["net_level_1"] = ( + balance * (100 - risk_setting["net_max_down_level_1"]) / 100 + ) + risk_setting["net_level_2"] = ( + balance * (100 - risk_setting["net_max_down_level_2"]) / 100 + ) + risk_setting["net_level_3"] = ( + balance * (100 - risk_setting["net_max_down_level_3"]) / 100 + ) + + total_capital = sum([config[symbol_]["capital"] for symbol_ in config]) + print("当前实际杠杆倍数:", total_capital / balance) + print(config) + print(risk_setting) + + directory_name = "json_hub" + + # 获取当前工作目录 + current_directory = os.getcwd() + + # 构造 json_hub 目录的路径 + json_hub_path = os.path.join(current_directory, directory_name) + + # 检查目录是否存在 + if not os.path.exists(json_hub_path): + # 如果不存在,则创建目录 + os.makedirs(json_hub_path) + print(f"Directory '{directory_name}' created.") + else: + print(f"Directory '{directory_name}' already exists.") + + save_setting(setting=config, setting_filename=f'/json_hub/{secret_name}_config.json') + save_setting(setting=risk_setting) # 将risk_setting 的信息写入配置文件 + + process = [] + # 创建一个进程间内存共享的队列 + queue_dict = {} + for index, key in enumerate(config): + q = multiprocessing.Queue() + queue_dict[key] = q + + # 先启动私有信息的发送进程 确保初始化有数据 + p = multiprocessing.Process( + target=private_run, args=(queue_dict, config, risk_setting, platform, settle) + ) + p.start() + time.sleep(3) + + # 然后为每个品种启动一个进程来做接收 + for index, key in enumerate(config): + p1 = multiprocessing.Process( + target=run_single, + args=( + platform, + key, + config[key]["capital"], + config, + risk_setting, + queue_dict[key], + ), + ) + p1.start() + process.append(p1) + time.sleep(60) + process.append(p) + for p2 in process: + p2.join() + + +if __name__ == "__main__": + run()