Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Single Active Replication in Replication Policy for v2.12 #21394

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 11 additions & 5 deletions api/v2.0/swagger.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1193,7 +1193,7 @@ paths:
'404':
$ref: '#/responses/404'
'422':
$ref: '#/responses/422'
$ref: '#/responses/422'
'500':
$ref: '#/responses/500'
/projects/{project_name}/repositories/{repository_name}/artifacts/{reference}/scan/stop:
Expand Down Expand Up @@ -1226,7 +1226,7 @@ paths:
'404':
$ref: '#/responses/404'
'422':
$ref: '#/responses/422'
$ref: '#/responses/422'
'500':
$ref: '#/responses/500'
/projects/{project_name}/repositories/{repository_name}/artifacts/{reference}/scan/{report_id}/log:
Expand Down Expand Up @@ -6553,7 +6553,7 @@ responses:
description: The ID of the corresponding request for the response
type: string
schema:
$ref: '#/definitions/Errors'
$ref: '#/definitions/Errors'
'500':
description: Internal server error
headers:
Expand Down Expand Up @@ -7488,6 +7488,12 @@ definitions:
type: boolean
description: Whether to enable copy by chunk.
x-isnullable: true
single_active_replication:
type: boolean
description: |-
Whether to defer execution until the previous active execution finishes,
avoiding the execution of the same replication rules multiple times in parallel.
x-isnullable: true # make this field optional to keep backward compatibility
ReplicationTrigger:
type: object
properties:
Expand Down Expand Up @@ -7937,7 +7943,7 @@ definitions:
properties:
resource:
type: string
description: The resource of the access. Possible resources are listed here for system and project level https://github.com/goharbor/harbor/blob/main/src/common/rbac/const.go
description: The resource of the access. Possible resources are listed here for system and project level https://github.com/goharbor/harbor/blob/main/src/common/rbac/const.go
action:
type: string
description: The action of the access. Possible actions are *, pull, push, create, read, update, delete, list, operate, scanner-pull and stop.
Expand Down Expand Up @@ -10112,4 +10118,4 @@ definitions:
scan_type:
type: string
description: 'The scan type for the scan request. Two options are currently supported, vulnerability and sbom'
enum: [ vulnerability, sbom ]
enum: [ vulnerability, sbom ]
4 changes: 3 additions & 1 deletion make/migrations/postgresql/0150_2.12.0_schema.up.sql
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,6 @@ Add new column creator_ref and creator_type for robot table to record the creato
ALTER TABLE robot ADD COLUMN IF NOT EXISTS creator_ref integer default 0;
ALTER TABLE robot ADD COLUMN IF NOT EXISTS creator_type varchar(255);

ALTER TABLE p2p_preheat_policy ADD COLUMN IF NOT EXISTS scope varchar(255);
ALTER TABLE p2p_preheat_policy ADD COLUMN IF NOT EXISTS scope varchar(255);

ALTER TABLE replication_policy ADD COLUMN IF NOT EXISTS single_active_replication boolean;
25 changes: 25 additions & 0 deletions make/migrations/postgresql/0160_2.13.0_schema.up.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
ALTER TABLE p2p_preheat_policy DROP COLUMN IF EXISTS scope;
ALTER TABLE p2p_preheat_policy ADD COLUMN IF NOT EXISTS extra_attrs text;

