"""
Import the coco2017 dataset and save metadata into mongodb.
"""
import json
import logging
import os
from typing import Dict
from typing import List
from typing import Tuple
import traceback
from deepdataspace.constants import DatasetFileType
from deepdataspace.constants import DatasetType
from deepdataspace.constants import LabelName
from deepdataspace.constants import LabelType
from deepdataspace.io.importer import FileImporter
from deepdataspace.utils.import_utils import import_from_path
logger = logging.getLogger("plugins.coco.importer")
[docs]class COCO2017Importer(FileImporter):
"""
Importer for coco2017 dataset.
"""
def __init__(self, meta_path: str, enforce: bool = False):
"""
:param meta_path: path to meta file of coco2017 dataset, in a format of python script.
:param enforce: if True, the importer will re-import the dataset even if it is already imported.
"""
self.meta_path = os.path.abspath(meta_path)
info = self.parse_meta(meta_path)
dataset_name = info["dataset_name"]
self.ground_truth = info["ground_truth"]
self.image_root = info["image_root"]
self.predictions = info["predictions"]
super(COCO2017Importer, self).__init__(meta_path, dataset_name, enforce=enforce)
self.dataset.type = DatasetType.COCO2017
self._images = {} # {image_id: image}
self._categories = {} # {category_id: category}
self._annotations = {} # {image_id: [annotation...]}
@staticmethod
def _parse_meta(meta_path: str):
meta_path = os.path.abspath(meta_path)
dir_path = os.path.dirname(meta_path)
module = import_from_path(meta_path)
if not getattr(module, "is_coco_meta", False):
return None
dataset_name = getattr(module, "dataset_name")
assert isinstance(dataset_name, str)
ground_truth = getattr(module, "ground_truth")
ground_truth = os.path.abspath(os.path.join(dir_path, ground_truth))
assert os.path.isfile(ground_truth) and os.path.exists(ground_truth)
assert ground_truth.endswith(".json")
predictions = getattr(module, "predictions", [])
assert isinstance(predictions, list)
for prediction in predictions:
pred_name = prediction["name"]
assert isinstance(pred_name, str)
pred_file = prediction["file"]
pred_file = os.path.abspath(os.path.join(dir_path, pred_file))
assert os.path.isfile(pred_file) and os.path.exists(pred_file)
assert pred_file.endswith(".json")
prediction["file"] = pred_file
image_root = getattr(module, "image_root", None)
if image_root is not None:
image_root = os.path.abspath(os.path.join(dir_path, image_root))
assert os.path.isdir(image_root) and os.path.exists(image_root)
info = {
"dataset_name" : dataset_name,
"ground_truth" : ground_truth,
"predictions" : predictions,
"image_root" : image_root,
"dynamic_caption" : getattr(module, "dynamic_caption", False),
"caption_generator": getattr(module, "caption_generator", None),
}
return info
[docs] def load_ground_truth(self):
with open(self.ground_truth, "r", encoding="utf8") as fp:
coco_data = json.load(fp)
images = coco_data["images"]
self._images = {i["id"]: i for i in images}
categories = coco_data.get("categories", [])
self._categories = {c["id"]: c for c in categories}
annotations = coco_data["annotations"]
for annotation in annotations:
annotation["label_name"] = LabelName.GroundTruth
annotation["label_type"] = LabelType.GroundTruth
image_id = annotation["image_id"]
anno_list = self._annotations.setdefault(image_id, [])
anno_list.append(annotation)
[docs] def load_predictions(self):
for file_tag, file_path in self.dataset.files.items():
if not file_tag.startswith("PRED/"):
continue
pred_name = file_tag.split("/", 1)[-1]
with open(file_path, "r", encoding="utf8") as fp:
pred_data = json.load(fp)
annotations = pred_data.get("annotations", [])
for annotation in annotations:
annotation["label_name"] = pred_name
annotation["label_type"] = LabelType.Prediction
image_id = annotation["image_id"]
anno_list = self._annotations.setdefault(image_id, [])
anno_list.append(annotation)
[docs] def pre_run(self):
super(COCO2017Importer, self).pre_run()
self.load_ground_truth()
self.load_predictions()
self.format_data()
def __iter__(self) -> Tuple[Dict, List[Dict]]:
for coco_image_data in self._images:
# data_sample = {"license" : 2,
# "file_name" : "000000000139.jpg",
# "coco_url" : "http://images.cocodataset.org/val2017/000000000139.jpg",
# "height" : 426,
# "width" : 640,
# "date_captured": "2013-11-21 01:34:01",
# "flickr_url" : "http://farm9.staticflickr.com/8035/8024364858_9c41dc1666_z.jpg",
# "id" : 139}
image_id = coco_image_data["id"]
coco_anno_list = self._annotations.get(image_id, [])
# list_sample = [
# {
# 'segmentation' : [
# [x1, y1, x2, y2 ...],
# ],
# 'area' : 531.8071000000001,
# 'iscrowd' : 0,
# 'image_id' : 139,
# 'bbox' : [x, y, w, h],
# 'category_id' : 64,
# 'keypoints' : [x1, y1, v1, conf1, x2, y2, v2, conf2, ...],
# 'caption' : 'A giraffe eating food from the top of a tree.',
# 'id' : 26547,
# },
# ...
# ]
# prepare image uri
uri = None
# trying to find the image file in local file system
if self.image_root and coco_image_data.get("file_name", None):
image_path = coco_image_data.get("file_name", None)
image_path = os.path.join(self.image_root, image_path)
uri = f"file://{image_path}"
# trying to find the image file in the original coco image urls
if uri is None:
uri = coco_image_data.get("coco_url", None)
if uri is None:
logger.warning(f"Cannot find image file for image {image_id}, skip it.")
continue
coco_image_data.pop("coco_url", None)
coco_image_data.pop("file_name", None)
# prepare other image data
width = coco_image_data.pop("width", None)
height = coco_image_data.pop("height", None)
image = self.format_image_data(uri,
width=width, height=height,
id_=image_id, metadata=coco_image_data,
)
anno_list = []
for anno_data in coco_anno_list:
# prepare label
label_name = anno_data.pop("label_name", LabelName.GroundTruth)
label_type = anno_data.pop("label_type", LabelType.GroundTruth)
# prepare category
category_id = anno_data.pop("category_id", None)
if category_id:
category = self._categories[category_id]
category_name = category["name"]
else:
category = {}
category_name = ""
# prepare bbox
bbox = anno_data.pop("bbox", None)
if bbox is not None:
bbox = tuple(bbox)
# prepare segmentation
segmentation = anno_data.pop("segmentation", None)
# prepare keypoints
keypoint_names = None
keypoint_skeleton = None
keypoint_colors = None
keypoints = []
raw_keypoints = anno_data.pop("keypoints", None)
if raw_keypoints is not None:
keypoint_names = category.get("keypoints", None)
keypoint_skeleton = category.get("skeleton", None)
if keypoint_skeleton is not None:
keypoint_skeleton = [item - 1 for sublist in keypoint_skeleton for item in sublist]
keypoint_colors = category.get("keypoint_colors", None)
if keypoint_colors is not None:
keypoint_colors = [item for sublist in keypoint_colors for item in sublist]
if label_type == LabelType.GroundTruth:
length = len(raw_keypoints) // 3
for idx in range(length):
idx *= 3
conf = 1.0
x, y, v = raw_keypoints[idx], raw_keypoints[idx + 1], raw_keypoints[idx + 2]
keypoints.extend([float(x), float(y), int(v), conf]) # x, y, v, conf
elif label_type == LabelType.Prediction:
length = len(raw_keypoints) // 4
for idx in range(length):
idx *= 4
x, y, v, conf = raw_keypoints[idx], raw_keypoints[idx + 1], raw_keypoints[idx + 2], \
raw_keypoints[idx + 3]
keypoints.extend([float(x), float(y), int(v), conf]) # x, y, v, conf
# prepare is_group
is_group = anno_data.pop("is_group", None)
# prepare confidence
conf = anno_data.pop("score", 1.0)
if label_type == LabelType.GroundTruth:
conf = 1.0
# prepare caption
caption = anno_data.pop("caption", None)
# finally, add the annotation
anno_data = self.format_annotation(category_name,
label_name,
conf=conf,
bbox=bbox,
is_group=is_group,
label_type=label_type,
segmentation=segmentation,
keypoints=keypoints,
keypoint_colors=keypoint_colors,
keypoint_skeleton=keypoint_skeleton,
keypoint_names=keypoint_names,
caption=caption
)
anno_list.append(anno_data)
yield image, anno_list
[docs] @staticmethod
def can_import(path: str):
if os.path.isdir(path):
return False
if not path.endswith(".py"):
return False
try:
assert COCO2017Importer.parse_meta(path) is not None
except:
return False
else:
return True
[docs] def collect_files(self) -> dict:
files = {LabelName.GroundTruth: self.ground_truth}
for pred in self.predictions:
pred_name = pred["name"]
pred_file = pred["file"]
files[f"{DatasetFileType.Prediction}/{pred_name}"] = pred_file
files[DatasetFileType.Meta] = self.meta_path
return files