Browse Source

增加元数据锁

pull/1/head
Sydonian 2 years ago
parent
commit
85b1144ae1
2 changed files with 304 additions and 0 deletions
  1. +122
    -0
      pkg/distlock/lockprovider/lock_compatibility_table.go
  2. +182
    -0
      pkg/distlock/lockprovider/metadata_lock.go

+ 122
- 0
pkg/distlock/lockprovider/lock_compatibility_table.go View File

@@ -0,0 +1,122 @@
package lockprovider

import (
"fmt"

"github.com/samber/lo"
"gitlink.org.cn/cloudream/common/pkg/distlock"
)

const (
LOCK_COMPATIBILITY_COMPATIBLE LockCompatibilityType = "Compatible"
LOCK_COMPATIBILITY_UNCOMPATIBLE LockCompatibilityType = "Uncompatible"
LOCK_COMPATIBILITY_SPECIAL LockCompatibilityType = "Special"
)

type HasSuchLockFn = func() bool

type LockCompatibilitySpecialFn func(lock distlock.Lock, testLockName string) bool

type LockCompatibilityType string

type LockCompatibility struct {
Type LockCompatibilityType
SpecialFn LockCompatibilitySpecialFn
}

func LockCompatible() LockCompatibility {
return LockCompatibility{
Type: LOCK_COMPATIBILITY_COMPATIBLE,
}
}

func LockUncompatible() LockCompatibility {
return LockCompatibility{
Type: LOCK_COMPATIBILITY_UNCOMPATIBLE,
}
}

func LockSpecial(specialFn LockCompatibilitySpecialFn) LockCompatibility {
return LockCompatibility{
Type: LOCK_COMPATIBILITY_SPECIAL,
SpecialFn: specialFn,
}
}

type LockCompatibilityTableRow struct {
LockName string
HasSuchLockFn HasSuchLockFn
Compatibilities []LockCompatibility
}

type LockCompatibilityTable struct {
rows []LockCompatibilityTableRow
rowIndex int
}

func (t *LockCompatibilityTable) Column(lockName string, hasSuchLock HasSuchLockFn) *LockCompatibilityTable {
t.rows = append(t.rows, LockCompatibilityTableRow{
LockName: lockName,
HasSuchLockFn: hasSuchLock,
})

return t
}
func (t *LockCompatibilityTable) MustRow(comps ...LockCompatibility) {
err := t.Row(comps...)
if err != nil {
panic(fmt.Sprintf("build lock compatibility table failed, err: %s", err.Error()))
}
}

func (t *LockCompatibilityTable) Row(comps ...LockCompatibility) error {
if t.rowIndex >= len(t.rows) {
return fmt.Errorf("there should be no more rows in the table")
}

if len(comps) < len(t.rows) {
return fmt.Errorf("the columns should equals the rows")
}

t.rows[t.rowIndex].Compatibilities = comps

for i := 0; i < t.rowIndex-1; i++ {
chkRowCeil := t.rows[t.rowIndex].Compatibilities[i]
chkColCeil := t.rows[i].Compatibilities[t.rowIndex]

if chkRowCeil.Type != chkColCeil.Type {
return fmt.Errorf("value at %d, %d is not equals to at %d, %d", t.rowIndex, i, i, t.rowIndex)
}
}

t.rowIndex++

return nil
}

func (t *LockCompatibilityTable) Test(lock distlock.Lock) error {
row, ok := lo.Find(t.rows, func(row LockCompatibilityTableRow) bool { return lock.Name == row.LockName })
if !ok {
return fmt.Errorf("unknow lock name %s", lock.Name)
}

for i, c := range row.Compatibilities {
if c.Type == LOCK_COMPATIBILITY_COMPATIBLE {
continue
}

if c.Type == LOCK_COMPATIBILITY_UNCOMPATIBLE {
if t.rows[i].HasSuchLockFn() {
return distlock.NewLockTargetBusyError(t.rows[i].LockName)
}
}

if c.Type == LOCK_COMPATIBILITY_SPECIAL {
if !c.SpecialFn(lock, t.rows[i].LockName) {
return distlock.NewLockTargetBusyError(t.rows[i].LockName)
}
}
}

return nil
}

