Multiple Tasks Arbitrarily Generated after First Task's Success #1341

Open
LEECHOONGHO opened this issue Dec 4, 2023 · 0 comments

Describe the bug
I run a PyTorch training setup with 2 Celery workers, a Redis broker, and a FastAPI server.
When I submit several tasks (3~4) to the workers, they are reserved and distributed correctly.
But after the first task finishes successfully, multiple duplicate tasks are generated and reserved on my workers.

The training process takes 1~3 hours per task.

To Reproduce

# FastAPI Server launch command
$ gunicorn train_master:app --workers 8 --worker-class uvicorn.workers.UvicornWorker --bind 0.0.0.0:$80000 --env MANAGERPORT=5555

# Celery Worker Server launch command 
$ celery -A train_worker worker -P threads -c 1 -n train-worker-$N -E --loglevel=debug

# Flower Monitoring Server launch command
$ celery -A train_worker.celery_app flower -A --broker=redis:// --port=5555 --address=0.0.0.0 --loglevel=debug

# Redis Server launch docker compose file contents
version: '3.9'
services:
    redis-server:
        environment:
            - REDISPORT=${REDISPORT}
        image: redis
        network_mode: host
        command: redis-server --port $REDISPORT
# train_worker.py
from celery import Celery
from celery.result import AsyncResult  # re-exported so train_master.py can import it from here

# BROKER_URL, CELERY_RESULT_BACKEND and TrainManagerWrapper are defined
# elsewhere in the project (not shown in this report).
celery_app = Celery(
    __name__,
    broker=BROKER_URL,
    backend=CELERY_RESULT_BACKEND,
)

@celery_app.task(
    bind=True,
    base=TrainManagerWrapper,
    name="train",
    retry=False,
)
def train(self, inputs: dict):
    try:
        ...  # train process (omitted here)
    except Exception as e:
        return e
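Not part of the original report: with a Redis broker, Celery redelivers messages that stay unacknowledged longer than the transport visibility_timeout (one hour by default), which is shorter than the 1~3 hour training runs described above. Below is a minimal configuration sketch, assuming the same celery_app as in train_worker.py; whether this matches the reporter's actual configuration is an assumption.

# Sketch only (not from the original report): raise the Redis transport
# visibility_timeout above the longest expected task runtime so that
# reserved but unacknowledged tasks are not redelivered to other workers.
celery_app.conf.broker_transport_options = {
    "visibility_timeout": 4 * 60 * 60,  # 4 hours, longer than a 1~3 hour training run
}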
# train_master.py
from fastapi import FastAPI
from train_worker import train, celery_app, AsyncResult

# TrainInput is a Pydantic model defined elsewhere in the project
# (not shown in this report).

app = FastAPI()

@app.post("/train", status_code=200)
async def train_master(inputs: TrainInput):
    task_id = train.delay(inputs.dict())  # returns an AsyncResult; str() gives the task id
    return {"task_id": str(task_id), "status": "Processing"}
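The train_master.py snippet imports AsyncResult but does not show where it is used. For completeness, a minimal sketch of a status-check endpoint follows; the route path and response shape are assumptions, not part of the original report.

# Hypothetical status endpoint (a sketch, not from the original report):
@app.get("/train/{task_id}", status_code=200)
async def train_status(task_id: str):
    result = AsyncResult(task_id, app=celery_app)
    return {"task_id": task_id, "status": result.state}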

Expected behavior
Workers should not re-run tasks that have already been executed; each submitted task should run exactly once.

Screenshots
(Screenshots attached to the original issue: "flower error 1", "flower error", and a screen capture dated 2023-12-03 04:46:41.)

System information
No error messages were produced.

Name: flower
Version: 2.0.1

Name: celery
Version: 5.3.6

Name: kombu
Version: 5.3.4
LEECHOONGHO added the bug label on Dec 4, 2023