diff --git a/models/cloudbrain.go b/models/cloudbrain.go index fe9b60276..26fc62e04 100755 --- a/models/cloudbrain.go +++ b/models/cloudbrain.go @@ -566,6 +566,17 @@ type FlavorInfo struct { Desc string `json:"desc"` } +type SpecialPools struct { + Pools []*SpecialPool `json:"pools"` +} +type SpecialPool struct { + Org string `json:"org"` + Type string `json:"type"` + IsExclusive bool `json:"isExclusive"` + Pool []*GpuInfo `json:"pool"` + JobType []string `json:"jobType"` +} + type ImageInfosModelArts struct { ImageInfo []*ImageInfoModelArts `json:"image_info"` } diff --git a/modules/grampus/grampus.go b/modules/grampus/grampus.go index 57d65a593..4d87bb607 100755 --- a/modules/grampus/grampus.go +++ b/modules/grampus/grampus.go @@ -1,12 +1,16 @@ package grampus import ( + "encoding/json" + "strings" + + "code.gitea.io/gitea/modules/setting" + "code.gitea.io/gitea/models" "code.gitea.io/gitea/modules/context" "code.gitea.io/gitea/modules/log" "code.gitea.io/gitea/modules/notification" "code.gitea.io/gitea/modules/timeutil" - "strings" ) const ( @@ -28,6 +32,8 @@ var ( poolInfos *models.PoolInfos FlavorInfos *models.FlavorInfos ImageInfos *models.ImageInfosModelArts + + SpecialPools *models.SpecialPools ) type GenerateTrainJobReq struct { @@ -63,6 +69,27 @@ type GenerateTrainJobReq struct { func GenerateTrainJob(ctx *context.Context, req *GenerateTrainJobReq) (err error) { createTime := timeutil.TimeStampNow() + + var CenterID []string + var CenterName []string + + if SpecialPools != nil { + for _, pool := range SpecialPools.Pools { + if !pool.IsExclusive && strings.Contains(req.ComputeResource, pool.Type) { + org, _ := models.GetOrgByName(pool.Org) + if org != nil { + isOrgMember, _ := models.IsOrganizationMember(org.ID, ctx.User.ID) + if isOrgMember { + for _, info := range pool.Pool { + CenterID = append(CenterID, info.Queue) + CenterName = append(CenterName, info.Value) + } + } + } + } + } + } + jobResult, err := createJob(models.CreateGrampusJobRequest{ Name: req.JobName, Tasks: []models.GrampusTasks{ @@ -72,6 +99,8 @@ func GenerateTrainJob(ctx *context.Context, req *GenerateTrainJobReq) (err error ResourceSpecId: req.ResourceSpecId, ImageId: req.ImageId, ImageUrl: req.ImageUrl, + CenterID: CenterID, + CenterName: CenterName, ReplicaNum: 1, }, }, @@ -136,3 +165,8 @@ func TransTrainJobStatus(status string) string { return strings.ToUpper(status) } +func InitSpecialPool() { + if SpecialPools == nil && setting.Grampus.SpecialPools != "" { + json.Unmarshal([]byte(setting.Grampus.SpecialPools), &SpecialPools) + } +} diff --git a/modules/setting/setting.go b/modules/setting/setting.go index adaf5bd4b..2058c51a8 100755 --- a/modules/setting/setting.go +++ b/modules/setting/setting.go @@ -531,10 +531,11 @@ var ( //grampus config Grampus = struct { - Env string - Host string - UserName string - Password string + Env string + Host string + UserName string + Password string + SpecialPools string }{} //elk config @@ -1414,6 +1415,8 @@ func GetGrampusConfig() { Grampus.Host = sec.Key("SERVER_HOST").MustString("") Grampus.UserName = sec.Key("USERNAME").MustString("") Grampus.Password = sec.Key("PASSWORD").MustString("") + Grampus.SpecialPools = sec.Key("SPECIAL_POOL").MustString("") + } func SetRadarMapConfig() { diff --git a/modules/setting/webhook.go b/modules/setting/webhook.go index 34cf8a62d..a14ad949f 100644 --- a/modules/setting/webhook.go +++ b/modules/setting/webhook.go @@ -6,6 +6,7 @@ package setting import ( "net/url" + "strings" "code.gitea.io/gitea/modules/log" ) @@ -13,14 +14,18 @@ import ( var ( // Webhook settings Webhook = struct { - QueueLength int - DeliverTimeout int - SkipTLSVerify bool - Types []string - PagingNum int - ProxyURL string - ProxyURLFixed *url.URL - ProxyHosts []string + QueueLength int + DeliverTimeout int + SkipTLSVerify bool + Types []string + PagingNum int + ProxyURL string + ProxyURLFixed *url.URL + ProxyHosts []string + Socks5Proxy string + Socks5UserName string + Socks5Password string + Socks5ProxyHosts []string }{ QueueLength: 1000, DeliverTimeout: 5, @@ -39,6 +44,10 @@ func newWebhookService() { Webhook.Types = []string{"gitea", "gogs", "slack", "discord", "dingtalk", "telegram", "msteams", "feishu", "matrix"} Webhook.PagingNum = sec.Key("PAGING_NUM").MustInt(10) Webhook.ProxyURL = sec.Key("PROXY_URL").MustString("") + Webhook.Socks5Proxy = sec.Key("SOCKS5_PROXY_URL").MustString("") + Webhook.Socks5UserName = sec.Key("SOCKS5_USER_NAME").MustString("") + Webhook.Socks5Password = sec.Key("SOCKS5_PASSWORD").MustString("") + Webhook.Socks5ProxyHosts = strings.Split(sec.Key("SOCKS5_PROXY_HOST").MustString(""), ";") if Webhook.ProxyURL != "" { var err error Webhook.ProxyURLFixed, err = url.Parse(Webhook.ProxyURL) diff --git a/modules/webhook/deliver.go b/modules/webhook/deliver.go index 7b0c65173..8348e8641 100644 --- a/modules/webhook/deliver.go +++ b/modules/webhook/deliver.go @@ -16,6 +16,8 @@ import ( "sync" "time" + "golang.org/x/net/proxy" + "code.gitea.io/gitea/models" "code.gitea.io/gitea/modules/graceful" "code.gitea.io/gitea/modules/log" @@ -137,8 +139,10 @@ func Deliver(t *models.HookTask) error { return } }() + match := isSocks5ProxyUrlMatch(req) + + resp, err := makeReq(req, match) - resp, err := webhookHTTPClient.Do(req) if err != nil { t.ResponseInfo.Body = fmt.Sprintf("Delivery: %v", err) return err @@ -161,6 +165,23 @@ func Deliver(t *models.HookTask) error { return nil } +func makeReq(req *http.Request, proxyMatch bool) (*http.Response, error) { + if proxyMatch { + return webhookSocks5PoxyHTTPClient.Do(req) + } + return webhookHTTPClient.Do(req) +} + +func isSocks5ProxyUrlMatch(req *http.Request) bool { + + for _, v := range socks5HostMatchers { + if v.Match(req.URL.Host) { + return true + } + } + return false +} + // DeliverHooks checks and delivers undelivered hooks. // FIXME: graceful: This would likely benefit from either a worker pool with dummy queue // or a full queue. Then more hooks could be sent at same time. @@ -225,9 +246,11 @@ func DeliverHooks(ctx context.Context) { } var ( - webhookHTTPClient *http.Client - once sync.Once - hostMatchers []glob.Glob + webhookHTTPClient *http.Client + once sync.Once + hostMatchers []glob.Glob + webhookSocks5PoxyHTTPClient *http.Client + socks5HostMatchers []glob.Glob ) func webhookProxy() func(req *http.Request) (*url.URL, error) { @@ -274,5 +297,31 @@ func InitDeliverHooks() { }, } + if setting.Webhook.Socks5Proxy != "" { + auth := proxy.Auth{ + User: setting.Webhook.Socks5UserName, + Password: setting.Webhook.Socks5Password, + } + + dialSocksProxy, err := proxy.SOCKS5("tcp", setting.Webhook.Socks5Proxy, &auth, proxy.Direct) + if err != nil { + fmt.Println("Error connecting to proxy:", err) + } + tr := &http.Transport{Dial: dialSocksProxy.Dial} + + webhookSocks5PoxyHTTPClient = &http.Client{ + Transport: tr, + } + + for _, h := range setting.Webhook.Socks5ProxyHosts { + if g, err := glob.Compile(h); err == nil { + socks5HostMatchers = append(socks5HostMatchers, g) + } else { + log.Error("glob.Compile %s failed: %v", h, err) + } + } + + } + go graceful.GetManager().RunWithShutdownContext(DeliverHooks) } diff --git a/options/locale/locale_en-US.ini b/options/locale/locale_en-US.ini index ce96c669f..2eca76373 100755 --- a/options/locale/locale_en-US.ini +++ b/options/locale/locale_en-US.ini @@ -1178,6 +1178,7 @@ model.manage.model_accuracy = Model Accuracy grampus.train_job.ai_center = AI Center grampus.dataset_path_rule = The code is storaged in /cache/code;the dataset is storaged in /cache/dataset;and please put your model into /cache/output, then you can download it online。 +grampus.no_operate_right = You have no right to do this operation. template.items = Template Items template.git_content = Git Content (Default Branch) diff --git a/options/locale/locale_zh-CN.ini b/options/locale/locale_zh-CN.ini index c6b98071a..78d80a6bc 100755 --- a/options/locale/locale_zh-CN.ini +++ b/options/locale/locale_zh-CN.ini @@ -1193,6 +1193,8 @@ model.manage.model_accuracy = 模型精度 grampus.train_job.ai_center=智算中心 grampus.dataset_path_rule = 训练脚本存储在/cache/code中,数据集存储在/cache/dataset中,训练输出请存储在/cache/output中以供后续下载。 +grampus.no_operate_right = 您没有权限创建这类任务。 + template.items=模板选项 template.git_content=Git数据(默认分支) template.git_hooks=Git 钩子 diff --git a/routers/repo/grampus.go b/routers/repo/grampus.go index 48957387f..06b25b57f 100755 --- a/routers/repo/grampus.go +++ b/routers/repo/grampus.go @@ -71,6 +71,25 @@ func grampusTrainJobNewDataPrepare(ctx *context.Context, processType string) err ctx.Data["images"] = images.Infos } + grampus.InitSpecialPool() + + ctx.Data["GPUEnabled"] = true + ctx.Data["NPUEnabled"] = true + + if grampus.SpecialPools != nil { + for _, pool := range grampus.SpecialPools.Pools { + if pool.IsExclusive { + org, _ := models.GetOrgByName(pool.Org) + if org != nil { + isOrgMember, _ := models.IsOrganizationMember(org.ID, ctx.User.ID) + if !isOrgMember { + ctx.Data[pool.Type+"Enabled"] = false + } + } + } + } + } + //get valid resource specs specs, err := grampus.GetResourceSpecs(processType) if err != nil { @@ -128,10 +147,18 @@ func GrampusTrainJobGpuCreate(ctx *context.Context, form auth.CreateGrampusTrain image := strings.TrimSpace(form.Image) if !jobNamePattern.MatchString(displayJobName) { + grampusTrainJobNewDataPrepare(ctx, grampus.ProcessorTypeGPU) ctx.RenderWithErr(ctx.Tr("repo.cloudbrain_jobname_err"), tplGrampusTrainJobGPUNew, &form) return } + errStr := checkSpecialPool(ctx, "GPU") + if errStr != "" { + grampusTrainJobNewDataPrepare(ctx, grampus.ProcessorTypeGPU) + ctx.RenderWithErr(errStr, tplGrampusTrainJobGPUNew, &form) + return + } + //check count limit count, err := models.GetGrampusCountByUserID(ctx.User.ID, string(models.JobTypeTrain), models.GPUResource) if err != nil { @@ -263,6 +290,28 @@ func GrampusTrainJobGpuCreate(ctx *context.Context, form auth.CreateGrampusTrain ctx.Redirect(setting.AppSubURL + ctx.Repo.RepoLink + "/modelarts/train-job") } +func checkSpecialPool(ctx *context.Context, resourceType string) string { + grampus.InitSpecialPool() + if grampus.SpecialPools != nil { + for _, pool := range grampus.SpecialPools.Pools { + + if pool.IsExclusive && pool.Type == resourceType { + + org, _ := models.GetOrgByName(pool.Org) + if org != nil { + isOrgMember, _ := models.IsOrganizationMember(org.ID, ctx.User.ID) + if !isOrgMember { + return ctx.Tr("repo.grampus.no_operate_right") + } + } + } + + } + + } + return "" +} + func GrampusTrainJobNpuCreate(ctx *context.Context, form auth.CreateGrampusTrainJobForm) { displayJobName := form.DisplayJobName jobName := util.ConvertDisplayJobNameToJobName(displayJobName) @@ -281,10 +330,18 @@ func GrampusTrainJobNpuCreate(ctx *context.Context, form auth.CreateGrampusTrain engineName := form.EngineName if !jobNamePattern.MatchString(displayJobName) { + grampusTrainJobNewDataPrepare(ctx, grampus.ProcessorTypeNPU) ctx.RenderWithErr(ctx.Tr("repo.cloudbrain_jobname_err"), tplGrampusTrainJobNPUNew, &form) return } + errStr := checkSpecialPool(ctx, "NPU") + if errStr != "" { + grampusTrainJobNewDataPrepare(ctx, grampus.ProcessorTypeNPU) + ctx.RenderWithErr(errStr, tplGrampusTrainJobGPUNew, &form) + return + } + //check count limit count, err := models.GetGrampusCountByUserID(ctx.User.ID, string(models.JobTypeTrain), models.NPUResource) if err != nil { diff --git a/templates/repo/grampus/trainjob/gpu/new.tmpl b/templates/repo/grampus/trainjob/gpu/new.tmpl index f48d5fdd1..61fe0931f 100755 --- a/templates/repo/grampus/trainjob/gpu/new.tmpl +++ b/templates/repo/grampus/trainjob/gpu/new.tmpl @@ -89,23 +89,23 @@ {{.i18n.Tr "cloudbrain.resource_cluster_openi"}} - - - {{.i18n.Tr "cloudbrain.resource_cluster_c2net"}} - + + + {{.i18n.Tr "cloudbrain.resource_cluster_c2net"}}(Beta) +