|
|
- # coding: utf-8
- """
- Gate API v4
-
- Welcome to Gate.io API APIv4 provides spot, margin and futures trading operations. There are public APIs to retrieve the real-time market statistics, and private APIs which needs authentication to trade on user's behalf. # noqa: E501
-
- Contact: support@mail.gate.io
- Generated by: https://openapi-generator.tech
- """
-
- from __future__ import absolute_import
-
- import atexit
- import datetime
- import hashlib
- import hmac
- import time
- from six.moves.urllib.parse import unquote_plus, urlencode, urlparse
-
- from dateutil.parser import parse
- import json
- import mimetypes
- from multiprocessing.pool import ThreadPool
- import os
- import re
- import tempfile
-
- # python 2 and python 3 compatibility library
- import six
- from six.moves.urllib.parse import quote
-
- from gate_api.configuration import Configuration
- import gate_api.models
- from gate_api import rest
- from gate_api.exceptions import ApiValueError, ApiException, GateApiException
-
-
- class ApiClient(object):
- """Generic API client for OpenAPI client library builds.
-
- OpenAPI generic API client. This client handles the client-
- server communication, and is invariant across implementations. Specifics of
- the methods and models for each application are generated from the OpenAPI
- templates.
-
- NOTE: This class is auto generated by OpenAPI Generator.
- Ref: https://openapi-generator.tech
- Do not edit the class manually.
-
- :param configuration: .Configuration object for this client
- :param header_name: a header to pass when making calls to the API.
- :param header_value: a header value to pass when making calls to
- the API.
- :param cookie: a cookie to include in the header when making calls
- to the API
- :param pool_threads: The number of threads to use for async requests
- to the API. More threads means more concurrent API requests.
- """
-
- PRIMITIVE_TYPES = (float, bool, bytes, six.text_type) + six.integer_types
- NATIVE_TYPES_MAPPING = {
- 'int': int,
- 'long': int if six.PY3 else long, # noqa: F821
- 'float': float,
- 'str': str,
- 'bool': bool,
- 'date': datetime.date,
- 'datetime': datetime.datetime,
- 'object': object,
- }
- _pool = None
-
- def __init__(self, configuration=None, header_name=None, header_value=None, cookie=None, pool_threads=1):
- if configuration is None:
- configuration = Configuration.get_default_copy()
- self.configuration = configuration
- self.pool_threads = pool_threads
-
- self.rest_client = rest.RESTClientObject(configuration)
- self.default_headers = {}
- if header_name is not None:
- self.default_headers[header_name] = header_value
- self.cookie = cookie
- # Set default User-Agent.
- self.user_agent = 'OpenAPI-Generator/4.42.0/python'
- self.client_side_validation = configuration.client_side_validation
-
- def __enter__(self):
- return self
-
- def __exit__(self, exc_type, exc_value, traceback):
- self.close()
-
- def close(self):
- if self._pool:
- self._pool.close()
- self._pool.join()
- self._pool = None
- if hasattr(atexit, 'unregister'):
- atexit.unregister(self.close)
-
- @property
- def pool(self):
- """Create thread pool on first request
- avoids instantiating unused threadpool for blocking clients.
- """
- if self._pool is None:
- atexit.register(self.close)
- self._pool = ThreadPool(self.pool_threads)
- return self._pool
-
- @property
- def user_agent(self):
- """User agent for this API client"""
- return self.default_headers['User-Agent']
-
- @user_agent.setter
- def user_agent(self, value):
- self.default_headers['User-Agent'] = value
-
- def set_default_header(self, header_name, header_value):
- self.default_headers[header_name] = header_value
-
- def __call_api(
- self,
- resource_path,
- method,
- path_params=None,
- query_params=None,
- header_params=None,
- body=None,
- post_params=None,
- files=None,
- response_type=None,
- auth_settings=None,
- _return_http_data_only=None,
- collection_formats=None,
- _preload_content=True,
- _request_timeout=None,
- _host=None,
- ):
-
- config = self.configuration
-
- # header parameters
- header_params = header_params or {}
- header_params.update(self.default_headers)
- if self.cookie:
- header_params['Cookie'] = self.cookie
- if header_params:
- header_params = self.sanitize_for_serialization(header_params)
- header_params = dict(self.parameters_to_tuples(header_params, collection_formats))
-
- # path parameters
- if path_params:
- path_params = self.sanitize_for_serialization(path_params)
- path_params = self.parameters_to_tuples(path_params, collection_formats)
- for k, v in path_params:
- # specified safe chars, encode everything
- resource_path = resource_path.replace('{%s}' % k, quote(str(v), safe=config.safe_chars_for_path_param))
-
- # query parameters
- if query_params:
- query_params = self.sanitize_for_serialization(query_params)
- query_params = self.parameters_to_tuples(query_params, collection_formats)
-
- # post parameters
- if post_params or files:
- post_params = post_params if post_params else []
- post_params = self.sanitize_for_serialization(post_params)
- post_params = self.parameters_to_tuples(post_params, collection_formats)
- post_params.extend(self.files_parameters(files))
-
- # body
- if body:
- body = self.sanitize_for_serialization(body)
-
- # request url
- if _host is None:
- url = self.configuration.host + resource_path
- else:
- # use server/host defined in path or operation instead
- url = _host + resource_path
-
- # auth setting
- self.update_params_for_auth(method, url, header_params, query_params, body, auth_settings)
-
- try:
- # perform request and return response
- response_data = self.request(
- method,
- url,
- query_params=query_params,
- headers=header_params,
- post_params=post_params,
- body=body,
- _preload_content=_preload_content,
- _request_timeout=_request_timeout,
- )
- except ApiException as e:
- e.body = e.body.decode('utf-8') if six.PY3 else e.body
- try:
- err = json.loads(e.body)
- except ValueError:
- raise e
- else:
- if not err.get('label'):
- raise e
- raise GateApiException(err.get('label'), err.get('message'), err.get('detail'), e)
-
- content_type = response_data.getheader('content-type')
-
- self.last_response = response_data
-
- return_data = response_data
-
- if not _preload_content:
- return return_data
-
- if six.PY3 and response_type not in ["file", "bytes"]:
- match = None
- if content_type is not None:
- match = re.search(r"charset=([a-zA-Z\-\d]+)[\s\;]?", content_type)
- encoding = match.group(1) if match else "utf-8"
- response_data.data = response_data.data.decode(encoding)
-
- # deserialize response data
- if response_type:
- return_data = self.deserialize(response_data, response_type)
- else:
- return_data = None
-
- if _return_http_data_only:
- return return_data
- else:
- return (return_data, response_data.status, response_data.getheaders())
-
- def sanitize_for_serialization(self, obj):
- """Builds a JSON POST object.
-
- If obj is None, return None.
- If obj is str, int, long, float, bool, return directly.
- If obj is datetime.datetime, datetime.date
- convert to string in iso8601 format.
- If obj is list, sanitize each element in the list.
- If obj is dict, return the dict.
- If obj is OpenAPI model, return the properties dict.
-
- :param obj: The data to serialize.
- :return: The serialized form of data.
- """
- if obj is None:
- return None
- elif isinstance(obj, self.PRIMITIVE_TYPES):
- return obj
- elif isinstance(obj, list):
- return [self.sanitize_for_serialization(sub_obj) for sub_obj in obj]
- elif isinstance(obj, tuple):
- return tuple(self.sanitize_for_serialization(sub_obj) for sub_obj in obj)
- elif isinstance(obj, (datetime.datetime, datetime.date)):
- return obj.isoformat()
-
- if isinstance(obj, dict):
- obj_dict = obj
- else:
- # Convert model obj to dict except
- # attributes `openapi_types`, `attribute_map`
- # and attributes which value is not None.
- # Convert attribute name to json key in
- # model definition for request.
- obj_dict = {
- obj.attribute_map[attr]: getattr(obj, attr)
- for attr, _ in six.iteritems(obj.openapi_types)
- if getattr(obj, attr) is not None
- }
-
- return {key: self.sanitize_for_serialization(val) for key, val in six.iteritems(obj_dict)}
-
- def deserialize(self, response, response_type):
- """Deserializes response into an object.
-
- :param response: RESTResponse object to be deserialized.
- :param response_type: class literal for
- deserialized object, or string of class name.
-
- :return: deserialized object.
- """
- # handle file downloading
- # save response body into a tmp file and return the instance
- if response_type == "file":
- return self.__deserialize_file(response)
-
- # fetch data from response object
- try:
- data = json.loads(response.data)
- except ValueError:
- data = response.data
-
- return self.__deserialize(data, response_type)
-
- def __deserialize(self, data, klass):
- """Deserializes dict, list, str into an object.
-
- :param data: dict, list or str.
- :param klass: class literal, or string of class name.
-
- :return: object.
- """
- if data is None:
- return None
-
- if type(klass) == str:
- if klass.startswith('list['):
- sub_kls = re.match(r'list\[(.*)\]', klass).group(1)
- return [self.__deserialize(sub_data, sub_kls) for sub_data in data]
-
- if klass.startswith('dict('):
- sub_kls = re.match(r'dict\(([^,]*), (.*)\)', klass).group(2)
- return {k: self.__deserialize(v, sub_kls) for k, v in six.iteritems(data)}
-
- # convert str to class
- if klass in self.NATIVE_TYPES_MAPPING:
- klass = self.NATIVE_TYPES_MAPPING[klass]
- else:
- klass = getattr(gate_api.models, klass)
-
- if klass in self.PRIMITIVE_TYPES:
- return self.__deserialize_primitive(data, klass)
- elif klass == object:
- return self.__deserialize_object(data)
- elif klass == datetime.date:
- return self.__deserialize_date(data)
- elif klass == datetime.datetime:
- return self.__deserialize_datetime(data)
- else:
- return self.__deserialize_model(data, klass)
-
- def call_api(
- self,
- resource_path,
- method,
- path_params=None,
- query_params=None,
- header_params=None,
- body=None,
- post_params=None,
- files=None,
- response_type=None,
- auth_settings=None,
- async_req=None,
- _return_http_data_only=None,
- collection_formats=None,
- _preload_content=True,
- _request_timeout=None,
- _host=None,
- ):
- """Makes the HTTP request (synchronous) and returns deserialized data.
-
- To make an async_req request, set the async_req parameter.
-
- :param resource_path: Path to method endpoint.
- :param method: Method to call.
- :param path_params: Path parameters in the url.
- :param query_params: Query parameters in the url.
- :param header_params: Header parameters to be
- placed in the request header.
- :param body: Request body.
- :param list post_params: Request post form parameters,
- for `application/x-www-form-urlencoded`, `multipart/form-data`.
- :param list auth_settings: Auth Settings names for the request.
- :param response_type: Response data type.
- :param dict files: key -> filename, value -> filepath,
- for `multipart/form-data`.
- :param bool async_req: execute request asynchronously
- :param _return_http_data_only: response data without head status code
- and headers
- :param collection_formats: dict of collection formats for path, query,
- header, and post parameters.
- :param _preload_content: if False, the urllib3.HTTPResponse object will
- be returned without reading/decoding response
- data. Default is True.
- :param _request_timeout: timeout setting for this request. If one
- number provided, it will be total request
- timeout. It can also be a pair (tuple) of
- (connection, read) timeouts.
- :param _host: server/host defined in path or operation instead
- :return:
- If async_req parameter is True,
- the request will be called asynchronously.
- The method will return the request thread.
- If parameter async_req is False or missing,
- then the method will return the response directly.
- """
- if not async_req:
- return self.__call_api(
- resource_path,
- method,
- path_params,
- query_params,
- header_params,
- body,
- post_params,
- files,
- response_type,
- auth_settings,
- _return_http_data_only,
- collection_formats,
- _preload_content,
- _request_timeout,
- _host,
- )
-
- return self.pool.apply_async(
- self.__call_api,
- (
- resource_path,
- method,
- path_params,
- query_params,
- header_params,
- body,
- post_params,
- files,
- response_type,
- auth_settings,
- _return_http_data_only,
- collection_formats,
- _preload_content,
- _request_timeout,
- _host,
- ),
- )
-
- def request(
- self,
- method,
- url,
- query_params=None,
- headers=None,
- post_params=None,
- body=None,
- _preload_content=True,
- _request_timeout=None,
- ):
- """Makes the HTTP request using RESTClient."""
- if method == "GET":
- return self.rest_client.GET(
- url,
- query_params=query_params,
- _preload_content=_preload_content,
- _request_timeout=_request_timeout,
- headers=headers,
- )
- elif method == "HEAD":
- return self.rest_client.HEAD(
- url,
- query_params=query_params,
- _preload_content=_preload_content,
- _request_timeout=_request_timeout,
- headers=headers,
- )
- elif method == "OPTIONS":
- return self.rest_client.OPTIONS(
- url,
- query_params=query_params,
- headers=headers,
- _preload_content=_preload_content,
- _request_timeout=_request_timeout,
- )
- elif method == "POST":
- return self.rest_client.POST(
- url,
- query_params=query_params,
- headers=headers,
- post_params=post_params,
- _preload_content=_preload_content,
- _request_timeout=_request_timeout,
- body=body,
- )
- elif method == "PUT":
- return self.rest_client.PUT(
- url,
- query_params=query_params,
- headers=headers,
- post_params=post_params,
- _preload_content=_preload_content,
- _request_timeout=_request_timeout,
- body=body,
- )
- elif method == "PATCH":
- return self.rest_client.PATCH(
- url,
- query_params=query_params,
- headers=headers,
- post_params=post_params,
- _preload_content=_preload_content,
- _request_timeout=_request_timeout,
- body=body,
- )
- elif method == "DELETE":
- return self.rest_client.DELETE(
- url,
- query_params=query_params,
- headers=headers,
- _preload_content=_preload_content,
- _request_timeout=_request_timeout,
- body=body,
- )
- else:
- raise ApiValueError("http method must be `GET`, `HEAD`, `OPTIONS`," " `POST`, `PATCH`, `PUT` or `DELETE`.")
-
- def parameters_to_tuples(self, params, collection_formats):
- """Get parameters as list of tuples, formatting collections.
-
- :param params: Parameters as dict or list of two-tuples
- :param dict collection_formats: Parameter collection formats
- :return: Parameters as list of tuples, collections formatted
- """
- new_params = []
- if collection_formats is None:
- collection_formats = {}
- for k, v in six.iteritems(params) if isinstance(params, dict) else params: # noqa: E501
- if k in collection_formats:
- collection_format = collection_formats[k]
- if collection_format == 'multi':
- new_params.extend((k, value) for value in v)
- else:
- if collection_format == 'ssv':
- delimiter = ' '
- elif collection_format == 'tsv':
- delimiter = '\t'
- elif collection_format == 'pipes':
- delimiter = '|'
- else: # csv is the default
- delimiter = ','
- new_params.append((k, delimiter.join(str(value) for value in v)))
- else:
- new_params.append((k, v))
- return new_params
-
- def files_parameters(self, files=None):
- """Builds form parameters.
-
- :param files: File parameters.
- :return: Form parameters with files.
- """
- params = []
-
- if files:
- for k, v in six.iteritems(files):
- if not v:
- continue
- file_names = v if type(v) is list else [v]
- for n in file_names:
- with open(n, 'rb') as f:
- filename = os.path.basename(f.name)
- filedata = f.read()
- mimetype = mimetypes.guess_type(filename)[0] or 'application/octet-stream'
- params.append(tuple([k, tuple([filename, filedata, mimetype])]))
-
- return params
-
- def select_header_accept(self, accepts):
- """Returns `Accept` based on an array of accepts provided.
-
- :param accepts: List of headers.
- :return: Accept (e.g. application/json).
- """
- if not accepts:
- return
-
- accepts = [x.lower() for x in accepts]
-
- if 'application/json' in accepts:
- return 'application/json'
- else:
- return ', '.join(accepts)
-
- def select_header_content_type(self, content_types):
- """Returns `Content-Type` based on an array of content_types provided.
-
- :param content_types: List of content-types.
- :return: Content-Type (e.g. application/json).
- """
- if not content_types:
- return 'application/json'
-
- content_types = [x.lower() for x in content_types]
-
- if 'application/json' in content_types or '*/*' in content_types:
- return 'application/json'
- else:
- return content_types[0]
-
- def update_params_for_auth(self, method, url, headers, querys, body, auth_settings):
- """Updates header and query params based on authentication setting.
-
- :param method: Request method
- :param url: Request path, host included
- :param headers: Header parameters dict to be updated.
- :param querys: Query parameters tuple list to be updated.
- :param body: Request body
- :param auth_settings: Authentication setting identifiers list.
- """
- if not auth_settings:
- return
-
- for auth in auth_settings:
- auth_setting = self.configuration.auth_settings().get(auth)
- if auth_setting:
- if auth_setting['type'] == 'apiv4':
- auth_headers = self.gen_sign(method, urlparse(url).path, unquote_plus(urlencode(querys)), body)
- headers.update(auth_headers)
- continue
- if auth_setting['in'] == 'cookie':
- headers['Cookie'] = auth_setting['value']
- elif auth_setting['in'] == 'header':
- headers[auth_setting['key']] = auth_setting['value']
- elif auth_setting['in'] == 'query':
- querys.append((auth_setting['key'], auth_setting['value']))
- else:
- raise ApiValueError('Authentication token must be in `query` or `header`')
-
- def gen_sign(self, method, url, query_string=None, body=None):
- """generate authentication headers
-
- :param method: http request method
- :param url: http resource path
- :param query_string: query string
- :param body: request body
- :return: signature headers
- """
- t = time.time()
- m = hashlib.sha512()
- if body is not None:
- if not isinstance(body, six.string_types):
- body = json.dumps(body)
- m.update(body.encode('utf-8'))
- hashed_payload = m.hexdigest()
- s = '%s\n%s\n%s\n%s\n%s' % (method, url, query_string or "", hashed_payload, t)
- sign = hmac.new(self.configuration.secret.encode('utf-8'), s.encode('utf-8'), hashlib.sha512).hexdigest()
- return {'KEY': self.configuration.key, 'Timestamp': str(t), 'SIGN': sign}
-
- def __deserialize_file(self, response):
- """Deserializes body to file
-
- Saves response body into a file in a temporary folder,
- using the filename from the `Content-Disposition` header if provided.
-
- :param response: RESTResponse.
- :return: file path.
- """
- fd, path = tempfile.mkstemp(dir=self.configuration.temp_folder_path)
- os.close(fd)
- os.remove(path)
-
- content_disposition = response.getheader("Content-Disposition")
- if content_disposition:
- filename = re.search(r'filename=[\'"]?([^\'"\s]+)[\'"]?', content_disposition).group(1)
- path = os.path.join(os.path.dirname(path), filename)
-
- with open(path, "wb") as f:
- f.write(response.data)
-
- return path
-
- def __deserialize_primitive(self, data, klass):
- """Deserializes string to primitive type.
-
- :param data: str.
- :param klass: class literal.
-
- :return: int, long, float, str, bool.
- """
- try:
- return klass(data)
- except UnicodeEncodeError:
- return six.text_type(data)
- except TypeError:
- return data
-
- def __deserialize_object(self, value):
- """Return an original value.
-
- :return: object.
- """
- return value
-
- def __deserialize_date(self, string):
- """Deserializes string to date.
-
- :param string: str.
- :return: date.
- """
- try:
- return parse(string).date()
- except ImportError:
- return string
- except ValueError:
- raise rest.ApiException(status=0, reason="Failed to parse `{0}` as date object".format(string))
-
- def __deserialize_datetime(self, string):
- """Deserializes string to datetime.
-
- The string should be in iso8601 datetime format.
-
- :param string: str.
- :return: datetime.
- """
- try:
- return parse(string)
- except ImportError:
- return string
- except ValueError:
- raise rest.ApiException(status=0, reason=("Failed to parse `{0}` as datetime object".format(string)))
-
- def __deserialize_model(self, data, klass):
- """Deserializes list or dict to model.
-
- :param data: dict, list.
- :param klass: class literal.
- :return: model object.
- """
- has_discriminator = False
- if hasattr(klass, 'get_real_child_model') and klass.discriminator_value_class_map:
- has_discriminator = True
-
- if not klass.openapi_types and has_discriminator is False:
- return data
-
- kwargs = {}
- if data is not None and klass.openapi_types is not None and isinstance(data, (list, dict)):
- for attr, attr_type in six.iteritems(klass.openapi_types):
- if klass.attribute_map[attr] in data:
- value = data[klass.attribute_map[attr]]
- kwargs[attr] = self.__deserialize(value, attr_type)
-
- instance = klass(**kwargs)
-
- if has_discriminator:
- klass_name = instance.get_real_child_model(data)
- if klass_name:
- instance = self.__deserialize(data, klass_name)
- return instance
|