Fun With Crockford

Sat Feb 29, 2020

I'm trying something new this time. I've recently watched and followed along with this video, which is a long talk/exercise lecture by Douglas Crockford. The rest of this post is a giant, literate-ish code-block that encodes my attempt at following along in Python.

If you want to run it, just paste the below into a py file and import it. It uses some Python 3 features, so I'd recommend python3.7 or higher.

class TrivialTest:
    # I want to write tests as I go, so I need a bit of plumbing first.
    def __init__(self):
        self.failures = []
        self.tested = 0

    def trivial_test(self, *test_forms):
        self.tested += 1
        print(f"--- [{self.tested}]---------------------------")
        for test_form in test_forms:
            res = eval(test_form)
            msg = f"{res} <= [ {test_form} ]"
            if not res:
                self.failures.append((self.tested, msg))
            print(msg)

    def report(self):
        print(f"==============================")
        if not self.failures:
            print("ALL TESTS PASSED")
        else:
            for block, fail in self.failures:
                print(f"{block} => {fail}")
        self.failures = []
        self.tested = 0

tests = TrivialTest()
tt = tests.trivial_test  # I'm lazy, fuck you

def identity(x):
    return x

tt("identity(3) == 3")


def add(a, b):
    return a + b

def sub(a, b):
    return a - b

def mul(a, b):
    return a * b

def identityf(value):
    return lambda: value

tt("identityf(3)() == 3", "identityf(3)() == identity(3)")


def addf(a):
    return lambda b: a + b

tt("addf(3)(4) == 7", "addf(3)(4) == add(3, 4)")


def liftf(fn):
    return lambda a: lambda b: fn(a, b)

tt("liftf(add)(3)(4) == addf(3)(4) == add(3, 4)")


def curry(fn, a):
    return liftf(fn)(a)

def curry(fn, *args):
    return lambda *brgs: fn(*args, *brgs)

tt(
    "curry(add, 3)(4) == 7",
    "curry(mul, 5)(6) == 30",
    "curry(mul, 5)(6) == liftf(mul)(5)(6)",
)

inc_tests = ["inc(1) == 2", "inc(inc(1)) == 3", "inc(5) == 6"]

inc = curry(add, 1)
tt(*inc_tests)
inc = liftf(add)(1)
tt(*inc_tests)
inc = addf(1)
tt(*inc_tests)


def twice(fn):
    return lambda a: fn(a, a)

doubl = twice(add)
square = twice(mul)

tt("doubl(11) == 22", "square(11) == 121")


def reverse(fn):
    return lambda a, b: fn(b, a)

tt("reverse(sub)(3, 2) == -1")


def composeu(fna, fnb):
    return lambda a: fnb(fna(a))

tt("composeu(doubl, square)(5) == 100")


def composeb(fna, fnb):
    return lambda a, b, c: fnb(fna(a, b), c)

tt("composeb(add, mul)(2, 3, 7) == 35")


def limit(fn, times):
    def _internal(*args):
        nonlocal times
        times -= 1
        if times >= 0:
            return fn(*args)

    return _internal

add_ltd = limit(add, 1)
tt("add_ltd(3, 4) == 7", "add_ltd(3, 5) is None")


def from_(initial):
    initial -= 1

    def _internal():
        nonlocal initial
        initial += 1
        return initial

    return _internal

index = from_(0)
tt("index() == 0", "index() == 1", "index() == 2")


def to_(gen, limit):
    def _internal():
        nonlocal gen
        if gen:
            val = gen()
            if limit > val:
                return val
            else:
                gen = None
                return None

    return _internal

index = to_(from_(1), 3)
tt("index() == 1", "index() == 2", "index() is None")


def fromTo(initial, limit):
    return to_(from_(initial), limit)

index = fromTo(0, 3)
tt("index() == 0", "index() == 1", "index() == 2", "index() is None")


def element(arr, gen):
    def _internal():

        ix = gen()
        #   We need to check for length to avoid errors if the generator returns
        # indices past the end of the array
        return (
            arr[ix] if (ix is not None) and len(arr) >= ix else None
        )

    return _internal

