A
Size: a a a
A
С
ᅠᅠ
N
A
I
С
С
N
tn
С
С
A
@app.task
def https_test_task(arg1, arg2):
    """Task wrapper that forwards its two arguments to ``https_test``.

    ``arg1`` and ``arg2`` are passed positionally, so they land in
    ``https_test``'s ``proxy`` and ``url`` parameters respectively.
    ``https_test`` is called directly here (not via ``.delay()``), and its
    boolean result is discarded: this task always returns ``None``.
    """
    https_test(arg1, arg2)
@app.task
def check_proxy():
    """Dispatch one ``https_test`` task per stored proxy of type 1.

    Builds a signature for every matching ``Proxy`` row, bundles the
    signatures into a single group and submits the group with ``delay()``.
    Nothing is returned; the group result is not kept.
    """
    https_proxies = Proxy.objects.filter(type=1)  # 1 - HTTPS
    signatures = [https_test.s(entry.proxy) for entry in https_proxies]
    group(signatures).delay()
@app.task()
def https_test(proxy, url="https://www.google.com/"):
    """Check whether ``proxy`` can fetch ``url`` and update its DB record.

    Issues a GET for ``url`` through ``proxy`` (used for the ``https``
    scheme) with a 5 second timeout. Any response at all counts as success;
    the HTTP status code is not inspected.

    Args:
        proxy: Proxy address; also the value used to look up the ``Proxy``
            row via ``Proxy.objects.get(proxy=proxy)``.
        url: Target to request through the proxy.

    Returns:
        ``True`` if the request completed (the row's ``status`` is set to 2
        and saved), ``False`` if it timed out or the connection failed (the
        row is deleted).

    Raises:
        Whatever ``Proxy.objects.get`` raises when the lookup does not yield
        exactly one row, plus any ``requests`` error that is neither a
        timeout nor a connection failure (e.g. an invalid URL).
    """
    try:
        requests.get(url, proxies={"https": proxy}, timeout=5)
    except (
        TimeoutError,
        # Timeout covers both ConnectTimeout and ReadTimeout: a proxy that
        # accepts the connection but then stalls must count as dead too,
        # instead of crashing the task with an unhandled ReadTimeout.
        requests.exceptions.Timeout,
        # ConnectionError is the base class of ProxyError (and SSLError).
        requests.exceptions.ConnectionError,
    ):
        is_alive = False
    else:
        is_alive = True

    # Deliberately outside the try block: lookup errors must not be mistaken
    # for a proxy failure.
    proxy_obj = Proxy.objects.get(proxy=proxy)

    if not is_alive:
        print('FAIL', proxy_obj)
        proxy_obj.delete()
        return False

    print('GOOD', proxy_obj)
    proxy_obj.status = 2  # 2 is live
    proxy_obj.save()
    return True
DB
DB
A
A
DB
A
DB