响应式编程,但是 Python


认识我的人或多或少应该都知道,我的理想之一就是把 JavaScript 的开发体验赋能 Python。marimo 做的不错,但是我这种追求细节的人不止于此,所以自己实现一套响应式编程是逃不过的(另外也是因为,搜了一圈,似乎 Python 真没这种东西)

昨天在床上一时兴起,了解了一下 signal 的这种风格的观察者模式,然后让 GPT 用 python 实现了一遍。今天考完试就立马来试了下,感觉还不错。

是不是很熟悉的感觉?


我的目的是,让使用方式尽量 pythonic,比如上面的代码中:

from reactivity import State

class Counter:
    value = State()

value看起来是个类变量,但其实是个 descriptor,是类似property这样的东西。所以在类里类外都可以像访问普通的变量一样访问它,类似c.valueself.value都是可以的,而且类型检查器友好,完全泛型化。

实现如下:

class State[T]:
    def __init__(self, initial_value: T = None):
        self._value: T = initial_value
        self.subscribers = WeakSet[Callable[[], Any]]()

    def get(self):
        for computation in _current_computations:
            self.subscribers.add(computation)
            computation.dependencies.add(self)
        return self._value

    def set(self, value: T):
        self._value = value
        if _batches:
            _batches[-1].callbacks.extend(self.subscribers)
        else:
            for subscriber in self.subscribers:
                subscriber()

    def __get__(self, instance, owner):
        return self.get()

    def __set__(self, instance, value: T):
        self.set(value)

_current_computations = []
_batches = []

Computation 的实现如下:

class Computation[T]:
    def __init__(self, fn: Callable[[], T], auto_run=True):
        self.fn = fn
        self.dependencies = WeakSet[State]()

        if auto_run:
            self()

    def __call__(self):
        self.dependencies.clear()  # clear old dependencies
        _current_computations.append(self)
        value = self.fn()
        last = _current_computations.pop()
        assert last is self  # sanity check
        return value

    def dispose(self):
        for dep in self.dependencies:
            dep.subscribers.remove(self)

很简单吧,但是其实没有做什么 cleanup 之类的。

然后 batch 的实现如下:

class Batch:
    def __init__(self):
        self.callbacks: list[Callable] = []

    def __enter__(self):
        _batches.append(self)

    def flush(self):
        callbacks = set(self.callbacks)
        self.callbacks.clear()
        for callback in callbacks:
            callback()

    def __exit__(self, *_):
        last = _batches.pop()
        assert last is self
        self.flush()

这控制了什么时候会触发回调。使用上也很方便,直接

with Batch():
    ...

就好了,所有的 callbacks 会在退出这个块的时候执行。

另外,我还实现了一些函数式编程的工具,比如:

from collections.abc import Callable
from functools import wraps
from typing import overload

from .memo import Memoized, MemoizedProperty
from .primitives import Batch, Computation, State


def create_signal[T](initial_value: T = None) -> tuple[Callable[[], T], Callable[[T], None]]:
    signal = State(initial_value)
    return signal.get, signal.set


def create_effect[T](fn: Callable[[], T], auto_run=True):
    return Computation(fn, auto_run)


def create_memo[T](fn: Callable[[], T]):
    return Memoized(fn)


def memoized_property[T, Self](method: Callable[[Self], T]):
    return MemoizedProperty(method)


@overload
def batch() -> Batch: ...
@overload
def batch[**P, T](func: Callable[P, T]) -> Callable[P, T]: ...


def batch[**P, T](func: Callable[P, T] | None = None) -> Callable[P, T] | Batch:
    if func is not None:

        @wraps(func)
        def wrapped(*args, **kwargs):
            with Batch():
                return func(*args, **kwargs)

        return wrapped

    return Batch()

batch不仅可以with这样用,也能@batch这样作为一个装饰器来用。


留点悬念。这里面还有用到两个create_memomemoized_property没讲