Source code for brica1.scheduler

# -*- coding: utf-8 -*-

"""
scheduler.py
=====

This module contains the `Scheduler` class which is a base class for various
types of schedulers. `VirtualTimeSyncScheduler`, `VirtualTimeScheduler`, and
`RealTimeSyncScheduler` are implemented for now.

"""

# Public names exported by a star-import of this module: the abstract base
# class followed by the three concrete scheduler implementations.
__all__ = ["Scheduler", "VirtualTimeSyncScheduler", "VirtualTimeScheduler",
           "RealTimeSyncScheduler"]

from .utils import current_time
from .supervisor import NullSupervisor

from abc import ABCMeta, abstractmethod
import time

import queue


class Scheduler(object):
    """
    Common base for every scheduler in this module.

    A `Scheduler` holds the agent being driven, the supervisor built for that
    agent, a step counter, the current time and the list of components
    obtained from the agent. Subclasses supply the stepping policy by
    overriding `step()`.
    """

    # NOTE(review): `__metaclass__` is the Python 2 spelling. Python 3 ignores
    # this attribute, so `@abstractmethod` below is not enforced there --
    # confirm the supported interpreter versions before changing it.
    __metaclass__ = ABCMeta

    def __init__(self, agent, supervisor=NullSupervisor):
        """
        Create a new `Scheduler` instance.

        Args:
            agent (Agent): the `Agent` whose components are scheduled.
            supervisor (callable): called once as `supervisor(agent)`; the
                result is stored as `self.supervisor`.

        Returns:
            Scheduler: A new `Scheduler` instance.

        """

        super(Scheduler, self).__init__()

        # Plain counters first; they do not depend on the agent.
        self.num_steps = 0
        self.current_time = 0

        self.agent = agent
        # The supervisor is built before the component list is queried, in
        # the same order as before.
        self.supervisor = supervisor(agent)
        self.components = agent.get_all_components()

    def reset(self):
        """
        Reset the `Scheduler`.

        The step counter and the current time go back to zero and the
        component list is emptied; `update()` repopulates it from the agent.

        Args:
            None.

        Returns:
            None.

        """

        self.num_steps = self.current_time = 0
        self.components = []

    def update(self):
        """
        Refresh the component list from the agent given at construction.

        Args:
            None.

        Returns:
            None.

        """

        agent = self.agent
        self.components = agent.get_all_components()

    @abstractmethod
    def step(self):
        """
        Step over a single iteration.

        Args:
            None.

        Returns:
            int: the current time of the `Scheduler`.

        """
class VirtualTimeSyncScheduler(Scheduler):
    """
    `VirtualTimeSyncScheduler` is a `Scheduler` implementation for virtual
    time in a synced manner: every component is stepped on every call and the
    clock advances by a fixed interval.
    """

    def __init__(self, agent, supervisor=NullSupervisor, interval=1):
        """
        Create a new `VirtualTimeSyncScheduler` instance.

        Args:
            agent (Agent): the `Agent` whose components are scheduled.
            supervisor (callable): supervisor factory forwarded to
                `Scheduler.__init__`.
            interval (int): amount added to `current_time` on every `step()`
                (described as milliseconds by the original documentation).

        Returns:
            VirtualTimeSyncScheduler: A new `VirtualTimeSyncScheduler`
            instance.

        """

        # Forward the caller's supervisor. Previously `NullSupervisor` was
        # hard-coded here, so the `supervisor` argument was silently ignored.
        super(VirtualTimeSyncScheduler, self).__init__(
            agent,
            supervisor=supervisor,
        )
        self.interval = interval

    def step(self):
        """
        Step by the internal interval.

        `input()` is called on every component with the current time, the
        supervisor is stepped, every component is trained and fired, the
        time is advanced by `self.interval`, and finally `output()` is called
        on every component with the new time.

        Args:
            None.

        Returns:
            int: the current time of the `Scheduler`.

        """

        for component in self.components:
            component.input(self.current_time)

        self.supervisor.step()

        for component in self.components:
            component.train()
            component.fire()

        # Outputs are stamped with the time *after* the interval has elapsed.
        self.current_time = self.current_time + self.interval

        for component in self.components:
            component.output(self.current_time)

        return self.current_time
