与 Coinbase 或币安等中心化交易所 (CEX) 相比,在 Uniswap 等去中心化交易所 (DEX) 上交易有着根本的不同。DEX 上的用户连接他们的加密钱包,并直接与智能合约交互,以便在特定网络上兑换不同的代币。最广泛使用的 DEX 基于自动做市商 (AMM) 模型,用户可以通过资金池获取流动性。代币兑换在资金池中进行,并根据智能合约的规定收取费用。因此,同一交易对可能存在多个高流动性资金池,有时只是费用不同。
尽管 DEX 近年来颇受欢迎,但它们通常缺乏 CEX 那样用户友好的监控界面。用户还需要使用自己的钱包手动连接到 DEX,这增加了额外的复杂性,尤其是在执行多笔交易时。因此,使用 DEX API 来自动化交易的监控和执行非常有用。
在本教程中,我们将学习如何利用CoinGecko API中的链上 DEX 端点来监控不同矿池的交易活动。此外,我们还将使用 Python 监控加密钱包的余额,并在 Uniswap 等 DEX 上执行兑换交易。
让我们开始吧!
先决条件
我们将使用 Jupyter Notebook 作为主要开发环境。此外,还需要安装以下软件包:
pip install uniswap
要在新笔记本中开始编码,请在终端中执行以下命令:
jupyter lab
这会在浏览器中打开一个新标签页。如果您想使用通过GitHub 仓库共享的笔记本,请先克隆该仓库,然后在 Jupyter 环境中打开它。请务必将 API 密钥文件的路径替换为您自己的路径。
您还需要一个支持 ERC-20(以太坊)代币的加密钱包。MetaMask 易于设置,可作为浏览器扩展程序使用,因此推荐使用。稍后我们将使用 MetaMask 内部生成的账户地址和主密钥。要查看主密钥,请点击查看账户摘要时显示的三个点(右上角)。选择“账户详情”,然后点击“显示主密钥”。系统将提示您输入钱包密码,然后按住“显示”按钮。
一旦主键可见,就将其复制到文件并以以下 JSON 格式在本地保存:
{ “primary_key” : “XXYYZZ”}
💡 专业提示:钱包账户的主密钥赋予了与该地址关联的所有资金的完全访问权限,因此,您在处理它时应格外小心。请勿与任何人共享主密钥,也不要将 JSON 文件上传到代码库或云备份。如果您想测试本教程中的代码,最好创建一个新的钱包账户/地址,并向其中转入少量加密货币。
最后,我们需要一个 RPC 端点来与以太坊区块链进行交互。我们可以使用 Ankr 提供的免费 RPC 端点:https://rpc.ankr.com/eth
虽然此端点有速率限制,但对于我们的演示用例来说已经足够了。如果您需要更高的速率限制,请考虑切换到他们的付费计划。另一个选择是使用Infura的免费计划,该计划提供单个 API 密钥和丰厚的每日积分。
CoinGecko API 链上端点
我们将使用以下链上 DEX 端点,所有CoinGecko API 付费计划均可访问这些端点:
-
/onchain/networks/{network}/dexes - 获取给定网络支持的 DEX 列表
-
/onchain/networks/{network}/trending_pools - 获取给定网络的趋势池列表
-
/onchain/networks/{network}/pools - 获取给定网络的顶级池列表
-
/onchain/networks/{network}/dexes/{dex}/pools - 获取给定网络和 DEX 的顶级池列表
-
/onchain/networks/{network}/pools/{pool_address} - 获取给定网络和池地址的池数据
-
/onchain/networks/{network}/tokens/{token_address}/pools - 获取给定网络和代币地址的池数据
我在上一篇文章中概述了如何设置项目环境和安全 API 访问。
如何监控趋势DEX池
由于 DEX 上的交易是基于加密货币池提供的流动性执行的,因此能够跟踪不同池中的代币活动对于制定明智的交易策略至关重要。现在,我们将深入探讨一些使用 CoinGecko API 的链上 DEX 数据 并结合 Python 的示例。
让我们首先编写一个便利函数,它将帮助我们发出 API 请求。
def get_response(endpoint, headers, params, URL): | |
url = "".join((URL, endpoint)) | |
response = rq.get(url, headers = headers, params = params) | |
if response.status_code == 200: | |
data = response.json() | |
return data | |
else: | |
print(f"Failed to fetch data, check status code {response.status_code}") |
def get_url(url_type, | |
network, | |
dex = "", | |
pool_address = "", | |
token_address = ""): | |
url_dict = { | |
"trending_pools": f"/onchain/networks/{network}/trending_pools", | |
"top_pools": f"/onchain/networks/{network}/pools", | |
"top_pools_dex": f"/onchain/networks/{network}/dexes/{dex}/pools", | |
"specific_pool_dex": f"/onchain/networks/{network}/pools/{pool_address}", | |
"top_pools_add": f"/onchain/networks/{network}/tokens/{token_address}/pools" | |
} | |
return url_dict[url_type] |
从 CoinGecko API 收到的响应将通过以下函数解析为 pandas DataFrame 的列:
def collect_response(list_response): | |
response_all = [] | |
for response in list_response["data"]: | |
all_attributes = response["attributes"] | |
daily_tx = all_attributes["transactions"]["h24"] | |
rel = response["relationships"] | |
temp_dict = dict( | |
pair = all_attributes["name"], | |
dex = rel["dex"]["data"]["id"], | |
add = all_attributes["address"], | |
fdv_usd = all_attributes["fdv_usd"], | |
market_cap_usd = all_attributes["market_cap_usd"], | |
daily_volume = all_attributes["volume_usd"]["h24"], | |
daily_price_change = all_attributes["price_change_percentage"]["h24"], | |
daily_buys = daily_tx["buys"], | |
daily_sells = daily_tx["sells"], | |
daily_buyers = daily_tx["buyers"], | |
daily_sellers = daily_tx["sellers"] | |
) | |
response_all.append(temp_dict) | |
return response_all |
def get_trending_pools(network, sort_by_col): | |
target_url = get_url("trending_pools", network) | |
trendpool_list_response = get_response(target_url, | |
use_pro, | |
"", | |
PRO_URL) | |
trendpool_all = collect_response(trendpool_list_response) | |
return pd.DataFrame(trendpool_all).sort_values(by = | |
[f"{sort_by_col}"], | |
ascending = False) |
如何追踪顶级 DEX 池
与前面的示例类似,我们也可以跟踪特定网络的顶级池。
def get_top_pools_network(network, sort_by_col): | |
target_url = get_url("top_pools", network) | |
toppool_list_response = get_response(target_url, | |
use_pro, | |
"", | |
PRO_URL) | |
toppool_all = collect_response(toppool_list_response) | |
return pd.DataFrame(toppool_all).sort_values(by = [f"{sort_by_col}"], | |
ascending = False) |
如上例所示,Uniswap 似乎是一个热门的去中心化交易所 (DEX)。对于同一个代币对,不同的池子可能根据其收取的费用而有所不同,费用通常在对名称旁边以百分比表示。查看 Uniswap 的顶级池子会很方便。值得庆幸的是,CoinGecko API 提供了一个端点!我们将在下一个示例中使用此端点,其中可以将特定的 DEX 作为输入参数提供。
def get_top_pools_dex(network, dex, sort_by_col): | |
target_url = get_url("top_pools_dex", network, dex) | |
toppool_list_response = get_response(target_url, | |
use_pro, | |
"", | |
PRO_URL) | |
toppool_all = collect_response(toppool_list_response) | |
return pd.DataFrame(toppool_all).sort_values(by = [f"{sort_by_col}"], | |
ascending = False) |
如何获取特定池地址的数据
有时,调查特定矿池以确定与其日常活动相关的更有用的指标会很有用。使用矿池地址作为附加输入,可以使用以下函数来获得进一步的洞察:
def collect_pool_response(list_response): | |
response = list_response["data"] | |
all_attributes = response["attributes"] | |
daily_tx = all_attributes["transactions"]["h24"] | |
rel = response["relationships"] | |
response_dict = dict( | |
pair = all_attributes["name"], | |
dex = rel["dex"]["data"]["id"], | |
add = all_attributes["address"], | |
fdv_usd = all_attributes["fdv_usd"], | |
market_cap_usd = all_attributes["market_cap_usd"], | |
daily_volume = all_attributes["volume_usd"]["h24"], | |
daily_price_change = | |
all_attributes["price_change_percentage"]["h24"], | |
daily_buys = daily_tx["buys"], | |
daily_sells = daily_tx["sells"], | |
daily_buyers = daily_tx["buyers"], | |
daily_sellers = daily_tx["sellers"] | |
) | |
return response_dict |
def get_pool_data(network, dex, pool_address): | |
target_url = get_url("specific_pool_dex", network, dex, pool_address) | |
pool_list_response = get_response(target_url, | |
use_pro, | |
"", | |
PRO_URL) | |
pool_all = collect_pool_response(pool_list_response) | |
return pool_all |
如何监控特定代币的价格
到目前为止,我们已经研究并比较了不同池子的各种汇总指标。然而,我们还需要比较代币价格数据。这可以帮助用户开发和测试跨不同池子的套利策略。
代币通常分为基础代币和报价代币。以 ETH/USDT 池为例,ETH为基础代币,USDT为报价代币。我们想知道 ETH 到 USDT 的兑换价格,该价格反映在下面的“base_token_price_quote_token”字段中:
def collect_response_token(list_response): | |
response_all = [] | |
for response in list_response["data"]: | |
all_attributes = response["attributes"] | |
daily_tx = all_attributes["transactions"]["h24"] | |
rel = response["relationships"] | |
temp_dict = dict( | |
pair = all_attributes["name"], | |
dex = rel["dex"]["data"]["id"], | |
add = all_attributes["address"], | |
base_token_price_quote_token = | |
all_attributes["base_token_price_quote_token"], | |
fdv_usd = all_attributes["fdv_usd"], | |
market_cap_usd = all_attributes["market_cap_usd"], | |
daily_volume = all_attributes["volume_usd"]["h24"], | |
daily_price_change = | |
all_attributes["price_change_percentage"]["h24"], | |
daily_buys = daily_tx["buys"], | |
daily_sells = daily_tx["sells"], | |
daily_buyers = daily_tx["buyers"], | |
daily_sellers = daily_tx["sellers"] | |
) | |
response_all.append(temp_dict) | |
return response_all |
使用代币合约地址作为输入,我们可以使用以下函数来获取给定网络和代币地址的顶级池列表。
def get_top_pools_token(network, | |
token_address, | |
sort_by_col): | |
target_url = get_url("top_pools_add", | |
network, | |
"", | |
"", | |
token_address) | |
toppool_list_response = get_response(target_url, | |
use_pro, | |
"", | |
PRO_URL) | |
toppool_all = collect_response_token(toppool_list_response) | |
return pd.DataFrame(toppool_all).sort_values(by = [f"{sort_by_col}"], | |
ascending = False) |
|
算法交易:如何在 Uniswap 上兑换代币
我们将使用Uniswap Python 库与 Uniswap 协议进行交互,该库提供了各种便捷的函数来查询价格和执行掉期交易。这非常方便,因为许多 web3 实现都已从最终用户抽象出来。此外,它还可以用来监控用户钱包地址的余额。
如本文前面所述,要使用钱包地址进行交易,需要提供关联的主键。您可以将其保存在本地文件中,然后按如下所示读取:
# Get wallet private key
def get_private_key():
f = open("/home/vikas/Documents/MetaMask_private_key.json")
key_dict = json.load(f)
return key_dict["private_key"]
接下来,需要设置 Uniswap 类:
from uniswap import Uniswap | |
# Use None for address and private_key when not doing a transaction | |
address = "0xAf418C54351BA8a0Aa15Ba4A5C99C46C122B3DBC" | |
private_key = get_private_key() | |
version = 3 | |
provider = "https://rpc.ankr.com/eth" | |
uniswap = Uniswap(address = address, | |
private_key = private_key, | |
version = version, | |
provider = provider) |
# Token contract address
# https://support.uniswap.org/hc/en-us/articles/26757826138637-What-is-a-token-contract-address
eth = "0x0000000000000000000000000000000000000000"
usdt = "0xdAC17F958D2ee523a2206206994597C13D831ec7"
函数 get_price_output 返回兑换 X 笔 USDT 所需的 ETH 数量(以 wei 为单位)。注意,由于 1 ETH = 10¹⁸ wei,我们将输出结果除以 ETH,将结果转换回 ETH。在以下示例中,我们想知道兑换 3 笔 USDT 需要兑换多少 ETH。参数采用代币的最小单位,因此 USDT 输入为 3 x 10⁶。Fee = 100 将确保使用手续费为 0.01% 的池子。对于 Uniswap v3,建议先研究各种池子,然后选择一个在低手续费和高流动性之间取得最佳平衡的池子。
uniswap.get_price_output(eth, usdt, 3 * 10**6, fee = 100) / (10**18)
常见代币的小数:
-
ETH、DAI、UNI、BAT、LINK 使用 18 位小数
-
WBTC 使用 8 位小数
-
USDC、USDT 使用 6 位小数
总是可以在 Etherscan 上查找小数位数。
为了执行上述交换,我们调用 make_trade_output 函数,并使用与上述相同的参数集。
uniswap.make_trade_output(eth, usdt, 3 * 10**6, fee = 100)
要检查钱包余额,请使用以下功能:
以上数据与下图MetaMask钱包上看到的一致。
此外,我们可以确认已收到预期数量的 USDT 代币(= 3)。该交易也将出现在目标池中的 Uniswap 网页界面上。
在 Etherscan 上查找钱包地址,我们可以看到更多关于这笔交易的详细信息。需要注意的是,Uniswap 使用 WETH。然而,我们最初的钱包里是 ETH。因此,兑换 USDT 的流程首先需要先转账到 WETH。
接下来,我们将尝试使用 USDT 回购相同数量的 ETH(约 0.001135)。这次,我们的目标池为蓝色高亮显示的池子。
请注意,此池子基于 Uniswap 协议 v2 版本,因此我们需要更新 Uniswap 类定义中的版本号 (= 2)。现在我们可以检查此交换需要多少 USDT。输入的 ETH 金额(以 wei 为单位)需要为整数。由于这是 v2 版本上唯一的池子,因此此处未使用费用参数。
与之前的互换相比,回购相同数量的 ETH 所需的 USDT 有所减少。这是意料之中的,因为该池中的 ETH 价格(以 USDT 计价)较低。然而,需要注意的是,由于这些互换是链上交易,因此也会消耗 Gas。Gas 价格会随着网络拥堵情况而变化。因此,任何潜在的收益都需要通过考虑交易成本的变化来谨慎抵消。
为了执行交换,我们应用与之前相同的函数。
输出是交易哈希,可以在Etherscan上验证。“交易操作”部分提供了简要概述。
向下滚动可以找到有关交换路线的更多详细信息:
该交易也出现在Uniswap 的池摘要页面上。
结论
在本文中,我们学习了如何结合使用 CoinGecko API 的链上 DEX 数据和 Python 来监控各种 DEX 池的交易活动。以 Uniswap 为例,我们演示了如何以编程方式在不同池中兑换代币,从而寻找潜在的套利机会。此外,我们还在 Uniswap 和 Etherscan 上验证了交易,以确保兑换路径符合我们的预期。
这里提供的脚本可以帮助交易者简化和自动化他们的交易策略,从钱包余额跟踪开始,扩展到在 DEX 上执行代币交换。