ele = element(["a", "b", "c", "d"], fromTo(1, 3))
tt("ele() == 'b'", "ele() == 'c'", "ele() is None")


def element(arr, gen=None):
    if gen is None:
        gen = fromTo(0, len(arr))

    def _internal():
        ix = gen()
        return arr[ix] if (ix is not None) and len(arr) >= ix else None  # See above

    return _internal

ele = element(["a", "b", "c", "d"])
tt("ele() == 'a'", "ele() == 'b'", "ele() == 'c'", "ele() == 'd'", "ele() is None")


def collect(gen, arr):
    def _internal():
        val = gen()
        if val is not None:  # 0 is falsy, so we need the None check
            arr.append(val)
            return val

    return _internal

ARRAY = []
col = collect(fromTo(0, 2), ARRAY)

tt("col() == 0", "col() == 1", "col() is None", "ARRAY == [0, 1]")


def filter(gen, pred):
    def _internal():  # Crockford has us do a do loop here instead
        val = gen()
        if val is None or pred(val):
            return val
        return _internal()

    return _internal

fil = filter(fromTo(0, 5), lambda v: (v % 3) == 0)
tt("fil() == 0", "fil() == 3", "fil() is None")


def concat(gena, genb):
    gen = gena

    def _internal():
        nonlocal gen
        val = gen()
        if val is not None:
            return val
        gen = genb
        return gen()

    return _internal

con = concat(fromTo(0, 3), fromTo(0, 2))
tt(
    "con() == 0",
    "con() == 1",
    "con() == 2",
    "con() == 0",
    "con() == 1",
    "con() is None",
)


def gensymf(sym):
    gen = from_(1)
    return lambda: f"{sym}{gen()}"

geng = gensymf("G")
genh = gensymf("H")
tt("geng() == 'G1'", "genh() == 'H1'", "geng() == 'G2'", "genh() == 'H2'")


def fibonnacif(a, b):
    def _internal():
        # Crockford starts us off with a switch statement.
        # But ended up using this solution with element instead of fromTo in
        # the concat call below.
        nonlocal a, b
        n = a + b
        a = b
        b = n
        return n

    return concat(fromTo(a, b + 1), _internal)

fib = fibonnacif(0, 1)
tt(
    "fib() == 0",
    "fib() == 1",
    "fib() == 1",
    "fib() == 2",
    "fib() == 3",
    "fib() == 5",
    "fib() == 8",
)

##############################
# Ok, so Python doesn't have an object literal syntax in the same
# sense as Javascript. This is a piece of utility for
# compatibility purposes.

from collections import namedtuple

def Obj(**kwargs):
    return namedtuple("Object", " ".join(kwargs.keys()))(**kwargs)
##############################


def counter(init):
    def _up():
        nonlocal init
        init += 1
        return init

    def _down():
        nonlocal init
        init -= 1
        return init

    return Obj(up=_up, down=_down)


OBJECT = counter(10)
up = OBJECT.up
down = OBJECT.down
tt("up() == 11", "down() == 10", "down() == 9", "up() == 10")


def revocable(fn):
    f = fn

    def revoke():
        nonlocal f
        f = None

    return Obj(invoke=lambda *args: f(*args) if f else None, revoke=revoke)


rev = revocable(add)
tt("rev.invoke(3, 4) == 7", "rev.revoke() is None", "rev.invoke(3, 4) is None")


def m(value, source=None):
    return Obj(value=value, source=source if source is not None else str(value))

import math
tt(
    "m(1).value == 1",
    "m(1).source == '1'",
    "m(math.pi, 'Pi').value == math.pi",
    "m(math.pi, 'Pi').source == 'Pi'",
)


def addm(a, b):
    return m(add(a.value, b.value), f"({a.source}+{b.source})")

tt(
    "addm(m(3), m(4)).value == 7",
    "addm(m(3), m(4)).source == '(3+4)'",
    "addm(m(1), m(math.pi, 'pi')).source == '(1+pi)'",
    "addm(m(1), m(math.pi, 'pi')).value == (1+math.pi)",
)


