|
|
|
@@ -21,41 +21,49 @@ func StorageSvc(svc *Service) *StorageService { |
|
|
|
|
|
|
|
func (svc *StorageService) MoveObjectToStorage(userID int, objectID int, storageID int) error { |
|
|
|
// 先向协调端请求文件相关的元数据 |
|
|
|
moveResp, err := svc.coordinator.Move(coormsg.NewMoveObjectToStorageBody(objectID, storageID, userID)) |
|
|
|
preMoveResp, err := svc.coordinator.PreMoveObjectToStorage(coormsg.NewPreMoveObjectToStorageBody(objectID, storageID, userID)) |
|
|
|
if err != nil { |
|
|
|
return fmt.Errorf("request to coordinator failed, err: %w", err) |
|
|
|
} |
|
|
|
if moveResp.ErrorCode != errorcode.OK { |
|
|
|
return fmt.Errorf("coordinator operation failed, code: %s, message: %s", moveResp.ErrorCode, moveResp.ErrorMessage) |
|
|
|
if preMoveResp.ErrorCode != errorcode.OK { |
|
|
|
return fmt.Errorf("coordinator PreMoveObjectToStorage failed, code: %s, message: %s", preMoveResp.ErrorCode, preMoveResp.ErrorMessage) |
|
|
|
} |
|
|
|
|
|
|
|
// 然后向代理端发送移动文件的请求 |
|
|
|
agentClient, err := agtcli.NewAgentClient(moveResp.Body.NodeID, &config.Cfg().RabbitMQ) |
|
|
|
agentClient, err := agtcli.NewAgentClient(preMoveResp.Body.NodeID, &config.Cfg().RabbitMQ) |
|
|
|
if err != nil { |
|
|
|
return fmt.Errorf("create agent client to %d failed, err: %w", moveResp.Body.NodeID, err) |
|
|
|
return fmt.Errorf("create agent client to %d failed, err: %w", preMoveResp.Body.NodeID, err) |
|
|
|
} |
|
|
|
defer agentClient.Close() |
|
|
|
|
|
|
|
switch moveResp.Body.Redundancy { |
|
|
|
switch preMoveResp.Body.Redundancy { |
|
|
|
case consts.REDUNDANCY_REP: |
|
|
|
agentMoveResp, err := agentClient.RepMove(agtmsg.NewRepMoveCommandBody(moveResp.Body.Directory, moveResp.Body.Hashes, objectID, userID, moveResp.Body.FileSizeInBytes)) |
|
|
|
agentMoveResp, err := agentClient.RepMove(agtmsg.NewRepMoveCommandBody(preMoveResp.Body.Directory, preMoveResp.Body.Hashes, objectID, userID, preMoveResp.Body.FileSizeInBytes)) |
|
|
|
if err != nil { |
|
|
|
return fmt.Errorf("request to agent %d failed, err: %w", moveResp.Body.NodeID, err) |
|
|
|
return fmt.Errorf("request to agent %d failed, err: %w", preMoveResp.Body.NodeID, err) |
|
|
|
} |
|
|
|
if agentMoveResp.ErrorCode != errorcode.OK { |
|
|
|
return fmt.Errorf("agent %d operation failed, code: %s, messsage: %s", moveResp.Body.NodeID, agentMoveResp.ErrorCode, agentMoveResp.ErrorMessage) |
|
|
|
return fmt.Errorf("agent %d operation failed, code: %s, messsage: %s", preMoveResp.Body.NodeID, agentMoveResp.ErrorCode, agentMoveResp.ErrorMessage) |
|
|
|
} |
|
|
|
|
|
|
|
case consts.REDUNDANCY_EC: |
|
|
|
agentMoveResp, err := agentClient.ECMove(agtmsg.NewECMoveCommandBody(moveResp.Body.Directory, moveResp.Body.Hashes, moveResp.Body.IDs, *moveResp.Body.ECName, objectID, userID, moveResp.Body.FileSizeInBytes)) |
|
|
|
agentMoveResp, err := agentClient.ECMove(agtmsg.NewECMoveCommandBody(preMoveResp.Body.Directory, preMoveResp.Body.Hashes, preMoveResp.Body.IDs, *preMoveResp.Body.ECName, objectID, userID, preMoveResp.Body.FileSizeInBytes)) |
|
|
|
if err != nil { |
|
|
|
return fmt.Errorf("request to agent %d failed, err: %w", moveResp.Body.NodeID, err) |
|
|
|
return fmt.Errorf("request to agent %d failed, err: %w", preMoveResp.Body.NodeID, err) |
|
|
|
} |
|
|
|
if agentMoveResp.ErrorCode != errorcode.OK { |
|
|
|
return fmt.Errorf("agent %d operation failed, code: %s, messsage: %s", moveResp.Body.NodeID, agentMoveResp.ErrorCode, agentMoveResp.ErrorMessage) |
|
|
|
return fmt.Errorf("agent %d operation failed, code: %s, messsage: %s", preMoveResp.Body.NodeID, agentMoveResp.ErrorCode, agentMoveResp.ErrorMessage) |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
moveResp, err := svc.coordinator.MoveObjectToStorage(coormsg.NewMoveObjectToStorageBody(objectID, storageID, userID)) |
|
|
|
if err != nil { |
|
|
|
return fmt.Errorf("request to coordinator failed, err: %w", err) |
|
|
|
} |
|
|
|
if preMoveResp.ErrorCode != errorcode.OK { |
|
|
|
return fmt.Errorf("coordinator MoveObjectToStorage failed, code: %s, message: %s", moveResp.ErrorCode, moveResp.ErrorMessage) |
|
|
|
} |
|
|
|
|
|
|
|
return nil |
|
|
|
} |
|
|
|
|
|
|
|
|