Source code for chaise

  1import json
  2from typing import AsyncIterator, Literal, Callable, Protocol, TypeVar, Generic
  3
  4import httpx
  5
  6
  7DOCT = TypeVar("DOCT")
  8
  9
class DocumentLoader(Protocol, Generic[DOCT]):
    """
    Structural interface for objects that translate between raw JSON blobs
    and hydrated document instances of type ``DOCT``.
    """

    def loadj(self, blob: dict) -> DOCT:
        """
        Build a document object out of a JSON blob.
        """
        ...

    def dumpj(self, doc: DOCT) -> dict:
        """
        Produce the JSON blob for a document object.
        """
        ...
25 26
class DocumentRegistry:
    """
    Handles de/serialization, manages migrations, etc.

    Default loader implementation.

    Do not use directly. You probably want one of the :ref:`integrations`.
    """

    #: Key of the JSON blob under which the registered type name is stored.
    TYPE_KEY = ""

    #: Registered document classes, keyed by their type name.
    _docclasses: dict[str, type] = {}
    #: Registered migrations as ``(before_name, after_name, func)`` tuples.
    _migrations: list[tuple[str, str, Callable]] = []

    def __init_subclass__(cls, **kwargs):
        """
        Give every subclass its own, empty document and migration tables so
        registrations on one registry do not leak into another.
        """
        super().__init_subclass__(**kwargs)
        cls._docclasses = {}
        cls._migrations = []

    @classmethod
    def _get_class_from_name(cls, name: str) -> type:
        """
        Look up the class registered under ``name``.

        Raises:
            KeyError: if nothing is registered under ``name``.
        """
        return cls._docclasses[name]

    @classmethod
    def _get_name_from_class(cls, klass: type) -> str:
        """
        Find the registered name for ``klass``.

        The first registered class that ``klass`` is a subclass of wins.

        Raises:
            ValueError: if no registered class matches.
        """
        for name, kind in cls._docclasses.items():
            if issubclass(klass, kind):  # In case of decorator shenanigans
                return name
        raise ValueError(f"Couldn't find name for {klass}")

    @classmethod
    def document(cls, name: str):
        """
        Register a class as a loadable couch document.

        Args:
            name: The type identifier to save to CouchDB

        Returns:
            A class decorator that registers the class and returns it
            unchanged.

        .. note::

           The given name must be globally unique and must never change.
        """
        # Presumably guards against using ``@Registry.document`` without
        # calling it, in which case ``name`` would be the decorated class.
        assert not isinstance(name, type)
        assert name not in cls._docclasses

        def _(klass: type):
            cls._docclasses[name] = klass
            return klass

        return _

    @classmethod
    def migration(cls, before: type, after: type):
        """
        Define a function that'll convert between documents.

        Args:
            before: Registered document class the function accepts.
            after: Registered document class the function produces.

        Returns:
            A decorator that records the function and returns it unchanged.

        Raises:
            ValueError: if either class has not been registered.
        """
        # Normalize to the document classes previously registered
        bname = cls._get_name_from_class(before)
        aname = cls._get_name_from_class(after)
        # Enforce linearity
        assert not any(b == bname for b, _, _ in cls._migrations)

        def _(func: Callable):
            cls._migrations.append((bname, aname, func))
            return func

        return _

    def load_doc(self, cls: type, blob: dict):
        """
        Converts a JSON blob into a document.

        Override me.
        """
        raise NotImplementedError

    def dump_doc(self, doc) -> dict:
        """
        Convert a document into a JSON blob.

        Override me.
        """
        raise NotImplementedError

    def _migrate(self, bname, doc):
        """
        Apply registered migrations to ``doc``, starting from the type name
        ``bname``, until no migration starts at the current type.
        """
        while funcs := [f for b, _, f in self._migrations if b == bname]:
            # Linearity is enforced at registration: exactly one candidate.
            (func,) = funcs
            doc = func(doc)
            bname = self._get_name_from_class(type(doc))
        return doc

    def loadj(self, blob):
        """
        Convert a JSON blob into a (fully migrated) document object.

        The type name is read from ``TYPE_KEY`` and removed before the blob
        is handed to :meth:`load_doc`. The caller's blob is left untouched.

        Raises:
            KeyError: if ``TYPE_KEY`` is missing or names an unknown type.
        """
        # Work on a shallow copy so popping the type key does not mutate the
        # dict the caller passed in.
        blob = dict(blob)
        type_name = blob.pop(self.TYPE_KEY)
        klass = self._get_class_from_name(type_name)
        doc = self.load_doc(klass, blob)
        return self._migrate(type_name, doc)

    def dumpj(self, doc):
        """
        Convert a document into a JSON blob, tagged with its type name under
        ``TYPE_KEY``.
        """
        blob = self.dump_doc(doc)
        blob[self.TYPE_KEY] = self._get_name_from_class(type(doc))
        return blob
