
command_service.go 8.5 kB

package services

import (
	log "github.com/sirupsen/logrus"
	ramsg "gitlink.org.cn/cloudream/rabbitmq/message"
	"gitlink.org.cn/cloudream/utils"
	"gitlink.org.cn/cloudream/utils/consts"
	"gitlink.org.cn/cloudream/utils/consts/errorcode"
)
func (service *Service) Read(msg *ramsg.ReadCommand) ramsg.ReadResp {
	var hashes []string
	blockIDs := []int{0}

	// Query the file object
	object, err := service.db.QueryObjectByID(msg.ObjectID)
	if err != nil {
		log.WithField("ObjectID", msg.ObjectID).
			Warnf("query Object failed, err: %s", err.Error())
		return ramsg.NewCoorReadRespFailed(errorcode.OPERATION_FAILED, "query Object failed")
	}

	var nodeIPs []string
	// If the redundancy type is rep, query the object replica table to get the repHash
	if object.Redundancy == consts.REDUNDANCY_REP {
		objectRep, err := service.db.QueryObjectRep(object.ObjectID)
		if err != nil {
			log.WithField("ObjectID", object.ObjectID).
				Warnf("query ObjectRep failed, err: %s", err.Error())
			return ramsg.NewCoorReadRespFailed(errorcode.OPERATION_FAILED, "query ObjectRep failed")
		}
		hashes = append(hashes, objectRep.RepHash)

		nodes, err := service.db.QueryCacheNodeByBlockHash(objectRep.RepHash)
		if err != nil {
			log.WithField("RepHash", objectRep.RepHash).
				Warnf("query Cache failed, err: %s", err.Error())
			return ramsg.NewCoorReadRespFailed(errorcode.OPERATION_FAILED, "query Cache failed")
		}
		for _, node := range nodes {
			nodeIPs = append(nodeIPs, node.IP)
		}
	} else {
		// Otherwise the object is erasure-coded: query the object block table
		blocks, err := service.db.QueryObjectBlock(object.ObjectID)
		if err != nil {
			log.WithField("ObjectID", object.ObjectID).
				Warnf("query Object Block failed, err: %s", err.Error())
			return ramsg.NewCoorReadRespFailed(errorcode.OPERATION_FAILED, "query Object Block failed")
		}

		ecPolicies := *utils.GetEcPolicy()
		ecPolicy := ecPolicies[*object.ECName]
		ecN := ecPolicy.GetN()
		ecK := ecPolicy.GetK()
		nodeIPs = make([]string, ecN)
		hashes = make([]string, ecN)

		for _, tt := range blocks {
			id := tt.InnerID
			hash := tt.BlockHash
			// FIXME: blocks are read in plain sequential order for now; an adaptive read module is yet to be added
			hashes[id] = hash

			nodes, err := service.db.QueryCacheNodeByBlockHash(hash)
			if err != nil {
				log.WithField("BlockHash", hash).
					Warnf("query Cache failed, err: %s", err.Error())
				return ramsg.NewCoorReadRespFailed(errorcode.OPERATION_FAILED, "query Cache failed")
			}
			if len(nodes) == 0 {
				log.WithField("BlockHash", hash).
					Warnf("no node caches the block data for the BlockHash")
				return ramsg.NewCoorReadRespFailed(errorcode.OPERATION_FAILED, "no node caches the block data for the BlockHash")
			}
			nodeIPs[id] = nodes[0].IP
		}

		// FIXME: same issue as above; the first ecK blocks are simply read in order
		for i := 1; i < ecK; i++ {
			blockIDs = append(blockIDs, i)
		}
	}

	return ramsg.NewCoorReadRespOK(
		object.Redundancy,
		nodeIPs,
		hashes,
		blockIDs,
		object.ECName,
		object.FileSizeInBytes,
	)
}
func (service *Service) RepWrite(msg *ramsg.RepWriteCommand) ramsg.WriteResp {
	// TODO: check here whether an object with the same name already exists, and check
	// again in WriteRepHash. The check here is only advisory; whether the write
	// actually succeeds depends on the result of WriteRepHash.

	// Query the node IPs available to the user
	nodes, err := service.db.QueryUserNodes(msg.UserID)
	if err != nil {
		log.Warnf("query user nodes failed, err: %s", err.Error())
		return ramsg.NewCoorWriteRespFailed(errorcode.OPERATION_FAILED, "query user nodes failed")
	}

	if len(nodes) < msg.ReplicateNumber {
		log.WithField("UserID", msg.UserID).
			WithField("ReplicateNumber", msg.ReplicateNumber).
			Warnf("user nodes are not enough")
		return ramsg.NewCoorWriteRespFailed(errorcode.OPERATION_FAILED, "user nodes are not enough")
	}

	numRep := msg.ReplicateNumber
	ids := make([]int, numRep)
	ips := make([]string, numRep)

	// Randomly pick numRep node IPs: start at a random index and walk the node list as a ring
	start := utils.GetRandInt(len(nodes))
	for i := 0; i < numRep; i++ {
		index := (start + i) % len(nodes)
		ids[i] = nodes[index].NodeID
		ips[i] = nodes[index].IP
	}

	return ramsg.NewCoorWriteRespOK(ids, ips)
}
func (service *Service) WriteRepHash(msg *ramsg.WriteRepHashCommand) ramsg.WriteHashResp {
	_, err := service.db.CreateRepObject(msg.BucketID, msg.ObjectName, msg.FileSizeInBytes, msg.ReplicateNumber, msg.NodeIDs, msg.Hashes)
	if err != nil {
		log.WithField("BucketID", msg.BucketID).
			WithField("ObjectName", msg.ObjectName).
			Warnf("create rep object failed, err: %s", err.Error())
		return ramsg.NewCoorWriteHashRespFailed(errorcode.OPERATION_FAILED, "create rep object failed")
	}

	return ramsg.NewCoorWriteHashRespOK()
}
func (service *Service) Move(msg *ramsg.MoveCommand) ramsg.MoveResp {
	// Query the database for the redundancy type and redundancy parameters.
	// jh: use the bucketname and objectname in the command to query the object table and obtain redundancy, EcName and fileSizeInBytes
	// - if redundancy is rep, query the object replica table to get repHash
	//   - ids := {0}
	//   - hashs := {repHash}
	// - if redundancy is ec, query the object coded-block table to get blockHashs and ids (innerID)
	//   - query the cache table to get each hash's nodeIps, TempOrPins and Times
	//   - query the node delay table to get the delay between command.Destination and each nodeIp, and store it in a map (Delay)
	//   - kx: determine hashs and ids from the queried hash/hashs, nodeIps, TempOrPins, Times (move/read strategy) and Delay

	// Query the storage service associated with the user
	stg, err := service.db.QueryUserStorage(msg.UserID, msg.StorageID)
	if err != nil {
		log.WithField("UserID", msg.UserID).
			WithField("StorageID", msg.StorageID).
			Warnf("query storage directory failed, err: %s", err.Error())
		return ramsg.NewCoorMoveRespFailed(errorcode.OPERATION_FAILED, "query storage directory failed")
	}

	// Query the file object
	object, err := service.db.QueryObjectByID(msg.ObjectID)
	if err != nil {
		log.WithField("ObjectID", msg.ObjectID).
			Warnf("query Object failed, err: %s", err.Error())
		return ramsg.NewCoorMoveRespFailed(errorcode.OPERATION_FAILED, "query Object failed")
	}

	// If the redundancy type is rep, query the object replica table to get the repHash
	var hashs []string
	ids := []int{0}
	if object.Redundancy == consts.REDUNDANCY_REP {
		objectRep, err := service.db.QueryObjectRep(object.ObjectID)
		if err != nil {
			log.Warnf("query ObjectRep failed, err: %s", err.Error())
			return ramsg.NewCoorMoveRespFailed(errorcode.OPERATION_FAILED, "query ObjectRep failed")
		}
		hashs = append(hashs, objectRep.RepHash)
	} else {
		blockHashs, err := service.db.QueryObjectBlock(object.ObjectID)
		if err != nil {
			log.Warnf("query ObjectBlock failed, err: %s", err.Error())
			return ramsg.NewCoorMoveRespFailed(errorcode.OPERATION_FAILED, "query ObjectBlock failed")
		}

		ecPolicies := *utils.GetEcPolicy()
		ecPolicy := ecPolicies[*object.ECName]
		ecN := ecPolicy.GetN()
		ecK := ecPolicy.GetK()

		ids = make([]int, ecK)
		for i := 0; i < ecK; i++ {
			ids[i] = i
		}

		hashs = make([]string, ecN)
		for _, tt := range blockHashs {
			id := tt.InnerID
			hash := tt.BlockHash
			hashs[id] = hash
		}

		// Query the cache table to get each hash's nodeIps, TempOrPins and Times
		/*for id, hash := range blockHashs {
			// type Cache struct { NodeIP string; TempOrPin bool; Cachetime string }
			Cache := Query_Cache(hash)
			// Time_trans() can convert Cache[i].Cachetime into a timestamp
			// Query the node delay table to get the delay between command.Destination and each nodeIp, and store it in a map (Delay)
			Delay := make(map[string]int) // delay per node IP
			for i := 0; i < len(Cache); i++ {
				Delay[Cache[i].NodeIP] = Query_NodeDelay(Destination, Cache[i].NodeIP)
			}
			// kx: determine hashs and ids from the queried hash/hashs, nodeIps, TempOrPins, Times (move/read strategy) and Delay
		}*/
	}

	return ramsg.NewCoorMoveRespOK(
		stg.NodeID,
		stg.Directory,
		object.Redundancy,
		object.ECName,
		hashs,
		ids,
		object.FileSizeInBytes,
	)
}
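// The commented-out pseudocode above outlines a delay-aware selection strategy for
// the ec case: for every block hash, look at the nodes that cache it and prefer the
// copy with the lowest delay to the destination. The helper below is only a minimal,
// hypothetical sketch of that selection step, assuming the cache and delay lookups
// have already been performed by the caller; blockCandidate and pickLowestDelayNode
// are illustrative names and are not part of the existing codebase.
type blockCandidate struct {
	NodeIP string
	Delay  int // measured delay from the destination node to NodeIP
}

// pickLowestDelayNode returns, for each block hash, the IP of the cached copy with
// the smallest delay. Hashes with no cached copy are left out; the caller decides
// how to handle such gaps (for example by falling back to another block).
func pickLowestDelayNode(candidates map[string][]blockCandidate) map[string]string {
	picked := make(map[string]string, len(candidates))
	for hash, nodes := range candidates {
		if len(nodes) == 0 {
			continue
		}
		best := nodes[0]
		for _, n := range nodes[1:] {
			if n.Delay < best.Delay {
				best = n
			}
		}
		picked[hash] = best.NodeIP
	}
	return picked
}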
func (service *Service) TempCacheReport(msg *ramsg.TempCacheReport) {
	service.db.BatchInsertOrUpdateCache(msg.Hashes, msg.NodeID)
}
func (service *Service) AgentStatusReport(msg *ramsg.AgentStatusReport) {
	// jh: based on the IP in the command, insert into the node delay table and update NodeStatus in the node table
	// TODO
	/*
		ips := utils.GetAgentIps()
		Insert_NodeDelay(msg.IP, ips, msg.AgentDelay)
		// read the node region (NodeLocation) from the configuration table
		// update NodeStatus in the node table
		Insert_Node(msg.IP, msg.IP, msg.IPFSStatus, msg.LocalDirStatus)
	*/
}

This project aims to turn inter-cloud storage into a public infrastructure, so that individuals and enterprises can use efficient inter-cloud storage services with a low barrier to entry (installing the out-of-the-box inter-cloud storage client is enough, with no need to worry about deploying other components), while also allowing users to flexibly and conveniently customize the functional details of the inter-cloud storage.