feeder

LiveDataFeeder

LiveDataFeeder(
    api: LiveAPI | None = None,
    tick: int = 5,
    start_size: int = 500,
    api_kwargs: dict | None = None,
    **kwargs
)

Bases: DataFeeder

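The tick parameter controls how the feeder waits between updates:
tick < 0: no tick; only completed bars are fetched
tick == 0: wait until the new bar's value changes
tick > 0: sleep for tick seconds, then update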
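
The three tick cases can be illustrated with a small standalone sketch. This is not lettrade's implementation: `tick_mode` and `wait_for_next_update` are hypothetical helpers written only to show how a signed tick value might select a waiting strategy, and the mode names are invented for this example.

```python
import time
from typing import Callable


def tick_mode(tick: int) -> str:
    """Classify a tick value into one of the three documented cases.

    The returned labels are hypothetical names used only in this sketch.
    """
    if tick < 0:
        return "completed-bar-only"   # no tick; only completed bars
    if tick == 0:
        return "wait-for-bar-change"  # block until the bar value changes
    return "sleep"                    # sleep `tick` seconds, then update


def wait_for_next_update(
    tick: int, sleep: Callable[[float], None] = time.sleep
) -> str:
    """Sleep only in the tick > 0 case and report which mode applied."""
    mode = tick_mode(tick)
    if mode == "sleep":
        sleep(tick)
    return mode
```

Passing `sleep` as an argument keeps the helper testable without real delays; a live feeder would presumably use its own waiting mechanism for the other two modes.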
Source code in lettrade/exchange/live/feeder.py
def __init__(
    self,
    api: LiveAPI | None = None,
    tick: int = 5,
    start_size: int = 500,
    api_kwargs: dict | None = None,
    **kwargs,
) -> None:
    """
    tick:
        tick < 0: no tick; only completed bars are fetched
        tick == 0: wait until the new bar's value changes
        tick > 0: sleep for tick seconds, then update
    """
    super().__init__()

    # API init: use the given API instance, or build one from api_kwargs
    if api is None:
        if api_kwargs is None:
            raise RuntimeError("api or api_kwargs cannot be missing")
        api = self._api_cls(**api_kwargs)

    self._api = api
    self._tick = tick
    self._start_size = start_size
    self._config = kwargs  # extra keyword arguments are kept as config

    # Express an integer tick as a timeframe in seconds
    if isinstance(self._tick, int):
        self._wait_timeframe = TimeFrame(f"{self._tick}s")
    else:
        self._wait_timeframe = None
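
The constructor accepts either a ready-made API object or the keyword arguments needed to build one. The pattern can be sketched in isolation as follows. `FakeAPI`, `SketchFeeder`, and the `account` option are hypothetical stand-ins invented for this example; they are not lettrade classes or parameters.

```python
class FakeAPI:
    """Hypothetical stand-in for a live API client."""

    def __init__(self, **kwargs):
        self.options = kwargs


class SketchFeeder:
    """Hypothetical feeder showing the api / api_kwargs fallback."""

    # Class used to build an API instance when only api_kwargs is given
    _api_cls = FakeAPI

    def __init__(self, api=None, api_kwargs=None):
        if api is None:
            if api_kwargs is None:
                # Neither an instance nor construction arguments were given
                raise RuntimeError("api or api_kwargs cannot be missing")
            api = self._api_cls(**api_kwargs)
        self._api = api


# Reuse one API instance across feeders...
shared = FakeAPI(account=1)
feeder_a = SketchFeeder(api=shared)

# ...or let the feeder build its own from keyword arguments.
feeder_b = SketchFeeder(api_kwargs={"account": 2})
```

Keeping the API class in a class attribute lets a subclass swap in a different client without overriding `__init__`.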

is_continous property

is_continous

Flag indicating whether this is a realtime, continuous data feeder.