Changeset a95e75e in mainline for uspace/lib/cpp/include/__bits/thread/future.hpp
- Timestamp:
- 2019-06-30T13:29:06Z (5 years ago)
- Branches:
- lfn, master, serial, ticket/834-toolchain-update, topic/msim-upgrade, topic/simplify-dev-export
- Children:
- 3faf90ad
- Parents:
- 627dc41
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
uspace/lib/cpp/include/__bits/thread/future.hpp
r627dc41 ra95e75e 30 30 #define LIBCPP_BITS_THREAD_FUTURE 31 31 32 #include <__bits/functional/function.hpp>33 #include <__bits/functional/invoke.hpp>34 #include <__bits/refcount_obj.hpp>35 #include <__bits/thread/threading.hpp>36 32 #include <cassert> 37 33 #include <memory> … … 56 52 }; 57 53 58 enum class launch59 {60 async,61 deferred62 };63 64 54 enum class future_status 65 55 { … … 99 89 100 90 /** 101 * 30.6.4, shared state: 102 */ 103 104 namespace aux 105 { 106 template<class R> 107 class shared_state: public aux::refcount_obj 108 { 109 public: 110 shared_state() 111 : mutex_{}, condvar_{}, value_{}, value_set_{false}, 112 exception_{}, has_exception_{false} 113 { 114 threading::mutex::init(mutex_); 115 threading::condvar::init(condvar_); 116 } 117 118 void destroy() override 119 { 120 /** 121 * Note: No need to act in this case, async shared 122 * state is the object that needs to sometimes 123 * invoke its payload. 124 */ 125 } 126 127 void set_value(const R& val, bool set) 128 { 129 /** 130 * Note: This is the 'mark ready' move described 131 * in 30.6.4 (6). 132 */ 133 134 aux::threading::mutex::lock(mutex_); 135 value_ = val; 136 value_set_ = set; 137 aux::threading::mutex::unlock(mutex_); 138 139 aux::threading::condvar::broadcast(condvar_); 140 } 141 142 void set_value(R&& val, bool set = true) 143 { 144 aux::threading::mutex::lock(mutex_); 145 value_ = std::move(val); 146 value_set_ = set; 147 aux::threading::mutex::unlock(mutex_); 148 149 aux::threading::condvar::broadcast(condvar_); 150 } 151 152 void mark_set(bool set = true) noexcept 153 { 154 value_set_ = set; 155 } 156 157 bool is_set() const noexcept 158 { 159 return value_set_; 160 } 161 162 R& get() 163 { 164 return value_; 165 } 166 167 void set_exception(exception_ptr ptr) 168 { 169 exception_ = ptr; 170 has_exception_ = true; 171 } 172 173 bool has_exception() const noexcept 174 { 175 return has_exception_; 176 } 177 178 void throw_stored_exception() const 179 { 180 // TODO: implement 181 } 182 183 /** 184 * TODO: This member function is supposed to be marked 185 * as 'const'. In such a case, however, we cannot 186 * use the underlying fibril API because these 187 * references get converted to pointers and the API 188 * does not accept e.g. 'const fibril_condvar_t*'. 189 * 190 * The same applies to the wait_for and wait_until 191 * functions. 192 */ 193 virtual void wait() 194 { 195 aux::threading::mutex::lock(mutex_); 196 while (!value_set_) 197 aux::threading::condvar::wait(condvar_, mutex_); 198 aux::threading::mutex::unlock(mutex_); 199 } 200 201 template<class Rep, class Period> 202 bool wait_for(const chrono::duration<Rep, Period>& rel_time) 203 { 204 aux::threading::mutex::lock(mutex_); 205 aux::threading::condvar::wait_for( 206 condvar_, mutex_, 207 aux::threading::time::convert(rel_time) 208 ); 209 aux::threading::mutex::unlock(mutex_); 210 211 return value_set_; 212 } 213 214 template<class Clock, class Duration> 215 bool wait_until(const chrono::time_point<Clock, Duration>& abs_time) 216 { 217 aux::threading::mutex::lock(mutex_); 218 aux::threading::condvar::wait_for( 219 condvar_, mutex_, 220 aux::threading::time::convert(abs_time - Clock::now()) 221 ); 222 aux::threading::mutex::unlock(mutex_); 223 224 return value_set_; 225 } 226 227 ~shared_state() override = default; 228 229 private: 230 aux::mutex_t mutex_; 231 aux::condvar_t condvar_; 232 233 R value_; 234 bool value_set_; 235 236 exception_ptr exception_; 237 bool has_exception_; 238 }; 239 240 /** 241 * We could make one state for both async and 242 * deferred policies, but then we would be wasting 243 * memory and the only benefit would be the ability 244 * for additional implementation defined policies done 245 * directly in that state (as opposed to making new 246 * states for them). 247 * 248 * But since we have no plan (nor need) to make those, 249 * this approach seems to be the best one. 250 * TODO: Override wait_for and wait_until in both! 251 */ 252 253 template<class R, class F, class... Args> 254 class async_shared_state: public shared_state<R> 255 { 256 public: 257 async_shared_state(F&& f, Args&&... args) 258 : shared_state<R>{}, thread_{} 259 { 260 thread_ = thread{ 261 [=](){ 262 try 263 { 264 this->set_value(invoke(f, args...)); 265 } 266 catch(...) // TODO: Any exception. 267 { 268 // TODO: Store it. 269 } 270 } 271 }; 272 } 273 274 void destroy() override 275 { 276 if (!this->is_set()) 277 thread_.join(); 278 } 279 280 void wait() override 281 { 282 if (!this->is_set()) 283 thread_.join(); 284 } 285 286 ~async_shared_state() override 287 { 288 destroy(); 289 } 290 291 private: 292 thread thread_; 293 }; 294 295 template<class R, class F, class... Args> 296 class deferred_shared_state: public shared_state<R> 297 { 298 public: 299 template<class G> 300 deferred_shared_state(G&& f, Args&&... args) 301 : shared_state<R>{}, func_{forward<F>(f)}, 302 args_{forward<Args>(args)...} 303 { /* DUMMY BODY */ } 304 305 void destroy() override 306 { 307 if (!this->is_set()) 308 invoke_(make_index_sequence<sizeof...(Args)>{}); 309 } 310 311 void wait() override 312 { 313 // TODO: Should these be synchronized for async? 314 if (!this->is_set()) 315 invoke_(make_index_sequence<sizeof...(Args)>{}); 316 } 317 318 ~deferred_shared_state() override 319 { 320 destroy(); 321 } 322 323 private: 324 function<R(decay_t<Args>...)> func_; 325 tuple<decay_t<Args>...> args_; 326 327 template<size_t... Is> 328 void invoke_(index_sequence<Is...>) 329 { 330 try 331 { 332 this->set_value(invoke(move(func_), get<Is>(move(args_))...)); 333 } 334 catch(...) 335 { 336 // TODO: Store it. 337 } 338 } 339 }; 340 } 341 342 template<class R> 343 class future; 344 345 template<class R> 346 class promise 347 { 348 public: 349 promise() 350 : state_{new aux::shared_state<R>{}} 351 { /* DUMMY BODY */ } 352 353 template<class Allocator> 354 promise(allocator_arg_t, const Allocator& a) 355 : promise{} 356 { 357 // TODO: Use the allocator. 358 } 359 360 promise(promise&& rhs) noexcept 361 : state_{} 362 { 363 state_ = rhs.state_; 364 rhs.state_ = nullptr; 365 } 366 367 promise(const promise&) = delete; 368 369 ~promise() 370 { 371 abandon_state_(); 372 } 373 374 promise& operator=(promise&& rhs) noexcept 375 { 376 abandon_state_(); 377 promise{std::move(rhs)}.swap(*this); 378 } 379 380 promise& operator=(const promise&) = delete; 381 382 void swap(promise& other) noexcept 383 { 384 std::swap(state_, other.state_); 385 } 386 387 future<R> get_future() 388 { 389 return future<R>{state_}; 390 } 391 392 void set_value(const R& val) 393 { 394 if (!state_) 395 throw future_error{make_error_code(future_errc::no_state)}; 396 if (state_->is_set()) 397 { 398 throw future_error{ 399 make_error_code(future_errc::promise_already_satisfied) 400 }; 401 } 402 403 state_->set_value(val, true); 404 } 405 406 void set_value(R&& val) 407 { 408 if (!state_) 409 throw future_error{make_error_code(future_errc::no_state)}; 410 if (state_->is_set()) 411 { 412 throw future_error{ 413 make_error_code(future_errc::promise_already_satisfied) 414 }; 415 } 416 417 state_->set_value(std::forward<R>(val), true); 418 } 419 420 void set_exception(exception_ptr ptr) 421 { 422 assert(state_); 423 424 state_->set_exception(ptr); 425 } 426 427 void set_value_at_thread_exit(const R& val) 428 { 429 if (!state_) 430 throw future_error{make_error_code(future_errc::no_state)}; 431 if (state_->is_set()) 432 { 433 throw future_error{ 434 make_error_code(future_errc::promise_already_satisfied) 435 }; 436 } 437 438 state_->set_value(val, false); 439 // TODO: schedule it to be set as ready when thread exits 440 } 441 442 void set_value_at_thread_exit(R&& val) 443 { 444 if (!state_) 445 throw future_error{make_error_code(future_errc::no_state)}; 446 if (state_->is_set()) 447 { 448 throw future_error{ 449 make_error_code(future_errc::promise_already_satisfied) 450 }; 451 } 452 453 state_->set_value(std::forward<R>(val), false); 454 // TODO: schedule it to be set as ready when thread exits 455 } 456 457 void set_exception_at_thread_exit(exception_ptr) 458 { 459 // TODO: No exception handling, no-op at this time. 460 } 461 462 private: 463 void abandon_state_() 464 { 465 /** 466 * Note: This is the 'abandon' move described in 467 * 30.6.4 (7). 468 * 1) If state is not ready: 469 * a) Store exception of type future_error with 470 * error condition broken_promise. 471 * b) Mark state as ready. 472 * 2) Rekease the state. 473 */ 474 } 475 476 aux::shared_state<R>* state_; 477 }; 478 479 template<class R> 480 class promise<R&> 481 { 482 // TODO: Copy & modify once promise is done. 483 }; 484 485 template<> 486 class promise<void> 487 { 488 // TODO: Copy & modify once promise is done. 489 }; 490 491 template<class R> 492 void swap(promise<R>& lhs, promise<R>& rhs) noexcept 493 { 494 lhs.swap(rhs); 495 } 496 497 template<class R, class Alloc> 498 struct uses_allocator<promise<R>, Alloc>: true_type 499 { /* DUMMY BODY */ }; 91 * 30.6.6, class template future: 92 */ 500 93 501 94 template<class R> … … 579 172 { 580 173 assert(state_); 581 if (state_->is_deffered_function) 582 return future_status::deferred; 583 584 auto res = state_->wait_for(rel_time); 585 586 if (res) 587 return future_status::ready; 588 else 589 return future_status::timeout; 174 175 return state_->wait_for(rel_time); 590 176 } 591 177 … … 594 180 { 595 181 assert(state_); 596 if (state_->is_deffered_function) 597 return future_status::deferred; 598 599 auto res = state_->wait_until(abs_time); 600 601 if (res) 602 return future_status::ready; 603 else 604 return future_status::timeout; 182 183 return state_->wait_until(abs_time); 605 184 } 606 185 … … 647 226 // TODO: Copy & modify once future is done. 648 227 }; 649 650 // TODO: Make sure the move-future constructor of shared_future651 // invalidates the state (i.e. sets to nullptr).652 template<class R>653 class shared_future654 {655 // TODO: Copy & modify once future is done.656 };657 658 template<class R>659 class shared_future<R&>660 {661 // TODO: Copy & modify once future is done.662 };663 664 template<>665 class shared_future<void>666 {667 // TODO: Copy & modify once future is done.668 };669 670 template<class>671 class packaged_task; // undefined672 673 template<class R, class... Args>674 class packaged_task<R(Args...)>675 {676 packaged_task() noexcept677 {}678 679 template<class F>680 explicit packaged_task(F&& f)681 {}682 683 template<class F, class Allocator>684 explicit packaged_task(allocator_arg_t, const Allocator& a, F&& f)685 {}686 687 ~packaged_task()688 {}689 690 packaged_task(const packaged_task&) = delete;691 packaged_task& operator=(const packaged_task&) = delete;692 693 packaged_task(packaged_task&& rhs)694 {}695 696 packaged_task& operator=(packaged_task&& rhs)697 {}698 699 void swap(packaged_task& other) noexcept700 {}701 702 bool valid() const noexcept703 {}704 705 future<R> get_future()706 {}707 708 void operator()(Args...)709 {}710 711 void make_ready_at_thread_exit(Args...)712 {}713 714 void reset()715 {}716 };717 718 template<class R, class... Args>719 void swap(packaged_task<R(Args...)>& lhs, packaged_task<R(Args...)>& rhs) noexcept720 {721 lhs.swap(rhs);722 };723 724 template<class R, class Alloc>725 struct uses_allocator<packaged_task<R>, Alloc>: true_type726 { /* DUMMY BODY */ };727 728 namespace aux729 {730 /**731 * Note: The reason we keep the actual function732 * within the aux namespace is that were the non-policy733 * version of the function call the other one in the std734 * namespace, we'd get resolution conflicts. This way735 * aux::async is properly called even if std::async is736 * called either with or without a launch policy.737 */738 template<class F, class... Args>739 future<result_of_t<decay_t<F>(decay_t<Args>...)>>740 async(launch policy, F&& f, Args&&... args)741 {742 using result_t = result_of_t<decay_t<F>(decay_t<Args>...)>;743 744 bool async = (static_cast<int>(policy) &745 static_cast<int>(launch::async)) != 0;746 bool deferred = (static_cast<int>(policy) &747 static_cast<int>(launch::deferred)) != 0;748 749 /**750 * Note: The case when async | deferred is set in policy751 * is implementation defined, feel free to change.752 */753 if (async && deferred)754 {755 return future<result_t>{756 new aux::deferred_shared_state<757 result_t, F, Args...758 >{forward<F>(f), forward<Args>(args)...}759 };760 }761 else if (async)762 {763 return future<result_t>{764 new aux::async_shared_state<765 result_t, F, Args...766 >{forward<F>(f), forward<Args>(args)...}767 };768 }769 else if (deferred)770 {771 /**772 * Duplicated on purpose because of the default.773 * Do not remove!774 */775 return future<result_t>{776 new aux::deferred_shared_state<777 result_t, F, Args...778 >{forward<F>(f), forward<Args>(args)...}779 };780 }781 782 /**783 * This is undefined behaviour, let's be nice though ;)784 */785 return future<result_t>{786 new aux::deferred_shared_state<787 result_t, F, Args...788 >{forward<F>(f), forward<Args>(args)...}789 };790 }791 }792 793 template<class F>794 decltype(auto) async(F&& f)795 {796 launch policy = static_cast<launch>(797 static_cast<int>(launch::async) |798 static_cast<int>(launch::deferred)799 );800 801 return aux::async(policy, forward<F>(f));802 }803 804 /**805 * The async(launch, F, Args...) and async(F, Args...)806 * overloards must not collide, so we check the first template807 * argument and handle the special case of just a functor808 * above.809 */810 template<class F, class Arg, class... Args>811 decltype(auto) async(F&& f, Arg&& arg, Args&&... args)812 {813 if constexpr (is_same_v<decay_t<F>, launch>)814 return aux::async(f, forward<Arg>(arg), forward<Args>(args)...);815 else816 {817 launch policy = static_cast<launch>(818 static_cast<int>(launch::async) |819 static_cast<int>(launch::deferred)820 );821 822 return aux::async(policy, forward<F>(f), forward<Arg>(arg), forward<Args>(args)...);823 }824 }825 228 } 826 229
Note:
See TracChangeset
for help on using the changeset viewer.