Skip to content

metatrader ¤

LetTradeMetaTrader ¤

LetTradeMetaTrader(
    feeder: type[
        MetaTraderDataFeeder
    ] = MetaTraderDataFeeder,
    exchange: type[MetaTraderExchange] = MetaTraderExchange,
    account: type[MetaTraderAccount] = MetaTraderAccount,
    **kwargs
)

Bases: LetTradeLive

Help to maintain MetaTrader bots

Parameters:

Source code in lettrade/exchange/metatrader/metatrader.py
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
def __init__(
    self,
    feeder: type[MetaTraderDataFeeder] = MetaTraderDataFeeder,
    exchange: type[MetaTraderExchange] = MetaTraderExchange,
    account: type[MetaTraderAccount] = MetaTraderAccount,
    **kwargs,
) -> None:
    """_summary_

    Args:
        feeder (Type[MetaTraderDataFeeder], optional): _description_. Defaults to MetaTraderDataFeeder.
        exchange (Type[MetaTraderExchange], optional): _description_. Defaults to MetaTraderExchange.
        account (Type[MetaTraderAccount], optional): _description_. Defaults to MetaTraderAccount.
    """
    super().__init__(
        feeder=feeder,
        exchange=exchange,
        account=account,
        **kwargs,
    )

plot ¤

plot(*args, jump: dict | None = None, **kwargs)

Plot strategy/optimize result

BotPlotter

Miror of BotPlotter.plot(). Plotly implement PlotlyBotPlotter.plot().

Args: jump (dict | None, optional): Miror of BotPlotter.jump()

Example: Jump to position_id

lt.plot(
    jump=dict(position_id=1, range=300),
    layout=dict(height=2000),
)

OptimizePlotter

Miror of OptimizePlotter.plot(). Plotly implement PlotlyOptimizePlotter.plot().

Example:

lt.plot(layout=dict(height=2000))

Source code in lettrade/lettrade.py
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
def plot(self, *args, jump: dict | None = None, **kwargs):
    """Plot strategy/optimize result

    BotPlotter:
        Miror of [BotPlotter.plot()](site:/reference/plot/bot/#lettrade.plot.bot.BotPlotter.plot).
        Plotly implement [PlotlyBotPlotter.plot()](site:/reference/plot/plotly/plotly/#lettrade.plot.plotly.plotly.PlotlyBotPlotter.plot).

        Args:
            `jump` (dict | None, optional): Miror of [BotPlotter.jump()](site:/reference/plot/bot/#lettrade.plot.bot.BotPlotter.jump)

        Example:
            Jump to position_id
                ```python
                lt.plot(
                    jump=dict(position_id=1, range=300),
                    layout=dict(height=2000),
                )
                ```

    OptimizePlotter:
        Miror of [OptimizePlotter.plot()](site:/reference/plot/optimize/#lettrade.plot.optimize.OptimizePlotter.plot).
        Plotly implement [PlotlyOptimizePlotter.plot()](site:/reference/exchange/backtest/plotly/optimize/#lettrade.exchange.backtest.plotly.optimize.PlotlyOptimizePlotter.plot).

        Example:
                ```python
                lt.plot(layout=dict(height=2000))
                ```
    """
    if __debug__:
        from .utils.docs import is_docs_session

        if is_docs_session():
            return

    self.plotter.plot(*args, jump=jump, **kwargs)

run ¤

run(worker: int | None = None, **kwargs)

Run LetTrade in single or multiple processing

Parameters:

  • worker (int | None, default: None ) –

    Number of processing. Defaults to None.

Source code in lettrade/lettrade.py
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
def run(self, worker: int | None = None, **kwargs):
    """Run LetTrade in single or multiple processing

    Args:
        worker (int | None, optional): Number of processing. Defaults to None.
    """
    if worker or isinstance(self.data, list):
        if not isinstance(self.data, list):
            # self.data = self.datas
            self.datas = [self.data]

        worker = self._max_workers(worker)

        self._multiprocess()

        datas_source = self._kwargs.pop("datas")
        with ProcessPoolExecutor(max_workers=worker) as executor:
            futures = [
                executor.submit(
                    self._bot_cls.run_bot,
                    datas=datas,
                    id=i,
                    result="str",
                    **self._kwargs,
                )
                for i, datas in enumerate(datas_source)
            ]
            for future in futures:
                result = future.result()
                print(result)
    else:
        self._bot = self._bot_cls.run_bot(
            bot=self._bot,
            result="bot",
            **self._kwargs,
        )
        print(str(self._bot.stats))

start ¤

start(force: bool = False)

Start LetTrade by init bot object and loading datafeeds

Parameters:

  • force (bool, default: False ) –

    description. Defaults to False.

Source code in lettrade/lettrade.py
108
109
110
111
112
113
114
115
116
117
def start(self, force: bool = False):
    """Start LetTrade by init bot object and loading datafeeds

    Args:
        force (bool, optional): _description_. Defaults to False.
    """
    if force and self._bot is not None:
        self._bot = None

    self._bot = self._bot_cls.start_bot(bot=self._bot, **self._kwargs)

stop ¤

stop()

Stop LetTrade

Source code in lettrade/lettrade.py
183
184
185
186
187
188
def stop(self):
    """Stop LetTrade"""
    if self._bot is not None:
        return self._bot.stop()
    if self.stats:
        self.stats.stop()

