
command_service.go 8.5 kB

package services

import (
    log "github.com/sirupsen/logrus"

    ramsg "gitlink.org.cn/cloudream/rabbitmq/message"
    "gitlink.org.cn/cloudream/utils"
    "gitlink.org.cn/cloudream/utils/consts"
    "gitlink.org.cn/cloudream/utils/consts/errorcode"
)

func (service *Service) Read(msg *ramsg.ReadCommand) ramsg.ReadResp {
    var hashes []string
    blockIDs := []int{0}

    // Query the object record
    object, err := service.db.QueryObjectByID(msg.ObjectID)
    if err != nil {
        log.WithField("ObjectID", msg.ObjectID).
            Warnf("query Object failed, err: %s", err.Error())
        return ramsg.NewCoorReadRespFailed(errorcode.OPERATION_FAILED, "query Object failed")
    }

    var nodeIPs []string
    // If the redundancy type is replication, query the object replica table for the repHash
    if object.Redundancy == consts.REDUNDANCY_REP {
        objectRep, err := service.db.QueryObjectRep(object.ObjectID)
        if err != nil {
            log.WithField("ObjectID", object.ObjectID).
                Warnf("query ObjectRep failed, err: %s", err.Error())
            return ramsg.NewCoorReadRespFailed(errorcode.OPERATION_FAILED, "query ObjectRep failed")
        }
        hashes = append(hashes, objectRep.RepHash)

        nodes, err := service.db.QueryCacheNodeByBlockHash(objectRep.RepHash)
        if err != nil {
            log.WithField("RepHash", objectRep.RepHash).
                Warnf("query Cache failed, err: %s", err.Error())
            return ramsg.NewCoorReadRespFailed(errorcode.OPERATION_FAILED, "query Cache failed")
        }
        for _, node := range nodes {
            nodeIPs = append(nodeIPs, node.IP)
        }
    } else {
        // Otherwise the object is erasure coded: query the object block table
        blocks, err := service.db.QueryObjectBlock(object.ObjectID)
        if err != nil {
            log.WithField("ObjectID", object.ObjectID).
                Warnf("query Object Block failed, err: %s", err.Error())
            return ramsg.NewCoorReadRespFailed(errorcode.OPERATION_FAILED, "query Object Block failed")
        }

        ecPolicies := *utils.GetEcPolicy()
        ecPolicy := ecPolicies[*object.ECName]
        ecN := ecPolicy.GetN()
        ecK := ecPolicy.GetK()
        nodeIPs = make([]string, ecN)
        hashes = make([]string, ecN)

        for _, block := range blocks {
            id := block.InnerID
            hash := block.BlockHash
            // TODO: blocks are currently read in plain index order; an adaptive read
            // module is still to be added (see the sketch after this function).
            hashes[id] = hash
            nodes, err := service.db.QueryCacheNodeByBlockHash(hash)
            if err != nil {
                log.WithField("BlockHash", hash).
                    Warnf("query Cache failed, err: %s", err.Error())
                return ramsg.NewCoorReadRespFailed(errorcode.OPERATION_FAILED, "query Cache failed")
            }
            if len(nodes) == 0 {
                log.WithField("BlockHash", hash).
                    Warnf("no node caches the block data for this BlockHash")
                return ramsg.NewCoorReadRespFailed(errorcode.OPERATION_FAILED, "no node caches the block data for this BlockHash")
            }
            nodeIPs[id] = nodes[0].IP
        }
        // Same limitation as above: simply take the first ecK block IDs in order.
        for i := 1; i < ecK; i++ {
            blockIDs = append(blockIDs, i)
        }
    }

    return ramsg.NewCoorReadRespOK(
        object.Redundancy,
        nodeIPs,
        hashes,
        blockIDs,
        object.ECName,
        object.FileSizeInBytes,
    )
}
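
// NOTE: a minimal sketch of the adaptive read mentioned in the TODO above, assuming a
// hypothetical QueryNodeDelay(from, to string) int helper and a readerIP known to the
// coordinator (neither exists in this file). Instead of always taking nodes[0], each
// block would be served from the cached copy with the lowest delay to the reader:
//
//    bestIP, bestDelay := nodes[0].IP, QueryNodeDelay(readerIP, nodes[0].IP)
//    for _, node := range nodes[1:] {
//        if d := QueryNodeDelay(readerIP, node.IP); d < bestDelay {
//            bestIP, bestDelay = node.IP, d
//        }
//    }
//    nodeIPs[id] = bestIP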

func (service *Service) RepWrite(msg *ramsg.RepWriteCommand) ramsg.WriteResp {
    // TODO Check here whether an object with the same name already exists; WriteRepHash
    // checks again later. The check here is only advisory: whether the write actually
    // succeeds is decided by the result of WriteRepHash.

    // Query the node IPs available to the user
    nodes, err := service.db.QueryUserNodes(msg.UserID)
    if err != nil {
        log.Warnf("query user nodes failed, err: %s", err.Error())
        return ramsg.NewCoorWriteRespFailed(errorcode.OPERATION_FAILED, "query user nodes failed")
    }
    if len(nodes) < msg.ReplicateNumber {
        log.WithField("UserID", msg.UserID).
            WithField("ReplicateNumber", msg.ReplicateNumber).
            Warnf("user nodes are not enough")
        return ramsg.NewCoorWriteRespFailed(errorcode.OPERATION_FAILED, "user nodes are not enough")
    }

    numRep := msg.ReplicateNumber
    ids := make([]int, numRep)
    ips := make([]string, numRep)
    // Randomly pick numRep node IPs: start at a random index and wrap around the list
    start := utils.GetRandInt(len(nodes))
    for i := 0; i < numRep; i++ {
        index := (start + i) % len(nodes)
        ids[i] = nodes[index].NodeID
        ips[i] = nodes[index].IP
    }
    return ramsg.NewCoorWriteRespOK(ids, ips)
}

func (service *Service) WriteRepHash(msg *ramsg.WriteRepHashCommand) ramsg.WriteHashResp {
    _, err := service.db.CreateRepObject(msg.BucketID, msg.ObjectName, msg.FileSizeInBytes, msg.ReplicateNumber, msg.NodeIDs, msg.Hashes)
    if err != nil {
        log.WithField("BucketID", msg.BucketID).
            WithField("ObjectName", msg.ObjectName).
            Warnf("create rep object failed, err: %s", err.Error())
        return ramsg.NewCoorWriteHashRespFailed(errorcode.OPERATION_FAILED, "create rep object failed")
    }
    return ramsg.NewCoorWriteHashRespOK()
}

func (service *Service) Move(msg *ramsg.MoveCommand) ramsg.MoveResp {
    // Query the database for the redundancy type and redundancy parameters.
    // jh: use the bucket name and object name from the command to query the object table
    //     and obtain redundancy, ECName and fileSizeInBytes.
    // - If redundancy is rep, query the object replica table for the repHash:
    //   - ids := {0}
    //   - hashs := {repHash}
    // - If redundancy is ec, query the object block table for blockHashs and ids (innerID):
    //   - query the cache table for each hash's nodeIPs, TempOrPins and Times
    //   - query the node delay table for the delay between command.Destination and each
    //     nodeIP, stored in a map (Delay)
    //   - kx: decide the final hashs and ids from the queried hash/hashs, nodeIPs,
    //     TempOrPins, Times (move/read strategy) and Delay
    // TODO a record needs to be added to StorageData

    // Query the storage service associated with the user
    stg, err := service.db.QueryUserStorage(msg.UserID, msg.StorageID)
    if err != nil {
        log.WithField("UserID", msg.UserID).
            WithField("StorageID", msg.StorageID).
            Warnf("query storage directory failed, err: %s", err.Error())
        return ramsg.NewCoorMoveRespFailed(errorcode.OPERATION_FAILED, "query storage directory failed")
    }

    // Query the object record
    object, err := service.db.QueryObjectByID(msg.ObjectID)
    if err != nil {
        log.WithField("ObjectID", msg.ObjectID).
            Warnf("query Object failed, err: %s", err.Error())
        return ramsg.NewCoorMoveRespFailed(errorcode.OPERATION_FAILED, "query Object failed")
    }

    // If the redundancy type is replication, query the object replica table for the repHash
    var hashs []string
    ids := []int{0}
    if object.Redundancy == consts.REDUNDANCY_REP {
        objectRep, err := service.db.QueryObjectRep(object.ObjectID)
        if err != nil {
            log.Warnf("query ObjectRep failed, err: %s", err.Error())
            return ramsg.NewCoorMoveRespFailed(errorcode.OPERATION_FAILED, "query ObjectRep failed")
        }
        hashs = append(hashs, objectRep.RepHash)
    } else {
        blockHashs, err := service.db.QueryObjectBlock(object.ObjectID)
        if err != nil {
            log.Warnf("query ObjectBlock failed, err: %s", err.Error())
            return ramsg.NewCoorMoveRespFailed(errorcode.OPERATION_FAILED, "query ObjectBlock failed")
        }

        ecPolicies := *utils.GetEcPolicy()
        ecPolicy := ecPolicies[*object.ECName]
        ecN := ecPolicy.GetN()
        ecK := ecPolicy.GetK()

        ids = make([]int, ecK)
        for i := 0; i < ecK; i++ {
            ids[i] = i
        }
        hashs = make([]string, ecN)
        for _, block := range blockHashs {
            hashs[block.InnerID] = block.BlockHash
        }

        // Query the cache table for each hash's nodeIPs, TempOrPins and Times.
        /*
            for id, hash := range blockHashs {
                // type Cache struct { NodeIP string; TempOrPin bool; Cachetime string }
                Cache := Query_Cache(hash)
                // Time_trans() converts Cache[i].Cachetime into a timestamp.
                // Query the node delay table for the delay between command.Destination and
                // each nodeIP, stored in a map (Delay).
                Delay := make(map[string]int) // delay per node IP
                for i := 0; i < len(Cache); i++ {
                    Delay[Cache[i].NodeIP] = Query_NodeDelay(Destination, Cache[i].NodeIP)
                }
                // kx: decide the final hashs and ids from hash/hashs, nodeIPs, TempOrPins,
                // Times (move/read strategy) and Delay; see the sketch after this function.
            }
        */
    }

    return ramsg.NewCoorMoveRespOK(
        stg.NodeID,
        stg.Directory,
        object.Redundancy,
        object.ECName,
        hashs,
        ids,
        object.FileSizeInBytes,
    )
}
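
// NOTE: a minimal sketch of the move/read strategy described in the commented-out
// pseudocode above. Cache, Query_NodeDelay and TempOrPin are the hypothetical types and
// helpers named there; the idea is to prefer pinned copies and, among copies with the
// same pin state, the one with the lowest delay to the destination:
//
//    func pickSource(destination string, caches []Cache) Cache {
//        best := caches[0]
//        for _, c := range caches[1:] {
//            if c.TempOrPin != best.TempOrPin {
//                if c.TempOrPin { // assuming true means the copy is pinned (more stable)
//                    best = c
//                }
//                continue
//            }
//            if Query_NodeDelay(destination, c.NodeIP) < Query_NodeDelay(destination, best.NodeIP) {
//                best = c
//            }
//        }
//        return best
//    }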

func (service *Service) TempCacheReport(msg *ramsg.TempCacheReport) {
    service.db.BatchInsertOrUpdateCache(msg.Hashes, msg.NodeID)
}

func (service *Service) AgentStatusReport(msg *ramsg.AgentStatusReport) {
    // jh: based on the IP in the command, update the node delay table and the
    // NodeStatus column of the node table.
    // TODO
    /*
        ips := utils.GetAgentIps()
        Insert_NodeDelay(msg.IP, ips, msg.AgentDelay)
        // Read the node's NodeLocation from the configuration table
        // and update NodeStatus in the node table.
        Insert_Node(msg.IP, msg.IP, msg.IPFSStatus, msg.LocalDirStatus)
    */
}
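
RepWrite spreads the ReplicateNumber replicas over the user's nodes by drawing a random start index and walking the node list with wrap-around. Below is a self-contained illustration of that selection using only the standard library (independent of the project's utils package); the node count and replica count are made-up example values.

package main

import (
    "fmt"
    "math/rand"
)

// pickReplicaIndices returns numRep node indices, starting at a random position and
// wrapping around the list, mirroring the loop in RepWrite.
func pickReplicaIndices(nodeCount, numRep int) []int {
    start := rand.Intn(nodeCount)
    picked := make([]int, numRep)
    for i := 0; i < numRep; i++ {
        picked[i] = (start + i) % nodeCount
    }
    return picked
}

func main() {
    // With 5 available nodes and 3 replicas, one possible result is [3 4 0].
    fmt.Println(pickReplicaIndices(5, 3))
}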

This project aims to turn inter-cloud storage into a public infrastructure, so that individuals and enterprises can use efficient inter-cloud storage services with a low barrier to entry (installing the out-of-the-box inter-cloud storage client is enough, with no need to worry about deploying any other components), while also letting users flexibly and conveniently customize the details of the inter-cloud storage functionality.