diff --git a/counter_demo.py b/counter_demo.py index c160b82..0f0f6d5 100644 --- a/counter_demo.py +++ b/counter_demo.py @@ -1,432 +1,436 @@ -# -*- coding: utf-8 -*- -# zq - -import os -import sys -import time -import json -import threading -import multiprocessing -from pprint import pprint - -from datetime import datetime, timedelta - -BASE_DIR = os.path.abspath('/tz_crypto') -print(BASE_DIR) -sys.path.append(BASE_DIR) -from tz_ctastrategy import ( - ArrayManager, - BarData, - BarGenerator, - OrderData, - StopOrder, - TickData, - TradeData, -) -from tz_ctastrategy.backtesting_mul import BacktestingEngine -from tz_ctastrategy.base import EngineType, BacktestingMode -from tz_ctastrategy.template import CtaTemplate -from tzquant.market import zmq_client - -from tzquant.market.log_model import write_file_by_line, log_on_init -from tzquant.trader.object import * -from tzquant.market.keep_alive import ping -from tz_riskmanager.risk_engine import save_setting -from tzquant.market.market_engine import get_account, private_run -from tzquant.market.dingtalker import dingmessage - - -class AtrRsiStrategy(CtaTemplate): - """""" - parameters = {} - - def __init__( - self, cta_engine, strategy_name, vt_symbol, setting, rolling_info=None - ): - """""" - super().__init__(cta_engine, strategy_name, vt_symbol, setting, rolling_info) - if self.cta_engine.engine_type == EngineType.BACKTESTING: - self.bg = BarGenerator(self.on_bar) - else: - self.bg = BarGenerator(self.on_bar, 60, interval=Interval.SECOND) - self.bg10s = BarGenerator(self.on_bar, 10, self.on_10s_bar, interval=Interval.SECOND) - - self.am = ArrayManager(size=2000) - - # 初始化变量 - self.atr_value = 0 - self.rsi_value = 0 - self.ma_value = 0 - self.martingale_level = 0 # 马丁策略层级 - self.max_cash = 0.0 - self.max_down = 0.0 - self.max_pos = 0.0 - - if self.cta_engine.engine_type == EngineType.BACKTESTING: - self.place_order_flag = True - else: - self.place_order_flag = False - - # ----------------------- log ---------------------------- - self.log = None - self.log_time = datetime.now().day - 1 - self.zmq = zmq_client.ZMQClient('XX.XXX.XXX.XX') # 实例化zmq客户端 - self.log_message_id = 0 - self.s_file_name = f"{self.strategy_name}" - self.sub_id: str = "0" - - self.last_tradeid = 0 - - def on_init(self): - """ - Callback when strategy is inited. - """ - self.write_log("策略初始化") - - # 这个是日志输出的函数 - def logger(self): - d_now = datetime.now() - # 每天做一次日志名称更改 - if self.log is None or d_now.day != self.log_time: - self.log = log_on_init( - log_dir_name="/tzquant_logs", - log_name=f"{self.strategy_name}-{self.sub_id}_{d_now.year}-{d_now.month:02d}-{d_now.day:02d}.log", - log_out_name=self.strategy_name, - write_mode="a", - line=True, - ) - self.log_time = d_now.day - else: - while self.cta_engine.logs: - msg = self.cta_engine.logs.pop() - self.log.info(msg) - - result = { - "id": self.log_message_id, - "datetime": d_now.strftime('%Y-%m-%d %H:%M:%S.%f')[:-4], - "timestamp_ms": int(d_now.timestamp() * 1000), - "strategy_id": self.strategy_name, - "level": 'info', - "log": msg - } - self.zmq.send_message(result) - self.log_message_id += 1 - - def on_start(self): - """ - Callback when strategy is started. - """ - self.write_log("策略启动") - - def on_stop(self): - """ - Callback when strategy is stopped. - """ - self.write_log("策略停止") - - def on_tick(self, tick: TickData): - """ - Callback of new tick data update. - """ - if tick.name == 'trade': - self.bg.tick_to_bar(tick=tick) - if self.last_tradeid: - if tick.id - self.last_tradeid != 1: - print(f'成交数据有缺失: last:{self.last_tick} now:{tick}') - self.last_tradeid = tick.id - self.last_tick = tick - - elif tick.name == 'depth': - pass - - def on_bar(self, bar: BarData): - """ - Callback of new bar data update. - """ - if self.cta_engine.engine_type == EngineType.BACKTESTING: - self.on_10s_bar(bar) - else: - self.bg10s.update_bar(bar) - - def on_10s_bar(self, bar: BarData): - try: - self.am.update_bar(bar) - # print(bar) - if time.time() - bar.closetime / 1000 < 30: - # 柜台下买单 - res = self.buy( - price=bar.close * (1 - 0.05), - volume=10 / bar.close, - closetime=bar.closetime / 1000, - order_type=OrderType.POC, - label=f't-{self.strategy_name}-{self.sub_id}-0' - ) - print('place order', res) - # res1 = self.cancel_all(closetime=bar.closetime / 1000) - # print('cancel all orders', res1) - if res: - # 柜台撤销单个订单 - res = self.cancel_order(vt_orderid=res[0], - closetime=bar.closetime / 1000) - print('cancel order', res) - - self.sell( - price=bar.close * (1 + 0.05), - volume=10 / bar.close, - closetime=bar.closetime / 1000, - order_type=OrderType.POC, - label=f't-{self.strategy_name}-{self.sub_id}-1' - ) - - if self.cta_engine.engine_type == EngineType.BACKTESTING: - # 持仓均价 - self.price = self.cta_engine.ca_balance_now["price"] - - if not self.am.inited: - # 初始化未完成之前 撤单到成功为止 - if self.cta_engine.active_limit_orders: - while True: - res = self.cancel_all(closetime=datetime.now().timestamp(), mode=1, - order_id_list=list(self.cta_engine.active_limit_orders.keys())) - time.sleep(0.3) - if res: - break - return - if self.cta_engine.active_limit_orders: - # 柜台撤所有订单 mode 要为 1 order_id_list 要传入当前所有的挂单id - while True: - res = self.cancel_all(closetime=bar.closetime / 1000, - mode=1, - order_id_list=list(self.cta_engine.active_limit_orders.keys())) - if res: - print('cancel success') - # for index, order_id in enumerate(self.cta_engine.active_limit_orders): - # print(order_id, type(order_id)) - # time_diff = time.time() - self.cta_engine.active_limit_orders[order_id].datetime.timestamp() - # if time_diff > 30: - # msg = f'-----------挂单更新超时-----------:{time_diff}' - # dingmessage(webhook_key='', - # secret_name=f'{self.strategy_name}-{self.sub_id}', - # sub_id=self.sub_id, - # symbol=self.cta_engine.symbol, - # msg=msg, - # async_req=True - # ) - # print(self.cta_engine.active_limit_orders[order_id].datetime.timestamp()) - break - time.sleep(0.5) - print('cancel failed') - - except (Exception, BaseException) as e: - self.cta_engine.output( - "on_10s_bar error:%s-----line:%s" % (repr(e), e.__traceback__.tb_lineno) - ) - - def on_order(self, order: OrderData): - """ - Callback of new order data update. - """ - pass - - def on_trade(self, trade: TradeData): - """ - Callback of new trade data update. - """ - - # self.cta_engine.output(f'成交记录:{trade}---持有仓位:{self.pos}') - # self.put_event() - - def on_stop_order(self, stop_order: StopOrder): - """ - Callback of stop order update. - """ - pass - - -def run_single( - platform: str, symbol: str, capital: int, config: dict, risk_setting: dict, queue -): - engine = BacktestingEngine() - - now_datetime = datetime.now().replace(second=0, microsecond=0) - # ---------------- 实盘 ----------------------- - engine.engine_type = EngineType.REAL - engine.gateway_name = EngineType.REAL - - start = now_datetime - timedelta(days=334 / 1440) - # start = now_datetime - timedelta(days=1800 / 1440) - - engine.set_parameters( - vt_symbol=f"{symbol}.{platform}", # 下单平台 - source_exchanges=[platform], # 接收多源平台list - start=start, # 形如:datetime(2022,1,1) - end=now_datetime, # 形如:datetime(2022,1,1) - maker_fee=-0.5 / 10000, # 挂单手续费 - taker_fee=3 / 10000, # 吃单手续费 - slippage=2 / 10000, # 滑点 - size=1, # 杠杆倍数 默认为1 - pricetick=0.1, # 价格精度 - capital=capital, # 本金 - annual_days=365, # 一年的连续交易天数 - interval=Interval.SECOND, - mode=BacktestingMode.TICK # tick级别实盘 - ) - # 将策略添加进引擎 - engine.add_strategy(AtrRsiStrategy, {}) - - # --------------- 加载json 文件的配置信息 --------------- - action_config = config[symbol] - engine.strategy.place_order_flag = action_config["place_order"] - engine.strategy.strategy_name = risk_setting["secret_name"] - engine.strategy.parameters = action_config - engine.strategy.sub_id = risk_setting["sub_id"] - - # engine.strategy.on_init() - - # 实盘私有信息通过这个队列传输 - engine.queue_pri = queue - print("engine.queue_pri:", engine.queue_pri) - # 启动风控 实盘不管大资金还是小资金都要启动风控 - engine.strategy.add_risk_manager( - symbol_list=[symbol_ for symbol_ in config], - key=risk_setting["key"], - secret=risk_setting["secret"], - sub_id=risk_setting["sub_id"] - ) - # engine.dma_depth_data(platform, int(start.timestamp())) - threading.Thread(target=engine.dma_depth_data, args=(platform, int(start.timestamp()))).start() - engine.receive_private_queue_data() # 接收实盘私有信息 - - -def run(): - # 当前跑的平台 - platform = "gate_swap_u" - secret_name = "403" # 500_mm_ziying 200 - symbol = "bnbusdt" # btcusdt - sub_id = "0" # 子账户id str - settle = 'usdt' - - # platform = "binance_swap_u_uni" - # secret_name = "200" # 500_mm_ziying 200 - # symbol = "dogeusdt" # btcusdt - # sub_id = "1" # 子账户id str - # settle = 'usdt' - - # secret_name = sys.argv[1] # 500_mm_ziying 200 - # symbol = sys.argv[2] # btcusdt - # platform = sys.argv[3] # binance_spot_uni - # sub_id = sys.argv[4] # 子账户id str - # settle = 'usdt' if 'usdt' in symbol else 'fdusd' - print(secret_name, symbol, settle) - - # 从配置文件中批量读取参数 - file_name = "data.json" - - with open(file_name, "rb") as json_file: - json_data = json.loads(json_file.read()) - - pprint(json_data[secret_name][sub_id][symbol]) - - # 当前测试账户虚拟本金1000U place_rate_open 开仓让位 place_rate_close 平仓让位 - config = { - symbol: json_data[secret_name][sub_id][symbol] - } - - # ------------------ 修改风控的配置信息 密钥 --------------------------- - risk_setting = { - "key": json_data[secret_name]["key"], - "secret_name": secret_name, - "sub_id": sub_id, - "secret": json_data[secret_name]["secret"], - "sub_key": json_data[secret_name][sub_id]["key"], - "sub_secret": json_data[secret_name][sub_id]["secret"], - "active": True, # True 风控启动 - "order_flow_limit_1m": 15, # 根据下单的标签来做1m的下单次数限制 - "net_max_down_level_1": 20, # 实时最大回撤值百分比 超过会提醒 - "net_max_down_level_2": 15, - "net_max_down_level_3": 10, - # 单币种的持仓限制 - "position_limit": {symbol_: config[symbol_]["capital"] * config[symbol_]["max_pos_rate"] * 1.04 - for symbol_ in config}, - "position_time_out": 45, # 持仓信息超过45秒没来会提醒 - "webhook_key": json_data[secret_name]["webhook_key"] # 钉钉发送的接收用户 - } - - # 这个是实际的本金 - get_account_ = get_account(risk_setting=risk_setting, platform=platform, settle=settle) - balance = get_account_.balance + get_account_.unrealised_pnl - - print(get_account_.__dict__, balance) - - # 更新净值风控 - risk_setting["net_level_1"] = ( - balance * (100 - risk_setting["net_max_down_level_1"]) / 100 - ) - risk_setting["net_level_2"] = ( - balance * (100 - risk_setting["net_max_down_level_2"]) / 100 - ) - risk_setting["net_level_3"] = ( - balance * (100 - risk_setting["net_max_down_level_3"]) / 100 - ) - - total_capital = sum([config[symbol_]["capital"] for symbol_ in config]) - print("当前实际杠杆倍数:", total_capital / balance) - print(config) - print(risk_setting) - - directory_name = "json_hub" - - # 获取当前工作目录 - current_directory = os.getcwd() - - # 构造 json_hub 目录的路径 - json_hub_path = os.path.join(current_directory, directory_name) - - # 检查目录是否存在 - if not os.path.exists(json_hub_path): - # 如果不存在,则创建目录 - os.makedirs(json_hub_path) - print(f"Directory '{directory_name}' created.") - else: - print(f"Directory '{directory_name}' already exists.") - - save_setting(setting=config, setting_filename=f'/json_hub/{secret_name}_config.json') - save_setting(setting=risk_setting) # 将risk_setting 的信息写入配置文件 - - process = [] - # 创建一个进程间内存共享的队列 - queue_dict = {} - for index, key in enumerate(config): - q = multiprocessing.Queue() - queue_dict[key] = q - - # 先启动私有信息的发送进程 确保初始化有数据 - p = multiprocessing.Process( - target=private_run, args=(queue_dict, config, risk_setting, platform, settle) - ) - p.start() - time.sleep(3) - - # 然后为每个品种启动一个进程来做接收 - for index, key in enumerate(config): - p1 = multiprocessing.Process( - target=run_single, - args=( - platform, - key, - config[key]["capital"], - config, - risk_setting, - queue_dict[key], - ), - ) - p1.start() - process.append(p1) - time.sleep(60) - process.append(p) - for p2 in process: - p2.join() - - -if __name__ == "__main__": - run() +# -*- coding: utf-8 -*- +# zq + +import os +import sys +import time +import json +import threading +import multiprocessing +from pprint import pprint + +from datetime import datetime, timedelta + +BASE_DIR = os.path.abspath('/tz_crypto') +print(BASE_DIR) +sys.path.append(BASE_DIR) +from tz_ctastrategy import ( + ArrayManager, + BarData, + BarGenerator, + OrderData, + StopOrder, + TickData, + TradeData, +) +from tz_ctastrategy.backtesting_mul import BacktestingEngine +from tz_ctastrategy.base import EngineType, BacktestingMode +from tz_ctastrategy.template import CtaTemplate +from tzquant.market import zmq_client + +from tzquant.market.log_model import write_file_by_line, log_on_init +from tzquant.trader.object import * +from tzquant.market.keep_alive import ping +from tz_riskmanager.risk_engine import save_setting +from tzquant.market.market_engine import get_account, private_run +from tzquant.market.dingtalker import dingmessage + + +class AtrRsiStrategy(CtaTemplate): + """""" + parameters = {} + + def __init__( + self, cta_engine, strategy_name, vt_symbol, setting, rolling_info=None + ): + """""" + super().__init__(cta_engine, strategy_name, vt_symbol, setting, rolling_info) + if self.cta_engine.engine_type == EngineType.BACKTESTING: + self.bg = BarGenerator(self.on_bar) + else: + self.bg = BarGenerator(self.on_bar, 60, interval=Interval.SECOND) + self.bg10s = BarGenerator(self.on_bar, 10, self.on_10s_bar, interval=Interval.SECOND) + + self.am = ArrayManager(size=200) + + # 初始化变量 + self.atr_value = 0 + self.rsi_value = 0 + self.ma_value = 0 + self.martingale_level = 0 # 马丁策略层级 + self.max_cash = 0.0 + self.max_down = 0.0 + self.max_pos = 0.0 + + if self.cta_engine.engine_type == EngineType.BACKTESTING: + self.place_order_flag = True + else: + self.place_order_flag = False + + # ----------------------- log ---------------------------- + self.log = None + self.log_time = datetime.now().day - 1 + self.zmq = zmq_client.ZMQClient('XX.XXX.XXX.XX') # 实例化zmq客户端 + self.log_message_id = 0 + self.s_file_name = f"{self.strategy_name}" + self.sub_id: str = "0" + + self.last_tradeid = 0 + + def on_init(self): + """ + Callback when strategy is inited. + """ + self.write_log("策略初始化") + + # 这个是日志输出的函数 + def logger(self): + d_now = datetime.now() + # 每天做一次日志名称更改 + if self.log is None or d_now.day != self.log_time: + self.log = log_on_init( + log_dir_name="/tzquant_logs", + log_name=f"{self.strategy_name}-{self.sub_id}_{d_now.year}-{d_now.month:02d}-{d_now.day:02d}.log", + log_out_name=self.strategy_name, + write_mode="a", + line=True, + ) + self.log_time = d_now.day + else: + while self.cta_engine.logs: + msg = self.cta_engine.logs.pop() + self.log.info(msg) + + result = { + "id": self.log_message_id, + "datetime": d_now.strftime('%Y-%m-%d %H:%M:%S.%f')[:-4], + "timestamp_ms": int(d_now.timestamp() * 1000), + "strategy_id": self.strategy_name, + "level": 'info', + "log": msg + } + self.zmq.send_message(result) + self.log_message_id += 1 + + def on_start(self): + """ + Callback when strategy is started. + """ + self.write_log("策略启动") + + def on_stop(self): + """ + Callback when strategy is stopped. + """ + self.write_log("策略停止") + + def on_tick(self, tick: TickData): + """ + Callback of new tick data update. + """ + if tick.name == 'trade': + self.bg.tick_to_bar(tick=tick) + if self.last_tradeid: + if tick.id - self.last_tradeid != 1: + print(f'成交数据有缺失: last:{self.last_tick} now:{tick}') + self.last_tradeid = tick.id + self.last_tick = tick + + elif tick.name == 'depth': + pass + + def on_bar(self, bar: BarData): + """ + Callback of new bar data update. + """ + if self.cta_engine.engine_type == EngineType.BACKTESTING: + self.on_10s_bar(bar) + else: + self.bg10s.update_bar(bar) + + def on_10s_bar(self, bar: BarData): + try: + self.am.update_bar(bar) + # print(bar) + # if time.time() - bar.closetime / 1000 < 30: + # # 柜台下买单 + # res = self.buy( + # price=bar.close * (1 - 0.05), + # volume=10 / bar.close, + # closetime=bar.closetime / 1000, + # order_type=OrderType.POC, + # label=f't-{self.strategy_name}-{self.sub_id}-0' + # ) + # print('place order', res) + # # res1 = self.cancel_all(closetime=bar.closetime / 1000) + # # print('cancel all orders', res1) + # if res: + # # 柜台撤销单个订单 + # res = self.cancel_order(vt_orderid=res[0], + # closetime=bar.closetime / 1000) + # print('cancel order', res) + # + # self.sell( + # price=bar.close * (1 + 0.05), + # volume=10 / bar.close, + # closetime=bar.closetime / 1000, + # order_type=OrderType.POC, + # label=f't-{self.strategy_name}-{self.sub_id}-1' + # ) + + if self.cta_engine.engine_type == EngineType.BACKTESTING: + # 持仓均价 + self.price = self.cta_engine.ca_balance_now["price"] + + if not self.am.inited: + # 初始化未完成之前 撤单到成功为止 + if self.cta_engine.active_limit_orders: + while True: + res = self.cancel_all(closetime=datetime.now().timestamp(), mode=1, + order_id_list=list(self.cta_engine.active_limit_orders.keys())) + time.sleep(0.3) + if res: + break + # return + if self.cta_engine.active_limit_orders: + print('active_limit_orders: ', self.cta_engine.active_limit_orders) + # # 柜台撤所有订单 mode 要为 1 order_id_list 要传入当前所有的挂单id + # while True: + # res = self.cancel_all(closetime=bar.closetime / 1000, + # mode=1, + # order_id_list=list(self.cta_engine.active_limit_orders.keys())) + # if res: + # print('cancel success') + # # for index, order_id in enumerate(self.cta_engine.active_limit_orders): + # # print(order_id, type(order_id)) + # # time_diff = time.time() - self.cta_engine.active_limit_orders[order_id].datetime.timestamp() + # # if time_diff > 30: + # # msg = f'-----------挂单更新超时-----------:{time_diff}' + # # dingmessage(webhook_key='', + # # secret_name=f'{self.strategy_name}-{self.sub_id}', + # # sub_id=self.sub_id, + # # symbol=self.cta_engine.symbol, + # # msg=msg, + # # async_req=True + # # ) + # # print(self.cta_engine.active_limit_orders[order_id].datetime.timestamp()) + # break + # time.sleep(0.5) + # print('cancel failed') + + except (Exception, BaseException) as e: + self.cta_engine.output( + "on_10s_bar error:%s-----line:%s" % (repr(e), e.__traceback__.tb_lineno) + ) + + def on_order(self, order: OrderData): + """ + Callback of new order data update. + """ + pass + + def on_trade(self, trade: TradeData): + """ + Callback of new trade data update. + """ + + self.cta_engine.output(f'成交记录:{trade}---持有仓位:{self.pos}') + # self.put_event() + + def on_stop_order(self, stop_order: StopOrder): + """ + Callback of stop order update. + """ + pass + + +def run_single( + platform: str, symbol: str, capital: int, config: dict, risk_setting: dict, queue +): + engine = BacktestingEngine() + + now_datetime = datetime.now().replace(second=0, microsecond=0) + # ---------------- 实盘 ----------------------- + engine.engine_type = EngineType.REAL + engine.gateway_name = EngineType.REAL + + start = now_datetime - timedelta(days=40 / 1440) + # start = now_datetime - timedelta(days=1800 / 1440) + + engine.set_parameters( + vt_symbol=f"{symbol}.{platform}", # 下单平台 + source_exchanges=[platform], # 接收多源平台list + start=start, # 形如:datetime(2022,1,1) + end=now_datetime, # 形如:datetime(2022,1,1) + maker_fee=-0.5 / 10000, # 挂单手续费 + taker_fee=3 / 10000, # 吃单手续费 + slippage=2 / 10000, # 滑点 + size=1, # 杠杆倍数 默认为1 + pricetick=0.1, # 价格精度 + capital=capital, # 本金 + annual_days=365, # 一年的连续交易天数 + interval=Interval.SECOND, + mode=BacktestingMode.TICK # tick级别实盘 + ) + # 将策略添加进引擎 + engine.add_strategy(AtrRsiStrategy, {}) + + # --------------- 加载json 文件的配置信息 --------------- + action_config = config[symbol] + engine.strategy.place_order_flag = action_config["place_order"] + engine.strategy.strategy_name = risk_setting["secret_name"] + engine.strategy.parameters = action_config + engine.strategy.sub_id = risk_setting["sub_id"] + + # engine.strategy.on_init() + + # 实盘私有信息通过这个队列传输 + engine.queue_pri = queue + print("engine.queue_pri:", engine.queue_pri) + # 启动风控 实盘不管大资金还是小资金都要启动风控 + engine.strategy.add_risk_manager( + symbol_list=[symbol_ for symbol_ in config], + key=risk_setting["key"], + secret=risk_setting["secret"], + sub_id=risk_setting["sub_id"] + ) + # engine.dma_depth_data(platform, int(start.timestamp())) + threading.Thread(target=engine.dma_depth_data, args=(platform, int(start.timestamp()))).start() + engine.receive_private_queue_data() # 接收实盘私有信息 + + +def run(): + # 当前跑的平台 + # platform = "gate_swap_u" + # secret_name = "403" # 500_mm_ziying 200 + # symbol = "bnbusdt" # btcusdt + # sub_id = "0" # 子账户id str + # settle = 'usdt' + + platform = "binance_swap_u_uni" + secret_name = "801" # 500_mm_ziying 200 + symbol = "dogeusdt" # btcusdt + sub_id = "0" # 子账户id str + settle = 'usdt' + + # secret_name = sys.argv[1] # 500_mm_ziying 200 + # symbol = sys.argv[2] # btcusdt + # platform = sys.argv[3] # binance_spot_uni + # sub_id = sys.argv[4] # 子账户id str + # settle = 'usdt' if 'usdt' in symbol else 'fdusd' + print(secret_name, symbol, settle) + + # 从配置文件中批量读取参数 + file_name = "data.json" + + with open(file_name, "rb") as json_file: + json_data = json.loads(json_file.read()) + + pprint(json_data[secret_name][sub_id][symbol]) + + # 当前测试账户虚拟本金1000U place_rate_open 开仓让位 place_rate_close 平仓让位 + config = { + symbol: json_data[secret_name][sub_id][symbol] + } + + # ------------------ 修改风控的配置信息 密钥 --------------------------- + risk_setting = { + "key": json_data[secret_name]["key"], + "secret_name": secret_name, + "sub_id": sub_id, + "secret": json_data[secret_name]["secret"], + "sub_key": json_data[secret_name][sub_id]["key"], + "sub_secret": json_data[secret_name][sub_id]["secret"], + "active": True, # True 风控启动 + "order_flow_limit_1m": 15, # 根据下单的标签来做1m的下单次数限制 + "net_max_down_level_1": 20, # 实时最大回撤值百分比 超过会提醒 + "net_max_down_level_2": 15, + "net_max_down_level_3": 10, + # 单币种的持仓限制 + "position_limit": {symbol_: config[symbol_]["capital"] * config[symbol_]["max_pos_rate"] * 1.04 + for symbol_ in config}, + "position_time_out": 45, # 持仓信息超过45秒没来会提醒 + "webhook_key": ['', ''] # 钉钉发送的接收用户 + } + + # 这个是实际的本金 + get_account_ = get_account(risk_setting=risk_setting, platform=platform, settle=settle) + balance = get_account_.balance + get_account_.unrealised_pnl + + print(get_account_.__dict__, balance) + + # 更新净值风控 + risk_setting["net_level_1"] = ( + balance * (100 - risk_setting["net_max_down_level_1"]) / 100 + ) + risk_setting["net_level_2"] = ( + balance * (100 - risk_setting["net_max_down_level_2"]) / 100 + ) + risk_setting["net_level_3"] = ( + balance * (100 - risk_setting["net_max_down_level_3"]) / 100 + ) + + total_capital = sum([config[symbol_]["capital"] for symbol_ in config]) + print("当前实际杠杆倍数:", total_capital / balance) + print(config) + print(risk_setting) + + directory_name = "json_hub" + + # 获取当前工作目录 + current_directory = os.getcwd() + + # 构造 json_hub 目录的路径 + json_hub_path = os.path.join(current_directory, directory_name) + + # 检查目录是否存在 + if not os.path.exists(json_hub_path): + # 如果不存在,则创建目录 + os.makedirs(json_hub_path) + print(f"Directory '{directory_name}' created.") + else: + print(f"Directory '{directory_name}' already exists.") + + save_setting(setting=config, setting_filename=f'/json_hub/{secret_name}_config.json') + save_setting(setting=risk_setting) # 将risk_setting 的信息写入配置文件 + + process = [] + # 创建一个进程间内存共享的队列 + queue_dict = {} + for index, key in enumerate(config): + q = multiprocessing.Queue() + queue_dict[key] = q + # 限制该币种杠杆倍数 默认3倍 + leverage_limit = 3 + # private_run(queue_dict, config, risk_setting, platform, settle, leverage_limit) + # 先启动私有信息的发送进程 确保初始化有数据 + p = multiprocessing.Process( + target=private_run, + args=(queue_dict, config, risk_setting, platform, settle, leverage_limit) + ) + p.start() + time.sleep(3) + + # 然后为每个品种启动一个进程来做接收 + for index, key in enumerate(config): + p1 = multiprocessing.Process( + target=run_single, + args=( + platform, + key, + config[key]["capital"], + config, + risk_setting, + queue_dict[key], + ), + ) + p1.start() + process.append(p1) + time.sleep(60) + process.append(p) + for p2 in process: + p2.join() + + +if __name__ == "__main__": + run()