You can not select more than 25 topics Topics must start with a chinese character,a letter or number, can include dashes ('-') and can be up to 35 characters long.

base_store.go 8.1 kB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349
  1. package ops2
  2. import (
  3. "fmt"
  4. "io"
  5. "time"
  6. "gitlink.org.cn/cloudream/common/pkgs/future"
  7. "gitlink.org.cn/cloudream/common/pkgs/logger"
  8. "gitlink.org.cn/cloudream/common/utils/io2"
  9. clitypes "gitlink.org.cn/cloudream/jcs-pub/client/types"
  10. "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch/dag"
  11. "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch/exec"
  12. "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch2"
  13. "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/storage/pool"
  14. "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/storage/types"
  15. )
  16. const (
  17. BaseReadStatsStoreKey = "BaseReadSpeed"
  18. )
  19. func init() {
  20. exec.UseOp[*BaseWrite]()
  21. exec.UseOp[*BaseRead]()
  22. exec.UseOp[*BaseReadDyn]()
  23. exec.UseVarValue[*BaseReadStatsValue]()
  24. }
  25. type BaseReadStatsValue struct {
  26. Size int64
  27. Time time.Duration
  28. Location exec.Location
  29. }
  30. func (v *BaseReadStatsValue) Clone() exec.VarValue {
  31. return &BaseReadStatsValue{
  32. Size: v.Size,
  33. Time: v.Time,
  34. Location: v.Location,
  35. }
  36. }
  37. type BaseRead struct {
  38. Output exec.VarID
  39. UserSpace clitypes.UserSpaceDetail
  40. Path clitypes.JPath
  41. Option types.OpenOption
  42. }
  43. func (o *BaseRead) Execute(ctx *exec.ExecContext, e *exec.Executor) error {
  44. logger.
  45. WithField("Output", o.Output).
  46. WithField("UserSpace", o.UserSpace).
  47. WithField("Path", o.Path).
  48. Debug("base read")
  49. defer logger.Debug("base read end")
  50. stgPool, err := exec.GetValueByType[*pool.Pool](ctx)
  51. if err != nil {
  52. return fmt.Errorf("getting storage pool: %w", err)
  53. }
  54. store, err := stgPool.GetBaseStore(&o.UserSpace)
  55. if err != nil {
  56. return fmt.Errorf("getting base store of storage %v: %w", o.UserSpace, err)
  57. }
  58. stream, err := store.Read(o.Path, o.Option)
  59. if err != nil {
  60. return fmt.Errorf("reading object %v: %w", o.Path, err)
  61. }
  62. startTime := time.Now()
  63. counter := io2.CounterCloser(stream, func(cnt int64, err error) {
  64. if err != nil && err != io.EOF {
  65. return
  66. }
  67. // 要注意这个回调一定要在return之前调用
  68. e.Store(BaseReadStatsStoreKey, &BaseReadStatsValue{
  69. Size: cnt,
  70. Time: time.Since(startTime),
  71. Location: e.Location(),
  72. })
  73. })
  74. fut := future.NewSetVoid()
  75. output := &exec.StreamValue{
  76. Stream: io2.AfterReadClosed(counter, func(closer io.ReadCloser) {
  77. fut.SetVoid()
  78. }),
  79. }
  80. e.PutVar(o.Output, output)
  81. return fut.Wait(ctx.Context)
  82. }
  83. func (o *BaseRead) String() string {
  84. return fmt.Sprintf("BaseRead(opt=%v) %v:%v -> %v", o.Option, o.UserSpace, o.Path, o.Output)
  85. }
  86. type BaseReadDyn struct {
  87. UserSpace clitypes.UserSpaceDetail
  88. Output exec.VarID
  89. FileInfo exec.VarID
  90. Option types.OpenOption
  91. }
  92. func (o *BaseReadDyn) Execute(ctx *exec.ExecContext, e *exec.Executor) error {
  93. logger.
  94. WithField("Output", o.Output).
  95. WithField("UserSpace", o.UserSpace).
  96. WithField("Path", o.FileInfo).
  97. Debug("base read dynamic")
  98. defer logger.Debug("base read dynamic end")
  99. stgPool, err := exec.GetValueByType[*pool.Pool](ctx)
  100. if err != nil {
  101. return fmt.Errorf("getting storage pool: %w", err)
  102. }
  103. info, err := exec.BindVar[*FileInfoValue](e, ctx.Context, o.FileInfo)
  104. if err != nil {
  105. return err
  106. }
  107. store, err := stgPool.GetBaseStore(&o.UserSpace)
  108. if err != nil {
  109. return fmt.Errorf("getting base store of storage %v: %w", o.UserSpace, err)
  110. }
  111. stream, err := store.Read(info.Path, o.Option)
  112. if err != nil {
  113. logger.Warnf("reading file %v: %v", info.Path, err)
  114. return fmt.Errorf("reading object %v: %w", o.FileInfo, err)
  115. }
  116. startTime := time.Now()
  117. counter := io2.CounterCloser(stream, func(cnt int64, err error) {
  118. if err != nil && err != io.EOF {
  119. return
  120. }
  121. // 要注意这个回调一定要在return之前调用
  122. e.Store(BaseReadStatsStoreKey, &BaseReadStatsValue{
  123. Size: cnt,
  124. Time: time.Since(startTime),
  125. Location: e.Location(),
  126. })
  127. })
  128. fut := future.NewSetVoid()
  129. output := &exec.StreamValue{
  130. Stream: io2.AfterReadClosed(counter, func(closer io.ReadCloser) {
  131. fut.SetVoid()
  132. }),
  133. }
  134. e.PutVar(o.Output, output)
  135. return fut.Wait(ctx.Context)
  136. }
  137. func (o *BaseReadDyn) String() string {
  138. return fmt.Sprintf("BaseReadDyn(opt=%v) %v:%v -> %v", o.Option, o.UserSpace, o.FileInfo, o.Output)
  139. }
  140. type BaseWrite struct {
  141. Input exec.VarID
  142. UserSpace clitypes.UserSpaceDetail
  143. Path clitypes.JPath
  144. FileInfo exec.VarID
  145. Option types.WriteOption
  146. }
  147. func (o *BaseWrite) Execute(ctx *exec.ExecContext, e *exec.Executor) error {
  148. logger.
  149. WithField("Input", o.Input).
  150. Debugf("write file to base store")
  151. defer logger.Debugf("write file to base store finished")
  152. stgPool, err := exec.GetValueByType[*pool.Pool](ctx)
  153. if err != nil {
  154. return fmt.Errorf("getting storage pool: %w", err)
  155. }
  156. store, err := stgPool.GetBaseStore(&o.UserSpace)
  157. if err != nil {
  158. return fmt.Errorf("getting base store of storage %v: %w", o.UserSpace, err)
  159. }
  160. input, err := exec.BindVar[*exec.StreamValue](e, ctx.Context, o.Input)
  161. if err != nil {
  162. return err
  163. }
  164. defer input.Stream.Close()
  165. ret, err := store.Write(o.Path, input.Stream, o.Option)
  166. if err != nil {
  167. return err
  168. }
  169. e.PutVar(o.FileInfo, &FileInfoValue{
  170. FileInfo: ret,
  171. })
  172. return nil
  173. }
  174. func (o *BaseWrite) String() string {
  175. return fmt.Sprintf("BaseWrite %v -> %v:%v, %v", o.Input, o.UserSpace, o.Path, o.FileInfo)
  176. }
  177. type BaseReadNode struct {
  178. dag.NodeBase
  179. From ioswitch2.From
  180. UserSpace clitypes.UserSpaceDetail
  181. Path clitypes.JPath
  182. Option types.OpenOption
  183. }
  184. func (b *GraphNodeBuilder) NewBaseRead(from ioswitch2.From, userSpace clitypes.UserSpaceDetail, path clitypes.JPath, opt types.OpenOption) *BaseReadNode {
  185. node := &BaseReadNode{
  186. From: from,
  187. UserSpace: userSpace,
  188. Path: path,
  189. Option: opt,
  190. }
  191. b.AddNode(node)
  192. node.OutputStreams().Init(node, 1)
  193. return node
  194. }
  195. func (t *BaseReadNode) GetFrom() ioswitch2.From {
  196. return t.From
  197. }
  198. func (t *BaseReadNode) Output() dag.StreamOutputSlot {
  199. return dag.StreamOutputSlot{
  200. Node: t,
  201. Index: 0,
  202. }
  203. }
  204. func (t *BaseReadNode) GenerateOp() (exec.Op, error) {
  205. return &BaseRead{
  206. Output: t.Output().Var().VarID,
  207. UserSpace: t.UserSpace,
  208. Path: t.Path,
  209. Option: t.Option,
  210. }, nil
  211. }
  212. type BaseReadDynNode struct {
  213. dag.NodeBase
  214. From ioswitch2.From
  215. UserSpace clitypes.UserSpaceDetail
  216. Option types.OpenOption
  217. }
  218. func (b *GraphNodeBuilder) NewBaseReadDyn(from ioswitch2.From, userSpace clitypes.UserSpaceDetail, opt types.OpenOption) *BaseReadDynNode {
  219. node := &BaseReadDynNode{
  220. From: from,
  221. UserSpace: userSpace,
  222. Option: opt,
  223. }
  224. b.AddNode(node)
  225. node.OutputStreams().Init(node, 1)
  226. node.InputValues().Init(1)
  227. return node
  228. }
  229. func (t *BaseReadDynNode) GetFrom() ioswitch2.From {
  230. return t.From
  231. }
  232. func (t *BaseReadDynNode) FileInfoSlot() dag.ValueInputSlot {
  233. return dag.ValueInputSlot{
  234. Node: t,
  235. Index: 0,
  236. }
  237. }
  238. func (t *BaseReadDynNode) Output() dag.StreamOutputSlot {
  239. return dag.StreamOutputSlot{
  240. Node: t,
  241. Index: 0,
  242. }
  243. }
  244. func (t *BaseReadDynNode) GenerateOp() (exec.Op, error) {
  245. return &BaseReadDyn{
  246. UserSpace: t.UserSpace,
  247. Output: t.Output().Var().VarID,
  248. FileInfo: t.FileInfoSlot().Var().VarID,
  249. Option: t.Option,
  250. }, nil
  251. }
  252. type BaseWriteNode struct {
  253. dag.NodeBase
  254. To ioswitch2.To
  255. UserSpace clitypes.UserSpaceDetail
  256. Path clitypes.JPath
  257. Option types.WriteOption
  258. }
  259. func (b *GraphNodeBuilder) NewBaseWrite(to ioswitch2.To, userSpace clitypes.UserSpaceDetail, path clitypes.JPath, opt types.WriteOption) *BaseWriteNode {
  260. node := &BaseWriteNode{
  261. To: to,
  262. UserSpace: userSpace,
  263. Path: path,
  264. Option: opt,
  265. }
  266. b.AddNode(node)
  267. node.InputStreams().Init(1)
  268. node.OutputValues().Init(node, 1)
  269. return node
  270. }
  271. func (t *BaseWriteNode) GetTo() ioswitch2.To {
  272. return t.To
  273. }
  274. func (t *BaseWriteNode) Input() dag.StreamInputSlot {
  275. return dag.StreamInputSlot{
  276. Node: t,
  277. Index: 0,
  278. }
  279. }
  280. func (t *BaseWriteNode) FileInfoVar() dag.ValueOutputSlot {
  281. return dag.ValueOutputSlot{
  282. Node: t,
  283. Index: 0,
  284. }
  285. }
  286. func (t *BaseWriteNode) GenerateOp() (exec.Op, error) {
  287. return &BaseWrite{
  288. Input: t.InputStreams().Get(0).VarID,
  289. UserSpace: t.UserSpace,
  290. Path: t.Path,
  291. FileInfo: t.FileInfoVar().Var().VarID,
  292. Option: t.Option,
  293. }, nil
  294. }

本项目旨在将云际存储公共基础设施化,使个人及企业可低门槛使用高效的云际存储服务(安装开箱即用云际存储客户端即可,无需关注其他组件的部署),同时支持用户灵活便捷定制云际存储的功能细节。