LetTradeMetaTraderBot ¤

LetTradeMetaTraderBot(
    api: LiveAPI | None = LiveAPI, **kwargs
)

Bases: LetTradeLiveBot

LetTradeBot for MetaTrader

Parameters:

  • api (LiveAPI | None, default: LiveAPI ) –

    description. Defaults to LiveAPI.

Source code in lettrade/exchange/live/live.py
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
def __init__(self, api: LiveAPI | None = LiveAPI, **kwargs) -> None:
    """_summary_

    Args:
        api (LiveAPI | None, optional): _description_. Defaults to LiveAPI.
    """
    super().__init__(**kwargs)

    if issubclass(api, LiveAPI):
        self._api = api(**self._kwargs.get("api_kwargs", {}))
    else:
        self._api = api

    self._kwargs.setdefault("feeder_kwargs", dict()).update(api=self._api)
    self._kwargs.setdefault("exchange_kwargs", dict()).update(api=self._api)
    self._kwargs.setdefault("account_kwargs", dict()).update(api=self._api)

    for data in self.datas:
        data._api = self._api

account instance-attribute ¤

account: Account

Trading account handler

brain instance-attribute ¤

brain: Brain

Brain of bot

commander class-attribute instance-attribute ¤

commander: Commander | None = None

Control the bot

data instance-attribute ¤

data: DataFeed = datas[0]

Main DataFeed for bot

exchange instance-attribute ¤

exchange: Exchange

Trading exchange and events

feeder instance-attribute ¤

feeder: DataFeeder

DataFeeder help to handle datas

plotter class-attribute instance-attribute ¤

plotter: Plotter | None = None

Plot graphic results

strategy instance-attribute ¤

strategy: Strategy

Strategy

new_bot classmethod ¤

new_bot(
    datas: list[DataFeed],
    strategy_cls: type[Strategy],
    feeder_cls: type[DataFeeder],
    exchange_cls: type[Exchange],
    account_cls: type[Account],
    commander_cls: type[Commander],
    plotter_cls: type[Plotter],
    stats_cls: type[BotStatistic],
    name: str | None = None,
    **kwargs
) -> LetTradeBot

Create new bot object

Parameters:

Returns:

Source code in lettrade/bot.py
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
@classmethod
def new_bot(
    cls,
    datas: list[DataFeed],
    strategy_cls: type[Strategy],
    feeder_cls: type[DataFeeder],
    exchange_cls: type[Exchange],
    account_cls: type[Account],
    commander_cls: type[Commander],
    plotter_cls: type[Plotter],
    stats_cls: type[BotStatistic],
    name: str | None = None,
    **kwargs,
) -> "LetTradeBot":
    """Create new bot object

    Args:
        datas (list[DataFeed]): _description_
        strategy_cls (type[Strategy]): _description_
        feeder_cls (type[DataFeeder]): _description_
        exchange_cls (type[Exchange]): _description_
        account_cls (type[Account]): _description_
        commander_cls (type[Commander]): _description_
        plotter_cls (type[Plotter]): _description_
        stats_cls (type[BotStatistic]): _description_
        name (str | None, optional): _description_. Defaults to None.

    Returns:
        LetTradeBot: _description_
    """
    bot = cls(
        strategy=strategy_cls,
        datas=datas,
        feeder=feeder_cls,
        exchange=exchange_cls,
        account=account_cls,
        commander=commander_cls,
        plotter=plotter_cls,
        stats=stats_cls,
        name=name,
        **kwargs,
    )
    return bot

plot ¤

plot(*args, **kwargs)

Plot bot result

Source code in lettrade/bot.py
186
187
188
189
190
191
192
193
194
def plot(self, *args, **kwargs):
    """Plot bot result"""
    if __debug__:
        from .utils.docs import is_docs_session

        if is_docs_session():
            return

    self.plotter.plot(*args, **kwargs)

run ¤

run()

Run bot

Source code in lettrade/bot.py
173
174
175
176
177
178
179
180
181
182
183
184
def run(self):
    """Run bot"""
    if self.commander:
        self.commander.start()

    self.brain.run()

    if self.commander:
        self.commander.stop()

    if self._stats_cls:
        self.stats.compute()

run_bot classmethod ¤

run_bot(
    bot: LetTradeBot | None = None,
    datas: list[DataFeed] | None = None,
    id: int | None = None,
    name: str | None = None,
    result: Literal["str", "stats", "bot", None] = "str",
    **kwargs
) -> LetTradeBot | BotStatistic | str | None

Run bot object

Parameters:

  • bot (LetTradeBot | None, default: None ) –

    description. Defaults to None.

  • datas (list[DataFeed] | None, default: None ) –

    description. Defaults to None.

  • id (int | None, default: None ) –

    description. Defaults to None.

  • name (str | None, default: None ) –

    description. Defaults to None.

  • result (Literal['str', 'stats', 'bot', None], default: 'str' ) –

    description. Defaults to "str".

Returns:

  • LetTradeBot | BotStatistic | str | None

    LetTradeBot | BotStatistic | str | None: Return value will be pickle across multiprocessing. The cost higher from str, stats object, bot object

