Changes in uspace/lib/c/generic/async.c [36e2b55:8869f7b] in mainline
File: 1 edited

Legend:
  ' '  unmodified line (present in both revisions)
  '-'  removed line (present only in r36e2b55)
  '+'  added line (present only in r8869f7b)
Hunk headers give the starting line number in each revision,
e.g. "@@ -98 +98 @@" means r36e2b55 line 98 / r8869f7b line 98.
uspace/lib/c/generic/async.c
--- uspace/lib/c/generic/async.c (r36e2b55)
+++ uspace/lib/c/generic/async.c (r8869f7b)

@@ -98 +98 @@
 #include <ipc/ipc.h>
 #include <async.h>
-#include "private/async.h"
 #undef LIBC_ASYNC_C_
 
@@ -108 +107 @@
 #include <errno.h>
 #include <sys/time.h>
-#include <libarch/barrier.h>
+#include <arch/barrier.h>
 #include <bool.h>
 #include <malloc.h>
 #include <mem.h>
 #include <stdlib.h>
-#include <macros.h>
+#include "private/async.h"
 
 #define CLIENT_HASH_TABLE_BUCKETS 32
@@ -139 +138 @@
     link_t link;
 
-    task_id_t in_task_id;
+    sysarg_t in_task_hash;
     atomic_t refcnt;
     void *data;
@@ -151 +150 @@
     link_t link;
 
-    /** Incoming client task ID. */
-    task_id_t in_task_id;
+    /** Incoming client task hash. */
+    sysarg_t in_task_hash;
 
     /** Incoming phone hash. */
@@ -161 +160 @@
 
     /** Messages that should be delivered to this fibril. */
-    list_t msg_queue;
+    link_t msg_queue;
 
     /** Identification of the opening call. */
@@ -167 +166 @@
     /** Call data of the opening call. */
     ipc_call_t call;
-    /** Local argument or NULL if none. */
-    void *carg;
 
     /** Identification of the closing call. */
@@ -174 +171 @@
 
     /** Fibril function that will be used to handle the connection. */
-    async_client_conn_t cfibril;
+    void (*cfibril)(ipc_callid_t, ipc_call_t *);
 } connection_t;
@@ -204 +201 @@
 }
 
+void *async_get_client_data(void)
+{
+    assert(fibril_connection);
+    return fibril_connection->client->data;
+}
+
 /** Default fibril function that gets called to handle new connection.
  *
@@ -210 +213 @@
  * @param callid Hash of the incoming call.
  * @param call Data of the incoming call.
- * @param arg Local argument
- *
- */
-static void default_client_connection(ipc_callid_t callid, ipc_call_t *call,
-    void *arg)
+ *
+ */
+static void default_client_connection(ipc_callid_t callid, ipc_call_t *call)
 {
     ipc_answer_0(callid, ENOENT);
@@ -225 +226 @@
  * @param callid Hash of the incoming call.
  * @param call Data of the incoming call.
- * @param arg Local argument.
  *
  */
@@ -233 +233 @@
 
 static async_client_conn_t client_connection = default_client_connection;
-static async_interrupt_handler_t interrupt_received = default_interrupt_received;
+static async_client_conn_t interrupt_received = default_interrupt_received;
 
 /** Setter for client_connection function pointer.
@@ -250 +250 @@
  * notification fibril.
  */
-void async_set_interrupt_received(async_interrupt_handler_t intr)
+void async_set_interrupt_received(async_client_conn_t intr)
 {
     interrupt_received = intr;
@@ -284 +284 @@
 {
     assert(key);
-    assert(keys == 2);
     assert(item);
 
     client_t *client = hash_table_get_instance(item, client_t, link);
-    return (key[0] == LOWER32(client->in_task_id) &&
-        (key[1] == UPPER32(client->in_task_id)));
+    return (key[0] == client->in_task_hash);
 }
@@ -358 +356 @@
     wd->to_event.inlist = true;
 
-    link_t *tmp = timeout_list.head.next;
-    while (tmp != &timeout_list.head) {
+    link_t *tmp = timeout_list.next;
+    while (tmp != &timeout_list) {
         awaiter_t *cur
             = list_get_instance(tmp, awaiter_t, to_event.link);
@@ -369 +367 @@
     }
 
-    list_insert_before(&wd->to_event.link, tmp);
+    list_append(&wd->to_event.link, tmp);
 }
 
@@ -566 +564 @@
     }
 
-    msg_t *msg = list_get_instance(list_first(&conn->msg_queue), msg_t, link);
+    msg_t *msg = list_get_instance(conn->msg_queue.next, msg_t, link);
     list_remove(&msg->link);
 
@@ -575 +573 @@
     futex_up(&async_futex);
     return callid;
-}
-
-static client_t *async_client_get(task_id_t client_id, bool create)
-{
-    unsigned long key[2] = {
-        LOWER32(client_id),
-        UPPER32(client_id),
-    };
-    client_t *client = NULL;
-
-    futex_down(&async_futex);
-    link_t *lnk = hash_table_find(&client_hash_table, key);
-    if (lnk) {
-        client = hash_table_get_instance(lnk, client_t, link);
-        atomic_inc(&client->refcnt);
-    } else if (create) {
-        client = malloc(sizeof(client_t));
-        if (client) {
-            client->in_task_id = client_id;
-            client->data = async_client_data_create();
-
-            atomic_set(&client->refcnt, 1);
-            hash_table_insert(&client_hash_table, key, &client->link);
-        }
-    }
-
-    futex_up(&async_futex);
-    return client;
-}
-
-static void async_client_put(client_t *client)
-{
-    bool destroy;
-    unsigned long key[2] = {
-        LOWER32(client->in_task_id),
-        UPPER32(client->in_task_id)
-    };
-
-    futex_down(&async_futex);
-
-    if (atomic_predec(&client->refcnt) == 0) {
-        hash_table_remove(&client_hash_table, key, 2);
-        destroy = true;
-    } else
-        destroy = false;
-
-    futex_up(&async_futex);
-
-    if (destroy) {
-        if (client->data)
-            async_client_data_destroy(client->data);
-
-        free(client);
-    }
-}
-
-void *async_get_client_data(void)
-{
-    assert(fibril_connection);
-    return fibril_connection->client->data;
-}
-
-void *async_get_client_data_by_id(task_id_t client_id)
-{
-    client_t *client = async_client_get(client_id, false);
-    if (!client)
-        return NULL;
-    if (!client->data) {
-        async_client_put(client);
-        return NULL;
-    }
-
-    return client->data;
-}
-
-void async_put_client_data_by_id(task_id_t client_id)
-{
-    client_t *client = async_client_get(client_id, false);
-
-    assert(client);
-    assert(client->data);
-
-    /* Drop the reference we got in async_get_client_data_by_hash(). */
-    async_client_put(client);
-
-    /* Drop our own reference we got at the beginning of this function. */
-    async_client_put(client);
 }
 
@@ -682 +593 @@
  */
     fibril_connection = (connection_t *) arg;
+
+    futex_down(&async_futex);
 
     /*
@@ -688 +601 @@
      * hash in a new tracking structure.
      */
-
-    client_t *client = async_client_get(fibril_connection->in_task_id, true);
-    if (!client) {
-        ipc_answer_0(fibril_connection->callid, ENOMEM);
-        return 0;
-    }
-
+
+    unsigned long key = fibril_connection->in_task_hash;
+    link_t *lnk = hash_table_find(&client_hash_table, &key);
+
+    client_t *client;
+
+    if (lnk) {
+        client = hash_table_get_instance(lnk, client_t, link);
+        atomic_inc(&client->refcnt);
+    } else {
+        client = malloc(sizeof(client_t));
+        if (!client) {
+            ipc_answer_0(fibril_connection->callid, ENOMEM);
+            futex_up(&async_futex);
+            return 0;
+        }
+
+        client->in_task_hash = fibril_connection->in_task_hash;
+        client->data = async_client_data_create();
+
+        atomic_set(&client->refcnt, 1);
+        hash_table_insert(&client_hash_table, &key, &client->link);
+    }
+
+    futex_up(&async_futex);
+
     fibril_connection->client = client;
 
@@ -701 +633 @@
      */
     fibril_connection->cfibril(fibril_connection->callid,
-        &fibril_connection->call, fibril_connection->carg);
+        &fibril_connection->call);
 
     /*
      * Remove the reference for this client task connection.
      */
-    async_client_put(client);
+    bool destroy;
+
+    futex_down(&async_futex);
+
+    if (atomic_predec(&client->refcnt) == 0) {
+        hash_table_remove(&client_hash_table, &key, 1);
+        destroy = true;
+    } else
+        destroy = false;
+
+    futex_up(&async_futex);
+
+    if (destroy) {
+        if (client->data)
+            async_client_data_destroy(client->data);
+
+        free(client);
+    }
 
     /*
@@ -712 +661 @@
      */
     futex_down(&async_futex);
-    unsigned long key = fibril_connection->in_phone_hash;
+    key = fibril_connection->in_phone_hash;
     hash_table_remove(&conn_hash_table, &key, 1);
     futex_up(&async_futex);
@@ -721 +670 @@
     while (!list_empty(&fibril_connection->msg_queue)) {
         msg_t *msg =
-            list_get_instance(list_first(&fibril_connection->msg_queue),
-            msg_t, link);
+            list_get_instance(fibril_connection->msg_queue.next, msg_t,
+            link);
 
         list_remove(&msg->link);
@@ -746 +695 @@
  * particular fibrils.
  *
- * @param in_task_id Identification of the incoming connection.
+ * @param in_task_hash Identification of the incoming connection.
  * @param in_phone_hash Identification of the incoming connection.
  * @param callid Hash of the opening IPC_M_CONNECT_ME_TO call.
@@ -755 +704 @@
  * @param cfibril Fibril function that should be called upon opening the
  * connection.
- * @param carg Extra argument to pass to the connection fibril
  *
  * @return New fibril id or NULL on failure.
  *
  */
-fid_t async_new_connection(task_id_t in_task_id, sysarg_t in_phone_hash,
+fid_t async_new_connection(sysarg_t in_task_hash, sysarg_t in_phone_hash,
     ipc_callid_t callid, ipc_call_t *call,
-    async_client_conn_t cfibril, void *carg)
+    async_client_conn_t cfibril)
 {
     connection_t *conn = malloc(sizeof(*conn));
@@ -772 +720 @@
     }
 
-    conn->in_task_id = in_task_id;
+    conn->in_task_hash = in_task_hash;
     conn->in_phone_hash = in_phone_hash;
     list_initialize(&conn->msg_queue);
     conn->callid = callid;
     conn->close_callid = 0;
-    conn->carg = carg;
 
     if (call)
@@ -831 +778 @@
     case IPC_M_CONNECT_ME_TO:
         /* Open new connection with fibril, etc. */
-        async_new_connection(call->in_task_id, IPC_GET_ARG5(*call),
-            callid, call, client_connection, NULL);
+        async_new_connection(call->in_task_hash, IPC_GET_ARG5(*call),
+            callid, call, client_connection);
         return;
     }
@@ -852 +799 @@
     futex_down(&async_futex);
 
-    link_t *cur = list_first(&timeout_list);
-    while (cur != NULL) {
+    link_t *cur = timeout_list.next;
+    while (cur != &timeout_list) {
         awaiter_t *waiter =
             list_get_instance(cur, awaiter_t, to_event.link);
@@ -859 +806 @@
         if (tv_gt(&waiter->to_event.expires, &tv))
             break;
+
+        cur = cur->next;
 
         list_remove(&waiter->to_event.link);
@@ -872 +821 @@
         fibril_add_ready(waiter->fid);
     }
-
-    cur = list_first(&timeout_list);
 }
 
@@ -900 +847 @@
     suseconds_t timeout;
     if (!list_empty(&timeout_list)) {
-        awaiter_t *waiter = list_get_instance(
-            list_first(&timeout_list), awaiter_t, to_event.link);
+        awaiter_t *waiter = list_get_instance(timeout_list.next,
+            awaiter_t, to_event.link);
 
         struct timeval tv;
@@ -979 +926 @@
 {
     if (!hash_table_create(&client_hash_table, CLIENT_HASH_TABLE_BUCKETS,
-        2, &client_hash_table_ops))
+        1, &client_hash_table_ops))
         abort();
 
@@ -995 +942 @@
     session_ns->arg2 = 0;
     session_ns->arg3 = 0;
-
-    fibril_mutex_initialize(&session_ns->remote_state_mtx);
-    session_ns->remote_state_data = NULL;
 
     list_initialize(&session_ns->exch_list);
@@ -1470 +1414 @@
  */
 int async_connect_to_me(async_exch_t *exch, sysarg_t arg1, sysarg_t arg2,
-    sysarg_t arg3, async_client_conn_t client_receiver, void *carg)
+    sysarg_t arg3, async_client_conn_t client_receiver)
 {
     if (exch == NULL)
         return ENOENT;
 
+    sysarg_t task_hash;
     sysarg_t phone_hash;
-    sysarg_t rc;
-
-    aid_t req;
-    ipc_call_t answer;
-    req = async_send_3(exch, IPC_M_CONNECT_TO_ME, arg1, arg2, arg3,
-        &answer);
-    async_wait_for(req, &rc);
+    int rc = async_req_3_5(exch, IPC_M_CONNECT_TO_ME, arg1, arg2, arg3,
+        NULL, NULL, NULL, &task_hash, &phone_hash);
     if (rc != EOK)
-        return (int) rc;
-
-    phone_hash = IPC_GET_ARG5(answer);
-
+        return rc;
+
     if (client_receiver != NULL)
-        async_new_connection(answer.in_task_id, phone_hash, 0, NULL,
-            client_receiver, carg);
+        async_new_connection(task_hash, phone_hash, 0, NULL,
+            client_receiver);
 
     return EOK;
@@ -1564 +1502 @@
     sess->arg3 = 0;
 
-    fibril_mutex_initialize(&sess->remote_state_mtx);
-    sess->remote_state_data = NULL;
-
     list_initialize(&sess->exch_list);
     fibril_mutex_initialize(&sess->mutex);
@@ -1648 +1583 @@
     sess->arg3 = arg3;
 
-    fibril_mutex_initialize(&sess->remote_state_mtx);
-    sess->remote_state_data = NULL;
-
     list_initialize(&sess->exch_list);
     fibril_mutex_initialize(&sess->mutex);
@@ -1656 +1588 @@
 
     return sess;
-}
-
-/** Set arguments for new connections.
- *
- * FIXME This is an ugly hack to work around the problem that parallel
- * exchanges are implemented using parallel connections. When we create
- * a callback session, the framework does not know arguments for the new
- * connections.
- *
- * The proper solution seems to be to implement parallel exchanges using
- * tagging.
- */
-void async_sess_args_set(async_sess_t *sess, sysarg_t arg1, sysarg_t arg2,
-    sysarg_t arg3)
-{
-    sess->arg1 = arg1;
-    sess->arg2 = arg2;
-    sess->arg3 = arg3;
 }
@@ -1719 +1633 @@
     sess->arg3 = arg3;
 
-    fibril_mutex_initialize(&sess->remote_state_mtx);
-    sess->remote_state_data = NULL;
-
     list_initialize(&sess->exch_list);
     fibril_mutex_initialize(&sess->mutex);
@@ -1753 +1664 @@
     sess->arg3 = 0;
 
-    fibril_mutex_initialize(&sess->remote_state_mtx);
-    sess->remote_state_data = NULL;
-
     list_initialize(&sess->exch_list);
     fibril_mutex_initialize(&sess->mutex);
@@ -1777 +1685 @@
 int async_hangup(async_sess_t *sess)
 {
-    async_exch_t *exch;
-
     assert(sess);
 
     if (atomic_get(&sess->refcnt) > 0)
         return EBUSY;
-
-    fibril_mutex_lock(&async_sess_mutex);
 
     int rc = async_hangup_internal(sess->phone);
     if (rc == EOK)
         free(sess);
-
-    while (!list_empty(&sess->exch_list)) {
-        exch = (async_exch_t *)
-            list_get_instance(list_first(&sess->exch_list),
-            async_exch_t, sess_link);
-
-        list_remove(&exch->sess_link);
-        list_remove(&exch->global_link);
-        async_hangup_internal(exch->phone);
-        free(exch);
-    }
-
-    fibril_mutex_unlock(&async_sess_mutex);
 
     return rc;
@@ -1833 +1724 @@
      */
         exch = (async_exch_t *)
-            list_get_instance(list_first(&sess->exch_list),
-            async_exch_t, sess_link);
-
+            list_get_instance(sess->exch_list.next, async_exch_t, sess_link);
         list_remove(&exch->sess_link);
         list_remove(&exch->global_link);
@@ -1847 +1736 @@
         exch = (async_exch_t *) malloc(sizeof(async_exch_t));
         if (exch != NULL) {
-            link_initialize(&exch->sess_link);
-            link_initialize(&exch->global_link);
+            list_initialize(&exch->sess_link);
+            list_initialize(&exch->global_link);
             exch->sess = sess;
             exch->phone = sess->phone;
@@ -1865 +1754 @@
         exch = (async_exch_t *) malloc(sizeof(async_exch_t));
         if (exch != NULL) {
-            link_initialize(&exch->sess_link);
-            link_initialize(&exch->global_link);
+            list_initialize(&exch->sess_link);
+            list_initialize(&exch->global_link);
             exch->sess = sess;
             exch->phone = phone;
@@ -1878 +1767 @@
          */
         exch = (async_exch_t *)
-            list_get_instance(list_first(&inactive_exch_list),
-            async_exch_t, global_link);
-
+            list_get_instance(inactive_exch_list.next, async_exch_t,
+            global_link);
         list_remove(&exch->sess_link);
         list_remove(&exch->global_link);
@@ -1920 +1808 @@
     async_sess_t *sess = exch->sess;
 
-    atomic_dec(&sess->refcnt);
-
     if (sess->mgmt == EXCHANGE_SERIALIZE)
         fibril_mutex_unlock(&sess->mutex);
@@ -2436 +2322 @@
     sess->arg3 = 0;
 
-    fibril_mutex_initialize(&sess->remote_state_mtx);
-    sess->remote_state_data = NULL;
-
     list_initialize(&sess->exch_list);
     fibril_mutex_initialize(&sess->mutex);
@@ -2485 +2368 @@
     sess->arg3 = 0;
 
-    fibril_mutex_initialize(&sess->remote_state_mtx);
-    sess->remote_state_data = NULL;
-
     list_initialize(&sess->exch_list);
     fibril_mutex_initialize(&sess->mutex);
@@ -2530 +2410 @@
     sess->arg3 = 0;
 
-    fibril_mutex_initialize(&sess->remote_state_mtx);
-    sess->remote_state_data = NULL;
-
     list_initialize(&sess->exch_list);
     fibril_mutex_initialize(&sess->mutex);
@@ -2540 +2417 @@
 }
 
-int async_state_change_start(async_exch_t *exch, sysarg_t arg1, sysarg_t arg2,
-    sysarg_t arg3, async_exch_t *other_exch)
-{
-    return async_req_5_0(exch, IPC_M_STATE_CHANGE_AUTHORIZE,
-        arg1, arg2, arg3, 0, other_exch->phone);
-}
-
-bool async_state_change_receive(ipc_callid_t *callid, sysarg_t *arg1,
-    sysarg_t *arg2, sysarg_t *arg3)
-{
-    assert(callid);
-
-    ipc_call_t call;
-    *callid = async_get_call(&call);
-
-    if (IPC_GET_IMETHOD(call) != IPC_M_STATE_CHANGE_AUTHORIZE)
-        return false;
-
-    if (arg1)
-        *arg1 = IPC_GET_ARG1(call);
-    if (arg2)
-        *arg2 = IPC_GET_ARG2(call);
-    if (arg3)
-        *arg3 = IPC_GET_ARG3(call);
-
-    return true;
-}
-
-int async_state_change_finalize(ipc_callid_t callid, async_exch_t *other_exch)
-{
-    return ipc_answer_1(callid, EOK, other_exch->phone);
-}
-
-/** Lock and get session remote state
- *
- * Lock and get the local replica of the remote state
- * in stateful sessions. The call should be paired
- * with async_remote_state_release*().
- *
- * @param[in] sess Stateful session.
- *
- * @return Local replica of the remote state.
- *
- */
-void *async_remote_state_acquire(async_sess_t *sess)
-{
-    fibril_mutex_lock(&sess->remote_state_mtx);
-    return sess->remote_state_data;
-}
-
-/** Update the session remote state
- *
- * Update the local replica of the remote state
- * in stateful sessions. The remote state must
- * be already locked.
- *
- * @param[in] sess Stateful session.
- * @param[in] state New local replica of the remote state.
- *
- */
-void async_remote_state_update(async_sess_t *sess, void *state)
-{
-    assert(fibril_mutex_is_locked(&sess->remote_state_mtx));
-    sess->remote_state_data = state;
-}
-
-/** Release the session remote state
- *
- * Unlock the local replica of the remote state
- * in stateful sessions.
- *
- * @param[in] sess Stateful session.
- *
- */
-void async_remote_state_release(async_sess_t *sess)
-{
-    assert(fibril_mutex_is_locked(&sess->remote_state_mtx));
-
-    fibril_mutex_unlock(&sess->remote_state_mtx);
-}
-
-/** Release the session remote state and end an exchange
- *
- * Unlock the local replica of the remote state
- * in stateful sessions. This is convenience function
- * which gets the session pointer from the exchange
- * and also ends the exchange.
- *
- * @param[in] exch Stateful session's exchange.
- *
- */
-void async_remote_state_release_exchange(async_exch_t *exch)
-{
-    if (exch == NULL)
-        return;
-
-    async_sess_t *sess = exch->sess;
-    assert(fibril_mutex_is_locked(&sess->remote_state_mtx));
-
-    async_exchange_end(exch);
-    fibril_mutex_unlock(&sess->remote_state_mtx);
-}
-
 /** @}
  */
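For orientation, the most visible API difference in this diff is the shape of a client connection handler: on the r36e2b55 side handlers receive an extra local argument (void *carg, threaded through connection_t and async_new_connection()), while on the r8869f7b side they take only the call id and call data. Below is a minimal sketch, not part of the changeset; the handler names are hypothetical and the bodies simply mirror default_client_connection() from the diff above.

#include <async.h>
#include <errno.h>
#include <ipc/ipc.h>

/* r36e2b55-style handler: receives the extra local argument. */
static void my_conn_with_arg(ipc_callid_t callid, ipc_call_t *call, void *arg)
{
    /* Refuse the connection, as default_client_connection() does. */
    ipc_answer_0(callid, ENOENT);
}

/* r8869f7b-style handler: only the call id and call data. */
static void my_conn(ipc_callid_t callid, ipc_call_t *call)
{
    ipc_answer_0(callid, ENOENT);
}

Only one of these forms compiles against a given revision of <async.h>; they are shown side by side purely to contrast the signatures that async_new_connection() and async_connect_to_me() expect in each revision.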