大多数交易者在交易所手动交易时面临同样的问题:
这太耗时间了。追踪图表、追踪价格走势、寻找交易机会真是让人精疲力尽。加密货币市场永不停歇,但我们却如此,这意味着交易机会的错失。
人类反应太慢了。实时对价格波动做出反应几乎不可能,尤其是在波动剧烈的加密货币市场。等你分析好走势并下单时,机器人和高频交易员早已获利了结。
情绪会阻碍交易。 每个交易员都告诉自己要理性交易,但“害怕错过”(FOMO)的情绪会袭来,恐慌性抛售占据上风。几笔糟糕的交易之后,我们就开始对每一次交易都产生怀疑。
如果您是开发人员或具备一定的技术技能,您可能已经考虑过构建自己的加密货币交易机器人。在本教程中,我们将学习如何使用 Python 在中心化加密货币交易所 (CEX) 上自动执行交易,同时利用CoinGecko API监控任何指定交易所的最新交易。本指南基于我之前的教程,我们开发了一个加密货币套利机器人,用于监控特定加密货币对在多个交易所的最新价格。
让我们开始吧!
什么是加密交易机器人?它们能盈利吗?
理论上,加密货币交易机器人听起来像是作弊代码。它们自动执行交易,对价格变动做出即时反应,而且不受情绪影响。但任何真正使用过它们的人都知道,事情远没有那么简单。
构建机器人很容易,但让它持续盈利却很难。资深交易员会花费大量时间完善策略和调整算法,以优化性能。话虽如此,如果您已经有策略并希望实现自动化,那么从头开始构建机器人是值得的。
市面上有很多预置的交易机器人,但如果您想要额外的定制化功能,并完全掌控策略、执行和风险管理,那么您就需要构建自己的交易机器人。与依赖通用算法的预置机器人不同,定制机器人可以根据您的交易风格进行定制,针对特定的市场条件进行优化,并与您的盈利目标保持一致。
先决条件
与套利机器人教程类似,我们将再次使用 Jupyter Notebook 作为主要开发环境。请确保已安装以下附加软件包:
pip install sqlalchemy
pip install coinbase-advanced-py
要在新笔记本中开始编码,请执行以下命令:
jupyter lab
这会在浏览器中打开一个新标签页。如果您想使用通过 GitHub 仓库共享的笔记本,请先克隆该仓库,然后在 Jupyter 环境中打开它。请确保将 API 密钥的路径替换为您自己的路径。
您还需要访问 CoinGecko 和 Coinbase API:
CoinGecko API:我们将使用 CoinGecko API 根据指定的交易所 ID获取代码。免费的演示计划(每分钟 30 次调用,每月 10,000 次调用)足以完成本教程。注册一个 CoinGecko 帐户,并按照以下步骤生成免费的演示 API 密钥。
Coinbase API:您需要一个正常运行的 Coinbase 用户帐户来生成交易 API 密钥,稍后会详细介绍。您也可以选择使用Coinbase 开发者门户 (CDP)来生成和管理密钥。
使用的 API 端点
我们将首先扩展我们的套利机器人,使其能够收集交易数据并将其存储在一个简单的数据库中,以便我们能够直观地比较不同交易所的交易活动。一旦我们观察到有利的趋势,我们将使用 Coinbase Advanced API 执行市价买入订单。我们将进一步探索卖单和限价订单。
以监控 Coinbase 上的最新交易数据为例,我们将使用CoinGecko API并调用/exchanges/{id}/tickers端点。此端点根据给定的交易所 ID 查询特定的代码。响应包含有用的数据,例如上次交易的时间、最新价格和交易量。
为了在 Coinbase 上执行交易,我们将使用Advanced Python SDK,它为以下端点(本文中使用)提供了包装器:
api/v3/brokerage/time - 查询高级 API 服务器时间的端点
api/v3/brokerage/key_permissions - 获取与提供的 API 密钥相关的权限的端点
api/v3/brokerage/market/products/{product_id}/ticker - 获取给定的最新交易的端点
api/v3/brokerage/portfolios/{portfolio_uuid} - 获取给定投资组合明细的端点
api/v3/brokerage/orders - 下达买入市价单的端点
api/v3/brokerage/orders/historical/{order_id} - 获取订单详细信息的端点
构建交易机器人的分步指南
步骤 1:获取价格和交易活动
为了持续监控特定交易所的最新交易,我们需要定期收集数据(如之前套利机器人文章中所述,可以每分钟收集一次)。然而,长时间存储所有这些数据可能会导致系统内存不足。因此,一个解决方案是将数据写入以文件形式存储在磁盘上的SQLite数据库中。我们将对机器人进行编程,使其仅在需要时读取相关交易所的数据,从而实现全天候运行。
使用sqlalchemy包,我们可以将 pandas DataFrame 写入SQLite数据库。每个交易所的数据都写入单独的表。
from sqlalchemy import create_engine | |
def create_db(df_ex_all, engine): | |
for ex in df_ex_all["exchange"]: | |
df_ex = df_ex_all[df_ex_all["exchange"] == ex] | |
# Write to a new separate table in sqlite database | |
df_ex.to_sql(name = ex, con = engine, if_exists = 'replace') | |
return None |
def append_to_db(df_ex_all, engine): | |
for ex in df_ex_all["exchange"]: | |
try: | |
df_ex = df_ex_all[df_ex_all["exchange"] == ex] | |
# Remove timezone information for easier comparison later | |
new_ts = df_ex["trade_time"].iloc[0] | |
new_ts = new_ts.replace(tzinfo = None) | |
# Read existing table | |
df_table = pd.read_sql(f'SELECT * FROM {ex}', engine) | |
# Read last row (should be latest) | |
existing_ts = datetime.datetime.strptime( | |
df_table["trade_time"].iloc[-1], | |
"%Y-%m-%d %H:%M:%S.%f") | |
if existing_ts == new_ts: | |
print("Last trade already exists!") | |
else: | |
df_ex.to_sql(name = ex, con = engine, if_exists = 'append') | |
except: | |
warnings.warn(f'Unable to append data for {ex}') | |
continue | |
return None |
import plotly.express as px | |
def plot_ex_from_db(ex, engine, base_curr, target_curr): | |
# Read existing table | |
df_table = pd.read_sql(f'SELECT * FROM {ex}', engine) | |
fig = px.line(df_table, | |
x = df_table.trade_time, | |
y = df_table.last_price, | |
title = f"{base_curr}-{target_curr} price data for {ex}", | |
markers = True | |
) | |
return fig |
country, base_curr, target_curr
) 用于配置数据库文件名。这将确保不同货币对的数据存储在单独的文件中,并确保在数据库损坏的情况下不会丢失所有数据。def run_bot_db(country, | |
ex, | |
base_curr, | |
target_curr): | |
engine = create_engine(f'sqlite:///{country}_{base_curr}_{target_curr}', | |
echo = False) | |
df_ex_all = get_trade_exchange_per_country(country, | |
Base_curr, | |
target_curr) | |
try: | |
create_db(df_ex_all, engine) | |
except: | |
warnings.warn(f"Unable to create a new sqlite database!") | |
# Collect data every minute | |
while True: | |
time.sleep(60) | |
df_new = get_trade_exchange_per_country(country, | |
Base_curr, | |
target_curr) | |
# Write new trade data to existing sqlite database | |
append_to_db(df_new, engine) | |
# Clear previous display once new one is available | |
clear_output(wait = True) | |
# Plot from database | |
fig = plot_ex_from_db(ex, engine, base_curr, target_curr) | |
fig.show() | |
return None |
价格最初似乎呈下降趋势,现在出现了反转迹象。这可能是在 Coinbase 上下单“买入” ETH的好机会。
engine = create_engine(f'sqlite:///United States_ETH_USD',
echo = False)
ex = "kraken"
fig = plot_ex_from_db(ex, engine, "BTC", "USD")
fig.show()
步骤2:设置Coinbase API密钥
API 密钥可在开发者平台上生成,如下所示。建议创建单独的投资组合,用于执行不同的交易策略。API 密钥可以与特定投资组合关联,从而限制其使用并防止意外下单。
要创建单独的投资组合,请导航至 Coinbase 主页,打开左侧的“投资组合”选项卡。然后点击“新建投资组合”,并根据预期用途为其命名。
资金可以从其他或“默认”投资组合转移,如下所示:
打开开发者平台→访问→“创建API密钥”,生成新的API密钥。
您可以通过 API 密钥生成器弹出窗口底部的下拉框选择相关投资组合。请注意,“默认”投资组合已存在。
关键权限可以设置为“查看”(测试代码时安全,无法执行订单)或“交易”(允许下订单)。
下载 json 文件并将其保存到安全的位置。现在我们将导入该类RESTClient
并初始化它。
from coinbase.rest import RESTClient | |
# Get demo API key | |
def get_demo_key(key_type): | |
f = open(f"/home/vikas/Documents/Demo_{key_type}_CG_key.json") | |
key_dict = json.load(f) | |
return key_dict["name"], key_dict["privateKey"] | |
api_key, api_secret = get_demo_key("view") | |
view_client = RESTClient(api_key = api_key, | |
api_secret = api_secret) |
让我们看看是否可以使用此密钥访问一些可用的端点。要检查当前服务器时间,请执行以下操作:
要检查 API 密钥权限,可以进行以下调用:
步骤3:获取投资组合详情
投资组合创建完成后,其详细信息可通过编程方式访问。这在处理大量采用不同交易策略的投资组合时尤其有用。
给定投资组合 uuid,还可以获取更多详细信息(例如余额分配)。
步骤 4:下达市场买入订单
现在一切就绪,是时候执行一些市价单了。相关的 API 还接受一个唯一的客户 ID 作为输入。这确保不会错误地下达重复订单。我们可以使用以下函数生成给定长度的随机字符串,并将其用作客户 ID 的输入。
def generate_client_id(id_length): | |
return "".join(f"{randint(0, 9)}" for _ in range(id_length)) |
buy_order = trade_client.market_order_buy(
client_order_id = generate_client_id(10),
product_id = "ETH-EUR",
quote_size = "5")
查看响应详情,我们可以验证订单是否已成功下单。由于这是一个仅用于演示的小订单,预计很快就会完成。
如果需要,可以进一步使用 order_id(上面突出显示,为了保护隐私而部分隐藏)来获取其他信息。
您还可以在 Coinbase 网页界面上查看订单摘要。请记住选择正确的投资组合,如下所示。
buy_order_preview = trade_client.preview_market_order_buy(
product_id = "ETH-EUR",
quote_size = "5")
步骤 5:下达市价卖单
在下达卖单之前,我们先来查看一下订单摘要。
sell_order_preview = trade_client.preview_market_order_sell(
product_id = "ETH-EUR",
base_size = "0.0025")
这次我们使用了 ' base_size
' 参数,它指的是交易对中的第一个资产。在本例中,它是 ETH。
确认预览看起来没问题后,我们可以继续下达卖单。
sell_order = trade_client.market_order_sell(
client_order_id = generate_client_id(10),
product_id = "ETH-EUR",
base_size = "0.0025")
使用
order_id
,可以获取更多详细信息,类似于之前显示的购买订单。
步骤6:下限价单
限价单旨在保护交易者在订单执行过程中免受价格快速波动的影响。例如,限价买单可用于确保资产的买入价格低于交易者设定的最高基准价格。同样,限价卖单则用于确保资产的卖出价格等于或高于预设的限价。这也意味着此类订单不一定会立即执行。订单在等待执行期间的有效性受执行政策的约束。
在以下示例中,我们将下达一个采用 GTC(有效至取消)策略的限价卖单。这意味着该订单将一直有效,直到交易者自行取消。请注意,这也可能导致订单持续有效数月的情况。我们首先创建一个预览,以测试所有输入是否正确。
limit_sell_order_preview = trade_client.preview_limit_order_gtc_sell(
product_id = "ETH-EUR",
base_size = "0.0020",
limit_price = "2200")
请注意,limit_price 设置为 2200 欧元,这意味着只有当 ETH-EUR 交易对达到或超过此价格阈值时才会执行订单。
现在让我们执行真正的限价卖单。
limit_sell_order = trade_client.limit_order_gtc_sell(
client_order_id = generate_client_id(10),
product_id = "ETH-EUR",
base_size = "0.0020",
limit_price = "2200")
此订单预计不会立即成交。为了验证是否如此,请检查订单详情。
蓝色框显示订单确实处于打开状态。政策也清晰可见。
步骤 7:审核订单
可以通过传递正确的状态参数来查看未结订单,如下所示:
通过order_id
参数,我们还可以取消未结订单:
如前所示,订单状态也可以从 Coinbase 网络界面进行验证。
测试和优化你的机器人
大多数机器人失败并非因为代码糟糕,而是因为其背后的策略不可靠。因此,模拟交易至关重要。在模拟市场中运行机器人可以帮助你在真正投入资金之前发现交易的弱点。
但即便如此,这还不够。通过历史数据进行回测,可以了解该策略在不同市场条件下的表现——牛市、熊市以及介于两者之间的所有市场环境。目标不仅仅是检验其是否有效,而是在上线前进行改进、优化和调整。
结论
在本文中,我们学习了如何设置一个机器人来收集并可视化来自 CoinGecko 的各交易所的最新交易数据。然后,我们以 Coinbase 为例,介绍了如何使用通过 Advanced Python SDK 实现的 Coinbase API 来执行交易。
我们的程序化交易方法展现出诸多潜在优势。即使对于新手交易者,也能利用该方法每月自动购买固定数量的加密货币,俗称“美元成本平均法”(DCA)。Python 代码可以轻松部署在低成本低功耗的设备(例如 Raspberry Pi)上,并可全天候运行。
我们还仔细分析并实施了充分的安全措施,以防止不良订单。最后,通过与 Coinbase 网页界面进行可视化比较,对 API 返回的数据进行了持续验证。