Changeset 514d561 in mainline for uspace/lib/c/generic/async/server.c


Ignore:
Timestamp:
2018-07-20T16:27:20Z (6 years ago)
Author:
Jiří Zárevúcky <jiri.zarevucky@…>
Branches:
lfn, master, serial, ticket/834-toolchain-update, topic/msim-upgrade, topic/simplify-dev-export
Children:
05208d9
Parents:
7137f74c
git-author:
Jiří Zárevúcky <jiri.zarevucky@…> (2018-07-19 21:52:47)
git-committer:
Jiří Zárevúcky <jiri.zarevucky@…> (2018-07-20 16:27:20)
Message:

Fibril/async implementation overhaul.

This commit marks the move towards treating the fibril library as a mere
implementation of a generic threading interface. Understood as a layer that
wraps the kernel threads, we not only have to wrap threading itself, but also
every syscall that blocks the kernel thread (by blocking, we mean thread not
doing useful work until an external event happens — e.g. locking a kernel
mutex or thread sleep is understood as blocking, but an as_area_create() is not,
despite potentially taking a long time to complete).

Consequently, we implement fibril_ipc_wait() as a fibril-native wrapper for
kernel's ipc_wait(), and also implement timer functionality like timeouts
as part of the fibril library. This removes the interdependency between fibril
implementation and the async framework — in theory, the fibril API could be
reimplemented as a simple 1:1 shim, and the async framework would continue
working normally (note that the current implementation of loader complicates
this).

To better isolate the fibril internals from the implementation of high-level
synchronization, a fibril_event_t is added. This object conceptually acts
like a single slot wait queue. All other synchronization is implemented in
terms of this primitive.

File:
1 edited

Legend:

