home / github / issue_comments

Menu
  • Search all tables
  • GraphQL API

issue_comments: 970655304

This data as json

html_url issue_url id node_id user created_at updated_at author_association body reactions issue performed_via_github_app
https://github.com/simonw/datasette/issues/878#issuecomment-970655304 https://api.github.com/repos/simonw/datasette/issues/878 970655304 IC_kwDOBm6k_c452wZI 9599 2021-11-16T20:32:16Z 2021-11-16T20:32:16Z OWNER

This code is really fiddly. I just got to this version:

```python
import asyncio
from functools import wraps
import inspect

try: import graphlib except ImportError: from . import vendored_graphlib as graphlib

class AsyncMeta(type):
    """Metaclass that wraps ``async def`` methods for dependency injection.

    For every class it creates, two class attributes are added:

    * ``_registry`` maps the name of each wrapped coroutine method to its
      wrapper (as produced by ``make_method``).
    * ``_graph`` maps each registered name to the set of parameter names
      that method declares, excluding ``self`` and any parameter whose
      name starts with an underscore.
    """

    def __new__(cls, name, bases, attrs):
        # Decorate any items that are 'async def' methods
        _registry = {}
        new_attrs = {"_registry": _registry}
        for key, value in attrs.items():
            # "resolve" is the resolver itself, so it is left undecorated.
            if inspect.iscoroutinefunction(value) and not value.__name__ == "resolve":
                new_attrs[key] = make_method(value)
                _registry[key] = new_attrs[key]
            else:
                new_attrs[key] = value
        # Gather graph for later dependency resolution
        graph = {
            key: {
                p
                for p in inspect.signature(method).parameters.keys()
                if p != "self" and not p.startswith("_")
            }
            for key, method in _registry.items()
        }
        new_attrs["_graph"] = graph
        return super().__new__(cls, name, bases, new_attrs)

def make_method(method):
    """Wrap coroutine function *method* so missing arguments are injected.

    The returned coroutine function accepts explicit keyword arguments for
    any of *method*'s parameters. Parameters that are not supplied are
    looked up by name in ``self._registry`` and computed through
    ``self.resolve()``.

    The wrapper takes an optional ``_results`` dict. When given, it is
    passed on to ``self.resolve()`` and the wrapper stores its own return
    value in it under ``method.__name__``.

    Raises:
        AssertionError: if a parameter was neither passed in nor found in
            ``self._registry``.
    """

    @wraps(method)
    async def inner(self, _results=None, **kwargs):
        print("inner - _results=", _results)
        parameters = inspect.signature(method).parameters.keys()
        # Any parameters not provided by kwargs are resolved from registry
        to_resolve = [p for p in parameters if p not in kwargs and p != "self"]
        missing = [p for p in to_resolve if p not in self._registry]
        assert (
            not missing
        ), "The following DI parameters could not be found in the registry: {}".format(
            missing
        )
        results = {}
        results.update(kwargs)
        if to_resolve:
            resolved_parameters = await self.resolve(to_resolve, _results)
            results.update(resolved_parameters)
        # Explicit and resolved values are both passed by keyword.
        return_value = await method(self, **results)
        if _results is not None:
            _results[method.__name__] = return_value
        return return_value

    return inner

class AsyncBase(metaclass=AsyncMeta):
    """Base class for objects whose async methods depend on each other by name.

    ``resolve()`` reads the ``_registry`` and ``_graph`` class attributes,
    which the ``AsyncMeta`` metaclass is expected to provide.
    """

    async def resolve(self, names, results=None):
        """Run the registered methods called *names* and return their values.

        Args:
            names: iterable of registered method names to compute.
            results: optional dict of already-computed values, keyed by
                method name. It is updated in place; a fresh dict is used
                when omitted.

        Returns:
            A dict mapping each requested name to its computed value.
        """
        print("\n resolve: ", names)
        if results is None:
            results = {}

        # Resolve them in the correct order
        # NOTE: only the direct dependencies of `names` are added to the
        # sorter, so two nodes that depend on one another can still be
        # handed out in the same batch and run concurrently.
        ts = graphlib.TopologicalSorter()
        for name in names:
            ts.add(name, *self._graph[name])
        ts.prepare()

        async def resolve_nodes(nodes):
            print("    resolve_nodes", nodes)
            print("    (current results = {})".format(repr(results)))
            # The `name not in results` filter is evaluated here, before any
            # of the coroutines in this batch has started running.
            awaitables = [
                self._registry[name](
                    self,
                    _results=results,
                    **{k: v for k, v in results.items() if k in self._graph[name]},
                )
                for name in nodes
                if name not in results
            ]
            print("    awaitables: ", awaitables)
            awaitable_results = await asyncio.gather(*awaitables)
            results.update(
                {p[0].__name__: p[1] for p in zip(awaitables, awaitable_results)}
            )

        if not ts.is_active():
            # Nothing has dependencies - just resolve directly
            print("    no dependencies, resolve directly")
            await resolve_nodes(names)
        else:
            # Resolve in topological order
            while ts.is_active():
                nodes = ts.get_ready()
                print("    ts.get_ready() returned nodes:", nodes)
                await resolve_nodes(nodes)
                for node in nodes:
                    ts.done(node)

        print("  End of resolve(), returning", results)
        return {key: value for key, value in results.items() if key in names}

```

