Ignore:
File:
1 edited

Legend:

Unmodified
Added
Removed
  • uspace/lib/c/generic/async.c

    r36e2b55 r8869f7b  
    9898#include <ipc/ipc.h>
    9999#include <async.h>
    100 #include "private/async.h"
    101100#undef LIBC_ASYNC_C_
    102101
     
    108107#include <errno.h>
    109108#include <sys/time.h>
    110 #include <libarch/barrier.h>
     109#include <arch/barrier.h>
    111110#include <bool.h>
    112111#include <malloc.h>
    113112#include <mem.h>
    114113#include <stdlib.h>
    115 #include <macros.h>
     114#include "private/async.h"
    116115
    117116#define CLIENT_HASH_TABLE_BUCKETS  32
     
    139138        link_t link;
    140139       
    141         task_id_t in_task_id;
     140        sysarg_t in_task_hash;
    142141        atomic_t refcnt;
    143142        void *data;
     
    151150        link_t link;
    152151       
    153         /** Incoming client task ID. */
    154         task_id_t in_task_id;
     152        /** Incoming client task hash. */
     153        sysarg_t in_task_hash;
    155154       
    156155        /** Incoming phone hash. */
     
    161160       
    162161        /** Messages that should be delivered to this fibril. */
    163         list_t msg_queue;
     162        link_t msg_queue;
    164163       
    165164        /** Identification of the opening call. */
     
    167166        /** Call data of the opening call. */
    168167        ipc_call_t call;
    169         /** Local argument or NULL if none. */
    170         void *carg;
    171168       
    172169        /** Identification of the closing call. */
     
    174171       
    175172        /** Fibril function that will be used to handle the connection. */
    176         async_client_conn_t cfibril;
     173        void (*cfibril)(ipc_callid_t, ipc_call_t *);
    177174} connection_t;
    178175
     
    204201}
    205202
     203void *async_get_client_data(void)
     204{
     205        assert(fibril_connection);
     206        return fibril_connection->client->data;
     207}
     208
    206209/** Default fibril function that gets called to handle new connection.
    207210 *
     
    210213 * @param callid Hash of the incoming call.
    211214 * @param call   Data of the incoming call.
    212  * @param arg    Local argument
    213  *
    214  */
    215 static void default_client_connection(ipc_callid_t callid, ipc_call_t *call,
    216     void *arg)
     215 *
     216 */
     217static void default_client_connection(ipc_callid_t callid, ipc_call_t *call)
    217218{
    218219        ipc_answer_0(callid, ENOENT);
     
    225226 * @param callid Hash of the incoming call.
    226227 * @param call   Data of the incoming call.
    227  * @param arg    Local argument.
    228228 *
    229229 */
     
    233233
    234234static async_client_conn_t client_connection = default_client_connection;
    235 static async_interrupt_handler_t interrupt_received = default_interrupt_received;
     235static async_client_conn_t interrupt_received = default_interrupt_received;
    236236
    237237/** Setter for client_connection function pointer.
     
    250250 *             notification fibril.
    251251 */
    252 void async_set_interrupt_received(async_interrupt_handler_t intr)
     252void async_set_interrupt_received(async_client_conn_t intr)
    253253{
    254254        interrupt_received = intr;
     
    284284{
    285285        assert(key);
    286         assert(keys == 2);
    287286        assert(item);
    288287       
    289288        client_t *client = hash_table_get_instance(item, client_t, link);
    290         return (key[0] == LOWER32(client->in_task_id) &&
    291             (key[1] == UPPER32(client->in_task_id)));
     289        return (key[0] == client->in_task_hash);
    292290}
    293291
     
    358356        wd->to_event.inlist = true;
    359357       
    360         link_t *tmp = timeout_list.head.next;
    361         while (tmp != &timeout_list.head) {
     358        link_t *tmp = timeout_list.next;
     359        while (tmp != &timeout_list) {
    362360                awaiter_t *cur
    363361                    = list_get_instance(tmp, awaiter_t, to_event.link);
     
    369367        }
    370368       
    371         list_insert_before(&wd->to_event.link, tmp);
     369        list_append(&wd->to_event.link, tmp);
    372370}
    373371
     
    566564        }
    567565       
    568         msg_t *msg = list_get_instance(list_first(&conn->msg_queue), msg_t, link);
     566        msg_t *msg = list_get_instance(conn->msg_queue.next, msg_t, link);
    569567        list_remove(&msg->link);
    570568       
     
    575573        futex_up(&async_futex);
    576574        return callid;
    577 }
    578 
    579 static client_t *async_client_get(task_id_t client_id, bool create)
    580 {
    581         unsigned long key[2] = {
    582                 LOWER32(client_id),
    583                 UPPER32(client_id),
    584         };
    585         client_t *client = NULL;
    586 
    587         futex_down(&async_futex);
    588         link_t *lnk = hash_table_find(&client_hash_table, key);
    589         if (lnk) {
    590                 client = hash_table_get_instance(lnk, client_t, link);
    591                 atomic_inc(&client->refcnt);
    592         } else if (create) {
    593                 client = malloc(sizeof(client_t));
    594                 if (client) {
    595                         client->in_task_id = client_id;
    596                         client->data = async_client_data_create();
    597                
    598                         atomic_set(&client->refcnt, 1);
    599                         hash_table_insert(&client_hash_table, key, &client->link);
    600                 }
    601         }
    602 
    603         futex_up(&async_futex);
    604         return client;
    605 }
    606 
    607 static void async_client_put(client_t *client)
    608 {
    609         bool destroy;
    610         unsigned long key[2] = {
    611                 LOWER32(client->in_task_id),
    612                 UPPER32(client->in_task_id)
    613         };
    614        
    615         futex_down(&async_futex);
    616        
    617         if (atomic_predec(&client->refcnt) == 0) {
    618                 hash_table_remove(&client_hash_table, key, 2);
    619                 destroy = true;
    620         } else
    621                 destroy = false;
    622        
    623         futex_up(&async_futex);
    624        
    625         if (destroy) {
    626                 if (client->data)
    627                         async_client_data_destroy(client->data);
    628                
    629                 free(client);
    630         }
    631 }
    632 
    633 void *async_get_client_data(void)
    634 {
    635         assert(fibril_connection);
    636         return fibril_connection->client->data;
    637 }
    638 
    639 void *async_get_client_data_by_id(task_id_t client_id)
    640 {
    641         client_t *client = async_client_get(client_id, false);
    642         if (!client)
    643                 return NULL;
    644         if (!client->data) {
    645                 async_client_put(client);
    646                 return NULL;
    647         }
    648 
    649         return client->data;
    650 }
    651 
    652 void async_put_client_data_by_id(task_id_t client_id)
    653 {
    654         client_t *client = async_client_get(client_id, false);
    655 
    656         assert(client);
    657         assert(client->data);
    658 
    659         /* Drop the reference we got in async_get_client_data_by_hash(). */
    660         async_client_put(client);
    661 
    662         /* Drop our own reference we got at the beginning of this function. */
    663         async_client_put(client);
    664575}
    665576
     
    682593         */
    683594        fibril_connection = (connection_t *) arg;
     595       
     596        futex_down(&async_futex);
    684597       
    685598        /*
     
    688601         * hash in a new tracking structure.
    689602         */
    690 
    691         client_t *client = async_client_get(fibril_connection->in_task_id, true);
    692         if (!client) {
    693                 ipc_answer_0(fibril_connection->callid, ENOMEM);
    694                 return 0;
    695         }
    696 
     603       
     604        unsigned long key = fibril_connection->in_task_hash;
     605        link_t *lnk = hash_table_find(&client_hash_table, &key);
     606       
     607        client_t *client;
     608       
     609        if (lnk) {
     610                client = hash_table_get_instance(lnk, client_t, link);
     611                atomic_inc(&client->refcnt);
     612        } else {
     613                client = malloc(sizeof(client_t));
     614                if (!client) {
     615                        ipc_answer_0(fibril_connection->callid, ENOMEM);
     616                        futex_up(&async_futex);
     617                        return 0;
     618                }
     619               
     620                client->in_task_hash = fibril_connection->in_task_hash;
     621                client->data = async_client_data_create();
     622               
     623                atomic_set(&client->refcnt, 1);
     624                hash_table_insert(&client_hash_table, &key, &client->link);
     625        }
     626       
     627        futex_up(&async_futex);
     628       
    697629        fibril_connection->client = client;
    698630       
     
    701633         */
    702634        fibril_connection->cfibril(fibril_connection->callid,
    703             &fibril_connection->call, fibril_connection->carg);
     635            &fibril_connection->call);
    704636       
    705637        /*
    706638         * Remove the reference for this client task connection.
    707639         */
    708         async_client_put(client);
     640        bool destroy;
     641       
     642        futex_down(&async_futex);
     643       
     644        if (atomic_predec(&client->refcnt) == 0) {
     645                hash_table_remove(&client_hash_table, &key, 1);
     646                destroy = true;
     647        } else
     648                destroy = false;
     649       
     650        futex_up(&async_futex);
     651       
     652        if (destroy) {
     653                if (client->data)
     654                        async_client_data_destroy(client->data);
     655               
     656                free(client);
     657        }
    709658       
    710659        /*
     
    712661         */
    713662        futex_down(&async_futex);
    714         unsigned long key = fibril_connection->in_phone_hash;
     663        key = fibril_connection->in_phone_hash;
    715664        hash_table_remove(&conn_hash_table, &key, 1);
    716665        futex_up(&async_futex);
     
    721670        while (!list_empty(&fibril_connection->msg_queue)) {
    722671                msg_t *msg =
    723                     list_get_instance(list_first(&fibril_connection->msg_queue),
    724                     msg_t, link);
     672                    list_get_instance(fibril_connection->msg_queue.next, msg_t,
     673                    link);
    725674               
    726675                list_remove(&msg->link);
     
    746695 * particular fibrils.
    747696 *
    748  * @param in_task_id    Identification of the incoming connection.
     697 * @param in_task_hash  Identification of the incoming connection.
    749698 * @param in_phone_hash Identification of the incoming connection.
    750699 * @param callid        Hash of the opening IPC_M_CONNECT_ME_TO call.
     
    755704 * @param cfibril       Fibril function that should be called upon opening the
    756705 *                      connection.
    757  * @param carg          Extra argument to pass to the connection fibril
    758706 *
    759707 * @return New fibril id or NULL on failure.
    760708 *
    761709 */
    762 fid_t async_new_connection(task_id_t in_task_id, sysarg_t in_phone_hash,
     710fid_t async_new_connection(sysarg_t in_task_hash, sysarg_t in_phone_hash,
    763711    ipc_callid_t callid, ipc_call_t *call,
    764     async_client_conn_t cfibril, void *carg)
     712    async_client_conn_t cfibril)
    765713{
    766714        connection_t *conn = malloc(sizeof(*conn));
     
    772720        }
    773721       
    774         conn->in_task_id = in_task_id;
     722        conn->in_task_hash = in_task_hash;
    775723        conn->in_phone_hash = in_phone_hash;
    776724        list_initialize(&conn->msg_queue);
    777725        conn->callid = callid;
    778726        conn->close_callid = 0;
    779         conn->carg = carg;
    780727       
    781728        if (call)
     
    831778        case IPC_M_CONNECT_ME_TO:
    832779                /* Open new connection with fibril, etc. */
    833                 async_new_connection(call->in_task_id, IPC_GET_ARG5(*call),
    834                     callid, call, client_connection, NULL);
     780                async_new_connection(call->in_task_hash, IPC_GET_ARG5(*call),
     781                    callid, call, client_connection);
    835782                return;
    836783        }
     
    852799        futex_down(&async_futex);
    853800       
    854         link_t *cur = list_first(&timeout_list);
    855         while (cur != NULL) {
     801        link_t *cur = timeout_list.next;
     802        while (cur != &timeout_list) {
    856803                awaiter_t *waiter =
    857804                    list_get_instance(cur, awaiter_t, to_event.link);
     
    859806                if (tv_gt(&waiter->to_event.expires, &tv))
    860807                        break;
     808               
     809                cur = cur->next;
    861810               
    862811                list_remove(&waiter->to_event.link);
     
    872821                        fibril_add_ready(waiter->fid);
    873822                }
    874                
    875                 cur = list_first(&timeout_list);
    876823        }
    877824       
     
    900847                suseconds_t timeout;
    901848                if (!list_empty(&timeout_list)) {
    902                         awaiter_t *waiter = list_get_instance(
    903                             list_first(&timeout_list), awaiter_t, to_event.link);
     849                        awaiter_t *waiter = list_get_instance(timeout_list.next,
     850                            awaiter_t, to_event.link);
    904851                       
    905852                        struct timeval tv;
     
    979926{
    980927        if (!hash_table_create(&client_hash_table, CLIENT_HASH_TABLE_BUCKETS,
    981             2, &client_hash_table_ops))
     928            1, &client_hash_table_ops))
    982929                abort();
    983930       
     
    995942        session_ns->arg2 = 0;
    996943        session_ns->arg3 = 0;
    997        
    998         fibril_mutex_initialize(&session_ns->remote_state_mtx);
    999         session_ns->remote_state_data = NULL;
    1000944       
    1001945        list_initialize(&session_ns->exch_list);
     
    14701414 */
    14711415int async_connect_to_me(async_exch_t *exch, sysarg_t arg1, sysarg_t arg2,
    1472     sysarg_t arg3, async_client_conn_t client_receiver, void *carg)
     1416    sysarg_t arg3, async_client_conn_t client_receiver)
    14731417{
    14741418        if (exch == NULL)
    14751419                return ENOENT;
    14761420       
     1421        sysarg_t task_hash;
    14771422        sysarg_t phone_hash;
    1478         sysarg_t rc;
    1479 
    1480         aid_t req;
    1481         ipc_call_t answer;
    1482         req = async_send_3(exch, IPC_M_CONNECT_TO_ME, arg1, arg2, arg3,
    1483             &answer);
    1484         async_wait_for(req, &rc);
     1423        int rc = async_req_3_5(exch, IPC_M_CONNECT_TO_ME, arg1, arg2, arg3,
     1424            NULL, NULL, NULL, &task_hash, &phone_hash);
    14851425        if (rc != EOK)
    1486                 return (int) rc;
    1487 
    1488         phone_hash = IPC_GET_ARG5(answer);
    1489 
     1426                return rc;
     1427       
    14901428        if (client_receiver != NULL)
    1491                 async_new_connection(answer.in_task_id, phone_hash, 0, NULL,
    1492                     client_receiver, carg);
     1429                async_new_connection(task_hash, phone_hash, 0, NULL,
     1430                    client_receiver);
    14931431       
    14941432        return EOK;
     
    15641502        sess->arg3 = 0;
    15651503       
    1566         fibril_mutex_initialize(&sess->remote_state_mtx);
    1567         sess->remote_state_data = NULL;
    1568        
    15691504        list_initialize(&sess->exch_list);
    15701505        fibril_mutex_initialize(&sess->mutex);
     
    16481583        sess->arg3 = arg3;
    16491584       
    1650         fibril_mutex_initialize(&sess->remote_state_mtx);
    1651         sess->remote_state_data = NULL;
    1652        
    16531585        list_initialize(&sess->exch_list);
    16541586        fibril_mutex_initialize(&sess->mutex);
     
    16561588       
    16571589        return sess;
    1658 }
    1659 
    1660 /** Set arguments for new connections.
    1661  *
    1662  * FIXME This is an ugly hack to work around the problem that parallel
    1663  * exchanges are implemented using parallel connections. When we create
    1664  * a callback session, the framework does not know arguments for the new
    1665  * connections.
    1666  *
    1667  * The proper solution seems to be to implement parallel exchanges using
    1668  * tagging.
    1669  */
    1670 void async_sess_args_set(async_sess_t *sess, sysarg_t arg1, sysarg_t arg2,
    1671     sysarg_t arg3)
    1672 {
    1673         sess->arg1 = arg1;
    1674         sess->arg2 = arg2;
    1675         sess->arg3 = arg3;
    16761590}
    16771591
     
    17191633        sess->arg3 = arg3;
    17201634       
    1721         fibril_mutex_initialize(&sess->remote_state_mtx);
    1722         sess->remote_state_data = NULL;
    1723        
    17241635        list_initialize(&sess->exch_list);
    17251636        fibril_mutex_initialize(&sess->mutex);
     
    17531664        sess->arg3 = 0;
    17541665       
    1755         fibril_mutex_initialize(&sess->remote_state_mtx);
    1756         sess->remote_state_data = NULL;
    1757        
    17581666        list_initialize(&sess->exch_list);
    17591667        fibril_mutex_initialize(&sess->mutex);
     
    17771685int async_hangup(async_sess_t *sess)
    17781686{
    1779         async_exch_t *exch;
    1780        
    17811687        assert(sess);
    17821688       
    17831689        if (atomic_get(&sess->refcnt) > 0)
    17841690                return EBUSY;
    1785        
    1786         fibril_mutex_lock(&async_sess_mutex);
    17871691       
    17881692        int rc = async_hangup_internal(sess->phone);
    17891693        if (rc == EOK)
    17901694                free(sess);
    1791        
    1792         while (!list_empty(&sess->exch_list)) {
    1793                 exch = (async_exch_t *)
    1794                     list_get_instance(list_first(&sess->exch_list),
    1795                     async_exch_t, sess_link);
    1796                
    1797                 list_remove(&exch->sess_link);
    1798                 list_remove(&exch->global_link);
    1799                 async_hangup_internal(exch->phone);
    1800                 free(exch);
    1801         }
    1802        
    1803         fibril_mutex_unlock(&async_sess_mutex);
    18041695       
    18051696        return rc;
     
    18331724                 */
    18341725                exch = (async_exch_t *)
    1835                     list_get_instance(list_first(&sess->exch_list),
    1836                     async_exch_t, sess_link);
    1837                
     1726                    list_get_instance(sess->exch_list.next, async_exch_t, sess_link);
    18381727                list_remove(&exch->sess_link);
    18391728                list_remove(&exch->global_link);
     
    18471736                        exch = (async_exch_t *) malloc(sizeof(async_exch_t));
    18481737                        if (exch != NULL) {
    1849                                 link_initialize(&exch->sess_link);
    1850                                 link_initialize(&exch->global_link);
     1738                                list_initialize(&exch->sess_link);
     1739                                list_initialize(&exch->global_link);
    18511740                                exch->sess = sess;
    18521741                                exch->phone = sess->phone;
     
    18651754                                exch = (async_exch_t *) malloc(sizeof(async_exch_t));
    18661755                                if (exch != NULL) {
    1867                                         link_initialize(&exch->sess_link);
    1868                                         link_initialize(&exch->global_link);
     1756                                        list_initialize(&exch->sess_link);
     1757                                        list_initialize(&exch->global_link);
    18691758                                        exch->sess = sess;
    18701759                                        exch->phone = phone;
     
    18781767                                 */
    18791768                                exch = (async_exch_t *)
    1880                                     list_get_instance(list_first(&inactive_exch_list),
    1881                                     async_exch_t, global_link);
    1882                                
     1769                                    list_get_instance(inactive_exch_list.next, async_exch_t,
     1770                                    global_link);
    18831771                                list_remove(&exch->sess_link);
    18841772                                list_remove(&exch->global_link);
     
    19201808        async_sess_t *sess = exch->sess;
    19211809       
    1922         atomic_dec(&sess->refcnt);
    1923        
    19241810        if (sess->mgmt == EXCHANGE_SERIALIZE)
    19251811                fibril_mutex_unlock(&sess->mutex);
     
    24362322        sess->arg3 = 0;
    24372323       
    2438         fibril_mutex_initialize(&sess->remote_state_mtx);
    2439         sess->remote_state_data = NULL;
    2440        
    24412324        list_initialize(&sess->exch_list);
    24422325        fibril_mutex_initialize(&sess->mutex);
     
    24852368        sess->arg3 = 0;
    24862369       
    2487         fibril_mutex_initialize(&sess->remote_state_mtx);
    2488         sess->remote_state_data = NULL;
    2489        
    24902370        list_initialize(&sess->exch_list);
    24912371        fibril_mutex_initialize(&sess->mutex);
     
    25302410        sess->arg3 = 0;
    25312411       
    2532         fibril_mutex_initialize(&sess->remote_state_mtx);
    2533         sess->remote_state_data = NULL;
    2534        
    25352412        list_initialize(&sess->exch_list);
    25362413        fibril_mutex_initialize(&sess->mutex);
     
    25402417}
    25412418
    2542 int async_state_change_start(async_exch_t *exch, sysarg_t arg1, sysarg_t arg2,
    2543     sysarg_t arg3, async_exch_t *other_exch)
    2544 {
    2545         return async_req_5_0(exch, IPC_M_STATE_CHANGE_AUTHORIZE,
    2546             arg1, arg2, arg3, 0, other_exch->phone);
    2547 }
    2548 
    2549 bool async_state_change_receive(ipc_callid_t *callid, sysarg_t *arg1,
    2550     sysarg_t *arg2, sysarg_t *arg3)
    2551 {
    2552         assert(callid);
    2553 
    2554         ipc_call_t call;
    2555         *callid = async_get_call(&call);
    2556 
    2557         if (IPC_GET_IMETHOD(call) != IPC_M_STATE_CHANGE_AUTHORIZE)
    2558                 return false;
    2559        
    2560         if (arg1)
    2561                 *arg1 = IPC_GET_ARG1(call);
    2562         if (arg2)
    2563                 *arg2 = IPC_GET_ARG2(call);
    2564         if (arg3)
    2565                 *arg3 = IPC_GET_ARG3(call);
    2566 
    2567         return true;
    2568 }
    2569 
    2570 int async_state_change_finalize(ipc_callid_t callid, async_exch_t *other_exch)
    2571 {
    2572         return ipc_answer_1(callid, EOK, other_exch->phone);
    2573 }
    2574 
    2575 /** Lock and get session remote state
    2576  *
    2577  * Lock and get the local replica of the remote state
    2578  * in stateful sessions. The call should be paired
    2579  * with async_remote_state_release*().
    2580  *
    2581  * @param[in] sess Stateful session.
    2582  *
    2583  * @return Local replica of the remote state.
    2584  *
    2585  */
    2586 void *async_remote_state_acquire(async_sess_t *sess)
    2587 {
    2588         fibril_mutex_lock(&sess->remote_state_mtx);
    2589         return sess->remote_state_data;
    2590 }
    2591 
    2592 /** Update the session remote state
    2593  *
    2594  * Update the local replica of the remote state
    2595  * in stateful sessions. The remote state must
    2596  * be already locked.
    2597  *
    2598  * @param[in] sess  Stateful session.
    2599  * @param[in] state New local replica of the remote state.
    2600  *
    2601  */
    2602 void async_remote_state_update(async_sess_t *sess, void *state)
    2603 {
    2604         assert(fibril_mutex_is_locked(&sess->remote_state_mtx));
    2605         sess->remote_state_data = state;
    2606 }
    2607 
    2608 /** Release the session remote state
    2609  *
    2610  * Unlock the local replica of the remote state
    2611  * in stateful sessions.
    2612  *
    2613  * @param[in] sess Stateful session.
    2614  *
    2615  */
    2616 void async_remote_state_release(async_sess_t *sess)
    2617 {
    2618         assert(fibril_mutex_is_locked(&sess->remote_state_mtx));
    2619        
    2620         fibril_mutex_unlock(&sess->remote_state_mtx);
    2621 }
    2622 
    2623 /** Release the session remote state and end an exchange
    2624  *
    2625  * Unlock the local replica of the remote state
    2626  * in stateful sessions. This is convenience function
    2627  * which gets the session pointer from the exchange
    2628  * and also ends the exchange.
    2629  *
    2630  * @param[in] exch Stateful session's exchange.
    2631  *
    2632  */
    2633 void async_remote_state_release_exchange(async_exch_t *exch)
    2634 {
    2635         if (exch == NULL)
    2636                 return;
    2637        
    2638         async_sess_t *sess = exch->sess;
    2639         assert(fibril_mutex_is_locked(&sess->remote_state_mtx));
    2640        
    2641         async_exchange_end(exch);
    2642         fibril_mutex_unlock(&sess->remote_state_mtx);
    2643 }
    2644 
    26452419/** @}
    26462420 */
Note: See TracChangeset for help on using the changeset viewer.