TASKQUEUE

百度分布式任务队列服务,以异步方式执行用户任务

服务简介

Taskqueue为开发者提供分布式任务队列服务,用来以异步方式并发执行用户提交的Fetchurl、离线下载任务。

Taskqueue适用于相同类型任务组成的任务集合以异步并发方式执行的场景。常见具体应用场景示例如下:同时向1000人发送微博消息。如果以传统方式循环进行,则相当于同步串行发送每个消息,一个消息发送完毕才会发送下一个消息,则所使用的时间为1000乘以每个微博消息发送所需的时间。如果将每个发送微博消息看做一个fetchurl任务放入Taskqueue中执行,则这些任务在加入队列后即以异步并发的方式在后台执行,对用户操作而言,所使用的时间仅为1000个任务放入队列的时间,对任务执行而言,假设队列任务执行并发度为M,则任务执行总时间降低M倍。

使用示例

以下是一段示例代码:

from bae.api.taskqueue import BaeTaskQueueManager
from bae.api.taskqueue import BaeTaskQueue

def taskqueue_test():
    tqmgr = BaeTaskQueueManager.getInstance()

    ### 创建一个Queue
    q = tqmgr.create('myqueue', BaeTaskQueueManager.QUEUE_FETCHURL, default_callback_url = 'www.handlerror.com')

    ### 推入预执行的task
    qid = q.push(url = 'www.baidu.com')['response_params']['task_id']

    ### 查看task的执行信息
    q.getTaskInfo(qid)

    ### 查看当前queue的信息
    q.query()

    ### 修改queue属性
    tqmgr.modify("myqueue", concurrency = 10)

    ### 查询用户所有的queue信息
    tqmgr.getList()

    ### 使用已经创建好的队列"myqueue"
    valid_q = BaeTaskQueue("myqueue")

    ### 删除指定的queue
    tqmgr.remove('myqueue')

注意事项

因产品形态调整,TaskQueue服务在授权给其他应用使用的时候,名称可能冲突,开发者需要注意保持名称唯一。

接口列表

class BaeTaskQueue

数据操作类

__init__(self, queue_name)

构造函数, 使用已创建的queue_name队列

queue_name(str): 队列名, 若访问属于其他app的队列,则需在队列名前加上所属app的appid及一个半角冒号作为前缀进行区分,即appid:queue_name,否则默认访问属于本app的队列

getTaskInfo(self, taskId)

获取task_id对应的任务执行信息,成功返回包含执行结果的dict,失败抛出异常

taskId(int): 任务ID,从push的返回值中获得

push(self, **kwargs)

向一个队列中推送某种类型的任务, 成功返回包含执行结果的dict,失败抛出异常

kwargs: 推送任务的任务描述信息, 任务描述信息要与队列类型相符 FETCHURL任务属性包括: (必选)url(预访问的URL), (可选)params(post数据,get时不需要)OFFLINEDOWNLOAD任务属性包括: (必选)source_url(下载源URL), dest_url(存放目的URL)

query(self)

查询队列信息,成功返回包含执行结果的dict,失败抛出异常

getRequestId(self)

获取上次调用的request_id

class BaeTaskQueueManager

管理操作类

getInstance()

获取TaskQueue服务管理类对象

create(self, queueName, queueType, **kwargs)

创建一个名字为queueName的, 类型为queueType的队列,成功返回数据操作类BaeTaskQueue对象,失败抛出异常

queueName(str): 队列名字

queueType(int): 1: QUEUE_FETCHURL 2: QUEUE_OFFLINEDOWNLOAD

kwargs: default_callback_url(任务完成回调URL)concurrency(队列任务执行并发度)max_length(队列待执行任务最大长度)retry_times(队列任务失败重试次数)timeout(队列任务执行超时时间,单位秒)

getList(self)

查询所有队列信息,成功返回包含执行结果的dict,失败抛出异常

modify(self, queueName, **kwargs)

修改队列名为queueName的队列属性,成功返回包含执行结果的dict,失败抛出异常

queueName(str): 队列名字

kwargs: 可修改的属性concurrency(队列任务执行并发度)max_length(队列待执行任务最大长度)

remove(self, queueName)

删除指定队列,成功返回包含执行结果的dict,失败抛出异常

queueName(str): 队列名字

getRequestId(self)

获取上次调用的request_id

服务限制

限制项
单个开发者最大队列数 20
队列长度限制 FetchUrl任务1000,离线下载任务10
队列并发度限制 FetchUrl任务20,离线下载任务2
任务执行超时时间限制 FetchUrl任务30秒,离线下载任务10小时
任务执行失败重试次数 3次
离线下载最大文件 4GB
队列名长度 128字节

异常

  • BaeConstructError: 对象初始化错误
  • BaeParamError: 参数错误
  • BaeValueError: 后端返回的数据格式错误
  • BaeOperationFailed: 后端返回结果,但本次操作失败,异常中包含了错误原因

Table Of Contents

Previous topic

RANK

Next topic

BCS

This Page