CREATE TABLE IF NOT EXISTS audit_log_ext
(
id BIGSERIAL PRIMARY KEY NOT NULL,
project_id BIGINT,
operation VARCHAR(50) NULL,
resource_type VARCHAR(50) NULL,
resource VARCHAR(50) NULL,
username VARCHAR(50) NULL,
op_desc VARCHAR(500) NULL,
op_result BOOLEAN DEFAULT true,
payload TEXT NULL,
source_ip VARCHAR(50) NULL,
op_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

-- add index to the audit_log_ext table
CREATE INDEX IF NOT EXISTS idx_audit_log_ext_op_time ON audit_log_ext (op_time);
CREATE INDEX IF NOT EXISTS idx_audit_log_ext_project_id_optime ON audit_log_ext (project_id, op_time);
CREATE INDEX IF NOT EXISTS idx_audit_log_ext_project_id_resource_type ON audit_log_ext (project_id, resource_type);
CREATE INDEX IF NOT EXISTS idx_audit_log_ext_project_id_operation ON audit_log_ext (project_id, operation);

ALTER TABLE replication_policy ADD COLUMN IF NOT EXISTS single_active_replication boolean;
4 changes: 2 additions & 2 deletions src/controller/jobmonitor/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,13 +92,13 @@ func NewMonitorController() MonitorController {
taskManager: task.NewManager(),
queueManager: jm.NewQueueClient(),
queueStatusManager: queuestatus.Mgr,
monitorClient: jobServiceMonitorClient,
monitorClient: JobServiceMonitorClient,
jobServiceRedisClient: jm.JobServiceRedisClient,
executionDAO: taskDao.NewExecutionDAO(),
}
}

