1import json
2from typing import AsyncIterator, Literal, Callable, Protocol, TypeVar, Generic
3
4import httpx
5
6
# Type variable for the hydrated document type a loader works with.
DOCT = TypeVar("DOCT")


class DocumentLoader(Protocol, Generic[DOCT]):
    """
    Structural interface for anything that translates between JSON blobs
    and hydrated document instances.
    """

    def loadj(self, blob: dict) -> DOCT:
        """
        Build a document object out of a JSON blob.
        """
        ...

    def dumpj(self, doc: DOCT) -> dict:
        """
        Produce the JSON blob that represents a document.
        """
        ...
25
26
[docs]
class DocumentRegistry:
    """
    Handles de/serialization, manages migrations, etc.

    Default loader implementation.

    Do not use directly. You probably want one of the :ref:`integrations`.

    Every subclass gets its own, initially empty, table of document classes
    and list of migrations.
    """

    #: Key in the JSON blob that holds the registered type name.
    TYPE_KEY = ""

    # Registered type name -> document class.
    _docclasses = {}
    # ``(before_name, after_name, func)`` triples, in registration order.
    _migrations = []

    def __init_subclass__(cls, **kwargs):
        """
        Give each subclass fresh registration containers.
        """
        super().__init_subclass__(**kwargs)
        # Without this, subclasses would all mutate the containers defined on
        # the base class and see each other's registrations.
        cls._docclasses = {}
        cls._migrations = []

    @classmethod
    def _get_class_from_name(cls, name: str) -> type:
        """
        Look up the class registered under ``name``.

        Raises:
            KeyError: Nothing is registered under ``name``.
        """
        return cls._docclasses[name]

    @classmethod
    def _get_name_from_class(cls, klass: type) -> str:
        """
        Find the registered name for ``klass``.

        An exact registration is preferred; otherwise the first registered
        class that ``klass`` is a subclass of is used.

        Raises:
            ValueError: No registered class matches.
        """
        # Exact match first, so a registered class that derives from another
        # registered class resolves to its own name and not its parent's.
        for name, kind in cls._docclasses.items():
            if klass is kind:
                return name
        for name, kind in cls._docclasses.items():
            if issubclass(klass, kind):  # In case of decorator shenanigans
                return name
        raise ValueError(f"Couldn't find name for {klass}")

    @classmethod
    def document(cls, name: str):
        """
        Register a class as a loadable couch document.

        Args:
            name: The type identifier to save to CouchDB

        Returns:
            A class decorator that records the class and returns it unchanged.

        .. note::

           The given name must be globally unique and must never change.
        """
        # A class here means the decorator was applied without a name.
        assert not isinstance(name, type), "document() needs a type name"
        assert name not in cls._docclasses, f"{name!r} is already registered"

        def _(klass: type):
            cls._docclasses[name] = klass
            return klass

        return _

    @classmethod
    def migration(cls, before: type, after: type):
        """
        Define a function that'll convert between documents.

        Args:
            before: Document class the function accepts.
            after: Document class the function produces.

        Returns:
            A decorator that records the function and returns it unchanged.

        Raises:
            ValueError: ``before`` or ``after`` has not been registered.
        """
        # Normalize to the document classes previously registered
        bname = cls._get_name_from_class(before)
        aname = cls._get_name_from_class(after)
        # Enforce linearity
        assert not any(b == bname for b, _, _ in cls._migrations), (
            f"{bname!r} already has a migration"
        )

        def _(func: Callable):
            cls._migrations.append((bname, aname, func))
            return func

        return _

    def load_doc(self, cls: type, blob: dict):
        """
        Converts a JSON blob into a document.

        Override me.
        """
        raise NotImplementedError

    def dump_doc(self, doc) -> dict:
        """
        Convert a document into a JSON blob.

        Override me.
        """
        raise NotImplementedError

    def _migrate(self, bname, doc):
        """
        Apply migrations to ``doc`` until its type has none left.

        Args:
            bname: Registered name of the type ``doc`` currently has.
            doc: The document to migrate.

        Returns:
            The document after the last applicable migration.
        """
        while funcs := [f for b, _, f in self._migrations if b == bname]:
            # Registration allows at most one migration per starting type.
            (func,) = funcs
            doc = func(doc)
            bname = self._get_name_from_class(type(doc))
        return doc

    def loadj(self, blob):
        """
        Convert a JSON blob into a fully migrated document.

        The type name is read from ``TYPE_KEY`` and the remaining fields are
        handed to :meth:`load_doc`. The caller's ``blob`` is left untouched.

        Raises:
            KeyError: The blob has no ``TYPE_KEY`` entry, or the type name is
                not registered.
        """
        # Work on a shallow copy so removing the type key stays local.
        fields = dict(blob)
        name = fields.pop(self.TYPE_KEY)
        klass = self._get_class_from_name(name)
        doc = self.load_doc(klass, fields)
        return self._migrate(name, doc)

    def dumpj(self, doc):
        """
        Convert a document into a JSON blob tagged with its type name.

        Raises:
            ValueError: The document's class is not registered.
        """
        blob = self.dump_doc(doc)
        blob[self.TYPE_KEY] = self._get_name_from_class(type(doc))
        return blob
