Changeset 72c8f77 in mainline for uspace/srv/sysman/job.c


Ignore:
Timestamp:
2019-08-06T18:30:29Z (5 years ago)
Author:
Matthieu Riolo <matthieu.riolo@…>
Children:
63a3276
Parents:
c2d50c8
git-author:
Michal Koutný <xm.koutny+hos@…> (2015-05-26 16:15:58)
git-committer:
Matthieu Riolo <matthieu.riolo@…> (2019-08-06 18:30:29)
Message:

sysman: Separate job closure creation and its enqueuing

  • added merge job operation
  • created test that failed with previous implementation of job closure
File:
1 edited

Legend:

Unmodified
Added
Removed
  • uspace/srv/sysman/job.c

    rc2d50c8 r72c8f77  
    4646static int job_add_blocked_job(job_t *blocking_job, job_t *blocked_job)
    4747{
    48         int rc = dyn_array_append(&blocking_job->blocked_jobs, job_ptr_t,
     48        assert(blocking_job->blocked_jobs.size ==
     49            blocking_job->blocked_jobs_count);
     50
     51        int rc = dyn_array_append(&blocking_job->blocked_jobs, job_t *,
    4952            blocked_job);
    5053        if (rc != EOK) {
     
    5356        job_add_ref(blocked_job);
    5457
     58        blocking_job->blocked_jobs_count += 1;
    5559        blocked_job->blocking_jobs += 1;
    5660
     
    8690        job->unit = u;
    8791
    88         u->job = job;
    89         job_add_ref(job);
    90 
    91         dyn_array_initialize(&job->blocked_jobs, job_ptr_t);
     92        dyn_array_initialize(&job->blocked_jobs, job_t *);
    9293        job->blocking_jobs = 0;
    9394        job->blocking_job_failed = false;
    9495
    95         job->state = JOB_UNQUEUED;
     96        job->state = JOB_EMBRYO;
    9697        job->retval = JOB_UNDEFINED_;
    9798}
     
    100101{
    101102        unit_t *u = job->unit;
     103
    102104        if (u->state == job->target_state) {
    103105                job->retval = JOB_OK;
     
    117119
    118120        /*
    119          * We have one reference from caller for our disposal,   *
     121         * We have one reference from caller for our disposal,
    120122         * if needed, pass it to observer.
    121123         */
     
    138140        assert(!link_used(&job->job_queue));
    139141
    140         dyn_array_foreach(job->blocked_jobs, job_ptr_t, job_it) {
     142        dyn_array_foreach(job->blocked_jobs, job_t *, job_it) {
    141143                job_del_ref(&(*job_it));
    142144        }
     
    149151static bool job_is_runnable(job_t *job)
    150152{
    151         return job->state == JOB_QUEUED && job->blocking_jobs == 0;
     153        assert(job->state == JOB_PENDING);
     154        return job->blocking_jobs == 0;
    152155}
    153156
     
    170173                /* Remove job from queue and pass reference to caller */
    171174                list_remove(&result->job_queue);
    172                 result->state = JOB_DEQUEUED;
    173175        }
    174176
    175177        return result;
     178}
     179
     180/** Merge two jobs together
     181 *
     182 * @param[in/out]  trunk  job that
     183 * @param[in]      other  job that will be cleared out
     184 *
     185 * @return EOK on success
     186 * @return error code on fail
     187 */
     188static int job_pre_merge(job_t *trunk, job_t *other)
     189{
     190        assert(trunk->unit == other->unit);
     191        assert(trunk->target_state == other->target_state);
     192        assert(trunk->blocked_jobs.size == trunk->blocked_jobs_count);
     193        assert(other->merged_into == NULL);
     194
     195        int rc = dyn_array_concat(&trunk->blocked_jobs, &other->blocked_jobs);
     196        if (rc != EOK) {
     197                return rc;
     198        }
     199        dyn_array_clear(&other->blocked_jobs);
     200
     201        // TODO allocate observed object
     202
     203        other->merged_into = trunk;
     204
     205        return EOK;
     206}
     207
     208static void job_finish_merge(job_t *trunk, job_t *other)
     209{
     210        assert(trunk->blocked_jobs.size >= trunk->blocked_jobs_count);
     211        //TODO aggregate merged blocked_jobs
     212        trunk->blocked_jobs_count = other->blocked_jobs.size;
     213
     214        /* All allocation is done in job_pre_merge, cannot fail here. */
     215        int rc = sysman_move_observers(other, trunk);
     216        assert(rc == EOK);
     217}
     218
     219static void job_undo_merge(job_t *trunk)
     220{
     221        assert(trunk->blocked_jobs.size >= trunk->blocked_jobs_count);
     222        dyn_array_clear_range(&trunk->blocked_jobs,
     223            trunk->blocked_jobs_count, trunk->blocked_jobs.size);
    176224}
    177225
     
    185233}
    186234
    187 int job_queue_add_jobs(dyn_array_t *jobs)
    188 {
    189         /* Check consistency with queue. */
    190         dyn_array_foreach(*jobs, job_ptr_t, new_job_it) {
    191                 list_foreach(job_queue, job_queue, job_t, queued_job) {
    192                         /*
    193                          * Currently we have strict strategy not permitting
    194                          * multiple jobs for one unit in the queue at a time.
    195                          */
    196                         if ((*new_job_it)->unit == queued_job->unit) {
     235int job_queue_add_closure(dyn_array_t *closure)
     236{
     237        bool has_error = false;
     238        int rc = EOK;
     239
     240        /* Check consistency with existing jobs. */
     241        dyn_array_foreach(*closure, job_t *, job_it) {
     242                job_t *job = *job_it;
     243                job_t *other_job = job->unit->job;
     244
     245                if (other_job == NULL) {
     246                        continue;
     247                }
     248
     249                if (other_job->target_state != job->target_state) {
     250                        switch (other_job->state) {
     251                        case JOB_RUNNING:
    197252                                sysman_log(LVL_ERROR,
    198                                     "Cannot queue multiple jobs for unit '%s'",
    199                                     unit_name((*new_job_it)->unit));
    200                                 return EEXIST;
     253                                    "Unit '%s' has already different job running.",
     254                                    unit_name(job->unit));
     255                                has_error = true;
     256                                continue;
     257                        case JOB_PENDING:
     258                                /*
     259                                 * Currently we have strict strategy not
     260                                 * permitting multiple jobs for one unit in the
     261                                 * queue at a time.
     262                                 */
     263                                sysman_log(LVL_ERROR,
     264                                    "Cannot queue multiple jobs for unit '%s'.",
     265                                    unit_name(job->unit));
     266                                has_error = true;
     267                                continue;
     268                        default:
     269                                assert(false);
    201270                        }
    202                 }
    203         }
    204 
    205         /* Enqueue jobs */
    206         dyn_array_foreach(*jobs, job_ptr_t, job_it) {
    207                 (*job_it)->state = JOB_QUEUED;
    208                 list_append(&(*job_it)->job_queue, &job_queue);
     271                } else {
     272                        // TODO think about other options to merging
     273                        //      (replacing, cancelling)
     274                        rc = job_pre_merge(other_job, job);
     275                        if (rc != EOK) {
     276                                break;
     277                        }
     278                }
     279        }
     280
     281        /* Aggregate merged jobs, or rollback any changes in existing jobs */
     282        bool finish_merge = (rc == EOK) && !has_error;
     283        dyn_array_foreach(*closure, job_t *, job_it) {
     284                if ((*job_it)->merged_into == NULL) {
     285                        continue;
     286                }
     287                if (finish_merge) {
     288                        job_finish_merge((*job_it)->merged_into, *job_it);
     289                } else {
     290                        job_undo_merge((*job_it)->merged_into);
     291                }
     292        }
     293        if (has_error) {
     294                return EBUSY;
     295        } else if (rc != EOK) {
     296                return rc;
     297        }
     298
     299        /* Unmerged jobs are enqueued, merged are disposed */
     300        dyn_array_foreach(*closure, job_t *, job_it) {
     301                job_t *job = (*job_it);
     302                if (job->merged_into != NULL) {
     303                        job_del_ref(&job);
     304                        continue;
     305                }
     306
     307
     308                unit_t *u = job->unit;
     309                assert(u->bfs_job != NULL);
     310                assert(u->job == NULL);
     311                u->job = u->bfs_job;
     312                u->bfs_job = NULL;
     313
     314
     315                job->state = JOB_PENDING;
    209316                /* We pass reference from the closure to the queue */
     317                list_append(&job->job_queue, &job_queue);
    210318        }
    211319
     
    232340int job_create_closure(job_t *main_job, dyn_array_t *job_closure)
    233341{
     342        sysman_log(LVL_DEBUG2, "%s(%s)", __func__, unit_name(main_job->unit));
    234343        int rc;
    235344        list_t units_fifo;
    236345        list_initialize(&units_fifo);
    237346
    238         /* Zero BFS tags before use */
     347        /* Check invariant */
    239348        list_foreach(units, units, unit_t, u) {
    240                 u->bfs_tag = false;
     349                assert(u->bfs_job == false);
    241350        }
    242351               
    243352        unit_t *unit = main_job->unit;
     353        job_add_ref(main_job);
     354        unit->bfs_job = main_job;
    244355        list_append(&unit->bfs_link, &units_fifo);
    245         unit->bfs_tag = true;
    246356       
    247357        while (!list_empty(&units_fifo)) {
    248358                unit = list_get_instance(list_first(&units_fifo), unit_t,
    249359                    bfs_link);
    250                 assert(unit->job);
    251360                list_remove(&unit->bfs_link);
    252                 job_t *job = unit->job;
    253 
    254 
    255                 // TODO more sophisticated check? (unit that is in transitional
    256                 //      state cannot have currently multiple jobs queued)
    257                 if (job->target_state == unit->state) {
    258                         /*
    259                          * Job would do nothing, finish it on spot.
    260                          * No need to continue BFS search from it.
    261                          */
    262                         job->retval = JOB_OK;
    263                         job_finish(job);
    264                         continue;
    265                 }
     361                job_t *job = unit->bfs_job;
     362                assert(job != NULL);
    266363
    267364                job_add_ref(job);
    268                 dyn_array_append(job_closure, job_ptr_t, job);
     365                dyn_array_append(job_closure, job_t *, job);
    269366
    270367                /*
    271368                 * Traverse dependencies edges
    272                  * Depending on dependency type and edge direction create
    273                  * appropriate jobs.
     369                 * According to dependency type and edge direction create
     370                 * appropriate jobs (currently "After" only).
    274371                 */
    275372                list_foreach(unit->dependencies, dependencies, unit_dependency_t, dep) {
    276373                        unit_t *u = dep->dependency;
    277374                        job_t *blocking_job;
    278                         if (u->bfs_tag) {
    279                                 assert(u->job);
    280                                 blocking_job = u->job;
    281                         } else {
    282                                 u->bfs_tag = true;
     375
     376                        if (u->bfs_job == NULL) {
    283377                                blocking_job = job_create(u, job->target_state);
    284378                                if (blocking_job == NULL) {
     
    286380                                        goto finish;
    287381                                }
    288                                 /* Reference to job is kept in unit */
    289                                 job_del_ref(&blocking_job);
     382                                /* Pass reference to unit */
     383                                u->bfs_job = blocking_job;
    290384                                list_append(&u->bfs_link, &units_fifo);
     385                        } else {
     386                                blocking_job = u->bfs_job;
    291387                        }
    292                        
     388
    293389                        job_add_blocked_job(blocking_job, job);
    294390                }
    295391        }
    296         sysman_log(LVL_DEBUG2, "%s(%s):", __func__, unit_name(main_job->unit));
    297         dyn_array_foreach(*job_closure, job_ptr_t, job_it) {
    298                 sysman_log(LVL_DEBUG2, "%s\t%s", __func__, unit_name((*job_it)->unit));
    299         }
     392//      sysman_log(LVL_DEBUG2, "%s(%s):", __func__, unit_name(main_job->unit));
     393//      dyn_array_foreach(*job_closure, job_t *, job_it) {
     394//              sysman_log(LVL_DEBUG2, "%s\t%s", __func__, unit_name((*job_it)->unit));
     395//      }
    300396        rc = EOK;
    301397
    302398finish:
    303         /* Any unprocessed jobs may be referenced by units */
    304         list_foreach(units_fifo, bfs_link, unit_t, u) {
    305                 job_del_ref(&u->job);
    306         }
     399        /* Unreference any jobs in interrupted BFS queue */
     400        list_foreach_safe(units_fifo, cur_link, next_link) {
     401                unit_t *u = list_get_instance(cur_link, unit_t, bfs_link);
     402                job_del_ref(&u->bfs_job);
     403                list_remove(cur_link);
     404        }
     405
    307406        return rc;
    308407}
     
    310409/** Create job assigned to the unit
    311410 *
    312  * @param[in]  unit          unit to be modified, its job must be empty
     411 * @param[in]  unit
    313412 * @param[in]  target_state
    314413 *
    315  * @return NULL or newly created job
    316  *         There are two references to the job, one set in the unit and second
    317  *         is the return value.
     414 * @return NULL or newly created job (there is a single refernce for the creator)
    318415 */
    319416job_t *job_create(unit_t *u, unit_state_t target_state)
     
    362459void job_run(job_t *job)
    363460{
    364         assert(job->state != JOB_RUNNING);
    365         assert(job->state != JOB_FINISHED);
    366 
     461        assert(job->state == JOB_PENDING);
     462
     463        job->state = JOB_RUNNING;
    367464        unit_t *u = job->unit;
    368465        sysman_log(LVL_DEBUG, "%s(%p), %s -> %i",
     
    377474        switch (job->target_state) {
    378475        case STATE_STARTED:
    379                 rc = unit_start(u);
     476                // TODO put here same evaluation as in job_check
     477                //      goal is to have job_run "idempotent"
     478                if (u->state == job->target_state) {
     479                        rc = EOK;
     480                } else {
     481                        rc = unit_start(u);
     482                }
    380483                break;
    381484        default:
     
    412515        assert(job->unit->job == job);
    413516
    414         sysman_log(LVL_DEBUG2, "%s(%p) %s -> %i",
     517        sysman_log(LVL_DEBUG2, "%s(%p) %s ret %i",
    415518            __func__, job, unit_name(job->unit), job->retval);
    416519
     
    418521
    419522        /* First remove references, then clear the array */
    420         dyn_array_foreach(job->blocked_jobs, job_ptr_t, job_it) {
     523        assert(job->blocked_jobs.size == job->blocked_jobs_count);
     524        dyn_array_foreach(job->blocked_jobs, job_t *, job_it) {
    421525                job_unblock(*job_it, job);
    422526        }
Note: See TracChangeset for help on using the changeset viewer.