
Commit 523c0a9

Merge branch 'main' into 599-feat-add-logic-to-download-results

2 parents: 0339135 + eccc7b2

7 files changed: +104 -179 lines

cl/api/templates/recap-api-docs-vlatest.html (+1 -1)

@@ -229,7 +229,7 @@ <h4 id="dockets">Purchasing Dockets</h4>
 <h2 id="recap-upload">RECAP Upload API <small><code>{% url "processingqueue-list" version=version %}</code></small></h2>
 <p>This API is used by the RECAP extension and a handful of special partners to upload PACER content to the RECAP Archive. This API is not available to the public. If you have a collection of PACER data you wish to donate to the RECAP Archive so it is permanently available to the public, please <a href="{% url "contact" %}">get in touch</a>.
 </p>
-<p>We describe the process for completing these uploads below, and you can see examples of them in <a href="https://github.com/freelawproject/courtlistener/blob/main/cl/recap/tests.py">CourtListener's automated test suite</a>. Uploads to these endpoints should be done using HTTP <code>POST</code> requests and multipart form data.
+<p>We describe the process for completing these uploads below, and you can see examples of them in <a href="https://github.com/freelawproject/courtlistener/blob/main/cl/recap/tests/tests.py">CourtListener's automated test suite</a>. Uploads to these endpoints should be done using HTTP <code>POST</code> requests and multipart form data.
 </p>
 <p>When you make an upload, you create a <code>Processing Queue</code> object in the CourtListener system. This object will be returned in the HTTP response to your upload, so you will know its ID. This object will contain the fields you uploaded, and the following fields will be populated as the item is processed:
 </p>
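
Since the documentation change above concerns the multipart upload workflow, a minimal sketch of such a POST may help readers of this diff. This is an illustration only: the endpoint URL, field names, and upload_type value below are assumptions inferred from the ProcessingQueue fields visible elsewhere in this commit, not an authoritative description of the RECAP Upload API.

# Hedged sketch: create a ProcessingQueue object via a multipart POST.
# The URL, field names, and upload_type value are assumptions for
# illustration; consult the RECAP upload docs for the real contract.
import requests

RECAP_UPLOAD_URL = "https://www.courtlistener.com/api/rest/v4/recap/"  # assumed path
headers = {"Authorization": "Token <your-api-token>"}

with open("attachment_page.html", "rb") as f:
    response = requests.post(
        RECAP_UPLOAD_URL,
        headers=headers,
        data={
            "court": "dcd",             # PACER court ID (assumed field name)
            "pacer_case_id": "178502",  # case the upload belongs to (assumed)
            "upload_type": 3,           # e.g. an attachment page (assumed value)
        },
        files={"filepath_local": f},    # the PACER HTML being uploaded (assumed)
    )

# Per the docs above, the response describes the new ProcessingQueue object,
# including its ID, which can be consulted while the item is processed.
print(response.status_code, response.json().get("id"))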

cl/corpus_importer/signals.py (+19 -11)

@@ -52,12 +52,19 @@ def update_latest_case_id_and_schedule_iquery_sweep(docket: Docket) -> None:
     tasks_to_schedule = (
         incoming_pacer_case_id - iquery_pacer_case_id_current
     )
+    logger.info(
+        "Found %s tasks to schedule for pacer case IDs ranging from %s to %s.",
+        tasks_to_schedule,
+        iquery_pacer_case_id_current,
+        incoming_pacer_case_id,
+    )
     if tasks_to_schedule > 10_800:
-        # Considering a Celery countdown of 1 second and a visibility_timeout
-        # of 6 hours, the maximum countdown time should be set to 21,600 to
-        # avoid a celery runaway. It's safer to abort if more than 10,800
-        # tasks are attempted to be scheduled. This could indicate an issue
-        # with retrieving the highest_known_pacer_case_id or a loss of the
+        # Considering a Celery countdown of 1 second applied via
+        # throttle_task and a visibility_timeout of 6 hours, the maximum
+        # countdown time should be set to 21,600 to avoid a celery runaway.
+        # It's safer to abort if more than 10,800 tasks are attempted to be
+        # scheduled. This could indicate an issue with retrieving the
+        # highest_known_pacer_case_id or a loss of the
         # iquery_pacer_case_id_current for the court in Redis.
         logger.error(
             "Tried to schedule more than 10,800 iquery pages to scrape for "
@@ -66,20 +73,21 @@ def update_latest_case_id_and_schedule_iquery_sweep(docket: Docket) -> None:
         )
         release_redis_lock(r, update_lock_key, lock_value)
         return None
-    task_scheduled_countdown = 0
+    task_to_schedule_count = 0
     while iquery_pacer_case_id_current + 1 < incoming_pacer_case_id:
         iquery_pacer_case_id_current += 1
-        task_scheduled_countdown += 1
-        # Schedule the next task with a 1-second countdown increment
+        task_to_schedule_count += 1
+        # Schedule the next task.
         make_docket_by_iquery_sweep.apply_async(
             args=(court_id, iquery_pacer_case_id_current),
             kwargs={"skip_iquery_sweep": True},
-            countdown=task_scheduled_countdown,
             queue=settings.CELERY_IQUERY_QUEUE,
         )
         logger.info(
-            f"Enqueued iquery docket case ID: {iquery_pacer_case_id_current} "
-            f"for court {court_id} with countdown {task_scheduled_countdown}"
+            "Enqueued %s iquery docket with case ID: %s for court %s",
+            task_to_schedule_count,
+            iquery_pacer_case_id_current,
+            court_id,
         )

     # Update the iquery_pacer_case_id_current in Redis
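
The 10,800 guard in the comment above follows from simple arithmetic; here is a small sketch of that reasoning (the constant names are illustrative, not taken from the codebase):

# With throttle_task spacing tasks roughly one second apart, the effective
# delay of the last scheduled task grows with the number of tasks. Delays
# must stay below the broker's 6-hour visibility_timeout (21,600 s), so the
# signal aborts at half that to keep a safety margin.
VISIBILITY_TIMEOUT_SECONDS = 6 * 60 * 60                  # 21,600
MAX_TASKS_TO_SCHEDULE = VISIBILITY_TIMEOUT_SECONDS // 2   # 10,800

assert MAX_TASKS_TO_SCHEDULE == 10_800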

cl/corpus_importer/tasks.py (-41)

@@ -1964,47 +1964,6 @@ def create_attachment_pq(
     return pq
 
 
-# TODO: Remove after the new related methods have been rolled out.
-@app.task(bind=True, ignore_result=True)
-def make_attachment_pq_object(
-    self: Task,
-    attachment_report: AttachmentPage,
-    rd_pk: int,
-    user_pk: int,
-    att_report_text: str | None = None,
-) -> int:
-    """Create an item in the processing queue for an attachment page.
-
-    This is a helper shim to convert attachment page results into processing
-    queue objects that can be processed by our standard pipeline.
-
-    :param self: The celery task
-    :param attachment_report: An AttachmentPage object that's already queried
-    a page and populated its data attribute.
-    :param rd_pk: The RECAP document that the attachment page is associated
-    with
-    :param user_pk: The user to associate with the ProcessingQueue object when
-    it's created.
-    :param att_report_text: The attachment page report text if we got it from a
-    notification free look link.
-    :return: The pk of the ProcessingQueue object that's created.
-    """
-    rd = RECAPDocument.objects.get(pk=rd_pk)
-    user = User.objects.get(pk=user_pk)
-    pq = ProcessingQueue(
-        court_id=rd.docket_entry.docket.court_id,
-        uploader=user,
-        upload_type=UPLOAD_TYPE.ATTACHMENT_PAGE,
-        pacer_case_id=rd.docket_entry.docket.pacer_case_id,
-    )
-    if att_report_text is None:
-        att_report_text = attachment_report.response.text
-    pq.filepath_local.save(
-        "attachment_page.html", ContentFile(att_report_text.encode())
-    )
-    return pq.pk
-
-
 @app.task(bind=True, ignore_result=True)
 def save_attachment_pq_object(
     self: Task,

cl/lib/model_helpers.py (+1 -1)

@@ -516,7 +516,7 @@ def linkify_orig_docket_number(agency: str, og_docket_number: str) -> str:
     """
     # Simple pattern for Federal Register citations
     fr_match = re.search(
-        r"(\d{1,3})\s*(?:FR|Fed\.?\s*Reg\.?)\s*(\d{1,5}(?:,\d{3})*)",
+        r"(\d{1,3})\s*(?:FR|Fed\.?\s*Reg\.?)\s*(\d{1,6}(?:,\d{3})*)",
         og_docket_number,
     )
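
The only change here widens the page-number quantifier from \d{1,5} to \d{1,6}. A quick sketch of what that buys, using the six-digit citation exercised by the new tests below:

import re

OLD = r"(\d{1,3})\s*(?:FR|Fed\.?\s*Reg\.?)\s*(\d{1,5}(?:,\d{3})*)"
NEW = r"(\d{1,3})\s*(?:FR|Fed\.?\s*Reg\.?)\s*(\d{1,6}(?:,\d{3})*)"

citation = "89 FR 106981"  # six-digit page number, no thousands separator

# The old pattern stops after five digits, truncating the page number.
print(re.search(OLD, citation).groups())  # ('89', '10698')
# The new pattern captures the full six-digit page number.
print(re.search(NEW, citation).groups())  # ('89', '106981')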

cl/lib/tests.py (+10)

@@ -1394,6 +1394,16 @@ def test_linkify_orig_docket_number(self):
                 "89 Fed. Reg. 34,620",
                 "https://www.federalregister.gov/citation/89-FR-34,620",
             ),
+            (
+                "Animal and Plant Health Inspection Service",
+                "89 FR 106981",
+                "https://www.federalregister.gov/citation/89-FR-106981",
+            ),
+            (
+                "Animal and Plant Health Inspection Service",
+                "89 Fed. Reg. 106,981",
+                "https://www.federalregister.gov/citation/89-FR-106,981",
+            ),
             (
                 "Environmental Protection Agency",
                 "EPA-HQ-OW-2020-0005",

cl/search/management/commands/fix_rd_broken_links.py (+73 -25)

@@ -132,6 +132,7 @@ def __init__(self, *args, **kwargs):
         self.chunk_size = None
         self.interval = None
         self.ids = None
+        self.docket_batch_size = None
 
     def add_arguments(self, parser):
         parser.add_argument(
@@ -146,6 +147,12 @@ def add_arguments(self, parser):
             default="100",
             help="The number of items to index in a single celery task.",
         )
+        parser.add_argument(
+            "--docket-batch-size",
+            type=int,
+            default="2000",
+            help="The number of docket_ids to process per batch.",
+        )
         parser.add_argument(
             "--auto-resume",
             action="store_true",
@@ -180,30 +187,24 @@ def add_arguments(self, parser):
             required=False,
         )
 
-    def get_and_fix_rds(self, cut_off_date: datetime) -> int:
-        """Get the dockets with broken RECAPDocument links and fix their RDs by
-        re-indexing.
-
-        :param cut_off_date: The cutoff date to filter docket events.
-        :return: The number of dockets affected.
+    def _process_docket_id_batch(
+        self, batch_number: int, batch_ids: list[int]
+    ) -> int:
+        """Process a batch of docket IDs with broken RECAPDocument links and
+        fix their RDs by re-indexing.
+        :param batch_ids: A batch list of docket IDs.
+        :return: The number of RECAP documents processed for this batch.
         """
 
-        chunk = []
         affected_rds = 0
-        docket_ids_to_fix_queryset = get_dockets_to_fix(
-            cut_off_date, self.pk_offset, self.ids
-        )
+        chunk = []
        rd_queryset = (
-            RECAPDocument.objects.filter(
-                docket_entry__docket_id__in=Subquery(
-                    docket_ids_to_fix_queryset
-                )
-            )
+            RECAPDocument.objects.filter(docket_entry__docket_id__in=batch_ids)
            .order_by("pk")
            .values_list("pk", flat=True)
        )
        logger.info(
-            "Getting the count of recap documents that need to be fixed."
+            "Getting the count of recap documents that need to be fixed in this batch."
        )
        count = rd_queryset.count()
        for rd_id_to_fix in rd_queryset.iterator():
@@ -226,21 +227,67 @@ def get_and_fix_rds(self, cut_off_date: datetime) -> int:
 
                 chunk = []
                 logger.info(
-                    "Processed %d/%d (%.0f%%), last PK fixed: %s",
+                    "Processed RDs in batch %s: %d/%d (%.0f%%), last PK fixed: %s",
+                    batch_number,
                     affected_rds,
                     count,
                     (affected_rds * 100.0) / count,
                     rd_id_to_fix,
                 )
 
-            if not affected_rds % 1000:
-                # Log every 1000 documents processed.
-                log_last_document_indexed(
-                    rd_id_to_fix, compose_redis_key()
-                )
-
         return count
 
+    def get_and_fix_rds(self, cut_off_date: datetime) -> int:
+        """Get the dockets with broken RECAPDocument links and process them in
+        batches of docket IDs.
+
+        :param cut_off_date: The cutoff date to filter docket events.
+        :return: The total number of RECAP documents affected.
+        """
+
+        all_affected_rds = 0
+        docket_ids_queryset = get_dockets_to_fix(
+            cut_off_date, self.pk_offset, self.ids
+        )
+
+        batch_ids = []
+        batch_number = 0
+        affected_dockets = 0
+        for docket_id in docket_ids_queryset.iterator(
+            chunk_size=self.docket_batch_size
+        ):
+            batch_ids.append(docket_id)
+            affected_dockets += 1
+            if len(batch_ids) >= self.docket_batch_size:
+                batch_number += 1
+                logger.info(
+                    "Processing docket_ids batch %d with %d IDs",
+                    batch_number,
+                    len(batch_ids),
+                )
+                all_affected_rds += self._process_docket_id_batch(
+                    batch_number, batch_ids
+                )
+                batch_ids = []
+
+            if not affected_dockets % 1000:
+                # Log every 1000 documents processed.
+                log_last_document_indexed(docket_id, compose_redis_key())
+
+        # Process any remaining docket_ids in the final chunk.
+        if batch_ids:
+            batch_number += 1
+            logger.info(
+                "Processing final docket_ids batch %d with %d IDs",
+                batch_number,
+                len(batch_ids),
+            )
+            all_affected_rds += self._process_docket_id_batch(
+                batch_number, batch_ids
+            )
+
+        return all_affected_rds
+
     def handle(self, *args, **options):
         super().handle(*args, **options)
         self.options = options
@@ -250,6 +297,7 @@ def handle(self, *args, **options):
         self.throttle = CeleryThrottle(queue_name=self.queue)
         self.interval = self.options["interval"]
         self.ids = options.get("ids")
+        self.docket_batch_size = options.get("docket_batch_size")
         auto_resume = options["auto_resume"]
         if auto_resume:
             self.pk_offset = get_last_parent_document_id_processed(
@@ -259,9 +307,9 @@ def handle(self, *args, **options):
                 f"Auto-resume enabled starting indexing from ID: {self.pk_offset}."
             )
         start_date: datetime = options["start_date"]
-        affected_rds = self.get_and_fix_rds(start_date)
+        affected_dockets = self.get_and_fix_rds(start_date)
         logger.info(
             "Successfully fixed %d items from pk %s.",
-            affected_rds,
+            affected_dockets,
             self.pk_offset,
         )
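
For reference, a hedged example of running this command with the new batching option via Django's call_command. The --docket-batch-size and --auto-resume flags appear in the diff above; the --start-date flag name is inferred from options["start_date"] in handle(), and all values are placeholders.

# Hedged sketch of invoking the management command with the new
# --docket-batch-size option added in this commit. Passing arguments as
# argv-style strings lets argparse apply each option's type conversion.
from django.core.management import call_command

call_command(
    "fix_rd_broken_links",
    "--start-date", "2024-11-01",   # flag name inferred; value is a placeholder
    "--docket-batch-size", "2000",  # dockets per batch (new in this commit)
    "--auto-resume",                # resume from the last logged docket ID
)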

cl/search/tasks.py (-100)

@@ -1188,106 +1188,6 @@ def index_parent_or_child_docs_in_es(
         parent_es_document._index.refresh()
 
 
-# TODO: Remove after the new task is rolled out and all scheduled tasks have been processed.
-@app.task(
-    bind=True,
-    autoretry_for=(ConnectionError,),
-    max_retries=3,
-    interval_start=5,
-    ignore_result=True,
-)
-def index_parent_or_child_docs(
-    self: Task,
-    instance_ids: list[int],
-    search_type: str,
-    document_type: str | None,
-    testing_mode: bool = False,
-) -> None:
-    """Index parent or child documents in Elasticsearch.
-
-    :param self: The Celery task instance
-    :param instance_ids: The parent instance IDs to index.
-    :param search_type: The Search Type to index parent and child docs.
-    :param document_type: The document type to index, 'parent' or 'child' documents
-    :param testing_mode: Set to True to enable streaming bulk, which is used in
-    TestCase-based tests because parallel_bulk is incompatible with them.
-    https://github.com/freelawproject/courtlistener/pull/3324#issue-1970675619
-    Default is False.
-    :return: None
-    """
-
-    parent_instances = QuerySet()
-    child_instances = QuerySet()
-    use_streaming_bulk = True if testing_mode else False
-    match search_type:
-        case SEARCH_TYPES.RECAP:
-            parent_es_document = DocketDocument
-            child_es_document = ESRECAPDocument
-            child_id_property = "RECAP"
-            if document_type == "parent":
-                parent_instances = Docket.objects.filter(pk__in=instance_ids)
-            elif document_type == "child":
-                child_instances = RECAPDocument.objects.filter(
-                    pk__in=instance_ids
-                )
-        case SEARCH_TYPES.OPINION:
-            parent_es_document = OpinionClusterDocument
-            child_es_document = OpinionDocument
-            child_id_property = "OPINION"
-            if document_type == "parent":
-                parent_instances = OpinionCluster.objects.filter(
-                    pk__in=instance_ids
-                )
-            elif document_type == "child":
-                child_instances = Opinion.objects.filter(pk__in=instance_ids)
-        case SEARCH_TYPES.ORAL_ARGUMENT:
-            parent_es_document = AudioDocument
-            if document_type == "parent":
-                parent_instances = Audio.objects.filter(pk__in=instance_ids)
-        case _:
-            return
-
-    base_doc = {
-        "_op_type": "index",
-        "_index": parent_es_document._index._name,
-    }
-    if document_type == "child":
-        # Then index only child documents in bulk.
-        failed_docs = index_documents_in_bulk_from_queryset(
-            child_instances,
-            child_es_document,
-            base_doc,
-            child_id_property=child_id_property,
-            use_streaming_bulk=use_streaming_bulk,
-        )
-
-        if failed_docs:
-            model_label = child_es_document.Django.model.__name__.capitalize()
-            logger.error(
-                f"Error indexing documents from {model_label}, "
-                f"Failed Doc IDs are: {failed_docs}"
-            )
-
-    if document_type == "parent":
-        # Index only parent documents.
-        failed_docs = index_documents_in_bulk_from_queryset(
-            parent_instances,
-            parent_es_document,
-            base_doc,
-            use_streaming_bulk=use_streaming_bulk,
-        )
-        if failed_docs:
-            model_label = parent_es_document.Django.model.__name__.capitalize()
-            logger.error(
-                f"Error indexing documents from {model_label}, "
-                f"Failed Doc IDs are: {failed_docs}"
-            )
-
-    if settings.ELASTICSEARCH_DSL_AUTO_REFRESH:
-        # Set auto-refresh, used for testing.
-        parent_es_document._index.refresh()
-
-
 @app.task(
     bind=True,
     autoretry_for=(ConnectionError, ConflictError, ConnectionTimeout),
