From 5ed7520cf50f4da5623fa9351eb5803ab7ece345 Mon Sep 17 00:00:00 2001 From: Sydonian <794346190@qq.com> Date: Tue, 22 Oct 2024 10:47:42 +0800 Subject: [PATCH 1/2] =?UTF-8?q?=E8=A7=A3=E5=86=B3=E7=BC=93=E5=AD=98bug?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- common/pkgs/downloader/strip_iterator.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/common/pkgs/downloader/strip_iterator.go b/common/pkgs/downloader/strip_iterator.go index 828dcdf..fd74cdf 100644 --- a/common/pkgs/downloader/strip_iterator.go +++ b/common/pkgs/downloader/strip_iterator.go @@ -65,7 +65,7 @@ func NewStripIterator(object cdssdk.Object, blocks []downloadBlock, red *cdssdk. func (s *StripIterator) MoveNext() (Strip, error) { if !s.inited { - go s.downloading() + go s.downloading(s.curStripIndex) s.inited = true } @@ -112,8 +112,8 @@ func (s *StripIterator) Close() { }) } -func (s *StripIterator) downloading() { - curStripIndex := s.curStripIndex +func (s *StripIterator) downloading(startStripIndex int64) { + curStripIndex := startStripIndex loop: for { stripBytesPos := curStripIndex * int64(s.red.K) * int64(s.red.ChunkSize) @@ -199,7 +199,7 @@ func (s *StripIterator) readStrip(stripIndex int64, buf []byte) (int, error) { } toExec, hd := ioswitch2.NewToDriverWithRange(-1, exec.Range{ - Offset: s.curStripIndex * s.red.StripSize(), + Offset: stripIndex * s.red.StripSize(), }) ft.AddTo(toExec) From 5fb73a8a5209213fbb31ed5a1389645655777d4b Mon Sep 17 00:00:00 2001 From: Sydonian <794346190@qq.com> Date: Tue, 22 Oct 2024 11:25:34 +0800 Subject: [PATCH 2/2] =?UTF-8?q?coordinator=E5=A2=9E=E5=8A=A0migrate?= =?UTF-8?q?=E5=91=BD=E4=BB=A4?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- coordinator/internal/cmd/cmd.go | 18 ++++++++ coordinator/internal/cmd/migrate.go | 61 +++++++++++++++++++++++++++ coordinator/internal/cmd/serve.go | 60 ++++++++++++++++++++++++++ coordinator/internal/config/config.go | 8 +++- coordinator/main.go | 55 +----------------------- 5 files changed, 147 insertions(+), 55 deletions(-) create mode 100644 coordinator/internal/cmd/cmd.go create mode 100644 coordinator/internal/cmd/migrate.go create mode 100644 coordinator/internal/cmd/serve.go diff --git a/coordinator/internal/cmd/cmd.go b/coordinator/internal/cmd/cmd.go new file mode 100644 index 0000000..b05e4f5 --- /dev/null +++ b/coordinator/internal/cmd/cmd.go @@ -0,0 +1,18 @@ +package cmd + +import "github.com/spf13/cobra" + +var RootCmd = &cobra.Command{ + Use: "coordinator", + Short: "Coordinator service for storage", + Long: `Coordinator service for storage`, +} + +func init() { + var configPath string + RootCmd.Flags().StringVarP(&configPath, "config", "c", "", "Path to config file") + + RootCmd.Run = func(cmd *cobra.Command, args []string) { + serve(configPath) + } +} diff --git a/coordinator/internal/cmd/migrate.go b/coordinator/internal/cmd/migrate.go new file mode 100644 index 0000000..53a38ca --- /dev/null +++ b/coordinator/internal/cmd/migrate.go @@ -0,0 +1,61 @@ +package cmd + +import ( + "fmt" + "os" + + "github.com/spf13/cobra" + cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" + "gitlink.org.cn/cloudream/storage/coordinator/internal/config" + "gorm.io/driver/mysql" + "gorm.io/gorm" +) + +func init() { + var configPath string + cmd := cobra.Command{ + Use: "migrate", + Short: "Run database migrations", + Run: func(cmd *cobra.Command, args []string) { + migrate(configPath) + }, + } + cmd.Flags().StringVarP(&configPath, "config", "c", "", "Path to config file") + RootCmd.AddCommand(&cmd) +} + +func migrate(configPath string) { + // TODO 将create_database.sql的内容逐渐移动到这里来 + + err := config.Init(configPath) + if err != nil { + fmt.Println(err) + os.Exit(1) + } + + db, err := gorm.Open(mysql.Open(config.Cfg().DB.MakeSourceString())) + if err != nil { + fmt.Println(err) + os.Exit(1) + } + + err = db.AutoMigrate(&cdssdk.Storage{}) + if err != nil { + fmt.Printf("migratting model Storage: %v\n", err) + os.Exit(1) + } + + err = db.AutoMigrate(&cdssdk.ShardStorage{}) + if err != nil { + fmt.Printf("migratting model ShardStorage: %v\n", err) + os.Exit(1) + } + + err = db.AutoMigrate(&cdssdk.SharedStorage{}) + if err != nil { + fmt.Printf("migratting model SharedStorage: %v\n", err) + os.Exit(1) + } + + fmt.Println("migrate success") +} diff --git a/coordinator/internal/cmd/serve.go b/coordinator/internal/cmd/serve.go new file mode 100644 index 0000000..adfd720 --- /dev/null +++ b/coordinator/internal/cmd/serve.go @@ -0,0 +1,60 @@ +package cmd + +import ( + "fmt" + "os" + + "gitlink.org.cn/cloudream/common/pkgs/logger" + mydb "gitlink.org.cn/cloudream/storage/common/pkgs/db" + coormq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/coordinator" + "gitlink.org.cn/cloudream/storage/coordinator/internal/config" + "gitlink.org.cn/cloudream/storage/coordinator/internal/mq" +) + +func serve(configPath string) { + err := config.Init(configPath) + if err != nil { + fmt.Printf("init config failed, err: %s", err.Error()) + os.Exit(1) + } + + err = logger.Init(&config.Cfg().Logger) + if err != nil { + fmt.Printf("init logger failed, err: %s", err.Error()) + os.Exit(1) + } + + db, err := mydb.NewDB(&config.Cfg().DB) + if err != nil { + logger.Fatalf("new db failed, err: %s", err.Error()) + } + + coorSvr, err := coormq.NewServer(mq.NewService(db), &config.Cfg().RabbitMQ) + if err != nil { + logger.Fatalf("new coordinator server failed, err: %s", err.Error()) + } + + coorSvr.OnError(func(err error) { + logger.Warnf("coordinator server err: %s", err.Error()) + }) + + // 启动服务 + go serveCoorServer(coorSvr) + + forever := make(chan bool) + <-forever +} + +func serveCoorServer(server *coormq.Server) { + logger.Info("start serving command server") + + err := server.Serve() + if err != nil { + logger.Errorf("command server stopped with error: %s", err.Error()) + } + + logger.Info("command server stopped") + + // TODO 仅简单结束了程序 + os.Exit(1) +} diff --git a/coordinator/internal/config/config.go b/coordinator/internal/config/config.go index f503305..946572e 100644 --- a/coordinator/internal/config/config.go +++ b/coordinator/internal/config/config.go @@ -15,8 +15,12 @@ type Config struct { var cfg Config -func Init() error { - return c.DefaultLoad("coordinator", &cfg) +func Init(path string) error { + if path == "" { + return c.DefaultLoad("coordinator", &cfg) + } + + return c.Load(path, &cfg) } func Cfg() *Config { diff --git a/coordinator/main.go b/coordinator/main.go index 90b2d1b..212b22e 100644 --- a/coordinator/main.go +++ b/coordinator/main.go @@ -1,60 +1,9 @@ package main import ( - "fmt" - "os" - - "gitlink.org.cn/cloudream/common/pkgs/logger" - mydb "gitlink.org.cn/cloudream/storage/common/pkgs/db" - coormq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/coordinator" - "gitlink.org.cn/cloudream/storage/coordinator/internal/config" - "gitlink.org.cn/cloudream/storage/coordinator/internal/mq" + "gitlink.org.cn/cloudream/storage/coordinator/internal/cmd" ) func main() { - err := config.Init() - if err != nil { - fmt.Printf("init config failed, err: %s", err.Error()) - os.Exit(1) - } - - err = logger.Init(&config.Cfg().Logger) - if err != nil { - fmt.Printf("init logger failed, err: %s", err.Error()) - os.Exit(1) - } - - db, err := mydb.NewDB(&config.Cfg().DB) - if err != nil { - logger.Fatalf("new db failed, err: %s", err.Error()) - } - - coorSvr, err := coormq.NewServer(mq.NewService(db), &config.Cfg().RabbitMQ) - if err != nil { - logger.Fatalf("new coordinator server failed, err: %s", err.Error()) - } - - coorSvr.OnError(func(err error) { - logger.Warnf("coordinator server err: %s", err.Error()) - }) - - // 启动服务 - go serveCoorServer(coorSvr) - - forever := make(chan bool) - <-forever -} - -func serveCoorServer(server *coormq.Server) { - logger.Info("start serving command server") - - err := server.Serve() - if err != nil { - logger.Errorf("command server stopped with error: %s", err.Error()) - } - - logger.Info("command server stopped") - - // TODO 仅简单结束了程序 - os.Exit(1) + cmd.RootCmd.Execute() }