Changeset 25a9fec in mainline
- Timestamp:
- 2019-08-07T09:54:10Z (6 years ago)
- Children:
- 92a7cfb1
- Parents:
- 18377301
- git-author:
- Michal Koutný <xm.koutny+hos@…> (2015-11-03 21:48:45)
- git-committer:
- Matthieu Riolo <matthieu.riolo@…> (2019-08-07 09:54:10)
- Location:
- uspace/srv/sysman
- Files:
-
- 4 added
- 7 edited
Legend:
- Unmodified
- Added
- Removed
-
uspace/srv/sysman/Makefile
r18377301 r25a9fec 42 42 edge.c \ 43 43 job.c \ 44 job_closure.c \ 45 job_queue.c \ 44 46 log.c \ 45 47 repo.c \ -
uspace/srv/sysman/job.c
r18377301 r25a9fec 38 38 #include "sysman.h" 39 39 40 static list_t job_queue;41 40 42 41 /* … … 44 43 */ 45 44 46 static int job_add_blocked_job(job_t *blocking_job, job_t *blocked_job)47 {48 assert(blocking_job->blocked_jobs.size ==49 blocking_job->blocked_jobs_count);50 51 int rc = dyn_array_append(&blocking_job->blocked_jobs, job_t *,52 blocked_job);53 if (rc != EOK) {54 return ENOMEM;55 }56 job_add_ref(blocked_job);57 58 blocking_job->blocked_jobs_count += 1;59 blocked_job->blocking_jobs += 1;60 61 return EOK;62 }63 45 64 46 /** Remove blocking_job from blocked job structure … … 148 130 } 149 131 150 static bool job_is_runnable(job_t *job)151 {152 assert(job->state == JOB_PENDING);153 return job->blocking_jobs == 0;154 }155 156 /** Pop next runnable job157 *158 * @return runnable job or NULL when there's none159 */160 static job_t *job_queue_pop_runnable(void)161 {162 job_t *result = NULL;163 164 /* Select first runnable job */165 list_foreach(job_queue, job_queue, job_t, candidate) {166 if (job_is_runnable(candidate)) {167 result = candidate;168 break;169 }170 }171 if (result) {172 /* Remove job from queue and pass reference to caller */173 list_remove(&result->job_queue);174 }175 176 return result;177 }178 179 /** Add multiple references to job180 *181 * Non-atomicity doesn't mind as long as individual increments are atomic.182 *183 * @note Function is not exported as other modules shouldn't need it.184 */185 static inline void job_add_refs(job_t *job, size_t refs)186 {187 for (size_t i = 0; i < refs; ++i) {188 job_add_ref(job);189 }190 }191 192 /** Delete multiple references to job193 *194 * Behavior of concurrent runs with job_add_refs aren't specified.195 */196 static inline void job_del_refs(job_t **job_ptr, size_t refs)197 {198 for (size_t i = 0; i < refs; ++i) {199 job_del_ref(job_ptr);200 }201 }202 203 /** Merge two jobs together204 *205 * @param[in/out] trunk job that206 * @param[in] other job that will be cleared out207 *208 * @return EOK on success209 * @return error code on fail210 */211 static int job_pre_merge(job_t *trunk, job_t *other)212 {213 assert(trunk->unit == other->unit);214 assert(trunk->target_state == other->target_state);215 assert(trunk->blocked_jobs.size == trunk->blocked_jobs_count);216 assert(other->merged_into == NULL);217 218 int rc = dyn_array_concat(&trunk->blocked_jobs, &other->blocked_jobs);219 if (rc != EOK) {220 return rc;221 }222 dyn_array_clear(&other->blocked_jobs);223 224 // TODO allocate observed object225 226 other->merged_into = trunk;227 228 return EOK;229 }230 231 static void job_finish_merge(job_t *trunk, job_t *other)232 {233 assert(trunk->blocked_jobs.size >= trunk->blocked_jobs_count);234 //TODO aggregate merged blocked_jobs235 trunk->blocked_jobs_count = other->blocked_jobs.size;236 237 /*238 * Note, the sysman_move_observers cannot fail here sice all necessary239 * allocation is done in job_pre_merge.240 */241 size_t observers_refs = sysman_observers_count(other);242 int rc = sysman_move_observers(other, trunk);243 assert(rc == EOK);244 245 /* When we move observers, don't forget to pass their references too. */246 job_add_refs(trunk, observers_refs);247 job_del_refs(&other, observers_refs);248 }249 250 static void job_undo_merge(job_t *trunk)251 {252 assert(trunk->blocked_jobs.size >= trunk->blocked_jobs_count);253 dyn_array_clear_range(&trunk->blocked_jobs,254 trunk->blocked_jobs_count, trunk->blocked_jobs.size);255 }256 257 132 /* 258 133 * Non-static functions 259 134 */ 260 261 void job_queue_init()262 {263 list_initialize(&job_queue);264 }265 266 /** Consistenly add jobs to the queue267 *268 * @param[in/out] closure jobs closure, on success it's emptied, otherwise269 * you should take care of remaining jobs270 *271 * @return EOK on success272 * @return EBUSY when any job in closure is conflicting273 */274 int job_queue_add_closure(dyn_array_t *closure)275 {276 bool has_error = false;277 int rc = EOK;278 279 /* Check consistency with existing jobs. */280 dyn_array_foreach(*closure, job_t *, job_it) {281 job_t *job = *job_it;282 job_t *other_job = job->unit->job;283 284 if (other_job == NULL) {285 continue;286 }287 288 if (other_job->target_state != job->target_state) {289 switch (other_job->state) {290 case JOB_RUNNING:291 sysman_log(LVL_ERROR,292 "Unit '%s' has already different job running.",293 unit_name(job->unit));294 has_error = true;295 continue;296 case JOB_PENDING:297 /*298 * Currently we have strict strategy not299 * permitting multiple jobs for one unit in the300 * queue at a time.301 */302 sysman_log(LVL_ERROR,303 "Cannot queue multiple jobs for unit '%s'.",304 unit_name(job->unit));305 has_error = true;306 continue;307 default:308 assert(false);309 }310 } else {311 // TODO think about other options to merging312 // (replacing, cancelling)313 rc = job_pre_merge(other_job, job);314 if (rc != EOK) {315 break;316 }317 }318 }319 320 /* Aggregate merged jobs, or rollback any changes in existing jobs */321 bool finish_merge = (rc == EOK) && !has_error;322 dyn_array_foreach(*closure, job_t *, job_it) {323 if ((*job_it)->merged_into == NULL) {324 continue;325 }326 if (finish_merge) {327 job_finish_merge((*job_it)->merged_into, *job_it);328 } else {329 job_undo_merge((*job_it)->merged_into);330 }331 }332 if (has_error) {333 return EBUSY;334 } else if (rc != EOK) {335 return rc;336 }337 338 /* Unmerged jobs are enqueued, merged are disposed */339 dyn_array_foreach(*closure, job_t *, job_it) {340 job_t *job = (*job_it);341 if (job->merged_into != NULL) {342 job_del_ref(&job);343 continue;344 }345 346 347 unit_t *u = job->unit;348 assert(u->job == NULL);349 /* Pass reference from the closure to the unit */350 u->job = job;351 352 /* Enqueue job (new reference) */353 job->state = JOB_PENDING;354 job_add_ref(job);355 list_append(&job->job_queue, &job_queue);356 }357 358 /* We've stolen references from the closure, so erase it */359 dyn_array_clear(closure);360 361 return EOK;362 }363 364 /** Process all jobs that aren't transitively blocked365 *366 * Job can be blocked either by another job or by an incoming event, that will367 * be queued after this job_queue_process call.368 *369 * TODO Write down rules from where this function can be called, to avoid stack370 * overflow.371 */372 void job_queue_process(void)373 {374 job_t *job;375 while ((job = job_queue_pop_runnable())) {376 job_run(job);377 job_del_ref(&job);378 }379 }380 381 int job_create_closure(job_t *main_job, dyn_array_t *job_closure)382 {383 sysman_log(LVL_DEBUG2, "%s(%s)", __func__, unit_name(main_job->unit));384 int rc;385 list_t units_fifo;386 list_initialize(&units_fifo);387 388 /* Check invariant */389 list_foreach(units, units, unit_t, u) {390 assert(u->bfs_job == NULL);391 }392 393 unit_t *unit = main_job->unit;394 job_add_ref(main_job);395 unit->bfs_job = main_job;396 list_append(&unit->bfs_link, &units_fifo);397 398 while (!list_empty(&units_fifo)) {399 unit = list_get_instance(list_first(&units_fifo), unit_t,400 bfs_link);401 list_remove(&unit->bfs_link);402 job_t *job = unit->bfs_job;403 assert(job != NULL);404 405 job_add_ref(job);406 dyn_array_append(job_closure, job_t *, job);407 408 /*409 * Traverse dependencies edges410 * According to dependency type and edge direction create411 * appropriate jobs (currently "After" only).412 */413 list_foreach(unit->edges_out, edges_out, unit_edge_t, e) {414 unit_t *u = e->output;415 job_t *blocking_job;416 417 if (u->bfs_job == NULL) {418 blocking_job = job_create(u, job->target_state);419 if (blocking_job == NULL) {420 rc = ENOMEM;421 goto finish;422 }423 /* Pass reference to unit */424 u->bfs_job = blocking_job;425 list_append(&u->bfs_link, &units_fifo);426 } else {427 blocking_job = u->bfs_job;428 }429 430 job_add_blocked_job(blocking_job, job);431 }432 }433 sysman_log(LVL_DEBUG2, "%s(%s):", __func__, unit_name(main_job->unit));434 dyn_array_foreach(*job_closure, job_t *, job_it) {435 sysman_log(LVL_DEBUG2, "%s\t%s, refs: %u", __func__,436 unit_name((*job_it)->unit), atomic_get(&(*job_it)->refcnt));437 }438 rc = EOK;439 440 finish:441 /* Unreference any jobs in interrupted BFS queue */442 list_foreach_safe(units_fifo, cur_link, next_link) {443 unit_t *u = list_get_instance(cur_link, unit_t, bfs_link);444 job_del_ref(&u->bfs_job);445 list_remove(cur_link);446 }447 448 /* Clean after ourselves (BFS tag jobs) */449 dyn_array_foreach(*job_closure, job_t *, job_it) {450 assert(*job_it == (*job_it)->unit->bfs_job);451 job_del_ref(&(*job_it)->unit->bfs_job);452 (*job_it)->unit->bfs_job = NULL;453 }454 455 return rc;456 }457 135 458 136 /** Create job assigned to the unit -
uspace/srv/sysman/job.h
r18377301 r25a9fec 81 81 }; 82 82 83 extern void job_queue_init(void);84 extern int job_queue_add_closure(dyn_array_t *);85 extern void job_queue_process(void);86 87 extern int job_create_closure(job_t *, dyn_array_t *);88 83 extern job_t *job_create(unit_t *, unit_state_t); 89 84 -
uspace/srv/sysman/main.c
r18377301 r25a9fec 41 41 #include "connection_ctl.h" 42 42 #include "edge.h" 43 #include "job .h"43 #include "job_queue.h" 44 44 #include "log.h" 45 45 #include "sysman.h" -
uspace/srv/sysman/sysman.c
r18377301 r25a9fec 33 33 #include <stdlib.h> 34 34 35 #include "job_closure.h" 36 #include "job_queue.h" 35 37 #include "log.h" 36 38 #include "sysman.h" -
uspace/srv/sysman/test/job_closure.c
r18377301 r25a9fec 31 31 #include <stdio.h> 32 32 33 #include "../job .h"33 #include "../job_closure.h" 34 34 35 35 #include "mock_unit.h" -
uspace/srv/sysman/test/job_queue.c
r18377301 r25a9fec 33 33 34 34 #include "mock_unit.h" 35 #include "../job .h"35 #include "../job_queue.h" 36 36 #include "../sysman.h" 37 37
Note:
See TracChangeset
for help on using the changeset viewer.