Python
Accessing Data

Cybotrade Datasource API key

The use of Provider Datasource data requires that you get an API key from Cybotrade Datasource.


🔑Generate a Cybotrade API key🔑

Once you have successfully generated an API key, you will be able to see your API key and secret like such:

Example of an API Key and Secret

In order to access the data programmatically, you will need to input the api_key and api_secret parameter for the RuntimeConfig class like such:

config = RuntimeConfig(
    ...
    api_key="YOUR_API_KEY",
    api_secret="YOUR_API_SECRET"
)

Accessing the data in code.

In order to access the data you retrieve, you will have to use a field exposed by the Strategy class called data_map or split_data_map.

For a further in-depth look to the difference between the data_map and split_data_map check here.

If you followed the overview guide you should be aware that there is an event handler that can be overidden for when a candle closes which is on_candle_closed().

There is also an event handler for when Provider Datasource data has been updated which is on_datasource_interval().

Shown below is a simple use of this on_datasource_interval event handler.

This example is assuming that the datasource_topic being used is cryptoquant|1d|btc/exchange-flows/inflow?exchange=binance&window=day and candle_topic is candles-1m-BTC/USDT-bybit

async def on_datasource_interval(self, strategy, topic, data_list):
    # refer to provider models
    inflow_total = self.data_map[topic][-1]["inflow_total"]
    inflow_mean = self.data_map[topic][-1]["inflow_mean"]

You might notice that the topic is returned through the parameters of the event_handler function, this topic is used to index into the data_map dictionary. You are also able to index the data_map manually like such:

async def on_datasource_interval(self, strategy, topic, data_list):
    # refer to provider models
    inflow_total = self.data_map["cryptoquant|1d|btc/exchange-flows/inflow?exchange=binance&window=day"][-1]["inflow_total"]
    inflow_mean = self.data_map["cryptoquant|1d|btc/exchange-flows/inflow?exchange=binance&window=day"][-1]["inflow_mean"]

This access pattern is also used for accessing candle data:

async def on_candle_closed(self, strategy, topic, symbol):
    latest_close = self.data_map[topic][-1]["close"]
    open_price = self.data_map[topic][-1]["open"]

Note that you are also able to access datasource topic from the candle close event handler and vice versa:

async def on_datasource_interval(self, strategy, topic, data_list):
    latest_close = self.data_map["candles-1m-BTC/USDT-bybit"][-1]["close"]
    open_price = self.data_map["candles-1m-BTC/USDT-bybit"][-1]["open"]
 
async def on_candle_closed(self, strategy, topic, symbol):
    # refer to provider models
    inflow_total = self.data_map["cryptoquant|1d|btc/exchange-flows/inflow?exchange=binance&window=day"][-1]["inflow_total"]
    inflow_mean = self.data_map["cryptoquant|1d|btc/exchange-flows/inflow?exchange=binance&window=day"][-1]["inflow_mean"]

A simple runnable strategy example you can use:

from cybotrade.strategy import Strategy
from cybotrade.models import RuntimeConfig, RuntimeMode
from datetime import datetime, timezone
from cybotrade.permutation import Permutation
import asyncio
 
class MyStrategy(Strategy):
    async def on_datasource_interval(self, strategy, topic, data_list):
        # prints inflow total data
        print(self.data_map[topic][-1]["inflow_total"])
 
    async def on_candle_closed(self, strategy, topic, symbol):
        # prints candle close price
        print(self.data_map[topic][-1]["close"])
 
config = RuntimeConfig(
    mode=RuntimeMode.Backtest,
    datasource_topics=["cryptoquant|1m|btc/exchange-flows/inflow?exchange=binance&window=day"],
    candle_topics=["candles-1d-BTC/USDT-bybit"],
    active_order_interval=1,
    start_time=datetime(2020, 4, 1, 0, 0, 0, tzinfo=timezone.utc),
    end_time=datetime(2024, 1, 1, 0, 0, 0, tzinfo=timezone.utc),
    data_count=100,
    api_key="YOUR_API_KEY",
    api_secret="YOUR_API_SECRET"
)
 
permutation = Permutation(config)
hyper_parameters = {}
 
async def start_runtime():
    await permutation.run(hyper_parameters, MyStrategy)
 
asyncio.run(start_runtime())