Ignore:
File:
1 edited

Legend:

Unmodified
Added
Removed
  • uspace/lib/c/generic/async.c

    r7f9d97f3 rae6021d  
    7777 *   }
    7878 *
    79  *   my_client_connection(icallid, *icall)
     79 *   port_handler(icallid, *icall)
    8080 *   {
    8181 *     if (want_refuse) {
     
    116116#include <stdlib.h>
    117117#include <macros.h>
     118#include <as.h>
     119#include <abi/mm/as.h>
    118120#include "private/libc.h"
    119121
     
    123125        list_t exch_list;
    124126       
     127        /** Session interface */
     128        iface_t iface;
     129       
    125130        /** Exchange management style */
    126131        exch_mgmt_t mgmt;
     
    189194        /** If reply was received. */
    190195        bool done;
    191 
     196       
    192197        /** If the message / reply should be discarded on arrival. */
    193198        bool forget;
    194 
     199       
    195200        /** If already destroyed. */
    196201        bool destroyed;
     
    232237        /** Identification of the opening call. */
    233238        ipc_callid_t callid;
     239       
    234240        /** Call data of the opening call. */
    235241        ipc_call_t call;
    236         /** Local argument or NULL if none. */
    237         void *carg;
    238242       
    239243        /** Identification of the closing call. */
     
    241245       
    242246        /** Fibril function that will be used to handle the connection. */
    243         async_client_conn_t cfibril;
     247        async_port_handler_t handler;
     248       
     249        /** Client data */
     250        void *data;
    244251} connection_t;
     252
     253/** Interface data */
     254typedef struct {
     255        ht_link_t link;
     256       
     257        /** Interface ID */
     258        iface_t iface;
     259       
     260        /** Futex protecting the hash table */
     261        futex_t futex;
     262       
     263        /** Interface ports */
     264        hash_table_t port_hash_table;
     265       
     266        /** Next available port ID */
     267        port_id_t port_id_avail;
     268} interface_t;
     269
     270/* Port data */
     271typedef struct {
     272        ht_link_t link;
     273       
     274        /** Port ID */
     275        port_id_t id;
     276       
     277        /** Port connection handler */
     278        async_port_handler_t handler;
     279       
     280        /** Client data */
     281        void *data;
     282} port_t;
    245283
    246284/* Notification data */
     
    264302{
    265303        struct timeval tv = { 0, 0 };
    266 
     304       
    267305        to->inlist = false;
    268306        to->occurred = false;
     
    287325static amsg_t *amsg_create(void)
    288326{
    289         amsg_t *msg;
    290 
    291         msg = malloc(sizeof(amsg_t));
     327        amsg_t *msg = malloc(sizeof(amsg_t));
    292328        if (msg) {
    293329                msg->done = false;
     
    298334                awaiter_initialize(&msg->wdata);
    299335        }
    300 
     336       
    301337        return msg;
    302338}
     
    335371}
    336372
    337 /** Default fibril function that gets called to handle new connection.
    338  *
    339  * This function is defined as a weak symbol - to be redefined in user code.
     373/** Default fallback fibril function.
     374 *
     375 * This fallback fibril function gets called on incomming
     376 * connections that do not have a specific handler defined.
    340377 *
    341378 * @param callid Hash of the incoming call.
     
    344381 *
    345382 */
    346 static void default_client_connection(ipc_callid_t callid, ipc_call_t *call,
     383static void default_fallback_port_handler(ipc_callid_t callid, ipc_call_t *call,
    347384    void *arg)
    348385{
     
    350387}
    351388
    352 static async_client_conn_t client_connection = default_client_connection;
    353 static size_t notification_handler_stksz = FIBRIL_DFLT_STK_SIZE;
    354 
    355 /** Setter for client_connection function pointer.
    356  *
    357  * @param conn Function that will implement a new connection fibril.
    358  *
    359  */
    360 void async_set_client_connection(async_client_conn_t conn)
    361 {
    362         assert(client_connection == default_client_connection);
    363         client_connection = conn;
    364 }
    365 
    366 /** Set the stack size for the notification handler notification fibrils.
    367  *
    368  * @param size Stack size in bytes.
    369  */
    370 void async_set_notification_handler_stack_size(size_t size)
    371 {
    372         notification_handler_stksz = size;
     389static async_port_handler_t fallback_port_handler =
     390    default_fallback_port_handler;
     391static void *fallback_port_data = NULL;
     392
     393static hash_table_t interface_hash_table;
     394
     395static size_t interface_key_hash(void *key)
     396{
     397        iface_t iface = *(iface_t *) key;
     398        return iface;
     399}
     400
     401static size_t interface_hash(const ht_link_t *item)
     402{
     403        interface_t *interface = hash_table_get_inst(item, interface_t, link);
     404        return interface_key_hash(&interface->iface);
     405}
     406
     407static bool interface_key_equal(void *key, const ht_link_t *item)
     408{
     409        iface_t iface = *(iface_t *) key;
     410        interface_t *interface = hash_table_get_inst(item, interface_t, link);
     411        return iface == interface->iface;
     412}
     413
     414/** Operations for the port hash table. */
     415static hash_table_ops_t interface_hash_table_ops = {
     416        .hash = interface_hash,
     417        .key_hash = interface_key_hash,
     418        .key_equal = interface_key_equal,
     419        .equal = NULL,
     420        .remove_callback = NULL
     421};
     422
     423static size_t port_key_hash(void *key)
     424{
     425        port_id_t port_id = *(port_id_t *) key;
     426        return port_id;
     427}
     428
     429static size_t port_hash(const ht_link_t *item)
     430{
     431        port_t *port = hash_table_get_inst(item, port_t, link);
     432        return port_key_hash(&port->id);
     433}
     434
     435static bool port_key_equal(void *key, const ht_link_t *item)
     436{
     437        port_id_t port_id = *(port_id_t *) key;
     438        port_t *port = hash_table_get_inst(item, port_t, link);
     439        return port_id == port->id;
     440}
     441
     442/** Operations for the port hash table. */
     443static hash_table_ops_t port_hash_table_ops = {
     444        .hash = port_hash,
     445        .key_hash = port_key_hash,
     446        .key_equal = port_key_equal,
     447        .equal = NULL,
     448        .remove_callback = NULL
     449};
     450
     451static interface_t *async_new_interface(iface_t iface)
     452{
     453        interface_t *interface =
     454            (interface_t *) malloc(sizeof(interface_t));
     455        if (!interface)
     456                return NULL;
     457       
     458        bool ret = hash_table_create(&interface->port_hash_table, 0, 0,
     459            &port_hash_table_ops);
     460        if (!ret) {
     461                free(interface);
     462                return NULL;
     463        }
     464       
     465        interface->iface = iface;
     466        futex_initialize(&interface->futex, 1);
     467        interface->port_id_avail = 0;
     468       
     469        hash_table_insert(&interface_hash_table, &interface->link);
     470       
     471        return interface;
     472}
     473
     474static port_t *async_new_port(interface_t *interface,
     475    async_port_handler_t handler, void *data)
     476{
     477        port_t *port = (port_t *) malloc(sizeof(port_t));
     478        if (!port)
     479                return NULL;
     480       
     481        futex_down(&interface->futex);
     482       
     483        port_id_t id = interface->port_id_avail;
     484        interface->port_id_avail++;
     485       
     486        port->id = id;
     487        port->handler = handler;
     488        port->data = data;
     489       
     490        hash_table_insert(&interface->port_hash_table, &port->link);
     491       
     492        futex_up(&interface->futex);
     493       
     494        return port;
    373495}
    374496
     
    387509 */
    388510static FIBRIL_CONDVAR_INITIALIZE(avail_phone_cv);
     511
     512int async_create_port(iface_t iface, async_port_handler_t handler,
     513    void *data, port_id_t *port_id)
     514{
     515        if ((iface & IFACE_MOD_MASK) == IFACE_MOD_CALLBACK)
     516                return EINVAL;
     517       
     518        interface_t *interface;
     519       
     520        futex_down(&async_futex);
     521       
     522        ht_link_t *link = hash_table_find(&interface_hash_table, &iface);
     523        if (link)
     524                interface = hash_table_get_inst(link, interface_t, link);
     525        else
     526                interface = async_new_interface(iface);
     527       
     528        if (!interface) {
     529                futex_up(&async_futex);
     530                return ENOMEM;
     531        }
     532       
     533        port_t *port = async_new_port(interface, handler, data);
     534        if (!port) {
     535                futex_up(&async_futex);
     536                return ENOMEM;
     537        }
     538       
     539        *port_id = port->id;
     540       
     541        futex_up(&async_futex);
     542       
     543        return EOK;
     544}
     545
     546void async_set_fallback_port_handler(async_port_handler_t handler, void *data)
     547{
     548        assert(handler != NULL);
     549       
     550        fallback_port_handler = handler;
     551        fallback_port_data = data;
     552}
    389553
    390554static hash_table_t client_hash_table;
     
    458622};
    459623
     624static client_t *async_client_get(task_id_t client_id, bool create)
     625{
     626        client_t *client = NULL;
     627       
     628        futex_down(&async_futex);
     629        ht_link_t *link = hash_table_find(&client_hash_table, &client_id);
     630        if (link) {
     631                client = hash_table_get_inst(link, client_t, link);
     632                atomic_inc(&client->refcnt);
     633        } else if (create) {
     634                client = malloc(sizeof(client_t));
     635                if (client) {
     636                        client->in_task_id = client_id;
     637                        client->data = async_client_data_create();
     638                       
     639                        atomic_set(&client->refcnt, 1);
     640                        hash_table_insert(&client_hash_table, &client->link);
     641                }
     642        }
     643       
     644        futex_up(&async_futex);
     645        return client;
     646}
     647
     648static void async_client_put(client_t *client)
     649{
     650        bool destroy;
     651       
     652        futex_down(&async_futex);
     653       
     654        if (atomic_predec(&client->refcnt) == 0) {
     655                hash_table_remove(&client_hash_table, &client->in_task_id);
     656                destroy = true;
     657        } else
     658                destroy = false;
     659       
     660        futex_up(&async_futex);
     661       
     662        if (destroy) {
     663                if (client->data)
     664                        async_client_data_destroy(client->data);
     665               
     666                free(client);
     667        }
     668}
     669
     670/** Wrapper for client connection fibril.
     671 *
     672 * When a new connection arrives, a fibril with this implementing
     673 * function is created.
     674 *
     675 * @param arg Connection structure pointer.
     676 *
     677 * @return Always zero.
     678 *
     679 */
     680static int connection_fibril(void *arg)
     681{
     682        assert(arg);
     683       
     684        /*
     685         * Setup fibril-local connection pointer.
     686         */
     687        fibril_connection = (connection_t *) arg;
     688       
     689        /*
     690         * Add our reference for the current connection in the client task
     691         * tracking structure. If this is the first reference, create and
     692         * hash in a new tracking structure.
     693         */
     694       
     695        client_t *client = async_client_get(fibril_connection->in_task_id, true);
     696        if (!client) {
     697                ipc_answer_0(fibril_connection->callid, ENOMEM);
     698                return 0;
     699        }
     700       
     701        fibril_connection->client = client;
     702       
     703        /*
     704         * Call the connection handler function.
     705         */
     706        fibril_connection->handler(fibril_connection->callid,
     707            &fibril_connection->call, fibril_connection->data);
     708       
     709        /*
     710         * Remove the reference for this client task connection.
     711         */
     712        async_client_put(client);
     713       
     714        /*
     715         * Remove myself from the connection hash table.
     716         */
     717        futex_down(&async_futex);
     718        hash_table_remove(&conn_hash_table, &fibril_connection->in_phone_hash);
     719        futex_up(&async_futex);
     720       
     721        /*
     722         * Answer all remaining messages with EHANGUP.
     723         */
     724        while (!list_empty(&fibril_connection->msg_queue)) {
     725                msg_t *msg =
     726                    list_get_instance(list_first(&fibril_connection->msg_queue),
     727                    msg_t, link);
     728               
     729                list_remove(&msg->link);
     730                ipc_answer_0(msg->callid, EHANGUP);
     731                free(msg);
     732        }
     733       
     734        /*
     735         * If the connection was hung-up, answer the last call,
     736         * i.e. IPC_M_PHONE_HUNGUP.
     737         */
     738        if (fibril_connection->close_callid)
     739                ipc_answer_0(fibril_connection->close_callid, EOK);
     740       
     741        free(fibril_connection);
     742        return 0;
     743}
     744
     745/** Create a new fibril for a new connection.
     746 *
     747 * Create new fibril for connection, fill in connection structures
     748 * and insert it into the hash table, so that later we can easily
     749 * do routing of messages to particular fibrils.
     750 *
     751 * @param in_task_id    Identification of the incoming connection.
     752 * @param in_phone_hash Identification of the incoming connection.
     753 * @param callid        Hash of the opening IPC_M_CONNECT_ME_TO call.
     754 *                      If callid is zero, the connection was opened by
     755 *                      accepting the IPC_M_CONNECT_TO_ME call and this
     756 *                      function is called directly by the server.
     757 * @param call          Call data of the opening call.
     758 * @param handler       Connection handler.
     759 * @param data          Client argument to pass to the connection handler.
     760 *
     761 * @return New fibril id or NULL on failure.
     762 *
     763 */
     764static fid_t async_new_connection(task_id_t in_task_id, sysarg_t in_phone_hash,
     765    ipc_callid_t callid, ipc_call_t *call, async_port_handler_t handler,
     766    void *data)
     767{
     768        connection_t *conn = malloc(sizeof(*conn));
     769        if (!conn) {
     770                if (callid)
     771                        ipc_answer_0(callid, ENOMEM);
     772               
     773                return (uintptr_t) NULL;
     774        }
     775       
     776        conn->in_task_id = in_task_id;
     777        conn->in_phone_hash = in_phone_hash;
     778        list_initialize(&conn->msg_queue);
     779        conn->callid = callid;
     780        conn->close_callid = 0;
     781        conn->handler = handler;
     782        conn->data = data;
     783       
     784        if (call)
     785                conn->call = *call;
     786       
     787        /* We will activate the fibril ASAP */
     788        conn->wdata.active = true;
     789        conn->wdata.fid = fibril_create(connection_fibril, conn);
     790       
     791        if (conn->wdata.fid == 0) {
     792                free(conn);
     793               
     794                if (callid)
     795                        ipc_answer_0(callid, ENOMEM);
     796               
     797                return (uintptr_t) NULL;
     798        }
     799       
     800        /* Add connection to the connection hash table */
     801       
     802        futex_down(&async_futex);
     803        hash_table_insert(&conn_hash_table, &conn->link);
     804        futex_up(&async_futex);
     805       
     806        fibril_add_ready(conn->wdata.fid);
     807       
     808        return conn->wdata.fid;
     809}
     810
     811/** Wrapper for making IPC_M_CONNECT_TO_ME calls using the async framework.
     812 *
     813 * Ask through phone for a new connection to some service.
     814 *
     815 * @param exch    Exchange for sending the message.
     816 * @param iface   Callback interface.
     817 * @param arg1    User defined argument.
     818 * @param arg2    User defined argument.
     819 * @param handler Callback handler.
     820 * @param data    Handler data.
     821 * @param port_id ID of the newly created port.
     822 *
     823 * @return Zero on success or a negative error code.
     824 *
     825 */
     826int async_create_callback_port(async_exch_t *exch, iface_t iface, sysarg_t arg1,
     827    sysarg_t arg2, async_port_handler_t handler, void *data, port_id_t *port_id)
     828{
     829        if ((iface & IFACE_MOD_CALLBACK) != IFACE_MOD_CALLBACK)
     830                return EINVAL;
     831       
     832        if (exch == NULL)
     833                return ENOENT;
     834       
     835        ipc_call_t answer;
     836        aid_t req = async_send_3(exch, IPC_M_CONNECT_TO_ME, iface, arg1, arg2,
     837            &answer);
     838       
     839        sysarg_t ret;
     840        async_wait_for(req, &ret);
     841        if (ret != EOK)
     842                return (int) ret;
     843       
     844        sysarg_t phone_hash = IPC_GET_ARG5(answer);
     845        interface_t *interface;
     846       
     847        futex_down(&async_futex);
     848       
     849        ht_link_t *link = hash_table_find(&interface_hash_table, &iface);
     850        if (link)
     851                interface = hash_table_get_inst(link, interface_t, link);
     852        else
     853                interface = async_new_interface(iface);
     854       
     855        if (!interface) {
     856                futex_up(&async_futex);
     857                return ENOMEM;
     858        }
     859       
     860        port_t *port = async_new_port(interface, handler, data);
     861        if (!port) {
     862                futex_up(&async_futex);
     863                return ENOMEM;
     864        }
     865       
     866        *port_id = port->id;
     867       
     868        futex_up(&async_futex);
     869       
     870        fid_t fid = async_new_connection(answer.in_task_id, phone_hash,
     871            0, NULL, handler, data);
     872        if (fid == (uintptr_t) NULL)
     873                return ENOMEM;
     874       
     875        return EOK;
     876}
     877
    460878static size_t notification_key_hash(void *key)
    461879{
     
    571989}
    572990
    573 /** Notification fibril.
    574  *
    575  * When a notification arrives, a fibril with this implementing function is
    576  * created. It calls the corresponding notification handler and does the final
    577  * cleanup.
    578  *
    579  * @param arg Message structure pointer.
    580  *
    581  * @return Always zero.
    582  *
    583  */
    584 static int notification_fibril(void *arg)
    585 {
    586         assert(arg);
    587        
    588         msg_t *msg = (msg_t *) arg;
     991/** Process notification.
     992 *
     993 * @param callid Hash of the incoming call.
     994 * @param call   Data of the incoming call.
     995 *
     996 */
     997static void process_notification(ipc_callid_t callid, ipc_call_t *call)
     998{
    589999        async_notification_handler_t handler = NULL;
    5901000        void *data = NULL;
     1001
     1002        assert(call);
    5911003       
    5921004        futex_down(&async_futex);
    5931005       
    5941006        ht_link_t *link = hash_table_find(&notification_hash_table,
    595             &IPC_GET_IMETHOD(msg->call));
     1007            &IPC_GET_IMETHOD(*call));
    5961008        if (link) {
    5971009                notification_t *notification =
     
    6041016       
    6051017        if (handler)
    606                 handler(msg->callid, &msg->call, data);
    607        
    608         free(msg);
    609         return 0;
    610 }
    611 
    612 /** Process notification.
    613  *
    614  * A new fibril is created which would process the notification.
    615  *
    616  * @param callid Hash of the incoming call.
    617  * @param call   Data of the incoming call.
    618  *
    619  * @return False if an error occured.
    620  *         True if the call was passed to the notification fibril.
    621  *
    622  */
    623 static bool process_notification(ipc_callid_t callid, ipc_call_t *call)
    624 {
    625         assert(call);
    626        
    627         futex_down(&async_futex);
    628        
    629         msg_t *msg = malloc(sizeof(*msg));
    630         if (!msg) {
    631                 futex_up(&async_futex);
    632                 return false;
    633         }
    634        
    635         msg->callid = callid;
    636         msg->call = *call;
    637        
    638         fid_t fid = fibril_create_generic(notification_fibril, msg,
    639             notification_handler_stksz);
    640         if (fid == 0) {
    641                 free(msg);
    642                 futex_up(&async_futex);
    643                 return false;
    644         }
    645        
    646         fibril_add_ready(fid);
    647        
    648         futex_up(&async_futex);
    649         return true;
     1018                handler(callid, call, data);
    6501019}
    6511020
     
    8661235        }
    8671236       
    868         msg_t *msg = list_get_instance(list_first(&conn->msg_queue), msg_t, link);
     1237        msg_t *msg = list_get_instance(list_first(&conn->msg_queue),
     1238            msg_t, link);
    8691239        list_remove(&msg->link);
    8701240       
     
    8771247}
    8781248
    879 static client_t *async_client_get(task_id_t client_id, bool create)
    880 {
    881         client_t *client = NULL;
    882 
    883         futex_down(&async_futex);
    884         ht_link_t *link = hash_table_find(&client_hash_table, &client_id);
    885         if (link) {
    886                 client = hash_table_get_inst(link, client_t, link);
    887                 atomic_inc(&client->refcnt);
    888         } else if (create) {
    889                 client = malloc(sizeof(client_t));
    890                 if (client) {
    891                         client->in_task_id = client_id;
    892                         client->data = async_client_data_create();
    893                
    894                         atomic_set(&client->refcnt, 1);
    895                         hash_table_insert(&client_hash_table, &client->link);
    896                 }
    897         }
    898 
    899         futex_up(&async_futex);
    900         return client;
    901 }
    902 
    903 static void async_client_put(client_t *client)
    904 {
    905         bool destroy;
    906 
    907         futex_down(&async_futex);
    908        
    909         if (atomic_predec(&client->refcnt) == 0) {
    910                 hash_table_remove(&client_hash_table, &client->in_task_id);
    911                 destroy = true;
    912         } else
    913                 destroy = false;
    914        
    915         futex_up(&async_futex);
    916        
    917         if (destroy) {
    918                 if (client->data)
    919                         async_client_data_destroy(client->data);
    920                
    921                 free(client);
    922         }
    923 }
    924 
    9251249void *async_get_client_data(void)
    9261250{
     
    9341258        if (!client)
    9351259                return NULL;
     1260       
    9361261        if (!client->data) {
    9371262                async_client_put(client);
    9381263                return NULL;
    9391264        }
    940 
     1265       
    9411266        return client->data;
    9421267}
     
    9451270{
    9461271        client_t *client = async_client_get(client_id, false);
    947 
     1272       
    9481273        assert(client);
    9491274        assert(client->data);
    950 
     1275       
    9511276        /* Drop the reference we got in async_get_client_data_by_hash(). */
    9521277        async_client_put(client);
    953 
     1278       
    9541279        /* Drop our own reference we got at the beginning of this function. */
    9551280        async_client_put(client);
    9561281}
    9571282
    958 /** Wrapper for client connection fibril.
    959  *
    960  * When a new connection arrives, a fibril with this implementing function is
    961  * created. It calls client_connection() and does the final cleanup.
    962  *
    963  * @param arg Connection structure pointer.
    964  *
    965  * @return Always zero.
    966  *
    967  */
    968 static int connection_fibril(void *arg)
    969 {
    970         assert(arg);
    971        
    972         /*
    973          * Setup fibril-local connection pointer.
    974          */
    975         fibril_connection = (connection_t *) arg;
    976        
    977         /*
    978          * Add our reference for the current connection in the client task
    979          * tracking structure. If this is the first reference, create and
    980          * hash in a new tracking structure.
    981          */
    982 
    983         client_t *client = async_client_get(fibril_connection->in_task_id, true);
    984         if (!client) {
    985                 ipc_answer_0(fibril_connection->callid, ENOMEM);
    986                 return 0;
    987         }
    988 
    989         fibril_connection->client = client;
    990        
    991         /*
    992          * Call the connection handler function.
    993          */
    994         fibril_connection->cfibril(fibril_connection->callid,
    995             &fibril_connection->call, fibril_connection->carg);
    996        
    997         /*
    998          * Remove the reference for this client task connection.
    999          */
    1000         async_client_put(client);
    1001        
    1002         /*
    1003          * Remove myself from the connection hash table.
    1004          */
     1283static port_t *async_find_port(iface_t iface, port_id_t port_id)
     1284{
     1285        port_t *port = NULL;
     1286       
    10051287        futex_down(&async_futex);
    1006         hash_table_remove(&conn_hash_table, &fibril_connection->in_phone_hash);
     1288       
     1289        ht_link_t *link = hash_table_find(&interface_hash_table, &iface);
     1290        if (link) {
     1291                interface_t *interface =
     1292                    hash_table_get_inst(link, interface_t, link);
     1293               
     1294                link = hash_table_find(&interface->port_hash_table, &port_id);
     1295                if (link)
     1296                        port = hash_table_get_inst(link, port_t, link);
     1297        }
     1298       
    10071299        futex_up(&async_futex);
    10081300       
    1009         /*
    1010          * Answer all remaining messages with EHANGUP.
    1011          */
    1012         while (!list_empty(&fibril_connection->msg_queue)) {
    1013                 msg_t *msg =
    1014                     list_get_instance(list_first(&fibril_connection->msg_queue),
    1015                     msg_t, link);
    1016                
    1017                 list_remove(&msg->link);
    1018                 ipc_answer_0(msg->callid, EHANGUP);
    1019                 free(msg);
    1020         }
    1021        
    1022         /*
    1023          * If the connection was hung-up, answer the last call,
    1024          * i.e. IPC_M_PHONE_HUNGUP.
    1025          */
    1026         if (fibril_connection->close_callid)
    1027                 ipc_answer_0(fibril_connection->close_callid, EOK);
    1028        
    1029         free(fibril_connection);
    1030         return 0;
    1031 }
    1032 
    1033 /** Create a new fibril for a new connection.
    1034  *
    1035  * Create new fibril for connection, fill in connection structures and insert
    1036  * it into the hash table, so that later we can easily do routing of messages to
    1037  * particular fibrils.
    1038  *
    1039  * @param in_task_id    Identification of the incoming connection.
    1040  * @param in_phone_hash Identification of the incoming connection.
    1041  * @param callid        Hash of the opening IPC_M_CONNECT_ME_TO call.
    1042  *                      If callid is zero, the connection was opened by
    1043  *                      accepting the IPC_M_CONNECT_TO_ME call and this function
    1044  *                      is called directly by the server.
    1045  * @param call          Call data of the opening call.
    1046  * @param cfibril       Fibril function that should be called upon opening the
    1047  *                      connection.
    1048  * @param carg          Extra argument to pass to the connection fibril
    1049  *
    1050  * @return New fibril id or NULL on failure.
    1051  *
    1052  */
    1053 fid_t async_new_connection(task_id_t in_task_id, sysarg_t in_phone_hash,
    1054     ipc_callid_t callid, ipc_call_t *call,
    1055     async_client_conn_t cfibril, void *carg)
    1056 {
    1057         connection_t *conn = malloc(sizeof(*conn));
    1058         if (!conn) {
    1059                 if (callid)
    1060                         ipc_answer_0(callid, ENOMEM);
    1061                
    1062                 return (uintptr_t) NULL;
    1063         }
    1064        
    1065         conn->in_task_id = in_task_id;
    1066         conn->in_phone_hash = in_phone_hash;
    1067         list_initialize(&conn->msg_queue);
    1068         conn->callid = callid;
    1069         conn->close_callid = 0;
    1070         conn->carg = carg;
    1071        
    1072         if (call)
    1073                 conn->call = *call;
    1074        
    1075         /* We will activate the fibril ASAP */
    1076         conn->wdata.active = true;
    1077         conn->cfibril = cfibril;
    1078         conn->wdata.fid = fibril_create(connection_fibril, conn);
    1079        
    1080         if (conn->wdata.fid == 0) {
    1081                 free(conn);
    1082                
    1083                 if (callid)
    1084                         ipc_answer_0(callid, ENOMEM);
    1085                
    1086                 return (uintptr_t) NULL;
    1087         }
    1088        
    1089         /* Add connection to the connection hash table */
    1090        
    1091         futex_down(&async_futex);
    1092         hash_table_insert(&conn_hash_table, &conn->link);
    1093         futex_up(&async_futex);
    1094        
    1095         fibril_add_ready(conn->wdata.fid);
    1096        
    1097         return conn->wdata.fid;
     1301        return port;
    10981302}
    10991303
     
    11111315        assert(call);
    11121316       
    1113         /* Unrouted call - take some default action */
     1317        /* Kernel notification */
    11141318        if ((callid & IPC_CALLID_NOTIFICATION)) {
     1319                fibril_t *fibril = (fibril_t *) __tcb_get()->fibril_data;
     1320                unsigned oldsw = fibril->switches;
     1321               
    11151322                process_notification(callid, call);
     1323               
     1324                if (oldsw != fibril->switches) {
     1325                        /*
     1326                         * The notification handler did not execute atomically
     1327                         * and so the current manager fibril assumed the role of
     1328                         * a notification fibril. While waiting for its
     1329                         * resources, it switched to another manager fibril that
     1330                         * had already existed or it created a new one. We
     1331                         * therefore know there is at least yet another
     1332                         * manager fibril that can take over. We now kill the
     1333                         * current 'notification' fibril to prevent fibril
     1334                         * population explosion.
     1335                         */
     1336                        futex_down(&async_futex);
     1337                        fibril_switch(FIBRIL_FROM_DEAD);
     1338                }
     1339               
    11161340                return;
    11171341        }
    11181342       
    1119         switch (IPC_GET_IMETHOD(*call)) {
    1120         case IPC_M_CLONE_ESTABLISH:
    1121         case IPC_M_CONNECT_ME_TO:
     1343        /* New connection */
     1344        if (IPC_GET_IMETHOD(*call) == IPC_M_CONNECT_ME_TO) {
     1345                iface_t iface = (iface_t) IPC_GET_ARG1(*call);
     1346                sysarg_t in_phone_hash = IPC_GET_ARG5(*call);
     1347               
     1348                async_notification_handler_t handler = fallback_port_handler;
     1349                void *data = fallback_port_data;
     1350               
     1351                // TODO: Currently ignores all ports but the first one
     1352                port_t *port = async_find_port(iface, 0);
     1353                if (port) {
     1354                        handler = port->handler;
     1355                        data = port->data;
     1356                }
     1357               
     1358                async_new_connection(call->in_task_id, in_phone_hash, callid,
     1359                    call, handler, data);
     1360                return;
     1361        }
     1362       
     1363        /* Cloned connection */
     1364        if (IPC_GET_IMETHOD(*call) == IPC_M_CLONE_ESTABLISH) {
     1365                // TODO: Currently ignores ports altogether
     1366               
    11221367                /* Open new connection with fibril, etc. */
    11231368                async_new_connection(call->in_task_id, IPC_GET_ARG5(*call),
    1124                     callid, call, client_connection, NULL);
     1369                    callid, call, fallback_port_handler, fallback_port_data);
    11251370                return;
    11261371        }
     
    12671512void async_create_manager(void)
    12681513{
    1269         fid_t fid = fibril_create(async_manager_fibril, NULL);
     1514        fid_t fid = fibril_create_generic(async_manager_fibril, NULL, PAGE_SIZE);
    12701515        if (fid != 0)
    12711516                fibril_add_manager(fid);
     
    12831528void __async_init(void)
    12841529{
     1530        if (!hash_table_create(&interface_hash_table, 0, 0,
     1531            &interface_hash_table_ops))
     1532                abort();
     1533       
    12851534        if (!hash_table_create(&client_hash_table, 0, 0, &client_hash_table_ops))
    12861535                abort();
     
    12971546                abort();
    12981547       
     1548        session_ns->iface = 0;
    12991549        session_ns->mgmt = EXCHANGE_ATOMIC;
    13001550        session_ns->phone = PHONE_NS;
     
    13431593       
    13441594        msg->done = true;
    1345 
     1595       
    13461596        if (msg->forget) {
    13471597                assert(msg->wdata.active);
     
    13511601                fibril_add_ready(msg->wdata.fid);
    13521602        }
    1353 
     1603       
    13541604        futex_up(&async_futex);
    13551605}
     
    13861636       
    13871637        ipc_call_async_4(exch->phone, imethod, arg1, arg2, arg3, arg4, msg,
    1388             reply_received, true);
     1638            reply_received);
    13891639       
    13901640        return (aid_t) msg;
     
    14241674       
    14251675        ipc_call_async_5(exch->phone, imethod, arg1, arg2, arg3, arg4, arg5,
    1426             msg, reply_received, true);
     1676            msg, reply_received);
    14271677       
    14281678        return (aid_t) msg;
     
    14431693       
    14441694        futex_down(&async_futex);
    1445 
     1695       
    14461696        assert(!msg->forget);
    14471697        assert(!msg->destroyed);
    1448 
     1698       
    14491699        if (msg->done) {
    14501700                futex_up(&async_futex);
     
    14871737       
    14881738        amsg_t *msg = (amsg_t *) amsgid;
    1489 
     1739       
    14901740        futex_down(&async_futex);
    1491 
     1741       
    14921742        assert(!msg->forget);
    14931743        assert(!msg->destroyed);
    1494 
     1744       
    14951745        if (msg->done) {
    14961746                futex_up(&async_futex);
     
    15041754        if (timeout < 0)
    15051755                timeout = 0;
    1506 
     1756       
    15071757        getuptime(&msg->wdata.to_event.expires);
    15081758        tv_add_diff(&msg->wdata.to_event.expires, timeout);
     
    15571807{
    15581808        amsg_t *msg = (amsg_t *) amsgid;
    1559 
     1809       
    15601810        assert(msg);
    15611811        assert(!msg->forget);
    15621812        assert(!msg->destroyed);
    1563 
     1813       
    15641814        futex_down(&async_futex);
     1815       
    15651816        if (msg->done) {
    15661817                amsg_destroy(msg);
     
    15691820                msg->forget = true;
    15701821        }
     1822       
    15711823        futex_up(&async_futex);
    15721824}
     
    17111963{
    17121964        if (exch != NULL)
    1713                 ipc_call_async_0(exch->phone, imethod, NULL, NULL, true);
     1965                ipc_call_async_0(exch->phone, imethod, NULL, NULL);
    17141966}
    17151967
     
    17171969{
    17181970        if (exch != NULL)
    1719                 ipc_call_async_1(exch->phone, imethod, arg1, NULL, NULL, true);
     1971                ipc_call_async_1(exch->phone, imethod, arg1, NULL, NULL);
    17201972}
    17211973
     
    17241976{
    17251977        if (exch != NULL)
    1726                 ipc_call_async_2(exch->phone, imethod, arg1, arg2, NULL, NULL,
    1727                     true);
     1978                ipc_call_async_2(exch->phone, imethod, arg1, arg2, NULL, NULL);
    17281979}
    17291980
     
    17331984        if (exch != NULL)
    17341985                ipc_call_async_3(exch->phone, imethod, arg1, arg2, arg3, NULL,
    1735                     NULL, true);
     1986                    NULL);
    17361987}
    17371988
     
    17411992        if (exch != NULL)
    17421993                ipc_call_async_4(exch->phone, imethod, arg1, arg2, arg3, arg4,
    1743                     NULL, NULL, true);
     1994                    NULL, NULL);
    17441995}
    17451996
     
    17492000        if (exch != NULL)
    17502001                ipc_call_async_5(exch->phone, imethod, arg1, arg2, arg3, arg4,
    1751                     arg5, NULL, NULL, true);
     2002                    arg5, NULL, NULL);
    17522003}
    17532004
     
    18142065 * @param arg2            User defined argument.
    18152066 * @param arg3            User defined argument.
    1816  * @param client_receiver Connection handing routine.
    18172067 *
    18182068 * @return Zero on success or a negative error code.
     
    18202070 */
    18212071int async_connect_to_me(async_exch_t *exch, sysarg_t arg1, sysarg_t arg2,
    1822     sysarg_t arg3, async_client_conn_t client_receiver, void *carg)
     2072    sysarg_t arg3)
    18232073{
    18242074        if (exch == NULL)
    18252075                return ENOENT;
    18262076       
    1827         sysarg_t phone_hash;
     2077        ipc_call_t answer;
     2078        aid_t req = async_send_3(exch, IPC_M_CONNECT_TO_ME, arg1, arg2, arg3,
     2079            &answer);
     2080       
    18282081        sysarg_t rc;
    1829 
    1830         aid_t req;
    1831         ipc_call_t answer;
    1832         req = async_send_3(exch, IPC_M_CONNECT_TO_ME, arg1, arg2, arg3,
    1833             &answer);
    18342082        async_wait_for(req, &rc);
    18352083        if (rc != EOK)
    18362084                return (int) rc;
    1837 
    1838         phone_hash = IPC_GET_ARG5(answer);
    1839 
    1840         if (client_receiver != NULL)
    1841                 async_new_connection(answer.in_task_id, phone_hash, 0, NULL,
    1842                     client_receiver, carg);
    18432085       
    18442086        return EOK;
     
    18812123       
    18822124        ipc_call_async_0(exch->phone, IPC_M_CLONE_ESTABLISH, msg,
    1883             reply_received, true);
     2125            reply_received);
    18842126       
    18852127        sysarg_t rc;
     
    19002142        }
    19012143       
     2144        sess->iface = 0;
    19022145        sess->mgmt = mgmt;
    19032146        sess->phone = phone;
     
    19292172       
    19302173        ipc_call_async_4(phone, IPC_M_CONNECT_ME_TO, arg1, arg2, arg3, arg4,
    1931             msg, reply_received, true);
     2174            msg, reply_received);
    19322175       
    19332176        sysarg_t rc;
     
    19692212        int phone = async_connect_me_to_internal(exch->phone, arg1, arg2, arg3,
    19702213            0);
    1971        
    19722214        if (phone < 0) {
    19732215                errno = phone;
     
    19762218        }
    19772219       
     2220        sess->iface = 0;
    19782221        sess->mgmt = mgmt;
    19792222        sess->phone = phone;
     
    19922235}
    19932236
     2237/** Wrapper for making IPC_M_CONNECT_ME_TO calls using the async framework.
     2238 *
     2239 * Ask through phone for a new connection to some service and block until
     2240 * success.
     2241 *
     2242 * @param exch  Exchange for sending the message.
     2243 * @param iface Connection interface.
     2244 * @param arg2  User defined argument.
     2245 * @param arg3  User defined argument.
     2246 *
     2247 * @return New session on success or NULL on error.
     2248 *
     2249 */
     2250async_sess_t *async_connect_me_to_iface(async_exch_t *exch, iface_t iface,
     2251    sysarg_t arg2, sysarg_t arg3)
     2252{
     2253        if (exch == NULL) {
     2254                errno = ENOENT;
     2255                return NULL;
     2256        }
     2257       
     2258        async_sess_t *sess = (async_sess_t *) malloc(sizeof(async_sess_t));
     2259        if (sess == NULL) {
     2260                errno = ENOMEM;
     2261                return NULL;
     2262        }
     2263       
     2264        int phone = async_connect_me_to_internal(exch->phone, iface, arg2,
     2265            arg3, 0);
     2266        if (phone < 0) {
     2267                errno = phone;
     2268                free(sess);
     2269                return NULL;
     2270        }
     2271       
     2272        sess->iface = iface;
     2273        sess->phone = phone;
     2274        sess->arg1 = iface;
     2275        sess->arg2 = arg2;
     2276        sess->arg3 = arg3;
     2277       
     2278        fibril_mutex_initialize(&sess->remote_state_mtx);
     2279        sess->remote_state_data = NULL;
     2280       
     2281        list_initialize(&sess->exch_list);
     2282        fibril_mutex_initialize(&sess->mutex);
     2283        atomic_set(&sess->refcnt, 0);
     2284       
     2285        return sess;
     2286}
     2287
    19942288/** Set arguments for new connections.
    19952289 *
     
    20472341        }
    20482342       
     2343        sess->iface = 0;
    20492344        sess->mgmt = mgmt;
    20502345        sess->phone = phone;
     
    20632358}
    20642359
     2360/** Wrapper for making IPC_M_CONNECT_ME_TO calls using the async framework.
     2361 *
     2362 * Ask through phone for a new connection to some service and block until
     2363 * success.
     2364 *
     2365 * @param exch  Exchange for sending the message.
     2366 * @param iface Connection interface.
     2367 * @param arg2  User defined argument.
     2368 * @param arg3  User defined argument.
     2369 *
     2370 * @return New session on success or NULL on error.
     2371 *
     2372 */
     2373async_sess_t *async_connect_me_to_blocking_iface(async_exch_t *exch, iface_t iface,
     2374    sysarg_t arg2, sysarg_t arg3)
     2375{
     2376        if (exch == NULL) {
     2377                errno = ENOENT;
     2378                return NULL;
     2379        }
     2380       
     2381        async_sess_t *sess = (async_sess_t *) malloc(sizeof(async_sess_t));
     2382        if (sess == NULL) {
     2383                errno = ENOMEM;
     2384                return NULL;
     2385        }
     2386       
     2387        int phone = async_connect_me_to_internal(exch->phone, iface, arg2,
     2388            arg3, IPC_FLAG_BLOCKING);
     2389        if (phone < 0) {
     2390                errno = phone;
     2391                free(sess);
     2392                return NULL;
     2393        }
     2394       
     2395        sess->iface = iface;
     2396        sess->phone = phone;
     2397        sess->arg1 = iface;
     2398        sess->arg2 = arg2;
     2399        sess->arg3 = arg3;
     2400       
     2401        fibril_mutex_initialize(&sess->remote_state_mtx);
     2402        sess->remote_state_data = NULL;
     2403       
     2404        list_initialize(&sess->exch_list);
     2405        fibril_mutex_initialize(&sess->mutex);
     2406        atomic_set(&sess->refcnt, 0);
     2407       
     2408        return sess;
     2409}
     2410
    20652411/** Connect to a task specified by id.
    20662412 *
     
    20812427        }
    20822428       
     2429        sess->iface = 0;
    20832430        sess->mgmt = EXCHANGE_ATOMIC;
    20842431        sess->phone = phone;
     
    21582505                return NULL;
    21592506       
    2160         async_exch_t *exch;
     2507        exch_mgmt_t mgmt = sess->mgmt;
     2508        if (sess->iface != 0)
     2509                mgmt = sess->iface & IFACE_EXCHANGE_MASK;
     2510       
     2511        async_exch_t *exch = NULL;
    21612512       
    21622513        fibril_mutex_lock(&async_sess_mutex);
     
    21772528                 */
    21782529               
    2179                 if ((sess->mgmt == EXCHANGE_ATOMIC) ||
    2180                     (sess->mgmt == EXCHANGE_SERIALIZE)) {
     2530                if ((mgmt == EXCHANGE_ATOMIC) ||
     2531                    (mgmt == EXCHANGE_SERIALIZE)) {
    21812532                        exch = (async_exch_t *) malloc(sizeof(async_exch_t));
    21822533                        if (exch != NULL) {
     
    21862537                                exch->phone = sess->phone;
    21872538                        }
    2188                 } else {  /* EXCHANGE_PARALLEL */
     2539                } else if (mgmt == EXCHANGE_PARALLEL) {
     2540                        int phone;
     2541                       
     2542                retry:
    21892543                        /*
    21902544                         * Make a one-time attempt to connect a new data phone.
    21912545                         */
    2192                        
    2193                         int phone;
    2194                        
    2195 retry:
    21962546                        phone = async_connect_me_to_internal(sess->phone, sess->arg1,
    21972547                            sess->arg2, sess->arg3, 0);
     
    22352585                atomic_inc(&sess->refcnt);
    22362586               
    2237                 if (sess->mgmt == EXCHANGE_SERIALIZE)
     2587                if (mgmt == EXCHANGE_SERIALIZE)
    22382588                        fibril_mutex_lock(&sess->mutex);
    22392589        }
     
    22552605        assert(sess != NULL);
    22562606       
     2607        exch_mgmt_t mgmt = sess->mgmt;
     2608        if (sess->iface != 0)
     2609                mgmt = sess->iface & IFACE_EXCHANGE_MASK;
     2610       
    22572611        atomic_dec(&sess->refcnt);
    22582612       
    2259         if (sess->mgmt == EXCHANGE_SERIALIZE)
     2613        if (mgmt == EXCHANGE_SERIALIZE)
    22602614                fibril_mutex_unlock(&sess->mutex);
    22612615       
     
    26943048        }
    26953049       
    2696         void *_data;
     3050        void *arg_data;
    26973051       
    26983052        if (nullterm)
    2699                 _data = malloc(size + 1);
     3053                arg_data = malloc(size + 1);
    27003054        else
    2701                 _data = malloc(size);
    2702        
    2703         if (_data == NULL) {
     3055                arg_data = malloc(size);
     3056       
     3057        if (arg_data == NULL) {
    27043058                ipc_answer_0(callid, ENOMEM);
    27053059                return ENOMEM;
    27063060        }
    27073061       
    2708         int rc = async_data_write_finalize(callid, _data, size);
     3062        int rc = async_data_write_finalize(callid, arg_data, size);
    27093063        if (rc != EOK) {
    2710                 free(_data);
     3064                free(arg_data);
    27113065                return rc;
    27123066        }
    27133067       
    27143068        if (nullterm)
    2715                 ((char *) _data)[size] = 0;
    2716        
    2717         *data = _data;
     3069                ((char *) arg_data)[size] = 0;
     3070       
     3071        *data = arg_data;
    27183072        if (received != NULL)
    27193073                *received = size;
     
    28133167        }
    28143168       
     3169        sess->iface = 0;
    28153170        sess->mgmt = mgmt;
    28163171        sess->phone = phone;
     
    28623217        }
    28633218       
     3219        sess->iface = 0;
    28643220        sess->mgmt = mgmt;
    28653221        sess->phone = phone;
     
    29073263                return NULL;
    29083264       
     3265        sess->iface = 0;
    29093266        sess->mgmt = mgmt;
    29103267        sess->phone = phone;
     
    29343291{
    29353292        assert(callid);
    2936 
     3293       
    29373294        ipc_call_t call;
    29383295        *callid = async_get_call(&call);
    2939 
     3296       
    29403297        if (IPC_GET_IMETHOD(call) != IPC_M_STATE_CHANGE_AUTHORIZE)
    29413298                return false;
     
    29473304        if (arg3)
    29483305                *arg3 = IPC_GET_ARG3(call);
    2949 
     3306       
    29503307        return true;
    29513308}
     
    30263383}
    30273384
     3385void *async_as_area_create(void *base, size_t size, unsigned int flags,
     3386    async_sess_t *pager, sysarg_t id1, sysarg_t id2, sysarg_t id3)
     3387{
     3388        as_area_pager_info_t pager_info = {
     3389                .pager = pager->phone,
     3390                .id1 = id1,
     3391                .id2 = id2,
     3392                .id3 = id3
     3393        };
     3394        return as_area_create(base, size, flags, &pager_info);
     3395}
     3396
    30283397/** @}
    30293398 */
Note: See TracChangeset for help on using the changeset viewer.