Changes in uspace/lib/c/generic/async.c [b72efe8:36e2b55] in mainline
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
uspace/lib/c/generic/async.c
rb72efe8 r36e2b55 98 98 #include <ipc/ipc.h> 99 99 #include <async.h> 100 #include "private/async.h" 100 101 #undef LIBC_ASYNC_C_ 101 102 … … 107 108 #include <errno.h> 108 109 #include <sys/time.h> 109 #include < arch/barrier.h>110 #include <libarch/barrier.h> 110 111 #include <bool.h> 111 112 #include <malloc.h> 112 113 #include <mem.h> 113 114 #include <stdlib.h> 114 #include "private/async.h"115 #include <macros.h> 115 116 116 117 #define CLIENT_HASH_TABLE_BUCKETS 32 … … 138 139 link_t link; 139 140 140 sysarg_t in_task_hash;141 task_id_t in_task_id; 141 142 atomic_t refcnt; 142 143 void *data; … … 150 151 link_t link; 151 152 152 /** Incoming client task hash. */153 sysarg_t in_task_hash;153 /** Incoming client task ID. */ 154 task_id_t in_task_id; 154 155 155 156 /** Incoming phone hash. */ … … 203 204 } 204 205 205 void *async_get_client_data(void)206 {207 assert(fibril_connection);208 return fibril_connection->client->data;209 }210 211 206 /** Default fibril function that gets called to handle new connection. 212 207 * 213 208 * This function is defined as a weak symbol - to be redefined in user code. 214 209 * 215 * @param callid 216 * @param call 217 * @param arg 210 * @param callid Hash of the incoming call. 211 * @param call Data of the incoming call. 212 * @param arg Local argument 218 213 * 219 214 */ … … 228 223 * This function is defined as a weak symbol - to be redefined in user code. 229 224 * 230 * @param callid 231 * @param call 232 * @param arg 225 * @param callid Hash of the incoming call. 226 * @param call Data of the incoming call. 227 * @param arg Local argument. 233 228 * 234 229 */ … … 289 284 { 290 285 assert(key); 286 assert(keys == 2); 291 287 assert(item); 292 288 293 289 client_t *client = hash_table_get_instance(item, client_t, link); 294 return (key[0] == client->in_task_hash); 290 return (key[0] == LOWER32(client->in_task_id) && 291 (key[1] == UPPER32(client->in_task_id))); 295 292 } 296 293 … … 580 577 } 581 578 579 static client_t *async_client_get(task_id_t client_id, bool create) 580 { 581 unsigned long key[2] = { 582 LOWER32(client_id), 583 UPPER32(client_id), 584 }; 585 client_t *client = NULL; 586 587 futex_down(&async_futex); 588 link_t *lnk = hash_table_find(&client_hash_table, key); 589 if (lnk) { 590 client = hash_table_get_instance(lnk, client_t, link); 591 atomic_inc(&client->refcnt); 592 } else if (create) { 593 client = malloc(sizeof(client_t)); 594 if (client) { 595 client->in_task_id = client_id; 596 client->data = async_client_data_create(); 597 598 atomic_set(&client->refcnt, 1); 599 hash_table_insert(&client_hash_table, key, &client->link); 600 } 601 } 602 603 futex_up(&async_futex); 604 return client; 605 } 606 607 static void async_client_put(client_t *client) 608 { 609 bool destroy; 610 unsigned long key[2] = { 611 LOWER32(client->in_task_id), 612 UPPER32(client->in_task_id) 613 }; 614 615 futex_down(&async_futex); 616 617 if (atomic_predec(&client->refcnt) == 0) { 618 hash_table_remove(&client_hash_table, key, 2); 619 destroy = true; 620 } else 621 destroy = false; 622 623 futex_up(&async_futex); 624 625 if (destroy) { 626 if (client->data) 627 async_client_data_destroy(client->data); 628 629 free(client); 630 } 631 } 632 633 void *async_get_client_data(void) 634 { 635 assert(fibril_connection); 636 return fibril_connection->client->data; 637 } 638 639 void *async_get_client_data_by_id(task_id_t client_id) 640 { 641 client_t *client = async_client_get(client_id, false); 642 if (!client) 643 return NULL; 644 if (!client->data) { 645 async_client_put(client); 646 return NULL; 647 } 648 649 return client->data; 650 } 651 652 void async_put_client_data_by_id(task_id_t client_id) 653 { 654 client_t *client = async_client_get(client_id, false); 655 656 assert(client); 657 assert(client->data); 658 659 /* Drop the reference we got in async_get_client_data_by_hash(). */ 660 async_client_put(client); 661 662 /* Drop our own reference we got at the beginning of this function. */ 663 async_client_put(client); 664 } 665 582 666 /** Wrapper for client connection fibril. 583 667 * … … 598 682 */ 599 683 fibril_connection = (connection_t *) arg; 600 601 futex_down(&async_futex);602 684 603 685 /* … … 606 688 * hash in a new tracking structure. 607 689 */ 608 609 unsigned long key = fibril_connection->in_task_hash; 610 link_t *lnk = hash_table_find(&client_hash_table, &key); 611 612 client_t *client; 613 614 if (lnk) { 615 client = hash_table_get_instance(lnk, client_t, link); 616 atomic_inc(&client->refcnt); 617 } else { 618 client = malloc(sizeof(client_t)); 619 if (!client) { 620 ipc_answer_0(fibril_connection->callid, ENOMEM); 621 futex_up(&async_futex); 622 return 0; 623 } 624 625 client->in_task_hash = fibril_connection->in_task_hash; 626 client->data = async_client_data_create(); 627 628 atomic_set(&client->refcnt, 1); 629 hash_table_insert(&client_hash_table, &key, &client->link); 630 } 631 632 futex_up(&async_futex); 633 690 691 client_t *client = async_client_get(fibril_connection->in_task_id, true); 692 if (!client) { 693 ipc_answer_0(fibril_connection->callid, ENOMEM); 694 return 0; 695 } 696 634 697 fibril_connection->client = client; 635 698 … … 643 706 * Remove the reference for this client task connection. 644 707 */ 645 bool destroy; 646 647 futex_down(&async_futex); 648 649 if (atomic_predec(&client->refcnt) == 0) { 650 hash_table_remove(&client_hash_table, &key, 1); 651 destroy = true; 652 } else 653 destroy = false; 654 655 futex_up(&async_futex); 656 657 if (destroy) { 658 if (client->data) 659 async_client_data_destroy(client->data); 660 661 free(client); 662 } 708 async_client_put(client); 663 709 664 710 /* … … 666 712 */ 667 713 futex_down(&async_futex); 668 key = fibril_connection->in_phone_hash;714 unsigned long key = fibril_connection->in_phone_hash; 669 715 hash_table_remove(&conn_hash_table, &key, 1); 670 716 futex_up(&async_futex); … … 700 746 * particular fibrils. 701 747 * 702 * @param in_task_ hashIdentification of the incoming connection.748 * @param in_task_id Identification of the incoming connection. 703 749 * @param in_phone_hash Identification of the incoming connection. 704 750 * @param callid Hash of the opening IPC_M_CONNECT_ME_TO call. … … 709 755 * @param cfibril Fibril function that should be called upon opening the 710 756 * connection. 711 * @param carg 757 * @param carg Extra argument to pass to the connection fibril 712 758 * 713 759 * @return New fibril id or NULL on failure. 714 760 * 715 761 */ 716 fid_t async_new_connection( sysarg_t in_task_hash, sysarg_t in_phone_hash,762 fid_t async_new_connection(task_id_t in_task_id, sysarg_t in_phone_hash, 717 763 ipc_callid_t callid, ipc_call_t *call, 718 764 async_client_conn_t cfibril, void *carg) … … 726 772 } 727 773 728 conn->in_task_ hash = in_task_hash;774 conn->in_task_id = in_task_id; 729 775 conn->in_phone_hash = in_phone_hash; 730 776 list_initialize(&conn->msg_queue); … … 785 831 case IPC_M_CONNECT_ME_TO: 786 832 /* Open new connection with fibril, etc. */ 787 async_new_connection(call->in_task_ hash, IPC_GET_ARG5(*call),833 async_new_connection(call->in_task_id, IPC_GET_ARG5(*call), 788 834 callid, call, client_connection, NULL); 789 835 return; … … 933 979 { 934 980 if (!hash_table_create(&client_hash_table, CLIENT_HASH_TABLE_BUCKETS, 935 1, &client_hash_table_ops))981 2, &client_hash_table_ops)) 936 982 abort(); 937 983 … … 949 995 session_ns->arg2 = 0; 950 996 session_ns->arg3 = 0; 997 998 fibril_mutex_initialize(&session_ns->remote_state_mtx); 999 session_ns->remote_state_data = NULL; 951 1000 952 1001 list_initialize(&session_ns->exch_list); … … 1426 1475 return ENOENT; 1427 1476 1428 sysarg_t task_hash;1429 1477 sysarg_t phone_hash; 1430 int rc = async_req_3_5(exch, IPC_M_CONNECT_TO_ME, arg1, arg2, arg3, 1431 NULL, NULL, NULL, &task_hash, &phone_hash); 1478 sysarg_t rc; 1479 1480 aid_t req; 1481 ipc_call_t answer; 1482 req = async_send_3(exch, IPC_M_CONNECT_TO_ME, arg1, arg2, arg3, 1483 &answer); 1484 async_wait_for(req, &rc); 1432 1485 if (rc != EOK) 1433 return rc; 1434 1486 return (int) rc; 1487 1488 phone_hash = IPC_GET_ARG5(answer); 1489 1435 1490 if (client_receiver != NULL) 1436 async_new_connection( task_hash, phone_hash, 0, NULL,1491 async_new_connection(answer.in_task_id, phone_hash, 0, NULL, 1437 1492 client_receiver, carg); 1438 1493 … … 1509 1564 sess->arg3 = 0; 1510 1565 1566 fibril_mutex_initialize(&sess->remote_state_mtx); 1567 sess->remote_state_data = NULL; 1568 1511 1569 list_initialize(&sess->exch_list); 1512 1570 fibril_mutex_initialize(&sess->mutex); … … 1590 1648 sess->arg3 = arg3; 1591 1649 1650 fibril_mutex_initialize(&sess->remote_state_mtx); 1651 sess->remote_state_data = NULL; 1652 1592 1653 list_initialize(&sess->exch_list); 1593 1654 fibril_mutex_initialize(&sess->mutex); … … 1595 1656 1596 1657 return sess; 1658 } 1659 1660 /** Set arguments for new connections. 1661 * 1662 * FIXME This is an ugly hack to work around the problem that parallel 1663 * exchanges are implemented using parallel connections. When we create 1664 * a callback session, the framework does not know arguments for the new 1665 * connections. 1666 * 1667 * The proper solution seems to be to implement parallel exchanges using 1668 * tagging. 1669 */ 1670 void async_sess_args_set(async_sess_t *sess, sysarg_t arg1, sysarg_t arg2, 1671 sysarg_t arg3) 1672 { 1673 sess->arg1 = arg1; 1674 sess->arg2 = arg2; 1675 sess->arg3 = arg3; 1597 1676 } 1598 1677 … … 1640 1719 sess->arg3 = arg3; 1641 1720 1721 fibril_mutex_initialize(&sess->remote_state_mtx); 1722 sess->remote_state_data = NULL; 1723 1642 1724 list_initialize(&sess->exch_list); 1643 1725 fibril_mutex_initialize(&sess->mutex); … … 1671 1753 sess->arg3 = 0; 1672 1754 1755 fibril_mutex_initialize(&sess->remote_state_mtx); 1756 sess->remote_state_data = NULL; 1757 1673 1758 list_initialize(&sess->exch_list); 1674 1759 fibril_mutex_initialize(&sess->mutex); … … 1692 1777 int async_hangup(async_sess_t *sess) 1693 1778 { 1779 async_exch_t *exch; 1780 1694 1781 assert(sess); 1695 1782 1696 1783 if (atomic_get(&sess->refcnt) > 0) 1697 1784 return EBUSY; 1785 1786 fibril_mutex_lock(&async_sess_mutex); 1698 1787 1699 1788 int rc = async_hangup_internal(sess->phone); 1700 1789 if (rc == EOK) 1701 1790 free(sess); 1791 1792 while (!list_empty(&sess->exch_list)) { 1793 exch = (async_exch_t *) 1794 list_get_instance(list_first(&sess->exch_list), 1795 async_exch_t, sess_link); 1796 1797 list_remove(&exch->sess_link); 1798 list_remove(&exch->global_link); 1799 async_hangup_internal(exch->phone); 1800 free(exch); 1801 } 1802 1803 fibril_mutex_unlock(&async_sess_mutex); 1702 1804 1703 1805 return rc; … … 2334 2436 sess->arg3 = 0; 2335 2437 2438 fibril_mutex_initialize(&sess->remote_state_mtx); 2439 sess->remote_state_data = NULL; 2440 2336 2441 list_initialize(&sess->exch_list); 2337 2442 fibril_mutex_initialize(&sess->mutex); … … 2380 2485 sess->arg3 = 0; 2381 2486 2487 fibril_mutex_initialize(&sess->remote_state_mtx); 2488 sess->remote_state_data = NULL; 2489 2382 2490 list_initialize(&sess->exch_list); 2383 2491 fibril_mutex_initialize(&sess->mutex); … … 2422 2530 sess->arg3 = 0; 2423 2531 2532 fibril_mutex_initialize(&sess->remote_state_mtx); 2533 sess->remote_state_data = NULL; 2534 2424 2535 list_initialize(&sess->exch_list); 2425 2536 fibril_mutex_initialize(&sess->mutex); … … 2429 2540 } 2430 2541 2542 int async_state_change_start(async_exch_t *exch, sysarg_t arg1, sysarg_t arg2, 2543 sysarg_t arg3, async_exch_t *other_exch) 2544 { 2545 return async_req_5_0(exch, IPC_M_STATE_CHANGE_AUTHORIZE, 2546 arg1, arg2, arg3, 0, other_exch->phone); 2547 } 2548 2549 bool async_state_change_receive(ipc_callid_t *callid, sysarg_t *arg1, 2550 sysarg_t *arg2, sysarg_t *arg3) 2551 { 2552 assert(callid); 2553 2554 ipc_call_t call; 2555 *callid = async_get_call(&call); 2556 2557 if (IPC_GET_IMETHOD(call) != IPC_M_STATE_CHANGE_AUTHORIZE) 2558 return false; 2559 2560 if (arg1) 2561 *arg1 = IPC_GET_ARG1(call); 2562 if (arg2) 2563 *arg2 = IPC_GET_ARG2(call); 2564 if (arg3) 2565 *arg3 = IPC_GET_ARG3(call); 2566 2567 return true; 2568 } 2569 2570 int async_state_change_finalize(ipc_callid_t callid, async_exch_t *other_exch) 2571 { 2572 return ipc_answer_1(callid, EOK, other_exch->phone); 2573 } 2574 2575 /** Lock and get session remote state 2576 * 2577 * Lock and get the local replica of the remote state 2578 * in stateful sessions. The call should be paired 2579 * with async_remote_state_release*(). 2580 * 2581 * @param[in] sess Stateful session. 2582 * 2583 * @return Local replica of the remote state. 2584 * 2585 */ 2586 void *async_remote_state_acquire(async_sess_t *sess) 2587 { 2588 fibril_mutex_lock(&sess->remote_state_mtx); 2589 return sess->remote_state_data; 2590 } 2591 2592 /** Update the session remote state 2593 * 2594 * Update the local replica of the remote state 2595 * in stateful sessions. The remote state must 2596 * be already locked. 2597 * 2598 * @param[in] sess Stateful session. 2599 * @param[in] state New local replica of the remote state. 2600 * 2601 */ 2602 void async_remote_state_update(async_sess_t *sess, void *state) 2603 { 2604 assert(fibril_mutex_is_locked(&sess->remote_state_mtx)); 2605 sess->remote_state_data = state; 2606 } 2607 2608 /** Release the session remote state 2609 * 2610 * Unlock the local replica of the remote state 2611 * in stateful sessions. 2612 * 2613 * @param[in] sess Stateful session. 2614 * 2615 */ 2616 void async_remote_state_release(async_sess_t *sess) 2617 { 2618 assert(fibril_mutex_is_locked(&sess->remote_state_mtx)); 2619 2620 fibril_mutex_unlock(&sess->remote_state_mtx); 2621 } 2622 2623 /** Release the session remote state and end an exchange 2624 * 2625 * Unlock the local replica of the remote state 2626 * in stateful sessions. This is convenience function 2627 * which gets the session pointer from the exchange 2628 * and also ends the exchange. 2629 * 2630 * @param[in] exch Stateful session's exchange. 2631 * 2632 */ 2633 void async_remote_state_release_exchange(async_exch_t *exch) 2634 { 2635 if (exch == NULL) 2636 return; 2637 2638 async_sess_t *sess = exch->sess; 2639 assert(fibril_mutex_is_locked(&sess->remote_state_mtx)); 2640 2641 async_exchange_end(exch); 2642 fibril_mutex_unlock(&sess->remote_state_mtx); 2643 } 2644 2431 2645 /** @} 2432 2646 */
Note:
See TracChangeset
for help on using the changeset viewer.