You can not select more than 25 topics Topics must start with a chinese character,a letter or number, can include dashes ('-') and can be up to 35 characters long.

readwriter.go 7.4 kB

3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231
  1. /*
  2. * Licensed to the Apache Software Foundation (ASF) under one or more
  3. * contributor license agreements. See the NOTICE file distributed with
  4. * this work for additional information regarding copyright ownership.
  5. * The ASF licenses this file to You under the Apache License, Version 2.0
  6. * (the "License"); you may not use this file except in compliance with
  7. * the License. You may obtain a copy of the License at
  8. *
  9. * http://www.apache.org/licenses/LICENSE-2.0
  10. *
  11. * Unless required by applicable law or agreed to in writing, software
  12. * distributed under the License is distributed on an "AS IS" BASIS,
  13. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. * See the License for the specific language governing permissions and
  15. * limitations under the License.
  16. */
  17. package getty
  18. import (
  19. "fmt"
  20. "github.com/seata/seata-go/pkg/common/bytes"
  21. getty "github.com/apache/dubbo-getty"
  22. "github.com/pkg/errors"
  23. "github.com/seata/seata-go/pkg/protocol/codec"
  24. "github.com/seata/seata-go/pkg/protocol/message"
  25. )
  26. /**
  27. * <pre>
  28. * 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
  29. * +-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+
  30. * | magic |Proto | Full length | Head | Msg |Seria|Compr| RequestID |
  31. * | code |clVer | (head+body) | Length |Type |lizer|ess | |
  32. * +-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+
  33. * | |
  34. * | Head Map [Optional] |
  35. * +-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+
  36. * | |
  37. * | body |
  38. * | |
  39. * | ... ... |
  40. * +-----------------------------------------------------------------------------------------------+
  41. * </pre>
  42. * <p>
  43. * <li>Full Length: include all data </li>
  44. * <li>Head Length: include head data from magic code to head map. </li>
  45. * <li>Body Length: Full Length - Head Length</li>
  46. * </p>
  47. * https://github.com/seata/seata/issues/893
  48. */
  49. const (
  50. Seatav1HeaderLength = 16
  51. )
  52. var (
  53. magics = []uint8{0xda, 0xda}
  54. rpcPkgHandler = &RpcPackageHandler{}
  55. )
  56. var (
  57. ErrNotEnoughStream = errors.New("packet stream is not enough")
  58. ErrTooLargePackage = errors.New("package length is exceed the getty package's legal maximum length.")
  59. ErrInvalidPackage = errors.New("invalid rpc package")
  60. ErrIllegalMagic = errors.New("package magic is not right.")
  61. )
  62. type RpcPackageHandler struct{}
  63. type SeataV1PackageHeader struct {
  64. Magic0 byte
  65. Magic1 byte
  66. Version byte
  67. TotalLength uint32
  68. HeadLength uint16
  69. MessageType message.GettyRequestType
  70. CodecType byte
  71. CompressType byte
  72. RequestID uint32
  73. Meta map[string]string
  74. BodyLength uint32
  75. Body interface{}
  76. }
  77. func (p *RpcPackageHandler) Read(ss getty.Session, data []byte) (interface{}, int, error) {
  78. in := bytes.NewByteBuffer(data)
  79. header := SeataV1PackageHeader{}
  80. magic0 := bytes.ReadByte(in)
  81. magic1 := bytes.ReadByte(in)
  82. if magic0 != magics[0] || magic1 != magics[1] {
  83. return nil, 0, fmt.Errorf("codec decode not found magic offset")
  84. }
  85. header.Magic0 = magic0
  86. header.Magic1 = magic1
  87. header.Version = bytes.ReadByte(in)
  88. // length of head and body
  89. header.TotalLength = bytes.ReadUInt32(in)
  90. header.HeadLength = bytes.ReadUInt16(in)
  91. header.MessageType = message.GettyRequestType(bytes.ReadByte(in))
  92. header.CodecType = bytes.ReadByte(in)
  93. header.CompressType = bytes.ReadByte(in)
  94. header.RequestID = bytes.ReadUInt32(in)
  95. headMapLength := header.HeadLength - Seatav1HeaderLength
  96. header.Meta = decodeHeapMap(in, headMapLength)
  97. header.BodyLength = header.TotalLength - uint32(header.HeadLength)
  98. if uint32(len(data)) < header.TotalLength {
  99. return nil, int(header.TotalLength), nil
  100. }
  101. //r := byteio.BigEndianReader{Reader: bytes.NewReader(data)}
  102. rpcMessage := message.RpcMessage{
  103. Codec: header.CodecType,
  104. ID: int32(header.RequestID),
  105. Compressor: header.CompressType,
  106. Type: header.MessageType,
  107. HeadMap: header.Meta,
  108. }
  109. if header.MessageType == message.GettyRequestType_HeartbeatRequest {
  110. rpcMessage.Body = message.HeartBeatMessagePing
  111. } else if header.MessageType == message.GettyRequestType_HeartbeatResponse {
  112. rpcMessage.Body = message.HeartBeatMessagePong
  113. } else {
  114. if header.BodyLength > 0 {
  115. msg := codec.GetCodecManager().Decode(codec.CodecType(header.CodecType), data[header.HeadLength:])
  116. rpcMessage.Body = msg
  117. }
  118. }
  119. return rpcMessage, int(header.TotalLength), nil
  120. }
  121. // Write write rpc message to binary data
  122. func (p *RpcPackageHandler) Write(ss getty.Session, pkg interface{}) ([]byte, error) {
  123. msg, ok := pkg.(message.RpcMessage)
  124. if !ok {
  125. return nil, ErrInvalidPackage
  126. }
  127. totalLength := message.V1HeadLength
  128. headLength := message.V1HeadLength
  129. var headMapBytes []byte
  130. if msg.HeadMap != nil && len(msg.HeadMap) > 0 {
  131. hb, headMapLength := encodeHeapMap(msg.HeadMap)
  132. headMapBytes = hb
  133. headLength += headMapLength
  134. totalLength += headMapLength
  135. }
  136. var bodyBytes []byte
  137. if msg.Type != message.GettyRequestType_HeartbeatRequest &&
  138. msg.Type != message.GettyRequestType_HeartbeatResponse {
  139. bodyBytes = codec.GetCodecManager().Encode(codec.CodecType(msg.Codec), msg.Body)
  140. totalLength += len(bodyBytes)
  141. }
  142. buf := bytes.NewByteBuffer([]byte{})
  143. buf.WriteByte(message.MAGIC_CODE_BYTES[0])
  144. buf.WriteByte(message.MAGIC_CODE_BYTES[1])
  145. buf.WriteByte(message.VERSION)
  146. buf.WriteUint32(uint32(totalLength))
  147. buf.WriteUint16(uint16(headLength))
  148. buf.WriteByte(byte(msg.Type))
  149. buf.WriteByte(msg.Codec)
  150. buf.WriteByte(msg.Compressor)
  151. buf.WriteUint32(uint32(msg.ID))
  152. buf.Write(headMapBytes)
  153. buf.Write(bodyBytes)
  154. return buf.Bytes(), nil
  155. }
  156. func encodeHeapMap(data map[string]string) ([]byte, int) {
  157. buf := bytes.NewByteBuffer([]byte{})
  158. for k, v := range data {
  159. if k == "" {
  160. buf.WriteUint16(uint16(0))
  161. } else {
  162. buf.WriteUint16(uint16(len(k)))
  163. buf.WriteString(k)
  164. }
  165. if v == "" {
  166. buf.WriteUint16(uint16(0))
  167. } else {
  168. buf.WriteUint16(uint16(len(v)))
  169. buf.WriteString(v)
  170. }
  171. }
  172. res := buf.Bytes()
  173. return res, len(res)
  174. }
  175. func decodeHeapMap(in *bytes.ByteBuffer, length uint16) map[string]string {
  176. res := make(map[string]string, 0)
  177. if length == 0 {
  178. return res
  179. }
  180. readedLength := uint16(0)
  181. for readedLength < length {
  182. var key, value string
  183. keyLength := bytes.ReadUInt16(in)
  184. if keyLength == 0 {
  185. key = ""
  186. } else {
  187. keyBytes := make([]byte, keyLength)
  188. in.Read(keyBytes)
  189. key = string(keyBytes)
  190. }
  191. valueLength := bytes.ReadUInt16(in)
  192. if valueLength == 0 {
  193. key = ""
  194. } else {
  195. valueBytes := make([]byte, valueLength)
  196. in.Read(valueBytes)
  197. value = string(valueBytes)
  198. }
  199. res[key] = value
  200. readedLength += 4 + keyLength + valueLength
  201. fmt.Sprintln("done")
  202. }
  203. return res
  204. }

Go Implementation For Seata