Ignore:
File:
1 edited

Legend:

Unmodified
Added
Removed
  • TabularUnified uspace/lib/c/generic/async.c

    rae6021d r7f9d97f3  
    7777 *   }
    7878 *
    79  *   port_handler(icallid, *icall)
     79 *   my_client_connection(icallid, *icall)
    8080 *   {
    8181 *     if (want_refuse) {
     
    116116#include <stdlib.h>
    117117#include <macros.h>
    118 #include <as.h>
    119 #include <abi/mm/as.h>
    120118#include "private/libc.h"
    121119
     
    125123        list_t exch_list;
    126124       
    127         /** Session interface */
    128         iface_t iface;
    129        
    130125        /** Exchange management style */
    131126        exch_mgmt_t mgmt;
     
    194189        /** If reply was received. */
    195190        bool done;
    196        
     191
    197192        /** If the message / reply should be discarded on arrival. */
    198193        bool forget;
    199        
     194
    200195        /** If already destroyed. */
    201196        bool destroyed;
     
    237232        /** Identification of the opening call. */
    238233        ipc_callid_t callid;
    239        
    240234        /** Call data of the opening call. */
    241235        ipc_call_t call;
     236        /** Local argument or NULL if none. */
     237        void *carg;
    242238       
    243239        /** Identification of the closing call. */
     
    245241       
    246242        /** Fibril function that will be used to handle the connection. */
    247         async_port_handler_t handler;
    248        
    249         /** Client data */
    250         void *data;
     243        async_client_conn_t cfibril;
    251244} connection_t;
    252 
    253 /** Interface data */
    254 typedef struct {
    255         ht_link_t link;
    256        
    257         /** Interface ID */
    258         iface_t iface;
    259        
    260         /** Futex protecting the hash table */
    261         futex_t futex;
    262        
    263         /** Interface ports */
    264         hash_table_t port_hash_table;
    265        
    266         /** Next available port ID */
    267         port_id_t port_id_avail;
    268 } interface_t;
    269 
    270 /* Port data */
    271 typedef struct {
    272         ht_link_t link;
    273        
    274         /** Port ID */
    275         port_id_t id;
    276        
    277         /** Port connection handler */
    278         async_port_handler_t handler;
    279        
    280         /** Client data */
    281         void *data;
    282 } port_t;
    283245
    284246/* Notification data */
     
    302264{
    303265        struct timeval tv = { 0, 0 };
    304        
     266
    305267        to->inlist = false;
    306268        to->occurred = false;
     
    325287static amsg_t *amsg_create(void)
    326288{
    327         amsg_t *msg = malloc(sizeof(amsg_t));
     289        amsg_t *msg;
     290
     291        msg = malloc(sizeof(amsg_t));
    328292        if (msg) {
    329293                msg->done = false;
     
    334298                awaiter_initialize(&msg->wdata);
    335299        }
    336        
     300
    337301        return msg;
    338302}
     
    371335}
    372336
    373 /** Default fallback fibril function.
    374  *
    375  * This fallback fibril function gets called on incomming
    376  * connections that do not have a specific handler defined.
     337/** Default fibril function that gets called to handle new connection.
     338 *
     339 * This function is defined as a weak symbol - to be redefined in user code.
    377340 *
    378341 * @param callid Hash of the incoming call.
     
    381344 *
    382345 */
    383 static void default_fallback_port_handler(ipc_callid_t callid, ipc_call_t *call,
     346static void default_client_connection(ipc_callid_t callid, ipc_call_t *call,
    384347    void *arg)
    385348{
     
    387350}
    388351
    389 static async_port_handler_t fallback_port_handler =
    390     default_fallback_port_handler;
    391 static void *fallback_port_data = NULL;
    392 
    393 static hash_table_t interface_hash_table;
    394 
    395 static size_t interface_key_hash(void *key)
    396 {
    397         iface_t iface = *(iface_t *) key;
    398         return iface;
    399 }
    400 
    401 static size_t interface_hash(const ht_link_t *item)
    402 {
    403         interface_t *interface = hash_table_get_inst(item, interface_t, link);
    404         return interface_key_hash(&interface->iface);
    405 }
    406 
    407 static bool interface_key_equal(void *key, const ht_link_t *item)
    408 {
    409         iface_t iface = *(iface_t *) key;
    410         interface_t *interface = hash_table_get_inst(item, interface_t, link);
    411         return iface == interface->iface;
    412 }
    413 
    414 /** Operations for the port hash table. */
    415 static hash_table_ops_t interface_hash_table_ops = {
    416         .hash = interface_hash,
    417         .key_hash = interface_key_hash,
    418         .key_equal = interface_key_equal,
    419         .equal = NULL,
    420         .remove_callback = NULL
    421 };
    422 
    423 static size_t port_key_hash(void *key)
    424 {
    425         port_id_t port_id = *(port_id_t *) key;
    426         return port_id;
    427 }
    428 
    429 static size_t port_hash(const ht_link_t *item)
    430 {
    431         port_t *port = hash_table_get_inst(item, port_t, link);
    432         return port_key_hash(&port->id);
    433 }
    434 
    435 static bool port_key_equal(void *key, const ht_link_t *item)
    436 {
    437         port_id_t port_id = *(port_id_t *) key;
    438         port_t *port = hash_table_get_inst(item, port_t, link);
    439         return port_id == port->id;
    440 }
    441 
    442 /** Operations for the port hash table. */
    443 static hash_table_ops_t port_hash_table_ops = {
    444         .hash = port_hash,
    445         .key_hash = port_key_hash,
    446         .key_equal = port_key_equal,
    447         .equal = NULL,
    448         .remove_callback = NULL
    449 };
    450 
    451 static interface_t *async_new_interface(iface_t iface)
    452 {
    453         interface_t *interface =
    454             (interface_t *) malloc(sizeof(interface_t));
    455         if (!interface)
    456                 return NULL;
    457        
    458         bool ret = hash_table_create(&interface->port_hash_table, 0, 0,
    459             &port_hash_table_ops);
    460         if (!ret) {
    461                 free(interface);
    462                 return NULL;
    463         }
    464        
    465         interface->iface = iface;
    466         futex_initialize(&interface->futex, 1);
    467         interface->port_id_avail = 0;
    468        
    469         hash_table_insert(&interface_hash_table, &interface->link);
    470        
    471         return interface;
    472 }
    473 
    474 static port_t *async_new_port(interface_t *interface,
    475     async_port_handler_t handler, void *data)
    476 {
    477         port_t *port = (port_t *) malloc(sizeof(port_t));
    478         if (!port)
    479                 return NULL;
    480        
    481         futex_down(&interface->futex);
    482        
    483         port_id_t id = interface->port_id_avail;
    484         interface->port_id_avail++;
    485        
    486         port->id = id;
    487         port->handler = handler;
    488         port->data = data;
    489        
    490         hash_table_insert(&interface->port_hash_table, &port->link);
    491        
    492         futex_up(&interface->futex);
    493        
    494         return port;
     352static async_client_conn_t client_connection = default_client_connection;
     353static size_t notification_handler_stksz = FIBRIL_DFLT_STK_SIZE;
     354
     355/** Setter for client_connection function pointer.
     356 *
     357 * @param conn Function that will implement a new connection fibril.
     358 *
     359 */
     360void async_set_client_connection(async_client_conn_t conn)
     361{
     362        assert(client_connection == default_client_connection);
     363        client_connection = conn;
     364}
     365
     366/** Set the stack size for the notification handler notification fibrils.
     367 *
     368 * @param size Stack size in bytes.
     369 */
     370void async_set_notification_handler_stack_size(size_t size)
     371{
     372        notification_handler_stksz = size;
    495373}
    496374
     
    509387 */
    510388static FIBRIL_CONDVAR_INITIALIZE(avail_phone_cv);
    511 
    512 int async_create_port(iface_t iface, async_port_handler_t handler,
    513     void *data, port_id_t *port_id)
    514 {
    515         if ((iface & IFACE_MOD_MASK) == IFACE_MOD_CALLBACK)
    516                 return EINVAL;
    517        
    518         interface_t *interface;
    519        
    520         futex_down(&async_futex);
    521        
    522         ht_link_t *link = hash_table_find(&interface_hash_table, &iface);
    523         if (link)
    524                 interface = hash_table_get_inst(link, interface_t, link);
    525         else
    526                 interface = async_new_interface(iface);
    527        
    528         if (!interface) {
    529                 futex_up(&async_futex);
    530                 return ENOMEM;
    531         }
    532        
    533         port_t *port = async_new_port(interface, handler, data);
    534         if (!port) {
    535                 futex_up(&async_futex);
    536                 return ENOMEM;
    537         }
    538        
    539         *port_id = port->id;
    540        
    541         futex_up(&async_futex);
    542        
    543         return EOK;
    544 }
    545 
    546 void async_set_fallback_port_handler(async_port_handler_t handler, void *data)
    547 {
    548         assert(handler != NULL);
    549        
    550         fallback_port_handler = handler;
    551         fallback_port_data = data;
    552 }
    553389
    554390static hash_table_t client_hash_table;
     
    621457        .remove_callback = NULL
    622458};
    623 
    624 static client_t *async_client_get(task_id_t client_id, bool create)
    625 {
    626         client_t *client = NULL;
    627        
    628         futex_down(&async_futex);
    629         ht_link_t *link = hash_table_find(&client_hash_table, &client_id);
    630         if (link) {
    631                 client = hash_table_get_inst(link, client_t, link);
    632                 atomic_inc(&client->refcnt);
    633         } else if (create) {
    634                 client = malloc(sizeof(client_t));
    635                 if (client) {
    636                         client->in_task_id = client_id;
    637                         client->data = async_client_data_create();
    638                        
    639                         atomic_set(&client->refcnt, 1);
    640                         hash_table_insert(&client_hash_table, &client->link);
    641                 }
    642         }
    643        
    644         futex_up(&async_futex);
    645         return client;
    646 }
    647 
    648 static void async_client_put(client_t *client)
    649 {
    650         bool destroy;
    651        
    652         futex_down(&async_futex);
    653        
    654         if (atomic_predec(&client->refcnt) == 0) {
    655                 hash_table_remove(&client_hash_table, &client->in_task_id);
    656                 destroy = true;
    657         } else
    658                 destroy = false;
    659        
    660         futex_up(&async_futex);
    661        
    662         if (destroy) {
    663                 if (client->data)
    664                         async_client_data_destroy(client->data);
    665                
    666                 free(client);
    667         }
    668 }
    669 
    670 /** Wrapper for client connection fibril.
    671  *
    672  * When a new connection arrives, a fibril with this implementing
    673  * function is created.
    674  *
    675  * @param arg Connection structure pointer.
    676  *
    677  * @return Always zero.
    678  *
    679  */
    680 static int connection_fibril(void *arg)
    681 {
    682         assert(arg);
    683        
    684         /*
    685          * Setup fibril-local connection pointer.
    686          */
    687         fibril_connection = (connection_t *) arg;
    688        
    689         /*
    690          * Add our reference for the current connection in the client task
    691          * tracking structure. If this is the first reference, create and
    692          * hash in a new tracking structure.
    693          */
    694        
    695         client_t *client = async_client_get(fibril_connection->in_task_id, true);
    696         if (!client) {
    697                 ipc_answer_0(fibril_connection->callid, ENOMEM);
    698                 return 0;
    699         }
    700        
    701         fibril_connection->client = client;
    702        
    703         /*
    704          * Call the connection handler function.
    705          */
    706         fibril_connection->handler(fibril_connection->callid,
    707             &fibril_connection->call, fibril_connection->data);
    708        
    709         /*
    710          * Remove the reference for this client task connection.
    711          */
    712         async_client_put(client);
    713        
    714         /*
    715          * Remove myself from the connection hash table.
    716          */
    717         futex_down(&async_futex);
    718         hash_table_remove(&conn_hash_table, &fibril_connection->in_phone_hash);
    719         futex_up(&async_futex);
    720        
    721         /*
    722          * Answer all remaining messages with EHANGUP.
    723          */
    724         while (!list_empty(&fibril_connection->msg_queue)) {
    725                 msg_t *msg =
    726                     list_get_instance(list_first(&fibril_connection->msg_queue),
    727                     msg_t, link);
    728                
    729                 list_remove(&msg->link);
    730                 ipc_answer_0(msg->callid, EHANGUP);
    731                 free(msg);
    732         }
    733        
    734         /*
    735          * If the connection was hung-up, answer the last call,
    736          * i.e. IPC_M_PHONE_HUNGUP.
    737          */
    738         if (fibril_connection->close_callid)
    739                 ipc_answer_0(fibril_connection->close_callid, EOK);
    740        
    741         free(fibril_connection);
    742         return 0;
    743 }
    744 
    745 /** Create a new fibril for a new connection.
    746  *
    747  * Create new fibril for connection, fill in connection structures
    748  * and insert it into the hash table, so that later we can easily
    749  * do routing of messages to particular fibrils.
    750  *
    751  * @param in_task_id    Identification of the incoming connection.
    752  * @param in_phone_hash Identification of the incoming connection.
    753  * @param callid        Hash of the opening IPC_M_CONNECT_ME_TO call.
    754  *                      If callid is zero, the connection was opened by
    755  *                      accepting the IPC_M_CONNECT_TO_ME call and this
    756  *                      function is called directly by the server.
    757  * @param call          Call data of the opening call.
    758  * @param handler       Connection handler.
    759  * @param data          Client argument to pass to the connection handler.
    760  *
    761  * @return New fibril id or NULL on failure.
    762  *
    763  */
    764 static fid_t async_new_connection(task_id_t in_task_id, sysarg_t in_phone_hash,
    765     ipc_callid_t callid, ipc_call_t *call, async_port_handler_t handler,
    766     void *data)
    767 {
    768         connection_t *conn = malloc(sizeof(*conn));
    769         if (!conn) {
    770                 if (callid)
    771                         ipc_answer_0(callid, ENOMEM);
    772                
    773                 return (uintptr_t) NULL;
    774         }
    775        
    776         conn->in_task_id = in_task_id;
    777         conn->in_phone_hash = in_phone_hash;
    778         list_initialize(&conn->msg_queue);
    779         conn->callid = callid;
    780         conn->close_callid = 0;
    781         conn->handler = handler;
    782         conn->data = data;
    783        
    784         if (call)
    785                 conn->call = *call;
    786        
    787         /* We will activate the fibril ASAP */
    788         conn->wdata.active = true;
    789         conn->wdata.fid = fibril_create(connection_fibril, conn);
    790        
    791         if (conn->wdata.fid == 0) {
    792                 free(conn);
    793                
    794                 if (callid)
    795                         ipc_answer_0(callid, ENOMEM);
    796                
    797                 return (uintptr_t) NULL;
    798         }
    799        
    800         /* Add connection to the connection hash table */
    801        
    802         futex_down(&async_futex);
    803         hash_table_insert(&conn_hash_table, &conn->link);
    804         futex_up(&async_futex);
    805        
    806         fibril_add_ready(conn->wdata.fid);
    807        
    808         return conn->wdata.fid;
    809 }
    810 
    811 /** Wrapper for making IPC_M_CONNECT_TO_ME calls using the async framework.
    812  *
    813  * Ask through phone for a new connection to some service.
    814  *
    815  * @param exch    Exchange for sending the message.
    816  * @param iface   Callback interface.
    817  * @param arg1    User defined argument.
    818  * @param arg2    User defined argument.
    819  * @param handler Callback handler.
    820  * @param data    Handler data.
    821  * @param port_id ID of the newly created port.
    822  *
    823  * @return Zero on success or a negative error code.
    824  *
    825  */
    826 int async_create_callback_port(async_exch_t *exch, iface_t iface, sysarg_t arg1,
    827     sysarg_t arg2, async_port_handler_t handler, void *data, port_id_t *port_id)
    828 {
    829         if ((iface & IFACE_MOD_CALLBACK) != IFACE_MOD_CALLBACK)
    830                 return EINVAL;
    831        
    832         if (exch == NULL)
    833                 return ENOENT;
    834        
    835         ipc_call_t answer;
    836         aid_t req = async_send_3(exch, IPC_M_CONNECT_TO_ME, iface, arg1, arg2,
    837             &answer);
    838        
    839         sysarg_t ret;
    840         async_wait_for(req, &ret);
    841         if (ret != EOK)
    842                 return (int) ret;
    843        
    844         sysarg_t phone_hash = IPC_GET_ARG5(answer);
    845         interface_t *interface;
    846        
    847         futex_down(&async_futex);
    848        
    849         ht_link_t *link = hash_table_find(&interface_hash_table, &iface);
    850         if (link)
    851                 interface = hash_table_get_inst(link, interface_t, link);
    852         else
    853                 interface = async_new_interface(iface);
    854        
    855         if (!interface) {
    856                 futex_up(&async_futex);
    857                 return ENOMEM;
    858         }
    859        
    860         port_t *port = async_new_port(interface, handler, data);
    861         if (!port) {
    862                 futex_up(&async_futex);
    863                 return ENOMEM;
    864         }
    865        
    866         *port_id = port->id;
    867        
    868         futex_up(&async_futex);
    869        
    870         fid_t fid = async_new_connection(answer.in_task_id, phone_hash,
    871             0, NULL, handler, data);
    872         if (fid == (uintptr_t) NULL)
    873                 return ENOMEM;
    874        
    875         return EOK;
    876 }
    877459
    878460static size_t notification_key_hash(void *key)
     
    989571}
    990572
    991 /** Process notification.
    992  *
    993  * @param callid Hash of the incoming call.
    994  * @param call   Data of the incoming call.
    995  *
    996  */
    997 static void process_notification(ipc_callid_t callid, ipc_call_t *call)
    998 {
     573/** Notification fibril.
     574 *
     575 * When a notification arrives, a fibril with this implementing function is
     576 * created. It calls the corresponding notification handler and does the final
     577 * cleanup.
     578 *
     579 * @param arg Message structure pointer.
     580 *
     581 * @return Always zero.
     582 *
     583 */
     584static int notification_fibril(void *arg)
     585{
     586        assert(arg);
     587       
     588        msg_t *msg = (msg_t *) arg;
    999589        async_notification_handler_t handler = NULL;
    1000590        void *data = NULL;
    1001 
    1002         assert(call);
    1003591       
    1004592        futex_down(&async_futex);
    1005593       
    1006594        ht_link_t *link = hash_table_find(&notification_hash_table,
    1007             &IPC_GET_IMETHOD(*call));
     595            &IPC_GET_IMETHOD(msg->call));
    1008596        if (link) {
    1009597                notification_t *notification =
     
    1016604       
    1017605        if (handler)
    1018                 handler(callid, call, data);
     606                handler(msg->callid, &msg->call, data);
     607       
     608        free(msg);
     609        return 0;
     610}
     611
     612/** Process notification.
     613 *
     614 * A new fibril is created which would process the notification.
     615 *
     616 * @param callid Hash of the incoming call.
     617 * @param call   Data of the incoming call.
     618 *
     619 * @return False if an error occured.
     620 *         True if the call was passed to the notification fibril.
     621 *
     622 */
     623static bool process_notification(ipc_callid_t callid, ipc_call_t *call)
     624{
     625        assert(call);
     626       
     627        futex_down(&async_futex);
     628       
     629        msg_t *msg = malloc(sizeof(*msg));
     630        if (!msg) {
     631                futex_up(&async_futex);
     632                return false;
     633        }
     634       
     635        msg->callid = callid;
     636        msg->call = *call;
     637       
     638        fid_t fid = fibril_create_generic(notification_fibril, msg,
     639            notification_handler_stksz);
     640        if (fid == 0) {
     641                free(msg);
     642                futex_up(&async_futex);
     643                return false;
     644        }
     645       
     646        fibril_add_ready(fid);
     647       
     648        futex_up(&async_futex);
     649        return true;
    1019650}
    1020651
     
    1235866        }
    1236867       
    1237         msg_t *msg = list_get_instance(list_first(&conn->msg_queue),
    1238             msg_t, link);
     868        msg_t *msg = list_get_instance(list_first(&conn->msg_queue), msg_t, link);
    1239869        list_remove(&msg->link);
    1240870       
     
    1247877}
    1248878
     879static client_t *async_client_get(task_id_t client_id, bool create)
     880{
     881        client_t *client = NULL;
     882
     883        futex_down(&async_futex);
     884        ht_link_t *link = hash_table_find(&client_hash_table, &client_id);
     885        if (link) {
     886                client = hash_table_get_inst(link, client_t, link);
     887                atomic_inc(&client->refcnt);
     888        } else if (create) {
     889                client = malloc(sizeof(client_t));
     890                if (client) {
     891                        client->in_task_id = client_id;
     892                        client->data = async_client_data_create();
     893               
     894                        atomic_set(&client->refcnt, 1);
     895                        hash_table_insert(&client_hash_table, &client->link);
     896                }
     897        }
     898
     899        futex_up(&async_futex);
     900        return client;
     901}
     902
     903static void async_client_put(client_t *client)
     904{
     905        bool destroy;
     906
     907        futex_down(&async_futex);
     908       
     909        if (atomic_predec(&client->refcnt) == 0) {
     910                hash_table_remove(&client_hash_table, &client->in_task_id);
     911                destroy = true;
     912        } else
     913                destroy = false;
     914       
     915        futex_up(&async_futex);
     916       
     917        if (destroy) {
     918                if (client->data)
     919                        async_client_data_destroy(client->data);
     920               
     921                free(client);
     922        }
     923}
     924
    1249925void *async_get_client_data(void)
    1250926{
     
    1258934        if (!client)
    1259935                return NULL;
    1260        
    1261936        if (!client->data) {
    1262937                async_client_put(client);
    1263938                return NULL;
    1264939        }
    1265        
     940
    1266941        return client->data;
    1267942}
     
    1270945{
    1271946        client_t *client = async_client_get(client_id, false);
    1272        
     947
    1273948        assert(client);
    1274949        assert(client->data);
    1275        
     950
    1276951        /* Drop the reference we got in async_get_client_data_by_hash(). */
    1277952        async_client_put(client);
    1278        
     953
    1279954        /* Drop our own reference we got at the beginning of this function. */
    1280955        async_client_put(client);
    1281956}
    1282957
    1283 static port_t *async_find_port(iface_t iface, port_id_t port_id)
    1284 {
    1285         port_t *port = NULL;
    1286        
     958/** Wrapper for client connection fibril.
     959 *
     960 * When a new connection arrives, a fibril with this implementing function is
     961 * created. It calls client_connection() and does the final cleanup.
     962 *
     963 * @param arg Connection structure pointer.
     964 *
     965 * @return Always zero.
     966 *
     967 */
     968static int connection_fibril(void *arg)
     969{
     970        assert(arg);
     971       
     972        /*
     973         * Setup fibril-local connection pointer.
     974         */
     975        fibril_connection = (connection_t *) arg;
     976       
     977        /*
     978         * Add our reference for the current connection in the client task
     979         * tracking structure. If this is the first reference, create and
     980         * hash in a new tracking structure.
     981         */
     982
     983        client_t *client = async_client_get(fibril_connection->in_task_id, true);
     984        if (!client) {
     985                ipc_answer_0(fibril_connection->callid, ENOMEM);
     986                return 0;
     987        }
     988
     989        fibril_connection->client = client;
     990       
     991        /*
     992         * Call the connection handler function.
     993         */
     994        fibril_connection->cfibril(fibril_connection->callid,
     995            &fibril_connection->call, fibril_connection->carg);
     996       
     997        /*
     998         * Remove the reference for this client task connection.
     999         */
     1000        async_client_put(client);
     1001       
     1002        /*
     1003         * Remove myself from the connection hash table.
     1004         */
    12871005        futex_down(&async_futex);
    1288        
    1289         ht_link_t *link = hash_table_find(&interface_hash_table, &iface);
    1290         if (link) {
    1291                 interface_t *interface =
    1292                     hash_table_get_inst(link, interface_t, link);
     1006        hash_table_remove(&conn_hash_table, &fibril_connection->in_phone_hash);
     1007        futex_up(&async_futex);
     1008       
     1009        /*
     1010         * Answer all remaining messages with EHANGUP.
     1011         */
     1012        while (!list_empty(&fibril_connection->msg_queue)) {
     1013                msg_t *msg =
     1014                    list_get_instance(list_first(&fibril_connection->msg_queue),
     1015                    msg_t, link);
    12931016               
    1294                 link = hash_table_find(&interface->port_hash_table, &port_id);
    1295                 if (link)
    1296                         port = hash_table_get_inst(link, port_t, link);
    1297         }
    1298        
     1017                list_remove(&msg->link);
     1018                ipc_answer_0(msg->callid, EHANGUP);
     1019                free(msg);
     1020        }
     1021       
     1022        /*
     1023         * If the connection was hung-up, answer the last call,
     1024         * i.e. IPC_M_PHONE_HUNGUP.
     1025         */
     1026        if (fibril_connection->close_callid)
     1027                ipc_answer_0(fibril_connection->close_callid, EOK);
     1028       
     1029        free(fibril_connection);
     1030        return 0;
     1031}
     1032
     1033/** Create a new fibril for a new connection.
     1034 *
     1035 * Create new fibril for connection, fill in connection structures and insert
     1036 * it into the hash table, so that later we can easily do routing of messages to
     1037 * particular fibrils.
     1038 *
     1039 * @param in_task_id    Identification of the incoming connection.
     1040 * @param in_phone_hash Identification of the incoming connection.
     1041 * @param callid        Hash of the opening IPC_M_CONNECT_ME_TO call.
     1042 *                      If callid is zero, the connection was opened by
     1043 *                      accepting the IPC_M_CONNECT_TO_ME call and this function
     1044 *                      is called directly by the server.
     1045 * @param call          Call data of the opening call.
     1046 * @param cfibril       Fibril function that should be called upon opening the
     1047 *                      connection.
     1048 * @param carg          Extra argument to pass to the connection fibril
     1049 *
     1050 * @return New fibril id or NULL on failure.
     1051 *
     1052 */
     1053fid_t async_new_connection(task_id_t in_task_id, sysarg_t in_phone_hash,
     1054    ipc_callid_t callid, ipc_call_t *call,
     1055    async_client_conn_t cfibril, void *carg)
     1056{
     1057        connection_t *conn = malloc(sizeof(*conn));
     1058        if (!conn) {
     1059                if (callid)
     1060                        ipc_answer_0(callid, ENOMEM);
     1061               
     1062                return (uintptr_t) NULL;
     1063        }
     1064       
     1065        conn->in_task_id = in_task_id;
     1066        conn->in_phone_hash = in_phone_hash;
     1067        list_initialize(&conn->msg_queue);
     1068        conn->callid = callid;
     1069        conn->close_callid = 0;
     1070        conn->carg = carg;
     1071       
     1072        if (call)
     1073                conn->call = *call;
     1074       
     1075        /* We will activate the fibril ASAP */
     1076        conn->wdata.active = true;
     1077        conn->cfibril = cfibril;
     1078        conn->wdata.fid = fibril_create(connection_fibril, conn);
     1079       
     1080        if (conn->wdata.fid == 0) {
     1081                free(conn);
     1082               
     1083                if (callid)
     1084                        ipc_answer_0(callid, ENOMEM);
     1085               
     1086                return (uintptr_t) NULL;
     1087        }
     1088       
     1089        /* Add connection to the connection hash table */
     1090       
     1091        futex_down(&async_futex);
     1092        hash_table_insert(&conn_hash_table, &conn->link);
    12991093        futex_up(&async_futex);
    13001094       
    1301         return port;
     1095        fibril_add_ready(conn->wdata.fid);
     1096       
     1097        return conn->wdata.fid;
    13021098}
    13031099
     
    13151111        assert(call);
    13161112       
    1317         /* Kernel notification */
     1113        /* Unrouted call - take some default action */
    13181114        if ((callid & IPC_CALLID_NOTIFICATION)) {
    1319                 fibril_t *fibril = (fibril_t *) __tcb_get()->fibril_data;
    1320                 unsigned oldsw = fibril->switches;
    1321                
    13221115                process_notification(callid, call);
    1323                
    1324                 if (oldsw != fibril->switches) {
    1325                         /*
    1326                          * The notification handler did not execute atomically
    1327                          * and so the current manager fibril assumed the role of
    1328                          * a notification fibril. While waiting for its
    1329                          * resources, it switched to another manager fibril that
    1330                          * had already existed or it created a new one. We
    1331                          * therefore know there is at least yet another
    1332                          * manager fibril that can take over. We now kill the
    1333                          * current 'notification' fibril to prevent fibril
    1334                          * population explosion.
    1335                          */
    1336                         futex_down(&async_futex);
    1337                         fibril_switch(FIBRIL_FROM_DEAD);
    1338                 }
    1339                
    13401116                return;
    13411117        }
    13421118       
    1343         /* New connection */
    1344         if (IPC_GET_IMETHOD(*call) == IPC_M_CONNECT_ME_TO) {
    1345                 iface_t iface = (iface_t) IPC_GET_ARG1(*call);
    1346                 sysarg_t in_phone_hash = IPC_GET_ARG5(*call);
    1347                
    1348                 async_notification_handler_t handler = fallback_port_handler;
    1349                 void *data = fallback_port_data;
    1350                
    1351                 // TODO: Currently ignores all ports but the first one
    1352                 port_t *port = async_find_port(iface, 0);
    1353                 if (port) {
    1354                         handler = port->handler;
    1355                         data = port->data;
    1356                 }
    1357                
    1358                 async_new_connection(call->in_task_id, in_phone_hash, callid,
    1359                     call, handler, data);
    1360                 return;
    1361         }
    1362        
    1363         /* Cloned connection */
    1364         if (IPC_GET_IMETHOD(*call) == IPC_M_CLONE_ESTABLISH) {
    1365                 // TODO: Currently ignores ports altogether
    1366                
     1119        switch (IPC_GET_IMETHOD(*call)) {
     1120        case IPC_M_CLONE_ESTABLISH:
     1121        case IPC_M_CONNECT_ME_TO:
    13671122                /* Open new connection with fibril, etc. */
    13681123                async_new_connection(call->in_task_id, IPC_GET_ARG5(*call),
    1369                     callid, call, fallback_port_handler, fallback_port_data);
     1124                    callid, call, client_connection, NULL);
    13701125                return;
    13711126        }
     
    15121267void async_create_manager(void)
    15131268{
    1514         fid_t fid = fibril_create_generic(async_manager_fibril, NULL, PAGE_SIZE);
     1269        fid_t fid = fibril_create(async_manager_fibril, NULL);
    15151270        if (fid != 0)
    15161271                fibril_add_manager(fid);
     
    15281283void __async_init(void)
    15291284{
    1530         if (!hash_table_create(&interface_hash_table, 0, 0,
    1531             &interface_hash_table_ops))
    1532                 abort();
    1533        
    15341285        if (!hash_table_create(&client_hash_table, 0, 0, &client_hash_table_ops))
    15351286                abort();
     
    15461297                abort();
    15471298       
    1548         session_ns->iface = 0;
    15491299        session_ns->mgmt = EXCHANGE_ATOMIC;
    15501300        session_ns->phone = PHONE_NS;
     
    15931343       
    15941344        msg->done = true;
    1595        
     1345
    15961346        if (msg->forget) {
    15971347                assert(msg->wdata.active);
     
    16011351                fibril_add_ready(msg->wdata.fid);
    16021352        }
    1603        
     1353
    16041354        futex_up(&async_futex);
    16051355}
     
    16361386       
    16371387        ipc_call_async_4(exch->phone, imethod, arg1, arg2, arg3, arg4, msg,
    1638             reply_received);
     1388            reply_received, true);
    16391389       
    16401390        return (aid_t) msg;
     
    16741424       
    16751425        ipc_call_async_5(exch->phone, imethod, arg1, arg2, arg3, arg4, arg5,
    1676             msg, reply_received);
     1426            msg, reply_received, true);
    16771427       
    16781428        return (aid_t) msg;
     
    16931443       
    16941444        futex_down(&async_futex);
    1695        
     1445
    16961446        assert(!msg->forget);
    16971447        assert(!msg->destroyed);
    1698        
     1448
    16991449        if (msg->done) {
    17001450                futex_up(&async_futex);
     
    17371487       
    17381488        amsg_t *msg = (amsg_t *) amsgid;
    1739        
     1489
    17401490        futex_down(&async_futex);
    1741        
     1491
    17421492        assert(!msg->forget);
    17431493        assert(!msg->destroyed);
    1744        
     1494
    17451495        if (msg->done) {
    17461496                futex_up(&async_futex);
     
    17541504        if (timeout < 0)
    17551505                timeout = 0;
    1756        
     1506
    17571507        getuptime(&msg->wdata.to_event.expires);
    17581508        tv_add_diff(&msg->wdata.to_event.expires, timeout);
     
    18071557{
    18081558        amsg_t *msg = (amsg_t *) amsgid;
    1809        
     1559
    18101560        assert(msg);
    18111561        assert(!msg->forget);
    18121562        assert(!msg->destroyed);
    1813        
     1563
    18141564        futex_down(&async_futex);
    1815        
    18161565        if (msg->done) {
    18171566                amsg_destroy(msg);
     
    18201569                msg->forget = true;
    18211570        }
    1822        
    18231571        futex_up(&async_futex);
    18241572}
     
    19631711{
    19641712        if (exch != NULL)
    1965                 ipc_call_async_0(exch->phone, imethod, NULL, NULL);
     1713                ipc_call_async_0(exch->phone, imethod, NULL, NULL, true);
    19661714}
    19671715
     
    19691717{
    19701718        if (exch != NULL)
    1971                 ipc_call_async_1(exch->phone, imethod, arg1, NULL, NULL);
     1719                ipc_call_async_1(exch->phone, imethod, arg1, NULL, NULL, true);
    19721720}
    19731721
     
    19761724{
    19771725        if (exch != NULL)
    1978                 ipc_call_async_2(exch->phone, imethod, arg1, arg2, NULL, NULL);
     1726                ipc_call_async_2(exch->phone, imethod, arg1, arg2, NULL, NULL,
     1727                    true);
    19791728}
    19801729
     
    19841733        if (exch != NULL)
    19851734                ipc_call_async_3(exch->phone, imethod, arg1, arg2, arg3, NULL,
    1986                     NULL);
     1735                    NULL, true);
    19871736}
    19881737
     
    19921741        if (exch != NULL)
    19931742                ipc_call_async_4(exch->phone, imethod, arg1, arg2, arg3, arg4,
    1994                     NULL, NULL);
     1743                    NULL, NULL, true);
    19951744}
    19961745
     
    20001749        if (exch != NULL)
    20011750                ipc_call_async_5(exch->phone, imethod, arg1, arg2, arg3, arg4,
    2002                     arg5, NULL, NULL);
     1751                    arg5, NULL, NULL, true);
    20031752}
    20041753
     
    20651814 * @param arg2            User defined argument.
    20661815 * @param arg3            User defined argument.
     1816 * @param client_receiver Connection handing routine.
    20671817 *
    20681818 * @return Zero on success or a negative error code.
     
    20701820 */
    20711821int async_connect_to_me(async_exch_t *exch, sysarg_t arg1, sysarg_t arg2,
    2072     sysarg_t arg3)
     1822    sysarg_t arg3, async_client_conn_t client_receiver, void *carg)
    20731823{
    20741824        if (exch == NULL)
    20751825                return ENOENT;
    20761826       
     1827        sysarg_t phone_hash;
     1828        sysarg_t rc;
     1829
     1830        aid_t req;
    20771831        ipc_call_t answer;
    2078         aid_t req = async_send_3(exch, IPC_M_CONNECT_TO_ME, arg1, arg2, arg3,
     1832        req = async_send_3(exch, IPC_M_CONNECT_TO_ME, arg1, arg2, arg3,
    20791833            &answer);
    2080        
    2081         sysarg_t rc;
    20821834        async_wait_for(req, &rc);
    20831835        if (rc != EOK)
    20841836                return (int) rc;
     1837
     1838        phone_hash = IPC_GET_ARG5(answer);
     1839
     1840        if (client_receiver != NULL)
     1841                async_new_connection(answer.in_task_id, phone_hash, 0, NULL,
     1842                    client_receiver, carg);
    20851843       
    20861844        return EOK;
     
    21231881       
    21241882        ipc_call_async_0(exch->phone, IPC_M_CLONE_ESTABLISH, msg,
    2125             reply_received);
     1883            reply_received, true);
    21261884       
    21271885        sysarg_t rc;
     
    21421900        }
    21431901       
    2144         sess->iface = 0;
    21451902        sess->mgmt = mgmt;
    21461903        sess->phone = phone;
     
    21721929       
    21731930        ipc_call_async_4(phone, IPC_M_CONNECT_ME_TO, arg1, arg2, arg3, arg4,
    2174             msg, reply_received);
     1931            msg, reply_received, true);
    21751932       
    21761933        sysarg_t rc;
     
    22121969        int phone = async_connect_me_to_internal(exch->phone, arg1, arg2, arg3,
    22131970            0);
     1971       
    22141972        if (phone < 0) {
    22151973                errno = phone;
     
    22181976        }
    22191977       
    2220         sess->iface = 0;
    22211978        sess->mgmt = mgmt;
    22221979        sess->phone = phone;
     
    22351992}
    22361993
    2237 /** Wrapper for making IPC_M_CONNECT_ME_TO calls using the async framework.
    2238  *
    2239  * Ask through phone for a new connection to some service and block until
    2240  * success.
    2241  *
    2242  * @param exch  Exchange for sending the message.
    2243  * @param iface Connection interface.
    2244  * @param arg2  User defined argument.
    2245  * @param arg3  User defined argument.
    2246  *
    2247  * @return New session on success or NULL on error.
    2248  *
    2249  */
    2250 async_sess_t *async_connect_me_to_iface(async_exch_t *exch, iface_t iface,
    2251     sysarg_t arg2, sysarg_t arg3)
    2252 {
    2253         if (exch == NULL) {
    2254                 errno = ENOENT;
    2255                 return NULL;
    2256         }
    2257        
    2258         async_sess_t *sess = (async_sess_t *) malloc(sizeof(async_sess_t));
    2259         if (sess == NULL) {
    2260                 errno = ENOMEM;
    2261                 return NULL;
    2262         }
    2263        
    2264         int phone = async_connect_me_to_internal(exch->phone, iface, arg2,
    2265             arg3, 0);
    2266         if (phone < 0) {
    2267                 errno = phone;
    2268                 free(sess);
    2269                 return NULL;
    2270         }
    2271        
    2272         sess->iface = iface;
    2273         sess->phone = phone;
    2274         sess->arg1 = iface;
    2275         sess->arg2 = arg2;
    2276         sess->arg3 = arg3;
    2277        
    2278         fibril_mutex_initialize(&sess->remote_state_mtx);
    2279         sess->remote_state_data = NULL;
    2280        
    2281         list_initialize(&sess->exch_list);
    2282         fibril_mutex_initialize(&sess->mutex);
    2283         atomic_set(&sess->refcnt, 0);
    2284        
    2285         return sess;
    2286 }
    2287 
    22881994/** Set arguments for new connections.
    22891995 *
     
    23412047        }
    23422048       
    2343         sess->iface = 0;
    23442049        sess->mgmt = mgmt;
    23452050        sess->phone = phone;
     
    23582063}
    23592064
    2360 /** Wrapper for making IPC_M_CONNECT_ME_TO calls using the async framework.
    2361  *
    2362  * Ask through phone for a new connection to some service and block until
    2363  * success.
    2364  *
    2365  * @param exch  Exchange for sending the message.
    2366  * @param iface Connection interface.
    2367  * @param arg2  User defined argument.
    2368  * @param arg3  User defined argument.
    2369  *
    2370  * @return New session on success or NULL on error.
    2371  *
    2372  */
    2373 async_sess_t *async_connect_me_to_blocking_iface(async_exch_t *exch, iface_t iface,
    2374     sysarg_t arg2, sysarg_t arg3)
    2375 {
    2376         if (exch == NULL) {
    2377                 errno = ENOENT;
    2378                 return NULL;
    2379         }
    2380        
    2381         async_sess_t *sess = (async_sess_t *) malloc(sizeof(async_sess_t));
    2382         if (sess == NULL) {
    2383                 errno = ENOMEM;
    2384                 return NULL;
    2385         }
    2386        
    2387         int phone = async_connect_me_to_internal(exch->phone, iface, arg2,
    2388             arg3, IPC_FLAG_BLOCKING);
    2389         if (phone < 0) {
    2390                 errno = phone;
    2391                 free(sess);
    2392                 return NULL;
    2393         }
    2394        
    2395         sess->iface = iface;
    2396         sess->phone = phone;
    2397         sess->arg1 = iface;
    2398         sess->arg2 = arg2;
    2399         sess->arg3 = arg3;
    2400        
    2401         fibril_mutex_initialize(&sess->remote_state_mtx);
    2402         sess->remote_state_data = NULL;
    2403        
    2404         list_initialize(&sess->exch_list);
    2405         fibril_mutex_initialize(&sess->mutex);
    2406         atomic_set(&sess->refcnt, 0);
    2407        
    2408         return sess;
    2409 }
    2410 
    24112065/** Connect to a task specified by id.
    24122066 *
     
    24272081        }
    24282082       
    2429         sess->iface = 0;
    24302083        sess->mgmt = EXCHANGE_ATOMIC;
    24312084        sess->phone = phone;
     
    25052158                return NULL;
    25062159       
    2507         exch_mgmt_t mgmt = sess->mgmt;
    2508         if (sess->iface != 0)
    2509                 mgmt = sess->iface & IFACE_EXCHANGE_MASK;
    2510        
    2511         async_exch_t *exch = NULL;
     2160        async_exch_t *exch;
    25122161       
    25132162        fibril_mutex_lock(&async_sess_mutex);
     
    25282177                 */
    25292178               
    2530                 if ((mgmt == EXCHANGE_ATOMIC) ||
    2531                     (mgmt == EXCHANGE_SERIALIZE)) {
     2179                if ((sess->mgmt == EXCHANGE_ATOMIC) ||
     2180                    (sess->mgmt == EXCHANGE_SERIALIZE)) {
    25322181                        exch = (async_exch_t *) malloc(sizeof(async_exch_t));
    25332182                        if (exch != NULL) {
     
    25372186                                exch->phone = sess->phone;
    25382187                        }
    2539                 } else if (mgmt == EXCHANGE_PARALLEL) {
    2540                         int phone;
    2541                        
    2542                 retry:
     2188                } else {  /* EXCHANGE_PARALLEL */
    25432189                        /*
    25442190                         * Make a one-time attempt to connect a new data phone.
    25452191                         */
     2192                       
     2193                        int phone;
     2194                       
     2195retry:
    25462196                        phone = async_connect_me_to_internal(sess->phone, sess->arg1,
    25472197                            sess->arg2, sess->arg3, 0);
     
    25852235                atomic_inc(&sess->refcnt);
    25862236               
    2587                 if (mgmt == EXCHANGE_SERIALIZE)
     2237                if (sess->mgmt == EXCHANGE_SERIALIZE)
    25882238                        fibril_mutex_lock(&sess->mutex);
    25892239        }
     
    26052255        assert(sess != NULL);
    26062256       
    2607         exch_mgmt_t mgmt = sess->mgmt;
    2608         if (sess->iface != 0)
    2609                 mgmt = sess->iface & IFACE_EXCHANGE_MASK;
    2610        
    26112257        atomic_dec(&sess->refcnt);
    26122258       
    2613         if (mgmt == EXCHANGE_SERIALIZE)
     2259        if (sess->mgmt == EXCHANGE_SERIALIZE)
    26142260                fibril_mutex_unlock(&sess->mutex);
    26152261       
     
    30482694        }
    30492695       
    3050         void *arg_data;
     2696        void *_data;
    30512697       
    30522698        if (nullterm)
    3053                 arg_data = malloc(size + 1);
     2699                _data = malloc(size + 1);
    30542700        else
    3055                 arg_data = malloc(size);
    3056        
    3057         if (arg_data == NULL) {
     2701                _data = malloc(size);
     2702       
     2703        if (_data == NULL) {
    30582704                ipc_answer_0(callid, ENOMEM);
    30592705                return ENOMEM;
    30602706        }
    30612707       
    3062         int rc = async_data_write_finalize(callid, arg_data, size);
     2708        int rc = async_data_write_finalize(callid, _data, size);
    30632709        if (rc != EOK) {
    3064                 free(arg_data);
     2710                free(_data);
    30652711                return rc;
    30662712        }
    30672713       
    30682714        if (nullterm)
    3069                 ((char *) arg_data)[size] = 0;
    3070        
    3071         *data = arg_data;
     2715                ((char *) _data)[size] = 0;
     2716       
     2717        *data = _data;
    30722718        if (received != NULL)
    30732719                *received = size;
     
    31672813        }
    31682814       
    3169         sess->iface = 0;
    31702815        sess->mgmt = mgmt;
    31712816        sess->phone = phone;
     
    32172862        }
    32182863       
    3219         sess->iface = 0;
    32202864        sess->mgmt = mgmt;
    32212865        sess->phone = phone;
     
    32632907                return NULL;
    32642908       
    3265         sess->iface = 0;
    32662909        sess->mgmt = mgmt;
    32672910        sess->phone = phone;
     
    32912934{
    32922935        assert(callid);
    3293        
     2936
    32942937        ipc_call_t call;
    32952938        *callid = async_get_call(&call);
    3296        
     2939
    32972940        if (IPC_GET_IMETHOD(call) != IPC_M_STATE_CHANGE_AUTHORIZE)
    32982941                return false;
     
    33042947        if (arg3)
    33052948                *arg3 = IPC_GET_ARG3(call);
    3306        
     2949
    33072950        return true;
    33082951}
     
    33833026}
    33843027
    3385 void *async_as_area_create(void *base, size_t size, unsigned int flags,
    3386     async_sess_t *pager, sysarg_t id1, sysarg_t id2, sysarg_t id3)
    3387 {
    3388         as_area_pager_info_t pager_info = {
    3389                 .pager = pager->phone,
    3390                 .id1 = id1,
    3391                 .id2 = id2,
    3392                 .id3 = id3
    3393         };
    3394         return as_area_create(base, size, flags, &pager_info);
    3395 }
    3396 
    33973028/** @}
    33983029 */
Note: See TracChangeset for help on using the changeset viewer.