Changes in uspace/lib/c/generic/async.c [c7bbf029:cff3fb6] in mainline
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
uspace/lib/c/generic/async.c
rc7bbf029 rcff3fb6 40 40 * programming. 41 41 * 42 * You should be able to write very simple multithreaded programs, the async 43 * framework will automatically take care of most synchronization problems. 42 * You should be able to write very simple multithreaded programs. The async 43 * framework will automatically take care of most of the synchronization 44 * problems. 44 45 * 45 46 * Example of use (pseudo C): … … 53 54 * int fibril1(void *arg) 54 55 * { 55 * conn = async_connect_me_to(); 56 * c1 = async_send(conn); 57 * c2 = async_send(conn); 56 * conn = async_connect_me_to(...); 57 * 58 * exch = async_exchange_begin(conn); 59 * c1 = async_send(exch); 60 * async_exchange_end(exch); 61 * 62 * exch = async_exchange_begin(conn); 63 * c2 = async_send(exch); 64 * async_exchange_end(exch); 65 * 58 66 * async_wait_for(c1); 59 67 * async_wait_for(c2); … … 90 98 #include <ipc/ipc.h> 91 99 #include <async.h> 100 #include "private/async.h" 92 101 #undef LIBC_ASYNC_C_ 93 102 94 103 #include <futex.h> 95 104 #include <fibril.h> 96 #include <stdio.h>97 105 #include <adt/hash_table.h> 98 106 #include <adt/list.h> … … 100 108 #include <errno.h> 101 109 #include <sys/time.h> 102 #include < arch/barrier.h>110 #include <libarch/barrier.h> 103 111 #include <bool.h> 112 #include <malloc.h> 113 #include <mem.h> 104 114 #include <stdlib.h> 105 #include <malloc.h> 106 #include "private/async.h" 107 115 #include <macros.h> 116 117 #define CLIENT_HASH_TABLE_BUCKETS 32 118 #define CONN_HASH_TABLE_BUCKETS 32 119 120 /** Session data */ 121 struct async_sess { 122 /** List of inactive exchanges */ 123 list_t exch_list; 124 125 /** Exchange management style */ 126 exch_mgmt_t mgmt; 127 128 /** Session identification */ 129 int phone; 130 131 /** First clone connection argument */ 132 sysarg_t arg1; 133 134 /** Second clone connection argument */ 135 sysarg_t arg2; 136 137 /** Third clone connection argument */ 138 sysarg_t arg3; 139 140 /** Exchange mutex */ 141 fibril_mutex_t mutex; 142 143 /** Number of opened exchanges */ 144 atomic_t refcnt; 145 146 /** Mutex for stateful connections */ 147 fibril_mutex_t remote_state_mtx; 148 149 /** Data for stateful connections */ 150 void *remote_state_data; 151 }; 152 153 /** Exchange data */ 154 struct async_exch { 155 /** Link into list of inactive exchanges */ 156 link_t sess_link; 157 158 /** Link into global list of inactive exchanges */ 159 link_t global_link; 160 161 /** Session pointer */ 162 async_sess_t *sess; 163 164 /** Exchange identification */ 165 int phone; 166 }; 167 168 /** Async framework global futex */ 108 169 atomic_t async_futex = FUTEX_INITIALIZER; 109 170 … … 111 172 atomic_t threads_in_ipc_wait = { 0 }; 112 173 113 typedef struct { 114 awaiter_t wdata; 115 116 /** If reply was received. */ 117 bool done; 118 119 /** Pointer to where the answer data is stored. */ 120 ipc_call_t *dataptr; 121 122 sysarg_t retval; 123 } amsg_t; 124 125 /** 126 * Structures of this type are used to group information about 127 * a call and about a message queue link. 128 */ 174 /** Naming service session */ 175 async_sess_t *session_ns; 176 177 /** Call data */ 129 178 typedef struct { 130 179 link_t link; 180 131 181 ipc_callid_t callid; 132 182 ipc_call_t call; 133 183 } msg_t; 134 184 185 /** Message data */ 135 186 typedef struct { 136 sysarg_t in_task_hash; 187 awaiter_t wdata; 188 189 /** If reply was received. */ 190 bool done; 191 192 /** Pointer to where the answer data is stored. */ 193 ipc_call_t *dataptr; 194 195 sysarg_t retval; 196 } amsg_t; 197 198 /* Client connection data */ 199 typedef struct { 137 200 link_t link; 138 int refcnt; 201 202 task_id_t in_task_id; 203 atomic_t refcnt; 139 204 void *data; 140 205 } client_t; 141 206 207 /* Server connection data */ 142 208 typedef struct { 143 209 awaiter_t wdata; … … 146 212 link_t link; 147 213 148 /** Incoming client task hash. */ 149 sysarg_t in_task_hash; 214 /** Incoming client task ID. */ 215 task_id_t in_task_id; 216 150 217 /** Incoming phone hash. */ 151 218 sysarg_t in_phone_hash; … … 155 222 156 223 /** Messages that should be delivered to this fibril. */ 157 li nk_t msg_queue;224 list_t msg_queue; 158 225 159 226 /** Identification of the opening call. */ … … 161 228 /** Call data of the opening call. */ 162 229 ipc_call_t call; 230 /** Local argument or NULL if none. */ 231 void *carg; 163 232 164 233 /** Identification of the closing call. */ … … 166 235 167 236 /** Fibril function that will be used to handle the connection. */ 168 void (*cfibril)(ipc_callid_t, ipc_call_t *);237 async_client_conn_t cfibril; 169 238 } connection_t; 170 239 171 240 /** Identifier of the incoming connection handled by the current fibril. */ 172 static fibril_local connection_t * FIBRIL_connection;241 static fibril_local connection_t *fibril_connection; 173 242 174 243 static void *default_client_data_constructor(void) … … 196 265 } 197 266 198 void *async_client_data_get(void)199 {200 assert(FIBRIL_connection);201 return FIBRIL_connection->client->data;202 }203 204 267 /** Default fibril function that gets called to handle new connection. 205 268 * … … 208 271 * @param callid Hash of the incoming call. 209 272 * @param call Data of the incoming call. 210 * 211 */ 212 static void default_client_connection(ipc_callid_t callid, ipc_call_t *call) 273 * @param arg Local argument 274 * 275 */ 276 static void default_client_connection(ipc_callid_t callid, ipc_call_t *call, 277 void *arg) 213 278 { 214 279 ipc_answer_0(callid, ENOENT); 215 280 } 216 217 /**218 * Pointer to a fibril function that will be used to handle connections.219 */220 static async_client_conn_t client_connection = default_client_connection;221 281 222 282 /** Default fibril function that gets called to handle interrupt notifications. … … 226 286 * @param callid Hash of the incoming call. 227 287 * @param call Data of the incoming call. 288 * @param arg Local argument. 228 289 * 229 290 */ … … 232 293 } 233 294 234 /** 235 * Pointer to a fibril function that will be used to handle interrupt 236 * notifications. 237 */ 238 static async_client_conn_t interrupt_received = default_interrupt_received; 295 static async_client_conn_t client_connection = default_client_connection; 296 static async_interrupt_handler_t interrupt_received = default_interrupt_received; 297 298 /** Setter for client_connection function pointer. 299 * 300 * @param conn Function that will implement a new connection fibril. 301 * 302 */ 303 void async_set_client_connection(async_client_conn_t conn) 304 { 305 client_connection = conn; 306 } 307 308 /** Setter for interrupt_received function pointer. 309 * 310 * @param intr Function that will implement a new interrupt 311 * notification fibril. 312 */ 313 void async_set_interrupt_received(async_interrupt_handler_t intr) 314 { 315 interrupt_received = intr; 316 } 317 318 /** Mutex protecting inactive_exch_list and avail_phone_cv. 319 * 320 */ 321 static FIBRIL_MUTEX_INITIALIZE(async_sess_mutex); 322 323 /** List of all currently inactive exchanges. 324 * 325 */ 326 static LIST_INITIALIZE(inactive_exch_list); 327 328 /** Condition variable to wait for a phone to become available. 329 * 330 */ 331 static FIBRIL_CONDVAR_INITIALIZE(avail_phone_cv); 239 332 240 333 static hash_table_t client_hash_table; … … 242 335 static LIST_INITIALIZE(timeout_list); 243 336 244 #define CLIENT_HASH_TABLE_BUCKETS 32245 #define CONN_HASH_TABLE_BUCKETS 32246 247 337 static hash_index_t client_hash(unsigned long key[]) 248 338 { 249 339 assert(key); 340 250 341 return (((key[0]) >> 4) % CLIENT_HASH_TABLE_BUCKETS); 251 342 } … … 253 344 static int client_compare(unsigned long key[], hash_count_t keys, link_t *item) 254 345 { 346 assert(key); 347 assert(keys == 2); 348 assert(item); 349 255 350 client_t *client = hash_table_get_instance(item, client_t, link); 256 return (key[0] == client->in_task_hash); 351 return (key[0] == LOWER32(client->in_task_id) && 352 (key[1] == UPPER32(client->in_task_id))); 257 353 } 258 354 … … 278 374 { 279 375 assert(key); 376 280 377 return (((key[0]) >> 4) % CONN_HASH_TABLE_BUCKETS); 281 378 } … … 292 389 static int conn_compare(unsigned long key[], hash_count_t keys, link_t *item) 293 390 { 391 assert(key); 392 assert(item); 393 294 394 connection_t *conn = hash_table_get_instance(item, connection_t, link); 295 395 return (key[0] == conn->in_phone_hash); … … 314 414 void async_insert_timeout(awaiter_t *wd) 315 415 { 416 assert(wd); 417 316 418 wd->to_event.occurred = false; 317 419 wd->to_event.inlist = true; 318 420 319 link_t *tmp = timeout_list. next;320 while (tmp != &timeout_list ) {421 link_t *tmp = timeout_list.head.next; 422 while (tmp != &timeout_list.head) { 321 423 awaiter_t *cur 322 424 = list_get_instance(tmp, awaiter_t, to_event.link); … … 328 430 } 329 431 330 list_ append(&wd->to_event.link, tmp);432 list_insert_before(&wd->to_event.link, tmp); 331 433 } 332 434 … … 346 448 static bool route_call(ipc_callid_t callid, ipc_call_t *call) 347 449 { 450 assert(call); 451 348 452 futex_down(&async_futex); 349 453 … … 400 504 static int notification_fibril(void *arg) 401 505 { 506 assert(arg); 507 402 508 msg_t *msg = (msg_t *) arg; 403 509 interrupt_received(msg->callid, &msg->call); … … 420 526 static bool process_notification(ipc_callid_t callid, ipc_call_t *call) 421 527 { 528 assert(call); 529 422 530 futex_down(&async_futex); 423 531 … … 458 566 ipc_callid_t async_get_call_timeout(ipc_call_t *call, suseconds_t usecs) 459 567 { 460 assert(FIBRIL_connection); 568 assert(call); 569 assert(fibril_connection); 461 570 462 571 /* Why doing this? 463 * GCC 4.1.0 coughs on FIBRIL_connection-> dereference.572 * GCC 4.1.0 coughs on fibril_connection-> dereference. 464 573 * GCC 4.1.1 happilly puts the rdhwr instruction in delay slot. 465 574 * I would never expect to find so many errors in 466 575 * a compiler. 467 576 */ 468 connection_t *conn = FIBRIL_connection;577 connection_t *conn = fibril_connection; 469 578 470 579 futex_down(&async_futex); … … 518 627 } 519 628 520 msg_t *msg = list_get_instance( conn->msg_queue.next, msg_t, link);629 msg_t *msg = list_get_instance(list_first(&conn->msg_queue), msg_t, link); 521 630 list_remove(&msg->link); 522 631 … … 529 638 } 530 639 640 static client_t *async_client_get(task_id_t client_id, bool create) 641 { 642 unsigned long key[2] = { 643 LOWER32(client_id), 644 UPPER32(client_id), 645 }; 646 client_t *client = NULL; 647 648 futex_down(&async_futex); 649 link_t *lnk = hash_table_find(&client_hash_table, key); 650 if (lnk) { 651 client = hash_table_get_instance(lnk, client_t, link); 652 atomic_inc(&client->refcnt); 653 } else if (create) { 654 client = malloc(sizeof(client_t)); 655 if (client) { 656 client->in_task_id = client_id; 657 client->data = async_client_data_create(); 658 659 atomic_set(&client->refcnt, 1); 660 hash_table_insert(&client_hash_table, key, &client->link); 661 } 662 } 663 664 futex_up(&async_futex); 665 return client; 666 } 667 668 static void async_client_put(client_t *client) 669 { 670 bool destroy; 671 unsigned long key[2] = { 672 LOWER32(client->in_task_id), 673 UPPER32(client->in_task_id) 674 }; 675 676 futex_down(&async_futex); 677 678 if (atomic_predec(&client->refcnt) == 0) { 679 hash_table_remove(&client_hash_table, key, 2); 680 destroy = true; 681 } else 682 destroy = false; 683 684 futex_up(&async_futex); 685 686 if (destroy) { 687 if (client->data) 688 async_client_data_destroy(client->data); 689 690 free(client); 691 } 692 } 693 694 void *async_get_client_data(void) 695 { 696 assert(fibril_connection); 697 return fibril_connection->client->data; 698 } 699 700 void *async_get_client_data_by_id(task_id_t client_id) 701 { 702 client_t *client = async_client_get(client_id, false); 703 if (!client) 704 return NULL; 705 if (!client->data) { 706 async_client_put(client); 707 return NULL; 708 } 709 710 return client->data; 711 } 712 713 void async_put_client_data_by_id(task_id_t client_id) 714 { 715 client_t *client = async_client_get(client_id, false); 716 717 assert(client); 718 assert(client->data); 719 720 /* Drop the reference we got in async_get_client_data_by_hash(). */ 721 async_client_put(client); 722 723 /* Drop our own reference we got at the beginning of this function. */ 724 async_client_put(client); 725 } 726 531 727 /** Wrapper for client connection fibril. 532 728 * … … 541 737 static int connection_fibril(void *arg) 542 738 { 739 assert(arg); 740 543 741 /* 544 742 * Setup fibril-local connection pointer. 545 743 */ 546 FIBRIL_connection = (connection_t *) arg; 547 548 futex_down(&async_futex); 744 fibril_connection = (connection_t *) arg; 549 745 550 746 /* … … 553 749 * hash in a new tracking structure. 554 750 */ 555 556 unsigned long key = FIBRIL_connection->in_task_hash; 557 link_t *lnk = hash_table_find(&client_hash_table, &key); 558 559 client_t *client; 560 561 if (lnk) { 562 client = hash_table_get_instance(lnk, client_t, link); 563 client->refcnt++; 564 } else { 565 client = malloc(sizeof(client_t)); 566 if (!client) { 567 ipc_answer_0(FIBRIL_connection->callid, ENOMEM); 568 futex_up(&async_futex); 569 return 0; 570 } 571 572 client->in_task_hash = FIBRIL_connection->in_task_hash; 573 574 async_serialize_start(); 575 client->data = async_client_data_create(); 576 async_serialize_end(); 577 578 client->refcnt = 1; 579 hash_table_insert(&client_hash_table, &key, &client->link); 580 } 581 582 futex_up(&async_futex); 583 584 FIBRIL_connection->client = client; 751 752 client_t *client = async_client_get(fibril_connection->in_task_id, true); 753 if (!client) { 754 ipc_answer_0(fibril_connection->callid, ENOMEM); 755 return 0; 756 } 757 758 fibril_connection->client = client; 585 759 586 760 /* 587 761 * Call the connection handler function. 588 762 */ 589 FIBRIL_connection->cfibril(FIBRIL_connection->callid,590 & FIBRIL_connection->call);763 fibril_connection->cfibril(fibril_connection->callid, 764 &fibril_connection->call, fibril_connection->carg); 591 765 592 766 /* 593 767 * Remove the reference for this client task connection. 594 768 */ 595 bool destroy; 596 597 futex_down(&async_futex); 598 599 if (--client->refcnt == 0) { 600 hash_table_remove(&client_hash_table, &key, 1); 601 destroy = true; 602 } else 603 destroy = false; 604 605 futex_up(&async_futex); 606 607 if (destroy) { 608 if (client->data) 609 async_client_data_destroy(client->data); 610 611 free(client); 612 } 769 async_client_put(client); 613 770 614 771 /* … … 616 773 */ 617 774 futex_down(&async_futex); 618 key = FIBRIL_connection->in_phone_hash;775 unsigned long key = fibril_connection->in_phone_hash; 619 776 hash_table_remove(&conn_hash_table, &key, 1); 620 777 futex_up(&async_futex); … … 623 780 * Answer all remaining messages with EHANGUP. 624 781 */ 625 while (!list_empty(& FIBRIL_connection->msg_queue)) {782 while (!list_empty(&fibril_connection->msg_queue)) { 626 783 msg_t *msg = 627 list_get_instance( FIBRIL_connection->msg_queue.next, msg_t,628 link);784 list_get_instance(list_first(&fibril_connection->msg_queue), 785 msg_t, link); 629 786 630 787 list_remove(&msg->link); … … 637 794 * i.e. IPC_M_PHONE_HUNGUP. 638 795 */ 639 if ( FIBRIL_connection->close_callid)640 ipc_answer_0( FIBRIL_connection->close_callid, EOK);641 642 free( FIBRIL_connection);796 if (fibril_connection->close_callid) 797 ipc_answer_0(fibril_connection->close_callid, EOK); 798 799 free(fibril_connection); 643 800 return 0; 644 801 } … … 646 803 /** Create a new fibril for a new connection. 647 804 * 648 * Create new fibril for connection, fill in connection structures and insert s805 * Create new fibril for connection, fill in connection structures and insert 649 806 * it into the hash table, so that later we can easily do routing of messages to 650 807 * particular fibrils. 651 808 * 652 * @param in_task_ hashIdentification of the incoming connection.809 * @param in_task_id Identification of the incoming connection. 653 810 * @param in_phone_hash Identification of the incoming connection. 654 811 * @param callid Hash of the opening IPC_M_CONNECT_ME_TO call. … … 659 816 * @param cfibril Fibril function that should be called upon opening the 660 817 * connection. 818 * @param carg Extra argument to pass to the connection fibril 661 819 * 662 820 * @return New fibril id or NULL on failure. 663 821 * 664 822 */ 665 fid_t async_new_connection( sysarg_t in_task_hash, sysarg_t in_phone_hash,823 fid_t async_new_connection(task_id_t in_task_id, sysarg_t in_phone_hash, 666 824 ipc_callid_t callid, ipc_call_t *call, 667 void (*cfibril)(ipc_callid_t, ipc_call_t *))825 async_client_conn_t cfibril, void *carg) 668 826 { 669 827 connection_t *conn = malloc(sizeof(*conn)); … … 675 833 } 676 834 677 conn->in_task_ hash = in_task_hash;835 conn->in_task_id = in_task_id; 678 836 conn->in_phone_hash = in_phone_hash; 679 837 list_initialize(&conn->msg_queue); 680 838 conn->callid = callid; 681 839 conn->close_callid = 0; 840 conn->carg = carg; 682 841 683 842 if (call) … … 721 880 static void handle_call(ipc_callid_t callid, ipc_call_t *call) 722 881 { 882 assert(call); 883 723 884 /* Unrouted call - take some default action */ 724 885 if ((callid & IPC_CALLID_NOTIFICATION)) { … … 731 892 case IPC_M_CONNECT_ME_TO: 732 893 /* Open new connection with fibril, etc. */ 733 async_new_connection(call->in_task_ hash, IPC_GET_ARG5(*call),734 callid, call, client_connection );894 async_new_connection(call->in_task_id, IPC_GET_ARG5(*call), 895 callid, call, client_connection, NULL); 735 896 return; 736 897 } … … 752 913 futex_down(&async_futex); 753 914 754 link_t *cur = timeout_list.next;755 while (cur != &timeout_list) {915 link_t *cur = list_first(&timeout_list); 916 while (cur != NULL) { 756 917 awaiter_t *waiter = 757 918 list_get_instance(cur, awaiter_t, to_event.link); … … 759 920 if (tv_gt(&waiter->to_event.expires, &tv)) 760 921 break; 761 762 cur = cur->next;763 922 764 923 list_remove(&waiter->to_event.link); … … 774 933 fibril_add_ready(waiter->fid); 775 934 } 935 936 cur = list_first(&timeout_list); 776 937 } 777 938 … … 800 961 suseconds_t timeout; 801 962 if (!list_empty(&timeout_list)) { 802 awaiter_t *waiter = list_get_instance( timeout_list.next,803 awaiter_t, to_event.link);963 awaiter_t *waiter = list_get_instance( 964 list_first(&timeout_list), awaiter_t, to_event.link); 804 965 805 966 struct timeval tv; … … 878 1039 void __async_init(void) 879 1040 { 880 if (!hash_table_create(&client_hash_table, CLIENT_HASH_TABLE_BUCKETS, 1,881 &client_hash_table_ops))1041 if (!hash_table_create(&client_hash_table, CLIENT_HASH_TABLE_BUCKETS, 1042 2, &client_hash_table_ops)) 882 1043 abort(); 883 1044 884 if (!hash_table_create(&conn_hash_table, CONN_HASH_TABLE_BUCKETS, 1,885 &conn_hash_table_ops))1045 if (!hash_table_create(&conn_hash_table, CONN_HASH_TABLE_BUCKETS, 1046 1, &conn_hash_table_ops)) 886 1047 abort(); 1048 1049 session_ns = (async_sess_t *) malloc(sizeof(async_sess_t)); 1050 if (session_ns == NULL) 1051 abort(); 1052 1053 session_ns->mgmt = EXCHANGE_ATOMIC; 1054 session_ns->phone = PHONE_NS; 1055 session_ns->arg1 = 0; 1056 session_ns->arg2 = 0; 1057 session_ns->arg3 = 0; 1058 1059 fibril_mutex_initialize(&session_ns->remote_state_mtx); 1060 session_ns->remote_state_data = NULL; 1061 1062 list_initialize(&session_ns->exch_list); 1063 fibril_mutex_initialize(&session_ns->mutex); 1064 atomic_set(&session_ns->refcnt, 0); 887 1065 } 888 1066 … … 899 1077 * 900 1078 */ 901 static void reply_received(void *arg, int retval, ipc_call_t *data) 902 { 1079 void reply_received(void *arg, int retval, ipc_call_t *data) 1080 { 1081 assert(arg); 1082 903 1083 futex_down(&async_futex); 904 1084 … … 930 1110 * completion. 931 1111 * 932 * @param phoneid Handle of the phone that will be used for the send.933 * @param method Service-defined method.1112 * @param exch Exchange for sending the message. 1113 * @param imethod Service-defined interface and method. 934 1114 * @param arg1 Service-defined payload argument. 935 1115 * @param arg2 Service-defined payload argument. … … 942 1122 * 943 1123 */ 944 aid_t async_send_fast( int phoneid, sysarg_tmethod, sysarg_t arg1,1124 aid_t async_send_fast(async_exch_t *exch, sysarg_t imethod, sysarg_t arg1, 945 1125 sysarg_t arg2, sysarg_t arg3, sysarg_t arg4, ipc_call_t *dataptr) 946 1126 { 1127 if (exch == NULL) 1128 return 0; 1129 947 1130 amsg_t *msg = malloc(sizeof(amsg_t)); 948 949 if (!msg) 1131 if (msg == NULL) 950 1132 return 0; 951 1133 … … 961 1143 msg->wdata.active = true; 962 1144 963 ipc_call_async_4( phoneid,method, arg1, arg2, arg3, arg4, msg,1145 ipc_call_async_4(exch->phone, imethod, arg1, arg2, arg3, arg4, msg, 964 1146 reply_received, true); 965 1147 … … 972 1154 * completion. 973 1155 * 974 * @param phoneid Handle of the phone that will be used for the send.975 * @param method Service-defined method.1156 * @param exch Exchange for sending the message. 1157 * @param imethod Service-defined interface and method. 976 1158 * @param arg1 Service-defined payload argument. 977 1159 * @param arg2 Service-defined payload argument. … … 985 1167 * 986 1168 */ 987 aid_t async_send_slow( int phoneid, sysarg_tmethod, sysarg_t arg1,1169 aid_t async_send_slow(async_exch_t *exch, sysarg_t imethod, sysarg_t arg1, 988 1170 sysarg_t arg2, sysarg_t arg3, sysarg_t arg4, sysarg_t arg5, 989 1171 ipc_call_t *dataptr) 990 1172 { 1173 if (exch == NULL) 1174 return 0; 1175 991 1176 amsg_t *msg = malloc(sizeof(amsg_t)); 992 1177 993 if ( !msg)1178 if (msg == NULL) 994 1179 return 0; 995 1180 … … 1005 1190 msg->wdata.active = true; 1006 1191 1007 ipc_call_async_5( phoneid, method, arg1, arg2, arg3, arg4, arg5, msg,1008 reply_received, true);1192 ipc_call_async_5(exch->phone, imethod, arg1, arg2, arg3, arg4, arg5, 1193 msg, reply_received, true); 1009 1194 1010 1195 return (aid_t) msg; … … 1020 1205 void async_wait_for(aid_t amsgid, sysarg_t *retval) 1021 1206 { 1207 assert(amsgid); 1208 1022 1209 amsg_t *msg = (amsg_t *) amsgid; 1023 1210 … … 1056 1243 int async_wait_timeout(aid_t amsgid, sysarg_t *retval, suseconds_t timeout) 1057 1244 { 1245 assert(amsgid); 1246 1058 1247 amsg_t *msg = (amsg_t *) amsgid; 1059 1248 … … 1124 1313 } 1125 1314 1126 /** Setter for client_connection function pointer.1127 *1128 * @param conn Function that will implement a new connection fibril.1129 *1130 */1131 void async_set_client_connection(async_client_conn_t conn)1132 {1133 client_connection = conn;1134 }1135 1136 /** Setter for interrupt_received function pointer.1137 *1138 * @param intr Function that will implement a new interrupt1139 * notification fibril.1140 */1141 void async_set_interrupt_received(async_client_conn_t intr)1142 {1143 interrupt_received = intr;1144 }1145 1146 1315 /** Pseudo-synchronous message sending - fast version. 1147 1316 * … … 1151 1320 * transferring more arguments, see the slower async_req_slow(). 1152 1321 * 1153 * @param phoneid Hash of the phone through which to make the call.1154 * @param method Method of the call.1322 * @param exch Exchange for sending the message. 1323 * @param imethod Interface and method of the call. 1155 1324 * @param arg1 Service-defined payload argument. 1156 1325 * @param arg2 Service-defined payload argument. … … 1166 1335 * 1167 1336 */ 1168 sysarg_t async_req_fast( int phoneid, sysarg_tmethod, sysarg_t arg1,1337 sysarg_t async_req_fast(async_exch_t *exch, sysarg_t imethod, sysarg_t arg1, 1169 1338 sysarg_t arg2, sysarg_t arg3, sysarg_t arg4, sysarg_t *r1, sysarg_t *r2, 1170 1339 sysarg_t *r3, sysarg_t *r4, sysarg_t *r5) 1171 1340 { 1341 if (exch == NULL) 1342 return ENOENT; 1343 1172 1344 ipc_call_t result; 1173 aid_t eid = async_send_4(phoneid,method, arg1, arg2, arg3, arg4,1345 aid_t aid = async_send_4(exch, imethod, arg1, arg2, arg3, arg4, 1174 1346 &result); 1175 1347 1176 1348 sysarg_t rc; 1177 async_wait_for( eid, &rc);1349 async_wait_for(aid, &rc); 1178 1350 1179 1351 if (r1) … … 1199 1371 * Send message asynchronously and return only after the reply arrives. 1200 1372 * 1201 * @param phoneid Hash of the phone through which to make the call.1202 * @param method Method of the call.1373 * @param exch Exchange for sending the message. 1374 * @param imethod Interface and method of the call. 1203 1375 * @param arg1 Service-defined payload argument. 1204 1376 * @param arg2 Service-defined payload argument. … … 1215 1387 * 1216 1388 */ 1217 sysarg_t async_req_slow( int phoneid, sysarg_tmethod, sysarg_t arg1,1389 sysarg_t async_req_slow(async_exch_t *exch, sysarg_t imethod, sysarg_t arg1, 1218 1390 sysarg_t arg2, sysarg_t arg3, sysarg_t arg4, sysarg_t arg5, sysarg_t *r1, 1219 1391 sysarg_t *r2, sysarg_t *r3, sysarg_t *r4, sysarg_t *r5) 1220 1392 { 1393 if (exch == NULL) 1394 return ENOENT; 1395 1221 1396 ipc_call_t result; 1222 aid_t eid = async_send_5(phoneid,method, arg1, arg2, arg3, arg4, arg5,1397 aid_t aid = async_send_5(exch, imethod, arg1, arg2, arg3, arg4, arg5, 1223 1398 &result); 1224 1399 1225 1400 sysarg_t rc; 1226 async_wait_for( eid, &rc);1401 async_wait_for(aid, &rc); 1227 1402 1228 1403 if (r1) … … 1244 1419 } 1245 1420 1246 void async_msg_0(int phone, sysarg_t imethod) 1247 { 1248 ipc_call_async_0(phone, imethod, NULL, NULL, true); 1249 } 1250 1251 void async_msg_1(int phone, sysarg_t imethod, sysarg_t arg1) 1252 { 1253 ipc_call_async_1(phone, imethod, arg1, NULL, NULL, true); 1254 } 1255 1256 void async_msg_2(int phone, sysarg_t imethod, sysarg_t arg1, sysarg_t arg2) 1257 { 1258 ipc_call_async_2(phone, imethod, arg1, arg2, NULL, NULL, true); 1259 } 1260 1261 void async_msg_3(int phone, sysarg_t imethod, sysarg_t arg1, sysarg_t arg2, 1262 sysarg_t arg3) 1263 { 1264 ipc_call_async_3(phone, imethod, arg1, arg2, arg3, NULL, NULL, true); 1265 } 1266 1267 void async_msg_4(int phone, sysarg_t imethod, sysarg_t arg1, sysarg_t arg2, 1268 sysarg_t arg3, sysarg_t arg4) 1269 { 1270 ipc_call_async_4(phone, imethod, arg1, arg2, arg3, arg4, NULL, NULL, 1271 true); 1272 } 1273 1274 void async_msg_5(int phone, sysarg_t imethod, sysarg_t arg1, sysarg_t arg2, 1275 sysarg_t arg3, sysarg_t arg4, sysarg_t arg5) 1276 { 1277 ipc_call_async_5(phone, imethod, arg1, arg2, arg3, arg4, arg5, NULL, 1278 NULL, true); 1421 void async_msg_0(async_exch_t *exch, sysarg_t imethod) 1422 { 1423 if (exch != NULL) 1424 ipc_call_async_0(exch->phone, imethod, NULL, NULL, true); 1425 } 1426 1427 void async_msg_1(async_exch_t *exch, sysarg_t imethod, sysarg_t arg1) 1428 { 1429 if (exch != NULL) 1430 ipc_call_async_1(exch->phone, imethod, arg1, NULL, NULL, true); 1431 } 1432 1433 void async_msg_2(async_exch_t *exch, sysarg_t imethod, sysarg_t arg1, 1434 sysarg_t arg2) 1435 { 1436 if (exch != NULL) 1437 ipc_call_async_2(exch->phone, imethod, arg1, arg2, NULL, NULL, 1438 true); 1439 } 1440 1441 void async_msg_3(async_exch_t *exch, sysarg_t imethod, sysarg_t arg1, 1442 sysarg_t arg2, sysarg_t arg3) 1443 { 1444 if (exch != NULL) 1445 ipc_call_async_3(exch->phone, imethod, arg1, arg2, arg3, NULL, 1446 NULL, true); 1447 } 1448 1449 void async_msg_4(async_exch_t *exch, sysarg_t imethod, sysarg_t arg1, 1450 sysarg_t arg2, sysarg_t arg3, sysarg_t arg4) 1451 { 1452 if (exch != NULL) 1453 ipc_call_async_4(exch->phone, imethod, arg1, arg2, arg3, arg4, 1454 NULL, NULL, true); 1455 } 1456 1457 void async_msg_5(async_exch_t *exch, sysarg_t imethod, sysarg_t arg1, 1458 sysarg_t arg2, sysarg_t arg3, sysarg_t arg4, sysarg_t arg5) 1459 { 1460 if (exch != NULL) 1461 ipc_call_async_5(exch->phone, imethod, arg1, arg2, arg3, arg4, 1462 arg5, NULL, NULL, true); 1279 1463 } 1280 1464 … … 1313 1497 } 1314 1498 1315 int async_forward_fast(ipc_callid_t callid, int phoneid, sysarg_t imethod, 1316 sysarg_t arg1, sysarg_t arg2, unsigned int mode) 1317 { 1318 return ipc_forward_fast(callid, phoneid, imethod, arg1, arg2, mode); 1319 } 1320 1321 int async_forward_slow(ipc_callid_t callid, int phoneid, sysarg_t imethod, 1322 sysarg_t arg1, sysarg_t arg2, sysarg_t arg3, sysarg_t arg4, sysarg_t arg5, 1323 unsigned int mode) 1324 { 1325 return ipc_forward_slow(callid, phoneid, imethod, arg1, arg2, arg3, arg4, 1326 arg5, mode); 1499 int async_forward_fast(ipc_callid_t callid, async_exch_t *exch, 1500 sysarg_t imethod, sysarg_t arg1, sysarg_t arg2, unsigned int mode) 1501 { 1502 if (exch == NULL) 1503 return ENOENT; 1504 1505 return ipc_forward_fast(callid, exch->phone, imethod, arg1, arg2, mode); 1506 } 1507 1508 int async_forward_slow(ipc_callid_t callid, async_exch_t *exch, 1509 sysarg_t imethod, sysarg_t arg1, sysarg_t arg2, sysarg_t arg3, 1510 sysarg_t arg4, sysarg_t arg5, unsigned int mode) 1511 { 1512 if (exch == NULL) 1513 return ENOENT; 1514 1515 return ipc_forward_slow(callid, exch->phone, imethod, arg1, arg2, arg3, 1516 arg4, arg5, mode); 1327 1517 } 1328 1518 … … 1331 1521 * Ask through phone for a new connection to some service. 1332 1522 * 1333 * @param phone Phone handle used for contacting the other side.1523 * @param exch Exchange for sending the message. 1334 1524 * @param arg1 User defined argument. 1335 1525 * @param arg2 User defined argument. … … 1337 1527 * @param client_receiver Connection handing routine. 1338 1528 * 1339 * @return New phone handle on success or a negative error code. 1340 * 1341 */ 1342 int async_connect_to_me(int phone, sysarg_t arg1, sysarg_t arg2, 1343 sysarg_t arg3, async_client_conn_t client_receiver) 1344 { 1345 sysarg_t task_hash; 1529 * @return Zero on success or a negative error code. 1530 * 1531 */ 1532 int async_connect_to_me(async_exch_t *exch, sysarg_t arg1, sysarg_t arg2, 1533 sysarg_t arg3, async_client_conn_t client_receiver, void *carg) 1534 { 1535 if (exch == NULL) 1536 return ENOENT; 1537 1346 1538 sysarg_t phone_hash; 1347 int rc = async_req_3_5(phone, IPC_M_CONNECT_TO_ME, arg1, arg2, arg3, 1348 NULL, NULL, NULL, &task_hash, &phone_hash); 1539 sysarg_t rc; 1540 1541 aid_t req; 1542 ipc_call_t answer; 1543 req = async_send_3(exch, IPC_M_CONNECT_TO_ME, arg1, arg2, arg3, 1544 &answer); 1545 async_wait_for(req, &rc); 1546 if (rc != EOK) 1547 return (int) rc; 1548 1549 phone_hash = IPC_GET_ARG5(answer); 1550 1551 if (client_receiver != NULL) 1552 async_new_connection(answer.in_task_id, phone_hash, 0, NULL, 1553 client_receiver, carg); 1554 1555 return EOK; 1556 } 1557 1558 /** Wrapper for making IPC_M_CONNECT_ME calls using the async framework. 1559 * 1560 * Ask through for a cloned connection to some service. 1561 * 1562 * @param mgmt Exchange management style. 1563 * @param exch Exchange for sending the message. 1564 * 1565 * @return New session on success or NULL on error. 1566 * 1567 */ 1568 async_sess_t *async_connect_me(exch_mgmt_t mgmt, async_exch_t *exch) 1569 { 1570 if (exch == NULL) { 1571 errno = ENOENT; 1572 return NULL; 1573 } 1574 1575 async_sess_t *sess = (async_sess_t *) malloc(sizeof(async_sess_t)); 1576 if (sess == NULL) { 1577 errno = ENOMEM; 1578 return NULL; 1579 } 1580 1581 ipc_call_t result; 1582 1583 amsg_t *msg = malloc(sizeof(amsg_t)); 1584 if (msg == NULL) { 1585 free(sess); 1586 errno = ENOMEM; 1587 return NULL; 1588 } 1589 1590 msg->done = false; 1591 msg->dataptr = &result; 1592 1593 msg->wdata.to_event.inlist = false; 1594 1595 /* 1596 * We may sleep in the next method, 1597 * but it will use its own means 1598 */ 1599 msg->wdata.active = true; 1600 1601 ipc_call_async_0(exch->phone, IPC_M_CONNECT_ME, msg, 1602 reply_received, true); 1603 1604 sysarg_t rc; 1605 async_wait_for((aid_t) msg, &rc); 1606 1607 if (rc != EOK) { 1608 errno = rc; 1609 free(sess); 1610 return NULL; 1611 } 1612 1613 int phone = (int) IPC_GET_ARG5(result); 1614 1615 if (phone < 0) { 1616 errno = phone; 1617 free(sess); 1618 return NULL; 1619 } 1620 1621 sess->mgmt = mgmt; 1622 sess->phone = phone; 1623 sess->arg1 = 0; 1624 sess->arg2 = 0; 1625 sess->arg3 = 0; 1626 1627 fibril_mutex_initialize(&sess->remote_state_mtx); 1628 sess->remote_state_data = NULL; 1629 1630 list_initialize(&sess->exch_list); 1631 fibril_mutex_initialize(&sess->mutex); 1632 atomic_set(&sess->refcnt, 0); 1633 1634 return sess; 1635 } 1636 1637 static int async_connect_me_to_internal(int phone, sysarg_t arg1, sysarg_t arg2, 1638 sysarg_t arg3, sysarg_t arg4) 1639 { 1640 ipc_call_t result; 1641 1642 amsg_t *msg = malloc(sizeof(amsg_t)); 1643 if (msg == NULL) 1644 return ENOENT; 1645 1646 msg->done = false; 1647 msg->dataptr = &result; 1648 1649 msg->wdata.to_event.inlist = false; 1650 1651 /* 1652 * We may sleep in the next method, 1653 * but it will use its own means 1654 */ 1655 msg->wdata.active = true; 1656 1657 ipc_call_async_4(phone, IPC_M_CONNECT_ME_TO, arg1, arg2, arg3, arg4, 1658 msg, reply_received, true); 1659 1660 sysarg_t rc; 1661 async_wait_for((aid_t) msg, &rc); 1662 1349 1663 if (rc != EOK) 1350 1664 return rc; 1351 1665 1352 if (client_receiver != NULL) 1353 async_new_connection(task_hash, phone_hash, 0, NULL, 1354 client_receiver); 1355 1356 return EOK; 1666 return (int) IPC_GET_ARG5(result); 1357 1667 } 1358 1668 1359 1669 /** Wrapper for making IPC_M_CONNECT_ME_TO calls using the async framework. 1360 1670 * 1361 * Ask through phone for a new connection to some service. 1362 * 1363 * @param phone Phone handle used for contacting the other side. 1364 * @param arg1 User defined argument. 1365 * @param arg2 User defined argument. 1366 * @param arg3 User defined argument. 1367 * 1368 * @return New phone handle on success or a negative error code. 1369 * 1370 */ 1371 int async_connect_me_to(int phone, sysarg_t arg1, sysarg_t arg2, 1671 * Ask through for a new connection to some service. 1672 * 1673 * @param mgmt Exchange management style. 1674 * @param exch Exchange for sending the message. 1675 * @param arg1 User defined argument. 1676 * @param arg2 User defined argument. 1677 * @param arg3 User defined argument. 1678 * 1679 * @return New session on success or NULL on error. 1680 * 1681 */ 1682 async_sess_t *async_connect_me_to(exch_mgmt_t mgmt, async_exch_t *exch, 1683 sysarg_t arg1, sysarg_t arg2, sysarg_t arg3) 1684 { 1685 if (exch == NULL) { 1686 errno = ENOENT; 1687 return NULL; 1688 } 1689 1690 async_sess_t *sess = (async_sess_t *) malloc(sizeof(async_sess_t)); 1691 if (sess == NULL) { 1692 errno = ENOMEM; 1693 return NULL; 1694 } 1695 1696 int phone = async_connect_me_to_internal(exch->phone, arg1, arg2, arg3, 1697 0); 1698 1699 if (phone < 0) { 1700 errno = phone; 1701 free(sess); 1702 return NULL; 1703 } 1704 1705 sess->mgmt = mgmt; 1706 sess->phone = phone; 1707 sess->arg1 = arg1; 1708 sess->arg2 = arg2; 1709 sess->arg3 = arg3; 1710 1711 fibril_mutex_initialize(&sess->remote_state_mtx); 1712 sess->remote_state_data = NULL; 1713 1714 list_initialize(&sess->exch_list); 1715 fibril_mutex_initialize(&sess->mutex); 1716 atomic_set(&sess->refcnt, 0); 1717 1718 return sess; 1719 } 1720 1721 /** Set arguments for new connections. 1722 * 1723 * FIXME This is an ugly hack to work around the problem that parallel 1724 * exchanges are implemented using parallel connections. When we create 1725 * a callback session, the framework does not know arguments for the new 1726 * connections. 1727 * 1728 * The proper solution seems to be to implement parallel exchanges using 1729 * tagging. 1730 */ 1731 void async_sess_args_set(async_sess_t *sess, sysarg_t arg1, sysarg_t arg2, 1372 1732 sysarg_t arg3) 1373 1733 { 1374 sysarg_t newphid; 1375 int rc = async_req_3_5(phone, IPC_M_CONNECT_ME_TO, arg1, arg2, arg3, 1376 NULL, NULL, NULL, NULL, &newphid); 1377 1378 if (rc != EOK) 1379 return rc; 1380 1381 return newphid; 1734 sess->arg1 = arg1; 1735 sess->arg2 = arg2; 1736 sess->arg3 = arg3; 1382 1737 } 1383 1738 … … 1387 1742 * success. 1388 1743 * 1389 * @param phoneid Phone handle used for contacting the other side. 1390 * @param arg1 User defined argument. 1391 * @param arg2 User defined argument. 1392 * @param arg3 User defined argument. 1393 * 1394 * @return New phone handle on success or a negative error code. 1395 * 1396 */ 1397 int async_connect_me_to_blocking(int phoneid, sysarg_t arg1, sysarg_t arg2, 1398 sysarg_t arg3) 1399 { 1400 sysarg_t newphid; 1401 int rc = async_req_4_5(phoneid, IPC_M_CONNECT_ME_TO, arg1, arg2, arg3, 1402 IPC_FLAG_BLOCKING, NULL, NULL, NULL, NULL, &newphid); 1403 1404 if (rc != EOK) 1405 return rc; 1406 1407 return newphid; 1744 * @param mgmt Exchange management style. 1745 * @param exch Exchange for sending the message. 1746 * @param arg1 User defined argument. 1747 * @param arg2 User defined argument. 1748 * @param arg3 User defined argument. 1749 * 1750 * @return New session on success or NULL on error. 1751 * 1752 */ 1753 async_sess_t *async_connect_me_to_blocking(exch_mgmt_t mgmt, async_exch_t *exch, 1754 sysarg_t arg1, sysarg_t arg2, sysarg_t arg3) 1755 { 1756 if (exch == NULL) { 1757 errno = ENOENT; 1758 return NULL; 1759 } 1760 1761 async_sess_t *sess = (async_sess_t *) malloc(sizeof(async_sess_t)); 1762 if (sess == NULL) { 1763 errno = ENOMEM; 1764 return NULL; 1765 } 1766 1767 int phone = async_connect_me_to_internal(exch->phone, arg1, arg2, arg3, 1768 IPC_FLAG_BLOCKING); 1769 1770 if (phone < 0) { 1771 errno = phone; 1772 free(sess); 1773 return NULL; 1774 } 1775 1776 sess->mgmt = mgmt; 1777 sess->phone = phone; 1778 sess->arg1 = arg1; 1779 sess->arg2 = arg2; 1780 sess->arg3 = arg3; 1781 1782 fibril_mutex_initialize(&sess->remote_state_mtx); 1783 sess->remote_state_data = NULL; 1784 1785 list_initialize(&sess->exch_list); 1786 fibril_mutex_initialize(&sess->mutex); 1787 atomic_set(&sess->refcnt, 0); 1788 1789 return sess; 1408 1790 } 1409 1791 … … 1411 1793 * 1412 1794 */ 1413 int async_connect_kbox(task_id_t id) 1414 { 1415 return ipc_connect_kbox(id); 1795 async_sess_t *async_connect_kbox(task_id_t id) 1796 { 1797 async_sess_t *sess = (async_sess_t *) malloc(sizeof(async_sess_t)); 1798 if (sess == NULL) { 1799 errno = ENOMEM; 1800 return NULL; 1801 } 1802 1803 int phone = ipc_connect_kbox(id); 1804 if (phone < 0) { 1805 errno = phone; 1806 free(sess); 1807 return NULL; 1808 } 1809 1810 sess->mgmt = EXCHANGE_ATOMIC; 1811 sess->phone = phone; 1812 sess->arg1 = 0; 1813 sess->arg2 = 0; 1814 sess->arg3 = 0; 1815 1816 fibril_mutex_initialize(&sess->remote_state_mtx); 1817 sess->remote_state_data = NULL; 1818 1819 list_initialize(&sess->exch_list); 1820 fibril_mutex_initialize(&sess->mutex); 1821 atomic_set(&sess->refcnt, 0); 1822 1823 return sess; 1824 } 1825 1826 static int async_hangup_internal(int phone) 1827 { 1828 return ipc_hangup(phone); 1416 1829 } 1417 1830 1418 1831 /** Wrapper for ipc_hangup. 1419 1832 * 1420 * @param phone Phone handleto hung up.1833 * @param sess Session to hung up. 1421 1834 * 1422 1835 * @return Zero on success or a negative error code. 1423 1836 * 1424 1837 */ 1425 int async_hangup(int phone) 1426 { 1427 return ipc_hangup(phone); 1838 int async_hangup(async_sess_t *sess) 1839 { 1840 async_exch_t *exch; 1841 1842 assert(sess); 1843 1844 if (atomic_get(&sess->refcnt) > 0) 1845 return EBUSY; 1846 1847 fibril_mutex_lock(&async_sess_mutex); 1848 1849 int rc = async_hangup_internal(sess->phone); 1850 1851 while (!list_empty(&sess->exch_list)) { 1852 exch = (async_exch_t *) 1853 list_get_instance(list_first(&sess->exch_list), 1854 async_exch_t, sess_link); 1855 1856 list_remove(&exch->sess_link); 1857 list_remove(&exch->global_link); 1858 async_hangup_internal(exch->phone); 1859 free(exch); 1860 } 1861 1862 free(sess); 1863 1864 fibril_mutex_unlock(&async_sess_mutex); 1865 1866 return rc; 1428 1867 } 1429 1868 … … 1434 1873 } 1435 1874 1875 /** Start new exchange in a session. 1876 * 1877 * @param session Session. 1878 * 1879 * @return New exchange or NULL on error. 1880 * 1881 */ 1882 async_exch_t *async_exchange_begin(async_sess_t *sess) 1883 { 1884 if (sess == NULL) 1885 return NULL; 1886 1887 async_exch_t *exch; 1888 1889 fibril_mutex_lock(&async_sess_mutex); 1890 1891 if (!list_empty(&sess->exch_list)) { 1892 /* 1893 * There are inactive exchanges in the session. 1894 */ 1895 exch = (async_exch_t *) 1896 list_get_instance(list_first(&sess->exch_list), 1897 async_exch_t, sess_link); 1898 1899 list_remove(&exch->sess_link); 1900 list_remove(&exch->global_link); 1901 } else { 1902 /* 1903 * There are no available exchanges in the session. 1904 */ 1905 1906 if ((sess->mgmt == EXCHANGE_ATOMIC) || 1907 (sess->mgmt == EXCHANGE_SERIALIZE)) { 1908 exch = (async_exch_t *) malloc(sizeof(async_exch_t)); 1909 if (exch != NULL) { 1910 link_initialize(&exch->sess_link); 1911 link_initialize(&exch->global_link); 1912 exch->sess = sess; 1913 exch->phone = sess->phone; 1914 } 1915 } else { /* EXCHANGE_PARALLEL */ 1916 /* 1917 * Make a one-time attempt to connect a new data phone. 1918 */ 1919 1920 int phone; 1921 1922 retry: 1923 phone = async_connect_me_to_internal(sess->phone, sess->arg1, 1924 sess->arg2, sess->arg3, 0); 1925 if (phone >= 0) { 1926 exch = (async_exch_t *) malloc(sizeof(async_exch_t)); 1927 if (exch != NULL) { 1928 link_initialize(&exch->sess_link); 1929 link_initialize(&exch->global_link); 1930 exch->sess = sess; 1931 exch->phone = phone; 1932 } else 1933 async_hangup_internal(phone); 1934 } else if (!list_empty(&inactive_exch_list)) { 1935 /* 1936 * We did not manage to connect a new phone. But we 1937 * can try to close some of the currently inactive 1938 * connections in other sessions and try again. 1939 */ 1940 exch = (async_exch_t *) 1941 list_get_instance(list_first(&inactive_exch_list), 1942 async_exch_t, global_link); 1943 1944 list_remove(&exch->sess_link); 1945 list_remove(&exch->global_link); 1946 async_hangup_internal(exch->phone); 1947 free(exch); 1948 goto retry; 1949 } else { 1950 /* 1951 * Wait for a phone to become available. 1952 */ 1953 fibril_condvar_wait(&avail_phone_cv, &async_sess_mutex); 1954 goto retry; 1955 } 1956 } 1957 } 1958 1959 fibril_mutex_unlock(&async_sess_mutex); 1960 1961 if (exch != NULL) { 1962 atomic_inc(&sess->refcnt); 1963 1964 if (sess->mgmt == EXCHANGE_SERIALIZE) 1965 fibril_mutex_lock(&sess->mutex); 1966 } 1967 1968 return exch; 1969 } 1970 1971 /** Finish an exchange. 1972 * 1973 * @param exch Exchange to finish. 1974 * 1975 */ 1976 void async_exchange_end(async_exch_t *exch) 1977 { 1978 if (exch == NULL) 1979 return; 1980 1981 async_sess_t *sess = exch->sess; 1982 1983 atomic_dec(&sess->refcnt); 1984 1985 if (sess->mgmt == EXCHANGE_SERIALIZE) 1986 fibril_mutex_unlock(&sess->mutex); 1987 1988 fibril_mutex_lock(&async_sess_mutex); 1989 1990 list_append(&exch->sess_link, &sess->exch_list); 1991 list_append(&exch->global_link, &inactive_exch_list); 1992 fibril_condvar_signal(&avail_phone_cv); 1993 1994 fibril_mutex_unlock(&async_sess_mutex); 1995 } 1996 1436 1997 /** Wrapper for IPC_M_SHARE_IN calls using the async framework. 1437 1998 * 1438 * @param phoneid Phone that will be used to contact the receiving side.1439 * @param dst 1440 * @param size 1441 * @param arg 1442 * @param flags 1999 * @param exch Exchange for sending the message. 2000 * @param dst Destination address space area base. 2001 * @param size Size of the destination address space area. 2002 * @param arg User defined argument. 2003 * @param flags Storage for the received flags. Can be NULL. 1443 2004 * 1444 2005 * @return Zero on success or a negative error code from errno.h. 1445 2006 * 1446 2007 */ 1447 int async_share_in_start(int phoneid, void *dst, size_t size, sysarg_t arg, 1448 unsigned int *flags) 1449 { 2008 int async_share_in_start(async_exch_t *exch, void *dst, size_t size, 2009 sysarg_t arg, unsigned int *flags) 2010 { 2011 if (exch == NULL) 2012 return ENOENT; 2013 1450 2014 sysarg_t tmp_flags; 1451 int res = async_req_3_2( phoneid, IPC_M_SHARE_IN, (sysarg_t) dst,2015 int res = async_req_3_2(exch, IPC_M_SHARE_IN, (sysarg_t) dst, 1452 2016 (sysarg_t) size, arg, NULL, &tmp_flags); 1453 2017 … … 1507 2071 /** Wrapper for IPC_M_SHARE_OUT calls using the async framework. 1508 2072 * 1509 * @param phoneid Phone that will be used to contact the receiving side.1510 * @param src 1511 * @param flags 2073 * @param exch Exchange for sending the message. 2074 * @param src Source address space area base address. 2075 * @param flags Flags to be used for sharing. Bits can be only cleared. 1512 2076 * 1513 2077 * @return Zero on success or a negative error code from errno.h. 1514 2078 * 1515 2079 */ 1516 int async_share_out_start(int phoneid, void *src, unsigned int flags) 1517 { 1518 return async_req_3_0(phoneid, IPC_M_SHARE_OUT, (sysarg_t) src, 0, 2080 int async_share_out_start(async_exch_t *exch, void *src, unsigned int flags) 2081 { 2082 if (exch == NULL) 2083 return ENOENT; 2084 2085 return async_req_3_0(exch, IPC_M_SHARE_OUT, (sysarg_t) src, 0, 1519 2086 (sysarg_t) flags); 1520 2087 } … … 1569 2136 } 1570 2137 2138 /** Start IPC_M_DATA_READ using the async framework. 2139 * 2140 * @param exch Exchange for sending the message. 2141 * @param dst Address of the beginning of the destination buffer. 2142 * @param size Size of the destination buffer (in bytes). 2143 * @param dataptr Storage of call data (arg 2 holds actual data size). 2144 * 2145 * @return Hash of the sent message or 0 on error. 2146 * 2147 */ 2148 aid_t async_data_read(async_exch_t *exch, void *dst, size_t size, 2149 ipc_call_t *dataptr) 2150 { 2151 return async_send_2(exch, IPC_M_DATA_READ, (sysarg_t) dst, 2152 (sysarg_t) size, dataptr); 2153 } 2154 1571 2155 /** Wrapper for IPC_M_DATA_READ calls using the async framework. 1572 2156 * 1573 * @param phoneid Phone that will be used to contact the receiving side. 1574 * @param dst Address of the beginning of the destination buffer. 1575 * @param size Size of the destination buffer. 1576 * @param flags Flags to control the data transfer. 2157 * @param exch Exchange for sending the message. 2158 * @param dst Address of the beginning of the destination buffer. 2159 * @param size Size of the destination buffer. 1577 2160 * 1578 2161 * @return Zero on success or a negative error code from errno.h. 1579 2162 * 1580 2163 */ 1581 int 1582 async_data_read_start_generic(int phoneid, void *dst, size_t size, int flags) 1583 { 1584 return async_req_3_0(phoneid, IPC_M_DATA_READ, (sysarg_t) dst, 1585 (sysarg_t) size, (sysarg_t) flags); 2164 int async_data_read_start(async_exch_t *exch, void *dst, size_t size) 2165 { 2166 if (exch == NULL) 2167 return ENOENT; 2168 2169 return async_req_2_0(exch, IPC_M_DATA_READ, (sysarg_t) dst, 2170 (sysarg_t) size); 1586 2171 } 1587 2172 … … 1638 2223 * 1639 2224 */ 1640 int async_data_read_forward_fast(int phoneid, sysarg_t method, sysarg_t arg1, 1641 sysarg_t arg2, sysarg_t arg3, sysarg_t arg4, ipc_call_t *dataptr) 1642 { 2225 int async_data_read_forward_fast(async_exch_t *exch, sysarg_t imethod, 2226 sysarg_t arg1, sysarg_t arg2, sysarg_t arg3, sysarg_t arg4, 2227 ipc_call_t *dataptr) 2228 { 2229 if (exch == NULL) 2230 return ENOENT; 2231 1643 2232 ipc_callid_t callid; 1644 2233 if (!async_data_read_receive(&callid, NULL)) { … … 1647 2236 } 1648 2237 1649 aid_t msg = async_send_fast( phoneid,method, arg1, arg2, arg3, arg4,2238 aid_t msg = async_send_fast(exch, imethod, arg1, arg2, arg3, arg4, 1650 2239 dataptr); 1651 2240 if (msg == 0) { … … 1654 2243 } 1655 2244 1656 int retval = ipc_forward_fast(callid, phoneid, 0, 0, 0,2245 int retval = ipc_forward_fast(callid, exch->phone, 0, 0, 0, 1657 2246 IPC_FF_ROUTE_FROM_ME); 1658 2247 if (retval != EOK) { … … 1670 2259 /** Wrapper for IPC_M_DATA_WRITE calls using the async framework. 1671 2260 * 1672 * @param phoneid Phone that will be used to contact the receiving side. 1673 * @param src Address of the beginning of the source buffer. 1674 * @param size Size of the source buffer. 1675 * @param flags Flags to control the data transfer. 2261 * @param exch Exchange for sending the message. 2262 * @param src Address of the beginning of the source buffer. 2263 * @param size Size of the source buffer. 1676 2264 * 1677 2265 * @return Zero on success or a negative error code from errno.h. 1678 2266 * 1679 2267 */ 1680 int 1681 async_data_write_start_generic(int phoneid, const void *src, size_t size, 1682 int flags) 1683 { 1684 return async_req_3_0(phoneid, IPC_M_DATA_WRITE, (sysarg_t) src, 1685 (sysarg_t) size, (sysarg_t) flags); 2268 int async_data_write_start(async_exch_t *exch, const void *src, size_t size) 2269 { 2270 if (exch == NULL) 2271 return ENOENT; 2272 2273 return async_req_2_0(exch, IPC_M_DATA_WRITE, (sysarg_t) src, 2274 (sysarg_t) size); 1686 2275 } 1687 2276 … … 1759 2348 size_t *received) 1760 2349 { 2350 assert(data); 2351 1761 2352 ipc_callid_t callid; 1762 2353 size_t size; … … 1826 2417 * 1827 2418 */ 1828 int async_data_write_forward_fast(int phoneid, sysarg_t method, sysarg_t arg1, 1829 sysarg_t arg2, sysarg_t arg3, sysarg_t arg4, ipc_call_t *dataptr) 1830 { 2419 int async_data_write_forward_fast(async_exch_t *exch, sysarg_t imethod, 2420 sysarg_t arg1, sysarg_t arg2, sysarg_t arg3, sysarg_t arg4, 2421 ipc_call_t *dataptr) 2422 { 2423 if (exch == NULL) 2424 return ENOENT; 2425 1831 2426 ipc_callid_t callid; 1832 2427 if (!async_data_write_receive(&callid, NULL)) { … … 1835 2430 } 1836 2431 1837 aid_t msg = async_send_fast( phoneid,method, arg1, arg2, arg3, arg4,2432 aid_t msg = async_send_fast(exch, imethod, arg1, arg2, arg3, arg4, 1838 2433 dataptr); 1839 2434 if (msg == 0) { … … 1842 2437 } 1843 2438 1844 int retval = ipc_forward_fast(callid, phoneid, 0, 0, 0,2439 int retval = ipc_forward_fast(callid, exch->phone, 0, 0, 0, 1845 2440 IPC_FF_ROUTE_FROM_ME); 1846 2441 if (retval != EOK) { … … 1856 2451 } 1857 2452 2453 /** Wrapper for sending an exchange over different exchange for cloning 2454 * 2455 * @param exch Exchange to be used for sending. 2456 * @param clone_exch Exchange to be cloned. 2457 * 2458 */ 2459 int async_exchange_clone(async_exch_t *exch, async_exch_t *clone_exch) 2460 { 2461 return async_req_1_0(exch, IPC_M_CONNECTION_CLONE, clone_exch->phone); 2462 } 2463 2464 /** Wrapper for receiving the IPC_M_CONNECTION_CLONE calls. 2465 * 2466 * If the current call is IPC_M_CONNECTION_CLONE then a new 2467 * async session is created for the accepted phone. 2468 * 2469 * @param mgmt Exchange management style. 2470 * 2471 * @return New async session or NULL on failure. 2472 * 2473 */ 2474 async_sess_t *async_clone_receive(exch_mgmt_t mgmt) 2475 { 2476 /* Accept the phone */ 2477 ipc_call_t call; 2478 ipc_callid_t callid = async_get_call(&call); 2479 int phone = (int) IPC_GET_ARG1(call); 2480 2481 if ((IPC_GET_IMETHOD(call) != IPC_M_CONNECTION_CLONE) || 2482 (phone < 0)) { 2483 async_answer_0(callid, EINVAL); 2484 return NULL; 2485 } 2486 2487 async_sess_t *sess = (async_sess_t *) malloc(sizeof(async_sess_t)); 2488 if (sess == NULL) { 2489 async_answer_0(callid, ENOMEM); 2490 return NULL; 2491 } 2492 2493 sess->mgmt = mgmt; 2494 sess->phone = phone; 2495 sess->arg1 = 0; 2496 sess->arg2 = 0; 2497 sess->arg3 = 0; 2498 2499 fibril_mutex_initialize(&sess->remote_state_mtx); 2500 sess->remote_state_data = NULL; 2501 2502 list_initialize(&sess->exch_list); 2503 fibril_mutex_initialize(&sess->mutex); 2504 atomic_set(&sess->refcnt, 0); 2505 2506 /* Acknowledge the cloned phone */ 2507 async_answer_0(callid, EOK); 2508 2509 return sess; 2510 } 2511 2512 /** Wrapper for receiving the IPC_M_CONNECT_TO_ME calls. 2513 * 2514 * If the current call is IPC_M_CONNECT_TO_ME then a new 2515 * async session is created for the accepted phone. 2516 * 2517 * @param mgmt Exchange management style. 2518 * 2519 * @return New async session. 2520 * @return NULL on failure. 2521 * 2522 */ 2523 async_sess_t *async_callback_receive(exch_mgmt_t mgmt) 2524 { 2525 /* Accept the phone */ 2526 ipc_call_t call; 2527 ipc_callid_t callid = async_get_call(&call); 2528 int phone = (int) IPC_GET_ARG5(call); 2529 2530 if ((IPC_GET_IMETHOD(call) != IPC_M_CONNECT_TO_ME) || 2531 (phone < 0)) { 2532 async_answer_0(callid, EINVAL); 2533 return NULL; 2534 } 2535 2536 async_sess_t *sess = (async_sess_t *) malloc(sizeof(async_sess_t)); 2537 if (sess == NULL) { 2538 async_answer_0(callid, ENOMEM); 2539 return NULL; 2540 } 2541 2542 sess->mgmt = mgmt; 2543 sess->phone = phone; 2544 sess->arg1 = 0; 2545 sess->arg2 = 0; 2546 sess->arg3 = 0; 2547 2548 fibril_mutex_initialize(&sess->remote_state_mtx); 2549 sess->remote_state_data = NULL; 2550 2551 list_initialize(&sess->exch_list); 2552 fibril_mutex_initialize(&sess->mutex); 2553 atomic_set(&sess->refcnt, 0); 2554 2555 /* Acknowledge the connected phone */ 2556 async_answer_0(callid, EOK); 2557 2558 return sess; 2559 } 2560 2561 /** Wrapper for receiving the IPC_M_CONNECT_TO_ME calls. 2562 * 2563 * If the call is IPC_M_CONNECT_TO_ME then a new 2564 * async session is created. However, the phone is 2565 * not accepted automatically. 2566 * 2567 * @param mgmt Exchange management style. 2568 * @param call Call data. 2569 * 2570 * @return New async session. 2571 * @return NULL on failure. 2572 * @return NULL if the call is not IPC_M_CONNECT_TO_ME. 2573 * 2574 */ 2575 async_sess_t *async_callback_receive_start(exch_mgmt_t mgmt, ipc_call_t *call) 2576 { 2577 int phone = (int) IPC_GET_ARG5(*call); 2578 2579 if ((IPC_GET_IMETHOD(*call) != IPC_M_CONNECT_TO_ME) || 2580 (phone < 0)) 2581 return NULL; 2582 2583 async_sess_t *sess = (async_sess_t *) malloc(sizeof(async_sess_t)); 2584 if (sess == NULL) 2585 return NULL; 2586 2587 sess->mgmt = mgmt; 2588 sess->phone = phone; 2589 sess->arg1 = 0; 2590 sess->arg2 = 0; 2591 sess->arg3 = 0; 2592 2593 fibril_mutex_initialize(&sess->remote_state_mtx); 2594 sess->remote_state_data = NULL; 2595 2596 list_initialize(&sess->exch_list); 2597 fibril_mutex_initialize(&sess->mutex); 2598 atomic_set(&sess->refcnt, 0); 2599 2600 return sess; 2601 } 2602 2603 int async_state_change_start(async_exch_t *exch, sysarg_t arg1, sysarg_t arg2, 2604 sysarg_t arg3, async_exch_t *other_exch) 2605 { 2606 return async_req_5_0(exch, IPC_M_STATE_CHANGE_AUTHORIZE, 2607 arg1, arg2, arg3, 0, other_exch->phone); 2608 } 2609 2610 bool async_state_change_receive(ipc_callid_t *callid, sysarg_t *arg1, 2611 sysarg_t *arg2, sysarg_t *arg3) 2612 { 2613 assert(callid); 2614 2615 ipc_call_t call; 2616 *callid = async_get_call(&call); 2617 2618 if (IPC_GET_IMETHOD(call) != IPC_M_STATE_CHANGE_AUTHORIZE) 2619 return false; 2620 2621 if (arg1) 2622 *arg1 = IPC_GET_ARG1(call); 2623 if (arg2) 2624 *arg2 = IPC_GET_ARG2(call); 2625 if (arg3) 2626 *arg3 = IPC_GET_ARG3(call); 2627 2628 return true; 2629 } 2630 2631 int async_state_change_finalize(ipc_callid_t callid, async_exch_t *other_exch) 2632 { 2633 return ipc_answer_1(callid, EOK, other_exch->phone); 2634 } 2635 2636 /** Lock and get session remote state 2637 * 2638 * Lock and get the local replica of the remote state 2639 * in stateful sessions. The call should be paired 2640 * with async_remote_state_release*(). 2641 * 2642 * @param[in] sess Stateful session. 2643 * 2644 * @return Local replica of the remote state. 2645 * 2646 */ 2647 void *async_remote_state_acquire(async_sess_t *sess) 2648 { 2649 fibril_mutex_lock(&sess->remote_state_mtx); 2650 return sess->remote_state_data; 2651 } 2652 2653 /** Update the session remote state 2654 * 2655 * Update the local replica of the remote state 2656 * in stateful sessions. The remote state must 2657 * be already locked. 2658 * 2659 * @param[in] sess Stateful session. 2660 * @param[in] state New local replica of the remote state. 2661 * 2662 */ 2663 void async_remote_state_update(async_sess_t *sess, void *state) 2664 { 2665 assert(fibril_mutex_is_locked(&sess->remote_state_mtx)); 2666 sess->remote_state_data = state; 2667 } 2668 2669 /** Release the session remote state 2670 * 2671 * Unlock the local replica of the remote state 2672 * in stateful sessions. 2673 * 2674 * @param[in] sess Stateful session. 2675 * 2676 */ 2677 void async_remote_state_release(async_sess_t *sess) 2678 { 2679 assert(fibril_mutex_is_locked(&sess->remote_state_mtx)); 2680 2681 fibril_mutex_unlock(&sess->remote_state_mtx); 2682 } 2683 2684 /** Release the session remote state and end an exchange 2685 * 2686 * Unlock the local replica of the remote state 2687 * in stateful sessions. This is convenience function 2688 * which gets the session pointer from the exchange 2689 * and also ends the exchange. 2690 * 2691 * @param[in] exch Stateful session's exchange. 2692 * 2693 */ 2694 void async_remote_state_release_exchange(async_exch_t *exch) 2695 { 2696 if (exch == NULL) 2697 return; 2698 2699 async_sess_t *sess = exch->sess; 2700 assert(fibril_mutex_is_locked(&sess->remote_state_mtx)); 2701 2702 async_exchange_end(exch); 2703 fibril_mutex_unlock(&sess->remote_state_mtx); 2704 } 2705 1858 2706 /** @} 1859 2707 */
Note:
See TracChangeset
for help on using the changeset viewer.