Skip to content

telegram ¤

Module that helps LetTrade send notifications to, and receive commands from, a Telegram Bot

Example¤

Example
import os
from datetime import datetime

import talib.abstract as ta
from dotenv import load_dotenv

from lettrade import indicator as i

# import example.logger
from lettrade.all import DataFeed, Strategy, TelegramCommander, let_metatrader

load_dotenv()


class SmaCross(Strategy):
    """Example strategy that trades crossovers between two EMAs.

    A buy is placed when the latest `signal_ema_crossover` value is set and a
    sell when the latest `signal_ema_crossunder` value is set, but only while
    there are no orders and no positions.
    """

    # EMA periods used for the `ema1` and `ema2` columns
    ema1_window = 9
    ema2_window = 21

    # Last `df.now` value seen; used in live mode to detect a new bar
    _now: datetime

    def indicators(self, df: DataFeed):
        """Add the EMA columns and the crossover/crossunder signal columns.

        Args:
            df (DataFeed): Data to compute the indicators on

        Returns:
            DataFeed: The same `df` with the new columns set
        """
        windows = (("ema1", self.ema1_window), ("ema2", self.ema2_window))
        for column, window in windows:
            df[column] = ta.EMA(df, timeperiod=window)

        df["signal_ema_crossover"] = i.crossover(df.ema1, df.ema2)
        df["signal_ema_crossunder"] = i.crossunder(df.ema1, df.ema2)
        return df

    def start(self, df: DataFeed):
        """Remember the current `df.now` as the reference for new-bar detection."""
        self._now = df.now

    def next(self, df: DataFeed):
        """Place a buy or sell on the latest signal when nothing is open.

        Args:
            df (DataFeed): Data carrying the signal columns
        """
        if self.is_live:
            # Filter start of new bar: do nothing until `df.now` has changed
            current = df.now
            if self._now == current:
                return
            self._now = current

        # Only one trade at a time: bail out while any order or position exists
        has_open_trade = len(self.orders) > 0 or len(self.positions) > 0
        if has_open_trade:
            return

        if df.l["signal_ema_crossover"][-1]:
            entry = self.data.l["close"][-1]
            self.buy(size=0.1, sl=entry - 0.01, tp=entry + 0.01)
            return

        if df.l["signal_ema_crossunder"][-1]:
            entry = self.data.l["close"][-1]
            self.sell(size=0.1, sl=entry + 0.01, tp=entry - 0.01)

    def stop(self, df: DataFeed):
        """Print the tail of the data and the strategy's orders."""
        print(df.tail())
        print(self.orders)

    def plot(self, df: DataFeed):
        """Describe the two EMA lines as scatter items for plotting.

        Args:
            df (DataFeed): Data holding the `ema1` and `ema2` columns

        Returns:
            dict: A dict with an `items` list, one scatter entry per EMA
        """
        line_colors = (("ema1", "blue"), ("ema2", "green"))
        return {
            "items": [
                {
                    "type": "scatter",
                    "x": df.index,
                    "y": df[column],
                    "line": {"color": color, "width": 1},
                    "name": column,
                }
                for column, color in line_colors
            ]
        }


if __name__ == "__main__":
    # Telegram credentials are read from the environment (see load_dotenv above)
    telegram_settings = {
        "token": os.getenv("TELEGRAM_TOKEN"),
        "chat_id": os.getenv("TELEGRAM_CHAT_ID"),
    }

    # Build the bot for a single EURGBP 5m feed, commanded through Telegram
    lt = let_metatrader(
        strategy=SmaCross,
        datas=[("EURGBP", "5m")],
        mt5_login=os.getenv("MT5_LOGIN"),
        mt5_password=os.getenv("MT5_PASSWORD"),
        mt5_server=os.getenv("MT5_SERVER"),
        # `os.getenv` already yields None when the variable is unset
        mt5_wine=os.getenv("MT5_WINE"),
        commander=TelegramCommander,
        commander_kwargs=telegram_settings,
    )

    lt.run()

TelegramAPI ¤

TelegramAPI(token: str, chat_id: int, *args, **kwargs)

Singleton object that communicates across multiprocessing

Source code in lettrade/commander/telegram.py
 98
 99
100
101
102
def __init__(self, token: str, chat_id: int, *args, **kwargs) -> None:
    """Store the bot credentials and start with no registered processes.

    Args:
        token (str): Telegram Bot token
        chat_id (int): Telegram chat_id; converted with `int()` before storing
        *args: Accepted but not used in this method
        **kwargs: Accepted but not used in this method
    """
    self._token: str = token
    self._chat_id: int = int(chat_id)
    # No process selected yet and no per-process action queue registered
    self._bot_selected = None
    self._bots_queue = {}

cleanup ¤

cleanup() -> None

Stops all running telegram threads.

Source code in lettrade/commander/telegram.py
147
148
149
150
151
def cleanup(self) -> None:
    """Stops all running telegram threads.

    Schedules `_cleanup_telegram()` on `self._loop` from the calling thread
    and then blocks until `self._thread` has finished.
    """
    shutdown = self._cleanup_telegram()
    # This can take up to `timeout` from the call to `start_polling`.
    asyncio.run_coroutine_threadsafe(shutdown, self._loop)
    self._thread.join()

send_message ¤

send_message(msg: str, pname: str, **kwargs) -> None

Send message to Telegram Bot

Parameters:

  • msg (str) –

    Message

Returns:

  • None –

    Nothing is returned.

Source code in lettrade/commander/telegram.py
129
130
131
132
133
134
135
136
137
138
139
def send_message(self, msg: str, pname: str, **kwargs) -> None:
    """Send message to Telegram Bot

    The message is passed through `escape_markdown` and prefixed with a
    header naming the process, then scheduled on `self._loop` via
    `_send_msg`. This method does not wait for the send to complete.

    Args:
        msg (str): Message
        pname (str): Process name shown in the message header
        **kwargs: Forwarded unchanged to `_send_msg`

    Returns:
        None
    """
    escaped = escape_markdown(msg)
    text = f"*[Process: {pname}]*\n\n{escaped}"
    sending = self._send_msg(text, **kwargs)
    asyncio.run_coroutine_threadsafe(sending, self._loop)

start ¤

start(pname: str, action_queue: Queue)

Start

Source code in lettrade/commander/telegram.py
104
105
106
107
108
109
110
111
112
113
114
115
116
117
def start(self, pname: str, action_queue: Queue):
    """Register a process's action queue and start the bot once.

    The queue is stored under `pname`, replacing (with a warning) any queue
    already stored under that name. The keyboard is initialised and the
    thread started only while `self` has no `_keyboard` attribute.

    Args:
        pname (str): Process name used as the registration key
        action_queue (Queue): Queue stored for that process
    """
    queues = self._bots_queue
    if pname in queues:
        logger.warning("Process name %s override existed action queue", pname)
    queues[pname] = action_queue

    logger.info("New join process: %s", pname)

    # TODO: Lock for safe multipleprocessing
    # Presumably `_init_keyboard` sets `_keyboard`, making this a run-once guard
    if not hasattr(self, "_keyboard"):
        self._init_keyboard()
        self._start_thread()

TelegramCommander ¤

TelegramCommander(
    token: str,
    chat_id: int,
    api: TelegramAPI | None = None,
    *args,
    **kwargs
)

Bases: Commander

Send notifications to, and receive commands from, a Telegram Bot

Parameters:

  • token (str) –

    Telegram Bot token

  • chat_id (int) –

    Telegram chat_id

Source code in lettrade/commander/telegram.py
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
def __init__(
    self,
    token: str,
    chat_id: int,
    api: TelegramAPI | None = None,
    *args,
    **kwargs,
) -> None:
    """Create the commander and attach a TelegramAPI to it.

    Args:
        token (str): Telegram Bot token
        chat_id (int): Telegram chat_id
        api (TelegramAPI | None): API object to use; when not given, a new
            `TelegramAPI` is built from `token` and `chat_id`
        *args: Forwarded to the parent constructor
        **kwargs: Forwarded to the parent constructor
    """
    super().__init__(*args, **kwargs)
    if not api:
        api = TelegramAPI(token=token, chat_id=chat_id)
    self._api = api
    self._is_running = True

init ¤

init(
    bot: LetTradeBot,
    brain: Brain,
    exchange: Exchange,
    strategy: Strategy,
)

Init commander dependencies

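Parameters:

  • bot (LetTradeBot) –

    LetTradeBot object

  • brain (Brain) –

    Brain of bot

  • exchange (Exchange) –

    Manage bot trading

  • strategy (Strategy) –

    Strategy of bot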

Source code in lettrade/commander/commander.py
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
def init(
    self,
    bot: "LetTradeBot",
    brain: "Brain",
    exchange: "Exchange",
    strategy: "Strategy",
):
    """Init commander dependencies

    Stores the four collaborators on the commander and copies the bot's
    `_name` into `self._name`.

    Args:
        bot (LetTradeBot): LetTradeBot object
        brain (Brain): Brain of bot
        exchange (Exchange): Manage bot trading
        strategy (Strategy): Strategy of bot
    """
    self.bot, self.brain = bot, brain
    self.exchange, self.strategy = exchange, strategy

    # The commander is named after the bot it is attached to
    self._name = self.bot._name

start ¤

start()

Start

Source code in lettrade/commander/telegram.py
556
557
558
559
560
def start(self):
    """Start the commander by registering it with the Telegram API.

    The queue returned by `_t_action()` is passed to the API's `start`
    together with this commander's name.
    """
    name = self._name
    logger.info("TelegramCommander start %s", name)
    self._api.start(pname=name, action_queue=self._t_action())

stop ¤

stop()

Stop

Source code in lettrade/commander/telegram.py
562
563
564
565
566
def stop(self):
    """Stop the commander.

    Calls the API's `cleanup()` and, once that returns, marks the commander
    as no longer running.
    """
    name = self._name
    logger.info("TelegramCommander stop %s", name)
    api = self._api
    api.cleanup()
    self._is_running = False

authorized_only ¤

authorized_only(
    command_handler: Callable[
        ..., Coroutine[Any, Any, None]
    ]
)

Decorator to check if the message comes from the correct chat_id

Parameters:

  • command_handler (Callable[..., Coroutine[Any, Any, None]]) –

    Telegram CommandHandler

Source code in lettrade/commander/telegram.py
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
def authorized_only(command_handler: Callable[..., Coroutine[Any, Any, None]]):
    """Decorator to check if the message comes from the correct chat_id

    The chat id is taken from the callback query's message when the update
    carries a callback query, otherwise from the update's message, and is
    compared with `self._chat_id`. Updates from any other chat are logged and
    dropped without calling the handler. Exceptions raised by the handler are
    logged and their text is sent through `self._send_msg`; they are not
    re-raised.

    Args:
        command_handler (Callable[..., Coroutine[Any, Any, None]]): Telegram CommandHandler

    Returns:
        The wrapped coroutine function. It returns the handler's result, or
        `None` when the update is rejected or the handler raised.
    """

    @wraps(command_handler)
    async def wrapper(self: "TelegramCommander", *args, **kwargs):
        """Decorator logic"""
        # The update arrives either as the `update` keyword or first positional
        update = kwargs.get("update") or args[0]

        # Reject unauthorized messages
        if update.callback_query:
            cchat_id = int(update.callback_query.message.chat.id)
        else:
            cchat_id = int(update.message.chat_id)

        if cchat_id != self._chat_id:
            # Log the id computed above: `update.message` is not set on
            # callback-query updates, so it must not be dereferenced here.
            logger.info("Rejected unauthorized message from: %s", cchat_id)
            return None

        logger.debug(
            "Executing handler: %s for chat_id: %s",
            command_handler.__name__,
            self._chat_id,
        )
        try:
            return await command_handler(self, *args, **kwargs)
        except Exception as e:
            # Log before notifying so the traceback is kept even if the
            # Telegram notification itself fails.
            logger.exception("Exception occurred within Telegram module", exc_info=e)
            await self._send_msg(str(e))

    return wrapper