Source code in lettrade/bot.py
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
@classmethod
def run_bot(
    cls,
    bot: "LetTradeBot | None" = None,
    datas: list[DataFeed] | None = None,
    id: int | None = None,
    name: str | None = None,
    result: Literal["str", "stats", "bot", None] = "str",
    **kwargs,
) -> "LetTradeBot | BotStatistic | str | None":
    """Run bot object

    Args:
        bot (LetTradeBot | None, optional): _description_. Defaults to None.
        datas (list[DataFeed] | None, optional): _description_. Defaults to None.
        id (int | None, optional): _description_. Defaults to None.
        name (str | None, optional): _description_. Defaults to None.
        result (Literal["str", "stats", "bot", None], optional): _description_. Defaults to "str".

    Returns:
        LetTradeBot | BotStatistic | str | None: Return value will be pickle across multiprocessing.
            The cost higher from `str`, `stats` object, `bot` object
    """
    # Set name for current processing
    if name is None:
        d = datas[0] if datas else bot.data
        name = f"{id}-{os.getpid()}-{d.name}"

    if bot is None:
        bot = cls.start_bot(
            datas=datas,
            name=name,
            **kwargs,
        )

    # bot
    bot.run(**kwargs.get("run_kwargs", {}))

    # Return type
    if result == "stats":
        return bot.stats
    if result == "str":
        return str(bot.stats)

    return bot

start ¤

start()

Start bot

Source code in lettrade/bot.py
163
164
165
166
167
168
169
170
171
def start(self):
    """Start bot"""
    if not hasattr(self, "brain"):
        self.init()

    self.brain.start()

    if __debug__:
        logger.debug("Bot %s started with %d datas", self._name, len(self.datas))

start_bot classmethod ¤

start_bot(
    bot: LetTradeBot | None = None, **kwargs
) -> LetTradeBot

Init and start bot object

Parameters:

  • bot (LetTradeBot | None, default: None ) –

    description. Defaults to None.

Returns:

Source code in lettrade/bot.py
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
@classmethod
def start_bot(
    cls,
    bot: "LetTradeBot | None" = None,
    **kwargs,
) -> "LetTradeBot":
    """Init and start bot object

    Args:
        bot (LetTradeBot | None, optional): _description_. Defaults to None.

    Returns:
        LetTradeBot: _description_
    """
    if bot is None:
        bot = cls.new_bot(**kwargs)
    bot.init(**kwargs.get("init_kwargs", {}))
    bot.start()
    return bot

stop ¤

stop()

Stop bot

Source code in lettrade/bot.py
196
197
198
199
200
def stop(self):
    """Stop bot"""
    self.brain.stop()
    if self.plotter is not None:
        self.plotter.stop()

MetaTraderAccount ¤

MetaTraderAccount(api: LiveAPI, **kwargs)

Bases: LiveAccount

Account for MetaTrader

Parameters:

Source code in lettrade/exchange/live/account.py
18
19
20
21
22
23
24
25
26
27
28
29
def __init__(self, api: LiveAPI, **kwargs) -> None:
    """Account for live trading

    Args:
        api (LiveAPI): _description_
        **kwargs (dict, optional): Mirror of [lettrade.account.Account()](site:/reference/account/account/#lettrade.account.account.Account).
    """
    super().__init__(**kwargs)
    self._api = api

    self._balance = 0.0
    self._equity = 0.0

balance property ¤

balance: float

Balance of account

Returns:

  • float ( float ) –

    description

account_refresh ¤

account_refresh()

Refresh account balance

Source code in lettrade/exchange/live/account.py
53
54
55
56
57
58
59
60
61
def account_refresh(self):
    """Refresh account balance"""
    self._account = self._api.account()

    if __debug__:
        logger.debug("Account: %s", str(self._account))

    self._balance = self._account.balance
    self._equity = self._account.equity

next ¤

next()

Next account

Source code in lettrade/account/account.py
73
74
def next(self):
    """Next account"""

next_next ¤

next_next()

Call after strategy.next()

Source code in lettrade/account/account.py
76
77
78
def next_next(self):
    """Call after strategy.next()"""
    self._equity_snapshot()

risk ¤

risk(side: TradeSide, size: float, **kwargs) -> float

Risk calculation

Source code in lettrade/account/account.py
87
88
89
90
91
def risk(self, side: "TradeSide", size: float, **kwargs) -> float:
    """Risk calculation"""
    if size is None:
        return side * abs(self._risk)
    return side * abs(size)

start ¤

start()

Start live account by load account info from API

Source code in lettrade/exchange/live/account.py
34
35
36
37
38
def start(self):
    """Start live account by load account info from API"""
    self.account_refresh()
    self._margin = self._account.margin
    self._leverage = self._account.leverage

stop ¤

stop()

Stop account

Source code in lettrade/account/account.py
80
81
82
83
84
85
def stop(self):
    """Stop account"""
    try:
        self._equity_snapshot()
    except LetAccountInsufficientException:
        pass

MetaTraderDataFeed ¤

MetaTraderDataFeed(
    symbol: str,
    timeframe: str | int | Timedelta,
    name: str | None = None,
    columns: list[str] | None = None,
    api: LiveAPI | None = None,
    api_kwargs: dict | None = None,
    **kwargs
)

Bases: LiveDataFeed

DataFeed for MetaTrader

