在金融领域,套利是一种策略,即在一个市场买入证券、商品或货币,然后在另一个市场以更高的价格卖出。两个市场之间的差价减去交易费用即为利润。
对于加密货币而言,大多数交易活动都是全天候在众多交易所进行的。资产的价格由供求经济因素决定,而不同交易所的供求经济因素可能并不一致。跨交易所套利策略可以利用这种差异,从而为交易者提供潜在的获利机会。
在本教程中,我们将讲解如何构建一个基于 Python 的套利机器人,该机器人通过追踪加密货币价格和其他加密货币交易所的有用指标来识别套利机会。设置好机器人后,您就可以使用Python 交易机器人在交易所进行自动化交易。
我们将使用CoinGecko API中的 Demo 端点来检索加密货币价格数据。这些端点可以免费访问,但需要密钥身份验证。
-
/exchanges- 获取所有受支持的交易所列表及其相关数据(ID、名称、国家/地区等)的端点。请注意,只有在 CoinGecko 上有活跃交易量的交易所才会被列出。
-
/exchanges/{id}/tickers- 根据给定交易所 ID 查询特定股票代码的端点。响应包含有用数据,例如上次交易时间、最新价格和交易量。
-
/exchange_rates- 获取 BTC 与其他货币的汇率列表。
-
/exchanges/{id}/volume_chart;- 获取带有 unix 时间戳的选定交易所的历史总量数据(以 BTC 为单位)。
然而,由于我们的加密货币套利机器人需要全程运行,因此可能会遇到速率限制。在这种情况下,Pro API 密钥可能会有所帮助。
免责声明:任何使用或依赖我们内容的行为均由您自行承担风险和判断。在依赖我们的内容之前,请自行研究、审阅、分析和验证。交易是一项高风险活动,可能导致重大损失,因此,请在做出任何决定之前咨询您的财务顾问。
先决条件
我们将使用 Jupyter Notebook 来创建并运行该机器人。请确保已安装 Python 3 及以下附加软件包:
pip install jupyterlab
pip install pandas
pip install numpy
pip install pytz
要在新笔记本中开始编码,请执行以下命令:
jupyter lab
这会在浏览器窗口中打开一个新标签页。如果你想使用通过 GitHub 仓库共享的笔记本,请先克隆该仓库,然后在 Jupyter 环境中打开它。
1. 设置项目环境和 API 访问
可以按如下所示加载 Python 包:
| import requests as rq | |
| import json | |
| import pandas as pd | |
| pd.set_option('display.precision', 4, | |
| 'display.colheader_justify', 'center') | |
| import numpy as np | |
| import warnings | |
| import pytz | |
| import datetime | |
| import time | |
| from IPython.display import clear_output |
演示 API 密钥可以从本地文件读取。get_response函数会使用use_demo标头,该函数会为我们发出请求。状态码 200 表示请求成功。
| def get_demo_key(): | |
| f = open("/home/vikas/Documents/CG_demo_key.json") | |
| key_dict = json.load(f) | |
| return key_dict["key"] | |
| use_demo = { | |
| "accept": "application/json", | |
| "x-cg-demo-api-key" : get_demo_key() | |
| } | |
| def get_response(endpoint, headers, params, URL): | |
| url = "".join((URL, endpoint)) | |
| response = rq.get(url, headers = headers, params = params) | |
| if response.status_code == 200: | |
| data = response.json() | |
| return data | |
| else: | |
| print(f"Failed to fetch data, check status code {response.status_code}") |
2. 获取所有加密货币交易所列表
要获取所有加密货币交易所的完整列表,请对CoinGecko API 中的/exchanges端点进行 API 调用。以下查询参数将确保我们在单个页面中获取所有结果。
| # Valid values for results per page is between 1-250 | |
| exchange_params = { | |
| "per_page": 250, | |
| "page": 1 | |
| } |
| exchange_list_response = get_response("/exchanges", use_demo, exchange_params, PUB_URL) | |
| df_ex = pd.DataFrame(exchange_list_response) |
要按交易量列出交易所,可以轻松地根据“trade_volume_24h_btc”列对上述 DataFrame 进行排序。
| df_ex_subset = df_ex[["id", "name", "country", "trade_volume_24h_btc"]] | |
| df_ex_subset = df_ex_subset.sort_values(by = ["trade_volume_24h_btc"], ascending = False) |
3. 获取加密货币交易所代码
对于每个交易所,都有多个加密货币代码(或交易对)的数据可用。我们希望筛选列表,找到我们感兴趣的代码的最新交易。向相关端点发出请求后,我们可以循环遍历响应,找到相关的基础货币和目标货币。如果未找到匹配项,则返回空字符串。这种情况可能发生,因为并非所有交易所都提供所有交易对。
| def get_trade_exchange(id, base_curr, target_curr): | |
| exchange_ticker_response = get_response(f"/exchanges/{id}/tickers", | |
| use_demo, | |
| {}, | |
| PUB_URL) | |
| found_match = "" | |
| for ticker in exchange_ticker_response["tickers"]: | |
| if ticker["base"] == base_curr and ticker["target"] == target_curr: | |
| found_match = ticker | |
| break | |
| if found_match == "": | |
| warnings.warn(f"No data found for {base_curr}-{target_curr} pair in {id}") | |
| return found_match |
4. 将时间戳转换为本地时区(Python)
如上所示,返回的数据带有不同时区的时间戳。在监控机器人活动时,将其转换为我们自己的时区会很有帮助。使用 pytz 库可以轻松实现,例如“Europe/Amsterdam”的时区转换。
| def convert_to_local_tz(old_ts): | |
| new_tz = pytz.timezone("Europe/Amsterdam") | |
| old_tz = pytz.timezone("UTC") | |
| format = "%Y-%m-%dT%H:%M:%S+00:00" | |
| datetime_obj = datetime.datetime.strptime(old_ts, format) | |
| localized_ts = old_tz.localize(datetime_obj) | |
| new_ts = localized_ts.astimezone(new_tz) | |
| return new_ts |
5. 获取多个交易所的加密货币行情数据
现在我们已经知道如何获取单个交易所的股票行情数据,我们可以扩展相同的逻辑来收集多个交易所的数据。为了专注于特定市场,我们还可以添加针对国家/地区的过滤器。从股票行情响应中,我们收集最新交易价格、最新交易量、价差和交易时间(转换为当地时区)。如果交易对未在指定交易所上线,则会显示警告。
| def get_trade_exchange_per_country(country, | |
| base_curr, | |
| target_curr): | |
| df_all = df_ex_subset[(df_ex_subset["country"] == country)] | |
| exchanges_list = df_all["id"] | |
| ex_all = [] | |
| for exchange_id in exchanges_list: | |
| found_match = get_trade_exchange(exchange_id, base_curr, target_curr) | |
| if found_match == "": | |
| continue | |
| else: | |
| temp_dict = dict( | |
| exchange = exchange_id, | |
| last_price = found_match["last"], | |
| last_vol = found_match["volume"], | |
| spread = found_match["bid_ask_spread_percentage"], | |
| trade_time = convert_to_local_tz(found_match["last_traded_at"]) | |
| ) | |
| ex_all.append(temp_dict) | |
| return pd.DataFrame(ex_all) |
6. 获取多种货币的比特币汇率
各个端点的数据(例如,交易所交易量)以BTC为单位报告。对于我们的机器人来说,额外确定特定代码的交易量占总交易量的百分比将会很有趣,这可以让我们进一步了解特定交易所的流动性。
为了将 BTC 转换为不同的目标货币,我们可以通过 CoinGecko API 获取汇率,如下所示:
| def get_exchange_rate(base_curr): | |
| # This returns current BTC to base_curr exchange rate | |
| exchange_rate_response = get_response(f"/exchange_rates", | |
| use_demo, | |
| {}, | |
| PUB_URL) | |
| rate = "" | |
| try: | |
| rate = exchange_rate_response["rates"][base_curr.lower()]["value"] | |
| except KeyError as ke: | |
| print("Currency not found in the exchange rate API response:", ke) | |
| return rate |
7. 获取历史交易量数据
利用给定时间段的历史交易量数据,我们可以使用 7 天窗口期的简单移动平均线来确定最新交易量。然后,可以使用上一节确定的汇率将该交易量(默认为 BTC)转换为我们感兴趣的货币。一旦我们知道了总交易量(所有股票代码的总和),就可以轻松确定我们股票代码交易量的百分比,稍后我们将展示这一点。
| def get_vol_exchange(id, days, base_curr): | |
| vol_params = {"days": days} | |
| exchange_vol_response = get_response(f"/exchanges/{id}/volume_chart", | |
| use_demo, | |
| vol_params, | |
| PUB_URL) | |
| time, volume = [], [] | |
| # Get exchange rate when base_curr is not BTC | |
| ex_rate = 1.0 | |
| if base_curr != "BTC": | |
| ex_rate = get_exchange_rate(base_curr) | |
| # Give a warning when exchange rate is not found | |
| if ex_rate == "": | |
| print(f"Unable to find exchange rate for {base_curr}, vol will be reported in BTC") | |
| ex_rate = 1.0 | |
| for i in range(len(exchange_vol_response)): | |
| # Convert to seconds | |
| s = exchange_vol_response[i][0] / 1000 | |
| time.append(datetime.datetime.fromtimestamp(s).strftime('%Y-%m-%d')) | |
| # Default unit for volume is BTC | |
| volume.append(float(exchange_vol_response[i][1]) * ex_rate) | |
| df_vol = pd.DataFrame(list(zip(time, volume)), columns = ["date", "volume"]) | |
| # Calculate SMA for a specific window | |
| df_vol["volume_SMA"] = df_vol["volume"].rolling(7).mean() | |
| return df_vol.sort_values(by = ["date"], ascending = False).reset_index(drop = True) |
8. 汇总并显示加密货币交易所的交易
在运行机器人之前,重要的是要思考如何随时间汇总数据。本质上,我们的机器人会定期获取最新的交易数据。对于某些交易所,在此期间可能没有新的交易发生;而对于其他交易所,可能会执行许多交易。因此,删除重复数据至关重要。然后,可以使用唯一值的数量来确定交易数量。此外,为了构建可靠的套利策略,收集一段时间内的统计数据也很有帮助。因此,我们将对交易所 ID 执行分组操作,并计算所有相关列的平均值。
此外,还将添加一个包含总交易量百分比的新列(如前几节所示)。
| def display_agg_per_exchange(df_ex_all, base_curr): | |
| # Group data and calculate statistics per exchange | |
| df_agg = ( | |
| df_ex_all.groupby("exchange").agg | |
| ( | |
| trade_time_min = ("trade_time", 'min'), | |
| trade_time_latest = ("trade_time", 'max'), | |
| last_price_mean = ("last_price", 'mean'), | |
| last_vol_mean = ("last_vol", 'mean'), | |
| spread_mean = ("spread", 'mean'), | |
| num_trades = ("last_price", 'count') | |
| ) | |
| ) | |
| # Get time interval over which statistics have been calculated | |
| df_agg["trade_time_duration"] = df_agg["trade_time_latest"] - df_agg["trade_time_min"] | |
| # Reset columns so that we can access exchanges below | |
| df_agg = df_agg.reset_index() | |
| # Calculate % of total volume for all exchanges | |
| last_vol_pert = [] | |
| for i, row in df_agg.iterrows(): | |
| try: | |
| df_vol = get_vol_exchange(row["exchange"], 30, base_curr) | |
| current_vol = df_vol["volume_SMA"][0] | |
| vol_pert = (row["last_vol_mean"] / current_vol) * 100 | |
| last_vol_pert.append(vol_pert) | |
| except: | |
| last_vol_pert.append("") | |
| continue | |
| # Add % of total volume column | |
| df_agg["last_vol_pert"] = last_vol_pert | |
| # Remove redundant column | |
| df_agg = df_agg.drop(columns = ["trade_time_min"]) | |
| # Round all float values | |
| # (seems to be overwritten by style below) | |
| df_agg = df_agg.round({"last_price_mean": 2, | |
| "last_vol_mean": 2, | |
| "spread_mean": 2 | |
| }) | |
| display(df_agg.style.apply(highlight_max_min, | |
| color = 'green', | |
| subset = "last_price_mean") | |
| ) | |
| return None |
| def highlight_max_min(x, color): | |
| return np.where((x == np.nanmax(x.to_numpy())) | | |
| (x == np.nanmin(x.to_numpy())), | |
| f"color: {color};", | |
| None) |
9. 运行加密货币交易套利机器人
我们的机器人需要持续监控多个交易所的最新交易。因此,我们将执行一个包含 while 语句的单元。这将使代码持续运行,直到用户停止。为了在更新之间引入一分钟的延迟,我们将使用 sleep 语句。由于 API 本身在演示计划中每分钟都会刷新,因此无需设置更短的延迟。
| def run_bot(country, | |
| base_curr, | |
| target_curr): | |
| df_ex_all = get_trade_exchange_per_country(country, base_curr, target_curr) | |
| # Collect data every minute | |
| while True: | |
| time.sleep(60) | |
| df_new = get_trade_exchange_per_country(country, base_curr, target_curr) | |
| # Merge to existing DataFrame | |
| df_ex_all = pd.concat([df_ex_all, df_new]) | |
| # Remove duplicate rows based on all columns | |
| df_ex_all = df_ex_all.drop_duplicates() | |
| # Clear previous display once new one is available | |
| clear_output(wait = True) | |
| display_agg_per_exchange(df_ex_all, base_curr) | |
| return None |
绿色突出显示的是最低价格(Coinlist)和最高价格(Binance US)。因此,一个套利策略示例是在 Coinlist 上买入 ETH,然后立即在 Binance US 上卖出。
另一个值得关注的点是买卖价差与交易数量之间的相关性。在Gemini上,价差相当高——这表明该交易对的流动性较低。与同期其他交易所相比,Gemini 的交易数量较低(仅 2 笔!),这进一步证实了这一点。
要停止机器人,请导航至顶部的“内核”选项卡并选择“中断内核”
结论
通过在单个 Jupyter 笔记本中利用 CoinGecko 的 API 和 Python 编程的强大功能,我们能够持续监控来自各个加密货币交易所的最新交易并突出跨交易所套利机会。
