Browse Source

Merge branch 'master' of https://gitlink.org.cn/cloudream/common

pull/53/head
JeshuaRen 6 months ago
parent
commit
cba37d1bf6
5 changed files with 52 additions and 5 deletions
  1. +5
    -1
      pkgs/future/future.go
  2. +1
    -1
      pkgs/future/ready.go
  3. +16
    -2
      pkgs/future/set_value_future.go
  4. +1
    -1
      pkgs/future/set_void_future.go
  5. +29
    -0
      pkgs/ioswitch/plan/ops/var.go

+ 5
- 1
pkgs/future/future.go View File

@@ -5,7 +5,11 @@ import (
"fmt" "fmt"
) )


var ErrCompleted = fmt.Errorf("context canceled")
var ErrConsumed = fmt.Errorf("future already consumed")

var ErrNotComplete = fmt.Errorf("future not complete")

var ErrCanceled = context.Canceled


type Future interface { type Future interface {
IsComplete() bool IsComplete() bool


+ 1
- 1
pkgs/future/ready.go View File

@@ -24,7 +24,7 @@ func (f *Ready) Wait(ctx context.Context) error {
select { select {
case v, ok := <-f.ch: case v, ok := <-f.ch:
if !ok { if !ok {
return ErrCompleted
return ErrConsumed
} }
return v return v




+ 16
- 2
pkgs/future/set_value_future.go View File

@@ -63,7 +63,7 @@ func (f *SetValueFuture[T]) Wait(ctx context.Context) (T, error) {
case cv, ok := <-f.ch: case cv, ok := <-f.ch:
if !ok { if !ok {
var ret T var ret T
return ret, cv.Err
return ret, ErrConsumed
} }
return cv.Value, cv.Err return cv.Value, cv.Err


@@ -73,6 +73,20 @@ func (f *SetValueFuture[T]) Wait(ctx context.Context) (T, error) {
} }
} }


func (f *SetValueFuture[T]) TryGetValue() (T, error) {
select {
case cv, ok := <-f.ch:
if !ok {
var ret T
return ret, ErrConsumed
}
return cv.Value, cv.Err
default:
var ret T
return ret, ErrNotComplete
}
}

type SetValueFuture2[T1 any, T2 any] struct { type SetValueFuture2[T1 any, T2 any] struct {
isCompleted bool isCompleted bool
ch chan ChanValue2[T1, T2] ch chan ChanValue2[T1, T2]
@@ -126,7 +140,7 @@ func (f *SetValueFuture2[T1, T2]) Wait(ctx context.Context) (T1, T2, error) {
select { select {
case cv, ok := <-f.ch: case cv, ok := <-f.ch:
if !ok { if !ok {
return cv.Value1, cv.Value2, cv.Err
return cv.Value1, cv.Value2, ErrConsumed
} }
return cv.Value1, cv.Value2, cv.Err return cv.Value1, cv.Value2, cv.Err




+ 1
- 1
pkgs/future/set_void_future.go View File

@@ -41,7 +41,7 @@ func (f *SetVoidFuture) Wait(ctx context.Context) error {
select { select {
case v, ok := <-f.ch: case v, ok := <-f.ch:
if !ok { if !ok {
return ErrCompleted
return ErrConsumed
} }
return v return v




+ 29
- 0
pkgs/ioswitch/plan/ops/var.go View File

@@ -1,6 +1,7 @@
package ops package ops


import ( import (
"gitlink.org.cn/cloudream/common/pkgs/ioswitch/dag"
"gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec" "gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec"
) )


@@ -21,3 +22,31 @@ func (o *ConstVar) Execute(ctx *exec.ExecContext, e *exec.Executor) error {
func (o *ConstVar) String() string { func (o *ConstVar) String() string {
return "ConstVar" return "ConstVar"
} }

type ConstNode struct {
dag.NodeBase
Value exec.VarValue
}

func (b *GraphNodeBuilder) NewConst(val exec.VarValue) *ConstNode {
node := &ConstNode{
Value: val,
}
b.AddNode(node)

node.OutputValues().Init(node, 1)
return node
}

func (t *ConstNode) Output() dag.ValueOutputSlot {
return dag.ValueOutputSlot{
Node: t,
Index: 0,
}
}

func (t *ConstNode) GenerateOp() (exec.Op, error) {
return &DropStream{
Input: t.Output().Var().VarID,
}, nil
}

Loading…
Cancel
Save