Parameters:

  • symbol (str) –

    Symbol of DataFeed

  • timeframe (str | int | Timedelta) –

    TimeFrame of DataFeed

  • name (str | None, default: None ) –

    Name of DataFeed, auto generate {symbol}_{timeframe} if none. Defaults to None.

  • columns (list[str] | None, default: None ) –

    List of DataFeed columns. Defaults to None.

  • api (LiveAPI | None, default: None ) –

    Live trading API. Defaults to None.

Source code in lettrade/exchange/live/data.py
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
def __init__(
    self,
    symbol: str,
    timeframe: str | int | pd.Timedelta,
    name: str | None = None,
    columns: list[str] | None = None,
    api: LiveAPI | None = None,
    api_kwargs: dict | None = None,
    **kwargs,
) -> None:
    """_summary_

    Args:
        symbol (str): Symbol of DataFeed
        timeframe (str | int | pd.Timedelta): TimeFrame of DataFeed
        name (str | None, optional): Name of DataFeed, auto generate `{symbol}_{timeframe}` if none. Defaults to None.
        columns (list[str] | None, optional): List of DataFeed columns. Defaults to None.
        api (LiveAPI | None, optional): Live trading API. Defaults to None.
    """
    super().__init__(
        name=name or f"{symbol}_{timeframe}",
        timeframe=timeframe,
        columns=columns or ["open", "high", "low", "close", "volume"],
        **kwargs,
    )

    self.meta.update(symbol=symbol, base_columns=self.columns.copy())

    if api is not None:
        self._api = api
    elif api_kwargs is not None:
        self._api = self._api_cls(**api_kwargs)
    else:
        raise RuntimeError("Parameter: api or api_kwargs cannot missing")

i property ¤

Alias to lettrade.indicator and using in DataFeed by call: DataFeed.i.indicator_name()

is_main property ¤

is_main: bool

Property to check DataFeed is main DataFeed or not

l instance-attribute ¤

LetTrade DataFeed wrapper using to manage index pointer of DataFeed

meta property ¤

meta: dict

Property to get metadata of DataFeed

name property ¤

name: str

Property to get name of DataFeed

now property ¤

now: Timestamp

Property to get current index value of DataFeed

symbol property ¤

symbol: str

Property to get symbol of DataFeed

timeframe property ¤

timeframe: TimeFrame

Property to get timeframe of DataFeed

bar ¤

bar(i: int = 0) -> Timestamp

Get current pd.Timestamp value of DataFeed

Parameters:

  • i (int, default: 0 ) –

    Index. Defaults to 0.

Returns:

  • Timestamp

    pd.Timestamp: description

Source code in lettrade/data/data.py
108
109
110
111
112
113
114
115
116
117
def bar(self, i: int = 0) -> pd.Timestamp:
    """Get current pd.Timestamp value of DataFeed

    Args:
        i (int, optional): Index. Defaults to 0.

    Returns:
        pd.Timestamp: _description_
    """
    return self.l.index[i]

bars ¤

bars(
    since: int | str | Timestamp, to: int | str | Timestamp
) -> list

Get bars from LiveAPI

Parameters:

  • since (int | str | Timestamp) –

    description

  • to (int | str | Timestamp) –

    description

Returns:

  • list ( list ) –

    list of bar

Source code in lettrade/exchange/live/data.py
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
def bars(
    self,
    since: int | str | pd.Timestamp,
    to: int | str | pd.Timestamp,
) -> list:
    """Get bars from LiveAPI

    Args:
        since (int | str | pd.Timestamp): _description_
        to (int | str | pd.Timestamp): _description_

    Returns:
        list: list of bar
    """
    return self._api.bars(
        symbol=self.symbol,
        timeframe=self.timeframe.string,
        since=since,
        to=to,
    )

bars_load ¤

bars_load(
    since: int | str | Timestamp, to: int | str | Timestamp
) -> bool

Get bar from API and push to DataFeed

Parameters:

  • since (int | str | Timestamp) –

    description

  • to (int | str | Timestamp) –

    description

Returns:

  • bool ( bool ) –

    True if has data, False if no data

Source code in lettrade/exchange/live/data.py
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
def bars_load(
    self,
    since: int | str | pd.Timestamp,
    to: int | str | pd.Timestamp,
) -> bool:
    """Get bar from API and push to DataFeed

    Args:
        since (int | str | pd.Timestamp): _description_
        to (int | str | pd.Timestamp): _description_

    Returns:
        bool: True if has data, False if no data
    """
    bars = self.bars(since=since, to=to)

    if bars is None or len(bars) == 0:
        logger.warning("No bars data for %s", self.name)
        return False

    self.push(bars, unit=self._bar_datetime_unit)
    return True

drop ¤

drop(
    *args,
    since: int | str | Timestamp | None = None,
    to: int | str | Timestamp | None = None,
    **kwargs
) -> None

summary

Parameters:

  • since (int | str | Timestamp | None, default: None ) –

    description. Defaults to None.

  • to (int | str | Timestamp | None, default: None ) –

    description. Defaults to None.

Raises:

