Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

超时机制导致element 切割两份commit,出现循环依赖 #363

Open
yeshenyong opened this issue Apr 2, 2024 · 2 comments
Open

Comments

@yeshenyong
Copy link
Collaborator

简单模仿样例

#include <condition_variable>
#include <functional>
#include <future>
#include <mutex>
#include <queue>
#include <thread>

class ThreadPool {
public:
    // 构造函数,启动工作线程
    ThreadPool() : done(false), worker_thread(&ThreadPool::worker_thread_func, this) {}

    // 禁止拷贝构造函数和拷贝赋值操作符
    ThreadPool(const ThreadPool&) = delete;
    ThreadPool& operator=(const ThreadPool&) = delete;

    // 析构函数,结束工作线程
    ~ThreadPool() {
        // 通知工作线程退出
        {
            std::lock_guard<std::mutex> lock(queue_mutex);
            done = true;
        }
        // 唤醒所有等待线程
        condition.notify_all();
        if (worker_thread.joinable()) {
            worker_thread.join();
        }
    }

    // 添加任务到线程池
    template <typename FunctionType>
    std::future<typename std::result_of<FunctionType()>::type> submit(FunctionType f) {
        typedef typename std::result_of<FunctionType()>::type result_type;

        // 封装任务成std::packaged_task
        std::packaged_task<result_type()> task(std::move(f));
        std::future<result_type> res(task.get_future());
        {
            // 将任务添加到队列中
            std::lock_guard<std::mutex> lock(queue_mutex);
            if (done) {
                throw std::runtime_error("submit on stopped ThreadPool");
            }
            tasks.emplace(std::move(task));
        }
        // 通知一个等待中的线程
        condition.notify_one();
        return res;
    }

private:
    // 工作线程函数
    void worker_thread_func() {
        while (true) {
            std::packaged_task<void()> task;
            {
                std::unique_lock<std::mutex> lock(queue_mutex);
                condition.wait(lock, [this]() { return done || !tasks.empty(); });
                if (done && tasks.empty()) {
                    // 如果完成标志已设置且任务队列为空,则退出线程
                    return;
                }
                task = std::move(tasks.front());
                tasks.pop();
            }
            // 执行任务
            task();
        }
    }

private:
    std::atomic<bool> done; // 线程池完成标志
    std::mutex queue_mutex;  // 任务队列互斥锁
    std::condition_variable condition;  // 任务队列条件变量
    std::queue<std::packaged_task<void()>> tasks; // 任务队列
    std::thread worker_thread; // 工作线程
};

// 以下是使用ThreadPool的示例代码

#include <iostream>
ThreadPool pool;

void timeoutTask() {
    // 睡眠2s
    std::this_thread::sleep_for(std::chrono::seconds(2));
    std::cout << "Executing timeoutTask on thread " << std::this_thread::get_id() << std::endl;
}

void exampleTask() {
    std::cout << "Executing exampleTask on thread " << std::this_thread::get_id() << std::endl;
    // 此处模仿CGraph 运行机制 = GElement::Run() + GElement 超时机制
    auto future = pool.submit(timeoutTask);
    auto futStatus = future.wait_for(std::chrono::seconds(5)); // 等待1s,如果超时,则打印超时信息
    std::cout << "exampleTask finished on thread " << std::this_thread::get_id() << std::endl;
    if (futStatus == std::future_status::timeout) {
        std::cout << "exampleTask timeout on thread " << std::this_thread::get_id() << std::endl;
    }
}

int main() {
    auto future = pool.submit(exampleTask);
    future.wait(); // 如果需要,这里可以等待任务完成
    return 0;
}

涉及CGraph 代码

// GElement.cpp
CStatus GElement::asyncRun() {
    CGRAPH_FUNCTION_BEGIN
    CGRAPH_RETURN_ERROR_STATUS_BY_CONDITION(!isAsync(), "[" + name_ + "] cannot async run.")

    async_result_ = thread_pool_->commit([this] {  // 二次commit
        return run();
    }, CGRAPH_POOL_TASK_STRATEGY);

    auto futStatus = async_result_.wait_for(std::chrono::milliseconds(timeout_));
    if (std::future_status::ready == futStatus) {
        status = getAsyncResult();
    } else {
        CGRAPH_RETURN_ERROR_STATUS_BY_CONDITION( GElementTimeoutStrategy::AS_ERROR == timeout_strategy_,    \
        "[" + name_ + "] running time more than [" + std::to_string(timeout_) + "]ms")
        cur_state_.store(GElementState::TIMEOUT, std::memory_order_release);
    }

    CGRAPH_FUNCTION_END
}
// GDynamicEngine.cpp
CVoid GDynamicEngine::process(GElementPtr element, CBool affinity) {
    if (unlikely(cur_status_.isErr() || element->done_)) {
        /**
         * 如果已经有异常逻辑,
         * 或者传入的element,是已经执行过的了(理论上不会出现这种情况,由于提升性能的原因,取消了atomic计数的逻辑,故添加这一处判定,防止意外情况)
         * 则直接停止当前流程
         */
        return;
    }

    const auto& execute = [this, element] {
        const CStatus& curStatus = element->fatProcessor(CFunctionType::RUN);
        if (unlikely(curStatus.isErr())) {
            // 当且仅当整体状正常,且当前状态异常的时候,进入赋值逻辑。确保不重复赋值
            cur_status_ += curStatus;
        }
        afterElementRun(element);
    };

    if (affinity
        && CGRAPH_DEFAULT_BINDING_INDEX == element->getBindingIndex()) {
        // 如果 affinity=true,表示用当前的线程,执行这个逻辑。以便增加亲和性
        execute();
    } else {
        thread_pool_->commit(execute, calcIndex(element)); // 一次commit
    }
}
@ChunelFeng
Copy link
Owner

批主想知道,你打算如何修改这一块的内容呢?

@yeshenyong
Copy link
Collaborator Author

yeshenyong commented Apr 16, 2024 via email

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants