celery介绍、架构、快速使用、包结构,celery执行异步、延迟、定时任务,django中使用celery,定时更新首页轮播图效果实现,数据加入redis缓存的坑及解决

今日内容概要

  • celery介绍,架构
  • celery 快速使用
  • celery包结构
  • celery执行异步任务
  • celery执行延迟任务
  • celery执行定时任务
  • django中使用celery
  • 定时更新轮播图接口

内容详细

1、celery介绍,架构

# celery: 分布式(放在多台机器)的 异步任务 框架 	Celery是一个简单、灵活且可靠的,处理大量消息的分布式系统 	Celery is a project with minimal funding, so we don’t support Microsoft Windows. Please don’t open any issues related to that platform.           # celery:能做什么事,解决什么问题? 	异步任务---》项目中同步的操作,可以通过celery把它做成异步 	延迟任务---》等一会再执行任务 	定时任务---》每隔多长时间干什么事      	如果你的项目仅仅想做定时任务,没有必要使用celery,使用apscheduler     -https://www.cnblogs.com/xiao-xue-di/p/14081790.html                      # 大白话理解celery 	1)可以不依赖任何服务器,通过自身命令,启动服务      	2)celery服务为为其他项目服务提供异步解决任务需求的      	注:会有两个服务同时运行,一个是项目服务(django),一个是celery服务,项目服务将需要异步处理的任务交给celery服务,celery就会在需要时异步完成项目的需求  	人是一个独立运行的服务 | 医院也是一个独立运行的服务 	正常情况下,人可以完成所有健康情况的动作,不需要医院的参与;但当人生病时,就会被医院接收,解决人生病问题 	人生病的处理方案交给医院来解决,所有人不生病时,医院独立运行,人生病时,医院就来解决人生病的需求    # 中间件:不是django中间件 	中间件概念非常大 	数据库中间件:应用程序和数据之间有一个东西 	服务器中间件:web服务和浏览器之间有个东西:nginx 	消息队列中间件:程序和程序之间:redis,rabbitmq    # celery架构图 	broker:任务中间件,消息队列中间件,存储任务,celery本身不提供,需要借助第三方:redis,rabbitmq..          	worker:任务执行单元,真正指向任务的进程,celery提供的      	backend:结果存储,任务执行结果存在某个地方,借助于第三方:redis 

image

2、快速使用

# 安装: 	pip install celery       ##### 第一步:写一个py文件,celery_task.py---》app实例化,写任务 from celery import Celery  # 消息中间件 broker = 'redis://127.0.0.1:6379/2' # 结果存储 backend = 'redis://127.0.0.1:6379/1' # 实例化得到对象 app = Celery('test', broker=broker, backend=backend)  # 写任务---》使用装饰器装饰一下变成celery的任务 @app.task def add(a, b):     # import time     # time.sleep(1)     return a + b        ##### 第二步:提交任务,应该是另一个服务,咱么写了一个py脚本提交 # 提交任务 from celery_task import add  # res=add(7,8)  # 同步调用,一直等结果给我 # print(res)  # 异步调用 res = add.apply_async(args=[7, 8])  # 把任务提交到redis中的消息队列中了,任务中间件,消息队列中间件 print(res)  # 任务id号:e7ef51e3-d71e-4028-93f9-b602d5351a20    ##### 第三步:任务就被提交到redis中了,等待worker执行该任务,启动worker # 启动worker执行任务---》使用命令启动 # 非windows 命令:celery -A celery_task worker -l info # windows: pip3 install eventlet # 注意启动路径 cd到文件同级目录下 celery -A celery_task worker -l info -P eventlet    ##### 第四步:任务被celery执行完了,结果放到redis中了,查询结果,使用代码 AsyncResult # 通过代码把结果取出来 from celery_task import app  # 借助于app from celery.result import AsyncResult  # 导入一个类,来查询结果  id = 'e8fc88cf-4246-43de-ab1d-25ea7e7145cc'  if __name__ == '__main__':     res = AsyncResult(id=id, app=app)  # 根据id,去哪个app中找哪个任务,      if res.successful():  # 执行成功         result = res.get()         print('任务执行成功')         print(result)  # 15      elif res.failed():         print('任务失败')     elif res.status == 'PENDING':         print('任务等待中被执行')     elif res.status == 'RETRY':         print('任务异常后正在重试')     elif res.status == 'STARTED':         print('任务已经开始被执行') 
# 借助于celery的异步秒杀场景分析 # 原始同步场景 100个人,秒杀3个商品--->100个人在浏览器等着开始---》一旦开始--->瞬间100个人同时发送秒杀请求到后端----》---->假设秒杀函数执行2s钟---》100个请求在2s内,一直跟后端连着,假设我的并发量是100,这两秒钟,其他任何人都访问不了了 假设 150人来秒杀---》最多能承受100个人,50个人就请求不了---》不友好  # 异步场景 100个人,秒杀3个商品--->100个人在浏览器等着开始---》一旦开始--->瞬间100个人同时发送秒杀请求到后端----》---->假设秒杀函数执行2s钟---》当前100个请求,过来,使用celery提交100个任务,请求立马返回--->这样的话,2s内能提交特别多的任务,可以接收特别多人发的请求---》后台使用worker慢慢的执行秒杀任务---》多起几个worker---》过了一会,所有提交的任务都执行完了  提交完任务,返回前端---》前端使用个动态图片盖住页面,显示您正在排队,每个2s钟,向后端发送一次ajax请求,带着id号,查询结果是否完成,如果没完成---》再等2s钟--->如果秒杀成功了,显示恭喜您,成了---》如果没有成功,显示很遗憾,没有秒到  # 尝试写写 

