diff --git a/src/jobservice/opm/job_stats_mgr.go b/src/jobservice/opm/job_stats_mgr.go index f78e6b0662..f77da38e7b 100644 --- a/src/jobservice/opm/job_stats_mgr.go +++ b/src/jobservice/opm/job_stats_mgr.go @@ -16,6 +16,9 @@ package opm import "github.com/goharbor/harbor/src/jobservice/models" +// Range for list scope defining +type Range int + // JobStatsManager defines the methods to handle stats of job. type JobStatsManager interface { // Start to serve @@ -55,10 +58,11 @@ type JobStatsManager interface { // // jobID string : ID of the being retried job // command string : the command applied to the job like stop/cancel + // isCached bool : to indicate if only cache the op command // // Returns: // error if it was not successfully sent - SendCommand(jobID string, command string) error + SendCommand(jobID string, command string, isCached bool) error // CtlCommand checks if control command is fired for the specified job. // @@ -122,9 +126,12 @@ type JobStatsManager interface { // Get all the executions (IDs) fro the specified upstream Job. // // upstreamJobID string: ID of the upstream job - // + // ranges ...Range: Define the start and end for the list, e.g: + // 0, 10 means [0:10] + // 10 means [10:] + // empty means [0:-1]==all // Returns: // the ID list of the executions if no error occurred // or a non-nil error is returned - GetExecutions(upstreamJobID string) ([]string, error) + GetExecutions(upstreamJobID string, ranges ...Range) ([]string, error) } diff --git a/src/jobservice/opm/redis_job_stats_mgr.go b/src/jobservice/opm/redis_job_stats_mgr.go index 6833175892..87c8ca2282 100644 --- a/src/jobservice/opm/redis_job_stats_mgr.go +++ b/src/jobservice/opm/redis_job_stats_mgr.go @@ -44,7 +44,7 @@ const ( opPersistExecutions = "persist_executions" opUpdateStats = "update_job_stats" maxFails = 3 - jobStatsDataExpireTime = 60 * 60 * 24 * 7 // one week + jobStatsDataExpireTime = 60 * 60 * 24 * 5 // 5 days // CtlCommandStop : command stop CtlCommandStop = "stop" @@ -249,7 +249,7 @@ func (rjs *RedisJobStatsManager) loop() { } // SendCommand for the specified job -func (rjs *RedisJobStatsManager) SendCommand(jobID string, command string) error { +func (rjs *RedisJobStatsManager) SendCommand(jobID string, command string, isCached bool) error { if utils.IsEmptyStr(jobID) { return errors.New("empty job ID") } @@ -258,8 +258,11 @@ func (rjs *RedisJobStatsManager) SendCommand(jobID string, command string) error return errors.New("unknown command") } - if err := rjs.opCommands.Fire(jobID, command); err != nil { - return err + if !isCached { + // Let other interested parties awareness + if err := rjs.opCommands.Fire(jobID, command); err != nil { + return err + } } // Directly add to op commands maintaining list @@ -341,7 +344,16 @@ func (rjs *RedisJobStatsManager) GetHook(jobID string) (string, error) { return hookURL, nil } - return rjs.getHook(jobID) + // Not hit in cache! Get it from the backend. + hookURL, err := rjs.getHook(jobID) + if err != nil { + return "", err + } + + // Cache and return + rjs.hookStore.Add(jobID, hookURL) + + return hookURL, nil } // ExpirePeriodicJobStats marks the periodic job stats expired @@ -379,7 +391,7 @@ func (rjs *RedisJobStatsManager) AttachExecution(upstreamJobID string, execution } // GetExecutions returns the existing executions (IDs) for the specified job. -func (rjs *RedisJobStatsManager) GetExecutions(upstreamJobID string) ([]string, error) { +func (rjs *RedisJobStatsManager) GetExecutions(upstreamJobID string, ranges ...Range) ([]string, error) { if len(upstreamJobID) == 0 { return nil, errors.New("no upstream ID specified") } @@ -387,8 +399,16 @@ func (rjs *RedisJobStatsManager) GetExecutions(upstreamJobID string) ([]string, conn := rjs.redisPool.Get() defer conn.Close() + var start, end interface{} = "-inf", "+inf" + if len(ranges) >= 1 { + start = int(ranges[0]) + } + if len(ranges) > 1 { + end = int(ranges[1]) + } + key := utils.KeyUpstreamJobAndExecutions(rjs.namespace, upstreamJobID) - ids, err := redis.Strings(conn.Do("ZRANGE", key, 0, -1)) + ids, err := redis.Strings(conn.Do("ZRANGEBYSCORE", key, start, end)) if err != nil { if err == redis.ErrNil { return []string{}, nil @@ -786,7 +806,7 @@ func (rjs *RedisJobStatsManager) getHook(jobID string) (string, error) { defer conn.Close() key := utils.KeyJobStats(rjs.namespace, jobID) - hookURL, err := redis.String(conn.Do("HMGET", key, "status_hook")) + hookURL, err := redis.String(conn.Do("HGET", key, "status_hook")) if err != nil { if err == redis.ErrNil { return "", fmt.Errorf("no registered web hook found for job '%s'", jobID) diff --git a/src/jobservice/opm/redis_job_stats_mgr_test.go b/src/jobservice/opm/redis_job_stats_mgr_test.go index 8cfa093796..def65f7977 100644 --- a/src/jobservice/opm/redis_job_stats_mgr_test.go +++ b/src/jobservice/opm/redis_job_stats_mgr_test.go @@ -90,7 +90,7 @@ func TestCommand(t *testing.T) { defer mgr.Shutdown() <-time.After(200 * time.Millisecond) - if err := mgr.SendCommand("fake_job_ID", CtlCommandStop); err != nil { + if err := mgr.SendCommand("fake_job_ID", CtlCommandStop, true); err != nil { t.Fatal(err) } diff --git a/src/jobservice/period/enqueuer.go b/src/jobservice/period/enqueuer.go index 801840d6b4..373f95190d 100644 --- a/src/jobservice/period/enqueuer.go +++ b/src/jobservice/period/enqueuer.go @@ -104,6 +104,8 @@ func (pe *periodicEnqueuer) loop() { func (pe *periodicEnqueuer) enqueue() error { now := time.Now().Unix() + logger.Debugf("Periodic enqueuing loop: %d", now) + conn := pe.pool.Get() defer conn.Close() diff --git a/src/jobservice/pool/redis_pool.go b/src/jobservice/pool/redis_pool.go index dacc00efe8..ac4b8e5200 100644 --- a/src/jobservice/pool/redis_pool.go +++ b/src/jobservice/pool/redis_pool.go @@ -19,6 +19,7 @@ import ( "fmt" "math" "reflect" + "strings" "time" "github.com/gocraft/work" @@ -30,7 +31,6 @@ import ( "github.com/goharbor/harbor/src/jobservice/period" "github.com/goharbor/harbor/src/jobservice/utils" "github.com/gomodule/redigo/redis" - "github.com/robfig/cron" ) var ( @@ -419,8 +419,6 @@ func (gcwp *GoCraftWorkPool) StopJob(jobID string) error { return err } - needSetStopStatus := false - switch theJob.Stats.JobKind { case job.JobKindGeneric: // Only running job can be stopped @@ -429,12 +427,18 @@ func (gcwp *GoCraftWorkPool) StopJob(jobID string) error { } case job.JobKindScheduled: // we need to delete the scheduled job in the queue if it is not running yet - // otherwise, nothing need to do - if theJob.Stats.Status == job.JobStatusScheduled { + // otherwise, stop it. + if theJob.Stats.Status == job.JobStatusPending { if err := gcwp.client.DeleteScheduledJob(theJob.Stats.RunAt, jobID); err != nil { return err } - needSetStopStatus = true + + // Update the job status to 'stopped' + gcwp.statsManager.SetJobStatus(jobID, job.JobStatusStopped) + + logger.Debugf("Scheduled job which plan to run at %d '%s' is stopped", theJob.Stats.RunAt, jobID) + + return nil } case job.JobKindPeriodic: // firstly delete the periodic job policy @@ -445,31 +449,28 @@ func (gcwp *GoCraftWorkPool) StopJob(jobID string) error { logger.Infof("Periodic job policy %s is removed", jobID) // secondly we need try to delete the job instances scheduled for this periodic job, a try best action - gcwp.deleteScheduledJobsOfPeriodicPolicy(theJob.Stats.JobID, theJob.Stats.CronSpec) // ignore error as we have logged + if err := gcwp.deleteScheduledJobsOfPeriodicPolicy(theJob.Stats.JobID); err != nil { + // only logged + logger.Errorf("Errors happened when deleting jobs of periodic policy %s: %s", theJob.Stats.JobID, err) + } + // thirdly expire the job stats of this periodic job if exists if err := gcwp.statsManager.ExpirePeriodicJobStats(theJob.Stats.JobID); err != nil { // only logged logger.Errorf("Expire the stats of job %s failed with error: %s\n", theJob.Stats.JobID, err) } - needSetStopStatus = true + return nil default: - break + return fmt.Errorf("Job kind %s is not supported", theJob.Stats.JobKind) } // Check if the job has 'running' instance if theJob.Stats.Status == job.JobStatusRunning { // Send 'stop' ctl command to the running instance - if err := gcwp.statsManager.SendCommand(jobID, opm.CtlCommandStop); err != nil { + if err := gcwp.statsManager.SendCommand(jobID, opm.CtlCommandStop, false); err != nil { return err } - // The job running instance will set the status to 'stopped' - needSetStopStatus = false - } - - // If needed, update the job status to 'stopped' - if needSetStopStatus { - gcwp.statsManager.SetJobStatus(jobID, job.JobStatusStopped) } return nil @@ -493,7 +494,7 @@ func (gcwp *GoCraftWorkPool) CancelJob(jobID string) error { } // Send 'cancel' ctl command to the running instance - if err := gcwp.statsManager.SendCommand(jobID, opm.CtlCommandCancel); err != nil { + if err := gcwp.statsManager.SendCommand(jobID, opm.CtlCommandCancel, false); err != nil { return err } break @@ -552,30 +553,64 @@ func (gcwp *GoCraftWorkPool) RegisterHook(jobID string, hookURL string) error { return gcwp.statsManager.RegisterHook(jobID, hookURL, false) } -func (gcwp *GoCraftWorkPool) deleteScheduledJobsOfPeriodicPolicy(policyID string, cronSpec string) error { - schedule, err := cron.Parse(cronSpec) +// A try best method to delete the scheduled jobs of one periodic job +func (gcwp *GoCraftWorkPool) deleteScheduledJobsOfPeriodicPolicy(policyID string) error { + // Check the scope of [-periodicEnqueuerHorizon, -1] + // If the job is still not completed after a 'periodicEnqueuerHorizon', just ignore it + now := time.Now().Unix() // Baseline + startTime := now - (int64)(periodicEnqueuerHorizon/time.Minute)*60 + + // Try to delete more + // Get the range scope + start := (opm.Range)(startTime) + ids, err := gcwp.statsManager.GetExecutions(policyID, start) if err != nil { - logger.Errorf("cron spec '%s' is not valid", cronSpec) return err } - now := time.Now().Unix() - nowTime := time.Unix(now, 0) - horizon := nowTime.Add(periodicEnqueuerHorizon) + logger.Debugf("Found scheduled jobs '%v' in scope [%d,+inf] for periodic job policy %s", ids, start, policyID) - // try to delete more - // return the last error if occurred - for t := schedule.Next(nowTime); t.Before(horizon); t = schedule.Next(t) { - epoch := t.Unix() - if err = gcwp.client.DeleteScheduledJob(epoch, policyID); err != nil { - // only logged - logger.Warningf("Delete scheduled instance for periodic job %s failed with error: %s\n", policyID, err) + if len(ids) == 0 { + // Treat as a normal case, nothing need to do + return nil + } + + multiErrs := []string{} + for _, id := range ids { + subJob, err := gcwp.statsManager.Retrieve(id) + if err != nil { + multiErrs = append(multiErrs, err.Error()) + continue // going on + } + + if subJob.Stats.Status == job.JobStatusRunning { + // Send 'stop' ctl command to the running instance + if err := gcwp.statsManager.SendCommand(subJob.Stats.JobID, opm.CtlCommandStop, false); err != nil { + multiErrs = append(multiErrs, err.Error()) + continue + } + + logger.Debugf("Stop running job %s for periodic job policy %s", subJob.Stats.JobID, policyID) } else { - logger.Infof("Delete scheduled job for periodic job policy %s: runat = %d", policyID, epoch) + if subJob.Stats.JobKind == job.JobKindScheduled && + subJob.Stats.Status == job.JobStatusPending { + // The pending scheduled job + if err := gcwp.client.DeleteScheduledJob(subJob.Stats.RunAt, subJob.Stats.JobID); err != nil { + multiErrs = append(multiErrs, err.Error()) + continue // going on + } + + // Log action + logger.Debugf("Delete scheduled job for periodic job policy %s: runat = %d", policyID, subJob.Stats.RunAt) + } } } - return err + if len(multiErrs) > 0 { + return errors.New(strings.Join(multiErrs, "\n")) + } + + return nil } func (gcwp *GoCraftWorkPool) handleSchedulePolicy(data interface{}) error { @@ -637,7 +672,8 @@ func (gcwp *GoCraftWorkPool) handleOPCommandFiring(data interface{}) error { return errors.New("malformed op command info") } - return gcwp.statsManager.SendCommand(jobID, command) + // Put the command into the maintaining list + return gcwp.statsManager.SendCommand(jobID, command, true) } // log the job