Source code in lettrade/data/data.py
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
def drop(
    self,
    *args,
    since: int | str | pd.Timestamp | None = None,
    to: int | str | pd.Timestamp | None = None,
    **kwargs,
) -> None:
    """_summary_

    Args:
        since (int | str | pd.Timestamp | None, optional): _description_. Defaults to None.
        to (int | str | pd.Timestamp | None, optional): _description_. Defaults to None.

    Raises:
        RuntimeError: _description_
    """
    if since is None and to is None:
        super().drop(*args, **kwargs)
        return

    condiction = None

    # Since
    if since is not None:
        if isinstance(since, int):
            loc = self.l.index[since]
        elif isinstance(since, str):
            loc = pd.to_datetime(since, utc=True)
        elif isinstance(since, pd.Timestamp):
            loc = since
        else:
            raise RuntimeError(f"DataFeed.drop since {since} is invalid")
        condiction = self.index < loc

    # To
    if to is not None:
        if isinstance(to, int):
            loc = self.l.index[to]
        elif isinstance(to, str):
            loc = pd.to_datetime(to, utc=True)
        elif isinstance(to, pd.Timestamp):
            loc = to
        else:
            raise RuntimeError(f"DataFeed.drop to {to} is invalid")

        if condiction is None:
            condiction = self.index > loc
        else:
            condiction = condiction | (self.index > loc)

    index = self[condiction].index
    super().drop(index=index, inplace=True)
    self.l.reset()

    if __debug__:
        logger.debug("BackTestDataFeed %s dropped %s rows", self.name, len(index))

dump_csv ¤

dump_csv(
    path: str | None = None,
    since: int | str | datetime | None = 0,
    to: int | str | datetime | None = 1000,
    **kwargs
)

summary

Parameters:

  • path (str | None, default: None ) –

    description. Defaults to None.

  • since (int | str | datetime | None, default: 0 ) –

    description. Defaults to 0.

  • to (int | str | datetime | None, default: 1000 ) –

    description. Defaults to 1_000.

Source code in lettrade/exchange/live/data.py
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
def dump_csv(
    self,
    path: str | None = None,
    since: int | str | datetime | None = 0,
    to: int | str | datetime | None = 1_000,
    **kwargs,
):
    """_summary_

    Args:
        path (str | None, optional): _description_. Defaults to None.
        since (int  |  str  |  datetime | None, optional): _description_. Defaults to 0.
        to (int  |  str  |  datetime | None, optional): _description_. Defaults to 1_000.
    """
    if self.empty:
        if isinstance(since, str):
            since = pd.to_datetime(since).to_pydatetime()
        if isinstance(to, str):
            to = pd.to_datetime(to).to_pydatetime()

        self.bars_load(since=since, to=to)

    if path is None:
        path = f"data/{self.name}-{since}_{to}.csv"

    from lettrade.data.extra.csv import csv_export

    csv_export(dataframe=self, path=path, **kwargs)

next ¤

next(size=1, tick=0) -> bool

Drop extra columns and load next DataFeed

Parameters:

  • size (int, default: 1 ) –

    description. Defaults to 1.

  • tick (int, default: 0 ) –

    description. Defaults to 0.

Returns:

  • bool ( bool ) –

    description

Source code in lettrade/exchange/live/data.py
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
def next(self, size=1, tick=0) -> bool:
    """Drop extra columns and load next DataFeed

    Args:
        size (int, optional): _description_. Defaults to 1.
        tick (int, optional): _description_. Defaults to 0.

    Returns:
        bool: _description_
    """
    # Drop existed extra columns to skip reusing calculated data
    self.drop(columns=self.columns.difference(self._base_columns), inplace=True)

    self.bars_load(since=0, to=size + 1)
    self.l.go_stop()
    return True

push ¤

push(
    rows: list[list[int | float]],
    unit: str | None = None,
    utc: bool = True,
    **kwargs
)

Push new rows to DataFeed

Parameters:

  • rows (list[list[int | float]]) –

    list of rows [["timestamp", "open price", "high price"...]]

  • unit (str | None, default: None ) –

    pandas.Timestamp parsing unit. Defaults to None.

  • utc (bool, default: True ) –

    description. Defaults to True.

Source code in lettrade/data/data.py
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
def push(
    self,
    rows: list[list[int | float]],
    unit: str | None = None,
    utc: bool = True,
    **kwargs,
):
    """Push new rows to DataFeed

    Args:
        rows (list[list[int | float]]): list of rows `[["timestamp", "open price", "high price"...]]`
        unit (str | None, optional): pandas.Timestamp parsing unit. Defaults to None.
        utc (bool, optional): _description_. Defaults to True.
    """
    for row in rows:
        dt = pd.to_datetime(row[0], unit=unit, utc=utc, **kwargs)
        self.at[
            dt,
            (
                "open",
                "high",
                "low",
                "close",
                "volume",
            ),
        ] = (
            row[1],  # open
            row[2],  # high
            row[3],  # low
            row[4],  # close
            row[5],  # volume
        )

    if __debug__:
        logger.debug("[%s] Update bar: \n%s", self.name, self.tail(len(rows)))

symbol_info ¤

symbol_info()

Get symbol information from API

Source code in lettrade/exchange/live/data.py
73
74
75
def symbol_info(self):
    """Get symbol information from API"""
    return self._api.market(symbol=self.symbol)

MetaTraderDataFeeder ¤

MetaTraderDataFeeder(
    api: LiveAPI | None = None,
    tick: bool = 5,
    start_size: int = 500,
    api_kwargs: dict | None = None,
    **kwargs
)

Bases: LiveDataFeeder

DataFeeder for MetaTrader

