Browse Source

add hpc task impl

Signed-off-by: jagger <cossjie@foxmail.com>
pull/460/head
jagger 7 months ago
parent
commit
89ae54df38
13 changed files with 648 additions and 51 deletions
  1. +0
    -1
      go.mod
  2. +0
    -39
      go.sum
  3. +11
    -0
      internal/cron/cron.go
  4. +89
    -0
      internal/cron/hpc_cron_task.go
  5. +25
    -5
      internal/logic/hpc/commithpctasklogic.go
  6. +149
    -0
      internal/scheduler/database/hpc_storage.go
  7. +4
    -2
      internal/scheduler/scheduler.go
  8. +28
    -0
      internal/scheduler/service/collector/hpc_collector.go
  9. +73
    -0
      internal/scheduler/service/hpc/slurm.go
  10. +79
    -0
      internal/scheduler/service/hpc_service.go
  11. +184
    -0
      internal/scheduler/service/utils/status/hpc_task_sync.go
  12. +3
    -2
      internal/svc/servicecontext.go
  13. +3
    -2
      pkg/models/taskhpcmodel_gen.go

+ 0
- 1
go.mod View File

@@ -53,7 +53,6 @@ require (
github.com/cenkalti/backoff/v4 v4.3.0 // indirect
github.com/cespare/xxhash/v2 v2.3.0 // indirect
github.com/cloudwego/base64x v0.1.5 // indirect
github.com/cloudwego/iasm v0.2.0 // indirect
github.com/coreos/go-semver v0.3.1 // indirect
github.com/coreos/go-systemd/v22 v22.5.0 // indirect
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect


+ 0
- 39
go.sum View File

@@ -75,11 +75,8 @@ github.com/bsm/gomega v1.27.10 h1:yeMWxP2pV2fG3FgAODIY8EiRE3dy0aeFYt4l7wh6yKA=
github.com/bsm/gomega v1.27.10/go.mod h1:JyEr/xRbxbtgWNi8tIEVPUYZ5Dzef52k01W3YH0H+O0=
github.com/bwmarrin/snowflake v0.3.0 h1:xm67bEhkKh6ij1790JB83OujPR5CzNe8QuQqAgISZN0=
github.com/bwmarrin/snowflake v0.3.0/go.mod h1:NdZxfVWX+oR6y2K0o6qAYv6gIOP9rjG0/E9WsDpxqwE=
github.com/bytedance/sonic v1.11.6 h1:oUp34TzMlL+OY1OUWxHqsdkgC/Zfc85zGqw9siXjrc0=
github.com/bytedance/sonic v1.11.6/go.mod h1:LysEHSvpvDySVdC2f87zGWf6CIKJcAvqab1ZaiQtds4=
github.com/bytedance/sonic v1.13.1 h1:Jyd5CIvdFnkOWuKXr+wm4Nyk2h0yAFsr8ucJgEasO3g=
github.com/bytedance/sonic v1.13.1/go.mod h1:o68xyaF9u2gvVBuGHPlUVCy+ZfmNNO5ETf1+KgkJhz4=
github.com/bytedance/sonic/loader v0.1.1 h1:c+e5Pt1k/cy5wMveRDyk2X4B9hF4g7an8N3zCYjJFNM=
github.com/bytedance/sonic/loader v0.1.1/go.mod h1:ncP89zfokxS5LZrJxl5z0UJcsk4M4yY2JpfqGeCtNLU=
github.com/bytedance/sonic/loader v0.2.4 h1:ZWCw4stuXUsn1/+zQDqeE7JKP+QO47tz7QCNan80NzY=
github.com/bytedance/sonic/loader v0.2.4/go.mod h1:N8A3vUdtUebEY2/VQC0MyhYeKUFosQU6FxH2JmUe6VI=
@@ -95,11 +92,8 @@ github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1/go.mod h1:Q3SI9o4m/ZMn
github.com/circonus-labs/circonus-gometrics v2.3.1+incompatible/go.mod h1:nmEj6Dob7S7YxXgwXpfOuvO54S+tGdZdw9fuRZt25Ag=
github.com/circonus-labs/circonusllhist v0.1.3/go.mod h1:kMXHVDlOchFAehlya5ePtbp5jckzBHf4XRpQvBOLI+I=
github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
github.com/cloudwego/base64x v0.1.4 h1:jwCgWpFanWmN8xoIUHa2rtzmkd5J2plF/dnLS6Xd/0Y=
github.com/cloudwego/base64x v0.1.4/go.mod h1:0zlkT4Wn5C6NdauXdJRhSKRlJvmclQ1hhJgA0rcu/8w=
github.com/cloudwego/base64x v0.1.5 h1:XPciSp1xaq2VCSt6lF0phncD4koWyULpl5bUxbfCyP4=
github.com/cloudwego/base64x v0.1.5/go.mod h1:0zlkT4Wn5C6NdauXdJRhSKRlJvmclQ1hhJgA0rcu/8w=
github.com/cloudwego/iasm v0.2.0 h1:1KNIy1I1H9hNNFEEH3DVnI4UujN+1zjpuk6gwHLTssg=
github.com/cloudwego/iasm v0.2.0/go.mod h1:8rXZaNYT2n95jn+zTI1sDr+IgcD2GVs0nlbbQPiEFhY=
github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc=
github.com/coreos/go-semver v0.3.1 h1:yi21YpKnrx1gt5R+la8n5WgS0kCrsPp33dmEyHReZr4=
@@ -127,14 +121,10 @@ github.com/fsnotify/fsnotify v1.7.0 h1:8JEhPFa5W2WU7YfeZzPNqzMP6Lwt7L2715Ggo0nos
github.com/fsnotify/fsnotify v1.7.0/go.mod h1:40Bi/Hjc2AVfZrqy+aj+yEI+/bRxZnMJyTJwOpGvigM=
github.com/fxamacker/cbor/v2 v2.7.0 h1:iM5WgngdRBanHcxugY4JySA0nk1wZorNOpTgCMedv5E=
github.com/fxamacker/cbor/v2 v2.7.0/go.mod h1:pxXPTn3joSm21Gbwsv0w9OSA2y1HFR9qXEeXQVeNoDQ=
github.com/gabriel-vasile/mimetype v1.4.3 h1:in2uUcidCuFcDKtdcBxlR0rJ1+fsokWf+uqxgUFjbI0=
github.com/gabriel-vasile/mimetype v1.4.3/go.mod h1:d8uq/6HKRL6CGdk+aubisF/M5GcPfT7nKyLpA0lbSSk=
github.com/gabriel-vasile/mimetype v1.4.8 h1:FfZ3gj38NjllZIeJAmMhr+qKL8Wu+nOoI3GqacKw1NM=
github.com/gabriel-vasile/mimetype v1.4.8/go.mod h1:ByKUIKGjh1ODkGM1asKUbQZOLGrPjydw3hYPU2YU9t8=
github.com/ghodss/yaml v1.0.0 h1:wQHKEahhL6wmXdzwWG11gIVCkOv05bNOh+Rxn0yngAk=
github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04=
github.com/gin-contrib/sse v0.1.0 h1:Y/yl/+YNO8GZSjAhjMsSuLt29uWRFHdHYUb5lYOV9qE=
github.com/gin-contrib/sse v0.1.0/go.mod h1:RHrZQHXnP2xjPF+u1gW/2HnVO7nvIa9PG3Gm+fLHvGI=
github.com/gin-contrib/sse v1.0.0 h1:y3bT1mUWUxDpW4JLQg/HnTqV4rozuW4tC9eFKTxYI9E=
github.com/gin-contrib/sse v1.0.0/go.mod h1:zNuFdwarAygJBht0NTKiSi3jRf6RbqeILZ9Sp6Slhe0=
github.com/gin-gonic/gin v1.10.0 h1:nTuyha1TYqgedzytsKYqna+DfLos46nTv2ygFy86HFU=
@@ -183,8 +173,6 @@ github.com/go-playground/locales v0.14.1 h1:EWaQ/wswjilfKLTECiXz7Rh+3BjFhfDFKv/o
github.com/go-playground/locales v0.14.1/go.mod h1:hxrqLVvrK65+Rwrd5Fc6F2O76J/NuW9t0sjnWqG1slY=
github.com/go-playground/universal-translator v0.18.1 h1:Bcnm0ZwsGyWbCzImXv+pAJnYK9S473LQFuzCbDbfSFY=
github.com/go-playground/universal-translator v0.18.1/go.mod h1:xekY+UJKNuX9WP91TpwSH2VMlDf28Uj24BCp08ZFTUY=
github.com/go-playground/validator/v10 v10.20.0 h1:K9ISHbSaI0lyB2eWMPJo+kOS/FBExVwjEviJTixqxL8=
github.com/go-playground/validator/v10 v10.20.0/go.mod h1:dbuPbCMFw/DrkbEynArYaCwl3amGuJotoKCe95atGMM=
github.com/go-playground/validator/v10 v10.25.0 h1:5Dh7cjvzR7BRZadnsVOzPhWsrwUr0nmsZJxEAnFLNO8=
github.com/go-playground/validator/v10 v10.25.0/go.mod h1:GGzBIJMuE98Ic/kJsBXbz1x/7cByt++cQ+YOuDM5wus=
github.com/go-redis/redis/v8 v8.11.5 h1:AcZZR7igkdvfVmQTPnu9WE37LRrO/YrBH5zWyjDC0oI=
@@ -198,8 +186,6 @@ github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/me
github.com/go-task/slim-sprig v0.0.0-20230315185526-52ccab3ef572 h1:tfuBGBXKqDEevZMzYi5KSi8KkcZtzBcTgAUUtapy0OI=
github.com/go-task/slim-sprig/v3 v3.0.0 h1:sUs3vkvUymDpBKi3qH1YSqBQk9+9D/8M2mN1vB6EwHI=
github.com/go-task/slim-sprig/v3 v3.0.0/go.mod h1:W848ghGpv3Qj3dhTPRyJypKRiqCdHZiAzKg9hl15HA8=
github.com/goccy/go-json v0.10.2 h1:CrxCmQqYDkv1z7lO7Wbh2HN93uovUHgrECaO5ZrCXAU=
github.com/goccy/go-json v0.10.2/go.mod h1:6MelG93GURQebXPDq3khkgXZkazVtN9CRI+MGFi0w8I=
github.com/goccy/go-json v0.10.5 h1:Fq85nIqj+gXn/S5ahsiTlK3TmC85qgirsdTP/+DeaC4=
github.com/goccy/go-json v0.10.5/go.mod h1:oq7eo15ShAhp70Anwd5lgX2pLfOS3QCiwU/PULtXL6M=
github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA=
@@ -351,8 +337,6 @@ github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+o
github.com/klauspost/compress v1.17.11 h1:In6xLpyWOi1+C7tXUUWv2ot1QvBjxevKAaI6IXrJmUc=
github.com/klauspost/compress v1.17.11/go.mod h1:pMDklpSncoRMuLFrf1W9Ss9KT+0rH90U12bZKk7uwG0=
github.com/klauspost/cpuid/v2 v2.0.9/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg=
github.com/klauspost/cpuid/v2 v2.2.7 h1:ZWSB3igEs+d0qvnxR/ZBzXVmxkgt8DdzP6m9pfuVLDM=
github.com/klauspost/cpuid/v2 v2.2.7/go.mod h1:Lcz8mBdAVJIBVzewtcLocK12l3Y+JytZYpaMropDUws=
github.com/klauspost/cpuid/v2 v2.2.10 h1:tBs3QSyvjDyFTq3uoc/9xFpCuOsJQFNPiAhYdw2skhE=
github.com/klauspost/cpuid/v2 v2.2.10/go.mod h1:hqwkgyIinND0mEev00jJYCxPNVRVXFQeu1XKlok6oO0=
github.com/knz/go-libedit v1.10.1/go.mod h1:MZTVkCWyz0oBc7JOWP3wNAzd002ZbM/5hgShxwh4x8M=
@@ -555,10 +539,6 @@ gitlink.org.cn/JointCloud/pcm-modelarts v0.0.0-20250313064001-91fb558cfdb6 h1:9o
gitlink.org.cn/JointCloud/pcm-modelarts v0.0.0-20250313064001-91fb558cfdb6/go.mod h1:MxtnJJcU8S4zfGKZVcg2MOXGtwucKy7MMDwA0IemBd0=
gitlink.org.cn/JointCloud/pcm-octopus v0.0.0-20240817071412-44397870b110 h1:GaXwr5sgDh0raHjUf9IewTvnRvajYea7zbLsaerYyXo=
gitlink.org.cn/JointCloud/pcm-octopus v0.0.0-20240817071412-44397870b110/go.mod h1:QOD5+/l2D+AYBjF2h5T0mdJyfGAmF78QmeKdbBXbjLQ=
gitlink.org.cn/JointCloud/pcm-openi v0.0.0-20250102093846-164b4884c9ec h1:Yul2JOAIS94B+eIg0UvmBSe8JrtSrZ2OA47gAYLiBYI=
gitlink.org.cn/JointCloud/pcm-openi v0.0.0-20250102093846-164b4884c9ec/go.mod h1:oDJrr/TNbUCaVjI+RaOrUtGawD7UPAvp7U/oVgT2Dhc=
gitlink.org.cn/JointCloud/pcm-openi v0.0.0-20250311101016-c5dc5179d72d h1:OFMao9b6ueEI4TXKTH447iSwN6S34ZDjSJGn0xkosd4=
gitlink.org.cn/JointCloud/pcm-openi v0.0.0-20250311101016-c5dc5179d72d/go.mod h1:0VMTWXsRx7Z5z+kxBid2zf7kq5YtFlxubXEwPHiicyM=
gitlink.org.cn/JointCloud/pcm-openi v0.0.0-20250320103718-7bd6650118ee h1:+YYzcWPX0Up98nOb5ngkCaqiWHpSH7XJQRTUSvYclWU=
gitlink.org.cn/JointCloud/pcm-openi v0.0.0-20250320103718-7bd6650118ee/go.mod h1:0VMTWXsRx7Z5z+kxBid2zf7kq5YtFlxubXEwPHiicyM=
gitlink.org.cn/JointCloud/pcm-openstack v0.0.0-20240403033338-e7edabad4203 h1:s6PsZ1+bev294IWdZRlV7mnOwI1+UzFcldVW/BqhQzI=
@@ -610,9 +590,6 @@ go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0=
go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y=
go.uber.org/zap v1.27.0 h1:aJMhYGrd5QSmlpLMr2MftRKl7t8J8PTZPA732ud/XR8=
go.uber.org/zap v1.27.0/go.mod h1:GB2qFLM7cTU87MWRP2mPIjqfIDnGu+VIO4V/SdhGo2E=
golang.org/x/arch v0.0.0-20210923205945-b76863e36670/go.mod h1:5om86z9Hs0C8fWVUuoMHwpExlXzs5Tkyp9hOrfG7pp8=
golang.org/x/arch v0.8.0 h1:3wRIsP3pM4yUptoR96otTUOXI367OS0+c9eeRi9doIc=
golang.org/x/arch v0.8.0/go.mod h1:FEVrYAQjsQXMVJ1nsMoVVXPZg6p2JE2mx8psSWTDQys=
golang.org/x/arch v0.15.0 h1:QtOrQd0bTUnhNVNndMpLHNWrDmYzZ2KDqSrEymqInZw=
golang.org/x/arch v0.15.0/go.mod h1:JmwW7aLIoRUKgaTzhkiEFxvcEiQGyOg9BMonBJUS7EE=
golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
@@ -622,8 +599,6 @@ golang.org/x/crypto v0.0.0-20190605123033-f99c8df09eb5/go.mod h1:yigFU9vqHzYiE8U
golang.org/x/crypto v0.0.0-20190923035154-9ee001bba392/go.mod h1:/lpIB1dKB+9EgE3H3cr1v9wB50oz8l4C4h62xy7jSTY=
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.30.0 h1:RwoQn3GkWiMkzlX562cLB7OxWvjH1L8xutO2WoJcRoY=
golang.org/x/crypto v0.30.0/go.mod h1:kDsLvtWBEx7MV9tJOj9bnXsPbxwJQ6csT/x4KIN4Ssk=
golang.org/x/crypto v0.36.0 h1:AnAEvhDddvBdpY+uR+MyHmuZzzNqXSe/GvuDeob5L34=
golang.org/x/crypto v0.36.0/go.mod h1:Y4J0ReaxCR1IMaabaSMugxJES1EpwhBHhv2bDHklZvc=
golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
@@ -694,8 +669,6 @@ golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwY
golang.org/x/net v0.0.0-20201110031124-69a78807bb2b/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM=
golang.org/x/net v0.0.0-20210525063256-abc453219eb5/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/net v0.32.0 h1:ZqPmj8Kzc+Y6e0+skZsuACbx+wzMgo5MQsJh9Qd6aYI=
golang.org/x/net v0.32.0/go.mod h1:CwU0IoeOlnQQWJ6ioyFrfRuomB8GKF6KbYXZVyeXNfs=
golang.org/x/net v0.37.0 h1:1zLorHbz+LYj7MQlSf1+2tPIIgibq2eL5xkrGk6f+2c=
golang.org/x/net v0.37.0/go.mod h1:ivrbrMbzFq5J41QOQh0siUuly180yBYtLp+CKbEaFx8=
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
@@ -717,8 +690,6 @@ golang.org/x/sync v0.0.0-20200625203802-6e8e738ad208/go.mod h1:RxMgew5VJxzue5/jJ
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.10.0 h1:3NQrjDixjgGwUOCaF8w2+VYHv0Ve/vGYSbdkTa98gmQ=
golang.org/x/sync v0.10.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/sync v0.12.0 h1:MHc5BpPuC30uJk597Ri8TV3CNZcTLu6B6z4lJy+g6Jw=
golang.org/x/sync v0.12.0/go.mod h1:1dzgHSNfp02xaA81J2MS99Qcpr2w7fw1gpm99rleRqA=
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
@@ -765,15 +736,10 @@ golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c/go.mod h1:oPkhp1MJrh7nUepCBc
golang.org/x/sys v0.0.0-20210927094055-39ccf1dd6fa6/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220728004956-3c1f35247d10/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.28.0 h1:Fksou7UEQUWlKvIdsqzJmUmCX3cZuD2+P3XyyzwMhlA=
golang.org/x/sys v0.28.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.31.0 h1:ioabZlmFYtWhL+TRYpcnNlLwhyxaM9kWTDEmfnprqik=
golang.org/x/sys v0.31.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/term v0.27.0 h1:WP60Sv1nlK1T6SupCHbXzSaN0b9wUmsPoRS9b61A23Q=
golang.org/x/term v0.27.0/go.mod h1:iMsnZpn0cago0GOrHO2+Y7u7JPn5AylBrcoWkElMTSM=
golang.org/x/term v0.30.0 h1:PQ39fJZ+mfadBm0y5WlL4vlM7Sx1Hgf13sMIY2+QS9Y=
golang.org/x/term v0.30.0/go.mod h1:NYYFdzHoI5wRh/h5tDMdMqCqPJZEuNqVR5xJLd/n67g=
golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
@@ -782,8 +748,6 @@ golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2/go.mod h1:NqM8EUOU14njkJ3
golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk=
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.21.0 h1:zyQAAkrwaneQ066sspRyJaG9VNi/YJ1NfzcGB3hZ/qo=
golang.org/x/text v0.21.0/go.mod h1:4IBbMaMmOPCJ8SecivzSH54+73PCFmPWxNTLm+vZkEQ=
golang.org/x/text v0.23.0 h1:D71I7dUrlY+VX0gQShAThNGHFxZ13dGLBHQLVl1mJlY=
golang.org/x/text v0.23.0/go.mod h1:/BLNzu4aZCJ1+kcD0DNRotWKage4q2rGVAg4o22unh4=
golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
@@ -923,8 +887,6 @@ google.golang.org/protobuf v1.23.1-0.20200526195155-81db48ad09cc/go.mod h1:EGpAD
google.golang.org/protobuf v1.24.0/go.mod h1:r/3tXBNzIEhYS9I1OUVjXDlt8tc493IdKGjtUeSXeh4=
google.golang.org/protobuf v1.25.0/go.mod h1:9JNX74DMeImyA3h4bdi1ymwjUzf21/xIlbajtzgsN7c=
google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw=
google.golang.org/protobuf v1.35.2 h1:8Ar7bF+apOIoThw1EdZl0p1oWvMqTHmpA2fRTyZO8io=
google.golang.org/protobuf v1.35.2/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE=
google.golang.org/protobuf v1.36.5 h1:tPhr+woSbjfYvY6/GPufUoYizxw1cF/yFoxJ2fmpwlM=
google.golang.org/protobuf v1.36.5/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE=
gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw=
@@ -987,7 +949,6 @@ k8s.io/utils v0.0.0-20241210054802-24370beab758 h1:sdbE21q2nlQtFh65saZY+rRM6x6aJ
k8s.io/utils v0.0.0-20241210054802-24370beab758/go.mod h1:OLgZIPagt7ERELqWJFomSt595RzquPNLL48iOWgYOg0=
nullprogram.com/x/optparse v1.0.0/go.mod h1:KdyPE+Igbe0jQUrVfMqDMeJQIJZEuyV7pjYmp6pbG50=
rsc.io/binaryregexp v0.2.0/go.mod h1:qTv7/COck+e2FymRvadv62gMdZztPaShugOCi3I+8D8=
rsc.io/pdf v0.1.1/go.mod h1:n8OzWcQ6Sp37PL01nO98y4iUCRdTGarVfzxY20ICaU4=
rsc.io/quote/v3 v3.1.0/go.mod h1:yEA65RcK8LyAZtP9Kv3t0HmxON59tX3rD+tICJqUlj0=
rsc.io/sampler v1.3.0/go.mod h1:T1hPZKmBbMNahiBKFy5HrXp6adAjACjK9JXDnKaTXpA=
sigs.k8s.io/json v0.0.0-20241014173422-cfa47c3a1cc8 h1:gBQPwqORJ8d8/YNZWEjoZs7npUVDpVXUUOFfW6CgAqE=


+ 11
- 0
internal/cron/cron.go View File

@@ -59,4 +59,15 @@ func AddCronGroup(svc *svc.ServiceContext) {
svc.Scheduler.AiService.LocalCache[schedule.QUERY_RESOURCES] = rus
})

