@@ -28,6 +28,7 @@ import ( | |||||
"github.com/seata/seata-go/pkg/common/log" | "github.com/seata/seata-go/pkg/common/log" | ||||
"github.com/seata/seata-go/pkg/protocol/message" | "github.com/seata/seata-go/pkg/protocol/message" | ||||
"github.com/seata/seata-go/pkg/remoting/getty" | "github.com/seata/seata-go/pkg/remoting/getty" | ||||
"github.com/seata/seata-go/pkg/util/backoff" | |||||
) | ) | ||||
type GlobalTransaction struct { | type GlobalTransaction struct { | ||||
@@ -96,35 +97,35 @@ func (g *GlobalTransactionManager) Commit(ctx context.Context, gtr *GlobalTransa | |||||
return errors.New("Commit xid should not be empty") | return errors.New("Commit xid should not be empty") | ||||
} | } | ||||
// todo: replace retry with config | |||||
var ( | |||||
err error | |||||
res interface{} | |||||
// todo retry and retryInterval should read from config | |||||
retry = 10 | |||||
retryInterval = 200 * time.Millisecond | |||||
req = message.GlobalCommitRequest{ | |||||
AbstractGlobalEndRequest: message.AbstractGlobalEndRequest{ | |||||
Xid: gtr.Xid, | |||||
}, | |||||
} | |||||
) | |||||
for ; retry > 0; retry-- { | |||||
res, err = getty.GetGettyRemotingClient().SendSyncRequest(req) | |||||
if err != nil { | |||||
log.Errorf("GlobalCommitRequest error, xid %s, error %v", gtr.Xid, err) | |||||
time.Sleep(retryInterval) | |||||
} else { | |||||
bf := backoff.New(ctx, backoff.Config{ | |||||
MaxRetries: 10, | |||||
MinBackoff: 100 * time.Millisecond, | |||||
MaxBackoff: 200 * time.Millisecond, | |||||
}) | |||||
var req = message.GlobalCommitRequest{ | |||||
AbstractGlobalEndRequest: message.AbstractGlobalEndRequest{Xid: gtr.Xid}, | |||||
} | |||||
var res interface{} | |||||
var err error | |||||
for bf.Ongoing() { | |||||
if res, err = getty.GetGettyRemotingClient().SendSyncRequest(req); err == nil { | |||||
break | break | ||||
} | } | ||||
log.Warnf("send global commit request failed, xid %s, error %v", gtr.Xid, err) | |||||
bf.Wait() | |||||
} | } | ||||
if err != nil { | |||||
log.Infof("send global commit request failed, xid %s, error %v", gtr.Xid, err) | |||||
return err | |||||
if bf.Err() != nil { | |||||
lastErr := errors.Wrap(err, bf.Err().Error()) | |||||
log.Warnf("send global commit request failed, xid %s, error %v", gtr.Xid, lastErr) | |||||
return lastErr | |||||
} | } | ||||
log.Infof("send global commit request success, xid %s", gtr.Xid) | log.Infof("send global commit request success, xid %s", gtr.Xid) | ||||
gtr.Status = res.(message.GlobalCommitResponse).GlobalStatus | gtr.Status = res.(message.GlobalCommitResponse).GlobalStatus | ||||
UnbindXid(ctx) | UnbindXid(ctx) | ||||
return nil | return nil | ||||
} | } | ||||
@@ -138,35 +139,36 @@ func (g *GlobalTransactionManager) Rollback(ctx context.Context, gtr *GlobalTran | |||||
return errors.New("Rollback xid should not be empty") | return errors.New("Rollback xid should not be empty") | ||||
} | } | ||||
// todo: replace retry with config | |||||
var ( | |||||
err error | |||||
res interface{} | |||||
// todo retry and retryInterval should read from config | |||||
retry = 10 | |||||
retryInterval = 200 * time.Millisecond | |||||
req = message.GlobalRollbackRequest{ | |||||
AbstractGlobalEndRequest: message.AbstractGlobalEndRequest{ | |||||
Xid: gtr.Xid, | |||||
}, | |||||
} | |||||
) | |||||
for ; retry > 0; retry-- { | |||||
res, err = getty.GetGettyRemotingClient().SendSyncRequest(req) | |||||
if err != nil { | |||||
log.Errorf("GlobalRollbackRequest error, xid %s, error %v", gtr.Xid, err) | |||||
time.Sleep(retryInterval) | |||||
} else { | |||||
bf := backoff.New(ctx, backoff.Config{ | |||||
MaxRetries: 10, | |||||
MinBackoff: 100 * time.Millisecond, | |||||
MaxBackoff: 200 * time.Millisecond, | |||||
}) | |||||
var res interface{} | |||||
var req = message.GlobalRollbackRequest{ | |||||
AbstractGlobalEndRequest: message.AbstractGlobalEndRequest{Xid: gtr.Xid}, | |||||
} | |||||
var err error | |||||
for bf.Ongoing() { | |||||
if res, err = getty.GetGettyRemotingClient().SendSyncRequest(req); err == nil { | |||||
break | break | ||||
} | } | ||||
} | |||||
if err != nil { | |||||
log.Errorf("GlobalRollbackRequest rollback failed, xid %s, error %v", gtr.Xid, err) | log.Errorf("GlobalRollbackRequest rollback failed, xid %s, error %v", gtr.Xid, err) | ||||
return err | |||||
bf.Wait() | |||||
} | } | ||||
if bf.Err() != nil { | |||||
lastErr := errors.Wrap(err, bf.Err().Error()) | |||||
log.Errorf("GlobalRollbackRequest rollback failed, xid %s, error %v", gtr.Xid, lastErr) | |||||
return lastErr | |||||
} | |||||
log.Infof("GlobalRollbackRequest rollback success, xid %s,", gtr.Xid) | log.Infof("GlobalRollbackRequest rollback success, xid %s,", gtr.Xid) | ||||
gtr.Status = res.(message.GlobalRollbackResponse).GlobalStatus | gtr.Status = res.(message.GlobalRollbackResponse).GlobalStatus | ||||
UnbindXid(ctx) | UnbindXid(ctx) | ||||
return nil | return nil | ||||
} | } | ||||
@@ -0,0 +1,134 @@ | |||||
/* | |||||
* Licensed to the Apache Software Foundation (ASF) under one or more | |||||
* contributor license agreements. See the NOTICE file distributed with | |||||
* this work for additional information regarding copyright ownership. | |||||
* The ASF licenses this file to You under the Apache License, Version 2.0 | |||||
* (the "License"); you may not use this file except in compliance with | |||||
* the License. You may obtain a copy of the License at | |||||
* | |||||
* http://www.apache.org/licenses/LICENSE-2.0 | |||||
* | |||||
* Unless required by applicable law or agreed to in writing, software | |||||
* distributed under the License is distributed on an "AS IS" BASIS, | |||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |||||
* See the License for the specific language governing permissions and | |||||
* limitations under the License. | |||||
*/ | |||||
package backoff | |||||
import ( | |||||
"context" | |||||
"flag" | |||||
"fmt" | |||||
"math/rand" | |||||
"time" | |||||
) | |||||
// Config configures a Backoff | |||||
type Config struct { | |||||
MinBackoff time.Duration `yaml:"min_period"` // start backoff at this level | |||||
MaxBackoff time.Duration `yaml:"max_period"` // increase exponentially to this level | |||||
MaxRetries int `yaml:"max_retries"` // give up after this many; zero means infinite retries | |||||
} | |||||
// RegisterFlagsWithPrefix for Config. | |||||
func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) { | |||||
f.DurationVar(&cfg.MinBackoff, prefix+".backoff-min-period", 100*time.Millisecond, "Minimum delay when backing off.") | |||||
f.DurationVar(&cfg.MaxBackoff, prefix+".backoff-max-period", 10*time.Second, "Maximum delay when backing off.") | |||||
f.IntVar(&cfg.MaxRetries, prefix+".backoff-retries", 10, "Number of times to backoff and retry before failing.") | |||||
} | |||||
// Backoff implements exponential backoff with randomized wait times | |||||
type Backoff struct { | |||||
cfg Config | |||||
ctx context.Context | |||||
numRetries int | |||||
nextDelayMin time.Duration | |||||
nextDelayMax time.Duration | |||||
} | |||||
// New creates a Backoff object. Pass a Context that can also terminate the operation. | |||||
func New(ctx context.Context, cfg Config) *Backoff { | |||||
return &Backoff{ | |||||
cfg: cfg, | |||||
ctx: ctx, | |||||
nextDelayMin: cfg.MinBackoff, | |||||
nextDelayMax: doubleDuration(cfg.MinBackoff, cfg.MaxBackoff), | |||||
} | |||||
} | |||||
// Reset the Backoff back to its initial condition | |||||
func (b *Backoff) Reset() { | |||||
b.numRetries = 0 | |||||
b.nextDelayMin = b.cfg.MinBackoff | |||||
b.nextDelayMax = doubleDuration(b.cfg.MinBackoff, b.cfg.MaxBackoff) | |||||
} | |||||
// Ongoing returns true if caller should keep going | |||||
func (b *Backoff) Ongoing() bool { | |||||
// Stop if Context has errored or max retry count is exceeded | |||||
return b.ctx.Err() == nil && (b.cfg.MaxRetries == 0 || b.numRetries < b.cfg.MaxRetries) | |||||
} | |||||
// Err returns the reason for terminating the backoff, or nil if it didn't terminate | |||||
func (b *Backoff) Err() error { | |||||
if b.ctx.Err() != nil { | |||||
return b.ctx.Err() | |||||
} | |||||
if b.cfg.MaxRetries != 0 && b.numRetries >= b.cfg.MaxRetries { | |||||
return fmt.Errorf("terminated after %d retries", b.numRetries) | |||||
} | |||||
return nil | |||||
} | |||||
// NumRetries returns the number of retries so far | |||||
func (b *Backoff) NumRetries() int { | |||||
return b.numRetries | |||||
} | |||||
// Wait sleeps for the backoff time then increases the retry count and backoff time | |||||
// Returns immediately if Context is terminated | |||||
func (b *Backoff) Wait() { | |||||
// Increase the number of retries and get the next delay | |||||
sleepTime := b.NextDelay() | |||||
if b.Ongoing() { | |||||
select { | |||||
case <-b.ctx.Done(): | |||||
case <-time.After(sleepTime): | |||||
} | |||||
} | |||||
} | |||||
func (b *Backoff) NextDelay() time.Duration { | |||||
b.numRetries++ | |||||
// Handle the edge case where the min and max have the same value | |||||
// (or due to some misconfig max is < min) | |||||
if b.nextDelayMin >= b.nextDelayMax { | |||||
return b.nextDelayMin | |||||
} | |||||
// Add a jitter within the next exponential backoff range | |||||
sleepTime := b.nextDelayMin + time.Duration(rand.Int63n(int64(b.nextDelayMax-b.nextDelayMin))) | |||||
// Apply the exponential backoff to calculate the next jitter | |||||
// range, unless we've already reached the max | |||||
if b.nextDelayMax < b.cfg.MaxBackoff { | |||||
b.nextDelayMin = doubleDuration(b.nextDelayMin, b.cfg.MaxBackoff) | |||||
b.nextDelayMax = doubleDuration(b.nextDelayMax, b.cfg.MaxBackoff) | |||||
} | |||||
return sleepTime | |||||
} | |||||
func doubleDuration(value time.Duration, max time.Duration) time.Duration { | |||||
value = value * 2 | |||||
if value <= max { | |||||
return value | |||||
} | |||||
return max | |||||
} |
@@ -0,0 +1,120 @@ | |||||
/* | |||||
* Licensed to the Apache Software Foundation (ASF) under one or more | |||||
* contributor license agreements. See the NOTICE file distributed with | |||||
* this work for additional information regarding copyright ownership. | |||||
* The ASF licenses this file to You under the Apache License, Version 2.0 | |||||
* (the "License"); you may not use this file except in compliance with | |||||
* the License. You may obtain a copy of the License at | |||||
* | |||||
* http://www.apache.org/licenses/LICENSE-2.0 | |||||
* | |||||
* Unless required by applicable law or agreed to in writing, software | |||||
* distributed under the License is distributed on an "AS IS" BASIS, | |||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |||||
* See the License for the specific language governing permissions and | |||||
* limitations under the License. | |||||
*/ | |||||
package backoff | |||||
import ( | |||||
"context" | |||||
"testing" | |||||
"time" | |||||
) | |||||
func TestBackoff_NextDelay(t *testing.T) { | |||||
t.Parallel() | |||||
tests := map[string]struct { | |||||
minBackoff time.Duration | |||||
maxBackoff time.Duration | |||||
expectedRanges [][]time.Duration | |||||
}{ | |||||
"exponential backoff with jitter honoring min and max": { | |||||
minBackoff: 100 * time.Millisecond, | |||||
maxBackoff: 10 * time.Second, | |||||
expectedRanges: [][]time.Duration{ | |||||
{100 * time.Millisecond, 200 * time.Millisecond}, | |||||
{200 * time.Millisecond, 400 * time.Millisecond}, | |||||
{400 * time.Millisecond, 800 * time.Millisecond}, | |||||
{800 * time.Millisecond, 1600 * time.Millisecond}, | |||||
{1600 * time.Millisecond, 3200 * time.Millisecond}, | |||||
{3200 * time.Millisecond, 6400 * time.Millisecond}, | |||||
{6400 * time.Millisecond, 10000 * time.Millisecond}, | |||||
{6400 * time.Millisecond, 10000 * time.Millisecond}, | |||||
}, | |||||
}, | |||||
"exponential backoff with max equal to the end of a range": { | |||||
minBackoff: 100 * time.Millisecond, | |||||
maxBackoff: 800 * time.Millisecond, | |||||
expectedRanges: [][]time.Duration{ | |||||
{100 * time.Millisecond, 200 * time.Millisecond}, | |||||
{200 * time.Millisecond, 400 * time.Millisecond}, | |||||
{400 * time.Millisecond, 800 * time.Millisecond}, | |||||
{400 * time.Millisecond, 800 * time.Millisecond}, | |||||
}, | |||||
}, | |||||
"exponential backoff with max equal to the end of a range + 1": { | |||||
minBackoff: 100 * time.Millisecond, | |||||
maxBackoff: 801 * time.Millisecond, | |||||
expectedRanges: [][]time.Duration{ | |||||
{100 * time.Millisecond, 200 * time.Millisecond}, | |||||
{200 * time.Millisecond, 400 * time.Millisecond}, | |||||
{400 * time.Millisecond, 800 * time.Millisecond}, | |||||
{800 * time.Millisecond, 801 * time.Millisecond}, | |||||
{800 * time.Millisecond, 801 * time.Millisecond}, | |||||
}, | |||||
}, | |||||
"exponential backoff with max equal to the end of a range - 1": { | |||||
minBackoff: 100 * time.Millisecond, | |||||
maxBackoff: 799 * time.Millisecond, | |||||
expectedRanges: [][]time.Duration{ | |||||
{100 * time.Millisecond, 200 * time.Millisecond}, | |||||
{200 * time.Millisecond, 400 * time.Millisecond}, | |||||
{400 * time.Millisecond, 799 * time.Millisecond}, | |||||
{400 * time.Millisecond, 799 * time.Millisecond}, | |||||
}, | |||||
}, | |||||
"min backoff is equal to max": { | |||||
minBackoff: 100 * time.Millisecond, | |||||
maxBackoff: 100 * time.Millisecond, | |||||
expectedRanges: [][]time.Duration{ | |||||
{100 * time.Millisecond, 100 * time.Millisecond}, | |||||
{100 * time.Millisecond, 100 * time.Millisecond}, | |||||
{100 * time.Millisecond, 100 * time.Millisecond}, | |||||
}, | |||||
}, | |||||
"min backoff is greater then max": { | |||||
minBackoff: 200 * time.Millisecond, | |||||
maxBackoff: 100 * time.Millisecond, | |||||
expectedRanges: [][]time.Duration{ | |||||
{200 * time.Millisecond, 200 * time.Millisecond}, | |||||
{200 * time.Millisecond, 200 * time.Millisecond}, | |||||
{200 * time.Millisecond, 200 * time.Millisecond}, | |||||
}, | |||||
}, | |||||
} | |||||
for testName, testData := range tests { | |||||
testData := testData | |||||
t.Run(testName, func(t *testing.T) { | |||||
t.Parallel() | |||||
b := New(context.Background(), Config{ | |||||
MinBackoff: testData.minBackoff, | |||||
MaxBackoff: testData.maxBackoff, | |||||
MaxRetries: len(testData.expectedRanges), | |||||
}) | |||||
for _, expectedRange := range testData.expectedRanges { | |||||
delay := b.NextDelay() | |||||
if delay < expectedRange[0] || delay > expectedRange[1] { | |||||
t.Errorf("%d expected to be within %d and %d", delay, expectedRange[0], expectedRange[1]) | |||||
} | |||||
} | |||||
}) | |||||
} | |||||
} |