Changes in uspace/lib/c/generic/async.c [cff3fb6:c7bbf029] in mainline
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
uspace/lib/c/generic/async.c
rcff3fb6 rc7bbf029 40 40 * programming. 41 41 * 42 * You should be able to write very simple multithreaded programs. The async 43 * framework will automatically take care of most of the synchronization 44 * problems. 42 * You should be able to write very simple multithreaded programs, the async 43 * framework will automatically take care of most synchronization problems. 45 44 * 46 45 * Example of use (pseudo C): … … 54 53 * int fibril1(void *arg) 55 54 * { 56 * conn = async_connect_me_to(...); 57 * 58 * exch = async_exchange_begin(conn); 59 * c1 = async_send(exch); 60 * async_exchange_end(exch); 61 * 62 * exch = async_exchange_begin(conn); 63 * c2 = async_send(exch); 64 * async_exchange_end(exch); 65 * 55 * conn = async_connect_me_to(); 56 * c1 = async_send(conn); 57 * c2 = async_send(conn); 66 58 * async_wait_for(c1); 67 59 * async_wait_for(c2); … … 98 90 #include <ipc/ipc.h> 99 91 #include <async.h> 100 #include "private/async.h"101 92 #undef LIBC_ASYNC_C_ 102 93 103 94 #include <futex.h> 104 95 #include <fibril.h> 96 #include <stdio.h> 105 97 #include <adt/hash_table.h> 106 98 #include <adt/list.h> … … 108 100 #include <errno.h> 109 101 #include <sys/time.h> 110 #include < libarch/barrier.h>102 #include <arch/barrier.h> 111 103 #include <bool.h> 104 #include <stdlib.h> 112 105 #include <malloc.h> 113 #include <mem.h> 114 #include <stdlib.h> 115 #include <macros.h> 116 117 #define CLIENT_HASH_TABLE_BUCKETS 32 118 #define CONN_HASH_TABLE_BUCKETS 32 119 120 /** Session data */ 121 struct async_sess { 122 /** List of inactive exchanges */ 123 list_t exch_list; 124 125 /** Exchange management style */ 126 exch_mgmt_t mgmt; 127 128 /** Session identification */ 129 int phone; 130 131 /** First clone connection argument */ 132 sysarg_t arg1; 133 134 /** Second clone connection argument */ 135 sysarg_t arg2; 136 137 /** Third clone connection argument */ 138 sysarg_t arg3; 139 140 /** Exchange mutex */ 141 fibril_mutex_t mutex; 142 143 /** Number of opened exchanges */ 144 atomic_t refcnt; 145 146 /** Mutex for stateful connections */ 147 fibril_mutex_t remote_state_mtx; 148 149 /** Data for stateful connections */ 150 void *remote_state_data; 151 }; 152 153 /** Exchange data */ 154 struct async_exch { 155 /** Link into list of inactive exchanges */ 156 link_t sess_link; 157 158 /** Link into global list of inactive exchanges */ 159 link_t global_link; 160 161 /** Session pointer */ 162 async_sess_t *sess; 163 164 /** Exchange identification */ 165 int phone; 166 }; 167 168 /** Async framework global futex */ 106 #include "private/async.h" 107 169 108 atomic_t async_futex = FUTEX_INITIALIZER; 170 109 … … 172 111 atomic_t threads_in_ipc_wait = { 0 }; 173 112 174 /** Naming service session */ 175 async_sess_t *session_ns; 176 177 /** Call data */ 113 typedef struct { 114 awaiter_t wdata; 115 116 /** If reply was received. */ 117 bool done; 118 119 /** Pointer to where the answer data is stored. */ 120 ipc_call_t *dataptr; 121 122 sysarg_t retval; 123 } amsg_t; 124 125 /** 126 * Structures of this type are used to group information about 127 * a call and about a message queue link. 128 */ 178 129 typedef struct { 179 130 link_t link; 180 181 131 ipc_callid_t callid; 182 132 ipc_call_t call; 183 133 } msg_t; 184 134 185 /** Message data */ 135 typedef struct { 136 sysarg_t in_task_hash; 137 link_t link; 138 int refcnt; 139 void *data; 140 } client_t; 141 186 142 typedef struct { 187 143 awaiter_t wdata; 188 144 189 /** If reply was received. */190 bool done;191 192 /** Pointer to where the answer data is stored. */193 ipc_call_t *dataptr;194 195 sysarg_t retval;196 } amsg_t;197 198 /* Client connection data */199 typedef struct {200 link_t link;201 202 task_id_t in_task_id;203 atomic_t refcnt;204 void *data;205 } client_t;206 207 /* Server connection data */208 typedef struct {209 awaiter_t wdata;210 211 145 /** Hash table link. */ 212 146 link_t link; 213 147 214 /** Incoming client task ID. */ 215 task_id_t in_task_id; 216 148 /** Incoming client task hash. */ 149 sysarg_t in_task_hash; 217 150 /** Incoming phone hash. */ 218 151 sysarg_t in_phone_hash; … … 222 155 223 156 /** Messages that should be delivered to this fibril. */ 224 li st_t msg_queue;157 link_t msg_queue; 225 158 226 159 /** Identification of the opening call. */ … … 228 161 /** Call data of the opening call. */ 229 162 ipc_call_t call; 230 /** Local argument or NULL if none. */231 void *carg;232 163 233 164 /** Identification of the closing call. */ … … 235 166 236 167 /** Fibril function that will be used to handle the connection. */ 237 async_client_conn_t cfibril;168 void (*cfibril)(ipc_callid_t, ipc_call_t *); 238 169 } connection_t; 239 170 240 171 /** Identifier of the incoming connection handled by the current fibril. */ 241 static fibril_local connection_t * fibril_connection;172 static fibril_local connection_t *FIBRIL_connection; 242 173 243 174 static void *default_client_data_constructor(void) … … 265 196 } 266 197 198 void *async_client_data_get(void) 199 { 200 assert(FIBRIL_connection); 201 return FIBRIL_connection->client->data; 202 } 203 267 204 /** Default fibril function that gets called to handle new connection. 268 205 * … … 271 208 * @param callid Hash of the incoming call. 272 209 * @param call Data of the incoming call. 273 * @param arg Local argument 274 * 275 */ 276 static void default_client_connection(ipc_callid_t callid, ipc_call_t *call, 277 void *arg) 210 * 211 */ 212 static void default_client_connection(ipc_callid_t callid, ipc_call_t *call) 278 213 { 279 214 ipc_answer_0(callid, ENOENT); 280 215 } 216 217 /** 218 * Pointer to a fibril function that will be used to handle connections. 219 */ 220 static async_client_conn_t client_connection = default_client_connection; 281 221 282 222 /** Default fibril function that gets called to handle interrupt notifications. … … 286 226 * @param callid Hash of the incoming call. 287 227 * @param call Data of the incoming call. 288 * @param arg Local argument.289 228 * 290 229 */ … … 293 232 } 294 233 295 static async_client_conn_t client_connection = default_client_connection; 296 static async_interrupt_handler_t interrupt_received = default_interrupt_received; 297 298 /** Setter for client_connection function pointer. 299 * 300 * @param conn Function that will implement a new connection fibril. 301 * 302 */ 303 void async_set_client_connection(async_client_conn_t conn) 304 { 305 client_connection = conn; 306 } 307 308 /** Setter for interrupt_received function pointer. 309 * 310 * @param intr Function that will implement a new interrupt 311 * notification fibril. 312 */ 313 void async_set_interrupt_received(async_interrupt_handler_t intr) 314 { 315 interrupt_received = intr; 316 } 317 318 /** Mutex protecting inactive_exch_list and avail_phone_cv. 319 * 320 */ 321 static FIBRIL_MUTEX_INITIALIZE(async_sess_mutex); 322 323 /** List of all currently inactive exchanges. 324 * 325 */ 326 static LIST_INITIALIZE(inactive_exch_list); 327 328 /** Condition variable to wait for a phone to become available. 329 * 330 */ 331 static FIBRIL_CONDVAR_INITIALIZE(avail_phone_cv); 234 /** 235 * Pointer to a fibril function that will be used to handle interrupt 236 * notifications. 237 */ 238 static async_client_conn_t interrupt_received = default_interrupt_received; 332 239 333 240 static hash_table_t client_hash_table; … … 335 242 static LIST_INITIALIZE(timeout_list); 336 243 244 #define CLIENT_HASH_TABLE_BUCKETS 32 245 #define CONN_HASH_TABLE_BUCKETS 32 246 337 247 static hash_index_t client_hash(unsigned long key[]) 338 248 { 339 249 assert(key); 340 341 250 return (((key[0]) >> 4) % CLIENT_HASH_TABLE_BUCKETS); 342 251 } … … 344 253 static int client_compare(unsigned long key[], hash_count_t keys, link_t *item) 345 254 { 346 assert(key);347 assert(keys == 2);348 assert(item);349 350 255 client_t *client = hash_table_get_instance(item, client_t, link); 351 return (key[0] == LOWER32(client->in_task_id) && 352 (key[1] == UPPER32(client->in_task_id))); 256 return (key[0] == client->in_task_hash); 353 257 } 354 258 … … 374 278 { 375 279 assert(key); 376 377 280 return (((key[0]) >> 4) % CONN_HASH_TABLE_BUCKETS); 378 281 } … … 389 292 static int conn_compare(unsigned long key[], hash_count_t keys, link_t *item) 390 293 { 391 assert(key);392 assert(item);393 394 294 connection_t *conn = hash_table_get_instance(item, connection_t, link); 395 295 return (key[0] == conn->in_phone_hash); … … 414 314 void async_insert_timeout(awaiter_t *wd) 415 315 { 416 assert(wd);417 418 316 wd->to_event.occurred = false; 419 317 wd->to_event.inlist = true; 420 318 421 link_t *tmp = timeout_list. head.next;422 while (tmp != &timeout_list .head) {319 link_t *tmp = timeout_list.next; 320 while (tmp != &timeout_list) { 423 321 awaiter_t *cur 424 322 = list_get_instance(tmp, awaiter_t, to_event.link); … … 430 328 } 431 329 432 list_ insert_before(&wd->to_event.link, tmp);330 list_append(&wd->to_event.link, tmp); 433 331 } 434 332 … … 448 346 static bool route_call(ipc_callid_t callid, ipc_call_t *call) 449 347 { 450 assert(call);451 452 348 futex_down(&async_futex); 453 349 … … 504 400 static int notification_fibril(void *arg) 505 401 { 506 assert(arg);507 508 402 msg_t *msg = (msg_t *) arg; 509 403 interrupt_received(msg->callid, &msg->call); … … 526 420 static bool process_notification(ipc_callid_t callid, ipc_call_t *call) 527 421 { 528 assert(call);529 530 422 futex_down(&async_futex); 531 423 … … 566 458 ipc_callid_t async_get_call_timeout(ipc_call_t *call, suseconds_t usecs) 567 459 { 568 assert(call); 569 assert(fibril_connection); 460 assert(FIBRIL_connection); 570 461 571 462 /* Why doing this? 572 * GCC 4.1.0 coughs on fibril_connection-> dereference.463 * GCC 4.1.0 coughs on FIBRIL_connection-> dereference. 573 464 * GCC 4.1.1 happilly puts the rdhwr instruction in delay slot. 574 465 * I would never expect to find so many errors in 575 466 * a compiler. 576 467 */ 577 connection_t *conn = fibril_connection;468 connection_t *conn = FIBRIL_connection; 578 469 579 470 futex_down(&async_futex); … … 627 518 } 628 519 629 msg_t *msg = list_get_instance( list_first(&conn->msg_queue), msg_t, link);520 msg_t *msg = list_get_instance(conn->msg_queue.next, msg_t, link); 630 521 list_remove(&msg->link); 631 522 … … 638 529 } 639 530 640 static client_t *async_client_get(task_id_t client_id, bool create)641 {642 unsigned long key[2] = {643 LOWER32(client_id),644 UPPER32(client_id),645 };646 client_t *client = NULL;647 648 futex_down(&async_futex);649 link_t *lnk = hash_table_find(&client_hash_table, key);650 if (lnk) {651 client = hash_table_get_instance(lnk, client_t, link);652 atomic_inc(&client->refcnt);653 } else if (create) {654 client = malloc(sizeof(client_t));655 if (client) {656 client->in_task_id = client_id;657 client->data = async_client_data_create();658 659 atomic_set(&client->refcnt, 1);660 hash_table_insert(&client_hash_table, key, &client->link);661 }662 }663 664 futex_up(&async_futex);665 return client;666 }667 668 static void async_client_put(client_t *client)669 {670 bool destroy;671 unsigned long key[2] = {672 LOWER32(client->in_task_id),673 UPPER32(client->in_task_id)674 };675 676 futex_down(&async_futex);677 678 if (atomic_predec(&client->refcnt) == 0) {679 hash_table_remove(&client_hash_table, key, 2);680 destroy = true;681 } else682 destroy = false;683 684 futex_up(&async_futex);685 686 if (destroy) {687 if (client->data)688 async_client_data_destroy(client->data);689 690 free(client);691 }692 }693 694 void *async_get_client_data(void)695 {696 assert(fibril_connection);697 return fibril_connection->client->data;698 }699 700 void *async_get_client_data_by_id(task_id_t client_id)701 {702 client_t *client = async_client_get(client_id, false);703 if (!client)704 return NULL;705 if (!client->data) {706 async_client_put(client);707 return NULL;708 }709 710 return client->data;711 }712 713 void async_put_client_data_by_id(task_id_t client_id)714 {715 client_t *client = async_client_get(client_id, false);716 717 assert(client);718 assert(client->data);719 720 /* Drop the reference we got in async_get_client_data_by_hash(). */721 async_client_put(client);722 723 /* Drop our own reference we got at the beginning of this function. */724 async_client_put(client);725 }726 727 531 /** Wrapper for client connection fibril. 728 532 * … … 737 541 static int connection_fibril(void *arg) 738 542 { 739 assert(arg);740 741 543 /* 742 544 * Setup fibril-local connection pointer. 743 545 */ 744 fibril_connection = (connection_t *) arg; 546 FIBRIL_connection = (connection_t *) arg; 547 548 futex_down(&async_futex); 745 549 746 550 /* … … 749 553 * hash in a new tracking structure. 750 554 */ 751 752 client_t *client = async_client_get(fibril_connection->in_task_id, true); 753 if (!client) { 754 ipc_answer_0(fibril_connection->callid, ENOMEM); 755 return 0; 756 } 757 758 fibril_connection->client = client; 555 556 unsigned long key = FIBRIL_connection->in_task_hash; 557 link_t *lnk = hash_table_find(&client_hash_table, &key); 558 559 client_t *client; 560 561 if (lnk) { 562 client = hash_table_get_instance(lnk, client_t, link); 563 client->refcnt++; 564 } else { 565 client = malloc(sizeof(client_t)); 566 if (!client) { 567 ipc_answer_0(FIBRIL_connection->callid, ENOMEM); 568 futex_up(&async_futex); 569 return 0; 570 } 571 572 client->in_task_hash = FIBRIL_connection->in_task_hash; 573 574 async_serialize_start(); 575 client->data = async_client_data_create(); 576 async_serialize_end(); 577 578 client->refcnt = 1; 579 hash_table_insert(&client_hash_table, &key, &client->link); 580 } 581 582 futex_up(&async_futex); 583 584 FIBRIL_connection->client = client; 759 585 760 586 /* 761 587 * Call the connection handler function. 762 588 */ 763 fibril_connection->cfibril(fibril_connection->callid,764 & fibril_connection->call, fibril_connection->carg);589 FIBRIL_connection->cfibril(FIBRIL_connection->callid, 590 &FIBRIL_connection->call); 765 591 766 592 /* 767 593 * Remove the reference for this client task connection. 768 594 */ 769 async_client_put(client); 595 bool destroy; 596 597 futex_down(&async_futex); 598 599 if (--client->refcnt == 0) { 600 hash_table_remove(&client_hash_table, &key, 1); 601 destroy = true; 602 } else 603 destroy = false; 604 605 futex_up(&async_futex); 606 607 if (destroy) { 608 if (client->data) 609 async_client_data_destroy(client->data); 610 611 free(client); 612 } 770 613 771 614 /* … … 773 616 */ 774 617 futex_down(&async_futex); 775 unsigned long key = fibril_connection->in_phone_hash;618 key = FIBRIL_connection->in_phone_hash; 776 619 hash_table_remove(&conn_hash_table, &key, 1); 777 620 futex_up(&async_futex); … … 780 623 * Answer all remaining messages with EHANGUP. 781 624 */ 782 while (!list_empty(& fibril_connection->msg_queue)) {625 while (!list_empty(&FIBRIL_connection->msg_queue)) { 783 626 msg_t *msg = 784 list_get_instance( list_first(&fibril_connection->msg_queue),785 msg_t,link);627 list_get_instance(FIBRIL_connection->msg_queue.next, msg_t, 628 link); 786 629 787 630 list_remove(&msg->link); … … 794 637 * i.e. IPC_M_PHONE_HUNGUP. 795 638 */ 796 if ( fibril_connection->close_callid)797 ipc_answer_0( fibril_connection->close_callid, EOK);798 799 free( fibril_connection);639 if (FIBRIL_connection->close_callid) 640 ipc_answer_0(FIBRIL_connection->close_callid, EOK); 641 642 free(FIBRIL_connection); 800 643 return 0; 801 644 } … … 803 646 /** Create a new fibril for a new connection. 804 647 * 805 * Create new fibril for connection, fill in connection structures and insert 648 * Create new fibril for connection, fill in connection structures and inserts 806 649 * it into the hash table, so that later we can easily do routing of messages to 807 650 * particular fibrils. 808 651 * 809 * @param in_task_ idIdentification of the incoming connection.652 * @param in_task_hash Identification of the incoming connection. 810 653 * @param in_phone_hash Identification of the incoming connection. 811 654 * @param callid Hash of the opening IPC_M_CONNECT_ME_TO call. … … 816 659 * @param cfibril Fibril function that should be called upon opening the 817 660 * connection. 818 * @param carg Extra argument to pass to the connection fibril819 661 * 820 662 * @return New fibril id or NULL on failure. 821 663 * 822 664 */ 823 fid_t async_new_connection( task_id_t in_task_id, sysarg_t in_phone_hash,665 fid_t async_new_connection(sysarg_t in_task_hash, sysarg_t in_phone_hash, 824 666 ipc_callid_t callid, ipc_call_t *call, 825 async_client_conn_t cfibril, void *carg)667 void (*cfibril)(ipc_callid_t, ipc_call_t *)) 826 668 { 827 669 connection_t *conn = malloc(sizeof(*conn)); … … 833 675 } 834 676 835 conn->in_task_ id = in_task_id;677 conn->in_task_hash = in_task_hash; 836 678 conn->in_phone_hash = in_phone_hash; 837 679 list_initialize(&conn->msg_queue); 838 680 conn->callid = callid; 839 681 conn->close_callid = 0; 840 conn->carg = carg;841 682 842 683 if (call) … … 880 721 static void handle_call(ipc_callid_t callid, ipc_call_t *call) 881 722 { 882 assert(call);883 884 723 /* Unrouted call - take some default action */ 885 724 if ((callid & IPC_CALLID_NOTIFICATION)) { … … 892 731 case IPC_M_CONNECT_ME_TO: 893 732 /* Open new connection with fibril, etc. */ 894 async_new_connection(call->in_task_ id, IPC_GET_ARG5(*call),895 callid, call, client_connection , NULL);733 async_new_connection(call->in_task_hash, IPC_GET_ARG5(*call), 734 callid, call, client_connection); 896 735 return; 897 736 } … … 913 752 futex_down(&async_futex); 914 753 915 link_t *cur = list_first(&timeout_list);916 while (cur != NULL) {754 link_t *cur = timeout_list.next; 755 while (cur != &timeout_list) { 917 756 awaiter_t *waiter = 918 757 list_get_instance(cur, awaiter_t, to_event.link); … … 920 759 if (tv_gt(&waiter->to_event.expires, &tv)) 921 760 break; 761 762 cur = cur->next; 922 763 923 764 list_remove(&waiter->to_event.link); … … 933 774 fibril_add_ready(waiter->fid); 934 775 } 935 936 cur = list_first(&timeout_list);937 776 } 938 777 … … 961 800 suseconds_t timeout; 962 801 if (!list_empty(&timeout_list)) { 963 awaiter_t *waiter = list_get_instance( 964 list_first(&timeout_list),awaiter_t, to_event.link);802 awaiter_t *waiter = list_get_instance(timeout_list.next, 803 awaiter_t, to_event.link); 965 804 966 805 struct timeval tv; … … 1039 878 void __async_init(void) 1040 879 { 1041 if (!hash_table_create(&client_hash_table, CLIENT_HASH_TABLE_BUCKETS, 1042 2,&client_hash_table_ops))880 if (!hash_table_create(&client_hash_table, CLIENT_HASH_TABLE_BUCKETS, 1, 881 &client_hash_table_ops)) 1043 882 abort(); 1044 883 1045 if (!hash_table_create(&conn_hash_table, CONN_HASH_TABLE_BUCKETS, 1046 1,&conn_hash_table_ops))884 if (!hash_table_create(&conn_hash_table, CONN_HASH_TABLE_BUCKETS, 1, 885 &conn_hash_table_ops)) 1047 886 abort(); 1048 1049 session_ns = (async_sess_t *) malloc(sizeof(async_sess_t));1050 if (session_ns == NULL)1051 abort();1052 1053 session_ns->mgmt = EXCHANGE_ATOMIC;1054 session_ns->phone = PHONE_NS;1055 session_ns->arg1 = 0;1056 session_ns->arg2 = 0;1057 session_ns->arg3 = 0;1058 1059 fibril_mutex_initialize(&session_ns->remote_state_mtx);1060 session_ns->remote_state_data = NULL;1061 1062 list_initialize(&session_ns->exch_list);1063 fibril_mutex_initialize(&session_ns->mutex);1064 atomic_set(&session_ns->refcnt, 0);1065 887 } 1066 888 … … 1077 899 * 1078 900 */ 1079 void reply_received(void *arg, int retval, ipc_call_t *data) 1080 { 1081 assert(arg); 1082 901 static void reply_received(void *arg, int retval, ipc_call_t *data) 902 { 1083 903 futex_down(&async_futex); 1084 904 … … 1110 930 * completion. 1111 931 * 1112 * @param exch Exchange for sending the message.1113 * @param imethod Service-defined interface and method.932 * @param phoneid Handle of the phone that will be used for the send. 933 * @param method Service-defined method. 1114 934 * @param arg1 Service-defined payload argument. 1115 935 * @param arg2 Service-defined payload argument. … … 1122 942 * 1123 943 */ 1124 aid_t async_send_fast( async_exch_t *exch, sysarg_t imethod, sysarg_t arg1,944 aid_t async_send_fast(int phoneid, sysarg_t method, sysarg_t arg1, 1125 945 sysarg_t arg2, sysarg_t arg3, sysarg_t arg4, ipc_call_t *dataptr) 1126 946 { 1127 if (exch == NULL)1128 return 0;1129 1130 947 amsg_t *msg = malloc(sizeof(amsg_t)); 1131 if (msg == NULL) 948 949 if (!msg) 1132 950 return 0; 1133 951 … … 1143 961 msg->wdata.active = true; 1144 962 1145 ipc_call_async_4( exch->phone, imethod, arg1, arg2, arg3, arg4, msg,963 ipc_call_async_4(phoneid, method, arg1, arg2, arg3, arg4, msg, 1146 964 reply_received, true); 1147 965 … … 1154 972 * completion. 1155 973 * 1156 * @param exch Exchange for sending the message.1157 * @param imethod Service-defined interface and method.974 * @param phoneid Handle of the phone that will be used for the send. 975 * @param method Service-defined method. 1158 976 * @param arg1 Service-defined payload argument. 1159 977 * @param arg2 Service-defined payload argument. … … 1167 985 * 1168 986 */ 1169 aid_t async_send_slow( async_exch_t *exch, sysarg_t imethod, sysarg_t arg1,987 aid_t async_send_slow(int phoneid, sysarg_t method, sysarg_t arg1, 1170 988 sysarg_t arg2, sysarg_t arg3, sysarg_t arg4, sysarg_t arg5, 1171 989 ipc_call_t *dataptr) 1172 990 { 1173 if (exch == NULL)1174 return 0;1175 1176 991 amsg_t *msg = malloc(sizeof(amsg_t)); 1177 992 1178 if ( msg == NULL)993 if (!msg) 1179 994 return 0; 1180 995 … … 1190 1005 msg->wdata.active = true; 1191 1006 1192 ipc_call_async_5( exch->phone, imethod, arg1, arg2, arg3, arg4, arg5,1193 msg,reply_received, true);1007 ipc_call_async_5(phoneid, method, arg1, arg2, arg3, arg4, arg5, msg, 1008 reply_received, true); 1194 1009 1195 1010 return (aid_t) msg; … … 1205 1020 void async_wait_for(aid_t amsgid, sysarg_t *retval) 1206 1021 { 1207 assert(amsgid);1208 1209 1022 amsg_t *msg = (amsg_t *) amsgid; 1210 1023 … … 1243 1056 int async_wait_timeout(aid_t amsgid, sysarg_t *retval, suseconds_t timeout) 1244 1057 { 1245 assert(amsgid);1246 1247 1058 amsg_t *msg = (amsg_t *) amsgid; 1248 1059 … … 1313 1124 } 1314 1125 1126 /** Setter for client_connection function pointer. 1127 * 1128 * @param conn Function that will implement a new connection fibril. 1129 * 1130 */ 1131 void async_set_client_connection(async_client_conn_t conn) 1132 { 1133 client_connection = conn; 1134 } 1135 1136 /** Setter for interrupt_received function pointer. 1137 * 1138 * @param intr Function that will implement a new interrupt 1139 * notification fibril. 1140 */ 1141 void async_set_interrupt_received(async_client_conn_t intr) 1142 { 1143 interrupt_received = intr; 1144 } 1145 1315 1146 /** Pseudo-synchronous message sending - fast version. 1316 1147 * … … 1320 1151 * transferring more arguments, see the slower async_req_slow(). 1321 1152 * 1322 * @param exch Exchange for sending the message.1323 * @param imethod Interface and method of the call.1153 * @param phoneid Hash of the phone through which to make the call. 1154 * @param method Method of the call. 1324 1155 * @param arg1 Service-defined payload argument. 1325 1156 * @param arg2 Service-defined payload argument. … … 1335 1166 * 1336 1167 */ 1337 sysarg_t async_req_fast( async_exch_t *exch, sysarg_t imethod, sysarg_t arg1,1168 sysarg_t async_req_fast(int phoneid, sysarg_t method, sysarg_t arg1, 1338 1169 sysarg_t arg2, sysarg_t arg3, sysarg_t arg4, sysarg_t *r1, sysarg_t *r2, 1339 1170 sysarg_t *r3, sysarg_t *r4, sysarg_t *r5) 1340 1171 { 1341 if (exch == NULL)1342 return ENOENT;1343 1344 1172 ipc_call_t result; 1345 aid_t aid = async_send_4(exch, imethod, arg1, arg2, arg3, arg4,1173 aid_t eid = async_send_4(phoneid, method, arg1, arg2, arg3, arg4, 1346 1174 &result); 1347 1175 1348 1176 sysarg_t rc; 1349 async_wait_for( aid, &rc);1177 async_wait_for(eid, &rc); 1350 1178 1351 1179 if (r1) … … 1371 1199 * Send message asynchronously and return only after the reply arrives. 1372 1200 * 1373 * @param exch Exchange for sending the message.1374 * @param imethod Interface and method of the call.1201 * @param phoneid Hash of the phone through which to make the call. 1202 * @param method Method of the call. 1375 1203 * @param arg1 Service-defined payload argument. 1376 1204 * @param arg2 Service-defined payload argument. … … 1387 1215 * 1388 1216 */ 1389 sysarg_t async_req_slow( async_exch_t *exch, sysarg_t imethod, sysarg_t arg1,1217 sysarg_t async_req_slow(int phoneid, sysarg_t method, sysarg_t arg1, 1390 1218 sysarg_t arg2, sysarg_t arg3, sysarg_t arg4, sysarg_t arg5, sysarg_t *r1, 1391 1219 sysarg_t *r2, sysarg_t *r3, sysarg_t *r4, sysarg_t *r5) 1392 1220 { 1393 if (exch == NULL)1394 return ENOENT;1395 1396 1221 ipc_call_t result; 1397 aid_t aid = async_send_5(exch, imethod, arg1, arg2, arg3, arg4, arg5,1222 aid_t eid = async_send_5(phoneid, method, arg1, arg2, arg3, arg4, arg5, 1398 1223 &result); 1399 1224 1400 1225 sysarg_t rc; 1401 async_wait_for( aid, &rc);1226 async_wait_for(eid, &rc); 1402 1227 1403 1228 if (r1) … … 1419 1244 } 1420 1245 1421 void async_msg_0(async_exch_t *exch, sysarg_t imethod) 1422 { 1423 if (exch != NULL) 1424 ipc_call_async_0(exch->phone, imethod, NULL, NULL, true); 1425 } 1426 1427 void async_msg_1(async_exch_t *exch, sysarg_t imethod, sysarg_t arg1) 1428 { 1429 if (exch != NULL) 1430 ipc_call_async_1(exch->phone, imethod, arg1, NULL, NULL, true); 1431 } 1432 1433 void async_msg_2(async_exch_t *exch, sysarg_t imethod, sysarg_t arg1, 1434 sysarg_t arg2) 1435 { 1436 if (exch != NULL) 1437 ipc_call_async_2(exch->phone, imethod, arg1, arg2, NULL, NULL, 1438 true); 1439 } 1440 1441 void async_msg_3(async_exch_t *exch, sysarg_t imethod, sysarg_t arg1, 1442 sysarg_t arg2, sysarg_t arg3) 1443 { 1444 if (exch != NULL) 1445 ipc_call_async_3(exch->phone, imethod, arg1, arg2, arg3, NULL, 1446 NULL, true); 1447 } 1448 1449 void async_msg_4(async_exch_t *exch, sysarg_t imethod, sysarg_t arg1, 1450 sysarg_t arg2, sysarg_t arg3, sysarg_t arg4) 1451 { 1452 if (exch != NULL) 1453 ipc_call_async_4(exch->phone, imethod, arg1, arg2, arg3, arg4, 1454 NULL, NULL, true); 1455 } 1456 1457 void async_msg_5(async_exch_t *exch, sysarg_t imethod, sysarg_t arg1, 1458 sysarg_t arg2, sysarg_t arg3, sysarg_t arg4, sysarg_t arg5) 1459 { 1460 if (exch != NULL) 1461 ipc_call_async_5(exch->phone, imethod, arg1, arg2, arg3, arg4, 1462 arg5, NULL, NULL, true); 1246 void async_msg_0(int phone, sysarg_t imethod) 1247 { 1248 ipc_call_async_0(phone, imethod, NULL, NULL, true); 1249 } 1250 1251 void async_msg_1(int phone, sysarg_t imethod, sysarg_t arg1) 1252 { 1253 ipc_call_async_1(phone, imethod, arg1, NULL, NULL, true); 1254 } 1255 1256 void async_msg_2(int phone, sysarg_t imethod, sysarg_t arg1, sysarg_t arg2) 1257 { 1258 ipc_call_async_2(phone, imethod, arg1, arg2, NULL, NULL, true); 1259 } 1260 1261 void async_msg_3(int phone, sysarg_t imethod, sysarg_t arg1, sysarg_t arg2, 1262 sysarg_t arg3) 1263 { 1264 ipc_call_async_3(phone, imethod, arg1, arg2, arg3, NULL, NULL, true); 1265 } 1266 1267 void async_msg_4(int phone, sysarg_t imethod, sysarg_t arg1, sysarg_t arg2, 1268 sysarg_t arg3, sysarg_t arg4) 1269 { 1270 ipc_call_async_4(phone, imethod, arg1, arg2, arg3, arg4, NULL, NULL, 1271 true); 1272 } 1273 1274 void async_msg_5(int phone, sysarg_t imethod, sysarg_t arg1, sysarg_t arg2, 1275 sysarg_t arg3, sysarg_t arg4, sysarg_t arg5) 1276 { 1277 ipc_call_async_5(phone, imethod, arg1, arg2, arg3, arg4, arg5, NULL, 1278 NULL, true); 1463 1279 } 1464 1280 … … 1497 1313 } 1498 1314 1499 int async_forward_fast(ipc_callid_t callid, async_exch_t *exch, 1500 sysarg_t imethod, sysarg_t arg1, sysarg_t arg2, unsigned int mode) 1501 { 1502 if (exch == NULL) 1503 return ENOENT; 1504 1505 return ipc_forward_fast(callid, exch->phone, imethod, arg1, arg2, mode); 1506 } 1507 1508 int async_forward_slow(ipc_callid_t callid, async_exch_t *exch, 1509 sysarg_t imethod, sysarg_t arg1, sysarg_t arg2, sysarg_t arg3, 1510 sysarg_t arg4, sysarg_t arg5, unsigned int mode) 1511 { 1512 if (exch == NULL) 1513 return ENOENT; 1514 1515 return ipc_forward_slow(callid, exch->phone, imethod, arg1, arg2, arg3, 1516 arg4, arg5, mode); 1315 int async_forward_fast(ipc_callid_t callid, int phoneid, sysarg_t imethod, 1316 sysarg_t arg1, sysarg_t arg2, unsigned int mode) 1317 { 1318 return ipc_forward_fast(callid, phoneid, imethod, arg1, arg2, mode); 1319 } 1320 1321 int async_forward_slow(ipc_callid_t callid, int phoneid, sysarg_t imethod, 1322 sysarg_t arg1, sysarg_t arg2, sysarg_t arg3, sysarg_t arg4, sysarg_t arg5, 1323 unsigned int mode) 1324 { 1325 return ipc_forward_slow(callid, phoneid, imethod, arg1, arg2, arg3, arg4, 1326 arg5, mode); 1517 1327 } 1518 1328 … … 1521 1331 * Ask through phone for a new connection to some service. 1522 1332 * 1523 * @param exch Exchange for sending the message.1333 * @param phone Phone handle used for contacting the other side. 1524 1334 * @param arg1 User defined argument. 1525 1335 * @param arg2 User defined argument. … … 1527 1337 * @param client_receiver Connection handing routine. 1528 1338 * 1529 * @return Zero on success or a negative error code. 1530 * 1531 */ 1532 int async_connect_to_me(async_exch_t *exch, sysarg_t arg1, sysarg_t arg2, 1533 sysarg_t arg3, async_client_conn_t client_receiver, void *carg) 1534 { 1535 if (exch == NULL) 1536 return ENOENT; 1537 1339 * @return New phone handle on success or a negative error code. 1340 * 1341 */ 1342 int async_connect_to_me(int phone, sysarg_t arg1, sysarg_t arg2, 1343 sysarg_t arg3, async_client_conn_t client_receiver) 1344 { 1345 sysarg_t task_hash; 1538 1346 sysarg_t phone_hash; 1539 sysarg_t rc; 1540 1541 aid_t req; 1542 ipc_call_t answer; 1543 req = async_send_3(exch, IPC_M_CONNECT_TO_ME, arg1, arg2, arg3, 1544 &answer); 1545 async_wait_for(req, &rc); 1546 if (rc != EOK) 1547 return (int) rc; 1548 1549 phone_hash = IPC_GET_ARG5(answer); 1550 1551 if (client_receiver != NULL) 1552 async_new_connection(answer.in_task_id, phone_hash, 0, NULL, 1553 client_receiver, carg); 1554 1555 return EOK; 1556 } 1557 1558 /** Wrapper for making IPC_M_CONNECT_ME calls using the async framework. 1559 * 1560 * Ask through for a cloned connection to some service. 1561 * 1562 * @param mgmt Exchange management style. 1563 * @param exch Exchange for sending the message. 1564 * 1565 * @return New session on success or NULL on error. 1566 * 1567 */ 1568 async_sess_t *async_connect_me(exch_mgmt_t mgmt, async_exch_t *exch) 1569 { 1570 if (exch == NULL) { 1571 errno = ENOENT; 1572 return NULL; 1573 } 1574 1575 async_sess_t *sess = (async_sess_t *) malloc(sizeof(async_sess_t)); 1576 if (sess == NULL) { 1577 errno = ENOMEM; 1578 return NULL; 1579 } 1580 1581 ipc_call_t result; 1582 1583 amsg_t *msg = malloc(sizeof(amsg_t)); 1584 if (msg == NULL) { 1585 free(sess); 1586 errno = ENOMEM; 1587 return NULL; 1588 } 1589 1590 msg->done = false; 1591 msg->dataptr = &result; 1592 1593 msg->wdata.to_event.inlist = false; 1594 1595 /* 1596 * We may sleep in the next method, 1597 * but it will use its own means 1598 */ 1599 msg->wdata.active = true; 1600 1601 ipc_call_async_0(exch->phone, IPC_M_CONNECT_ME, msg, 1602 reply_received, true); 1603 1604 sysarg_t rc; 1605 async_wait_for((aid_t) msg, &rc); 1606 1607 if (rc != EOK) { 1608 errno = rc; 1609 free(sess); 1610 return NULL; 1611 } 1612 1613 int phone = (int) IPC_GET_ARG5(result); 1614 1615 if (phone < 0) { 1616 errno = phone; 1617 free(sess); 1618 return NULL; 1619 } 1620 1621 sess->mgmt = mgmt; 1622 sess->phone = phone; 1623 sess->arg1 = 0; 1624 sess->arg2 = 0; 1625 sess->arg3 = 0; 1626 1627 fibril_mutex_initialize(&sess->remote_state_mtx); 1628 sess->remote_state_data = NULL; 1629 1630 list_initialize(&sess->exch_list); 1631 fibril_mutex_initialize(&sess->mutex); 1632 atomic_set(&sess->refcnt, 0); 1633 1634 return sess; 1635 } 1636 1637 static int async_connect_me_to_internal(int phone, sysarg_t arg1, sysarg_t arg2, 1638 sysarg_t arg3, sysarg_t arg4) 1639 { 1640 ipc_call_t result; 1641 1642 amsg_t *msg = malloc(sizeof(amsg_t)); 1643 if (msg == NULL) 1644 return ENOENT; 1645 1646 msg->done = false; 1647 msg->dataptr = &result; 1648 1649 msg->wdata.to_event.inlist = false; 1650 1651 /* 1652 * We may sleep in the next method, 1653 * but it will use its own means 1654 */ 1655 msg->wdata.active = true; 1656 1657 ipc_call_async_4(phone, IPC_M_CONNECT_ME_TO, arg1, arg2, arg3, arg4, 1658 msg, reply_received, true); 1659 1660 sysarg_t rc; 1661 async_wait_for((aid_t) msg, &rc); 1662 1347 int rc = async_req_3_5(phone, IPC_M_CONNECT_TO_ME, arg1, arg2, arg3, 1348 NULL, NULL, NULL, &task_hash, &phone_hash); 1663 1349 if (rc != EOK) 1664 1350 return rc; 1665 1351 1666 return (int) IPC_GET_ARG5(result); 1352 if (client_receiver != NULL) 1353 async_new_connection(task_hash, phone_hash, 0, NULL, 1354 client_receiver); 1355 1356 return EOK; 1667 1357 } 1668 1358 1669 1359 /** Wrapper for making IPC_M_CONNECT_ME_TO calls using the async framework. 1670 1360 * 1671 * Ask through for a new connection to some service. 1672 * 1673 * @param mgmt Exchange management style. 1674 * @param exch Exchange for sending the message. 1675 * @param arg1 User defined argument. 1676 * @param arg2 User defined argument. 1677 * @param arg3 User defined argument. 1678 * 1679 * @return New session on success or NULL on error. 1680 * 1681 */ 1682 async_sess_t *async_connect_me_to(exch_mgmt_t mgmt, async_exch_t *exch, 1683 sysarg_t arg1, sysarg_t arg2, sysarg_t arg3) 1684 { 1685 if (exch == NULL) { 1686 errno = ENOENT; 1687 return NULL; 1688 } 1689 1690 async_sess_t *sess = (async_sess_t *) malloc(sizeof(async_sess_t)); 1691 if (sess == NULL) { 1692 errno = ENOMEM; 1693 return NULL; 1694 } 1695 1696 int phone = async_connect_me_to_internal(exch->phone, arg1, arg2, arg3, 1697 0); 1698 1699 if (phone < 0) { 1700 errno = phone; 1701 free(sess); 1702 return NULL; 1703 } 1704 1705 sess->mgmt = mgmt; 1706 sess->phone = phone; 1707 sess->arg1 = arg1; 1708 sess->arg2 = arg2; 1709 sess->arg3 = arg3; 1710 1711 fibril_mutex_initialize(&sess->remote_state_mtx); 1712 sess->remote_state_data = NULL; 1713 1714 list_initialize(&sess->exch_list); 1715 fibril_mutex_initialize(&sess->mutex); 1716 atomic_set(&sess->refcnt, 0); 1717 1718 return sess; 1719 } 1720 1721 /** Set arguments for new connections. 1722 * 1723 * FIXME This is an ugly hack to work around the problem that parallel 1724 * exchanges are implemented using parallel connections. When we create 1725 * a callback session, the framework does not know arguments for the new 1726 * connections. 1727 * 1728 * The proper solution seems to be to implement parallel exchanges using 1729 * tagging. 1730 */ 1731 void async_sess_args_set(async_sess_t *sess, sysarg_t arg1, sysarg_t arg2, 1361 * Ask through phone for a new connection to some service. 1362 * 1363 * @param phone Phone handle used for contacting the other side. 1364 * @param arg1 User defined argument. 1365 * @param arg2 User defined argument. 1366 * @param arg3 User defined argument. 1367 * 1368 * @return New phone handle on success or a negative error code. 1369 * 1370 */ 1371 int async_connect_me_to(int phone, sysarg_t arg1, sysarg_t arg2, 1732 1372 sysarg_t arg3) 1733 1373 { 1734 sess->arg1 = arg1; 1735 sess->arg2 = arg2; 1736 sess->arg3 = arg3; 1374 sysarg_t newphid; 1375 int rc = async_req_3_5(phone, IPC_M_CONNECT_ME_TO, arg1, arg2, arg3, 1376 NULL, NULL, NULL, NULL, &newphid); 1377 1378 if (rc != EOK) 1379 return rc; 1380 1381 return newphid; 1737 1382 } 1738 1383 … … 1742 1387 * success. 1743 1388 * 1744 * @param mgmt Exchange management style. 1745 * @param exch Exchange for sending the message. 1746 * @param arg1 User defined argument. 1747 * @param arg2 User defined argument. 1748 * @param arg3 User defined argument. 1749 * 1750 * @return New session on success or NULL on error. 1751 * 1752 */ 1753 async_sess_t *async_connect_me_to_blocking(exch_mgmt_t mgmt, async_exch_t *exch, 1754 sysarg_t arg1, sysarg_t arg2, sysarg_t arg3) 1755 { 1756 if (exch == NULL) { 1757 errno = ENOENT; 1758 return NULL; 1759 } 1760 1761 async_sess_t *sess = (async_sess_t *) malloc(sizeof(async_sess_t)); 1762 if (sess == NULL) { 1763 errno = ENOMEM; 1764 return NULL; 1765 } 1766 1767 int phone = async_connect_me_to_internal(exch->phone, arg1, arg2, arg3, 1768 IPC_FLAG_BLOCKING); 1769 1770 if (phone < 0) { 1771 errno = phone; 1772 free(sess); 1773 return NULL; 1774 } 1775 1776 sess->mgmt = mgmt; 1777 sess->phone = phone; 1778 sess->arg1 = arg1; 1779 sess->arg2 = arg2; 1780 sess->arg3 = arg3; 1781 1782 fibril_mutex_initialize(&sess->remote_state_mtx); 1783 sess->remote_state_data = NULL; 1784 1785 list_initialize(&sess->exch_list); 1786 fibril_mutex_initialize(&sess->mutex); 1787 atomic_set(&sess->refcnt, 0); 1788 1789 return sess; 1389 * @param phoneid Phone handle used for contacting the other side. 1390 * @param arg1 User defined argument. 1391 * @param arg2 User defined argument. 1392 * @param arg3 User defined argument. 1393 * 1394 * @return New phone handle on success or a negative error code. 1395 * 1396 */ 1397 int async_connect_me_to_blocking(int phoneid, sysarg_t arg1, sysarg_t arg2, 1398 sysarg_t arg3) 1399 { 1400 sysarg_t newphid; 1401 int rc = async_req_4_5(phoneid, IPC_M_CONNECT_ME_TO, arg1, arg2, arg3, 1402 IPC_FLAG_BLOCKING, NULL, NULL, NULL, NULL, &newphid); 1403 1404 if (rc != EOK) 1405 return rc; 1406 1407 return newphid; 1790 1408 } 1791 1409 … … 1793 1411 * 1794 1412 */ 1795 async_sess_t *async_connect_kbox(task_id_t id) 1796 { 1797 async_sess_t *sess = (async_sess_t *) malloc(sizeof(async_sess_t)); 1798 if (sess == NULL) { 1799 errno = ENOMEM; 1800 return NULL; 1801 } 1802 1803 int phone = ipc_connect_kbox(id); 1804 if (phone < 0) { 1805 errno = phone; 1806 free(sess); 1807 return NULL; 1808 } 1809 1810 sess->mgmt = EXCHANGE_ATOMIC; 1811 sess->phone = phone; 1812 sess->arg1 = 0; 1813 sess->arg2 = 0; 1814 sess->arg3 = 0; 1815 1816 fibril_mutex_initialize(&sess->remote_state_mtx); 1817 sess->remote_state_data = NULL; 1818 1819 list_initialize(&sess->exch_list); 1820 fibril_mutex_initialize(&sess->mutex); 1821 atomic_set(&sess->refcnt, 0); 1822 1823 return sess; 1824 } 1825 1826 static int async_hangup_internal(int phone) 1413 int async_connect_kbox(task_id_t id) 1414 { 1415 return ipc_connect_kbox(id); 1416 } 1417 1418 /** Wrapper for ipc_hangup. 1419 * 1420 * @param phone Phone handle to hung up. 1421 * 1422 * @return Zero on success or a negative error code. 1423 * 1424 */ 1425 int async_hangup(int phone) 1827 1426 { 1828 1427 return ipc_hangup(phone); 1829 }1830 1831 /** Wrapper for ipc_hangup.1832 *1833 * @param sess Session to hung up.1834 *1835 * @return Zero on success or a negative error code.1836 *1837 */1838 int async_hangup(async_sess_t *sess)1839 {1840 async_exch_t *exch;1841 1842 assert(sess);1843 1844 if (atomic_get(&sess->refcnt) > 0)1845 return EBUSY;1846 1847 fibril_mutex_lock(&async_sess_mutex);1848 1849 int rc = async_hangup_internal(sess->phone);1850 1851 while (!list_empty(&sess->exch_list)) {1852 exch = (async_exch_t *)1853 list_get_instance(list_first(&sess->exch_list),1854 async_exch_t, sess_link);1855 1856 list_remove(&exch->sess_link);1857 list_remove(&exch->global_link);1858 async_hangup_internal(exch->phone);1859 free(exch);1860 }1861 1862 free(sess);1863 1864 fibril_mutex_unlock(&async_sess_mutex);1865 1866 return rc;1867 1428 } 1868 1429 … … 1873 1434 } 1874 1435 1875 /** Start new exchange in a session.1876 *1877 * @param session Session.1878 *1879 * @return New exchange or NULL on error.1880 *1881 */1882 async_exch_t *async_exchange_begin(async_sess_t *sess)1883 {1884 if (sess == NULL)1885 return NULL;1886 1887 async_exch_t *exch;1888 1889 fibril_mutex_lock(&async_sess_mutex);1890 1891 if (!list_empty(&sess->exch_list)) {1892 /*1893 * There are inactive exchanges in the session.1894 */1895 exch = (async_exch_t *)1896 list_get_instance(list_first(&sess->exch_list),1897 async_exch_t, sess_link);1898 1899 list_remove(&exch->sess_link);1900 list_remove(&exch->global_link);1901 } else {1902 /*1903 * There are no available exchanges in the session.1904 */1905 1906 if ((sess->mgmt == EXCHANGE_ATOMIC) ||1907 (sess->mgmt == EXCHANGE_SERIALIZE)) {1908 exch = (async_exch_t *) malloc(sizeof(async_exch_t));1909 if (exch != NULL) {1910 link_initialize(&exch->sess_link);1911 link_initialize(&exch->global_link);1912 exch->sess = sess;1913 exch->phone = sess->phone;1914 }1915 } else { /* EXCHANGE_PARALLEL */1916 /*1917 * Make a one-time attempt to connect a new data phone.1918 */1919 1920 int phone;1921 1922 retry:1923 phone = async_connect_me_to_internal(sess->phone, sess->arg1,1924 sess->arg2, sess->arg3, 0);1925 if (phone >= 0) {1926 exch = (async_exch_t *) malloc(sizeof(async_exch_t));1927 if (exch != NULL) {1928 link_initialize(&exch->sess_link);1929 link_initialize(&exch->global_link);1930 exch->sess = sess;1931 exch->phone = phone;1932 } else1933 async_hangup_internal(phone);1934 } else if (!list_empty(&inactive_exch_list)) {1935 /*1936 * We did not manage to connect a new phone. But we1937 * can try to close some of the currently inactive1938 * connections in other sessions and try again.1939 */1940 exch = (async_exch_t *)1941 list_get_instance(list_first(&inactive_exch_list),1942 async_exch_t, global_link);1943 1944 list_remove(&exch->sess_link);1945 list_remove(&exch->global_link);1946 async_hangup_internal(exch->phone);1947 free(exch);1948 goto retry;1949 } else {1950 /*1951 * Wait for a phone to become available.1952 */1953 fibril_condvar_wait(&avail_phone_cv, &async_sess_mutex);1954 goto retry;1955 }1956 }1957 }1958 1959 fibril_mutex_unlock(&async_sess_mutex);1960 1961 if (exch != NULL) {1962 atomic_inc(&sess->refcnt);1963 1964 if (sess->mgmt == EXCHANGE_SERIALIZE)1965 fibril_mutex_lock(&sess->mutex);1966 }1967 1968 return exch;1969 }1970 1971 /** Finish an exchange.1972 *1973 * @param exch Exchange to finish.1974 *1975 */1976 void async_exchange_end(async_exch_t *exch)1977 {1978 if (exch == NULL)1979 return;1980 1981 async_sess_t *sess = exch->sess;1982 1983 atomic_dec(&sess->refcnt);1984 1985 if (sess->mgmt == EXCHANGE_SERIALIZE)1986 fibril_mutex_unlock(&sess->mutex);1987 1988 fibril_mutex_lock(&async_sess_mutex);1989 1990 list_append(&exch->sess_link, &sess->exch_list);1991 list_append(&exch->global_link, &inactive_exch_list);1992 fibril_condvar_signal(&avail_phone_cv);1993 1994 fibril_mutex_unlock(&async_sess_mutex);1995 }1996 1997 1436 /** Wrapper for IPC_M_SHARE_IN calls using the async framework. 1998 1437 * 1999 * @param exch Exchange for sending the message.2000 * @param dst Destination address space area base.2001 * @param size Size of the destination address space area.2002 * @param arg User defined argument.2003 * @param flags Storage for the received flags. Can be NULL.1438 * @param phoneid Phone that will be used to contact the receiving side. 1439 * @param dst Destination address space area base. 1440 * @param size Size of the destination address space area. 1441 * @param arg User defined argument. 1442 * @param flags Storage for the received flags. Can be NULL. 2004 1443 * 2005 1444 * @return Zero on success or a negative error code from errno.h. 2006 1445 * 2007 1446 */ 2008 int async_share_in_start(async_exch_t *exch, void *dst, size_t size, 2009 sysarg_t arg, unsigned int *flags) 2010 { 2011 if (exch == NULL) 2012 return ENOENT; 2013 1447 int async_share_in_start(int phoneid, void *dst, size_t size, sysarg_t arg, 1448 unsigned int *flags) 1449 { 2014 1450 sysarg_t tmp_flags; 2015 int res = async_req_3_2( exch, IPC_M_SHARE_IN, (sysarg_t) dst,1451 int res = async_req_3_2(phoneid, IPC_M_SHARE_IN, (sysarg_t) dst, 2016 1452 (sysarg_t) size, arg, NULL, &tmp_flags); 2017 1453 … … 2071 1507 /** Wrapper for IPC_M_SHARE_OUT calls using the async framework. 2072 1508 * 2073 * @param exch Exchange for sending the message.2074 * @param src Source address space area base address.2075 * @param flags Flags to be used for sharing. Bits can be only cleared.1509 * @param phoneid Phone that will be used to contact the receiving side. 1510 * @param src Source address space area base address. 1511 * @param flags Flags to be used for sharing. Bits can be only cleared. 2076 1512 * 2077 1513 * @return Zero on success or a negative error code from errno.h. 2078 1514 * 2079 1515 */ 2080 int async_share_out_start(async_exch_t *exch, void *src, unsigned int flags) 2081 { 2082 if (exch == NULL) 2083 return ENOENT; 2084 2085 return async_req_3_0(exch, IPC_M_SHARE_OUT, (sysarg_t) src, 0, 1516 int async_share_out_start(int phoneid, void *src, unsigned int flags) 1517 { 1518 return async_req_3_0(phoneid, IPC_M_SHARE_OUT, (sysarg_t) src, 0, 2086 1519 (sysarg_t) flags); 2087 1520 } … … 2136 1569 } 2137 1570 2138 /** Start IPC_M_DATA_READusing the async framework.2139 * 2140 * @param exch Exchange for sending the message.1571 /** Wrapper for IPC_M_DATA_READ calls using the async framework. 1572 * 1573 * @param phoneid Phone that will be used to contact the receiving side. 2141 1574 * @param dst Address of the beginning of the destination buffer. 2142 * @param size Size of the destination buffer (in bytes). 2143 * @param dataptr Storage of call data (arg 2 holds actual data size). 2144 * 2145 * @return Hash of the sent message or 0 on error. 2146 * 2147 */ 2148 aid_t async_data_read(async_exch_t *exch, void *dst, size_t size, 2149 ipc_call_t *dataptr) 2150 { 2151 return async_send_2(exch, IPC_M_DATA_READ, (sysarg_t) dst, 2152 (sysarg_t) size, dataptr); 2153 } 2154 2155 /** Wrapper for IPC_M_DATA_READ calls using the async framework. 2156 * 2157 * @param exch Exchange for sending the message. 2158 * @param dst Address of the beginning of the destination buffer. 2159 * @param size Size of the destination buffer. 1575 * @param size Size of the destination buffer. 1576 * @param flags Flags to control the data transfer. 2160 1577 * 2161 1578 * @return Zero on success or a negative error code from errno.h. 2162 1579 * 2163 1580 */ 2164 int async_data_read_start(async_exch_t *exch, void *dst, size_t size) 2165 { 2166 if (exch == NULL) 2167 return ENOENT; 2168 2169 return async_req_2_0(exch, IPC_M_DATA_READ, (sysarg_t) dst, 2170 (sysarg_t) size); 1581 int 1582 async_data_read_start_generic(int phoneid, void *dst, size_t size, int flags) 1583 { 1584 return async_req_3_0(phoneid, IPC_M_DATA_READ, (sysarg_t) dst, 1585 (sysarg_t) size, (sysarg_t) flags); 2171 1586 } 2172 1587 … … 2223 1638 * 2224 1639 */ 2225 int async_data_read_forward_fast(async_exch_t *exch, sysarg_t imethod, 2226 sysarg_t arg1, sysarg_t arg2, sysarg_t arg3, sysarg_t arg4, 2227 ipc_call_t *dataptr) 2228 { 2229 if (exch == NULL) 2230 return ENOENT; 2231 1640 int async_data_read_forward_fast(int phoneid, sysarg_t method, sysarg_t arg1, 1641 sysarg_t arg2, sysarg_t arg3, sysarg_t arg4, ipc_call_t *dataptr) 1642 { 2232 1643 ipc_callid_t callid; 2233 1644 if (!async_data_read_receive(&callid, NULL)) { … … 2236 1647 } 2237 1648 2238 aid_t msg = async_send_fast( exch, imethod, arg1, arg2, arg3, arg4,1649 aid_t msg = async_send_fast(phoneid, method, arg1, arg2, arg3, arg4, 2239 1650 dataptr); 2240 1651 if (msg == 0) { … … 2243 1654 } 2244 1655 2245 int retval = ipc_forward_fast(callid, exch->phone, 0, 0, 0,1656 int retval = ipc_forward_fast(callid, phoneid, 0, 0, 0, 2246 1657 IPC_FF_ROUTE_FROM_ME); 2247 1658 if (retval != EOK) { … … 2259 1670 /** Wrapper for IPC_M_DATA_WRITE calls using the async framework. 2260 1671 * 2261 * @param exch Exchange for sending the message. 2262 * @param src Address of the beginning of the source buffer. 2263 * @param size Size of the source buffer. 1672 * @param phoneid Phone that will be used to contact the receiving side. 1673 * @param src Address of the beginning of the source buffer. 1674 * @param size Size of the source buffer. 1675 * @param flags Flags to control the data transfer. 2264 1676 * 2265 1677 * @return Zero on success or a negative error code from errno.h. 2266 1678 * 2267 1679 */ 2268 int async_data_write_start(async_exch_t *exch, const void *src, size_t size) 2269 { 2270 if (exch == NULL) 2271 return ENOENT; 2272 2273 return async_req_2_0(exch, IPC_M_DATA_WRITE, (sysarg_t) src, 2274 (sysarg_t) size); 1680 int 1681 async_data_write_start_generic(int phoneid, const void *src, size_t size, 1682 int flags) 1683 { 1684 return async_req_3_0(phoneid, IPC_M_DATA_WRITE, (sysarg_t) src, 1685 (sysarg_t) size, (sysarg_t) flags); 2275 1686 } 2276 1687 … … 2348 1759 size_t *received) 2349 1760 { 2350 assert(data);2351 2352 1761 ipc_callid_t callid; 2353 1762 size_t size; … … 2417 1826 * 2418 1827 */ 2419 int async_data_write_forward_fast(async_exch_t *exch, sysarg_t imethod, 2420 sysarg_t arg1, sysarg_t arg2, sysarg_t arg3, sysarg_t arg4, 2421 ipc_call_t *dataptr) 2422 { 2423 if (exch == NULL) 2424 return ENOENT; 2425 1828 int async_data_write_forward_fast(int phoneid, sysarg_t method, sysarg_t arg1, 1829 sysarg_t arg2, sysarg_t arg3, sysarg_t arg4, ipc_call_t *dataptr) 1830 { 2426 1831 ipc_callid_t callid; 2427 1832 if (!async_data_write_receive(&callid, NULL)) { … … 2430 1835 } 2431 1836 2432 aid_t msg = async_send_fast( exch, imethod, arg1, arg2, arg3, arg4,1837 aid_t msg = async_send_fast(phoneid, method, arg1, arg2, arg3, arg4, 2433 1838 dataptr); 2434 1839 if (msg == 0) { … … 2437 1842 } 2438 1843 2439 int retval = ipc_forward_fast(callid, exch->phone, 0, 0, 0,1844 int retval = ipc_forward_fast(callid, phoneid, 0, 0, 0, 2440 1845 IPC_FF_ROUTE_FROM_ME); 2441 1846 if (retval != EOK) { … … 2451 1856 } 2452 1857 2453 /** Wrapper for sending an exchange over different exchange for cloning2454 *2455 * @param exch Exchange to be used for sending.2456 * @param clone_exch Exchange to be cloned.2457 *2458 */2459 int async_exchange_clone(async_exch_t *exch, async_exch_t *clone_exch)2460 {2461 return async_req_1_0(exch, IPC_M_CONNECTION_CLONE, clone_exch->phone);2462 }2463 2464 /** Wrapper for receiving the IPC_M_CONNECTION_CLONE calls.2465 *2466 * If the current call is IPC_M_CONNECTION_CLONE then a new2467 * async session is created for the accepted phone.2468 *2469 * @param mgmt Exchange management style.2470 *2471 * @return New async session or NULL on failure.2472 *2473 */2474 async_sess_t *async_clone_receive(exch_mgmt_t mgmt)2475 {2476 /* Accept the phone */2477 ipc_call_t call;2478 ipc_callid_t callid = async_get_call(&call);2479 int phone = (int) IPC_GET_ARG1(call);2480 2481 if ((IPC_GET_IMETHOD(call) != IPC_M_CONNECTION_CLONE) ||2482 (phone < 0)) {2483 async_answer_0(callid, EINVAL);2484 return NULL;2485 }2486 2487 async_sess_t *sess = (async_sess_t *) malloc(sizeof(async_sess_t));2488 if (sess == NULL) {2489 async_answer_0(callid, ENOMEM);2490 return NULL;2491 }2492 2493 sess->mgmt = mgmt;2494 sess->phone = phone;2495 sess->arg1 = 0;2496 sess->arg2 = 0;2497 sess->arg3 = 0;2498 2499 fibril_mutex_initialize(&sess->remote_state_mtx);2500 sess->remote_state_data = NULL;2501 2502 list_initialize(&sess->exch_list);2503 fibril_mutex_initialize(&sess->mutex);2504 atomic_set(&sess->refcnt, 0);2505 2506 /* Acknowledge the cloned phone */2507 async_answer_0(callid, EOK);2508 2509 return sess;2510 }2511 2512 /** Wrapper for receiving the IPC_M_CONNECT_TO_ME calls.2513 *2514 * If the current call is IPC_M_CONNECT_TO_ME then a new2515 * async session is created for the accepted phone.2516 *2517 * @param mgmt Exchange management style.2518 *2519 * @return New async session.2520 * @return NULL on failure.2521 *2522 */2523 async_sess_t *async_callback_receive(exch_mgmt_t mgmt)2524 {2525 /* Accept the phone */2526 ipc_call_t call;2527 ipc_callid_t callid = async_get_call(&call);2528 int phone = (int) IPC_GET_ARG5(call);2529 2530 if ((IPC_GET_IMETHOD(call) != IPC_M_CONNECT_TO_ME) ||2531 (phone < 0)) {2532 async_answer_0(callid, EINVAL);2533 return NULL;2534 }2535 2536 async_sess_t *sess = (async_sess_t *) malloc(sizeof(async_sess_t));2537 if (sess == NULL) {2538 async_answer_0(callid, ENOMEM);2539 return NULL;2540 }2541 2542 sess->mgmt = mgmt;2543 sess->phone = phone;2544 sess->arg1 = 0;2545 sess->arg2 = 0;2546 sess->arg3 = 0;2547 2548 fibril_mutex_initialize(&sess->remote_state_mtx);2549 sess->remote_state_data = NULL;2550 2551 list_initialize(&sess->exch_list);2552 fibril_mutex_initialize(&sess->mutex);2553 atomic_set(&sess->refcnt, 0);2554 2555 /* Acknowledge the connected phone */2556 async_answer_0(callid, EOK);2557 2558 return sess;2559 }2560 2561 /** Wrapper for receiving the IPC_M_CONNECT_TO_ME calls.2562 *2563 * If the call is IPC_M_CONNECT_TO_ME then a new2564 * async session is created. However, the phone is2565 * not accepted automatically.2566 *2567 * @param mgmt Exchange management style.2568 * @param call Call data.2569 *2570 * @return New async session.2571 * @return NULL on failure.2572 * @return NULL if the call is not IPC_M_CONNECT_TO_ME.2573 *2574 */2575 async_sess_t *async_callback_receive_start(exch_mgmt_t mgmt, ipc_call_t *call)2576 {2577 int phone = (int) IPC_GET_ARG5(*call);2578 2579 if ((IPC_GET_IMETHOD(*call) != IPC_M_CONNECT_TO_ME) ||2580 (phone < 0))2581 return NULL;2582 2583 async_sess_t *sess = (async_sess_t *) malloc(sizeof(async_sess_t));2584 if (sess == NULL)2585 return NULL;2586 2587 sess->mgmt = mgmt;2588 sess->phone = phone;2589 sess->arg1 = 0;2590 sess->arg2 = 0;2591 sess->arg3 = 0;2592 2593 fibril_mutex_initialize(&sess->remote_state_mtx);2594 sess->remote_state_data = NULL;2595 2596 list_initialize(&sess->exch_list);2597 fibril_mutex_initialize(&sess->mutex);2598 atomic_set(&sess->refcnt, 0);2599 2600 return sess;2601 }2602 2603 int async_state_change_start(async_exch_t *exch, sysarg_t arg1, sysarg_t arg2,2604 sysarg_t arg3, async_exch_t *other_exch)2605 {2606 return async_req_5_0(exch, IPC_M_STATE_CHANGE_AUTHORIZE,2607 arg1, arg2, arg3, 0, other_exch->phone);2608 }2609 2610 bool async_state_change_receive(ipc_callid_t *callid, sysarg_t *arg1,2611 sysarg_t *arg2, sysarg_t *arg3)2612 {2613 assert(callid);2614 2615 ipc_call_t call;2616 *callid = async_get_call(&call);2617 2618 if (IPC_GET_IMETHOD(call) != IPC_M_STATE_CHANGE_AUTHORIZE)2619 return false;2620 2621 if (arg1)2622 *arg1 = IPC_GET_ARG1(call);2623 if (arg2)2624 *arg2 = IPC_GET_ARG2(call);2625 if (arg3)2626 *arg3 = IPC_GET_ARG3(call);2627 2628 return true;2629 }2630 2631 int async_state_change_finalize(ipc_callid_t callid, async_exch_t *other_exch)2632 {2633 return ipc_answer_1(callid, EOK, other_exch->phone);2634 }2635 2636 /** Lock and get session remote state2637 *2638 * Lock and get the local replica of the remote state2639 * in stateful sessions. The call should be paired2640 * with async_remote_state_release*().2641 *2642 * @param[in] sess Stateful session.2643 *2644 * @return Local replica of the remote state.2645 *2646 */2647 void *async_remote_state_acquire(async_sess_t *sess)2648 {2649 fibril_mutex_lock(&sess->remote_state_mtx);2650 return sess->remote_state_data;2651 }2652 2653 /** Update the session remote state2654 *2655 * Update the local replica of the remote state2656 * in stateful sessions. The remote state must2657 * be already locked.2658 *2659 * @param[in] sess Stateful session.2660 * @param[in] state New local replica of the remote state.2661 *2662 */2663 void async_remote_state_update(async_sess_t *sess, void *state)2664 {2665 assert(fibril_mutex_is_locked(&sess->remote_state_mtx));2666 sess->remote_state_data = state;2667 }2668 2669 /** Release the session remote state2670 *2671 * Unlock the local replica of the remote state2672 * in stateful sessions.2673 *2674 * @param[in] sess Stateful session.2675 *2676 */2677 void async_remote_state_release(async_sess_t *sess)2678 {2679 assert(fibril_mutex_is_locked(&sess->remote_state_mtx));2680 2681 fibril_mutex_unlock(&sess->remote_state_mtx);2682 }2683 2684 /** Release the session remote state and end an exchange2685 *2686 * Unlock the local replica of the remote state2687 * in stateful sessions. This is convenience function2688 * which gets the session pointer from the exchange2689 * and also ends the exchange.2690 *2691 * @param[in] exch Stateful session's exchange.2692 *2693 */2694 void async_remote_state_release_exchange(async_exch_t *exch)2695 {2696 if (exch == NULL)2697 return;2698 2699 async_sess_t *sess = exch->sess;2700 assert(fibril_mutex_is_locked(&sess->remote_state_mtx));2701 2702 async_exchange_end(exch);2703 fibril_mutex_unlock(&sess->remote_state_mtx);2704 }2705 2706 1858 /** @} 2707 1859 */
Note:
See TracChangeset
for help on using the changeset viewer.