You can not select more than 25 topics Topics must start with a chinese character,a letter or number, can include dashes ('-') and can be up to 35 characters long.

downloader.go 4.7 kB

7 months ago
7 months ago
7 months ago
7 months ago
7 months ago
7 months ago
7 months ago
7 months ago
7 months ago
7 months ago
7 months ago
7 months ago
7 months ago
7 months ago
7 months ago
7 months ago
7 months ago
7 months ago
7 months ago
7 months ago
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188
  1. package downloader
  2. import (
  3. "fmt"
  4. "io"
  5. lru "github.com/hashicorp/golang-lru/v2"
  6. "gitlink.org.cn/cloudream/common/pkgs/iterator"
  7. "gitlink.org.cn/cloudream/jcs-pub/client/internal/db"
  8. "gitlink.org.cn/cloudream/jcs-pub/client/internal/downloader/strategy"
  9. clitypes "gitlink.org.cn/cloudream/jcs-pub/client/types"
  10. "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/connectivity"
  11. "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/storage/pool"
  12. )
  13. const (
  14. DefaultMaxStripCacheCount = 128
  15. )
  16. type DownloadIterator = iterator.Iterator[*Downloading]
  17. type DownloadReqeust struct {
  18. ObjectID clitypes.ObjectID
  19. Offset int64
  20. Length int64
  21. }
  22. type downloadReqeust2 struct {
  23. Detail *clitypes.ObjectDetail
  24. Raw DownloadReqeust
  25. }
  26. type Downloading struct {
  27. Object *clitypes.Object
  28. File io.ReadCloser // 文件流,如果文件不存在,那么为nil
  29. Request DownloadReqeust
  30. }
  31. type Downloader struct {
  32. strips *StripCache
  33. cfg Config
  34. conn *connectivity.Collector
  35. stgPool *pool.Pool
  36. selector *strategy.Selector
  37. db *db.DB
  38. }
  39. func NewDownloader(cfg Config, conn *connectivity.Collector, stgPool *pool.Pool, sel *strategy.Selector, db *db.DB) *Downloader {
  40. if cfg.MaxStripCacheCount == 0 {
  41. cfg.MaxStripCacheCount = DefaultMaxStripCacheCount
  42. }
  43. ch, _ := lru.New[ECStripKey, ObjectECStrip](cfg.MaxStripCacheCount)
  44. return &Downloader{
  45. strips: ch,
  46. cfg: cfg,
  47. conn: conn,
  48. stgPool: stgPool,
  49. selector: sel,
  50. db: db,
  51. }
  52. }
  53. func (d *Downloader) DownloadObjects(reqs []DownloadReqeust) DownloadIterator {
  54. objIDs := make([]clitypes.ObjectID, len(reqs))
  55. for i, req := range reqs {
  56. objIDs[i] = req.ObjectID
  57. }
  58. if len(objIDs) == 0 {
  59. return iterator.Empty[*Downloading]()
  60. }
  61. objDetails, err := db.DoTx11(d.db, d.db.Object().BatchGetDetails, objIDs)
  62. if err != nil {
  63. return iterator.FuseError[*Downloading](fmt.Errorf("request to db: %w", err))
  64. }
  65. detailsMap := make(map[clitypes.ObjectID]*clitypes.ObjectDetail)
  66. for _, detail := range objDetails {
  67. d := detail
  68. detailsMap[detail.Object.ObjectID] = &d
  69. }
  70. req2s := make([]downloadReqeust2, len(reqs))
  71. for i, req := range reqs {
  72. req2s[i] = downloadReqeust2{
  73. Detail: detailsMap[req.ObjectID],
  74. Raw: req,
  75. }
  76. }
  77. return NewDownloadObjectIterator(d, req2s)
  78. }
  79. func (d *Downloader) DownloadObjectByDetail(detail clitypes.ObjectDetail, off int64, length int64) (*Downloading, error) {
  80. req2s := []downloadReqeust2{{
  81. Detail: &detail,
  82. Raw: DownloadReqeust{
  83. ObjectID: detail.Object.ObjectID,
  84. Offset: off,
  85. Length: length,
  86. },
  87. }}
  88. iter := NewDownloadObjectIterator(d, req2s)
  89. return iter.MoveNext()
  90. }
  91. func (d *Downloader) DownloadPackage(pkgID clitypes.PackageID, prefix string) (clitypes.Package, DownloadIterator, error) {
  92. pkg, details, err := db.DoTx02(d.db, func(tx db.SQLContext) (clitypes.Package, []clitypes.ObjectDetail, error) {
  93. pkg, err := d.db.Package().GetByID(tx, pkgID)
  94. if err != nil {
  95. return clitypes.Package{}, nil, err
  96. }
  97. var details []clitypes.ObjectDetail
  98. if prefix != "" {
  99. objs, err := d.db.Object().GetWithPathPrefix(tx, pkgID, prefix)
  100. if err != nil {
  101. return clitypes.Package{}, nil, err
  102. }
  103. objIDs := make([]clitypes.ObjectID, len(objs))
  104. for i, obj := range objs {
  105. objIDs[i] = obj.ObjectID
  106. }
  107. allBlocks, err := d.db.ObjectBlock().BatchGetByObjectID(tx, objIDs)
  108. if err != nil {
  109. return clitypes.Package{}, nil, err
  110. }
  111. allPinnedObjs, err := d.db.PinnedObject().BatchGetByObjectID(tx, objIDs)
  112. if err != nil {
  113. return clitypes.Package{}, nil, err
  114. }
  115. details = make([]clitypes.ObjectDetail, 0, len(objs))
  116. for _, obj := range objs {
  117. detail := clitypes.ObjectDetail{
  118. Object: obj,
  119. }
  120. details = append(details, detail)
  121. }
  122. clitypes.DetailsFillObjectBlocks(details, allBlocks)
  123. clitypes.DetailsFillPinnedAt(details, allPinnedObjs)
  124. } else {
  125. details, err = d.db.Object().GetPackageObjectDetails(tx, pkgID)
  126. if err != nil {
  127. return clitypes.Package{}, nil, err
  128. }
  129. }
  130. return pkg, details, nil
  131. })
  132. if err != nil {
  133. return clitypes.Package{}, nil, err
  134. }
  135. req2s := make([]downloadReqeust2, len(details))
  136. for i, objDetail := range details {
  137. dt := objDetail
  138. req2s[i] = downloadReqeust2{
  139. Detail: &dt,
  140. Raw: DownloadReqeust{
  141. ObjectID: objDetail.Object.ObjectID,
  142. Offset: 0,
  143. Length: objDetail.Object.Size,
  144. },
  145. }
  146. }
  147. return pkg, NewDownloadObjectIterator(d, req2s), nil
  148. }
  149. type ObjectECStrip struct {
  150. Data []byte
  151. ObjectFileHash clitypes.FileHash // 添加这条缓存时,Object的FileHash
  152. }
  153. type ECStripKey struct {
  154. ObjectID clitypes.ObjectID
  155. StripIndex int64
  156. }
  157. type StripCache = lru.Cache[ECStripKey, ObjectECStrip]

本项目旨在将云际存储公共基础设施化,使个人及企业可低门槛使用高效的云际存储服务(安装开箱即用云际存储客户端即可,无需关注其他组件的部署),同时支持用户灵活便捷定制云际存储的功能细节。