winziprint.py: Extract convert_part() from convert()
This commit is contained in:
@ -1,7 +1,7 @@
|
|||||||
#!/usr/bin/env python3
|
#!/usr/bin/env python3
|
||||||
# -*- coding: utf-8 -*-
|
# -*- coding: utf-8 -*-
|
||||||
|
|
||||||
from typing import TextIO
|
from typing import TextIO, Callable
|
||||||
import os
|
import os
|
||||||
import sys
|
import sys
|
||||||
import time
|
import time
|
||||||
@ -10,6 +10,7 @@ import gc
|
|||||||
import socketserver
|
import socketserver
|
||||||
import io
|
import io
|
||||||
import signal
|
import signal
|
||||||
|
import math
|
||||||
|
|
||||||
import weasyprint
|
import weasyprint
|
||||||
import pypdf
|
import pypdf
|
||||||
@ -21,6 +22,24 @@ SOCKET_ADDRESS = ('127.0.0.1', 30983)
|
|||||||
BATCH_SIZE = 10
|
BATCH_SIZE = 10
|
||||||
|
|
||||||
|
|
||||||
|
def convert_part(output_file: str,
                 batch: list[str],
                 step_cb: Callable[[], None],
                 encoding: str = None) -> list[int]:
    """Render one batch of HTML files into a single PDF part file.

    Every file in ``batch`` is rendered with WeasyPrint, and the pages of all
    rendered documents are written to ``output_file`` as one PDF.

    :param output_file: path of the PDF part file to write
    :param batch: paths of the HTML files to render, in output order
    :param step_cb: called with no arguments once after each file has been
        rendered and once more after the part file has been written
    :param encoding: passed through unchanged to ``weasyprint.HTML``
    :return: the page count of each rendered document, in batch order
    :raises IndexError: if ``batch`` is empty (there is no first document to
        copy the pages into)
    """
    documents = []
    for file_name in batch:
        # Append the render result directly instead of binding it to a local
        # name: a leftover local would keep the last document alive past the
        # ``del documents`` below and defeat the explicit collection.
        documents.append(weasyprint.HTML(filename=file_name, encoding=encoding).render())
        step_cb()

    page_nums = [len(doc.pages) for doc in documents]
    all_pages = [p for doc in documents for p in doc.pages]
    # The first document serves as the container for the pages of the batch.
    documents[0].copy(all_pages).write_pdf(output_file)

    # Drop every reference to the rendered documents before collecting, so
    # their memory can be reclaimed right away rather than at function exit.
    del documents
    del all_pages
    gc.collect()
    step_cb()
    return page_nums
|
||||||
|
|
||||||
|
|
||||||
def convert(input_files: list[str],
|
def convert(input_files: list[str],
|
||||||
output_files: str,
|
output_files: str,
|
||||||
encoding: str = None,
|
encoding: str = None,
|
||||||
@ -28,34 +47,25 @@ def convert(input_files: list[str],
|
|||||||
progress: bool = False,
|
progress: bool = False,
|
||||||
out: TextIO = sys.stdout) -> list[int]:
|
out: TextIO = sys.stdout) -> list[int]:
|
||||||
# it takes roughly 100ms to generate one document
|
# it takes roughly 100ms to generate one document
|
||||||
tmp_page_nums = []
|
|
||||||
tmp_file_names = []
|
tmp_file_names = []
|
||||||
page_nums = []
|
page_nums = []
|
||||||
|
|
||||||
html_files = [file.lstrip('!') for file in input_files if not file.endswith('.pdf')]
|
html_files = [file.lstrip('!') for file in input_files if not file.endswith('.pdf')]
|
||||||
steps = len(html_files) + len(html_files) // BATCH_SIZE + 1
|
total_steps = len(html_files) + math.ceil(len(html_files) / BATCH_SIZE) + 1
|
||||||
|
convert.steps = 0
|
||||||
|
|
||||||
|
def next_step() -> None:
|
||||||
|
convert.steps += 1
|
||||||
|
if progress:
|
||||||
|
print(f'progress: {convert.steps}/{total_steps}', file=out, flush=True)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
|
tmp_page_nums = []
|
||||||
for i in range(0, len(html_files), BATCH_SIZE):
|
for i in range(0, len(html_files), BATCH_SIZE):
|
||||||
|
tmp_file = f'{output_files}.{i:04}.part'
|
||||||
|
tmp_file_names.append(tmp_file)
|
||||||
batch = html_files[i:i + BATCH_SIZE]
|
batch = html_files[i:i + BATCH_SIZE]
|
||||||
documents = []
|
tmp_page_nums += convert_part(tmp_file, batch, next_step, encoding=encoding)
|
||||||
for n, file_name in enumerate(batch):
|
|
||||||
html = weasyprint.HTML(filename=file_name, encoding=encoding)
|
|
||||||
doc = html.render()
|
|
||||||
documents.append(doc)
|
|
||||||
del html
|
|
||||||
if progress:
|
|
||||||
print(f'progress: {i + n + i // BATCH_SIZE + 1}/{steps}', file=out, flush=True)
|
|
||||||
all_pages = [p for doc in documents for p in doc.pages]
|
|
||||||
tmp_file_name = f'{output_files}.{i:04}.part'
|
|
||||||
documents[0].copy(all_pages).write_pdf(tmp_file_name)
|
|
||||||
tmp_file_names.append(tmp_file_name)
|
|
||||||
tmp_page_nums += [len(doc.pages) for doc in documents]
|
|
||||||
del documents
|
|
||||||
del all_pages
|
|
||||||
gc.collect()
|
|
||||||
if progress and i + BATCH_SIZE < len(html_files):
|
|
||||||
print(f'progress: {i + BATCH_SIZE + i // BATCH_SIZE + 1}/{steps}', file=out, flush=True)
|
|
||||||
|
|
||||||
merger = pypdf.PdfWriter()
|
merger = pypdf.PdfWriter()
|
||||||
i = 0
|
i = 0
|
||||||
@ -80,8 +90,7 @@ def convert(input_files: list[str],
|
|||||||
if os.path.isfile(pdf):
|
if os.path.isfile(pdf):
|
||||||
os.remove(pdf)
|
os.remove(pdf)
|
||||||
|
|
||||||
if progress:
|
next_step()
|
||||||
print(f'progress: {steps}/{steps}', file=out, flush=True)
|
|
||||||
|
|
||||||
return page_nums
|
return page_nums
|
||||||
|
|
||||||
@ -200,7 +209,7 @@ def main() -> None:
|
|||||||
usage(True)
|
usage(True)
|
||||||
# a tcp server is used due to the lack of unix sockets on Windows
|
# a tcp server is used due to the lack of unix sockets on Windows
|
||||||
with socketserver.ThreadingTCPServer(SOCKET_ADDRESS, ConnectionHandler) as server:
|
with socketserver.ThreadingTCPServer(SOCKET_ADDRESS, ConnectionHandler) as server:
|
||||||
def exit_gracefully(signum: int, frame) -> None:
|
def exit_gracefully(_signum: int, _frame) -> None:
|
||||||
raise KeyboardInterrupt()
|
raise KeyboardInterrupt()
|
||||||
signal.signal(signal.SIGINT, exit_gracefully)
|
signal.signal(signal.SIGINT, exit_gracefully)
|
||||||
signal.signal(signal.SIGTERM, exit_gracefully)
|
signal.signal(signal.SIGTERM, exit_gracefully)
|
||||||
|
Reference in New Issue
Block a user