+ 182
- 0
pkg/distlock/lockprovider/metadata_lock.go View File

@@ -0,0 +1,182 @@
package lockprovider

import (
"fmt"

"github.com/samber/lo"
"gitlink.org.cn/cloudream/common/pkg/distlock"
mylo "gitlink.org.cn/cloudream/common/utils/lo"
)

const (
METADATA_SET_READ_LOCK = "SetRead"
METADATA_SET_WRITE_LOCK = "SetWrite"
METADATA_SET_CREATE_LOCK = "SetCreate"

METADATA_ELEMENT_READ_LOCK = "ElementRead"
METADATA_ELEMENT_WRITE_LOCK = "ElementWrite"
METADATA_ELEMENT_CREATE_LOCK = "ElementCreate"
)

type metadataElementLock struct {
target StringLockTarget
requestIDs []string
}

type MetadataLock struct {
setReadReqIDs []string
setWriteReqIDs []string
setCreateReqIDs []string

elementReadLocks []*metadataElementLock
elementWriteLocks []*metadataElementLock
elementCreateLocks []*metadataElementLock

lockCompatibilityTable LockCompatibilityTable
}

func NewMetadataLock() *MetadataLock {

metadataLock := MetadataLock{
lockCompatibilityTable: LockCompatibilityTable{},
}

compTable := &metadataLock.lockCompatibilityTable

compTable.
Column(METADATA_SET_READ_LOCK, func() bool { return len(metadataLock.setReadReqIDs) > 0 }).
Column(METADATA_SET_WRITE_LOCK, func() bool { return len(metadataLock.setWriteReqIDs) > 0 }).
Column(METADATA_SET_CREATE_LOCK, func() bool { return len(metadataLock.setCreateReqIDs) > 0 }).
Column(METADATA_ELEMENT_READ_LOCK, func() bool { return len(metadataLock.elementReadLocks) > 0 }).
Column(METADATA_ELEMENT_WRITE_LOCK, func() bool { return len(metadataLock.elementWriteLocks) > 0 }).
Column(METADATA_ELEMENT_CREATE_LOCK, func() bool { return len(metadataLock.elementCreateLocks) > 0 })

comp := LockCompatible()
uncp := LockUncompatible()
trgt := LockSpecial(func(lock distlock.Lock, testLockName string) bool {
strTar := lock.Target.(StringLockTarget)
if testLockName == METADATA_ELEMENT_READ_LOCK {
// 如果没有任何锁的锁对象与当前的锁对象冲突,那么这个锁可以加
return lo.NoneBy(metadataLock.elementReadLocks, func(other *metadataElementLock) bool { return strTar.IsConflict(&other.target) })
}

if testLockName == METADATA_ELEMENT_WRITE_LOCK {
return lo.NoneBy(metadataLock.elementWriteLocks, func(other *metadataElementLock) bool { return strTar.IsConflict(&other.target) })
}

return lo.NoneBy(metadataLock.elementCreateLocks, func(other *metadataElementLock) bool { return strTar.IsConflict(&other.target) })
})

compTable.MustRow(comp, trgt, comp, comp, uncp, comp)
compTable.MustRow(trgt, trgt, comp, uncp, uncp, comp)
compTable.MustRow(comp, comp, trgt, uncp, uncp, uncp)
compTable.MustRow(comp, uncp, uncp, comp, uncp, uncp)
compTable.MustRow(uncp, uncp, uncp, uncp, uncp, uncp)
compTable.MustRow(comp, comp, uncp, uncp, uncp, uncp)

return &metadataLock
}

// CanLock 判断这个锁能否锁定成功
func (l *MetadataLock) CanLock(lock distlock.Lock) error {
return l.lockCompatibilityTable.Test(lock)
}