3、celery包结构

	-celery_task  # 包  		-__init__.py 		-celery.py  # 写app的py文件 		-home_task.py  # 任务1  		-order_task.py  # 任务2 		-user_task.py  # 任务3      --------------下面这些,跟上面可能在不同项目中----------------     add_task.py  # 提交任务,django中提交 get_result.py  # 查询结果,django中查询 

celery.py

from celery import Celery  # 消息中间件 broker = 'redis://127.0.0.1:6379/2' # 结果存储 backend = 'redis://127.0.0.1:6379/1' # 实例化得到对象 app = Celery('test', broker=broker, backend=backend, include=[     'celery_task.home_task',     'celery_task.order_task',     'celery_task.user_task', ]) # 写好include,会去相应的py下检索任务,这些任务都被app管理 # 以后任务不写在这里了,放到单独的py文件中 

add_task.py

from celery_task.user_task import send_sms   # 2 异步任务另一种方式 res = send_sms.apply_async(args=['1872637484872']) print(res) 

get_result.py

# 通过代码把结果取出来 from celery_task.celery import app  # 借助于app from celery.result import AsyncResult  # 导入一个类,来查询结果  id = 'd9692e2a-1e1f-436c-b58f-b25484cc5c56' if __name__ == '__main__':     res = AsyncResult(id=id, app=app)  # 根据id,去哪个app中找哪个任务,      if res.successful():  # 执行成功         result = res.get()         print('任务执行成功')         print(result)  # 15     elif res.failed():         print('任务失败')     elif res.status == 'PENDING':         print('任务等待中被执行')     elif res.status == 'RETRY':         print('任务异常后正在重试')     elif res.status == 'STARTED':         print('任务已经开始被执行')  

user_task.py

from .celery import app   @app.task def send_sms(phone):     print('手机号:%s,发送成功' % phone)     return True 

4、celery执行异步任务

# 任务名 .delay(参数,参数) # 异步执行  ### add_task.py中演示:  # 1  异步任务 res=send_sms.delay('187888888') print(res) 

5、celery执行延迟任务

### add_task.py中演示:  # # 3  延迟任务--->默认用utc时间---》时区--->东八区 from datetime import datetime, timedelta  eta = datetime.utcnow() + timedelta(seconds=10)  # 5s后时间 # eta = datetime.utcnow() + timedelta(days=3)  # 3天后时间 res = send_sms.apply_async(args=['17777777'], eta=eta) print(res) 

6、celery执行定时任务

### 写在celery.py:  # # 定时任务需要写在这里 ##### 第一步: ### celery的配置信息###   djagno也有配置信息--->setting.py # 时区 app.conf.timezone = 'Asia/Shanghai' # 是否使用UTC app.conf.enable_utc = False ### celery的配置信息---结束###  ##### 第二步: #### 定时任务 from datetime import timedelta # from celery.schedules import crontab  app.conf.beat_schedule = {     'send_sms_5': {         'task': 'celery_task.user_task.send_sms',  # 哪个任务         'schedule': timedelta(seconds=5),  # 每5s干一次任务         # 'schedule': crontab(hour=8, day_of_week=1),  # 每周一早八点         'args': ('1999999999',),     }, }   ##### 第三步: ## cd 到 包同级目录下再进行: ## 启动worker celery -A celery_task worker -l info ## 启动beat   【【【【注意路径】】】】】 celery -A celery_task beat -l info  ### 本质是beat 5s钟自动提交一次任务,worker执行 

7、django中使用celery

