00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00090 #include <futex.h>
00091 #include <async.h>
00092 #include <psthread.h>
00093 #include <stdio.h>
00094 #include <libadt/hash_table.h>
00095 #include <libadt/list.h>
00096 #include <ipc/ipc.h>
00097 #include <assert.h>
00098 #include <errno.h>
00099 #include <time.h>
00100 #include <arch/barrier.h>
00101
00102 atomic_t async_futex = FUTEX_INITIALIZER;
00103 static hash_table_t conn_hash_table;
00104 static LIST_INITIALIZE(timeout_list);
00105
00106 typedef struct {
00107 struct timeval expires;
00108 int inlist;
00109 link_t link;
00110
00111 pstid_t ptid;
00112 int active;
00113 int timedout;
00114 } awaiter_t;
00115
00116 typedef struct {
00117 awaiter_t wdata;
00118
00119 int done;
00120 ipc_call_t *dataptr;
00122 ipcarg_t retval;
00123 } amsg_t;
00124
00125 typedef struct {
00126 link_t link;
00127 ipc_callid_t callid;
00128 ipc_call_t call;
00129 } msg_t;
00130
00131 typedef struct {
00132 awaiter_t wdata;
00133
00134 link_t link;
00135 ipcarg_t in_phone_hash;
00136 link_t msg_queue;
00137
00138 ipc_callid_t callid;
00139 ipc_call_t call;
00140 ipc_callid_t close_callid;
00141 void (*cthread)(ipc_callid_t,ipc_call_t *);
00142 } connection_t;
00143
00145 __thread connection_t *PS_connection;
00148 __thread int in_interrupt_handler;
00149
00150 static void default_client_connection(ipc_callid_t callid, ipc_call_t *call);
00151 static void default_interrupt_received(ipc_callid_t callid, ipc_call_t *call);
00152 static async_client_conn_t client_connection = default_client_connection;
00153 static async_client_conn_t interrupt_received = default_interrupt_received;
00154
/** Add a number of microseconds to a timeval in place.
 *
 * The result is kept normalized, i.e. tv_usec stays below one second
 * provided it was normalized on entry.
 *
 * @param tv    Time value to advance.
 * @param usecs Microseconds to add.
 */
static void tv_add(struct timeval *tv, suseconds_t usecs)
{
	tv->tv_sec += usecs / 1000000;
	tv->tv_usec += usecs % 1000000;
	/*
	 * Use >=: a sum of exactly 1000000 microseconds is a whole second
	 * and must be carried, otherwise tv_usec ends up out of range.
	 */
	if (tv->tv_usec >= 1000000) {
		tv->tv_sec++;
		tv->tv_usec -= 1000000;
	}
}
00165
/** Compute the difference tv1 - tv2 in microseconds.
 *
 * @param tv1 Minuend.
 * @param tv2 Subtrahend.
 *
 * @return Number of microseconds from tv2 to tv1 (negative when tv1 is
 *         the earlier one).
 */
static suseconds_t tv_sub(struct timeval *tv1, struct timeval *tv2)
{
	suseconds_t delta = tv1->tv_usec - tv2->tv_usec;

	delta += (tv1->tv_sec - tv2->tv_sec) * 1000000;
	return delta;
}
00176
/** Test whether tv1 is strictly later than tv2.
 *
 * @return 1 if tv1 > tv2, 0 otherwise.
 */
static int tv_gt(struct timeval *tv1, struct timeval *tv2)
{
	return tv1->tv_sec > tv2->tv_sec ||
	    (tv1->tv_sec == tv2->tv_sec && tv1->tv_usec > tv2->tv_usec);
}
/** Test whether tv1 is later than or equal to tv2.
 *
 * @return 1 if tv1 >= tv2, 0 otherwise.
 */
