Skip to content

Commit

Permalink
Merge pull request #484 from GovDataOfficial/fix-clearsourcehistory-c…
Browse files Browse the repository at this point in the history
…ommand-keep-latest-objects

Add option keep-current to clearsource_history command
  • Loading branch information
amercader authored Jan 24, 2022
2 parents d84d847 + d2b7340 commit b702bf9
Show file tree
Hide file tree
Showing 6 changed files with 282 additions and 30 deletions.
4 changes: 3 additions & 1 deletion README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -242,14 +242,16 @@ The following operations can be run from the command line as described underneat
- clears all datasets, jobs and objects related to a harvest source,
but keeps the source itself

harvester clearsource-history [{source-id}]
harvester clearsource-history [{source-id}] [-k]
- If no source id is given, the history for all harvest sources (up to a
  maximum of 1000 sources) will be cleared.
Clears all jobs and objects related to a harvest source, but keeps the source
itself. The datasets imported from the harvest source will **NOT** be deleted!!!
If a source id is given, it only clears the history of the harvest source with
the given source id.

To keep the currently active jobs use the -k option.

harvester sources [all]
- lists harvest sources
If 'all' is defined, it also shows the Inactive sources
Expand Down
9 changes: 7 additions & 2 deletions ckanext/harvest/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,8 +108,13 @@ def clear(ctx, id):

@source.command()
@click.argument(u"id", metavar=u"SOURCE_ID_OR_NAME", required=False)
@click.option(
"-k",
"--keep-current",
default=False
)
@click.pass_context
def clear_history(ctx, id):
def clear_history(ctx, id, keep_current):
"""If no source id is given the history for all harvest sources
(maximum is 1000) will be cleared.
Expand All @@ -122,7 +127,7 @@ def clear_history(ctx, id):
flask_app = ctx.meta["flask_app"]

with flask_app.test_request_context():
result = utils.clear_harvest_source_history(id)
result = utils.clear_harvest_source_history(id, bool(keep_current))
click.secho(result, fg="green")


Expand Down
15 changes: 13 additions & 2 deletions ckanext/harvest/commands/harvester.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,14 @@ class Harvester(CkanCommand):
- clears all datasets, jobs and objects related to a harvest source,
but keeps the source itself
harvester clearsource_history [{source-id}]
harvester clearsource_history [{source-id}] [-k]
- If no source id is given the history for all harvest sources (maximum is 1000) will be cleared.
Clears all jobs and objects related to a harvest source, but keeps the source itself.
The datasets imported from the harvest source will NOT be deleted!!!
If a source id is given, it only clears the history of the harvest source with the given source id.
To keep the currently active jobs use the -k option.
harvester sources [all]
- lists harvest sources
If 'all' is defined, it also shows the Inactive sources
Expand Down Expand Up @@ -190,6 +192,14 @@ def __init__(self, name):
will be aborted. You can use comma as a separator to provide multiple source_id's""",
)

self.parser.add_option(
"-k",
"--keep-current",
dest="keep_current",
default=False,
help="Do not delete relevant harvest objects",
)

def command(self):
self._load_config()

Expand Down Expand Up @@ -316,11 +326,12 @@ def create_harvest_source(self):
print(result)

def clear_harvest_source_history(self):
    """Clear the job/object history of a harvest source.

    The optional second CLI argument selects a single source; when it is
    absent the history of all sources is cleared. The ``-k``/``--keep-current``
    option is forwarded as a boolean flag.
    """
    # No explicit source id means "clear history for every source".
    source_id = six.text_type(self.args[1]) if len(self.args) >= 2 else None
    result = utils.clear_harvest_source_history(
        source_id, bool(self.options.keep_current))
    print(result)

def show_harvest_source(self):

