Changeset 925a21e in mainline for uspace/lib/c/generic/async.c
- Timestamp: 2011-09-24T14:20:29Z (13 years ago)
- Branches: lfn, master, serial, ticket/834-toolchain-update, topic/msim-upgrade, topic/simplify-dev-export
- Children: 5bf76c1
- Parents: 867e2555 (diff), 1ab4aca (diff)
Note: this is a merge changeset; the changes displayed below correspond to the merge itself. Use the (diff) links above to see all the changes relative to each parent.
- File: 1 edited
uspace/lib/c/generic/async.c
r867e2555 r925a21e 98 98 #include <ipc/ipc.h> 99 99 #include <async.h> 100 #include "private/async.h" 100 101 #undef LIBC_ASYNC_C_ 101 102 … … 107 108 #include <errno.h> 108 109 #include <sys/time.h> 109 #include < arch/barrier.h>110 #include <libarch/barrier.h> 110 111 #include <bool.h> 111 112 #include <malloc.h> 112 113 #include <mem.h> 113 114 #include <stdlib.h> 114 #include "private/async.h"115 #include <macros.h> 115 116 116 117 #define CLIENT_HASH_TABLE_BUCKETS 32 117 118 #define CONN_HASH_TABLE_BUCKETS 32 119 120 /** Session data */ 121 struct async_sess { 122 /** List of inactive exchanges */ 123 list_t exch_list; 124 125 /** Exchange management style */ 126 exch_mgmt_t mgmt; 127 128 /** Session identification */ 129 int phone; 130 131 /** First clone connection argument */ 132 sysarg_t arg1; 133 134 /** Second clone connection argument */ 135 sysarg_t arg2; 136 137 /** Third clone connection argument */ 138 sysarg_t arg3; 139 140 /** Exchange mutex */ 141 fibril_mutex_t mutex; 142 143 /** Number of opened exchanges */ 144 atomic_t refcnt; 145 146 /** Mutex for stateful connections */ 147 fibril_mutex_t remote_state_mtx; 148 149 /** Data for stateful connections */ 150 void *remote_state_data; 151 }; 152 153 /** Exchange data */ 154 struct async_exch { 155 /** Link into list of inactive exchanges */ 156 link_t sess_link; 157 158 /** Link into global list of inactive exchanges */ 159 link_t global_link; 160 161 /** Session pointer */ 162 async_sess_t *sess; 163 164 /** Exchange identification */ 165 int phone; 166 }; 118 167 119 168 /** Async framework global futex */ … … 134 183 } msg_t; 135 184 185 /** Message data */ 186 typedef struct { 187 awaiter_t wdata; 188 189 /** If reply was received. */ 190 bool done; 191 192 /** Pointer to where the answer data is stored. */ 193 ipc_call_t *dataptr; 194 195 sysarg_t retval; 196 } amsg_t; 197 136 198 /* Client connection data */ 137 199 typedef struct { 138 200 link_t link; 139 201 140 sysarg_t in_task_hash;202 task_id_t in_task_id; 141 203 atomic_t refcnt; 142 204 void *data; … … 150 212 link_t link; 151 213 152 /** Incoming client task hash. */153 sysarg_t in_task_hash;214 /** Incoming client task ID. */ 215 task_id_t in_task_id; 154 216 155 217 /** Incoming phone hash. */ … … 203 265 } 204 266 205 void *async_get_client_data(void)206 {207 assert(fibril_connection);208 return fibril_connection->client->data;209 }210 211 267 /** Default fibril function that gets called to handle new connection. 
212 268 * … … 289 345 { 290 346 assert(key); 347 assert(keys == 2); 291 348 assert(item); 292 349 293 350 client_t *client = hash_table_get_instance(item, client_t, link); 294 return (key[0] == client->in_task_hash); 351 return (key[0] == LOWER32(client->in_task_id) && 352 (key[1] == UPPER32(client->in_task_id))); 295 353 } 296 354 … … 580 638 } 581 639 640 static client_t *async_client_get(task_id_t client_id, bool create) 641 { 642 unsigned long key[2] = { 643 LOWER32(client_id), 644 UPPER32(client_id), 645 }; 646 client_t *client = NULL; 647 648 futex_down(&async_futex); 649 link_t *lnk = hash_table_find(&client_hash_table, key); 650 if (lnk) { 651 client = hash_table_get_instance(lnk, client_t, link); 652 atomic_inc(&client->refcnt); 653 } else if (create) { 654 client = malloc(sizeof(client_t)); 655 if (client) { 656 client->in_task_id = client_id; 657 client->data = async_client_data_create(); 658 659 atomic_set(&client->refcnt, 1); 660 hash_table_insert(&client_hash_table, key, &client->link); 661 } 662 } 663 664 futex_up(&async_futex); 665 return client; 666 } 667 668 static void async_client_put(client_t *client) 669 { 670 bool destroy; 671 unsigned long key[2] = { 672 LOWER32(client->in_task_id), 673 UPPER32(client->in_task_id) 674 }; 675 676 futex_down(&async_futex); 677 678 if (atomic_predec(&client->refcnt) == 0) { 679 hash_table_remove(&client_hash_table, key, 2); 680 destroy = true; 681 } else 682 destroy = false; 683 684 futex_up(&async_futex); 685 686 if (destroy) { 687 if (client->data) 688 async_client_data_destroy(client->data); 689 690 free(client); 691 } 692 } 693 694 void *async_get_client_data(void) 695 { 696 assert(fibril_connection); 697 return fibril_connection->client->data; 698 } 699 700 void *async_get_client_data_by_id(task_id_t client_id) 701 { 702 client_t *client = async_client_get(client_id, false); 703 if (!client) 704 return NULL; 705 if (!client->data) { 706 async_client_put(client); 707 return NULL; 708 } 709 710 return client->data; 711 } 712 713 void async_put_client_data_by_id(task_id_t client_id) 714 { 715 client_t *client = async_client_get(client_id, false); 716 717 assert(client); 718 assert(client->data); 719 720 /* Drop the reference we got in async_get_client_data_by_hash(). */ 721 async_client_put(client); 722 723 /* Drop our own reference we got at the beginning of this function. */ 724 async_client_put(client); 725 } 726 582 727 /** Wrapper for client connection fibril. 583 728 * … … 598 743 */ 599 744 fibril_connection = (connection_t *) arg; 600 601 futex_down(&async_futex);602 745 603 746 /* … … 606 749 * hash in a new tracking structure. 
607 750 */ 608 609 unsigned long key = fibril_connection->in_task_hash; 610 link_t *lnk = hash_table_find(&client_hash_table, &key); 611 612 client_t *client; 613 614 if (lnk) { 615 client = hash_table_get_instance(lnk, client_t, link); 616 atomic_inc(&client->refcnt); 617 } else { 618 client = malloc(sizeof(client_t)); 619 if (!client) { 620 ipc_answer_0(fibril_connection->callid, ENOMEM); 621 futex_up(&async_futex); 622 return 0; 623 } 624 625 client->in_task_hash = fibril_connection->in_task_hash; 626 client->data = async_client_data_create(); 627 628 atomic_set(&client->refcnt, 1); 629 hash_table_insert(&client_hash_table, &key, &client->link); 630 } 631 632 futex_up(&async_futex); 633 751 752 client_t *client = async_client_get(fibril_connection->in_task_id, true); 753 if (!client) { 754 ipc_answer_0(fibril_connection->callid, ENOMEM); 755 return 0; 756 } 757 634 758 fibril_connection->client = client; 635 759 … … 643 767 * Remove the reference for this client task connection. 644 768 */ 645 bool destroy; 646 647 futex_down(&async_futex); 648 649 if (atomic_predec(&client->refcnt) == 0) { 650 hash_table_remove(&client_hash_table, &key, 1); 651 destroy = true; 652 } else 653 destroy = false; 654 655 futex_up(&async_futex); 656 657 if (destroy) { 658 if (client->data) 659 async_client_data_destroy(client->data); 660 661 free(client); 662 } 769 async_client_put(client); 663 770 664 771 /* … … 666 773 */ 667 774 futex_down(&async_futex); 668 key = fibril_connection->in_phone_hash;775 unsigned long key = fibril_connection->in_phone_hash; 669 776 hash_table_remove(&conn_hash_table, &key, 1); 670 777 futex_up(&async_futex); … … 700 807 * particular fibrils. 701 808 * 702 * @param in_task_ hashIdentification of the incoming connection.809 * @param in_task_id Identification of the incoming connection. 703 810 * @param in_phone_hash Identification of the incoming connection. 704 811 * @param callid Hash of the opening IPC_M_CONNECT_ME_TO call. … … 714 821 * 715 822 */ 716 fid_t async_new_connection( sysarg_t in_task_hash, sysarg_t in_phone_hash,823 fid_t async_new_connection(task_id_t in_task_id, sysarg_t in_phone_hash, 717 824 ipc_callid_t callid, ipc_call_t *call, 718 825 async_client_conn_t cfibril, void *carg) … … 726 833 } 727 834 728 conn->in_task_ hash = in_task_hash;835 conn->in_task_id = in_task_id; 729 836 conn->in_phone_hash = in_phone_hash; 730 837 list_initialize(&conn->msg_queue); … … 785 892 case IPC_M_CONNECT_ME_TO: 786 893 /* Open new connection with fibril, etc. 
*/ 787 async_new_connection(call->in_task_ hash, IPC_GET_ARG5(*call),894 async_new_connection(call->in_task_id, IPC_GET_ARG5(*call), 788 895 callid, call, client_connection, NULL); 789 896 return; … … 933 1040 { 934 1041 if (!hash_table_create(&client_hash_table, CLIENT_HASH_TABLE_BUCKETS, 935 1, &client_hash_table_ops))1042 2, &client_hash_table_ops)) 936 1043 abort(); 937 1044 … … 949 1056 session_ns->arg2 = 0; 950 1057 session_ns->arg3 = 0; 1058 1059 fibril_mutex_initialize(&session_ns->remote_state_mtx); 1060 session_ns->remote_state_data = NULL; 951 1061 952 1062 list_initialize(&session_ns->exch_list); … … 1426 1536 return ENOENT; 1427 1537 1428 sysarg_t task_hash;1429 1538 sysarg_t phone_hash; 1430 int rc = async_req_3_5(exch, IPC_M_CONNECT_TO_ME, arg1, arg2, arg3, 1431 NULL, NULL, NULL, &task_hash, &phone_hash); 1539 sysarg_t rc; 1540 1541 aid_t req; 1542 ipc_call_t answer; 1543 req = async_send_3(exch, IPC_M_CONNECT_TO_ME, arg1, arg2, arg3, 1544 &answer); 1545 async_wait_for(req, &rc); 1432 1546 if (rc != EOK) 1433 return rc; 1434 1547 return (int) rc; 1548 1549 phone_hash = IPC_GET_ARG5(answer); 1550 1435 1551 if (client_receiver != NULL) 1436 async_new_connection( task_hash, phone_hash, 0, NULL,1552 async_new_connection(answer.in_task_id, phone_hash, 0, NULL, 1437 1553 client_receiver, carg); 1438 1554 … … 1509 1625 sess->arg3 = 0; 1510 1626 1627 fibril_mutex_initialize(&sess->remote_state_mtx); 1628 sess->remote_state_data = NULL; 1629 1511 1630 list_initialize(&sess->exch_list); 1512 1631 fibril_mutex_initialize(&sess->mutex); … … 1590 1709 sess->arg3 = arg3; 1591 1710 1711 fibril_mutex_initialize(&sess->remote_state_mtx); 1712 sess->remote_state_data = NULL; 1713 1592 1714 list_initialize(&sess->exch_list); 1593 1715 fibril_mutex_initialize(&sess->mutex); … … 1595 1717 1596 1718 return sess; 1719 } 1720 1721 /** Set arguments for new connections. 1722 * 1723 * FIXME This is an ugly hack to work around the problem that parallel 1724 * exchanges are implemented using parallel connections. When we create 1725 * a callback session, the framework does not know arguments for the new 1726 * connections. 1727 * 1728 * The proper solution seems to be to implement parallel exchanges using 1729 * tagging. 
1730 */ 1731 void async_sess_args_set(async_sess_t *sess, sysarg_t arg1, sysarg_t arg2, 1732 sysarg_t arg3) 1733 { 1734 sess->arg1 = arg1; 1735 sess->arg2 = arg2; 1736 sess->arg3 = arg3; 1597 1737 } 1598 1738 … … 1640 1780 sess->arg3 = arg3; 1641 1781 1782 fibril_mutex_initialize(&sess->remote_state_mtx); 1783 sess->remote_state_data = NULL; 1784 1642 1785 list_initialize(&sess->exch_list); 1643 1786 fibril_mutex_initialize(&sess->mutex); … … 1671 1814 sess->arg3 = 0; 1672 1815 1816 fibril_mutex_initialize(&sess->remote_state_mtx); 1817 sess->remote_state_data = NULL; 1818 1673 1819 list_initialize(&sess->exch_list); 1674 1820 fibril_mutex_initialize(&sess->mutex); … … 1692 1838 int async_hangup(async_sess_t *sess) 1693 1839 { 1840 async_exch_t *exch; 1841 1694 1842 assert(sess); 1695 1843 1696 1844 if (atomic_get(&sess->refcnt) > 0) 1697 1845 return EBUSY; 1846 1847 fibril_mutex_lock(&async_sess_mutex); 1698 1848 1699 1849 int rc = async_hangup_internal(sess->phone); 1700 1850 if (rc == EOK) 1701 1851 free(sess); 1852 1853 while (!list_empty(&sess->exch_list)) { 1854 exch = (async_exch_t *) 1855 list_get_instance(list_first(&sess->exch_list), 1856 async_exch_t, sess_link); 1857 1858 list_remove(&exch->sess_link); 1859 list_remove(&exch->global_link); 1860 async_hangup_internal(exch->phone); 1861 free(exch); 1862 } 1863 1864 fibril_mutex_unlock(&async_sess_mutex); 1702 1865 1703 1866 return rc; … … 2334 2497 sess->arg3 = 0; 2335 2498 2499 fibril_mutex_initialize(&sess->remote_state_mtx); 2500 sess->remote_state_data = NULL; 2501 2336 2502 list_initialize(&sess->exch_list); 2337 2503 fibril_mutex_initialize(&sess->mutex); … … 2380 2546 sess->arg3 = 0; 2381 2547 2548 fibril_mutex_initialize(&sess->remote_state_mtx); 2549 sess->remote_state_data = NULL; 2550 2382 2551 list_initialize(&sess->exch_list); 2383 2552 fibril_mutex_initialize(&sess->mutex); … … 2422 2591 sess->arg3 = 0; 2423 2592 2593 fibril_mutex_initialize(&sess->remote_state_mtx); 2594 sess->remote_state_data = NULL; 2595 2424 2596 list_initialize(&sess->exch_list); 2425 2597 fibril_mutex_initialize(&sess->mutex); … … 2429 2601 } 2430 2602 2603 int async_state_change_start(async_exch_t *exch, sysarg_t arg1, sysarg_t arg2, 2604 sysarg_t arg3, async_exch_t *other_exch) 2605 { 2606 return async_req_5_0(exch, IPC_M_STATE_CHANGE_AUTHORIZE, 2607 arg1, arg2, arg3, 0, other_exch->phone); 2608 } 2609 2610 bool async_state_change_receive(ipc_callid_t *callid, sysarg_t *arg1, 2611 sysarg_t *arg2, sysarg_t *arg3) 2612 { 2613 assert(callid); 2614 2615 ipc_call_t call; 2616 *callid = async_get_call(&call); 2617 2618 if (IPC_GET_IMETHOD(call) != IPC_M_STATE_CHANGE_AUTHORIZE) 2619 return false; 2620 2621 if (arg1) 2622 *arg1 = IPC_GET_ARG1(call); 2623 if (arg2) 2624 *arg2 = IPC_GET_ARG2(call); 2625 if (arg3) 2626 *arg3 = IPC_GET_ARG3(call); 2627 2628 return true; 2629 } 2630 2631 int async_state_change_finalize(ipc_callid_t callid, async_exch_t *other_exch) 2632 { 2633 return ipc_answer_1(callid, EOK, other_exch->phone); 2634 } 2635 2636 /** Lock and get session remote state 2637 * 2638 * Lock and get the local replica of the remote state 2639 * in stateful sessions. The call should be paired 2640 * with async_remote_state_release*(). 2641 * 2642 * @param[in] sess Stateful session. 2643 * 2644 * @return Local replica of the remote state. 
2645 * 2646 */ 2647 void *async_remote_state_acquire(async_sess_t *sess) 2648 { 2649 fibril_mutex_lock(&sess->remote_state_mtx); 2650 return sess->remote_state_data; 2651 } 2652 2653 /** Update the session remote state 2654 * 2655 * Update the local replica of the remote state 2656 * in stateful sessions. The remote state must 2657 * be already locked. 2658 * 2659 * @param[in] sess Stateful session. 2660 * @param[in] state New local replica of the remote state. 2661 * 2662 */ 2663 void async_remote_state_update(async_sess_t *sess, void *state) 2664 { 2665 assert(fibril_mutex_is_locked(&sess->remote_state_mtx)); 2666 sess->remote_state_data = state; 2667 } 2668 2669 /** Release the session remote state 2670 * 2671 * Unlock the local replica of the remote state 2672 * in stateful sessions. 2673 * 2674 * @param[in] sess Stateful session. 2675 * 2676 */ 2677 void async_remote_state_release(async_sess_t *sess) 2678 { 2679 assert(fibril_mutex_is_locked(&sess->remote_state_mtx)); 2680 2681 fibril_mutex_unlock(&sess->remote_state_mtx); 2682 } 2683 2684 /** Release the session remote state and end an exchange 2685 * 2686 * Unlock the local replica of the remote state 2687 * in stateful sessions. This is convenience function 2688 * which gets the session pointer from the exchange 2689 * and also ends the exchange. 2690 * 2691 * @param[in] exch Stateful session's exchange. 2692 * 2693 */ 2694 void async_remote_state_release_exchange(async_exch_t *exch) 2695 { 2696 if (exch == NULL) 2697 return; 2698 2699 async_sess_t *sess = exch->sess; 2700 assert(fibril_mutex_is_locked(&sess->remote_state_mtx)); 2701 2702 async_exchange_end(exch); 2703 fibril_mutex_unlock(&sess->remote_state_mtx); 2704 } 2705 2431 2706 /** @} 2432 2707 */