Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions pkg/ddl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ go_library(
"job_scheduler.go",
"job_submitter.go",
"job_worker.go",
"masking_policy.go",
"metabuild.go",
"mock.go",
"modify_column.go",
Expand Down Expand Up @@ -273,6 +274,7 @@ go_test(
"job_submitter_test.go",
"job_worker_test.go",
"main_test.go",
"masking_policy_test.go",
"metabuild_test.go",
"modify_column_test.go",
"multi_schema_change_test.go",
Expand Down
196 changes: 196 additions & 0 deletions pkg/ddl/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ import (
"github.com/pingcap/tidb/pkg/util/collate"
"github.com/pingcap/tidb/pkg/util/dbterror"
"github.com/pingcap/tidb/pkg/util/dbterror/exeerrors"
plannererrors "github.com/pingcap/tidb/pkg/util/dbterror/plannererrors"
"github.com/pingcap/tidb/pkg/util/dbutil"
"github.com/pingcap/tidb/pkg/util/domainutil"
"github.com/pingcap/tidb/pkg/util/filter"
Expand Down Expand Up @@ -129,6 +130,7 @@ type Executor interface {
CreateSequence(ctx sessionctx.Context, stmt *ast.CreateSequenceStmt) error
DropSequence(ctx sessionctx.Context, stmt *ast.DropSequenceStmt) (err error)
AlterSequence(ctx sessionctx.Context, stmt *ast.AlterSequenceStmt) error
CreateMaskingPolicy(ctx sessionctx.Context, stmt *ast.CreateMaskingPolicyStmt) error
CreatePlacementPolicy(ctx sessionctx.Context, stmt *ast.CreatePlacementPolicyStmt) error
DropPlacementPolicy(ctx sessionctx.Context, stmt *ast.DropPlacementPolicyStmt) error
AlterPlacementPolicy(ctx sessionctx.Context, stmt *ast.AlterPlacementPolicyStmt) error
Expand Down Expand Up @@ -1811,6 +1813,14 @@ func (e *executor) AlterTable(ctx context.Context, sctx sessionctx.Context, stmt
case ast.AlterTableDropForeignKey:
// NOTE: we do not check `if not exists` and `if exists` for ForeignKey now.
err = e.DropForeignKey(sctx, ident, ast.NewCIStr(spec.Name))
case ast.AlterTableAddMaskingPolicy:
err = e.AddMaskingPolicy(sctx, ident, spec)
case ast.AlterTableEnableMaskingPolicy:
err = e.AlterTableMaskingPolicyState(sctx, ident, spec, true)
case ast.AlterTableDisableMaskingPolicy:
err = e.AlterTableMaskingPolicyState(sctx, ident, spec, false)
case ast.AlterTableDropMaskingPolicy:
err = e.DropMaskingPolicy(sctx, ident, spec)
case ast.AlterTableModifyColumn:
err = e.ModifyColumn(ctx, sctx, ident, spec)
case ast.AlterTableChangeColumn:
Expand Down Expand Up @@ -6396,6 +6406,179 @@ func (e *executor) AlterResourceGroup(ctx sessionctx.Context, stmt *ast.AlterRes
return err
}

func (e *executor) CreateMaskingPolicy(ctx sessionctx.Context, stmt *ast.CreateMaskingPolicyStmt) error {
if stmt.OrReplace && stmt.IfNotExists {
return dbterror.ErrWrongUsage.GenWithStackByArgs("OR REPLACE", "IF NOT EXISTS")
}

tableIdent := ast.Ident{Schema: stmt.Table.Schema, Name: stmt.Table.Name}
if tableIdent.Schema.L == "" {
schemaName := strings.ToLower(ctx.GetSessionVars().CurrentDB)
if schemaName == "" {
return errors.Trace(plannererrors.ErrNoDB)
}
tableIdent.Schema = ast.NewCIStr(schemaName)
}
schema, tbl, err := e.getSchemaAndTableByIdent(tableIdent)
if err != nil {
return errors.Trace(err)
}

policyInfo, err := buildMaskingPolicyInfo(ctx, schema, tbl, stmt.PolicyName, stmt.Column.Name, stmt.Expr, stmt.MaskingPolicyState)
if err != nil {
return errors.Trace(err)
}

var onExist OnExist
switch {
case stmt.IfNotExists:
onExist = OnExistIgnore
case stmt.OrReplace:
onExist = OnExistReplace
default:
onExist = OnExistError
}
return e.createMaskingPolicyWithInfo(ctx, policyInfo, onExist)
}

func (e *executor) AddMaskingPolicy(ctx sessionctx.Context, ident ast.Ident, spec *ast.AlterTableSpec) error {
schema, tbl, err := e.getSchemaAndTableByIdent(ident)
if err != nil {
return errors.Trace(err)
}
policyInfo, err := buildMaskingPolicyInfo(ctx, schema, tbl, spec.MaskingPolicyName, spec.MaskingPolicyColumn.Name, spec.MaskingPolicyExpr, spec.MaskingPolicyState)
if err != nil {
return errors.Trace(err)
}
return e.createMaskingPolicyWithInfo(ctx, policyInfo, OnExistError)
}

func (e *executor) AlterTableMaskingPolicyState(ctx sessionctx.Context, ident ast.Ident, spec *ast.AlterTableSpec, enabled bool) error {
_, tbl, err := e.getSchemaAndTableByIdent(ident)
if err != nil {
return errors.Trace(err)
}
policyName := spec.MaskingPolicyName
policy, ok := e.infoCache.GetLatest().MaskingPolicyByName(policyName)
if !ok {
return errors.Errorf("masking policy %s doesn't exist", policyName.O)
}
if policy.TableID != tbl.Meta().ID {
return errors.Errorf("masking policy %s doesn't belong to table %s", policyName.O, tbl.Meta().Name.O)
}

status := model.MaskingPolicyStatusEnable
if !enabled {
status = model.MaskingPolicyStatusDisable
}
newPolicy := policy.Clone()
newPolicy.Status = status
newPolicy.UpdatedAt = time.Now()

job := &model.Job{
Version: model.GetJobVerInUse(),
SchemaID: policy.ID,
SchemaName: policy.DBName.L,
TableID: policy.TableID,
TableName: policy.TableName.L,
Type: model.ActionAlterMaskingPolicy,
BinlogInfo: &model.HistoryInfo{},
CDCWriteSource: ctx.GetSessionVars().CDCWriteSource,
InvolvingSchemaInfo: []model.InvolvingSchemaInfo{{
Database: policy.DBName.L,
Table: policy.TableName.L,
}},
SQLMode: ctx.GetSessionVars().SQLMode,
}
args := &model.MaskingPolicyArgs{
Policy: newPolicy,
PolicyID: policy.ID,
}
return errors.Trace(e.doDDLJob2(ctx, job, args))
}

func (e *executor) DropMaskingPolicy(ctx sessionctx.Context, ident ast.Ident, spec *ast.AlterTableSpec) error {
_, tbl, err := e.getSchemaAndTableByIdent(ident)
if err != nil {
return errors.Trace(err)
}
policyName := spec.MaskingPolicyName
policy, ok := e.infoCache.GetLatest().MaskingPolicyByName(policyName)
if !ok {
return errors.Errorf("masking policy %s doesn't exist", policyName.O)
}
if policy.TableID != tbl.Meta().ID {
return errors.Errorf("masking policy %s doesn't belong to table %s", policyName.O, tbl.Meta().Name.O)
}

job := &model.Job{
Version: model.GetJobVerInUse(),
SchemaID: policy.ID,
SchemaName: policy.DBName.L,
TableID: policy.TableID,
TableName: policy.TableName.L,
Type: model.ActionDropMaskingPolicy,
BinlogInfo: &model.HistoryInfo{},
CDCWriteSource: ctx.GetSessionVars().CDCWriteSource,
InvolvingSchemaInfo: []model.InvolvingSchemaInfo{{
Database: policy.DBName.L,
Table: policy.TableName.L,
}},
SQLMode: ctx.GetSessionVars().SQLMode,
}
args := &model.MaskingPolicyArgs{
PolicyName: policy.Name,
PolicyID: policy.ID,
}
return errors.Trace(e.doDDLJob2(ctx, job, args))
}

func (e *executor) createMaskingPolicyWithInfo(ctx sessionctx.Context, policy *model.MaskingPolicyInfo, onExist OnExist) error {
is := e.infoCache.GetLatest()
if existPolicy, ok := is.MaskingPolicyByName(policy.Name); ok {
if existPolicy.TableID != policy.TableID || existPolicy.ColumnID != policy.ColumnID {
return errors.Errorf("masking policy %s already exists on another column", existPolicy.Name.O)
}
err := errors.Errorf("masking policy %s already exists", policy.Name.O)
switch onExist {
case OnExistIgnore:
ctx.GetSessionVars().StmtCtx.AppendNote(err)
return nil
case OnExistError:
return err
}
}
if existPolicy, ok := is.MaskingPolicyByTableColumn(policy.TableID, policy.ColumnID); ok && existPolicy.Name.L != policy.Name.L {
return errors.Errorf("masking policy already exists on column %s", existPolicy.ColumnName.O)
}

policyID, err := e.genMaskingPolicyID()
if err != nil {
return err
}
policy.ID = policyID

job := &model.Job{
Version: model.GetJobVerInUse(),
SchemaName: policy.DBName.L,
TableID: policy.TableID,
TableName: policy.TableName.L,
Type: model.ActionCreateMaskingPolicy,
BinlogInfo: &model.HistoryInfo{},
CDCWriteSource: ctx.GetSessionVars().CDCWriteSource,
InvolvingSchemaInfo: []model.InvolvingSchemaInfo{{
Database: policy.DBName.L,
Table: policy.TableName.L,
}},
SQLMode: ctx.GetSessionVars().SQLMode,
}
args := &model.MaskingPolicyArgs{
Policy: policy,
ReplaceOnExist: onExist == OnExistReplace,
}
return errors.Trace(e.doDDLJob2(ctx, job, args))
}

func (e *executor) CreatePlacementPolicy(ctx sessionctx.Context, stmt *ast.CreatePlacementPolicyStmt) (err error) {
if checkIgnorePlacementDDL(ctx) {
return nil
Expand Down Expand Up @@ -6785,6 +6968,19 @@ func (e *executor) genPlacementPolicyID() (int64, error) {
return ret, err
}

func (e *executor) genMaskingPolicyID() (int64, error) {
var ret int64
ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnDDL)
err := kv.RunInNewTxn(ctx, e.store, true, func(_ context.Context, txn kv.Transaction) error {
m := meta.NewMutator(txn)
var err error
ret, err = m.GenMaskingPolicyID()
return err
})

return ret, err
}

// DoDDLJob will return
// - nil: found in history DDL job and no job error
// - context.Cancel: job has been sent to worker, but not found in history DDL job before cancel
Expand Down
6 changes: 6 additions & 0 deletions pkg/ddl/job_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -1041,6 +1041,12 @@ func (w *worker) runOneJobStep(
ver, err = onAlterTableAttributes(jobCtx, job)
case model.ActionAlterTablePartitionAttributes:
ver, err = onAlterTablePartitionAttributes(jobCtx, job)
case model.ActionCreateMaskingPolicy:
ver, err = w.onCreateMaskingPolicy(jobCtx, job)
case model.ActionAlterMaskingPolicy:
ver, err = w.onAlterMaskingPolicy(jobCtx, job)
case model.ActionDropMaskingPolicy:
ver, err = w.onDropMaskingPolicy(jobCtx, job)
case model.ActionCreatePlacementPolicy:
ver, err = onCreatePlacementPolicy(jobCtx, job)
case model.ActionDropPlacementPolicy:
Expand Down
Loading