tick == 0: wait until new bar change value
tick > 0: sleep tick time (in seconds) then update
Source code in lettrade/exchange/live/feeder.py
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
def __init__(
    self,
    api: LiveAPI | None = None,
    tick: bool = 5,
    start_size: int = 500,
    api_kwargs: dict | None = None,
    **kwargs,
) -> None:
    """
    tick:
        tick < 0: no tick, just get completed bar
        tick == 0: wait until new bar change value
        tick > 0: sleep tick time (in seconds) then update
    """
    super().__init__()

    # API init
    if api is None:
        if api_kwargs is None:
            raise RuntimeError("api or api_kwargs cannot missing")
        api = self._api_cls(**api_kwargs)

    self._api = api
    self._tick = tick
    self._start_size = start_size
    self._config = kwargs

    if isinstance(self._tick, int):
        self._wait_timeframe = TimeFrame(f"{self._tick}s")
    else:
        self._wait_timeframe = None

is_continous property ¤

is_continous

Flag check is realtime continous datafeeder

MetaTraderExchange ¤

MetaTraderExchange(api: LiveAPI, *args, **kwargs)

Bases: LiveExchange

MetaTrade 5 exchange module for lettrade

Parameters:

  • api (LiveAPI) –

    API connect to rpyc MeTrader 5 Terminal server through module mt5linux

  • *args (list, default: () ) –

    Exchange list parameters

  • **kwargs (dict, default: {} ) –

    Exchange dict parameters

Source code in lettrade/exchange/live/exchange.py
21
22
23
24
25
26
27
28
29
30
def __init__(self, api: LiveAPI, *args, **kwargs):
    """_summary_

    Args:
        api (LiveAPI): API connect to rpyc MeTrader 5 Terminal server through module `mt5linux`
        *args (list): `Exchange` list parameters
        **kwargs (dict): `Exchange` dict parameters
    """
    super().__init__(*args, **kwargs)
    self._api = api

data instance-attribute ¤

data: DataFeed

main DataFeed

datas instance-attribute ¤

datas: list[DataFeed]

List of all available DataFeed

executions instance-attribute ¤

executions: dict[str, Execution] = dict()

Execution dict by Execution.id key

history_orders instance-attribute ¤

history_orders: dict[str, Order] = dict()

History Order dict by Order.id key

history_positions instance-attribute ¤

history_positions: dict[str, Position] = dict()

History Position dict by Position.id key

orders instance-attribute ¤

orders: dict[str, Order] = dict()

Available Order dict by Order.id key

positions instance-attribute ¤

positions: dict[str, Position] = dict()

Available Position dict by Position.id key

init ¤

init(
    brain: Brain,
    feeder: DataFeeder,
    account: Account,
    commander: Commander,
) -> None

Init Exchange dependencies. Set data/datas from feeder. Set state to ExchangeState.Start.

Parameters:

Source code in lettrade/exchange/exchange.py
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
def init(
    self,
    brain: "Brain",
    feeder: DataFeeder,
    account: Account,
    commander: Commander,
) -> None:
    """Init Exchange dependencies.
    Set data/datas from feeder.
    Set state to `ExchangeState.Start`.

    Args:
        brain (Brain): _description_
        feeder (DataFeeder): _description_
        account (Account): _description_
        commander (Commander): _description_
    """
    self._brain = brain
    self._feeder = feeder
    self._account = account
    self._commander = commander

    self.data = self._feeder.data
    self.datas = self._feeder.datas

    self._account.init(exchange=self)

    self._state = ExchangeState.Start

new_order ¤

new_order(
    size: float,
    type: OrderType | None = OrderType.Market,
    limit: float | None = None,
    stop: float | None = None,
    sl: float | None = None,
    tp: float | None = None,
    tag: str | None = None,
    data: DataFeed | None = None,
    **kwargs
) -> OrderResult

Place new order to server

Parameters:

  • size (float) –

    description

  • type (OrderType | None, default: Market ) –

    description. Defaults to OrderType.Market.

  • limit (float | None, default: None ) –

    description. Defaults to None.

  • stop (float | None, default: None ) –

    description. Defaults to None.

  • sl (float | None, default: None ) –

    description. Defaults to None.

  • tp (float | None, default: None ) –

    description. Defaults to None.

  • tag (str | None, default: None ) –

    description. Defaults to None.

  • data (DataFeed | None, default: None ) –

    description. Defaults to None.

Returns:

Source code in lettrade/exchange/live/exchange.py
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
def new_order(
    self,
    size: float,
    type: OrderType | None = OrderType.Market,
    limit: float | None = None,
    stop: float | None = None,
    sl: float | None = None,
    tp: float | None = None,
    tag: str | None = None,
    data: DataFeed | None = None,
    **kwargs,
) -> OrderResult:
    """Place new order to server

    Args:
        size (float): _description_
        type (OrderType | None, optional): _description_. Defaults to OrderType.Market.
        limit (float | None, optional): _description_. Defaults to None.
        stop (float | None, optional): _description_. Defaults to None.
        sl (float | None, optional): _description_. Defaults to None.
        tp (float | None, optional): _description_. Defaults to None.
        tag (str | None, optional): _description_. Defaults to None.
        data (DataFeed | None, optional): _description_. Defaults to None.

    Returns:
        OrderResult: _description_
    """
    if not data:
        data = self.data

    if type == OrderType.Market:
        limit = 0
        stop = 0

    order = self._order_cls(
        id=-1,
        exchange=self,
        data=data,
        size=size,
        type=type,
        limit_price=limit,
        stop_price=stop,
        sl_price=sl,
        tp_price=tp,
        tag=tag,
        **kwargs,
    )
    ok = order.place()

    # if __debug__:
    #     logger.info("New order %s at %s", order, self.data.now)

    return ok

