Browse Source

Support websocket reconnection

Signed-off-by: JoeyHwong <joeyhwong@gknow.cn>
tags/v0.4.0
JoeyHwong 4 years ago
parent
commit
0a140ee106
1 changed files with 30 additions and 35 deletions
  1. +30
    -35
      lib/sedna/service/client.py

+ 30
- 35
lib/sedna/service/client.py View File

@@ -29,8 +29,8 @@ from sedna.common.log import LOGGER
from sedna.common.file_ops import FileOps


@retry(stop_max_attempt_number=3,
retry_on_result=lambda x: x is None, wait_fixed=2000)
@retry(stop_max_attempt_number=5,
retry_on_result=lambda x: x is None, wait_fixed=3000)
def http_request(url, method=None, timeout=None, binary=True, **kwargs):
_maxTimeout = timeout if timeout else 300
_method = "GET" if not method else method
@@ -151,39 +151,34 @@ class AggregationClient:

async def connect(self):
LOGGER.info(f"{self.uri} connection by {self.client_id}")

try:
self.ws = await asyncio.wait_for(websockets.connect(
self.uri, **self.kwargs
), self._ws_timeout)
await self.ws.send(json.dumps({'type': 'subscribe',
'client_id': self.client_id}))

res = await self.ws.recv()
return res
except ConnectionRefusedError:
LOGGER.info(f"{self.uri} connection was refused by server")
raise
except ConnectionClosedError:
LOGGER.info(f"{self.uri} connection lost")
raise
except ConnectionClosedOK:
LOGGER.info(f"{self.uri} connection closed")
raise
except InvalidStatusCode as err:
LOGGER.info(
f"{self.uri} websocket failed - "
f"with invalid status code {err.status_code}")
raise
except WebSocketException as err:
LOGGER.info(f"{self.uri} websocket failed - with {err}")
raise
except OSError as err:
LOGGER.info(f"{self.uri} connection failed - with {err}")
raise
except Exception:
LOGGER.exception(f"{self.uri} websocket Error")
raise
for _ in range(self._retry):
try:
self.ws = await asyncio.wait_for(websockets.connect(
self.uri, **self.kwargs
), self._ws_timeout)
await self.ws.send(json.dumps({'type': 'subscribe',
'client_id': self.client_id}))

res = await self.ws.recv()
return res
except ConnectionRefusedError:
LOGGER.warning(f"{self.uri} connection was refused by server")
except ConnectionClosedError:
LOGGER.warning(f"{self.uri} connection lost")
except ConnectionClosedOK:
LOGGER.warning(f"{self.uri} connection closed")
except InvalidStatusCode as err:
LOGGER.warning(
f"{self.uri} websocket failed - "
f"with invalid status code {err.status_code}")
except WebSocketException as err:
LOGGER.warning(f"{self.uri} websocket failed - with {err}")
except OSError as err:
LOGGER.warning(f"{self.uri} connection failed - with {err}")
except Exception:
LOGGER.warning(f"{self.uri} websocket Error")
time.sleep(self._retry_interval_seconds)
raise

async def _send(self, data):
for _ in range(self._retry):


Loading…
Cancel
Save