Browse Source

Merge branch 'master' of https://gitlink.org.cn/cloudream/storage

# Conflicts:
#	coordinator/main.go
gitlink
JeshuaRen 1 year ago
parent
commit
3f4c376c0c
5 changed files with 149 additions and 6 deletions
  1. +4
    -4
      common/pkgs/downloader/strip_iterator.go
  2. +18
    -0
      coordinator/internal/cmd/cmd.go
  3. +61
    -0
      coordinator/internal/cmd/migrate.go
  4. +60
    -0
      coordinator/internal/cmd/serve.go
  5. +6
    -2
      coordinator/internal/config/config.go

+ 4
- 4
common/pkgs/downloader/strip_iterator.go View File

@@ -65,7 +65,7 @@ func NewStripIterator(object cdssdk.Object, blocks []downloadBlock, red *cdssdk.

func (s *StripIterator) MoveNext() (Strip, error) {
if !s.inited {
go s.downloading()
go s.downloading(s.curStripIndex)
s.inited = true
}

@@ -112,8 +112,8 @@ func (s *StripIterator) Close() {
})
}

func (s *StripIterator) downloading() {
curStripIndex := s.curStripIndex
func (s *StripIterator) downloading(startStripIndex int64) {
curStripIndex := startStripIndex
loop:
for {
stripBytesPos := curStripIndex * int64(s.red.K) * int64(s.red.ChunkSize)
@@ -199,7 +199,7 @@ func (s *StripIterator) readStrip(stripIndex int64, buf []byte) (int, error) {
}

toExec, hd := ioswitch2.NewToDriverWithRange(-1, exec.Range{
Offset: s.curStripIndex * s.red.StripSize(),
Offset: stripIndex * s.red.StripSize(),
})
ft.AddTo(toExec)



+ 18
- 0
coordinator/internal/cmd/cmd.go View File

@@ -0,0 +1,18 @@
package cmd

import "github.com/spf13/cobra"

var RootCmd = &cobra.Command{
Use: "coordinator",
Short: "Coordinator service for storage",
Long: `Coordinator service for storage`,
}

func init() {
var configPath string
RootCmd.Flags().StringVarP(&configPath, "config", "c", "", "Path to config file")

RootCmd.Run = func(cmd *cobra.Command, args []string) {
serve(configPath)
}
}

+ 61
- 0
coordinator/internal/cmd/migrate.go View File

@@ -0,0 +1,61 @@
package cmd

import (
"fmt"
"os"

"github.com/spf13/cobra"
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
"gitlink.org.cn/cloudream/storage/coordinator/internal/config"
"gorm.io/driver/mysql"
"gorm.io/gorm"
)

func init() {
var configPath string
cmd := cobra.Command{
Use: "migrate",
Short: "Run database migrations",
Run: func(cmd *cobra.Command, args []string) {
migrate(configPath)
},
}
cmd.Flags().StringVarP(&configPath, "config", "c", "", "Path to config file")
RootCmd.AddCommand(&cmd)
}

func migrate(configPath string) {
// TODO 将create_database.sql的内容逐渐移动到这里来

err := config.Init(configPath)
if err != nil {
fmt.Println(err)
os.Exit(1)
}

db, err := gorm.Open(mysql.Open(config.Cfg().DB.MakeSourceString()))
if err != nil {
fmt.Println(err)
os.Exit(1)
}

err = db.AutoMigrate(&cdssdk.Storage{})
if err != nil {
fmt.Printf("migratting model Storage: %v\n", err)
os.Exit(1)
}

err = db.AutoMigrate(&cdssdk.ShardStorage{})
if err != nil {
fmt.Printf("migratting model ShardStorage: %v\n", err)
os.Exit(1)
}

err = db.AutoMigrate(&cdssdk.SharedStorage{})
if err != nil {
fmt.Printf("migratting model SharedStorage: %v\n", err)
os.Exit(1)
}

fmt.Println("migrate success")
}

+ 60
- 0
coordinator/internal/cmd/serve.go View File

@@ -0,0 +1,60 @@
package cmd

import (
"fmt"
"os"

"gitlink.org.cn/cloudream/common/pkgs/logger"
mydb "gitlink.org.cn/cloudream/storage/common/pkgs/db"
coormq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/coordinator"
"gitlink.org.cn/cloudream/storage/coordinator/internal/config"
"gitlink.org.cn/cloudream/storage/coordinator/internal/mq"
)

func serve(configPath string) {
err := config.Init(configPath)
if err != nil {
fmt.Printf("init config failed, err: %s", err.Error())
os.Exit(1)
}

err = logger.Init(&config.Cfg().Logger)
if err != nil {
fmt.Printf("init logger failed, err: %s", err.Error())
os.Exit(1)
}

db, err := mydb.NewDB(&config.Cfg().DB)
if err != nil {
logger.Fatalf("new db failed, err: %s", err.Error())
}

coorSvr, err := coormq.NewServer(mq.NewService(db), &config.Cfg().RabbitMQ)
if err != nil {
logger.Fatalf("new coordinator server failed, err: %s", err.Error())
}

coorSvr.OnError(func(err error) {
logger.Warnf("coordinator server err: %s", err.Error())
})

// 启动服务
go serveCoorServer(coorSvr)

forever := make(chan bool)
<-forever
}

func serveCoorServer(server *coormq.Server) {
logger.Info("start serving command server")

err := server.Serve()
if err != nil {
logger.Errorf("command server stopped with error: %s", err.Error())
}

logger.Info("command server stopped")

// TODO 仅简单结束了程序
os.Exit(1)
}

+ 6
- 2
coordinator/internal/config/config.go View File

@@ -15,8 +15,12 @@ type Config struct {

var cfg Config

func Init() error {
return c.DefaultLoad("coordinator", &cfg)
func Init(path string) error {
if path == "" {
return c.DefaultLoad("coordinator", &cfg)
}

return c.Load(path, &cfg)
}

func Cfg() *Config {


Loading…
Cancel
Save