Ignore:
File:
1 edited

Legend:

Unmodified
Added
Removed
  • uspace/lib/c/generic/async.c

    rcff3fb6 r1db6dfd  
    189189        /** If reply was received. */
    190190        bool done;
     191
     192        /** If the message / reply should be discarded on arrival. */
     193        bool forget;
     194
     195        /** If already destroyed. */
     196        bool destroyed;
    191197       
    192198        /** Pointer to where the answer data is stored. */
     
    241247static fibril_local connection_t *fibril_connection;
    242248
     249static void to_event_initialize(to_event_t *to)
     250{
     251        struct timeval tv = { 0, 0 };
     252
     253        to->inlist = false;
     254        to->occurred = false;
     255        link_initialize(&to->link);
     256        to->expires = tv;
     257}
     258
     259static void wu_event_initialize(wu_event_t *wu)
     260{
     261        wu->inlist = false;
     262        link_initialize(&wu->link);
     263}
     264
     265void awaiter_initialize(awaiter_t *aw)
     266{
     267        aw->fid = 0;
     268        aw->active = false;
     269        to_event_initialize(&aw->to_event);
     270        wu_event_initialize(&aw->wu_event);
     271}
     272
     273static amsg_t *amsg_create(void)
     274{
     275        amsg_t *msg;
     276
     277        msg = malloc(sizeof(amsg_t));
     278        if (msg) {
     279                msg->done = false;
     280                msg->forget = false;
     281                msg->destroyed = false;
     282                msg->dataptr = NULL;
     283                msg->retval = (sysarg_t) EINVAL;
     284                awaiter_initialize(&msg->wdata);
     285        }
     286
     287        return msg;
     288}
     289
     290static void amsg_destroy(amsg_t *msg)
     291{
     292        assert(!msg->destroyed);
     293        msg->destroyed = true;
     294        free(msg);
     295}
     296
    243297static void *default_client_data_constructor(void)
    244298{
     
    257311void async_set_client_data_constructor(async_client_data_ctor_t ctor)
    258312{
     313        assert(async_client_data_create == default_client_data_constructor);
    259314        async_client_data_create = ctor;
    260315}
     
    262317void async_set_client_data_destructor(async_client_data_dtor_t dtor)
    263318{
     319        assert(async_client_data_destroy == default_client_data_destructor);
    264320        async_client_data_destroy = dtor;
    265321}
     
    303359void async_set_client_connection(async_client_conn_t conn)
    304360{
     361        assert(client_connection == default_client_connection);
    305362        client_connection = conn;
    306363}
     
    9601017               
    9611018                suseconds_t timeout;
     1019                unsigned int flags = SYNCH_FLAGS_NONE;
    9621020                if (!list_empty(&timeout_list)) {
    9631021                        awaiter_t *waiter = list_get_instance(
     
    9701028                                futex_up(&async_futex);
    9711029                                handle_expired_timeouts();
    972                                 continue;
    973                         } else
     1030                                /*
     1031                                 * Notice that even if the event(s) already
     1032                                 * expired (and thus the other fibril was
     1033                                 * supposed to be running already),
     1034                                 * we check for incoming IPC.
     1035                                 *
     1036                                 * Otherwise, a fibril that continuously
     1037                                 * creates (almost) expired events could
     1038                                 * prevent IPC retrieval from the kernel.
     1039                                 */
     1040                                timeout = 0;
     1041                                flags = SYNCH_FLAGS_NON_BLOCKING;
     1042
     1043                        } else {
    9741044                                timeout = tv_sub(&waiter->to_event.expires, &tv);
    975                 } else
     1045                                futex_up(&async_futex);
     1046                        }
     1047                } else {
     1048                        futex_up(&async_futex);
    9761049                        timeout = SYNCH_NO_TIMEOUT;
    977                
    978                 futex_up(&async_futex);
     1050                }
    9791051               
    9801052                atomic_inc(&threads_in_ipc_wait);
    9811053               
    9821054                ipc_call_t call;
    983                 ipc_callid_t callid = ipc_wait_cycle(&call, timeout,
    984                     SYNCH_FLAGS_NONE);
     1055                ipc_callid_t callid = ipc_wait_cycle(&call, timeout, flags);
    9851056               
    9861057                atomic_dec(&threads_in_ipc_wait);
     
    10971168       
    10981169        msg->done = true;
    1099         if (!msg->wdata.active) {
     1170
     1171        if (msg->forget) {
     1172                assert(msg->wdata.active);
     1173                amsg_destroy(msg);
     1174        } else if (!msg->wdata.active) {
    11001175                msg->wdata.active = true;
    11011176                fibril_add_ready(msg->wdata.fid);
    11021177        }
    1103        
     1178
    11041179        futex_up(&async_futex);
    11051180}
     
    11281203                return 0;
    11291204       
    1130         amsg_t *msg = malloc(sizeof(amsg_t));
     1205        amsg_t *msg = amsg_create();
    11311206        if (msg == NULL)
    11321207                return 0;
    11331208       
    1134         msg->done = false;
    11351209        msg->dataptr = dataptr;
    1136        
    1137         msg->wdata.to_event.inlist = false;
    1138        
    1139         /*
    1140          * We may sleep in the next method,
    1141          * but it will use its own means
    1142          */
    11431210        msg->wdata.active = true;
    11441211       
     
    11741241                return 0;
    11751242       
    1176         amsg_t *msg = malloc(sizeof(amsg_t));
    1177        
     1243        amsg_t *msg = amsg_create();
    11781244        if (msg == NULL)
    11791245                return 0;
    11801246       
    1181         msg->done = false;
    11821247        msg->dataptr = dataptr;
    1183        
    1184         msg->wdata.to_event.inlist = false;
    1185        
    1186         /*
    1187          * We may sleep in the next method,
    1188          * but it will use its own means
    1189          */
    11901248        msg->wdata.active = true;
    11911249       
     
    12101268       
    12111269        futex_down(&async_futex);
     1270
     1271        assert(!msg->forget);
     1272        assert(!msg->destroyed);
     1273
    12121274        if (msg->done) {
    12131275                futex_up(&async_futex);
     
    12281290                *retval = msg->retval;
    12291291       
    1230         free(msg);
     1292        amsg_destroy(msg);
    12311293}
    12321294
    12331295/** Wait for a message sent by the async framework, timeout variant.
     1296 *
     1297 * If the wait times out, the caller may choose to either wait again by calling
     1298 * async_wait_for() or async_wait_timeout(), or forget the message via
     1299 * async_forget().
    12341300 *
    12351301 * @param amsgid  Hash of the message to wait for.
     
    12461312       
    12471313        amsg_t *msg = (amsg_t *) amsgid;
    1248        
    1249         /* TODO: Let it go through the event read at least once */
    1250         if (timeout < 0)
    1251                 return ETIMEOUT;
    1252        
     1314
    12531315        futex_down(&async_futex);
     1316
     1317        assert(!msg->forget);
     1318        assert(!msg->destroyed);
     1319
    12541320        if (msg->done) {
    12551321                futex_up(&async_futex);
     
    12571323        }
    12581324       
     1325        /*
     1326         * Negative timeout is converted to zero timeout to avoid
     1327         * using tv_add with negative augmenter.
     1328         */
     1329        if (timeout < 0)
     1330                timeout = 0;
     1331
    12591332        gettimeofday(&msg->wdata.to_event.expires, NULL);
    12601333        tv_add(&msg->wdata.to_event.expires, timeout);
    12611334       
     1335        /*
     1336         * Current fibril is inserted as waiting regardless of the
     1337         * "size" of the timeout.
     1338         *
     1339         * Checking for msg->done and immediately bailing out when
     1340         * timeout == 0 would mean that the manager fibril would never
     1341         * run (consider single threaded program).
     1342         * Thus the IPC answer would be never retrieved from the kernel.
     1343         *
     1344         * Notice that the actual delay would be very small because we
     1345         * - switch to manager fibril
     1346         * - the manager sees expired timeout
     1347         * - and thus adds us back to ready queue
     1348         * - manager switches back to some ready fibril
     1349         *   (prior it, it checks for incoming IPC).
     1350         *
     1351         */
    12621352        msg->wdata.fid = fibril_get_id();
    12631353        msg->wdata.active = false;
     
    12761366                *retval = msg->retval;
    12771367       
    1278         free(msg);
     1368        amsg_destroy(msg);
    12791369       
    12801370        return 0;
    12811371}
     1372 
     1373/** Discard the message / reply on arrival.
     1374 *
     1375 * The message will be marked to be discarded once the reply arrives in
     1376 * reply_received(). It is not allowed to call async_wait_for() or
     1377 * async_wait_timeout() on this message after a call to this function.
     1378 *
     1379 * @param amsgid  Hash of the message to forget.
     1380 */
     1381void async_forget(aid_t amsgid)
     1382{
     1383        amsg_t *msg = (amsg_t *) amsgid;
     1384
     1385        assert(msg);
     1386        assert(!msg->forget);
     1387        assert(!msg->destroyed);
     1388
     1389        futex_down(&async_futex);
     1390        if (msg->done)
     1391                amsg_destroy(msg);
     1392        else
     1393                msg->forget = true;
     1394        futex_up(&async_futex);
     1395}
    12821396
    12831397/** Wait for specified time.
     
    12901404void async_usleep(suseconds_t timeout)
    12911405{
    1292         amsg_t *msg = malloc(sizeof(amsg_t));
    1293        
     1406        amsg_t *msg = amsg_create();
    12941407        if (!msg)
    12951408                return;
    12961409       
    12971410        msg->wdata.fid = fibril_get_id();
    1298         msg->wdata.active = false;
    12991411       
    13001412        gettimeofday(&msg->wdata.to_event.expires, NULL);
     
    13101422        /* Futex is up automatically after fibril_switch() */
    13111423       
    1312         free(msg);
     1424        amsg_destroy(msg);
    13131425}
    13141426
     
    15811693        ipc_call_t result;
    15821694       
    1583         amsg_t *msg = malloc(sizeof(amsg_t));
    1584         if (msg == NULL) {
     1695        amsg_t *msg = amsg_create();
     1696        if (!msg) {
    15851697                free(sess);
    15861698                errno = ENOMEM;
     
    15881700        }
    15891701       
    1590         msg->done = false;
    15911702        msg->dataptr = &result;
    1592        
    1593         msg->wdata.to_event.inlist = false;
    1594        
    1595         /*
    1596          * We may sleep in the next method,
    1597          * but it will use its own means
    1598          */
    15991703        msg->wdata.active = true;
    16001704       
     
    16401744        ipc_call_t result;
    16411745       
    1642         amsg_t *msg = malloc(sizeof(amsg_t));
    1643         if (msg == NULL)
     1746        amsg_t *msg = amsg_create();
     1747        if (!msg)
    16441748                return ENOENT;
    16451749       
    1646         msg->done = false;
    16471750        msg->dataptr = &result;
    1648        
    1649         msg->wdata.to_event.inlist = false;
    1650        
    1651         /*
    1652          * We may sleep in the next method,
    1653          * but it will use its own means
    1654          */
    16551751        msg->wdata.active = true;
    16561752       
     
    18461942       
    18471943        fibril_mutex_lock(&async_sess_mutex);
    1848 
     1944       
    18491945        int rc = async_hangup_internal(sess->phone);
    18501946       
     
    19982094 *
    19992095 * @param exch  Exchange for sending the message.
    2000  * @param dst   Destination address space area base.
    20012096 * @param size  Size of the destination address space area.
    20022097 * @param arg   User defined argument.
    20032098 * @param flags Storage for the received flags. Can be NULL.
     2099 * @param dst   Destination address space area base. Cannot be NULL.
    20042100 *
    20052101 * @return Zero on success or a negative error code from errno.h.
    20062102 *
    20072103 */
    2008 int async_share_in_start(async_exch_t *exch, void *dst, size_t size,
    2009     sysarg_t arg, unsigned int *flags)
     2104int async_share_in_start(async_exch_t *exch, size_t size, sysarg_t arg,
     2105    unsigned int *flags, void **dst)
    20102106{
    20112107        if (exch == NULL)
    20122108                return ENOENT;
    20132109       
    2014         sysarg_t tmp_flags;
    2015         int res = async_req_3_2(exch, IPC_M_SHARE_IN, (sysarg_t) dst,
    2016             (sysarg_t) size, arg, NULL, &tmp_flags);
     2110        sysarg_t _flags = 0;
     2111        sysarg_t _dst = (sysarg_t) -1;
     2112        int res = async_req_2_4(exch, IPC_M_SHARE_IN, (sysarg_t) size,
     2113            arg, NULL, &_flags, NULL, &_dst);
    20172114       
    20182115        if (flags)
    2019                 *flags = (unsigned int) tmp_flags;
    2020        
     2116                *flags = (unsigned int) _flags;
     2117       
     2118        *dst = (void *) _dst;
    20212119        return res;
    20222120}
     
    20472145                return false;
    20482146       
    2049         *size = (size_t) IPC_GET_ARG2(data);
     2147        *size = (size_t) IPC_GET_ARG1(data);
    20502148        return true;
    20512149}
     
    20532151/** Wrapper for answering the IPC_M_SHARE_IN calls using the async framework.
    20542152 *
    2055  * This wrapper only makes it more comfortable to answer IPC_M_DATA_READ
     2153 * This wrapper only makes it more comfortable to answer IPC_M_SHARE_IN
    20562154 * calls so that the user doesn't have to remember the meaning of each IPC
    20572155 * argument.
     
    21312229 *
    21322230 */
    2133 int async_share_out_finalize(ipc_callid_t callid, void *dst)
     2231int async_share_out_finalize(ipc_callid_t callid, void **dst)
    21342232{
    21352233        return ipc_share_out_finalize(callid, dst);
     
    22462344            IPC_FF_ROUTE_FROM_ME);
    22472345        if (retval != EOK) {
    2248                 async_wait_for(msg, NULL);
     2346                async_forget(msg);
    22492347                ipc_answer_0(callid, retval);
    22502348                return retval;
     
    24402538            IPC_FF_ROUTE_FROM_ME);
    24412539        if (retval != EOK) {
    2442                 async_wait_for(msg, NULL);
     2540                async_forget(msg);
    24432541                ipc_answer_0(callid, retval);
    24442542                return retval;
Note: See TracChangeset for help on using the changeset viewer.