Expand Down
62 changes: 50 additions & 12 deletions ckanext/harvest/logic/action/update.py
Original file line number Diff line number Diff line change
Expand Up @@ -322,14 +322,17 @@ def harvest_sources_job_history_clear(context, data_dict):
'''
check_access('harvest_sources_clear', context, data_dict)

keep_current = data_dict.get('keep_current', False)

job_history_clear_results = []
# We assume that the maximum of 1000 (hard limit) rows should be enough
result = logic.get_action('package_search')(context, {'fq': '+dataset_type:harvest', 'rows': 1000})
harvest_packages = result['results']
if harvest_packages:
for data_dict in harvest_packages:
try:
clear_result = get_action('harvest_source_job_history_clear')(context, {'id': data_dict['id']})
clear_result = get_action('harvest_source_job_history_clear')(
context, {'id': data_dict['id'], 'keep_current': keep_current})
job_history_clear_results.append(clear_result)
except NotFound:
# Ignoring not existent harvest sources because of a possibly corrupt search index
Expand All @@ -352,6 +355,7 @@ def harvest_source_job_history_clear(context, data_dict):
check_access('harvest_source_clear', context, data_dict)

harvest_source_id = data_dict.get('id', None)
keep_current = data_dict.get('keep_current', False)

source = HarvestSource.get(harvest_source_id)
if not source:
Expand All @@ -362,17 +366,51 @@ def harvest_source_job_history_clear(context, data_dict):

model = context['model']

sql = '''begin;
delete from harvest_object_error where harvest_object_id
in (select id from harvest_object where harvest_source_id = '{harvest_source_id}');
delete from harvest_object_extra where harvest_object_id
in (select id from harvest_object where harvest_source_id = '{harvest_source_id}');
delete from harvest_object where harvest_source_id = '{harvest_source_id}';
delete from harvest_gather_error where harvest_job_id
in (select id from harvest_job where source_id = '{harvest_source_id}');
delete from harvest_job where source_id = '{harvest_source_id}';
commit;
'''.format(harvest_source_id=harvest_source_id)
if keep_current:
sql = '''BEGIN;
DELETE FROM harvest_object_error WHERE harvest_object_id
IN (SELECT id FROM harvest_object AS obj WHERE harvest_source_id = '{harvest_source_id}'
AND current != true
AND (NOT EXISTS (SELECT id FROM harvest_job WHERE id = obj.harvest_job_id
AND status = 'Running'))
AND (NOT EXISTS (SELECT id FROM harvest_object WHERE harvest_job_id = obj.harvest_job_id
AND current = true))
);
DELETE FROM harvest_object_extra WHERE harvest_object_id
IN (SELECT id FROM harvest_object AS obj WHERE harvest_source_id = '{harvest_source_id}'
AND current != true
AND (NOT EXISTS (SELECT id FROM harvest_job WHERE id = obj.harvest_job_id
AND status = 'Running'))
AND (NOT EXISTS (SELECT id FROM harvest_object WHERE harvest_job_id = obj.harvest_job_id
AND current = true))
);
DELETE FROM harvest_object AS obj WHERE harvest_source_id = '{harvest_source_id}'
AND current != true
AND (NOT EXISTS (SELECT id FROM harvest_job WHERE id = obj.harvest_job_id
AND status = 'Running'))
AND (NOT EXISTS (SELECT id FROM harvest_object WHERE harvest_job_id = obj.harvest_job_id
AND current = true));
DELETE FROM harvest_gather_error WHERE harvest_job_id
IN (SELECT id FROM harvest_job AS job WHERE source_id = '{harvest_source_id}'
AND job.status != 'Running'
AND NOT EXISTS (SELECT id FROM harvest_object WHERE harvest_job_id = job.id));
DELETE FROM harvest_job AS job WHERE source_id = '{harvest_source_id}'
AND job.status != 'Running'
AND NOT EXISTS (SELECT id FROM harvest_object WHERE harvest_job_id = job.id);
COMMIT;
'''.format(harvest_source_id=harvest_source_id)
else:
sql = '''BEGIN;
DELETE FROM harvest_object_error WHERE harvest_object_id
IN (SELECT id FROM harvest_object WHERE harvest_source_id = '{harvest_source_id}');
DELETE FROM harvest_object_extra WHERE harvest_object_id
IN (SELECT id FROM harvest_object WHERE harvest_source_id = '{harvest_source_id}');
DELETE FROM harvest_object WHERE harvest_source_id = '{harvest_source_id}';
DELETE FROM harvest_gather_error WHERE harvest_job_id
IN (SELECT id FROM harvest_job WHERE source_id = '{harvest_source_id}');
DELETE FROM harvest_job WHERE source_id = '{harvest_source_id}';
COMMIT;
'''.format(harvest_source_id=harvest_source_id)

model.Session.execute(sql)

Expand Down
Loading

0 comments on commit b702bf9

Please sign in to comment.