Ignore:
File:
1 edited

Legend:

Unmodified
Added
Removed
  • uspace/lib/c/generic/async.c

    r8869f7b r36e2b55  
    9898#include <ipc/ipc.h>
    9999#include <async.h>
     100#include "private/async.h"
    100101#undef LIBC_ASYNC_C_
    101102
     
    107108#include <errno.h>
    108109#include <sys/time.h>
    109 #include <arch/barrier.h>
     110#include <libarch/barrier.h>
    110111#include <bool.h>
    111112#include <malloc.h>
    112113#include <mem.h>
    113114#include <stdlib.h>
    114 #include "private/async.h"
     115#include <macros.h>
    115116
    116117#define CLIENT_HASH_TABLE_BUCKETS  32
     
    138139        link_t link;
    139140       
    140         sysarg_t in_task_hash;
     141        task_id_t in_task_id;
    141142        atomic_t refcnt;
    142143        void *data;
     
    150151        link_t link;
    151152       
    152         /** Incoming client task hash. */
    153         sysarg_t in_task_hash;
     153        /** Incoming client task ID. */
     154        task_id_t in_task_id;
    154155       
    155156        /** Incoming phone hash. */
     
    160161       
    161162        /** Messages that should be delivered to this fibril. */
    162         link_t msg_queue;
     163        list_t msg_queue;
    163164       
    164165        /** Identification of the opening call. */
     
    166167        /** Call data of the opening call. */
    167168        ipc_call_t call;
     169        /** Local argument or NULL if none. */
     170        void *carg;
    168171       
    169172        /** Identification of the closing call. */
     
    171174       
    172175        /** Fibril function that will be used to handle the connection. */
    173         void (*cfibril)(ipc_callid_t, ipc_call_t *);
     176        async_client_conn_t cfibril;
    174177} connection_t;
    175178
     
    201204}
    202205
    203 void *async_get_client_data(void)
    204 {
    205         assert(fibril_connection);
    206         return fibril_connection->client->data;
    207 }
    208 
    209206/** Default fibril function that gets called to handle new connection.
    210207 *
     
    213210 * @param callid Hash of the incoming call.
    214211 * @param call   Data of the incoming call.
    215  *
    216  */
    217 static void default_client_connection(ipc_callid_t callid, ipc_call_t *call)
     212 * @param arg    Local argument
     213 *
     214 */
     215static void default_client_connection(ipc_callid_t callid, ipc_call_t *call,
     216    void *arg)
    218217{
    219218        ipc_answer_0(callid, ENOENT);
     
    226225 * @param callid Hash of the incoming call.
    227226 * @param call   Data of the incoming call.
     227 * @param arg    Local argument.
    228228 *
    229229 */
     
    233233
    234234static async_client_conn_t client_connection = default_client_connection;
    235 static async_client_conn_t interrupt_received = default_interrupt_received;
     235static async_interrupt_handler_t interrupt_received = default_interrupt_received;
    236236
    237237/** Setter for client_connection function pointer.
     
    250250 *             notification fibril.
    251251 */
    252 void async_set_interrupt_received(async_client_conn_t intr)
     252void async_set_interrupt_received(async_interrupt_handler_t intr)
    253253{
    254254        interrupt_received = intr;
     
    284284{
    285285        assert(key);
     286        assert(keys == 2);
    286287        assert(item);
    287288       
    288289        client_t *client = hash_table_get_instance(item, client_t, link);
    289         return (key[0] == client->in_task_hash);
     290        return (key[0] == LOWER32(client->in_task_id) &&
     291            (key[1] == UPPER32(client->in_task_id)));
    290292}
    291293
     
    356358        wd->to_event.inlist = true;
    357359       
    358         link_t *tmp = timeout_list.next;
    359         while (tmp != &timeout_list) {
     360        link_t *tmp = timeout_list.head.next;
     361        while (tmp != &timeout_list.head) {
    360362                awaiter_t *cur
    361363                    = list_get_instance(tmp, awaiter_t, to_event.link);
     
    367369        }
    368370       
    369         list_append(&wd->to_event.link, tmp);
     371        list_insert_before(&wd->to_event.link, tmp);
    370372}
    371373
     
    564566        }
    565567       
    566         msg_t *msg = list_get_instance(conn->msg_queue.next, msg_t, link);
     568        msg_t *msg = list_get_instance(list_first(&conn->msg_queue), msg_t, link);
    567569        list_remove(&msg->link);
    568570       
     
    573575        futex_up(&async_futex);
    574576        return callid;
     577}
     578
     579static client_t *async_client_get(task_id_t client_id, bool create)
     580{
     581        unsigned long key[2] = {
     582                LOWER32(client_id),
     583                UPPER32(client_id),
     584        };
     585        client_t *client = NULL;
     586
     587        futex_down(&async_futex);
     588        link_t *lnk = hash_table_find(&client_hash_table, key);
     589        if (lnk) {
     590                client = hash_table_get_instance(lnk, client_t, link);
     591                atomic_inc(&client->refcnt);
     592        } else if (create) {
     593                client = malloc(sizeof(client_t));
     594                if (client) {
     595                        client->in_task_id = client_id;
     596                        client->data = async_client_data_create();
     597               
     598                        atomic_set(&client->refcnt, 1);
     599                        hash_table_insert(&client_hash_table, key, &client->link);
     600                }
     601        }
     602
     603        futex_up(&async_futex);
     604        return client;
     605}
     606
     607static void async_client_put(client_t *client)
     608{
     609        bool destroy;
     610        unsigned long key[2] = {
     611                LOWER32(client->in_task_id),
     612                UPPER32(client->in_task_id)
     613        };
     614       
     615        futex_down(&async_futex);
     616       
     617        if (atomic_predec(&client->refcnt) == 0) {
     618                hash_table_remove(&client_hash_table, key, 2);
     619                destroy = true;
     620        } else
     621                destroy = false;
     622       
     623        futex_up(&async_futex);
     624       
     625        if (destroy) {
     626                if (client->data)
     627                        async_client_data_destroy(client->data);
     628               
     629                free(client);
     630        }
     631}
     632
     633void *async_get_client_data(void)
     634{
     635        assert(fibril_connection);
     636        return fibril_connection->client->data;
     637}
     638
     639void *async_get_client_data_by_id(task_id_t client_id)
     640{
     641        client_t *client = async_client_get(client_id, false);
     642        if (!client)
     643                return NULL;
     644        if (!client->data) {
     645                async_client_put(client);
     646                return NULL;
     647        }
     648
     649        return client->data;
     650}
     651
     652void async_put_client_data_by_id(task_id_t client_id)
     653{
     654        client_t *client = async_client_get(client_id, false);
     655
     656        assert(client);
     657        assert(client->data);
     658
     659        /* Drop the reference we got in async_get_client_data_by_hash(). */
     660        async_client_put(client);
     661
     662        /* Drop our own reference we got at the beginning of this function. */
     663        async_client_put(client);
    575664}
    576665
     
    593682         */
    594683        fibril_connection = (connection_t *) arg;
    595        
    596         futex_down(&async_futex);
    597684       
    598685        /*
     
    601688         * hash in a new tracking structure.
    602689         */
    603        
    604         unsigned long key = fibril_connection->in_task_hash;
    605         link_t *lnk = hash_table_find(&client_hash_table, &key);
    606        
    607         client_t *client;
    608        
    609         if (lnk) {
    610                 client = hash_table_get_instance(lnk, client_t, link);
    611                 atomic_inc(&client->refcnt);
    612         } else {
    613                 client = malloc(sizeof(client_t));
    614                 if (!client) {
    615                         ipc_answer_0(fibril_connection->callid, ENOMEM);
    616                         futex_up(&async_futex);
    617                         return 0;
    618                 }
    619                
    620                 client->in_task_hash = fibril_connection->in_task_hash;
    621                 client->data = async_client_data_create();
    622                
    623                 atomic_set(&client->refcnt, 1);
    624                 hash_table_insert(&client_hash_table, &key, &client->link);
    625         }
    626        
    627         futex_up(&async_futex);
    628        
     690
     691        client_t *client = async_client_get(fibril_connection->in_task_id, true);
     692        if (!client) {
     693                ipc_answer_0(fibril_connection->callid, ENOMEM);
     694                return 0;
     695        }
     696
    629697        fibril_connection->client = client;
    630698       
     
    633701         */
    634702        fibril_connection->cfibril(fibril_connection->callid,
    635             &fibril_connection->call);
     703            &fibril_connection->call, fibril_connection->carg);
    636704       
    637705        /*
    638706         * Remove the reference for this client task connection.
    639707         */
    640         bool destroy;
    641        
    642         futex_down(&async_futex);
    643        
    644         if (atomic_predec(&client->refcnt) == 0) {
    645                 hash_table_remove(&client_hash_table, &key, 1);
    646                 destroy = true;
    647         } else
    648                 destroy = false;
    649        
    650         futex_up(&async_futex);
    651        
    652         if (destroy) {
    653                 if (client->data)
    654                         async_client_data_destroy(client->data);
    655                
    656                 free(client);
    657         }
     708        async_client_put(client);
    658709       
    659710        /*
     
    661712         */
    662713        futex_down(&async_futex);
    663         key = fibril_connection->in_phone_hash;
     714        unsigned long key = fibril_connection->in_phone_hash;
    664715        hash_table_remove(&conn_hash_table, &key, 1);
    665716        futex_up(&async_futex);
     
    670721        while (!list_empty(&fibril_connection->msg_queue)) {
    671722                msg_t *msg =
    672                     list_get_instance(fibril_connection->msg_queue.next, msg_t,
    673                     link);
     723                    list_get_instance(list_first(&fibril_connection->msg_queue),
     724                    msg_t, link);
    674725               
    675726                list_remove(&msg->link);
     
    695746 * particular fibrils.
    696747 *
    697  * @param in_task_hash  Identification of the incoming connection.
     748 * @param in_task_id    Identification of the incoming connection.
    698749 * @param in_phone_hash Identification of the incoming connection.
    699750 * @param callid        Hash of the opening IPC_M_CONNECT_ME_TO call.
     
    704755 * @param cfibril       Fibril function that should be called upon opening the
    705756 *                      connection.
     757 * @param carg          Extra argument to pass to the connection fibril
    706758 *
    707759 * @return New fibril id or NULL on failure.
    708760 *
    709761 */
    710 fid_t async_new_connection(sysarg_t in_task_hash, sysarg_t in_phone_hash,
     762fid_t async_new_connection(task_id_t in_task_id, sysarg_t in_phone_hash,
    711763    ipc_callid_t callid, ipc_call_t *call,
    712     async_client_conn_t cfibril)
     764    async_client_conn_t cfibril, void *carg)
    713765{
    714766        connection_t *conn = malloc(sizeof(*conn));
     
    720772        }
    721773       
    722         conn->in_task_hash = in_task_hash;
     774        conn->in_task_id = in_task_id;
    723775        conn->in_phone_hash = in_phone_hash;
    724776        list_initialize(&conn->msg_queue);
    725777        conn->callid = callid;
    726778        conn->close_callid = 0;
     779        conn->carg = carg;
    727780       
    728781        if (call)
     
    778831        case IPC_M_CONNECT_ME_TO:
    779832                /* Open new connection with fibril, etc. */
    780                 async_new_connection(call->in_task_hash, IPC_GET_ARG5(*call),
    781                     callid, call, client_connection);
     833                async_new_connection(call->in_task_id, IPC_GET_ARG5(*call),
     834                    callid, call, client_connection, NULL);
    782835                return;
    783836        }
     
    799852        futex_down(&async_futex);
    800853       
    801         link_t *cur = timeout_list.next;
    802         while (cur != &timeout_list) {
     854        link_t *cur = list_first(&timeout_list);
     855        while (cur != NULL) {
    803856                awaiter_t *waiter =
    804857                    list_get_instance(cur, awaiter_t, to_event.link);
     
    806859                if (tv_gt(&waiter->to_event.expires, &tv))
    807860                        break;
    808                
    809                 cur = cur->next;
    810861               
    811862                list_remove(&waiter->to_event.link);
     
    821872                        fibril_add_ready(waiter->fid);
    822873                }
     874               
     875                cur = list_first(&timeout_list);
    823876        }
    824877       
     
    847900                suseconds_t timeout;
    848901                if (!list_empty(&timeout_list)) {
    849                         awaiter_t *waiter = list_get_instance(timeout_list.next,
    850                             awaiter_t, to_event.link);
     902                        awaiter_t *waiter = list_get_instance(
     903                            list_first(&timeout_list), awaiter_t, to_event.link);
    851904                       
    852905                        struct timeval tv;
     
    926979{
    927980        if (!hash_table_create(&client_hash_table, CLIENT_HASH_TABLE_BUCKETS,
    928             1, &client_hash_table_ops))
     981            2, &client_hash_table_ops))
    929982                abort();
    930983       
     
    942995        session_ns->arg2 = 0;
    943996        session_ns->arg3 = 0;
     997       
     998        fibril_mutex_initialize(&session_ns->remote_state_mtx);
     999        session_ns->remote_state_data = NULL;
    9441000       
    9451001        list_initialize(&session_ns->exch_list);
     
    14141470 */
    14151471int async_connect_to_me(async_exch_t *exch, sysarg_t arg1, sysarg_t arg2,
    1416     sysarg_t arg3, async_client_conn_t client_receiver)
     1472    sysarg_t arg3, async_client_conn_t client_receiver, void *carg)
    14171473{
    14181474        if (exch == NULL)
    14191475                return ENOENT;
    14201476       
    1421         sysarg_t task_hash;
    14221477        sysarg_t phone_hash;
    1423         int rc = async_req_3_5(exch, IPC_M_CONNECT_TO_ME, arg1, arg2, arg3,
    1424             NULL, NULL, NULL, &task_hash, &phone_hash);
     1478        sysarg_t rc;
     1479
     1480        aid_t req;
     1481        ipc_call_t answer;
     1482        req = async_send_3(exch, IPC_M_CONNECT_TO_ME, arg1, arg2, arg3,
     1483            &answer);
     1484        async_wait_for(req, &rc);
    14251485        if (rc != EOK)
    1426                 return rc;
    1427        
     1486                return (int) rc;
     1487
     1488        phone_hash = IPC_GET_ARG5(answer);
     1489
    14281490        if (client_receiver != NULL)
    1429                 async_new_connection(task_hash, phone_hash, 0, NULL,
    1430                     client_receiver);
     1491                async_new_connection(answer.in_task_id, phone_hash, 0, NULL,
     1492                    client_receiver, carg);
    14311493       
    14321494        return EOK;
     
    15021564        sess->arg3 = 0;
    15031565       
     1566        fibril_mutex_initialize(&sess->remote_state_mtx);
     1567        sess->remote_state_data = NULL;
     1568       
    15041569        list_initialize(&sess->exch_list);
    15051570        fibril_mutex_initialize(&sess->mutex);
     
    15831648        sess->arg3 = arg3;
    15841649       
     1650        fibril_mutex_initialize(&sess->remote_state_mtx);
     1651        sess->remote_state_data = NULL;
     1652       
    15851653        list_initialize(&sess->exch_list);
    15861654        fibril_mutex_initialize(&sess->mutex);
     
    15881656       
    15891657        return sess;
     1658}
     1659
     1660/** Set arguments for new connections.
     1661 *
     1662 * FIXME This is an ugly hack to work around the problem that parallel
     1663 * exchanges are implemented using parallel connections. When we create
     1664 * a callback session, the framework does not know arguments for the new
     1665 * connections.
     1666 *
     1667 * The proper solution seems to be to implement parallel exchanges using
     1668 * tagging.
     1669 */
     1670void async_sess_args_set(async_sess_t *sess, sysarg_t arg1, sysarg_t arg2,
     1671    sysarg_t arg3)
     1672{
     1673        sess->arg1 = arg1;
     1674        sess->arg2 = arg2;
     1675        sess->arg3 = arg3;
    15901676}
    15911677
     
    16331719        sess->arg3 = arg3;
    16341720       
     1721        fibril_mutex_initialize(&sess->remote_state_mtx);
     1722        sess->remote_state_data = NULL;
     1723       
    16351724        list_initialize(&sess->exch_list);
    16361725        fibril_mutex_initialize(&sess->mutex);
     
    16641753        sess->arg3 = 0;
    16651754       
     1755        fibril_mutex_initialize(&sess->remote_state_mtx);
     1756        sess->remote_state_data = NULL;
     1757       
    16661758        list_initialize(&sess->exch_list);
    16671759        fibril_mutex_initialize(&sess->mutex);
     
    16851777int async_hangup(async_sess_t *sess)
    16861778{
     1779        async_exch_t *exch;
     1780       
    16871781        assert(sess);
    16881782       
    16891783        if (atomic_get(&sess->refcnt) > 0)
    16901784                return EBUSY;
     1785       
     1786        fibril_mutex_lock(&async_sess_mutex);
    16911787       
    16921788        int rc = async_hangup_internal(sess->phone);
    16931789        if (rc == EOK)
    16941790                free(sess);
     1791       
     1792        while (!list_empty(&sess->exch_list)) {
     1793                exch = (async_exch_t *)
     1794                    list_get_instance(list_first(&sess->exch_list),
     1795                    async_exch_t, sess_link);
     1796               
     1797                list_remove(&exch->sess_link);
     1798                list_remove(&exch->global_link);
     1799                async_hangup_internal(exch->phone);
     1800                free(exch);
     1801        }
     1802       
     1803        fibril_mutex_unlock(&async_sess_mutex);
    16951804       
    16961805        return rc;
     
    17241833                 */
    17251834                exch = (async_exch_t *)
    1726                     list_get_instance(sess->exch_list.next, async_exch_t, sess_link);
     1835                    list_get_instance(list_first(&sess->exch_list),
     1836                    async_exch_t, sess_link);
     1837               
    17271838                list_remove(&exch->sess_link);
    17281839                list_remove(&exch->global_link);
     
    17361847                        exch = (async_exch_t *) malloc(sizeof(async_exch_t));
    17371848                        if (exch != NULL) {
    1738                                 list_initialize(&exch->sess_link);
    1739                                 list_initialize(&exch->global_link);
     1849                                link_initialize(&exch->sess_link);
     1850                                link_initialize(&exch->global_link);
    17401851                                exch->sess = sess;
    17411852                                exch->phone = sess->phone;
     
    17541865                                exch = (async_exch_t *) malloc(sizeof(async_exch_t));
    17551866                                if (exch != NULL) {
    1756                                         list_initialize(&exch->sess_link);
    1757                                         list_initialize(&exch->global_link);
     1867                                        link_initialize(&exch->sess_link);
     1868                                        link_initialize(&exch->global_link);
    17581869                                        exch->sess = sess;
    17591870                                        exch->phone = phone;
     
    17671878                                 */
    17681879                                exch = (async_exch_t *)
    1769                                     list_get_instance(inactive_exch_list.next, async_exch_t,
    1770                                     global_link);
     1880                                    list_get_instance(list_first(&inactive_exch_list),
     1881                                    async_exch_t, global_link);
     1882                               
    17711883                                list_remove(&exch->sess_link);
    17721884                                list_remove(&exch->global_link);
     
    18081920        async_sess_t *sess = exch->sess;
    18091921       
     1922        atomic_dec(&sess->refcnt);
     1923       
    18101924        if (sess->mgmt == EXCHANGE_SERIALIZE)
    18111925                fibril_mutex_unlock(&sess->mutex);
     
    23222436        sess->arg3 = 0;
    23232437       
     2438        fibril_mutex_initialize(&sess->remote_state_mtx);
     2439        sess->remote_state_data = NULL;
     2440       
    23242441        list_initialize(&sess->exch_list);
    23252442        fibril_mutex_initialize(&sess->mutex);
     
    23682485        sess->arg3 = 0;
    23692486       
     2487        fibril_mutex_initialize(&sess->remote_state_mtx);
     2488        sess->remote_state_data = NULL;
     2489       
    23702490        list_initialize(&sess->exch_list);
    23712491        fibril_mutex_initialize(&sess->mutex);
     
    24102530        sess->arg3 = 0;
    24112531       
     2532        fibril_mutex_initialize(&sess->remote_state_mtx);
     2533        sess->remote_state_data = NULL;
     2534       
    24122535        list_initialize(&sess->exch_list);
    24132536        fibril_mutex_initialize(&sess->mutex);
     
    24172540}
    24182541
     2542int async_state_change_start(async_exch_t *exch, sysarg_t arg1, sysarg_t arg2,
     2543    sysarg_t arg3, async_exch_t *other_exch)
     2544{
     2545        return async_req_5_0(exch, IPC_M_STATE_CHANGE_AUTHORIZE,
     2546            arg1, arg2, arg3, 0, other_exch->phone);
     2547}
     2548
     2549bool async_state_change_receive(ipc_callid_t *callid, sysarg_t *arg1,
     2550    sysarg_t *arg2, sysarg_t *arg3)
     2551{
     2552        assert(callid);
     2553
     2554        ipc_call_t call;
     2555        *callid = async_get_call(&call);
     2556
     2557        if (IPC_GET_IMETHOD(call) != IPC_M_STATE_CHANGE_AUTHORIZE)
     2558                return false;
     2559       
     2560        if (arg1)
     2561                *arg1 = IPC_GET_ARG1(call);
     2562        if (arg2)
     2563                *arg2 = IPC_GET_ARG2(call);
     2564        if (arg3)
     2565                *arg3 = IPC_GET_ARG3(call);
     2566
     2567        return true;
     2568}
     2569
     2570int async_state_change_finalize(ipc_callid_t callid, async_exch_t *other_exch)
     2571{
     2572        return ipc_answer_1(callid, EOK, other_exch->phone);
     2573}
     2574
     2575/** Lock and get session remote state
     2576 *
     2577 * Lock and get the local replica of the remote state
     2578 * in stateful sessions. The call should be paired
     2579 * with async_remote_state_release*().
     2580 *
     2581 * @param[in] sess Stateful session.
     2582 *
     2583 * @return Local replica of the remote state.
     2584 *
     2585 */
     2586void *async_remote_state_acquire(async_sess_t *sess)
     2587{
     2588        fibril_mutex_lock(&sess->remote_state_mtx);
     2589        return sess->remote_state_data;
     2590}
     2591
     2592/** Update the session remote state
     2593 *
     2594 * Update the local replica of the remote state
     2595 * in stateful sessions. The remote state must
     2596 * be already locked.
     2597 *
     2598 * @param[in] sess  Stateful session.
     2599 * @param[in] state New local replica of the remote state.
     2600 *
     2601 */
     2602void async_remote_state_update(async_sess_t *sess, void *state)
     2603{
     2604        assert(fibril_mutex_is_locked(&sess->remote_state_mtx));
     2605        sess->remote_state_data = state;
     2606}
     2607
     2608/** Release the session remote state
     2609 *
     2610 * Unlock the local replica of the remote state
     2611 * in stateful sessions.
     2612 *
     2613 * @param[in] sess Stateful session.
     2614 *
     2615 */
     2616void async_remote_state_release(async_sess_t *sess)
     2617{
     2618        assert(fibril_mutex_is_locked(&sess->remote_state_mtx));
     2619       
     2620        fibril_mutex_unlock(&sess->remote_state_mtx);
     2621}
     2622
     2623/** Release the session remote state and end an exchange
     2624 *
     2625 * Unlock the local replica of the remote state
     2626 * in stateful sessions. This is convenience function
     2627 * which gets the session pointer from the exchange
     2628 * and also ends the exchange.
     2629 *
     2630 * @param[in] exch Stateful session's exchange.
     2631 *
     2632 */
     2633void async_remote_state_release_exchange(async_exch_t *exch)
     2634{
     2635        if (exch == NULL)
     2636                return;
     2637       
     2638        async_sess_t *sess = exch->sess;
     2639        assert(fibril_mutex_is_locked(&sess->remote_state_mtx));
     2640       
     2641        async_exchange_end(exch);
     2642        fibril_mutex_unlock(&sess->remote_state_mtx);
     2643}
     2644
    24192645/** @}
    24202646 */
Note: See TracChangeset for help on using the changeset viewer.