Browse Source

Update dependency of server request in lib

- replace `retry==1.3.3` with `tenacity==8.0.1` because of `retry` no longer maintained.

Signed-off-by: JoeyHwong <joeyhwong@gknow.cn>
tags/v0.4.0
JoeyHwong 4 years ago
parent
commit
9d601600a2
2 changed files with 52 additions and 51 deletions
  1. +2
    -2
      lib/requirements.txt
  2. +50
    -49
      lib/sedna/service/client.py

+ 2
- 2
lib/requirements.txt View File

@@ -7,9 +7,9 @@ setuptools~=54.2.0
fastapi~=0.63.0 # MIT fastapi~=0.63.0 # MIT
starlette~=0.13.6 # BSD starlette~=0.13.6 # BSD
pydantic~=1.8.1 # MIT pydantic~=1.8.1 # MIT
retrying~=1.3.3 # Apache-2.0
tenacity~=8.0.1 # Apache-2.0
joblib~=1.0.1 # BSD joblib~=1.0.1 # BSD
pandas~=1.1.5 # BSD pandas~=1.1.5 # BSD
six~=1.15.0 # MIT six~=1.15.0 # MIT
minio~=7.0.3 # Apache-2.0 minio~=7.0.3 # Apache-2.0
uvicorn~=0.14.0 # BSD
uvicorn~=0.14.0 # BSD

+ 50
- 49
lib/sedna/service/client.py View File

@@ -19,8 +19,9 @@ import asyncio
import threading import threading
from copy import deepcopy from copy import deepcopy


from retrying import retry
from requests import request
import tenacity
from tenacity import retry
import requests
import websockets import websockets
from websockets.exceptions import InvalidStatusCode, WebSocketException from websockets.exceptions import InvalidStatusCode, WebSocketException
from websockets.exceptions import ConnectionClosedError, ConnectionClosedOK from websockets.exceptions import ConnectionClosedError, ConnectionClosedOK
@@ -29,13 +30,14 @@ from sedna.common.log import LOGGER
from sedna.common.file_ops import FileOps from sedna.common.file_ops import FileOps




@retry(stop_max_attempt_number=5,
retry_on_result=lambda x: x is None, wait_fixed=3000)
@retry(stop=tenacity.stop_after_attempt(5),
retry=tenacity.retry_if_result(lambda x: x is None),
wait=tenacity.wait_fixed(3))
def http_request(url, method=None, timeout=None, binary=True, **kwargs): def http_request(url, method=None, timeout=None, binary=True, **kwargs):
_maxTimeout = timeout if timeout else 300 _maxTimeout = timeout if timeout else 300
_method = "GET" if not method else method _method = "GET" if not method else method
try: try:
response = request(method=_method, url=url, **kwargs)
response = requests.request(method=_method, url=url, **kwargs)
if response.status_code == 200: if response.status_code == 200:
return (response.json() if binary else return (response.json() if binary else
response.content.decode("utf-8")) response.content.decode("utf-8"))
@@ -45,10 +47,14 @@ def http_request(url, method=None, timeout=None, binary=True, **kwargs):
'Get invalid status code %s while request %s', 'Get invalid status code %s while request %s',
response.status_code, response.status_code,
url) url)
except ConnectionRefusedError:
except (ConnectionRefusedError, requests.exceptions.ConnectionError):
LOGGER.warning(f'Connection refused while request {url}') LOGGER.warning(f'Connection refused while request {url}')
except Exception as e:
LOGGER.warning(f'Error occurred while request {url}, Msg: {e}')
except requests.exceptions.HTTPError as err:
LOGGER.warning(f"Http Error while request {url} : f{err}")
except requests.exceptions.Timeout as err:
LOGGER.warning(f"Timeout Error while request {url} : f{err}")
except requests.exceptions.RequestException as err:
LOGGER.warning(f"Error occurred while request {url} : f{err}")




class LCReporter(threading.Thread): class LCReporter(threading.Thread):
@@ -123,8 +129,6 @@ class LCClient:
class AggregationClient: class AggregationClient:
"""Client that interacts with the cloud aggregator.""" """Client that interacts with the cloud aggregator."""
_ws_timeout = 5 _ws_timeout = 5
_retry = 15
_retry_interval_seconds = 3
max_size = 500 * 1024 * 1024 max_size = 500 * 1024 * 1024