129
130
[docs]
class Conflict(Exception):
    """
    The operation could not be carried out because of a conflict.
    """
135
136
[docs]
class Missing(Exception):
    """
    The requested document could not be found.

    This corresponds to a 404, not to a tombstone.
    """
143
144
[docs]
class Deleted(Exception):
    """
    The requested document has been deleted.

    This corresponds to a document with a tombstone, not to a 404.
    """
151
152
[docs]
153class CouchSession:
154 """
155 A connection to CouchDB.
156
157 You probably want to override like::
158
159 class MySession(CouchSession):
160 loader = MyRegistry
161
162 """
163
164 _client: httpx.AsyncClient
165 _root: httpx.URL
166
167 #: Class responsible for de/serializing data.
168 loader: type[DocumentLoader]
169
170 def __init__(self, client: httpx.AsyncClient, root: httpx.URL):
171 self._client = client
172 self._root = root
173
174 @staticmethod
175 def _fix_params(params):
176 rv = {}
177 for key, value in params.items():
178 if value is None:
179 continue
180 else:
181 rv[key] = json.dumps(value)
182 return rv
183
184 async def _request(self, method, *urlparts, **kwargs):
185 url = self._root.join("/".join(urlparts))
186 if "params" in kwargs:
187 kwargs["params"] = self._fix_params(kwargs["params"])
188 resp = await self._client.request(method, url, **kwargs)
189 try:
190 resp.raise_for_status()
191 except httpx.HTTPStatusError as exc:
192 exc.add_note(f"Body: {exc.response.text}")
193 match exc.response.status_code:
194 case 404:
195 raise Missing(f"Could not find {'/'.join(urlparts)}") from exc
196 case 409:
197 raise Conflict(f"Conflict updating {'/'.join(urlparts)}") from exc
198 case _:
199 raise
200 return resp
201
202 def __getitem__(self, key: str) -> "Database":
203 """
204 Gets a database.
205
206 (Does not actually check if it exists.)
207 """
208 return Database(self, key)
209
[docs]
210 async def get_db(self, dbname: str) -> "Database":
211 """
212 Gets a database. Checks if it exists.
213
214 See :http:head:`/{db}`
215 """
216 await self._request("HEAD", dbname)
217 return Database(self, dbname)
218
[docs]
219 async def create_db(
220 self,
221 dbname: str,
222 *,
223 shards: int | None = None,
224 replicas: int | None = None,
225 partitioned: bool | None = None,
226 ) -> "Database":
227 """
228 Create a database
229
230 See :http:post:`/{db}`
231 """
232 await self._request(
233 "PUT",
234 dbname,
235 params={
236 "q": shards,
237 "n": replicas,
238 partitioned: partitioned,
239 },
240 )
241 return Database(self, dbname)
242
[docs]
243 async def delete_db(self, dbname: str):
244 """
245 Delete a database.
246
247 See :http:delete:`/{db}`
248 """
249 await self._request("DELETE", dbname)
250
251 # TODO: Database metadata
252
253
[docs]
254class Database:
255 """
256 An individual database.
257 """
258
259 def __init__(self, session, name):
260 """
261 :private:
262 """
263 self._session = session
264 self._name = name
265
266 def _blob2doc(self, blob, db, docid, etag):
267 doc = self._session.loader().loadj(blob)
268 doc.__db = db
269 doc.__docid = docid
270 doc.__etag = etag
271 return doc
272
273 def _doc2blob(self, doc):
274 blob = self._session.loader().dumpj(doc)
275 db = docid = etag = None
276 try:
277 db = doc.__db
278 docid = doc.__docid
279 etag = doc.__etag
280 except AttributeError:
281 pass
282 return blob, db, docid, etag
283
[docs]
284 async def get(
285 self,
286 docid: str,
287 *,
288 attachments: bool = False,
289 conflicts: bool = False,
290 deleted_conflicts: bool = False,
291 latest: bool = False,
292 local_seq: bool = False,
293 meta: bool = False,
294 open_revs: list[str] | Literal["all"] | None = None,
295 rev: str | None = None,
296 revs: bool = False,
297 revs_info: bool = False,
298 ):
299 """
300 Get a document
301
302 See :http:get:`/{db}/{docid}`
303 """
304 resp = await self._session._request(
305 "GET",
306 self._name,
307 docid,
308 params={
309 "attachments": attachments,
310 "conflicts": conflicts,
311 "deleted_conflicts": deleted_conflicts,
312 "latest": latest,
313 "local_seq": local_seq,
314 "meta": meta,
315 "open_revs": open_revs,
316 "rev": rev,
317 "revs": revs,
318 "revs_info": revs_info,
319 },
320 headers={
321 "Accept": "application/json",
322 },
323 )
324
325 blob = resp.json()
326 if blob.get("_deleted", False): # TODO: Flag to override this
327 raise Deleted("Document {self._name}/{docid} is marked as deleted")
328 if "ETag" in resp.headers:
329 etag = resp.headers["ETag"]
330 else:
331 # Conflicts mode
332 etag = f'"{blob["_rev"]}"'
333 doc = self._blob2doc(blob, self._name, docid, etag)
334 return doc
335
336 # TODO: Attachments
337
[docs]
338 async def attempt_put(
339 self,
340 doc,
341 docid: str | None = None,
342 *,
343 batch: bool = False,
344 ):
345 """
346 Update a document.
347
348 db and docid only need to be given if it's a new document.
349
350 See :http:put:`/{db}/{docid}`
351 """
352 blob, _db, _docid, etag = self._doc2blob(doc)
353 assert _db is None or _db == self._name
354 await self._session._request(
355 "PUT",
356 self._name,
357 _docid or docid,
358 params={"batch": "ok"} if batch else {},
359 headers={"If-Match": etag} if etag else {},
360 json=blob,
361 )
362
[docs]
363 async def attempt_delete(self, doc, *, batch: bool = False):
364 """
365 Delete a document
366
367 See :http:delete:`/{db}/{docid}`
368 """
369 _, db, docid, etag = self._doc2blob(doc)
370 assert db == self._name
371 assert docid
372 await self._session._request(
373 "DELETE",
374 db,
375 docid,
376 params={"batch": "ok"} if batch else {},
377 headers={"If-Match": etag},
378 )
379
[docs]
380 async def attempt_copy(self, src_doc, dst_doc, *, batch: bool = False):
381 """
382 Copy a document
383
384 .. todo::
385
386 Implement
387
388 See :http:copy:`/{db}/{docid}`
389 """
390 # FIXME: Figure out signature
391
[docs]
392 async def mutate(self, docid: str) -> AsyncIterator:
393 """
394 A document mutation loop::
395
396 async for doc in couch.mutate_doc("spam"):
397 doc.foo = "bar"
398
399 Will replay the mutation until it goes through.
400 """
401 doc = await self.get(docid)
402 while True:
403 yield doc
404 try:
405 await self.attempt_put(doc)
406 except Conflict:
407 doc = await self.get(docid)
408 else:
409 break
410
411 # TODO: Mango searches
412 # TODO: Database operations
413
414
[docs]
class SessionPool:
    """
    Responsible for giving out Couch connections.

    You probably want to override like::

        class MyPool(SessionPool):
            session_class = MySession
    """

    _client: httpx.AsyncClient

    #: Class to use for sessions
    session_class: type[CouchSession]

    def __init__(self):
        super().__init__()
        # One client is created up front and handed to every session.
        self._client = self.make_client()

    def make_client(self) -> httpx.AsyncClient:
        """
        Produce an httpx client.
        """
        return httpx.AsyncClient(http2=True, follow_redirects=True)

    async def iter_servers(self) -> AsyncIterator[str]:
        """
        Produce the list of potential servers.

        Override this
        """
        raise NotImplementedError
        # Never reached; the ``yield`` is only here so that this is an async
        # generator like the overrides are expected to be.
        for _ in ():
            yield

    async def _check_server(self, url: httpx.URL):
        """
        Report whether the server at ``url`` answers its ``_up`` endpoint
        with a success status.

        A server that cannot be reached counts as not up.
        """
        try:
            resp = await self._client.get(url.join("_up"))
        except httpx.TransportError:
            # Unreachable servers are skipped so the next candidate is tried.
            return False
        return resp.is_success

    async def session(self) -> CouchSession:
        """
        Get a session

        Servers from :meth:`iter_servers` are tried in order and the first
        one that passes the health check is used.

        Raises:
            ConnectionError: None of the servers passed the health check.
        """
        async for url in self.iter_servers():
            url = httpx.URL(url)
            if await self._check_server(url):
                return self.session_class(self._client, url)
        # Raise rather than fall through and hand the caller ``None``.
        raise ConnectionError("No CouchDB server is available")