class VirtualTimeScheduler(Scheduler):
    """
    `VirtualTimeScheduler` is a `Scheduler` implementation for virtual time.

    Each component is driven by its own events kept in a priority queue
    ordered by time, so components with different `offset`, `interval` and
    `sleep` values are stepped independently.
    """

    class Event(object):
        """
        `Event` is a queue object for `PriorityQueue` in VirtualTimeScheduler.
        """

        def __init__(self, time, component, action='fire'):
            """
            Create a new `Event` instance.

            Args:
                time (int): the time of the `Event`.
                component (Component): `Component` to be handled.
                action (str): `'fire'` or `'sleep'`; selects the branch taken
                    in `VirtualTimeScheduler.step_for_time()`.

            Returns:
                Event: a new `Event` instance.

            """

            super(VirtualTimeScheduler.Event, self).__init__()
            self.time = time
            self.component = component
            self.action = action

        def __lt__(self, other):
            # Events are ordered by time only; this is what the priority
            # queue uses to decide which event comes out first.
            return self.time < other.time

    def __init__(self, agent, supervisor=NullSupervisor):
        """
        Create a new `VirtualTimeScheduler` instance.

        The event queue starts empty; `update()` fills it.

        Args:
            agent (Agent): the `Agent` whose components are scheduled.
            supervisor (callable): supervisor factory forwarded to
                `Scheduler.__init__`.

        Returns:
            VirtualTimeScheduler: a new `VirtualTimeScheduler` instance.

        """

        # Forward the caller's supervisor. Previously `NullSupervisor` was
        # hard-coded here, so the `supervisor` argument was silently ignored.
        super(VirtualTimeScheduler, self).__init__(
            agent,
            supervisor=supervisor,
        )
        self.event_queue = queue.PriorityQueue()

    def update(self):
        """
        Refresh the component list and enqueue an initial event for each.

        Every component gets a `'sleep'` event at its `offset`.

        NOTE(review): events already in the queue are not removed, so calling
        this method more than once enqueues an additional event per
        component -- confirm whether that is intended.

        Args:
            None.

        Returns:
            None.

        """

        super(VirtualTimeScheduler, self).update()
        for component in self.components:
            self.event_queue.put(
                VirtualTimeScheduler.Event(
                    component.offset,
                    component,
                    'sleep',
                )
            )

    def _next_event_time(self):
        """Return the time of the earliest queued event, or None if empty."""
        pending = self.event_queue.queue
        return pending[0].time if pending else None

    def step_for_time(self, time):
        """
        Process every queued event whose time equals `time`.

        A `'fire'` event schedules a `'sleep'` event `component.sleep` later
        and marks the component for `output()`. Any other event schedules a
        `'fire'` event `component.interval` later and marks the component for
        `input()`, `train()` and `fire()`.

        Args:
            time (int): the event time to process.

        Returns:
            None.

        """

        fires = []
        sleeps = []

        # Drain only the events scheduled for exactly this time.
        while self._next_event_time() == time:
            event = self.event_queue.get()
            component = event.component

            if event.action == 'fire':
                next_event = VirtualTimeScheduler.Event(
                    event.time + component.sleep,
                    component,
                    'sleep',
                )
                sleeps.append(component)
            else:
                next_event = VirtualTimeScheduler.Event(
                    event.time + component.interval,
                    component,
                    'fire',
                )
                fires.append(component)

            self.event_queue.put(next_event)

        # Outputs of finishing components are published before the waking
        # components read their inputs.
        for component in sleeps:
            component.output(time)

        for component in fires:
            component.input(time)

        self.supervisor.step()

        for component in fires:
            component.train()
            component.fire()

    def step(self, interval=0):
        """
        Step with given interval or to the next event.

        With `interval == 0` the time jumps to the earliest queued event and
        the events at that time are processed. Otherwise the time is advanced
        by `interval` and every event up to and including the new time is
        processed in time order.

        When no event is queued (for example before `update()` has been
        called) nothing is processed instead of raising `IndexError`.

        Args:
            interval (int): interval in milliseconds between each step

        Returns:
            int: the current time of the `Scheduler`.

        """

        if interval == 0:
            next_time = self._next_event_time()
            if next_time is not None:
                self.current_time = next_time
                self.step_for_time(self.current_time)
        else:
            self.current_time += interval
            next_time = self._next_event_time()
            while next_time is not None and next_time <= self.current_time:
                self.step_for_time(next_time)
                next_time = self._next_event_time()

        return self.current_time
