python Recuperar lista de tarefas em uma fila no aipo




celery (10)

Como posso recuperar uma lista de tarefas em uma fila que ainda precisa ser processada?


Até onde sei, o Celery não fornece API para examinar tarefas que estão aguardando na fila. Isso é específico do broker. Se você usa o Redis como um intermediário para um exemplo, então examinar as tarefas que estão aguardando na fila do celery (padrão) é tão simples quanto:

  1. conectar-se ao banco de dados do broker
  2. listar itens na lista de celery (comando LRANGE para um exemplo)

Tenha em mente que estas são tarefas que ESPERAM serem escolhidas pelos trabalhadores disponíveis. Seu cluster pode ter algumas tarefas em execução - elas não estarão nesta lista, pois já foram selecionadas.


EDIT: Veja outras respostas para obter uma lista de tarefas na fila.

Você deve procurar aqui: Guia do Aipo - Inspecionando os Trabalhadores

Basicamente isso:

>>> from celery.task.control import inspect

# Inspect all nodes.
>>> i = inspect()

# Show the items that have an ETA or are scheduled for later processing
>>> i.scheduled()

# Show tasks that are currently active.
>>> i.active()

# Show tasks that have been claimed by workers
>>> i.reserved()

Dependendo do que você quer


Eu acho que a única maneira de obter as tarefas que estão esperando é manter uma lista de tarefas que você iniciou e deixar a tarefa se remover da lista quando ela for iniciada.

Com rabbitmqctl e list_queues você pode obter uma visão geral de quantas tarefas estão esperando, mas não as tarefas em si: http://www.rabbitmq.com/man/rabbitmqctl.1.man.html

Se o que você deseja incluir a tarefa sendo processada, mas ainda não estiver concluída, você poderá manter uma lista de tarefas e verificar seus estados:

from tasks import add
result = add.delay(4, 4)

result.ready() # True if finished

Ou você permite que o aipo armazene os resultados com CELERY_RESULT_BACKEND e verifique quais das suas tarefas não estão lá.


Cheguei à conclusão de que a melhor maneira de obter o número de empregos em uma fila é usar rabbitmqctl como já foi sugerido várias vezes aqui. Para permitir que qualquer usuário escolhido execute o comando com o sudo , segui as instruções here (ignorei a edição da parte do perfil, pois não me importo de digitar sudo antes do comando).

Eu também peguei o grep da jamesc e cut snippet e o envolvi em chamadas de subprocesso.

from subprocess import Popen, PIPE
p1 = Popen(["sudo", "rabbitmqctl", "list_queues", "-p", "[name of your virtula host"], stdout=PIPE)
p2 = Popen(["grep", "-e", "^celery\s"], stdin=p1.stdout, stdout=PIPE)
p3 = Popen(["cut", "-f2"], stdin=p2.stdout, stdout=PIPE)
p1.stdout.close()
p2.stdout.close()
print("number of jobs on queue: %i" % int(p3.communicate()[0]))

Se você não usar tarefas priorizadas, isso é realmente muito simples se você estiver usando o Redis. Para obter a tarefa conta:

redis-cli -h HOST -p PORT -n DATABASE_NUMBER llen QUEUE_NAME

Mas, tarefas priorizadas usam uma chave diferente em redis , então a imagem completa é um pouco mais complicada. A imagem completa é que você precisa consultar o redis para cada prioridade da tarefa. Em python (e do projeto Flower), isso se parece com:

PRIORITY_SEP = '\x06\x16'
DEFAULT_PRIORITY_STEPS = [0, 3, 6, 9]


def make_queue_name_for_pri(queue, pri):
    """Make a queue name for redis

    Celery uses PRIORITY_SEP to separate different priorities of tasks into
    different queues in Redis. Each queue-priority combination becomes a key in
    redis with names like:

     - batch1\x06\x163 <-- P3 queue named batch1

    There's more information about this in Github, but it doesn't look like it 
    will change any time soon:

      - https://github.com/celery/kombu/issues/422

    In that ticket the code below, from the Flower project, is referenced:

      - https://github.com/mher/flower/blob/master/flower/utils/broker.py#L135

    :param queue: The name of the queue to make a name for.
    :param pri: The priority to make a name with.
    :return: A name for the queue-priority pair.
    """
    if pri not in DEFAULT_PRIORITY_STEPS:
        raise ValueError('Priority not in priority steps')
    return '{0}{1}{2}'.format(*((queue, PRIORITY_SEP, pri) if pri else
                                (queue, '', '')))


