# pdf_to_markdown.py
#
# Convert a PDF to Markdown: each page is rendered to an image with pdf2image,
# sent to the OpenAI chat completions API (gpt-4o-mini), and the returned
# markdown is written to a .md file in page order.
import os
from pathlib import Path
from pdf2image import convert_from_path
from openai import OpenAI
from dotenv import load_dotenv
import base64
from io import BytesIO
import logging
import asyncio
from concurrent.futures import ThreadPoolExecutor
import time
# Configure logging: timestamped, level-tagged messages at INFO and above.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S',
)

# Load environment variables (python-dotenv) before anything reads them.
load_dotenv()
logging.info("Environment variables loaded")
def clean_markdown(content):
    """Strip the code fence a model may wrap around its markdown answer.

    Only an *enclosing* fence is removed; fences inside the text are kept so
    that code listings on the page survive. A wrapper is recognised when the
    first line is ```markdown or ```md, or a bare ``` whose matching closing
    ``` is the last line and no other fence appears in between (a bare fence
    is otherwise ambiguous with a page that simply starts with a code block).

    Args:
        content: Raw model output. ``None`` or an empty string is treated as
            an empty page.

    Returns:
        The unwrapped markdown with leading/trailing whitespace removed.
    """
    if not content:
        return ""

    text = content.strip()
    # keepends=True so the original line endings are reproduced on re-join.
    lines = text.splitlines(keepends=True)
    if not lines:
        return ""

    opener = lines[0].strip().lower()
    if opener not in ("```markdown", "```md", "```"):
        return text

    closed = len(lines) > 1 and lines[-1].strip() == "```"
    inner = lines[1:-1] if closed else lines[1:]
    inner_fences = sum(1 for line in inner if line.lstrip().startswith("```"))

    if opener == "```":
        # A bare fence is only treated as a wrapper when it is unambiguous.
        if not closed or inner_fences:
            return text
    elif inner_fences % 2:
        # Odd number of inner fences: the final ``` closes a code block inside
        # the page (the wrapper's own closing fence is missing), so keep it.
        inner = lines[1:]

    return "".join(inner).strip()
def encode_image_to_base64(pil_image):
    """Encode a PIL image as a base64 string of its JPEG bytes.

    Images whose mode the JPEG encoder cannot write (for example RGBA or
    palette images) are converted to RGB first instead of failing in
    ``save``. The caller's image object is not modified or closed.

    Args:
        pil_image: A PIL ``Image`` (anything exposing ``mode``, ``convert``
            and ``save`` works).

    Returns:
        The JPEG data (quality 70, optimized) as a base64 text string.

    Raises:
        Exception: Any error from conversion or encoding is logged and
            re-raised unchanged.
    """
    try:
        # JPEG has no alpha/palette support; flatten anything it cannot store.
        jpeg_modes = ("1", "L", "RGB", "RGBX", "CMYK", "YCbCr")
        if getattr(pil_image, "mode", "RGB") not in jpeg_modes:
            pil_image = pil_image.convert("RGB")

        buffered = BytesIO()
        # Reduced quality keeps the request payload small for speed.
        pil_image.save(buffered, format="JPEG", quality=70, optimize=True)
        base64_string = base64.b64encode(buffered.getvalue()).decode('utf-8')
        logging.info("Image converted to base64 (length: %d chars)", len(base64_string))
        return base64_string
    except Exception as e:
        logging.error(f"Error converting image to base64: {str(e)}")
        raise
