Changes in uspace/lib/c/generic/async.c [7f9d97f3:ae6021d] in mainline
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
uspace/lib/c/generic/async.c
r7f9d97f3 rae6021d 77 77 * } 78 78 * 79 * my_client_connection(icallid, *icall)79 * port_handler(icallid, *icall) 80 80 * { 81 81 * if (want_refuse) { … … 116 116 #include <stdlib.h> 117 117 #include <macros.h> 118 #include <as.h> 119 #include <abi/mm/as.h> 118 120 #include "private/libc.h" 119 121 … … 123 125 list_t exch_list; 124 126 127 /** Session interface */ 128 iface_t iface; 129 125 130 /** Exchange management style */ 126 131 exch_mgmt_t mgmt; … … 189 194 /** If reply was received. */ 190 195 bool done; 191 196 192 197 /** If the message / reply should be discarded on arrival. */ 193 198 bool forget; 194 199 195 200 /** If already destroyed. */ 196 201 bool destroyed; … … 232 237 /** Identification of the opening call. */ 233 238 ipc_callid_t callid; 239 234 240 /** Call data of the opening call. */ 235 241 ipc_call_t call; 236 /** Local argument or NULL if none. */237 void *carg;238 242 239 243 /** Identification of the closing call. */ … … 241 245 242 246 /** Fibril function that will be used to handle the connection. */ 243 async_client_conn_t cfibril; 247 async_port_handler_t handler; 248 249 /** Client data */ 250 void *data; 244 251 } connection_t; 252 253 /** Interface data */ 254 typedef struct { 255 ht_link_t link; 256 257 /** Interface ID */ 258 iface_t iface; 259 260 /** Futex protecting the hash table */ 261 futex_t futex; 262 263 /** Interface ports */ 264 hash_table_t port_hash_table; 265 266 /** Next available port ID */ 267 port_id_t port_id_avail; 268 } interface_t; 269 270 /* Port data */ 271 typedef struct { 272 ht_link_t link; 273 274 /** Port ID */ 275 port_id_t id; 276 277 /** Port connection handler */ 278 async_port_handler_t handler; 279 280 /** Client data */ 281 void *data; 282 } port_t; 245 283 246 284 /* Notification data */ … … 264 302 { 265 303 struct timeval tv = { 0, 0 }; 266 304 267 305 to->inlist = false; 268 306 to->occurred = false; … … 287 325 static amsg_t *amsg_create(void) 288 326 { 289 amsg_t *msg; 290 291 msg = malloc(sizeof(amsg_t)); 327 amsg_t *msg = malloc(sizeof(amsg_t)); 292 328 if (msg) { 293 329 msg->done = false; … … 298 334 awaiter_initialize(&msg->wdata); 299 335 } 300 336 301 337 return msg; 302 338 } … … 335 371 } 336 372 337 /** Default fibril function that gets called to handle new connection. 338 * 339 * This function is defined as a weak symbol - to be redefined in user code. 373 /** Default fallback fibril function. 374 * 375 * This fallback fibril function gets called on incomming 376 * connections that do not have a specific handler defined. 340 377 * 341 378 * @param callid Hash of the incoming call. … … 344 381 * 345 382 */ 346 static void default_ client_connection(ipc_callid_t callid, ipc_call_t *call,383 static void default_fallback_port_handler(ipc_callid_t callid, ipc_call_t *call, 347 384 void *arg) 348 385 { … … 350 387 } 351 388 352 static async_client_conn_t client_connection = default_client_connection; 353 static size_t notification_handler_stksz = FIBRIL_DFLT_STK_SIZE; 354 355 /** Setter for client_connection function pointer. 356 * 357 * @param conn Function that will implement a new connection fibril. 358 * 359 */ 360 void async_set_client_connection(async_client_conn_t conn) 361 { 362 assert(client_connection == default_client_connection); 363 client_connection = conn; 364 } 365 366 /** Set the stack size for the notification handler notification fibrils. 367 * 368 * @param size Stack size in bytes. 369 */ 370 void async_set_notification_handler_stack_size(size_t size) 371 { 372 notification_handler_stksz = size; 389 static async_port_handler_t fallback_port_handler = 390 default_fallback_port_handler; 391 static void *fallback_port_data = NULL; 392 393 static hash_table_t interface_hash_table; 394 395 static size_t interface_key_hash(void *key) 396 { 397 iface_t iface = *(iface_t *) key; 398 return iface; 399 } 400 401 static size_t interface_hash(const ht_link_t *item) 402 { 403 interface_t *interface = hash_table_get_inst(item, interface_t, link); 404 return interface_key_hash(&interface->iface); 405 } 406 407 static bool interface_key_equal(void *key, const ht_link_t *item) 408 { 409 iface_t iface = *(iface_t *) key; 410 interface_t *interface = hash_table_get_inst(item, interface_t, link); 411 return iface == interface->iface; 412 } 413 414 /** Operations for the port hash table. */ 415 static hash_table_ops_t interface_hash_table_ops = { 416 .hash = interface_hash, 417 .key_hash = interface_key_hash, 418 .key_equal = interface_key_equal, 419 .equal = NULL, 420 .remove_callback = NULL 421 }; 422 423 static size_t port_key_hash(void *key) 424 { 425 port_id_t port_id = *(port_id_t *) key; 426 return port_id; 427 } 428 429 static size_t port_hash(const ht_link_t *item) 430 { 431 port_t *port = hash_table_get_inst(item, port_t, link); 432 return port_key_hash(&port->id); 433 } 434 435 static bool port_key_equal(void *key, const ht_link_t *item) 436 { 437 port_id_t port_id = *(port_id_t *) key; 438 port_t *port = hash_table_get_inst(item, port_t, link); 439 return port_id == port->id; 440 } 441 442 /** Operations for the port hash table. */ 443 static hash_table_ops_t port_hash_table_ops = { 444 .hash = port_hash, 445 .key_hash = port_key_hash, 446 .key_equal = port_key_equal, 447 .equal = NULL, 448 .remove_callback = NULL 449 }; 450 451 static interface_t *async_new_interface(iface_t iface) 452 { 453 interface_t *interface = 454 (interface_t *) malloc(sizeof(interface_t)); 455 if (!interface) 456 return NULL; 457 458 bool ret = hash_table_create(&interface->port_hash_table, 0, 0, 459 &port_hash_table_ops); 460 if (!ret) { 461 free(interface); 462 return NULL; 463 } 464 465 interface->iface = iface; 466 futex_initialize(&interface->futex, 1); 467 interface->port_id_avail = 0; 468 469 hash_table_insert(&interface_hash_table, &interface->link); 470 471 return interface; 472 } 473 474 static port_t *async_new_port(interface_t *interface, 475 async_port_handler_t handler, void *data) 476 { 477 port_t *port = (port_t *) malloc(sizeof(port_t)); 478 if (!port) 479 return NULL; 480 481 futex_down(&interface->futex); 482 483 port_id_t id = interface->port_id_avail; 484 interface->port_id_avail++; 485 486 port->id = id; 487 port->handler = handler; 488 port->data = data; 489 490 hash_table_insert(&interface->port_hash_table, &port->link); 491 492 futex_up(&interface->futex); 493 494 return port; 373 495 } 374 496 … … 387 509 */ 388 510 static FIBRIL_CONDVAR_INITIALIZE(avail_phone_cv); 511 512 int async_create_port(iface_t iface, async_port_handler_t handler, 513 void *data, port_id_t *port_id) 514 { 515 if ((iface & IFACE_MOD_MASK) == IFACE_MOD_CALLBACK) 516 return EINVAL; 517 518 interface_t *interface; 519 520 futex_down(&async_futex); 521 522 ht_link_t *link = hash_table_find(&interface_hash_table, &iface); 523 if (link) 524 interface = hash_table_get_inst(link, interface_t, link); 525 else 526 interface = async_new_interface(iface); 527 528 if (!interface) { 529 futex_up(&async_futex); 530 return ENOMEM; 531 } 532 533 port_t *port = async_new_port(interface, handler, data); 534 if (!port) { 535 futex_up(&async_futex); 536 return ENOMEM; 537 } 538 539 *port_id = port->id; 540 541 futex_up(&async_futex); 542 543 return EOK; 544 } 545 546 void async_set_fallback_port_handler(async_port_handler_t handler, void *data) 547 { 548 assert(handler != NULL); 549 550 fallback_port_handler = handler; 551 fallback_port_data = data; 552 } 389 553 390 554 static hash_table_t client_hash_table; … … 458 622 }; 459 623 624 static client_t *async_client_get(task_id_t client_id, bool create) 625 { 626 client_t *client = NULL; 627 628 futex_down(&async_futex); 629 ht_link_t *link = hash_table_find(&client_hash_table, &client_id); 630 if (link) { 631 client = hash_table_get_inst(link, client_t, link); 632 atomic_inc(&client->refcnt); 633 } else if (create) { 634 client = malloc(sizeof(client_t)); 635 if (client) { 636 client->in_task_id = client_id; 637 client->data = async_client_data_create(); 638 639 atomic_set(&client->refcnt, 1); 640 hash_table_insert(&client_hash_table, &client->link); 641 } 642 } 643 644 futex_up(&async_futex); 645 return client; 646 } 647 648 static void async_client_put(client_t *client) 649 { 650 bool destroy; 651 652 futex_down(&async_futex); 653 654 if (atomic_predec(&client->refcnt) == 0) { 655 hash_table_remove(&client_hash_table, &client->in_task_id); 656 destroy = true; 657 } else 658 destroy = false; 659 660 futex_up(&async_futex); 661 662 if (destroy) { 663 if (client->data) 664 async_client_data_destroy(client->data); 665 666 free(client); 667 } 668 } 669 670 /** Wrapper for client connection fibril. 671 * 672 * When a new connection arrives, a fibril with this implementing 673 * function is created. 674 * 675 * @param arg Connection structure pointer. 676 * 677 * @return Always zero. 678 * 679 */ 680 static int connection_fibril(void *arg) 681 { 682 assert(arg); 683 684 /* 685 * Setup fibril-local connection pointer. 686 */ 687 fibril_connection = (connection_t *) arg; 688 689 /* 690 * Add our reference for the current connection in the client task 691 * tracking structure. If this is the first reference, create and 692 * hash in a new tracking structure. 693 */ 694 695 client_t *client = async_client_get(fibril_connection->in_task_id, true); 696 if (!client) { 697 ipc_answer_0(fibril_connection->callid, ENOMEM); 698 return 0; 699 } 700 701 fibril_connection->client = client; 702 703 /* 704 * Call the connection handler function. 705 */ 706 fibril_connection->handler(fibril_connection->callid, 707 &fibril_connection->call, fibril_connection->data); 708 709 /* 710 * Remove the reference for this client task connection. 711 */ 712 async_client_put(client); 713 714 /* 715 * Remove myself from the connection hash table. 716 */ 717 futex_down(&async_futex); 718 hash_table_remove(&conn_hash_table, &fibril_connection->in_phone_hash); 719 futex_up(&async_futex); 720 721 /* 722 * Answer all remaining messages with EHANGUP. 723 */ 724 while (!list_empty(&fibril_connection->msg_queue)) { 725 msg_t *msg = 726 list_get_instance(list_first(&fibril_connection->msg_queue), 727 msg_t, link); 728 729 list_remove(&msg->link); 730 ipc_answer_0(msg->callid, EHANGUP); 731 free(msg); 732 } 733 734 /* 735 * If the connection was hung-up, answer the last call, 736 * i.e. IPC_M_PHONE_HUNGUP. 737 */ 738 if (fibril_connection->close_callid) 739 ipc_answer_0(fibril_connection->close_callid, EOK); 740 741 free(fibril_connection); 742 return 0; 743 } 744 745 /** Create a new fibril for a new connection. 746 * 747 * Create new fibril for connection, fill in connection structures 748 * and insert it into the hash table, so that later we can easily 749 * do routing of messages to particular fibrils. 750 * 751 * @param in_task_id Identification of the incoming connection. 752 * @param in_phone_hash Identification of the incoming connection. 753 * @param callid Hash of the opening IPC_M_CONNECT_ME_TO call. 754 * If callid is zero, the connection was opened by 755 * accepting the IPC_M_CONNECT_TO_ME call and this 756 * function is called directly by the server. 757 * @param call Call data of the opening call. 758 * @param handler Connection handler. 759 * @param data Client argument to pass to the connection handler. 760 * 761 * @return New fibril id or NULL on failure. 762 * 763 */ 764 static fid_t async_new_connection(task_id_t in_task_id, sysarg_t in_phone_hash, 765 ipc_callid_t callid, ipc_call_t *call, async_port_handler_t handler, 766 void *data) 767 { 768 connection_t *conn = malloc(sizeof(*conn)); 769 if (!conn) { 770 if (callid) 771 ipc_answer_0(callid, ENOMEM); 772 773 return (uintptr_t) NULL; 774 } 775 776 conn->in_task_id = in_task_id; 777 conn->in_phone_hash = in_phone_hash; 778 list_initialize(&conn->msg_queue); 779 conn->callid = callid; 780 conn->close_callid = 0; 781 conn->handler = handler; 782 conn->data = data; 783 784 if (call) 785 conn->call = *call; 786 787 /* We will activate the fibril ASAP */ 788 conn->wdata.active = true; 789 conn->wdata.fid = fibril_create(connection_fibril, conn); 790 791 if (conn->wdata.fid == 0) { 792 free(conn); 793 794 if (callid) 795 ipc_answer_0(callid, ENOMEM); 796 797 return (uintptr_t) NULL; 798 } 799 800 /* Add connection to the connection hash table */ 801 802 futex_down(&async_futex); 803 hash_table_insert(&conn_hash_table, &conn->link); 804 futex_up(&async_futex); 805 806 fibril_add_ready(conn->wdata.fid); 807 808 return conn->wdata.fid; 809 } 810 811 /** Wrapper for making IPC_M_CONNECT_TO_ME calls using the async framework. 812 * 813 * Ask through phone for a new connection to some service. 814 * 815 * @param exch Exchange for sending the message. 816 * @param iface Callback interface. 817 * @param arg1 User defined argument. 818 * @param arg2 User defined argument. 819 * @param handler Callback handler. 820 * @param data Handler data. 821 * @param port_id ID of the newly created port. 822 * 823 * @return Zero on success or a negative error code. 824 * 825 */ 826 int async_create_callback_port(async_exch_t *exch, iface_t iface, sysarg_t arg1, 827 sysarg_t arg2, async_port_handler_t handler, void *data, port_id_t *port_id) 828 { 829 if ((iface & IFACE_MOD_CALLBACK) != IFACE_MOD_CALLBACK) 830 return EINVAL; 831 832 if (exch == NULL) 833 return ENOENT; 834 835 ipc_call_t answer; 836 aid_t req = async_send_3(exch, IPC_M_CONNECT_TO_ME, iface, arg1, arg2, 837 &answer); 838 839 sysarg_t ret; 840 async_wait_for(req, &ret); 841 if (ret != EOK) 842 return (int) ret; 843 844 sysarg_t phone_hash = IPC_GET_ARG5(answer); 845 interface_t *interface; 846 847 futex_down(&async_futex); 848 849 ht_link_t *link = hash_table_find(&interface_hash_table, &iface); 850 if (link) 851 interface = hash_table_get_inst(link, interface_t, link); 852 else 853 interface = async_new_interface(iface); 854 855 if (!interface) { 856 futex_up(&async_futex); 857 return ENOMEM; 858 } 859 860 port_t *port = async_new_port(interface, handler, data); 861 if (!port) { 862 futex_up(&async_futex); 863 return ENOMEM; 864 } 865 866 *port_id = port->id; 867 868 futex_up(&async_futex); 869 870 fid_t fid = async_new_connection(answer.in_task_id, phone_hash, 871 0, NULL, handler, data); 872 if (fid == (uintptr_t) NULL) 873 return ENOMEM; 874 875 return EOK; 876 } 877 460 878 static size_t notification_key_hash(void *key) 461 879 { … … 571 989 } 572 990 573 /** Notification fibril. 574 * 575 * When a notification arrives, a fibril with this implementing function is 576 * created. It calls the corresponding notification handler and does the final 577 * cleanup. 578 * 579 * @param arg Message structure pointer. 580 * 581 * @return Always zero. 582 * 583 */ 584 static int notification_fibril(void *arg) 585 { 586 assert(arg); 587 588 msg_t *msg = (msg_t *) arg; 991 /** Process notification. 992 * 993 * @param callid Hash of the incoming call. 994 * @param call Data of the incoming call. 995 * 996 */ 997 static void process_notification(ipc_callid_t callid, ipc_call_t *call) 998 { 589 999 async_notification_handler_t handler = NULL; 590 1000 void *data = NULL; 1001 1002 assert(call); 591 1003 592 1004 futex_down(&async_futex); 593 1005 594 1006 ht_link_t *link = hash_table_find(¬ification_hash_table, 595 &IPC_GET_IMETHOD( msg->call));1007 &IPC_GET_IMETHOD(*call)); 596 1008 if (link) { 597 1009 notification_t *notification = … … 604 1016 605 1017 if (handler) 606 handler(msg->callid, &msg->call, data); 607 608 free(msg); 609 return 0; 610 } 611 612 /** Process notification. 613 * 614 * A new fibril is created which would process the notification. 615 * 616 * @param callid Hash of the incoming call. 617 * @param call Data of the incoming call. 618 * 619 * @return False if an error occured. 620 * True if the call was passed to the notification fibril. 621 * 622 */ 623 static bool process_notification(ipc_callid_t callid, ipc_call_t *call) 624 { 625 assert(call); 626 627 futex_down(&async_futex); 628 629 msg_t *msg = malloc(sizeof(*msg)); 630 if (!msg) { 631 futex_up(&async_futex); 632 return false; 633 } 634 635 msg->callid = callid; 636 msg->call = *call; 637 638 fid_t fid = fibril_create_generic(notification_fibril, msg, 639 notification_handler_stksz); 640 if (fid == 0) { 641 free(msg); 642 futex_up(&async_futex); 643 return false; 644 } 645 646 fibril_add_ready(fid); 647 648 futex_up(&async_futex); 649 return true; 1018 handler(callid, call, data); 650 1019 } 651 1020 … … 866 1235 } 867 1236 868 msg_t *msg = list_get_instance(list_first(&conn->msg_queue), msg_t, link); 1237 msg_t *msg = list_get_instance(list_first(&conn->msg_queue), 1238 msg_t, link); 869 1239 list_remove(&msg->link); 870 1240 … … 877 1247 } 878 1248 879 static client_t *async_client_get(task_id_t client_id, bool create)880 {881 client_t *client = NULL;882 883 futex_down(&async_futex);884 ht_link_t *link = hash_table_find(&client_hash_table, &client_id);885 if (link) {886 client = hash_table_get_inst(link, client_t, link);887 atomic_inc(&client->refcnt);888 } else if (create) {889 client = malloc(sizeof(client_t));890 if (client) {891 client->in_task_id = client_id;892 client->data = async_client_data_create();893 894 atomic_set(&client->refcnt, 1);895 hash_table_insert(&client_hash_table, &client->link);896 }897 }898 899 futex_up(&async_futex);900 return client;901 }902 903 static void async_client_put(client_t *client)904 {905 bool destroy;906 907 futex_down(&async_futex);908 909 if (atomic_predec(&client->refcnt) == 0) {910 hash_table_remove(&client_hash_table, &client->in_task_id);911 destroy = true;912 } else913 destroy = false;914 915 futex_up(&async_futex);916 917 if (destroy) {918 if (client->data)919 async_client_data_destroy(client->data);920 921 free(client);922 }923 }924 925 1249 void *async_get_client_data(void) 926 1250 { … … 934 1258 if (!client) 935 1259 return NULL; 1260 936 1261 if (!client->data) { 937 1262 async_client_put(client); 938 1263 return NULL; 939 1264 } 940 1265 941 1266 return client->data; 942 1267 } … … 945 1270 { 946 1271 client_t *client = async_client_get(client_id, false); 947 1272 948 1273 assert(client); 949 1274 assert(client->data); 950 1275 951 1276 /* Drop the reference we got in async_get_client_data_by_hash(). */ 952 1277 async_client_put(client); 953 1278 954 1279 /* Drop our own reference we got at the beginning of this function. */ 955 1280 async_client_put(client); 956 1281 } 957 1282 958 /** Wrapper for client connection fibril. 959 * 960 * When a new connection arrives, a fibril with this implementing function is 961 * created. It calls client_connection() and does the final cleanup. 962 * 963 * @param arg Connection structure pointer. 964 * 965 * @return Always zero. 966 * 967 */ 968 static int connection_fibril(void *arg) 969 { 970 assert(arg); 971 972 /* 973 * Setup fibril-local connection pointer. 974 */ 975 fibril_connection = (connection_t *) arg; 976 977 /* 978 * Add our reference for the current connection in the client task 979 * tracking structure. If this is the first reference, create and 980 * hash in a new tracking structure. 981 */ 982 983 client_t *client = async_client_get(fibril_connection->in_task_id, true); 984 if (!client) { 985 ipc_answer_0(fibril_connection->callid, ENOMEM); 986 return 0; 987 } 988 989 fibril_connection->client = client; 990 991 /* 992 * Call the connection handler function. 993 */ 994 fibril_connection->cfibril(fibril_connection->callid, 995 &fibril_connection->call, fibril_connection->carg); 996 997 /* 998 * Remove the reference for this client task connection. 999 */ 1000 async_client_put(client); 1001 1002 /* 1003 * Remove myself from the connection hash table. 1004 */ 1283 static port_t *async_find_port(iface_t iface, port_id_t port_id) 1284 { 1285 port_t *port = NULL; 1286 1005 1287 futex_down(&async_futex); 1006 hash_table_remove(&conn_hash_table, &fibril_connection->in_phone_hash); 1288 1289 ht_link_t *link = hash_table_find(&interface_hash_table, &iface); 1290 if (link) { 1291 interface_t *interface = 1292 hash_table_get_inst(link, interface_t, link); 1293 1294 link = hash_table_find(&interface->port_hash_table, &port_id); 1295 if (link) 1296 port = hash_table_get_inst(link, port_t, link); 1297 } 1298 1007 1299 futex_up(&async_futex); 1008 1300 1009 /* 1010 * Answer all remaining messages with EHANGUP. 1011 */ 1012 while (!list_empty(&fibril_connection->msg_queue)) { 1013 msg_t *msg = 1014 list_get_instance(list_first(&fibril_connection->msg_queue), 1015 msg_t, link); 1016 1017 list_remove(&msg->link); 1018 ipc_answer_0(msg->callid, EHANGUP); 1019 free(msg); 1020 } 1021 1022 /* 1023 * If the connection was hung-up, answer the last call, 1024 * i.e. IPC_M_PHONE_HUNGUP. 1025 */ 1026 if (fibril_connection->close_callid) 1027 ipc_answer_0(fibril_connection->close_callid, EOK); 1028 1029 free(fibril_connection); 1030 return 0; 1031 } 1032 1033 /** Create a new fibril for a new connection. 1034 * 1035 * Create new fibril for connection, fill in connection structures and insert 1036 * it into the hash table, so that later we can easily do routing of messages to 1037 * particular fibrils. 1038 * 1039 * @param in_task_id Identification of the incoming connection. 1040 * @param in_phone_hash Identification of the incoming connection. 1041 * @param callid Hash of the opening IPC_M_CONNECT_ME_TO call. 1042 * If callid is zero, the connection was opened by 1043 * accepting the IPC_M_CONNECT_TO_ME call and this function 1044 * is called directly by the server. 1045 * @param call Call data of the opening call. 1046 * @param cfibril Fibril function that should be called upon opening the 1047 * connection. 1048 * @param carg Extra argument to pass to the connection fibril 1049 * 1050 * @return New fibril id or NULL on failure. 1051 * 1052 */ 1053 fid_t async_new_connection(task_id_t in_task_id, sysarg_t in_phone_hash, 1054 ipc_callid_t callid, ipc_call_t *call, 1055 async_client_conn_t cfibril, void *carg) 1056 { 1057 connection_t *conn = malloc(sizeof(*conn)); 1058 if (!conn) { 1059 if (callid) 1060 ipc_answer_0(callid, ENOMEM); 1061 1062 return (uintptr_t) NULL; 1063 } 1064 1065 conn->in_task_id = in_task_id; 1066 conn->in_phone_hash = in_phone_hash; 1067 list_initialize(&conn->msg_queue); 1068 conn->callid = callid; 1069 conn->close_callid = 0; 1070 conn->carg = carg; 1071 1072 if (call) 1073 conn->call = *call; 1074 1075 /* We will activate the fibril ASAP */ 1076 conn->wdata.active = true; 1077 conn->cfibril = cfibril; 1078 conn->wdata.fid = fibril_create(connection_fibril, conn); 1079 1080 if (conn->wdata.fid == 0) { 1081 free(conn); 1082 1083 if (callid) 1084 ipc_answer_0(callid, ENOMEM); 1085 1086 return (uintptr_t) NULL; 1087 } 1088 1089 /* Add connection to the connection hash table */ 1090 1091 futex_down(&async_futex); 1092 hash_table_insert(&conn_hash_table, &conn->link); 1093 futex_up(&async_futex); 1094 1095 fibril_add_ready(conn->wdata.fid); 1096 1097 return conn->wdata.fid; 1301 return port; 1098 1302 } 1099 1303 … … 1111 1315 assert(call); 1112 1316 1113 /* Unrouted call - take some default action */1317 /* Kernel notification */ 1114 1318 if ((callid & IPC_CALLID_NOTIFICATION)) { 1319 fibril_t *fibril = (fibril_t *) __tcb_get()->fibril_data; 1320 unsigned oldsw = fibril->switches; 1321 1115 1322 process_notification(callid, call); 1323 1324 if (oldsw != fibril->switches) { 1325 /* 1326 * The notification handler did not execute atomically 1327 * and so the current manager fibril assumed the role of 1328 * a notification fibril. While waiting for its 1329 * resources, it switched to another manager fibril that 1330 * had already existed or it created a new one. We 1331 * therefore know there is at least yet another 1332 * manager fibril that can take over. We now kill the 1333 * current 'notification' fibril to prevent fibril 1334 * population explosion. 1335 */ 1336 futex_down(&async_futex); 1337 fibril_switch(FIBRIL_FROM_DEAD); 1338 } 1339 1116 1340 return; 1117 1341 } 1118 1342 1119 switch (IPC_GET_IMETHOD(*call)) { 1120 case IPC_M_CLONE_ESTABLISH: 1121 case IPC_M_CONNECT_ME_TO: 1343 /* New connection */ 1344 if (IPC_GET_IMETHOD(*call) == IPC_M_CONNECT_ME_TO) { 1345 iface_t iface = (iface_t) IPC_GET_ARG1(*call); 1346 sysarg_t in_phone_hash = IPC_GET_ARG5(*call); 1347 1348 async_notification_handler_t handler = fallback_port_handler; 1349 void *data = fallback_port_data; 1350 1351 // TODO: Currently ignores all ports but the first one 1352 port_t *port = async_find_port(iface, 0); 1353 if (port) { 1354 handler = port->handler; 1355 data = port->data; 1356 } 1357 1358 async_new_connection(call->in_task_id, in_phone_hash, callid, 1359 call, handler, data); 1360 return; 1361 } 1362 1363 /* Cloned connection */ 1364 if (IPC_GET_IMETHOD(*call) == IPC_M_CLONE_ESTABLISH) { 1365 // TODO: Currently ignores ports altogether 1366 1122 1367 /* Open new connection with fibril, etc. */ 1123 1368 async_new_connection(call->in_task_id, IPC_GET_ARG5(*call), 1124 callid, call, client_connection, NULL);1369 callid, call, fallback_port_handler, fallback_port_data); 1125 1370 return; 1126 1371 } … … 1267 1512 void async_create_manager(void) 1268 1513 { 1269 fid_t fid = fibril_create (async_manager_fibril, NULL);1514 fid_t fid = fibril_create_generic(async_manager_fibril, NULL, PAGE_SIZE); 1270 1515 if (fid != 0) 1271 1516 fibril_add_manager(fid); … … 1283 1528 void __async_init(void) 1284 1529 { 1530 if (!hash_table_create(&interface_hash_table, 0, 0, 1531 &interface_hash_table_ops)) 1532 abort(); 1533 1285 1534 if (!hash_table_create(&client_hash_table, 0, 0, &client_hash_table_ops)) 1286 1535 abort(); … … 1297 1546 abort(); 1298 1547 1548 session_ns->iface = 0; 1299 1549 session_ns->mgmt = EXCHANGE_ATOMIC; 1300 1550 session_ns->phone = PHONE_NS; … … 1343 1593 1344 1594 msg->done = true; 1345 1595 1346 1596 if (msg->forget) { 1347 1597 assert(msg->wdata.active); … … 1351 1601 fibril_add_ready(msg->wdata.fid); 1352 1602 } 1353 1603 1354 1604 futex_up(&async_futex); 1355 1605 } … … 1386 1636 1387 1637 ipc_call_async_4(exch->phone, imethod, arg1, arg2, arg3, arg4, msg, 1388 reply_received , true);1638 reply_received); 1389 1639 1390 1640 return (aid_t) msg; … … 1424 1674 1425 1675 ipc_call_async_5(exch->phone, imethod, arg1, arg2, arg3, arg4, arg5, 1426 msg, reply_received , true);1676 msg, reply_received); 1427 1677 1428 1678 return (aid_t) msg; … … 1443 1693 1444 1694 futex_down(&async_futex); 1445 1695 1446 1696 assert(!msg->forget); 1447 1697 assert(!msg->destroyed); 1448 1698 1449 1699 if (msg->done) { 1450 1700 futex_up(&async_futex); … … 1487 1737 1488 1738 amsg_t *msg = (amsg_t *) amsgid; 1489 1739 1490 1740 futex_down(&async_futex); 1491 1741 1492 1742 assert(!msg->forget); 1493 1743 assert(!msg->destroyed); 1494 1744 1495 1745 if (msg->done) { 1496 1746 futex_up(&async_futex); … … 1504 1754 if (timeout < 0) 1505 1755 timeout = 0; 1506 1756 1507 1757 getuptime(&msg->wdata.to_event.expires); 1508 1758 tv_add_diff(&msg->wdata.to_event.expires, timeout); … … 1557 1807 { 1558 1808 amsg_t *msg = (amsg_t *) amsgid; 1559 1809 1560 1810 assert(msg); 1561 1811 assert(!msg->forget); 1562 1812 assert(!msg->destroyed); 1563 1813 1564 1814 futex_down(&async_futex); 1815 1565 1816 if (msg->done) { 1566 1817 amsg_destroy(msg); … … 1569 1820 msg->forget = true; 1570 1821 } 1822 1571 1823 futex_up(&async_futex); 1572 1824 } … … 1711 1963 { 1712 1964 if (exch != NULL) 1713 ipc_call_async_0(exch->phone, imethod, NULL, NULL , true);1965 ipc_call_async_0(exch->phone, imethod, NULL, NULL); 1714 1966 } 1715 1967 … … 1717 1969 { 1718 1970 if (exch != NULL) 1719 ipc_call_async_1(exch->phone, imethod, arg1, NULL, NULL , true);1971 ipc_call_async_1(exch->phone, imethod, arg1, NULL, NULL); 1720 1972 } 1721 1973 … … 1724 1976 { 1725 1977 if (exch != NULL) 1726 ipc_call_async_2(exch->phone, imethod, arg1, arg2, NULL, NULL, 1727 true); 1978 ipc_call_async_2(exch->phone, imethod, arg1, arg2, NULL, NULL); 1728 1979 } 1729 1980 … … 1733 1984 if (exch != NULL) 1734 1985 ipc_call_async_3(exch->phone, imethod, arg1, arg2, arg3, NULL, 1735 NULL , true);1986 NULL); 1736 1987 } 1737 1988 … … 1741 1992 if (exch != NULL) 1742 1993 ipc_call_async_4(exch->phone, imethod, arg1, arg2, arg3, arg4, 1743 NULL, NULL , true);1994 NULL, NULL); 1744 1995 } 1745 1996 … … 1749 2000 if (exch != NULL) 1750 2001 ipc_call_async_5(exch->phone, imethod, arg1, arg2, arg3, arg4, 1751 arg5, NULL, NULL , true);2002 arg5, NULL, NULL); 1752 2003 } 1753 2004 … … 1814 2065 * @param arg2 User defined argument. 1815 2066 * @param arg3 User defined argument. 1816 * @param client_receiver Connection handing routine.1817 2067 * 1818 2068 * @return Zero on success or a negative error code. … … 1820 2070 */ 1821 2071 int async_connect_to_me(async_exch_t *exch, sysarg_t arg1, sysarg_t arg2, 1822 sysarg_t arg3 , async_client_conn_t client_receiver, void *carg)2072 sysarg_t arg3) 1823 2073 { 1824 2074 if (exch == NULL) 1825 2075 return ENOENT; 1826 2076 1827 sysarg_t phone_hash; 2077 ipc_call_t answer; 2078 aid_t req = async_send_3(exch, IPC_M_CONNECT_TO_ME, arg1, arg2, arg3, 2079 &answer); 2080 1828 2081 sysarg_t rc; 1829 1830 aid_t req;1831 ipc_call_t answer;1832 req = async_send_3(exch, IPC_M_CONNECT_TO_ME, arg1, arg2, arg3,1833 &answer);1834 2082 async_wait_for(req, &rc); 1835 2083 if (rc != EOK) 1836 2084 return (int) rc; 1837 1838 phone_hash = IPC_GET_ARG5(answer);1839 1840 if (client_receiver != NULL)1841 async_new_connection(answer.in_task_id, phone_hash, 0, NULL,1842 client_receiver, carg);1843 2085 1844 2086 return EOK; … … 1881 2123 1882 2124 ipc_call_async_0(exch->phone, IPC_M_CLONE_ESTABLISH, msg, 1883 reply_received , true);2125 reply_received); 1884 2126 1885 2127 sysarg_t rc; … … 1900 2142 } 1901 2143 2144 sess->iface = 0; 1902 2145 sess->mgmt = mgmt; 1903 2146 sess->phone = phone; … … 1929 2172 1930 2173 ipc_call_async_4(phone, IPC_M_CONNECT_ME_TO, arg1, arg2, arg3, arg4, 1931 msg, reply_received , true);2174 msg, reply_received); 1932 2175 1933 2176 sysarg_t rc; … … 1969 2212 int phone = async_connect_me_to_internal(exch->phone, arg1, arg2, arg3, 1970 2213 0); 1971 1972 2214 if (phone < 0) { 1973 2215 errno = phone; … … 1976 2218 } 1977 2219 2220 sess->iface = 0; 1978 2221 sess->mgmt = mgmt; 1979 2222 sess->phone = phone; … … 1992 2235 } 1993 2236 2237 /** Wrapper for making IPC_M_CONNECT_ME_TO calls using the async framework. 2238 * 2239 * Ask through phone for a new connection to some service and block until 2240 * success. 2241 * 2242 * @param exch Exchange for sending the message. 2243 * @param iface Connection interface. 2244 * @param arg2 User defined argument. 2245 * @param arg3 User defined argument. 2246 * 2247 * @return New session on success or NULL on error. 2248 * 2249 */ 2250 async_sess_t *async_connect_me_to_iface(async_exch_t *exch, iface_t iface, 2251 sysarg_t arg2, sysarg_t arg3) 2252 { 2253 if (exch == NULL) { 2254 errno = ENOENT; 2255 return NULL; 2256 } 2257 2258 async_sess_t *sess = (async_sess_t *) malloc(sizeof(async_sess_t)); 2259 if (sess == NULL) { 2260 errno = ENOMEM; 2261 return NULL; 2262 } 2263 2264 int phone = async_connect_me_to_internal(exch->phone, iface, arg2, 2265 arg3, 0); 2266 if (phone < 0) { 2267 errno = phone; 2268 free(sess); 2269 return NULL; 2270 } 2271 2272 sess->iface = iface; 2273 sess->phone = phone; 2274 sess->arg1 = iface; 2275 sess->arg2 = arg2; 2276 sess->arg3 = arg3; 2277 2278 fibril_mutex_initialize(&sess->remote_state_mtx); 2279 sess->remote_state_data = NULL; 2280 2281 list_initialize(&sess->exch_list); 2282 fibril_mutex_initialize(&sess->mutex); 2283 atomic_set(&sess->refcnt, 0); 2284 2285 return sess; 2286 } 2287 1994 2288 /** Set arguments for new connections. 1995 2289 * … … 2047 2341 } 2048 2342 2343 sess->iface = 0; 2049 2344 sess->mgmt = mgmt; 2050 2345 sess->phone = phone; … … 2063 2358 } 2064 2359 2360 /** Wrapper for making IPC_M_CONNECT_ME_TO calls using the async framework. 2361 * 2362 * Ask through phone for a new connection to some service and block until 2363 * success. 2364 * 2365 * @param exch Exchange for sending the message. 2366 * @param iface Connection interface. 2367 * @param arg2 User defined argument. 2368 * @param arg3 User defined argument. 2369 * 2370 * @return New session on success or NULL on error. 2371 * 2372 */ 2373 async_sess_t *async_connect_me_to_blocking_iface(async_exch_t *exch, iface_t iface, 2374 sysarg_t arg2, sysarg_t arg3) 2375 { 2376 if (exch == NULL) { 2377 errno = ENOENT; 2378 return NULL; 2379 } 2380 2381 async_sess_t *sess = (async_sess_t *) malloc(sizeof(async_sess_t)); 2382 if (sess == NULL) { 2383 errno = ENOMEM; 2384 return NULL; 2385 } 2386 2387 int phone = async_connect_me_to_internal(exch->phone, iface, arg2, 2388 arg3, IPC_FLAG_BLOCKING); 2389 if (phone < 0) { 2390 errno = phone; 2391 free(sess); 2392 return NULL; 2393 } 2394 2395 sess->iface = iface; 2396 sess->phone = phone; 2397 sess->arg1 = iface; 2398 sess->arg2 = arg2; 2399 sess->arg3 = arg3; 2400 2401 fibril_mutex_initialize(&sess->remote_state_mtx); 2402 sess->remote_state_data = NULL; 2403 2404 list_initialize(&sess->exch_list); 2405 fibril_mutex_initialize(&sess->mutex); 2406 atomic_set(&sess->refcnt, 0); 2407 2408 return sess; 2409 } 2410 2065 2411 /** Connect to a task specified by id. 2066 2412 * … … 2081 2427 } 2082 2428 2429 sess->iface = 0; 2083 2430 sess->mgmt = EXCHANGE_ATOMIC; 2084 2431 sess->phone = phone; … … 2158 2505 return NULL; 2159 2506 2160 async_exch_t *exch; 2507 exch_mgmt_t mgmt = sess->mgmt; 2508 if (sess->iface != 0) 2509 mgmt = sess->iface & IFACE_EXCHANGE_MASK; 2510 2511 async_exch_t *exch = NULL; 2161 2512 2162 2513 fibril_mutex_lock(&async_sess_mutex); … … 2177 2528 */ 2178 2529 2179 if (( sess->mgmt == EXCHANGE_ATOMIC) ||2180 ( sess->mgmt == EXCHANGE_SERIALIZE)) {2530 if ((mgmt == EXCHANGE_ATOMIC) || 2531 (mgmt == EXCHANGE_SERIALIZE)) { 2181 2532 exch = (async_exch_t *) malloc(sizeof(async_exch_t)); 2182 2533 if (exch != NULL) { … … 2186 2537 exch->phone = sess->phone; 2187 2538 } 2188 } else { /* EXCHANGE_PARALLEL */ 2539 } else if (mgmt == EXCHANGE_PARALLEL) { 2540 int phone; 2541 2542 retry: 2189 2543 /* 2190 2544 * Make a one-time attempt to connect a new data phone. 2191 2545 */ 2192 2193 int phone;2194 2195 retry:2196 2546 phone = async_connect_me_to_internal(sess->phone, sess->arg1, 2197 2547 sess->arg2, sess->arg3, 0); … … 2235 2585 atomic_inc(&sess->refcnt); 2236 2586 2237 if ( sess->mgmt == EXCHANGE_SERIALIZE)2587 if (mgmt == EXCHANGE_SERIALIZE) 2238 2588 fibril_mutex_lock(&sess->mutex); 2239 2589 } … … 2255 2605 assert(sess != NULL); 2256 2606 2607 exch_mgmt_t mgmt = sess->mgmt; 2608 if (sess->iface != 0) 2609 mgmt = sess->iface & IFACE_EXCHANGE_MASK; 2610 2257 2611 atomic_dec(&sess->refcnt); 2258 2612 2259 if ( sess->mgmt == EXCHANGE_SERIALIZE)2613 if (mgmt == EXCHANGE_SERIALIZE) 2260 2614 fibril_mutex_unlock(&sess->mutex); 2261 2615 … … 2694 3048 } 2695 3049 2696 void * _data;3050 void *arg_data; 2697 3051 2698 3052 if (nullterm) 2699 _data = malloc(size + 1);3053 arg_data = malloc(size + 1); 2700 3054 else 2701 _data = malloc(size);2702 2703 if ( _data == NULL) {3055 arg_data = malloc(size); 3056 3057 if (arg_data == NULL) { 2704 3058 ipc_answer_0(callid, ENOMEM); 2705 3059 return ENOMEM; 2706 3060 } 2707 3061 2708 int rc = async_data_write_finalize(callid, _data, size);3062 int rc = async_data_write_finalize(callid, arg_data, size); 2709 3063 if (rc != EOK) { 2710 free( _data);3064 free(arg_data); 2711 3065 return rc; 2712 3066 } 2713 3067 2714 3068 if (nullterm) 2715 ((char *) _data)[size] = 0;2716 2717 *data = _data;3069 ((char *) arg_data)[size] = 0; 3070 3071 *data = arg_data; 2718 3072 if (received != NULL) 2719 3073 *received = size; … … 2813 3167 } 2814 3168 3169 sess->iface = 0; 2815 3170 sess->mgmt = mgmt; 2816 3171 sess->phone = phone; … … 2862 3217 } 2863 3218 3219 sess->iface = 0; 2864 3220 sess->mgmt = mgmt; 2865 3221 sess->phone = phone; … … 2907 3263 return NULL; 2908 3264 3265 sess->iface = 0; 2909 3266 sess->mgmt = mgmt; 2910 3267 sess->phone = phone; … … 2934 3291 { 2935 3292 assert(callid); 2936 3293 2937 3294 ipc_call_t call; 2938 3295 *callid = async_get_call(&call); 2939 3296 2940 3297 if (IPC_GET_IMETHOD(call) != IPC_M_STATE_CHANGE_AUTHORIZE) 2941 3298 return false; … … 2947 3304 if (arg3) 2948 3305 *arg3 = IPC_GET_ARG3(call); 2949 3306 2950 3307 return true; 2951 3308 } … … 3026 3383 } 3027 3384 3385 void *async_as_area_create(void *base, size_t size, unsigned int flags, 3386 async_sess_t *pager, sysarg_t id1, sysarg_t id2, sysarg_t id3) 3387 { 3388 as_area_pager_info_t pager_info = { 3389 .pager = pager->phone, 3390 .id1 = id1, 3391 .id2 = id2, 3392 .id3 = id3 3393 }; 3394 return as_area_create(base, size, flags, &pager_info); 3395 } 3396 3028 3397 /** @} 3029 3398 */
Note:
See TracChangeset
for help on using the changeset viewer.