1""" 2Extra bits to make your job easier. 3""" 4 5importrandom 6fromsocketimportAddressFamily,SocketKind 7 8 9importanyio10importhttpx1112from.importSessionPool1314
[docs]15classConstantPoolMixin(SessionPool):16"""17 A trivial pool mixin that uses the passed in URL(s).18 """1920urls:list[str]2122def__init__(self,urls:str|list[str]):23ifisinstance(urls,str):24self.urls=[urls]25else:26self.urls=list(urls)27super().__init__()28
343536asyncdef_get_ips(hostname:str|bytes,port:int|None=None):37"""38 Look up the IPs for a given hostname39 """40forgaiinawaitanyio.getaddrinfo(hostname,port):41matchgai:42case(AddressFamily.AF_INET,SocketKind.SOCK_STREAM,_,_,(addr,_)):43yieldaddr44case(45AddressFamily.AF_INET6,46SocketKind.SOCK_STREAM,47_,48_,49(addr,_),50):51yieldaddr52case(53AddressFamily.AF_INET6,54SocketKind.SOCK_STREAM,55_,56_,57(addr,_,_,scope),58):59# IPv6 flow is without purpose and cannot be represented in addresses60ifscope:61yieldf"{addr}%{scope}"62else:63yieldaddr64case_:65continue6667
[docs]68classDnsPoolMixin(SessionPool):69"""70 Uses DNS round robin to find pool members.7172 Assumes that CouchDB is not behind a vhost or uses TLS.73 """7475url:httpx.URL7677def__init__(self,url:str):78self.url=httpx.URL(url)79super().__init__()80
[docs]81asyncdefiter_servers(self):82# The domain stack already does our shuffling83asyncforipin_get_ips(self.url.raw_host,self.url.port):84if":"inip:85ip=f"[{ip}]"86yieldstr(self.url.copy_with(host=ip))