|
|
- # -*- coding: utf-8 -*-
- # zq
-
- import os
- import sys
- import time
- import json
- import threading
- import multiprocessing
- from pprint import pprint
-
- from datetime import datetime, timedelta
-
- BASE_DIR = os.path.abspath('/tz_crypto')
- print(BASE_DIR)
- sys.path.append(BASE_DIR)
- from tz_ctastrategy import (
- ArrayManager,
- BarData,
- BarGenerator,
- OrderData,
- StopOrder,
- TickData,
- TradeData,
- )
- from tz_ctastrategy.backtesting_mul import BacktestingEngine
- from tz_ctastrategy.base import EngineType, BacktestingMode
- from tz_ctastrategy.template import CtaTemplate
- from tzquant.market import zmq_client
-
- from tzquant.market.log_model import write_file_by_line, log_on_init
- from tzquant.trader.object import *
- from tzquant.market.keep_alive import ping
- from tz_riskmanager.risk_engine import save_setting
- from tzquant.market.market_engine import get_account, private_run
- from tzquant.market.dingtalker import dingmessage
-
-
- class AtrRsiStrategy(CtaTemplate):
- """"""
- parameters = {}
-
- def __init__(
- self, cta_engine, strategy_name, vt_symbol, setting, rolling_info=None
- ):
- """"""
- super().__init__(cta_engine, strategy_name, vt_symbol, setting, rolling_info)
- if self.cta_engine.engine_type == EngineType.BACKTESTING:
- self.bg = BarGenerator(self.on_bar)
- else:
- self.bg = BarGenerator(self.on_bar, 60, interval=Interval.SECOND)
- self.bg10s = BarGenerator(self.on_bar, 10, self.on_10s_bar, interval=Interval.SECOND)
-
- self.am = ArrayManager(size=200)
-
- # 初始化变量
- self.atr_value = 0
- self.rsi_value = 0
- self.ma_value = 0
- self.martingale_level = 0 # 马丁策略层级
- self.max_cash = 0.0
- self.max_down = 0.0
- self.max_pos = 0.0
-
- if self.cta_engine.engine_type == EngineType.BACKTESTING:
- self.place_order_flag = True
- else:
- self.place_order_flag = False
-
- # ----------------------- log ----------------------------
- self.log = None
- self.log_time = datetime.now().day - 1
- self.zmq = zmq_client.ZMQClient('XX.XXX.XXX.XX') # 实例化zmq客户端
- self.log_message_id = 0
- self.s_file_name = f"{self.strategy_name}"
- self.sub_id: str = "0"
-
- self.last_tradeid = 0
-
- def on_init(self):
- """
- Callback when strategy is inited.
- """
- self.write_log("策略初始化")
-
- # 这个是日志输出的函数
- def logger(self):
- d_now = datetime.now()
- # 每天做一次日志名称更改
- if self.log is None or d_now.day != self.log_time:
- self.log = log_on_init(
- log_dir_name="/tzquant_logs",
- log_name=f"{self.strategy_name}-{self.sub_id}_{d_now.year}-{d_now.month:02d}-{d_now.day:02d}.log",
- log_out_name=self.strategy_name,
- write_mode="a",
- line=True,
- )
- self.log_time = d_now.day
- else:
- while self.cta_engine.logs:
- msg = self.cta_engine.logs.pop()
- self.log.info(msg)
-
- result = {
- "id": self.log_message_id,
- "datetime": d_now.strftime('%Y-%m-%d %H:%M:%S.%f')[:-4],
- "timestamp_ms": int(d_now.timestamp() * 1000),
- "strategy_id": self.strategy_name,
- "level": 'info',
- "log": msg
- }
- self.zmq.send_message(result)
- self.log_message_id += 1
-
- def on_start(self):
- """
- Callback when strategy is started.
- """
- self.write_log("策略启动")
-
- def on_stop(self):
- """
- Callback when strategy is stopped.
- """
- self.write_log("策略停止")
-
- def on_tick(self, tick: TickData):
- """
- Callback of new tick data update.
- """
- if tick.name == 'trade':
- self.bg.tick_to_bar(tick=tick)
- if self.last_tradeid:
- if tick.id - self.last_tradeid != 1:
- print(f'成交数据有缺失: last:{self.last_tick} now:{tick}')
- self.last_tradeid = tick.id
- self.last_tick = tick
-
- elif tick.name == 'depth':
- pass
-
- def on_bar(self, bar: BarData):
- """
- Callback of new bar data update.
- """
- if self.cta_engine.engine_type == EngineType.BACKTESTING:
- self.on_10s_bar(bar)
- else:
- self.bg10s.update_bar(bar)
-
- def on_10s_bar(self, bar: BarData):
- try:
- self.am.update_bar(bar)
- # print(bar)
- # if time.time() - bar.closetime / 1000 < 30:
- # # 柜台下买单
- # res = self.buy(
- # price=bar.close * (1 - 0.05),
- # volume=10 / bar.close,
- # closetime=bar.closetime / 1000,
- # order_type=OrderType.POC,
- # label=f't-{self.strategy_name}-{self.sub_id}-0'
- # )
- # print('place order', res)
- # # res1 = self.cancel_all(closetime=bar.closetime / 1000)
- # # print('cancel all orders', res1)
- # if res:
- # # 柜台撤销单个订单
- # res = self.cancel_order(vt_orderid=res[0],
- # closetime=bar.closetime / 1000)
- # print('cancel order', res)
- #
- # self.sell(
- # price=bar.close * (1 + 0.05),
- # volume=10 / bar.close,
- # closetime=bar.closetime / 1000,
- # order_type=OrderType.POC,
- # label=f't-{self.strategy_name}-{self.sub_id}-1'
- # )
-
- if self.cta_engine.engine_type == EngineType.BACKTESTING:
- # 持仓均价
- self.price = self.cta_engine.ca_balance_now["price"]
-
- if not self.am.inited:
- # 初始化未完成之前 撤单到成功为止
- if self.cta_engine.active_limit_orders:
- while True:
- res = self.cancel_all(closetime=datetime.now().timestamp(), mode=1,
- order_id_list=list(self.cta_engine.active_limit_orders.keys()))
- time.sleep(0.3)
- if res:
- break
- # return
- if self.cta_engine.active_limit_orders:
- print('active_limit_orders: ', self.cta_engine.active_limit_orders)
- # # 柜台撤所有订单 mode 要为 1 order_id_list 要传入当前所有的挂单id
- # while True:
- # res = self.cancel_all(closetime=bar.closetime / 1000,
- # mode=1,
- # order_id_list=list(self.cta_engine.active_limit_orders.keys()))
- # if res:
- # print('cancel success')
- # # for index, order_id in enumerate(self.cta_engine.active_limit_orders):
- # # print(order_id, type(order_id))
- # # time_diff = time.time() - self.cta_engine.active_limit_orders[order_id].datetime.timestamp()
- # # if time_diff > 30:
- # # msg = f'-----------挂单更新超时-----------:{time_diff}'
- # # dingmessage(webhook_key='',
- # # secret_name=f'{self.strategy_name}-{self.sub_id}',
- # # sub_id=self.sub_id,
- # # symbol=self.cta_engine.symbol,
- # # msg=msg,
- # # async_req=True
- # # )
- # # print(self.cta_engine.active_limit_orders[order_id].datetime.timestamp())
- # break
- # time.sleep(0.5)
- # print('cancel failed')
-
- except (Exception, BaseException) as e:
- self.cta_engine.output(
- "on_10s_bar error:%s-----line:%s" % (repr(e), e.__traceback__.tb_lineno)
- )
-
- def on_order(self, order: OrderData):
- """
- Callback of new order data update.
- """
- pass
-
- def on_trade(self, trade: TradeData):
- """
- Callback of new trade data update.
- """
-
- self.cta_engine.output(f'成交记录:{trade}---持有仓位:{self.pos}')
- # self.put_event()
-
- def on_stop_order(self, stop_order: StopOrder):
- """
- Callback of stop order update.
- """
- pass
-
-
- def run_single(
- platform: str, symbol: str, capital: int, config: dict, risk_setting: dict, queue
- ):
- engine = BacktestingEngine()
-
- now_datetime = datetime.now().replace(second=0, microsecond=0)
- # ---------------- 实盘 -----------------------
- engine.engine_type = EngineType.REAL
- engine.gateway_name = EngineType.REAL
-
- start = now_datetime - timedelta(days=40 / 1440)
- # start = now_datetime - timedelta(days=1800 / 1440)
-
- engine.set_parameters(
- vt_symbol=f"{symbol}.{platform}", # 下单平台
- source_exchanges=[platform], # 接收多源平台list
- start=start, # 形如:datetime(2022,1,1)
- end=now_datetime, # 形如:datetime(2022,1,1)
- maker_fee=-0.5 / 10000, # 挂单手续费
- taker_fee=3 / 10000, # 吃单手续费
- slippage=2 / 10000, # 滑点
- size=1, # 杠杆倍数 默认为1
- pricetick=0.1, # 价格精度
- capital=capital, # 本金
- annual_days=365, # 一年的连续交易天数
- interval=Interval.SECOND,
- mode=BacktestingMode.TICK # tick级别实盘
- )
- # 将策略添加进引擎
- engine.add_strategy(AtrRsiStrategy, {})
-
- # --------------- 加载json 文件的配置信息 ---------------
- action_config = config[symbol]
- engine.strategy.place_order_flag = action_config["place_order"]
- engine.strategy.strategy_name = risk_setting["secret_name"]
- engine.strategy.parameters = action_config
- engine.strategy.sub_id = risk_setting["sub_id"]
-
- # engine.strategy.on_init()
-
- # 实盘私有信息通过这个队列传输
- engine.queue_pri = queue
- print("engine.queue_pri:", engine.queue_pri)
- # 启动风控 实盘不管大资金还是小资金都要启动风控
- engine.strategy.add_risk_manager(
- symbol_list=[symbol_ for symbol_ in config],
- key=risk_setting["key"],
- secret=risk_setting["secret"],
- sub_id=risk_setting["sub_id"]
- )
- # engine.dma_depth_data(platform, int(start.timestamp()))
- threading.Thread(target=engine.dma_depth_data, args=(platform, int(start.timestamp()))).start()
- engine.receive_private_queue_data() # 接收实盘私有信息
-
-
- def run():
- # 当前跑的平台
- # platform = "gate_swap_u"
- # secret_name = "403" # 500_mm_ziying 200
- # symbol = "bnbusdt" # btcusdt
- # sub_id = "0" # 子账户id str
- # settle = 'usdt'
-
- platform = "binance_swap_u_uni"
- secret_name = "801" # 500_mm_ziying 200
- symbol = "dogeusdt" # btcusdt
- sub_id = "0" # 子账户id str
- settle = 'usdt'
-
- # secret_name = sys.argv[1] # 500_mm_ziying 200
- # symbol = sys.argv[2] # btcusdt
- # platform = sys.argv[3] # binance_spot_uni
- # sub_id = sys.argv[4] # 子账户id str
- # settle = 'usdt' if 'usdt' in symbol else 'fdusd'
- print(secret_name, symbol, settle)
-
- # 从配置文件中批量读取参数
- file_name = "data.json"
-
- with open(file_name, "rb") as json_file:
- json_data = json.loads(json_file.read())
-
- pprint(json_data[secret_name][sub_id][symbol])
-
- # 当前测试账户虚拟本金1000U place_rate_open 开仓让位 place_rate_close 平仓让位
- config = {
- symbol: json_data[secret_name][sub_id][symbol]
- }
-
- # ------------------ 修改风控的配置信息 密钥 ---------------------------
- risk_setting = {
- "key": json_data[secret_name]["key"],
- "secret_name": secret_name,
- "sub_id": sub_id,
- "secret": json_data[secret_name]["secret"],
- "sub_key": json_data[secret_name][sub_id]["key"],
- "sub_secret": json_data[secret_name][sub_id]["secret"],
- "active": True, # True 风控启动
- "order_flow_limit_1m": 15, # 根据下单的标签来做1m的下单次数限制
- "net_max_down_level_1": 20, # 实时最大回撤值百分比 超过会提醒
- "net_max_down_level_2": 15,
- "net_max_down_level_3": 10,
- # 单币种的持仓限制
- "position_limit": {symbol_: config[symbol_]["capital"] * config[symbol_]["max_pos_rate"] * 1.04
- for symbol_ in config},
- "position_time_out": 45, # 持仓信息超过45秒没来会提醒
- "webhook_key": ['', ''] # 钉钉发送的接收用户
- }
-
- # 这个是实际的本金
- get_account_ = get_account(risk_setting=risk_setting, platform=platform, settle=settle)
- balance = get_account_.balance + get_account_.unrealised_pnl
-
- print(get_account_.__dict__, balance)
-
- # 更新净值风控
- risk_setting["net_level_1"] = (
- balance * (100 - risk_setting["net_max_down_level_1"]) / 100
- )
- risk_setting["net_level_2"] = (
- balance * (100 - risk_setting["net_max_down_level_2"]) / 100
- )
- risk_setting["net_level_3"] = (
- balance * (100 - risk_setting["net_max_down_level_3"]) / 100
- )
-
- total_capital = sum([config[symbol_]["capital"] for symbol_ in config])
- print("当前实际杠杆倍数:", total_capital / balance)
- print(config)
- print(risk_setting)
-
- directory_name = "json_hub"
-
- # 获取当前工作目录
- current_directory = os.getcwd()
-
- # 构造 json_hub 目录的路径
- json_hub_path = os.path.join(current_directory, directory_name)
-
- # 检查目录是否存在
- if not os.path.exists(json_hub_path):
- # 如果不存在,则创建目录
- os.makedirs(json_hub_path)
- print(f"Directory '{directory_name}' created.")
- else:
- print(f"Directory '{directory_name}' already exists.")
-
- save_setting(setting=config, setting_filename=f'/json_hub/{secret_name}_config.json')
- save_setting(setting=risk_setting) # 将risk_setting 的信息写入配置文件
-
- process = []
- # 创建一个进程间内存共享的队列
- queue_dict = {}
- for index, key in enumerate(config):
- q = multiprocessing.Queue()
- queue_dict[key] = q
- # 限制该币种杠杆倍数 默认3倍
- leverage_limit = 3
- # private_run(queue_dict, config, risk_setting, platform, settle, leverage_limit)
- # 先启动私有信息的发送进程 确保初始化有数据
- p = multiprocessing.Process(
- target=private_run,
- args=(queue_dict, config, risk_setting, platform, settle, leverage_limit)
- )
- p.start()
- time.sleep(3)
-
- # 然后为每个品种启动一个进程来做接收
- for index, key in enumerate(config):
- p1 = multiprocessing.Process(
- target=run_single,
- args=(
- platform,
- key,
- config[key]["capital"],
- config,
- risk_setting,
- queue_dict[key],
- ),
- )
- p1.start()
- process.append(p1)
- time.sleep(60)
- process.append(p)
- for p2 in process:
- p2.join()
-
-
- if __name__ == "__main__":
- run()
|