Source code for pyb._timer

"""
Dummy pyb.Timer class
"""

import threading
from typing import Callable, Optional, Dict, Any


class ThreadingTicker(threading.Timer):
    """
    A repeated version of threading.Timer

    threading.Timer fires only once, basically just delaying a function call.
    This extension will keep calling a function like a ticker.

    A problem with this timer is still that the script is not necessarily kept
    alive by it. This is because daemon _mode is set to `True`. To keep a
    ticker running you need to run the script in interactive _mode. But when
    daemon it set to `False`, the process becomes very hard to kill as it does
    not respond to a halt signal.
    An alternative was tried with a single thread with a While loop that keeps
    waiting, but this shows the same problem.

    This object is *not* a mirror of `biorobotics.Ticker`!
    """

    def run(self):
        while not self.finished.wait(self.interval):
            self.function(*self.args, **self.kwargs)

    def __del__(self):
        """Destructor"""

        # Stop ticker, prevent it running when the handle has been destroyed
        self.cancel()


class TimerChannel:
    """Channel of a Timer object"""

    # Values of timer[channel], to be independent from instances
    _values: Dict[int, Dict[int, Any]] = {}

    def __init__(self, timer: 'Timer', channel: int, mode: int,
                 callback:  Optional[Callable] = None,
                 pin: Optional['Pin'] = None, **kwargs):

        self.timer = timer
        self.channel = channel
        self.mode = mode
        self.callback = callback
        self.pin = pin

        timer_id = self.timer.timer_number

        if timer_id not in self._values:
            self._values[timer_id] = {}

        if self.channel not in self._values[timer_id]:
            self._values[timer_id][self.channel] = 0

    def pulse_width_percent(self, percentage: float):
        """Set PWM percentage between 0.0 and 100.0"""
        self._values[self.timer.timer_number][self.channel] = percentage

    def test_get_value(self):
        """Dummy method to get the value of this channel"""
        return self._values[self.timer.timer_number][self.channel]


[docs]class Timer: """Hardware timer class""" PWM = 0 PWM_INVERTED = 1 OC_TIMING = 2 OC_ACTIVE = 3 OC_INACTIVE = 4 OC_TOGGLE = 5 OC_FORCED_ACTIVE = 6 OC_FORCED_INACTIVE = 7 IC = 8 ENC_A = 9 ENC_B = 10 ENC_AB = 11 # Values of the timers, independent from instances _values: Dict[int, int] = {} def __init__(self, timer_id: int, freq: float = None, callback: Callable = None, *args, **kwargs): """Constructor Calls self.init() :param timer_id: Timer id (defined by hardware) """ self.timer_number = timer_id if self.timer_number not in self._values: self._values[self.timer_number] = 0 self.freq = 1.0 self.channel_number = 0 self._user_callback: Optional[Callable] = None # The threading-based object used for actual timing self._threading_ticker: Optional[ThreadingTicker] = None self.init(freq, callback, *args, **kwargs)
[docs] def init(self, freq: float = None, callback: Callable = None, *args, **kwargs): """Initialize timer :param freq: Timer frequency in Hz :param callback: Function to be attached to timer """ self.freq = freq self.callback(callback)
[docs] def deinit(self): """De-initialize timer""" self.callback(None)
[docs] def channel(self, channel_number, mode=None, pin=None, *args, **kwargs): """Initialize and/or return a TimerChannel""" if mode is None: return # pragma: no cover return TimerChannel(self, channel_number, mode=mode, callback=None, pin=pin, *args, **kwargs)
[docs] def callback(self, func: Optional[Callable]): """Set or clear the timer callback Set `func` to `None` to clear the callback. """ # Clear old ticker object first if self._threading_ticker is not None: self._threading_ticker.cancel() # Make sure the old timer stops del self._threading_ticker self._threading_ticker = None if func is not None: interval = 1.0 / self.freq self._threading_ticker = ThreadingTicker(interval, func) # With daemon enabled, the script is not kept alive by the ticker # and ticker stops on script exit. See ThreadingTicker.__doc__ self._threading_ticker.daemon = True self._threading_ticker.args = [self] # The real timer passes a # single variable (the timer itself), we mimic that here # Note that this argument is not used when an object method is # set as callback. In this case there would be sort of two # arguments. self._threading_ticker.start() # Timer objects are started by
# default
[docs] def counter(self, new_counter: Optional[int] = None) -> int: """Get or set timer counter When in encoder _mode, this corresponds to the number of encoder pulses. """ if new_counter is None: return self._values[self.timer_number] self._values[self.timer_number] = new_counter
[docs] def test_set_value(self, value): """Set Timer value Will be returned with `counter()` """ self._values[self.timer_number] = value