Changes in uspace/lib/c/generic/async.c [cff3fb6:1db6dfd] in mainline
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
uspace/lib/c/generic/async.c
rcff3fb6 r1db6dfd 189 189 /** If reply was received. */ 190 190 bool done; 191 192 /** If the message / reply should be discarded on arrival. */ 193 bool forget; 194 195 /** If already destroyed. */ 196 bool destroyed; 191 197 192 198 /** Pointer to where the answer data is stored. */ … … 241 247 static fibril_local connection_t *fibril_connection; 242 248 249 static void to_event_initialize(to_event_t *to) 250 { 251 struct timeval tv = { 0, 0 }; 252 253 to->inlist = false; 254 to->occurred = false; 255 link_initialize(&to->link); 256 to->expires = tv; 257 } 258 259 static void wu_event_initialize(wu_event_t *wu) 260 { 261 wu->inlist = false; 262 link_initialize(&wu->link); 263 } 264 265 void awaiter_initialize(awaiter_t *aw) 266 { 267 aw->fid = 0; 268 aw->active = false; 269 to_event_initialize(&aw->to_event); 270 wu_event_initialize(&aw->wu_event); 271 } 272 273 static amsg_t *amsg_create(void) 274 { 275 amsg_t *msg; 276 277 msg = malloc(sizeof(amsg_t)); 278 if (msg) { 279 msg->done = false; 280 msg->forget = false; 281 msg->destroyed = false; 282 msg->dataptr = NULL; 283 msg->retval = (sysarg_t) EINVAL; 284 awaiter_initialize(&msg->wdata); 285 } 286 287 return msg; 288 } 289 290 static void amsg_destroy(amsg_t *msg) 291 { 292 assert(!msg->destroyed); 293 msg->destroyed = true; 294 free(msg); 295 } 296 243 297 static void *default_client_data_constructor(void) 244 298 { … … 257 311 void async_set_client_data_constructor(async_client_data_ctor_t ctor) 258 312 { 313 assert(async_client_data_create == default_client_data_constructor); 259 314 async_client_data_create = ctor; 260 315 } … … 262 317 void async_set_client_data_destructor(async_client_data_dtor_t dtor) 263 318 { 319 assert(async_client_data_destroy == default_client_data_destructor); 264 320 async_client_data_destroy = dtor; 265 321 } … … 303 359 void async_set_client_connection(async_client_conn_t conn) 304 360 { 361 assert(client_connection == default_client_connection); 305 362 client_connection = conn; 306 363 } … … 960 1017 961 1018 suseconds_t timeout; 1019 unsigned int flags = SYNCH_FLAGS_NONE; 962 1020 if (!list_empty(&timeout_list)) { 963 1021 awaiter_t *waiter = list_get_instance( … … 970 1028 futex_up(&async_futex); 971 1029 handle_expired_timeouts(); 972 continue; 973 } else 1030 /* 1031 * Notice that even if the event(s) already 1032 * expired (and thus the other fibril was 1033 * supposed to be running already), 1034 * we check for incoming IPC. 1035 * 1036 * Otherwise, a fibril that continuously 1037 * creates (almost) expired events could 1038 * prevent IPC retrieval from the kernel. 1039 */ 1040 timeout = 0; 1041 flags = SYNCH_FLAGS_NON_BLOCKING; 1042 1043 } else { 974 1044 timeout = tv_sub(&waiter->to_event.expires, &tv); 975 } else 1045 futex_up(&async_futex); 1046 } 1047 } else { 1048 futex_up(&async_futex); 976 1049 timeout = SYNCH_NO_TIMEOUT; 977 978 futex_up(&async_futex); 1050 } 979 1051 980 1052 atomic_inc(&threads_in_ipc_wait); 981 1053 982 1054 ipc_call_t call; 983 ipc_callid_t callid = ipc_wait_cycle(&call, timeout, 984 SYNCH_FLAGS_NONE); 1055 ipc_callid_t callid = ipc_wait_cycle(&call, timeout, flags); 985 1056 986 1057 atomic_dec(&threads_in_ipc_wait); … … 1097 1168 1098 1169 msg->done = true; 1099 if (!msg->wdata.active) { 1170 1171 if (msg->forget) { 1172 assert(msg->wdata.active); 1173 amsg_destroy(msg); 1174 } else if (!msg->wdata.active) { 1100 1175 msg->wdata.active = true; 1101 1176 fibril_add_ready(msg->wdata.fid); 1102 1177 } 1103 1178 1104 1179 futex_up(&async_futex); 1105 1180 } … … 1128 1203 return 0; 1129 1204 1130 amsg_t *msg = malloc(sizeof(amsg_t));1205 amsg_t *msg = amsg_create(); 1131 1206 if (msg == NULL) 1132 1207 return 0; 1133 1208 1134 msg->done = false;1135 1209 msg->dataptr = dataptr; 1136 1137 msg->wdata.to_event.inlist = false;1138 1139 /*1140 * We may sleep in the next method,1141 * but it will use its own means1142 */1143 1210 msg->wdata.active = true; 1144 1211 … … 1174 1241 return 0; 1175 1242 1176 amsg_t *msg = malloc(sizeof(amsg_t)); 1177 1243 amsg_t *msg = amsg_create(); 1178 1244 if (msg == NULL) 1179 1245 return 0; 1180 1246 1181 msg->done = false;1182 1247 msg->dataptr = dataptr; 1183 1184 msg->wdata.to_event.inlist = false;1185 1186 /*1187 * We may sleep in the next method,1188 * but it will use its own means1189 */1190 1248 msg->wdata.active = true; 1191 1249 … … 1210 1268 1211 1269 futex_down(&async_futex); 1270 1271 assert(!msg->forget); 1272 assert(!msg->destroyed); 1273 1212 1274 if (msg->done) { 1213 1275 futex_up(&async_futex); … … 1228 1290 *retval = msg->retval; 1229 1291 1230 free(msg);1292 amsg_destroy(msg); 1231 1293 } 1232 1294 1233 1295 /** Wait for a message sent by the async framework, timeout variant. 1296 * 1297 * If the wait times out, the caller may choose to either wait again by calling 1298 * async_wait_for() or async_wait_timeout(), or forget the message via 1299 * async_forget(). 1234 1300 * 1235 1301 * @param amsgid Hash of the message to wait for. … … 1246 1312 1247 1313 amsg_t *msg = (amsg_t *) amsgid; 1248 1249 /* TODO: Let it go through the event read at least once */ 1250 if (timeout < 0) 1251 return ETIMEOUT; 1252 1314 1253 1315 futex_down(&async_futex); 1316 1317 assert(!msg->forget); 1318 assert(!msg->destroyed); 1319 1254 1320 if (msg->done) { 1255 1321 futex_up(&async_futex); … … 1257 1323 } 1258 1324 1325 /* 1326 * Negative timeout is converted to zero timeout to avoid 1327 * using tv_add with negative augmenter. 1328 */ 1329 if (timeout < 0) 1330 timeout = 0; 1331 1259 1332 gettimeofday(&msg->wdata.to_event.expires, NULL); 1260 1333 tv_add(&msg->wdata.to_event.expires, timeout); 1261 1334 1335 /* 1336 * Current fibril is inserted as waiting regardless of the 1337 * "size" of the timeout. 1338 * 1339 * Checking for msg->done and immediately bailing out when 1340 * timeout == 0 would mean that the manager fibril would never 1341 * run (consider single threaded program). 1342 * Thus the IPC answer would be never retrieved from the kernel. 1343 * 1344 * Notice that the actual delay would be very small because we 1345 * - switch to manager fibril 1346 * - the manager sees expired timeout 1347 * - and thus adds us back to ready queue 1348 * - manager switches back to some ready fibril 1349 * (prior it, it checks for incoming IPC). 1350 * 1351 */ 1262 1352 msg->wdata.fid = fibril_get_id(); 1263 1353 msg->wdata.active = false; … … 1276 1366 *retval = msg->retval; 1277 1367 1278 free(msg);1368 amsg_destroy(msg); 1279 1369 1280 1370 return 0; 1281 1371 } 1372 1373 /** Discard the message / reply on arrival. 1374 * 1375 * The message will be marked to be discarded once the reply arrives in 1376 * reply_received(). It is not allowed to call async_wait_for() or 1377 * async_wait_timeout() on this message after a call to this function. 1378 * 1379 * @param amsgid Hash of the message to forget. 1380 */ 1381 void async_forget(aid_t amsgid) 1382 { 1383 amsg_t *msg = (amsg_t *) amsgid; 1384 1385 assert(msg); 1386 assert(!msg->forget); 1387 assert(!msg->destroyed); 1388 1389 futex_down(&async_futex); 1390 if (msg->done) 1391 amsg_destroy(msg); 1392 else 1393 msg->forget = true; 1394 futex_up(&async_futex); 1395 } 1282 1396 1283 1397 /** Wait for specified time. … … 1290 1404 void async_usleep(suseconds_t timeout) 1291 1405 { 1292 amsg_t *msg = malloc(sizeof(amsg_t)); 1293 1406 amsg_t *msg = amsg_create(); 1294 1407 if (!msg) 1295 1408 return; 1296 1409 1297 1410 msg->wdata.fid = fibril_get_id(); 1298 msg->wdata.active = false;1299 1411 1300 1412 gettimeofday(&msg->wdata.to_event.expires, NULL); … … 1310 1422 /* Futex is up automatically after fibril_switch() */ 1311 1423 1312 free(msg);1424 amsg_destroy(msg); 1313 1425 } 1314 1426 … … 1581 1693 ipc_call_t result; 1582 1694 1583 amsg_t *msg = malloc(sizeof(amsg_t));1584 if ( msg == NULL) {1695 amsg_t *msg = amsg_create(); 1696 if (!msg) { 1585 1697 free(sess); 1586 1698 errno = ENOMEM; … … 1588 1700 } 1589 1701 1590 msg->done = false;1591 1702 msg->dataptr = &result; 1592 1593 msg->wdata.to_event.inlist = false;1594 1595 /*1596 * We may sleep in the next method,1597 * but it will use its own means1598 */1599 1703 msg->wdata.active = true; 1600 1704 … … 1640 1744 ipc_call_t result; 1641 1745 1642 amsg_t *msg = malloc(sizeof(amsg_t));1643 if ( msg == NULL)1746 amsg_t *msg = amsg_create(); 1747 if (!msg) 1644 1748 return ENOENT; 1645 1749 1646 msg->done = false;1647 1750 msg->dataptr = &result; 1648 1649 msg->wdata.to_event.inlist = false;1650 1651 /*1652 * We may sleep in the next method,1653 * but it will use its own means1654 */1655 1751 msg->wdata.active = true; 1656 1752 … … 1846 1942 1847 1943 fibril_mutex_lock(&async_sess_mutex); 1848 1944 1849 1945 int rc = async_hangup_internal(sess->phone); 1850 1946 … … 1998 2094 * 1999 2095 * @param exch Exchange for sending the message. 2000 * @param dst Destination address space area base.2001 2096 * @param size Size of the destination address space area. 2002 2097 * @param arg User defined argument. 2003 2098 * @param flags Storage for the received flags. Can be NULL. 2099 * @param dst Destination address space area base. Cannot be NULL. 2004 2100 * 2005 2101 * @return Zero on success or a negative error code from errno.h. 2006 2102 * 2007 2103 */ 2008 int async_share_in_start(async_exch_t *exch, void *dst, size_t size,2009 sysarg_t arg, unsigned int *flags)2104 int async_share_in_start(async_exch_t *exch, size_t size, sysarg_t arg, 2105 unsigned int *flags, void **dst) 2010 2106 { 2011 2107 if (exch == NULL) 2012 2108 return ENOENT; 2013 2109 2014 sysarg_t tmp_flags; 2015 int res = async_req_3_2(exch, IPC_M_SHARE_IN, (sysarg_t) dst, 2016 (sysarg_t) size, arg, NULL, &tmp_flags); 2110 sysarg_t _flags = 0; 2111 sysarg_t _dst = (sysarg_t) -1; 2112 int res = async_req_2_4(exch, IPC_M_SHARE_IN, (sysarg_t) size, 2113 arg, NULL, &_flags, NULL, &_dst); 2017 2114 2018 2115 if (flags) 2019 *flags = (unsigned int) tmp_flags; 2020 2116 *flags = (unsigned int) _flags; 2117 2118 *dst = (void *) _dst; 2021 2119 return res; 2022 2120 } … … 2047 2145 return false; 2048 2146 2049 *size = (size_t) IPC_GET_ARG 2(data);2147 *size = (size_t) IPC_GET_ARG1(data); 2050 2148 return true; 2051 2149 } … … 2053 2151 /** Wrapper for answering the IPC_M_SHARE_IN calls using the async framework. 2054 2152 * 2055 * This wrapper only makes it more comfortable to answer IPC_M_ DATA_READ2153 * This wrapper only makes it more comfortable to answer IPC_M_SHARE_IN 2056 2154 * calls so that the user doesn't have to remember the meaning of each IPC 2057 2155 * argument. … … 2131 2229 * 2132 2230 */ 2133 int async_share_out_finalize(ipc_callid_t callid, void * dst)2231 int async_share_out_finalize(ipc_callid_t callid, void **dst) 2134 2232 { 2135 2233 return ipc_share_out_finalize(callid, dst); … … 2246 2344 IPC_FF_ROUTE_FROM_ME); 2247 2345 if (retval != EOK) { 2248 async_ wait_for(msg, NULL);2346 async_forget(msg); 2249 2347 ipc_answer_0(callid, retval); 2250 2348 return retval; … … 2440 2538 IPC_FF_ROUTE_FROM_ME); 2441 2539 if (retval != EOK) { 2442 async_ wait_for(msg, NULL);2540 async_forget(msg); 2443 2541 ipc_answer_0(callid, retval); 2444 2542 return retval;
Note:
See TracChangeset
for help on using the changeset viewer.