With this test:

```python
class Complex(AsyncBase):
    def __init__(self):
        self.log = []

async def c(self):
    """Leaf step: print a marker and record "c" in ``self.log``."""
    step = "c"
    print("LOG: c")
    self.log.append(step)

async def b(self, c):
    """Print a marker and record "b" in ``self.log``; ``c`` is not read."""
    step = "b"
    print("LOG: b")
    self.log.append(step)

async def a(self, b, c):
    """Print a marker and record "a" in ``self.log``; ``b``/``c`` are not read."""
    step = "a"
    print("LOG: a")
    self.log.append(step)

async def go(self, a):
    """Print a marker, record "go" in ``self.log`` and return that list."""
    print("LOG: go")
    history = self.log
    history.append("go")
    return history

@pytest.mark.asyncio
async def test_complex():
    result = await Complex().go()
    # 'c' should only be called once
    assert result == ["c", "b", "a", "go"]
```

This test sometimes passes, and sometimes fails!

Output for a pass: ``` tests/test_asyncdi.py inner - _results= None

resolve: ['a'] ts.get_ready() returned nodes: ('c', 'b') resolve_nodes ('c', 'b') (current results = {}) awaitables: [<coroutine object Complex.c at 0x1074ac890>, <coroutine object Complex.b at 0x1074ac820>] inner - _results= {} LOG: c inner - _results= {'c': None}

resolve: ['c'] ts.get_ready() returned nodes: ('c',) resolve_nodes ('c',) (current results = {'c': None}) awaitables: [] End of resolve(), returning {'c': None} LOG: b ts.get_ready() returned nodes: ('a',) resolve_nodes ('a',) (current results = {'c': None, 'b': None}) awaitables: [<coroutine object Complex.a at 0x1074ac7b0>] inner - _results= {'c': None, 'b': None} LOG: a End of resolve(), returning {'c': None, 'b': None, 'a': None} LOG: go Output for a fail: tests/test_asyncdi.py inner - _results= None

resolve: ['a'] ts.get_ready() returned nodes: ('b', 'c') resolve_nodes ('b', 'c') (current results = {}) awaitables: [<coroutine object Complex.b at 0x10923c890>, <coroutine object Complex.c at 0x10923c820>] inner - _results= {}

resolve: ['c'] ts.get_ready() returned nodes: ('c',) resolve_nodes ('c',) (current results = {}) awaitables: [<coroutine object Complex.c at 0x10923c6d0>] inner - _results= {} LOG: c inner - _results= {'c': None} LOG: c End of resolve(), returning {'c': None} LOG: b ts.get_ready() returned nodes: ('a',) resolve_nodes ('a',) (current results = {'c': None, 'b': None}) awaitables: [<coroutine object Complex.a at 0x10923c6d0>] inner - _results= {'c': None, 'b': None} LOG: a End of resolve(), returning {'c': None, 'b': None, 'a': None} LOG: go F

=================================================================================================== FAILURES =================================================================================================== _______________ test_complex _________________

@pytest.mark.asyncio
async def test_complex():
    result = await Complex().go()
    # 'c' should only be called once
  assert result == ["c", "b", "a", "go"]

E AssertionError: assert ['c', 'c', 'b', 'a', 'go'] == ['c', 'b', 'a', 'go'] E At index 1 diff: 'c' != 'b' E Left contains one more item: 'go' E Use -v to get the full diff

tests/test_asyncdi.py:48: AssertionError ================== short test summary info ================================ FAILED tests/test_asyncdi.py::test_complex - AssertionError: assert ['c', 'c', 'b', 'a', 'go'] == ['c', 'b', 'a', 'go'] ``` I figured out why this is happening.

a requires b and c

b also requires c

The code decides to run b and c in parallel.

If c completes first, then when b runs it gets to use the already-calculated result for c - so it doesn't need to call c again.

If b gets to that point before c has completed, there is no stored result for c yet, so b calls c itself as well - and c ends up running twice.

{
    "total_count": 0,
    "+1": 0,
    "-1": 0,
    "laugh": 0,
    "hooray": 0,
    "confused": 0,
    "heart": 0,
    "rocket": 0,
    "eyes": 0
}
648435885  
Powered by Datasette · Queries took 2.193ms · About: github-to-sqlite