"""
Dummy pyb.Timer class
"""
import threading
from typing import Callable, Optional, Dict, Any
class ThreadingTicker(threading.Timer):
"""
A repeated version of threading.Timer
threading.Timer fires only once, basically just delaying a function call.
This extension will keep calling a function like a ticker.
A problem with this timer is still that the script is not necessarily kept
alive by it. This is because daemon _mode is set to `True`. To keep a
ticker running you need to run the script in interactive _mode. But when
daemon it set to `False`, the process becomes very hard to kill as it does
not respond to a halt signal.
An alternative was tried with a single thread with a While loop that keeps
waiting, but this shows the same problem.
This object is *not* a mirror of `biorobotics.Ticker`!
"""
def run(self):
while not self.finished.wait(self.interval):
self.function(*self.args, **self.kwargs)
def __del__(self):
"""Destructor"""
# Stop ticker, prevent it running when the handle has been destroyed
self.cancel()
class TimerChannel:
"""Channel of a Timer object"""
# Values of timer[channel], to be independent from instances
_values: Dict[int, Dict[int, Any]] = {}
def __init__(self, timer: 'Timer', channel: int, mode: int,
callback: Optional[Callable] = None,
pin: Optional['Pin'] = None, **kwargs):
self.timer = timer
self.channel = channel
self.mode = mode
self.callback = callback
self.pin = pin
timer_id = self.timer.timer_number
if timer_id not in self._values:
self._values[timer_id] = {}
if self.channel not in self._values[timer_id]:
self._values[timer_id][self.channel] = 0
def pulse_width_percent(self, percentage: float):
"""Set PWM percentage between 0.0 and 100.0"""
self._values[self.timer.timer_number][self.channel] = percentage
def test_get_value(self):
"""Dummy method to get the value of this channel"""
return self._values[self.timer.timer_number][self.channel]
[docs]class Timer:
"""Hardware timer class"""
PWM = 0
PWM_INVERTED = 1
OC_TIMING = 2
OC_ACTIVE = 3
OC_INACTIVE = 4
OC_TOGGLE = 5
OC_FORCED_ACTIVE = 6
OC_FORCED_INACTIVE = 7
IC = 8
ENC_A = 9
ENC_B = 10
ENC_AB = 11
# Values of the timers, independent from instances
_values: Dict[int, int] = {}
def __init__(self, timer_id: int, freq: float = None,
callback: Callable = None, *args, **kwargs):
"""Constructor
Calls self.init()
:param timer_id: Timer id (defined by hardware)
"""
self.timer_number = timer_id
if self.timer_number not in self._values:
self._values[self.timer_number] = 0
self.freq = 1.0
self.channel_number = 0
self._user_callback: Optional[Callable] = None
# The threading-based object used for actual timing
self._threading_ticker: Optional[ThreadingTicker] = None
self.init(freq, callback, *args, **kwargs)
[docs] def init(self, freq: float = None, callback: Callable = None,
*args, **kwargs):
"""Initialize timer
:param freq: Timer frequency in Hz
:param callback: Function to be attached to timer
"""
self.freq = freq
self.callback(callback)
[docs] def deinit(self):
"""De-initialize timer"""
self.callback(None)
[docs] def channel(self, channel_number, mode=None, pin=None, *args, **kwargs):
"""Initialize and/or return a TimerChannel"""
if mode is None:
return # pragma: no cover
return TimerChannel(self, channel_number, mode=mode, callback=None,
pin=pin, *args, **kwargs)
[docs] def callback(self, func: Optional[Callable]):
"""Set or clear the timer callback
Set `func` to `None` to clear the callback.
"""
# Clear old ticker object first
if self._threading_ticker is not None:
self._threading_ticker.cancel() # Make sure the old timer stops
del self._threading_ticker
self._threading_ticker = None
if func is not None:
interval = 1.0 / self.freq
self._threading_ticker = ThreadingTicker(interval, func)
# With daemon enabled, the script is not kept alive by the ticker
# and ticker stops on script exit. See ThreadingTicker.__doc__
self._threading_ticker.daemon = True
self._threading_ticker.args = [self] # The real timer passes a
# single variable (the timer itself), we mimic that here
# Note that this argument is not used when an object method is
# set as callback. In this case there would be sort of two
# arguments.
self._threading_ticker.start() # Timer objects are started by
# default
[docs] def counter(self, new_counter: Optional[int] = None) -> int:
"""Get or set timer counter
When in encoder _mode, this corresponds to the number of encoder pulses.
"""
if new_counter is None:
return self._values[self.timer_number]
self._values[self.timer_number] = new_counter
[docs] def test_set_value(self, value):
"""Set Timer value
Will be returned with `counter()`
"""
self._values[self.timer_number] = value