天择量化公共接口文档
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

441 lines
16 KiB

5 months ago
5 months ago
5 months ago
5 months ago
5 months ago
  1. # -*- coding: utf-8 -*-
  2. # zq
  3. import os
  4. import sys
  5. import time
  6. import json
  7. import threading
  8. import multiprocessing
  9. from pprint import pprint
  10. from datetime import datetime, timedelta
  11. BASE_DIR = os.path.abspath('/tz_crypto')
  12. print(BASE_DIR)
  13. sys.path.append(BASE_DIR)
  14. from tz_ctastrategy import (
  15. ArrayManager,
  16. BarData,
  17. BarGenerator,
  18. OrderData,
  19. StopOrder,
  20. TickData,
  21. TradeData,
  22. )
  23. from tz_ctastrategy.backtesting_mul import BacktestingEngine
  24. from tz_ctastrategy.base import EngineType, BacktestingMode
  25. from tz_ctastrategy.template import CtaTemplate
  26. from tzquant.market import zmq_client
  27. from tzquant.market.log_model import write_file_by_line, log_on_init
  28. from tzquant.trader.object import *
  29. from tzquant.market.keep_alive import ping
  30. from tz_riskmanager.risk_engine import save_setting
  31. from tzquant.market.market_engine import get_account, private_run
  32. from tzquant.market.dingtalker import dingmessage
  33. class AtrRsiStrategy(CtaTemplate):
  34. """"""
  35. parameters = {}
  36. def __init__(
  37. self, cta_engine, strategy_name, vt_symbol, setting, rolling_info=None
  38. ):
  39. """"""
  40. super().__init__(cta_engine, strategy_name, vt_symbol, setting, rolling_info)
  41. if self.cta_engine.engine_type == EngineType.BACKTESTING:
  42. self.bg = BarGenerator(self.on_bar)
  43. else:
  44. self.bg = BarGenerator(self.on_bar, 60, interval=Interval.SECOND)
  45. self.bg10s = BarGenerator(self.on_bar, 10, self.on_10s_bar, interval=Interval.SECOND)
  46. self.am = ArrayManager(size=200)
  47. # 初始化变量
  48. self.atr_value = 0
  49. self.rsi_value = 0
  50. self.ma_value = 0
  51. self.martingale_level = 0 # 马丁策略层级
  52. self.max_cash = 0.0
  53. self.max_down = 0.0
  54. self.max_pos = 0.0
  55. if self.cta_engine.engine_type == EngineType.BACKTESTING:
  56. self.place_order_flag = True
  57. else:
  58. self.place_order_flag = False
  59. # ----------------------- log ----------------------------
  60. self.log = None
  61. self.log_time = datetime.now().day - 1
  62. self.zmq = zmq_client.ZMQClient('XX.XXX.XXX.XX') # 实例化zmq客户端
  63. self.log_message_id = 0
  64. self.s_file_name = f"{self.strategy_name}"
  65. self.sub_id: str = "0"
  66. self.last_tradeid = 0
  67. def on_init(self):
  68. """
  69. Callback when strategy is inited.
  70. """
  71. self.write_log("策略初始化")
  72. # 这个是日志输出的函数
  73. def logger(self):
  74. d_now = datetime.now()
  75. # 每天做一次日志名称更改
  76. if self.log is None or d_now.day != self.log_time:
  77. self.log = log_on_init(
  78. log_dir_name="/tzquant_logs",
  79. log_name=f"{self.strategy_name}-{self.sub_id}_{d_now.year}-{d_now.month:02d}-{d_now.day:02d}.log",
  80. log_out_name=self.strategy_name,
  81. write_mode="a",
  82. line=True,
  83. )
  84. self.log_time = d_now.day
  85. else:
  86. while self.cta_engine.logs:
  87. msg = self.cta_engine.logs.pop()
  88. self.log.info(msg)
  89. result = {
  90. "id": self.log_message_id,
  91. "datetime": d_now.strftime('%Y-%m-%d %H:%M:%S.%f')[:-4],
  92. "timestamp_ms": int(d_now.timestamp() * 1000),
  93. "strategy_id": self.strategy_name,
  94. "level": 'info',
  95. "log": msg
  96. }
  97. self.zmq.send_message(result)
  98. self.log_message_id += 1
  99. def on_start(self):
  100. """
  101. Callback when strategy is started.
  102. """
  103. self.write_log("策略启动")
  104. def on_stop(self):
  105. """
  106. Callback when strategy is stopped.
  107. """
  108. self.write_log("策略停止")
  109. def on_tick(self, tick: TickData):
  110. """
  111. Callback of new tick data update.
  112. """
  113. if tick.name == 'trade':
  114. self.bg.tick_to_bar(tick=tick)
  115. if self.last_tradeid:
  116. if tick.id - self.last_tradeid != 1:
  117. print(f'成交数据有缺失: last:{self.last_tick} now:{tick}')
  118. self.last_tradeid = tick.id
  119. self.last_tick = tick
  120. elif tick.name == 'depth':
  121. pass
  122. def on_bar(self, bar: BarData):
  123. """
  124. Callback of new bar data update.
  125. """
  126. if self.cta_engine.engine_type == EngineType.BACKTESTING:
  127. self.on_10s_bar(bar)
  128. else:
  129. self.bg10s.update_bar(bar)
  130. def on_10s_bar(self, bar: BarData):
  131. try:
  132. self.am.update_bar(bar)
  133. # print(bar)
  134. # if time.time() - bar.closetime / 1000 < 30:
  135. # # 柜台下买单
  136. # res = self.buy(
  137. # price=bar.close * (1 - 0.05),
  138. # volume=10 / bar.close,
  139. # closetime=bar.closetime / 1000,
  140. # order_type=OrderType.POC,
  141. # label=f't-{self.strategy_name}-{self.sub_id}-0'
  142. # )
  143. # print('place order', res)
  144. # # res1 = self.cancel_all(closetime=bar.closetime / 1000)
  145. # # print('cancel all orders', res1)
  146. # if res:
  147. # # 柜台撤销单个订单
  148. # res = self.cancel_order(vt_orderid=res[0],
  149. # closetime=bar.closetime / 1000)
  150. # print('cancel order', res)
  151. #
  152. # self.sell(
  153. # price=bar.close * (1 + 0.05),
  154. # volume=10 / bar.close,
  155. # closetime=bar.closetime / 1000,
  156. # order_type=OrderType.POC,
  157. # label=f't-{self.strategy_name}-{self.sub_id}-1'
  158. # )
  159. if self.cta_engine.engine_type == EngineType.BACKTESTING:
  160. # 持仓均价
  161. self.price = self.cta_engine.ca_balance_now["price"]
  162. if not self.am.inited:
  163. # 初始化未完成之前 撤单到成功为止
  164. if self.cta_engine.active_limit_orders:
  165. while True:
  166. res = self.cancel_all(closetime=datetime.now().timestamp(), mode=1,
  167. order_id_list=list(self.cta_engine.active_limit_orders.keys()))
  168. time.sleep(0.3)
  169. if res:
  170. break
  171. # return
  172. if self.cta_engine.active_limit_orders:
  173. print('active_limit_orders: ', self.cta_engine.active_limit_orders)
  174. # # 柜台撤所有订单 mode 要为 1 order_id_list 要传入当前所有的挂单id
  175. # while True:
  176. # res = self.cancel_all(closetime=bar.closetime / 1000,
  177. # mode=1,
  178. # order_id_list=list(self.cta_engine.active_limit_orders.keys()))
  179. # if res:
  180. # print('cancel success')
  181. # # for index, order_id in enumerate(self.cta_engine.active_limit_orders):
  182. # # print(order_id, type(order_id))
  183. # # time_diff = time.time() - self.cta_engine.active_limit_orders[order_id].datetime.timestamp()
  184. # # if time_diff > 30:
  185. # # msg = f'-----------挂单更新超时-----------:{time_diff}'
  186. # # dingmessage(webhook_key='',
  187. # # secret_name=f'{self.strategy_name}-{self.sub_id}',
  188. # # sub_id=self.sub_id,
  189. # # symbol=self.cta_engine.symbol,
  190. # # msg=msg,
  191. # # async_req=True
  192. # # )
  193. # # print(self.cta_engine.active_limit_orders[order_id].datetime.timestamp())
  194. # break
  195. # time.sleep(0.5)
  196. # print('cancel failed')
  197. except (Exception, BaseException) as e:
  198. self.cta_engine.output(
  199. "on_10s_bar error:%s-----line:%s" % (repr(e), e.__traceback__.tb_lineno)
  200. )
  201. def on_order(self, order: OrderData):
  202. """
  203. Callback of new order data update.
  204. """
  205. pass
  206. def on_trade(self, trade: TradeData):
  207. """
  208. Callback of new trade data update.
  209. """
  210. self.cta_engine.output(f'成交记录:{trade}---持有仓位:{self.pos}')
  211. # self.put_event()
  212. def on_stop_order(self, stop_order: StopOrder):
  213. """
  214. Callback of stop order update.
  215. """
  216. pass
  217. def run_single(
  218. platform: str, symbol: str, capital: int, config: dict, risk_setting: dict, queue
  219. ):
  220. engine = BacktestingEngine()
  221. now_datetime = datetime.now().replace(second=0, microsecond=0)
  222. # ---------------- 实盘 -----------------------
  223. engine.engine_type = EngineType.REAL
  224. engine.gateway_name = EngineType.REAL
  225. start = now_datetime - timedelta(days=40 / 1440)
  226. # start = now_datetime - timedelta(days=1800 / 1440)
  227. engine.set_parameters(
  228. vt_symbol=f"{symbol}.{platform}", # 下单平台
  229. source_exchanges=[platform], # 接收多源平台list
  230. start=start, # 形如:datetime(2022,1,1)
  231. end=now_datetime, # 形如:datetime(2022,1,1)
  232. maker_fee=-0.5 / 10000, # 挂单手续费
  233. taker_fee=3 / 10000, # 吃单手续费
  234. slippage=2 / 10000, # 滑点
  235. size=1, # 杠杆倍数 默认为1
  236. pricetick=0.1, # 价格精度
  237. capital=capital, # 本金
  238. annual_days=365, # 一年的连续交易天数
  239. interval=Interval.SECOND,
  240. mode=BacktestingMode.TICK # tick级别实盘
  241. )
  242. # 将策略添加进引擎
  243. engine.add_strategy(AtrRsiStrategy, {})
  244. # --------------- 加载json 文件的配置信息 ---------------
  245. action_config = config[symbol]
  246. engine.strategy.place_order_flag = action_config["place_order"]
  247. engine.strategy.strategy_name = risk_setting["secret_name"]
  248. engine.strategy.parameters = action_config
  249. engine.strategy.sub_id = risk_setting["sub_id"]
  250. # engine.strategy.on_init()
  251. # 实盘私有信息通过这个队列传输
  252. engine.queue_pri = queue
  253. print("engine.queue_pri:", engine.queue_pri)
  254. # 启动风控 实盘不管大资金还是小资金都要启动风控
  255. engine.strategy.add_risk_manager(
  256. symbol_list=[symbol_ for symbol_ in config],
  257. key=risk_setting["key"],
  258. secret=risk_setting["secret"],
  259. sub_id=risk_setting["sub_id"]
  260. )
  261. # engine.dma_depth_data(platform, int(start.timestamp()))
  262. threading.Thread(target=engine.dma_depth_data, args=(platform, int(start.timestamp()))).start()
  263. engine.receive_private_queue_data() # 接收实盘私有信息
  264. def run():
  265. # 当前跑的平台
  266. # platform = "gate_swap_u"
  267. # secret_name = "403" # 500_mm_ziying 200
  268. # symbol = "bnbusdt" # btcusdt
  269. # sub_id = "0" # 子账户id str
  270. # settle = 'usdt'
  271. platform = "binance_swap_u_uni"
  272. secret_name = "801" # 500_mm_ziying 200
  273. symbol = "dogeusdt" # btcusdt
  274. sub_id = "0" # 子账户id str
  275. settle = 'usdt'
  276. # secret_name = sys.argv[1] # 500_mm_ziying 200
  277. # symbol = sys.argv[2] # btcusdt
  278. # platform = sys.argv[3] # binance_spot_uni
  279. # sub_id = sys.argv[4] # 子账户id str
  280. # settle = 'usdt' if 'usdt' in symbol else 'fdusd'
  281. print(secret_name, symbol, settle)
  282. # 从配置文件中批量读取参数
  283. file_name = "data.json"
  284. with open(file_name, "rb") as json_file:
  285. json_data = json.loads(json_file.read())
  286. pprint(json_data[secret_name][sub_id][symbol])
  287. # 当前测试账户虚拟本金1000U place_rate_open 开仓让位 place_rate_close 平仓让位
  288. config = {
  289. symbol: json_data[secret_name][sub_id][symbol]
  290. }
  291. # ------------------ 修改风控的配置信息 密钥 ---------------------------
  292. risk_setting = {
  293. # 这个是交易所的 key
  294. "key": json_data[secret_name]["key"],
  295. "secret_name": secret_name,
  296. "sub_id": sub_id,
  297. # 这个是交易所的 secret
  298. "secret": json_data[secret_name]["secret"],
  299. # 这个是天择网站上申请的虚拟子账户的 key
  300. "sub_key": json_data[secret_name][sub_id]["key"],
  301. # 这个是天择网站上申请的虚拟子账户的 secret
  302. "sub_secret": json_data[secret_name][sub_id]["secret"],
  303. "active": True, # True 风控启动
  304. "order_flow_limit_1m": 15, # 根据下单的标签来做1m的下单次数限制
  305. "net_max_down_level_1": 20, # 实时最大回撤值百分比 超过会提醒
  306. "net_max_down_level_2": 15,
  307. "net_max_down_level_3": 10,
  308. # 单币种的持仓限制
  309. "position_limit": {symbol_: config[symbol_]["capital"] * config[symbol_]["max_pos_rate"] * 1.04
  310. for symbol_ in config},
  311. "position_time_out": 45, # 持仓信息超过45秒没来会提醒
  312. "webhook_key": ['', ''] # 钉钉发送的接收用户
  313. }
  314. # 这个是实际的本金
  315. get_account_ = get_account(risk_setting=risk_setting, platform=platform, settle=settle)
  316. balance = get_account_.balance + get_account_.unrealised_pnl
  317. print(get_account_.__dict__, balance)
  318. # 更新净值风控
  319. risk_setting["net_level_1"] = (
  320. balance * (100 - risk_setting["net_max_down_level_1"]) / 100
  321. )
  322. risk_setting["net_level_2"] = (
  323. balance * (100 - risk_setting["net_max_down_level_2"]) / 100
  324. )
  325. risk_setting["net_level_3"] = (
  326. balance * (100 - risk_setting["net_max_down_level_3"]) / 100
  327. )
  328. total_capital = sum([config[symbol_]["capital"] for symbol_ in config])
  329. print("当前实际杠杆倍数:", total_capital / balance)
  330. print(config)
  331. print(risk_setting)
  332. directory_name = "json_hub"
  333. # 获取当前工作目录
  334. current_directory = os.getcwd()
  335. # 构造 json_hub 目录的路径
  336. json_hub_path = os.path.join(current_directory, directory_name)
  337. # 检查目录是否存在
  338. if not os.path.exists(json_hub_path):
  339. # 如果不存在,则创建目录
  340. os.makedirs(json_hub_path)
  341. print(f"Directory '{directory_name}' created.")
  342. else:
  343. print(f"Directory '{directory_name}' already exists.")
  344. save_setting(setting=config, setting_filename=f'/json_hub/{secret_name}_config.json')
  345. save_setting(setting=risk_setting) # 将risk_setting 的信息写入配置文件
  346. process = []
  347. # 创建一个进程间内存共享的队列
  348. queue_dict = {}
  349. for index, key in enumerate(config):
  350. q = multiprocessing.Queue()
  351. queue_dict[key] = q
  352. # 限制该币种杠杆倍数 默认3倍
  353. leverage_limit = 3
  354. # 这个是调试用的 不调试要注释掉
  355. # private_run(queue_dict, config, risk_setting, platform, settle, leverage_limit)
  356. # 先启动私有信息的发送进程 确保初始化有数据
  357. p = multiprocessing.Process(
  358. target=private_run,
  359. args=(queue_dict, config, risk_setting, platform, settle, leverage_limit)
  360. )
  361. p.start()
  362. time.sleep(3)
  363. # 然后为每个品种启动一个进程来做接收
  364. for index, key in enumerate(config):
  365. p1 = multiprocessing.Process(
  366. target=run_single,
  367. args=(
  368. platform,
  369. key,
  370. config[key]["capital"],
  371. config,
  372. risk_setting,
  373. queue_dict[key],
  374. ),
  375. )
  376. p1.start()
  377. process.append(p1)
  378. time.sleep(60)
  379. process.append(p)
  380. for p2 in process:
  381. p2.join()
# Script entry point. The guard matters here because run() starts
# multiprocessing.Process workers: child processes that import this module
# (as happens under the "spawn" start method) must not execute run() again.
if __name__ == "__main__":
    run()