Browse Source

Merge pull request #164 from JoeyHwong-gk/federated

Support websocket reconnection when the server status is abnormal
tags/v0.4.0
KubeEdge Bot GitHub 4 years ago
parent
commit
008364c2b6
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 35 additions and 39 deletions
  1. +2
    -2
      lib/requirements.txt
  2. +33
    -37
      lib/sedna/service/client.py

+ 2
- 2
lib/requirements.txt View File

@@ -7,9 +7,9 @@ setuptools~=54.2.0
fastapi~=0.63.0 # MIT
starlette~=0.13.6 # BSD
pydantic~=1.8.1 # MIT
retrying~=1.3.3 # Apache-2.0
tenacity~=8.0.1 # Apache-2.0
joblib~=1.0.1 # BSD
pandas~=1.1.5 # BSD
six~=1.15.0 # MIT
minio~=7.0.3 # Apache-2.0
uvicorn~=0.14.0 # BSD
uvicorn~=0.14.0 # BSD

+ 33
- 37
lib/sedna/service/client.py View File

@@ -19,8 +19,9 @@ import asyncio
import threading
from copy import deepcopy

from retrying import retry
from requests import request
import tenacity
from tenacity import retry
import requests
import websockets
from websockets.exceptions import InvalidStatusCode, WebSocketException
from websockets.exceptions import ConnectionClosedError, ConnectionClosedOK
@@ -29,13 +30,14 @@ from sedna.common.log import LOGGER
from sedna.common.file_ops import FileOps


@retry(stop_max_attempt_number=3,
retry_on_result=lambda x: x is None, wait_fixed=2000)
@retry(stop=tenacity.stop_after_attempt(5),
retry=tenacity.retry_if_result(lambda x: x is None),
wait=tenacity.wait_fixed(3))
def http_request(url, method=None, timeout=None, binary=True, **kwargs):
_maxTimeout = timeout if timeout else 300
_method = "GET" if not method else method
try:
response = request(method=_method, url=url, **kwargs)
response = requests.request(method=_method, url=url, **kwargs)
if response.status_code == 200:
return (response.json() if binary else
response.content.decode("utf-8"))
@@ -45,10 +47,14 @@ def http_request(url, method=None, timeout=None, binary=True, **kwargs):
'Get invalid status code %s while request %s',
response.status_code,
url)
except ConnectionRefusedError:
except (ConnectionRefusedError, requests.exceptions.ConnectionError):
LOGGER.warning(f'Connection refused while request {url}')
except Exception as e:
LOGGER.warning(f'Error occurred while request {url}, Msg: {e}')
except requests.exceptions.HTTPError as err:
LOGGER.warning(f"Http Error while request {url} : f{err}")
except requests.exceptions.Timeout as err:
LOGGER.warning(f"Timeout Error while request {url} : f{err}")
except requests.exceptions.RequestException as err:
LOGGER.warning(f"Error occurred while request {url} : f{err}")


class LCReporter(threading.Thread):
@@ -123,8 +129,6 @@ class LCClient:
class AggregationClient:
"""Client that interacts with the cloud aggregator."""
_ws_timeout = 5
_retry = 15
_retry_interval_seconds = 3
max_size = 500 * 1024 * 1024

def __init__(self, url, client_id, **kwargs):
@@ -136,7 +140,7 @@ class AggregationClient:
timeout = int(timeout) if str(timeout).isdigit() else self._ws_timeout
interval = self.kwargs.get("ping_interval", "")
interval = int(interval) if str(interval).isdigit(
) else timeout * self._retry_interval_seconds
) else self._ws_timeout
max_size = self.kwargs.get("max_size", "")
max_size = int(max_size) if str(max_size).isdigit() else self.max_size
self.kwargs.update({
@@ -149,51 +153,43 @@ class AggregationClient:
asyncio.wait_for(self.connect(), timeout=timeout)
)

@retry(stop=tenacity.stop_after_attempt(15),
retry=tenacity.retry_if_result(lambda x: x is None),
wait=tenacity.wait_fixed(3))
async def connect(self):
LOGGER.info(f"{self.uri} connection by {self.client_id}")

try:
self.ws = await asyncio.wait_for(websockets.connect(
self.uri, **self.kwargs
), self._ws_timeout)
await self.ws.send(json.dumps({'type': 'subscribe',
'client_id': self.client_id}))

res = await self.ws.recv()
return res
except ConnectionRefusedError:
LOGGER.info(f"{self.uri} connection was refused by server")
raise
LOGGER.warning(f"{self.uri} connection was refused by server")
except ConnectionClosedError:
LOGGER.info(f"{self.uri} connection lost")
raise
LOGGER.warning(f"{self.uri} connection lost")
except ConnectionClosedOK:
LOGGER.info(f"{self.uri} connection closed")
raise
LOGGER.warning(f"{self.uri} connection closed")
except InvalidStatusCode as err:
LOGGER.info(
LOGGER.warning(
f"{self.uri} websocket failed - "
f"with invalid status code {err.status_code}")
raise
except WebSocketException as err:
LOGGER.info(f"{self.uri} websocket failed - with {err}")
raise
LOGGER.warning(f"{self.uri} websocket failed - with {err}")
except OSError as err:
LOGGER.info(f"{self.uri} connection failed - with {err}")
raise
except Exception:
LOGGER.exception(f"{self.uri} websocket Error")
raise
LOGGER.warning(f"{self.uri} connection failed - with {err}")

@retry(stop=tenacity.stop_after_attempt(15),
retry=tenacity.retry_if_result(lambda x: x is None),
wait=tenacity.wait_fixed(3))
async def _send(self, data):
for _ in range(self._retry):
try:
await asyncio.wait_for(self.ws.send(data), self._ws_timeout)
return
except Exception as err:
LOGGER.info(f"{self.uri} send data failed - with {err}")
time.sleep(self._retry_interval_seconds)
return
try:
await asyncio.wait_for(self.ws.send(data), self._ws_timeout)
return True
except Exception as err:
LOGGER.info(f"{self.uri} send data failed - with {err}")

async def _recv(self):
result = await self.ws.recv()
@@ -212,7 +208,7 @@ class AggregationClient:
data = loop.run_until_complete(self._recv())
try:
data = json.loads(data)
except Exception:
except (json.decoder.JSONDecodeError, TypeError):
pass
if not wait_data_type or (isinstance(data, dict) and
data.get("type", "") == wait_data_type):


Loading…
Cancel
Save