#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""WinziPrint: render HTML files to PDF with WeasyPrint and merge them with pypdf.

The module can be used from the command line (see ``usage``) or as a small
TCP daemon that accepts one semicolon-separated job per line.
"""

from typing import TextIO, Callable
import os
import sys
import time
import traceback
import gc
import socketserver
import io
import signal
import math

import weasyprint
import pypdf


VERSION = __version__ = '0.2.4'
SOCKET_ADDRESS = ('127.0.0.1', 30983)
BATCH_SIZE = 10


def convert_part(output_file: str, batch: list[str], step_cb: Callable[[], None],
                 encoding: str | None = None) -> list[int]:
    """Render a batch of HTML files into one combined PDF.

    Args:
        output_file: Path of the PDF file to write.
        batch: Names of the HTML files to render. Must not be empty, because
            the first rendered document is used as the base of the output
            (an empty batch raises ``IndexError``).
        step_cb: Called without arguments after each rendered file and once
            more after the PDF has been written.
        encoding: Forwarded to ``weasyprint.HTML``.

    Returns:
        The number of pages of each rendered document, in batch order.
    """
    documents = []
    for file_name in batch:
        html = weasyprint.HTML(filename=file_name, encoding=encoding)
        documents.append(html.render())
        del html
        step_cb()
    all_pages = [page for doc in documents for page in doc.pages]
    documents[0].copy(all_pages).write_pdf(output_file)
    tmp_page_nums = [len(doc.pages) for doc in documents]
    # Rendered documents are large; release them before the next batch starts.
    del documents
    del all_pages
    gc.collect()
    step_cb()
    return tmp_page_nums


def _insert_letterhead(merger, letterhead: tuple, page_nums: list[int]) -> None:
    """Place a pending letterhead page, followed by a blank page, at its recorded position."""
    page, page_idx, doc_idx = letterhead
    if len(merger.pages) <= page_idx:
        # Nothing was merged after the letterhead, so it simply goes to the end.
        merger.add_page(page)
        merger.add_blank_page()
    else:
        merger.insert_page(page, index=page_idx)
        merger.insert_blank_page(index=page_idx + 1)
    # page_nums is indexed by document, not by page.
    page_nums[doc_idx] = 1


def convert(input_files: list[str], output_file: str, encoding: str | None = None,
            padding: bool = False, progress: bool = False,
            out: TextIO = sys.stdout) -> list[int]:
    """Convert and merge the input documents into a single PDF file.

    Inputs whose name ends in ``.pdf`` are merged as they are; all others are
    rendered as HTML in batches of ``BATCH_SIZE``. Leading ``!`` and ``#``
    characters are stripped from the name before the file is opened.

    When ``padding`` is set, every document whose name starts with neither
    ``!`` nor ``#`` is padded to an even number of pages. A document whose name
    starts with ``#`` is then treated as a letterhead: only its first page is
    kept and held back. It takes the place of the next blank padding page, or,
    if no padding page was needed before the next letterhead or the end of the
    input, it is inserted at its original position followed by a blank page.

    Args:
        input_files: Input file names, optionally prefixed with ``!`` or ``#``.
        output_file: Path of the merged PDF. Temporary files are written next
            to it as ``<output_file>.NNNN.part`` and removed afterwards.
        encoding: Encoding of the HTML input files.
        padding: Pad documents to an even number of pages.
        progress: Print ``progress: <step>/<total>`` lines to ``out``.
        out: Stream that receives the progress lines.

    Returns:
        The number of pages each input document occupies at its own position
        (0 for a letterhead that was used as a padding page, 1 for a
        letterhead that was inserted at its original position).
    """
    # it takes roughly 100ms to generate one document
    tmp_file_names = []
    page_nums = []
    html_files = [file.lstrip('!#') for file in input_files if not file.endswith('.pdf')]
    # one step per HTML file, one per batch written, one for the final merge
    total_steps = len(html_files) + math.ceil(len(html_files) / BATCH_SIZE) + 1
    steps = 0

    def next_step() -> None:
        nonlocal steps
        steps += 1
        if progress:
            print(f'progress: {steps}/{total_steps}', file=out, flush=True)

    try:
        tmp_page_nums = []
        for start in range(0, len(html_files), BATCH_SIZE):
            tmp_file = f'{output_file}.{start:04}.part'
            tmp_file_names.append(tmp_file)
            batch = html_files[start:start + BATCH_SIZE]
            tmp_page_nums += convert_part(tmp_file, batch, next_step, encoding=encoding)

        merger = pypdf.PdfWriter()
        try:
            letterhead = None  # pending (page, page index, document index)
            html_idx = 0  # index into html_files / tmp_page_nums
            for doc_idx, file_name in enumerate(input_files):
                if letterhead and file_name.startswith('#'):
                    # The previous letterhead was never used as padding; place it now.
                    _insert_letterhead(merger, letterhead, page_nums)
                    letterhead = None
                # Read the offset only after the flush above, which may add pages.
                p0 = len(merger.pages)
                is_letterhead = padding and file_name.startswith('#')
                if file_name.endswith('.pdf'):
                    source = file_name.lstrip('!#')
                    if is_letterhead:
                        reader = pypdf.PdfReader(source)
                        letterhead = (reader.pages[0], p0, doc_idx)
                    else:
                        merger.append(source)
                else:
                    batch_idx, offset = divmod(html_idx, BATCH_SIZE)
                    batch_page_nums = tmp_page_nums[batch_idx * BATCH_SIZE:(batch_idx + 1) * BATCH_SIZE]
                    # first page of this document inside its batch file
                    page_start = sum(batch_page_nums[:offset])
                    tmp_file_name = tmp_file_names[batch_idx]
                    if is_letterhead:
                        reader = pypdf.PdfReader(tmp_file_name)
                        letterhead = (reader.pages[page_start], p0, doc_idx)
                    else:
                        merger.append(tmp_file_name,
                                      pages=(page_start, page_start + tmp_page_nums[html_idx]))
                    html_idx += 1
                page_nums.append(len(merger.pages) - p0)
                if padding and not file_name.startswith(('!', '#')) and len(merger.pages) % 2 != 0:
                    if letterhead:
                        # Use the letterhead instead of a blank padding page.
                        merger.add_page(letterhead[0])
                        letterhead = None
                    else:
                        merger.add_blank_page()
            if letterhead:
                _insert_letterhead(merger, letterhead, page_nums)
            merger.write(output_file)
        finally:
            # Close the writer even on failure so the temporary files can be removed.
            merger.close()
            del merger
    finally:
        for pdf in tmp_file_names:
            if os.path.isfile(pdf):
                os.remove(pdf)
    next_step()
    return page_nums


def _wrapper_convert(args: list[str], encoding: str | None = None, padding: bool = False,
                     progress: bool = False, out: TextIO = sys.stdout) -> None:
    """Run one conversion job and report the result as a single line on ``out``.

    ``args`` is ``[OPTION..., INPUT..., OUTPUT]``. Leading options ``-2``
    (padding), ``-e<ENCODING>`` and ``-p`` (progress) override the keyword
    arguments. The result is reported as a ``success: ...`` or ``error: ...``
    line; exceptions are not propagated, their traceback goes to stderr.
    """
    try:
        if len(args) < 2:
            print('error: Too few arguments', file=out, flush=True)
            return

        inputs = args[:-1]
        output = args[-1]
        while inputs:
            if inputs[0] == '-2':
                inputs.pop(0)
                padding = True
            elif inputs[0].startswith('-e'):
                encoding = inputs.pop(0)[2:].strip()
            elif inputs[0].startswith('-p'):
                inputs.pop(0)
                progress = True
            else:
                break

        if not inputs:
            print('error: Too few arguments', file=out, flush=True)
            return

        t0 = time.process_time()
        pages = convert(inputs, output, encoding=encoding, padding=padding, progress=progress, out=out)
        # With padding every odd page count is rounded up to the next even number.
        total = sum(p + 1 if padding and p % 2 != 0 else p for p in pages)
        t1 = time.process_time()
        print(f'success: '
              f'{len(inputs)} documents, '
              f'{total} pages ({", ".join(str(p) for p in pages)}), '
              f'{t1 - t0:.1f} sec',
              file=out, flush=True)
    except Exception as e:
        # The protocol is line based, so the message must stay on one line.
        msg = str(e).replace('\n', ' ')
        print(f'error: {msg}', file=out, flush=True)
        traceback.print_exception(e, file=sys.stderr)
    finally:
        gc.collect()


def daemon() -> None:
    """Serve conversion jobs on ``SOCKET_ADDRESS`` until SIGINT or SIGTERM is received."""
    # a tcp server is used due to the lack of unix sockets on Windows
    with socketserver.ThreadingTCPServer(SOCKET_ADDRESS, ConnectionHandler) as server:
        def exit_gracefully(_signum: int, _frame) -> None:
            raise KeyboardInterrupt()

        signal.signal(signal.SIGINT, exit_gracefully)
        signal.signal(signal.SIGTERM, exit_gracefully)
        print('Running as daemon')
        try:
            server.serve_forever()
        except KeyboardInterrupt:
            print('', file=sys.stderr)
        print('Shutting down')


def usage(error: bool = False) -> None:
    """Print the usage text and exit.

    With ``error`` set the text goes to stderr and the exit status is 1,
    otherwise it goes to stdout and the exit status is 0.
    """
    print(f'usage: {sys.argv[0]} [-h] [-V] [-d DIR] [ -D | [-p] [-2] [-e ENCODING] [ - | INPUT [INPUT...] OUTPUT ] ]\n'
          '\n'
          'options:\n'
          ' -h, --help show this help message and exit\n'
          ' -V, --version show version and exit\n'
          ' -D, --daemon run as a daemon and expose a named socket\n'
          ' -d, --directory set the working directory\n'
          ' -e, --encoding encoding of the input files\n'
          ' -2, --double-paged pad documents to an even number of pages\n'
          ' -p, --progress show progress updates\n'
          '\n'
          ' - use stdin for retrieving input and output file names (semicolon-separated)\n'
          ' INPUT name of a html input file\n'
          ' OUTPUT name of a pdf output file',
          file=sys.stderr if error else sys.stdout)
    sys.exit(1 if error else 0)


def version() -> None:
    """Print the versions of this program, WeasyPrint and pypdf, then exit with status 0."""
    print(f'WinziPrint: {__version__}\n'
          f'WeasyPrint: {weasyprint.__version__}\n'
          f'pypdf: {pypdf.__version__}')
    sys.exit(0)


def _get_arg(args: list[str], n1: str, n2: str = None, flag: bool = False) -> None | str | bool:
    """Extract an option from ``args`` in place.

    Args:
        args: Argument list; the option (and its value) is removed from it.
        n1: Name of the option.
        n2: Optional alternative name of the option.
        flag: Whether the option is a switch without a value.

    Returns:
        For flags ``True`` or ``False``. Otherwise the value following the
        option, or ``None`` if the option is absent. If both names are given
        on the command line, the value of ``n2`` wins. A value option at the
        very end of ``args`` prints the usage text and exits.
    """
    value = None
    for name in [n for n in (n1, n2) if n]:
        if name not in args:
            continue
        if flag:
            value = True
            args.remove(name)
        else:
            i = args.index(name)
            if i + 1 >= len(args):
                usage(True)
            value = args[i + 1]
            del args[i:i + 2]
    return bool(value) if flag else value


class ConnectionHandler(socketserver.StreamRequestHandler):
    """Handle one client connection: every received line is one conversion job."""

    def handle(self) -> None:
        """Read semicolon-separated jobs line by line and answer on the same connection."""
        out = io.TextIOWrapper(self.wfile, encoding='utf-8')
        lines = io.TextIOWrapper(self.rfile, encoding='utf-8')
        try:
            for line in lines:
                _wrapper_convert(line.strip().split(';'), out=out)
        except ValueError:
            pass  # socket closed by client


def main() -> None:
    """Parse the command line and run the requested mode."""
    args = sys.argv[1:]
    if not args or '-h' in args or '--help' in args:
        usage()
    elif '-V' in args or '--version' in args:
        version()

    working_dir = _get_arg(args, '-d', '--directory')
    if working_dir:
        os.chdir(working_dir)

    if _get_arg(args, '-D', '--daemon', flag=True):
        # The daemon takes its jobs from the socket; no other arguments are allowed.
        if args:
            usage(True)
        daemon()
        return

    encoding = _get_arg(args, '-e', '--encoding')
    progress = _get_arg(args, '-p', '--progress', flag=True)
    double_paged = _get_arg(args, '-2', '--double-paged', flag=True)

    if args == ['-']:
        for line in sys.stdin:
            _wrapper_convert(line.strip().split(';'), encoding=encoding, padding=double_paged, progress=progress)
    elif len(args) < 2:
        usage(True)
    else:
        _wrapper_convert(args, encoding=encoding, padding=double_paged, progress=progress)


if __name__ == '__main__':
    main()
    sys.exit(0)