async def convert_page_to_markdown(client, image_base64):
    """Ask GPT-4o-mini to transcribe one page image into markdown.

    The OpenAI client used here is synchronous, so the request is run in the
    event loop's default executor to avoid blocking other page tasks.

    Args:
        client: An ``OpenAI`` client instance.
        image_base64: Base64-encoded JPEG data for the page.

    Returns:
        The markdown text for the page, passed through ``clean_markdown``.
        If the API returns no text content, an empty string is used.

    Raises:
        Exception: Any API or processing error is logged and re-raised.
    """
    request_messages = [
        {
            "role": "user",
            "content": [
                {
                    "type": "text",
                    "text": "Convert this page to markdown format. Return the markdown directly without any markdown code block tags or other formatting."
                },
                {
                    "type": "image_url",
                    "image_url": {
                        "url": f"data:image/jpeg;base64,{image_base64}"
                    }
                }
            ]
        }
    ]

    def _call_api():
        # Blocking request; the timeout keeps a stalled call from hanging the run.
        return client.chat.completions.create(
            model="gpt-4o-mini",
            messages=request_messages,
            timeout=30,
        )

    try:
        logging.info("Sending request to GPT-4o-mini")
        # perf_counter is monotonic, so the duration is immune to clock changes.
        start_time = time.perf_counter()
        # get_running_loop() is the supported way to reach the loop from a coroutine.
        loop = asyncio.get_running_loop()
        response = await loop.run_in_executor(None, _call_api)
        duration = time.perf_counter() - start_time

        raw_content = response.choices[0].message.content
        if raw_content is None:
            # The API may return a message without text; do not crash the page.
            logging.warning("GPT-4o-mini returned no text content; using an empty page")
            raw_content = ""

        markdown_content = clean_markdown(raw_content)
        logging.info(f"Received markdown response in {duration:.2f}s (length: {len(markdown_content)} chars)")
        return markdown_content
    except Exception as e:
        logging.error(f"Error in GPT-4o-mini API call: {str(e)}")
        raise
class PageBuffer:
    """Collects pages that finish out of order and writes them in page order."""

    def __init__(self):
        # page number -> markdown waiting to be written
        self.pages = {}
        # the page number that must be written next (pages start at 1)
        self.next_page = 1
        # serializes buffer updates and file writes between page tasks
        self.lock = asyncio.Lock()

    async def add_page(self, page_num, content, output_path):
        """Store a finished page, then flush every page that is now in sequence."""
        async with self.lock:
            self.pages[page_num] = content
            await self._write_sequential_pages(output_path)

    async def _write_sequential_pages(self, output_path):
        """Write buffered pages to ``output_path`` for as long as the next one is present.

        Page 1 creates/truncates the file; later pages are appended after a
        horizontal-rule separator.
        """
        while True:
            current = self.next_page
            if current not in self.pages:
                break

            page_text = self.pages[current]
            is_first_page = current == 1
            file_mode = 'w' if is_first_page else 'a'
            with open(output_path, file_mode, encoding='utf-8') as out_file:
                if not is_first_page:
                    out_file.write("\n\n---\n\n")
                out_file.write(page_text)
            logging.info(f"Written page {current} to file")

            # Only drop the page once it has been written successfully.
            del self.pages[current]
            self.next_page = current + 1
async def process_page(image, page_num, client, output_path, page_buffer):
    """Convert one rendered page to markdown and hand it to the page buffer.

    The image is closed afterwards whether or not the conversion succeeded.
    """
    try:
        encoded_page = encode_image_to_base64(image)
        page_markdown = await convert_page_to_markdown(client, encoded_page)
        await page_buffer.add_page(page_num, page_markdown, output_path)
        logging.info(f"Page {page_num} processed and buffered")
    finally:
        # Release the image data as soon as this page is done.
        image.close()
async def process_pdf_batch(pdf_path, start_page, end_page, client, output_path, page_buffer, dpi=150):
    """Render a range of PDF pages and convert them to markdown concurrently.

    ``convert_from_path`` is a synchronous call, so it is run in the event
    loop's default executor; calling it directly would stall every other
    task (including in-flight page conversions) while the batch renders.

    Args:
        pdf_path: Path of the source PDF.
        start_page: First page of the batch (1-based, inclusive).
        end_page: Last page of the batch (inclusive).
        client: OpenAI client passed through to the page conversion.
        output_path: Markdown file the pages are written to.
        page_buffer: ``PageBuffer`` that orders the finished pages.
        dpi: Rendering resolution; kept low by default for speed.

    Raises:
        Exception: Any rendering or page-processing error is logged and
            re-raised.
    """
    try:
        logging.info(f"Converting PDF pages {start_page} to {end_page}...")
        loop = asyncio.get_running_loop()
        images = await loop.run_in_executor(
            None,
            lambda: convert_from_path(
                pdf_path,
                first_page=start_page,
                last_page=end_page,
                dpi=dpi,
            ),
        )
        logging.info(f"Converted batch of {len(images)} images")

        # Process pages in parallel; page numbers continue from start_page.
        tasks = [
            asyncio.create_task(process_page(image, page_num, client, output_path, page_buffer))
            for page_num, image in enumerate(images, start=start_page)
        ]
        await asyncio.gather(*tasks)
    except Exception as e:
        logging.error(f"Error processing batch {start_page}-{end_page}: {str(e)}")
        raise