//更新hpc任务状态
svc.Cron.AddFunc("*/5 * * * * ?", func() {
list, err := GetHpcTaskList(svc)
if err != nil {
logx.Errorf(err.Error())
return
}
status.UpdateTaskStatusByHpc(svc, list)
status.UpdateTaskHpcStatus(svc, list)
})

}

+ 89
- 0
internal/cron/hpc_cron_task.go View File

@@ -0,0 +1,89 @@
package cron

import (
"errors"
"fmt"
"github.com/zeromicro/go-zero/core/logx"
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service"
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/svc"
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/types"
)

// GetHpcTaskList returns the most recently created HPC tasks that have not yet
// reached a terminal state.
//
// It joins task with task_hpc and keeps rows whose adapter_type_dict is 2, whose
// task status is neither 'Succeeded' nor 'Failed', and which are not soft-deleted.
// At most 10 rows are returned, ordered by created_time descending.
//
// NOTE(review): both task.* and the hpc columns expose status and updated_time;
// which of the two ends up in the scanned TaskModel depends on the driver/gorm
// column mapping — confirm this is the intended one.
func GetHpcTaskList(svc *svc.ServiceContext) ([]*types.TaskModel, error) {
	const (
		limit  = 10
		offset = 0
	)

	var list []*types.TaskModel
	// Only the page itself is consumed by callers, so no separate COUNT query is
	// issued. Limit/Offset are chained explicitly instead of relying on in-place
	// mutation of a shared *gorm.DB.
	err := svc.DbEngin.Model(&types.TaskModel{}).Table("task").
		Joins("join task_hpc hpc on task.id = hpc.task_id").
		Select("task.* ,hpc.job_id, hpc.work_dir, hpc.status, hpc.updated_time").
		Where("task.adapter_type_dict = 2 AND task.status NOT IN ('Succeeded', 'Failed') and task.deleted_at is null").
		Limit(limit).
		Offset(offset).
		Order("created_time desc").
		Scan(&list).Error
	if err != nil {
		return nil, err
	}
	return list, nil
}