Unmodified
Added
Removed
  • uspace/lib/c/generic/async/server.c

    r7137f74c r514d561  
    104104#include <ipc/irq.h>
    105105#include <ipc/event.h>
    106 #include <futex.h>
    107106#include <fibril.h>
    108107#include <adt/hash_table.h>
     
    118117#include <stdlib.h>
    119118#include <macros.h>
     119#include <str_error.h>
    120120#include <as.h>
    121121#include <abi/mm/as.h>
     
    127127/** Async framework global futex */
    128128futex_t async_futex = FUTEX_INITIALIZER;
    129 
    130 /** Number of threads waiting for IPC in the kernel. */
    131 static atomic_t threads_in_ipc_wait = { 0 };
    132129
    133130/** Call data */
     
    148145/* Server connection data */
    149146typedef struct {
    150         awaiter_t wdata;
     147        /** Fibril handling the connection. */
     148        fid_t fid;
    151149
    152150        /** Hash table link. */
     
    161159        /** Link to the client tracking structure. */
    162160        client_t *client;
     161
     162        /** Message event. */
     163        fibril_event_t msg_arrived;
    163164
    164165        /** Messages that should be delivered to this fibril. */
     
    251252/* The remaining structures are guarded by async_futex. */
    252253static hash_table_t conn_hash_table;
    253 static LIST_INITIALIZE(timeout_list);
    254254
    255255static size_t client_key_hash(void *key)
     
    487487                        ipc_answer_0(call->cap_handle, ENOMEM);
    488488
    489                 return (uintptr_t) NULL;
     489                return (fid_t) NULL;
    490490        }
    491491
    492492        conn->in_task_id = in_task_id;
    493493        conn->in_phone_hash = in_phone_hash;
     494        conn->msg_arrived = FIBRIL_EVENT_INIT;
    494495        list_initialize(&conn->msg_queue);
    495496        conn->close_chandle = CAP_NIL;
     
    503504
    504505        /* We will activate the fibril ASAP */
    505         conn->wdata.active = true;
    506         conn->wdata.fid = fibril_create(connection_fibril, conn);
    507 
    508         if (conn->wdata.fid == 0) {
     506        conn->fid = fibril_create(connection_fibril, conn);
     507
     508        if (conn->fid == 0) {
    509509                free(conn);
    510510
     
    512512                        ipc_answer_0(call->cap_handle, ENOMEM);
    513513
    514                 return (uintptr_t) NULL;
     514                return (fid_t) NULL;
    515515        }
    516516
     
    521521        futex_unlock(&async_futex);
    522522
    523         fibril_add_ready(conn->wdata.fid);
    524 
    525         return conn->wdata.fid;
     523        fibril_add_ready(conn->fid);
     524
     525        return conn->fid;
    526526}
    527527
     
    566566        fid_t fid = async_new_connection(answer.in_task_id, phone_hash,
    567567            NULL, handler, data);
    568         if (fid == (uintptr_t) NULL)
     568        if (fid == (fid_t) NULL)
    569569                return ENOMEM;
    570570
     
    602602};
    603603
    604 /** Sort in current fibril's timeout request.
    605  *
    606  * @param wd Wait data of the current fibril.
    607  *
    608  */
    609 void async_insert_timeout(awaiter_t *wd)
    610 {
    611         assert(wd);
    612 
    613         wd->to_event.occurred = false;
    614         wd->to_event.inlist = true;
    615 
    616         link_t *tmp = timeout_list.head.next;
    617         while (tmp != &timeout_list.head) {
    618                 awaiter_t *cur =
    619                     list_get_instance(tmp, awaiter_t, to_event.link);
    620 
    621                 if (tv_gteq(&cur->to_event.expires, &wd->to_event.expires))
    622                         break;
    623 
    624                 tmp = tmp->next;
    625         }
    626 
    627         list_insert_before(&wd->to_event.link, tmp);
    628 }
    629 
    630604/** Try to route a call to an appropriate connection fibril.
    631605 *
     
    657631        connection_t *conn = hash_table_get_inst(link, connection_t, link);
    658632
     633        // FIXME: malloc in critical section
    659634        msg_t *msg = malloc(sizeof(*msg));
    660635        if (!msg) {
     
    670645
    671646        /* If the connection fibril is waiting for an event, activate it */
    672         if (!conn->wdata.active) {
    673 
    674                 /* If in timeout list, remove it */
    675                 if (conn->wdata.to_event.inlist) {
    676                         conn->wdata.to_event.inlist = false;
    677                         list_remove(&conn->wdata.to_event.link);
    678                 }
    679 
    680                 conn->wdata.active = true;
    681                 fibril_add_ready(conn->wdata.fid);
    682         }
     647        fibril_notify(&conn->msg_arrived);
    683648
    684649        futex_unlock(&async_futex);
     
    987952        connection_t *conn = fibril_connection;
    988953
     954        struct timeval tv;
     955        struct timeval *expires = NULL;
     956        if (usecs) {
     957                getuptime(&tv);
     958                tv_add_diff(&tv, usecs);
     959                expires = &tv;
     960        }
     961
    989962        futex_lock(&async_futex);
    990 
    991         if (usecs) {
    992                 getuptime(&conn->wdata.to_event.expires);
    993                 tv_add_diff(&conn->wdata.to_event.expires, usecs);
    994         } else
    995                 conn->wdata.to_event.inlist = false;
    996963
    997964        /* If nothing in queue, wait until something arrives */
     
    1011978                }
    1012979
    1013                 if (usecs)
    1014                         async_insert_timeout(&conn->wdata);
    1015 
    1016                 conn->wdata.active = false;
    1017 
    1018                 /*
    1019                  * Note: the current fibril will be rescheduled either due to a
    1020                  * timeout or due to an arriving message destined to it. In the
    1021                  * former case, handle_expired_timeouts() and, in the latter
    1022                  * case, route_call() will perform the wakeup.
    1023                  */
    1024                 fibril_switch(FIBRIL_FROM_BLOCKED);
    1025 
    1026                 if ((usecs) && (conn->wdata.to_event.occurred) &&
    1027                     (list_empty(&conn->msg_queue))) {
    1028                         /* If we timed out -> exit */
    1029                         futex_unlock(&async_futex);
     980                // TODO: replace with cvar
     981                futex_unlock(&async_futex);
     982
     983                errno_t rc = fibril_wait_timeout(&conn->msg_arrived, expires);
     984                if (rc == ETIMEOUT)
    1030985                        return false;
    1031                 }
     986
     987                futex_lock(&async_futex);
    1032988        }
    1033989
     
    11261082}
    11271083
    1128 /** Fire all timeouts that expired. */
    1129 static suseconds_t handle_expired_timeouts(unsigned int *flags)
    1130 {
    1131         /* Make sure the async_futex is held. */
    1132         futex_assert_is_locked(&async_futex);
    1133 
    1134         struct timeval tv;
    1135         getuptime(&tv);
    1136 
    1137         bool fired = false;
    1138 
    1139         link_t *cur = list_first(&timeout_list);
    1140         while (cur != NULL) {
    1141                 awaiter_t *waiter =
    1142                     list_get_instance(cur, awaiter_t, to_event.link);
    1143 
    1144                 if (tv_gt(&waiter->to_event.expires, &tv)) {
    1145                         if (fired) {
    1146                                 *flags = SYNCH_FLAGS_NON_BLOCKING;
    1147                                 return 0;
    1148                         }
    1149                         *flags = 0;
    1150                         return tv_sub_diff(&waiter->to_event.expires, &tv);
    1151                 }
    1152 
    1153                 list_remove(&waiter->to_event.link);
    1154                 waiter->to_event.inlist = false;
    1155                 waiter->to_event.occurred = true;
    1156 
    1157                 /*
    1158                  * Redundant condition?
    1159                  * The fibril should not be active when it gets here.
    1160                  */
    1161                 if (!waiter->active) {
    1162                         waiter->active = true;
    1163                         fibril_add_ready(waiter->fid);
    1164                         fired = true;
    1165                 }
    1166 
    1167                 cur = list_first(&timeout_list);
    1168         }
    1169 
    1170         if (fired) {
    1171                 *flags = SYNCH_FLAGS_NON_BLOCKING;
    1172                 return 0;
    1173         }
    1174 
    1175         return SYNCH_NO_TIMEOUT;
    1176 }
    1177 
    11781084/** Endless loop dispatching incoming calls and answers.
    11791085 *
     
    11831089static errno_t async_manager_worker(void)
    11841090{
     1091        ipc_call_t call;
     1092        errno_t rc;
     1093
    11851094        while (true) {
    1186                 futex_lock(&async_futex);
    1187                 fibril_switch(FIBRIL_FROM_MANAGER);
    1188 
    1189                 /*
    1190                  * The switch only returns when there is no non-manager fibril
    1191                  * it can run.
    1192                  */
    1193 
    1194                 unsigned int flags = SYNCH_FLAGS_NONE;
    1195                 suseconds_t next_timeout = handle_expired_timeouts(&flags);
    1196                 futex_unlock(&async_futex);
    1197 
    1198                 atomic_inc(&threads_in_ipc_wait);
    1199 
    1200                 ipc_call_t call;
    1201                 errno_t rc = ipc_wait(&call, next_timeout, flags);
    1202 
    1203                 atomic_dec(&threads_in_ipc_wait);
    1204 
     1095                rc = fibril_ipc_wait(&call, NULL);
    12051096                if (rc == EOK)
    12061097                        handle_call(&call);
     
    12251116
    12261117/** Add one manager to manager list. */
    1227 void async_create_manager(void)
     1118fid_t async_create_manager(void)
    12281119{
    12291120        fid_t fid = fibril_create_generic(async_manager_fibril, NULL, PAGE_SIZE);
    1230         if (fid != 0)
    1231                 fibril_add_manager(fid);
    1232 }
    1233 
    1234 /** Remove one manager from manager list */
    1235 void async_destroy_manager(void)
    1236 {
    1237         fibril_remove_manager();
     1121        fibril_start(fid);
     1122        return fid;
    12381123}
    12391124
     
    12521137            &notification_hash_table_ops))
    12531138                abort();
     1139
     1140        async_create_manager();
    12541141}
    12551142
     
    13421229
    13431230        return EOK;
    1344 }
    1345 
    1346 /** Interrupt one thread of this task from waiting for IPC. */
    1347 void async_poke(void)
    1348 {
    1349         if (atomic_get(&threads_in_ipc_wait) > 0)
    1350                 ipc_poke();
    13511231}
    13521232
     
    18341714__noreturn void async_manager(void)
    18351715{
    1836         futex_lock(&async_futex);
    1837         fibril_switch(FIBRIL_FROM_DEAD);
     1716        fibril_event_t ever = FIBRIL_EVENT_INIT;
     1717        fibril_wait_for(&ever);
    18381718        __builtin_unreachable();
    18391719}
Note: See TracChangeset for help on using the changeset viewer.