func jobServiceMonitorClient() (jm.JobServiceMonitorClient, error) {
func JobServiceMonitorClient() (jm.JobServiceMonitorClient, error) {
cfg, err := job.GlobalClient.GetJobServiceConfig()
if err != nil {
return nil, err
Expand Down
38 changes: 38 additions & 0 deletions src/controller/replication/execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,14 @@ package replication

import (
"context"
"encoding/json"
"fmt"
"time"

"github.com/gocraft/work"

"github.com/goharbor/harbor/src/controller/event/operator"
"github.com/goharbor/harbor/src/controller/jobmonitor"
"github.com/goharbor/harbor/src/controller/replication/flow"
replicationmodel "github.com/goharbor/harbor/src/controller/replication/model"
"github.com/goharbor/harbor/src/jobservice/job"
Expand Down Expand Up @@ -109,10 +113,32 @@ func (c *controller) Start(ctx context.Context, policy *replicationmodel.Policy,
if op := operator.FromContext(ctx); op != "" {
extra["operator"] = op
}

id, err := c.execMgr.Create(ctx, job.ReplicationVendorType, policy.ID, trigger, extra)
if err != nil {
return 0, err
}

if policy.SingleActiveReplication {
monitorClient, err := jobmonitor.JobServiceMonitorClient()
if err != nil {
return 0, errors.New(nil).WithCode(errors.PreconditionCode).WithMessagef("unable to get job monitor's client: %v", err)
}
observations, err := monitorClient.WorkerObservations()
if err != nil {
return 0, errors.New(nil).WithCode(errors.PreconditionCode).WithMessagef("unable to get jobs observations: %v", err)
}
for _, o := range observations {
if isDuplicateJob(o, policy.ID) {
err = c.execMgr.MarkSkipped(ctx, id, "Execution deferred: active replication still in progress.")
if err != nil {
return 0, err
}
return id, nil
}
}
}

// start the replication flow in background
// as the process runs inside a goroutine, the transaction in the outer ctx
// may be submitted already when the process starts, so create an new context
Expand Down Expand Up @@ -151,6 +177,18 @@ func (c *controller) Start(ctx context.Context, policy *replicationmodel.Policy,
return id, nil
}

func isDuplicateJob(o *work.WorkerObservation, policyID int64) bool {
if o.JobName != job.ReplicationVendorType {
return false
}
args := map[string]interface{}{}
if err := json.Unmarshal([]byte(o.ArgsJSON), &args); err != nil {
return false
}
policyIDFromArgs, ok := args["policy_id"].(float64)
return ok && int64(policyIDFromArgs) == policyID
}

func (c *controller) markError(ctx context.Context, executionID int64, err error) {
logger := log.GetLogger(ctx)
// try to stop the execution first in case that some tasks are already created
Expand Down
19 changes: 11 additions & 8 deletions src/controller/replication/flow/copy.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ func (c *copyFlow) Run(ctx context.Context) error {
return err
}

return c.createTasks(ctx, srcResources, dstResources, c.policy.Speed, c.policy.CopyByChunk)
return c.createTasks(ctx, srcResources, dstResources, c.policy)
}

func (c *copyFlow) isExecutionStopped(ctx context.Context) (bool, error) {
Expand All @@ -103,7 +103,7 @@ func (c *copyFlow) isExecutionStopped(ctx context.Context) (bool, error) {
return execution.Status == job.StoppedStatus.String(), nil
}

func (c *copyFlow) createTasks(ctx context.Context, srcResources, dstResources []*model.Resource, speed int32, copyByChunk bool) error {
func (c *copyFlow) createTasks(ctx context.Context, srcResources, dstResources []*model.Resource, policy *repctlmodel.Policy) error {
var taskCnt int
defer func() {
// if no task be created, mark execution done.
Expand Down Expand Up @@ -137,19 +137,22 @@ func (c *copyFlow) createTasks(ctx context.Context, srcResources, dstResources [
JobKind: job.KindGeneric,
},
Parameters: map[string]interface{}{
"src_resource": string(src),
"dst_resource": string(dest),
"speed": speed,
"copy_by_chunk": copyByChunk,
"src_resource": string(src),
"dst_resource": string(dest),
"speed": policy.Speed,
"copy_by_chunk": policy.CopyByChunk,
"single_active_replication": policy.SingleActiveReplication,
"policy_id": policy.ID,
},
}

if _, err = c.taskMgr.Create(ctx, c.executionID, job, map[string]interface{}{
"operation": "copy",
"resource_type": string(srcResource.Type),
"resource_type": srcResource.Type,
"source_resource": getResourceName(srcResource),
"destination_resource": getResourceName(dstResource),
"references": getResourceReferences(dstResource)}); err != nil {
"references": getResourceReferences(dstResource),
}); err != nil {
return err
}

Expand Down
3 changes: 3 additions & 0 deletions src/controller/replication/model/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ type Policy struct {
UpdateTime time.Time `json:"update_time"`
Speed int32 `json:"speed"`
CopyByChunk bool `json:"copy_by_chunk"`
SingleActiveReplication bool `json:"single_active_replication"`
}

// IsScheduledTrigger returns true when the policy is scheduled trigger and enabled
Expand Down Expand Up @@ -141,6 +142,7 @@ func (p *Policy) From(policy *replicationmodel.Policy) error {
p.UpdateTime = policy.UpdateTime
p.Speed = policy.Speed
p.CopyByChunk = policy.CopyByChunk
p.SingleActiveReplication = policy.SingleActiveReplication

if policy.SrcRegistryID > 0 {
p.SrcRegistry = &model.Registry{
Expand Down Expand Up @@ -186,6 +188,7 @@ func (p *Policy) To() (*replicationmodel.Policy, error) {
UpdateTime: p.UpdateTime,
Speed: p.Speed,
CopyByChunk: p.CopyByChunk,
SingleActiveReplication: p.SingleActiveReplication,
}
if p.SrcRegistry != nil {
policy.SrcRegistryID = p.SrcRegistry.ID
Expand Down
4 changes: 4 additions & 0 deletions src/jobservice/job/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ const (
SuccessStatus Status = "Success"
// ScheduledStatus : job status scheduled
ScheduledStatus Status = "Scheduled"
// SkippedStatus : job status skipped
SkippedStatus Status = "Skipped"
)

// Status of job
Expand Down Expand Up @@ -62,6 +64,8 @@ func (s Status) Code() int {
return 3
case "Success":
return 3
case "Skipped":
return 3
default:
}

Expand Down
9 changes: 7 additions & 2 deletions src/jobservice/worker/cworker/c_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,9 @@ type basicWorker struct {

// workerContext ...
// We did not use this context to pass context info so far, just a placeholder.
type workerContext struct{}
type workerContext struct {
client *work.Client
}

// log the job
func (rpc *workerContext) logJob(job *work.Job, next work.NextMiddlewareFunc) error {
Expand Down Expand Up @@ -146,9 +148,12 @@ func (w *basicWorker) Start() error {
w.pool.Stop()
}()

workCtx := workerContext{
client: w.client,
}
// Start the backend worker pool
// Add middleware
w.pool.Middleware((*workerContext).logJob)
w.pool.Middleware(workCtx.logJob)
// Non blocking call
w.pool.Start()
logger.Infof("Basic worker is started")
Expand Down
1 change: 1 addition & 0 deletions src/pkg/replication/model/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ type Policy struct {
UpdateTime time.Time `orm:"column(update_time);auto_now"`
Speed int32 `orm:"column(speed_kb)"`
CopyByChunk bool `orm:"column(copy_by_chunk)"`
SingleActiveReplication bool `orm:"column(single_active_replication)"`
}

// TableName set table name for ORM
Expand Down
14 changes: 14 additions & 0 deletions src/pkg/retention/policy/rule/index/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,20 @@ func init() {
},
},
}, daysps.New, daysps.Valid)

// Register daysps
Register(&Metadata{
TemplateID: daysps.TemplateID,
Action: "immutable",
Parameters: []*IndexedParam{
{
Name: daysps.ParameterN,
Type: "int",
Unit: "days",
Required: true,
},
},
}, daysps.New, daysps.Valid)
}

// Register the rule evaluator with the corresponding rule template
Expand Down
13 changes: 13 additions & 0 deletions src/pkg/task/execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ type ExecutionManager interface {
// In other cases, the execution status can be calculated from the referenced tasks automatically
// and no need to update it explicitly
MarkDone(ctx context.Context, id int64, message string) (err error)
// MarkSkipped marks the status of the specified execution as skipped.
MarkSkipped(ctx context.Context, id int64, message string) (err error)
// MarkError marks the status of the specified execution as error.
// It must be called to update the execution status when failed to create tasks.
// In other cases, the execution status can be calculated from the referenced tasks automatically
Expand Down Expand Up @@ -139,6 +141,17 @@ func (e *executionManager) UpdateExtraAttrs(ctx context.Context, id int64, extra
return e.executionDAO.Update(ctx, execution, "ExtraAttrs", "UpdateTime")
}

func (e *executionManager) MarkSkipped(ctx context.Context, id int64, message string) error {
now := time.Now()
return e.executionDAO.Update(ctx, &dao.Execution{
ID: id,
Status: job.SkippedStatus.String(),
StatusMessage: message,
UpdateTime: now,
EndTime: now,
}, "Status", "StatusMessage", "UpdateTime", "EndTime")
}

func (e *executionManager) MarkDone(ctx context.Context, id int64, message string) error {
now := time.Now()
return e.executionDAO.Update(ctx, &dao.Execution{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -825,6 +825,37 @@ <h3 class="modal-title">{{ headerTitle | translate }}</h3>
'REPLICATION.ENABLED_RULE' | translate
}}</label>
</div>
<div class="clr-checkbox-wrapper">
<input
type="checkbox"
class="clr-checkbox"
[checked]="true"
id="singleActiveReplication"
formControlName="single_active_replication" />
<label
for="singleActiveReplication"
class="clr-control-label single-active"
>{{
'REPLICATION.SINGLE_ACTIVE_REPLICATION'
| translate
}}
<clr-tooltip class="override-tooltip">
<clr-icon
clrTooltipTrigger
shape="info-circle"
size="24"></clr-icon>
<clr-tooltip-content
clrPosition="top-left"
clrSize="md"
*clrIfOpen>
<span>{{
'TOOLTIP.SINGLE_ACTIVE_REPLICATION'
| translate
}}</span>
</clr-tooltip-content>
</clr-tooltip>
</label>
</div>
</div>
</div>
</form>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,10 @@ clr-modal {
width: 8.6rem;
}

.single-active {
width: 16rem;
}

.des-tooltip {
margin-left: 0.5rem;
}
Expand Down
Loading
Loading