next_next ¤

next_next()

Call after strategy.next()

Source code in lettrade/exchange/exchange.py
109
110
111
def next_next(self):
    """Call after strategy.next()"""
    self._account.next_next()

on_execution ¤

on_execution(
    execution: Execution,
    broadcast: bool | None = True,
    **kwargs
) -> None

Receive Execution event from exchange then store and notify Brain

Parameters:

  • execution (Execution) –

    description

  • broadcast (bool | None, default: True ) –

    description. Defaults to True.

Source code in lettrade/exchange/exchange.py
118
119
120
121
122
123
124
125
126
127
128
129
130
def on_execution(
    self,
    execution: Execution,
    broadcast: bool | None = True,
    **kwargs,
) -> None:
    """Receive Execution event from exchange then store and notify Brain

    Args:
        execution (Execution): _description_
        broadcast (bool | None, optional): _description_. Defaults to True.
    """
    self.on_executions(executions=[execution], broadcast=broadcast, **kwargs)

on_executions ¤

on_executions(
    executions: list[Execution],
    broadcast: bool | None = True,
    **kwargs
) -> None

Receive Execution event from exchange then store and notify Brain

Source code in lettrade/exchange/exchange.py
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
def on_executions(
    self,
    executions: list[Execution],
    broadcast: bool | None = True,
    **kwargs,
) -> None:
    """
    Receive Execution event from exchange then store and notify Brain
    """
    for execution in executions:
        if not isinstance(execution, Execution):
            raise RuntimeError(f"{execution} is not instance of type Execution")

        if execution.id in self.executions:
            # Merge to keep Execution handler for strategy using
            # when strategy want to store Execution object
            # and object will be automatic update directly
            self.executions[execution.id].merge(execution)
            execution = self.executions[execution.id]
        else:
            self.executions[execution.id] = execution

    if self._state != ExchangeState.Run:
        return

    if broadcast:
        self._brain.on_executions(executions)

on_orders ¤

on_orders(
    orders: list[Order],
    broadcast: bool | None = True,
    **kwargs
) -> None

Receive orders event from exchange then store and notify Brain

Parameters:

  • orders (list[Order]) –

    description

  • broadcast (bool | None, default: True ) –

    description. Defaults to True.

Raises:

Source code in lettrade/exchange/exchange.py
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
def on_orders(
    self,
    orders: list[Order],
    broadcast: bool | None = True,
    **kwargs,
) -> None:
    """Receive orders event from exchange then store and notify Brain

    Args:
        orders (list[Order]): _description_
        broadcast (bool | None, optional): _description_. Defaults to True.

    Raises:
        RuntimeError: _description_
    """
    for order in orders:
        if not isinstance(order, Order):
            raise RuntimeError(f"{order} is not instance of type Order")

        if order.is_closed:
            if order.id in self.history_orders:
                logger.warning("Order closed recall: %s", order)

            self.history_orders[order.id] = order
            if order.id in self.orders:
                del self.orders[order.id]
        else:
            if order.id in self.history_orders:
                raise RuntimeError(f"Order {order.id} closed")

            if order.id in self.orders:
                # Merge to keep Order handler for strategy using
                # when strategy want to store Order object
                # and object will be automatic update directly
                self.orders[order.id].merge(order)
                order = self.orders[order.id]
            else:
                self.orders[order.id] = order

    if self._state != ExchangeState.Run:
        return

    if broadcast:
        self._brain.on_orders(orders)

on_position ¤

on_position(
    position: Position,
    broadcast: bool | None = True,
    **kwargs
) -> None

Receive Position event from exchange then store and notify Brain

Parameters:

  • position (Position) –

    new comming Position

  • broadcast (bool | None, default: True ) –

    Flag notify to Brain. Defaults to True.

Raises:

Source code in lettrade/exchange/exchange.py
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
def on_position(
    self,
    position: Position,
    broadcast: bool | None = True,
    **kwargs,
) -> None:
    """Receive Position event from exchange then store and notify Brain

    Args:
        position (Position): new comming `Position`
        broadcast (bool | None, optional): Flag notify to Brain. Defaults to True.

    Raises:
        RuntimeError: validate `Position` instance
    """
    self.on_positions(positions=[position], broadcast=broadcast, **kwargs)

on_positions ¤

on_positions(
    positions: list[Position],
    broadcast: bool | None = True,
    **kwargs
) -> None

Receive Positions event from exchange then store and notify Brain

Parameters:

  • positions (list[Position]) –

    list of new comming Position

  • broadcast (bool | None, default: True ) –

    Flag notify to Brain. Defaults to True.

Raises:

