Ignore:
File:
1 edited

Legend:

Unmodified
Added
Removed
  • uspace/lib/c/generic/async.c

    rc7bbf029 rcff3fb6  
    4040 * programming.
    4141 *
    42  * You should be able to write very simple multithreaded programs, the async
    43  * framework will automatically take care of most synchronization problems.
     42 * You should be able to write very simple multithreaded programs. The async
     43 * framework will automatically take care of most of the synchronization
     44 * problems.
    4445 *
    4546 * Example of use (pseudo C):
     
    5354 *   int fibril1(void *arg)
    5455 *   {
    55  *     conn = async_connect_me_to();
    56  *     c1 = async_send(conn);
    57  *     c2 = async_send(conn);
     56 *     conn = async_connect_me_to(...);
     57 *
     58 *     exch = async_exchange_begin(conn);
     59 *     c1 = async_send(exch);
     60 *     async_exchange_end(exch);
     61 *
     62 *     exch = async_exchange_begin(conn);
     63 *     c2 = async_send(exch);
     64 *     async_exchange_end(exch);
     65 *
    5866 *     async_wait_for(c1);
    5967 *     async_wait_for(c2);
     
    9098#include <ipc/ipc.h>
    9199#include <async.h>
     100#include "private/async.h"
    92101#undef LIBC_ASYNC_C_
    93102
    94103#include <futex.h>
    95104#include <fibril.h>
    96 #include <stdio.h>
    97105#include <adt/hash_table.h>
    98106#include <adt/list.h>
     
    100108#include <errno.h>
    101109#include <sys/time.h>
    102 #include <arch/barrier.h>
     110#include <libarch/barrier.h>
    103111#include <bool.h>
     112#include <malloc.h>
     113#include <mem.h>
    104114#include <stdlib.h>
    105 #include <malloc.h>
    106 #include "private/async.h"
    107 
     115#include <macros.h>
     116
     117#define CLIENT_HASH_TABLE_BUCKETS  32
     118#define CONN_HASH_TABLE_BUCKETS    32
     119
     120/** Session data */
     121struct async_sess {
     122        /** List of inactive exchanges */
     123        list_t exch_list;
     124       
     125        /** Exchange management style */
     126        exch_mgmt_t mgmt;
     127       
     128        /** Session identification */
     129        int phone;
     130       
     131        /** First clone connection argument */
     132        sysarg_t arg1;
     133       
     134        /** Second clone connection argument */
     135        sysarg_t arg2;
     136       
     137        /** Third clone connection argument */
     138        sysarg_t arg3;
     139       
     140        /** Exchange mutex */
     141        fibril_mutex_t mutex;
     142       
     143        /** Number of opened exchanges */
     144        atomic_t refcnt;
     145       
     146        /** Mutex for stateful connections */
     147        fibril_mutex_t remote_state_mtx;
     148       
     149        /** Data for stateful connections */
     150        void *remote_state_data;
     151};
     152
     153/** Exchange data */
     154struct async_exch {
     155        /** Link into list of inactive exchanges */
     156        link_t sess_link;
     157       
     158        /** Link into global list of inactive exchanges */
     159        link_t global_link;
     160       
     161        /** Session pointer */
     162        async_sess_t *sess;
     163       
     164        /** Exchange identification */
     165        int phone;
     166};
     167
     168/** Async framework global futex */
    108169atomic_t async_futex = FUTEX_INITIALIZER;
    109170
     
    111172atomic_t threads_in_ipc_wait = { 0 };
    112173
    113 typedef struct {
    114         awaiter_t wdata;
    115        
    116         /** If reply was received. */
    117         bool done;
    118        
    119         /** Pointer to where the answer data is stored. */
    120         ipc_call_t *dataptr;
    121        
    122         sysarg_t retval;
    123 } amsg_t;
    124 
    125 /**
    126  * Structures of this type are used to group information about
    127  * a call and about a message queue link.
    128  */
     174/** Naming service session */
     175async_sess_t *session_ns;
     176
     177/** Call data */
    129178typedef struct {
    130179        link_t link;
     180       
    131181        ipc_callid_t callid;
    132182        ipc_call_t call;
    133183} msg_t;
    134184
     185/** Message data */
    135186typedef struct {
    136         sysarg_t in_task_hash;
     187        awaiter_t wdata;
     188       
     189        /** If reply was received. */
     190        bool done;
     191       
     192        /** Pointer to where the answer data is stored. */
     193        ipc_call_t *dataptr;
     194       
     195        sysarg_t retval;
     196} amsg_t;
     197
     198/* Client connection data */
     199typedef struct {
    137200        link_t link;
    138         int refcnt;
     201       
     202        task_id_t in_task_id;
     203        atomic_t refcnt;
    139204        void *data;
    140205} client_t;
    141206
     207/* Server connection data */
    142208typedef struct {
    143209        awaiter_t wdata;
     
    146212        link_t link;
    147213       
    148         /** Incoming client task hash. */
    149         sysarg_t in_task_hash;
     214        /** Incoming client task ID. */
     215        task_id_t in_task_id;
     216       
    150217        /** Incoming phone hash. */
    151218        sysarg_t in_phone_hash;
     
    155222       
    156223        /** Messages that should be delivered to this fibril. */
    157         link_t msg_queue;
     224        list_t msg_queue;
    158225       
    159226        /** Identification of the opening call. */
     
    161228        /** Call data of the opening call. */
    162229        ipc_call_t call;
     230        /** Local argument or NULL if none. */
     231        void *carg;
    163232       
    164233        /** Identification of the closing call. */
     
    166235       
    167236        /** Fibril function that will be used to handle the connection. */
    168         void (*cfibril)(ipc_callid_t, ipc_call_t *);
     237        async_client_conn_t cfibril;
    169238} connection_t;
    170239
    171240/** Identifier of the incoming connection handled by the current fibril. */
    172 static fibril_local connection_t *FIBRIL_connection;
     241static fibril_local connection_t *fibril_connection;
    173242
    174243static void *default_client_data_constructor(void)
     
    196265}
    197266
    198 void *async_client_data_get(void)
    199 {
    200         assert(FIBRIL_connection);
    201         return FIBRIL_connection->client->data;
    202 }
    203 
    204267/** Default fibril function that gets called to handle new connection.
    205268 *
     
    208271 * @param callid Hash of the incoming call.
    209272 * @param call   Data of the incoming call.
    210  *
    211  */
    212 static void default_client_connection(ipc_callid_t callid, ipc_call_t *call)
     273 * @param arg    Local argument
     274 *
     275 */
     276static void default_client_connection(ipc_callid_t callid, ipc_call_t *call,
     277    void *arg)
    213278{
    214279        ipc_answer_0(callid, ENOENT);
    215280}
    216 
    217 /**
    218  * Pointer to a fibril function that will be used to handle connections.
    219  */
    220 static async_client_conn_t client_connection = default_client_connection;
    221281
    222282/** Default fibril function that gets called to handle interrupt notifications.
     
    226286 * @param callid Hash of the incoming call.
    227287 * @param call   Data of the incoming call.
     288 * @param arg    Local argument.
    228289 *
    229290 */
     
    232293}
    233294
    234 /**
    235  * Pointer to a fibril function that will be used to handle interrupt
    236  * notifications.
    237  */
    238 static async_client_conn_t interrupt_received = default_interrupt_received;
     295static async_client_conn_t client_connection = default_client_connection;
     296static async_interrupt_handler_t interrupt_received = default_interrupt_received;
     297
     298/** Setter for client_connection function pointer.
     299 *
     300 * @param conn Function that will implement a new connection fibril.
     301 *
     302 */
     303void async_set_client_connection(async_client_conn_t conn)
     304{
     305        client_connection = conn;
     306}
     307
     308/** Setter for interrupt_received function pointer.
     309 *
     310 * @param intr Function that will implement a new interrupt
     311 *             notification fibril.
     312 */
     313void async_set_interrupt_received(async_interrupt_handler_t intr)
     314{
     315        interrupt_received = intr;
     316}
     317
     318/** Mutex protecting inactive_exch_list and avail_phone_cv.
     319 *
     320 */
     321static FIBRIL_MUTEX_INITIALIZE(async_sess_mutex);
     322
     323/** List of all currently inactive exchanges.
     324 *
     325 */
     326static LIST_INITIALIZE(inactive_exch_list);
     327
     328/** Condition variable to wait for a phone to become available.
     329 *
     330 */
     331static FIBRIL_CONDVAR_INITIALIZE(avail_phone_cv);
    239332
    240333static hash_table_t client_hash_table;
     
    242335static LIST_INITIALIZE(timeout_list);
    243336
    244 #define CLIENT_HASH_TABLE_BUCKETS  32
    245 #define CONN_HASH_TABLE_BUCKETS    32
    246 
    247337static hash_index_t client_hash(unsigned long key[])
    248338{
    249339        assert(key);
     340       
    250341        return (((key[0]) >> 4) % CLIENT_HASH_TABLE_BUCKETS);
    251342}
     
    253344static int client_compare(unsigned long key[], hash_count_t keys, link_t *item)
    254345{
     346        assert(key);
     347        assert(keys == 2);
     348        assert(item);
     349       
    255350        client_t *client = hash_table_get_instance(item, client_t, link);
    256         return (key[0] == client->in_task_hash);
     351        return (key[0] == LOWER32(client->in_task_id) &&
     352            (key[1] == UPPER32(client->in_task_id)));
    257353}
    258354
     
    278374{
    279375        assert(key);
     376       
    280377        return (((key[0]) >> 4) % CONN_HASH_TABLE_BUCKETS);
    281378}
     
    292389static int conn_compare(unsigned long key[], hash_count_t keys, link_t *item)
    293390{
     391        assert(key);
     392        assert(item);
     393       
    294394        connection_t *conn = hash_table_get_instance(item, connection_t, link);
    295395        return (key[0] == conn->in_phone_hash);
     
    314414void async_insert_timeout(awaiter_t *wd)
    315415{
     416        assert(wd);
     417       
    316418        wd->to_event.occurred = false;
    317419        wd->to_event.inlist = true;
    318420       
    319         link_t *tmp = timeout_list.next;
    320         while (tmp != &timeout_list) {
     421        link_t *tmp = timeout_list.head.next;
     422        while (tmp != &timeout_list.head) {
    321423                awaiter_t *cur
    322424                    = list_get_instance(tmp, awaiter_t, to_event.link);
     
    328430        }
    329431       
    330         list_append(&wd->to_event.link, tmp);
     432        list_insert_before(&wd->to_event.link, tmp);
    331433}
    332434
     
    346448static bool route_call(ipc_callid_t callid, ipc_call_t *call)
    347449{
     450        assert(call);
     451       
    348452        futex_down(&async_futex);
    349453       
     
    400504static int notification_fibril(void *arg)
    401505{
     506        assert(arg);
     507       
    402508        msg_t *msg = (msg_t *) arg;
    403509        interrupt_received(msg->callid, &msg->call);
     
    420526static bool process_notification(ipc_callid_t callid, ipc_call_t *call)
    421527{
     528        assert(call);
     529       
    422530        futex_down(&async_futex);
    423531       
     
    458566ipc_callid_t async_get_call_timeout(ipc_call_t *call, suseconds_t usecs)
    459567{
    460         assert(FIBRIL_connection);
     568        assert(call);
     569        assert(fibril_connection);
    461570       
    462571        /* Why doing this?
    463          * GCC 4.1.0 coughs on FIBRIL_connection-> dereference.
     572         * GCC 4.1.0 coughs on fibril_connection-> dereference.
    464573         * GCC 4.1.1 happilly puts the rdhwr instruction in delay slot.
    465574         *           I would never expect to find so many errors in
    466575         *           a compiler.
    467576         */
    468         connection_t *conn = FIBRIL_connection;
     577        connection_t *conn = fibril_connection;
    469578       
    470579        futex_down(&async_futex);
     
    518627        }
    519628       
    520         msg_t *msg = list_get_instance(conn->msg_queue.next, msg_t, link);
     629        msg_t *msg = list_get_instance(list_first(&conn->msg_queue), msg_t, link);
    521630        list_remove(&msg->link);
    522631       
     
    529638}
    530639
     640static client_t *async_client_get(task_id_t client_id, bool create)
     641{
     642        unsigned long key[2] = {
     643                LOWER32(client_id),
     644                UPPER32(client_id),
     645        };
     646        client_t *client = NULL;
     647
     648        futex_down(&async_futex);
     649        link_t *lnk = hash_table_find(&client_hash_table, key);
     650        if (lnk) {
     651                client = hash_table_get_instance(lnk, client_t, link);
     652                atomic_inc(&client->refcnt);
     653        } else if (create) {
     654                client = malloc(sizeof(client_t));
     655                if (client) {
     656                        client->in_task_id = client_id;
     657                        client->data = async_client_data_create();
     658               
     659                        atomic_set(&client->refcnt, 1);
     660                        hash_table_insert(&client_hash_table, key, &client->link);
     661                }
     662        }
     663
     664        futex_up(&async_futex);
     665        return client;
     666}
     667
     668static void async_client_put(client_t *client)
     669{
     670        bool destroy;
     671        unsigned long key[2] = {
     672                LOWER32(client->in_task_id),
     673                UPPER32(client->in_task_id)
     674        };
     675       
     676        futex_down(&async_futex);
     677       
     678        if (atomic_predec(&client->refcnt) == 0) {
     679                hash_table_remove(&client_hash_table, key, 2);
     680                destroy = true;
     681        } else
     682                destroy = false;
     683       
     684        futex_up(&async_futex);
     685       
     686        if (destroy) {
     687                if (client->data)
     688                        async_client_data_destroy(client->data);
     689               
     690                free(client);
     691        }
     692}
     693
     694void *async_get_client_data(void)
     695{
     696        assert(fibril_connection);
     697        return fibril_connection->client->data;
     698}
     699
     700void *async_get_client_data_by_id(task_id_t client_id)
     701{
     702        client_t *client = async_client_get(client_id, false);
     703        if (!client)
     704                return NULL;
     705        if (!client->data) {
     706                async_client_put(client);
     707                return NULL;
     708        }
     709
     710        return client->data;
     711}
     712
     713void async_put_client_data_by_id(task_id_t client_id)
     714{
     715        client_t *client = async_client_get(client_id, false);
     716
     717        assert(client);
     718        assert(client->data);
     719
     720        /* Drop the reference we got in async_get_client_data_by_hash(). */
     721        async_client_put(client);
     722
     723        /* Drop our own reference we got at the beginning of this function. */
     724        async_client_put(client);
     725}
     726
    531727/** Wrapper for client connection fibril.
    532728 *
     
    541737static int connection_fibril(void *arg)
    542738{
     739        assert(arg);
     740       
    543741        /*
    544742         * Setup fibril-local connection pointer.
    545743         */
    546         FIBRIL_connection = (connection_t *) arg;
    547        
    548         futex_down(&async_futex);
     744        fibril_connection = (connection_t *) arg;
    549745       
    550746        /*
     
    553749         * hash in a new tracking structure.
    554750         */
    555        
    556         unsigned long key = FIBRIL_connection->in_task_hash;
    557         link_t *lnk = hash_table_find(&client_hash_table, &key);
    558        
    559         client_t *client;
    560        
    561         if (lnk) {
    562                 client = hash_table_get_instance(lnk, client_t, link);
    563                 client->refcnt++;
    564         } else {
    565                 client = malloc(sizeof(client_t));
    566                 if (!client) {
    567                         ipc_answer_0(FIBRIL_connection->callid, ENOMEM);
    568                         futex_up(&async_futex);
    569                         return 0;
    570                 }
    571                
    572                 client->in_task_hash = FIBRIL_connection->in_task_hash;
    573                
    574                 async_serialize_start();
    575                 client->data = async_client_data_create();
    576                 async_serialize_end();
    577                
    578                 client->refcnt = 1;
    579                 hash_table_insert(&client_hash_table, &key, &client->link);
    580         }
    581        
    582         futex_up(&async_futex);
    583        
    584         FIBRIL_connection->client = client;
     751
     752        client_t *client = async_client_get(fibril_connection->in_task_id, true);
     753        if (!client) {
     754                ipc_answer_0(fibril_connection->callid, ENOMEM);
     755                return 0;
     756        }
     757
     758        fibril_connection->client = client;
    585759       
    586760        /*
    587761         * Call the connection handler function.
    588762         */
    589         FIBRIL_connection->cfibril(FIBRIL_connection->callid,
    590             &FIBRIL_connection->call);
     763        fibril_connection->cfibril(fibril_connection->callid,
     764            &fibril_connection->call, fibril_connection->carg);
    591765       
    592766        /*
    593767         * Remove the reference for this client task connection.
    594768         */
    595         bool destroy;
    596        
    597         futex_down(&async_futex);
    598        
    599         if (--client->refcnt == 0) {
    600                 hash_table_remove(&client_hash_table, &key, 1);
    601                 destroy = true;
    602         } else
    603                 destroy = false;
    604        
    605         futex_up(&async_futex);
    606        
    607         if (destroy) {
    608                 if (client->data)
    609                         async_client_data_destroy(client->data);
    610                
    611                 free(client);
    612         }
     769        async_client_put(client);
    613770       
    614771        /*
     
    616773         */
    617774        futex_down(&async_futex);
    618         key = FIBRIL_connection->in_phone_hash;
     775        unsigned long key = fibril_connection->in_phone_hash;
    619776        hash_table_remove(&conn_hash_table, &key, 1);
    620777        futex_up(&async_futex);
     
    623780         * Answer all remaining messages with EHANGUP.
    624781         */
    625         while (!list_empty(&FIBRIL_connection->msg_queue)) {
     782        while (!list_empty(&fibril_connection->msg_queue)) {
    626783                msg_t *msg =
    627                     list_get_instance(FIBRIL_connection->msg_queue.next, msg_t,
    628                     link);
     784                    list_get_instance(list_first(&fibril_connection->msg_queue),
     785                    msg_t, link);
    629786               
    630787                list_remove(&msg->link);
     
    637794         * i.e. IPC_M_PHONE_HUNGUP.
    638795         */
    639         if (FIBRIL_connection->close_callid)
    640                 ipc_answer_0(FIBRIL_connection->close_callid, EOK);
    641        
    642         free(FIBRIL_connection);
     796        if (fibril_connection->close_callid)
     797                ipc_answer_0(fibril_connection->close_callid, EOK);
     798       
     799        free(fibril_connection);
    643800        return 0;
    644801}
     
    646803/** Create a new fibril for a new connection.
    647804 *
    648  * Create new fibril for connection, fill in connection structures and inserts
     805 * Create new fibril for connection, fill in connection structures and insert
    649806 * it into the hash table, so that later we can easily do routing of messages to
    650807 * particular fibrils.
    651808 *
    652  * @param in_task_hash  Identification of the incoming connection.
     809 * @param in_task_id    Identification of the incoming connection.
    653810 * @param in_phone_hash Identification of the incoming connection.
    654811 * @param callid        Hash of the opening IPC_M_CONNECT_ME_TO call.
     
    659816 * @param cfibril       Fibril function that should be called upon opening the
    660817 *                      connection.
     818 * @param carg          Extra argument to pass to the connection fibril
    661819 *
    662820 * @return New fibril id or NULL on failure.
    663821 *
    664822 */
    665 fid_t async_new_connection(sysarg_t in_task_hash, sysarg_t in_phone_hash,
     823fid_t async_new_connection(task_id_t in_task_id, sysarg_t in_phone_hash,
    666824    ipc_callid_t callid, ipc_call_t *call,
    667     void (*cfibril)(ipc_callid_t, ipc_call_t *))
     825    async_client_conn_t cfibril, void *carg)
    668826{
    669827        connection_t *conn = malloc(sizeof(*conn));
     
    675833        }
    676834       
    677         conn->in_task_hash = in_task_hash;
     835        conn->in_task_id = in_task_id;
    678836        conn->in_phone_hash = in_phone_hash;
    679837        list_initialize(&conn->msg_queue);
    680838        conn->callid = callid;
    681839        conn->close_callid = 0;
     840        conn->carg = carg;
    682841       
    683842        if (call)
     
    721880static void handle_call(ipc_callid_t callid, ipc_call_t *call)
    722881{
     882        assert(call);
     883       
    723884        /* Unrouted call - take some default action */
    724885        if ((callid & IPC_CALLID_NOTIFICATION)) {
     
    731892        case IPC_M_CONNECT_ME_TO:
    732893                /* Open new connection with fibril, etc. */
    733                 async_new_connection(call->in_task_hash, IPC_GET_ARG5(*call),
    734                     callid, call, client_connection);
     894                async_new_connection(call->in_task_id, IPC_GET_ARG5(*call),
     895                    callid, call, client_connection, NULL);
    735896                return;
    736897        }
     
    752913        futex_down(&async_futex);
    753914       
    754         link_t *cur = timeout_list.next;
    755         while (cur != &timeout_list) {
     915        link_t *cur = list_first(&timeout_list);
     916        while (cur != NULL) {
    756917                awaiter_t *waiter =
    757918                    list_get_instance(cur, awaiter_t, to_event.link);
     
    759920                if (tv_gt(&waiter->to_event.expires, &tv))
    760921                        break;
    761                
    762                 cur = cur->next;
    763922               
    764923                list_remove(&waiter->to_event.link);
     
    774933                        fibril_add_ready(waiter->fid);
    775934                }
     935               
     936                cur = list_first(&timeout_list);
    776937        }
    777938       
     
    800961                suseconds_t timeout;
    801962                if (!list_empty(&timeout_list)) {
    802                         awaiter_t *waiter = list_get_instance(timeout_list.next,
    803                             awaiter_t, to_event.link);
     963                        awaiter_t *waiter = list_get_instance(
     964                            list_first(&timeout_list), awaiter_t, to_event.link);
    804965                       
    805966                        struct timeval tv;
     
    8781039void __async_init(void)
    8791040{
    880         if (!hash_table_create(&client_hash_table, CLIENT_HASH_TABLE_BUCKETS, 1,
    881             &client_hash_table_ops))
     1041        if (!hash_table_create(&client_hash_table, CLIENT_HASH_TABLE_BUCKETS,
     1042            2, &client_hash_table_ops))
    8821043                abort();
    8831044       
    884         if (!hash_table_create(&conn_hash_table, CONN_HASH_TABLE_BUCKETS, 1,
    885             &conn_hash_table_ops))
     1045        if (!hash_table_create(&conn_hash_table, CONN_HASH_TABLE_BUCKETS,
     1046            1, &conn_hash_table_ops))
    8861047                abort();
     1048       
     1049        session_ns = (async_sess_t *) malloc(sizeof(async_sess_t));
     1050        if (session_ns == NULL)
     1051                abort();
     1052       
     1053        session_ns->mgmt = EXCHANGE_ATOMIC;
     1054        session_ns->phone = PHONE_NS;
     1055        session_ns->arg1 = 0;
     1056        session_ns->arg2 = 0;
     1057        session_ns->arg3 = 0;
     1058       
     1059        fibril_mutex_initialize(&session_ns->remote_state_mtx);
     1060        session_ns->remote_state_data = NULL;
     1061       
     1062        list_initialize(&session_ns->exch_list);
     1063        fibril_mutex_initialize(&session_ns->mutex);
     1064        atomic_set(&session_ns->refcnt, 0);
    8871065}
    8881066
     
    8991077 *
    9001078 */
    901 static void reply_received(void *arg, int retval, ipc_call_t *data)
    902 {
     1079void reply_received(void *arg, int retval, ipc_call_t *data)
     1080{
     1081        assert(arg);
     1082       
    9031083        futex_down(&async_futex);
    9041084       
     
    9301110 * completion.
    9311111 *
    932  * @param phoneid Handle of the phone that will be used for the send.
    933  * @param method  Service-defined method.
     1112 * @param exch    Exchange for sending the message.
     1113 * @param imethod Service-defined interface and method.
    9341114 * @param arg1    Service-defined payload argument.
    9351115 * @param arg2    Service-defined payload argument.
     
    9421122 *
    9431123 */
    944 aid_t async_send_fast(int phoneid, sysarg_t method, sysarg_t arg1,
     1124aid_t async_send_fast(async_exch_t *exch, sysarg_t imethod, sysarg_t arg1,
    9451125    sysarg_t arg2, sysarg_t arg3, sysarg_t arg4, ipc_call_t *dataptr)
    9461126{
     1127        if (exch == NULL)
     1128                return 0;
     1129       
    9471130        amsg_t *msg = malloc(sizeof(amsg_t));
    948        
    949         if (!msg)
     1131        if (msg == NULL)
    9501132                return 0;
    9511133       
     
    9611143        msg->wdata.active = true;
    9621144       
    963         ipc_call_async_4(phoneid, method, arg1, arg2, arg3, arg4, msg,
     1145        ipc_call_async_4(exch->phone, imethod, arg1, arg2, arg3, arg4, msg,
    9641146            reply_received, true);
    9651147       
     
    9721154 * completion.
    9731155 *
    974  * @param phoneid Handle of the phone that will be used for the send.
    975  * @param method  Service-defined method.
     1156 * @param exch    Exchange for sending the message.
     1157 * @param imethod Service-defined interface and method.
    9761158 * @param arg1    Service-defined payload argument.
    9771159 * @param arg2    Service-defined payload argument.
     
    9851167 *
    9861168 */
    987 aid_t async_send_slow(int phoneid, sysarg_t method, sysarg_t arg1,
     1169aid_t async_send_slow(async_exch_t *exch, sysarg_t imethod, sysarg_t arg1,
    9881170    sysarg_t arg2, sysarg_t arg3, sysarg_t arg4, sysarg_t arg5,
    9891171    ipc_call_t *dataptr)
    9901172{
     1173        if (exch == NULL)
     1174                return 0;
     1175       
    9911176        amsg_t *msg = malloc(sizeof(amsg_t));
    9921177       
    993         if (!msg)
     1178        if (msg == NULL)
    9941179                return 0;
    9951180       
     
    10051190        msg->wdata.active = true;
    10061191       
    1007         ipc_call_async_5(phoneid, method, arg1, arg2, arg3, arg4, arg5, msg,
    1008             reply_received, true);
     1192        ipc_call_async_5(exch->phone, imethod, arg1, arg2, arg3, arg4, arg5,
     1193            msg, reply_received, true);
    10091194       
    10101195        return (aid_t) msg;
     
    10201205void async_wait_for(aid_t amsgid, sysarg_t *retval)
    10211206{
     1207        assert(amsgid);
     1208       
    10221209        amsg_t *msg = (amsg_t *) amsgid;
    10231210       
     
    10561243int async_wait_timeout(aid_t amsgid, sysarg_t *retval, suseconds_t timeout)
    10571244{
     1245        assert(amsgid);
     1246       
    10581247        amsg_t *msg = (amsg_t *) amsgid;
    10591248       
     
    11241313}
    11251314
    1126 /** Setter for client_connection function pointer.
    1127  *
    1128  * @param conn Function that will implement a new connection fibril.
    1129  *
    1130  */
    1131 void async_set_client_connection(async_client_conn_t conn)
    1132 {
    1133         client_connection = conn;
    1134 }
    1135 
    1136 /** Setter for interrupt_received function pointer.
    1137  *
    1138  * @param intr Function that will implement a new interrupt
    1139  *             notification fibril.
    1140  */
    1141 void async_set_interrupt_received(async_client_conn_t intr)
    1142 {
    1143         interrupt_received = intr;
    1144 }
    1145 
    11461315/** Pseudo-synchronous message sending - fast version.
    11471316 *
     
    11511320 * transferring more arguments, see the slower async_req_slow().
    11521321 *
    1153  * @param phoneid Hash of the phone through which to make the call.
    1154  * @param method  Method of the call.
     1322 * @param exch    Exchange for sending the message.
     1323 * @param imethod Interface and method of the call.
    11551324 * @param arg1    Service-defined payload argument.
    11561325 * @param arg2    Service-defined payload argument.
     
    11661335 *
    11671336 */
    1168 sysarg_t async_req_fast(int phoneid, sysarg_t method, sysarg_t arg1,
     1337sysarg_t async_req_fast(async_exch_t *exch, sysarg_t imethod, sysarg_t arg1,
    11691338    sysarg_t arg2, sysarg_t arg3, sysarg_t arg4, sysarg_t *r1, sysarg_t *r2,
    11701339    sysarg_t *r3, sysarg_t *r4, sysarg_t *r5)
    11711340{
     1341        if (exch == NULL)
     1342                return ENOENT;
     1343       
    11721344        ipc_call_t result;
    1173         aid_t eid = async_send_4(phoneid, method, arg1, arg2, arg3, arg4,
     1345        aid_t aid = async_send_4(exch, imethod, arg1, arg2, arg3, arg4,
    11741346            &result);
    11751347       
    11761348        sysarg_t rc;
    1177         async_wait_for(eid, &rc);
     1349        async_wait_for(aid, &rc);
    11781350       
    11791351        if (r1)
     
    11991371 * Send message asynchronously and return only after the reply arrives.
    12001372 *
    1201  * @param phoneid Hash of the phone through which to make the call.
    1202  * @param method  Method of the call.
     1373 * @param exch    Exchange for sending the message.
     1374 * @param imethod Interface and method of the call.
    12031375 * @param arg1    Service-defined payload argument.
    12041376 * @param arg2    Service-defined payload argument.
     
    12151387 *
    12161388 */
    1217 sysarg_t async_req_slow(int phoneid, sysarg_t method, sysarg_t arg1,
     1389sysarg_t async_req_slow(async_exch_t *exch, sysarg_t imethod, sysarg_t arg1,
    12181390    sysarg_t arg2, sysarg_t arg3, sysarg_t arg4, sysarg_t arg5, sysarg_t *r1,
    12191391    sysarg_t *r2, sysarg_t *r3, sysarg_t *r4, sysarg_t *r5)
    12201392{
     1393        if (exch == NULL)
     1394                return ENOENT;
     1395       
    12211396        ipc_call_t result;
    1222         aid_t eid = async_send_5(phoneid, method, arg1, arg2, arg3, arg4, arg5,
     1397        aid_t aid = async_send_5(exch, imethod, arg1, arg2, arg3, arg4, arg5,
    12231398            &result);
    12241399       
    12251400        sysarg_t rc;
    1226         async_wait_for(eid, &rc);
     1401        async_wait_for(aid, &rc);
    12271402       
    12281403        if (r1)
     
    12441419}
    12451420
    1246 void async_msg_0(int phone, sysarg_t imethod)
    1247 {
    1248         ipc_call_async_0(phone, imethod, NULL, NULL, true);
    1249 }
    1250 
    1251 void async_msg_1(int phone, sysarg_t imethod, sysarg_t arg1)
    1252 {
    1253         ipc_call_async_1(phone, imethod, arg1, NULL, NULL, true);
    1254 }
    1255 
    1256 void async_msg_2(int phone, sysarg_t imethod, sysarg_t arg1, sysarg_t arg2)
    1257 {
    1258         ipc_call_async_2(phone, imethod, arg1, arg2, NULL, NULL, true);
    1259 }
    1260 
    1261 void async_msg_3(int phone, sysarg_t imethod, sysarg_t arg1, sysarg_t arg2,
    1262     sysarg_t arg3)
    1263 {
    1264         ipc_call_async_3(phone, imethod, arg1, arg2, arg3, NULL, NULL, true);
    1265 }
    1266 
    1267 void async_msg_4(int phone, sysarg_t imethod, sysarg_t arg1, sysarg_t arg2,
    1268     sysarg_t arg3, sysarg_t arg4)
    1269 {
    1270         ipc_call_async_4(phone, imethod, arg1, arg2, arg3, arg4, NULL, NULL,
    1271             true);
    1272 }
    1273 
    1274 void async_msg_5(int phone, sysarg_t imethod, sysarg_t arg1, sysarg_t arg2,
    1275     sysarg_t arg3, sysarg_t arg4, sysarg_t arg5)
    1276 {
    1277         ipc_call_async_5(phone, imethod, arg1, arg2, arg3, arg4, arg5, NULL,
    1278             NULL, true);
     1421void async_msg_0(async_exch_t *exch, sysarg_t imethod)
     1422{
     1423        if (exch != NULL)
     1424                ipc_call_async_0(exch->phone, imethod, NULL, NULL, true);
     1425}
     1426
     1427void async_msg_1(async_exch_t *exch, sysarg_t imethod, sysarg_t arg1)
     1428{
     1429        if (exch != NULL)
     1430                ipc_call_async_1(exch->phone, imethod, arg1, NULL, NULL, true);
     1431}
     1432
     1433void async_msg_2(async_exch_t *exch, sysarg_t imethod, sysarg_t arg1,
     1434    sysarg_t arg2)
     1435{
     1436        if (exch != NULL)
     1437                ipc_call_async_2(exch->phone, imethod, arg1, arg2, NULL, NULL,
     1438                    true);
     1439}
     1440
     1441void async_msg_3(async_exch_t *exch, sysarg_t imethod, sysarg_t arg1,
     1442    sysarg_t arg2, sysarg_t arg3)
     1443{
     1444        if (exch != NULL)
     1445                ipc_call_async_3(exch->phone, imethod, arg1, arg2, arg3, NULL,
     1446                    NULL, true);
     1447}
     1448
     1449void async_msg_4(async_exch_t *exch, sysarg_t imethod, sysarg_t arg1,
     1450    sysarg_t arg2, sysarg_t arg3, sysarg_t arg4)
     1451{
     1452        if (exch != NULL)
     1453                ipc_call_async_4(exch->phone, imethod, arg1, arg2, arg3, arg4,
     1454                    NULL, NULL, true);
     1455}
     1456
     1457void async_msg_5(async_exch_t *exch, sysarg_t imethod, sysarg_t arg1,
     1458    sysarg_t arg2, sysarg_t arg3, sysarg_t arg4, sysarg_t arg5)
     1459{
     1460        if (exch != NULL)
     1461                ipc_call_async_5(exch->phone, imethod, arg1, arg2, arg3, arg4,
     1462                    arg5, NULL, NULL, true);
    12791463}
    12801464
     
    13131497}
    13141498
    1315 int async_forward_fast(ipc_callid_t callid, int phoneid, sysarg_t imethod,
    1316     sysarg_t arg1, sysarg_t arg2, unsigned int mode)
    1317 {
    1318         return ipc_forward_fast(callid, phoneid, imethod, arg1, arg2, mode);
    1319 }
    1320 
    1321 int async_forward_slow(ipc_callid_t callid, int phoneid, sysarg_t imethod,
    1322     sysarg_t arg1, sysarg_t arg2, sysarg_t arg3, sysarg_t arg4, sysarg_t arg5,
    1323     unsigned int mode)
    1324 {
    1325         return ipc_forward_slow(callid, phoneid, imethod, arg1, arg2, arg3, arg4,
    1326             arg5, mode);
     1499int async_forward_fast(ipc_callid_t callid, async_exch_t *exch,
     1500    sysarg_t imethod, sysarg_t arg1, sysarg_t arg2, unsigned int mode)
     1501{
     1502        if (exch == NULL)
     1503                return ENOENT;
     1504       
     1505        return ipc_forward_fast(callid, exch->phone, imethod, arg1, arg2, mode);
     1506}
     1507
     1508int async_forward_slow(ipc_callid_t callid, async_exch_t *exch,
     1509    sysarg_t imethod, sysarg_t arg1, sysarg_t arg2, sysarg_t arg3,
     1510    sysarg_t arg4, sysarg_t arg5, unsigned int mode)
     1511{
     1512        if (exch == NULL)
     1513                return ENOENT;
     1514       
     1515        return ipc_forward_slow(callid, exch->phone, imethod, arg1, arg2, arg3,
     1516            arg4, arg5, mode);
    13271517}
    13281518
     
    13311521 * Ask through phone for a new connection to some service.
    13321522 *
    1333  * @param phone           Phone handle used for contacting the other side.
     1523 * @param exch            Exchange for sending the message.
    13341524 * @param arg1            User defined argument.
    13351525 * @param arg2            User defined argument.
     
    13371527 * @param client_receiver Connection handing routine.
    13381528 *
    1339  * @return New phone handle on success or a negative error code.
    1340  *
    1341  */
    1342 int async_connect_to_me(int phone, sysarg_t arg1, sysarg_t arg2,
    1343     sysarg_t arg3, async_client_conn_t client_receiver)
    1344 {
    1345         sysarg_t task_hash;
     1529 * @return Zero on success or a negative error code.
     1530 *
     1531 */
     1532int async_connect_to_me(async_exch_t *exch, sysarg_t arg1, sysarg_t arg2,
     1533    sysarg_t arg3, async_client_conn_t client_receiver, void *carg)
     1534{
     1535        if (exch == NULL)
     1536                return ENOENT;
     1537       
    13461538        sysarg_t phone_hash;
    1347         int rc = async_req_3_5(phone, IPC_M_CONNECT_TO_ME, arg1, arg2, arg3,
    1348             NULL, NULL, NULL, &task_hash, &phone_hash);
     1539        sysarg_t rc;
     1540
     1541        aid_t req;
     1542        ipc_call_t answer;
     1543        req = async_send_3(exch, IPC_M_CONNECT_TO_ME, arg1, arg2, arg3,
     1544            &answer);
     1545        async_wait_for(req, &rc);
     1546        if (rc != EOK)
     1547                return (int) rc;
     1548
     1549        phone_hash = IPC_GET_ARG5(answer);
     1550
     1551        if (client_receiver != NULL)
     1552                async_new_connection(answer.in_task_id, phone_hash, 0, NULL,
     1553                    client_receiver, carg);
     1554       
     1555        return EOK;
     1556}
     1557
     1558/** Wrapper for making IPC_M_CONNECT_ME calls using the async framework.
     1559 *
     1560 * Ask through for a cloned connection to some service.
     1561 *
     1562 * @param mgmt Exchange management style.
     1563 * @param exch Exchange for sending the message.
     1564 *
     1565 * @return New session on success or NULL on error.
     1566 *
     1567 */
     1568async_sess_t *async_connect_me(exch_mgmt_t mgmt, async_exch_t *exch)
     1569{
     1570        if (exch == NULL) {
     1571                errno = ENOENT;
     1572                return NULL;
     1573        }
     1574       
     1575        async_sess_t *sess = (async_sess_t *) malloc(sizeof(async_sess_t));
     1576        if (sess == NULL) {
     1577                errno = ENOMEM;
     1578                return NULL;
     1579        }
     1580       
     1581        ipc_call_t result;
     1582       
     1583        amsg_t *msg = malloc(sizeof(amsg_t));
     1584        if (msg == NULL) {
     1585                free(sess);
     1586                errno = ENOMEM;
     1587                return NULL;
     1588        }
     1589       
     1590        msg->done = false;
     1591        msg->dataptr = &result;
     1592       
     1593        msg->wdata.to_event.inlist = false;
     1594       
     1595        /*
     1596         * We may sleep in the next method,
     1597         * but it will use its own means
     1598         */
     1599        msg->wdata.active = true;
     1600       
     1601        ipc_call_async_0(exch->phone, IPC_M_CONNECT_ME, msg,
     1602            reply_received, true);
     1603       
     1604        sysarg_t rc;
     1605        async_wait_for((aid_t) msg, &rc);
     1606       
     1607        if (rc != EOK) {
     1608                errno = rc;
     1609                free(sess);
     1610                return NULL;
     1611        }
     1612       
     1613        int phone = (int) IPC_GET_ARG5(result);
     1614       
     1615        if (phone < 0) {
     1616                errno = phone;
     1617                free(sess);
     1618                return NULL;
     1619        }
     1620       
     1621        sess->mgmt = mgmt;
     1622        sess->phone = phone;
     1623        sess->arg1 = 0;
     1624        sess->arg2 = 0;
     1625        sess->arg3 = 0;
     1626       
     1627        fibril_mutex_initialize(&sess->remote_state_mtx);
     1628        sess->remote_state_data = NULL;
     1629       
     1630        list_initialize(&sess->exch_list);
     1631        fibril_mutex_initialize(&sess->mutex);
     1632        atomic_set(&sess->refcnt, 0);
     1633       
     1634        return sess;
     1635}
     1636
     1637static int async_connect_me_to_internal(int phone, sysarg_t arg1, sysarg_t arg2,
     1638    sysarg_t arg3, sysarg_t arg4)
     1639{
     1640        ipc_call_t result;
     1641       
     1642        amsg_t *msg = malloc(sizeof(amsg_t));
     1643        if (msg == NULL)
     1644                return ENOENT;
     1645       
     1646        msg->done = false;
     1647        msg->dataptr = &result;
     1648       
     1649        msg->wdata.to_event.inlist = false;
     1650       
     1651        /*
     1652         * We may sleep in the next method,
     1653         * but it will use its own means
     1654         */
     1655        msg->wdata.active = true;
     1656       
     1657        ipc_call_async_4(phone, IPC_M_CONNECT_ME_TO, arg1, arg2, arg3, arg4,
     1658            msg, reply_received, true);
     1659       
     1660        sysarg_t rc;
     1661        async_wait_for((aid_t) msg, &rc);
     1662       
    13491663        if (rc != EOK)
    13501664                return rc;
    13511665       
    1352         if (client_receiver != NULL)
    1353                 async_new_connection(task_hash, phone_hash, 0, NULL,
    1354                     client_receiver);
    1355        
    1356         return EOK;
     1666        return (int) IPC_GET_ARG5(result);
    13571667}
    13581668
    13591669/** Wrapper for making IPC_M_CONNECT_ME_TO calls using the async framework.
    13601670 *
    1361  * Ask through phone for a new connection to some service.
    1362  *
    1363  * @param phone Phone handle used for contacting the other side.
    1364  * @param arg1  User defined argument.
    1365  * @param arg2  User defined argument.
    1366  * @param arg3  User defined argument.
    1367  *
    1368  * @return New phone handle on success or a negative error code.
    1369  *
    1370  */
    1371 int async_connect_me_to(int phone, sysarg_t arg1, sysarg_t arg2,
     1671 * Ask through for a new connection to some service.
     1672 *
     1673 * @param mgmt Exchange management style.
     1674 * @param exch Exchange for sending the message.
     1675 * @param arg1 User defined argument.
     1676 * @param arg2 User defined argument.
     1677 * @param arg3 User defined argument.
     1678 *
     1679 * @return New session on success or NULL on error.
     1680 *
     1681 */
     1682async_sess_t *async_connect_me_to(exch_mgmt_t mgmt, async_exch_t *exch,
     1683    sysarg_t arg1, sysarg_t arg2, sysarg_t arg3)
     1684{
     1685        if (exch == NULL) {
     1686                errno = ENOENT;
     1687                return NULL;
     1688        }
     1689       
     1690        async_sess_t *sess = (async_sess_t *) malloc(sizeof(async_sess_t));
     1691        if (sess == NULL) {
     1692                errno = ENOMEM;
     1693                return NULL;
     1694        }
     1695       
     1696        int phone = async_connect_me_to_internal(exch->phone, arg1, arg2, arg3,
     1697            0);
     1698       
     1699        if (phone < 0) {
     1700                errno = phone;
     1701                free(sess);
     1702                return NULL;
     1703        }
     1704       
     1705        sess->mgmt = mgmt;
     1706        sess->phone = phone;
     1707        sess->arg1 = arg1;
     1708        sess->arg2 = arg2;
     1709        sess->arg3 = arg3;
     1710       
     1711        fibril_mutex_initialize(&sess->remote_state_mtx);
     1712        sess->remote_state_data = NULL;
     1713       
     1714        list_initialize(&sess->exch_list);
     1715        fibril_mutex_initialize(&sess->mutex);
     1716        atomic_set(&sess->refcnt, 0);
     1717       
     1718        return sess;
     1719}
     1720
     1721/** Set arguments for new connections.
     1722 *
     1723 * FIXME This is an ugly hack to work around the problem that parallel
     1724 * exchanges are implemented using parallel connections. When we create
     1725 * a callback session, the framework does not know arguments for the new
     1726 * connections.
     1727 *
     1728 * The proper solution seems to be to implement parallel exchanges using
     1729 * tagging.
     1730 */
     1731void async_sess_args_set(async_sess_t *sess, sysarg_t arg1, sysarg_t arg2,
    13721732    sysarg_t arg3)
    13731733{
    1374         sysarg_t newphid;
    1375         int rc = async_req_3_5(phone, IPC_M_CONNECT_ME_TO, arg1, arg2, arg3,
    1376             NULL, NULL, NULL, NULL, &newphid);
    1377        
    1378         if (rc != EOK)
    1379                 return rc;
    1380        
    1381         return newphid;
     1734        sess->arg1 = arg1;
     1735        sess->arg2 = arg2;
     1736        sess->arg3 = arg3;
    13821737}
    13831738
     
    13871742 * success.
    13881743 *
    1389  * @param phoneid Phone handle used for contacting the other side.
    1390  * @param arg1    User defined argument.
    1391  * @param arg2    User defined argument.
    1392  * @param arg3    User defined argument.
    1393  *
    1394  * @return New phone handle on success or a negative error code.
    1395  *
    1396  */
    1397 int async_connect_me_to_blocking(int phoneid, sysarg_t arg1, sysarg_t arg2,
    1398     sysarg_t arg3)
    1399 {
    1400         sysarg_t newphid;
    1401         int rc = async_req_4_5(phoneid, IPC_M_CONNECT_ME_TO, arg1, arg2, arg3,
    1402             IPC_FLAG_BLOCKING, NULL, NULL, NULL, NULL, &newphid);
    1403        
    1404         if (rc != EOK)
    1405                 return rc;
    1406        
    1407         return newphid;
     1744 * @param mgmt Exchange management style.
     1745 * @param exch Exchange for sending the message.
     1746 * @param arg1 User defined argument.
     1747 * @param arg2 User defined argument.
     1748 * @param arg3 User defined argument.
     1749 *
     1750 * @return New session on success or NULL on error.
     1751 *
     1752 */
     1753async_sess_t *async_connect_me_to_blocking(exch_mgmt_t mgmt, async_exch_t *exch,
     1754    sysarg_t arg1, sysarg_t arg2, sysarg_t arg3)
     1755{
     1756        if (exch == NULL) {
     1757                errno = ENOENT;
     1758                return NULL;
     1759        }
     1760       
     1761        async_sess_t *sess = (async_sess_t *) malloc(sizeof(async_sess_t));
     1762        if (sess == NULL) {
     1763                errno = ENOMEM;
     1764                return NULL;
     1765        }
     1766       
     1767        int phone = async_connect_me_to_internal(exch->phone, arg1, arg2, arg3,
     1768            IPC_FLAG_BLOCKING);
     1769       
     1770        if (phone < 0) {
     1771                errno = phone;
     1772                free(sess);
     1773                return NULL;
     1774        }
     1775       
     1776        sess->mgmt = mgmt;
     1777        sess->phone = phone;
     1778        sess->arg1 = arg1;
     1779        sess->arg2 = arg2;
     1780        sess->arg3 = arg3;
     1781       
     1782        fibril_mutex_initialize(&sess->remote_state_mtx);
     1783        sess->remote_state_data = NULL;
     1784       
     1785        list_initialize(&sess->exch_list);
     1786        fibril_mutex_initialize(&sess->mutex);
     1787        atomic_set(&sess->refcnt, 0);
     1788       
     1789        return sess;
    14081790}
    14091791
     
    14111793 *
    14121794 */
    1413 int async_connect_kbox(task_id_t id)
    1414 {
    1415         return ipc_connect_kbox(id);
     1795async_sess_t *async_connect_kbox(task_id_t id)
     1796{
     1797        async_sess_t *sess = (async_sess_t *) malloc(sizeof(async_sess_t));
     1798        if (sess == NULL) {
     1799                errno = ENOMEM;
     1800                return NULL;
     1801        }
     1802       
     1803        int phone = ipc_connect_kbox(id);
     1804        if (phone < 0) {
     1805                errno = phone;
     1806                free(sess);
     1807                return NULL;
     1808        }
     1809       
     1810        sess->mgmt = EXCHANGE_ATOMIC;
     1811        sess->phone = phone;
     1812        sess->arg1 = 0;
     1813        sess->arg2 = 0;
     1814        sess->arg3 = 0;
     1815       
     1816        fibril_mutex_initialize(&sess->remote_state_mtx);
     1817        sess->remote_state_data = NULL;
     1818       
     1819        list_initialize(&sess->exch_list);
     1820        fibril_mutex_initialize(&sess->mutex);
     1821        atomic_set(&sess->refcnt, 0);
     1822       
     1823        return sess;
     1824}
     1825
     1826static int async_hangup_internal(int phone)
     1827{
     1828        return ipc_hangup(phone);
    14161829}
    14171830
    14181831/** Wrapper for ipc_hangup.
    14191832 *
    1420  * @param phone Phone handle to hung up.
     1833 * @param sess Session to hung up.
    14211834 *
    14221835 * @return Zero on success or a negative error code.
    14231836 *
    14241837 */
    1425 int async_hangup(int phone)
    1426 {
    1427         return ipc_hangup(phone);
     1838int async_hangup(async_sess_t *sess)
     1839{
     1840        async_exch_t *exch;
     1841       
     1842        assert(sess);
     1843       
     1844        if (atomic_get(&sess->refcnt) > 0)
     1845                return EBUSY;
     1846       
     1847        fibril_mutex_lock(&async_sess_mutex);
     1848
     1849        int rc = async_hangup_internal(sess->phone);
     1850       
     1851        while (!list_empty(&sess->exch_list)) {
     1852                exch = (async_exch_t *)
     1853                    list_get_instance(list_first(&sess->exch_list),
     1854                    async_exch_t, sess_link);
     1855               
     1856                list_remove(&exch->sess_link);
     1857                list_remove(&exch->global_link);
     1858                async_hangup_internal(exch->phone);
     1859                free(exch);
     1860        }
     1861
     1862        free(sess);
     1863       
     1864        fibril_mutex_unlock(&async_sess_mutex);
     1865       
     1866        return rc;
    14281867}
    14291868
     
    14341873}
    14351874
     1875/** Start new exchange in a session.
     1876 *
     1877 * @param session Session.
     1878 *
     1879 * @return New exchange or NULL on error.
     1880 *
     1881 */
     1882async_exch_t *async_exchange_begin(async_sess_t *sess)
     1883{
     1884        if (sess == NULL)
     1885                return NULL;
     1886       
     1887        async_exch_t *exch;
     1888       
     1889        fibril_mutex_lock(&async_sess_mutex);
     1890       
     1891        if (!list_empty(&sess->exch_list)) {
     1892                /*
     1893                 * There are inactive exchanges in the session.
     1894                 */
     1895                exch = (async_exch_t *)
     1896                    list_get_instance(list_first(&sess->exch_list),
     1897                    async_exch_t, sess_link);
     1898               
     1899                list_remove(&exch->sess_link);
     1900                list_remove(&exch->global_link);
     1901        } else {
     1902                /*
     1903                 * There are no available exchanges in the session.
     1904                 */
     1905               
     1906                if ((sess->mgmt == EXCHANGE_ATOMIC) ||
     1907                    (sess->mgmt == EXCHANGE_SERIALIZE)) {
     1908                        exch = (async_exch_t *) malloc(sizeof(async_exch_t));
     1909                        if (exch != NULL) {
     1910                                link_initialize(&exch->sess_link);
     1911                                link_initialize(&exch->global_link);
     1912                                exch->sess = sess;
     1913                                exch->phone = sess->phone;
     1914                        }
     1915                } else {  /* EXCHANGE_PARALLEL */
     1916                        /*
     1917                         * Make a one-time attempt to connect a new data phone.
     1918                         */
     1919                       
     1920                        int phone;
     1921                       
     1922retry:
     1923                        phone = async_connect_me_to_internal(sess->phone, sess->arg1,
     1924                            sess->arg2, sess->arg3, 0);
     1925                        if (phone >= 0) {
     1926                                exch = (async_exch_t *) malloc(sizeof(async_exch_t));
     1927                                if (exch != NULL) {
     1928                                        link_initialize(&exch->sess_link);
     1929                                        link_initialize(&exch->global_link);
     1930                                        exch->sess = sess;
     1931                                        exch->phone = phone;
     1932                                } else
     1933                                        async_hangup_internal(phone);
     1934                        } else if (!list_empty(&inactive_exch_list)) {
     1935                                /*
     1936                                 * We did not manage to connect a new phone. But we
     1937                                 * can try to close some of the currently inactive
     1938                                 * connections in other sessions and try again.
     1939                                 */
     1940                                exch = (async_exch_t *)
     1941                                    list_get_instance(list_first(&inactive_exch_list),
     1942                                    async_exch_t, global_link);
     1943                               
     1944                                list_remove(&exch->sess_link);
     1945                                list_remove(&exch->global_link);
     1946                                async_hangup_internal(exch->phone);
     1947                                free(exch);
     1948                                goto retry;
     1949                        } else {
     1950                                /*
     1951                                 * Wait for a phone to become available.
     1952                                 */
     1953                                fibril_condvar_wait(&avail_phone_cv, &async_sess_mutex);
     1954                                goto retry;
     1955                        }
     1956                }
     1957        }
     1958       
     1959        fibril_mutex_unlock(&async_sess_mutex);
     1960       
     1961        if (exch != NULL) {
     1962                atomic_inc(&sess->refcnt);
     1963               
     1964                if (sess->mgmt == EXCHANGE_SERIALIZE)
     1965                        fibril_mutex_lock(&sess->mutex);
     1966        }
     1967       
     1968        return exch;
     1969}
     1970
     1971/** Finish an exchange.
     1972 *
     1973 * @param exch Exchange to finish.
     1974 *
     1975 */
     1976void async_exchange_end(async_exch_t *exch)
     1977{
     1978        if (exch == NULL)
     1979                return;
     1980       
     1981        async_sess_t *sess = exch->sess;
     1982       
     1983        atomic_dec(&sess->refcnt);
     1984       
     1985        if (sess->mgmt == EXCHANGE_SERIALIZE)
     1986                fibril_mutex_unlock(&sess->mutex);
     1987       
     1988        fibril_mutex_lock(&async_sess_mutex);
     1989       
     1990        list_append(&exch->sess_link, &sess->exch_list);
     1991        list_append(&exch->global_link, &inactive_exch_list);
     1992        fibril_condvar_signal(&avail_phone_cv);
     1993       
     1994        fibril_mutex_unlock(&async_sess_mutex);
     1995}
     1996
    14361997/** Wrapper for IPC_M_SHARE_IN calls using the async framework.
    14371998 *
    1438  * @param phoneid Phone that will be used to contact the receiving side.
    1439  * @param dst     Destination address space area base.
    1440  * @param size    Size of the destination address space area.
    1441  * @param arg     User defined argument.
    1442  * @param flags   Storage for the received flags. Can be NULL.
     1999 * @param exch  Exchange for sending the message.
     2000 * @param dst   Destination address space area base.
     2001 * @param size  Size of the destination address space area.
     2002 * @param arg   User defined argument.
     2003 * @param flags Storage for the received flags. Can be NULL.
    14432004 *
    14442005 * @return Zero on success or a negative error code from errno.h.
    14452006 *
    14462007 */
    1447 int async_share_in_start(int phoneid, void *dst, size_t size, sysarg_t arg,
    1448     unsigned int *flags)
    1449 {
     2008int async_share_in_start(async_exch_t *exch, void *dst, size_t size,
     2009    sysarg_t arg, unsigned int *flags)
     2010{
     2011        if (exch == NULL)
     2012                return ENOENT;
     2013       
    14502014        sysarg_t tmp_flags;
    1451         int res = async_req_3_2(phoneid, IPC_M_SHARE_IN, (sysarg_t) dst,
     2015        int res = async_req_3_2(exch, IPC_M_SHARE_IN, (sysarg_t) dst,
    14522016            (sysarg_t) size, arg, NULL, &tmp_flags);
    14532017       
     
    15072071/** Wrapper for IPC_M_SHARE_OUT calls using the async framework.
    15082072 *
    1509  * @param phoneid Phone that will be used to contact the receiving side.
    1510  * @param src     Source address space area base address.
    1511  * @param flags   Flags to be used for sharing. Bits can be only cleared.
     2073 * @param exch  Exchange for sending the message.
     2074 * @param src   Source address space area base address.
     2075 * @param flags Flags to be used for sharing. Bits can be only cleared.
    15122076 *
    15132077 * @return Zero on success or a negative error code from errno.h.
    15142078 *
    15152079 */
    1516 int async_share_out_start(int phoneid, void *src, unsigned int flags)
    1517 {
    1518         return async_req_3_0(phoneid, IPC_M_SHARE_OUT, (sysarg_t) src, 0,
     2080int async_share_out_start(async_exch_t *exch, void *src, unsigned int flags)
     2081{
     2082        if (exch == NULL)
     2083                return ENOENT;
     2084       
     2085        return async_req_3_0(exch, IPC_M_SHARE_OUT, (sysarg_t) src, 0,
    15192086            (sysarg_t) flags);
    15202087}
     
    15692136}
    15702137
     2138/** Start IPC_M_DATA_READ using the async framework.
     2139 *
     2140 * @param exch    Exchange for sending the message.
     2141 * @param dst     Address of the beginning of the destination buffer.
     2142 * @param size    Size of the destination buffer (in bytes).
     2143 * @param dataptr Storage of call data (arg 2 holds actual data size).
     2144 *
     2145 * @return Hash of the sent message or 0 on error.
     2146 *
     2147 */
     2148aid_t async_data_read(async_exch_t *exch, void *dst, size_t size,
     2149    ipc_call_t *dataptr)
     2150{
     2151        return async_send_2(exch, IPC_M_DATA_READ, (sysarg_t) dst,
     2152            (sysarg_t) size, dataptr);
     2153}
     2154
    15712155/** Wrapper for IPC_M_DATA_READ calls using the async framework.
    15722156 *
    1573  * @param phoneid Phone that will be used to contact the receiving side.
    1574  * @param dst     Address of the beginning of the destination buffer.
    1575  * @param size    Size of the destination buffer.
    1576  * @param flags   Flags to control the data transfer.
     2157 * @param exch Exchange for sending the message.
     2158 * @param dst  Address of the beginning of the destination buffer.
     2159 * @param size Size of the destination buffer.
    15772160 *
    15782161 * @return Zero on success or a negative error code from errno.h.
    15792162 *
    15802163 */
    1581 int
    1582 async_data_read_start_generic(int phoneid, void *dst, size_t size, int flags)
    1583 {
    1584         return async_req_3_0(phoneid, IPC_M_DATA_READ, (sysarg_t) dst,
    1585             (sysarg_t) size, (sysarg_t) flags);
     2164int async_data_read_start(async_exch_t *exch, void *dst, size_t size)
     2165{
     2166        if (exch == NULL)
     2167                return ENOENT;
     2168       
     2169        return async_req_2_0(exch, IPC_M_DATA_READ, (sysarg_t) dst,
     2170            (sysarg_t) size);
    15862171}
    15872172
     
    16382223 *
    16392224 */
    1640 int async_data_read_forward_fast(int phoneid, sysarg_t method, sysarg_t arg1,
    1641     sysarg_t arg2, sysarg_t arg3, sysarg_t arg4, ipc_call_t *dataptr)
    1642 {
     2225int async_data_read_forward_fast(async_exch_t *exch, sysarg_t imethod,
     2226    sysarg_t arg1, sysarg_t arg2, sysarg_t arg3, sysarg_t arg4,
     2227    ipc_call_t *dataptr)
     2228{
     2229        if (exch == NULL)
     2230                return ENOENT;
     2231       
    16432232        ipc_callid_t callid;
    16442233        if (!async_data_read_receive(&callid, NULL)) {
     
    16472236        }
    16482237       
    1649         aid_t msg = async_send_fast(phoneid, method, arg1, arg2, arg3, arg4,
     2238        aid_t msg = async_send_fast(exch, imethod, arg1, arg2, arg3, arg4,
    16502239            dataptr);
    16512240        if (msg == 0) {
     
    16542243        }
    16552244       
    1656         int retval = ipc_forward_fast(callid, phoneid, 0, 0, 0,
     2245        int retval = ipc_forward_fast(callid, exch->phone, 0, 0, 0,
    16572246            IPC_FF_ROUTE_FROM_ME);
    16582247        if (retval != EOK) {
     
    16702259/** Wrapper for IPC_M_DATA_WRITE calls using the async framework.
    16712260 *
    1672  * @param phoneid Phone that will be used to contact the receiving side.
    1673  * @param src     Address of the beginning of the source buffer.
    1674  * @param size    Size of the source buffer.
    1675  * @param flags   Flags to control the data transfer.
     2261 * @param exch Exchange for sending the message.
     2262 * @param src  Address of the beginning of the source buffer.
     2263 * @param size Size of the source buffer.
    16762264 *
    16772265 * @return Zero on success or a negative error code from errno.h.
    16782266 *
    16792267 */
    1680 int
    1681 async_data_write_start_generic(int phoneid, const void *src, size_t size,
    1682     int flags)
    1683 {
    1684         return async_req_3_0(phoneid, IPC_M_DATA_WRITE, (sysarg_t) src,
    1685             (sysarg_t) size, (sysarg_t) flags);
     2268int async_data_write_start(async_exch_t *exch, const void *src, size_t size)
     2269{
     2270        if (exch == NULL)
     2271                return ENOENT;
     2272       
     2273        return async_req_2_0(exch, IPC_M_DATA_WRITE, (sysarg_t) src,
     2274            (sysarg_t) size);
    16862275}
    16872276
     
    17592348    size_t *received)
    17602349{
     2350        assert(data);
     2351       
    17612352        ipc_callid_t callid;
    17622353        size_t size;
     
    18262417 *
    18272418 */
    1828 int async_data_write_forward_fast(int phoneid, sysarg_t method, sysarg_t arg1,
    1829     sysarg_t arg2, sysarg_t arg3, sysarg_t arg4, ipc_call_t *dataptr)
    1830 {
     2419int async_data_write_forward_fast(async_exch_t *exch, sysarg_t imethod,
     2420    sysarg_t arg1, sysarg_t arg2, sysarg_t arg3, sysarg_t arg4,
     2421    ipc_call_t *dataptr)
     2422{
     2423        if (exch == NULL)
     2424                return ENOENT;
     2425       
    18312426        ipc_callid_t callid;
    18322427        if (!async_data_write_receive(&callid, NULL)) {
     
    18352430        }
    18362431       
    1837         aid_t msg = async_send_fast(phoneid, method, arg1, arg2, arg3, arg4,
     2432        aid_t msg = async_send_fast(exch, imethod, arg1, arg2, arg3, arg4,
    18382433            dataptr);
    18392434        if (msg == 0) {
     
    18422437        }
    18432438       
    1844         int retval = ipc_forward_fast(callid, phoneid, 0, 0, 0,
     2439        int retval = ipc_forward_fast(callid, exch->phone, 0, 0, 0,
    18452440            IPC_FF_ROUTE_FROM_ME);
    18462441        if (retval != EOK) {
     
    18562451}
    18572452
     2453/** Wrapper for sending an exchange over different exchange for cloning
     2454 *
     2455 * @param exch       Exchange to be used for sending.
     2456 * @param clone_exch Exchange to be cloned.
     2457 *
     2458 */
     2459int async_exchange_clone(async_exch_t *exch, async_exch_t *clone_exch)
     2460{
     2461        return async_req_1_0(exch, IPC_M_CONNECTION_CLONE, clone_exch->phone);
     2462}
     2463
     2464/** Wrapper for receiving the IPC_M_CONNECTION_CLONE calls.
     2465 *
     2466 * If the current call is IPC_M_CONNECTION_CLONE then a new
     2467 * async session is created for the accepted phone.
     2468 *
     2469 * @param mgmt Exchange management style.
     2470 *
     2471 * @return New async session or NULL on failure.
     2472 *
     2473 */
     2474async_sess_t *async_clone_receive(exch_mgmt_t mgmt)
     2475{
     2476        /* Accept the phone */
     2477        ipc_call_t call;
     2478        ipc_callid_t callid = async_get_call(&call);
     2479        int phone = (int) IPC_GET_ARG1(call);
     2480       
     2481        if ((IPC_GET_IMETHOD(call) != IPC_M_CONNECTION_CLONE) ||
     2482            (phone < 0)) {
     2483                async_answer_0(callid, EINVAL);
     2484                return NULL;
     2485        }
     2486       
     2487        async_sess_t *sess = (async_sess_t *) malloc(sizeof(async_sess_t));
     2488        if (sess == NULL) {
     2489                async_answer_0(callid, ENOMEM);
     2490                return NULL;
     2491        }
     2492       
     2493        sess->mgmt = mgmt;
     2494        sess->phone = phone;
     2495        sess->arg1 = 0;
     2496        sess->arg2 = 0;
     2497        sess->arg3 = 0;
     2498       
     2499        fibril_mutex_initialize(&sess->remote_state_mtx);
     2500        sess->remote_state_data = NULL;
     2501       
     2502        list_initialize(&sess->exch_list);
     2503        fibril_mutex_initialize(&sess->mutex);
     2504        atomic_set(&sess->refcnt, 0);
     2505       
     2506        /* Acknowledge the cloned phone */
     2507        async_answer_0(callid, EOK);
     2508       
     2509        return sess;
     2510}
     2511
     2512/** Wrapper for receiving the IPC_M_CONNECT_TO_ME calls.
     2513 *
     2514 * If the current call is IPC_M_CONNECT_TO_ME then a new
     2515 * async session is created for the accepted phone.
     2516 *
     2517 * @param mgmt Exchange management style.
     2518 *
     2519 * @return New async session.
     2520 * @return NULL on failure.
     2521 *
     2522 */
     2523async_sess_t *async_callback_receive(exch_mgmt_t mgmt)
     2524{
     2525        /* Accept the phone */
     2526        ipc_call_t call;
     2527        ipc_callid_t callid = async_get_call(&call);
     2528        int phone = (int) IPC_GET_ARG5(call);
     2529       
     2530        if ((IPC_GET_IMETHOD(call) != IPC_M_CONNECT_TO_ME) ||
     2531            (phone < 0)) {
     2532                async_answer_0(callid, EINVAL);
     2533                return NULL;
     2534        }
     2535       
     2536        async_sess_t *sess = (async_sess_t *) malloc(sizeof(async_sess_t));
     2537        if (sess == NULL) {
     2538                async_answer_0(callid, ENOMEM);
     2539                return NULL;
     2540        }
     2541       
     2542        sess->mgmt = mgmt;
     2543        sess->phone = phone;
     2544        sess->arg1 = 0;
     2545        sess->arg2 = 0;
     2546        sess->arg3 = 0;
     2547       
     2548        fibril_mutex_initialize(&sess->remote_state_mtx);
     2549        sess->remote_state_data = NULL;
     2550       
     2551        list_initialize(&sess->exch_list);
     2552        fibril_mutex_initialize(&sess->mutex);
     2553        atomic_set(&sess->refcnt, 0);
     2554       
     2555        /* Acknowledge the connected phone */
     2556        async_answer_0(callid, EOK);
     2557       
     2558        return sess;
     2559}
     2560
     2561/** Wrapper for receiving the IPC_M_CONNECT_TO_ME calls.
     2562 *
     2563 * If the call is IPC_M_CONNECT_TO_ME then a new
     2564 * async session is created. However, the phone is
     2565 * not accepted automatically.
     2566 *
     2567 * @param mgmt   Exchange management style.
     2568 * @param call   Call data.
     2569 *
     2570 * @return New async session.
     2571 * @return NULL on failure.
     2572 * @return NULL if the call is not IPC_M_CONNECT_TO_ME.
     2573 *
     2574 */
     2575async_sess_t *async_callback_receive_start(exch_mgmt_t mgmt, ipc_call_t *call)
     2576{
     2577        int phone = (int) IPC_GET_ARG5(*call);
     2578       
     2579        if ((IPC_GET_IMETHOD(*call) != IPC_M_CONNECT_TO_ME) ||
     2580            (phone < 0))
     2581                return NULL;
     2582       
     2583        async_sess_t *sess = (async_sess_t *) malloc(sizeof(async_sess_t));
     2584        if (sess == NULL)
     2585                return NULL;
     2586       
     2587        sess->mgmt = mgmt;
     2588        sess->phone = phone;
     2589        sess->arg1 = 0;
     2590        sess->arg2 = 0;
     2591        sess->arg3 = 0;
     2592       
     2593        fibril_mutex_initialize(&sess->remote_state_mtx);
     2594        sess->remote_state_data = NULL;
     2595       
     2596        list_initialize(&sess->exch_list);
     2597        fibril_mutex_initialize(&sess->mutex);
     2598        atomic_set(&sess->refcnt, 0);
     2599       
     2600        return sess;
     2601}
     2602
     2603int async_state_change_start(async_exch_t *exch, sysarg_t arg1, sysarg_t arg2,
     2604    sysarg_t arg3, async_exch_t *other_exch)
     2605{
     2606        return async_req_5_0(exch, IPC_M_STATE_CHANGE_AUTHORIZE,
     2607            arg1, arg2, arg3, 0, other_exch->phone);
     2608}
     2609
     2610bool async_state_change_receive(ipc_callid_t *callid, sysarg_t *arg1,
     2611    sysarg_t *arg2, sysarg_t *arg3)
     2612{
     2613        assert(callid);
     2614
     2615        ipc_call_t call;
     2616        *callid = async_get_call(&call);
     2617
     2618        if (IPC_GET_IMETHOD(call) != IPC_M_STATE_CHANGE_AUTHORIZE)
     2619                return false;
     2620       
     2621        if (arg1)
     2622                *arg1 = IPC_GET_ARG1(call);
     2623        if (arg2)
     2624                *arg2 = IPC_GET_ARG2(call);
     2625        if (arg3)
     2626                *arg3 = IPC_GET_ARG3(call);
     2627
     2628        return true;
     2629}
     2630
     2631int async_state_change_finalize(ipc_callid_t callid, async_exch_t *other_exch)
     2632{
     2633        return ipc_answer_1(callid, EOK, other_exch->phone);
     2634}
     2635
     2636/** Lock and get session remote state
     2637 *
     2638 * Lock and get the local replica of the remote state
     2639 * in stateful sessions. The call should be paired
     2640 * with async_remote_state_release*().
     2641 *
     2642 * @param[in] sess Stateful session.
     2643 *
     2644 * @return Local replica of the remote state.
     2645 *
     2646 */
     2647void *async_remote_state_acquire(async_sess_t *sess)
     2648{
     2649        fibril_mutex_lock(&sess->remote_state_mtx);
     2650        return sess->remote_state_data;
     2651}
     2652
     2653/** Update the session remote state
     2654 *
     2655 * Update the local replica of the remote state
     2656 * in stateful sessions. The remote state must
     2657 * be already locked.
     2658 *
     2659 * @param[in] sess  Stateful session.
     2660 * @param[in] state New local replica of the remote state.
     2661 *
     2662 */
     2663void async_remote_state_update(async_sess_t *sess, void *state)
     2664{
     2665        assert(fibril_mutex_is_locked(&sess->remote_state_mtx));
     2666        sess->remote_state_data = state;
     2667}
     2668
     2669/** Release the session remote state
     2670 *
     2671 * Unlock the local replica of the remote state
     2672 * in stateful sessions.
     2673 *
     2674 * @param[in] sess Stateful session.
     2675 *
     2676 */
     2677void async_remote_state_release(async_sess_t *sess)
     2678{
     2679        assert(fibril_mutex_is_locked(&sess->remote_state_mtx));
     2680       
     2681        fibril_mutex_unlock(&sess->remote_state_mtx);
     2682}
     2683
     2684/** Release the session remote state and end an exchange
     2685 *
     2686 * Unlock the local replica of the remote state
     2687 * in stateful sessions. This is convenience function
     2688 * which gets the session pointer from the exchange
     2689 * and also ends the exchange.
     2690 *
     2691 * @param[in] exch Stateful session's exchange.
     2692 *
     2693 */
     2694void async_remote_state_release_exchange(async_exch_t *exch)
     2695{
     2696        if (exch == NULL)
     2697                return;
     2698       
     2699        async_sess_t *sess = exch->sess;
     2700        assert(fibril_mutex_is_locked(&sess->remote_state_mtx));
     2701       
     2702        async_exchange_end(exch);
     2703        fibril_mutex_unlock(&sess->remote_state_mtx);
     2704}
     2705
    18582706/** @}
    18592707 */
Note: See TracChangeset for help on using the changeset viewer.