static int tv_gteq(struct timeval *tv1, struct timeval *tv2)
{
	/* The seconds decide unless they are equal. */
	if (tv1->tv_sec != tv2->tv_sec)
		return tv1->tv_sec > tv2->tv_sec;

	return tv1->tv_usec >= tv2->tv_usec;
}
00197
00198
00199 #define CONN_HASH_TABLE_CHAINS 32
00200
00201 static hash_index_t conn_hash(unsigned long *key)
00202 {
00203 assert(key);
00204 return ((*key) >> 4) % CONN_HASH_TABLE_CHAINS;
00205 }
00206
00207 static int conn_compare(unsigned long key[], hash_count_t keys, link_t *item)
00208 {
00209 connection_t *hs;
00210
00211 hs = hash_table_get_instance(item, connection_t, link);
00212
00213 return key[0] == hs->in_phone_hash;
00214 }
00215
00216 static void conn_remove(link_t *item)
00217 {
00218 free(hash_table_get_instance(item, connection_t, link));
00219 }
00220
00221
00223 static hash_table_operations_t conn_hash_table_ops = {
00224 .hash = conn_hash,
00225 .compare = conn_compare,
00226 .remove_callback = conn_remove
00227 };
00228
00232 static void insert_timeout(awaiter_t *wd)
00233 {
00234 link_t *tmp;
00235 awaiter_t *cur;
00236
00237 wd->timedout = 0;
00238 wd->inlist = 1;
00239
00240 tmp = timeout_list.next;
00241 while (tmp != &timeout_list) {
00242 cur = list_get_instance(tmp, awaiter_t, link);
00243 if (tv_gteq(&cur->expires, &wd->expires))
00244 break;
00245 tmp = tmp->next;
00246 }
00247 list_append(&wd->link, tmp);
00248 }
00249
00250
00251
00255 static int route_call(ipc_callid_t callid, ipc_call_t *call)
00256 {
00257 connection_t *conn;
00258 msg_t *msg;
00259 link_t *hlp;
00260 unsigned long key;
00261
00262 futex_down(&async_futex);
00263
00264 key = call->in_phone_hash;
00265 hlp = hash_table_find(&conn_hash_table, &key);
00266 if (!hlp) {
00267 futex_up(&async_futex);
00268 return 0;
00269 }
00270 conn = hash_table_get_instance(hlp, connection_t, link);
00271
00272 msg = malloc(sizeof(*msg));
00273 msg->callid = callid;
00274 msg->call = *call;
00275 list_append(&msg->link, &conn->msg_queue);
00276
00277 if (IPC_GET_METHOD(*call) == IPC_M_PHONE_HUNGUP)
00278 conn->close_callid = callid;
00279
00280
00281 if (!conn->wdata.active) {
00282
00283 if (conn->wdata.inlist) {
00284 conn->wdata.inlist = 0;
00285 list_remove(&conn->wdata.link);
00286 }
00287 conn->wdata.active = 1;
00288 psthread_add_ready(conn->wdata.ptid);
00289 }
00290
00291 futex_up(&async_futex);
00292
00293 return 1;
00294 }
00295
00297 ipc_callid_t async_get_call_timeout(ipc_call_t *call, suseconds_t usecs)
00298 {
00299 msg_t *msg;
00300 ipc_callid_t callid;
00301 connection_t *conn;
00302
00303 assert(PS_connection);
00304
00305
00306
00307
00308
00309 conn = PS_connection;
00310
00311 futex_down(&async_futex);
00312
00313 if (usecs) {
00314 gettimeofday(&conn->wdata.expires, NULL);
00315 tv_add(&conn->wdata.expires, usecs);
00316 } else {
00317 conn->wdata.inlist = 0;
00318 }
00319
00320 while (list_empty(&conn->msg_queue)) {
00321 if (usecs)
00322 insert_timeout(&conn->wdata);
00323
00324 conn->wdata.active = 0;
00325 psthread_schedule_next_adv(PS_TO_MANAGER);
00326
00327
00328 futex_down(&async_futex);
00329 if (usecs && conn->wdata.timedout && \
00330 list_empty(&conn->msg_queue)) {
00331
00332 futex_up(&async_futex);
00333 return 0;
00334 }
00335 }
00336
00337 msg = list_get_instance(conn->msg_queue.next, msg_t, link);
00338 list_remove(&msg->link);
00339 callid = msg->callid;
00340 *call = msg->call;
00341 free(msg);
00342
00343 futex_up(&async_futex);
00344 return callid;
00345 }
00346
00352 static void default_client_connection(ipc_callid_t callid, ipc_call_t *call)
00353 {
00354 ipc_answer_fast(callid, ENOENT, 0, 0);
00355 }
00356 static void default_interrupt_received(ipc_callid_t callid, ipc_call_t *call)
00357 {
00358 }
00359
00367 static int connection_thread(void *arg)
00368 {
00369 unsigned long key;
00370 msg_t *msg;
00371 int close_answered = 0;
00372
00373
00374 PS_connection = (connection_t *)arg;
00375 PS_connection->cthread(PS_connection->callid, &PS_connection->call);
00376
00377
00378 futex_down(&async_futex);
00379 key = PS_connection->in_phone_hash;
00380 hash_table_remove(&conn_hash_table, &key, 1);
00381 futex_up(&async_futex);
00382
00383
00384 while (!list_empty(&PS_connection->msg_queue)) {
00385 msg = list_get_instance(PS_connection->msg_queue.next, msg_t, link);
00386 list_remove(&msg->link);
00387 if (msg->callid == PS_connection->close_callid)
00388 close_answered = 1;
00389 ipc_answer_fast(msg->callid, EHANGUP, 0, 0);
00390 free(msg);
00391 }
00392 if (PS_connection->close_callid)
00393 ipc_answer_fast(PS_connection->close_callid, 0, 0, 0);
00394
00395 return 0;
00396 }
00397
00412 pstid_t async_new_connection(ipcarg_t in_phone_hash,ipc_callid_t callid, ipc_call_t *call, void (*cthread)(ipc_callid_t, ipc_call_t *))
00413 {
00414 connection_t *conn;
00415 unsigned long key;
00416
00417 conn = malloc(sizeof(*conn));
00418 if (!conn) {
00419 ipc_answer_fast(callid, ENOMEM, 0, 0);
00420 return NULL;
00421 }
00422 conn->in_phone_hash = in_phone_hash;
00423 list_initialize(&conn->msg_queue);
00424 conn->callid = callid;
00425 conn->close_callid = 0;
00426 if (call)
00427 conn->call = *call;
00428 conn->wdata.active = 1;
00429 conn->cthread = cthread;
00430
00431 conn->wdata.ptid = psthread_create(connection_thread, conn);
00432 if (!conn->wdata.ptid) {
00433 free(conn);
00434 ipc_answer_fast(callid, ENOMEM, 0, 0);
00435 return NULL;
00436 }
00437
00438 key = conn->in_phone_hash;
00439 futex_down(&async_futex);
00440 hash_table_insert(&conn_hash_table, &key, &conn->link);
00441 futex_up(&async_futex);
00442
00443 psthread_add_ready(conn->wdata.ptid);
00444
00445 return conn->wdata.ptid;
00446 }
00447
00449 static void handle_call(ipc_callid_t callid, ipc_call_t *call)
00450 {
00451
00452 if ((callid & IPC_CALLID_NOTIFICATION)) {
00453 in_interrupt_handler = 1;
00454 (*interrupt_received)(callid,call);
00455 in_interrupt_handler = 0;
00456 return;
00457 }
00458
00459 switch (IPC_GET_METHOD(*call)) {
00460 case IPC_M_CONNECT_ME_TO:
00461
00462 async_new_connection(IPC_GET_ARG3(*call), callid, call, client_connection);
00463 return;
00464 }
00465
00466
00467 if (route_call(callid, call))
00468 return;
00469
00470
00471 ipc_answer_fast(callid, EHANGUP, 0, 0);
00472 }
00473
00477 static void handle_expired_timeouts(void)
00478 {
00479 struct timeval tv;
00480 awaiter_t *waiter;
00481 link_t *cur;
00482
00483 gettimeofday(&tv,NULL);
00484 futex_down(&async_futex);
00485
00486 cur = timeout_list.next;
00487 while (cur != &timeout_list) {
00488 waiter = list_get_instance(cur,awaiter_t,link);
00489 if (tv_gt(&waiter->expires, &tv))
00490 break;
00491 cur = cur->next;
00492 list_remove(&waiter->link);
00493 waiter->inlist = 0;
00494 waiter->timedout = 1;
00495
00496
00497
00498 if (!waiter->active) {
00499 waiter->active = 1;
00500 psthread_add_ready(waiter->ptid);
00501 }
00502 }
00503
00504 futex_up(&async_futex);
00505 }
00506
00508 static int async_manager_worker(void)
00509 {
00510 ipc_call_t call;
00511 ipc_callid_t callid;
00512 int timeout;
00513 awaiter_t *waiter;
00514 struct timeval tv;
00515
00516 while (1) {
00517 if (psthread_schedule_next_adv(PS_FROM_MANAGER)) {
00518 futex_up(&async_futex);
00519
00520
00521
00522 continue;
00523 }
00524 futex_down(&async_futex);
00525 if (!list_empty(&timeout_list)) {
00526 waiter = list_get_instance(timeout_list.next,awaiter_t,link);
00527 gettimeofday(&tv,NULL);
00528 if (tv_gteq(&tv, &waiter->expires)) {
00529 futex_up(&async_futex);
00530 handle_expired_timeouts();
00531 continue;
00532 } else
00533 timeout = tv_sub(&waiter->expires, &tv);
00534 } else
00535 timeout = SYNCH_NO_TIMEOUT;
00536 futex_up(&async_futex);
00537
00538 callid = ipc_wait_cycle(&call, timeout, SYNCH_FLAGS_NONE);
00539
00540 if (!callid) {
00541 handle_expired_timeouts();
00542 continue;
00543 }
00544
00545 if (callid & IPC_CALLID_ANSWERED) {
00546 continue;
00547 }
00548
00549 handle_call(callid, &call);
00550 }
00551
00552 return 0;
00553 }
00554
00562 static int async_manager_thread(void *arg)
00563 {
00564 futex_up(&async_futex);
00565
00566
00567 async_manager_worker();
00568
00569 return 0;
00570 }
00571
00573 void async_create_manager(void)
00574 {
00575 pstid_t ptid;
00576
00577 ptid = psthread_create(async_manager_thread, NULL);
00578 psthread_add_manager(ptid);
00579 }
00580
/** Unregister a manager pseudo thread via psthread_remove_manager(). */
void async_destroy_manager(void)
{
	psthread_remove_manager();
}
00586
00588 int _async_init(void)
00589 {
00590 if (!hash_table_create(&conn_hash_table, CONN_HASH_TABLE_CHAINS, 1, &conn_hash_table_ops)) {
00591 printf("%s: cannot create hash table\n", "async");
00592 return ENOMEM;
00593 }
00594
00595 return 0;
00596 }
00597
00602 static void reply_received(void *private, int retval,
00603 ipc_call_t *data)
00604 {
00605 amsg_t *msg = (amsg_t *) private;
00606
00607 msg->retval = retval;
00608
00609 futex_down(&async_futex);
00610
00611
00612
00613 if (msg->dataptr)
00614 *msg->dataptr = *data;
00615
00616 write_barrier();
00617
00618 if (msg->wdata.inlist)
00619 list_remove(&msg->wdata.link);
00620 msg->done = 1;
00621 if (! msg->wdata.active) {
00622 msg->wdata.active = 1;
00623 psthread_add_ready(msg->wdata.ptid);
00624 }
00625 futex_up(&async_futex);
00626 }
00627
00633 aid_t async_send_2(int phoneid, ipcarg_t method, ipcarg_t arg1, ipcarg_t arg2,
00634 ipc_call_t *dataptr)
00635 {
00636 amsg_t *msg;
00637
00638 if (in_interrupt_handler) {
00639 printf("Cannot send asynchronou request in interrupt handler.\n");
00640 _exit(1);
00641 }
00642
00643 msg = malloc(sizeof(*msg));
00644 msg->done = 0;
00645 msg->dataptr = dataptr;
00646
00647 msg->wdata.active = 1;
00648
00649 ipc_call_async_2(phoneid,method,arg1,arg2,msg,reply_received,1);
00650
00651 return (aid_t) msg;
00652 }
00653
00659 aid_t async_send_3(int phoneid, ipcarg_t method, ipcarg_t arg1, ipcarg_t arg2,
00660 ipcarg_t arg3, ipc_call_t *dataptr)
00661 {
00662 amsg_t *msg;
00663
00664 if (in_interrupt_handler) {
00665 printf("Cannot send asynchronou request in interrupt handler.\n");
00666 _exit(1);
00667 }
00668
00669 msg = malloc(sizeof(*msg));
00670 msg->done = 0;
00671 msg->dataptr = dataptr;
00672
00673 msg->wdata.active = 1;
00674
00675 ipc_call_async_3(phoneid,method,arg1,arg2,arg3, msg,reply_received,1);
00676
00677 return (aid_t) msg;
00678 }
00679
00687 void async_wait_for(aid_t amsgid, ipcarg_t *retval)
00688 {
00689 amsg_t *msg = (amsg_t *) amsgid;
00690
00691 futex_down(&async_futex);
00692 if (msg->done) {
00693 futex_up(&async_futex);
00694 goto done;
00695 }
00696
00697 msg->wdata.ptid = psthread_get_id();
00698 msg->wdata.active = 0;
00699 msg->wdata.inlist = 0;
00700
00701 psthread_schedule_next_adv(PS_TO_MANAGER);
00702
00703 done:
00704 if (retval)
00705 *retval = msg->retval;
00706 free(msg);
00707 }
00708
00718 int async_wait_timeout(aid_t amsgid, ipcarg_t *retval, suseconds_t timeout)
00719 {
00720 amsg_t *msg = (amsg_t *) amsgid;
00721
00722
00723 if (timeout < 0)
00724 return ETIMEOUT;
00725
00726 futex_down(&async_futex);
00727 if (msg->done) {
00728 futex_up(&async_futex);
00729 goto done;
00730 }
00731
00732 gettimeofday(&msg->wdata.expires, NULL);
00733 tv_add(&msg->wdata.expires, timeout);
00734
00735 msg->wdata.ptid = psthread_get_id();
00736 msg->wdata.active = 0;
00737 insert_timeout(&msg->wdata);
00738
00739
00740 psthread_schedule_next_adv(PS_TO_MANAGER);
00741
00742
00743 if (!msg->done)
00744 return ETIMEOUT;
00745
00746 done:
00747 if (retval)
00748 *retval = msg->retval;
00749 free(msg);
00750
00751 return 0;
00752 }
00753
00758 void async_usleep(suseconds_t timeout)
00759 {
00760 amsg_t *msg;
00761
00762 if (in_interrupt_handler) {
00763 printf("Cannot call async_usleep in interrupt handler.\n");
00764 _exit(1);
00765 }
00766
00767 msg = malloc(sizeof(*msg));
00768 if (!msg)
00769 return;
00770
00771 msg->wdata.ptid = psthread_get_id();
00772 msg->wdata.active = 0;
00773
00774 gettimeofday(&msg->wdata.expires, NULL);
00775 tv_add(&msg->wdata.expires, timeout);
00776
00777 futex_down(&async_futex);
00778 insert_timeout(&msg->wdata);
00779
00780 psthread_schedule_next_adv(PS_TO_MANAGER);
00781
00782 free(msg);
00783 }
00784
00789 void async_set_client_connection(async_client_conn_t conn)
00790 {
00791 client_connection = conn;
00792 }
00793 void async_set_interrupt_received(async_client_conn_t conn)
00794 {
00795 interrupt_received = conn;
00796 }
00797
00798
00799 void async_msg_3(int phoneid, ipcarg_t method, ipcarg_t arg1,
00800 ipcarg_t arg2, ipcarg_t arg3)
00801 {
00802 ipc_call_async_3(phoneid, method, arg1, arg2, arg3, NULL, NULL, !in_interrupt_handler);
00803 }
00804
00805 void async_msg_2(int phoneid, ipcarg_t method, ipcarg_t arg1, ipcarg_t arg2)
00806 {
00807 ipc_call_async_2(phoneid, method, arg1, arg2, NULL, NULL, !in_interrupt_handler);
00808 }
00809
00810