Source code in lettrade/exchange/exchange.py
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
def on_positions(
    self,
    positions: list[Position],
    broadcast: bool | None = True,
    **kwargs,
) -> None:
    """Receive Positions event from exchange then store and notify Brain

    Args:
        positions (list[Position]): list of new comming `Position`
        broadcast (bool | None, optional): Flag notify to Brain. Defaults to True.

    Raises:
        RuntimeError: validate `Position` instance
    """
    for position in positions:
        if not isinstance(position, Position):
            raise RuntimeError(f"{position} is not instance of type Position")

        if position.is_exited:
            if position.id in self.history_positions:
                logger.warning("Position exited recall: %s", position)

            self.history_positions[position.id] = position
            if position.id in self.positions:
                del self.positions[position.id]

            # self._account._on_position_exit(position)
        else:
            if position.id in self.history_positions:
                raise RuntimeError(f"Position {position.id} closed: {position}")
            if position.id in self.positions:
                # Merge to keep Position handler for strategy using
                # when strategy want to store Position object
                # and object will be automatic update directly
                self.positions[position.id].merge(position)
                position = self.positions[position.id]
            else:
                self.positions[position.id] = position
                # self._account._on_position_entry(position)

    self._account.on_positions(positions)

    if self._state != ExchangeState.Run:
        return

    if broadcast:
        self._brain.on_positions(positions)

start ¤

start() -> None

Start Live exchange by: Sync positions from server

Source code in lettrade/exchange/live/exchange.py
32
33
34
35
def start(self) -> None:
    """Start Live exchange by: Sync positions from server"""
    self._api.start(exchange=self)
    return super().start()

stop ¤

stop() -> None

Stop Exchange

Source code in lettrade/exchange/exchange.py
113
114
115
116
def stop(self) -> None:
    """Stop Exchange"""
    self._state = ExchangeState.Stop
    self._account.stop()

let_metatrader ¤

let_metatrader(
    datas: set[set[str]],
    strategy: type[Strategy],
    *,
    mt5_login: int,
    mt5_password: str,
    mt5_server: str,
    mt5_wine: str | None = None,
    mt5_path: str | None = None,
    feeder: type[
        MetaTraderDataFeeder
    ] = MetaTraderDataFeeder,
    exchange: type[MetaTraderExchange] = MetaTraderExchange,
    account: type[MetaTraderAccount] = MetaTraderAccount,
    commander: type[Commander] | None = None,
    stats: type[BotStatistic] = BotStatistic,
    plotter: type[Plotter] | None = None,
    bot: type[
        LetTradeMetaTraderBot
    ] = LetTradeMetaTraderBot,
    lettrade: type[LetTradeMetaTrader] = LetTradeMetaTrader,
    api: type[MetaTraderAPI] = MetaTraderAPI,
    **kwargs
) -> LetTradeMetaTrader

Help to build LetTradeMetaTrader

Parameters:

Returns:

Source code in lettrade/exchange/metatrader/metatrader.py
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
def let_metatrader(
    datas: set[set[str]],
    strategy: type[Strategy],
    *,
    mt5_login: int,
    mt5_password: str,
    mt5_server: str,
    mt5_wine: str | None = None,
    mt5_path: str | None = None,
    feeder: type[MetaTraderDataFeeder] = MetaTraderDataFeeder,
    exchange: type[MetaTraderExchange] = MetaTraderExchange,
    account: type[MetaTraderAccount] = MetaTraderAccount,
    commander: type[Commander] | None = None,
    stats: type[BotStatistic] = BotStatistic,
    plotter: type[Plotter] | None = None,
    bot: type[LetTradeMetaTraderBot] = LetTradeMetaTraderBot,
    lettrade: type[LetTradeMetaTrader] = LetTradeMetaTrader,
    api: type[MetaTraderAPI] = MetaTraderAPI,
    **kwargs,
) -> LetTradeMetaTrader:
    """Help to build `LetTradeMetaTrader`

    Args:
        datas (set[set[str]]): _description_
        strategy (type[Strategy]): _description_
        mt5_login (int): _description_
        mt5_password (str): _description_
        mt5_server (str): _description_
        mt5_wine (str | None, optional): WineHQ execute path. Defaults to None.
        feeder (type[MetaTraderDataFeeder], optional): _description_. Defaults to MetaTraderDataFeeder.
        exchange (type[MetaTraderExchange], optional): _description_. Defaults to MetaTraderExchange.
        account (type[MetaTraderAccount], optional): _description_. Defaults to MetaTraderAccount.
        commander (type[Commander] | None, optional): _description_. Defaults to None.
        stats (type[BotStatistic], optional): _description_. Defaults to BotStatistic.
        plotter (type[Plotter] | None, optional): _description_. Defaults to None.
        bot (type[LetTradeMetaTraderBot], optional): _description_. Defaults to LetTradeMetaTraderBot.
        lettrade (type[LetTradeMetaTrader], optional): _description_. Defaults to LetTradeMetaTrader.
        api (type[MetaTraderAPI], optional): _description_. Defaults to MetaTraderAPI.

    Returns:
        LetTradeMetaTrader: _description_
    """
    api_kwargs: dict = kwargs.setdefault("api_kwargs", {})
    api_kwargs.update(
        login=int(mt5_login),
        password=mt5_password,
        server=mt5_server,
        wine=mt5_wine,
        path=mt5_path,
    )

    return let_live(
        strategy=strategy,
        datas=datas,
        feeder=feeder,
        exchange=exchange,
        account=account,
        commander=commander,
        stats=stats,
        plotter=plotter,
        bot=bot,
        lettrade=lettrade,
        api=api,
        **kwargs,
    )