Changes in uspace/lib/c/generic/async.c [ae6021d:7f9d97f3] in mainline
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
TabularUnified uspace/lib/c/generic/async.c ¶
rae6021d r7f9d97f3 77 77 * } 78 78 * 79 * port_handler(icallid, *icall)79 * my_client_connection(icallid, *icall) 80 80 * { 81 81 * if (want_refuse) { … … 116 116 #include <stdlib.h> 117 117 #include <macros.h> 118 #include <as.h>119 #include <abi/mm/as.h>120 118 #include "private/libc.h" 121 119 … … 125 123 list_t exch_list; 126 124 127 /** Session interface */128 iface_t iface;129 130 125 /** Exchange management style */ 131 126 exch_mgmt_t mgmt; … … 194 189 /** If reply was received. */ 195 190 bool done; 196 191 197 192 /** If the message / reply should be discarded on arrival. */ 198 193 bool forget; 199 194 200 195 /** If already destroyed. */ 201 196 bool destroyed; … … 237 232 /** Identification of the opening call. */ 238 233 ipc_callid_t callid; 239 240 234 /** Call data of the opening call. */ 241 235 ipc_call_t call; 236 /** Local argument or NULL if none. */ 237 void *carg; 242 238 243 239 /** Identification of the closing call. */ … … 245 241 246 242 /** Fibril function that will be used to handle the connection. */ 247 async_port_handler_t handler; 248 249 /** Client data */ 250 void *data; 243 async_client_conn_t cfibril; 251 244 } connection_t; 252 253 /** Interface data */254 typedef struct {255 ht_link_t link;256 257 /** Interface ID */258 iface_t iface;259 260 /** Futex protecting the hash table */261 futex_t futex;262 263 /** Interface ports */264 hash_table_t port_hash_table;265 266 /** Next available port ID */267 port_id_t port_id_avail;268 } interface_t;269 270 /* Port data */271 typedef struct {272 ht_link_t link;273 274 /** Port ID */275 port_id_t id;276 277 /** Port connection handler */278 async_port_handler_t handler;279 280 /** Client data */281 void *data;282 } port_t;283 245 284 246 /* Notification data */ … … 302 264 { 303 265 struct timeval tv = { 0, 0 }; 304 266 305 267 to->inlist = false; 306 268 to->occurred = false; … … 325 287 static amsg_t *amsg_create(void) 326 288 { 327 amsg_t *msg = malloc(sizeof(amsg_t)); 289 amsg_t *msg; 290 291 msg = malloc(sizeof(amsg_t)); 328 292 if (msg) { 329 293 msg->done = false; … … 334 298 awaiter_initialize(&msg->wdata); 335 299 } 336 300 337 301 return msg; 338 302 } … … 371 335 } 372 336 373 /** Default fallback fibril function. 374 * 375 * This fallback fibril function gets called on incomming 376 * connections that do not have a specific handler defined. 337 /** Default fibril function that gets called to handle new connection. 338 * 339 * This function is defined as a weak symbol - to be redefined in user code. 377 340 * 378 341 * @param callid Hash of the incoming call. … … 381 344 * 382 345 */ 383 static void default_ fallback_port_handler(ipc_callid_t callid, ipc_call_t *call,346 static void default_client_connection(ipc_callid_t callid, ipc_call_t *call, 384 347 void *arg) 385 348 { … … 387 350 } 388 351 389 static async_port_handler_t fallback_port_handler = 390 default_fallback_port_handler; 391 static void *fallback_port_data = NULL; 392 393 static hash_table_t interface_hash_table; 394 395 static size_t interface_key_hash(void *key) 396 { 397 iface_t iface = *(iface_t *) key; 398 return iface; 399 } 400 401 static size_t interface_hash(const ht_link_t *item) 402 { 403 interface_t *interface = hash_table_get_inst(item, interface_t, link); 404 return interface_key_hash(&interface->iface); 405 } 406 407 static bool interface_key_equal(void *key, const ht_link_t *item) 408 { 409 iface_t iface = *(iface_t *) key; 410 interface_t *interface = hash_table_get_inst(item, interface_t, link); 411 return iface == interface->iface; 412 } 413 414 /** Operations for the port hash table. */ 415 static hash_table_ops_t interface_hash_table_ops = { 416 .hash = interface_hash, 417 .key_hash = interface_key_hash, 418 .key_equal = interface_key_equal, 419 .equal = NULL, 420 .remove_callback = NULL 421 }; 422 423 static size_t port_key_hash(void *key) 424 { 425 port_id_t port_id = *(port_id_t *) key; 426 return port_id; 427 } 428 429 static size_t port_hash(const ht_link_t *item) 430 { 431 port_t *port = hash_table_get_inst(item, port_t, link); 432 return port_key_hash(&port->id); 433 } 434 435 static bool port_key_equal(void *key, const ht_link_t *item) 436 { 437 port_id_t port_id = *(port_id_t *) key; 438 port_t *port = hash_table_get_inst(item, port_t, link); 439 return port_id == port->id; 440 } 441 442 /** Operations for the port hash table. */ 443 static hash_table_ops_t port_hash_table_ops = { 444 .hash = port_hash, 445 .key_hash = port_key_hash, 446 .key_equal = port_key_equal, 447 .equal = NULL, 448 .remove_callback = NULL 449 }; 450 451 static interface_t *async_new_interface(iface_t iface) 452 { 453 interface_t *interface = 454 (interface_t *) malloc(sizeof(interface_t)); 455 if (!interface) 456 return NULL; 457 458 bool ret = hash_table_create(&interface->port_hash_table, 0, 0, 459 &port_hash_table_ops); 460 if (!ret) { 461 free(interface); 462 return NULL; 463 } 464 465 interface->iface = iface; 466 futex_initialize(&interface->futex, 1); 467 interface->port_id_avail = 0; 468 469 hash_table_insert(&interface_hash_table, &interface->link); 470 471 return interface; 472 } 473 474 static port_t *async_new_port(interface_t *interface, 475 async_port_handler_t handler, void *data) 476 { 477 port_t *port = (port_t *) malloc(sizeof(port_t)); 478 if (!port) 479 return NULL; 480 481 futex_down(&interface->futex); 482 483 port_id_t id = interface->port_id_avail; 484 interface->port_id_avail++; 485 486 port->id = id; 487 port->handler = handler; 488 port->data = data; 489 490 hash_table_insert(&interface->port_hash_table, &port->link); 491 492 futex_up(&interface->futex); 493 494 return port; 352 static async_client_conn_t client_connection = default_client_connection; 353 static size_t notification_handler_stksz = FIBRIL_DFLT_STK_SIZE; 354 355 /** Setter for client_connection function pointer. 356 * 357 * @param conn Function that will implement a new connection fibril. 358 * 359 */ 360 void async_set_client_connection(async_client_conn_t conn) 361 { 362 assert(client_connection == default_client_connection); 363 client_connection = conn; 364 } 365 366 /** Set the stack size for the notification handler notification fibrils. 367 * 368 * @param size Stack size in bytes. 369 */ 370 void async_set_notification_handler_stack_size(size_t size) 371 { 372 notification_handler_stksz = size; 495 373 } 496 374 … … 509 387 */ 510 388 static FIBRIL_CONDVAR_INITIALIZE(avail_phone_cv); 511 512 int async_create_port(iface_t iface, async_port_handler_t handler,513 void *data, port_id_t *port_id)514 {515 if ((iface & IFACE_MOD_MASK) == IFACE_MOD_CALLBACK)516 return EINVAL;517 518 interface_t *interface;519 520 futex_down(&async_futex);521 522 ht_link_t *link = hash_table_find(&interface_hash_table, &iface);523 if (link)524 interface = hash_table_get_inst(link, interface_t, link);525 else526 interface = async_new_interface(iface);527 528 if (!interface) {529 futex_up(&async_futex);530 return ENOMEM;531 }532 533 port_t *port = async_new_port(interface, handler, data);534 if (!port) {535 futex_up(&async_futex);536 return ENOMEM;537 }538 539 *port_id = port->id;540 541 futex_up(&async_futex);542 543 return EOK;544 }545 546 void async_set_fallback_port_handler(async_port_handler_t handler, void *data)547 {548 assert(handler != NULL);549 550 fallback_port_handler = handler;551 fallback_port_data = data;552 }553 389 554 390 static hash_table_t client_hash_table; … … 621 457 .remove_callback = NULL 622 458 }; 623 624 static client_t *async_client_get(task_id_t client_id, bool create)625 {626 client_t *client = NULL;627 628 futex_down(&async_futex);629 ht_link_t *link = hash_table_find(&client_hash_table, &client_id);630 if (link) {631 client = hash_table_get_inst(link, client_t, link);632 atomic_inc(&client->refcnt);633 } else if (create) {634 client = malloc(sizeof(client_t));635 if (client) {636 client->in_task_id = client_id;637 client->data = async_client_data_create();638 639 atomic_set(&client->refcnt, 1);640 hash_table_insert(&client_hash_table, &client->link);641 }642 }643 644 futex_up(&async_futex);645 return client;646 }647 648 static void async_client_put(client_t *client)649 {650 bool destroy;651 652 futex_down(&async_futex);653 654 if (atomic_predec(&client->refcnt) == 0) {655 hash_table_remove(&client_hash_table, &client->in_task_id);656 destroy = true;657 } else658 destroy = false;659 660 futex_up(&async_futex);661 662 if (destroy) {663 if (client->data)664 async_client_data_destroy(client->data);665 666 free(client);667 }668 }669 670 /** Wrapper for client connection fibril.671 *672 * When a new connection arrives, a fibril with this implementing673 * function is created.674 *675 * @param arg Connection structure pointer.676 *677 * @return Always zero.678 *679 */680 static int connection_fibril(void *arg)681 {682 assert(arg);683 684 /*685 * Setup fibril-local connection pointer.686 */687 fibril_connection = (connection_t *) arg;688 689 /*690 * Add our reference for the current connection in the client task691 * tracking structure. If this is the first reference, create and692 * hash in a new tracking structure.693 */694 695 client_t *client = async_client_get(fibril_connection->in_task_id, true);696 if (!client) {697 ipc_answer_0(fibril_connection->callid, ENOMEM);698 return 0;699 }700 701 fibril_connection->client = client;702 703 /*704 * Call the connection handler function.705 */706 fibril_connection->handler(fibril_connection->callid,707 &fibril_connection->call, fibril_connection->data);708 709 /*710 * Remove the reference for this client task connection.711 */712 async_client_put(client);713 714 /*715 * Remove myself from the connection hash table.716 */717 futex_down(&async_futex);718 hash_table_remove(&conn_hash_table, &fibril_connection->in_phone_hash);719 futex_up(&async_futex);720 721 /*722 * Answer all remaining messages with EHANGUP.723 */724 while (!list_empty(&fibril_connection->msg_queue)) {725 msg_t *msg =726 list_get_instance(list_first(&fibril_connection->msg_queue),727 msg_t, link);728 729 list_remove(&msg->link);730 ipc_answer_0(msg->callid, EHANGUP);731 free(msg);732 }733 734 /*735 * If the connection was hung-up, answer the last call,736 * i.e. IPC_M_PHONE_HUNGUP.737 */738 if (fibril_connection->close_callid)739 ipc_answer_0(fibril_connection->close_callid, EOK);740 741 free(fibril_connection);742 return 0;743 }744 745 /** Create a new fibril for a new connection.746 *747 * Create new fibril for connection, fill in connection structures748 * and insert it into the hash table, so that later we can easily749 * do routing of messages to particular fibrils.750 *751 * @param in_task_id Identification of the incoming connection.752 * @param in_phone_hash Identification of the incoming connection.753 * @param callid Hash of the opening IPC_M_CONNECT_ME_TO call.754 * If callid is zero, the connection was opened by755 * accepting the IPC_M_CONNECT_TO_ME call and this756 * function is called directly by the server.757 * @param call Call data of the opening call.758 * @param handler Connection handler.759 * @param data Client argument to pass to the connection handler.760 *761 * @return New fibril id or NULL on failure.762 *763 */764 static fid_t async_new_connection(task_id_t in_task_id, sysarg_t in_phone_hash,765 ipc_callid_t callid, ipc_call_t *call, async_port_handler_t handler,766 void *data)767 {768 connection_t *conn = malloc(sizeof(*conn));769 if (!conn) {770 if (callid)771 ipc_answer_0(callid, ENOMEM);772 773 return (uintptr_t) NULL;774 }775 776 conn->in_task_id = in_task_id;777 conn->in_phone_hash = in_phone_hash;778 list_initialize(&conn->msg_queue);779 conn->callid = callid;780 conn->close_callid = 0;781 conn->handler = handler;782 conn->data = data;783 784 if (call)785 conn->call = *call;786 787 /* We will activate the fibril ASAP */788 conn->wdata.active = true;789 conn->wdata.fid = fibril_create(connection_fibril, conn);790 791 if (conn->wdata.fid == 0) {792 free(conn);793 794 if (callid)795 ipc_answer_0(callid, ENOMEM);796 797 return (uintptr_t) NULL;798 }799 800 /* Add connection to the connection hash table */801 802 futex_down(&async_futex);803 hash_table_insert(&conn_hash_table, &conn->link);804 futex_up(&async_futex);805 806 fibril_add_ready(conn->wdata.fid);807 808 return conn->wdata.fid;809 }810 811 /** Wrapper for making IPC_M_CONNECT_TO_ME calls using the async framework.812 *813 * Ask through phone for a new connection to some service.814 *815 * @param exch Exchange for sending the message.816 * @param iface Callback interface.817 * @param arg1 User defined argument.818 * @param arg2 User defined argument.819 * @param handler Callback handler.820 * @param data Handler data.821 * @param port_id ID of the newly created port.822 *823 * @return Zero on success or a negative error code.824 *825 */826 int async_create_callback_port(async_exch_t *exch, iface_t iface, sysarg_t arg1,827 sysarg_t arg2, async_port_handler_t handler, void *data, port_id_t *port_id)828 {829 if ((iface & IFACE_MOD_CALLBACK) != IFACE_MOD_CALLBACK)830 return EINVAL;831 832 if (exch == NULL)833 return ENOENT;834 835 ipc_call_t answer;836 aid_t req = async_send_3(exch, IPC_M_CONNECT_TO_ME, iface, arg1, arg2,837 &answer);838 839 sysarg_t ret;840 async_wait_for(req, &ret);841 if (ret != EOK)842 return (int) ret;843 844 sysarg_t phone_hash = IPC_GET_ARG5(answer);845 interface_t *interface;846 847 futex_down(&async_futex);848 849 ht_link_t *link = hash_table_find(&interface_hash_table, &iface);850 if (link)851 interface = hash_table_get_inst(link, interface_t, link);852 else853 interface = async_new_interface(iface);854 855 if (!interface) {856 futex_up(&async_futex);857 return ENOMEM;858 }859 860 port_t *port = async_new_port(interface, handler, data);861 if (!port) {862 futex_up(&async_futex);863 return ENOMEM;864 }865 866 *port_id = port->id;867 868 futex_up(&async_futex);869 870 fid_t fid = async_new_connection(answer.in_task_id, phone_hash,871 0, NULL, handler, data);872 if (fid == (uintptr_t) NULL)873 return ENOMEM;874 875 return EOK;876 }877 459 878 460 static size_t notification_key_hash(void *key) … … 989 571 } 990 572 991 /** Process notification. 992 * 993 * @param callid Hash of the incoming call. 994 * @param call Data of the incoming call. 995 * 996 */ 997 static void process_notification(ipc_callid_t callid, ipc_call_t *call) 998 { 573 /** Notification fibril. 574 * 575 * When a notification arrives, a fibril with this implementing function is 576 * created. It calls the corresponding notification handler and does the final 577 * cleanup. 578 * 579 * @param arg Message structure pointer. 580 * 581 * @return Always zero. 582 * 583 */ 584 static int notification_fibril(void *arg) 585 { 586 assert(arg); 587 588 msg_t *msg = (msg_t *) arg; 999 589 async_notification_handler_t handler = NULL; 1000 590 void *data = NULL; 1001 1002 assert(call);1003 591 1004 592 futex_down(&async_futex); 1005 593 1006 594 ht_link_t *link = hash_table_find(¬ification_hash_table, 1007 &IPC_GET_IMETHOD( *call));595 &IPC_GET_IMETHOD(msg->call)); 1008 596 if (link) { 1009 597 notification_t *notification = … … 1016 604 1017 605 if (handler) 1018 handler(callid, call, data); 606 handler(msg->callid, &msg->call, data); 607 608 free(msg); 609 return 0; 610 } 611 612 /** Process notification. 613 * 614 * A new fibril is created which would process the notification. 615 * 616 * @param callid Hash of the incoming call. 617 * @param call Data of the incoming call. 618 * 619 * @return False if an error occured. 620 * True if the call was passed to the notification fibril. 621 * 622 */ 623 static bool process_notification(ipc_callid_t callid, ipc_call_t *call) 624 { 625 assert(call); 626 627 futex_down(&async_futex); 628 629 msg_t *msg = malloc(sizeof(*msg)); 630 if (!msg) { 631 futex_up(&async_futex); 632 return false; 633 } 634 635 msg->callid = callid; 636 msg->call = *call; 637 638 fid_t fid = fibril_create_generic(notification_fibril, msg, 639 notification_handler_stksz); 640 if (fid == 0) { 641 free(msg); 642 futex_up(&async_futex); 643 return false; 644 } 645 646 fibril_add_ready(fid); 647 648 futex_up(&async_futex); 649 return true; 1019 650 } 1020 651 … … 1235 866 } 1236 867 1237 msg_t *msg = list_get_instance(list_first(&conn->msg_queue), 1238 msg_t, link); 868 msg_t *msg = list_get_instance(list_first(&conn->msg_queue), msg_t, link); 1239 869 list_remove(&msg->link); 1240 870 … … 1247 877 } 1248 878 879 static client_t *async_client_get(task_id_t client_id, bool create) 880 { 881 client_t *client = NULL; 882 883 futex_down(&async_futex); 884 ht_link_t *link = hash_table_find(&client_hash_table, &client_id); 885 if (link) { 886 client = hash_table_get_inst(link, client_t, link); 887 atomic_inc(&client->refcnt); 888 } else if (create) { 889 client = malloc(sizeof(client_t)); 890 if (client) { 891 client->in_task_id = client_id; 892 client->data = async_client_data_create(); 893 894 atomic_set(&client->refcnt, 1); 895 hash_table_insert(&client_hash_table, &client->link); 896 } 897 } 898 899 futex_up(&async_futex); 900 return client; 901 } 902 903 static void async_client_put(client_t *client) 904 { 905 bool destroy; 906 907 futex_down(&async_futex); 908 909 if (atomic_predec(&client->refcnt) == 0) { 910 hash_table_remove(&client_hash_table, &client->in_task_id); 911 destroy = true; 912 } else 913 destroy = false; 914 915 futex_up(&async_futex); 916 917 if (destroy) { 918 if (client->data) 919 async_client_data_destroy(client->data); 920 921 free(client); 922 } 923 } 924 1249 925 void *async_get_client_data(void) 1250 926 { … … 1258 934 if (!client) 1259 935 return NULL; 1260 1261 936 if (!client->data) { 1262 937 async_client_put(client); 1263 938 return NULL; 1264 939 } 1265 940 1266 941 return client->data; 1267 942 } … … 1270 945 { 1271 946 client_t *client = async_client_get(client_id, false); 1272 947 1273 948 assert(client); 1274 949 assert(client->data); 1275 950 1276 951 /* Drop the reference we got in async_get_client_data_by_hash(). */ 1277 952 async_client_put(client); 1278 953 1279 954 /* Drop our own reference we got at the beginning of this function. */ 1280 955 async_client_put(client); 1281 956 } 1282 957 1283 static port_t *async_find_port(iface_t iface, port_id_t port_id) 1284 { 1285 port_t *port = NULL; 1286 958 /** Wrapper for client connection fibril. 959 * 960 * When a new connection arrives, a fibril with this implementing function is 961 * created. It calls client_connection() and does the final cleanup. 962 * 963 * @param arg Connection structure pointer. 964 * 965 * @return Always zero. 966 * 967 */ 968 static int connection_fibril(void *arg) 969 { 970 assert(arg); 971 972 /* 973 * Setup fibril-local connection pointer. 974 */ 975 fibril_connection = (connection_t *) arg; 976 977 /* 978 * Add our reference for the current connection in the client task 979 * tracking structure. If this is the first reference, create and 980 * hash in a new tracking structure. 981 */ 982 983 client_t *client = async_client_get(fibril_connection->in_task_id, true); 984 if (!client) { 985 ipc_answer_0(fibril_connection->callid, ENOMEM); 986 return 0; 987 } 988 989 fibril_connection->client = client; 990 991 /* 992 * Call the connection handler function. 993 */ 994 fibril_connection->cfibril(fibril_connection->callid, 995 &fibril_connection->call, fibril_connection->carg); 996 997 /* 998 * Remove the reference for this client task connection. 999 */ 1000 async_client_put(client); 1001 1002 /* 1003 * Remove myself from the connection hash table. 1004 */ 1287 1005 futex_down(&async_futex); 1288 1289 ht_link_t *link = hash_table_find(&interface_hash_table, &iface); 1290 if (link) { 1291 interface_t *interface = 1292 hash_table_get_inst(link, interface_t, link); 1006 hash_table_remove(&conn_hash_table, &fibril_connection->in_phone_hash); 1007 futex_up(&async_futex); 1008 1009 /* 1010 * Answer all remaining messages with EHANGUP. 1011 */ 1012 while (!list_empty(&fibril_connection->msg_queue)) { 1013 msg_t *msg = 1014 list_get_instance(list_first(&fibril_connection->msg_queue), 1015 msg_t, link); 1293 1016 1294 link = hash_table_find(&interface->port_hash_table, &port_id); 1295 if (link) 1296 port = hash_table_get_inst(link, port_t, link); 1297 } 1298 1017 list_remove(&msg->link); 1018 ipc_answer_0(msg->callid, EHANGUP); 1019 free(msg); 1020 } 1021 1022 /* 1023 * If the connection was hung-up, answer the last call, 1024 * i.e. IPC_M_PHONE_HUNGUP. 1025 */ 1026 if (fibril_connection->close_callid) 1027 ipc_answer_0(fibril_connection->close_callid, EOK); 1028 1029 free(fibril_connection); 1030 return 0; 1031 } 1032 1033 /** Create a new fibril for a new connection. 1034 * 1035 * Create new fibril for connection, fill in connection structures and insert 1036 * it into the hash table, so that later we can easily do routing of messages to 1037 * particular fibrils. 1038 * 1039 * @param in_task_id Identification of the incoming connection. 1040 * @param in_phone_hash Identification of the incoming connection. 1041 * @param callid Hash of the opening IPC_M_CONNECT_ME_TO call. 1042 * If callid is zero, the connection was opened by 1043 * accepting the IPC_M_CONNECT_TO_ME call and this function 1044 * is called directly by the server. 1045 * @param call Call data of the opening call. 1046 * @param cfibril Fibril function that should be called upon opening the 1047 * connection. 1048 * @param carg Extra argument to pass to the connection fibril 1049 * 1050 * @return New fibril id or NULL on failure. 1051 * 1052 */ 1053 fid_t async_new_connection(task_id_t in_task_id, sysarg_t in_phone_hash, 1054 ipc_callid_t callid, ipc_call_t *call, 1055 async_client_conn_t cfibril, void *carg) 1056 { 1057 connection_t *conn = malloc(sizeof(*conn)); 1058 if (!conn) { 1059 if (callid) 1060 ipc_answer_0(callid, ENOMEM); 1061 1062 return (uintptr_t) NULL; 1063 } 1064 1065 conn->in_task_id = in_task_id; 1066 conn->in_phone_hash = in_phone_hash; 1067 list_initialize(&conn->msg_queue); 1068 conn->callid = callid; 1069 conn->close_callid = 0; 1070 conn->carg = carg; 1071 1072 if (call) 1073 conn->call = *call; 1074 1075 /* We will activate the fibril ASAP */ 1076 conn->wdata.active = true; 1077 conn->cfibril = cfibril; 1078 conn->wdata.fid = fibril_create(connection_fibril, conn); 1079 1080 if (conn->wdata.fid == 0) { 1081 free(conn); 1082 1083 if (callid) 1084 ipc_answer_0(callid, ENOMEM); 1085 1086 return (uintptr_t) NULL; 1087 } 1088 1089 /* Add connection to the connection hash table */ 1090 1091 futex_down(&async_futex); 1092 hash_table_insert(&conn_hash_table, &conn->link); 1299 1093 futex_up(&async_futex); 1300 1094 1301 return port; 1095 fibril_add_ready(conn->wdata.fid); 1096 1097 return conn->wdata.fid; 1302 1098 } 1303 1099 … … 1315 1111 assert(call); 1316 1112 1317 /* Kernel notification */1113 /* Unrouted call - take some default action */ 1318 1114 if ((callid & IPC_CALLID_NOTIFICATION)) { 1319 fibril_t *fibril = (fibril_t *) __tcb_get()->fibril_data;1320 unsigned oldsw = fibril->switches;1321 1322 1115 process_notification(callid, call); 1323 1324 if (oldsw != fibril->switches) {1325 /*1326 * The notification handler did not execute atomically1327 * and so the current manager fibril assumed the role of1328 * a notification fibril. While waiting for its1329 * resources, it switched to another manager fibril that1330 * had already existed or it created a new one. We1331 * therefore know there is at least yet another1332 * manager fibril that can take over. We now kill the1333 * current 'notification' fibril to prevent fibril1334 * population explosion.1335 */1336 futex_down(&async_futex);1337 fibril_switch(FIBRIL_FROM_DEAD);1338 }1339 1340 1116 return; 1341 1117 } 1342 1118 1343 /* New connection */ 1344 if (IPC_GET_IMETHOD(*call) == IPC_M_CONNECT_ME_TO) { 1345 iface_t iface = (iface_t) IPC_GET_ARG1(*call); 1346 sysarg_t in_phone_hash = IPC_GET_ARG5(*call); 1347 1348 async_notification_handler_t handler = fallback_port_handler; 1349 void *data = fallback_port_data; 1350 1351 // TODO: Currently ignores all ports but the first one 1352 port_t *port = async_find_port(iface, 0); 1353 if (port) { 1354 handler = port->handler; 1355 data = port->data; 1356 } 1357 1358 async_new_connection(call->in_task_id, in_phone_hash, callid, 1359 call, handler, data); 1360 return; 1361 } 1362 1363 /* Cloned connection */ 1364 if (IPC_GET_IMETHOD(*call) == IPC_M_CLONE_ESTABLISH) { 1365 // TODO: Currently ignores ports altogether 1366 1119 switch (IPC_GET_IMETHOD(*call)) { 1120 case IPC_M_CLONE_ESTABLISH: 1121 case IPC_M_CONNECT_ME_TO: 1367 1122 /* Open new connection with fibril, etc. */ 1368 1123 async_new_connection(call->in_task_id, IPC_GET_ARG5(*call), 1369 callid, call, fallback_port_handler, fallback_port_data);1124 callid, call, client_connection, NULL); 1370 1125 return; 1371 1126 } … … 1512 1267 void async_create_manager(void) 1513 1268 { 1514 fid_t fid = fibril_create _generic(async_manager_fibril, NULL, PAGE_SIZE);1269 fid_t fid = fibril_create(async_manager_fibril, NULL); 1515 1270 if (fid != 0) 1516 1271 fibril_add_manager(fid); … … 1528 1283 void __async_init(void) 1529 1284 { 1530 if (!hash_table_create(&interface_hash_table, 0, 0,1531 &interface_hash_table_ops))1532 abort();1533 1534 1285 if (!hash_table_create(&client_hash_table, 0, 0, &client_hash_table_ops)) 1535 1286 abort(); … … 1546 1297 abort(); 1547 1298 1548 session_ns->iface = 0;1549 1299 session_ns->mgmt = EXCHANGE_ATOMIC; 1550 1300 session_ns->phone = PHONE_NS; … … 1593 1343 1594 1344 msg->done = true; 1595 1345 1596 1346 if (msg->forget) { 1597 1347 assert(msg->wdata.active); … … 1601 1351 fibril_add_ready(msg->wdata.fid); 1602 1352 } 1603 1353 1604 1354 futex_up(&async_futex); 1605 1355 } … … 1636 1386 1637 1387 ipc_call_async_4(exch->phone, imethod, arg1, arg2, arg3, arg4, msg, 1638 reply_received );1388 reply_received, true); 1639 1389 1640 1390 return (aid_t) msg; … … 1674 1424 1675 1425 ipc_call_async_5(exch->phone, imethod, arg1, arg2, arg3, arg4, arg5, 1676 msg, reply_received );1426 msg, reply_received, true); 1677 1427 1678 1428 return (aid_t) msg; … … 1693 1443 1694 1444 futex_down(&async_futex); 1695 1445 1696 1446 assert(!msg->forget); 1697 1447 assert(!msg->destroyed); 1698 1448 1699 1449 if (msg->done) { 1700 1450 futex_up(&async_futex); … … 1737 1487 1738 1488 amsg_t *msg = (amsg_t *) amsgid; 1739 1489 1740 1490 futex_down(&async_futex); 1741 1491 1742 1492 assert(!msg->forget); 1743 1493 assert(!msg->destroyed); 1744 1494 1745 1495 if (msg->done) { 1746 1496 futex_up(&async_futex); … … 1754 1504 if (timeout < 0) 1755 1505 timeout = 0; 1756 1506 1757 1507 getuptime(&msg->wdata.to_event.expires); 1758 1508 tv_add_diff(&msg->wdata.to_event.expires, timeout); … … 1807 1557 { 1808 1558 amsg_t *msg = (amsg_t *) amsgid; 1809 1559 1810 1560 assert(msg); 1811 1561 assert(!msg->forget); 1812 1562 assert(!msg->destroyed); 1813 1563 1814 1564 futex_down(&async_futex); 1815 1816 1565 if (msg->done) { 1817 1566 amsg_destroy(msg); … … 1820 1569 msg->forget = true; 1821 1570 } 1822 1823 1571 futex_up(&async_futex); 1824 1572 } … … 1963 1711 { 1964 1712 if (exch != NULL) 1965 ipc_call_async_0(exch->phone, imethod, NULL, NULL );1713 ipc_call_async_0(exch->phone, imethod, NULL, NULL, true); 1966 1714 } 1967 1715 … … 1969 1717 { 1970 1718 if (exch != NULL) 1971 ipc_call_async_1(exch->phone, imethod, arg1, NULL, NULL );1719 ipc_call_async_1(exch->phone, imethod, arg1, NULL, NULL, true); 1972 1720 } 1973 1721 … … 1976 1724 { 1977 1725 if (exch != NULL) 1978 ipc_call_async_2(exch->phone, imethod, arg1, arg2, NULL, NULL); 1726 ipc_call_async_2(exch->phone, imethod, arg1, arg2, NULL, NULL, 1727 true); 1979 1728 } 1980 1729 … … 1984 1733 if (exch != NULL) 1985 1734 ipc_call_async_3(exch->phone, imethod, arg1, arg2, arg3, NULL, 1986 NULL );1735 NULL, true); 1987 1736 } 1988 1737 … … 1992 1741 if (exch != NULL) 1993 1742 ipc_call_async_4(exch->phone, imethod, arg1, arg2, arg3, arg4, 1994 NULL, NULL );1743 NULL, NULL, true); 1995 1744 } 1996 1745 … … 2000 1749 if (exch != NULL) 2001 1750 ipc_call_async_5(exch->phone, imethod, arg1, arg2, arg3, arg4, 2002 arg5, NULL, NULL );1751 arg5, NULL, NULL, true); 2003 1752 } 2004 1753 … … 2065 1814 * @param arg2 User defined argument. 2066 1815 * @param arg3 User defined argument. 1816 * @param client_receiver Connection handing routine. 2067 1817 * 2068 1818 * @return Zero on success or a negative error code. … … 2070 1820 */ 2071 1821 int async_connect_to_me(async_exch_t *exch, sysarg_t arg1, sysarg_t arg2, 2072 sysarg_t arg3 )1822 sysarg_t arg3, async_client_conn_t client_receiver, void *carg) 2073 1823 { 2074 1824 if (exch == NULL) 2075 1825 return ENOENT; 2076 1826 1827 sysarg_t phone_hash; 1828 sysarg_t rc; 1829 1830 aid_t req; 2077 1831 ipc_call_t answer; 2078 aid_treq = async_send_3(exch, IPC_M_CONNECT_TO_ME, arg1, arg2, arg3,1832 req = async_send_3(exch, IPC_M_CONNECT_TO_ME, arg1, arg2, arg3, 2079 1833 &answer); 2080 2081 sysarg_t rc;2082 1834 async_wait_for(req, &rc); 2083 1835 if (rc != EOK) 2084 1836 return (int) rc; 1837 1838 phone_hash = IPC_GET_ARG5(answer); 1839 1840 if (client_receiver != NULL) 1841 async_new_connection(answer.in_task_id, phone_hash, 0, NULL, 1842 client_receiver, carg); 2085 1843 2086 1844 return EOK; … … 2123 1881 2124 1882 ipc_call_async_0(exch->phone, IPC_M_CLONE_ESTABLISH, msg, 2125 reply_received );1883 reply_received, true); 2126 1884 2127 1885 sysarg_t rc; … … 2142 1900 } 2143 1901 2144 sess->iface = 0;2145 1902 sess->mgmt = mgmt; 2146 1903 sess->phone = phone; … … 2172 1929 2173 1930 ipc_call_async_4(phone, IPC_M_CONNECT_ME_TO, arg1, arg2, arg3, arg4, 2174 msg, reply_received );1931 msg, reply_received, true); 2175 1932 2176 1933 sysarg_t rc; … … 2212 1969 int phone = async_connect_me_to_internal(exch->phone, arg1, arg2, arg3, 2213 1970 0); 1971 2214 1972 if (phone < 0) { 2215 1973 errno = phone; … … 2218 1976 } 2219 1977 2220 sess->iface = 0;2221 1978 sess->mgmt = mgmt; 2222 1979 sess->phone = phone; … … 2235 1992 } 2236 1993 2237 /** Wrapper for making IPC_M_CONNECT_ME_TO calls using the async framework.2238 *2239 * Ask through phone for a new connection to some service and block until2240 * success.2241 *2242 * @param exch Exchange for sending the message.2243 * @param iface Connection interface.2244 * @param arg2 User defined argument.2245 * @param arg3 User defined argument.2246 *2247 * @return New session on success or NULL on error.2248 *2249 */2250 async_sess_t *async_connect_me_to_iface(async_exch_t *exch, iface_t iface,2251 sysarg_t arg2, sysarg_t arg3)2252 {2253 if (exch == NULL) {2254 errno = ENOENT;2255 return NULL;2256 }2257 2258 async_sess_t *sess = (async_sess_t *) malloc(sizeof(async_sess_t));2259 if (sess == NULL) {2260 errno = ENOMEM;2261 return NULL;2262 }2263 2264 int phone = async_connect_me_to_internal(exch->phone, iface, arg2,2265 arg3, 0);2266 if (phone < 0) {2267 errno = phone;2268 free(sess);2269 return NULL;2270 }2271 2272 sess->iface = iface;2273 sess->phone = phone;2274 sess->arg1 = iface;2275 sess->arg2 = arg2;2276 sess->arg3 = arg3;2277 2278 fibril_mutex_initialize(&sess->remote_state_mtx);2279 sess->remote_state_data = NULL;2280 2281 list_initialize(&sess->exch_list);2282 fibril_mutex_initialize(&sess->mutex);2283 atomic_set(&sess->refcnt, 0);2284 2285 return sess;2286 }2287 2288 1994 /** Set arguments for new connections. 2289 1995 * … … 2341 2047 } 2342 2048 2343 sess->iface = 0;2344 2049 sess->mgmt = mgmt; 2345 2050 sess->phone = phone; … … 2358 2063 } 2359 2064 2360 /** Wrapper for making IPC_M_CONNECT_ME_TO calls using the async framework.2361 *2362 * Ask through phone for a new connection to some service and block until2363 * success.2364 *2365 * @param exch Exchange for sending the message.2366 * @param iface Connection interface.2367 * @param arg2 User defined argument.2368 * @param arg3 User defined argument.2369 *2370 * @return New session on success or NULL on error.2371 *2372 */2373 async_sess_t *async_connect_me_to_blocking_iface(async_exch_t *exch, iface_t iface,2374 sysarg_t arg2, sysarg_t arg3)2375 {2376 if (exch == NULL) {2377 errno = ENOENT;2378 return NULL;2379 }2380 2381 async_sess_t *sess = (async_sess_t *) malloc(sizeof(async_sess_t));2382 if (sess == NULL) {2383 errno = ENOMEM;2384 return NULL;2385 }2386 2387 int phone = async_connect_me_to_internal(exch->phone, iface, arg2,2388 arg3, IPC_FLAG_BLOCKING);2389 if (phone < 0) {2390 errno = phone;2391 free(sess);2392 return NULL;2393 }2394 2395 sess->iface = iface;2396 sess->phone = phone;2397 sess->arg1 = iface;2398 sess->arg2 = arg2;2399 sess->arg3 = arg3;2400 2401 fibril_mutex_initialize(&sess->remote_state_mtx);2402 sess->remote_state_data = NULL;2403 2404 list_initialize(&sess->exch_list);2405 fibril_mutex_initialize(&sess->mutex);2406 atomic_set(&sess->refcnt, 0);2407 2408 return sess;2409 }2410 2411 2065 /** Connect to a task specified by id. 2412 2066 * … … 2427 2081 } 2428 2082 2429 sess->iface = 0;2430 2083 sess->mgmt = EXCHANGE_ATOMIC; 2431 2084 sess->phone = phone; … … 2505 2158 return NULL; 2506 2159 2507 exch_mgmt_t mgmt = sess->mgmt; 2508 if (sess->iface != 0) 2509 mgmt = sess->iface & IFACE_EXCHANGE_MASK; 2510 2511 async_exch_t *exch = NULL; 2160 async_exch_t *exch; 2512 2161 2513 2162 fibril_mutex_lock(&async_sess_mutex); … … 2528 2177 */ 2529 2178 2530 if (( mgmt == EXCHANGE_ATOMIC) ||2531 ( mgmt == EXCHANGE_SERIALIZE)) {2179 if ((sess->mgmt == EXCHANGE_ATOMIC) || 2180 (sess->mgmt == EXCHANGE_SERIALIZE)) { 2532 2181 exch = (async_exch_t *) malloc(sizeof(async_exch_t)); 2533 2182 if (exch != NULL) { … … 2537 2186 exch->phone = sess->phone; 2538 2187 } 2539 } else if (mgmt == EXCHANGE_PARALLEL) { 2540 int phone; 2541 2542 retry: 2188 } else { /* EXCHANGE_PARALLEL */ 2543 2189 /* 2544 2190 * Make a one-time attempt to connect a new data phone. 2545 2191 */ 2192 2193 int phone; 2194 2195 retry: 2546 2196 phone = async_connect_me_to_internal(sess->phone, sess->arg1, 2547 2197 sess->arg2, sess->arg3, 0); … … 2585 2235 atomic_inc(&sess->refcnt); 2586 2236 2587 if ( mgmt == EXCHANGE_SERIALIZE)2237 if (sess->mgmt == EXCHANGE_SERIALIZE) 2588 2238 fibril_mutex_lock(&sess->mutex); 2589 2239 } … … 2605 2255 assert(sess != NULL); 2606 2256 2607 exch_mgmt_t mgmt = sess->mgmt;2608 if (sess->iface != 0)2609 mgmt = sess->iface & IFACE_EXCHANGE_MASK;2610 2611 2257 atomic_dec(&sess->refcnt); 2612 2258 2613 if ( mgmt == EXCHANGE_SERIALIZE)2259 if (sess->mgmt == EXCHANGE_SERIALIZE) 2614 2260 fibril_mutex_unlock(&sess->mutex); 2615 2261 … … 3048 2694 } 3049 2695 3050 void * arg_data;2696 void *_data; 3051 2697 3052 2698 if (nullterm) 3053 arg_data = malloc(size + 1);2699 _data = malloc(size + 1); 3054 2700 else 3055 arg_data = malloc(size);3056 3057 if ( arg_data == NULL) {2701 _data = malloc(size); 2702 2703 if (_data == NULL) { 3058 2704 ipc_answer_0(callid, ENOMEM); 3059 2705 return ENOMEM; 3060 2706 } 3061 2707 3062 int rc = async_data_write_finalize(callid, arg_data, size);2708 int rc = async_data_write_finalize(callid, _data, size); 3063 2709 if (rc != EOK) { 3064 free( arg_data);2710 free(_data); 3065 2711 return rc; 3066 2712 } 3067 2713 3068 2714 if (nullterm) 3069 ((char *) arg_data)[size] = 0;3070 3071 *data = arg_data;2715 ((char *) _data)[size] = 0; 2716 2717 *data = _data; 3072 2718 if (received != NULL) 3073 2719 *received = size; … … 3167 2813 } 3168 2814 3169 sess->iface = 0;3170 2815 sess->mgmt = mgmt; 3171 2816 sess->phone = phone; … … 3217 2862 } 3218 2863 3219 sess->iface = 0;3220 2864 sess->mgmt = mgmt; 3221 2865 sess->phone = phone; … … 3263 2907 return NULL; 3264 2908 3265 sess->iface = 0;3266 2909 sess->mgmt = mgmt; 3267 2910 sess->phone = phone; … … 3291 2934 { 3292 2935 assert(callid); 3293 2936 3294 2937 ipc_call_t call; 3295 2938 *callid = async_get_call(&call); 3296 2939 3297 2940 if (IPC_GET_IMETHOD(call) != IPC_M_STATE_CHANGE_AUTHORIZE) 3298 2941 return false; … … 3304 2947 if (arg3) 3305 2948 *arg3 = IPC_GET_ARG3(call); 3306 2949 3307 2950 return true; 3308 2951 } … … 3383 3026 } 3384 3027 3385 void *async_as_area_create(void *base, size_t size, unsigned int flags,3386 async_sess_t *pager, sysarg_t id1, sysarg_t id2, sysarg_t id3)3387 {3388 as_area_pager_info_t pager_info = {3389 .pager = pager->phone,3390 .id1 = id1,3391 .id2 = id2,3392 .id3 = id33393 };3394 return as_area_create(base, size, flags, &pager_info);3395 }3396 3397 3028 /** @} 3398 3029 */
Note:
See TracChangeset
for help on using the changeset viewer.