diff --git a/custom/conf/app.ini.sample b/custom/conf/app.ini.sample index e37752974..aa5e788d1 100755 --- a/custom/conf/app.ini.sample +++ b/custom/conf/app.ini.sample @@ -1039,9 +1039,12 @@ RETRY_BACKOFF = 3 [machinery] ; redis conf for decompress dataset zip file. -BROKER = redis://localhost:6379 +BROKER = amqp://admin:pcladmin@127.0.0.1:5672/ DEFAULT_QUEUE = DecompressTasksQueue -RESULT_BACKEND = redis://localhost:6379 +RESULT_BACKEND = amqp://admin:pcladmin@127.0.0.1:5672/ +EXCHANGE = decompress_exchange +EXCHANGE_TYPE = direct +BINDING_KEY = decompress_task [cloudbrain] HOST = http://192.168.204.24 diff --git a/modules/setting/setting.go b/modules/setting/setting.go index de8598ba0..e933690cd 100755 --- a/modules/setting/setting.go +++ b/modules/setting/setting.go @@ -423,6 +423,9 @@ var ( Broker string DefaultQueue string ResultBackend string + Exchange string + ExchangeType string + BindingKey string //decompress config DecompressAddress string @@ -1084,9 +1087,12 @@ func NewContext() { } sec = Cfg.Section("machinery") - Broker = sec.Key("BROKER").MustString("redis://localhost:6379") + Broker = sec.Key("BROKER").MustString("amqp://admin:pcladmin@127.0.0.1:5672/") DefaultQueue = sec.Key("DEFAULT_QUEUE").MustString("DecompressTasksQueue") - ResultBackend = sec.Key("RESULT_BACKEND").MustString("redis://localhost:6379") + ResultBackend = sec.Key("RESULT_BACKEND").MustString("amqp://admin:pcladmin@127.0.0.1:5672/") + Exchange = sec.Key("EXCHANGE").MustString("decompress_exchange") + ExchangeType = sec.Key("EXCHANGE_TYPE").MustString("direct") + BindingKey = sec.Key("BINDING_KEY").MustString("decompress_task") sec = Cfg.Section("decompress") DecompressAddress = sec.Key("HOST").MustString("http://192.168.207.34:39987") diff --git a/modules/worker/worker.go b/modules/worker/worker.go index 37445c978..1dbfff3b3 100755 --- a/modules/worker/worker.go +++ b/modules/worker/worker.go @@ -15,6 +15,11 @@ func NewTaskCenter() { Broker: setting.Broker, DefaultQueue: setting.DefaultQueue, ResultBackend: setting.ResultBackend, + AMQP: &mchConf.AMQPConfig{ + Exchange: "machinery_exchange", + ExchangeType: "direct", + BindingKey: "machinery_task", + }, } tc, err := machinery.NewServer(cnf) if err != nil {