其他
Python Web:Flask异步执行任务
以下文章来源于hackpython ,作者ayuliao
出自 | hackpython(ID:hackpython)
Flask 是 Python 中有名的轻量级同步 web 框架,在一些开发中,可能会遇到需要长时间处理的任务,此时就需要使用异步的方式来实现,让长时间任务在后台运行,先将本次请求的响应状态返回给前端,不让前端界面「卡顿」,当异步任务处理好后,如果需要返回状态,再将状态返回。
怎么实现呢?
使用线程的方式
当要执行耗时任务时,直接开启一个新的线程来执行任务,这种方式最为简单快速。
通过 ThreadPoolExecutor 来实现
from time import sleep
from concurrent.futures import ThreadPoolExecutor
# DOCS https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.ThreadPoolExecutor
# 创建线程池执行器
executor = ThreadPoolExecutor(2)
app = Flask(__name__)
@app.route('/jobs')
def run_jobs():
# 交由线程去执行耗时任务
executor.submit(long_task, 'hello', 123)
return 'long task running.'
# 耗时任务
def long_task(arg1, arg2):
print("args: %s %s!" % (arg1, arg2))
sleep(5)
print("Task is done!")
if __name__ == '__main__':
app.run()
pip install redis
from celery import Celery
app = Flask(__name__)
# 配置
# 配置消息代理的路径,如果是在远程服务器上,则配置远程服务器中redis的URL
app.config['CELERY_BROKER_URL'] = 'redis://localhost:6379/0'
# 要存储 Celery 任务的状态或运行结果时就必须要配置
app.config['CELERY_RESULT_BACKEND'] = 'redis://localhost:6379/0'
# 初始化Celery
celery = Celery(app.name, broker=app.config['CELERY_BROKER_URL'])
# 将Flask中的配置直接传递给Celery
celery.conf.update(app.config)
def long_task(arg1, arg2):
# 耗时任务的逻辑
return result
def index():
task = long_task.delay(1, 2)
delay () 方法是 applyasync () 方法的快捷方式,applyasync () 参数更多,可以更加细致的控制耗时任务,比如想要 long_task () 在一分钟后再执行
@app.route('/', methods=['GET', 'POST'])
def index():
task = long_task.apply_async(args=[1, 2], countdown=60)
delay () 与 apply_async () 会返回一个任务对象,该对象可以获取任务的状态与各种相关信息。
@celery.task(bind=True)
def long_task(self):
verb = ['Starting up', 'Booting', 'Repairing', 'Loading', 'Checking']
adjective = ['master', 'radiant', 'silent', 'harmonic', 'fast']
noun = ['solar array', 'particle reshaper', 'cosmic ray', 'orbiter', 'bit']
message = ''
total = random.randint(10, 50)
for i in range(total):
if not message or random.random() < 0.25:
# 随机的获取一些信息
message = '{0} {1} {2}...'.format(random.choice(verb),
random.choice(adjective),
random.choice(noun))
# 更新Celery任务状态
self.update_state(state='PROGRESS',
meta={'current': i, 'total': total,
'status': message})
time.sleep(1)
# 返回字典
return {'current': 100, 'total': 100, 'status': 'Task completed!',
'result': 42}
def longtask():
# 异步调用
task = long_task.apply_async()
# 返回 202,与Location头
return jsonify({}), 202, {'Location': url_for('taskstatus',
task_id=task.id)}
def taskstatus(task_id):
task = long_task.AsyncResult(task_id)
if task.state == 'PENDING': # 在等待
response = {
'state': task.state,
'current': 0,
'total': 1,
'status': 'Pending...'
}
elif task.state != 'FAILURE': # 没有失败
response = {
'state': task.state, # 状态
# meta中的数据,通过task.info.get()可以获得
'current': task.info.get('current', 0), # 当前循环进度
'total': task.info.get('total', 1), # 总循环进度
'status': task.info.get('status', '')
}
if 'result' in task.info:
response['result'] = task.info['result']
else:
# 后端执行任务出现了一些问题
response = {
'state': task.state,
'current': 1,
'total': 1,
'status': str(task.info), # 报错的具体异常
}
return jsonify(response)
classname: 'my-class',
id: 'my-id',
// 进度条要出现的位置
target: document.getElementById('myDivId')
};
// 初始化进度条对象
var nanobar = new Nanobar( options );
nanobar.go( 30 ); // 30% 进度条
nanobar.go( 76 ); // 76% 进度条
// 100% 进度条,进度条结束
nanobar.go(100);
<button id="start-bg-job">Start Long Calculation</button><br><br>
<div id="progress"></div>
$(function() {
$('#start-bg-job').click(start_long_task);
});
// 请求 longtask 接口
function start_long_task() {
// 添加元素在html中
div = $('<div class="progress"><div></div><div>0%</div><div>...</div><div> </div></div><hr>');
$('#progress').append(div);
// 创建进度条对象
var nanobar = new Nanobar({
bg: '#44f',
target: div[0].childNodes[0]
});
// ajax请求longtask
$.ajax({
type: 'POST',
url: '/longtask',
// 获得数据,从响应头中获取Location
success: function(data, status, request) {
status_url = request.getResponseHeader('Location');
// 调用 update_progress() 方法更新进度条
update_progress(status_url, nanobar, div[0]);
},
error: function() {
alert('Unexpected error');
}
});
}
// 更新进度条
function update_progress(status_url, nanobar, status_div) {
// getJSON()方法是JQuery内置方法,这里向Location中对应的url发起请求,即请求「/status/<task_id>」
$.getJSON(status_url, function(data) {
// 计算进度
percent = parseInt(data['current'] * 100 / data['total']);
// 更新进度条
nanobar.go(percent);
// 更新文字
$(status_div.childNodes[1]).text(percent + '%');
$(status_div.childNodes[2]).text(data['status']);
if (data['state'] != 'PENDING' && data['state'] != 'PROGRESS') {
if ('result' in data) {
// 展示结果
$(status_div.childNodes[3]).text('Result: ' + data['result']);
}
else {
// 意料之外的事情发生
$(status_div.childNodes[3]).text('Result: ' + data['state']);
}
}
else {
// 2秒后再次运行
setTimeout(function() {
update_progress(status_url, nanobar, status_div);
}, 2000);
}
});
}
◆
精彩推荐
◆
5大必知的图算法,附Python代码实现
如何用爬虫技术帮助孩子秒到心仪的幼儿园(基础篇)
Python传奇:30年崛起之路 2019年最新华为、BAT、美团、头条、滴滴面试题目及答案汇总
阿里巴巴杨群:高并发场景下Python的性能挑战