Changes in uspace/lib/c/generic/async.c [8869f7b:36e2b55] in mainline
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
uspace/lib/c/generic/async.c
r8869f7b r36e2b55 98 98 #include <ipc/ipc.h> 99 99 #include <async.h> 100 #include "private/async.h" 100 101 #undef LIBC_ASYNC_C_ 101 102 … … 107 108 #include <errno.h> 108 109 #include <sys/time.h> 109 #include < arch/barrier.h>110 #include <libarch/barrier.h> 110 111 #include <bool.h> 111 112 #include <malloc.h> 112 113 #include <mem.h> 113 114 #include <stdlib.h> 114 #include "private/async.h"115 #include <macros.h> 115 116 116 117 #define CLIENT_HASH_TABLE_BUCKETS 32 … … 138 139 link_t link; 139 140 140 sysarg_t in_task_hash;141 task_id_t in_task_id; 141 142 atomic_t refcnt; 142 143 void *data; … … 150 151 link_t link; 151 152 152 /** Incoming client task hash. */153 sysarg_t in_task_hash;153 /** Incoming client task ID. */ 154 task_id_t in_task_id; 154 155 155 156 /** Incoming phone hash. */ … … 160 161 161 162 /** Messages that should be delivered to this fibril. */ 162 li nk_t msg_queue;163 list_t msg_queue; 163 164 164 165 /** Identification of the opening call. */ … … 166 167 /** Call data of the opening call. */ 167 168 ipc_call_t call; 169 /** Local argument or NULL if none. */ 170 void *carg; 168 171 169 172 /** Identification of the closing call. */ … … 171 174 172 175 /** Fibril function that will be used to handle the connection. */ 173 void (*cfibril)(ipc_callid_t, ipc_call_t *);176 async_client_conn_t cfibril; 174 177 } connection_t; 175 178 … … 201 204 } 202 205 203 void *async_get_client_data(void)204 {205 assert(fibril_connection);206 return fibril_connection->client->data;207 }208 209 206 /** Default fibril function that gets called to handle new connection. 210 207 * … … 213 210 * @param callid Hash of the incoming call. 214 211 * @param call Data of the incoming call. 215 * 216 */ 217 static void default_client_connection(ipc_callid_t callid, ipc_call_t *call) 212 * @param arg Local argument 213 * 214 */ 215 static void default_client_connection(ipc_callid_t callid, ipc_call_t *call, 216 void *arg) 218 217 { 219 218 ipc_answer_0(callid, ENOENT); … … 226 225 * @param callid Hash of the incoming call. 227 226 * @param call Data of the incoming call. 227 * @param arg Local argument. 228 228 * 229 229 */ … … 233 233 234 234 static async_client_conn_t client_connection = default_client_connection; 235 static async_ client_conn_t interrupt_received = default_interrupt_received;235 static async_interrupt_handler_t interrupt_received = default_interrupt_received; 236 236 237 237 /** Setter for client_connection function pointer. … … 250 250 * notification fibril. 251 251 */ 252 void async_set_interrupt_received(async_ client_conn_t intr)252 void async_set_interrupt_received(async_interrupt_handler_t intr) 253 253 { 254 254 interrupt_received = intr; … … 284 284 { 285 285 assert(key); 286 assert(keys == 2); 286 287 assert(item); 287 288 288 289 client_t *client = hash_table_get_instance(item, client_t, link); 289 return (key[0] == client->in_task_hash); 290 return (key[0] == LOWER32(client->in_task_id) && 291 (key[1] == UPPER32(client->in_task_id))); 290 292 } 291 293 … … 356 358 wd->to_event.inlist = true; 357 359 358 link_t *tmp = timeout_list. next;359 while (tmp != &timeout_list ) {360 link_t *tmp = timeout_list.head.next; 361 while (tmp != &timeout_list.head) { 360 362 awaiter_t *cur 361 363 = list_get_instance(tmp, awaiter_t, to_event.link); … … 367 369 } 368 370 369 list_ append(&wd->to_event.link, tmp);371 list_insert_before(&wd->to_event.link, tmp); 370 372 } 371 373 … … 564 566 } 565 567 566 msg_t *msg = list_get_instance( conn->msg_queue.next, msg_t, link);568 msg_t *msg = list_get_instance(list_first(&conn->msg_queue), msg_t, link); 567 569 list_remove(&msg->link); 568 570 … … 573 575 futex_up(&async_futex); 574 576 return callid; 577 } 578 579 static client_t *async_client_get(task_id_t client_id, bool create) 580 { 581 unsigned long key[2] = { 582 LOWER32(client_id), 583 UPPER32(client_id), 584 }; 585 client_t *client = NULL; 586 587 futex_down(&async_futex); 588 link_t *lnk = hash_table_find(&client_hash_table, key); 589 if (lnk) { 590 client = hash_table_get_instance(lnk, client_t, link); 591 atomic_inc(&client->refcnt); 592 } else if (create) { 593 client = malloc(sizeof(client_t)); 594 if (client) { 595 client->in_task_id = client_id; 596 client->data = async_client_data_create(); 597 598 atomic_set(&client->refcnt, 1); 599 hash_table_insert(&client_hash_table, key, &client->link); 600 } 601 } 602 603 futex_up(&async_futex); 604 return client; 605 } 606 607 static void async_client_put(client_t *client) 608 { 609 bool destroy; 610 unsigned long key[2] = { 611 LOWER32(client->in_task_id), 612 UPPER32(client->in_task_id) 613 }; 614 615 futex_down(&async_futex); 616 617 if (atomic_predec(&client->refcnt) == 0) { 618 hash_table_remove(&client_hash_table, key, 2); 619 destroy = true; 620 } else 621 destroy = false; 622 623 futex_up(&async_futex); 624 625 if (destroy) { 626 if (client->data) 627 async_client_data_destroy(client->data); 628 629 free(client); 630 } 631 } 632 633 void *async_get_client_data(void) 634 { 635 assert(fibril_connection); 636 return fibril_connection->client->data; 637 } 638 639 void *async_get_client_data_by_id(task_id_t client_id) 640 { 641 client_t *client = async_client_get(client_id, false); 642 if (!client) 643 return NULL; 644 if (!client->data) { 645 async_client_put(client); 646 return NULL; 647 } 648 649 return client->data; 650 } 651 652 void async_put_client_data_by_id(task_id_t client_id) 653 { 654 client_t *client = async_client_get(client_id, false); 655 656 assert(client); 657 assert(client->data); 658 659 /* Drop the reference we got in async_get_client_data_by_hash(). */ 660 async_client_put(client); 661 662 /* Drop our own reference we got at the beginning of this function. */ 663 async_client_put(client); 575 664 } 576 665 … … 593 682 */ 594 683 fibril_connection = (connection_t *) arg; 595 596 futex_down(&async_futex);597 684 598 685 /* … … 601 688 * hash in a new tracking structure. 602 689 */ 603 604 unsigned long key = fibril_connection->in_task_hash; 605 link_t *lnk = hash_table_find(&client_hash_table, &key); 606 607 client_t *client; 608 609 if (lnk) { 610 client = hash_table_get_instance(lnk, client_t, link); 611 atomic_inc(&client->refcnt); 612 } else { 613 client = malloc(sizeof(client_t)); 614 if (!client) { 615 ipc_answer_0(fibril_connection->callid, ENOMEM); 616 futex_up(&async_futex); 617 return 0; 618 } 619 620 client->in_task_hash = fibril_connection->in_task_hash; 621 client->data = async_client_data_create(); 622 623 atomic_set(&client->refcnt, 1); 624 hash_table_insert(&client_hash_table, &key, &client->link); 625 } 626 627 futex_up(&async_futex); 628 690 691 client_t *client = async_client_get(fibril_connection->in_task_id, true); 692 if (!client) { 693 ipc_answer_0(fibril_connection->callid, ENOMEM); 694 return 0; 695 } 696 629 697 fibril_connection->client = client; 630 698 … … 633 701 */ 634 702 fibril_connection->cfibril(fibril_connection->callid, 635 &fibril_connection->call );703 &fibril_connection->call, fibril_connection->carg); 636 704 637 705 /* 638 706 * Remove the reference for this client task connection. 639 707 */ 640 bool destroy; 641 642 futex_down(&async_futex); 643 644 if (atomic_predec(&client->refcnt) == 0) { 645 hash_table_remove(&client_hash_table, &key, 1); 646 destroy = true; 647 } else 648 destroy = false; 649 650 futex_up(&async_futex); 651 652 if (destroy) { 653 if (client->data) 654 async_client_data_destroy(client->data); 655 656 free(client); 657 } 708 async_client_put(client); 658 709 659 710 /* … … 661 712 */ 662 713 futex_down(&async_futex); 663 key = fibril_connection->in_phone_hash;714 unsigned long key = fibril_connection->in_phone_hash; 664 715 hash_table_remove(&conn_hash_table, &key, 1); 665 716 futex_up(&async_futex); … … 670 721 while (!list_empty(&fibril_connection->msg_queue)) { 671 722 msg_t *msg = 672 list_get_instance( fibril_connection->msg_queue.next, msg_t,673 link);723 list_get_instance(list_first(&fibril_connection->msg_queue), 724 msg_t, link); 674 725 675 726 list_remove(&msg->link); … … 695 746 * particular fibrils. 696 747 * 697 * @param in_task_ hashIdentification of the incoming connection.748 * @param in_task_id Identification of the incoming connection. 698 749 * @param in_phone_hash Identification of the incoming connection. 699 750 * @param callid Hash of the opening IPC_M_CONNECT_ME_TO call. … … 704 755 * @param cfibril Fibril function that should be called upon opening the 705 756 * connection. 757 * @param carg Extra argument to pass to the connection fibril 706 758 * 707 759 * @return New fibril id or NULL on failure. 708 760 * 709 761 */ 710 fid_t async_new_connection( sysarg_t in_task_hash, sysarg_t in_phone_hash,762 fid_t async_new_connection(task_id_t in_task_id, sysarg_t in_phone_hash, 711 763 ipc_callid_t callid, ipc_call_t *call, 712 async_client_conn_t cfibril )764 async_client_conn_t cfibril, void *carg) 713 765 { 714 766 connection_t *conn = malloc(sizeof(*conn)); … … 720 772 } 721 773 722 conn->in_task_ hash = in_task_hash;774 conn->in_task_id = in_task_id; 723 775 conn->in_phone_hash = in_phone_hash; 724 776 list_initialize(&conn->msg_queue); 725 777 conn->callid = callid; 726 778 conn->close_callid = 0; 779 conn->carg = carg; 727 780 728 781 if (call) … … 778 831 case IPC_M_CONNECT_ME_TO: 779 832 /* Open new connection with fibril, etc. */ 780 async_new_connection(call->in_task_ hash, IPC_GET_ARG5(*call),781 callid, call, client_connection );833 async_new_connection(call->in_task_id, IPC_GET_ARG5(*call), 834 callid, call, client_connection, NULL); 782 835 return; 783 836 } … … 799 852 futex_down(&async_futex); 800 853 801 link_t *cur = timeout_list.next;802 while (cur != &timeout_list) {854 link_t *cur = list_first(&timeout_list); 855 while (cur != NULL) { 803 856 awaiter_t *waiter = 804 857 list_get_instance(cur, awaiter_t, to_event.link); … … 806 859 if (tv_gt(&waiter->to_event.expires, &tv)) 807 860 break; 808 809 cur = cur->next;810 861 811 862 list_remove(&waiter->to_event.link); … … 821 872 fibril_add_ready(waiter->fid); 822 873 } 874 875 cur = list_first(&timeout_list); 823 876 } 824 877 … … 847 900 suseconds_t timeout; 848 901 if (!list_empty(&timeout_list)) { 849 awaiter_t *waiter = list_get_instance( timeout_list.next,850 awaiter_t, to_event.link);902 awaiter_t *waiter = list_get_instance( 903 list_first(&timeout_list), awaiter_t, to_event.link); 851 904 852 905 struct timeval tv; … … 926 979 { 927 980 if (!hash_table_create(&client_hash_table, CLIENT_HASH_TABLE_BUCKETS, 928 1, &client_hash_table_ops))981 2, &client_hash_table_ops)) 929 982 abort(); 930 983 … … 942 995 session_ns->arg2 = 0; 943 996 session_ns->arg3 = 0; 997 998 fibril_mutex_initialize(&session_ns->remote_state_mtx); 999 session_ns->remote_state_data = NULL; 944 1000 945 1001 list_initialize(&session_ns->exch_list); … … 1414 1470 */ 1415 1471 int async_connect_to_me(async_exch_t *exch, sysarg_t arg1, sysarg_t arg2, 1416 sysarg_t arg3, async_client_conn_t client_receiver )1472 sysarg_t arg3, async_client_conn_t client_receiver, void *carg) 1417 1473 { 1418 1474 if (exch == NULL) 1419 1475 return ENOENT; 1420 1476 1421 sysarg_t task_hash;1422 1477 sysarg_t phone_hash; 1423 int rc = async_req_3_5(exch, IPC_M_CONNECT_TO_ME, arg1, arg2, arg3, 1424 NULL, NULL, NULL, &task_hash, &phone_hash); 1478 sysarg_t rc; 1479 1480 aid_t req; 1481 ipc_call_t answer; 1482 req = async_send_3(exch, IPC_M_CONNECT_TO_ME, arg1, arg2, arg3, 1483 &answer); 1484 async_wait_for(req, &rc); 1425 1485 if (rc != EOK) 1426 return rc; 1427 1486 return (int) rc; 1487 1488 phone_hash = IPC_GET_ARG5(answer); 1489 1428 1490 if (client_receiver != NULL) 1429 async_new_connection( task_hash, phone_hash, 0, NULL,1430 client_receiver );1491 async_new_connection(answer.in_task_id, phone_hash, 0, NULL, 1492 client_receiver, carg); 1431 1493 1432 1494 return EOK; … … 1502 1564 sess->arg3 = 0; 1503 1565 1566 fibril_mutex_initialize(&sess->remote_state_mtx); 1567 sess->remote_state_data = NULL; 1568 1504 1569 list_initialize(&sess->exch_list); 1505 1570 fibril_mutex_initialize(&sess->mutex); … … 1583 1648 sess->arg3 = arg3; 1584 1649 1650 fibril_mutex_initialize(&sess->remote_state_mtx); 1651 sess->remote_state_data = NULL; 1652 1585 1653 list_initialize(&sess->exch_list); 1586 1654 fibril_mutex_initialize(&sess->mutex); … … 1588 1656 1589 1657 return sess; 1658 } 1659 1660 /** Set arguments for new connections. 1661 * 1662 * FIXME This is an ugly hack to work around the problem that parallel 1663 * exchanges are implemented using parallel connections. When we create 1664 * a callback session, the framework does not know arguments for the new 1665 * connections. 1666 * 1667 * The proper solution seems to be to implement parallel exchanges using 1668 * tagging. 1669 */ 1670 void async_sess_args_set(async_sess_t *sess, sysarg_t arg1, sysarg_t arg2, 1671 sysarg_t arg3) 1672 { 1673 sess->arg1 = arg1; 1674 sess->arg2 = arg2; 1675 sess->arg3 = arg3; 1590 1676 } 1591 1677 … … 1633 1719 sess->arg3 = arg3; 1634 1720 1721 fibril_mutex_initialize(&sess->remote_state_mtx); 1722 sess->remote_state_data = NULL; 1723 1635 1724 list_initialize(&sess->exch_list); 1636 1725 fibril_mutex_initialize(&sess->mutex); … … 1664 1753 sess->arg3 = 0; 1665 1754 1755 fibril_mutex_initialize(&sess->remote_state_mtx); 1756 sess->remote_state_data = NULL; 1757 1666 1758 list_initialize(&sess->exch_list); 1667 1759 fibril_mutex_initialize(&sess->mutex); … … 1685 1777 int async_hangup(async_sess_t *sess) 1686 1778 { 1779 async_exch_t *exch; 1780 1687 1781 assert(sess); 1688 1782 1689 1783 if (atomic_get(&sess->refcnt) > 0) 1690 1784 return EBUSY; 1785 1786 fibril_mutex_lock(&async_sess_mutex); 1691 1787 1692 1788 int rc = async_hangup_internal(sess->phone); 1693 1789 if (rc == EOK) 1694 1790 free(sess); 1791 1792 while (!list_empty(&sess->exch_list)) { 1793 exch = (async_exch_t *) 1794 list_get_instance(list_first(&sess->exch_list), 1795 async_exch_t, sess_link); 1796 1797 list_remove(&exch->sess_link); 1798 list_remove(&exch->global_link); 1799 async_hangup_internal(exch->phone); 1800 free(exch); 1801 } 1802 1803 fibril_mutex_unlock(&async_sess_mutex); 1695 1804 1696 1805 return rc; … … 1724 1833 */ 1725 1834 exch = (async_exch_t *) 1726 list_get_instance(sess->exch_list.next, async_exch_t, sess_link); 1835 list_get_instance(list_first(&sess->exch_list), 1836 async_exch_t, sess_link); 1837 1727 1838 list_remove(&exch->sess_link); 1728 1839 list_remove(&exch->global_link); … … 1736 1847 exch = (async_exch_t *) malloc(sizeof(async_exch_t)); 1737 1848 if (exch != NULL) { 1738 li st_initialize(&exch->sess_link);1739 li st_initialize(&exch->global_link);1849 link_initialize(&exch->sess_link); 1850 link_initialize(&exch->global_link); 1740 1851 exch->sess = sess; 1741 1852 exch->phone = sess->phone; … … 1754 1865 exch = (async_exch_t *) malloc(sizeof(async_exch_t)); 1755 1866 if (exch != NULL) { 1756 li st_initialize(&exch->sess_link);1757 li st_initialize(&exch->global_link);1867 link_initialize(&exch->sess_link); 1868 link_initialize(&exch->global_link); 1758 1869 exch->sess = sess; 1759 1870 exch->phone = phone; … … 1767 1878 */ 1768 1879 exch = (async_exch_t *) 1769 list_get_instance(inactive_exch_list.next, async_exch_t, 1770 global_link); 1880 list_get_instance(list_first(&inactive_exch_list), 1881 async_exch_t, global_link); 1882 1771 1883 list_remove(&exch->sess_link); 1772 1884 list_remove(&exch->global_link); … … 1808 1920 async_sess_t *sess = exch->sess; 1809 1921 1922 atomic_dec(&sess->refcnt); 1923 1810 1924 if (sess->mgmt == EXCHANGE_SERIALIZE) 1811 1925 fibril_mutex_unlock(&sess->mutex); … … 2322 2436 sess->arg3 = 0; 2323 2437 2438 fibril_mutex_initialize(&sess->remote_state_mtx); 2439 sess->remote_state_data = NULL; 2440 2324 2441 list_initialize(&sess->exch_list); 2325 2442 fibril_mutex_initialize(&sess->mutex); … … 2368 2485 sess->arg3 = 0; 2369 2486 2487 fibril_mutex_initialize(&sess->remote_state_mtx); 2488 sess->remote_state_data = NULL; 2489 2370 2490 list_initialize(&sess->exch_list); 2371 2491 fibril_mutex_initialize(&sess->mutex); … … 2410 2530 sess->arg3 = 0; 2411 2531 2532 fibril_mutex_initialize(&sess->remote_state_mtx); 2533 sess->remote_state_data = NULL; 2534 2412 2535 list_initialize(&sess->exch_list); 2413 2536 fibril_mutex_initialize(&sess->mutex); … … 2417 2540 } 2418 2541 2542 int async_state_change_start(async_exch_t *exch, sysarg_t arg1, sysarg_t arg2, 2543 sysarg_t arg3, async_exch_t *other_exch) 2544 { 2545 return async_req_5_0(exch, IPC_M_STATE_CHANGE_AUTHORIZE, 2546 arg1, arg2, arg3, 0, other_exch->phone); 2547 } 2548 2549 bool async_state_change_receive(ipc_callid_t *callid, sysarg_t *arg1, 2550 sysarg_t *arg2, sysarg_t *arg3) 2551 { 2552 assert(callid); 2553 2554 ipc_call_t call; 2555 *callid = async_get_call(&call); 2556 2557 if (IPC_GET_IMETHOD(call) != IPC_M_STATE_CHANGE_AUTHORIZE) 2558 return false; 2559 2560 if (arg1) 2561 *arg1 = IPC_GET_ARG1(call); 2562 if (arg2) 2563 *arg2 = IPC_GET_ARG2(call); 2564 if (arg3) 2565 *arg3 = IPC_GET_ARG3(call); 2566 2567 return true; 2568 } 2569 2570 int async_state_change_finalize(ipc_callid_t callid, async_exch_t *other_exch) 2571 { 2572 return ipc_answer_1(callid, EOK, other_exch->phone); 2573 } 2574 2575 /** Lock and get session remote state 2576 * 2577 * Lock and get the local replica of the remote state 2578 * in stateful sessions. The call should be paired 2579 * with async_remote_state_release*(). 2580 * 2581 * @param[in] sess Stateful session. 2582 * 2583 * @return Local replica of the remote state. 2584 * 2585 */ 2586 void *async_remote_state_acquire(async_sess_t *sess) 2587 { 2588 fibril_mutex_lock(&sess->remote_state_mtx); 2589 return sess->remote_state_data; 2590 } 2591 2592 /** Update the session remote state 2593 * 2594 * Update the local replica of the remote state 2595 * in stateful sessions. The remote state must 2596 * be already locked. 2597 * 2598 * @param[in] sess Stateful session. 2599 * @param[in] state New local replica of the remote state. 2600 * 2601 */ 2602 void async_remote_state_update(async_sess_t *sess, void *state) 2603 { 2604 assert(fibril_mutex_is_locked(&sess->remote_state_mtx)); 2605 sess->remote_state_data = state; 2606 } 2607 2608 /** Release the session remote state 2609 * 2610 * Unlock the local replica of the remote state 2611 * in stateful sessions. 2612 * 2613 * @param[in] sess Stateful session. 2614 * 2615 */ 2616 void async_remote_state_release(async_sess_t *sess) 2617 { 2618 assert(fibril_mutex_is_locked(&sess->remote_state_mtx)); 2619 2620 fibril_mutex_unlock(&sess->remote_state_mtx); 2621 } 2622 2623 /** Release the session remote state and end an exchange 2624 * 2625 * Unlock the local replica of the remote state 2626 * in stateful sessions. This is convenience function 2627 * which gets the session pointer from the exchange 2628 * and also ends the exchange. 2629 * 2630 * @param[in] exch Stateful session's exchange. 2631 * 2632 */ 2633 void async_remote_state_release_exchange(async_exch_t *exch) 2634 { 2635 if (exch == NULL) 2636 return; 2637 2638 async_sess_t *sess = exch->sess; 2639 assert(fibril_mutex_is_locked(&sess->remote_state_mtx)); 2640 2641 async_exchange_end(exch); 2642 fibril_mutex_unlock(&sess->remote_state_mtx); 2643 } 2644 2419 2645 /** @} 2420 2646 */
Note:
See TracChangeset
for help on using the changeset viewer.