Changeset 72c8f77 in mainline
- Timestamp:
- 2019-08-06T18:30:29Z (5 years ago)
- Children:
- 63a3276
- Parents:
- c2d50c8
- git-author:
- Michal Koutný <xm.koutny+hos@…> (2015-05-26 16:15:58)
- git-committer:
- Matthieu Riolo <matthieu.riolo@…> (2019-08-06 18:30:29)
- Location:
- uspace
- Files:
-
- 6 edited
Legend:
- Unmodified
- Added
- Removed
-
uspace/lib/sysman/include/sysman/broker.h
rc2d50c8 r72c8f77 30 30 #define _SYSMAN_BROKER_H 31 31 32 #include <sysman/unit.h> 32 33 #include <task.h> 33 #include <sysman/unit.h>34 34 35 35 int sysman_broker_register(void); -
uspace/srv/sysman/job.c
rc2d50c8 r72c8f77 46 46 static int job_add_blocked_job(job_t *blocking_job, job_t *blocked_job) 47 47 { 48 int rc = dyn_array_append(&blocking_job->blocked_jobs, job_ptr_t, 48 assert(blocking_job->blocked_jobs.size == 49 blocking_job->blocked_jobs_count); 50 51 int rc = dyn_array_append(&blocking_job->blocked_jobs, job_t *, 49 52 blocked_job); 50 53 if (rc != EOK) { … … 53 56 job_add_ref(blocked_job); 54 57 58 blocking_job->blocked_jobs_count += 1; 55 59 blocked_job->blocking_jobs += 1; 56 60 … … 86 90 job->unit = u; 87 91 88 u->job = job; 89 job_add_ref(job); 90 91 dyn_array_initialize(&job->blocked_jobs, job_ptr_t); 92 dyn_array_initialize(&job->blocked_jobs, job_t *); 92 93 job->blocking_jobs = 0; 93 94 job->blocking_job_failed = false; 94 95 95 job->state = JOB_ UNQUEUED;96 job->state = JOB_EMBRYO; 96 97 job->retval = JOB_UNDEFINED_; 97 98 } … … 100 101 { 101 102 unit_t *u = job->unit; 103 102 104 if (u->state == job->target_state) { 103 105 job->retval = JOB_OK; … … 117 119 118 120 /* 119 * We have one reference from caller for our disposal, *121 * We have one reference from caller for our disposal, 120 122 * if needed, pass it to observer. 121 123 */ … … 138 140 assert(!link_used(&job->job_queue)); 139 141 140 dyn_array_foreach(job->blocked_jobs, job_ ptr_t, job_it) {142 dyn_array_foreach(job->blocked_jobs, job_t *, job_it) { 141 143 job_del_ref(&(*job_it)); 142 144 } … … 149 151 static bool job_is_runnable(job_t *job) 150 152 { 151 return job->state == JOB_QUEUED && job->blocking_jobs == 0; 153 assert(job->state == JOB_PENDING); 154 return job->blocking_jobs == 0; 152 155 } 153 156 … … 170 173 /* Remove job from queue and pass reference to caller */ 171 174 list_remove(&result->job_queue); 172 result->state = JOB_DEQUEUED;173 175 } 174 176 175 177 return result; 178 } 179 180 /** Merge two jobs together 181 * 182 * @param[in/out] trunk job that 183 * @param[in] other job that will be cleared out 184 * 185 * @return EOK on success 186 * @return error code on fail 187 */ 188 static int job_pre_merge(job_t *trunk, job_t *other) 189 { 190 assert(trunk->unit == other->unit); 191 assert(trunk->target_state == other->target_state); 192 assert(trunk->blocked_jobs.size == trunk->blocked_jobs_count); 193 assert(other->merged_into == NULL); 194 195 int rc = dyn_array_concat(&trunk->blocked_jobs, &other->blocked_jobs); 196 if (rc != EOK) { 197 return rc; 198 } 199 dyn_array_clear(&other->blocked_jobs); 200 201 // TODO allocate observed object 202 203 other->merged_into = trunk; 204 205 return EOK; 206 } 207 208 static void job_finish_merge(job_t *trunk, job_t *other) 209 { 210 assert(trunk->blocked_jobs.size >= trunk->blocked_jobs_count); 211 //TODO aggregate merged blocked_jobs 212 trunk->blocked_jobs_count = other->blocked_jobs.size; 213 214 /* All allocation is done in job_pre_merge, cannot fail here. */ 215 int rc = sysman_move_observers(other, trunk); 216 assert(rc == EOK); 217 } 218 219 static void job_undo_merge(job_t *trunk) 220 { 221 assert(trunk->blocked_jobs.size >= trunk->blocked_jobs_count); 222 dyn_array_clear_range(&trunk->blocked_jobs, 223 trunk->blocked_jobs_count, trunk->blocked_jobs.size); 176 224 } 177 225 … … 185 233 } 186 234 187 int job_queue_add_jobs(dyn_array_t *jobs) 188 { 189 /* Check consistency with queue. */ 190 dyn_array_foreach(*jobs, job_ptr_t, new_job_it) { 191 list_foreach(job_queue, job_queue, job_t, queued_job) { 192 /* 193 * Currently we have strict strategy not permitting 194 * multiple jobs for one unit in the queue at a time. 195 */ 196 if ((*new_job_it)->unit == queued_job->unit) { 235 int job_queue_add_closure(dyn_array_t *closure) 236 { 237 bool has_error = false; 238 int rc = EOK; 239 240 /* Check consistency with existing jobs. */ 241 dyn_array_foreach(*closure, job_t *, job_it) { 242 job_t *job = *job_it; 243 job_t *other_job = job->unit->job; 244 245 if (other_job == NULL) { 246 continue; 247 } 248 249 if (other_job->target_state != job->target_state) { 250 switch (other_job->state) { 251 case JOB_RUNNING: 197 252 sysman_log(LVL_ERROR, 198 "Cannot queue multiple jobs for unit '%s'", 199 unit_name((*new_job_it)->unit)); 200 return EEXIST; 253 "Unit '%s' has already different job running.", 254 unit_name(job->unit)); 255 has_error = true; 256 continue; 257 case JOB_PENDING: 258 /* 259 * Currently we have strict strategy not 260 * permitting multiple jobs for one unit in the 261 * queue at a time. 262 */ 263 sysman_log(LVL_ERROR, 264 "Cannot queue multiple jobs for unit '%s'.", 265 unit_name(job->unit)); 266 has_error = true; 267 continue; 268 default: 269 assert(false); 201 270 } 202 } 203 } 204 205 /* Enqueue jobs */ 206 dyn_array_foreach(*jobs, job_ptr_t, job_it) { 207 (*job_it)->state = JOB_QUEUED; 208 list_append(&(*job_it)->job_queue, &job_queue); 271 } else { 272 // TODO think about other options to merging 273 // (replacing, cancelling) 274 rc = job_pre_merge(other_job, job); 275 if (rc != EOK) { 276 break; 277 } 278 } 279 } 280 281 /* Aggregate merged jobs, or rollback any changes in existing jobs */ 282 bool finish_merge = (rc == EOK) && !has_error; 283 dyn_array_foreach(*closure, job_t *, job_it) { 284 if ((*job_it)->merged_into == NULL) { 285 continue; 286 } 287 if (finish_merge) { 288 job_finish_merge((*job_it)->merged_into, *job_it); 289 } else { 290 job_undo_merge((*job_it)->merged_into); 291 } 292 } 293 if (has_error) { 294 return EBUSY; 295 } else if (rc != EOK) { 296 return rc; 297 } 298 299 /* Unmerged jobs are enqueued, merged are disposed */ 300 dyn_array_foreach(*closure, job_t *, job_it) { 301 job_t *job = (*job_it); 302 if (job->merged_into != NULL) { 303 job_del_ref(&job); 304 continue; 305 } 306 307 308 unit_t *u = job->unit; 309 assert(u->bfs_job != NULL); 310 assert(u->job == NULL); 311 u->job = u->bfs_job; 312 u->bfs_job = NULL; 313 314 315 job->state = JOB_PENDING; 209 316 /* We pass reference from the closure to the queue */ 317 list_append(&job->job_queue, &job_queue); 210 318 } 211 319 … … 232 340 int job_create_closure(job_t *main_job, dyn_array_t *job_closure) 233 341 { 342 sysman_log(LVL_DEBUG2, "%s(%s)", __func__, unit_name(main_job->unit)); 234 343 int rc; 235 344 list_t units_fifo; 236 345 list_initialize(&units_fifo); 237 346 238 /* Zero BFS tags before use*/347 /* Check invariant */ 239 348 list_foreach(units, units, unit_t, u) { 240 u->bfs_tag = false;349 assert(u->bfs_job == false); 241 350 } 242 351 243 352 unit_t *unit = main_job->unit; 353 job_add_ref(main_job); 354 unit->bfs_job = main_job; 244 355 list_append(&unit->bfs_link, &units_fifo); 245 unit->bfs_tag = true;246 356 247 357 while (!list_empty(&units_fifo)) { 248 358 unit = list_get_instance(list_first(&units_fifo), unit_t, 249 359 bfs_link); 250 assert(unit->job);251 360 list_remove(&unit->bfs_link); 252 job_t *job = unit->job; 253 254 255 // TODO more sophisticated check? (unit that is in transitional 256 // state cannot have currently multiple jobs queued) 257 if (job->target_state == unit->state) { 258 /* 259 * Job would do nothing, finish it on spot. 260 * No need to continue BFS search from it. 261 */ 262 job->retval = JOB_OK; 263 job_finish(job); 264 continue; 265 } 361 job_t *job = unit->bfs_job; 362 assert(job != NULL); 266 363 267 364 job_add_ref(job); 268 dyn_array_append(job_closure, job_ ptr_t, job);365 dyn_array_append(job_closure, job_t *, job); 269 366 270 367 /* 271 368 * Traverse dependencies edges 272 * Depending ondependency type and edge direction create273 * appropriate jobs .369 * According to dependency type and edge direction create 370 * appropriate jobs (currently "After" only). 274 371 */ 275 372 list_foreach(unit->dependencies, dependencies, unit_dependency_t, dep) { 276 373 unit_t *u = dep->dependency; 277 374 job_t *blocking_job; 278 if (u->bfs_tag) { 279 assert(u->job); 280 blocking_job = u->job; 281 } else { 282 u->bfs_tag = true; 375 376 if (u->bfs_job == NULL) { 283 377 blocking_job = job_create(u, job->target_state); 284 378 if (blocking_job == NULL) { … … 286 380 goto finish; 287 381 } 288 /* Reference to job is kept inunit */289 job_del_ref(&blocking_job);382 /* Pass reference to unit */ 383 u->bfs_job = blocking_job; 290 384 list_append(&u->bfs_link, &units_fifo); 385 } else { 386 blocking_job = u->bfs_job; 291 387 } 292 388 293 389 job_add_blocked_job(blocking_job, job); 294 390 } 295 391 } 296 sysman_log(LVL_DEBUG2, "%s(%s):", __func__, unit_name(main_job->unit));297 dyn_array_foreach(*job_closure, job_ptr_t, job_it) {298 sysman_log(LVL_DEBUG2, "%s\t%s", __func__, unit_name((*job_it)->unit));299 }392 // sysman_log(LVL_DEBUG2, "%s(%s):", __func__, unit_name(main_job->unit)); 393 // dyn_array_foreach(*job_closure, job_t *, job_it) { 394 // sysman_log(LVL_DEBUG2, "%s\t%s", __func__, unit_name((*job_it)->unit)); 395 // } 300 396 rc = EOK; 301 397 302 398 finish: 303 /* Any unprocessed jobs may be referenced by units */ 304 list_foreach(units_fifo, bfs_link, unit_t, u) { 305 job_del_ref(&u->job); 306 } 399 /* Unreference any jobs in interrupted BFS queue */ 400 list_foreach_safe(units_fifo, cur_link, next_link) { 401 unit_t *u = list_get_instance(cur_link, unit_t, bfs_link); 402 job_del_ref(&u->bfs_job); 403 list_remove(cur_link); 404 } 405 307 406 return rc; 308 407 } … … 310 409 /** Create job assigned to the unit 311 410 * 312 * @param[in] unit unit to be modified, its job must be empty411 * @param[in] unit 313 412 * @param[in] target_state 314 413 * 315 * @return NULL or newly created job 316 * There are two references to the job, one set in the unit and second 317 * is the return value. 414 * @return NULL or newly created job (there is a single refernce for the creator) 318 415 */ 319 416 job_t *job_create(unit_t *u, unit_state_t target_state) … … 362 459 void job_run(job_t *job) 363 460 { 364 assert(job->state != JOB_RUNNING);365 assert(job->state != JOB_FINISHED); 366 461 assert(job->state == JOB_PENDING); 462 463 job->state = JOB_RUNNING; 367 464 unit_t *u = job->unit; 368 465 sysman_log(LVL_DEBUG, "%s(%p), %s -> %i", … … 377 474 switch (job->target_state) { 378 475 case STATE_STARTED: 379 rc = unit_start(u); 476 // TODO put here same evaluation as in job_check 477 // goal is to have job_run "idempotent" 478 if (u->state == job->target_state) { 479 rc = EOK; 480 } else { 481 rc = unit_start(u); 482 } 380 483 break; 381 484 default: … … 412 515 assert(job->unit->job == job); 413 516 414 sysman_log(LVL_DEBUG2, "%s(%p) %s ->%i",517 sysman_log(LVL_DEBUG2, "%s(%p) %s ret %i", 415 518 __func__, job, unit_name(job->unit), job->retval); 416 519 … … 418 521 419 522 /* First remove references, then clear the array */ 420 dyn_array_foreach(job->blocked_jobs, job_ptr_t, job_it) { 523 assert(job->blocked_jobs.size == job->blocked_jobs_count); 524 dyn_array_foreach(job->blocked_jobs, job_t *, job_it) { 421 525 job_unblock(*job_it, job); 422 526 } -
uspace/srv/sysman/job.h
rc2d50c8 r72c8f77 37 37 #include "unit.h" 38 38 39 // TODO simplify queue states40 39 /** Run state of job */ 41 40 typedef enum { 42 JOB_ UNQUEUED, /**< Job not in queue yet*/43 JOB_ QUEUED,44 JOB_ DEQUEUED, /**< Job not in queue already*/41 JOB_EMBRYO, /**< Job after creation */ 42 JOB_CLOSURED, /**< Intermmediate when closure is evaluated */ 43 JOB_PENDING, /**< Job is queued */ 45 44 JOB_RUNNING, 46 45 JOB_FINISHED … … 54 53 } job_retval_t; 55 54 55 struct job; 56 typedef struct job job_t; 57 56 58 struct job { 57 59 link_t job_queue; … … 63 65 /** Jobs that this job is preventing from running */ 64 66 dyn_array_t blocked_jobs; 67 /** No. of jobs that the job is actually blocking (may differ from size 68 * of blocked_jobs for not fully merged job */ 69 size_t blocked_jobs_count; 65 70 /** No. of jobs that must finish before this job */ 66 71 size_t blocking_jobs; 67 72 /** Any of blocking jobs failed */ 68 73 bool blocking_job_failed; 74 /** Job that this job was merged to */ 75 job_t *merged_into; 69 76 70 77 /** See job_state_t */ … … 74 81 }; 75 82 76 typedef struct job job_t;77 typedef job_t *job_ptr_t;78 79 83 extern void job_queue_init(void); 80 extern int job_queue_add_ jobs(dyn_array_t *);84 extern int job_queue_add_closure(dyn_array_t *); 81 85 extern void job_queue_process(void); 82 86 -
uspace/srv/sysman/sysman.c
rc2d50c8 r72c8f77 69 69 static fibril_condvar_t observed_objects_cv; 70 70 71 /* Hash table functions */ 71 /* 72 * Hash table functions 73 */ 72 74 static size_t observed_objects_ht_hash(const ht_link_t *item) 73 75 { … … 100 102 }; 101 103 104 /* 105 * Static functions 106 */ 107 108 static observed_object_t *observed_object_create(void *object) 109 { 110 observed_object_t *result = malloc(sizeof(observed_object_t)); 111 if (result) { 112 result->object = object; 113 list_initialize(&result->callbacks); 114 hash_table_insert(&observed_objects, &result->ht_link); 115 } 116 return result; 117 } 118 119 static void observed_object_destroy(observed_object_t **ptr_observed_object) 120 { 121 observed_object_t *observed_object = *ptr_observed_object; 122 if (observed_object == NULL) { 123 return; 124 } 125 126 ht_link_t *item = &observed_object->ht_link; 127 hash_table_remove_item(&observed_objects, item); 128 free(observed_object); 129 *ptr_observed_object = NULL; 130 } 131 102 132 static void notify_observers(void *object) 103 133 { … … 116 146 free(callback); 117 147 } 148 149 observed_object_destroy(&observed_object); 118 150 } 119 151 … … 174 206 callback_handler_t callback, void *callback_arg) 175 207 { 176 job_t *job; 177 178 if (unit->job != NULL) { 179 assert(unit->job->state != JOB_UNQUEUED); 180 181 if (unit->job->target_state != target_state) { 182 return EBUSY; 183 } 184 job = unit->job; 185 } else { 186 job = job_create(unit, target_state); 187 if (job == NULL) { 188 return ENOMEM; 189 } 190 /* Reference in unit is enough */ 191 job_del_ref(&job); 208 job_t *job = job_create(unit, target_state); 209 if (job == NULL) { 210 return ENOMEM; 192 211 } 193 212 … … 197 216 } 198 217 199 if (job->state == JOB_UNQUEUED) { 200 job_add_ref(job); 201 sysman_raise_event(&sysman_event_job_process, job); 202 } 218 /* Pass reference to event */ 219 sysman_raise_event(&sysman_event_job_process, job); 203 220 204 221 return EOK; … … 242 259 * 243 260 * TODO no one handles return value, it's quite fatal to lack memory for 244 * callbacks... @return EOK on success 261 * callbacks... 262 * 263 * @return EOK on success 245 264 * @return ENOMEM 246 265 */ … … 253 272 254 273 if (ht_link == NULL) { 255 observed_object = malloc(sizeof(observed_object_t));256 if ( observed_object == NULL) {274 new_observed_object = observed_object_create(object); 275 if (new_observed_object == NULL) { 257 276 rc = ENOMEM; 258 277 goto fail; 259 278 } 260 new_observed_object = observed_object; 261 262 observed_object->object = object; 263 list_initialize(&observed_object->callbacks); 264 hash_table_insert(&observed_objects, &observed_object->ht_link); 279 observed_object = new_observed_object; 265 280 } else { 266 281 observed_object = … … 284 299 } 285 300 301 int sysman_move_observers(void *src_object, void *dst_object) 302 { 303 ht_link_t *src_link = hash_table_find(&observed_objects, &src_object); 304 if (src_link == NULL) { 305 return EOK; 306 } 307 308 ht_link_t *dst_link = hash_table_find(&observed_objects, &dst_object); 309 observed_object_t *dst_observed_object; 310 if (dst_link == NULL) { 311 dst_observed_object = observed_object_create(dst_object); 312 if (dst_observed_object == NULL) { 313 return ENOMEM; 314 } 315 } else { 316 dst_observed_object = 317 hash_table_get_inst(dst_link, observed_object_t, ht_link); 318 } 319 320 observed_object_t *src_observed_object = 321 hash_table_get_inst(src_link, observed_object_t, ht_link); 322 323 list_concat(&dst_observed_object->callbacks, 324 &src_observed_object->callbacks); 325 observed_object_destroy(&src_observed_object); 326 327 return EOK; 328 } 329 330 286 331 /* 287 332 * Event handlers … … 293 338 job_t *job = data; 294 339 dyn_array_t job_closure; 295 dyn_array_initialize(&job_closure, job_ ptr_t);340 dyn_array_initialize(&job_closure, job_t *); 296 341 297 342 int rc = job_create_closure(job, &job_closure); … … 306 351 * otherwise, we still have the reference. 307 352 */ 308 rc = job_queue_add_ jobs(&job_closure);353 rc = job_queue_add_closure(&job_closure); 309 354 if (rc != EOK) { 310 355 goto fail; … … 321 366 job_del_ref(&job); 322 367 323 dyn_array_foreach(job_closure, job_ ptr_t, closure_job) {368 dyn_array_foreach(job_closure, job_t *, closure_job) { 324 369 job_del_ref(&(*closure_job)); 325 370 } -
uspace/srv/sysman/sysman.h
rc2d50c8 r72c8f77 44 44 extern void sysman_process_queue(void); 45 45 extern int sysman_object_observer(void *, callback_handler_t, void *); 46 extern int sysman_move_observers(void *, void *); 46 47 47 48 -
uspace/srv/sysman/test/job_queue.c
rc2d50c8 r72c8f77 42 42 static fid_t fibril_event_loop; 43 43 44 #if 0 45 static bool async_finished; 46 static fibril_condvar_t async_finished_cv; 47 static fibril_mutex_t async_finished_mtx; 48 #endif 49 50 static void async_finished_callback(void *object, void *arg) 44 static void job_finished_cb(void *object, void *arg) 51 45 { 52 46 job_t *job = object; … … 56 50 *job_ptr = job; 57 51 } 58 59 #if 060 static void reset_wait(void)61 {62 fibril_mutex_lock(&async_finished_mtx);63 async_finished = false;64 fibril_mutex_unlock(&async_finished_mtx);65 }66 67 static void async_wait()68 {69 fibril_mutex_lock(&async_finished_mtx);70 while (!async_finished) {71 fibril_condvar_wait(&async_finished_cv, &async_finished_mtx);72 }73 fibril_mutex_unlock(&async_finished_mtx);74 }75 #endif76 52 77 53 PCUT_TEST_SUITE(job_queue); … … 107 83 job_t *job = NULL; 108 84 109 int rc = sysman_run_job(u, STATE_STARTED, & async_finished_callback,85 int rc = sysman_run_job(u, STATE_STARTED, &job_finished_cb, 110 86 &job); 111 87 PCUT_ASSERT_INT_EQUALS(EOK, rc); … … 126 102 job_t *job = NULL; 127 103 128 int rc = sysman_run_job(u, STATE_STARTED, &async_finished_callback, 129 &job); 104 int rc = sysman_run_job(u, STATE_STARTED, &job_finished_cb, &job); 130 105 PCUT_ASSERT_INT_EQUALS(EOK, rc); 131 106 … … 142 117 } 143 118 119 PCUT_TEST(multipath_to_started_unit) { 120 /* Setup mock behavior */ 121 unit_type_vmts[UNIT_SERVICE]->start = &mock_unit_vmt_start_sync; 122 123 unit_type_vmts[UNIT_MOUNT]->start = &mock_unit_vmt_start_async; 124 unit_type_vmts[UNIT_MOUNT]->exposee_created = 125 &mock_unit_vmt_exposee_created; 126 127 /* Define mock units */ 128 unit_t *s0 = mock_units[UNIT_SERVICE][0]; 129 unit_t *s1 = mock_units[UNIT_SERVICE][1]; 130 unit_t *m0 = mock_units[UNIT_MOUNT][0]; 131 132 /* All services require root fs */ 133 mock_add_dependency(s0, m0); 134 mock_add_dependency(s1, m0); 135 136 /* S1 requires another mount and S0 */ 137 mock_add_dependency(s1, s0); 138 139 /* Enforce initial state */ 140 m0->state = STATE_STARTED; 141 142 /* Run test */ 143 job_t *job = NULL; 144 int rc = sysman_run_job(s1, STATE_STARTED, &job_finished_cb, &job); 145 PCUT_ASSERT_INT_EQUALS(EOK, rc); 146 147 sysman_process_queue(); 148 149 PCUT_ASSERT_NOT_NULL(job); 150 PCUT_ASSERT_EQUALS(STATE_STARTED, s0->state); 151 PCUT_ASSERT_EQUALS(STATE_STARTED, s1->state); 152 } 153 144 154 145 155 PCUT_EXPORT(job_queue);
Note:
See TracChangeset
for help on using the changeset viewer.