c++ - هل يمكنني استخدام std:: async دون انتظار الحد المستقبلي؟




multithreading c++11 (2)

فبدلاً من نقل المستقبل إلى كائن عالمي (وإدارته يدوياً لحذف العقود الآجلة غير المستخدمة) ، يمكنك بالفعل نقلها إلى النطاق المحلي للدالة التي تتم تسميتها بشكل غير متزامن.

"دع وظيفة التزامن تأخذ مستقبلها الخاص" ، إذا جاز التعبير.

لقد أتيت مع هذا القالب المجمع الذي يعمل بالنسبة لي (اختبار على نظام التشغيل Windows):

#include <future>

template<class Function, class... Args>
void async_wrapper(Function&& f, Args&&... args, std::future<void>& future,
                   std::future<void>&& is_valid, std::promise<void>&& is_moved) {
    is_valid.wait(); // Wait until the return value of std::async is written to "future"
    auto our_future = std::move(future); // Move "future" to a local variable
    is_moved.set_value(); // Only now we can leave void_async in the main thread

    // This is also used by std::async so that member function pointers work transparently
    auto functor = std::bind(f, std::forward<Args>(args)...);
    functor();
}

template<class Function, class... Args> // This is what you call instead of std::async
void void_async(Function&& f, Args&&... args) {
    std::future<void> future; // This is for std::async return value
    // This is for our synchronization of moving "future" between threads
    std::promise<void> valid;
    std::promise<void> is_moved;
    auto valid_future = valid.get_future();
    auto moved_future = is_moved.get_future();

    // Here we pass "future" as a reference, so that async_wrapper
    // can later work with std::async's return value
    future = std::async(
        async_wrapper<Function, Args...>,
        std::forward<Function>(f), std::forward<Args>(args)...,
        std::ref(future), std::move(valid_future), std::move(is_moved)
    );
    valid.set_value(); // Unblock async_wrapper waiting for "future" to become valid
    moved_future.wait(); // Wait for "future" to actually be moved
}

أنا مندهش بعض الشيء لأنه يعمل لأنني اعتقدت أن المدمر الذي تحرك المستقبل سيحجب حتى نترك المتحول . يجب أن تنتظر async_wrapper للإرجاع ولكنها تنتظر داخل هذه الوظيفة ذاتها. منطقيا ، يجب أن يكون مأزقا لكنه ليس كذلك.

لقد حاولت أيضًا إضافة سطر في نهاية async_wrapper لإفراغ كائن المستقبل يدويًا:

our_future = std::future<void>();

هذا لا يمنع أي منهما.

مستوى عال
أرغب في استدعاء بعض الوظائف بدون قيمة رجوع في وضع التزامن دون انتظار أن تنتهي. إذا قمت باستخدام std :: async ، فإن الكائن المستقبلي لا يتلف حتى تنتهي المهمة ، فهذا يجعل المكالمة غير متزامنة في حالتي.

مثال

void sendMail(const std::string& address, const std::string& message)
{
    //sending the e-mail which takes some time...
}

myResonseType processRequest(args...)
{
    //Do some processing and valuate the address and the message...

    //Sending the e-mail async
    auto f = std::async(std::launch::async, sendMail, address, message);

    //returning the response ASAP to the client
    return myResponseType;

} //<-- I'm stuck here until the async call finish to allow f to be destructed.
  // gaining no benefit from the async call.

أسئلتي هي

  1. هل هناك طريقة للتغلب على هذا القيد؟
  2. إذا كان (1) لا ، هل يجب أن أطبق مرة واحدة على مؤشر ترابط يأخذ العقود الآجلة "الزومبي" وينتظرها؟
  3. هو (1) و (2) لا ، هل هناك أي خيار آخر ثم مجرد بناء تجمع موضوع بلدي؟

ملحوظة:
أنا لا أستخدم خيار thread + detach (المقترح من قبل galop1n @) لأن إنشاء سلسلة محادثات جديدة قد يكون لدي رغبة في تجنبه. أثناء استخدام std :: async (على الأقل على MSVC) يستخدم تجمع مؤشر ترابط داخلي.

شكر.


لماذا لا تبدئي فقط بخيط ، وفصل إذا كنت لا تهتم بالانضمام؟

std::thread{ sendMail, address, message}.detach();   

يرتبط std :: async بعمر std :: future ستعود و لا بديل لها.

سيتطلب وضع std :: future في طابور انتظار للقراءة بواسطة مؤشر ترابط آخر نفس آلية الأمان مثل تجمع يتلقى مهمة جديدة ، مثل mutex حول الحاوية.

أفضل خيار لديك ، إذن ، هو تجمع مؤشرات ترابط لاستهلاك المهام دفع مباشرة في قائمة انتظار مؤشر الترابط الآمن. ولن يعتمد على تنفيذ محدد.

أسفل تنفيذ تجمع مؤشرات الترابط مع أي وسائط قابلة للاستدعاء والحجج ، فإن مؤشرات الترابط تقوم بعمل poling في قائمة الانتظار ، يجب أن يستخدم التنفيذ الأفضل متغيرات الحالة ( coliru ):

#include <iostream>
#include <queue>
#include <memory>
#include <thread>
#include <mutex>
#include <functional>
#include <string>

struct ThreadPool {
    struct Task {
        virtual void Run() const = 0;
        virtual ~Task() {};
    };   

    template < typename task_, typename... args_ >
    struct RealTask : public Task {
        RealTask( task_&& task, args_&&... args ) : fun_( std::bind( std::forward<task_>(task), std::forward<args_>(args)... ) ) {}
        void Run() const override {
            fun_();
        }
    private:
        decltype( std::bind(std::declval<task_>(), std::declval<args_>()... ) ) fun_;
    };

    template < typename task_, typename... args_ >
    void AddTask( task_&& task, args_&&... args ) {
        auto lock = std::unique_lock<std::mutex>{mtx_};
        using FinalTask = RealTask<task_, args_... >;
        q_.push( std::unique_ptr<Task>( new FinalTask( std::forward<task_>(task), std::forward<args_>(args)... ) ) );
    }

    ThreadPool() {
        for( auto & t : pool_ )
            t = std::thread( [=] {
                while ( true ) {
                    std::unique_ptr<Task> task;
                    {
                        auto lock = std::unique_lock<std::mutex>{mtx_};
                        if ( q_.empty() && stop_ ) 
                            break;
                        if ( q_.empty() )
                            continue;
                        task = std::move(q_.front());
                        q_.pop();
                    }
                    if (task)
                        task->Run();
                }
            } );
    }
    ~ThreadPool() {
        {
            auto lock = std::unique_lock<std::mutex>{mtx_};
            stop_ = true;
        }
        for( auto & t : pool_ )
            t.join();
    }
private:
    std::queue<std::unique_ptr<Task>> q_;
    std::thread pool_[8]; 
    std::mutex mtx_;
    volatile bool stop_ {};
};

void foo( int a, int b ) {
    std::cout << a << "." << b;
}
void bar( std::string const & s) {
    std::cout << s;
}

int main() {
    ThreadPool pool;
    for( int i{}; i!=42; ++i ) {
        pool.AddTask( foo, 3, 14 );    
        pool.AddTask( bar, " - " );    
    }
}