// UpdateHpcAdapterMaps refreshes the in-memory HpcExecutorAdapterMap so that it
// covers every cluster of every adapter of type "2".
//
// For each adapter id it loads the adapter's clusters and then:
//   - skips the adapter when it has no clusters, or when the map already holds
//     exactly as many executors as there are clusters;
//   - builds a fresh executor map when the adapter is not registered yet;
//   - otherwise adds the missing clusters to the existing executor map.
//
// Any storage error is logged and aborts the whole refresh.
func UpdateHpcAdapterMaps(svc *svc.ServiceContext) {
	var hpcType = "2"
	adapterIds, err := svc.Scheduler.HpcStorages.GetAdapterIdsByType(hpcType)
	if err != nil {
		msg := fmt.Sprintf("###UpdateHpcAdapterMaps###, error: %v \n", err.Error())
		// Pass the message as an argument, never as the format string: error text
		// may contain '%' and would otherwise be mangled by the formatter.
		logx.Errorf("%v", errors.New(msg))
		return
	}

	for _, id := range adapterIds {
		clusters, err := svc.Scheduler.HpcStorages.GetClustersByAdapterId(id)
		if err != nil {
			msg := fmt.Sprintf("###UpdateHpcAdapterMaps###, error: %v \n", err.Error())
			logx.Errorf("%v", errors.New(msg))
			return
		}
		if len(clusters.List) == 0 {
			continue
		}
		if hpcAdapterExist(svc, id, len(clusters.List)) {
			// Already in sync with the database.
			continue
		}
		if hpcAdapterEmpty(svc, id) {
			svc.Scheduler.HpcService.HpcExecutorAdapterMap[id] = service.InitHpcClusterMap(&svc.Config, clusters.List)
			continue
		}
		svc.Scheduler.HpcService.UpdateHpcClusterMaps(&svc.Config, id, clusters.List)
	}
}

