Ignore:
Timestamp:
2019-06-30T13:29:06Z (5 years ago)
Author:
Jaroslav Jindrak <dzejrou@…>
Branches:
lfn, master, serial, ticket/834-toolchain-update, topic/msim-upgrade, topic/simplify-dev-export
Children:
3faf90ad
Parents:
627dc41
Message:

cpp: split future.hpp into subheaders for each type

File:
1 edited

Legend:

Unmodified
Added
Removed
  • uspace/lib/cpp/include/__bits/thread/future.hpp

    r627dc41 ra95e75e  
    3030#define LIBCPP_BITS_THREAD_FUTURE
    3131
    32 #include <__bits/functional/function.hpp>
    33 #include <__bits/functional/invoke.hpp>
    34 #include <__bits/refcount_obj.hpp>
    35 #include <__bits/thread/threading.hpp>
    3632#include <cassert>
    3733#include <memory>
     
    5652    };
    5753
    58     enum class launch
    59     {
    60         async,
    61         deferred
    62     };
    63 
    6454    enum class future_status
    6555    {
     
    9989
    10090    /**
    101      * 30.6.4, shared state:
    102      */
    103 
    104     namespace aux
    105     {
    106         template<class R>
    107         class shared_state: public aux::refcount_obj
    108         {
    109             public:
    110                 shared_state()
    111                     : mutex_{}, condvar_{}, value_{}, value_set_{false},
    112                       exception_{}, has_exception_{false}
    113                 {
    114                     threading::mutex::init(mutex_);
    115                     threading::condvar::init(condvar_);
    116                 }
    117 
    118                 void destroy() override
    119                 {
    120                     /**
    121                      * Note: No need to act in this case, async shared
    122                      *       state is the object that needs to sometimes
    123                      *       invoke its payload.
    124                      */
    125                 }
    126 
    127                 void set_value(const R& val, bool set)
    128                 {
    129                     /**
    130                      * Note: This is the 'mark ready' move described
    131                      *       in 30.6.4 (6).
    132                      */
    133 
    134                     aux::threading::mutex::lock(mutex_);
    135                     value_ = val;
    136                     value_set_ = set;
    137                     aux::threading::mutex::unlock(mutex_);
    138 
    139                     aux::threading::condvar::broadcast(condvar_);
    140                 }
    141 
    142                 void set_value(R&& val, bool set = true)
    143                 {
    144                     aux::threading::mutex::lock(mutex_);
    145                     value_ = std::move(val);
    146                     value_set_ = set;
    147                     aux::threading::mutex::unlock(mutex_);
    148 
    149                     aux::threading::condvar::broadcast(condvar_);
    150                 }
    151 
    152                 void mark_set(bool set = true) noexcept
    153                 {
    154                     value_set_ = set;
    155                 }
    156 
    157                 bool is_set() const noexcept
    158                 {
    159                     return value_set_;
    160                 }
    161 
    162                 R& get()
    163                 {
    164                     return value_;
    165                 }
    166 
    167                 void set_exception(exception_ptr ptr)
    168                 {
    169                     exception_ = ptr;
    170                     has_exception_ = true;
    171                 }
    172 
    173                 bool has_exception() const noexcept
    174                 {
    175                     return has_exception_;
    176                 }
    177 
    178                 void throw_stored_exception() const
    179                 {
    180                     // TODO: implement
    181                 }
    182 
    183                 /**
    184                  * TODO: This member function is supposed to be marked
    185                  *       as 'const'. In such a case, however, we cannot
    186                  *       use the underlying fibril API because these
    187                  *       references get converted to pointers and the API
    188                  *       does not accept e.g. 'const fibril_condvar_t*'.
    189                  *
    190                  *       The same applies to the wait_for and wait_until
    191                  *       functions.
    192                  */
    193                 virtual void wait()
    194                 {
    195                     aux::threading::mutex::lock(mutex_);
    196                     while (!value_set_)
    197                         aux::threading::condvar::wait(condvar_, mutex_);
    198                     aux::threading::mutex::unlock(mutex_);
    199                 }
    200 
    201                 template<class Rep, class Period>
    202                 bool wait_for(const chrono::duration<Rep, Period>& rel_time)
    203                 {
    204                     aux::threading::mutex::lock(mutex_);
    205                     aux::threading::condvar::wait_for(
    206                         condvar_, mutex_,
    207                         aux::threading::time::convert(rel_time)
    208                     );
    209                     aux::threading::mutex::unlock(mutex_);
    210 
    211                     return value_set_;
    212                 }
    213 
    214                 template<class Clock, class Duration>
    215                 bool wait_until(const chrono::time_point<Clock, Duration>& abs_time)
    216                 {
    217                     aux::threading::mutex::lock(mutex_);
    218                     aux::threading::condvar::wait_for(
    219                         condvar_, mutex_,
    220                         aux::threading::time::convert(abs_time - Clock::now())
    221                     );
    222                     aux::threading::mutex::unlock(mutex_);
    223 
    224                     return value_set_;
    225                 }
    226 
    227                 ~shared_state() override = default;
    228 
    229             private:
    230                 aux::mutex_t mutex_;
    231                 aux::condvar_t condvar_;
    232 
    233                 R value_;
    234                 bool value_set_;
    235 
    236                 exception_ptr exception_;
    237                 bool has_exception_;
    238         };
    239 
    240         /**
    241          * We could make one state for both async and
    242          * deferred policies, but then we would be wasting
    243          * memory and the only benefit would be the ability
    244          * for additional implementation defined policies done
    245          * directly in that state (as opposed to making new
    246          * states for them).
    247          *
    248          * But since we have no plan (nor need) to make those,
    249          * this approach seems to be the best one.
    250          * TODO: Override wait_for and wait_until in both!
    251          */
    252 
    253         template<class R, class F, class... Args>
    254         class async_shared_state: public shared_state<R>
    255         {
    256             public:
    257                 async_shared_state(F&& f, Args&&... args)
    258                     : shared_state<R>{}, thread_{}
    259                 {
    260                     thread_ = thread{
    261                         [=](){
    262                             try
    263                             {
    264                                 this->set_value(invoke(f, args...));
    265                             }
    266                             catch(...) // TODO: Any exception.
    267                             {
    268                                 // TODO: Store it.
    269                             }
    270                         }
    271                     };
    272                 }
    273 
    274                 void destroy() override
    275                 {
    276                     if (!this->is_set())
    277                         thread_.join();
    278                 }
    279 
    280                 void wait() override
    281                 {
    282                     if (!this->is_set())
    283                         thread_.join();
    284                 }
    285 
    286                 ~async_shared_state() override
    287                 {
    288                     destroy();
    289                 }
    290 
    291             private:
    292                 thread thread_;
    293         };
    294 
    295         template<class R, class F, class... Args>
    296         class deferred_shared_state: public shared_state<R>
    297         {
    298             public:
    299                 template<class G>
    300                 deferred_shared_state(G&& f, Args&&... args)
    301                     : shared_state<R>{}, func_{forward<F>(f)},
    302                       args_{forward<Args>(args)...}
    303                 { /* DUMMY BODY */ }
    304 
    305                 void destroy() override
    306                 {
    307                     if (!this->is_set())
    308                         invoke_(make_index_sequence<sizeof...(Args)>{});
    309                 }
    310 
    311                 void wait() override
    312                 {
    313                     // TODO: Should these be synchronized for async?
    314                     if (!this->is_set())
    315                         invoke_(make_index_sequence<sizeof...(Args)>{});
    316                 }
    317 
    318                 ~deferred_shared_state() override
    319                 {
    320                     destroy();
    321                 }
    322 
    323             private:
    324                 function<R(decay_t<Args>...)> func_;
    325                 tuple<decay_t<Args>...> args_;
    326 
    327                 template<size_t... Is>
    328                 void invoke_(index_sequence<Is...>)
    329                 {
    330                     try
    331                     {
    332                         this->set_value(invoke(move(func_), get<Is>(move(args_))...));
    333                     }
    334                     catch(...)
    335                     {
    336                         // TODO: Store it.
    337                     }
    338                 }
    339         };
    340     }
    341 
    342     template<class R>
    343     class future;
    344 
    345     template<class R>
    346     class promise
    347     {
    348         public:
    349             promise()
    350                 : state_{new aux::shared_state<R>{}}
    351             { /* DUMMY BODY */ }
    352 
    353             template<class Allocator>
    354             promise(allocator_arg_t, const Allocator& a)
    355                 : promise{}
    356             {
    357                 // TODO: Use the allocator.
    358             }
    359 
    360             promise(promise&& rhs) noexcept
    361                 : state_{}
    362             {
    363                 state_ = rhs.state_;
    364                 rhs.state_ = nullptr;
    365             }
    366 
    367             promise(const promise&) = delete;
    368 
    369             ~promise()
    370             {
    371                 abandon_state_();
    372             }
    373 
    374             promise& operator=(promise&& rhs) noexcept
    375             {
    376                 abandon_state_();
    377                 promise{std::move(rhs)}.swap(*this);
    378             }
    379 
    380             promise& operator=(const promise&) = delete;
    381 
    382             void swap(promise& other) noexcept
    383             {
    384                 std::swap(state_, other.state_);
    385             }
    386 
    387             future<R> get_future()
    388             {
    389                 return future<R>{state_};
    390             }
    391 
    392             void set_value(const R& val)
    393             {
    394                 if (!state_)
    395                     throw future_error{make_error_code(future_errc::no_state)};
    396                 if (state_->is_set())
    397                 {
    398                     throw future_error{
    399                         make_error_code(future_errc::promise_already_satisfied)
    400                     };
    401                 }
    402 
    403                 state_->set_value(val, true);
    404             }
    405 
    406             void set_value(R&& val)
    407             {
    408                 if (!state_)
    409                     throw future_error{make_error_code(future_errc::no_state)};
    410                 if (state_->is_set())
    411                 {
    412                     throw future_error{
    413                         make_error_code(future_errc::promise_already_satisfied)
    414                     };
    415                 }
    416 
    417                 state_->set_value(std::forward<R>(val), true);
    418             }
    419 
    420             void set_exception(exception_ptr ptr)
    421             {
    422                 assert(state_);
    423 
    424                 state_->set_exception(ptr);
    425             }
    426 
    427             void set_value_at_thread_exit(const R& val)
    428             {
    429                 if (!state_)
    430                     throw future_error{make_error_code(future_errc::no_state)};
    431                 if (state_->is_set())
    432                 {
    433                     throw future_error{
    434                         make_error_code(future_errc::promise_already_satisfied)
    435                     };
    436                 }
    437 
    438                 state_->set_value(val, false);
    439                 // TODO: schedule it to be set as ready when thread exits
    440             }
    441 
    442             void set_value_at_thread_exit(R&& val)
    443             {
    444                 if (!state_)
    445                     throw future_error{make_error_code(future_errc::no_state)};
    446                 if (state_->is_set())
    447                 {
    448                     throw future_error{
    449                         make_error_code(future_errc::promise_already_satisfied)
    450                     };
    451                 }
    452 
    453                 state_->set_value(std::forward<R>(val), false);
    454                 // TODO: schedule it to be set as ready when thread exits
    455             }
    456 
    457             void set_exception_at_thread_exit(exception_ptr)
    458             {
    459                 // TODO: No exception handling, no-op at this time.
    460             }
    461 
    462         private:
    463             void abandon_state_()
    464             {
    465                 /**
    466                  * Note: This is the 'abandon' move described in
    467                  *       30.6.4 (7).
    468                  * 1) If state is not ready:
    469                  *   a) Store exception of type future_error with
    470                  *      error condition broken_promise.
    471                  *   b) Mark state as ready.
    472                  * 2) Rekease the state.
    473                  */
    474             }
    475 
    476             aux::shared_state<R>* state_;
    477     };
    478 
    479     template<class R>
    480     class promise<R&>
    481     {
    482         // TODO: Copy & modify once promise is done.
    483     };
    484 
    485     template<>
    486     class promise<void>
    487     {
    488         // TODO: Copy & modify once promise is done.
    489     };
    490 
    491     template<class R>
    492     void swap(promise<R>& lhs, promise<R>& rhs) noexcept
    493     {
    494         lhs.swap(rhs);
    495     }
    496 
    497     template<class R, class Alloc>
    498     struct uses_allocator<promise<R>, Alloc>: true_type
    499     { /* DUMMY BODY */ };
     91     * 30.6.6, class template future:
     92     */
    50093
    50194    template<class R>
     
    579172            {
    580173                assert(state_);
    581                 if (state_->is_deffered_function)
    582                     return future_status::deferred;
    583 
    584                 auto res = state_->wait_for(rel_time);
    585 
    586                 if (res)
    587                     return future_status::ready;
    588                 else
    589                     return future_status::timeout;
     174
     175                return state_->wait_for(rel_time);
    590176            }
    591177
     
    594180            {
    595181                assert(state_);
    596                 if (state_->is_deffered_function)
    597                     return future_status::deferred;
    598 
    599                 auto res = state_->wait_until(abs_time);
    600 
    601                 if (res)
    602                     return future_status::ready;
    603                 else
    604                     return future_status::timeout;
     182
     183                return state_->wait_until(abs_time);
    605184            }
    606185
     
    647226        // TODO: Copy & modify once future is done.
    648227    };
    649 
    650     // TODO: Make sure the move-future constructor of shared_future
    651     //       invalidates the state (i.e. sets to nullptr).
    652     template<class R>
    653     class shared_future
    654     {
    655         // TODO: Copy & modify once future is done.
    656     };
    657 
    658     template<class R>
    659     class shared_future<R&>
    660     {
    661         // TODO: Copy & modify once future is done.
    662     };
    663 
    664     template<>
    665     class shared_future<void>
    666     {
    667         // TODO: Copy & modify once future is done.
    668     };
    669 
    670     template<class>
    671     class packaged_task; // undefined
    672 
    673     template<class R, class... Args>
    674     class packaged_task<R(Args...)>
    675     {
    676         packaged_task() noexcept
    677         {}
    678 
    679         template<class F>
    680         explicit packaged_task(F&& f)
    681         {}
    682 
    683         template<class F, class Allocator>
    684         explicit packaged_task(allocator_arg_t, const Allocator& a, F&& f)
    685         {}
    686 
    687         ~packaged_task()
    688         {}
    689 
    690         packaged_task(const packaged_task&) = delete;
    691         packaged_task& operator=(const packaged_task&) = delete;
    692 
    693         packaged_task(packaged_task&& rhs)
    694         {}
    695 
    696         packaged_task& operator=(packaged_task&& rhs)
    697         {}
    698 
    699         void swap(packaged_task& other) noexcept
    700         {}
    701 
    702         bool valid() const noexcept
    703         {}
    704 
    705         future<R> get_future()
    706         {}
    707 
    708         void operator()(Args...)
    709         {}
    710 
    711         void make_ready_at_thread_exit(Args...)
    712         {}
    713 
    714         void reset()
    715         {}
    716     };
    717 
    718     template<class R, class... Args>
    719     void swap(packaged_task<R(Args...)>& lhs, packaged_task<R(Args...)>& rhs) noexcept
    720     {
    721         lhs.swap(rhs);
    722     };
    723 
    724     template<class R, class Alloc>
    725     struct uses_allocator<packaged_task<R>, Alloc>: true_type
    726     { /* DUMMY BODY */ };
    727 
    728     namespace aux
    729     {
    730         /**
    731          * Note: The reason we keep the actual function
    732          *       within the aux namespace is that were the non-policy
    733          *       version of the function call the other one in the std
    734          *       namespace, we'd get resolution conflicts. This way
    735          *       aux::async is properly called even if std::async is
    736          *       called either with or without a launch policy.
    737          */
    738         template<class F, class... Args>
    739         future<result_of_t<decay_t<F>(decay_t<Args>...)>>
    740         async(launch policy, F&& f, Args&&... args)
    741         {
    742             using result_t = result_of_t<decay_t<F>(decay_t<Args>...)>;
    743 
    744             bool async = (static_cast<int>(policy) &
    745                           static_cast<int>(launch::async)) != 0;
    746             bool deferred = (static_cast<int>(policy) &
    747                              static_cast<int>(launch::deferred)) != 0;
    748 
    749             /**
    750              * Note: The case when async | deferred is set in policy
    751              *       is implementation defined, feel free to change.
    752              */
    753             if (async && deferred)
    754             {
    755                 return future<result_t>{
    756                     new aux::deferred_shared_state<
    757                         result_t, F, Args...
    758                     >{forward<F>(f), forward<Args>(args)...}
    759                 };
    760             }
    761             else if (async)
    762             {
    763                 return future<result_t>{
    764                     new aux::async_shared_state<
    765                         result_t, F, Args...
    766                     >{forward<F>(f), forward<Args>(args)...}
    767                 };
    768             }
    769             else if (deferred)
    770             {
    771                /**
    772                 * Duplicated on purpose because of the default.
    773                 * Do not remove!
    774                 */
    775                 return future<result_t>{
    776                     new aux::deferred_shared_state<
    777                         result_t, F, Args...
    778                     >{forward<F>(f), forward<Args>(args)...}
    779                 };
    780             }
    781 
    782             /**
    783              * This is undefined behaviour, let's be nice though ;)
    784              */
    785             return future<result_t>{
    786                 new aux::deferred_shared_state<
    787                     result_t, F, Args...
    788                 >{forward<F>(f), forward<Args>(args)...}
    789             };
    790         }
    791     }
    792 
    793     template<class F>
    794     decltype(auto) async(F&& f)
    795     {
    796         launch policy = static_cast<launch>(
    797             static_cast<int>(launch::async) |
    798             static_cast<int>(launch::deferred)
    799         );
    800 
    801         return aux::async(policy, forward<F>(f));
    802     }
    803 
    804     /**
    805      * The async(launch, F, Args...) and async(F, Args...)
    806      * overloards must not collide, so we check the first template
    807      * argument and handle the special case of just a functor
    808      * above.
    809      */
    810     template<class F, class Arg, class... Args>
    811     decltype(auto) async(F&& f, Arg&& arg, Args&&... args)
    812     {
    813         if constexpr (is_same_v<decay_t<F>, launch>)
    814             return aux::async(f, forward<Arg>(arg), forward<Args>(args)...);
    815         else
    816         {
    817             launch policy = static_cast<launch>(
    818                 static_cast<int>(launch::async) |
    819                 static_cast<int>(launch::deferred)
    820             );
    821 
    822             return aux::async(policy, forward<F>(f), forward<Arg>(arg), forward<Args>(args)...);
    823         }
    824     }
    825228}
    826229
Note: See TracChangeset for help on using the changeset viewer.