def __init__(self, url, client_id, **kwargs): def __init__(self, url, client_id, **kwargs):
@@ -136,7 +140,7 @@ class AggregationClient:
timeout = int(timeout) if str(timeout).isdigit() else self._ws_timeout timeout = int(timeout) if str(timeout).isdigit() else self._ws_timeout
interval = self.kwargs.get("ping_interval", "") interval = self.kwargs.get("ping_interval", "")
interval = int(interval) if str(interval).isdigit( interval = int(interval) if str(interval).isdigit(
) else timeout * self._retry_interval_seconds
) else self._ws_timeout
max_size = self.kwargs.get("max_size", "") max_size = self.kwargs.get("max_size", "")
max_size = int(max_size) if str(max_size).isdigit() else self.max_size max_size = int(max_size) if str(max_size).isdigit() else self.max_size
self.kwargs.update({ self.kwargs.update({
@@ -149,46 +153,43 @@ class AggregationClient:
asyncio.wait_for(self.connect(), timeout=timeout) asyncio.wait_for(self.connect(), timeout=timeout)
) )


@retry(stop=tenacity.stop_after_attempt(15),
retry=tenacity.retry_if_result(lambda x: x is None),
wait=tenacity.wait_fixed(3))
async def connect(self): async def connect(self):
LOGGER.info(f"{self.uri} connection by {self.client_id}") LOGGER.info(f"{self.uri} connection by {self.client_id}")
for _ in range(self._retry):
try:
self.ws = await asyncio.wait_for(websockets.connect(
self.uri, **self.kwargs
), self._ws_timeout)
await self.ws.send(json.dumps({'type': 'subscribe',
'client_id': self.client_id}))

res = await self.ws.recv()
return res
except ConnectionRefusedError:
LOGGER.warning(f"{self.uri} connection was refused by server")
except ConnectionClosedError:
LOGGER.warning(f"{self.uri} connection lost")
except ConnectionClosedOK:
LOGGER.warning(f"{self.uri} connection closed")
except InvalidStatusCode as err:
LOGGER.warning(
f"{self.uri} websocket failed - "
f"with invalid status code {err.status_code}")
except WebSocketException as err:
LOGGER.warning(f"{self.uri} websocket failed - with {err}")
except OSError as err:
LOGGER.warning(f"{self.uri} connection failed - with {err}")
except Exception:
LOGGER.warning(f"{self.uri} websocket Error")
time.sleep(self._retry_interval_seconds)
raise

try:
self.ws = await asyncio.wait_for(websockets.connect(
self.uri, **self.kwargs
), self._ws_timeout)
await self.ws.send(json.dumps({'type': 'subscribe',
'client_id': self.client_id}))
res = await self.ws.recv()
return res
except ConnectionRefusedError:
LOGGER.warning(f"{self.uri} connection was refused by server")
except ConnectionClosedError:
LOGGER.warning(f"{self.uri} connection lost")
except ConnectionClosedOK:
LOGGER.warning(f"{self.uri} connection closed")
except InvalidStatusCode as err:
LOGGER.warning(
f"{self.uri} websocket failed - "
f"with invalid status code {err.status_code}")
except WebSocketException as err:
LOGGER.warning(f"{self.uri} websocket failed - with {err}")
except OSError as err:
LOGGER.warning(f"{self.uri} connection failed - with {err}")

@retry(stop=tenacity.stop_after_attempt(15),
retry=tenacity.retry_if_result(lambda x: x is None),
wait=tenacity.wait_fixed(3))
async def _send(self, data): async def _send(self, data):
for _ in range(self._retry):
try:
await asyncio.wait_for(self.ws.send(data), self._ws_timeout)
return
except Exception as err:
LOGGER.info(f"{self.uri} send data failed - with {err}")
time.sleep(self._retry_interval_seconds)
return
try:
await asyncio.wait_for(self.ws.send(data), self._ws_timeout)
return True
except Exception as err:
LOGGER.info(f"{self.uri} send data failed - with {err}")


async def _recv(self): async def _recv(self):
result = await self.ws.recv() result = await self.ws.recv()
@@ -207,7 +208,7 @@ class AggregationClient:
data = loop.run_until_complete(self._recv()) data = loop.run_until_complete(self._recv())
try: try:
data = json.loads(data) data = json.loads(data)
except Exception:
except (json.decoder.JSONDecodeError, TypeError):
pass pass
if not wait_data_type or (isinstance(data, dict) and if not wait_data_type or (isinstance(data, dict) and
data.get("type", "") == wait_data_type): data.get("type", "") == wait_data_type):


Loading…
Cancel
Save