-
Notifications
You must be signed in to change notification settings - Fork 19
/
50_Priority-Queue.py
71 lines (60 loc) · 1.81 KB
/
50_Priority-Queue.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
#!/usr/bin/python
# coding=utf-8
'''
__author__ = 'sunp'
__date__ = '2019/2/12'
Implement a Priority Queue
'''
class PriorityQueue(object):
'''
lowest first: min heap
'''
def __init__(self):
self._queue = []
def size(self):
return len(self._queue)
def push(self, item, priority):
self._queue.append((item, priority))
self._shiftup(self.size()-1)
def pop(self):
last = self._queue.pop()
if self._queue:
top = self._queue[0]
self._queue[0] = last
self._shiftdown(0)
return top[0]
return last[0]
def _shiftdown(self, pos):
item, prior = self._queue[pos]
childpos = (pos << 1) + 1
while childpos < self.size():
if childpos < self.size()-1 and self._queue[childpos+1][1] < self._queue[childpos][1]:
childpos += 1
if prior <= self._queue[childpos][1]:
break
self._queue[pos] = self._queue[childpos]
pos = childpos
childpos = (pos << 1) + 1
self._queue[pos] = (item, prior)
def _shiftup(self, pos):
item, prior = self._queue[pos]
parentpos = (pos - 1) >> 1
while parentpos >= 0:
if prior >= self._queue[parentpos][1]:
break
self._queue[pos] = self._queue[parentpos]
pos = parentpos
parentpos = (pos - 1) >> 1
self._queue[pos] = (item, prior)
if __name__ == '__main__':
test = PriorityQueue()
test.push('fourth', 4)
test.push('first', 1)
test.push('second', 2)
test.push('third', 3)
assert test.size() == 4
assert test.pop() == 'first'
assert test.size() == 3
assert test.pop() == 'second'
assert test.pop() == 'third'
assert test.pop() == 'fourth'