async def convert_pdf_to_markdown(pdf_path, output_path, batch_size=4):
    """Convert PDF to markdown using GPT-4o-mini.

    The page count is read from the PDF, the pages are split into batches of
    ``batch_size``, and all batches are processed concurrently. A shared
    ``PageBuffer`` writes the results to ``output_path`` in page order.

    Args:
        pdf_path: Path of the source PDF.
        output_path: Path of the markdown file to write.
        batch_size: Number of pages rendered per batch; must be at least 1.

    Raises:
        ValueError: If ``batch_size`` is smaller than 1.
        Exception: Client initialization and batch errors are re-raised.
    """
    # Validate up front: 0 would fail later inside range() with an unrelated
    # message, and a negative size would silently convert nothing.
    if batch_size < 1:
        raise ValueError(f"batch_size must be at least 1, got {batch_size}")

    logging.info(f"Starting conversion of PDF: {pdf_path}")

    # Initialize OpenAI client
    try:
        client = OpenAI()
        logging.info("OpenAI client initialized")
    except Exception as e:
        logging.error(f"Failed to initialize OpenAI client: {str(e)}")
        raise

    # Get total number of pages
    from pdf2image.pdf2image import pdfinfo_from_path
    pdf_info = pdfinfo_from_path(pdf_path)
    total_pages = pdf_info["Pages"]
    logging.info(f"PDF has {total_pages} pages")

    # One buffer shared by every batch so pages are written in order.
    page_buffer = PageBuffer()

    # Process PDF in batches; the last batch is clipped to the final page.
    tasks = [
        asyncio.create_task(
            process_pdf_batch(
                pdf_path,
                start_page,
                min(start_page + batch_size - 1, total_pages),
                client,
                output_path,
                page_buffer,
            )
        )
        for start_page in range(1, total_pages + 1, batch_size)
    ]
    await asyncio.gather(*tasks)
async def main_async(pdf_path="106-24_1.pdf"):
    """Convert one PDF to a markdown file with the same name and a ``.md`` suffix.

    Errors are reported through the log and on stdout rather than raised, so
    this is suitable as a script entry point.

    Args:
        pdf_path: PDF to convert. Defaults to the sample file name the script
            was originally written for.
    """
    logging.info("Starting PDF to Markdown conversion process")

    output_path = Path(pdf_path).with_suffix('.md')
    logging.info(f"Using PDF file: {pdf_path}")
    logging.info(f"Output will be saved to: {output_path}")

    if not os.path.exists(pdf_path):
        logging.error(f"PDF file not found at {pdf_path}")
        print(f"Error: PDF file not found at {pdf_path}")
        return

    try:
        start_time = time.time()
        # Convert PDF to markdown, saving each page as we go
        await convert_pdf_to_markdown(pdf_path, output_path)
        duration = time.time() - start_time
        print(f"\nConversion complete in {duration:.2f} seconds! Markdown file saved to: {output_path}")
    except Exception as e:
        # Top-level boundary: report the failure instead of a raw traceback.
        logging.error(f"Error during conversion: {str(e)}")
        print(f"Error during conversion: {str(e)}")
def main():
    """Synchronous entry point: run the async conversion to completion."""
    asyncio.run(main_async())
if __name__ == "__main__":
    # Run the conversion only when executed as a script, not on import.
    main()