Cybotrade Datasource API key
To use Provider Datasource data, you need an API key from Cybotrade Datasource.
🔑Generate a Cybotrade API key🔑
Once you have successfully generated an API key, you will be able to see your API key and secret.
To access the data programmatically, pass the api_key and api_secret parameters to the RuntimeConfig class:
config = RuntimeConfig(
    ...
    api_key="YOUR_API_KEY",
    api_secret="YOUR_API_SECRET"
)
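Hardcoded credentials are easy to leak through version control. As a minimal sketch, assuming you export the key and secret as environment variables, the values can be read at startup instead. The variable names CYBOTRADE_API_KEY and CYBOTRADE_API_SECRET and the helper load_credentials are hypothetical and not part of the Cybotrade library:

```python
import os


def load_credentials(env=os.environ):
    """Read the API key and secret from environment variables.

    The variable names are hypothetical; use whatever names you export.
    """
    try:
        return env["CYBOTRADE_API_KEY"], env["CYBOTRADE_API_SECRET"]
    except KeyError as missing:
        # Fail early with a clear message instead of sending empty credentials.
        raise RuntimeError(f"missing environment variable: {missing}") from None


# Usage with the RuntimeConfig call shown above:
# api_key, api_secret = load_credentials()
# config = RuntimeConfig(..., api_key=api_key, api_secret=api_secret)
```

Passing the mapping as an argument keeps the helper easy to test without touching the real environment.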
Accessing the data in code.
To access the data you retrieve, use one of two fields exposed by the Strategy class: data_map or split_data_map.
For a more in-depth look at the difference between data_map and split_data_map, check here.
If you followed the overview guide, you should be aware that there is an event handler, on_candle_closed(), that can be overridden to run when a candle closes.
There is also an event handler for when Provider Datasource data has been updated: on_datasource_interval().
Shown below is a simple use of the on_datasource_interval event handler.
This example assumes that the datasource_topic being used is cryptoquant|1d|btc/exchange-flows/inflow?exchange=binance&window=day and the candle_topic is candles-1m-BTC/USDT-bybit:
async def on_datasource_interval(self, strategy, topic, data_list):
    # refer to provider models
    inflow_total = self.data_map[topic][-1]["inflow_total"]
    inflow_mean = self.data_map[topic][-1]["inflow_mean"]
You might notice that the topic is passed in through the parameters of the event handler function; this topic is used to index into the data_map dictionary.
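To make the indexing concrete, here is a sketch that uses a plain dictionary as a stand-in for data_map. It assumes, based on the access pattern above, that each topic maps to a list of records ordered oldest to newest. The sample values and the helper latest_field are made up for illustration and are not part of the Cybotrade API:

```python
def latest_field(data_map, topic, field, default=None):
    """Return `field` from the newest record under `topic`, or `default`."""
    records = data_map.get(topic) or []
    if not records:
        return default
    return records[-1].get(field, default)


topic = "cryptoquant|1d|btc/exchange-flows/inflow?exchange=binance&window=day"
# Stand-in for self.data_map; the numbers are invented sample values.
data_map = {
    topic: [
        {"inflow_total": 10.0, "inflow_mean": 0.5},
        {"inflow_total": 12.5, "inflow_mean": 0.7},
    ]
}

# [-1] selects the last (newest) record in the list for this topic.
print(latest_field(data_map, topic, "inflow_total"))
```

Returning a default instead of raising keeps a handler from failing when a topic or field is absent; whether that is the behavior you want depends on your strategy.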
You can also index the data_map manually:
async def on_datasource_interval(self, strategy, topic, data_list):
    # refer to provider models
    inflow_total = self.data_map["cryptoquant|1d|btc/exchange-flows/inflow?exchange=binance&window=day"][-1]["inflow_total"]
    inflow_mean = self.data_map["cryptoquant|1d|btc/exchange-flows/inflow?exchange=binance&window=day"][-1]["inflow_mean"]
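When indexing manually, the key has to match the configured topic string exactly, so a long literal repeated by hand is easy to mistype. One option is to build the string once and reuse it. The sketch below assumes the topic follows a provider|interval|path?query layout, which is inferred from the example topic on this page rather than from a documented format. The helper build_datasource_topic is hypothetical:

```python
from urllib.parse import urlencode


def build_datasource_topic(provider, interval, path, **params):
    """Join the parts into 'provider|interval|path?query' (inferred layout)."""
    query = urlencode(params)  # keeps keyword-argument order
    endpoint = f"{path}?{query}" if query else path
    return "|".join([provider, interval, endpoint])


INFLOW_TOPIC = build_datasource_topic(
    "cryptoquant", "1d", "btc/exchange-flows/inflow",
    exchange="binance", window="day",
)
print(INFLOW_TOPIC)
```

The same constant can then be passed to the runtime configuration and used as the data_map key, so the two cannot drift apart.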
This access pattern is also used for accessing candle data:
async def on_candle_closed(self, strategy, topic, symbol):
    latest_close = self.data_map[topic][-1]["close"]
    open_price = self.data_map[topic][-1]["open"]
Note that you can also access datasource data from the candle close event handler and vice versa:
async def on_datasource_interval(self, strategy, topic, data_list):
    latest_close = self.data_map["candles-1m-BTC/USDT-bybit"][-1]["close"]
    open_price = self.data_map["candles-1m-BTC/USDT-bybit"][-1]["open"]

async def on_candle_closed(self, strategy, topic, symbol):
    # refer to provider models
    inflow_total = self.data_map["cryptoquant|1d|btc/exchange-flows/inflow?exchange=binance&window=day"][-1]["inflow_total"]
    inflow_mean = self.data_map["cryptoquant|1d|btc/exchange-flows/inflow?exchange=binance&window=day"][-1]["inflow_mean"]
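When one handler reads another topic's data, it may be worth guarding against that topic having no records yet. Whether this can actually happen at runtime is an assumption here, not something this page states. The sketch below uses a plain dictionary as a stand-in for data_map, with invented sample values. The helper latest_snapshot is hypothetical:

```python
CANDLE_TOPIC = "candles-1m-BTC/USDT-bybit"
INFLOW_TOPIC = "cryptoquant|1d|btc/exchange-flows/inflow?exchange=binance&window=day"


def latest_snapshot(data_map):
    """Pair the newest candle close with the newest inflow total.

    Returns None until both topics hold at least one record.
    """
    candles = data_map.get(CANDLE_TOPIC) or []
    inflows = data_map.get(INFLOW_TOPIC) or []
    if not candles or not inflows:
        return None
    return {
        "close": candles[-1]["close"],
        "inflow_total": inflows[-1]["inflow_total"],
    }


# Stand-in for self.data_map with invented sample values.
sample = {
    CANDLE_TOPIC: [{"open": 100.0, "close": 101.5}],
    INFLOW_TOPIC: [{"inflow_total": 12.5, "inflow_mean": 0.7}],
}
print(latest_snapshot(sample))
```

Inside either handler, the same function could be called with self.data_map, skipping the rest of the handler when it returns None.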
A simple runnable strategy example you can use:
from cybotrade.strategy import Strategy
from cybotrade.models import RuntimeConfig, RuntimeMode
from datetime import datetime, timezone
from cybotrade.permutation import Permutation
import asyncio


class MyStrategy(Strategy):
    async def on_datasource_interval(self, strategy, topic, data_list):
        # prints inflow total data
        print(self.data_map[topic][-1]["inflow_total"])

    async def on_candle_closed(self, strategy, topic, symbol):
        # prints candle close price
        print(self.data_map[topic][-1]["close"])


config = RuntimeConfig(
    mode=RuntimeMode.Backtest,
    datasource_topics=["cryptoquant|1m|btc/exchange-flows/inflow?exchange=binance&window=day"],
    candle_topics=["candles-1d-BTC/USDT-bybit"],
    active_order_interval=1,
    start_time=datetime(2020, 4, 1, 0, 0, 0, tzinfo=timezone.utc),
    end_time=datetime(2024, 1, 1, 0, 0, 0, tzinfo=timezone.utc),
    data_count=100,
    api_key="YOUR_API_KEY",
    api_secret="YOUR_API_SECRET"
)

permutation = Permutation(config)
hyper_parameters = {}


async def start_runtime():
    await permutation.run(hyper_parameters, MyStrategy)


asyncio.run(start_runtime())