129 130
class Conflict(Exception):
    """
    The operation could not be performed because of a conflict.
    """
135 136
class Missing(Exception):
    """
    The requested document could not be found.

    This corresponds to a 404, not to a tombstone.
    """
143 144
class Deleted(Exception):
    """
    The requested document has been deleted.

    This corresponds to a document with a tombstone, not to a 404.
    """
151 152
[docs] 153class CouchSession: 154 """ 155 A connection to CouchDB. 156 157 You probably want to override like:: 158 159 class MySession(CouchSession): 160 loader = MyRegistry 161 162 """ 163 164 _client: httpx.AsyncClient 165 _root: httpx.URL 166 167 #: Class responsible for de/serializing data. 168 loader: type[DocumentLoader] 169 170 def __init__(self, client: httpx.AsyncClient, root: httpx.URL): 171 self._client = client 172 self._root = root 173 174 @staticmethod 175 def _fix_params(params): 176 rv = {} 177 for key, value in params.items(): 178 if value is None: 179 continue 180 else: 181 rv[key] = json.dumps(value) 182 return rv 183 184 async def _request(self, method, *urlparts, **kwargs): 185 url = self._root.join("/".join(urlparts)) 186 if "params" in kwargs: 187 kwargs["params"] = self._fix_params(kwargs["params"]) 188 resp = await self._client.request(method, url, **kwargs) 189 try: 190 resp.raise_for_status() 191 except httpx.HTTPStatusError as exc: 192 exc.add_note(f"Body: {exc.response.text}") 193 match exc.response.status_code: 194 case 404: 195 raise Missing(f"Could not find {'/'.join(urlparts)}") from exc 196 case 409: 197 raise Conflict(f"Conflict updating {'/'.join(urlparts)}") from exc 198 case _: 199 raise 200 return resp 201 202 def __getitem__(self, key: str) -> "Database": 203 """ 204 Gets a database. 205 206 (Does not actually check if it exists.) 207 """ 208 return Database(self, key) 209
[docs] 210 async def get_db(self, dbname: str) -> "Database": 211 """ 212 Gets a database. Checks if it exists. 213 214 See :http:head:`/{db}` 215 """ 216 await self._request("HEAD", dbname) 217 return Database(self, dbname)
218
[docs] 219 async def create_db( 220 self, 221 dbname: str, 222 *, 223 shards: int | None = None, 224 replicas: int | None = None, 225 partitioned: bool | None = None, 226 ) -> "Database": 227 """ 228 Create a database 229 230 See :http:post:`/{db}` 231 """ 232 await self._request( 233 "PUT", 234 dbname, 235 params={ 236 "q": shards, 237 "n": replicas, 238 partitioned: partitioned, 239 }, 240 ) 241 return Database(self, dbname)
242
[docs] 243 async def delete_db(self, dbname: str): 244 """ 245 Delete a database. 246 247 See :http:delete:`/{db}` 248 """ 249 await self._request("DELETE", dbname)
250 251 # TODO: Database metadata 252 253
[docs] 254class Database: 255 """ 256 An individual database. 257 """ 258 259 def __init__(self, session, name): 260 """ 261 :private: 262 """ 263 self._session = session 264 self._name = name 265 266 def _blob2doc(self, blob, db, docid, etag): 267 doc = self._session.loader().loadj(blob) 268 doc.__db = db 269 doc.__docid = docid 270 doc.__etag = etag 271 return doc 272 273 def _doc2blob(self, doc): 274 blob = self._session.loader().dumpj(doc) 275 db = docid = etag = None 276 try: 277 db = doc.__db 278 docid = doc.__docid 279 etag = doc.__etag 280 except AttributeError: 281 pass 282 return blob, db, docid, etag 283
[docs] 284 async def get( 285 self, 286 docid: str, 287 *, 288 attachments: bool = False, 289 conflicts: bool = False, 290 deleted_conflicts: bool = False, 291 latest: bool = False, 292 local_seq: bool = False, 293 meta: bool = False, 294 open_revs: list[str] | Literal["all"] | None = None, 295 rev: str | None = None, 296 revs: bool = False, 297 revs_info: bool = False, 298 ): 299 """ 300 Get a document 301 302 See :http:get:`/{db}/{docid}` 303 """ 304 resp = await self._session._request( 305 "GET", 306 self._name, 307 docid, 308 params={ 309 "attachments": attachments, 310 "conflicts": conflicts, 311 "deleted_conflicts": deleted_conflicts, 312 "latest": latest, 313 "local_seq": local_seq, 314 "meta": meta, 315 "open_revs": open_revs, 316 "rev": rev, 317 "revs": revs, 318 "revs_info": revs_info, 319 }, 320 headers={ 321 "Accept": "application/json", 322 }, 323 ) 324 325 blob = resp.json() 326 if blob.get("_deleted", False): # TODO: Flag to override this 327 raise Deleted("Document {self._name}/{docid} is marked as deleted") 328 if "ETag" in resp.headers: 329 etag = resp.headers["ETag"] 330 else: 331 # Conflicts mode 332 etag = f'"{blob["_rev"]}"' 333 doc = self._blob2doc(blob, self._name, docid, etag) 334 return doc
335 336 # TODO: Attachments 337
[docs] 338 async def attempt_put( 339 self, 340 doc, 341 docid: str | None = None, 342 *, 343 batch: bool = False, 344 ): 345 """ 346 Update a document. 347 348 db and docid only need to be given if it's a new document. 349 350 See :http:put:`/{db}/{docid}` 351 """ 352 blob, _db, _docid, etag = self._doc2blob(doc) 353 assert _db is None or _db == self._name 354 await self._session._request( 355 "PUT", 356 self._name, 357 _docid or docid, 358 params={"batch": "ok"} if batch else {}, 359 headers={"If-Match": etag} if etag else {}, 360 json=blob, 361 )
362
[docs] 363 async def attempt_delete(self, doc, *, batch: bool = False): 364 """ 365 Delete a document 366 367 See :http:delete:`/{db}/{docid}` 368 """ 369 _, db, docid, etag = self._doc2blob(doc) 370 assert db == self._name 371 assert docid 372 await self._session._request( 373 "DELETE", 374 db, 375 docid, 376 params={"batch": "ok"} if batch else {}, 377 headers={"If-Match": etag}, 378 )
379
[docs] 380 async def attempt_copy(self, src_doc, dst_doc, *, batch: bool = False): 381 """ 382 Copy a document 383 384 .. todo:: 385 386 Implement 387 388 See :http:copy:`/{db}/{docid}` 389 """
390 # FIXME: Figure out signature 391
[docs] 392 async def mutate(self, docid: str) -> AsyncIterator: 393 """ 394 A document mutation loop:: 395 396 async for doc in couch.mutate_doc("spam"): 397 doc.foo = "bar" 398 399 Will replay the mutation until it goes through. 400 """ 401 doc = await self.get(docid) 402 while True: 403 yield doc 404 try: 405 await self.attempt_put(doc) 406 except Conflict: 407 doc = await self.get(docid) 408 else: 409 break
410 411 # TODO: Mango searches 412 # TODO: Database operations 413 414
class SessionPool:
    """
    Responsible for giving out Couch connections.

    You probably want to override like::

        class MyPool(SessionPool):
            session_class = MySession
    """

    _client: httpx.AsyncClient

    #: Class to use for sessions
    session_class: type[CouchSession]

    def __init__(self):
        super().__init__()
        self._client = self.make_client()

    def make_client(self) -> httpx.AsyncClient:
        """
        Produce an httpx client.

        The default client has HTTP/2 enabled and follows redirects.
        """
        return httpx.AsyncClient(http2=True, follow_redirects=True)

    async def iter_servers(self) -> AsyncIterator[str]:
        """
        Produce the list of potential servers.

        Override this
        """
        raise NotImplementedError
        # Deliberately unreachable: the ``yield`` makes this an async
        # generator function, so the error surfaces on iteration.
        for _ in ():
            yield

    async def _check_server(self, url: httpx.URL):
        """
        Report whether the server at ``url`` answers ``_up`` successfully.

        A transport-level failure (e.g. connection refused, timeout) counts
        as "not up" so that the next candidate server can be tried.
        """
        try:
            resp = await self._client.get(url.join("_up"))
        except httpx.TransportError:
            return False
        return resp.is_success

    async def session(self) -> CouchSession:
        """
        Get a session

        Walks :meth:`iter_servers` and returns a session for the first server
        that passes the ``_up`` check. If no server passes, ``None`` is
        returned.
        """
        async for url in self.iter_servers():
            url = httpx.URL(url)
            if await self._check_server(url):
                return self.session_class(self._client, url)