class RealTimeSyncScheduler(Scheduler):
    """
    `RealTimeSyncScheduler` is a `Scheduler` implementation for real time in
    a synced manner.
    """

    def __init__(self, agent, supervisor=NullSupervisor, interval=1):
        """
        Create a new `RealTimeSyncScheduler` instance.

        Args:
            agent (Agent): the `Agent` whose components are scheduled.
            supervisor (callable): supervisor factory forwarded to
                `Scheduler.__init__`.
            interval (int): minimum interval between input and output of the
                modules. NOTE(review): the original documentation says
                seconds, but `step()` divides the remaining time by 1000
                before `time.sleep()`, i.e. treats it as milliseconds --
                assumes `current_time()` returns milliseconds; TODO confirm.

        Returns:
            RealTimeSyncScheduler: A new `RealTimeSyncScheduler` instance.

        """

        self.last_input_time = -1
        self.last_output_time = -1
        self.last_spent = -1
        self.last_dt = -1
        # `step()` documents and sets this flag; define it up front so it can
        # be read before the first step.
        self.lagged = False

        # Forward the caller's supervisor. Previously `NullSupervisor` was
        # hard-coded here, so the `supervisor` argument was silently ignored.
        super(RealTimeSyncScheduler, self).__init__(
            agent,
            supervisor=supervisor,
        )

        self.set_interval(interval)

    def reset(self):
        """
        Reset the scheduler and restore the interval to 1.

        Args:
            None.

        Returns:
            None.

        """

        super(RealTimeSyncScheduler, self).reset()
        self.set_interval(1)

    def set_interval(self, interval):
        """
        Set the minimum interval between input and output.

        Args:
            interval: converted with `int()`; the result must be positive.

        Returns:
            None.

        Raises:
            AssertionError: if the converted interval is not greater than 0.

        """

        self.interval = int(interval)
        assert self.interval > 0

    def step(self):
        """
        Step by the internal interval.

        The methods `input()`, `fire()`, and `output()` are synchronously
        called for all components.

        `self.last_input_time` is the time sampled just before `input()` is
        called on the components. `self.last_spent` is the time elapsed from
        that point until all components have been trained and fired, and
        `self.last_dt` is `self.interval - self.last_spent`, the time left
        before outputs are due.

        If `self.last_dt` is positive the scheduler sleeps for that long
        before calling `output()`. If it is negative, firing took longer than
        the interval and `self.lagged` is set to True; otherwise
        `self.lagged` is False.

        `output()` receives the time sampled just before the output pass.
        After the pass, `self.last_output_time` and `self.current_time` are
        sampled again and that later value is returned.

        Args:
            None.

        Returns:
            the current time of the `Scheduler`, as returned by
            `current_time()` after the output pass.

        """

        self.last_input_time = current_time()
        self.current_time = self.last_input_time

        for component in self.components:
            component.input(self.last_input_time)

        self.supervisor.step()

        for component in self.components:
            component.train()
            component.fire()

        self.last_spent = current_time() - self.last_input_time

        # Keep the remaining time on the instance; it used to live in a local
        # variable, which left `self.last_dt` stuck at its initial value.
        self.last_dt = self.interval - self.last_spent

        self.lagged = self.last_dt < 0
        if self.last_dt > 0:
            time.sleep(self.last_dt / 1000.0)

        self.last_output_time = current_time()
        self.current_time = self.last_output_time

        for component in self.components:
            component.output(self.last_output_time)

        # Sample once more so the returned time reflects the end of the
        # output pass.
        self.last_output_time = current_time()
        self.current_time = self.last_output_time

        return self.current_time