Ignore:
File:
1 edited

Legend:

Unmodified
Added
Removed
  • uspace/lib/c/generic/async.c

    rb72efe8 r36e2b55  
    9898#include <ipc/ipc.h>
    9999#include <async.h>
     100#include "private/async.h"
    100101#undef LIBC_ASYNC_C_
    101102
     
    107108#include <errno.h>
    108109#include <sys/time.h>
    109 #include <arch/barrier.h>
     110#include <libarch/barrier.h>
    110111#include <bool.h>
    111112#include <malloc.h>
    112113#include <mem.h>
    113114#include <stdlib.h>
    114 #include "private/async.h"
     115#include <macros.h>
    115116
    116117#define CLIENT_HASH_TABLE_BUCKETS  32
     
    138139        link_t link;
    139140       
    140         sysarg_t in_task_hash;
     141        task_id_t in_task_id;
    141142        atomic_t refcnt;
    142143        void *data;
     
    150151        link_t link;
    151152       
    152         /** Incoming client task hash. */
    153         sysarg_t in_task_hash;
     153        /** Incoming client task ID. */
     154        task_id_t in_task_id;
    154155       
    155156        /** Incoming phone hash. */
     
    203204}
    204205
    205 void *async_get_client_data(void)
    206 {
    207         assert(fibril_connection);
    208         return fibril_connection->client->data;
    209 }
    210 
    211206/** Default fibril function that gets called to handle new connection.
    212207 *
    213208 * This function is defined as a weak symbol - to be redefined in user code.
    214209 *
    215  * @param callid        Hash of the incoming call.
    216  * @param call          Data of the incoming call.
    217  * @param arg           Local argument
     210 * @param callid Hash of the incoming call.
     211 * @param call   Data of the incoming call.
     212 * @param arg    Local argument
    218213 *
    219214 */
     
    228223 * This function is defined as a weak symbol - to be redefined in user code.
    229224 *
    230  * @param callid        Hash of the incoming call.
    231  * @param call          Data of the incoming call.
    232  * @param arg           Local argument.
     225 * @param callid Hash of the incoming call.
     226 * @param call   Data of the incoming call.
     227 * @param arg    Local argument.
    233228 *
    234229 */
     
    289284{
    290285        assert(key);
     286        assert(keys == 2);
    291287        assert(item);
    292288       
    293289        client_t *client = hash_table_get_instance(item, client_t, link);
    294         return (key[0] == client->in_task_hash);
     290        return (key[0] == LOWER32(client->in_task_id) &&
     291            (key[1] == UPPER32(client->in_task_id)));
    295292}
    296293
     
    580577}
    581578
     579static client_t *async_client_get(task_id_t client_id, bool create)
     580{
     581        unsigned long key[2] = {
     582                LOWER32(client_id),
     583                UPPER32(client_id),
     584        };
     585        client_t *client = NULL;
     586
     587        futex_down(&async_futex);
     588        link_t *lnk = hash_table_find(&client_hash_table, key);
     589        if (lnk) {
     590                client = hash_table_get_instance(lnk, client_t, link);
     591                atomic_inc(&client->refcnt);
     592        } else if (create) {
     593                client = malloc(sizeof(client_t));
     594                if (client) {
     595                        client->in_task_id = client_id;
     596                        client->data = async_client_data_create();
     597               
     598                        atomic_set(&client->refcnt, 1);
     599                        hash_table_insert(&client_hash_table, key, &client->link);
     600                }
     601        }
     602
     603        futex_up(&async_futex);
     604        return client;
     605}
     606
     607static void async_client_put(client_t *client)
     608{
     609        bool destroy;
     610        unsigned long key[2] = {
     611                LOWER32(client->in_task_id),
     612                UPPER32(client->in_task_id)
     613        };
     614       
     615        futex_down(&async_futex);
     616       
     617        if (atomic_predec(&client->refcnt) == 0) {
     618                hash_table_remove(&client_hash_table, key, 2);
     619                destroy = true;
     620        } else
     621                destroy = false;
     622       
     623        futex_up(&async_futex);
     624       
     625        if (destroy) {
     626                if (client->data)
     627                        async_client_data_destroy(client->data);
     628               
     629                free(client);
     630        }
     631}
     632
     633void *async_get_client_data(void)
     634{
     635        assert(fibril_connection);
     636        return fibril_connection->client->data;
     637}
     638
     639void *async_get_client_data_by_id(task_id_t client_id)
     640{
     641        client_t *client = async_client_get(client_id, false);
     642        if (!client)
     643                return NULL;
     644        if (!client->data) {
     645                async_client_put(client);
     646                return NULL;
     647        }
     648
     649        return client->data;
     650}
     651
     652void async_put_client_data_by_id(task_id_t client_id)
     653{
     654        client_t *client = async_client_get(client_id, false);
     655
     656        assert(client);
     657        assert(client->data);
     658
     659        /* Drop the reference we got in async_get_client_data_by_hash(). */
     660        async_client_put(client);
     661
     662        /* Drop our own reference we got at the beginning of this function. */
     663        async_client_put(client);
     664}
     665
    582666/** Wrapper for client connection fibril.
    583667 *
     
    598682         */
    599683        fibril_connection = (connection_t *) arg;
    600        
    601         futex_down(&async_futex);
    602684       
    603685        /*
     
    606688         * hash in a new tracking structure.
    607689         */
    608        
    609         unsigned long key = fibril_connection->in_task_hash;
    610         link_t *lnk = hash_table_find(&client_hash_table, &key);
    611        
    612         client_t *client;
    613        
    614         if (lnk) {
    615                 client = hash_table_get_instance(lnk, client_t, link);
    616                 atomic_inc(&client->refcnt);
    617         } else {
    618                 client = malloc(sizeof(client_t));
    619                 if (!client) {
    620                         ipc_answer_0(fibril_connection->callid, ENOMEM);
    621                         futex_up(&async_futex);
    622                         return 0;
    623                 }
    624                
    625                 client->in_task_hash = fibril_connection->in_task_hash;
    626                 client->data = async_client_data_create();
    627                
    628                 atomic_set(&client->refcnt, 1);
    629                 hash_table_insert(&client_hash_table, &key, &client->link);
    630         }
    631        
    632         futex_up(&async_futex);
    633        
     690
     691        client_t *client = async_client_get(fibril_connection->in_task_id, true);
     692        if (!client) {
     693                ipc_answer_0(fibril_connection->callid, ENOMEM);
     694                return 0;
     695        }
     696
    634697        fibril_connection->client = client;
    635698       
     
    643706         * Remove the reference for this client task connection.
    644707         */
    645         bool destroy;
    646        
    647         futex_down(&async_futex);
    648        
    649         if (atomic_predec(&client->refcnt) == 0) {
    650                 hash_table_remove(&client_hash_table, &key, 1);
    651                 destroy = true;
    652         } else
    653                 destroy = false;
    654        
    655         futex_up(&async_futex);
    656        
    657         if (destroy) {
    658                 if (client->data)
    659                         async_client_data_destroy(client->data);
    660                
    661                 free(client);
    662         }
     708        async_client_put(client);
    663709       
    664710        /*
     
    666712         */
    667713        futex_down(&async_futex);
    668         key = fibril_connection->in_phone_hash;
     714        unsigned long key = fibril_connection->in_phone_hash;
    669715        hash_table_remove(&conn_hash_table, &key, 1);
    670716        futex_up(&async_futex);
     
    700746 * particular fibrils.
    701747 *
    702  * @param in_task_hash  Identification of the incoming connection.
     748 * @param in_task_id    Identification of the incoming connection.
    703749 * @param in_phone_hash Identification of the incoming connection.
    704750 * @param callid        Hash of the opening IPC_M_CONNECT_ME_TO call.
     
    709755 * @param cfibril       Fibril function that should be called upon opening the
    710756 *                      connection.
    711  * @param carg          Extra argument to pass to the connection fibril
     757 * @param carg          Extra argument to pass to the connection fibril
    712758 *
    713759 * @return New fibril id or NULL on failure.
    714760 *
    715761 */
    716 fid_t async_new_connection(sysarg_t in_task_hash, sysarg_t in_phone_hash,
     762fid_t async_new_connection(task_id_t in_task_id, sysarg_t in_phone_hash,
    717763    ipc_callid_t callid, ipc_call_t *call,
    718764    async_client_conn_t cfibril, void *carg)
     
    726772        }
    727773       
    728         conn->in_task_hash = in_task_hash;
     774        conn->in_task_id = in_task_id;
    729775        conn->in_phone_hash = in_phone_hash;
    730776        list_initialize(&conn->msg_queue);
     
    785831        case IPC_M_CONNECT_ME_TO:
    786832                /* Open new connection with fibril, etc. */
    787                 async_new_connection(call->in_task_hash, IPC_GET_ARG5(*call),
     833                async_new_connection(call->in_task_id, IPC_GET_ARG5(*call),
    788834                    callid, call, client_connection, NULL);
    789835                return;
     
    933979{
    934980        if (!hash_table_create(&client_hash_table, CLIENT_HASH_TABLE_BUCKETS,
    935             1, &client_hash_table_ops))
     981            2, &client_hash_table_ops))
    936982                abort();
    937983       
     
    949995        session_ns->arg2 = 0;
    950996        session_ns->arg3 = 0;
     997       
     998        fibril_mutex_initialize(&session_ns->remote_state_mtx);
     999        session_ns->remote_state_data = NULL;
    9511000       
    9521001        list_initialize(&session_ns->exch_list);
     
    14261475                return ENOENT;
    14271476       
    1428         sysarg_t task_hash;
    14291477        sysarg_t phone_hash;
    1430         int rc = async_req_3_5(exch, IPC_M_CONNECT_TO_ME, arg1, arg2, arg3,
    1431             NULL, NULL, NULL, &task_hash, &phone_hash);
     1478        sysarg_t rc;
     1479
     1480        aid_t req;
     1481        ipc_call_t answer;
     1482        req = async_send_3(exch, IPC_M_CONNECT_TO_ME, arg1, arg2, arg3,
     1483            &answer);
     1484        async_wait_for(req, &rc);
    14321485        if (rc != EOK)
    1433                 return rc;
    1434        
     1486                return (int) rc;
     1487
     1488        phone_hash = IPC_GET_ARG5(answer);
     1489
    14351490        if (client_receiver != NULL)
    1436                 async_new_connection(task_hash, phone_hash, 0, NULL,
     1491                async_new_connection(answer.in_task_id, phone_hash, 0, NULL,
    14371492                    client_receiver, carg);
    14381493       
     
    15091564        sess->arg3 = 0;
    15101565       
     1566        fibril_mutex_initialize(&sess->remote_state_mtx);
     1567        sess->remote_state_data = NULL;
     1568       
    15111569        list_initialize(&sess->exch_list);
    15121570        fibril_mutex_initialize(&sess->mutex);
     
    15901648        sess->arg3 = arg3;
    15911649       
     1650        fibril_mutex_initialize(&sess->remote_state_mtx);
     1651        sess->remote_state_data = NULL;
     1652       
    15921653        list_initialize(&sess->exch_list);
    15931654        fibril_mutex_initialize(&sess->mutex);
     
    15951656       
    15961657        return sess;
     1658}
     1659
     1660/** Set arguments for new connections.
     1661 *
     1662 * FIXME This is an ugly hack to work around the problem that parallel
     1663 * exchanges are implemented using parallel connections. When we create
     1664 * a callback session, the framework does not know arguments for the new
     1665 * connections.
     1666 *
     1667 * The proper solution seems to be to implement parallel exchanges using
     1668 * tagging.
     1669 */
     1670void async_sess_args_set(async_sess_t *sess, sysarg_t arg1, sysarg_t arg2,
     1671    sysarg_t arg3)
     1672{
     1673        sess->arg1 = arg1;
     1674        sess->arg2 = arg2;
     1675        sess->arg3 = arg3;
    15971676}
    15981677
     
    16401719        sess->arg3 = arg3;
    16411720       
     1721        fibril_mutex_initialize(&sess->remote_state_mtx);
     1722        sess->remote_state_data = NULL;
     1723       
    16421724        list_initialize(&sess->exch_list);
    16431725        fibril_mutex_initialize(&sess->mutex);
     
    16711753        sess->arg3 = 0;
    16721754       
     1755        fibril_mutex_initialize(&sess->remote_state_mtx);
     1756        sess->remote_state_data = NULL;
     1757       
    16731758        list_initialize(&sess->exch_list);
    16741759        fibril_mutex_initialize(&sess->mutex);
     
    16921777int async_hangup(async_sess_t *sess)
    16931778{
     1779        async_exch_t *exch;
     1780       
    16941781        assert(sess);
    16951782       
    16961783        if (atomic_get(&sess->refcnt) > 0)
    16971784                return EBUSY;
     1785       
     1786        fibril_mutex_lock(&async_sess_mutex);
    16981787       
    16991788        int rc = async_hangup_internal(sess->phone);
    17001789        if (rc == EOK)
    17011790                free(sess);
     1791       
     1792        while (!list_empty(&sess->exch_list)) {
     1793                exch = (async_exch_t *)
     1794                    list_get_instance(list_first(&sess->exch_list),
     1795                    async_exch_t, sess_link);
     1796               
     1797                list_remove(&exch->sess_link);
     1798                list_remove(&exch->global_link);
     1799                async_hangup_internal(exch->phone);
     1800                free(exch);
     1801        }
     1802       
     1803        fibril_mutex_unlock(&async_sess_mutex);
    17021804       
    17031805        return rc;
     
    23342436        sess->arg3 = 0;
    23352437       
     2438        fibril_mutex_initialize(&sess->remote_state_mtx);
     2439        sess->remote_state_data = NULL;
     2440       
    23362441        list_initialize(&sess->exch_list);
    23372442        fibril_mutex_initialize(&sess->mutex);
     
    23802485        sess->arg3 = 0;
    23812486       
     2487        fibril_mutex_initialize(&sess->remote_state_mtx);
     2488        sess->remote_state_data = NULL;
     2489       
    23822490        list_initialize(&sess->exch_list);
    23832491        fibril_mutex_initialize(&sess->mutex);
     
    24222530        sess->arg3 = 0;
    24232531       
     2532        fibril_mutex_initialize(&sess->remote_state_mtx);
     2533        sess->remote_state_data = NULL;
     2534       
    24242535        list_initialize(&sess->exch_list);
    24252536        fibril_mutex_initialize(&sess->mutex);
     
    24292540}
    24302541
     2542int async_state_change_start(async_exch_t *exch, sysarg_t arg1, sysarg_t arg2,
     2543    sysarg_t arg3, async_exch_t *other_exch)
     2544{
     2545        return async_req_5_0(exch, IPC_M_STATE_CHANGE_AUTHORIZE,
     2546            arg1, arg2, arg3, 0, other_exch->phone);
     2547}
     2548
     2549bool async_state_change_receive(ipc_callid_t *callid, sysarg_t *arg1,
     2550    sysarg_t *arg2, sysarg_t *arg3)
     2551{
     2552        assert(callid);
     2553
     2554        ipc_call_t call;
     2555        *callid = async_get_call(&call);
     2556
     2557        if (IPC_GET_IMETHOD(call) != IPC_M_STATE_CHANGE_AUTHORIZE)
     2558                return false;
     2559       
     2560        if (arg1)
     2561                *arg1 = IPC_GET_ARG1(call);
     2562        if (arg2)
     2563                *arg2 = IPC_GET_ARG2(call);
     2564        if (arg3)
     2565                *arg3 = IPC_GET_ARG3(call);
     2566
     2567        return true;
     2568}
     2569
     2570int async_state_change_finalize(ipc_callid_t callid, async_exch_t *other_exch)
     2571{
     2572        return ipc_answer_1(callid, EOK, other_exch->phone);
     2573}
     2574
     2575/** Lock and get session remote state
     2576 *
     2577 * Lock and get the local replica of the remote state
     2578 * in stateful sessions. The call should be paired
     2579 * with async_remote_state_release*().
     2580 *
     2581 * @param[in] sess Stateful session.
     2582 *
     2583 * @return Local replica of the remote state.
     2584 *
     2585 */
     2586void *async_remote_state_acquire(async_sess_t *sess)
     2587{
     2588        fibril_mutex_lock(&sess->remote_state_mtx);
     2589        return sess->remote_state_data;
     2590}
     2591
     2592/** Update the session remote state
     2593 *
     2594 * Update the local replica of the remote state
     2595 * in stateful sessions. The remote state must
     2596 * be already locked.
     2597 *
     2598 * @param[in] sess  Stateful session.
     2599 * @param[in] state New local replica of the remote state.
     2600 *
     2601 */
     2602void async_remote_state_update(async_sess_t *sess, void *state)
     2603{
     2604        assert(fibril_mutex_is_locked(&sess->remote_state_mtx));
     2605        sess->remote_state_data = state;
     2606}
     2607
     2608/** Release the session remote state
     2609 *
     2610 * Unlock the local replica of the remote state
     2611 * in stateful sessions.
     2612 *
     2613 * @param[in] sess Stateful session.
     2614 *
     2615 */
     2616void async_remote_state_release(async_sess_t *sess)
     2617{
     2618        assert(fibril_mutex_is_locked(&sess->remote_state_mtx));
     2619       
     2620        fibril_mutex_unlock(&sess->remote_state_mtx);
     2621}
     2622
     2623/** Release the session remote state and end an exchange
     2624 *
     2625 * Unlock the local replica of the remote state
     2626 * in stateful sessions. This is convenience function
     2627 * which gets the session pointer from the exchange
     2628 * and also ends the exchange.
     2629 *
     2630 * @param[in] exch Stateful session's exchange.
     2631 *
     2632 */
     2633void async_remote_state_release_exchange(async_exch_t *exch)
     2634{
     2635        if (exch == NULL)
     2636                return;
     2637       
     2638        async_sess_t *sess = exch->sess;
     2639        assert(fibril_mutex_is_locked(&sess->remote_state_mtx));
     2640       
     2641        async_exchange_end(exch);
     2642        fibril_mutex_unlock(&sess->remote_state_mtx);
     2643}
     2644
    24312645/** @}
    24322646 */
Note: See TracChangeset for help on using the changeset viewer.