/
psql.py
106 lines (89 loc) · 3.58 KB
/
psql.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
# YouTubeMDBot
# Copyright (C) 2019 - Javinator9889
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from typing import Any
from typing import Optional
from psycopg2 import pool
from psycopg2.pool import PoolError
from . import Query
from .. import DB_HOST
from .. import DB_NAME
from .. import DB_PASSWORD
from .. import DB_PORT
from .. import DB_USER
from .. import MAX_PROCESS
class PostgreSQL(MultiprocessBase):
    """PostgreSQL access layer backed by a psycopg2 threaded connection pool.

    The public query methods (``execute``, ``fetchone``, ``fetchall`` and
    ``fetchiter``) do not run SQL themselves: each one hands the matching
    private worker plus the ``Query`` object to the base class' ``new``
    method. The workers store their outcome on the query object
    (``query._result``) and then set ``query.is_completed`` to ``True``.
    """

    # NOTE(review): ``MultiprocessBase`` is not among the imports visible in
    # this file - confirm it is actually brought into scope.

    def __new__(cls, **kwargs):
        """Create the object, building the connection pool when needed.

        When ``cls.__instance`` is ``None`` a ``ThreadedConnectionPool`` is
        opened with the module-level ``DB_*`` settings (at most
        ``MAX_PROCESS`` connections) and passed to the base class as
        ``connection``. Otherwise the base ``__new__`` is called without
        extra arguments.
        """
        # NOTE(review): inside this class body ``cls.__instance`` is
        # name-mangled to ``cls._PostgreSQL__instance``; confirm the base
        # class exposes the attribute under a name this lookup can reach.
        if cls.__instance is None:
            connection = pool.ThreadedConnectionPool(minconn=1,
                                                     maxconn=MAX_PROCESS,
                                                     user=DB_USER,
                                                     password=DB_PASSWORD,
                                                     dbname=DB_NAME,
                                                     host=DB_HOST,
                                                     port=DB_PORT)
            return super().__new__(cls, connection=connection)
        return super().__new__(cls)

    def free_connection(self, connection):
        """Run the base class hook, then return *connection* to the pool."""
        super().free_connection(connection)
        self.connection.putconn(connection)

    def get_connection(self) -> Optional[Any]:
        """Run the base class hook, then borrow a connection from the pool.

        :return: a pooled connection, or ``None`` when the pool raises
                 ``PoolError`` (e.g. no connection is available).
        """
        super().get_connection()
        try:
            return self.connection.getconn()
        except PoolError:
            return None

    def execute(self, query: Query):
        """Schedule *query* to be run by ``_execute`` (stores the row count)."""
        super().new(self._execute, query)

    def fetchone(self, query: Query):
        """Schedule *query* to be run by ``_fetchone`` (stores one row)."""
        super().new(self._fetchone, query)

    def fetchall(self, query: Query):
        """Schedule *query* to be run by ``_fetchall`` (stores every row)."""
        super().new(self._fetchall, query)

    def fetchiter(self, query: Query):
        """Schedule *query* to be run by ``_fetchiter`` (stores a row iterator)."""
        super().new(self._fetchiter, query)

    @staticmethod
    def _execute(query: Query, connection):
        """Run ``query.query`` and store the affected row count on *query*.

        This is a static method, so it takes no ``self``: the signature is
        ``(query, connection)``, exactly like the other workers.
        """
        with connection.cursor() as cursor:
            cursor.execute(query.query)
            query._result = cursor.rowcount
        query.is_completed = True

    @staticmethod
    def _fetchone(query: Query, connection):
        """Run ``query.query`` and store the first result row on *query*."""
        with connection.cursor() as cursor:
            cursor.execute(query.query)
            query._result = cursor.fetchone()
        query.is_completed = True

    @staticmethod
    def _fetchall(query: Query, connection):
        """Run ``query.query`` and store the full list of rows on *query*."""
        with connection.cursor() as cursor:
            cursor.execute(query.query)
            query._result = cursor.fetchall()
        query.is_completed = True

    @staticmethod
    def _fetchiter(query: Query, connection):
        """Run ``query.query`` and store a lazy row generator on *query*.

        The generator is consumed after this method has returned, so the
        cursor must outlive the call. It is therefore not opened in a
        ``with`` block (which would close it on exit and make every later
        ``fetchmany()`` fail); the generator closes it once it is exhausted
        or closed.
        """
        cursor = connection.cursor()
        try:
            cursor.execute(query.query)
        except Exception:
            # Nothing will ever iterate a failed query: release the cursor
            # here, then let the error propagate unchanged.
            cursor.close()
            raise

        def generate():
            try:
                while True:
                    rows = cursor.fetchmany()
                    if not rows:
                        break
                    yield from rows
            finally:
                cursor.close()

        query._result = generate()
        query.is_completed = True

    def __del__(self):
        """Run the base class finalizer, then close every pooled connection."""
        super().__del__()
        self.connection.closeall()