Changeset 3f7e1f24 in mainline
- Timestamp:
- 2019-08-03T08:28:26Z (5 years ago)
- Children:
- 095d03c
- Parents:
- d7c5fc0
- git-author:
- Michal Koutný <xm.koutny+hos@…> (2015-04-22 17:54:08)
- git-committer:
- Matthieu Riolo <matthieu.riolo@…> (2019-08-03 08:28:26)
- Location:
- uspace
- Files:
-
- 12 edited
Legend:
- Unmodified
- Added
- Removed
-
uspace/Makefile
rd7c5fc0 r3f7e1f24 118 118 srv/net/udp \ 119 119 srv/ns \ 120 srv/taskmon \ 120 121 srv/sysman \ 121 srv/taskmon \122 122 srv/vfs \ 123 123 srv/bd/sata_bd \ -
uspace/srv/sysman/configuration.c
rd7c5fc0 r3f7e1f24 39 39 40 40 static hash_table_t units; 41 static fibril_rwlock_t units_rwl;42 41 43 42 /* Hash table functions */ … … 81 80 { 82 81 hash_table_create(&units, 0, 0, &units_ht_ops); 83 fibril_rwlock_initialize(&units_rwl);84 82 } 85 83 … … 89 87 assert(unit->state == STATE_EMBRYO); 90 88 assert(unit->name != NULL); 91 assert(fibril_rwlock_is_write_locked(&units_rwl));92 89 sysman_log(LVL_DEBUG2, "%s('%s')", __func__, unit_name(unit)); 93 90 … … 100 97 101 98 void configuration_start_update(void) { 102 assert(!fibril_rwlock_is_write_locked(&units_rwl)); 103 sysman_log(LVL_DEBUG2, "%s", __func__); 104 fibril_rwlock_write_lock(&units_rwl); 99 sysman_log(LVL_DEBUG2, "%s", __func__); 105 100 } 106 101 … … 124 119 void configuration_commit(void) 125 120 { 126 assert(fibril_rwlock_is_write_locked(&units_rwl));127 121 sysman_log(LVL_DEBUG2, "%s", __func__); 128 122 … … 132 126 */ 133 127 hash_table_apply(&units, &configuration_commit_unit, NULL); 134 fibril_rwlock_write_unlock(&units_rwl);135 128 } 136 129 … … 157 150 void configuration_rollback(void) 158 151 { 159 assert(fibril_rwlock_is_write_locked(&units_rwl));160 152 sysman_log(LVL_DEBUG2, "%s", __func__); 161 153 162 154 hash_table_apply(&units, &configuration_rollback_unit, NULL); 163 fibril_rwlock_write_unlock(&units_rwl);164 155 } 165 156 … … 199 190 int configuration_resolve_dependecies(void) 200 191 { 201 assert(fibril_rwlock_is_write_locked(&units_rwl));202 192 sysman_log(LVL_DEBUG2, "%s", __func__); 203 193 -
uspace/srv/sysman/job.c
rd7c5fc0 r3f7e1f24 27 27 */ 28 28 29 #include <adt/ list.h>29 #include <adt/fifo.h> 30 30 #include <assert.h> 31 31 #include <errno.h> 32 #include <fibril.h> 33 #include <fibril_synch.h> 34 #include <stdio.h> 35 #include <stdlib.h> 36 32 33 #include "dep.h" 37 34 #include "job.h" 38 35 #include "log.h" 39 #include " unit.h"36 #include "sysman.h" 40 37 41 38 static list_t job_queue; 42 static fibril_mutex_t job_queue_mtx; 43 static fibril_condvar_t job_queue_cv; 44 45 static void job_destroy(job_t **); 46 47 48 static int job_run_start(job_t *job) 49 { 50 sysman_log(LVL_DEBUG, "%s(%p)", __func__, job); 51 unit_t *unit = job->unit; 52 53 int rc = unit_start(unit); 39 40 /* 41 * Static functions 42 */ 43 44 static int job_add_blocked_job(job_t *job, job_t *blocked_job) 45 { 46 int rc = dyn_array_append(&job->blocked_jobs, job_ptr_t, blocked_job); 54 47 if (rc != EOK) { 55 return rc; 56 } 57 58 fibril_mutex_lock(&unit->state_mtx); 59 while (unit->state != STATE_STARTED) { 60 fibril_condvar_wait(&unit->state_cv, &unit->state_mtx); 61 } 62 fibril_mutex_unlock(&unit->state_mtx); 63 64 // TODO react to failed state 48 return ENOMEM; 49 } 50 51 job_add_ref(blocked_job); 52 blocked_job->blocking_jobs += 1; 53 65 54 return EOK; 66 55 } 67 56 68 static int job_runner(void *arg) 69 { 70 job_t *job = (job_t *)arg; 71 72 int retval = EOK; 73 74 /* Wait for previous jobs */ 75 list_foreach(job->blocking_jobs, link, job_link_t, jl) { 76 retval = job_wait(jl->job); 77 if (retval != EOK) { 78 break; 79 } 80 } 81 82 if (retval != EOK) { 83 goto finish; 84 } 85 86 /* Run the job itself */ 87 fibril_mutex_lock(&job->state_mtx); 88 job->state = JOB_RUNNING; 89 fibril_condvar_broadcast(&job->state_cv); 90 fibril_mutex_unlock(&job->state_mtx); 91 92 switch (job->type) { 93 case JOB_START: 94 retval = job_run_start(job); 95 break; 96 default: 97 assert(false); 98 } 99 100 101 finish: 102 fibril_mutex_lock(&job->state_mtx); 103 job->state = JOB_FINISHED; 104 job->retval = retval; 105 fibril_condvar_broadcast(&job->state_cv); 106 fibril_mutex_unlock(&job->state_mtx); 107 108 job_del_ref(&job); 109 110 return EOK; 111 } 112 113 static int job_dispatcher(void *arg) 114 { 115 fibril_mutex_lock(&job_queue_mtx); 116 while (1) { 117 while (list_empty(&job_queue)) { 118 fibril_condvar_wait(&job_queue_cv, &job_queue_mtx); 119 } 120 121 link_t *link = list_first(&job_queue); 122 assert(link); 123 list_remove(link); 124 125 /* 126 * Note that possible use of fibril pool must hold invariant 127 * that job is started asynchronously. In the case there exists 128 * circular dependency between jobs, it may result in a deadlock. 129 */ 130 job_t *job = list_get_instance(link, job_t, link); 131 fid_t runner_fibril = fibril_create(job_runner, job); 132 fibril_add_ready(runner_fibril); 133 } 134 135 fibril_mutex_unlock(&job_queue_mtx); 136 return EOK; 137 } 57 static void job_init(job_t *job, unit_t *u, unit_state_t target_state) 58 { 59 assert(job); 60 assert(u); 61 62 link_initialize(&job->job_queue); 63 64 /* Start with one reference for the creator */ 65 atomic_set(&job->refcnt, 1); 66 67 job->target_state = target_state; 68 job->unit = u; 69 70 dyn_array_initialize(&job->blocked_jobs, job_ptr_t, 0); 71 job->blocking_jobs = 0; 72 job->blocking_job_failed = false; 73 74 job->state = JOB_UNQUEUED; 75 job->retval = JOB_UNDEFINED_; 76 } 77 78 static bool job_eval_retval(job_t *job) 79 { 80 unit_t *u = job->unit; 81 if (u->state == job->target_state) { 82 job->retval = JOB_OK; 83 return true; 84 } else if (u->state == STATE_FAILED) { 85 job->retval = JOB_FAILED; 86 return true; 87 } else { 88 return false; 89 } 90 } 91 92 static bool job_is_runnable(job_t *job) 93 { 94 return job->state == JOB_QUEUED && job->blocking_jobs == 0; 95 } 96 97 static void job_check(void *object, void *data) 98 { 99 unit_t *u = object; 100 job_t *job = data; 101 102 if (job_eval_retval(job)) { 103 job_finish(job); 104 } else { 105 // TODO place for timeout 106 // TODO add reference to job? 107 sysman_object_observer(u, &job_check, job); 108 } 109 } 110 111 112 static void job_unblock(job_t *blocked_job, job_t *blocking_job) 113 { 114 if (blocking_job->retval == JOB_FAILED) { 115 blocked_job->blocking_job_failed = true; 116 } 117 blocked_job->blocking_jobs -= 1; 118 } 119 120 static void job_destroy(job_t **job_ptr) 121 { 122 job_t *job = *job_ptr; 123 if (job == NULL) { 124 return; 125 } 126 127 assert(!link_used(&job->job_queue)); 128 dyn_array_destroy(&job->blocked_jobs); 129 // TODO I should decrease referece of blocked jobs 130 131 free(job); 132 *job_ptr = NULL; 133 } 134 135 /* 136 * Non-static functions 137 */ 138 138 139 139 void job_queue_init() 140 140 { 141 141 list_initialize(&job_queue); 142 fibril_mutex_initialize(&job_queue_mtx); 143 fibril_condvar_initialize(&job_queue_cv); 144 145 fid_t dispatcher_fibril = fibril_create(job_dispatcher, NULL); 146 fibril_add_ready(dispatcher_fibril); 147 } 148 149 int job_queue_jobs(list_t *jobs) 150 { 151 fibril_mutex_lock(&job_queue_mtx); 152 142 } 143 144 int job_queue_add_jobs(dyn_array_t *jobs) 145 { 153 146 /* Check consistency with queue. */ 154 list_foreach(*jobs, link, job_t, new_job) {155 list_foreach(job_queue, link, job_t, queued_job) {147 dyn_array_foreach(*jobs, job_ptr_t, new_job_it) { 148 list_foreach(job_queue, job_queue, job_t, queued_job) { 156 149 /* 157 150 * Currently we have strict strategy not permitting 158 151 * multiple jobs for one unit in the queue. 159 152 */ 160 if (new_job->unit == queued_job->unit) { 153 if ((*new_job_it)->unit == queued_job->unit) { 154 sysman_log(LVL_ERROR, 155 "Cannot queue multiple jobs foor unit '%s'", 156 unit_name((*new_job_it)->unit)); 161 157 return EEXIST; 162 158 } … … 165 161 166 162 /* Enqueue jobs */ 167 list_foreach_safe(*jobs, cur_link, next_link) { 168 list_remove(cur_link); 169 list_append(cur_link, &job_queue); 170 } 171 172 /* Only job dispatcher waits, it's correct to notify one only. */ 173 fibril_condvar_signal(&job_queue_cv); 174 fibril_mutex_unlock(&job_queue_mtx); 163 dyn_array_foreach(*jobs, job_ptr_t, job_it) { 164 (*job_it)->state = JOB_QUEUED; 165 list_append(&(*job_it)->job_queue, &job_queue); 166 // TODO explain this reference 167 job_add_ref(*job_it); 168 } 175 169 176 170 return EOK; 177 171 } 178 172 179 /** Blocking wait for job finishing. 180 * 181 * Multiple fibrils may wait for the same job. 182 * 183 * @return Return code of the job 184 */ 185 int job_wait(job_t *job) 186 { 187 fibril_mutex_lock(&job->state_mtx); 188 while (job->state != JOB_FINISHED) { 189 fibril_condvar_wait(&job->state_cv, &job->state_mtx); 190 } 191 192 int rc = job->retval; 193 fibril_mutex_unlock(&job->state_mtx); 194 173 /** Pop next runnable job 174 * 175 * @return runnable job or NULL when there's none 176 */ 177 job_t *job_queue_pop_runnable(void) 178 { 179 job_t *result = NULL; 180 link_t *first_link = list_first(&job_queue); 181 bool first_iteration = true; 182 183 list_foreach_safe(job_queue, cur_link, next_link) { 184 result = list_get_instance(cur_link, job_t, job_queue); 185 if (job_is_runnable(result)) { 186 break; 187 } else if (!first_iteration && cur_link == first_link) { 188 result = NULL; 189 break; 190 } else { 191 /* 192 * We make no assuptions about ordering of jobs in the 193 * queue, so just move the job to the end of the queue. 194 * If there are exist topologic ordering, eventually 195 * jobs will be reordered. Furthermore when if there 196 * exists any runnable job, it's always found. 197 */ 198 list_remove(cur_link); 199 list_append(cur_link, &job_queue); 200 } 201 first_iteration = false; 202 } 203 204 if (result) { 205 // TODO update refcount 206 list_remove(&result->job_queue); 207 result->state = JOB_DEQUEUED; 208 } 209 210 return result; 211 } 212 213 int job_create_closure(job_t *main_job, dyn_array_t *job_closure) 214 { 215 // TODO replace hard-coded FIFO size with resizable FIFO 216 FIFO_INITIALIZE_DYNAMIC(jobs_fifo, job_ptr_t, 10); 217 void *fifo_data = fifo_create(jobs_fifo); 218 int rc; 219 if (fifo_data == NULL) { 220 rc = ENOMEM; 221 goto finish; 222 } 223 224 /* 225 * Traverse dependency graph in BFS fashion and create jobs for every 226 * necessary unit. 227 */ 228 fifo_push(jobs_fifo, main_job); 229 job_t *job; 230 while ((job = fifo_pop(jobs_fifo)) != NULL) { 231 /* 232 * Do not increase reference count of job, as we're passing it 233 * to the closure. 234 */ 235 dyn_array_append(job_closure, job_ptr_t, job); 236 237 /* Traverse dependencies edges */ 238 unit_t *u = job->unit; 239 list_foreach(u->dependencies, dependencies, unit_dependency_t, dep) { 240 // TODO prepare for reverse edge direction and 241 // non-identity state mapping 242 job_t *new_job = 243 job_create(dep->dependency, job->target_state); 244 if (new_job == NULL) { 245 rc = ENOMEM; 246 goto finish; 247 } 248 job_add_blocked_job(new_job, job); 249 fifo_push(jobs_fifo, new_job); 250 } 251 } 252 rc = EOK; 253 254 finish: 255 free(fifo_data); 256 /* 257 * Newly created jobs are already passed to the closure, thus not 258 * deleting them here. 259 */ 195 260 return rc; 261 } 262 263 job_t *job_create(unit_t *u, unit_state_t target_state) 264 { 265 job_t *job = malloc(sizeof(job_t)); 266 if (job != NULL) { 267 job_init(job, u, target_state); 268 } 269 270 return job; 196 271 } 197 272 … … 214 289 } 215 290 216 static void job_init(job_t *job, job_type_t type) 217 { 218 assert(job); 219 220 link_initialize(&job->link); 221 list_initialize(&job->blocking_jobs); 222 223 /* Start with one reference for the creator */ 224 atomic_set(&job->refcnt, 1); 225 226 job->type = type; 227 job->unit = NULL; 228 229 job->state = JOB_WAITING; 230 fibril_mutex_initialize(&job->state_mtx); 231 fibril_condvar_initialize(&job->state_cv); 232 } 233 234 job_t *job_create(job_type_t type) 235 { 236 job_t *job = malloc(sizeof(job_t)); 237 if (job != NULL) { 238 job_init(job, type); 239 } 240 241 return job; 242 } 243 244 int job_add_blocking_job(job_t *job, job_t *blocking_job) 245 { 246 job_link_t *job_link = malloc(sizeof(job_link_t)); 247 if (job_link == NULL) { 248 return ENOMEM; 249 } 250 251 link_initialize(&job_link->link); 252 list_append(&job_link->link, &job->blocking_jobs); 253 254 job_link->job = blocking_job; 255 job_add_ref(blocking_job); 256 257 return EOK; 258 } 259 260 static void job_destroy(job_t **job_ptr) 261 { 262 job_t *job = *job_ptr; 263 if (job == NULL) { 264 return; 265 } 266 267 list_foreach_safe(job->blocking_jobs, cur_link, next_link) { 268 job_link_t *jl = list_get_instance(cur_link, job_link_t, link); 269 list_remove(cur_link); 270 job_del_ref(&jl->job); 271 free(jl); 272 } 273 free(job); 274 275 *job_ptr = NULL; 276 } 291 void job_run(job_t *job) 292 { 293 assert(job->state != JOB_RUNNING); 294 assert(job->state != JOB_FINISHED); 295 296 unit_t *u = job->unit; 297 sysman_log(LVL_DEBUG, "%s, %s -> %i", 298 __func__, unit_name(u), job->target_state); 299 300 /* Propagate failure */ 301 if (job->blocking_job_failed) { 302 goto fail; 303 } 304 305 int rc; 306 switch (job->target_state) { 307 case STATE_STARTED: 308 rc = unit_start(u); 309 break; 310 default: 311 // TODO implement other states 312 assert(false); 313 } 314 if (rc != EOK) { 315 goto fail; 316 } 317 318 job_check(job->unit, job); 319 return; 320 321 fail: 322 job->retval = JOB_FAILED; 323 job_finish(job); 324 } 325 326 /** Unblocks blocked jobs and notify observers 327 * 328 * @param[in] job job with defined return value 329 */ 330 void job_finish(job_t *job) 331 { 332 assert(job->state != JOB_FINISHED); 333 assert(job->retval != JOB_UNDEFINED_); 334 335 sysman_log(LVL_DEBUG2, "%s(%s) -> %i", 336 __func__, unit_name(job->unit), job->retval); 337 338 job->state = JOB_FINISHED; 339 340 /* Job finished */ 341 dyn_array_foreach(job->blocked_jobs, job_ptr_t, job_it) { 342 job_unblock(*job_it, job); 343 } 344 // TODO remove reference from blocked jobs 345 346 // TODO should reference be added (for the created event) 347 sysman_raise_event(&sysman_event_job_changed, job); 348 } 349 -
uspace/srv/sysman/job.h
rd7c5fc0 r3f7e1f24 30 30 #define SYSMAN_JOB_H 31 31 32 #include <adt/dyn_array.h> 32 33 #include <adt/list.h> 33 34 #include <atomic.h> 35 #include <stdbool.h> 34 36 35 37 #include "unit.h" 36 38 37 struct job; 38 typedef struct job job_t; 39 39 /** Run state of job */ 40 40 typedef enum { 41 JOB_START 42 } job_type_t; 43 44 typedef enum { 45 JOB_WAITING, 41 JOB_UNQUEUED, /**< Job not in queue yet */ 42 JOB_QUEUED, 43 JOB_DEQUEUED, /**< Job not in queue already */ 46 44 JOB_RUNNING, 47 45 JOB_FINISHED 48 46 } job_state_t; 49 47 48 /** Return value of job */ 49 typedef enum { 50 JOB_OK, 51 JOB_FAILED, 52 JOB_UNDEFINED_ = -1 53 } job_retval_t; 54 50 55 typedef struct { 51 link_t link; 52 job_t *job; 53 } job_link_t; 54 55 /** Job represents pending or running operation on unit */ 56 struct job { 57 /** Link to queue job is in */ 58 link_t link; 59 60 /** List of jobs (job_link_t ) that are blocking the job. */ 61 list_t blocking_jobs; 62 63 /** Reference counter for the job structure. */ 56 link_t job_queue; 64 57 atomic_t refcnt; 65 58 66 job_type_t type;59 unit_state_t target_state; 67 60 unit_t *unit; 68 61 62 /** Jobs that this job is preventing from running */ 63 dyn_array_t blocked_jobs; 64 /** No. of jobs that must finish before this job */ 65 size_t blocking_jobs; 66 /** Any of blocking jobs failed */ 67 bool blocking_job_failed; 68 69 /** See job_state_t */ 69 70 job_state_t state; 70 fibril_mutex_t state_mtx; 71 fibril_condvar_t state_cv; 71 /** See job_retval_t */ 72 job_retval_t retval; 73 } job_t; 72 74 73 /** Return value of the job, defined only when state == JOB_FINISHED */ 74 int retval; 75 }; 75 typedef job_t *job_ptr_t; 76 76 77 77 extern void job_queue_init(void); 78 extern int job_queue_jobs(list_t *); 78 extern int job_queue_add_jobs(dyn_array_t *); 79 extern job_t *job_queue_pop_runnable(void); 79 80 80 extern int job_wait(job_t *); 81 extern int job_create_closure(job_t *, dyn_array_t *); 82 extern job_t *job_create(unit_t *, unit_state_t); 81 83 82 84 extern void job_add_ref(job_t *); 83 85 extern void job_del_ref(job_t **); 84 86 85 extern job_t *job_create(job_type_t type);86 extern int job_add_blocking_job(job_t *, job_t *);87 87 88 extern void job_run(job_t *); 89 extern void job_finish(job_t *); 88 90 #endif -
uspace/srv/sysman/main.c
rd7c5fc0 r3f7e1f24 37 37 #include "dep.h" 38 38 #include "job.h" 39 #include "log.h" 39 40 #include "sysman.h" 40 41 #include "unit.h" … … 47 48 } 48 49 49 static int sysman_entry_point(void *arg) { 50 /* 51 * Build hard coded configuration. 52 * 53 * Strings are allocated on heap, so that they can be free'd by an 54 * owning unit. 55 */ 50 /** Build hard coded configuration */ 51 static job_t *create_entry_configuration(void) { 56 52 int result = EOK; 57 53 unit_t *mnt_initrd = NULL; … … 107 103 configuration_commit(); 108 104 109 result = sysman_unit_start(tgt_default); 110 111 return result; 105 job_t *first_job = job_create(tgt_default, STATE_STARTED); 106 if (first_job == NULL) { 107 goto fail; 108 } 109 return first_job; 112 110 113 111 fail: 112 // TODO cannot destroy units after they're added to configuration 114 113 unit_destroy(&tgt_default); 115 114 unit_destroy(&cfg_init); 116 115 unit_destroy(&mnt_initrd); 117 return result; 116 return NULL; 117 } 118 119 static void first_job_handler(void *object, void *unused) 120 { 121 job_t *job = object; 122 sysman_log(LVL_DEBUG, "First job retval: %i.", job->retval); 123 job_del_ref(&job); 118 124 } 119 125 … … 122 128 printf(NAME ": HelenOS system daemon\n"); 123 129 130 /* 131 * Initialize global structures 132 */ 124 133 configuration_init(); 134 sysman_events_init(); 125 135 job_queue_init(); 126 136 127 137 /* 128 * Create and start initial configuration asynchronously 129 * so that we can start server's fibril that may be used 130 * when executing the start. 138 * Create initial configuration while we are in a single fibril, keep 139 * the job and run it when event loop is running. 131 140 */ 132 fid_t entry_fibril = fibril_create(sysman_entry_point, NULL); 133 fibril_add_ready(entry_fibril); 141 job_t *first_job = create_entry_configuration(); 134 142 135 /* Prepare and start sysman server */ 143 /* 144 * Event loop runs in separate fibril, all consequent access to global 145 * structure is made from this fibril only. 146 */ 147 fid_t event_loop_fibril = fibril_create(sysman_events_loop, NULL); 148 fibril_add_ready(event_loop_fibril); 149 150 /* Queue first job for processing */ 151 sysman_object_observer(first_job, &first_job_handler, NULL); 152 sysman_raise_event(&sysman_event_job_process, first_job); 153 154 /* Start sysman server */ 136 155 async_set_client_connection(sysman_connection); 137 156 … … 139 158 async_manager(); 140 159 160 /* not reached */ 141 161 return 0; 142 162 } -
uspace/srv/sysman/sysman.c
rd7c5fc0 r3f7e1f24 27 27 */ 28 28 29 #include <adt/hash_table.h> 29 30 #include <adt/list.h> 30 31 #include <errno.h> 31 32 #include "dep.h" 33 #include "job.h" 32 #include <fibril_synch.h> 33 #include <stdlib.h> 34 35 #include "log.h" 34 36 #include "sysman.h" 35 37 36 /** Create jobs for cluser of given unit. 37 * 38 * @note Using recursion, limits "depth" of dependency graph. 39 */ 40 static int sysman_create_closure_jobs(unit_t *unit, job_t **entry_job_ptr, 41 list_t *accumulator, job_type_t type) 42 { 43 int rc = EOK; 44 job_t *job = job_create(type); 45 if (job == NULL) { 38 39 /* Do not expose this generally named type */ 40 typedef struct { 41 link_t event_queue; 42 43 event_handler_t handler; 44 void *data; 45 } event_t; 46 47 typedef struct { 48 link_t callbacks; 49 50 callback_handler_t handler; 51 void *data; 52 } obj_callback_t; 53 54 typedef struct { 55 ht_link_t ht_link; 56 57 void *object; 58 list_t callbacks; 59 } observed_object_t; 60 61 static LIST_INITIALIZE(event_queue); 62 static fibril_mutex_t event_queue_mtx; 63 static fibril_condvar_t event_queue_cv; 64 65 static hash_table_t observed_objects; 66 static fibril_mutex_t observed_objects_mtx; 67 static fibril_condvar_t observed_objects_cv; 68 69 /* Hash table functions */ 70 static size_t observed_objects_ht_hash(const ht_link_t *item) 71 { 72 observed_object_t *callbacks = 73 hash_table_get_inst(item, observed_object_t, ht_link); 74 75 return (size_t) callbacks->object; 76 } 77 78 static size_t observed_objects_ht_key_hash(void *key) 79 { 80 void *object = *(void **) key; 81 return (size_t) object; 82 } 83 84 static bool observed_objects_ht_key_equal(void *key, const ht_link_t *item) 85 { 86 void *object = *(void **)key; 87 return ( 88 hash_table_get_inst(item, observed_object_t, ht_link)->object == 89 object); 90 } 91 92 static hash_table_ops_t observed_objects_ht_ops = { 93 .hash = &observed_objects_ht_hash, 94 .key_hash = &observed_objects_ht_key_hash, 95 .equal = NULL, 96 .key_equal = &observed_objects_ht_key_equal, 97 .remove_callback = NULL 98 }; 99 100 static void notify_observers(void *object) 101 { 102 ht_link_t *item = hash_table_find(&observed_objects, &object); 103 if (item == NULL) { 104 return; 105 } 106 observed_object_t *observed_object = 107 hash_table_get_inst(item, observed_object_t, ht_link); 108 109 list_foreach_safe(observed_object->callbacks, cur_link, next_link) { 110 obj_callback_t *callback = 111 list_get_instance(cur_link, obj_callback_t, callbacks); 112 callback->handler(object, callback->data); 113 list_remove(cur_link); 114 free(callback); 115 } 116 } 117 118 /* 119 * Non-static functions 120 */ 121 void sysman_events_init(void) 122 { 123 fibril_mutex_initialize(&event_queue_mtx); 124 fibril_condvar_initialize(&event_queue_cv); 125 126 bool table = 127 hash_table_create(&observed_objects, 0, 0, &observed_objects_ht_ops); 128 if (!table) { 129 sysman_log(LVL_FATAL, "%s: Failed initialization", __func__); 130 abort(); 131 } 132 fibril_mutex_initialize(&observed_objects_mtx); 133 fibril_condvar_initialize(&observed_objects_cv); 134 } 135 136 int sysman_events_loop(void *unused) 137 { 138 while (1) { 139 /* Pop event */ 140 fibril_mutex_lock(&event_queue_mtx); 141 while (list_empty(&event_queue)) { 142 fibril_condvar_wait(&event_queue_cv, &event_queue_mtx); 143 } 144 145 link_t *li_event = list_first(&event_queue); 146 list_remove(li_event); 147 event_t *event = 148 list_get_instance(li_event, event_t, event_queue); 149 fibril_mutex_unlock(&event_queue_mtx); 150 151 /* Process event */ 152 event->handler(event->data); 153 free(event); 154 } 155 } 156 157 void sysman_raise_event(event_handler_t handler, void *data) 158 { 159 event_t *event = malloc(sizeof(event_t)); 160 if (event == NULL) { 161 sysman_log(LVL_FATAL, "%s: cannot allocate event", __func__); 162 // TODO think about aborting system critical task 163 abort(); 164 } 165 link_initialize(&event->event_queue); 166 event->handler = handler; 167 event->data = data; 168 169 fibril_mutex_lock(&event_queue_mtx); 170 list_append(&event->event_queue, &event_queue); 171 /* There's only single event loop, broadcast is unnecessary */ 172 fibril_condvar_signal(&event_queue_cv); 173 fibril_mutex_unlock(&event_queue_mtx); 174 } 175 176 /** Register single-use object observer callback 177 * 178 * TODO no one handles return value, it's quite fatal to lack memory for 179 * callbacks... @return EOK on success 180 * @return ENOMEM 181 */ 182 int sysman_object_observer(void *object, callback_handler_t handler, void *data) 183 { 184 int rc; 185 observed_object_t *observed_object = NULL; 186 observed_object_t *new_observed_object = NULL; 187 ht_link_t *ht_link = hash_table_find(&observed_objects, &object); 188 189 if (ht_link == NULL) { 190 observed_object = malloc(sizeof(observed_object_t)); 191 if (observed_object == NULL) { 192 rc = ENOMEM; 193 goto fail; 194 } 195 new_observed_object = observed_object; 196 197 observed_object->object = object; 198 list_initialize(&observed_object->callbacks); 199 hash_table_insert(&observed_objects, &observed_object->ht_link); 200 } else { 201 observed_object = 202 hash_table_get_inst(ht_link, observed_object_t, ht_link); 203 } 204 205 obj_callback_t *obj_callback = malloc(sizeof(obj_callback_t)); 206 if (obj_callback == NULL) { 46 207 rc = ENOMEM; 47 208 goto fail; 48 209 } 49 210 50 job->unit = unit; 51 52 list_foreach(unit->dependencies, dependencies, unit_dependency_t, dep) { 53 job_t *blocking_job = NULL; 54 rc = sysman_create_closure_jobs(dep->dependency, &blocking_job, 55 accumulator, type); 56 if (rc != EOK) { 57 goto fail; 58 } 59 60 rc = job_add_blocking_job(job, blocking_job); 61 if (rc != EOK) { 62 goto fail; 63 } 64 } 65 66 /* Job is passed to the accumulator, i.e. no add_ref. */ 67 list_append(&job->link, accumulator); 68 69 if (entry_job_ptr != NULL) { 70 *entry_job_ptr = job; 71 } 211 obj_callback->handler = handler; 212 obj_callback->data = data; 213 list_append(&obj_callback->callbacks, &observed_object->callbacks); 72 214 return EOK; 73 215 74 216 fail: 75 job_del_ref(&job);217 free(new_observed_object); 76 218 return rc; 77 219 } 78 220 79 int sysman_unit_start(unit_t *unit) 80 { 81 list_t new_jobs; 82 list_initialize(&new_jobs); 83 84 job_t *job = NULL; 85 // TODO shouldn't be here read-lock on configuration? 86 int rc = sysman_create_closure_jobs(unit, &job, &new_jobs, JOB_START); 221 /* 222 * Event handlers 223 */ 224 225 // NOTE must run in main event loop fibril 226 void sysman_event_job_process(void *arg) 227 { 228 job_t *job = arg; 229 dyn_array_t job_closure; 230 dyn_array_initialize(&job_closure, job_ptr_t, 0); 231 232 int rc = job_create_closure(job, &job_closure); 87 233 if (rc != EOK) { 88 return rc; 89 } 90 91 // TODO handle errors when adding job accumulator 92 job_queue_jobs(&new_jobs); 93 94 return job_wait(job); 95 } 234 sysman_log(LVL_ERROR, "Cannot create closure for job %p (%i)", 235 job, rc); 236 goto fail; 237 } 238 239 rc = job_queue_add_jobs(&job_closure); 240 if (rc != EOK) { 241 // TODO job_queue_add_jobs should log message 242 goto fail; 243 } 244 245 // TODO explain why calling asynchronously 246 sysman_raise_event(&sysman_event_job_queue_run, NULL); 247 return; 248 249 fail: 250 job->retval = JOB_FAILED; 251 job_finish(job); 252 // TODO clarify refcount to the main job 253 dyn_array_foreach(job_closure, job_ptr_t, closure_job) { 254 job_del_ref(&(*closure_job)); 255 } 256 dyn_array_destroy(&job_closure); 257 } 258 259 260 void sysman_event_job_queue_run(void *unused) 261 { 262 job_t *job; 263 while ((job = job_queue_pop_runnable())) { 264 job_run(job); 265 } 266 } 267 268 void sysman_event_job_changed(void *object) 269 { 270 notify_observers(object); 271 } -
uspace/srv/sysman/sysman.h
rd7c5fc0 r3f7e1f24 30 30 #define SYSMAN_SYSMAN_H 31 31 32 #include "job.h" 32 33 #include "unit.h" 33 34 34 extern int sysman_unit_start(unit_t *); 35 typedef void (*event_handler_t)(void *); 36 typedef void (*callback_handler_t)(void *object, void *data); 37 38 extern void sysman_events_init(void); 39 40 extern int sysman_events_loop(void *); 41 42 extern void sysman_raise_event(event_handler_t, void *); 43 44 extern int sysman_object_observer(void *, callback_handler_t, void *); 45 46 47 extern void sysman_event_job_process(void *); 48 extern void sysman_event_job_queue_run(void *); 49 extern void sysman_event_job_changed(void *); 35 50 36 51 #endif -
uspace/srv/sysman/unit.c
rd7c5fc0 r3f7e1f24 68 68 69 69 unit->state = STATE_EMBRYO; 70 fibril_mutex_initialize(&unit->state_mtx);71 fibril_condvar_initialize(&unit->state_cv);72 70 73 71 list_initialize(&unit->dependants); … … 105 103 } 106 104 107 void unit_set_state(unit_t *unit, unit_state_t state)108 {109 fibril_mutex_lock(&unit->state_mtx);110 unit->state = state;111 fibril_condvar_broadcast(&unit->state_cv);112 fibril_mutex_unlock(&unit->state_mtx);113 }114 105 115 106 /** Issue request to restarter to start a unit 116 107 * 117 * Return from this function only means start request was issued. 118 * If you need to wait for real start of the unit, use waiting on state_cv. 108 * Ideally this function is non-blocking synchronous, however, some units 109 * cannot be started synchronously and thus return from this function generally 110 * means that start was requested. 111 * 112 * Check state of the unit for actual result, start method can end in states: 113 * - STATE_STARTED, (succesful synchronous start) 114 * - STATE_STARTING, (succesful asynchronous start request) 115 * - STATE_FAILED. (error occured) 119 116 */ 120 117 int unit_start(unit_t *unit) -
uspace/srv/sysman/unit.h
rd7c5fc0 r3f7e1f24 62 62 63 63 unit_state_t state; 64 fibril_mutex_t state_mtx;65 fibril_condvar_t state_cv;66 64 67 65 list_t dependencies; -
uspace/srv/sysman/units/unit_cfg.c
rd7c5fc0 r3f7e1f24 176 176 } 177 177 178 assert(unit->state = STATE_EMBRYO);178 assert(unit->state == STATE_EMBRYO); 179 179 configuration_add_unit(unit); 180 180 } … … 232 232 assert(u_cfg); 233 233 234 /*235 * Skip starting state and hold state lock during whole configuration236 * load.237 */238 fibril_mutex_lock(&unit->state_mtx);239 234 int rc = cfg_load_configuration(u_cfg->path); 240 235 … … 244 239 unit->state = STATE_FAILED; 245 240 } 246 fibril_condvar_broadcast(&unit->state_cv);247 fibril_mutex_unlock(&unit->state_mtx);248 241 249 242 return rc; -
uspace/srv/sysman/units/unit_mnt.c
rd7c5fc0 r3f7e1f24 87 87 static int unit_mnt_start(unit_t *unit) 88 88 { 89 // TODO replace with non-blocking 90 const bool blocking = true; 89 91 unit_mnt_t *u_mnt = CAST_MNT(unit); 90 92 assert(u_mnt); 91 93 92 fibril_mutex_lock(&unit->state_mtx);93 94 94 95 // TODO think about unit's lifecycle (is STOPPED only acceptable?) 95 96 assert(unit->state == STATE_STOPPED); 96 unit->state = STATE_STARTING;97 98 fibril_condvar_broadcast(&unit->state_cv);99 fibril_mutex_unlock(&unit->state_mtx);100 97 101 98 102 99 // TODO use other mount parameters 103 100 int rc = mount(u_mnt->type, u_mnt->mountpoint, u_mnt->device, "", 104 IPC_FLAG_BLOCKING, 0);101 blocking ? IPC_FLAG_BLOCKING : 0, 0); 105 102 106 if (rc == EOK) { 107 sysman_log(LVL_NOTE, "Mount ('%s') mounted", unit_name(unit)); 108 unit_set_state(unit, STATE_STARTED); 103 if (blocking) { 104 if (rc == EOK) { 105 sysman_log(LVL_DEBUG, "Mount ('%s') mounted", unit_name(unit)); 106 unit->state = STATE_STARTED; 107 } else { 108 sysman_log(LVL_ERROR, "Mount ('%s') failed (%i)", 109 unit_name(unit), rc); 110 unit->state = STATE_FAILED; 111 } 109 112 } else { 110 sysman_log(LVL_ERROR, "Mount ('%s') failed (%i)", 111 unit_name(unit), rc); 112 unit_set_state(unit, STATE_FAILED); 113 if (rc == EOK) { 114 sysman_log(LVL_DEBUG, "Mount ('%s') requested", unit_name(unit)); 115 unit->state = STATE_STARTING; 116 } else { 117 sysman_log(LVL_ERROR, "Mount ('%s') request failed (%i)", 118 unit_name(unit), rc); 119 unit->state = STATE_FAILED; 120 } 113 121 } 114 122 -
uspace/srv/sysman/units/unit_tgt.c
rd7c5fc0 r3f7e1f24 59 59 assert(u_tgt); 60 60 61 unit->state = STATE_STARTED; 61 62 return EOK; 62 63 }
Note:
See TracChangeset
for help on using the changeset viewer.