Source code for deepdataspace.task

"""
deepdataspace.task

The celery task module.
"""

import logging
import os
import traceback
from logging import StreamHandler

from celery.signals import after_setup_logger
from celery.signals import worker_ready

from deepdataspace import environs
from deepdataspace.task.celery import app
from deepdataspace.task.ping import ping

logger = logging.getLogger("celery")


@after_setup_logger.connect
def logger_setup_handler(logger, **kwargs):
    if environs.VERBOSE:
        handler = StreamHandler()
        handler.setLevel(logging.INFO)
        fmt = "%(asctime)s %(levelname)s [%(name)s] %(message)s"
        formatter = logging.Formatter(fmt)
        handler.setFormatter(formatter)
        logger.addHandler(handler)
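
# With the format string above, records mirrored to the console render as,
# for example (timestamp illustrative):
#   2024-01-01 12:00:00,123 INFO [celery] import_and_process_data_dir starts, data_dir=...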


@app.task
def import_and_process_dataset(dataset_dir: str, enforce: bool = False, auto_triggered: bool = False):
    """
    Import the dataset found in a directory, then process it.

    :param dataset_dir: the directory of the target dataset.
    :param enforce: import and process the dataset even if it was imported/processed before.
    :param auto_triggered: whether this task is triggered automatically on program startup.
    """

    from deepdataspace.model.dataset import DataSet
    from deepdataspace.task.import_dataset import import_dataset
    from deepdataspace.task.process_dataset import process_dataset

    logger.info(f"import_and_process_dataset starts, dataset_dir={dataset_dir}, enforce={enforce}")

    # import_dataset may return a single DataSet or a list of them; process each one.
    result = import_dataset(dataset_dir, enforce)
    if isinstance(result, list):
        for dataset in result:
            process_dataset(dataset.path, enforce, auto_triggered=auto_triggered)
    elif isinstance(result, DataSet):
        dataset = result
        process_dataset(dataset.path, enforce, auto_triggered=auto_triggered)
    return result


@app.task
def import_and_process_data_dir(data_dir: str, enforce: bool = False, auto_triggered: bool = False):
    """
    Import and process every dataset inside a directory.

    :param data_dir: the directory containing all datasets.
    :param enforce: import and process all datasets, even if they were imported/processed before.
    :param auto_triggered: whether this task is triggered automatically on program startup.
    """

    logger.info(f"import_and_process_data_dir starts, data_dir={data_dir}")

    data_dir = os.path.abspath(data_dir)
    for item in os.listdir(data_dir):
        target_path = os.path.join(data_dir, item)
        try:
            import_and_process_dataset(target_path, enforce, auto_triggered)
        except Exception as err:
            # Log the failure and keep importing the remaining datasets.
            tb_list = traceback.format_tb(err.__traceback__)
            errors = "".join(tb_list)
            logger.error(errors)
            logger.error(str(err))
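

# Usage sketch (illustrative, not part of this module; the dataset path below
# is a placeholder): either task can run in-process by calling it directly,
# or be queued on a celery worker via .delay() / .apply_async().
#
#   from deepdataspace.task import import_and_process_dataset
#
#   # blocking run in the current process
#   import_and_process_dataset("/data/coco2017")
#
#   # asynchronous run, queued for a celery worker
#   import_and_process_dataset.delay("/data/coco2017", enforce=True)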


@worker_ready.connect
def startup_jobs(sender=None, headers=None, body=None, **kwargs):
    # Import all datasets under the configured dataset directory on worker startup.
    data_dir = environs.DATASET_DIR
    import_and_process_data_dir.apply_async(args=(data_dir, False, True))
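
# A minimal sketch of how this fires, assuming the worker is started with the
# app defined in deepdataspace.task.celery (imported above):
#
#   celery -A deepdataspace.task.celery:app worker
#
# Once the worker is ready, startup_jobs queues a non-enforced, auto-triggered
# import of every dataset under environs.DATASET_DIR.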