Goal: process 100K SQLite files, each roughly 300 MB (decode a blob field and delete matching rows).
Machine specs: 64 cores, 100 GB RAM.
The machine already runs other critical services, which is why max_workers=15 is set. Without that throttle the memory usage goes through the roof: it grows to roughly max_workers × the size of one opened file. By default, ThreadPoolExecutor uses min(32, cores + 4) workers on Python 3.8+ (earlier versions used cores × 5), which would be far too many on a 64-core box.
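For reference, here is a small, hedged way to confirm the default worker count on the interpreter in use (reading the private _max_workers attribute is only a one-off sanity check, not something for production code):

import os
import concurrent.futures

print(os.cpu_count())        # 64 on this machine
with concurrent.futures.ThreadPoolExecutor() as ex:
    # _max_workers is a private attribute; inspected here purely for illustration
    print(ex._max_workers)   # min(32, 64 + 4) = 32 on Python 3.8+, 64 * 5 = 320 on 3.7 and earlier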
Concurrent/Async:
asyn_process.py
import sqlite3
import json
import concurrent.futures
import logging
import time
logger = logging.getLogger('my_logger')
logger.setLevel(logging.DEBUG)
file_handler = logging.FileHandler('/tmp/mylog.log')
formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s', datefmt='%Y-%m-%d %H:%M:%S')
file_handler.setFormatter(formatter)
logger.addHandler(file_handler)
def rm_eventx_from_db(sqlitefilename, logger):
    try:
        conn = sqlite3.connect(sqlitefilename)
        cursor = conn.cursor()
        cursor.execute('SELECT ID,LOG FROM OLD_LOGS')
        idlist = []
        # Decode each LOG blob as UTF-8 JSON and collect the IDs of matching rows
        for row in cursor.fetchall():
            colid = row[0]
            msg = row[1]
            m = msg.decode('utf-8')
            msgjson = json.loads(m)
            # print(msgjson['_normalized_fields']['event_id'])
            if msgjson['_normalized_fields']['event_id'] == 12345:
                idlist.append(colid)
        # Delete the matching rows, then commit once for the whole file
        for delete_id in idlist:
            cursor.execute('DELETE FROM OLD_LOGS WHERE ID = ?', (delete_id,))
        conn.commit()
        cursor.close()
        conn.close()
        logger.warning(f"processing done for {sqlitefilename}")
    except Exception as e:
        logger.warning(f"rm_eventx_from_db err: {sqlitefilename} " + str(e))
def vaccumdb(sqlitefilename):
    try:
        conn = sqlite3.connect(sqlitefilename)
        cursor = conn.cursor()
        # Reclaim the space freed by the deletes
        cursor.execute('VACUUM')
        cursor.close()
        conn.commit()
        conn.close()
    except Exception as e:
        logger.warning(f"vaccum_db err: {sqlitefilename} " + str(e))
def main():
    start_time = time.perf_counter()
    futures = []
    listfile = '/tmp/filelist.txt'
    base_path = '/data/storage/archive/'
    # Pass 1: delete the matching rows, up to 15 files at a time
    with open(listfile, 'r') as file:
        with concurrent.futures.ThreadPoolExecutor(max_workers=15) as executor:
            for line in file:
                line = line.strip()
                file_path = base_path + line
                print(file_path)
                futures.append(executor.submit(rm_eventx_from_db, file_path, logger))
            for future in concurrent.futures.as_completed(futures):
                # rm_eventx_from_db returns None, so this mainly logs completion order
                logger.warning("futures msg : " + str(future.result()))
    # Pass 2: VACUUM each file to reclaim the freed space
    fut_vac = []
    with open(listfile, 'r') as file:
        with concurrent.futures.ThreadPoolExecutor(max_workers=15) as executor:
            for line in file:
                line = line.strip()
                file_path = base_path + line
                fut_vac.append(executor.submit(vaccumdb, file_path))
            for future in concurrent.futures.as_completed(fut_vac):
                logger.warning("vaccum futures msg : " + str(future.result()))
    end_time = time.perf_counter()
    execution_time = end_time - start_time
    print(f"Elapsed time: {execution_time:.6f} Seconds")

if __name__ == "__main__":
    main()
Here are some top stats from the concurrent run:
# top -H -p 1545043
top - 15:10:49 up 233 days, 23:17, 1 user, load average: 9.39, 11.37, 12.03
Threads: 16 total, 2 running, 14 sleeping, 0 stopped, 0 zombie
%Cpu(s): 11.5 us, 11.4 sy, 0.4 ni, 74.9 id, 1.1 wa, 0.0 hi, 0.6 si, 0.0 st
MiB Mem : 100699.4 total, 3401.5 free, 83303.5 used, 13994.4 buff/cache
MiB Swap: 4096.0 total, 26.1 free, 4069.9 used. 16514.7 avail Mem
PID USER PR NI VIRT RES SHR S %CPU %MEM TIME+ COMMAND
1545055 root 20 0 5464740 4.3g 15252 S 26.3 4.4 1:59.90 python async_process1.py
1545059 root 20 0 5464740 4.3g 15252 R 25.0 4.4 1:54.33 python async_process1.py
1545061 root 20 0 5464740 4.3g 15252 S 24.7 4.4 1:54.30 python async_process1.py
1545062 root 20 0 5464740 4.3g 15252 S 24.3 4.4 1:53.59 python async_process1.py
1545067 root 20 0 5464740 4.3g 15252 S 24.3 4.4 1:53.75 python async_process1.py
1545057 root 20 0 5464740 4.3g 15252 S 24.0 4.4 1:53.75 python async_process1.py
1545058 root 20 0 5464740 4.3g 15252 R 23.7 4.4 1:53.95 python async_process1.py
1545066 root 20 0 5464740 4.3g 15252 S 23.7 4.4 1:54.01 python async_process1.py
1545063 root 20 0 5464740 4.3g 15252 S 23.3 4.4 1:54.32 python async_process1.py
1545064 root 20 0 5464740 4.3g 15252 S 23.3 4.4 1:54.03 python async_process1.py
1545065 root 20 0 5464740 4.3g 15252 S 23.3 4.4 1:53.85 python async_process1.py
1545068 root 20 0 5464740 4.3g 15252 S 23.3 4.4 1:53.48 python async_process1.py
1545069 root 20 0 5464740 4.3g 15252 S 23.3 4.4 1:54.11 python async_process1.py
1545056 root 20 0 5464740 4.3g 15252 S 23.0 4.4 1:53.73 python async_process1.py
1545054 root 20 0 5464740 4.3g 15252 S 22.7 4.4 1:59.47 python async_process1.py
1545043 root 20 0 5464740 4.3g 15252 S 0.0 4.4 0:01.89 python async_process1.py
The total resident memory consumed by the script is about 4.3 GB. From the log, the number of files processed per minute varies between 2 and 15.
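Much of that resident memory likely comes from cursor.fetchall(), which pulls every LOG blob of a file into Python at once. Here is a minimal sketch of a streaming variant, assuming the same OLD_LOGS table and event filter, that fetches rows lazily so only one decoded blob is held at a time:

def collect_matching_ids(conn, event_id=12345):
    # Iterating the cursor fetches rows in small batches instead of all at once
    ids = []
    cur = conn.cursor()
    cur.execute('SELECT ID, LOG FROM OLD_LOGS')
    for colid, msg in cur:
        record = json.loads(msg.decode('utf-8'))
        if record['_normalized_fields']['event_id'] == event_id:
            ids.append(colid)
    cur.close()
    return ids

The matching IDs are plain integers, so the list itself stays small even for a 300 MB file.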
Below is the synchronous execution code:
sync_process2.py
import sqlite3
import json
import logging
import time
logger = logging.getLogger('my_logger')
logger.setLevel(logging.DEBUG)
file_handler = logging.FileHandler('/tmp/mylog2.log')
formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s', datefmt='%Y-%m-%d %H:%M:%S')
file_handler.setFormatter(formatter)
logger.addHandler(file_handler)
def rm_eventx_from_db(sqlitefilename, logger):
    try:
        conn = sqlite3.connect(sqlitefilename)
        cursor = conn.cursor()
        cursor.execute('SELECT ID,LOG FROM OLD_LOGS')
        idlist = []
        # Decode each LOG blob as UTF-8 JSON and collect the IDs of matching rows
        for row in cursor.fetchall():
            colid = row[0]
            msg = row[1]
            m = msg.decode('utf-8')
            msgjson = json.loads(m)
            # print(msgjson['_normalized_fields']['event_id'])
            if msgjson['_normalized_fields']['event_id'] == 36870:
                idlist.append(colid)
        # Delete the matching rows, then commit once for the whole file
        for delete_id in idlist:
            cursor.execute('DELETE FROM OLD_LOGS WHERE ID = ?', (delete_id,))
        conn.commit()
        cursor.close()
        conn.close()
        logger.warning(f"processing done for {sqlitefilename}")
    except Exception as e:
        logger.warning(f"rm_eventx_from_db err: {sqlitefilename} " + str(e))
def vaccumdb(sqlitefilename):
    try:
        conn = sqlite3.connect(sqlitefilename)
        cursor = conn.cursor()
        # Reclaim the space freed by the deletes
        cursor.execute('VACUUM')
        cursor.close()
        conn.commit()
        conn.close()
    except Exception as e:
        logger.warning(f"vaccum_db err: {sqlitefilename} " + str(e))
def main():
    start_time = time.perf_counter()
    listfile = '/tmp/filelist2.txt'
    base_path = '/data/archives/lake/'
    # Process the files one at a time: delete matching rows, then VACUUM
    with open(listfile, 'r') as file:
        for line in file:
            line = line.strip()
            file_path = base_path + line
            print(file_path)
            rm_eventx_from_db(file_path, logger)
            vaccumdb(file_path)
    end_time = time.perf_counter()
    execution_time = end_time - start_time
    print(f"Elapsed time: {execution_time:.6f} Seconds")

if __name__ == "__main__":
    main()
The log shows that, 99% of the time, about 3 files are processed per minute.
Below are the CPU and memory usage stats:
top - 02:20:56 up 234 days, 10:27, 1 user, load average: 95.08, 95.59, 95.43
Tasks: 1178 total, 2 running, 1176 sleeping, 0 stopped, 0 zombie
%Cpu(s): 10.8 us, 9.8 sy, 0.1 ni, 77.7 id, 1.3 wa, 0.0 hi, 0.4 si, 0.0 st
MiB Mem : 100699.4 total, 637.1 free, 80412.8 used, 19649.5 buff/cache
MiB Swap: 4096.0 total, 17.7 free, 4078.3 used. 19406.4 avail Mem
PID USER PR NI VIRT RES SHR S %CPU %MEM TIME+ COMMAND
1352886 root 20 0 5223396 4.1g 18236 S 339.0 4.1 284:48.95 python /script/asyn_process.py
2542922 root 20 0 311076 295640 5452 R 99.7 0.3 27:14.71 python /script/sync_process.py
Threaded execution processes files noticeably faster than the synchronous version, because the SQLite reads, deletes, and VACUUMs of different files can overlap. The choice between the two approaches still involves tradeoffs around available memory, time constraints, and the other services sharing the machine.
The per-row work (UTF-8 decode plus json.loads) is CPU-bound and runs under the GIL, so threads cannot parallelize that part; given how CPU-intensive the operation is, Python may not be the most suitable tool for the task. One mitigation, sketched below, is to spread the files across worker processes instead of threads.
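This is a minimal sketch, assuming rm_eventx_from_db, vaccumdb, and logger from the script above are defined in the same module; process_one is a hypothetical helper, and max_workers=8 is an arbitrary throttle to be tuned against the memory left over by the other services:

import concurrent.futures

def process_one(file_path):
    # Runs in a separate worker process, so json.loads is not serialized by one GIL
    rm_eventx_from_db(file_path, logger)
    vaccumdb(file_path)
    return file_path

def main():
    base_path = '/data/storage/archive/'
    with open('/tmp/filelist.txt') as fh:
        paths = [base_path + line.strip() for line in fh]
    # Each worker holds roughly one file's rows in memory, so the
    # max_workers × file-size rule of thumb still applies
    with concurrent.futures.ProcessPoolExecutor(max_workers=8) as executor:
        for done in executor.map(process_one, paths, chunksize=4):
            logger.warning(f"processing done for {done}")

if __name__ == "__main__":
    main()

Another option worth measuring, if the LOG blobs are plain UTF-8 JSON, is to push the filter into SQLite itself with the JSON1 json_extract function in a single DELETE statement, which moves the decoding work out of Python entirely.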