1
0

proj: Add intercept.py

This commit is contained in:
2025-01-20 19:06:27 +01:00
parent 526893b78a
commit bcf82f91a4
3 changed files with 174 additions and 40 deletions

148
proj/server/src/intercept.py Executable file
View File

@@ -0,0 +1,148 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from socketserver import UnixStreamServer, StreamRequestHandler, ThreadingMixIn
import argparse
import os
class ThreadedUnixStreamServer(ThreadingMixIn, UnixStreamServer):
pass
class Handler(StreamRequestHandler):
pid: int
stack: list[tuple[str, list[str]]]
def before(self) -> None:
pass
def after(self) -> None:
pass
def handle(self):
first = self.rfile.readline()
self.pid = int(first.split(b':')[1])
self.stack = []
print(f'Process with PID {self.pid} connected')
self.before()
try:
while True:
msg = self.rfile.readline()
if not msg:
return
self.handle_msg(msg)
finally:
self.after()
def handle_msg(self, msg: bytes):
timestamp, data = msg.rstrip(b'\n').split(b' ', 1)
if not data.startswith(b'return ') and not data == b'return':
call = data.decode('utf-8')
print(f'[{self.pid}] {call}')
func_name = call[:call.find('(')]
args = call[call.find('(') + 1:-1].split(', ')
self.stack.append((func_name, args))
try:
func = getattr(self, f'before_{func_name}')
if callable(func):
command = func(args)
else:
command = 'ok'
except AttributeError:
command = 'ok'
print(f'[{self.pid}] -> {command}')
self.wfile.write(command.encode('utf-8') + b'\n')
else:
ret = data.decode('utf-8')
ret_value = ret[7:]
func_name, args = self.stack.pop()
try:
func = getattr(self, f'after_{func_name}')
if callable(func):
func(args, ret_value)
except AttributeError:
pass
print(f'[{self.pid}] -> {ret}')
class MemoryAllocationTester(Handler):
allocated: dict[int, int]
max_allocated: int
num_malloc: int
num_realloc: int
num_free: int
def before(self):
self.allocated = {}
self.max_allocated = 0
self.num_malloc = 0
self.num_realloc = 0
self.num_free = 0
def after(self):
if len(self.allocated) > 0:
print("Not free'd:")
for ptr, size in self.allocated.items():
print(f' 0x{ptr:x}: {size} bytes')
else:
print("All blocks free'd!")
print(f'Max allocated: {self.max_allocated} bytes')
def update_max_allocated(self):
total = sum(self.allocated.values())
if total > self.max_allocated:
self.max_allocated = total
def after_malloc(self, args: list[str], ret_value: str) -> None:
self.num_malloc += 1
if ret_value != '(nil)':
size = int(args[0], 0)
self.allocated[int(ret_value, 0)] = size
self.update_max_allocated()
def after_calloc(self, args: list[str], ret_value: str) -> None:
self.num_malloc += 1
if ret_value != '(nil)':
size = int(args[0], 0) * int(args[1], 0)
self.allocated[int(ret_value, 0)] = size
self.update_max_allocated()
def after_realloc(self, args: list[str], ret_value: str) -> None:
self.num_realloc += 1
if args[0] != '(nil)':
new_size = int(args[1], 0)
if ret_value != '(nil)':
del self.allocated[int(args[0], 0)]
self.allocated[int(ret_value, 0)] = new_size
self.update_max_allocated()
def after_free(self, args: list[str], ret_value: str) -> None:
self.num_free += 1
if args[0] != '(nil)':
del self.allocated[int(args[0], 0)]
def intercept(socket: str, handler: type[Handler]) -> None:
try:
with ThreadedUnixStreamServer(socket, handler) as server:
server.serve_forever()
except KeyboardInterrupt:
print('\nBye')
server.shutdown()
finally:
try:
os.unlink(socket)
except FileNotFoundError:
pass
def main() -> None:
parser = argparse.ArgumentParser()
parser.add_argument('socket', metavar='FILE')
args = parser.parse_args()
intercept(args.socket, Handler)
if __name__ == '__main__':
main()

View File

@@ -1,54 +1,40 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from socketserver import UnixStreamServer, StreamRequestHandler, ThreadingMixIn
import os
import argparse
class Handler(StreamRequestHandler):
def handle(self):
first = self.rfile.readline()
pid = int(first.split(b':')[1])
print(f'Process with PID {pid} connected')
while True:
msg = self.rfile.readline()
if not msg:
return
timestamp, data = msg.rstrip(b'\n').split(b' ', 1)
if not data.startswith(b'return ') and not data == b'return':
call = data.decode('utf-8')
print(f'[{pid}] {call}')
if call.startswith('malloc('):
command = 'fail ENOMEM'
else:
command = 'ok'
print(f'[{pid}] -> {command}')
self.wfile.write(command.encode('utf-8') + b'\n')
else:
ret = data.decode('utf-8')
print(f'[{pid}] -> {ret}')
import intercept
class ThreadedUnixStreamServer(ThreadingMixIn, UnixStreamServer):
pass
class Handler(intercept.Handler):
allocated: dict[int, int]
def before(self):
self.allocated = {}
def after(self):
if len(self.allocated) > 0:
print("Not free'd:")
for ptr, size in self.allocated.items():
print(f' 0x{ptr:x}: {size} bytes')
else:
print("All blocks free'd!")
def after_malloc(self, args: list[str], value: str) -> None:
if value != '(nil)':
self.allocated[int(value, 0)] = int(args[0], 0)
def before_free(self, args: list[str]) -> str:
if args[0] != '(nil)':
del self.allocated[int(args[0], 0)]
return 'ok'
def main() -> None:
parser = argparse.ArgumentParser()
parser.add_argument('socket', metavar='FILE')
args = parser.parse_args()
try:
with ThreadedUnixStreamServer(args.socket, Handler) as server:
server.serve_forever()
except KeyboardInterrupt:
print('\nBye')
server.shutdown()
finally:
try:
os.unlink(args.socket)
except FileNotFoundError:
pass
intercept.intercept(args.socket, intercept.MallocTester)
if __name__ == '__main__':