Source code for pyramid_cron

from __future__ import absolute_import, print_function, division

import logging
import socket
from datetime import datetime, timedelta

import six

log = logging.getLogger(__name__)

__version__ = '0.1.1.dev'


class Task(object):

    def __init__(self, f, min, hour, day, month, dow, idle):
        self.f = f

        class Wildcard(set):

            def __contains__(self, other):
                return True

        wildcard = Wildcard()

        def conv(val):
            if val == '*':
                return wildcard
            if isinstance(val, six.integer_types):
                return set([val])
            if not isinstance(val, set):
                val = set(val)
            return val

        self.min = conv(min)
        self.hour = conv(hour)
        self.day = conv(day)
        self.month = conv(month)
        self.dow = conv(dow)

        self.idle = idle

    def check(self, t):
        return ((t.minute in self.min) and
                (t.hour in self.hour) and
                (t.day in self.day) and
                (t.month in self.month) and
                (t.weekday() in self.dow))

    def go(self, request, run_start):
        log.info("%s start", self.f.__name__)
        def time_left():
            return ((run_start + timedelta(seconds=60)) - datetime.now()).total_seconds()
        self.f(dict(request=request, registry=request.registry, time_left=time_left))
        log.info("%s end", self.f.__name__)


[docs]def add_cron_task(config, f, min='*', hour='*', day='*', month='*', dow='*', idle=False): """ Register a function for execution by the scheduler. Task functions must have the following signature:: def mytask(system): request = system['request'] registry = system['registry'] time_left = system['time_left'] # do stuff Additional keys may be added in the future: the single-arg signature ensures that task functions will be forward-compatible. The ``time_left`` member is a no-parameter function that returns how many seconds are remaining in the allotted 60 seconds of the current cron run. When the 60 seconds is exceeded, the returned value will be negative. In addition to the callback function, you can specify a schedule, using a cron-like syntax. For the time periods of ``min``, ``hour``, ``day``, ``month``, and ``dow`` (day of week), you can specify an integer, a set of integers, or the '*' wildcard character. The default argument is '*'. Hours are specified in 24-hour time. For example, this will run the task every day, at 2:00:: config.add_cron_task(..., hour=2) This will run the task every day at 2:00, 10:00, and 18:00:: config.add_cron_task(..., hour=[2, 10, 18]) To run the task 'every 2 hours', you can use ``range()``:: config.add_cron_task(..., hour=range(0, 24, 2)) :param f: The function to execute. Task functions must have accept a single argument, which will be a ``system`` dict containing keys for the Pyramid ``request`` and ``registry``. :param min: Specify which minutes to run the task. :param hour: Specify which hours to run the task. :param day: Specify which days to run the task. :param month: Specify which months to run the task. :param dow: Specify which days of the week to run the task. :param idle: If true, executes the task after all non-idle tasks have completed, and only when there is time remaining in the 60 second window since the cron view was triggered. """ def register(): registry = config.registry registry.setdefault('cron_tasks', []) registry['cron_tasks'].append(Task(f, min=min, hour=hour, day=day, month=month, dow=dow, idle=idle)) # This discriminator prevents a task from being registered twice. config.action(('cron_task', f), register)
class CronView(object): """ A view to allow the cron signal to be triggered by an HTTP request. This is convenient because it means that all the cron stuff happens with the webserver's permissions. """ def __init__(self, request): self.request = request def __call__(self): request = self.request server_ip = socket.gethostbyname(request.host.split(':')[0]) allowed = set(['127.0.0.1', '::1', server_ip]) if request.remote_addr in allowed: registry = request.registry # This intentionally uses localtime, not UTC. start = datetime.now() log.warn('begin cron run') for task in [ t for t in registry['cron_tasks'] if not t.idle ]: if task.check(start): task.go(request, start) for task in [ t for t in registry['cron_tasks'] if t.idle ]: if datetime.now() - start >= timedelta(seconds=60): break if task.check(start): task.go(request, start) log.warn('end cron run') return 'ok' else: return 'fail %s' % request.remote_addr def includeme(config): config.add_route('cron', '/cron') config.add_view(CronView, route_name='cron', renderer='string') config.add_directive('add_cron_task', add_cron_task)