Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add a Fair lock recipe? #115

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open

Add a Fair lock recipe? #115

wants to merge 2 commits into from

Conversation

grantjenks
Copy link
Owner

Some prototyping for a fair lock recipe.

@grantjenks grantjenks mentioned this pull request Jun 7, 2019
@grantjenks
Copy link
Owner Author

The tricky part is what to do when one of the cache key expires but not both of them. Maybe there needs to be a single cache key to track tickets and serving. It still seems the cache key could expire while someone is holding the lock. I'm not sure how they later discover that the lock was lost.

I'm sure there's research on this problem. Need to look there rather than re-implementing.

@grantjenks
Copy link
Owner Author

Brainstorming, came up with:

class FairRLock:
    """Recipe for cross-process and cross-thread re-entrant lock.

    Assumes the key will not be evicted. Set the eviction policy to 'none' on
    the cache to guarantee the key is not evicted.

    >>> import diskcache
    >>> cache = diskcache.Cache()
    >>> rlock = FairRLock(cache, 'user-123')
    >>> rlock.acquire()
    >>> rlock.acquire()
    >>> rlock.release()
    >>> with rlock:
    ...     pass
    >>> rlock.release()

    """
    def __init__(self, cache, key, expire=None, tag=None):
        self._cache = cache
        self._key = key
        self._expire = expire
        self._tag = tag
        value = {
            'tickets': 0,
            'current': 1,
            'pid_tid': '',
            'time': 0,
            'count': 0,
        }
        self._cache.add(self._key, value, tag=self._tag)

    def _acquire_ticket(self):
        with self._cache.transact():
            state = self._cache.get(self._key)
            state['tickets'] += 1
            self._cache.set(self._key, state, tag=self._tag)
        return state['tickets']

    def _release_ticket(self, state):
        state['current'] += 1
        state['pid_tid'] = ''
        state['time'] = 0
        state['count'] = 0
        self._cache.set(self._key, state, tag=self._tag)

    def _get_pid_tid(self):
        pid_tid = f'{os.getpid()}-{threading.get_ident()}'
        return pid_tid

    def acquire(self):
        pid_tid = self._get_pid_tid()
        ticket = self._acquire_ticket()

        while True:
            with self._cache.transact():
                state = self._cache[self._key]

                if state['current'] == ticket:
                    state['pid_tid'] = pid_tid
                    state['time'] = time.monotonic()
                    state['count'] += 1
                    self._cache.set(self._key, state, tag=self._tag)
                    break

                expired = (
                    self._expire is not None
                    and time.monotonic() - state['time'] > self._expire
                )

                if expired:
                    self._release_ticket(state)

            time.sleep(0.01)

        return ticket

    def release(self, ticket):
        pid_tid = self._get_pid_tid()

        with self._cache.transact():
            state = self._cache.get(self._key)

            if pid_tid != state['pid_tid']:
                return

            if state['count'] > 1:
                state['time'] = time.monotonic()
                state['count'] -= 1
                self._cache.set(self._key, state, tag=self._tag)

            assert state['count'] == 1
            self._release_ticket(state)

    def __enter__(self):
        self.acquire()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.release()

But I think this'll break because the current ticket number is only incremented by release(). If one of the release() calls doesn't happen, then everything will be stuck.

@grantjenks
Copy link
Owner Author

Change the code such that if the lock expires then it is immediately reacquired by another thread or process in the acquire() method rather than by first releasing it.

Update the release ticket code to set time correctly. This will allow the acquire method to skip tickets if they are not acquired after a long time.

@grantjenks
Copy link
Owner Author

When first acquiring a ticket, if the pid_tid already holds the lock, then don't increment the tickets count.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

1 participant