diff --git a/common/pkgs/downloader/strip_iterator.go b/common/pkgs/downloader/strip_iterator.go index 828dcdf..fd74cdf 100644 --- a/common/pkgs/downloader/strip_iterator.go +++ b/common/pkgs/downloader/strip_iterator.go @@ -65,7 +65,7 @@ func NewStripIterator(object cdssdk.Object, blocks []downloadBlock, red *cdssdk. func (s *StripIterator) MoveNext() (Strip, error) { if !s.inited { - go s.downloading() + go s.downloading(s.curStripIndex) s.inited = true } @@ -112,8 +112,8 @@ func (s *StripIterator) Close() { }) } -func (s *StripIterator) downloading() { - curStripIndex := s.curStripIndex +func (s *StripIterator) downloading(startStripIndex int64) { + curStripIndex := startStripIndex loop: for { stripBytesPos := curStripIndex * int64(s.red.K) * int64(s.red.ChunkSize) @@ -199,7 +199,7 @@ func (s *StripIterator) readStrip(stripIndex int64, buf []byte) (int, error) { } toExec, hd := ioswitch2.NewToDriverWithRange(-1, exec.Range{ - Offset: s.curStripIndex * s.red.StripSize(), + Offset: stripIndex * s.red.StripSize(), }) ft.AddTo(toExec) diff --git a/coordinator/internal/cmd/cmd.go b/coordinator/internal/cmd/cmd.go new file mode 100644 index 0000000..b05e4f5 --- /dev/null +++ b/coordinator/internal/cmd/cmd.go @@ -0,0 +1,18 @@ +package cmd + +import "github.com/spf13/cobra" + +var RootCmd = &cobra.Command{ + Use: "coordinator", + Short: "Coordinator service for storage", + Long: `Coordinator service for storage`, +} + +func init() { + var configPath string + RootCmd.Flags().StringVarP(&configPath, "config", "c", "", "Path to config file") + + RootCmd.Run = func(cmd *cobra.Command, args []string) { + serve(configPath) + } +} diff --git a/coordinator/internal/cmd/migrate.go b/coordinator/internal/cmd/migrate.go new file mode 100644 index 0000000..53a38ca --- /dev/null +++ b/coordinator/internal/cmd/migrate.go @@ -0,0 +1,61 @@ +package cmd + +import ( + "fmt" + "os" + + "github.com/spf13/cobra" + cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" + "gitlink.org.cn/cloudream/storage/coordinator/internal/config" + "gorm.io/driver/mysql" + "gorm.io/gorm" +) + +func init() { + var configPath string + cmd := cobra.Command{ + Use: "migrate", + Short: "Run database migrations", + Run: func(cmd *cobra.Command, args []string) { + migrate(configPath) + }, + } + cmd.Flags().StringVarP(&configPath, "config", "c", "", "Path to config file") + RootCmd.AddCommand(&cmd) +} + +func migrate(configPath string) { + // TODO 将create_database.sql的内容逐渐移动到这里来 + + err := config.Init(configPath) + if err != nil { + fmt.Println(err) + os.Exit(1) + } + + db, err := gorm.Open(mysql.Open(config.Cfg().DB.MakeSourceString())) + if err != nil { + fmt.Println(err) + os.Exit(1) + } + + err = db.AutoMigrate(&cdssdk.Storage{}) + if err != nil { + fmt.Printf("migratting model Storage: %v\n", err) + os.Exit(1) + } + + err = db.AutoMigrate(&cdssdk.ShardStorage{}) + if err != nil { + fmt.Printf("migratting model ShardStorage: %v\n", err) + os.Exit(1) + } + + err = db.AutoMigrate(&cdssdk.SharedStorage{}) + if err != nil { + fmt.Printf("migratting model SharedStorage: %v\n", err) + os.Exit(1) + } + + fmt.Println("migrate success") +} diff --git a/coordinator/internal/cmd/serve.go b/coordinator/internal/cmd/serve.go new file mode 100644 index 0000000..adfd720 --- /dev/null +++ b/coordinator/internal/cmd/serve.go @@ -0,0 +1,60 @@ +package cmd + +import ( + "fmt" + "os" + + "gitlink.org.cn/cloudream/common/pkgs/logger" + mydb "gitlink.org.cn/cloudream/storage/common/pkgs/db" + coormq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/coordinator" + "gitlink.org.cn/cloudream/storage/coordinator/internal/config" + "gitlink.org.cn/cloudream/storage/coordinator/internal/mq" +) + +func serve(configPath string) { + err := config.Init(configPath) + if err != nil { + fmt.Printf("init config failed, err: %s", err.Error()) + os.Exit(1) + } + + err = logger.Init(&config.Cfg().Logger) + if err != nil { + fmt.Printf("init logger failed, err: %s", err.Error()) + os.Exit(1) + } + + db, err := mydb.NewDB(&config.Cfg().DB) + if err != nil { + logger.Fatalf("new db failed, err: %s", err.Error()) + } + + coorSvr, err := coormq.NewServer(mq.NewService(db), &config.Cfg().RabbitMQ) + if err != nil { + logger.Fatalf("new coordinator server failed, err: %s", err.Error()) + } + + coorSvr.OnError(func(err error) { + logger.Warnf("coordinator server err: %s", err.Error()) + }) + + // 启动服务 + go serveCoorServer(coorSvr) + + forever := make(chan bool) + <-forever +} + +func serveCoorServer(server *coormq.Server) { + logger.Info("start serving command server") + + err := server.Serve() + if err != nil { + logger.Errorf("command server stopped with error: %s", err.Error()) + } + + logger.Info("command server stopped") + + // TODO 仅简单结束了程序 + os.Exit(1) +} diff --git a/coordinator/internal/config/config.go b/coordinator/internal/config/config.go index f503305..946572e 100644 --- a/coordinator/internal/config/config.go +++ b/coordinator/internal/config/config.go @@ -15,8 +15,12 @@ type Config struct { var cfg Config -func Init() error { - return c.DefaultLoad("coordinator", &cfg) +func Init(path string) error { + if path == "" { + return c.DefaultLoad("coordinator", &cfg) + } + + return c.Load(path, &cfg) } func Cfg() *Config {