usage - python celery task example




Recuperar lista de tarefas em uma fila no aipo (8)

Como posso recuperar uma lista de tarefas em uma fila que ainda precisa ser processada?


Até onde sei, o Celery não fornece API para examinar tarefas que estão aguardando na fila. Isso é específico do broker. Se você usa o Redis como um intermediário para um exemplo, então examinar as tarefas que estão aguardando na fila do celery (padrão) é tão simples quanto:

  1. conectar-se ao banco de dados do broker
  2. listar itens na lista de celery (comando LRANGE para um exemplo)

Tenha em mente que estas são tarefas que ESPERAM serem escolhidas pelos trabalhadores disponíveis. Seu cluster pode ter algumas tarefas em execução - elas não estarão nesta lista, pois já foram selecionadas.


Cheguei à conclusão de que a melhor maneira de obter o número de empregos em uma fila é usar rabbitmqctl como já foi sugerido várias vezes aqui. Para permitir que qualquer usuário escolhido execute o comando com o sudo , segui as instruções here (ignorei a edição da parte do perfil, pois não me importo de digitar sudo antes do comando).

Eu também peguei o grep da jamesc e cut snippet e o envolvi em chamadas de subprocesso.

from subprocess import Popen, PIPE
p1 = Popen(["sudo", "rabbitmqctl", "list_queues", "-p", "[name of your virtula host"], stdout=PIPE)
p2 = Popen(["grep", "-e", "^celery\s"], stdin=p1.stdout, stdout=PIPE)
p3 = Popen(["cut", "-f2"], stdin=p2.stdout, stdout=PIPE)
p1.stdout.close()
p2.stdout.close()
print("number of jobs on queue: %i" % int(p3.communicate()[0]))

O módulo de inspeção de aipo parece estar ciente das tarefas do ponto de vista dos trabalhadores. Se você quiser ver as mensagens que estão na fila (ainda a serem puxadas pelos trabalhadores) eu sugiro usar o pyrabbit , que pode interagir com a API http rabbitmq para recuperar todos os tipos de informações da fila.

Um exemplo pode ser encontrado aqui: Recuperar o tamanho da fila com Celery (RabbitMQ, Django)


Para recuperar tarefas do backend, use este

from amqplib import client_0_8 as amqp
conn = amqp.Connection(host="localhost:5672 ", userid="guest",
                       password="guest", virtual_host="/", insist=False)
chan = conn.channel()
name, jobs, consumers = chan.queue_declare(queue="queue_name", passive=True)

Se você estiver usando o Celery + Django, a maneira mais simples de inspecionar tarefas usando comandos diretamente do seu terminal em seu ambiente virtual ou usando um caminho completo para aipo:

Doc : http://docs.celeryproject.org/en/latest/userguide/workers.html?highlight=revoke#inspecting-workers

$ celery inspect reserved
$ celery inspect inspect
$ celery inspect registered
$ celery inspect scheduled

Além disso, se você estiver usando o Celery + RabbitMQ, poderá inspecionar a lista de filas usando o seguinte comando:

Mais informações : https://linux.die.net/man/1/rabbitmqctl

$ sudo rabbitmqctl list_queues

Se você estiver usando rabbitMQ, use isso no terminal:

sudo rabbitmqctl list_queues

imprimirá a lista de filas com o número de tarefas pendentes. por exemplo:

Listing queues ...
0b27d8c59fba4974893ec22d478a7093    0
0e0a2da9828a48bc86fe993b210d984f    0
[email protected].celery.pidbox 0
11926b79e30a4f0a9d95df61b6f402f7    0
15c036ad25884b82839495fb29bd6395    1
[email protected].celery.pidbox    0
celery  166
celeryev.795ec5bb-a919-46a8-80c6-5d91d2fcf2aa   0
celeryev.faa4da32-a225-4f6c-be3b-d8814856d1b6   0

o número na coluna da direita é o número de tarefas na fila. acima, a fila de aipo tem 166 tarefas pendentes.


Uma solução de copiar e colar para Redis com serialização json:

def get_celery_queue_items(queue_name):
    import base64
    import json  

    # Get a configured instance of a celery app:
    from yourproject.celery import app as celery_app

    with celery_app.pool.acquire(block=True) as conn:
        tasks = conn.default_channel.client.lrange(queue_name, 0, -1)
        decoded_tasks = []

    for task in tasks:
        j = json.loads(task)
        body = json.loads(base64.b64decode(j['body']))
        decoded_tasks.append(body)

    return decoded_tasks

Funciona com o Django. Apenas não esqueça de mudar seu yourproject.celery .


flor de lançamento - espectador de tarefas de selery

celery -A app.celery flower

e depois abra no navegador

localhost:5555




celery