// hpcAdapterExist reports whether the adapter id is registered in
// HpcExecutorAdapterMap with exactly clusterNum executors.
func hpcAdapterExist(svc *svc.ServiceContext, id string, clusterNum int) bool {
	executors, found := svc.Scheduler.HpcService.HpcExecutorAdapterMap[id]
	return found && len(executors) == clusterNum
}

// hpcAdapterEmpty reports whether the adapter id has no entry at all in
// HpcExecutorAdapterMap.
func hpcAdapterEmpty(svc *svc.ServiceContext, id string) bool {
	_, found := svc.Scheduler.HpcService.HpcExecutorAdapterMap[id]
	return !found
}

+ 25
- 5
internal/logic/hpc/commithpctasklogic.go View File

@@ -4,8 +4,10 @@ import (
"context"
"errors"
"github.com/go-resty/resty/v2"
jsoniter "github.com/json-iterator/go"
clientCore "gitlink.org.cn/JointCloud/pcm-coordinator/client"
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/models"
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/utils"
"strconv"
"time"

@@ -51,7 +53,8 @@ type ResultParticipant struct {
}

func (l *CommitHpcTaskLogic) CommitHpcTask(req *types.CommitHpcTaskReq) (resp *types.CommitHpcTaskResp, err error) {

reqStr, _ := jsoniter.MarshalToString(req)
yaml := utils.StringToYaml(reqStr)
var clusterInfo types.ClusterInfo
l.svcCtx.DbEngin.Raw("SELECT * FROM `t_cluster` where id = ?", req.ClusterId).First(&clusterInfo)

@@ -68,6 +71,7 @@ func (l *CommitHpcTaskLogic) CommitHpcTask(req *types.CommitHpcTaskReq) (resp *t
Status: "Running",
AdapterTypeDict: "2",
UserId: userId,
YamlString: *yaml,
}

// 保存任务数据到数据库
@@ -110,8 +114,10 @@ func (l *CommitHpcTaskLogic) CommitHpcTask(req *types.CommitHpcTaskReq) (resp *t
Partition: req.Parameters["partition"],
CreatedTime: time.Now(),
UpdatedTime: time.Now(),
Status: "Running",
Status: "Deploying",
TimeLimit: timelimit,
UserId: userId,
YamlString: *yaml,
}
hpcInfo.WorkDir = clusterInfo.WorkDir + req.Parameters["WorkDir"]
tx = l.svcCtx.DbEngin.Create(&hpcInfo)
@@ -151,8 +157,19 @@ func (l *CommitHpcTaskLogic) CommitHpcTask(req *types.CommitHpcTaskReq) (resp *t
//})
// 提交job到指定集群
logx.Info("提交job到指定集群")
resp, _ = submitJob(req, server)

resp, err = submitJob(req, server)
if err != nil {
return nil, err
}
// 更新任务状态
updates := l.svcCtx.DbEngin.Model(&hpcInfo).Updates(models.TaskHpc{
Id: hpcInfo.Id,
JobId: resp.Data.JobInfo["jobId"],
WorkDir: resp.Data.JobInfo["jobDir"],
})
if updates.Error != nil {
return nil, updates.Error
}
return resp, nil
}

@@ -168,10 +185,13 @@ func submitJob(req *types.CommitHpcTaskReq, adapterAddress string) (resp *types.
}
httpClient := resty.New().R()
logx.Info("远程调用p端接口开始")
httpClient.SetHeader("Content-Type", "application/json").
_, err = httpClient.SetHeader("Content-Type", "application/json").
SetBody(reqParticipant).
SetResult(&resp).
Post(adapterAddress + "/api/v1/jobs")
if err != nil {
return nil, err
}
logx.Info("远程调用p端接口完成")

return resp, nil


+ 149
- 0
internal/scheduler/database/hpc_storage.go View File

@@ -0,0 +1,149 @@
package database

import (
"github.com/zeromicro/go-zero/core/logx"
clientCore "gitlink.org.cn/JointCloud/pcm-coordinator/client"
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/types"
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/constants"
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/models"
"gorm.io/gorm"
"strconv"
"time"
)

// HpcStorage groups the database queries used for HPC adapters, clusters,
// tasks and notices on top of a single gorm handle.
type HpcStorage struct {
// DbEngin is the gorm connection every method of HpcStorage runs against.
DbEngin *gorm.DB
}

// GetParticipants returns every row of t_cluster that is not soft-deleted,
// newest first (ordered by create_time descending).
//
// On a query error the error is logged and returned with a nil response.
func (s *HpcStorage) GetParticipants() (*types.ClusterListResp, error) {
	var resp types.ClusterListResp
	tx := s.DbEngin.Raw("select * from t_cluster where `deleted_at` IS NULL ORDER BY create_time Desc").Scan(&resp.List)
	if tx.Error != nil {
		// The error is passed as an argument rather than as the format string so
		// that a '%' in the driver message cannot be interpreted as a verb.
		logx.Errorf("%v", tx.Error)
		return nil, tx.Error
	}
	return &resp, nil
}

// GetClustersByAdapterId returns the non-deleted t_cluster rows whose adapter_id
// equals id, newest first (ordered by create_time descending).
//
// On a query error the error is logged and returned with a nil response.
func (s *HpcStorage) GetClustersByAdapterId(id string) (*types.ClusterListResp, error) {
	var resp types.ClusterListResp
	tx := s.DbEngin.Raw("select * from t_cluster where `deleted_at` IS NULL and `adapter_id` = ? ORDER BY create_time Desc", id).Scan(&resp.List)
	if tx.Error != nil {
		// Log the error as a value, not as a format string, so '%' characters in
		// the message are printed verbatim.
		logx.Errorf("%v", tx.Error)
		return nil, tx.Error
	}
	return &resp, nil
}

// GetClusterNameById returns the value of the description column of the
// t_cluster row with the given id; that column is what this method treats as
// the cluster's name.
//
// On a query error the error is logged and returned with an empty string.
func (s *HpcStorage) GetClusterNameById(id string) (string, error) {
	var name string
	tx := s.DbEngin.Raw("select `description` from t_cluster where `id` = ?", id).Scan(&name)
	if tx.Error != nil {
		// Log the error as a value, not as a format string, so '%' characters in
		// the message are printed verbatim.
		logx.Errorf("%v", tx.Error)
		return "", tx.Error
	}
	return name, nil
}

// GetAdapterNameById returns the name column of the t_adapter row with the
// given id.
//
// On a query error the error is logged and returned with an empty string.
func (s *HpcStorage) GetAdapterNameById(id string) (string, error) {
	var name string
	tx := s.DbEngin.Raw("select `name` from t_adapter where `id` = ?", id).Scan(&name)
	if tx.Error != nil {
		// Log the error as a value, not as a format string, so '%' characters in
		// the message are printed verbatim.
		logx.Errorf("%v", tx.Error)
		return "", tx.Error
	}
	return name, nil
}

// GetAdapterIdsByType returns the ids of all t_adapter rows whose type equals
// adapterType, ordered by create_time descending. The returned slice is nil
// when no adapter matches.
func (s *HpcStorage) GetAdapterIdsByType(adapterType string) ([]string, error) {
	var adapters []types.AdapterInfo
	query := s.DbEngin.Model(&types.AdapterInfo{}).Table("t_adapter").Where("type = ?", adapterType)
	if err := query.Order("create_time desc").Find(&adapters).Error; err != nil {
		return nil, err
	}

	var ids []string
	for i := range adapters {
		ids = append(ids, adapters[i].Id)
	}
	return ids, nil
}

// GetAdaptersByType returns all t_adapter rows whose type equals adapterType,
// ordered by create_time descending.
func (s *HpcStorage) GetAdaptersByType(adapterType string) ([]*types.AdapterInfo, error) {
	var adapters []*types.AdapterInfo
	query := s.DbEngin.Model(&types.AdapterInfo{}).Table("t_adapter").Where("type = ?", adapterType)
	if err := query.Order("create_time desc").Find(&adapters).Error; err != nil {
		return nil, err
	}
	return adapters, nil
}

// GetHpcTasksByAdapterId returns all task_hpc rows whose adapter_id equals
// adapterId, ordered by start_time descending.
func (s *HpcStorage) GetHpcTasksByAdapterId(adapterId string) ([]*models.TaskHpc, error) {
	var tasks []*models.TaskHpc
	query := s.DbEngin.Model(&models.TaskHpc{}).Table("task_hpc").Where("adapter_id = ?", adapterId)
	if err := query.Order("start_time desc").Find(&tasks).Error; err != nil {
		return nil, err
	}
	return tasks, nil
}

// GetHpcTaskListById returns every task_hpc row whose task_id equals id, i.e.
// the HPC sub-tasks recorded for one row of the task table.
func (s *HpcStorage) GetHpcTaskListById(id int64) ([]*models.TaskHpc, error) {
	var tasks []*models.TaskHpc
	if err := s.DbEngin.Raw("select * from task_hpc where `task_id` = ? ", id).Scan(&tasks).Error; err != nil {
		return nil, err
	}
	return tasks, nil
}

// UpdateTask stamps task.UpdatedTime with the current time (formatted with
// constants.Layout) and writes the task to the task table via gorm Updates.
//
// On a database error the error is logged and returned.
func (s *HpcStorage) UpdateTask(task *types.TaskModel) error {
	task.UpdatedTime = time.Now().Format(constants.Layout)
	tx := s.DbEngin.Table("task").Model(task).Updates(task)
	if tx.Error != nil {
		// Log the error as a value, not as a format string, so '%' characters in
		// the message are printed verbatim.
		logx.Errorf("%v", tx.Error)
		return tx.Error
	}
	return nil
}

// UpdateHpcTask writes task through gorm Updates and returns the resulting
// database error, if any.
func (s *HpcStorage) UpdateHpcTask(task *models.TaskHpc) error {
	return s.DbEngin.Updates(task).Error
}

// AddNoticeInfo inserts one row into t_notice describing an incident for a task.
//
// adapterId and clusterId are decimal strings converted to int64; a conversion
// failure is only logged and the value returned by strconv.ParseInt is stored
// anyway. An empty clusterId is stored as 0 without being parsed. A failed
// insert is logged and not reported to the caller.
func (s *HpcStorage) AddNoticeInfo(adapterId string, adapterName string, clusterId string, clusterName string, taskName string, noticeType string, incident string) {
	adapterKey, parseErr := strconv.ParseInt(adapterId, 10, 64)
	if parseErr != nil {
		logx.Errorf("adapterId convert failure, err: %v", parseErr)
	}

	var clusterKey int64
	if clusterId != "" {
		if clusterKey, parseErr = strconv.ParseInt(clusterId, 10, 64); parseErr != nil {
			logx.Errorf("clusterId convert failure, err: %v", parseErr)
		}
	}

	notice := clientCore.NoticeInfo{
		AdapterId:   adapterKey,
		AdapterName: adapterName,
		ClusterId:   clusterKey,
		ClusterName: clusterName,
		NoticeType:  noticeType,
		TaskName:    taskName,
		Incident:    incident,
		CreatedTime: time.Now(),
	}
	if err := s.DbEngin.Table("t_notice").Create(&notice).Error; err != nil {
		logx.Errorf("Task creation failure, err: %v", err)
	}
}

+ 4
- 2
internal/scheduler/scheduler.go View File

@@ -39,6 +39,8 @@ type Scheduler struct {
result []string //pID:子任务yamlstring 键值对
AiStorages *database.AiStorage
AiService *service.AiService
HpcStorages *database.HpcStorage
HpcService *service.HpcService
}

type SubSchedule interface {
@@ -56,8 +58,8 @@ func NewScheduler(subSchedule SubSchedule, val string, dbEngin *gorm.DB) (*Sched
return &Scheduler{task: task, subSchedule: subSchedule, dbEngin: dbEngin}, nil
}

func NewSchdlr(aiService *service.AiService, storages *database.AiStorage) *Scheduler {
return &Scheduler{AiService: aiService, AiStorages: storages}
func NewSchdlr(aiService *service.AiService, storages *database.AiStorage, hpcStorage *database.HpcStorage, hpcService *service.HpcService) *Scheduler {
return &Scheduler{AiService: aiService, AiStorages: storages, HpcStorages: hpcStorage, HpcService: hpcService}
}

func (s *Scheduler) SpecifyClusters() {


+ 28
- 0
internal/scheduler/service/collector/hpc_collector.go View File

@@ -0,0 +1,28 @@
package collector

import (
"context"
"time"
)

// HPCCollector is the interface a cluster client implements so that a single
// task can be looked up by its id.
type HPCCollector interface {
// GetTask returns the task identified by taskId, or an error if it could not
// be retrieved.
GetTask(ctx context.Context, taskId string) (*Task, error)
}

// JobInfo is the job description carried in the Data field of HpcJobDetailResp.
type JobInfo struct {
ID string // job ID
Name string // job name
Status int // job status
SubmitTime time.Time // submit time
StartTime time.Time // start time (optional)
EndTime time.Time // end time (optional)
// StatusText is the job status in textual form.
StatusText string
WorkDir string // job working directory
}

// HpcJobDetailResp is the JSON envelope of a job-detail response: a code, a
// message, the job payload and a trace id.
type HpcJobDetailResp struct {
Code int `json:"code"`
Msg string `json:"msg"`
Data JobInfo `json:"data"`
TraceId string `json:"trace_id"`
}

+ 73
- 0
internal/scheduler/service/hpc/slurm.go View File

@@ -0,0 +1,73 @@
package hpcservice

import (
"context"
"github.com/go-resty/resty/v2"
"github.com/zeromicro/go-zero/core/logx"
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/collector"
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/constants"
)

// ParticipantHpc is a client for one HPC participant; GetTask queries the
// job-detail endpoint under host.
type ParticipantHpc struct {
// participantId and platform are supplied through NewHpc.
participantId int64
platform string
// host is the base URL that JobDetailUrl is appended to.
host string
// userName and accessToken are not populated by NewHpc.
userName string
accessToken string
}

const (
// JobDetailUrl is the path template of the job-detail endpoint; {backend} and
// {jobId} are filled in as path parameters by GetTask.
JobDetailUrl = "/api/v1/jobs/detail/{backend}/{jobId}"
)

// GetTask fetches the detail of the job identified by taskId from the
// participant's job-detail endpoint (backend "slurm") and converts it into a
// collector.Task.
//
// ctx is attached to the outgoing HTTP request so that cancellation and
// deadlines set by the caller apply to the remote call.
//
// Start and End are only filled in when the remote job reports a non-zero start
// or end time. The remote status text is mapped onto the project status
// constants; any unrecognised text yields the status "undefined".
//
// An error is returned only when the HTTP call itself fails.
func (c *ParticipantHpc) GetTask(ctx context.Context, taskId string) (*collector.Task, error) {
	reqUrl := c.host + JobDetailUrl
	hpcResp := &collector.HpcJobDetailResp{}

	// Announce the call before it is made so the start/finish log pair brackets
	// the actual request.
	logx.Info("远程调用p端接口开始")
	_, err := resty.New().R().
		SetContext(ctx).
		SetHeader("Content-Type", "application/json").
		SetPathParam("jobId", taskId).
		SetPathParam("backend", "slurm").
		SetResult(hpcResp).
		Get(reqUrl)
	if err != nil {
		return nil, err
	}
	logx.Info("远程调用p端接口完成")

	var resp collector.Task
	resp.Id = hpcResp.Data.ID
	if !hpcResp.Data.StartTime.IsZero() {
		resp.Start = hpcResp.Data.StartTime.Format(constants.Layout)
	}
	if !hpcResp.Data.EndTime.IsZero() {
		resp.End = hpcResp.Data.EndTime.Format(constants.Layout)
	}

	switch hpcResp.Data.StatusText {
	case "COMPLETED":
		resp.Status = constants.Completed
	case "FAILED", "CREATED_FAILED":
		resp.Status = constants.Failed
	case "RUNNING":
		resp.Status = constants.Running
	case "STOPPED":
		resp.Status = constants.Stopped
	case "PENDING":
		resp.Status = constants.Pending
	case "WAITING":
		resp.Status = constants.Waiting
	default:
		resp.Status = "undefined"
	}

	return &resp, nil
}

// NewHpc builds a ParticipantHpc for the participant with the given id and
// platform, reachable at host. userName and accessToken are left empty.
func NewHpc(host string, id int64, platform string) *ParticipantHpc {
	participant := new(ParticipantHpc)
	participant.participantId = id
	participant.platform = platform
	participant.host = host
	return participant
}

+ 79
- 0
internal/scheduler/service/hpc_service.go View File

@@ -0,0 +1,79 @@
package service

import (
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/config"
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/database"
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/collector"
hpcservice "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/hpc"
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/types"
"strconv"
"sync"
)

const (
// Slurm_Arm is the cluster name for which InitHpcClusterMap and
// UpdateHpcClusterMaps create a collector.
Slurm_Arm = "slurm_arm"
)

// HpcService holds the in-memory state used to talk to HPC clusters.
type HpcService struct {
// HpcExecutorAdapterMap maps adapter id -> cluster id -> collector.
HpcExecutorAdapterMap map[string]map[string]collector.HPCCollector
Storage *database.HpcStorage
LocalCache map[string]interface{}
Conf *config.Config
// TaskSyncLock is held by the task status synchronisation functions while
// they run.
TaskSyncLock sync.Mutex
}

// NewHpcService creates an HpcService and pre-populates HpcExecutorAdapterMap
// with one collector map per adapter of type "2" that has at least one cluster.
//
// Any storage error while loading adapters or clusters aborts construction and
// is returned to the caller.
func NewHpcService(conf *config.Config, storages *database.HpcStorage, localCache map[string]interface{}) (*HpcService, error) {
	const hpcAdapterType = "2"

	adapterIds, err := storages.GetAdapterIdsByType(hpcAdapterType)
	if err != nil {
		return nil, err
	}

	created := &HpcService{
		HpcExecutorAdapterMap: make(map[string]map[string]collector.HPCCollector),
		Storage:               storages,
		LocalCache:            localCache,
		Conf:                  conf,
	}
	for _, adapterId := range adapterIds {
		found, err := storages.GetClustersByAdapterId(adapterId)
		if err != nil {
			return nil, err
		}
		if len(found.List) > 0 {
			created.HpcExecutorAdapterMap[adapterId] = InitHpcClusterMap(conf, found.List)
		}
	}

	return created, nil
}

// InitHpcClusterMap builds a cluster id -> collector map for the given
// clusters. Only clusters named Slurm_Arm get a collector; all others are
// skipped. conf is accepted but not used.
func InitHpcClusterMap(conf *config.Config, clusters []types.ClusterInfo) map[string]collector.HPCCollector {
	collectors := make(map[string]collector.HPCCollector)
	for _, cluster := range clusters {
		if cluster.Name != Slurm_Arm {
			continue
		}
		// The conversion error is deliberately discarded here; whatever
		// ParseInt returned is used as the participant id.
		participantId, _ := strconv.ParseInt(cluster.Id, 10, 64)
		collectors[cluster.Id] = hpcservice.NewHpc(cluster.Server, participantId, cluster.Nickname)
	}
	return collectors
}

// UpdateHpcClusterMaps adds a collector for every cluster in clusters that is
// named Slurm_Arm and is not yet registered under adapterId. Existing entries
// are left untouched. conf is accepted but not used.
//
// If adapterId has no executor map yet, one is created on the first insertion
// instead of writing into a nil map (which would panic).
func (as *HpcService) UpdateHpcClusterMaps(conf *config.Config, adapterId string, clusters []types.ClusterInfo) {
	executors := as.HpcExecutorAdapterMap[adapterId]
	for _, c := range clusters {
		if _, ok := executors[c.Id]; ok {
			continue
		}
		if c.Name != Slurm_Arm {
			continue
		}
		if executors == nil {
			// Lazily create the containers so that an adapter without any
			// matching cluster does not get an empty entry.
			if as.HpcExecutorAdapterMap == nil {
				as.HpcExecutorAdapterMap = make(map[string]map[string]collector.HPCCollector)
			}
			executors = make(map[string]collector.HPCCollector)
			as.HpcExecutorAdapterMap[adapterId] = executors
		}
		// The conversion error is discarded; whatever ParseInt returned is used
		// as the participant id.
		id, _ := strconv.ParseInt(c.Id, 10, 64)
		executors[c.Id] = hpcservice.NewHpc(c.Server, id, c.Nickname)
	}
}

+ 184
- 0
internal/scheduler/service/utils/status/hpc_task_sync.go View File

@@ -0,0 +1,184 @@
package status

import (
"fmt"
"github.com/pkg/errors"
"github.com/rs/zerolog/log"
"github.com/zeromicro/go-zero/core/logx"
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/utils/jcs"
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/svc"
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/types"
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/constants"
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/models"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"net/http"
"strconv"
"sync"
)

// reportHpcStatusMessages sends a job status report for task to the URL
// configured in JcsMiddleware.JobStatusReportUrl. The report carries a single
// message with Status true, the HPC task's cluster id and its job id as output.
//
// The result of the report call is ignored and the function always returns nil.
func reportHpcStatusMessages(svc *svc.ServiceContext, task *types.TaskModel, hpcTask *models.TaskHpc) error {
	payload := &jcs.JobStatusReportReq{
		TaskName: task.Name,
		TaskID:   strconv.FormatInt(task.Id, 10),
		Messages: []*jcs.ReportMessage{
			{
				Status:    true,
				Message:   "",
				ClusterID: strconv.FormatInt(hpcTask.ClusterId, 10),
				Output:    hpcTask.JobId,
			},
		},
	}

	log.Debug().Msgf("通知中间件任务状态参数: [%v]", payload)
	reportUrl := svc.Scheduler.AiService.Conf.JcsMiddleware.JobStatusReportUrl
	_ = jcs.StatusReport(reportUrl, payload)

	return nil
}

// UpdateTaskStatusByHpc synchronises the status of each task in the main task
// table with the status of its HPC sub-task(s) stored in task_hpc.
//
// The function holds HpcService.TaskSyncLock for its whole duration. For every
// task in tasklist it loads the task_hpc rows of that task and then:
//   - no rows: the task is skipped;
//   - exactly one row: the task status is derived from the row's status
//     (Completed and Running become Succeeded, Failed stays Failed, anything
//     else is copied verbatim), the start/end times are copied, the status is
//     reported for the three mapped cases, and the task is persisted;
//   - several rows: rows without a start time are dropped (the task takes the
//     status of each dropped row); if no row remains the task is persisted.
//
// A task that needs no further work is skipped with continue so that the
// remaining tasks of the batch are still processed; a bare break here would
// leave the range loop and starve every task after the first one. A storage
// error aborts the whole run.
//
// NOTE(review): mapping a Running HPC job to a Succeeded task is kept exactly as
// found — confirm this is the intended semantics.
func UpdateTaskStatusByHpc(svc *svc.ServiceContext, tasklist []*types.TaskModel) {
	svc.Scheduler.HpcService.TaskSyncLock.Lock()
	defer svc.Scheduler.HpcService.TaskSyncLock.Unlock()

	for _, task := range tasklist {
		hpcTaskList, err := svc.Scheduler.HpcStorages.GetHpcTaskListById(task.Id)
		if err != nil {
			// Pass the error as an argument, never as the format string.
			logx.Errorf("%v", err)
			return
		}
		if len(hpcTaskList) == 0 {
			continue
		}

		logx.Errorf("############ Report Status Message Before switch %s", task.Status)
		if len(hpcTaskList) == 1 {
			hpcTask := hpcTaskList[0]
			logx.Errorf("############ Report Status Message Switch %s", hpcTask.Status)
			switch hpcTask.Status {
			case constants.Completed, constants.Running:
				task.Status = constants.Succeeded
				logx.Errorf("############ Report Status Message Before Sending %s", task.Status)

				_ = reportHpcStatusMessages(svc, task, hpcTask)
			case constants.Failed:
				task.Status = constants.Failed
				logx.Errorf("############ Report Status Message Before Sending %s", task.Status)

				_ = reportHpcStatusMessages(svc, task, hpcTask)
			default:
				task.Status = hpcTask.Status
			}

			task.StartTime = hpcTask.StartTime
			task.EndTime = hpcTask.EndTime
			if err := svc.Scheduler.HpcStorages.UpdateTask(task); err != nil {
				return
			}
			continue
		}

		logx.Errorf("############ Report Status Message After switch %s", task.Status)
		// Walk backwards so removing an element does not shift the ones still to
		// be visited.
		for i := len(hpcTaskList) - 1; i >= 0; i-- {
			if hpcTaskList[i].StartTime == "" {
				task.Status = hpcTaskList[i].Status
				hpcTaskList = append(hpcTaskList[:i], hpcTaskList[i+1:]...)
			}
		}
		if len(hpcTaskList) == 0 {
			if err := svc.Scheduler.HpcStorages.UpdateTask(task); err != nil {
				return
			}
			continue
		}
	}
}

// UpdateTaskHpcStatus updates the task status in the task_hpc table: for each
// task in tasklist it loads the task's HPC sub-tasks and refreshes them via
// updateHpcTask.
//
// Tasks without sub-tasks, and tasks whose sub-tasks cannot be loaded, are
// skipped so that one such task does not prevent the rest of the list from
// being refreshed. The whole run holds HpcService.TaskSyncLock.
func UpdateTaskHpcStatus(svc *svc.ServiceContext, tasklist []*types.TaskModel) {
	svc.Scheduler.HpcService.TaskSyncLock.Lock()
	defer svc.Scheduler.HpcService.TaskSyncLock.Unlock()

	for _, task := range tasklist {
		hpcTaskList, err := svc.Scheduler.HpcStorages.GetHpcTaskListById(task.Id)
		if err != nil {
			// Log as a plain value: the error text must not be used as a format string.
			logx.Error(err.Error())
			continue
		}
		if len(hpcTaskList) == 0 {
			continue
		}
		updateHpcTask(svc, hpcTaskList...)
	}
}

// updateHpcTask refreshes the given HPC sub-tasks concurrently, one goroutine
// per sub-task, and returns once all of them have finished.
//
// Sub-tasks that are Completed, Failed or Cancelled, or that have no job id,
// are skipped.
func updateHpcTask(svc *svc.ServiceContext, hpcTaskList ...*models.TaskHpc) {
	var wg sync.WaitGroup
	for _, hpc := range hpcTaskList {
		t := hpc // per-iteration copy for the goroutine below (pre-Go 1.22 semantics)
		if t.Status == constants.Completed || t.Status == constants.Failed || t.JobId == "" || t.Status == constants.Cancelled {
			continue
		}
		wg.Add(1)
		go func() {
			// A single deferred Done covers every exit path of the sync.
			defer wg.Done()
			syncHpcTaskFromCluster(svc, t)
		}()
	}
	wg.Wait()
}

// syncHpcTaskFromCluster queries the collector registered for t's adapter and
// cluster for the current state of job t.JobId, records a notice when the
// status changed, and persists the new status, start time and end time.
// Failures are logged; nothing is returned.
func syncHpcTaskFromCluster(svc *svc.ServiceContext, t *models.TaskHpc) {
	adapterID := strconv.FormatInt(t.AdapterId, 10)
	clusterID := strconv.FormatInt(t.ClusterId, 10)

	// A missing entry yields a nil collector; calling a method on it would
	// panic inside this goroutine and take the whole process down.
	executor, ok := svc.Scheduler.HpcService.HpcExecutorAdapterMap[adapterID][clusterID]
	if !ok || executor == nil {
		logHpcSyncError(t, errors.Errorf("no hpc collector registered for adapter %s", adapterID))
		return
	}

	// The zero-value request only serves as a source of a background context
	// (no deadline, no cancellation).
	h := http.Request{}
	hpcTask, err := executor.GetTask(h.Context(), t.JobId)
	if err != nil {
		if status.Code(err) == codes.DeadlineExceeded {
			err = errors.Wrap(err, "timed out querying cluster")
		}
		logHpcSyncError(t, err)
		return
	}
	if hpcTask == nil {
		return
	}

	if t.Status != hpcTask.Status {
		// Any status other than the three named below is announced as pending.
		noticeType, noticeMsg := "pending", "任务pending"
		switch hpcTask.Status {
		case constants.Running:
			noticeType, noticeMsg = "running", "任务运行中"
		case constants.Failed:
			noticeType, noticeMsg = "failed", "任务失败"
		case constants.Completed:
			noticeType, noticeMsg = "completed", "任务完成"
		}
		svc.Scheduler.HpcStorages.AddNoticeInfo(adapterID, t.AdapterName, clusterID, t.ClusterName, t.Name, noticeType, noticeMsg)
		t.Status = hpcTask.Status
	}

	t.StartTime = hpcTask.Start
	t.EndTime = hpcTask.End
	if err := svc.Scheduler.HpcStorages.UpdateHpcTask(t); err != nil {
		logHpcSyncError(t, err)
	}
}

// logHpcSyncError logs a sync failure for sub-task t. The message is passed
// as a value, never as a format string, so '%' characters in the error text
// are printed verbatim.
func logHpcSyncError(t *models.TaskHpc, err error) {
	msg := fmt.Sprintf("###UpdateHpcTaskStatus###, HpcTaskId: %v, clusterId: %v , JobId: %v, error: %v \n", t.Id, t.ClusterId, t.JobId, err.Error())
	logx.Error(msg)
}

+ 3
- 2
internal/svc/servicecontext.go View File

@@ -108,14 +108,15 @@ func NewServiceContext(c config.Config) *ServiceContext {

// scheduler
storage := &database.AiStorage{DbEngin: dbEngin}
hpcStorage := &database.HpcStorage{DbEngin: dbEngin}
cache := make(map[string]interface{}, 0)
aiService, err := service.NewAiService(&c, storage, cache)
hpcService, err := service.NewHpcService(&c, hpcStorage, cache)
if err != nil {
logx.Error(err.Error())
return nil
}
scheduler := scheduler.NewSchdlr(aiService, storage)

scheduler := scheduler.NewSchdlr(aiService, storage, hpcStorage, hpcService)
return &ServiceContext{
DbEngin: dbEngin,
Cron: cron.New(cron.WithSeconds()),


+ 3
- 2
pkg/models/taskhpcmodel_gen.go View File

@@ -44,11 +44,12 @@ type (
ClusterId int64 `db:"cluster_id"` //集群id
ClusterName string `db:"cluster_name"` //集群名称
Name string `db:"name"` // 名称
Backend string `db:"backend"` // 平台类型
OperateType string `db:"operate_type"` // 操作类型
Backend string `db:"backend"` // 平台类型
OperateType string `db:"operate_type"` // 操作类型
Status string `db:"status"` // 状态
CmdScript string `db:"cmd_script"`
StartTime string `db:"start_time"` // 开始时间
EndTime string `db:"end_time"` // 结束时间
RunningTime int64 `db:"running_time"` // 运行时间
DerivedEs string `db:"derived_es"`
Cluster string `db:"cluster"`


Loading…
Cancel
Save