diff --git a/sdks/imfs/config.go b/sdks/imfs/config.go index 9227153..2f0adce 100644 --- a/sdks/imfs/config.go +++ b/sdks/imfs/config.go @@ -3,9 +3,3 @@ package imsdk type Config struct { URL string `json:"url"` } - -type ProxyConfig struct { - IP string `json:"ip"` - ClientPort string `json:"clientPort"` - NodePort string `json:"nodePort"` -} diff --git a/sdks/imfs/models.go b/sdks/imfs/models.go index ca69c94..15f17f6 100644 --- a/sdks/imfs/models.go +++ b/sdks/imfs/models.go @@ -5,11 +5,10 @@ import schsdk "gitlink.org.cn/cloudream/common/sdks/scheduler" const ( EnvPackageList = "IMFS_PACKAGE_LIST" EnvServiceAddress = "IMFS_SERVICE_ADDRESS" - - EnvLocalJobID = "LOCAL_JOB_ID" - EnvJobsetID = "JOBSET_ID" - EnvClientServiceList = "CLENT_SERVICE_LIST" - EnvServerServiceList = "SERVER_SERVICE_LIST" + EnvLocalJobID = "IMFS_LOCAL_JOB_ID" + EnvJobsetID = "IMFS_JOBSET_ID" + EnvListeningList = "IMFS_PROXY_LSTENING_LIST" + EnvServingList = "IMFS_PROXY_SERVING_LIST" ) //代表本任务需要访问的服务 diff --git a/sdks/imfs/proxy.go b/sdks/imfs/proxy.go index ff797b7..079cc66 100644 --- a/sdks/imfs/proxy.go +++ b/sdks/imfs/proxy.go @@ -8,19 +8,19 @@ import ( myhttp "gitlink.org.cn/cloudream/common/utils/http" ) -const ProxyGetJobIDPath = "/proxy/getJobID" +const ProxyGetServiceInfoPath = "/proxy/getServiceInfo" -type ProxyGetJobID struct { +type ProxyGetServiceInfo struct { ServiceName string `json:"serviceName"` JobSetID schsdk.JobSetID `json:"jobSetID"` } -type ProxyGetJobIDResp struct { +type ProxyGetServiceInfoResp struct { LocalJobID string `json:"localJobID"` } -func (c *Client) ProxyGetJobID(req ProxyGetJobID) (*ProxyGetJobIDResp, error) { - url, err := url.JoinPath(c.baseURL, ProxyGetJobIDPath) +func (c *Client) ProxyGetServiceInfo(req ProxyGetServiceInfo) (*ProxyGetServiceInfoResp, error) { + url, err := url.JoinPath(c.baseURL, ProxyGetServiceInfoPath) if err != nil { return nil, err } @@ -32,7 +32,7 @@ func (c *Client) ProxyGetJobID(req ProxyGetJobID) (*ProxyGetJobIDResp, error) { return nil, err } - jsonResp, err := myhttp.ParseJSONResponse[response[ProxyGetJobIDResp]](resp) + jsonResp, err := myhttp.ParseJSONResponse[response[ProxyGetServiceInfoResp]](resp) if err != nil { return nil, err } diff --git a/utils/io/binary.go b/utils/io/binary.go index d2c28c6..a5a400d 100644 --- a/utils/io/binary.go +++ b/utils/io/binary.go @@ -6,6 +6,28 @@ import ( "io" ) +// TODO2 定义BinaryWriter和BinaryReader类型,下面的函数作为它的成员函数 + +func WriteBool(writer *bufio.Writer, val bool) error { + v := byte(0) + if val { + v = 1 + } + + if err := writer.WriteByte(v); err != nil { + return err + } + return nil +} + +func ReadBool(reader *bufio.Reader) (bool, error) { + v, err := reader.ReadByte() + if err != nil { + return false, err + } + return v > 0, nil +} + func WriteUint8Field(writer *bufio.Writer, data uint8) error { if err := writer.WriteByte(data); err != nil { return err diff --git a/utils/sync/safe_channel.go b/utils/sync/safe_channel.go new file mode 100644 index 0000000..04a791e --- /dev/null +++ b/utils/sync/safe_channel.go @@ -0,0 +1,66 @@ +package sync + +import "context" + +type SafeChannel[T any] struct { + ch chan T + done context.Context + cacnel func() +} + +func NewChannel[T any]() SafeChannel[T] { + ctx, cancel := context.WithCancel(context.Background()) + return SafeChannel[T]{ + ch: make(chan T), + done: ctx, + cacnel: cancel, + } +} + +func NewChannelWithCapacity[T any](cap int) SafeChannel[T] { + ctx, cancel := context.WithCancel(context.Background()) + return SafeChannel[T]{ + ch: make(chan T, cap), + done: ctx, + cacnel: cancel, + } +} + +func (c *SafeChannel[T]) Send(val T) bool { + select { + case <-c.done.Done(): + return false + case c.ch <- val: + return true + } +} + +func (c *SafeChannel[T]) Receive() (T, bool) { + select { + case <-c.done.Done(): + var ret T + return ret, false + + case v := <-c.ch: + return v, true + } +} + +// 需要与Closed函数一起使用 +func (c *SafeChannel[T]) Sender() chan<- T { + return c.ch +} + +// 需要与Closed函数一起使用 +func (c *SafeChannel[T]) Receiver() <-chan T { + return c.ch +} + +// 如果返回的chan被关闭,则代表此SafeChannel已经关闭 +func (c *SafeChannel[T]) Closed() <-chan struct{} { + return c.done.Done() +} + +func (c *SafeChannel[T]) Close() { + c.cacnel() +}