// 锁定
func (l *MetadataLock) Lock(reqID string, lock distlock.Lock) error {
switch lock.Name {
case METADATA_SET_READ_LOCK:
l.setReadReqIDs = append(l.setReadReqIDs, reqID)
case METADATA_SET_WRITE_LOCK:
l.setWriteReqIDs = append(l.setWriteReqIDs, reqID)
case METADATA_SET_CREATE_LOCK:
l.setCreateReqIDs = append(l.setCreateReqIDs, reqID)

case METADATA_ELEMENT_READ_LOCK:
l.elementReadLocks = l.addElementLock(lock, l.elementReadLocks, reqID)
case METADATA_ELEMENT_WRITE_LOCK:
l.elementWriteLocks = l.addElementLock(lock, l.elementWriteLocks, reqID)
case METADATA_ELEMENT_CREATE_LOCK:
l.elementCreateLocks = l.addElementLock(lock, l.elementCreateLocks, reqID)

default:
return fmt.Errorf("unknow lock name: %s", lock.Name)
}

return nil
}

func (l *MetadataLock) addElementLock(lock distlock.Lock, locks []*metadataElementLock, reqID string) []*metadataElementLock {
strTarget := lock.Target.(StringLockTarget)
lck, ok := lo.Find(locks, func(l *metadataElementLock) bool { return strTarget.IsConflict(&l.target) })
if !ok {
lck = &metadataElementLock{
target: strTarget,
}
locks = append(locks, lck)
}

lck.requestIDs = append(lck.requestIDs, reqID)
return locks
}

// 解锁
func (l *MetadataLock) Unlock(reqID string, lock distlock.Lock) error {
switch lock.Name {
case METADATA_SET_READ_LOCK:
l.setReadReqIDs = mylo.Remove(l.setReadReqIDs, reqID)
case METADATA_SET_WRITE_LOCK:
l.setWriteReqIDs = mylo.Remove(l.setWriteReqIDs, reqID)
case METADATA_SET_CREATE_LOCK:
l.setCreateReqIDs = mylo.Remove(l.setCreateReqIDs, reqID)

case METADATA_ELEMENT_READ_LOCK:
l.elementReadLocks = l.removeElementLock(lock, l.elementReadLocks, reqID)
case METADATA_ELEMENT_WRITE_LOCK:
l.elementWriteLocks = l.removeElementLock(lock, l.elementWriteLocks, reqID)
case METADATA_ELEMENT_CREATE_LOCK:
l.elementCreateLocks = l.removeElementLock(lock, l.elementCreateLocks, reqID)

default:
return fmt.Errorf("unknow lock name: %s", lock.Name)
}

return nil
}

func (l *MetadataLock) removeElementLock(lock distlock.Lock, locks []*metadataElementLock, reqID string) []*metadataElementLock {
strTarget := lock.Target.(StringLockTarget)
lck, index, ok := lo.FindIndexOf(locks, func(l *metadataElementLock) bool { return strTarget.IsConflict(&l.target) })
if !ok {
return locks
}

lck.requestIDs = mylo.Remove(lck.requestIDs, reqID)

if len(lck.requestIDs) == 0 {
locks = mylo.RemoveAt(locks, index)
}

return locks
}

// GetTargetString 将锁对象序列化为字符串,方便存储到ETCD
func (l *MetadataLock) GetTargetString(target any) (string, error) {
tar := target.(StringLockTarget)
return StringLockTargetToString(&tar)
}

// ParseTargetString 解析字符串格式的锁对象数据
func (l *MetadataLock) ParseTargetString(targetStr string) (any, error) {
return StringLockTargetFromString(targetStr)
}

// Clear 清除内部所有状态
func (l *MetadataLock) Clear() {
l.setReadReqIDs = nil
l.setWriteReqIDs = nil
l.setCreateReqIDs = nil
l.elementReadLocks = nil
l.elementWriteLocks = nil
l.elementCreateLocks = nil
}

Loading…
Cancel
Save