Source code for scron.base

# Copyright 2019 Wojciech Banaś
# This code is released under the GPL3 or individual commercial license.

from machine import Timer
from utime import localtime, sleep_ms, time, ticks_ms

from scron.scount import SimpleCounter


class SimpleCRONBase(SimpleCounter):

    def __init__(self, *args, **kwargs):
        super(SimpleCRONBase, self).__init__(*args, **kwargs)
        self.timer = None

    def _sync_time(self):
        "Synchronizes SimpleCRON with time."
        self._set_time_change(self._estimate_time_change())
        self._first_step()

    def _estimate_time_change(self):
        """\
        Estimate the time change.
        :return:
        """
        last = int(time()) + 1
        current = last
        while 1:
            current = int(time())
            if current > last:
                return (self._get_time_change_pointer() + 1) % 1000
            sleep_ms(1)

    def _set_time_change(self, time_change):
        self.time_change = time_change

    def _get_time_change_pointer(self):
        return ticks_ms()

    def _get_time_change_correction(self, delta_time_ms):
        current = self._get_time_change_pointer() % 1000
        # Immediate triggering
        # There may also be no negative values of
        if current >= self.time_change:
            out = delta_time_ms - (current - self.time_change)
        else:
            out = delta_time_ms - (1000 - (self.time_change - current))
        if out < 0:
            return 0
        else:
            return out

    def _first_step(self):
        if self.timer == None:
            return
        self.timer.deinit()
        if len(self.callbacks) > 0:
            next_pointer = self.get_next_pointer(*self.get_current_pointer())
            if next_pointer == None:
                # Possible when the callback is removed during operation.
                if len(self.callbacks) > 0:
                    # This situation should not happen. If there is a callback, then the next indicator must exist.
                    raise Exception('scron bug,1')
                return
            self.next_step(*next_pointer)(self.timer)

    def _wait_for_unlock_rw(self):
        """\
        Waiting for the lock to be removed
        """
        # We wait 5 seconds, if the lock is not removed then we emit an error.
        from utime import sleep_ms
        for i in range(5000):
            sleep_ms(1)
            if not self._lock_rw:
                return
        raise Exception('Too long to wait for the lock to be removed!')

    def remove(self, callback_name, force=False, _lock=True):
        """
        Removes from the counters a callback that occurs under ID callback_name.

        Recalculates the nearest callback to call.

        :param force: force removal of the callback.
        :param callback_name: callback name
        """
        super(SimpleCRONBase, self).remove(callback_name, force, _lock)
        self._first_step()

    def run(self, timer_id=1):
        """
        Initiates a list of tasks and reserves one hardware timer.

        **Warning:**
        "OSError: 261" - error means a problem with the hardware timer.
        Try to set another timer ID.
        `See MicroPython documentation for machine.Timer. <https://docs.micropython.org/en/latest/library/machine.Timer.html>`_

        :param timer_id: hardware timer ID
        """
        # One good start up is enough
        if self.timer is not None:
            raise Exception('You can run SimpleCRON once.')
        timer = Timer(timer_id)
        # Check if this timer is possible to use
        # If not, we will get an error: OSError: 261
        timer.init(period=1, mode=Timer.ONE_SHOT, callback=lambda t: None)
        self.timer = timer
        self._sync_time()