def liftm(fn, label):
    return lambda *args: m(
        fn(*[a.value for a in args]), f"({label.join([a.source for a in args])})"
    )

addm = liftm(add, "+")
tt(
    "addm(m(3), m(4)).value == 7",
    "addm(m(3), m(4)).source == '(3+4)'",
    "addm(m(1), m(math.pi, 'pi')).source == '(1+pi)'",
    "addm(m(1), m(math.pi, 'pi')).value == (1+math.pi)",
)


def liftm(fn, label):
    # I did a different thing than Crockford here.
    # He coerces incoming arguments into ms, I just have a getter/setter
    # that self-evaluateish if we get passed something that doesn't
    # have a .value or .source
    def val(obj):
        try:
            return obj.value
        except AttributeError:
            return obj

    def src(obj):
        try:
            return obj.source
        except AttributeError:
            return str(obj)

    return lambda *args: m(
        fn(*[val(a) for a in args]), f"({label.join([src(a) for a in args])})"
    )

addm = liftm(add, "+")
tt(
    "addm(m(3), m(4)).value == 7",
    "addm(m(3), m(4)).source == '(3+4)'",
    "addm(m(1), m(math.pi, 'pi')).source == '(1+pi)'",
    "addm(m(1), m(math.pi, 'pi')).value == (1+math.pi)",
    "addm(3, 4).value == 7",
    "addm(3, 4).source == '(3+4)'",
)


def exp(sae):
    if type(sae) is list and callable(sae[0]):
        return sae[0](*sae[1:])
    else:
        return sae

tt("exp([mul, 5, 11]) == 55", "exp(42) == 42")


def exp(nae):
    if type(nae) is list and callable(nae[0]):
        return nae[0](*[exp(arg) for arg in nae[1:]])
    else:
        return nae

nae = [math.sqrt, [add, [square, 3], [square, 4]]]
tt("exp(nae) == 5")


def addg(a=None):
    if a is None:
        return None
    else:
        total = a

        def recur(b=None):
            nonlocal total
            if b is None:
                return total
            else:
                total += b
                return recur
        # This is the first time I heard of the term "retursion",
        # and I'm not sure how I feel about it.

        return recur

tt(
    "addg() is None",
    "addg(2)() == 2",
    "addg(2)(7)() == 9",
    "addg(3)(0)(4)() == 7",
    "addg(1)(2)(4)(8)() == 15",
)


def liftg(fn):
    def fng(a=None):
        if a is None:
            return None
        else:
            acc = a

            def recur(b=None):
                nonlocal acc
                if b is None:
                    return acc
                else:
                    acc = fn(acc, b)
                    return recur

            return recur

    return fng

addg = liftg(add)
tt(
    "addg() is None",
    "addg(2)() == 2",
    "addg(2)(7)() == 9",
    "addg(3)(0)(4)() == 7",
    "addg(1)(2)(4)(8)() == 15",
    "liftg(mul)() is None",
    "liftg(mul)(3)() == 3",
    "liftg(mul)(3)(0)(4)() == 0",
    "liftg(mul)(1)(2)(4)(8)() == 64",
)


def arrayg(a=None):
    if a is None:
        return []
    else:
        return liftg(lambda memo, elem: memo + [elem])([a])  # I got extra credit :p

tt("arrayg() == []", "arrayg(3)() == [3]", "arrayg(3)(4)(5)() == [3, 4, 5]")


def continuize(fn):
    return lambda callback, *args: callback(fn(*args))

tt(
    "continuize(math.sqrt)(identity, 81) == 9",
    "continuize(math.sqrt)(lambda v: str(int(v)), 81) == '9'",
)

def vector():
    vec = []
    def store(ix, elem):
        vec[ix] = elem
    return Obj(append = lambda elem: vec.append(elem),
               store = store,
               get = lambda ix: vec[ix])