# 第一步:把包copy到项目一级路径下 luffy_api 	celery_task 		user_task.py 		order_task.py 		home_task.py 		celery.py 		__init__.py 	luffy_api 	...      ### 在celery.py写入两行代码: # 注意注意: # 一、加载django配置环境 import os  os.environ.setdefault(DJANGO_SETTINGS_MODULE, luffy_api.setting.dev)                # 第二步:在要提交任务的地方,导入执行  ### user_task.py中写入用户事件: @app.task def create_user(mobile, username, password):     # 一旦使用djagno中的东西  User   就一定要加那两句     from user.models import User     User.objects.create_user(mobile=mobile, username=username, password=password)     return True  ### 写在视图类中views.py: from celery_task.user_task import create_user  class TestView(APIView):     def get(self, requeste):         create_user.delay('12222222', 'lqznb', 'lqz12345')         return Response('用户创建任务已经提交')                 # 第三步:启动worker PS E:\Django\luffy_api> celery -A celery_task worker -l info -P eventlet  # 第四步: 	从浏览器只要访问该事件接口 就会触发celery任务提交到broker端,再由worker抓取储存到 redis指定仓库位置           # 公司里的情况,把task放到了不同app中 

8、定时更新轮播图接口效果实现

# 首页轮播图 	去mysql中查的---》假设瞬间10w访问咱们首页----》数据库会查询10w次,返回数据---》但是实际上,咱们轮播图基本不变      # 我们优化一下 	对轮播图接口做个缓存---》第一次访问查询mysql,放到reids中,以后都从redis中取,如果redis中没有,再去数据库中查----》好处在于,对mysql压力小,redis性能高  # 以后如果接口响应慢 	第一想法先加缓存:把查出来的数据缓存到redis中,再来请求,先从redis中查,如果没有,再去mysql查,然后在redis中重新缓存一份                ### 写在 home/views.py中:   class BannerView(GenericViewSet, ListModelMixin):     # class BannerView(GenericViewSet,ListModelMixin):     # 获取所有接口-list,自动生成路由     # qs对象可以像列表一样,切片     queryset = Banner.objects.filter(is_delete=False, is_show=True).order_by('orders')[:settings.BANNER_COUNT]     serializer_class = BannerSerializer      def list(self, request, *args, **kwargs):  # 重写list         res = super().list(request, *args, **kwargs)          # 逻辑是,先去redis中查询,如果有,直接返回,如果没有,再执行下面super().list这句,这句是去数据库中查         banner_list = cache.get('banner_list')         if banner_list:             print('走了缓存,很快很快')             return APIResponse(result=banner_list)         else:             print('没走缓存比较慢')             res = super().list(request, *args, **kwargs)             # 再缓存一份             cache.set('banner_list', res.data)          return APIResponse(result=res.data)      # 从浏览器刷新接口访问 	http://127.0.0.1:8000/api/v1/home/banner/ 

image

8.1 加入缓存的坑

# redis中有一份数据,mysql中有一份数据 # 存在问题:mysql更新了,reids更新了么? # 专业名词叫:双写一致性问题  写入mysql,redis是否同步  # 解决双写一致性问题 	第一:定时更新  比如10分钟更新一次缓存   	第二:写入mysql,删除缓存 	第三:写入mysql,更新缓存       # 三种解决方案,没有好于不好之说,只是看业务场景 	轮播图定时更新---》借助celery            ##### 通过定时更新,解决双写一致性问题 # 在luffy_api/celery_task/home_task.py中写入:  from .celery import app from home import models, serializer from django.conf import settings from django.core.cache import cache   @app.task def banner_update():     print('轮播图更新了')     return '更新好了'   @app.task def update_banner_list():     queryset = models.Banner.objects.filter(is_delete=False, is_show=True).order_by('-orders')[:settings.BANNER_COUNT]     banner_list = serializer.BannerSerializer(queryset, many=True).data     # 拿不到request对象,所以头像的连接base_url要自己组装     for banner in banner_list:         banner['image'] = 'http://127.0.0.1:8000%s' % banner['image']      cache.set('banner_list', banner_list, 86400)     return True    #### 定时任务 写在luffy_api/celery_task/celery.py:  from datetime import timedelta from celery.schedules import crontab  app.conf.beat_schedule = {     'update_banner_5': {         'task': 'celery_task.home_task.update_banner_list',  # 哪个任务         'schedule': timedelta(seconds=5),  # 每5s干一次         # 'schedule': crontab(hour=8, day_of_week=1),  # 每周一早八点         'args': (),     }, }   # 可以通过更改我们自定义的 BANNER_COUNT参数或者mysql库中的banner图片参数来测试