Browse Source

调整元数据缓存代码

gitlink
Sydonian 7 months ago
parent
commit
36cb4dbf5a
2 changed files with 17 additions and 17 deletions
  1. +9
    -9
      client/internal/metacache/connectivity.go
  2. +8
    -8
      client/internal/metacache/hubmeta.go

+ 9
- 9
client/internal/metacache/connectivity.go View File

@@ -5,14 +5,14 @@ import (
"time" "time"


"gitlink.org.cn/cloudream/common/pkgs/logger" "gitlink.org.cn/cloudream/common/pkgs/logger"
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
stgglb "gitlink.org.cn/cloudream/storage2/common/globals" stgglb "gitlink.org.cn/cloudream/storage2/common/globals"
coormq "gitlink.org.cn/cloudream/storage2/common/pkgs/mq/coordinator" coormq "gitlink.org.cn/cloudream/storage2/common/pkgs/mq/coordinator"
cortypes "gitlink.org.cn/cloudream/storage2/coordinator/types"
) )


func (m *MetaCacheHost) AddConnectivity() *Connectivity { func (m *MetaCacheHost) AddConnectivity() *Connectivity {
cache := &Connectivity{ cache := &Connectivity{
entries: make(map[cdssdk.HubID]*ConnectivityEntry),
entries: make(map[cortypes.HubID]*ConnectivityEntry),
} }


m.caches = append(m.caches, cache) m.caches = append(m.caches, cache)
@@ -21,10 +21,10 @@ func (m *MetaCacheHost) AddConnectivity() *Connectivity {


type Connectivity struct { type Connectivity struct {
lock sync.RWMutex lock sync.RWMutex
entries map[cdssdk.HubID]*ConnectivityEntry
entries map[cortypes.HubID]*ConnectivityEntry
} }


func (c *Connectivity) Get(from cdssdk.HubID, to cdssdk.HubID) *time.Duration {
func (c *Connectivity) Get(from cortypes.HubID, to cortypes.HubID) *time.Duration {
for i := 0; i < 2; i++ { for i := 0; i < 2; i++ {
c.lock.RLock() c.lock.RLock()
entry, ok := c.entries[from] entry, ok := c.entries[from]
@@ -59,7 +59,7 @@ func (c *Connectivity) ClearOutdated() {
} }
} }


func (c *Connectivity) load(hubID cdssdk.HubID) {
func (c *Connectivity) load(hubID cortypes.HubID) {
coorCli, err := stgglb.CoordinatorMQPool.Acquire() coorCli, err := stgglb.CoordinatorMQPool.Acquire()
if err != nil { if err != nil {
logger.Warnf("new coordinator client: %v", err) logger.Warnf("new coordinator client: %v", err)
@@ -67,7 +67,7 @@ func (c *Connectivity) load(hubID cdssdk.HubID) {
} }
defer stgglb.CoordinatorMQPool.Release(coorCli) defer stgglb.CoordinatorMQPool.Release(coorCli)


get, err := coorCli.GetHubConnectivities(coormq.ReqGetHubConnectivities([]cdssdk.HubID{hubID}))
get, err := coorCli.GetHubConnectivities(coormq.ReqGetHubConnectivities([]cortypes.HubID{hubID}))
if err != nil { if err != nil {
logger.Warnf("get hub connectivities: %v", err) logger.Warnf("get hub connectivities: %v", err)
return return
@@ -78,7 +78,7 @@ func (c *Connectivity) load(hubID cdssdk.HubID) {


ce := &ConnectivityEntry{ ce := &ConnectivityEntry{
From: hubID, From: hubID,
To: make(map[cdssdk.HubID]cdssdk.HubConnectivity),
To: make(map[cortypes.HubID]cortypes.HubConnectivity),
UpdateTime: time.Now(), UpdateTime: time.Now(),
} }


@@ -90,7 +90,7 @@ func (c *Connectivity) load(hubID cdssdk.HubID) {
} }


type ConnectivityEntry struct { type ConnectivityEntry struct {
From cdssdk.HubID
To map[cdssdk.HubID]cdssdk.HubConnectivity
From cortypes.HubID
To map[cortypes.HubID]cortypes.HubConnectivity
UpdateTime time.Time UpdateTime time.Time
} }

+ 8
- 8
client/internal/metacache/hubmeta.go View File

@@ -4,14 +4,14 @@ import (
"time" "time"


"gitlink.org.cn/cloudream/common/pkgs/logger" "gitlink.org.cn/cloudream/common/pkgs/logger"
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
stgglb "gitlink.org.cn/cloudream/storage2/common/globals" stgglb "gitlink.org.cn/cloudream/storage2/common/globals"
coormq "gitlink.org.cn/cloudream/storage2/common/pkgs/mq/coordinator" coormq "gitlink.org.cn/cloudream/storage2/common/pkgs/mq/coordinator"
cortypes "gitlink.org.cn/cloudream/storage2/coordinator/types"
) )


func (m *MetaCacheHost) AddHubMeta() *HubMeta { func (m *MetaCacheHost) AddHubMeta() *HubMeta {
meta := &HubMeta{} meta := &HubMeta{}
meta.cache = NewSimpleMetaCache(SimpleMetaCacheConfig[cdssdk.HubID, cdssdk.Hub]{
meta.cache = NewSimpleMetaCache(SimpleMetaCacheConfig[cortypes.HubID, cortypes.Hub]{
Getter: meta.load, Getter: meta.load,
Expire: time.Minute * 5, Expire: time.Minute * 5,
}) })
@@ -21,10 +21,10 @@ func (m *MetaCacheHost) AddHubMeta() *HubMeta {
} }


type HubMeta struct { type HubMeta struct {
cache *SimpleMetaCache[cdssdk.HubID, cdssdk.Hub]
cache *SimpleMetaCache[cortypes.HubID, cortypes.Hub]
} }


func (h *HubMeta) Get(hubID cdssdk.HubID) *cdssdk.Hub {
func (h *HubMeta) Get(hubID cortypes.HubID) *cortypes.Hub {
v, ok := h.cache.Get(hubID) v, ok := h.cache.Get(hubID)
if ok { if ok {
return &v return &v
@@ -32,9 +32,9 @@ func (h *HubMeta) Get(hubID cdssdk.HubID) *cdssdk.Hub {
return nil return nil
} }


func (h *HubMeta) GetMany(hubIDs []cdssdk.HubID) []*cdssdk.Hub {
func (h *HubMeta) GetMany(hubIDs []cortypes.HubID) []*cortypes.Hub {
vs, oks := h.cache.GetMany(hubIDs) vs, oks := h.cache.GetMany(hubIDs)
ret := make([]*cdssdk.Hub, len(vs))
ret := make([]*cortypes.Hub, len(vs))
for i := range vs { for i := range vs {
if oks[i] { if oks[i] {
ret[i] = &vs[i] ret[i] = &vs[i]
@@ -47,8 +47,8 @@ func (h *HubMeta) ClearOutdated() {
h.cache.ClearOutdated() h.cache.ClearOutdated()
} }


func (h *HubMeta) load(keys []cdssdk.HubID) ([]cdssdk.Hub, []bool) {
vs := make([]cdssdk.Hub, len(keys))
func (h *HubMeta) load(keys []cortypes.HubID) ([]cortypes.Hub, []bool) {
vs := make([]cortypes.Hub, len(keys))
oks := make([]bool, len(keys)) oks := make([]bool, len(keys))


coorCli, err := stgglb.CoordinatorMQPool.Acquire() coorCli, err := stgglb.CoordinatorMQPool.Acquire()


Loading…
Cancel
Save