|
- package ops2
-
- import (
- "fmt"
- "io"
-
- "gitlink.org.cn/cloudream/common/pkgs/future"
- "gitlink.org.cn/cloudream/common/pkgs/ioswitch/dag"
- "gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec"
- "gitlink.org.cn/cloudream/common/pkgs/logger"
- "gitlink.org.cn/cloudream/common/utils/io2"
- clitypes "gitlink.org.cn/cloudream/storage2/client/types"
- "gitlink.org.cn/cloudream/storage2/common/pkgs/ioswitch2"
- "gitlink.org.cn/cloudream/storage2/common/pkgs/storage/pool"
- "gitlink.org.cn/cloudream/storage2/common/pkgs/storage/types"
- )
-
// init registers the shard read/write operations and the shard-info
// value type with the exec framework so they can be (de)serialized and
// dispatched by the switch executor.
func init() {
	exec.UseOp[*ShardRead]()
	exec.UseOp[*ShardWrite]()
	exec.UseVarValue[*ShardInfoValue]()
}
-
- type ShardInfoValue struct {
- Hash clitypes.FileHash `json:"hash"`
- Size int64 `json:"size"`
- }
-
- func (v *ShardInfoValue) Clone() exec.VarValue {
- return &ShardInfoValue{Hash: v.Hash, Size: v.Size}
- }
-
// ShardRead is an executable op that opens a file in a shard store and
// exposes it as a stream variable.
type ShardRead struct {
	Output    exec.VarID               // stream variable the opened file is published to
	UserSpace clitypes.UserSpaceDetail // user space whose shard store is read
	Open      types.OpenOption         // describes which file to open and how
}
-
- func (o *ShardRead) Execute(ctx *exec.ExecContext, e *exec.Executor) error {
- logger.
- WithField("Open", o.Open.String()).
- Debugf("reading from shard store")
- defer logger.Debugf("reading from shard store finished")
-
- stgPool, err := exec.GetValueByType[*pool.Pool](ctx)
- if err != nil {
- return fmt.Errorf("getting storage pool: %w", err)
- }
-
- store, err := stgPool.GetShardStore(&o.UserSpace)
- if err != nil {
- return fmt.Errorf("getting shard store of user space %v: %w", o.UserSpace, err)
- }
-
- file, err := store.Open(o.Open)
- if err != nil {
- return fmt.Errorf("opening shard store file: %w", err)
- }
-
- fut := future.NewSetVoid()
- e.PutVar(o.Output, &exec.StreamValue{
- Stream: io2.AfterReadClosedOnce(file, func(closer io.ReadCloser) {
- fut.SetVoid()
- }),
- })
-
- return fut.Wait(ctx.Context)
- }
-
- func (o *ShardRead) String() string {
- return fmt.Sprintf("ShardRead %v -> %v", o.Open.String(), o.Output)
- }
-
// ShardWrite is an executable op that stores an input stream into a
// shard store and publishes the resulting file hash and size.
type ShardWrite struct {
	Input       exec.VarID               // stream variable providing the file content
	FileHashVar exec.VarID               // value variable that receives the ShardInfoValue result
	UserSpace   clitypes.UserSpaceDetail // user space whose shard store is written
}
-
- func (o *ShardWrite) Execute(ctx *exec.ExecContext, e *exec.Executor) error {
- logger.
- WithField("Input", o.Input).
- WithField("FileHash", o.FileHashVar).
- Debugf("writting file to shard store")
- defer logger.Debugf("write to shard store finished")
-
- stgPool, err := exec.GetValueByType[*pool.Pool](ctx)
- if err != nil {
- return fmt.Errorf("getting storage pool: %w", err)
- }
-
- store, err := stgPool.GetShardStore(&o.UserSpace)
- if err != nil {
- return fmt.Errorf("getting shard store of user space %v: %w", o.UserSpace, err)
- }
-
- input, err := exec.BindVar[*exec.StreamValue](e, ctx.Context, o.Input)
- if err != nil {
- return err
- }
- defer input.Stream.Close()
-
- fileInfo, err := store.Create(input.Stream)
- if err != nil {
- return fmt.Errorf("writing file to shard store: %w", err)
- }
-
- e.PutVar(o.FileHashVar, &ShardInfoValue{
- Hash: fileInfo.Hash,
- Size: fileInfo.Size,
- })
- return nil
- }
-
- func (o *ShardWrite) String() string {
- return fmt.Sprintf("ShardWrite %v -> %v", o.Input, o.FileHashVar)
- }
-
// ShardReadNode is the DAG node that plans a ShardRead op: it reads a
// file from a shard store and exposes it on a single output stream slot.
type ShardReadNode struct {
	dag.NodeBase
	From      *ioswitch2.FromShardstore // planning-time source descriptor
	UserSpace clitypes.UserSpaceDetail  // user space whose shard store is read
	Open      types.OpenOption          // which file to open and how
}
-
- func (b *GraphNodeBuilder) NewShardRead(fr *ioswitch2.FromShardstore, userSpace clitypes.UserSpaceDetail, open types.OpenOption) *ShardReadNode {
- node := &ShardReadNode{
- From: fr,
- UserSpace: userSpace,
- Open: open,
- }
- b.AddNode(node)
-
- node.OutputStreams().Init(node, 1)
- return node
- }
-
// GetFrom returns the planning-time source descriptor of this node,
// satisfying the ioswitch2 "from" node interface.
func (t *ShardReadNode) GetFrom() ioswitch2.From {
	return t.From
}
-
- func (t *ShardReadNode) Output() dag.StreamOutputSlot {
- return dag.StreamOutputSlot{
- Node: t,
- Index: 0,
- }
- }
-
- func (t *ShardReadNode) GenerateOp() (exec.Op, error) {
- return &ShardRead{
- Output: t.OutputStreams().Get(0).VarID,
- UserSpace: t.UserSpace,
- Open: t.Open,
- }, nil
- }
-
- // func (t *IPFSReadType) String() string {
- // return fmt.Sprintf("IPFSRead[%s,%v+%v]%v%v", t.FileHash, t.Option.Offset, t.Option.Length, formatStreamIO(node), formatValueIO(node))
- // }
-
// ShardWriteNode is the DAG node that plans a ShardWrite op: it consumes
// one input stream and produces one value (the stored file's shard info).
type ShardWriteNode struct {
	dag.NodeBase
	To               *ioswitch2.ToShardStore  // planning-time destination descriptor
	UserSpace        clitypes.UserSpaceDetail // user space whose shard store is written
	FileHashStoreKey string                   // key under which the resulting file hash is stored
}
-
- func (b *GraphNodeBuilder) NewShardWrite(to *ioswitch2.ToShardStore, userSpace clitypes.UserSpaceDetail, fileHashStoreKey string) *ShardWriteNode {
- node := &ShardWriteNode{
- To: to,
- UserSpace: userSpace,
- FileHashStoreKey: fileHashStoreKey,
- }
- b.AddNode(node)
-
- node.InputStreams().Init(1)
- node.OutputValues().Init(node, 1)
- return node
- }
-
// GetTo returns the planning-time destination descriptor of this node,
// satisfying the ioswitch2 "to" node interface.
func (t *ShardWriteNode) GetTo() ioswitch2.To {
	return t.To
}
-
// SetInput connects the given stream variable to this node's single
// input slot (index 0).
func (t *ShardWriteNode) SetInput(input *dag.StreamVar) {
	input.To(t, 0)
}
-
- func (t *ShardWriteNode) Input() dag.StreamInputSlot {
- return dag.StreamInputSlot{
- Node: t,
- Index: 0,
- }
- }
-
// FileHashVar returns the output value variable that will carry the
// ShardInfoValue produced by the write.
func (t *ShardWriteNode) FileHashVar() *dag.ValueVar {
	return t.OutputValues().Get(0)
}
-
- func (t *ShardWriteNode) GenerateOp() (exec.Op, error) {
- return &ShardWrite{
- Input: t.InputStreams().Get(0).VarID,
- FileHashVar: t.OutputValues().Get(0).VarID,
- UserSpace: t.UserSpace,
- }, nil
- }
-
- // func (t *IPFSWriteType) String() string {
- // return fmt.Sprintf("IPFSWrite[%s,%v+%v]%v%v", t.FileHashStoreKey, t.Range.Offset, t.Range.Length, formatStreamIO(node), formatValueIO(node))
- // }
|