Changeset 72c8f77 in mainline for uspace/srv/sysman/job.c
- Timestamp:
- 2019-08-06T18:30:29Z (5 years ago)
- Children:
- 63a3276
- Parents:
- c2d50c8
- git-author:
- Michal Koutný <xm.koutny+hos@…> (2015-05-26 16:15:58)
- git-committer:
- Matthieu Riolo <matthieu.riolo@…> (2019-08-06 18:30:29)
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
uspace/srv/sysman/job.c
rc2d50c8 r72c8f77 46 46 static int job_add_blocked_job(job_t *blocking_job, job_t *blocked_job) 47 47 { 48 int rc = dyn_array_append(&blocking_job->blocked_jobs, job_ptr_t, 48 assert(blocking_job->blocked_jobs.size == 49 blocking_job->blocked_jobs_count); 50 51 int rc = dyn_array_append(&blocking_job->blocked_jobs, job_t *, 49 52 blocked_job); 50 53 if (rc != EOK) { … … 53 56 job_add_ref(blocked_job); 54 57 58 blocking_job->blocked_jobs_count += 1; 55 59 blocked_job->blocking_jobs += 1; 56 60 … … 86 90 job->unit = u; 87 91 88 u->job = job; 89 job_add_ref(job); 90 91 dyn_array_initialize(&job->blocked_jobs, job_ptr_t); 92 dyn_array_initialize(&job->blocked_jobs, job_t *); 92 93 job->blocking_jobs = 0; 93 94 job->blocking_job_failed = false; 94 95 95 job->state = JOB_ UNQUEUED;96 job->state = JOB_EMBRYO; 96 97 job->retval = JOB_UNDEFINED_; 97 98 } … … 100 101 { 101 102 unit_t *u = job->unit; 103 102 104 if (u->state == job->target_state) { 103 105 job->retval = JOB_OK; … … 117 119 118 120 /* 119 * We have one reference from caller for our disposal, *121 * We have one reference from caller for our disposal, 120 122 * if needed, pass it to observer. 121 123 */ … … 138 140 assert(!link_used(&job->job_queue)); 139 141 140 dyn_array_foreach(job->blocked_jobs, job_ ptr_t, job_it) {142 dyn_array_foreach(job->blocked_jobs, job_t *, job_it) { 141 143 job_del_ref(&(*job_it)); 142 144 } … … 149 151 static bool job_is_runnable(job_t *job) 150 152 { 151 return job->state == JOB_QUEUED && job->blocking_jobs == 0; 153 assert(job->state == JOB_PENDING); 154 return job->blocking_jobs == 0; 152 155 } 153 156 … … 170 173 /* Remove job from queue and pass reference to caller */ 171 174 list_remove(&result->job_queue); 172 result->state = JOB_DEQUEUED;173 175 } 174 176 175 177 return result; 178 } 179 180 /** Merge two jobs together 181 * 182 * @param[in/out] trunk job that 183 * @param[in] other job that will be cleared out 184 * 185 * @return EOK on success 186 * @return error code on fail 187 */ 188 static int job_pre_merge(job_t *trunk, job_t *other) 189 { 190 assert(trunk->unit == other->unit); 191 assert(trunk->target_state == other->target_state); 192 assert(trunk->blocked_jobs.size == trunk->blocked_jobs_count); 193 assert(other->merged_into == NULL); 194 195 int rc = dyn_array_concat(&trunk->blocked_jobs, &other->blocked_jobs); 196 if (rc != EOK) { 197 return rc; 198 } 199 dyn_array_clear(&other->blocked_jobs); 200 201 // TODO allocate observed object 202 203 other->merged_into = trunk; 204 205 return EOK; 206 } 207 208 static void job_finish_merge(job_t *trunk, job_t *other) 209 { 210 assert(trunk->blocked_jobs.size >= trunk->blocked_jobs_count); 211 //TODO aggregate merged blocked_jobs 212 trunk->blocked_jobs_count = other->blocked_jobs.size; 213 214 /* All allocation is done in job_pre_merge, cannot fail here. */ 215 int rc = sysman_move_observers(other, trunk); 216 assert(rc == EOK); 217 } 218 219 static void job_undo_merge(job_t *trunk) 220 { 221 assert(trunk->blocked_jobs.size >= trunk->blocked_jobs_count); 222 dyn_array_clear_range(&trunk->blocked_jobs, 223 trunk->blocked_jobs_count, trunk->blocked_jobs.size); 176 224 } 177 225 … … 185 233 } 186 234 187 int job_queue_add_jobs(dyn_array_t *jobs) 188 { 189 /* Check consistency with queue. */ 190 dyn_array_foreach(*jobs, job_ptr_t, new_job_it) { 191 list_foreach(job_queue, job_queue, job_t, queued_job) { 192 /* 193 * Currently we have strict strategy not permitting 194 * multiple jobs for one unit in the queue at a time. 195 */ 196 if ((*new_job_it)->unit == queued_job->unit) { 235 int job_queue_add_closure(dyn_array_t *closure) 236 { 237 bool has_error = false; 238 int rc = EOK; 239 240 /* Check consistency with existing jobs. */ 241 dyn_array_foreach(*closure, job_t *, job_it) { 242 job_t *job = *job_it; 243 job_t *other_job = job->unit->job; 244 245 if (other_job == NULL) { 246 continue; 247 } 248 249 if (other_job->target_state != job->target_state) { 250 switch (other_job->state) { 251 case JOB_RUNNING: 197 252 sysman_log(LVL_ERROR, 198 "Cannot queue multiple jobs for unit '%s'", 199 unit_name((*new_job_it)->unit)); 200 return EEXIST; 253 "Unit '%s' has already different job running.", 254 unit_name(job->unit)); 255 has_error = true; 256 continue; 257 case JOB_PENDING: 258 /* 259 * Currently we have strict strategy not 260 * permitting multiple jobs for one unit in the 261 * queue at a time. 262 */ 263 sysman_log(LVL_ERROR, 264 "Cannot queue multiple jobs for unit '%s'.", 265 unit_name(job->unit)); 266 has_error = true; 267 continue; 268 default: 269 assert(false); 201 270 } 202 } 203 } 204 205 /* Enqueue jobs */ 206 dyn_array_foreach(*jobs, job_ptr_t, job_it) { 207 (*job_it)->state = JOB_QUEUED; 208 list_append(&(*job_it)->job_queue, &job_queue); 271 } else { 272 // TODO think about other options to merging 273 // (replacing, cancelling) 274 rc = job_pre_merge(other_job, job); 275 if (rc != EOK) { 276 break; 277 } 278 } 279 } 280 281 /* Aggregate merged jobs, or rollback any changes in existing jobs */ 282 bool finish_merge = (rc == EOK) && !has_error; 283 dyn_array_foreach(*closure, job_t *, job_it) { 284 if ((*job_it)->merged_into == NULL) { 285 continue; 286 } 287 if (finish_merge) { 288 job_finish_merge((*job_it)->merged_into, *job_it); 289 } else { 290 job_undo_merge((*job_it)->merged_into); 291 } 292 } 293 if (has_error) { 294 return EBUSY; 295 } else if (rc != EOK) { 296 return rc; 297 } 298 299 /* Unmerged jobs are enqueued, merged are disposed */ 300 dyn_array_foreach(*closure, job_t *, job_it) { 301 job_t *job = (*job_it); 302 if (job->merged_into != NULL) { 303 job_del_ref(&job); 304 continue; 305 } 306 307 308 unit_t *u = job->unit; 309 assert(u->bfs_job != NULL); 310 assert(u->job == NULL); 311 u->job = u->bfs_job; 312 u->bfs_job = NULL; 313 314 315 job->state = JOB_PENDING; 209 316 /* We pass reference from the closure to the queue */ 317 list_append(&job->job_queue, &job_queue); 210 318 } 211 319 … … 232 340 int job_create_closure(job_t *main_job, dyn_array_t *job_closure) 233 341 { 342 sysman_log(LVL_DEBUG2, "%s(%s)", __func__, unit_name(main_job->unit)); 234 343 int rc; 235 344 list_t units_fifo; 236 345 list_initialize(&units_fifo); 237 346 238 /* Zero BFS tags before use*/347 /* Check invariant */ 239 348 list_foreach(units, units, unit_t, u) { 240 u->bfs_tag = false;349 assert(u->bfs_job == false); 241 350 } 242 351 243 352 unit_t *unit = main_job->unit; 353 job_add_ref(main_job); 354 unit->bfs_job = main_job; 244 355 list_append(&unit->bfs_link, &units_fifo); 245 unit->bfs_tag = true;246 356 247 357 while (!list_empty(&units_fifo)) { 248 358 unit = list_get_instance(list_first(&units_fifo), unit_t, 249 359 bfs_link); 250 assert(unit->job);251 360 list_remove(&unit->bfs_link); 252 job_t *job = unit->job; 253 254 255 // TODO more sophisticated check? (unit that is in transitional 256 // state cannot have currently multiple jobs queued) 257 if (job->target_state == unit->state) { 258 /* 259 * Job would do nothing, finish it on spot. 260 * No need to continue BFS search from it. 261 */ 262 job->retval = JOB_OK; 263 job_finish(job); 264 continue; 265 } 361 job_t *job = unit->bfs_job; 362 assert(job != NULL); 266 363 267 364 job_add_ref(job); 268 dyn_array_append(job_closure, job_ ptr_t, job);365 dyn_array_append(job_closure, job_t *, job); 269 366 270 367 /* 271 368 * Traverse dependencies edges 272 * Depending ondependency type and edge direction create273 * appropriate jobs .369 * According to dependency type and edge direction create 370 * appropriate jobs (currently "After" only). 274 371 */ 275 372 list_foreach(unit->dependencies, dependencies, unit_dependency_t, dep) { 276 373 unit_t *u = dep->dependency; 277 374 job_t *blocking_job; 278 if (u->bfs_tag) { 279 assert(u->job); 280 blocking_job = u->job; 281 } else { 282 u->bfs_tag = true; 375 376 if (u->bfs_job == NULL) { 283 377 blocking_job = job_create(u, job->target_state); 284 378 if (blocking_job == NULL) { … … 286 380 goto finish; 287 381 } 288 /* Reference to job is kept inunit */289 job_del_ref(&blocking_job);382 /* Pass reference to unit */ 383 u->bfs_job = blocking_job; 290 384 list_append(&u->bfs_link, &units_fifo); 385 } else { 386 blocking_job = u->bfs_job; 291 387 } 292 388 293 389 job_add_blocked_job(blocking_job, job); 294 390 } 295 391 } 296 sysman_log(LVL_DEBUG2, "%s(%s):", __func__, unit_name(main_job->unit));297 dyn_array_foreach(*job_closure, job_ptr_t, job_it) {298 sysman_log(LVL_DEBUG2, "%s\t%s", __func__, unit_name((*job_it)->unit));299 }392 // sysman_log(LVL_DEBUG2, "%s(%s):", __func__, unit_name(main_job->unit)); 393 // dyn_array_foreach(*job_closure, job_t *, job_it) { 394 // sysman_log(LVL_DEBUG2, "%s\t%s", __func__, unit_name((*job_it)->unit)); 395 // } 300 396 rc = EOK; 301 397 302 398 finish: 303 /* Any unprocessed jobs may be referenced by units */ 304 list_foreach(units_fifo, bfs_link, unit_t, u) { 305 job_del_ref(&u->job); 306 } 399 /* Unreference any jobs in interrupted BFS queue */ 400 list_foreach_safe(units_fifo, cur_link, next_link) { 401 unit_t *u = list_get_instance(cur_link, unit_t, bfs_link); 402 job_del_ref(&u->bfs_job); 403 list_remove(cur_link); 404 } 405 307 406 return rc; 308 407 } … … 310 409 /** Create job assigned to the unit 311 410 * 312 * @param[in] unit unit to be modified, its job must be empty411 * @param[in] unit 313 412 * @param[in] target_state 314 413 * 315 * @return NULL or newly created job 316 * There are two references to the job, one set in the unit and second 317 * is the return value. 414 * @return NULL or newly created job (there is a single refernce for the creator) 318 415 */ 319 416 job_t *job_create(unit_t *u, unit_state_t target_state) … … 362 459 void job_run(job_t *job) 363 460 { 364 assert(job->state != JOB_RUNNING);365 assert(job->state != JOB_FINISHED); 366 461 assert(job->state == JOB_PENDING); 462 463 job->state = JOB_RUNNING; 367 464 unit_t *u = job->unit; 368 465 sysman_log(LVL_DEBUG, "%s(%p), %s -> %i", … … 377 474 switch (job->target_state) { 378 475 case STATE_STARTED: 379 rc = unit_start(u); 476 // TODO put here same evaluation as in job_check 477 // goal is to have job_run "idempotent" 478 if (u->state == job->target_state) { 479 rc = EOK; 480 } else { 481 rc = unit_start(u); 482 } 380 483 break; 381 484 default: … … 412 515 assert(job->unit->job == job); 413 516 414 sysman_log(LVL_DEBUG2, "%s(%p) %s ->%i",517 sysman_log(LVL_DEBUG2, "%s(%p) %s ret %i", 415 518 __func__, job, unit_name(job->unit), job->retval); 416 519 … … 418 521 419 522 /* First remove references, then clear the array */ 420 dyn_array_foreach(job->blocked_jobs, job_ptr_t, job_it) { 523 assert(job->blocked_jobs.size == job->blocked_jobs_count); 524 dyn_array_foreach(job->blocked_jobs, job_t *, job_it) { 421 525 job_unblock(*job_it, job); 422 526 }
Note:
See TracChangeset
for help on using the changeset viewer.