You cannot select more than 25 topics. Topics must start with a Chinese character, a letter, or a number; they can include dashes ('-') and can be up to 35 characters long.

mysql.go 11 kB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353
  1. package main
  2. // TODO 这里面的代码要移动到db库的db.go中
  3. /*
  4. import (
  5. "fmt"
  6. "time"
  7. _ "github.com/go-sql-driver/mysql"
  8. "github.com/jmoiron/sqlx"
  9. "gitlink.org.cn/cloudream/utils/consts"
  10. )
  11. // 数据库指针
  12. var db *sqlx.DB
  13. // 错误处理函数
  14. func HandleError(why string, err error) {
  15. if err != nil {
  16. fmt.Println(why, err)
  17. }
  18. }
  19. // 初始化数据库连接,init()方法系统会在动在main方法之前执行。
  20. func init() {
  21. database, err := sqlx.Open("mysql", "root:123456@tcp(127.0.0.1:3306)/kx?charset=utf8mb4&parseTime=true")
  22. HandleError("open mysql failed,", err)
  23. db = database
  24. }
  25. // 节点延迟表插入
  26. func Insert_NodeDelay(innodeIP string, outnodeIP []string, delay []int) {
  27. insSql := "insert into NodeDelay values(?,?,?)"
  28. updateSql := "UPDATE NodeDelay SET DelayInMs=? WHERE InNodeIP=? AND OutNodeIP=?"
  29. for i := 0; i < len(outnodeIP); i++ {
  30. _, err := db.Exec(insSql, innodeIP, outnodeIP[i], delay[i])
  31. //HandleError("insert failed: ", err)
  32. if err != nil {
  33. _, e := db.Exec(updateSql, delay[i], innodeIP, outnodeIP[i])
  34. HandleError("update failed: ", e)
  35. }
  36. }
  37. }
  38. // 节点表插入
  39. func Insert_Node(nodeip string, nodelocation string, ipfsstatus string, localdirstatus string) {
  40. // 根据NodeIP查询,若不存在则插入,若存在则更新
  41. //查询
  42. type Node struct {
  43. NodeIP string `db:"NodeIP"`
  44. }
  45. var x Node
  46. sql := "select NodeIP from Node where NodeIP=?"
  47. err := db.Get(&x, sql, nodeip)
  48. HandleError("Get failed: ", err)
  49. //local和ipfs同时可达才可达
  50. // TODO 将status字段改成字符串(枚举值)
  51. NodeStatus := ipfsstatus == consts.IPFS_STATUS_OK && localdirstatus == consts.LOCAL_DIR_STATUS_OK
  52. //不存在才插入
  53. if x == (Node{}) {
  54. sql := "insert into Node values(?,?,?)"
  55. _, err := db.Exec(sql, nodeip, nodelocation, NodeStatus)
  56. HandleError("insert failed: ", err)
  57. } else {
  58. //存在则更新
  59. sql := "update Node set NodeStatus=? where NodeIP=?"
  60. _, err := db.Exec(sql, NodeStatus, nodeip)
  61. HandleError("update failed: ", err)
  62. }
  63. }
  64. // 纠删码对象表插入
  65. func Insert_EcObject(Object_Name string, Bucket_ID int, FileSizeInBytes int64, EcName string) (objectid int64) {
  66. // 根据objectname和bucketid查询,若不存在则插入,若存在则不操作
  67. //查询
  68. type Object struct {
  69. ObjectID int64 `db:"ObjectID"`
  70. ObjectName string `db:"ObjectName"`
  71. BucketID int `db:"BucketID"`
  72. }
  73. var x Object
  74. sql := "select ObjectID, ObjectName, BucketID from Object where ObjectName=? AND BucketID=?"
  75. err := db.Get(&x, sql, Object_Name, Bucket_ID)
  76. HandleError("Get failed: ", err)
  77. //不存在才插入
  78. if x == (Object{}) {
  79. sql := "insert into Object(ObjectName, BucketID, FileSizeInBytes, Redundancy, NumRep, EcName) values(?,?,?,?,?,?)"
  80. r, err := db.Exec(sql, Object_Name, Bucket_ID, FileSizeInBytes, false, "-1", EcName)
  81. HandleError("insert failed: ", err)
  82. id, err := r.LastInsertId()
  83. HandleError("exec failed: ", err)
  84. objectid = id
  85. } else {
  86. objectid = x.ObjectID
  87. }
  88. return
  89. }
  90. // 多副本对象表插入
  91. func Insert_RepObject(Object_Name string, Bucket_ID int, FileSizeInBytes int64, NumRep int) (objectid int64) {
  92. // 根据objectname和bucketid查询,若不存在则插入,若存在则不操作
  93. //查询
  94. type Object struct {
  95. ObjectID int64 `db:"ObjectID"`
  96. ObjectName string `db:"ObjectName"`
  97. BucketID int `db:"BucketID"`
  98. }
  99. var x Object
  100. sql := "select ObjectID, ObjectName, BucketID from Object where ObjectName=? AND BucketID=?"
  101. err := db.Get(&x, sql, Object_Name, Bucket_ID)
  102. HandleError("Get failed: ", err)
  103. //不存在才插入
  104. if x == (Object{}) {
  105. sql := "insert into Object(ObjectName, BucketID, FileSizeInBytes, Redundancy, NumRep) values(?,?,?,?,?)"
  106. r, err := db.Exec(sql, Object_Name, Bucket_ID, FileSizeInBytes, true, NumRep)
  107. HandleError("insert failed: ", err)
  108. id, err := r.LastInsertId()
  109. HandleError("exec failed: ", err)
  110. objectid = id
  111. } else {
  112. objectid = x.ObjectID
  113. }
  114. return
  115. }
  116. // 对象编码块表插入
  117. func Insert_EcObjectBlock(Object_ID int64, Inner_ID int) {
  118. // 根据objectID查询,若不存在则插入,若存在则不操作
  119. //查询
  120. type Block struct {
  121. ObjectID int64 `db:"ObjectID"`
  122. }
  123. var x []Block
  124. sql := "select ObjectID from ObjectBlock where ObjectID=? AND InnerID=?"
  125. err := db.Select(&x, sql, Object_ID, Inner_ID)
  126. HandleError("select failed: ", err)
  127. //不存在才插入
  128. if x == nil {
  129. sql := "insert into ObjectBlock(ObjectID, InnerID) values(?,?)"
  130. //执行SQL语句
  131. _, err := db.Exec(sql, Object_ID, Inner_ID)
  132. HandleError("insert failed: ", err)
  133. //查询最后一条用户ID,判断是否插入成功
  134. // id, err := r.LastInsertId()
  135. // HandleError("exec failed: ", err)
  136. // fmt.Println("insert EcObjectBlock succ: ", id)
  137. }
  138. }
  139. // 对象副本表插入
  140. func Insert_ObjectRep(Object_ID int64) {
  141. sql := "insert into ObjectRep(ObjectID) values(?)"
  142. _, err := db.Exec(sql, Object_ID)
  143. HandleError("insert failed: ", err)
  144. }
  145. // 对象编码块表Echash插入
  146. func Insert_EcHash(Object_ID int, Hashs []string) {
  147. for i := 0; i < len(Hashs); i++ {
  148. sql := "update ObjectBlock set BlockHash =? where ObjectID = ? AND InnerID = ?"
  149. _, err := db.Exec(sql, Hashs[i], Object_ID, i)
  150. HandleError("insert failed: ", err)
  151. }
  152. }
  153. // 对象副本表rephash插入
  154. func Insert_RepHash(Object_ID int, Hashs string) {
  155. sql := "update ObjectRep set RepHash =? where ObjectID = ?"
  156. _, err := db.Exec(sql, Hashs, Object_ID)
  157. HandleError("insert failed: ", err)
  158. }
  159. // 缓存表插入
  160. func Insert_Cache(Hashs []string, Ips []string, TempOrPin bool) {
  161. for i := 0; i < len(Hashs); i++ {
  162. sql := "insert into Cache values(?,?,?,?)"
  163. _, err := db.Exec(sql, Hashs[i], Ips[i], TempOrPin, time.Now())
  164. HandleError("insert failed: ", err)
  165. }
  166. return
  167. }
  168. func Query_ObjectID(objectname string) (objectid int) {
  169. type Object struct {
  170. ObjectID int `db:"ObjectID"`
  171. }
  172. var x Object
  173. sql := "select ObjectID from Object where ObjectName=? "
  174. err := db.Get(&x, sql, objectname)
  175. HandleError("Get failed: ", err)
  176. if x != (Object{}) {
  177. objectid = x.ObjectID
  178. } else {
  179. fmt.Println("Object not found!")
  180. }
  181. // fmt.Println("select bucketid succ:",bucketid)
  182. return
  183. }
  184. // 根据BucketName查询BucketID
  185. func Query_BucketID(bucketname string) (bucketid int) {
  186. //桶结构体
  187. type Bucket struct {
  188. BucketID int `db:"BucketID"`
  189. BucketName string `db:"BucketName"`
  190. }
  191. var x Bucket
  192. sql := "select BucketID, BucketName from Bucket where BucketName=? "
  193. err := db.Get(&x, sql, bucketname)
  194. HandleError("Get failed: ", err)
  195. if x != (Bucket{}) {
  196. bucketid = x.BucketID
  197. } else {
  198. fmt.Println("Bucket not found!")
  199. bucketid = -1
  200. }
  201. // fmt.Println("select bucketid succ:",bucketid)
  202. return
  203. }
  204. // 根据用户id查询可用nodeip
  205. func Query_UserNode(user_id int) []string {
  206. //用户节点结构体
  207. type UserNode struct {
  208. UserID int `db:"UserID"`
  209. NodeIP string `db:"NodeIP"`
  210. }
  211. var x []UserNode
  212. var node_ip []string
  213. sql := "select UserID, NodeIP from UserNode where UserID=? "
  214. err := db.Select(&x, sql, user_id)
  215. HandleError("select failed: ", err)
  216. for _, value := range x {
  217. node_ip = append(node_ip, value.NodeIP)
  218. }
  219. fmt.Println("select node_ip succ:", node_ip)
  220. return node_ip
  221. }
  222. // 根据objectname和bucketid查询对象表,获得redundancy,EcName,fileSizeInBytes
  223. func Query_Object(objectname string, bucketid int) (objectid int, filesizeinbytes int64, redundancy bool, ecname string) {
  224. //对象结构体
  225. type Object struct {
  226. ObjectID int `db:"ObjectID"`
  227. FileSizeInBytes int64 `db:"FileSizeInBytes"`
  228. Redundancy bool `db:"Redundancy"`
  229. EcName string `db:"EcName"`
  230. }
  231. var x Object
  232. sql := "select ObjectID, FileSizeInBytes, Redundancy, EcName from Object where ObjectName=? AND BucketID=?"
  233. err := db.Get(&x, sql, objectname, bucketid)
  234. HandleError("Get failed: ", err)
  235. if x != (Object{}) {
  236. objectid = x.ObjectID
  237. filesizeinbytes = x.FileSizeInBytes
  238. redundancy = x.Redundancy
  239. ecname = x.EcName
  240. } else {
  241. fmt.Println("Object not found!")
  242. }
  243. return
  244. }
  245. // 查询对象副本表
  246. func Query_ObjectRep(objectid int) (repHash string) {
  247. //对象结构体
  248. type ObjectRep struct {
  249. RepHash string `db:"RepHash"`
  250. }
  251. var x ObjectRep
  252. sql := "select RepHash from ObjectRep where ObjectID=?"
  253. err := db.Get(&x, sql, objectid)
  254. HandleError("Get failed: ", err)
  255. if x != (ObjectRep{}) {
  256. repHash = x.RepHash
  257. } else {
  258. fmt.Println("ObjectRep not found!")
  259. }
  260. return
  261. }
  262. // 对象编码块结构体
  263. type ObjectBlock struct {
  264. InnerID int `db:"InnerID"`
  265. BlockHash string `db:"BlockHash"`
  266. }
  267. // 查询对象编码块表
  268. func Query_ObjectBlock(Object_ID int) (x []ObjectBlock) {
  269. sql := "select InnerID, BlockHash from ObjectBlock where ObjectID=?"
  270. err := db.Select(&x, sql, Object_ID)
  271. HandleError("select failed: ", err)
  272. return
  273. }
  274. // 将时间字符串转化为时间戳格式(s)
  275. func Time_trans(time_string string) (timestamp int64) {
  276. timeTemplate1 := "2006-01-02 15:04:05"
  277. stamp, _ := time.ParseInLocation(timeTemplate1, time_string, time.Local) //使用parseInLocation将字符串格式化返回本地时区时间
  278. timestamp = stamp.Unix()
  279. return
  280. }
  281. // 缓存结构体
  282. type Cache struct {
  283. NodeIP string `db:"NodeIP"`
  284. TempOrPin bool `db:"TempOrPin"`
  285. Cachetime string `db:"Cachetime"`
  286. }
  287. // 查询缓存表
  288. func Query_Cache(BlockHash string) (x []Cache) {
  289. sql := "select NodeIP, TempOrPin, Cachetime from Cache where HashValue=?"
  290. err := db.Select(&x, sql, BlockHash)
  291. HandleError("Get failed: ", err)
  292. return
  293. }
  294. // 更新缓存表
  295. func Update_Cache(BlockHash string, nodeip string) (x Cache) {
  296. //根据hash和nodeip查询缓存表里是否存在此条记录
  297. sql := "select NodeIP, TempOrPin, Cachetime from Cache where HashValue=? AND NodeIP=?"
  298. err := db.Select(&x, sql, BlockHash, nodeip)
  299. HandleError("Get failed: ", err)
  300. //若在表中已存在且所对应的TempOrPin字段为true,则更新Time
  301. if x.TempOrPin == true {
  302. sql = "update Cache set Cachetime=? where HashValue=? AND NodeIP=?"
  303. _, err := db.Exec(sql, time.Now(), BlockHash, nodeip)
  304. HandleError("update failed: ", err)
  305. }
  306. return
  307. }
  308. // 查询节点延迟表
  309. func Query_NodeDelay(innodeip string, outnodeip string) (delay int) {
  310. //节点延迟结构体
  311. type NodeDelay struct {
  312. DelayInMs int `db:"DelayInMs"`
  313. }
  314. var x NodeDelay
  315. sql := "select DelayInMs from NodeDelay where InNodeIP=? AND OutNodeIP=?"
  316. err := db.Get(&x, sql, innodeip, outnodeip)
  317. HandleError("Get failed: ", err)
  318. if x != (NodeDelay{}) {
  319. delay = x.DelayInMs
  320. } else {
  321. fmt.Println("NodeDelay not found!")
  322. }
  323. return
  324. }
  325. */

本项目旨在将云际存储公共基础设施化,使个人及企业可低门槛使用高效的云际存储服务(安装开箱即用云际存储客户端即可,无需关注其他组件的部署),同时支持用户灵活便捷定制云际存储的功能细节。