#   There's a long discussion regarding the security of the above object.
# Basically, the question is: given this definition of `vector`, is there
# a way that an attacker given the result of `vector()` can get out a
# pointer to `vec`.
#   In Javascript, it turns out there's an attack that exploits a bunch of
# differnet flaws/misfeatures of JavaScript ot accomplish this.
#   1. the fact that arrays in JS aren't really arrays (you can assign things
#      to their method slots through the same interface as you assign things
#      to their storage slots)
#   2. the semantics of the `this` keyword
# python has neither of these vulnerabilities, so you don't have to take the
# extra step of coercing the first input to `store` into an `int`. You may,
# but you don't have to in order to maintain security in the sense
# Crockford is interested in here.
#   That said, I don't know enough about pythons' implementation to claim that
# you automatically get the desired level of security here. There might be ways
# of exploiting `locals` or `globals`, or some implementation detail of my Obj
# function in order to get at a pointer to vec directly.

def pubsub():
    subscribers = []
    def pub(msg):
        for s in subscribers:
            try:
                s(msg)
            except Exception:
                pass
    return Obj(subscribe=lambda fn: subscribers.append(fn),
               publish=pub)

#   This is almost as naive as the first attempt we get shown for debugging in
# Crockford's video. It wraps the call to the subscriber function with a
# `try`/`catch`. There's still attacks from subscribers we're vulnerable to.
# The big one being sleeping forever.
#   Another attack that Crockford mentions (having an adversary mutate pubsub
# so that the `publish` function does something else) also fails here, because
# our Obj function effectively automatically `freeze`s its' output. That is,
# the thing you get out of `namedtuple` is an immutable object.
#   Yet another attack involves `this`, which we thankfully don't have in python,
# and so we can safely disregard it.
#   The last attack he describes is an out-of order attack. Where you subscribe with something that calls `publish` once itself. This way, you can deliver a message of your choice to all subscribers who came after you before they get the message that you get.

def pubsub():
    import multiprocessing, logging, sys
    from queue import SimpleQueue

    logger = logging.getLogger("PUBSUB")
    logger.setLevel(logging.WARN)
    h = logging.StreamHandler(sys.stdout)
    logger.addHandler(h)

    subscribers = multiprocessing.Queue()
    messages = multiprocessing.Queue()
    def proc():
        logger.debug("Starting proc")
        while True:
            msg = messages.get()
            logger.info(f"Got message <<{msg}>>...")
            logger.debug(f"Going through subscribers {subscribers}")
            for _ in range(subscribers.qsize()):
                s = subscribers.get()
                logger.debug(f"  Delivering to {s}")
                try:
                    p = multiprocessing.Process(target=s, args=(msg,))
                    p.start()
                    logger.debug("  Started process...")
                    p.join(10)
                    logger.debug("  Joined...")
                    if p.is_alive():
                        p.terminate()
                        p.join()
                        logger.warn("  Forced to terminate :( ...")
                    subscribers.put(s)
                except Exception:
                    logger.warn("  Subscriber failed in some way...")
                    pass
    runner = multiprocessing.Process(target=proc, args=())
    def stop(delay):
        runner.join(delay)
        if runner.is_alive():
            runner.terminate()
            runner.join()
    runner.start()
    return Obj(subscribe=lambda fn: subscribers.put(fn),
               publish=lambda msg: messages.put(msg),
               stop=stop)

#   That is... pretty horrifically complicated. Especially compared to
# Crockfords' proposed JS solution, which is to use `setTimeout` to delay
# things a bit. It's unfortunate, because that approach opens up another attack,
# but is ridiculously simple.
#   At the beginning of this talk, I thought we'd stick mostly to functional
# concepts. But now that we've broken through into asynchronous functions,
# multiprocessing, and (based on the motivation bits of the talk) distributed
# computing, I'm very tempted to try this exercise again in a Lisp or ML
# at some point.
#   As always, I'll keep you posted.

tests.report()

Creative Commons License

all articles at langnostic are licensed under a Creative Commons Attribution-ShareAlike 3.0 Unported License

Reprint, rehost and distribute freely (even for profit), but attribute the work and allow your readers the same freedoms. Here's a license widget you can use.

The menu background image is Jewel Wash, taken from Dan Zen's flickr stream and released under a CC-BY license