为编程爱好者分享易语言教程源码的资源网

网站首页 > 网络编程 > 其它综合 正文

Python Tornado集成Apscheduler定时任务(oracle创建定时任务)

三叶资源网 2022-10-20 19:18:50 其它综合 478 ℃ 0 评论

熟悉Python的人可能都知道,Apscheduler是python里面一款非常优秀的任务调度框架,这个框架是从鼎鼎大名的Quartz移植而来。

之前有用过Flask版本的Apscheduler做定时任务。刚好前不久接触了Tornado,顺便玩玩Tornado版本的Apscheduler。

本篇做了一个简单的Wdb页面,用于添加和删除定时任务,小伙伴们可以基于这个做一些扩展,比如把定时定时任务写入数据库,改变cron规则等等。

主要功能点如下:

#新增任务(需要动态改变job_id的值)

http://localhost:8888/scheduler?job_id=1&action=add

#删除任务(需要动态改变job_id的值)

http://localhost:8888/scheduler?job_id=1&action=remov

执行结果可以在console看到

from datetime import datetime
from tornado.ioloop import IOLoop, PeriodicCallback
from tornado.web import RequestHandler, Application
from apscheduler.schedulers.tornado import TornadoScheduler


scheduler = None
job_ids   = []

# 初始化
def init_scheduler():
    global scheduler
    scheduler = TornadoScheduler()
    scheduler.start()
    print('[Scheduler Init]APScheduler has been started')

# 要执行的定时任务在这里
def task1(options):
    print('{} [APScheduler][Task]-{}'.format(datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f'), options))


class MainHandler(RequestHandler):
    def get(self):
        self.write('<a href="/scheduler?job_id=1&action=add">add job</a><br><a href="/scheduler?job_id=1&action=remove">remove job</a>')


class SchedulerHandler(RequestHandler):
    def get(self):
        global job_ids
        job_id = self.get_query_argument('job_id', None)
        action = self.get_query_argument('action', None)
        if job_id:
            # add
            if 'add' == action:
                if job_id not in job_ids:
                    job_ids.append(job_id)
                    scheduler.add_job(task1, 'interval', seconds=3, id=job_id, args=(job_id,))
                    self.write('[TASK ADDED] - {}'.format(job_id))
                else:
                    self.write('[TASK EXISTS] - {}'.format(job_id))
            # remove
            elif 'remove' == action:
                if job_id in job_ids:
                    scheduler.remove_job(job_id)
                    job_ids.remove(job_id)
                    self.write('[TASK REMOVED] - {}'.format(job_id))
                else:
                    self.write('[TASK NOT FOUND] - {}'.format(job_id))
        else:
            self.write('[INVALID PARAMS] INVALID job_id or action')


if __name__ == "__main__":
    routes    = [
        (r"/", MainHandler),
        (r"/scheduler/?", SchedulerHandler),
    ]
    init_scheduler()
    app       = Application(routes, debug=True)
    app.listen(8888)
    IOLoop.current().start()

来源:三叶资源网,欢迎分享,公众号:iisanye,(三叶资源网⑤群:21414575

本文暂时没有评论,来添加一个吧(●'◡'●)

欢迎 发表评论:

百度站内搜索
关注微信公众号
三叶资源网⑤群:三叶资源网⑤群

网站分类
随机tag
go agent配置本机QQ登录常量支持库软件加密黑客数字雨163邮箱登陆朗读DZ论坛postB站弹幕助手源码百度网盘算法笔记HP端口转发CHM帮助文档驱动开发教程校验车辆识别号IOCP验证码图片屏幕录象QQ好友计数器软件群空间
最新评论