| @@ -0,0 +1,2 @@ | |||
| # storage-agent | |||
| @@ -0,0 +1,80 @@ | |||
| module gitlink.org.cn/cloudream/storage-agent | |||
| go 1.20 | |||
| require ( | |||
| github.com/ipfs/go-ipfs-api v0.6.0 | |||
| github.com/samber/lo v1.38.1 | |||
| gitlink.org.cn/cloudream/common v0.0.0 | |||
| gitlink.org.cn/cloudream/storage-common v0.0.0 | |||
| google.golang.org/grpc v1.57.0 | |||
| ) | |||
| require ( | |||
| github.com/antonfisher/nested-logrus-formatter v1.3.1 // indirect | |||
| github.com/baohan10/reedsolomon v0.0.0-20230406042632-43574cac9fa7 // indirect | |||
| github.com/beevik/etree v1.2.0 // indirect | |||
| github.com/benbjohnson/clock v1.3.0 // indirect | |||
| github.com/coreos/go-semver v0.3.0 // indirect | |||
| github.com/coreos/go-systemd/v22 v22.5.0 // indirect | |||
| github.com/crackcomm/go-gitignore v0.0.0-20170627025303-887ab5e44cc3 // indirect | |||
| github.com/decred/dcrd/dcrec/secp256k1/v4 v4.1.0 // indirect | |||
| github.com/go-ping/ping v1.1.0 // indirect | |||
| github.com/gogo/protobuf v1.3.2 // indirect | |||
| github.com/golang/protobuf v1.5.3 // indirect | |||
| github.com/google/uuid v1.3.0 // indirect | |||
| github.com/hashicorp/errwrap v1.1.0 // indirect | |||
| github.com/hashicorp/go-multierror v1.1.1 // indirect | |||
| github.com/imdario/mergo v0.3.15 // indirect | |||
| github.com/ipfs/boxo v0.8.0 // indirect | |||
| github.com/ipfs/go-cid v0.4.1 // indirect | |||
| github.com/json-iterator/go v1.1.12 // indirect | |||
| github.com/klauspost/cpuid/v2 v2.2.4 // indirect | |||
| github.com/libp2p/go-buffer-pool v0.1.0 // indirect | |||
| github.com/libp2p/go-flow-metrics v0.1.0 // indirect | |||
| github.com/libp2p/go-libp2p v0.27.0 // indirect | |||
| github.com/magefile/mage v1.15.0 // indirect | |||
| github.com/minio/sha256-simd v1.0.0 // indirect | |||
| github.com/mitchellh/go-homedir v1.1.0 // indirect | |||
| github.com/mitchellh/mapstructure v1.5.0 // indirect | |||
| github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421 // indirect | |||
| github.com/modern-go/reflect2 v1.0.2 // indirect | |||
| github.com/mr-tron/base58 v1.2.0 // indirect | |||
| github.com/multiformats/go-base32 v0.1.0 // indirect | |||
| github.com/multiformats/go-base36 v0.2.0 // indirect | |||
| github.com/multiformats/go-multiaddr v0.9.0 // indirect | |||
| github.com/multiformats/go-multibase v0.2.0 // indirect | |||
| github.com/multiformats/go-multicodec v0.8.1 // indirect | |||
| github.com/multiformats/go-multihash v0.2.1 // indirect | |||
| github.com/multiformats/go-multistream v0.4.1 // indirect | |||
| github.com/multiformats/go-varint v0.0.7 // indirect | |||
| github.com/otiai10/copy v1.12.0 // indirect | |||
| github.com/sirupsen/logrus v1.9.2 // indirect | |||
| github.com/spaolacci/murmur3 v1.1.0 // indirect | |||
| github.com/streadway/amqp v1.1.0 // indirect | |||
| github.com/whyrusleeping/tar-utils v0.0.0-20201201191210-20a61371de5b // indirect | |||
| github.com/zyedidia/generic v1.2.1 // indirect | |||
| go.etcd.io/etcd/api/v3 v3.5.9 // indirect | |||
| go.etcd.io/etcd/client/pkg/v3 v3.5.9 // indirect | |||
| go.etcd.io/etcd/client/v3 v3.5.9 // indirect | |||
| go.uber.org/atomic v1.10.0 // indirect | |||
| go.uber.org/multierr v1.11.0 // indirect | |||
| go.uber.org/zap v1.24.0 // indirect | |||
| golang.org/x/crypto v0.8.0 // indirect | |||
| golang.org/x/exp v0.0.0-20230519143937-03e91628a987 // indirect | |||
| golang.org/x/net v0.9.0 // indirect | |||
| golang.org/x/sync v0.1.0 // indirect | |||
| golang.org/x/sys v0.7.0 // indirect | |||
| golang.org/x/text v0.9.0 // indirect | |||
| google.golang.org/genproto v0.0.0-20230526161137-0005af68ea54 // indirect | |||
| google.golang.org/genproto/googleapis/api v0.0.0-20230525234035-dd9d682886f9 // indirect | |||
| google.golang.org/genproto/googleapis/rpc v0.0.0-20230525234030-28d5490b6b19 // indirect | |||
| google.golang.org/protobuf v1.30.0 // indirect | |||
| lukechampine.com/blake3 v1.1.7 // indirect | |||
| ) | |||
| // 运行go mod tidy时需要将下面几行取消注释 | |||
| replace gitlink.org.cn/cloudream/common => ../../common | |||
| replace gitlink.org.cn/cloudream/storage-common => ../storage-common | |||
| @@ -0,0 +1,196 @@ | |||
| github.com/antonfisher/nested-logrus-formatter v1.3.1 h1:NFJIr+pzwv5QLHTPyKz9UMEoHck02Q9L0FP13b/xSbQ= | |||
| github.com/antonfisher/nested-logrus-formatter v1.3.1/go.mod h1:6WTfyWFkBc9+zyBaKIqRrg/KwMqBbodBjgbHjDz7zjA= | |||
| github.com/baohan10/reedsolomon v0.0.0-20230406042632-43574cac9fa7 h1:wcvD6enR///dFvb9cRodx5SGbPH4G4jPjw+aVIWkAKE= | |||
| github.com/baohan10/reedsolomon v0.0.0-20230406042632-43574cac9fa7/go.mod h1:rAxMF6pVaFK/s6T4gGczvloccNbtwzuYaP2Y7W6flE8= | |||
| github.com/beevik/etree v1.2.0 h1:l7WETslUG/T+xOPs47dtd6jov2Ii/8/OjCldk5fYfQw= | |||
| github.com/beevik/etree v1.2.0/go.mod h1:aiPf89g/1k3AShMVAzriilpcE4R/Vuor90y83zVZWFc= | |||
| github.com/benbjohnson/clock v1.3.0 h1:ip6w0uFQkncKQ979AypyG0ER7mqUSBdKLOgAle/AT8A= | |||
| github.com/benbjohnson/clock v1.3.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA= | |||
| github.com/cheekybits/is v0.0.0-20150225183255-68e9c0620927 h1:SKI1/fuSdodxmNNyVBR8d7X/HuLnRpvvFO0AgyQk764= | |||
| github.com/coreos/go-semver v0.3.0 h1:wkHLiw0WNATZnSG7epLsujiMCgPAc9xhjJ4tgnAxmfM= | |||
| github.com/coreos/go-semver v0.3.0/go.mod h1:nnelYz7RCh+5ahJtPPxZlU+153eP4D4r3EedlOD2RNk= | |||
| github.com/coreos/go-systemd/v22 v22.5.0 h1:RrqgGjYQKalulkV8NGVIfkXQf6YYmOyiJKk8iXXhfZs= | |||
| github.com/coreos/go-systemd/v22 v22.5.0/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc= | |||
| github.com/crackcomm/go-gitignore v0.0.0-20170627025303-887ab5e44cc3 h1:HVTnpeuvF6Owjd5mniCL8DEXo7uYXdQEmOP4FJbV5tg= | |||
| github.com/crackcomm/go-gitignore v0.0.0-20170627025303-887ab5e44cc3/go.mod h1:p1d6YEZWvFzEh4KLyvBcVSnrfNDDvK2zfK/4x2v/4pE= | |||
| github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= | |||
| github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= | |||
| github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= | |||
| github.com/decred/dcrd/crypto/blake256 v1.0.0 h1:/8DMNYp9SGi5f0w7uCm6d6M4OU2rGFK09Y2A4Xv7EE0= | |||
| github.com/decred/dcrd/dcrec/secp256k1/v4 v4.1.0 h1:HbphB4TFFXpv7MNrT52FGrrgVXF1owhMVTHFZIlnvd4= | |||
| github.com/decred/dcrd/dcrec/secp256k1/v4 v4.1.0/go.mod h1:DZGJHZMqrU4JJqFAWUS2UO1+lbSKsdiOoYi9Zzey7Fc= | |||
| github.com/go-ping/ping v1.1.0 h1:3MCGhVX4fyEUuhsfwPrsEdQw6xspHkv5zHsiSoDFZYw= | |||
| github.com/go-ping/ping v1.1.0/go.mod h1:xIFjORFzTxqIV/tDVGO4eDy/bLuSyawEeojSm3GfRGk= | |||
| github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA= | |||
| github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q= | |||
| github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q= | |||
| github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk= | |||
| github.com/golang/protobuf v1.5.3 h1:KhyjKVUg7Usr/dYsdSqoFveMYd5ko72D+zANwlG1mmg= | |||
| github.com/golang/protobuf v1.5.3/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= | |||
| github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= | |||
| github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38= | |||
| github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= | |||
| github.com/google/uuid v1.2.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= | |||
| github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I= | |||
| github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= | |||
| github.com/gopherjs/gopherjs v1.17.2 h1:fQnZVsXk8uxXIStYb0N4bGk7jeyTalG/wsZjQ25dO0g= | |||
| github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4= | |||
| github.com/hashicorp/errwrap v1.1.0 h1:OxrOeh75EUXMY8TBjag2fzXGZ40LB6IKw45YeGUDY2I= | |||
| github.com/hashicorp/errwrap v1.1.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4= | |||
| github.com/hashicorp/go-multierror v1.1.1 h1:H5DkEtf6CXdFp0N0Em5UCwQpXMWke8IA0+lD48awMYo= | |||
| github.com/hashicorp/go-multierror v1.1.1/go.mod h1:iw975J/qwKPdAO1clOe2L8331t/9/fmwbPZ6JB6eMoM= | |||
| github.com/imdario/mergo v0.3.15 h1:M8XP7IuFNsqUx6VPK2P9OSmsYsI/YFaGil0uD21V3dM= | |||
| github.com/imdario/mergo v0.3.15/go.mod h1:WBLT9ZmE3lPoWsEzCh9LPo3TiwVN+ZKEjmz+hD27ysY= | |||
| github.com/ipfs/boxo v0.8.0 h1:UdjAJmHzQHo/j3g3b1bAcAXCj/GM6iTwvSlBDvPBNBs= | |||
| github.com/ipfs/boxo v0.8.0/go.mod h1:RIsi4CnTyQ7AUsNn5gXljJYZlQrHBMnJp94p73liFiA= | |||
| github.com/ipfs/go-cid v0.4.1 h1:A/T3qGvxi4kpKWWcPC/PgbvDA2bjVLO7n4UeVwnbs/s= | |||
| github.com/ipfs/go-cid v0.4.1/go.mod h1:uQHwDeX4c6CtyrFwdqyhpNcxVewur1M7l7fNU7LKwZk= | |||
| github.com/ipfs/go-ipfs-api v0.6.0 h1:JARgG0VTbjyVhO5ZfesnbXv9wTcMvoKRBLF1SzJqzmg= | |||
| github.com/ipfs/go-ipfs-api v0.6.0/go.mod h1:iDC2VMwN9LUpQV/GzEeZ2zNqd8NUdRmWcFM+K/6odf0= | |||
| github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM= | |||
| github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo= | |||
| github.com/jtolds/gls v4.20.0+incompatible h1:xdiiI2gbIgH/gLH7ADydsJ1uDOEzR8yvV7C0MuV77Wo= | |||
| github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= | |||
| github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= | |||
| github.com/klauspost/cpuid/v2 v2.0.4/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg= | |||
| github.com/klauspost/cpuid/v2 v2.0.9/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg= | |||
| github.com/klauspost/cpuid/v2 v2.2.4 h1:acbojRNwl3o09bUq+yDCtZFc1aiwaAAxtcn8YkZXnvk= | |||
| github.com/klauspost/cpuid/v2 v2.2.4/go.mod h1:RVVoqg1df56z8g3pUjL/3lE5UfnlrJX8tyFgg4nqhuY= | |||
| github.com/libp2p/go-buffer-pool v0.1.0 h1:oK4mSFcQz7cTQIfqbe4MIj9gLW+mnanjyFtc6cdF0Y8= | |||
| github.com/libp2p/go-buffer-pool v0.1.0/go.mod h1:N+vh8gMqimBzdKkSMVuydVDq+UV5QTWy5HSiZacSbPg= | |||
| github.com/libp2p/go-flow-metrics v0.1.0 h1:0iPhMI8PskQwzh57jB9WxIuIOQ0r+15PChFGkx3Q3WM= | |||
| github.com/libp2p/go-flow-metrics v0.1.0/go.mod h1:4Xi8MX8wj5aWNDAZttg6UPmc0ZrnFNsMtpsYUClFtro= | |||
| github.com/libp2p/go-libp2p v0.27.0 h1:QbhrTuB0ln9j9op6yAOR0o+cx/qa9NyNZ5ov0Tql8ZU= | |||
| github.com/libp2p/go-libp2p v0.27.0/go.mod h1:FAvvfQa/YOShUYdiSS03IR9OXzkcJXwcNA2FUCh9ImE= | |||
| github.com/magefile/mage v1.15.0 h1:BvGheCMAsG3bWUDbZ8AyXXpCNwU9u5CB6sM+HNb9HYg= | |||
| github.com/magefile/mage v1.15.0/go.mod h1:z5UZb/iS3GoOSn0JgWuiw7dxlurVYTu+/jHXqQg881A= | |||
| github.com/minio/sha256-simd v1.0.0 h1:v1ta+49hkWZyvaKwrQB8elexRqm6Y0aMLjCNsrYxo6g= | |||
| github.com/minio/sha256-simd v1.0.0/go.mod h1:OuYzVNI5vcoYIAmbIvHPl3N3jUzVedXbKy5RFepssQM= | |||
| github.com/mitchellh/go-homedir v1.1.0 h1:lukF9ziXFxDFPkA1vsr5zpc1XuPDn/wFntq5mG+4E0Y= | |||
| github.com/mitchellh/go-homedir v1.1.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrkLzIz1N1q0pr0= | |||
| github.com/mitchellh/mapstructure v1.5.0 h1:jeMsZIYE/09sWLaz43PL7Gy6RuMjD2eJVyuac5Z2hdY= | |||
| github.com/mitchellh/mapstructure v1.5.0/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo= | |||
| github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421 h1:ZqeYNhU3OHLH3mGKHDcjJRFFRrJa6eAM5H+CtDdOsPc= | |||
| github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= | |||
| github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9Gz0M= | |||
| github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk= | |||
| github.com/mr-tron/base58 v1.2.0 h1:T/HDJBh4ZCPbU39/+c3rRvE0uKBQlU27+QI8LJ4t64o= | |||
| github.com/mr-tron/base58 v1.2.0/go.mod h1:BinMc/sQntlIE1frQmRFPUoPA1Zkr8VRgBdjWI2mNwc= | |||
| github.com/multiformats/go-base32 v0.1.0 h1:pVx9xoSPqEIQG8o+UbAe7DNi51oej1NtK+aGkbLYxPE= | |||
| github.com/multiformats/go-base32 v0.1.0/go.mod h1:Kj3tFY6zNr+ABYMqeUNeGvkIC/UYgtWibDcT0rExnbI= | |||
| github.com/multiformats/go-base36 v0.2.0 h1:lFsAbNOGeKtuKozrtBsAkSVhv1p9D0/qedU9rQyccr0= | |||
| github.com/multiformats/go-base36 v0.2.0/go.mod h1:qvnKE++v+2MWCfePClUEjE78Z7P2a1UV0xHgWc0hkp4= | |||
| github.com/multiformats/go-multiaddr v0.9.0 h1:3h4V1LHIk5w4hJHekMKWALPXErDfz/sggzwC/NcqbDQ= | |||
| github.com/multiformats/go-multiaddr v0.9.0/go.mod h1:mI67Lb1EeTOYb8GQfL/7wpIZwc46ElrvzhYnoJOmTT0= | |||
| github.com/multiformats/go-multibase v0.2.0 h1:isdYCVLvksgWlMW9OZRYJEa9pZETFivncJHmHnnd87g= | |||
| github.com/multiformats/go-multibase v0.2.0/go.mod h1:bFBZX4lKCA/2lyOFSAoKH5SS6oPyjtnzK/XTFDPkNuk= | |||
| github.com/multiformats/go-multicodec v0.8.1 h1:ycepHwavHafh3grIbR1jIXnKCsFm0fqsfEOsJ8NtKE8= | |||
| github.com/multiformats/go-multicodec v0.8.1/go.mod h1:L3QTQvMIaVBkXOXXtVmYE+LI16i14xuaojr/H7Ai54k= | |||
| github.com/multiformats/go-multihash v0.2.1 h1:aem8ZT0VA2nCHHk7bPJ1BjUbHNciqZC/d16Vve9l108= | |||
| github.com/multiformats/go-multihash v0.2.1/go.mod h1:WxoMcYG85AZVQUyRyo9s4wULvW5qrI9vb2Lt6evduFc= | |||
| github.com/multiformats/go-multistream v0.4.1 h1:rFy0Iiyn3YT0asivDUIR05leAdwZq3de4741sbiSdfo= | |||
| github.com/multiformats/go-multistream v0.4.1/go.mod h1:Mz5eykRVAjJWckE2U78c6xqdtyNUEhKSM0Lwar2p77Q= | |||
| github.com/multiformats/go-varint v0.0.7 h1:sWSGR+f/eu5ABZA2ZpYKBILXTTs9JWpdEM/nEGOHFS8= | |||
| github.com/multiformats/go-varint v0.0.7/go.mod h1:r8PUYw/fD/SjBCiKOoDlGF6QawOELpZAu9eioSos/OU= | |||
| github.com/otiai10/copy v1.12.0 h1:cLMgSQnXBs1eehF0Wy/FAGsgDTDmAqFR7rQylBb1nDY= | |||
| github.com/otiai10/copy v1.12.0/go.mod h1:rSaLseMUsZFFbsFGc7wCJnnkTAvdc5L6VWxPE4308Ww= | |||
| github.com/otiai10/mint v1.5.1 h1:XaPLeE+9vGbuyEHem1JNk3bYc7KKqyI/na0/mLd/Kks= | |||
| github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= | |||
| github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= | |||
| github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= | |||
| github.com/samber/lo v1.38.1 h1:j2XEAqXKb09Am4ebOg31SpvzUTTs6EN3VfgeLUhPdXM= | |||
| github.com/samber/lo v1.38.1/go.mod h1:+m/ZKRl6ClXCE2Lgf3MsQlWfh4bn1bz6CXEOxnEXnEA= | |||
| github.com/sirupsen/logrus v1.9.2 h1:oxx1eChJGI6Uks2ZC4W1zpLlVgqB8ner4EuQwV4Ik1Y= | |||
| github.com/sirupsen/logrus v1.9.2/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ= | |||
| github.com/smartystreets/assertions v1.13.1 h1:Ef7KhSmjZcK6AVf9YbJdvPYG9avaF0ZxudX+ThRdWfU= | |||
| github.com/smartystreets/goconvey v1.8.0 h1:Oi49ha/2MURE0WexF052Z0m+BNSGirfjg5RL+JXWq3w= | |||
| github.com/spaolacci/murmur3 v1.1.0 h1:7c1g84S4BPRrfL5Xrdp6fOJ206sU9y293DDHaoy0bLI= | |||
| github.com/spaolacci/murmur3 v1.1.0/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA= | |||
| github.com/streadway/amqp v1.1.0 h1:py12iX8XSyI7aN/3dUT8DFIDJazNJsVJdxNVEpnQTZM= | |||
| github.com/streadway/amqp v1.1.0/go.mod h1:WYSrTEYHOXHd0nwFeUXAe2G2hRnQT+deZJJf88uS9Bg= | |||
| github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= | |||
| github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= | |||
| github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= | |||
| github.com/stretchr/testify v1.8.2 h1:+h33VjcLVPDHtOdpUCuF+7gSuG3yGIftsP1YvFihtJ8= | |||
| github.com/whyrusleeping/tar-utils v0.0.0-20201201191210-20a61371de5b h1:wA3QeTsaAXybLL2kb2cKhCAQTHgYTMwuI8lBlJSv5V8= | |||
| github.com/whyrusleeping/tar-utils v0.0.0-20201201191210-20a61371de5b/go.mod h1:xT1Y5p2JR2PfSZihE0s4mjdJaRGp1waCTf5JzhQLBck= | |||
| github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= | |||
| github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= | |||
| github.com/zyedidia/generic v1.2.1 h1:Zv5KS/N2m0XZZiuLS82qheRG4X1o5gsWreGb0hR7XDc= | |||
| github.com/zyedidia/generic v1.2.1/go.mod h1:ly2RBz4mnz1yeuVbQA/VFwGjK3mnHGRj1JuoG336Bis= | |||
| go.etcd.io/etcd/api/v3 v3.5.9 h1:4wSsluwyTbGGmyjJktOf3wFQoTBIURXHnq9n/G/JQHs= | |||
| go.etcd.io/etcd/api/v3 v3.5.9/go.mod h1:uyAal843mC8uUVSLWz6eHa/d971iDGnCRpmKd2Z+X8k= | |||
| go.etcd.io/etcd/client/pkg/v3 v3.5.9 h1:oidDC4+YEuSIQbsR94rY9gur91UPL6DnxDCIYd2IGsE= | |||
| go.etcd.io/etcd/client/pkg/v3 v3.5.9/go.mod h1:y+CzeSmkMpWN2Jyu1npecjB9BBnABxGM4pN8cGuJeL4= | |||
| go.etcd.io/etcd/client/v3 v3.5.9 h1:r5xghnU7CwbUxD/fbUtRyJGaYNfDun8sp/gTr1hew6E= | |||
| go.etcd.io/etcd/client/v3 v3.5.9/go.mod h1:i/Eo5LrZ5IKqpbtpPDuaUnDOUv471oDg8cjQaUr2MbA= | |||
| go.uber.org/atomic v1.10.0 h1:9qC72Qh0+3MqyJbAn8YU5xVq1frD8bn3JtD2oXtafVQ= | |||
| go.uber.org/atomic v1.10.0/go.mod h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0= | |||
| go.uber.org/goleak v1.1.12 h1:gZAh5/EyT/HQwlpkCy6wTpqfH9H8Lz8zbm3dZh+OyzA= | |||
| go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0= | |||
| go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y= | |||
| go.uber.org/zap v1.24.0 h1:FiJd5l1UOLj0wCgbSE0rwwXHzEdAZS6hiiSnxJN/D60= | |||
| go.uber.org/zap v1.24.0/go.mod h1:2kMP+WWQ8aoFoedH3T2sq6iJ2yDWpHbP0f6MQbS9Gkg= | |||
| golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= | |||
| golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= | |||
| golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= | |||
| golang.org/x/crypto v0.8.0 h1:pd9TJtTueMTVQXzk8E2XESSMQDj/U7OUu0PqJqPXQjQ= | |||
| golang.org/x/crypto v0.8.0/go.mod h1:mRqEX+O9/h5TFCrQhkgjo2yKi0yYA+9ecGkdQoHrywE= | |||
| golang.org/x/exp v0.0.0-20230519143937-03e91628a987 h1:3xJIFvzUFbu4ls0BTBYcgbCGhA63eAOEMxIHugyXJqA= | |||
| golang.org/x/exp v0.0.0-20230519143937-03e91628a987/go.mod h1:V1LtkGg67GoY2N1AnLN78QLrzxkLyJw7RJb1gzOOz9w= | |||
| golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= | |||
| golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= | |||
| golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= | |||
| golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= | |||
| golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= | |||
| golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= | |||
| golang.org/x/net v0.0.0-20210316092652-d523dce5a7f4/go.mod h1:RBQZq4jEuRlivfhVLdyRGr576XBO4/greRjx4P4O3yc= | |||
| golang.org/x/net v0.9.0 h1:aWJ/m6xSmxWBx+V0XRHTlrYrPG56jKsLdTFmsSsCzOM= | |||
| golang.org/x/net v0.9.0/go.mod h1:d48xBJpPfHeWQsugry2m+kC02ZBRGRgulfHnEXEuWns= | |||
| golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= | |||
| golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= | |||
| golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= | |||
| golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= | |||
| golang.org/x/sync v0.1.0 h1:wsuoTGHzEhffawBOhz5CYhcrV4IdKZbEyZjBMuTp12o= | |||
| golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= | |||
| golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= | |||
| golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= | |||
| golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= | |||
| golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= | |||
| golang.org/x/sys v0.0.0-20210315160823-c6e025ad8005/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= | |||
| golang.org/x/sys v0.0.0-20220704084225-05e143d24a9e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= | |||
| golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= | |||
| golang.org/x/sys v0.7.0 h1:3jlCCIQZPdOYu1h8BkNvLz8Kgwtae2cagcG/VamtZRU= | |||
| golang.org/x/sys v0.7.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= | |||
| golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= | |||
| golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= | |||
| golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= | |||
| golang.org/x/text v0.9.0 h1:2sjJmO8cDvYveuX97RDLsxlyUxLl+GHoLxBiRdHllBE= | |||
| golang.org/x/text v0.9.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8= | |||
| golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= | |||
| golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= | |||
| golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE= | |||
| golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= | |||
| golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= | |||
| golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= | |||
| golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= | |||
| golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= | |||
| google.golang.org/genproto v0.0.0-20230526161137-0005af68ea54 h1:9NWlQfY2ePejTmfwUH1OWwmznFa+0kKcHGPDvcPza9M= | |||
| google.golang.org/genproto v0.0.0-20230526161137-0005af68ea54/go.mod h1:zqTuNwFlFRsw5zIts5VnzLQxSRqh+CGOTVMlYbY0Eyk= | |||
| google.golang.org/genproto/googleapis/api v0.0.0-20230525234035-dd9d682886f9 h1:m8v1xLLLzMe1m5P+gCTF8nJB9epwZQUBERm20Oy1poQ= | |||
| google.golang.org/genproto/googleapis/api v0.0.0-20230525234035-dd9d682886f9/go.mod h1:vHYtlOoi6TsQ3Uk2yxR7NI5z8uoV+3pZtR4jmHIkRig= | |||
| google.golang.org/genproto/googleapis/rpc v0.0.0-20230525234030-28d5490b6b19 h1:0nDDozoAU19Qb2HwhXadU8OcsiO/09cnTqhUtq2MEOM= | |||
| google.golang.org/genproto/googleapis/rpc v0.0.0-20230525234030-28d5490b6b19/go.mod h1:66JfowdXAEgad5O9NnYcsNPLCPZJD++2L9X0PCMODrA= | |||
| google.golang.org/grpc v1.57.0 h1:kfzNeI/klCGD2YPMUlaGNT3pxvYfga7smW3Vth8Zsiw= | |||
| google.golang.org/grpc v1.57.0/go.mod h1:Sd+9RMTACXwmub0zcNY2c4arhtrbBYD1AUHI/dt16Mo= | |||
| google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= | |||
| google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= | |||
| google.golang.org/protobuf v1.30.0 h1:kPPoIgf3TsEvrm0PFe15JQ+570QVxYzEvvHqChK+cng= | |||
| google.golang.org/protobuf v1.30.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I= | |||
| gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= | |||
| gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY= | |||
| gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= | |||
| gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= | |||
| gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= | |||
| lukechampine.com/blake3 v1.1.7 h1:GgRMhmdsuK8+ii6UZFDL8Nb+VyMwadAgcJyfYHxG6n0= | |||
| lukechampine.com/blake3 v1.1.7/go.mod h1:tkKEOtDkNtklkXtLNEOGNq5tcV90tJiA1vAA12R78LA= | |||
| @@ -0,0 +1,34 @@ | |||
| package config | |||
| import ( | |||
| "gitlink.org.cn/cloudream/common/pkgs/distlock" | |||
| "gitlink.org.cn/cloudream/common/pkgs/ipfs" | |||
| log "gitlink.org.cn/cloudream/common/pkgs/logger" | |||
| c "gitlink.org.cn/cloudream/common/utils/config" | |||
| stgmodels "gitlink.org.cn/cloudream/storage-common/models" | |||
| "gitlink.org.cn/cloudream/storage-common/pkgs/grpc" | |||
| stgmq "gitlink.org.cn/cloudream/storage-common/pkgs/mq" | |||
| ) | |||
| type Config struct { | |||
| ID int64 `json:"id"` | |||
| Local stgmodels.LocalMachineInfo `json:"local"` | |||
| GRPC *grpc.Config `json:"grpc"` | |||
| ECPacketSize int64 `json:"ecPacketSize"` | |||
| StorageBaseDir string `json:"storageBaseDir"` | |||
| TempFileLifetime int `json:"tempFileLifetime"` // temp状态的副本最多能保持多久时间,单位:秒 | |||
| Logger log.Config `json:"logger"` | |||
| RabbitMQ stgmq.Config `json:"rabbitMQ"` | |||
| IPFS ipfs.Config `json:"ipfs"` | |||
| DistLock distlock.Config `json:"distlock"` | |||
| } | |||
| var cfg Config | |||
| func Init() error { | |||
| return c.DefaultLoad("agent", &cfg) | |||
| } | |||
| func Cfg() *Config { | |||
| return &cfg | |||
| } | |||
| @@ -0,0 +1,135 @@ | |||
| package grpc | |||
| import ( | |||
| "fmt" | |||
| "io" | |||
| log "gitlink.org.cn/cloudream/common/pkgs/logger" | |||
| myio "gitlink.org.cn/cloudream/common/utils/io" | |||
| "gitlink.org.cn/cloudream/storage-common/globals" | |||
| agentserver "gitlink.org.cn/cloudream/storage-common/pkgs/grpc/agent" | |||
| ) | |||
| type Service struct { | |||
| agentserver.AgentServer | |||
| } | |||
| func NewService() *Service { | |||
| return &Service{} | |||
| } | |||
| func (s *Service) SendIPFSFile(server agentserver.Agent_SendIPFSFileServer) error { | |||
| log.Debugf("client upload file") | |||
| ipfsCli, err := globals.IPFSPool.Acquire() | |||
| if err != nil { | |||
| log.Warnf("new ipfs client: %s", err.Error()) | |||
| return fmt.Errorf("new ipfs client: %w", err) | |||
| } | |||
| defer ipfsCli.Close() | |||
| writer, err := ipfsCli.CreateFileStream() | |||
| if err != nil { | |||
| log.Warnf("create file failed, err: %s", err.Error()) | |||
| return fmt.Errorf("create file failed, err: %w", err) | |||
| } | |||
| // 然后读取文件数据 | |||
| var recvSize int64 | |||
| for { | |||
| msg, err := server.Recv() | |||
| // 读取客户端数据失败 | |||
| // 即使err是io.EOF,只要没有收到客户端包含EOF数据包就被断开了连接,就认为接收失败 | |||
| if err != nil { | |||
| // 关闭文件写入,不需要返回的hash和error | |||
| writer.Abort(io.ErrClosedPipe) | |||
| log.WithField("ReceiveSize", recvSize). | |||
| Warnf("recv message failed, err: %s", err.Error()) | |||
| return fmt.Errorf("recv message failed, err: %w", err) | |||
| } | |||
| err = myio.WriteAll(writer, msg.Data) | |||
| if err != nil { | |||
| // 关闭文件写入,不需要返回的hash和error | |||
| writer.Abort(io.ErrClosedPipe) | |||
| log.Warnf("write data to file failed, err: %s", err.Error()) | |||
| return fmt.Errorf("write data to file failed, err: %w", err) | |||
| } | |||
| recvSize += int64(len(msg.Data)) | |||
| if msg.Type == agentserver.FileDataPacketType_EOF { | |||
| // 客户端明确说明文件传输已经结束,那么结束写入,获得文件Hash | |||
| hash, err := writer.Finish() | |||
| if err != nil { | |||
| log.Warnf("finish writing failed, err: %s", err.Error()) | |||
| return fmt.Errorf("finish writing failed, err: %w", err) | |||
| } | |||
| // 并将结果返回到客户端 | |||
| err = server.SendAndClose(&agentserver.SendIPFSFileResp{ | |||
| FileHash: hash, | |||
| }) | |||
| if err != nil { | |||
| log.Warnf("send response failed, err: %s", err.Error()) | |||
| return fmt.Errorf("send response failed, err: %w", err) | |||
| } | |||
| return nil | |||
| } | |||
| } | |||
| } | |||
| func (s *Service) GetIPFSFile(req *agentserver.GetIPFSFileReq, server agentserver.Agent_GetIPFSFileServer) error { | |||
| log.WithField("FileHash", req.FileHash).Debugf("client download file") | |||
| ipfsCli, err := globals.IPFSPool.Acquire() | |||
| if err != nil { | |||
| log.Warnf("new ipfs client: %s", err.Error()) | |||
| return fmt.Errorf("new ipfs client: %w", err) | |||
| } | |||
| defer ipfsCli.Close() | |||
| reader, err := ipfsCli.OpenRead(req.FileHash) | |||
| if err != nil { | |||
| log.Warnf("open file %s to read failed, err: %s", req.FileHash, err.Error()) | |||
| return fmt.Errorf("open file to read failed, err: %w", err) | |||
| } | |||
| defer reader.Close() | |||
| buf := make([]byte, 1024) | |||
| readAllCnt := 0 | |||
| for { | |||
| readCnt, err := reader.Read(buf) | |||
| if readCnt > 0 { | |||
| readAllCnt += readCnt | |||
| err = server.Send(&agentserver.FileDataPacket{ | |||
| Type: agentserver.FileDataPacketType_Data, | |||
| Data: buf[:readCnt], | |||
| }) | |||
| if err != nil { | |||
| log.WithField("FileHash", req.FileHash). | |||
| Warnf("send file data failed, err: %s", err.Error()) | |||
| return fmt.Errorf("send file data failed, err: %w", err) | |||
| } | |||
| } | |||
| // 文件读取完毕 | |||
| if err == io.EOF { | |||
| log.WithField("FileHash", req.FileHash).Debugf("send data size %d", readAllCnt) | |||
| // 发送EOF消息 | |||
| server.Send(&agentserver.FileDataPacket{ | |||
| Type: agentserver.FileDataPacketType_EOF, | |||
| }) | |||
| return nil | |||
| } | |||
| // io.ErrUnexpectedEOF没有读满整个buf就遇到了EOF,此时正常发送剩余数据即可。除了这两个错误之外,其他错误都中断操作 | |||
| if err != nil && err != io.ErrUnexpectedEOF { | |||
| log.Warnf("read file %s data failed, err: %s", req.FileHash, err.Error()) | |||
| return fmt.Errorf("read file data failed, err: %w", err) | |||
| } | |||
| } | |||
| } | |||
| @@ -0,0 +1,29 @@ | |||
| package mq | |||
| import ( | |||
| "gitlink.org.cn/cloudream/common/pkgs/logger" | |||
| "gitlink.org.cn/cloudream/common/pkgs/mq" | |||
| "gitlink.org.cn/cloudream/storage-common/consts" | |||
| "gitlink.org.cn/cloudream/storage-common/globals" | |||
| agtmq "gitlink.org.cn/cloudream/storage-common/pkgs/mq/agent" | |||
| ) | |||
| func (svc *Service) GetState(msg *agtmq.GetState) (*agtmq.GetStateResp, *mq.CodeMessage) { | |||
| var ipfsState string | |||
| ipfsCli, err := globals.IPFSPool.Acquire() | |||
| if err != nil { | |||
| logger.Warnf("new ipfs client: %s", err.Error()) | |||
| ipfsState = consts.IPFSStateUnavailable | |||
| } else { | |||
| if ipfsCli.IsUp() { | |||
| ipfsState = consts.IPFSStateOK | |||
| } else { | |||
| ipfsState = consts.IPFSStateUnavailable | |||
| } | |||
| ipfsCli.Close() | |||
| } | |||
| return mq.ReplyOK(agtmq.NewGetStateResp(ipfsState)) | |||
| } | |||
| @@ -0,0 +1,153 @@ | |||
| package mq | |||
| import ( | |||
| "time" | |||
| shell "github.com/ipfs/go-ipfs-api" | |||
| "gitlink.org.cn/cloudream/common/consts/errorcode" | |||
| "gitlink.org.cn/cloudream/common/pkgs/ipfs" | |||
| "gitlink.org.cn/cloudream/common/pkgs/logger" | |||
| "gitlink.org.cn/cloudream/common/pkgs/mq" | |||
| "gitlink.org.cn/cloudream/storage-agent/internal/config" | |||
| "gitlink.org.cn/cloudream/storage-agent/internal/task" | |||
| mytask "gitlink.org.cn/cloudream/storage-agent/internal/task" | |||
| "gitlink.org.cn/cloudream/storage-common/consts" | |||
| "gitlink.org.cn/cloudream/storage-common/globals" | |||
| agtmq "gitlink.org.cn/cloudream/storage-common/pkgs/mq/agent" | |||
| ) | |||
| func (svc *Service) CheckCache(msg *agtmq.CheckCache) (*agtmq.CheckCacheResp, *mq.CodeMessage) { | |||
| ipfsCli, err := globals.IPFSPool.Acquire() | |||
| if err != nil { | |||
| logger.Warnf("new ipfs client: %s", err.Error()) | |||
| return mq.ReplyFailed[agtmq.CheckCacheResp](errorcode.OperationFailed, "new ipfs client failed") | |||
| } | |||
| defer ipfsCli.Close() | |||
| filesMap, err := ipfsCli.GetPinnedFiles() | |||
| if err != nil { | |||
| logger.Warnf("get pinned files from ipfs failed, err: %s", err.Error()) | |||
| return mq.ReplyFailed[agtmq.CheckCacheResp](errorcode.OperationFailed, "get pinned files from ipfs failed") | |||
| } | |||
| // TODO 根据锁定清单过滤被锁定的文件的记录 | |||
| if msg.IsComplete { | |||
| return svc.checkComplete(msg, filesMap, ipfsCli) | |||
| } else { | |||
| return svc.checkIncrement(msg, filesMap, ipfsCli) | |||
| } | |||
| } | |||
| func (svc *Service) checkIncrement(msg *agtmq.CheckCache, filesMap map[string]shell.PinInfo, ipfsCli *ipfs.PoolClient) (*agtmq.CheckCacheResp, *mq.CodeMessage) { | |||
| var entries []agtmq.CheckIPFSRespEntry | |||
| for _, cache := range msg.Caches { | |||
| _, ok := filesMap[cache.FileHash] | |||
| if ok { | |||
| if cache.State == consts.CacheStatePinned { | |||
| // 不处理 | |||
| } else if cache.State == consts.CacheStateTemp { | |||
| logger.WithField("FileHash", cache.FileHash).Debugf("unpin for cache entry state is temp") | |||
| err := ipfsCli.Unpin(cache.FileHash) | |||
| if err != nil { | |||
| logger.WithField("FileHash", cache.FileHash).Warnf("unpin file failed, err: %s", err.Error()) | |||
| } | |||
| } | |||
| // 删除map中的记录,表示此记录已被检查过 | |||
| delete(filesMap, cache.FileHash) | |||
| } else { | |||
| if cache.State == consts.CacheStatePinned { | |||
| svc.taskManager.StartComparable(task.NewIPFSPin(cache.FileHash)) | |||
| } else if cache.State == consts.CacheStateTemp { | |||
| if time.Since(cache.CacheTime) > time.Duration(config.Cfg().TempFileLifetime)*time.Second { | |||
| entries = append(entries, agtmq.NewCheckCacheRespEntry(cache.FileHash, agtmq.CHECK_IPFS_RESP_OP_DELETE_TEMP)) | |||
| } | |||
| } | |||
| } | |||
| } | |||
| // 增量情况下,不需要对filesMap中没检查的记录进行处理 | |||
| return mq.ReplyOK(agtmq.NewCheckCacheResp(entries)) | |||
| } | |||
| func (svc *Service) checkComplete(msg *agtmq.CheckCache, filesMap map[string]shell.PinInfo, ipfsCli *ipfs.PoolClient) (*agtmq.CheckCacheResp, *mq.CodeMessage) { | |||
| var entries []agtmq.CheckIPFSRespEntry | |||
| for _, cache := range msg.Caches { | |||
| _, ok := filesMap[cache.FileHash] | |||
| if ok { | |||
| if cache.State == consts.CacheStatePinned { | |||
| // 不处理 | |||
| } else if cache.State == consts.CacheStateTemp { | |||
| logger.WithField("FileHash", cache.FileHash).Debugf("unpin for cache entry state is temp") | |||
| err := ipfsCli.Unpin(cache.FileHash) | |||
| if err != nil { | |||
| logger.WithField("FileHash", cache.FileHash).Warnf("unpin file failed, err: %s", err.Error()) | |||
| } | |||
| } | |||
| // 删除map中的记录,表示此记录已被检查过 | |||
| delete(filesMap, cache.FileHash) | |||
| } else { | |||
| if cache.State == consts.CacheStatePinned { | |||
| svc.taskManager.StartComparable(task.NewIPFSPin(cache.FileHash)) | |||
| } else if cache.State == consts.CacheStateTemp { | |||
| if time.Since(cache.CacheTime) > time.Duration(config.Cfg().TempFileLifetime)*time.Second { | |||
| entries = append(entries, agtmq.NewCheckCacheRespEntry(cache.FileHash, agtmq.CHECK_IPFS_RESP_OP_DELETE_TEMP)) | |||
| } | |||
| } | |||
| } | |||
| } | |||
| // map中剩下的数据是没有被遍历过,即Cache中没有记录的,那么就Unpin文件,并产生一条Temp记录 | |||
| for hash := range filesMap { | |||
| logger.WithField("FileHash", hash).Debugf("unpin for no cacah entry") | |||
| err := ipfsCli.Unpin(hash) | |||
| if err != nil { | |||
| logger.WithField("FileHash", hash).Warnf("unpin file failed, err: %s", err.Error()) | |||
| } | |||
| entries = append(entries, agtmq.NewCheckCacheRespEntry(hash, agtmq.CHECK_IPFS_RESP_OP_CREATE_TEMP)) | |||
| } | |||
| return mq.ReplyOK(agtmq.NewCheckCacheResp(entries)) | |||
| } | |||
| func (svc *Service) StartCacheMovePackage(msg *agtmq.StartCacheMovePackage) (*agtmq.StartCacheMovePackageResp, *mq.CodeMessage) { | |||
| tsk := svc.taskManager.StartNew(mytask.NewCacheMovePackage(msg.UserID, msg.PackageID)) | |||
| return mq.ReplyOK(agtmq.NewStartCacheMovePackageResp(tsk.ID())) | |||
| } | |||
| func (svc *Service) WaitCacheMovePackage(msg *agtmq.WaitCacheMovePackage) (*agtmq.WaitCacheMovePackageResp, *mq.CodeMessage) { | |||
| tsk := svc.taskManager.FindByID(msg.TaskID) | |||
| if tsk == nil { | |||
| return mq.ReplyFailed[agtmq.WaitCacheMovePackageResp](errorcode.TaskNotFound, "task not found") | |||
| } | |||
| if msg.WaitTimeoutMs == 0 { | |||
| tsk.Wait() | |||
| errMsg := "" | |||
| if tsk.Error() != nil { | |||
| errMsg = tsk.Error().Error() | |||
| } | |||
| return mq.ReplyOK(agtmq.NewWaitCacheMovePackageResp(true, errMsg)) | |||
| } else { | |||
| if tsk.WaitTimeout(time.Duration(msg.WaitTimeoutMs)) { | |||
| errMsg := "" | |||
| if tsk.Error() != nil { | |||
| errMsg = tsk.Error().Error() | |||
| } | |||
| return mq.ReplyOK(agtmq.NewWaitCacheMovePackageResp(true, errMsg)) | |||
| } | |||
| return mq.ReplyOK(agtmq.NewWaitCacheMovePackageResp(false, "")) | |||
| } | |||
| } | |||
| @@ -0,0 +1,58 @@ | |||
| package mq | |||
| import ( | |||
| "time" | |||
| "gitlink.org.cn/cloudream/common/consts/errorcode" | |||
| log "gitlink.org.cn/cloudream/common/pkgs/logger" | |||
| "gitlink.org.cn/cloudream/common/pkgs/mq" | |||
| "gitlink.org.cn/cloudream/storage-agent/internal/task" | |||
| agtmq "gitlink.org.cn/cloudream/storage-common/pkgs/mq/agent" | |||
| ) | |||
| func (svc *Service) StartPinningObject(msg *agtmq.StartPinningObject) (*agtmq.StartPinningObjectResp, *mq.CodeMessage) { | |||
| log.WithField("FileHash", msg.FileHash).Debugf("pin object") | |||
| tsk := svc.taskManager.StartComparable(task.NewIPFSPin(msg.FileHash)) | |||
| if tsk.Error() != nil { | |||
| log.WithField("FileHash", msg.FileHash). | |||
| Warnf("pin object failed, err: %s", tsk.Error().Error()) | |||
| return mq.ReplyFailed[agtmq.StartPinningObjectResp](errorcode.OperationFailed, "pin object failed") | |||
| } | |||
| return mq.ReplyOK(agtmq.NewStartPinningObjectResp(tsk.ID())) | |||
| } | |||
| func (svc *Service) WaitPinningObject(msg *agtmq.WaitPinningObject) (*agtmq.WaitPinningObjectResp, *mq.CodeMessage) { | |||
| log.WithField("TaskID", msg.TaskID).Debugf("wait pinning object") | |||
| tsk := svc.taskManager.FindByID(msg.TaskID) | |||
| if tsk == nil { | |||
| return mq.ReplyFailed[agtmq.WaitPinningObjectResp](errorcode.TaskNotFound, "task not found") | |||
| } | |||
| if msg.WaitTimeoutMs == 0 { | |||
| tsk.Wait() | |||
| errMsg := "" | |||
| if tsk.Error() != nil { | |||
| errMsg = tsk.Error().Error() | |||
| } | |||
| return mq.ReplyOK(agtmq.NewWaitPinningObjectResp(true, errMsg)) | |||
| } else { | |||
| if tsk.WaitTimeout(time.Duration(msg.WaitTimeoutMs)) { | |||
| errMsg := "" | |||
| if tsk.Error() != nil { | |||
| errMsg = tsk.Error().Error() | |||
| } | |||
| return mq.ReplyOK(agtmq.NewWaitPinningObjectResp(true, errMsg)) | |||
| } | |||
| return mq.ReplyOK(agtmq.NewWaitPinningObjectResp(false, "")) | |||
| } | |||
| } | |||
| @@ -0,0 +1,15 @@ | |||
| package mq | |||
| import ( | |||
| "gitlink.org.cn/cloudream/storage-agent/internal/task" | |||
| ) | |||
| type Service struct { | |||
| taskManager *task.Manager | |||
| } | |||
| func NewService(taskMgr *task.Manager) *Service { | |||
| return &Service{ | |||
| taskManager: taskMgr, | |||
| } | |||
| } | |||
| @@ -0,0 +1,247 @@ | |||
| package mq | |||
| import ( | |||
| "io/fs" | |||
| "os" | |||
| "path/filepath" | |||
| "time" | |||
| "github.com/samber/lo" | |||
| "gitlink.org.cn/cloudream/common/consts/errorcode" | |||
| "gitlink.org.cn/cloudream/common/pkgs/logger" | |||
| "gitlink.org.cn/cloudream/common/pkgs/mq" | |||
| "gitlink.org.cn/cloudream/storage-agent/internal/config" | |||
| mytask "gitlink.org.cn/cloudream/storage-agent/internal/task" | |||
| "gitlink.org.cn/cloudream/storage-common/consts" | |||
| "gitlink.org.cn/cloudream/storage-common/globals" | |||
| "gitlink.org.cn/cloudream/storage-common/pkgs/iterator" | |||
| agtmq "gitlink.org.cn/cloudream/storage-common/pkgs/mq/agent" | |||
| coormq "gitlink.org.cn/cloudream/storage-common/pkgs/mq/coordinator" | |||
| "gitlink.org.cn/cloudream/storage-common/utils" | |||
| ) | |||
| func (svc *Service) StartStorageLoadPackage(msg *agtmq.StartStorageLoadPackage) (*agtmq.StartStorageLoadPackageResp, *mq.CodeMessage) { | |||
| coorCli, err := globals.CoordinatorMQPool.Acquire() | |||
| if err != nil { | |||
| logger.Warnf("new coordinator client: %s", err.Error()) | |||
| return nil, mq.Failed(errorcode.OperationFailed, "new coordinator client failed") | |||
| } | |||
| defer coorCli.Close() | |||
| getStgResp, err := coorCli.GetStorageInfo(coormq.NewGetStorageInfo(msg.UserID, msg.StorageID)) | |||
| if err != nil { | |||
| logger.WithField("StorageID", msg.StorageID). | |||
| Warnf("getting storage info: %s", err.Error()) | |||
| return nil, mq.Failed(errorcode.OperationFailed, "get storage info failed") | |||
| } | |||
| outputDirPath := filepath.Join(config.Cfg().StorageBaseDir, getStgResp.Directory, utils.MakeStorageLoadPackageDirName(msg.PackageID, msg.UserID)) | |||
| if err = os.MkdirAll(outputDirPath, 0755); err != nil { | |||
| logger.WithField("StorageID", msg.StorageID). | |||
| Warnf("creating output directory: %s", err.Error()) | |||
| return nil, mq.Failed(errorcode.OperationFailed, "create output directory failed") | |||
| } | |||
| tsk := svc.taskManager.StartNew(mytask.NewDownloadPackage(msg.UserID, msg.PackageID, outputDirPath)) | |||
| return mq.ReplyOK(agtmq.NewStartStorageLoadPackageResp(tsk.ID())) | |||
| } | |||
| func (svc *Service) WaitStorageLoadPackage(msg *agtmq.WaitStorageLoadPackage) (*agtmq.WaitStorageLoadPackageResp, *mq.CodeMessage) { | |||
| logger.WithField("TaskID", msg.TaskID).Debugf("wait loading package") | |||
| tsk := svc.taskManager.FindByID(msg.TaskID) | |||
| if tsk == nil { | |||
| return mq.ReplyFailed[agtmq.WaitStorageLoadPackageResp](errorcode.TaskNotFound, "task not found") | |||
| } | |||
| if msg.WaitTimeoutMs == 0 { | |||
| tsk.Wait() | |||
| errMsg := "" | |||
| if tsk.Error() != nil { | |||
| errMsg = tsk.Error().Error() | |||
| } | |||
| return mq.ReplyOK(agtmq.NewWaitStorageLoadPackageResp(true, errMsg)) | |||
| } else { | |||
| if tsk.WaitTimeout(time.Duration(msg.WaitTimeoutMs)) { | |||
| errMsg := "" | |||
| if tsk.Error() != nil { | |||
| errMsg = tsk.Error().Error() | |||
| } | |||
| return mq.ReplyOK(agtmq.NewWaitStorageLoadPackageResp(true, errMsg)) | |||
| } | |||
| return mq.ReplyOK(agtmq.NewWaitStorageLoadPackageResp(false, "")) | |||
| } | |||
| } | |||
| func (svc *Service) StorageCheck(msg *agtmq.StorageCheck) (*agtmq.StorageCheckResp, *mq.CodeMessage) { | |||
| dirFullPath := filepath.Join(config.Cfg().StorageBaseDir, msg.Directory) | |||
| infos, err := os.ReadDir(dirFullPath) | |||
| if err != nil { | |||
| logger.Warnf("list storage directory failed, err: %s", err.Error()) | |||
| return mq.ReplyOK(agtmq.NewStorageCheckResp( | |||
| err.Error(), | |||
| nil, | |||
| )) | |||
| } | |||
| dirInfos := lo.Filter(infos, func(info fs.DirEntry, index int) bool { return info.IsDir() }) | |||
| if msg.IsComplete { | |||
| return svc.checkStorageComplete(msg, dirInfos) | |||
| } else { | |||
| return svc.checkStorageIncrement(msg, dirInfos) | |||
| } | |||
| } | |||
| func (svc *Service) checkStorageIncrement(msg *agtmq.StorageCheck, dirInfos []fs.DirEntry) (*agtmq.StorageCheckResp, *mq.CodeMessage) { | |||
| infosMap := make(map[string]fs.DirEntry) | |||
| for _, info := range dirInfos { | |||
| infosMap[info.Name()] = info | |||
| } | |||
| var entries []agtmq.StorageCheckRespEntry | |||
| for _, obj := range msg.Packages { | |||
| dirName := utils.MakeStorageLoadPackageDirName(obj.PackageID, obj.UserID) | |||
| _, ok := infosMap[dirName] | |||
| if ok { | |||
| // 不需要做处理 | |||
| // 删除map中的记录,表示此记录已被检查过 | |||
| delete(infosMap, dirName) | |||
| } else { | |||
| // 只要文件不存在,就删除StoragePackage表中的记录 | |||
| entries = append(entries, agtmq.NewStorageCheckRespEntry(obj.PackageID, obj.UserID, agtmq.CHECK_STORAGE_RESP_OP_DELETE)) | |||
| } | |||
| } | |||
| // 增量情况下,不需要对infosMap中没检查的记录进行处理 | |||
| return mq.ReplyOK(agtmq.NewStorageCheckResp(consts.StorageDirectoryStateOK, entries)) | |||
| } | |||
| func (svc *Service) checkStorageComplete(msg *agtmq.StorageCheck, dirInfos []fs.DirEntry) (*agtmq.StorageCheckResp, *mq.CodeMessage) { | |||
| infosMap := make(map[string]fs.DirEntry) | |||
| for _, info := range dirInfos { | |||
| infosMap[info.Name()] = info | |||
| } | |||
| var entries []agtmq.StorageCheckRespEntry | |||
| for _, obj := range msg.Packages { | |||
| dirName := utils.MakeStorageLoadPackageDirName(obj.PackageID, obj.UserID) | |||
| _, ok := infosMap[dirName] | |||
| if ok { | |||
| // 不需要做处理 | |||
| // 删除map中的记录,表示此记录已被检查过 | |||
| delete(infosMap, dirName) | |||
| } else { | |||
| // 只要文件不存在,就删除StoragePackage表中的记录 | |||
| entries = append(entries, agtmq.NewStorageCheckRespEntry(obj.PackageID, obj.UserID, agtmq.CHECK_STORAGE_RESP_OP_DELETE)) | |||
| } | |||
| } | |||
| return mq.ReplyOK(agtmq.NewStorageCheckResp(consts.StorageDirectoryStateOK, entries)) | |||
| } | |||
| func (svc *Service) StartStorageCreatePackage(msg *agtmq.StartStorageCreatePackage) (*agtmq.StartStorageCreatePackageResp, *mq.CodeMessage) { | |||
| coorCli, err := globals.CoordinatorMQPool.Acquire() | |||
| if err != nil { | |||
| logger.Warnf("new coordinator client: %s", err.Error()) | |||
| return nil, mq.Failed(errorcode.OperationFailed, "new coordinator client failed") | |||
| } | |||
| defer coorCli.Close() | |||
| getStgResp, err := coorCli.GetStorageInfo(coormq.NewGetStorageInfo(msg.UserID, msg.StorageID)) | |||
| if err != nil { | |||
| logger.WithField("StorageID", msg.StorageID). | |||
| Warnf("getting storage info: %s", err.Error()) | |||
| return nil, mq.Failed(errorcode.OperationFailed, "get storage info failed") | |||
| } | |||
| fullPath := filepath.Clean(filepath.Join(config.Cfg().StorageBaseDir, getStgResp.Directory, msg.Path)) | |||
| var uploadFilePathes []string | |||
| err = filepath.WalkDir(fullPath, func(fname string, fi os.DirEntry, err error) error { | |||
| if err != nil { | |||
| return nil | |||
| } | |||
| if !fi.IsDir() { | |||
| uploadFilePathes = append(uploadFilePathes, fname) | |||
| } | |||
| return nil | |||
| }) | |||
| if err != nil { | |||
| logger.Warnf("opening directory %s: %s", fullPath, err.Error()) | |||
| return nil, mq.Failed(errorcode.OperationFailed, "read directory failed") | |||
| } | |||
| objIter := iterator.NewUploadingObjectIterator(fullPath, uploadFilePathes) | |||
| if msg.Redundancy.IsRepInfo() { | |||
| repInfo, err := msg.Redundancy.ToRepInfo() | |||
| if err != nil { | |||
| logger.Warnf("getting rep redundancy info: %s", err.Error()) | |||
| return nil, mq.Failed(errorcode.OperationFailed, "get rep redundancy info failed") | |||
| } | |||
| tsk := svc.taskManager.StartNew(mytask.NewCreateRepPackage(msg.UserID, msg.BucketID, msg.Name, objIter, repInfo)) | |||
| return mq.ReplyOK(agtmq.NewStartStorageCreatePackageResp(tsk.ID())) | |||
| } | |||
| ecInfo, err := msg.Redundancy.ToECInfo() | |||
| if err != nil { | |||
| logger.Warnf("getting ec redundancy info: %s", err.Error()) | |||
| return nil, mq.Failed(errorcode.OperationFailed, "get ec redundancy info failed") | |||
| } | |||
| tsk := svc.taskManager.StartNew(mytask.NewCreateECPackage(msg.UserID, msg.BucketID, msg.Name, objIter, ecInfo)) | |||
| return mq.ReplyOK(agtmq.NewStartStorageCreatePackageResp(tsk.ID())) | |||
| } | |||
| func (svc *Service) WaitStorageCreatePackage(msg *agtmq.WaitStorageCreatePackage) (*agtmq.WaitStorageCreatePackageResp, *mq.CodeMessage) { | |||
| tsk := svc.taskManager.FindByID(msg.TaskID) | |||
| if tsk == nil { | |||
| return nil, mq.Failed(errorcode.TaskNotFound, "task not found") | |||
| } | |||
| if msg.WaitTimeoutMs == 0 { | |||
| tsk.Wait() | |||
| } else if !tsk.WaitTimeout(time.Duration(msg.WaitTimeoutMs)) { | |||
| return mq.ReplyOK(agtmq.NewWaitStorageCreatePackageResp(false, "", 0)) | |||
| } | |||
| if tsk.Error() != nil { | |||
| return mq.ReplyOK(agtmq.NewWaitStorageCreatePackageResp(true, tsk.Error().Error(), 0)) | |||
| } | |||
| // TODO 避免判断类型 | |||
| if repTask, ok := tsk.Body().(*mytask.CreateRepPackage); ok { | |||
| return mq.ReplyOK(agtmq.NewWaitStorageCreatePackageResp(true, "", repTask.Result.PackageID)) | |||
| } | |||
| if ecTask, ok := tsk.Body().(*mytask.CreateECPackage); ok { | |||
| return mq.ReplyOK(agtmq.NewWaitStorageCreatePackageResp(true, "", ecTask.Result.PackageID)) | |||
| } | |||
| return nil, mq.Failed(errorcode.TaskNotFound, "task not found") | |||
| } | |||
| @@ -0,0 +1,102 @@ | |||
| package task | |||
| import ( | |||
| "fmt" | |||
| "time" | |||
| "gitlink.org.cn/cloudream/common/pkgs/logger" | |||
| "gitlink.org.cn/cloudream/storage-common/globals" | |||
| "gitlink.org.cn/cloudream/storage-common/pkgs/db/model" | |||
| "gitlink.org.cn/cloudream/storage-common/pkgs/distlock/reqbuilder" | |||
| coormq "gitlink.org.cn/cloudream/storage-common/pkgs/mq/coordinator" | |||
| ) | |||
| type CacheMovePackage struct { | |||
| userID int64 | |||
| packageID int64 | |||
| } | |||
| func NewCacheMovePackage(userID int64, packageID int64) *CacheMovePackage { | |||
| return &CacheMovePackage{ | |||
| userID: userID, | |||
| packageID: packageID, | |||
| } | |||
| } | |||
| func (t *CacheMovePackage) Execute(ctx TaskContext, complete CompleteFn) { | |||
| err := t.do(ctx) | |||
| complete(err, CompleteOption{ | |||
| RemovingDelay: time.Minute, | |||
| }) | |||
| } | |||
| func (t *CacheMovePackage) do(ctx TaskContext) error { | |||
| log := logger.WithType[CacheMovePackage]("Task") | |||
| log.Debugf("begin with %v", logger.FormatStruct(t)) | |||
| defer log.Debugf("end") | |||
| // TOOD EC的锁 | |||
| mutex, err := reqbuilder.NewBuilder(). | |||
| Metadata(). | |||
| // 读取Package信息和包含的Object信息 | |||
| Package().ReadOne(t.packageID).Object().ReadAny(). | |||
| // 读取Rep对象的配置 | |||
| ObjectRep().ReadAny(). | |||
| // 创建Cache记录 | |||
| Cache().CreateAny(). | |||
| IPFS(). | |||
| // pin文件 | |||
| CreateAnyRep(*globals.Local.NodeID). | |||
| MutexLock(ctx.distlock) | |||
| if err != nil { | |||
| return fmt.Errorf("acquiring distlock: %w", err) | |||
| } | |||
| defer mutex.Unlock() | |||
| coorCli, err := globals.CoordinatorMQPool.Acquire() | |||
| if err != nil { | |||
| return fmt.Errorf("new coordinator client: %w", err) | |||
| } | |||
| defer coorCli.Close() | |||
| pkgResp, err := coorCli.GetPackage(coormq.NewGetPackage(t.userID, t.packageID)) | |||
| if err != nil { | |||
| return fmt.Errorf("getting package: %w", err) | |||
| } | |||
| if pkgResp.Redundancy.IsRepInfo() { | |||
| return t.moveRep(ctx, coorCli, pkgResp.Package) | |||
| } else { | |||
| // TODO EC的CacheMove逻辑 | |||
| } | |||
| return nil | |||
| } | |||
| func (t *CacheMovePackage) moveRep(ctx TaskContext, coorCli *coormq.PoolClient, pkg model.Package) error { | |||
| getRepResp, err := coorCli.GetPackageObjectRepData(coormq.NewGetPackageObjectRepData(pkg.PackageID)) | |||
| if err != nil { | |||
| return fmt.Errorf("getting package object rep data: %w", err) | |||
| } | |||
| ipfsCli, err := globals.IPFSPool.Acquire() | |||
| if err != nil { | |||
| return fmt.Errorf("new ipfs client: %w", err) | |||
| } | |||
| defer ipfsCli.Close() | |||
| var fileHashes []string | |||
| for _, rep := range getRepResp.Data { | |||
| if err := ipfsCli.Pin(rep.FileHash); err != nil { | |||
| return fmt.Errorf("pinning file %s: %w", rep.FileHash, err) | |||
| } | |||
| fileHashes = append(fileHashes, rep.FileHash) | |||
| } | |||
| _, err = coorCli.CachePackageMoved(coormq.NewCachePackageMoved(pkg.PackageID, *globals.Local.NodeID, fileHashes)) | |||
| if err != nil { | |||
| return fmt.Errorf("reporting cache package moved: %w", err) | |||
| } | |||
| return nil | |||
| } | |||
| @@ -0,0 +1,39 @@ | |||
| package task | |||
| import ( | |||
| "time" | |||
| "gitlink.org.cn/cloudream/common/models" | |||
| "gitlink.org.cn/cloudream/common/pkgs/logger" | |||
| "gitlink.org.cn/cloudream/storage-common/pkgs/cmd" | |||
| "gitlink.org.cn/cloudream/storage-common/pkgs/iterator" | |||
| ) | |||
| type CreateECPackageResult = cmd.CreateECPackageResult | |||
| type CreateECPackage struct { | |||
| cmd cmd.CreateECPackage | |||
| Result *CreateECPackageResult | |||
| } | |||
| func NewCreateECPackage(userID int64, bucketID int64, name string, objIter iterator.UploadingObjectIterator, redundancy models.ECRedundancyInfo) *CreateECPackage { | |||
| return &CreateECPackage{ | |||
| cmd: *cmd.NewCreateECPackage(userID, bucketID, name, objIter, redundancy), | |||
| } | |||
| } | |||
| func (t *CreateECPackage) Execute(ctx TaskContext, complete CompleteFn) { | |||
| log := logger.WithType[CreateECPackage]("Task") | |||
| log.Debugf("begin") | |||
| defer log.Debugf("end") | |||
| ret, err := t.cmd.Execute(&cmd.UpdatePackageContext{ | |||
| Distlock: ctx.distlock, | |||
| }) | |||
| t.Result = ret | |||
| complete(err, CompleteOption{ | |||
| RemovingDelay: time.Minute, | |||
| }) | |||
| } | |||
| @@ -0,0 +1,39 @@ | |||
| package task | |||
| import ( | |||
| "time" | |||
| "gitlink.org.cn/cloudream/common/models" | |||
| "gitlink.org.cn/cloudream/common/pkgs/logger" | |||
| "gitlink.org.cn/cloudream/storage-common/pkgs/cmd" | |||
| "gitlink.org.cn/cloudream/storage-common/pkgs/iterator" | |||
| ) | |||
| type CreateRepPackageResult = cmd.CreateRepPackageResult | |||
| type CreateRepPackage struct { | |||
| cmd cmd.CreateRepPackage | |||
| Result *CreateRepPackageResult | |||
| } | |||
| func NewCreateRepPackage(userID int64, bucketID int64, name string, objIter iterator.UploadingObjectIterator, redundancy models.RepRedundancyInfo) *CreateRepPackage { | |||
| return &CreateRepPackage{ | |||
| cmd: *cmd.NewCreateRepPackage(userID, bucketID, name, objIter, redundancy), | |||
| } | |||
| } | |||
| func (t *CreateRepPackage) Execute(ctx TaskContext, complete CompleteFn) { | |||
| log := logger.WithType[CreateRepPackage]("Task") | |||
| log.Debugf("begin") | |||
| defer log.Debugf("end") | |||
| ret, err := t.cmd.Execute(&cmd.UpdatePackageContext{ | |||
| Distlock: ctx.distlock, | |||
| }) | |||
| t.Result = ret | |||
| complete(err, CompleteOption{ | |||
| RemovingDelay: time.Minute, | |||
| }) | |||
| } | |||
| @@ -0,0 +1,26 @@ | |||
| package task | |||
| import ( | |||
| "time" | |||
| "gitlink.org.cn/cloudream/storage-common/pkgs/cmd" | |||
| ) | |||
| type DownloadPackage struct { | |||
| cmd *cmd.DownloadPackage | |||
| } | |||
| func NewDownloadPackage(userID int64, packageID int64, outputPath string) *DownloadPackage { | |||
| return &DownloadPackage{ | |||
| cmd: cmd.NewDownloadPackage(userID, packageID, outputPath), | |||
| } | |||
| } | |||
| func (t *DownloadPackage) Execute(ctx TaskContext, complete CompleteFn) { | |||
| err := t.cmd.Execute(&cmd.DownloadPackageContext{ | |||
| Distlock: ctx.distlock, | |||
| }) | |||
| complete(err, CompleteOption{ | |||
| RemovingDelay: time.Minute, | |||
| }) | |||
| } | |||
| @@ -0,0 +1,61 @@ | |||
| package task | |||
| import ( | |||
| "fmt" | |||
| "time" | |||
| "gitlink.org.cn/cloudream/common/pkgs/logger" | |||
| "gitlink.org.cn/cloudream/storage-common/globals" | |||
| ) | |||
| type IPFSPin struct { | |||
| FileHash string | |||
| } | |||
| func NewIPFSPin(fileHash string) *IPFSPin { | |||
| return &IPFSPin{ | |||
| FileHash: fileHash, | |||
| } | |||
| } | |||
| func (t *IPFSPin) Compare(other *Task) bool { | |||
| tsk, ok := other.Body().(*IPFSPin) | |||
| if !ok { | |||
| return false | |||
| } | |||
| return t.FileHash == tsk.FileHash | |||
| } | |||
| func (t *IPFSPin) Execute(ctx TaskContext, complete CompleteFn) { | |||
| log := logger.WithType[IPFSPin]("Task") | |||
| log.Debugf("begin with %v", logger.FormatStruct(t)) | |||
| defer log.Debugf("end") | |||
| ipfsCli, err := globals.IPFSPool.Acquire() | |||
| if err != nil { | |||
| err := fmt.Errorf("new ipfs client: %w", err) | |||
| log.Warn(err.Error()) | |||
| complete(err, CompleteOption{ | |||
| RemovingDelay: time.Minute, | |||
| }) | |||
| return | |||
| } | |||
| defer ipfsCli.Close() | |||
| err = ipfsCli.Pin(t.FileHash) | |||
| if err != nil { | |||
| err := fmt.Errorf("pin file failed, err: %w", err) | |||
| log.WithField("FileHash", t.FileHash).Warn(err.Error()) | |||
| complete(err, CompleteOption{ | |||
| RemovingDelay: time.Minute, | |||
| }) | |||
| return | |||
| } | |||
| complete(nil, CompleteOption{ | |||
| RemovingDelay: time.Minute, | |||
| }) | |||
| } | |||
| @@ -0,0 +1,102 @@ | |||
| package task | |||
| import ( | |||
| "fmt" | |||
| "io" | |||
| "os" | |||
| "path/filepath" | |||
| "time" | |||
| "gitlink.org.cn/cloudream/common/pkgs/logger" | |||
| "gitlink.org.cn/cloudream/storage-common/globals" | |||
| ) | |||
| type IPFSRead struct { | |||
| FileHash string | |||
| LocalPath string | |||
| } | |||
| func NewIPFSRead(fileHash string, localPath string) *IPFSRead { | |||
| return &IPFSRead{ | |||
| FileHash: fileHash, | |||
| LocalPath: localPath, | |||
| } | |||
| } | |||
| func (t *IPFSRead) Compare(other *Task) bool { | |||
| tsk, ok := other.Body().(*IPFSRead) | |||
| if !ok { | |||
| return false | |||
| } | |||
| return t.FileHash == tsk.FileHash && t.LocalPath == tsk.LocalPath | |||
| } | |||
| func (t *IPFSRead) Execute(ctx TaskContext, complete CompleteFn) { | |||
| log := logger.WithType[IPFSRead]("Task") | |||
| log.Debugf("begin with %v", logger.FormatStruct(t)) | |||
| defer log.Debugf("end") | |||
| outputFileDir := filepath.Dir(t.LocalPath) | |||
| err := os.MkdirAll(outputFileDir, os.ModePerm) | |||
| if err != nil { | |||
| err := fmt.Errorf("create output file directory %s failed, err: %w", outputFileDir, err) | |||
| log.WithField("LocalPath", t.LocalPath).Warn(err.Error()) | |||
| complete(err, CompleteOption{ | |||
| RemovingDelay: time.Minute, | |||
| }) | |||
| return | |||
| } | |||
| outputFile, err := os.Create(t.LocalPath) | |||
| if err != nil { | |||
| err := fmt.Errorf("create output file %s failed, err: %w", t.LocalPath, err) | |||
| log.WithField("LocalPath", t.LocalPath).Warn(err.Error()) | |||
| complete(err, CompleteOption{ | |||
| RemovingDelay: time.Minute, | |||
| }) | |||
| return | |||
| } | |||
| defer outputFile.Close() | |||
| ipfsCli, err := globals.IPFSPool.Acquire() | |||
| if err != nil { | |||
| err := fmt.Errorf("new ipfs client: %w", err) | |||
| log.Warn(err.Error()) | |||
| complete(err, CompleteOption{ | |||
| RemovingDelay: time.Minute, | |||
| }) | |||
| return | |||
| } | |||
| defer ipfsCli.Close() | |||
| rd, err := ipfsCli.OpenRead(t.FileHash) | |||
| if err != nil { | |||
| err := fmt.Errorf("read ipfs file failed, err: %w", err) | |||
| log.WithField("FileHash", t.FileHash).Warn(err.Error()) | |||
| complete(err, CompleteOption{ | |||
| RemovingDelay: time.Minute, | |||
| }) | |||
| return | |||
| } | |||
| _, err = io.Copy(outputFile, rd) | |||
| if err != nil { | |||
| err := fmt.Errorf("copy ipfs file to local file failed, err: %w", err) | |||
| log.WithField("LocalPath", t.LocalPath).Warn(err.Error()) | |||
| complete(err, CompleteOption{ | |||
| RemovingDelay: time.Minute, | |||
| }) | |||
| return | |||
| } | |||
| complete(nil, CompleteOption{ | |||
| RemovingDelay: time.Minute, | |||
| }) | |||
| } | |||
| @@ -0,0 +1,28 @@ | |||
| package task | |||
| import ( | |||
| distsvc "gitlink.org.cn/cloudream/common/pkgs/distlock/service" | |||
| "gitlink.org.cn/cloudream/common/pkgs/task" | |||
| ) | |||
| type TaskContext struct { | |||
| distlock *distsvc.Service | |||
| } | |||
| // 需要在Task结束后主动调用,completing函数将在Manager加锁期间被调用, | |||
| // 因此适合进行执行结果的设置 | |||
| type CompleteFn = task.CompleteFn | |||
| type Manager = task.Manager[TaskContext] | |||
| type TaskBody = task.TaskBody[TaskContext] | |||
| type Task = task.Task[TaskContext] | |||
| type CompleteOption = task.CompleteOption | |||
| func NewManager(distlock *distsvc.Service) Manager { | |||
| return task.NewManager(TaskContext{ | |||
| distlock: distlock, | |||
| }) | |||
| } | |||
| @@ -0,0 +1,20 @@ | |||
| //go:build mage | |||
| package main | |||
| import ( | |||
| "gitlink.org.cn/cloudream/common/magefiles" | |||
| //mage:import | |||
| _ "gitlink.org.cn/cloudream/common/magefiles/targets" | |||
| ) | |||
| var Default = Build | |||
| func Build() error { | |||
| return magefiles.Build(magefiles.BuildArgs{ | |||
| OutputName: "agent", | |||
| OutputDir: "agent", | |||
| AssetsDir: "assets", | |||
| }) | |||
| } | |||
| @@ -0,0 +1,128 @@ | |||
| package main | |||
| import ( | |||
| "fmt" | |||
| "net" | |||
| "os" | |||
| "sync" | |||
| log "gitlink.org.cn/cloudream/common/pkgs/logger" | |||
| "gitlink.org.cn/cloudream/storage-agent/internal/config" | |||
| "gitlink.org.cn/cloudream/storage-agent/internal/task" | |||
| "gitlink.org.cn/cloudream/storage-common/globals" | |||
| "gitlink.org.cn/cloudream/storage-common/pkgs/distlock" | |||
| agtrpc "gitlink.org.cn/cloudream/storage-common/pkgs/grpc/agent" | |||
| "google.golang.org/grpc" | |||
| agtmq "gitlink.org.cn/cloudream/storage-common/pkgs/mq/agent" | |||
| grpcsvc "gitlink.org.cn/cloudream/storage-agent/internal/services/grpc" | |||
| cmdsvc "gitlink.org.cn/cloudream/storage-agent/internal/services/mq" | |||
| ) | |||
| // TODO 此数据是否在运行时会发生变化? | |||
| var AgentIpList []string | |||
| func main() { | |||
| // TODO 放到配置里读取 | |||
| AgentIpList = []string{"pcm01", "pcm1", "pcm2"} | |||
| err := config.Init() | |||
| if err != nil { | |||
| fmt.Printf("init config failed, err: %s", err.Error()) | |||
| os.Exit(1) | |||
| } | |||
| err = log.Init(&config.Cfg().Logger) | |||
| if err != nil { | |||
| fmt.Printf("init logger failed, err: %s", err.Error()) | |||
| os.Exit(1) | |||
| } | |||
| globals.InitLocal(&config.Cfg().Local) | |||
| globals.InitMQPool(&config.Cfg().RabbitMQ) | |||
| globals.InitAgentRPCPool(&agtrpc.PoolConfig{ | |||
| Port: config.Cfg().GRPC.Port, | |||
| }) | |||
| globals.InitIPFSPool(&config.Cfg().IPFS) | |||
| distlock, err := distlock.NewService(&config.Cfg().DistLock) | |||
| if err != nil { | |||
| log.Fatalf("new ipfs failed, err: %s", err.Error()) | |||
| } | |||
| //处置协调端、客户端命令(可多建几个) | |||
| wg := sync.WaitGroup{} | |||
| wg.Add(5) | |||
| taskMgr := task.NewManager(distlock) | |||
| // 启动命令服务器 | |||
| // TODO 需要设计AgentID持久化机制 | |||
| agtSvr, err := agtmq.NewServer(cmdsvc.NewService(&taskMgr), config.Cfg().ID, &config.Cfg().RabbitMQ) | |||
| if err != nil { | |||
| log.Fatalf("new agent server failed, err: %s", err.Error()) | |||
| } | |||
| agtSvr.OnError = func(err error) { | |||
| log.Warnf("agent server err: %s", err.Error()) | |||
| } | |||
| go serveAgentServer(agtSvr, &wg) | |||
| go reportStatus(&wg) //网络延迟感知 | |||
| //面向客户端收发数据 | |||
| listenAddr := config.Cfg().GRPC.MakeListenAddress() | |||
| lis, err := net.Listen("tcp", listenAddr) | |||
| if err != nil { | |||
| log.Fatalf("listen on %s failed, err: %s", listenAddr, err.Error()) | |||
| } | |||
| s := grpc.NewServer() | |||
| agtrpc.RegisterAgentServer(s, grpcsvc.NewService()) | |||
| go serveGRPC(s, lis, &wg) | |||
| go serveDistLock(distlock) | |||
| wg.Wait() | |||
| } | |||
| func serveAgentServer(server *agtmq.Server, wg *sync.WaitGroup) { | |||
| log.Info("start serving command server") | |||
| err := server.Serve() | |||
| if err != nil { | |||
| log.Errorf("command server stopped with error: %s", err.Error()) | |||
| } | |||
| log.Info("command server stopped") | |||
| wg.Done() | |||
| } | |||
| func serveGRPC(s *grpc.Server, lis net.Listener, wg *sync.WaitGroup) { | |||
| log.Info("start serving grpc") | |||
| err := s.Serve(lis) | |||
| if err != nil { | |||
| log.Errorf("grpc stopped with error: %s", err.Error()) | |||
| } | |||
| log.Info("grpc stopped") | |||
| wg.Done() | |||
| } | |||
| func serveDistLock(svc *distlock.Service) { | |||
| log.Info("start serving distlock") | |||
| err := svc.Serve() | |||
| if err != nil { | |||
| log.Errorf("distlock stopped with error: %s", err.Error()) | |||
| } | |||
| log.Info("distlock stopped") | |||
| } | |||
| @@ -0,0 +1,66 @@ | |||
| package main | |||
| import ( | |||
| "sync" | |||
| "time" | |||
| log "gitlink.org.cn/cloudream/common/pkgs/logger" | |||
| "gitlink.org.cn/cloudream/storage-agent/internal/config" | |||
| "gitlink.org.cn/cloudream/storage-common/consts" | |||
| coormq "gitlink.org.cn/cloudream/storage-common/pkgs/mq/coordinator" | |||
| "gitlink.org.cn/cloudream/storage-common/utils" | |||
| ) | |||
| func reportStatus(wg *sync.WaitGroup) { | |||
| coorCli, err := coormq.NewClient(&config.Cfg().RabbitMQ) | |||
| if err != nil { | |||
| wg.Done() | |||
| log.Error("new coordinator client failed, err: %w", err) | |||
| return | |||
| } | |||
| // TODO 增加退出死循环的方法 | |||
| for { | |||
| //挨个ping其他agent(AgentIpList),记录延迟到AgentDelay | |||
| // TODO AgentIP考虑放到配置文件里或者启动时从coor获取 | |||
| ips := utils.GetAgentIps() | |||
| agentDelay := make([]int, len(ips)) | |||
| waitG := sync.WaitGroup{} | |||
| waitG.Add(len(ips)) | |||
| for i := 0; i < len(ips); i++ { | |||
| go func(i int, wg *sync.WaitGroup) { | |||
| connStatus, err := utils.GetConnStatus(ips[i]) | |||
| if err != nil { | |||
| wg.Done() | |||
| log.Warnf("ping %s failed, err: %s", ips[i], err.Error()) | |||
| return | |||
| } | |||
| log.Debugf("connection status to %s: %+v", ips[i], connStatus) | |||
| if connStatus.IsReachable { | |||
| agentDelay[i] = int(connStatus.Delay.Milliseconds()) + 1 | |||
| } else { | |||
| agentDelay[i] = -1 | |||
| } | |||
| wg.Done() | |||
| }(i, &waitG) | |||
| } | |||
| waitG.Wait() | |||
| //TODO: 查看本地IPFS daemon是否正常,记录到ipfsStatus | |||
| ipfsStatus := consts.IPFSStateOK | |||
| //TODO:访问自身资源目录(配置文件中获取路径),记录是否正常,记录到localDirStatus | |||
| localDirStatus := consts.StorageDirectoryStateOK | |||
| //发送心跳 | |||
| // TODO 由于数据结构未定,暂时不发送真实数据 | |||
| coorCli.AgentStatusReport(coormq.NewAgentStatusReportBody(config.Cfg().ID, []int64{}, []int{}, ipfsStatus, localDirStatus)) | |||
| time.Sleep(time.Minute * 5) | |||
| } | |||
| coorCli.Close() | |||
| wg.Done() | |||
| } | |||