def get_queue_length(queue_name='celery'):
    """Get the number of tasks in a celery queue.

    :param queue_name: The name of the queue you want to inspect.
    :return: the number of items in the queue.
    """
    priority_names = [make_queue_name_for_pri(queue_name, pri) for pri in
                      DEFAULT_PRIORITY_STEPS]
    r = redis.StrictRedis(
        host=settings.REDIS_HOST,
        port=settings.REDIS_PORT,
        db=settings.REDIS_DATABASES['CELERY'],
    )
    return sum([r.llen(x) for x in priority_names])

Se você deseja obter uma tarefa real, pode usar algo como:

redis-cli -h HOST -p PORT -n DATABASE_NUMBER lrange QUEUE_NAME 0 -1

De lá, você terá que desserializar a lista retornada. No meu caso, consegui fazer isso com algo como:

r = redis.StrictRedis(
    host=settings.REDIS_HOST,
    port=settings.REDIS_PORT,
    db=settings.REDIS_DATABASES['CELERY'],
)
l = r.lrange('celery', 0, -1)
pickle.loads(base64.decodestring(json.loads(l[0])['body']))

Apenas seja avisado que a desserialização pode demorar um pouco, e você precisará ajustar os comandos acima para trabalhar com várias prioridades.


Uma solução de copiar e colar para Redis com serialização json:

def get_celery_queue_items(queue_name):
    import base64
    import json  

    # Get a configured instance of a celery app:
    from yourproject.celery import app as celery_app

    with celery_app.pool.acquire(block=True) as conn:
        tasks = conn.default_channel.client.lrange(queue_name, 0, -1)
        decoded_tasks = []

    for task in tasks:
        j = json.loads(task)
        body = json.loads(base64.b64decode(j['body']))
        decoded_tasks.append(body)

    return decoded_tasks

Funciona com o Django. Apenas não esqueça de mudar seu yourproject.celery .


flor de lançamento - espectador de tarefas de selery

celery -A app.celery flower

e depois abra no navegador

localhost:5555

Para recuperar tarefas do backend, use este

from amqplib import client_0_8 as amqp
conn = amqp.Connection(host="localhost:5672 ", userid="guest",
                       password="guest", virtual_host="/", insist=False)
chan = conn.channel()
name, jobs, consumers = chan.queue_declare(queue="queue_name", passive=True)

Se você controlar o código das tarefas, poderá resolver o problema permitindo que uma tarefa acione uma nova tentativa trivial na primeira vez que for executada e, em seguida, verificando inspect().reserved() . A nova tentativa registra a tarefa com o backend de resultado, e o aipo pode ver isso. A tarefa deve aceitar self ou context como primeiro parâmetro para que possamos acessar a contagem de novas tentativas.

@task(bind=True)
def mytask(self):
    if self.request.retries == 0:
        raise self.retry(exc=MyTrivialError(), countdown=1)
    ...

Esta solução é agnóstica, ie. você não precisa se preocupar se está usando RabbitMQ ou Redis para armazenar as tarefas.

EDIT: após o teste, descobri que isso é apenas uma solução parcial. O tamanho do reservado é limitado à configuração de pré-busca para o trabalhador.


Se você estiver usando o Celery + Django, a maneira mais simples de inspecionar tarefas usando comandos diretamente do seu terminal em seu ambiente virtual ou usando um caminho completo para aipo:

Doc : http://docs.celeryproject.org/en/latest/userguide/workers.html?highlight=revoke#inspecting-workers

$ celery inspect reserved
$ celery inspect inspect
$ celery inspect registered
$ celery inspect scheduled

Além disso, se você estiver usando o Celery + RabbitMQ, poderá inspecionar a lista de filas usando o seguinte comando:

Mais informações : https://linux.die.net/man/1/rabbitmqctl

$ sudo rabbitmqctl list_queues




celery