Skip to content

Commit

Permalink
Add mgmt command to clean completed items from queue (PASTA adapter)
Browse files Browse the repository at this point in the history
  • Loading branch information
rogerdahl committed Mar 8, 2019
1 parent 8102be4 commit d8ffcf8
Show file tree
Hide file tree
Showing 2 changed files with 89 additions and 4 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
# -*- coding: utf-8 -*-

# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Clean completed items from the population queue.
All but the latest version of a package scope/identifier can be deleted. The very latest
version must be preserved in order for the adapter to know which object to update.
"""
import argparse
import logging

import django.core.management.base
import django.db

import pasta_gmn_adapter.app.management.commands._util as util


class Command(django.core.management.base.BaseCommand):
def _init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)

def add_arguments(self, parser):
parser.description = __doc__
parser.formatter_class = argparse.RawDescriptionHelpFormatter
parser.add_argument("--debug", action="store_true", help="Debug level logging")

def handle(self, *args, **options):
util.log_setup(options["debug"])
util.exit_if_other_instance_is_running(__name__)
logging.info("Running management command: {}".format(__name__))

for (
package_scope,
package_identifier,
latest_revision,
) in select_all_latest_revisions():
print("{}.{}.{}".format(package_scope, package_identifier, latest_revision))
deleted_package_count = delete_all_but_latest_revision(
package_scope, package_identifier, latest_revision
)
print("Deleted earlier revisions: {}".format(deleted_package_count))


def select_all_latest_revisions():
"""Select the package scope, identifier and latest revision for each package
"""
cursor = django.db.connection.cursor()
cursor.execute(
"""
select package_scope, package_identifier, max(package_revision) latest_revision
from adapter_population_queue q
join adapter_population_queue_package_scope s on s.id = q.package_scope_id
group by package_scope, package_identifier
order by package_scope, package_identifier
;
"""
)
return cursor


def delete_all_but_latest_revision(package_scope, package_identifier, latest_revision):
cursor = django.db.connection.cursor()
cursor.execute(
"""
delete from adapter_population_queue q
where package_scope_id = (
select id from adapter_population_queue_package_scope where package_scope = %s
)
and package_identifier = %s
and package_revision < %s
;
""",
[package_scope, package_identifier, latest_revision],
)
return cursor.rowcount
8 changes: 4 additions & 4 deletions lter_pasta/src/pasta_gmn_adapter/pasta_gmn_adapter.sql
Original file line number Diff line number Diff line change
Expand Up @@ -171,16 +171,16 @@ ALTER TABLE ONLY adapter_process_status_status
ADD CONSTRAINT adapter_process_status_status_status_key UNIQUE (status);

ALTER TABLE ONLY adapter_population_queue
ADD CONSTRAINT adapter_population_queue_package_scope_id_fkey FOREIGN KEY (package_scope_id) REFERENCES adapter_population_queue_package_scope(id) DEFERRABLE INITIALLY DEFERRED;
ADD CONSTRAINT adapter_population_queue_package_scope_id_fkey FOREIGN KEY (package_scope_id) REFERENCES adapter_population_queue_package_scope(id) ON DELETE CASCADE DEFERRABLE INITIALLY DEFERRED;

ALTER TABLE ONLY adapter_process_status
ADD CONSTRAINT adapter_process_status_return_body_id_fkey FOREIGN KEY (return_body_id) REFERENCES adapter_process_status_return_body(id) DEFERRABLE INITIALLY DEFERRED;
ADD CONSTRAINT adapter_process_status_return_body_id_fkey FOREIGN KEY (return_body_id) REFERENCES adapter_process_status_return_body(id) ON DELETE CASCADE DEFERRABLE INITIALLY DEFERRED;

ALTER TABLE ONLY adapter_process_status
ADD CONSTRAINT adapter_process_status_population_queue_item_id_fkey FOREIGN KEY (population_queue_item_id) REFERENCES adapter_population_queue(id) DEFERRABLE INITIALLY DEFERRED;
ADD CONSTRAINT adapter_process_status_population_queue_item_id_fkey FOREIGN KEY (population_queue_item_id) REFERENCES adapter_population_queue(id) ON DELETE CASCADE DEFERRABLE INITIALLY DEFERRED;

ALTER TABLE ONLY adapter_process_status
ADD CONSTRAINT adapter_process_status_status_id_fkey FOREIGN KEY (status_id) REFERENCES adapter_process_status_status(id) DEFERRABLE INITIALLY DEFERRED;
ADD CONSTRAINT adapter_process_status_status_id_fkey FOREIGN KEY (status_id) REFERENCES adapter_process_status_status(id) ON DELETE CASCADE DEFERRABLE INITIALLY DEFERRED;

-- Indexes

Expand Down

0 comments on commit d8ffcf8

Please sign in to comment.