Cybotrade Datasource API key
The use of Provider Datasource data requires that you get an API key from Cybotrade Datasource.
🔑Generate a Cybotrade API key🔑
Once you have successfully generated an API key, you will be able to see your API key and secret like such:

In order to access the data programmatically, you will need to input the api_key and api_secret parameter for the RuntimeConfig class like such:
config = RuntimeConfig(
...
api_key="YOUR_API_KEY",
api_secret="YOUR_API_SECRET"
)Accessing the data in code.
In order to access the data you retrieve, you will have to use a field exposed by the Strategy class called data_map or split_data_map.
For a further in-depth look to the difference between the data_map and split_data_map check here.
If you followed the overview guide you should be aware that there is an event handler that can be overidden for when a candle closes which is on_candle_closed().
There is also an event handler for when Provider Datasource data has been updated which is on_datasource_interval().
Shown below is a simple use of this on_datasource_interval event handler.
This example is assuming that the datasource_topic being used is cryptoquant|btc/exchange-flows/inflow?exchange=binance&window=day and candle_topic is bybit-linear|candles?interval=1m&symbol=BTCUSDT
async def on_datasource_interval(self, strategy, topic, data_list):
# refer to provider models
inflow_total = self.data_map[topic][-1]["inflow_total"]
inflow_mean = self.data_map[topic][-1]["inflow_mean"]You might notice that the topic is returned through the parameters of the event_handler function, this topic is used to index into the data_map dictionary.
You are also able to index the data_map manually like such:
async def on_datasource_interval(self, strategy, topic, data_list):
# refer to provider models
inflow_total = self.data_map["cryptoquant|btc/exchange-flows/inflow?exchange=binance&window=day"][-1]["inflow_total"]
inflow_mean = self.data_map["cryptoquant|btc/exchange-flows/inflow?exchange=binance&window=day"][-1]["inflow_mean"]This access pattern is also used for accessing candle data:
async def on_candle_closed(self, strategy, topic, symbol):
latest_close = self.data_map[topic][-1]["close"]
open_price = self.data_map[topic][-1]["open"]Note that you are also able to access datasource topic from the candle close event handler and vice versa:
async def on_datasource_interval(self, strategy, topic, data_list):
latest_close = self.data_map["bybit-linear|candles?interval=1m&symbol=BTCUSDT"][-1]["close"]
open_price = self.data_map["bybit-linear|candles?interval=1m&symbol=BTCUSDT"][-1]["open"]
async def on_candle_closed(self, strategy, topic, symbol):
# refer to provider models
inflow_total = self.data_map["cryptoquant|btc/exchange-flows/inflow?exchange=binance&window=day"][-1]["inflow_total"]
inflow_mean = self.data_map["cryptoquant|btc/exchange-flows/inflow?exchange=binance&window=day"][-1]["inflow_mean"]A simple runnable strategy example you can use:
from cybotrade.strategy import Strategy
from cybotrade.models import RuntimeConfig, RuntimeMode
from datetime import datetime, timezone
from cybotrade.permutation import Permutation
import asyncio
class MyStrategy(Strategy):
async def on_datasource_interval(self, strategy, topic, data_list):
# prints inflow total data
print(self.data_map[topic][-1]["inflow_total"])
async def on_candle_closed(self, strategy, topic, symbol):
# prints candle close price
print(self.data_map[topic][-1]["close"])
config = RuntimeConfig(
mode=RuntimeMode.Backtest,
datasource_topics=["cryptoquant|btc/exchange-flows/inflow?exchange=binance&window=day"],
candle_topics=["bybit-linear|candles?interval=1d&symbol=BTC/USDT"],
active_order_interval=1,
start_time=datetime(2020, 4, 1, 0, 0, 0, tzinfo=timezone.utc),
end_time=datetime(2024, 1, 1, 0, 0, 0, tzinfo=timezone.utc),
data_count=100,
api_key="YOUR_API_KEY",
api_secret="YOUR_API_SECRET"
)
permutation = Permutation(config)
hyper_parameters = {}
async def start_runtime():
await permutation.run(hyper_parameters, MyStrategy)
asyncio.run(start_runtime())