Browse Source

replace redis with rabbitmq

tags/v1.21.12.1
yuyuanshifu 5 years ago
parent
commit
b374c0e93d
3 changed files with 18 additions and 4 deletions
  1. +5
    -2
      custom/conf/app.ini.sample
  2. +8
    -2
      modules/setting/setting.go
  3. +5
    -0
      modules/worker/worker.go

+ 5
- 2
custom/conf/app.ini.sample View File

@@ -1039,9 +1039,12 @@ RETRY_BACKOFF = 3

[machinery]
; redis conf for decompress dataset zip file.
BROKER = redis://localhost:6379
BROKER = amqp://admin:pcladmin@127.0.0.1:5672/
DEFAULT_QUEUE = DecompressTasksQueue
RESULT_BACKEND = redis://localhost:6379
RESULT_BACKEND = amqp://admin:pcladmin@127.0.0.1:5672/
EXCHANGE = decompress_exchange
EXCHANGE_TYPE = direct
BINDING_KEY = decompress_task

[cloudbrain]
HOST = http://192.168.204.24


+ 8
- 2
modules/setting/setting.go View File

@@ -423,6 +423,9 @@ var (
Broker string
DefaultQueue string
ResultBackend string
Exchange string
ExchangeType string
BindingKey string

//decompress config
DecompressAddress string
@@ -1084,9 +1087,12 @@ func NewContext() {
}

sec = Cfg.Section("machinery")
Broker = sec.Key("BROKER").MustString("redis://localhost:6379")
Broker = sec.Key("BROKER").MustString("amqp://admin:pcladmin@127.0.0.1:5672/")
DefaultQueue = sec.Key("DEFAULT_QUEUE").MustString("DecompressTasksQueue")
ResultBackend = sec.Key("RESULT_BACKEND").MustString("redis://localhost:6379")
ResultBackend = sec.Key("RESULT_BACKEND").MustString("amqp://admin:pcladmin@127.0.0.1:5672/")
Exchange = sec.Key("EXCHANGE").MustString("decompress_exchange")
ExchangeType = sec.Key("EXCHANGE_TYPE").MustString("direct")
BindingKey = sec.Key("BINDING_KEY").MustString("decompress_task")

sec = Cfg.Section("decompress")
DecompressAddress = sec.Key("HOST").MustString("http://192.168.207.34:39987")


+ 5
- 0
modules/worker/worker.go View File

@@ -15,6 +15,11 @@ func NewTaskCenter() {
Broker: setting.Broker,
DefaultQueue: setting.DefaultQueue,
ResultBackend: setting.ResultBackend,
AMQP: &mchConf.AMQPConfig{
Exchange: "machinery_exchange",
ExchangeType: "direct",
BindingKey: "machinery_task",
},
}
tc, err := machinery.NewServer(cnf)
if err != nil {


Loading…
Cancel
Save