"""
deepdataspace.model.label_task
The label project related models.
"""
import copy
import logging
import time
import uuid
from typing import ClassVar
from typing import Dict
from typing import List
from typing import Type
from typing import Union
from pydantic import BaseModel as _Base
from pydantic import Field
from pymongo.collection import Collection
from pymongo.typings import _DocumentType
from deepdataspace.constants import AnnotationType
from deepdataspace.constants import DatasetStatus
from deepdataspace.constants import ErrCode
from deepdataspace.constants import LabelImageQAActions
from deepdataspace.constants import LabelProjectQAActions
from deepdataspace.constants import LabelProjectRoles
from deepdataspace.constants import LabelProjectStatus
from deepdataspace.constants import LabelTaskImageStatus
from deepdataspace.constants import LabelTaskQAActions
from deepdataspace.constants import LabelTaskStatus
from deepdataspace.constants import LabelType
from deepdataspace.model._base import BaseModel
from deepdataspace.model.category import Category
from deepdataspace.model.dataset import DataSet
from deepdataspace.model.image import Image
from deepdataspace.model.image import ImageModel
from deepdataspace.model.label import Label
from deepdataspace.model.object import Object
from deepdataspace.model.user import User
from deepdataspace.utils.http import APIException
from deepdataspace.utils.string import get_str_md5
# Alias for a value that may be either a float or an int.
Num = Union[float, int]
# Module-level logger, registered under the name "io.model.label_task".
logger = logging.getLogger("io.model.label_task")
def current_ts():
    """
    Return the current Unix timestamp in milliseconds, truncated to an int.
    """
    seconds = time.time()
    return int(seconds * 1000)
def gen_uuid():
    """
    Generate a random (version 4) uuid and return it as a 32-character hex string.
    """
    fresh = uuid.uuid4()
    return fresh.hex
class LabelProjectError(APIException):
    """
    An APIException subclass raised for label project related errors.
    """
class LabelTaskError(APIException):
    """
    An APIException subclass raised for label task related errors.
    """
[docs]class LabelProject(BaseModel):
"""
| The label project model.
| Each label project is associated with one or more datasets, and one project owner and several managers.
| The project distributes the datasets to label tasks, which are labeled by labelers and reviewed by reviewers,
who are led by label leaders and review leaders.
"""
@classmethod
def get_collection(cls, *args, **kwargs) -> Collection[_DocumentType]:
    """
    Return the "label_projects" collection, which is where label projects are stored.
    """
    collection_name = "label_projects"
    return cls.db[collection_name]
# the mandatory fields
id: str
name: str
datasets: List[dict] # dataset references, each item is {"id": ..., "name": ...}
created_ts: int # creation time as a millisecond timestamp
owner: dict # owner reference, {"id": ..., "name": ...}
managers: List[Dict] # manager references, each item is {"id": ..., "name": ...}
# the optional fields
description: str = ""
status: str = LabelProjectStatus.Waiting # a new project waits for a manager to init it
batch_size: int = None # images per task, set by init_project; 0 means don't divide dataset to multiple tasks
label_times: int = None # each image needs to be labeled by label_times people, set by init_project
review_times: int = None # each image needs to be reviewed by review_times people, set by init_project
# the task counters: the total is set when tasks are generated,
# the per-status counters are refreshed by update_subtask_counter
task_num_total: int = 0
task_num_waiting: int = 0
task_num_working: int = 0
task_num_reviewing: int = 0
task_num_rejected: int = 0
task_num_accepted: int = 0
categories: str = "" # category names joined by ",", see _set_categories
pre_label: str = None # objects belong to pre_label will be imported as default labels
@classmethod
def create_project(cls,
                   name: str,
                   owner: User,
                   datasets: List[DataSet],
                   managers: List[User],
                   categories: List[str],
                   description: str = "",
                   pre_label: str = None,
                   ) -> "LabelProject":
    """
    Create a label project in waiting status, save it, and grant the initial owner and manager roles.

    :param name: the project name.
    :param owner: the project owner, who must be a staff user.
    :param datasets: the project datasets, which cannot be an empty list.
    :param managers: the project managers, which cannot be an empty list.
    :param categories: the categories for classification and annotation task, which cannot be an empty list.
    :param description: the project description.
    :param pre_label: the pre label set to be imported as default labels.
    :return: the saved project.
    :raises LabelProjectError: if managers, datasets or categories is empty, if the owner is not a staff user,
        or if any of the datasets is already referenced by a label project.
    """

    # check data requirements: managers, datasets, categories, then the owner's privilege
    if not managers:
        raise LabelProjectError(ErrCode.CreateLabelProjectRequireManager,
                                ErrCode.CreateLabelProjectRequireManagerMsg,
                                400)
    if not datasets:
        raise LabelProjectError(ErrCode.CreateLabelProjectRequireDataset,
                                ErrCode.CreateLabelProjectRequireDatasetMsg,
                                400)
    if not categories:
        raise LabelProjectError(ErrCode.CreateLabelProjectRequireCategory,
                                ErrCode.CreateLabelProjectRequireCategoryMsg,
                                400)
    if not owner.is_staff:
        raise LabelProjectError(ErrCode.UserCantCreateLabelProject,
                                ErrCode.UserCantCreateLabelProjectMsg,
                                403)

    # the project stores {"id", "name"} references instead of the full objects
    new_project_id = uuid.uuid4().hex
    now_ms = int(time.time() * 1000)
    owner_ref = {"id": owner.id, "name": owner.name}
    dataset_refs = [{"id": dataset.id, "name": dataset.name} for dataset in datasets]
    manager_refs = [{"id": manager.id, "name": manager.name} for manager in managers]

    # refuse the datasets which are already referenced by another label project
    wanted_ids = [ref["id"] for ref in dataset_refs]
    occupying_project = LabelProject.find_one({"datasets.id": {"$in": wanted_ids}})
    if occupying_project is not None:
        raise LabelProjectError(ErrCode.CreateLabelProjectDatasetOccupied,
                                ErrCode.CreateLabelProjectDatasetOccupiedMsg,
                                400)

    # create the project
    project_fields = {
        "id": new_project_id,
        "name": name,
        "datasets": dataset_refs,
        "created_ts": now_ms,
        "owner": owner_ref,
        "managers": manager_refs,
        "description": description,
        "pre_label": pre_label,
        "status": LabelProjectStatus.Waiting,
    }
    project = LabelProject(**project_fields)
    project._set_categories(categories)
    project.save()

    # set initial roles
    ProjectRole.add_role(project, owner_ref["id"], LabelProjectRoles.Owner)
    manager_ids = [ref["id"] for ref in manager_refs]
    ProjectRole.add_roles(project, manager_ids, LabelProjectRoles.Manager)
    return project
def _set_categories(self, categories):
"""
strip, convert to lower case, deduplicate, sort, join to a string
"""
categories = sorted(set([c.strip().lower() for c in categories if c.strip()]))
self.categories = ",".join(categories)
def edit_project(self,
                 desc: str = None,
                 managers: List[User] = None,
                 ):
    """
    Edit project description and/or managers, then save the project.

    :param desc: the project description, if None, then it won't be updated.
    :param managers: the project managers, if None, then it won't be updated. Otherwise, it cannot be an empty list.
    :raises LabelProjectError: if managers is an empty list.
    """

    if desc is not None:
        self.description = desc

    if managers is not None:
        if not managers:  # empty list
            raise LabelProjectError(ErrCode.EditLabelProjectRequireManager,
                                    ErrCode.EditLabelProjectRequireManagerMsg,
                                    400)

        previous_ids = {manager["id"] for manager in self.managers}
        incoming_ids = {manager.id for manager in managers}
        self.managers = [{"id": manager.id, "name": manager.name} for manager in managers]

        # revoke the manager role from removed users, grant it to the newly added users
        revoked_ids = list(previous_ids - incoming_ids)
        granted_ids = list(incoming_ids - previous_ids)
        ProjectRole.del_roles(self, revoked_ids, LabelProjectRoles.Manager)
        ProjectRole.add_roles(self, granted_ids, LabelProjectRoles.Manager)

    self.save()
def _init_tasks(self):
"""
Generate label tasks for all datasets of this project.
Each task contains a batch of images of a dataset.
Images of a task are of the same dataset.
For every image a label task image is created, carrying the objects of the pre label set as default labels.
This method updates task_num_total, task_num_waiting and categories on the project but does not save
the project itself.
"""
# task_idx numbers the tasks continuously across all datasets of the project
task_idx = 0
task_num_total = 0
# for every dataset, distribute images to tasks
# category_names collects the category names found in imported pre-label objects
category_names = set()
for dataset in self.datasets:
dataset_id = dataset["id"]
IModel = Image(dataset_id)
LTIModel = LabelTaskImage(dataset_id)
num_images = IModel.count_num({})
# only these image fields are read from the database
includes = ["id", "url", "url_full_res", "objects"]
includes = {i: 1 for i in includes}
images = IModel.find_many({}, includes=includes, sort=[("idx", 1), ("id", 1)], to_dict=True)
# distribute all images to one task if batch_size is 0
if self.batch_size == 0:
num_tasks = 1
batch_size = num_images
else: # distribute images to tasks by batch_size
# the last task takes the remainder when num_images is not a multiple of batch_size
num_tasks, left = divmod(num_images, self.batch_size)
num_tasks += 1 if left > 0 else 0
batch_size = self.batch_size
task_num_total += num_tasks
tasks = []
for idx in range(num_tasks):
# tasks are created with num_total=0, the counter is increased while images are distributed below
task = LabelTask(id=gen_uuid(),
idx=task_idx,
project_id=self.id,
dataset_id=dataset_id,
num_total=0,
created_ts=current_ts())
task.save()
task_idx += 1
tasks.append(task)
for idx, image in enumerate(images):
# the image position in the sorted result decides which task it belongs to
task = tasks[idx // batch_size]
task.num_total += 1
# add import pre label set
annotations = []
for obj in image["objects"]:
# only objects whose label_name equals the project's pre_label are imported
if self.pre_label is None or obj["label_name"] != self.pre_label:
continue
# the category name is used for both category_id and category_name
anno = {
"category_id" : obj["category_name"],
"category_name": obj["category_name"],
"bounding_box" : obj["bounding_box"]
}
annotations.append(anno)
category_names.add(obj["category_name"])
# default labels are attributed to the pseudo user "_pre"
default_labels = UserLabelData(
user_id="_pre",
user_name="_pre",
annotations=annotations
)
lti = LTIModel(
id=f"{task.id}_{idx}",
idx=idx,
image_id=image["id"],
default_labels=default_labels,
task_id=task.id,
url=image["url"],
url_full_res=image["url_full_res"])
lti.batch_save()
LTIModel.finish_batch_save()
# save the tasks again, now that their num_total counters are final
[task.batch_save() for task in tasks]
LabelTask.finish_batch_save()
# every generated task is counted as waiting
self.task_num_total = task_num_total
self.task_num_waiting = task_num_total
# expand categories from pre-label data
categories = self.categories.split(",")
categories.extend(category_names)
categories = sorted(set(categories))
self.categories = ",".join(categories)
def init_project(self,
                 batch_size: int = None,
                 label_times: int = None,
                 review_times: int = None,
                 ):
    """
    Init project with configurations, and generate its label tasks.
    Each project can be inited only once, since it must be in waiting status.

    :param batch_size: the number of images in a task, if 0, then all images of a dataset are in a task.
    :param label_times: the number of labelers to label every image in a task.
    :param review_times: the number of reviewers to review every label of every labeler of a task.
    :raises LabelProjectError: if the project is not waiting, or if any of the configurations is None.
    """

    if self.status != LabelProjectStatus.Waiting:
        raise LabelProjectError(ErrCode.InitLabelProjectMustBeWaiting,
                                ErrCode.InitLabelProjectMustBeWaitingMsg,
                                400)

    configs = (batch_size, label_times, review_times)
    if any(config is None for config in configs):
        raise LabelProjectError(ErrCode.InitLabelProjectTaskConfigError,
                                ErrCode.InitLabelProjectTaskConfigErrorMsg,
                                400)

    # persist the configurations and the intermediate status before generating tasks
    self.batch_size, self.label_times, self.review_times = configs
    self.status = LabelProjectStatus.Initializing
    self.save()

    self._init_tasks()

    # the tasks are ready, the project is open for work now
    self.status = LabelProjectStatus.Working
    self.save()
def update_subtask_counter(self):
    """
    Update the number of tasks in different status, then save the project.
    The numbers are counted by one mongodb aggregation with one facet for every task status.
    When all tasks are accepted, the project status is changed to reviewing.
    """

    # one $facet branch per status counts the tasks of this project in that status,
    # the $project stage lifts every count out of its single-element array
    facet = {
        status: [{"$match": {"status": status, "project_id": self.id}}, {"$count": status}]
        for status in LabelTaskStatus.ALL_
    }
    project = {
        status: {"$arrayElemAt": [f"${status}.{status}", 0]}
        for status in LabelTaskStatus.ALL_
    }
    aggregated = list(LabelTask.aggregate([{"$facet": facet}, {"$project": project}]))
    counters = aggregated[0]

    # the project field holding the counter of every task status
    counter_fields = {
        LabelTaskStatus.Waiting: "task_num_waiting",
        LabelTaskStatus.Working: "task_num_working",
        LabelTaskStatus.Reviewing: "task_num_reviewing",
        LabelTaskStatus.Rejected: "task_num_rejected",
        LabelTaskStatus.Accepted: "task_num_accepted",
    }
    for status in LabelTaskStatus.ALL_:
        field_name = counter_fields.get(status)
        if field_name is None:
            continue
        # a status without any task is absent from the result, which counts as 0
        setattr(self, field_name, counters.get(status, 0))

    if self.task_num_accepted == self.task_num_total:
        self.status = LabelProjectStatus.Reviewing
    self.save()
def accept_project(self):
    """
    Accept the project: change the status from 'reviewing' to 'accepted' and save the project.

    :raises LabelProjectError: if the project is not in reviewing status.
    """

    under_review = self.status == LabelProjectStatus.Reviewing
    if not under_review:
        raise LabelProjectError(ErrCode.QALabelProjectMustBeReviewing,
                                ErrCode.QALabelProjectMustBeReviewingMsg,
                                400)

    self.status = LabelProjectStatus.Accepted
    self.save()
def reject_project(self):
    """
    Reject the project: change the status from 'reviewing' to 'rejected' and save the project.

    :raises LabelProjectError: if the project is not in reviewing status.
    """

    under_review = self.status == LabelProjectStatus.Reviewing
    if not under_review:
        raise LabelProjectError(ErrCode.QALabelProjectMustBeReviewing,
                                ErrCode.QALabelProjectMustBeReviewingMsg,
                                400)

    self.status = LabelProjectStatus.Rejected
    self.save()
def qa_project(self, action):
    """
    QA the project, change the status from 'reviewing' to 'accepted' or 'rejected'.

    :param action: LabelProjectQAActions.Accept or LabelProjectQAActions.Reject.
    :return: the result of accept_project or reject_project.
    :raises LabelProjectError: if the action is neither of the two.
    """

    if action == LabelProjectQAActions.Accept:
        return self.accept_project()

    if action == LabelProjectQAActions.Reject:
        return self.reject_project()

    raise LabelProjectError(ErrCode.QALabelProjectActionError,
                            ErrCode.QALabelProjectActionErrorMsg,
                            400)
@staticmethod
def _get_image_batch(dataset_id, offset):
    """
    Get a batch of at most 100 images from the dataset, sorted by id and starting at offset.

    :return: a tuple of the images keyed by image id, and the offset of the next batch.
    """

    page_size = 100
    image_model = Image(dataset_id)
    page = image_model.find_many({}, sort=[("id", 1)], size=page_size, skip=offset)

    images: Dict[int, ImageModel] = {}
    for image in page:
        images[image.id] = image
    return images, offset + page_size
@staticmethod
def _get_label(dataset_id: str, label_set_name: str):
    """
    Build and save the ground truth label set which receives the exported annotations.
    The label id is the md5 of "<dataset_id>_<label_set_name>".

    :return: the saved label object.
    """

    label_fields = {
        "name": label_set_name,
        "id": get_str_md5(f"{dataset_id}_{label_set_name}"),
        "type": LabelType.GroundTruth,
        "dataset_id": dataset_id,
    }
    label = Label(**label_fields)
    label.post_init()
    label.save()
    return label
@staticmethod
def _get_category(dataset_id: str, category_name: str, categories: dict):
"""
Get the category of saving annotation.
"""
cat_obj = categories.get(category_name, None)
if cat_obj is None:
cat_id = get_str_md5(f"{dataset_id}_{category_name}")
cat_obj = Category(id=cat_id, name=category_name, dataset_id=dataset_id)
cat_obj.post_init()
cat_obj.save()
categories[category_name] = cat_obj
return cat_obj
def _export_dataset(self, dataset: DataSet, label_set_name: str):
"""
Export the label data of one dataset into the label set named label_set_name.
Every label task image of the dataset is matched with its image, and the annotations which have both
a category name and a bounding box are appended to the image as objects of the label set.

:param dataset: the target dataset.
:param label_set_name: the name of the label set receiving the annotations.
"""
# the label set
label_obj = self._get_label(dataset.id, label_set_name)
# the categories cache, we cache it to avoid duplicated db query and insertion
categories = {}
# the queue of target images
LTImage = LabelTaskImage(dataset.id)
images, offset = self._get_image_batch(dataset.id, 0)
has_bbox = False # whether the annotations have bbox
# iter every label image, save every annotation to target image
# label images are sorted by image_id and image batches by id, so both sides are walked in the same order
for ltimage in LTImage.find_many({}, sort=[("image_id", 1)]):
# NOTE(review): a new batch is fetched only after every image of the current batch has been popped;
# confirm every image has a label image, otherwise the later label images cannot find their images
if not images: # no more images in the queue, get a new batch
images, offset = self._get_image_batch(dataset.id, offset)
# match label image and target image
image_id = ltimage.image_id
image = images.pop(image_id, None)
if image is None:
continue
# drop the objects of a previous export to the same label set
image.objects = [o for o in image.objects if o.label_id != label_obj.id]
# save annotations
labels = ltimage.labels
for labeler_id, label_list in labels.items():
if not label_list:
continue
# only the first label data of every labeler is exported
label_data = label_list[0]
annotations = label_data.annotations
for anno in annotations:
# annotations without a category name or a bounding box are skipped
category = anno["category_name"]
if not category:
continue
bounding_box = anno["bounding_box"]
if not bounding_box:
continue
has_bbox = True
cat_obj = self._get_category(dataset.id, category, categories)
anno_obj = Object(label_name=label_obj.name, label_type=label_obj.type, label_id=label_obj.id,
category_name=cat_obj.name, category_id=cat_obj.id,
bounding_box=anno["bounding_box"])
anno_obj.post_init()
image.objects.append(anno_obj)
image.batch_save()
Image(dataset.id).finish_batch_save()
# once a bounding box is exported, the dataset is marked as having classification and detection objects
if has_bbox:
if AnnotationType.Classification not in dataset.object_types:
dataset.object_types.append(AnnotationType.Classification)
if AnnotationType.Detection not in dataset.object_types:
dataset.object_types.append(AnnotationType.Detection)
dataset.save()
def export_project(self, label_set_name: str):
    """
    Export the label data back to datasets.
    The target label set is named "<label_set_name>[<first 8 characters of the project id>]".
    Datasets are exported one by one; a failure of one dataset is logged and recorded in its
    detail status, and does not stop the export of the other datasets.

    :param label_set_name: the name prefix of the target label set.
    :raises LabelProjectError: if the project is not in accepted status.
    """

    if self.status != LabelProjectStatus.Accepted:
        raise LabelProjectError(ErrCode.ExportLabelProjectMustBeAccepted,
                                ErrCode.ExportLabelProjectMustBeAcceptedMsg,
                                400)

    label_set_name = f"{label_set_name}[{self.id[:8]}]"
    logger.info(f"exporting project {self.id} to label set {label_set_name}")

    num = len(self.datasets)
    for idx, dataset_ref in enumerate(self.datasets):
        dataset_id = dataset_ref["id"]
        dataset_filter = {"id": dataset_id}
        target = DataSet.find_one(dataset_filter)
        if target is None:
            continue

        export_status = DatasetStatus.Ready
        try:
            # mark the dataset as importing while its objects are being rewritten
            DataSet.update_one(dataset_filter,
                               {"status": DatasetStatus.Importing,
                                "detail_status.export_label_project": DatasetStatus.Importing})
            logger.info(f"[{idx + 1}/{num}]exporting label project to dataset {dataset_id}")
            self._export_dataset(target, label_set_name)
            self.status = LabelProjectStatus.Exported
            self.save()
        except Exception as e:
            export_status = DatasetStatus.Failed
            logger.warning(f"[{idx + 1}/{num}]export label project to dataset {dataset_id} failed: {e}")
        else:
            export_status = DatasetStatus.Ready
            logger.info(f"[{idx + 1}/{num}]export label project to dataset {dataset_id} success")
        finally:
            # the dataset itself becomes ready again, the export result is kept in the detail status
            DataSet.update_one(dataset_filter,
                               {"status": DatasetStatus.Ready,
                                "detail_status.export_label_project": export_status})
class ProjectRole(BaseModel):
    """
    Every user has one or more roles in a project.
    This model defines common interfaces for project roles.
    """

    @classmethod
    def get_collection(cls, *args, **kwargs) -> Collection[_DocumentType]:
        """
        Project roles are stored in the "label_project_roles" collection.
        """
        return cls.db["label_project_roles"]

    id: str  # generated by gen_uuid() when the role is created
    project_id: str  # id of the project the role belongs to
    user_id: str  # id of the user bearing the role
    role: str  # the role name; callers in this module pass LabelProjectRoles constants

    @classmethod
    def add_role(cls, project: LabelProject, user_id: str, role: str):
        """
        Assign a role to a user in a project.

        :return: the saved role object.
        """
        new_role = cls(id=gen_uuid(), project_id=project.id, user_id=user_id, role=role)
        new_role.save()
        return new_role

    @classmethod
    def add_roles(cls, project: LabelProject, user_ids: List[str], role: str):
        """
        Assign a role to a list of users in a project.

        :return: the list of saved role objects, or None if user_ids is empty.
        """
        if not user_ids:
            return

        roles = []
        for user_id in user_ids:
            proj_role = cls(id=gen_uuid(), project_id=project.id, user_id=user_id, role=role)
            proj_role.batch_save()
            roles.append(proj_role)
        cls.finish_batch_save()
        return roles

    @classmethod
    def del_role(cls, project: LabelProject, user_id: str, role: str):
        """
        Delete a role of a user in a project.

        :raises LabelTaskError: if this would delete every owner role of the project.
        """
        filters = {"project_id": project.id, "user_id": user_id, "role": role}

        if role == LabelProjectRoles.Owner:
            num_del_owner = cls.count_num(filters)
            num_cur_owner = cls.count_num({"project_id": project.id, "role": LabelProjectRoles.Owner})
            if num_del_owner == num_cur_owner:
                raise LabelTaskError(
                    ErrCode.CantDeleteAllOwnersOfLabelProject,
                    ErrCode.CantDeleteAllOwnersOfLabelProjectMsg,
                    400)

        cls.delete_many(filters)

    @classmethod
    def del_roles(cls, project: LabelProject, user_ids: List[str], role: str):
        """
        Delete a role of a list of users in a project.
        Nothing is done if user_ids is empty.

        :raises LabelTaskError: if this would delete every owner role of the project.
        """
        if not user_ids:
            return

        if role == LabelProjectRoles.Owner:
            # Apply the same guard as del_role: compare the number of owner records to delete with the
            # number of owner records of the project. Counting records, instead of comparing the set
            # of user ids with the owner count, is what makes the comparison meaningful.
            owner_filters = {"project_id": project.id, "role": LabelProjectRoles.Owner}
            num_cur_owner = cls.count_num(owner_filters)
            num_del_owner = cls.count_num({**owner_filters, "user_id": {"$in": list(user_ids)}})
            if num_del_owner == num_cur_owner:
                raise LabelTaskError(
                    ErrCode.CantDeleteAllOwnersOfLabelProject,
                    ErrCode.CantDeleteAllOwnersOfLabelProjectMsg,
                    400)

        filters = {
            "project_id": project.id,
            "$or": [{"user_id": user_id, "role": role} for user_id in user_ids],
        }
        cls.delete_many(filters)

    @staticmethod
    def _has_role(user: User, project_id: str, roles=None) -> bool:
        """
        Check if a role record exists for the user in the project.

        :param roles: None matches any role, a string matches exactly that role,
            any other iterable matches any of the roles in it.
        """
        filters = {"project_id": project_id, "user_id": user.id}
        if roles is not None:
            filters["role"] = roles if isinstance(roles, str) else {"$in": list(roles)}
        return ProjectRole.find_one(filters) is not None

    @staticmethod
    def is_member(user: User, project_id: str):
        """
        Check if target user has any role in the project.
        """
        return ProjectRole._has_role(user, project_id)

    @staticmethod
    def is_owner(user: User, project_id: str):
        """
        Check if target user is the owner of the project.
        """
        return ProjectRole._has_role(user, project_id, LabelProjectRoles.Owner)

    @staticmethod
    def is_manager(user: User, project_id: str):
        """
        Check if target user is the manager of the project.
        """
        return ProjectRole._has_role(user, project_id, LabelProjectRoles.Manager)

    @staticmethod
    def is_leader(user: User, project_id: str):
        """
        Check if target user is the leader of the project.
        """
        return ProjectRole._has_role(user, project_id, LabelProjectRoles.Leaders_)

    @staticmethod
    def is_gte_leader(user: User, project_id: str):
        """
        Check if target user bears any role above or equal to leader in the project.
        """
        return ProjectRole._has_role(user, project_id, LabelProjectRoles.GTELeaders_)

    @staticmethod
    def is_gt_leader(user: User, project_id: str):
        """
        Check if target user bears any role above leader in the project.
        """
        return ProjectRole._has_role(user, project_id, LabelProjectRoles.GTLeaders_)

    @staticmethod
    def is_label_leader(user: User, project_id: str):
        """
        Check if target user is the label leader of the project.
        """
        return ProjectRole._has_role(user, project_id, LabelProjectRoles.LabelLeader)

    @staticmethod
    def is_review_leader(user: User, project_id: str):
        """
        Check if target user is the review leader of the project.
        """
        return ProjectRole._has_role(user, project_id, LabelProjectRoles.ReviewLeader)

    @staticmethod
    def is_worker(user: User, project_id: str):
        """
        Check if target user is a worker of the project.
        """
        return ProjectRole._has_role(user, project_id, LabelProjectRoles.Workers_)

    @staticmethod
    def is_label_worker(user: User, project_id: str):
        """
        Check if target user is a label worker of the project.
        """
        return ProjectRole._has_role(user, project_id, LabelProjectRoles.Labeler)

    @staticmethod
    def is_review_worker(user: User, project_id: str):
        """
        Check if target user is a review worker of the project.
        """
        return ProjectRole._has_role(user, project_id, LabelProjectRoles.Reviewer)

    @staticmethod
    def can_create_project(user: User):
        """
        Check if target user can create a project, which requires a staff user.
        """
        return user.is_staff

    @staticmethod
    def can_edit_project(user: User, project_id: str):
        """
        Check if target user can edit the project, which requires the owner role.
        """
        return ProjectRole.is_owner(user, project_id)

    @staticmethod
    def can_init_project(user: User, project_id):
        """
        Check if target user can init the project, which requires the manager role.
        """
        return ProjectRole.is_manager(user, project_id)

    @staticmethod
    def can_view_project(user: User, project_id):
        """
        Check if target user can view the project, which requires any role in the project.
        """
        return ProjectRole.is_member(user, project_id)

    @staticmethod
    def can_view_project_progress(user: User, project_id):
        """
        Check if target user can view the project progress, which requires a role above or equal to leader.
        """
        return ProjectRole.is_gte_leader(user, project_id)

    @staticmethod
    def can_assign_leader(user: User, project_id):
        """
        Check if target user can assign a leader to the project, which requires the manager role.
        """
        return ProjectRole.is_manager(user, project_id)

    @staticmethod
    def can_view_all_tasks(user: User, project_id):
        """
        Check if target user can view all tasks of the project, which requires a role above leader.
        """
        return ProjectRole.is_gt_leader(user, project_id)

    @staticmethod
    def can_qa_project(user: User, project_id):
        """
        Check if target user can QA the project, which requires the owner role.
        """
        return ProjectRole.is_owner(user, project_id)

    @staticmethod
    def can_export_project(user: User, project_id):
        """
        Check if target user can export the project, which requires the owner role.
        """
        return ProjectRole.is_owner(user, project_id)
[docs]class TaskRole(BaseModel):
"""
The role of a user in a task.
Each project can contain multiple tasks, and users can be assigned to different roles in different tasks.
"""
@classmethod
def get_collection(cls, *args, **kwargs) -> Collection[_DocumentType]:
    """
    Return the "label_task_roles" collection, which is where task roles are stored.
    """
    collection_name = "label_task_roles"
    return cls.db[collection_name]
id: str
project_id: str
task_id: str
user_id: str
user_name: str
role: str
is_active: bool = True # if the user is active in this task; set to False when the user is replaced
# Below are the kinds of numbers kept for this user.
# Roughly speaking, they are divided into two parts:
# - The task progress bound numbers.
# These numbers present the task progress of this user.
# If the team leader decides to replace an old user, these numbers will be inherited by the new user.
# - The user bound numbers.
# These numbers present the user's contribution to the task.
# If the team leader decides to replace an old user, the old user's numbers will be kept and the new user will start from 0.
label_num_waiting: int = 0 # the num of images waiting for label
review_num_waiting: int = 0 # the num of images waiting for review
review_num_rejected: int = 0 # the num of images rejected in review
review_num_accepted: int = 0 # the num of images accepted in review
label_completed: bool = False # this is true when all images are labeled
review_completed: bool = False # this is true when all images have passed review
@classmethod
def _check_init_role(cls, project: LabelProject, task: "LabelTask", users: List[User], role: str):
    """
    Check the pre-conditions of initializing a role for a task.
    - The role must be task bonded.
    - If the role is of leader type, there can be at most one user.
    - If the role is of review type, no user can be assigned when the project requires no review.
    - If the role is labeler, the number of users must be equal to label_times.
    - If the role is reviewer, the number of users must be equal to review_times.
    - The role must not be borne by any active user of the task yet.

    :return: True if all checks pass.
    :raises LabelTaskError: on the first failed check.
    """

    if role not in LabelProjectRoles.TaskBondedRoles_:
        # the message must describe the same error as the code, as it does in _check_replace_role
        raise LabelTaskError(
            ErrCode.LabelProjectRoleIsNotTaskLevel,
            ErrCode.LabelProjectRoleIsNotTaskLevelMsg,
            400)

    if role in LabelProjectRoles.Leaders_ and len(users) > 1:
        raise LabelTaskError(
            ErrCode.NumOfTaskLeaderMismatchesConfig,
            ErrCode.NumOfTaskLeaderMismatchesConfigMsg,
            400)

    if role in LabelProjectRoles.ReviewKinds_ and len(users) >= 1 and project.review_times == 0:
        raise LabelTaskError(
            ErrCode.TaskDoesNotRequireReviewer,
            ErrCode.TaskDoesNotRequireReviewerMsg,
            400)

    if role == LabelProjectRoles.Labeler and len(users) != project.label_times:
        raise LabelTaskError(
            ErrCode.NumOfTaskLabelerMismatchesConfig,
            ErrCode.NumOfTaskLabelerMismatchesConfigMsg,
            400)

    if role == LabelProjectRoles.Reviewer and len(users) != project.review_times:
        raise LabelTaskError(
            ErrCode.NumOfTaskReviewerMismatchesConfig,
            ErrCode.NumOfTaskReviewerMismatchesConfigMsg,
            400)

    # a role can be initialized only while nobody actively bears it in the task
    filters = {"task_id": task.id, "role": role, "is_active": True}
    num_active = TaskRole.count_num(filters)
    if num_active > 0:
        raise LabelTaskError(ErrCode.TryInitRoleForTaskWithActiveRoles,
                             ErrCode.TryInitRoleForTaskWithActiveRolesMsg,
                             400)

    return True
@classmethod
def _init_roles(cls,
                task: "LabelTask",
                users: List[User],
                role: str
                ) -> List["TaskRole"]:
    """
    Create the task role object and corresponding project role for each user.
    Every new task role starts with all images of the task waiting for label.

    :return: the created task roles, in the order of users.
    """

    waiting_total = task.num_total
    created = []
    for user in users:
        task_role = cls(
            id=gen_uuid(),
            label_num_waiting=waiting_total,
            project_id=task.project_id,
            task_id=task.id,
            user_id=user.id,
            user_name=user.name,
            role=role,
        )
        task_role.batch_save()
        created.append(task_role)
    cls.finish_batch_save()

    # mirror the task roles at project level
    user_ids = [user.id for user in users]
    ProjectRole.add_roles(task.project, user_ids, role)
    return created
@classmethod
def _init_image_role_data(cls, task: "LabelTask", roles: List["TaskRole"]):
"""
Init the role data on every image of the task.
"""
update_data = {}
filters = {"task_id": task.id}
LTIModel = LabelTaskImage(task.dataset_id)
for role in roles:
if role.role in LabelProjectRoles.Leaders_:
update_data[f"role_status.{role.role}"] = LabelTaskImageStatus.Labeling
else:
update_data[f"role_status.{role.role}_{role.user_id}"] = LabelTaskImageStatus.Labeling
LTIModel.update_many(filters, update_data)
@classmethod
def init_roles(cls,
               task: "LabelTask",
               users: List[User],
               role: str
               ) -> List["TaskRole"]:
    """
    Initialize the role of a task, assign the role to users.
    - check pre-conditions
    - create task role and project role for every user
    - init role data on every image of the task

    Task roles can only be set in two ways:
    - init_roles: grant one role to all target users in the same time
    - replace_role: replace one user with another user for a role

    :return: the created task roles.
    """

    project = task.project
    cls._check_init_role(project, task, users, role)

    created_roles = cls._init_roles(task, users, role)
    cls._init_image_role_data(task, created_roles)
    return created_roles
@classmethod
def _check_replace_role(cls, task: "LabelTask", old_user: User, new_user: User, role: str):
    """
    Check the pre-conditions of replacing the bearer of a role.
    - The role must be task bonded.
    - The old user and new user must be different.
    - The old user must actively bear the role in the task.
    - The new user must not actively bear the role in the task.

    :return: the active task role of the old user.
    :raises LabelTaskError: on the first failed check.
    """

    if role not in LabelProjectRoles.TaskBondedRoles_:
        raise LabelTaskError(ErrCode.LabelProjectRoleIsNotTaskLevel,
                             ErrCode.LabelProjectRoleIsNotTaskLevelMsg,
                             400)

    if old_user.id == new_user.id:
        raise LabelTaskError(ErrCode.CantReplaceRoleWithTheSameUser,
                             ErrCode.CantReplaceRoleWithTheSameUserMsg,
                             400)

    def active_role_of(user_id):
        # the active record of the role for the given user in this task, or None
        return TaskRole.find_one({"task_id": task.id, "user_id": user_id, "role": role, "is_active": True})

    current_role = active_role_of(old_user.id)
    if current_role is None:
        raise LabelTaskError(ErrCode.OldUserDoesNotHaveTheTaskRole,
                             ErrCode.OldUserDoesNotHaveTheTaskRoleMsg,
                             400)

    if active_role_of(new_user.id) is not None:
        raise LabelTaskError(ErrCode.NewUserAlreadyHaveTheTaskRole,
                             ErrCode.NewUserAlreadyHaveTheTaskRoleMsg,
                             400)

    return current_role
@staticmethod
def _transfer_roles(task: "LabelTask", old_role: "TaskRole", new_role: "TaskRole"):
    """
    Transfer the role data of task from old role to new role, and deactivate the old role.
    If the user of old role no longer bears the role actively in any task of the project,
    the project role of the user is deleted as well.

    :raises LabelTaskError: if the two roles belong to different tasks or are of different kinds.
    """

    if old_role.task_id != new_role.task_id:
        raise LabelTaskError(ErrCode.CantTransferRoleBetweenDifferentTask,
                             ErrCode.CantTransferRoleBetweenDifferentTaskMsg,
                             400)

    if old_role.role != new_role.role:
        raise LabelTaskError(ErrCode.CantTransferRoleToDifferentKind,
                             ErrCode.CantTransferRoleToDifferentKindMsg,
                             400)

    # transfer task progress
    progress_fields = (
        "label_num_waiting",
        "review_num_waiting",
        "review_num_rejected",
        "review_num_accepted",
        "label_completed",
        "review_completed",
    )
    for field_name in progress_fields:
        setattr(new_role, field_name, getattr(old_role, field_name))

    # save status
    old_role.is_active = False
    old_role.batch_save()
    new_role.batch_save()
    TaskRole.finish_batch_save()

    # delete old role from project, unless the old user still bears it in some task of the project
    still_active = TaskRole.count_num({
        "project_id": task.project_id,
        "is_active": True,
        "user_id": old_role.user_id,
        "role": old_role.role,
    })
    if still_active > 0:
        return
    ProjectRole.del_role(task.project, old_role.user_id, old_role.role)
@staticmethod
def _transfer_image_role_data(task: "LabelTask", old_role: "TaskRole", new_role: "TaskRole"):
    """
    Transfer role data of every image in task from old role to new role.
    Only worker roles carry image data:
    - the image status of the old role is moved to the new role;
    - the label data (for labelers) or review data (for reviewers) of the old user is copied to the
      new user; every copied entry gets "_<timestamp>" appended to its id and the new user's id and name.
    """

    if old_role.role not in LabelProjectRoles.Workers_:  # we only transfer image role data for workers
        return

    collection = LabelTaskImage(task.dataset_id).get_collection()
    task_filter = {"task_id": task.id}

    # transfer image status: copy it to the new field, then drop the old field
    old_field = f"role_status.{old_role.role}_{old_role.user_id}"
    new_field = f"role_status.{new_role.role}_{new_role.user_id}"
    collection.update_many(task_filter, [
        {"$set": {new_field: f"${old_field}"}},
        {"$unset": old_field},
    ])

    # transfer label data: labelers own entries under "labels", reviewers under "reviews"
    if old_role.role == LabelProjectRoles.Labeler:
        data_field = "labels"
    elif old_role.role == LabelProjectRoles.Reviewer:
        data_field = "reviews"
    else:
        return

    ts = str(int(time.time()))
    new_user_id = new_role.user_id
    new_user_name = new_role.user_name
    old_path = f"{data_field}.{old_role.user_id}"
    new_path = f"{data_field}.{new_user_id}"

    # copy old role data to new role
    collection.update_many(task_filter, [{"$set": {new_path: f"${old_path}"}}])

    # modify the copied data according to new user; the timestamp suffix is appended exactly once
    # for both labelers and reviewers, so that copied ids follow one format
    rewritten_entries = {
        "$map": {
            "input": f"${new_path}",
            "as": "entry",
            "in": {
                "$mergeObjects": [
                    "$$entry",
                    {
                        "id": {"$concat": ["$$entry.id", f"_{ts}"]},
                        "user_id": new_user_id,
                        "user_name": new_user_name,
                    },
                ]
            },
        }
    }
    copied_filter = {"task_id": task.id, new_path: {"$exists": True}}
    collection.update_many(copied_filter, [{"$set": {new_path: rewritten_entries}}])
@classmethod
def replace_role(cls,
                 task: "LabelTask",
                 old_user: User,
                 new_user: User,
                 role: str):
    """
    Reassign the role of task from old user to new user, transfer role data of task and task images accordingly.

    :return: a tuple of the deactivated old role and the newly created role.
    :raises LabelTaskError: if the pre-conditions of the replacement are not met.
    """

    replaced = cls._check_replace_role(task, old_user, new_user, role)

    # a single-user init creates the task role and the project role of the new user
    created = TaskRole._init_roles(task, [new_user], role)
    successor = created[0]

    cls._transfer_roles(task, replaced, successor)
    cls._transfer_image_role_data(task, replaced, successor)
    return replaced, successor
@staticmethod
def is_task_label_leader(user: User, task_id: str):
    """
    Tell whether the user has an active label leader role on the task.
    """

    leader_role = TaskRole.find_one({
        "task_id"  : task_id,
        "user_id"  : user.id,
        "is_active": True,
        "role"     : LabelProjectRoles.LabelLeader,
    })
    return leader_role is not None
@staticmethod
def is_task_review_leader(user: User, task_id: str):
    """
    Tell whether the user has an active review leader role on the task.
    """

    leader_role = TaskRole.find_one({
        "task_id"  : task_id,
        "user_id"  : user.id,
        "is_active": True,
        "role"     : LabelProjectRoles.ReviewLeader,
    })
    return leader_role is not None
@staticmethod
def is_task_labeler(user: User, task_id):
    """
    Tell whether the user has an active labeler role on the task.
    """

    labeler_role = TaskRole.find_one({
        "task_id"  : task_id,
        "user_id"  : user.id,
        "is_active": True,
        "role"     : LabelProjectRoles.Labeler,
    })
    return labeler_role is not None
@staticmethod
def is_task_reviewer(user: User, task_id):
    """
    Tell whether the user has an active reviewer role on the task.
    """

    reviewer_role = TaskRole.find_one({
        "task_id"  : task_id,
        "user_id"  : user.id,
        "is_active": True,
        "role"     : LabelProjectRoles.Reviewer,
    })
    return reviewer_role is not None
@staticmethod
def can_init_label_worker(user: User, task_id):
    """
    Tell whether the user may init the label workers of the task.

    The user must be the label leader of the task, and the task must not have any active labeler yet.
    """

    if TaskRole.is_task_label_leader(user, task_id):
        # workers can only be initialized while the task has no labeler
        existing_labeler = TaskRole.find_one({
            "task_id"  : task_id,
            "is_active": True,
            "role"     : LabelProjectRoles.Labeler,
        })
        return existing_labeler is None

    return False
@staticmethod
def can_init_review_worker(user: User, task_id):
    """
    Tell whether the user may init the review workers of the task.

    The user must be the review leader of the task, and the task must not have any active reviewer yet.
    """

    if TaskRole.is_task_review_leader(user, task_id):
        # workers can only be initialized while the task has no reviewer
        existing_reviewer = TaskRole.find_one({
            "task_id"  : task_id,
            "is_active": True,
            "role"     : LabelProjectRoles.Reviewer,
        })
        return existing_reviewer is None

    return False
@staticmethod
def can_replace_label_worker(user: User, task_id):
    """
    Tell whether the user may replace a label worker of the task.
    This is granted to the label leader of the task.
    """

    allowed = TaskRole.is_task_label_leader(user, task_id)
    return allowed
@staticmethod
def can_replace_review_worker(user: User, task_id):
    """
    Tell whether the user may replace a review worker of the task.
    This is granted to the review leader of the task.
    """

    allowed = TaskRole.is_task_review_leader(user, task_id)
    return allowed
@staticmethod
def can_commit_review(user: User, task_id):
    """
    Tell whether the user may commit review for the task.
    This is granted to the reviewers of the task.
    """

    allowed = TaskRole.is_task_reviewer(user, task_id)
    return allowed
@staticmethod
def can_restart_task(user: User, task_id):
    """
    Tell whether the user may restart the task.
    This is granted to the label leader of the task.
    """

    allowed = TaskRole.is_task_label_leader(user, task_id)
    return allowed
@staticmethod
def can_qa_task(user: User, project_id):
    """
    Tell whether the user may qa tasks of the project.
    This is granted to the managers of the project.
    """

    allowed = ProjectRole.is_manager(user, project_id)
    return allowed
@staticmethod
def can_view_all_roles(user: User, project_id):
    """
    Tell whether the user may view the data of all roles.
    The decision is delegated to ProjectRole.is_gte_leader for the project.
    """

    allowed = ProjectRole.is_gte_leader(user, project_id)
    return allowed
@staticmethod
def can_label_image(user: User, task_id):
    """
    Tell whether the user may label images of the task.
    This is granted to the labelers of the task.
    """

    allowed = TaskRole.is_task_labeler(user, task_id)
    return allowed
@staticmethod
def can_review_image(user: User, task_id):
    """
    Tell whether the user may review images of the task.
    This is granted to the reviewers of the task.
    """

    allowed = TaskRole.is_task_reviewer(user, task_id)
    return allowed
@staticmethod
def update_progress_for_all_roles(task_id):
    """
    Update progress for all roles of task.
    This ensures the progress for every role is up-to-date, without concerning the data integrity and consistency.

    - 1. count image status for every role, update their count number
    - 2. for every role, label_completed = True if label_num_waiting == 0 and review_num_rejected == 0,
         otherwise False
    - 3. for every role, review_completed = True if project.review_times == 0
         or review_num_accepted == task.num_total, otherwise False
    - 4. task status = LabelTaskStatus.Reviewing
         if all(role.label_completed is True and role.review_completed is True for role in roles)
    - 5. project update subtask progress

    :param task_id: the id of the task whose active roles are refreshed.
    """

    def _status_key(role):
        # The key of a role inside the "role_status" dict of a task image.
        # Workers are tracked per user, leaders are tracked per role type.
        if role.role == LabelProjectRoles.Labeler:
            return f"labeler_{role.user_id}"
        if role.role == LabelProjectRoles.Reviewer:
            return f"reviewer_{role.user_id}"
        if role.role in LabelProjectRoles.Leaders_:
            return role.role
        return None

    task = LabelTask.find_one({"id": task_id})
    dataset_id = task.dataset_id
    roles = list(TaskRole.find_many({"task_id": task_id, "is_active": True}))

    # 1. count image status for every role, update their count number
    facet, projection = {}, {}
    for role in roles:
        key = _status_key(role)
        if key is None:
            continue

        for status in LabelTaskImageStatus.ALL_:
            # The facet output is named after the full status key (not only the user id),
            # so a user being labeler and reviewer at the same time gets two separate counters.
            output = f"{key}_{status}"
            match = {f"role_status.{key}": status, "task_id": task_id}
            facet[output] = [{"$match": match}, {"$count": status}]
            projection[f"{key}.{status}"] = {"$arrayElemAt": [f"${output}.{status}", 0]}

    LTIModel = LabelTaskImage(dataset_id)
    pipeline = [{"$facet": facet}, {"$project": projection}]
    counter = list(LTIModel.aggregate(pipeline))[0]

    project = LabelProject.find_one({"id": task.project_id})
    for role in roles:
        key = _status_key(role)
        if key is not None:  # roles of unknown type keep their current counters
            role_count = counter.get(key, {})
            role.label_num_waiting = role_count.get(LabelTaskImageStatus.Labeling, 0)
            role.review_num_waiting = role_count.get(LabelTaskImageStatus.Reviewing, 0)
            role.review_num_rejected = role_count.get(LabelTaskImageStatus.Rejected, 0)
            role.review_num_accepted = role_count.get(LabelTaskImageStatus.Accepted, 0)

        # 2. label_completed = True if label_num_waiting == 0 and review_num_rejected == 0
        role.label_completed = role.label_num_waiting == 0 and role.review_num_rejected == 0

        # 3. review_completed = True if project.review_times == 0 or review_num_accepted == task.num_total
        role.review_completed = project.review_times == 0 or role.review_num_accepted == task.num_total

        role.batch_save()
    TaskRole.finish_batch_save()

    # 4. task status = LabelTaskStatus.Reviewing
    #    if all(role.label_completed is True and role.review_completed is True for role in roles)
    # 5. project update subtask counter
    if all(role.label_completed is True and role.review_completed is True for role in roles):
        task.status = LabelTaskStatus.Reviewing
        task.save()
        project.update_subtask_counter()
class LabelTask(BaseModel):
    """
    The label task model.
    """

    @classmethod
    def get_collection(cls, *args, **kwargs) -> Collection[_DocumentType]:
        """
        Return the "label_tasks" collection, which stores all label tasks.
        """

        return cls.db["label_tasks"]

    # the mandatory fields
    id: str
    idx: int
    project_id: str
    dataset_id: str
    created_ts: int

    # the optional fields
    num_total: int = 0
    status: str = LabelTaskStatus.Waiting

    _project = None  # cache of the project this task belongs to, filled by the ``project`` property

    @property
    def project(self):
        """
        The label project this task belongs to, looked up by ``project_id`` on first access and cached afterwards.
        """

        if self._project is None:
            self._project = LabelProject.find_one({"id": self.project_id})
        return self._project

    def set_leader(self, leader: User, role: str):
        """
        Set leader for the task.
        Leader role can be either label leader or review leader.

        If the task already has an active leader of this role, the role is transferred to the new leader.
        Otherwise, a new role is initialized and returned as both the old and the new role.

        A waiting task is switched to working status when:

        - the label leader is set and the project requires no review(review_times == 0), or
        - the review leader is set.

        :param leader: the user to be the leader.
        :param role: the leader role, must be one of LabelProjectRoles.Leaders_.
        :return: a tuple of (old_role, new_role).
        :raises LabelTaskError: if role is not a leader role.
        """

        if role not in LabelProjectRoles.Leaders_:
            raise LabelTaskError(
                ErrCode.TaskRoleNotOfLeaderKind,
                ErrCode.TaskRoleNotOfLeaderKindMsg,
                400)

        old_leader_role = TaskRole.find_one({"task_id": self.id, "role": role, "is_active": True})
        if old_leader_role is not None:
            old_leader = User.find_one({"id": old_leader_role.user_id})
            old_role, new_role = TaskRole.replace_role(self, old_leader, leader, role)
        else:
            new_role = TaskRole.init_roles(self, [leader], role)[0]
            old_role = new_role

        if self.status == LabelTaskStatus.Waiting:
            label_leader_starts = role == LabelProjectRoles.LabelLeader and self.project.review_times == 0
            if label_leader_starts or role == LabelProjectRoles.ReviewLeader:
                self.status = LabelTaskStatus.Working
                self.save()
                self.project.update_subtask_counter()

        return old_role, new_role

    def init_workers(self, workers: List[User], role: str):
        """
        Init workers for the task, set all workers of role type at the same time.
        Worker role can be either labeler or reviewer.

        When reviewers are initialized, every image of this task which all active labelers have already
        moved to reviewing status is put in reviewing status for every new reviewer.
        The progress of all roles is refreshed afterwards.

        :param workers: the users to be workers.
        :param role: the worker role.
        :return: the roles returned by TaskRole.init_roles.
        """

        worker_roles = TaskRole.init_roles(self, workers, role)

        if role == LabelProjectRoles.Reviewer:
            role_filters = {"task_id": self.id, "role": LabelProjectRoles.Labeler, "is_active": True}
            labelers = list(TaskRole.find_many(role_filters))
            if labelers:
                image_filters = {
                    f"role_status.labeler_{labeler.user_id}": LabelTaskImageStatus.Reviewing
                    for labeler in labelers
                }
                # The image collection is shared by all tasks of the dataset,
                # so the query must be restricted to the images of this task.
                image_filters["task_id"] = self.id

                LTIModel = LabelTaskImage(self.dataset_id)
                for image in LTIModel.find_many(image_filters):
                    set_data = {
                        f"role_status.reviewer_{worker_role.user_id}": LabelTaskImageStatus.Reviewing
                        for worker_role in worker_roles
                    }
                    LTIModel.batch_update({"id": image.id}, set_data)
                LTIModel.finish_batch_update()

        TaskRole.update_progress_for_all_roles(self.id)
        return worker_roles

    def replace_worker(self, old_user: User, new_user: User, role: str):
        """
        Remove the old user from role, assign the new user to role.

        :return: a tuple of (old_role, new_role), as returned by TaskRole.replace_role.
        """

        return TaskRole.replace_role(self, old_user, new_user, role)

    def restart_task(self):
        """
        Restart a rejected task by leader.
        When team leader restart a task:

        - 1. task status should be 'rejected', it is then switched to 'working'
        - 2. all images' status of all roles should be 'rejected'
        - 3. update progress for all roles

        :raises LabelTaskError: if the task is not in rejected status.
        """

        # 1. task status should be 'rejected'
        if self.status != LabelTaskStatus.Rejected:
            raise LabelTaskError(
                ErrCode.RestartLabelTaskMustBeRejected,
                ErrCode.RestartLabelTaskMustBeRejectedMsg,
                400)

        self.status = LabelTaskStatus.Working
        self.save()

        # 2. all images' status of all roles should be 'rejected'
        role_status = {}
        for task_role in TaskRole.find_many({"task_id": self.id, "is_active": True}):
            if task_role.role in LabelProjectRoles.Leaders_:
                role_status[task_role.role] = LabelTaskImageStatus.Rejected
            elif task_role.role in LabelProjectRoles.Workers_:
                role_status[f"{task_role.role}_{task_role.user_id}"] = LabelTaskImageStatus.Rejected

        # the whole role_status dict of every image of this task is replaced
        LTIModel = LabelTaskImage(self.dataset_id)
        LTIModel.get_collection().update_many({"task_id": self.id}, {"$set": {"role_status": role_status}})

        # 3. update progress for all roles
        TaskRole.update_progress_for_all_roles(self.id)

    def reject_task(self):
        """
        Reject a task by manager.

        :raises LabelTaskError: if the task is not in reviewing status.
        """

        if self.status != LabelTaskStatus.Reviewing:
            raise LabelTaskError(
                ErrCode.RejectLabelTaskMustBeReviewing,
                ErrCode.RejectLabelTaskMustBeReviewingMsg,
                400
            )

        self.status = LabelTaskStatus.Rejected
        self.save()
        self.project.update_subtask_counter()

    def accept_task(self):
        """
        Accept a task by manager.

        :raises LabelTaskError: if the task is not in reviewing status.
        """

        if self.status != LabelTaskStatus.Reviewing:
            raise LabelTaskError(
                ErrCode.AcceptLabelTaskMustBeReviewing,
                ErrCode.AcceptLabelTaskMustBeReviewingMsg,
                400)

        self.status = LabelTaskStatus.Accepted
        self.save()
        self.project.update_subtask_counter()

    def accept_rejected_task(self):
        """
        Accept a rejected task by manager.

        :raises LabelTaskError: if the task is not in rejected status.
        """

        if self.status != LabelTaskStatus.Rejected:
            raise LabelTaskError(
                ErrCode.ForceAcceptLabelTaskMustBeRejected,
                ErrCode.ForceAcceptLabelTaskMustBeRejectedMsg,
                400)

        self.status = LabelTaskStatus.Accepted
        self.save()
        self.project.update_subtask_counter()

    def qa_task(self, action: str):
        """
        QA a task by manager.

        :param action: one of LabelTaskQAActions.Accept, Reject or ForceAccept.
        :raises LabelTaskError: if the action is unknown, or the task status does not allow the action.
        """

        if action == LabelTaskQAActions.Accept:
            return self.accept_task()
        if action == LabelTaskQAActions.Reject:
            return self.reject_task()
        if action == LabelTaskQAActions.ForceAccept:
            return self.accept_rejected_task()

        raise LabelTaskError(
            ErrCode.QALabelTaskActionError,
            ErrCode.QALabelTaskActionErrorMsg,
            400)

    def get_max_role(self, user: User):
        """
        Get the role of max permission of current task for the user.

        - If ProjectRole.is_gte_leader holds for the user, all active roles of the task are candidates.
        - If the user is a label worker or a review worker of the project, only his own active roles are candidates.
        - Otherwise, there is no candidate.

        The candidates are sorted ascending by LabelProjectRoles.Levels_ and the first one is returned.

        :return: the selected TaskRole, or None if there is no candidate.
        """

        if ProjectRole.is_gte_leader(user, self.project_id):
            filters = {"task_id": self.id, "is_active": True}
        elif ProjectRole.is_label_worker(user, self.project_id) or ProjectRole.is_review_worker(user, self.project_id):
            filters = {"task_id": self.id, "user_id": user.id, "is_active": True}
        else:
            return None

        roles = list(TaskRole.find_many(filters))
        if not roles:
            return None

        roles = sorted(roles, key=lambda x: LabelProjectRoles.Levels_[x.role])
        return roles[0]
class UserLabelData(_Base):
    """
    The label data submitted by one user.
    This is a plain data serializer, it is not bound to a mongodb collection by itself.
    """

    # who created this label
    user_id: str
    user_name: str

    # the annotations of this label, for example:
    # [{"category_id": "str", "category_name": "str", "bounding_box": {"xmin": 0.5,...}}...]
    annotations: List[Dict] = []

    # generated automatically when omitted
    id: str = Field(default_factory=gen_uuid)
    created_ts: int = Field(default_factory=current_ts)
class UserReviewData(_Base):
    """
    The review data submitted by one user.
    This is a plain data serializer, it is not bound to a mongodb collection by itself.
    """

    # who created this review
    user_id: str
    user_name: str

    # the review action, and the id of the label being reviewed
    action: str
    label_id: str

    # generated automatically when omitted
    id: str = Field(default_factory=gen_uuid)
    created_ts: int = Field(default_factory=current_ts)
class LabelTaskImageModel(BaseModel):
    """
    The label task image model.
    This model behaves like ImageModel, but is used for label task.
    So to use this model, you should create a LabelTaskImageModel class dynamically with the LabelTaskImage shortcut.
    """

    belong_dataset: ClassVar[str] = None  # the id of the dataset this model class is bound to

    id: str
    idx: int
    image_id: int
    task_id: str  # the task id this image belongs to
    url: str  # the same as image url
    url_full_res: str  # the same as image url_full_res
    # NOTE(review): annotated as a single UserLabelData but defaults to a list - confirm the intended type
    default_labels: UserLabelData = []  # the default labels come from model pre-process for this image
    labels: Dict[str, List[UserLabelData]] = {}  # the label data for every labeler
    reviews: Dict[str, List[UserReviewData]] = {}  # the review data for every reviewer
    role_status: Dict = {}  # the image status for different roles: label_leader, review_leader, labeler_x, reviewer_a

    @classmethod
    def get_collection(cls, *args, **kwargs) -> Collection[_DocumentType]:
        """
        Instead of returning a collection for all dataset, return a collection for each dataset.
        """

        return cls.db[f"label_task_images@dataset_{cls.belong_dataset}"]

    @classmethod
    def get_cls_id(cls):
        """
        Instead of returning the class name directly, return the class name with dataset id.
        """

        return f"{cls.__name__}.{cls.belong_dataset}"

    def _reviewer_verdicts(self, label_id: str) -> Dict[str, bool]:
        """
        Collect the verdict of every reviewer on target label.

        Every reviewer having a status on this image starts with False.
        Every review targeting the label sets the verdict of its reviewer to whether the review is an acceptance.

        :return: a dict mapping reviewer id to True(accepted) or False(rejected or not reviewed yet).
        """

        prefix = "reviewer_"
        # strip the prefix instead of splitting on "_", so ids containing underscores stay intact
        verdicts = {key[len(prefix):]: False for key in self.role_status if key.startswith(prefix)}
        for reviewer_id, review_data_list in self.reviews.items():
            for review_data in review_data_list:
                if review_data.label_id == label_id:
                    verdicts[reviewer_id] = review_data.action == LabelImageQAActions.Accept
        return verdicts

    def ensure_status_for_labeling(self, task: LabelTask, labeler: User):
        """
        Check if target labeler can set label for target task.
        Labeler cannot set label in any of these conditions:

        * the task is not in working status.
        * his label is accepted by all reviewer.

        A label without any reviewer verdict is not considered accepted, so it can still be updated.

        :return: True if the labeler can set label.
        :raises LabelTaskError: if any of the conditions above is met.
        """

        if task.status != LabelTaskStatus.Working:
            raise LabelTaskError(
                ErrCode.LabelImageRequireTaskStatusWorking,
                ErrCode.LabelImageRequireTaskStatusWorkingMsg,
                400)

        labels = self.labels.get(labeler.id, [])
        if not labels:
            return True

        verdicts = self._reviewer_verdicts(labels[-1].id)
        accepted_by_all = bool(verdicts) and all(verdicts.values())
        if accepted_by_all:
            raise LabelTaskError(
                ErrCode.LabelImageRequireUnfinishedReviewing,
                ErrCode.LabelImageRequireUnfinishedReviewingMsg,
                400)

        return True

    def _update_label(self, labeler, label_annotations: List[Dict]):
        """
        Update label for target labeler.

        - Add the new label, replacing the old ones of this labeler
        - Delete reviews for old label

        :return: the new label.
        """

        new_label = UserLabelData(user_id=labeler.id, user_name=labeler.name, annotations=label_annotations)
        old_labels = self.labels.get(labeler.id, [])
        self.labels[labeler.id] = [new_label]

        # delete reviews for old label
        if old_labels:
            old_label_id = old_labels[0].id
            self.reviews = {
                reviewer_id: [review for review in review_data_list if review.label_id != old_label_id]
                for reviewer_id, review_data_list in self.reviews.items()
            }

        # save once, after the stale reviews are removed, so the new label and the reviews are persisted together
        self.save()
        return new_label

    def _update_label_progress(self, task: LabelTask, labeler: User):
        """
        When a labeler labels an image, these status may be changed:

        - 1. his status to this image should be reviewing,
        - 2. all (reviewers', leaders') status to this image should be reviewing,
             if all labeler finished labeling this image
        - 3. update progress for all roles
        """

        # 1. his status to this image should be reviewing
        self.role_status[f"labeler_{labeler.id}"] = LabelTaskImageStatus.Reviewing

        # 2. all (reviewers', leaders') status to this image should be reviewing,
        #    if all labeler finished labeling this image
        all_labeler_status = [s for k, s in self.role_status.items() if k.startswith("labeler_")]
        if all(s == LabelTaskImageStatus.Reviewing for s in all_labeler_status):
            self.role_status.update({k: LabelTaskImageStatus.Reviewing for k in self.role_status})
        self.save()

        # 3. update progress for all roles
        TaskRole.update_progress_for_all_roles(task.id)

    def set_label(self, task: LabelTask, labeler: User, label_annotations: List[Dict]):
        """
        Update the label annotations for a labeler.

        :param task: the task this image belongs to.
        :param labeler: the labeler who are updating this image labels.
        :param label_annotations: the label annotations.
        :return: the label data dict.

        A sample label_annotations::

            label_annotations = [
                {
                    "category_name": "str",
                    "category_id"  : "str",
                    "bounding_box" : {
                        "xmin": float,
                        "ymin": float,
                        "xmax": float,
                        "ymax": float,
                    }
                }
            ]
        """

        new_label = self._update_label(labeler, label_annotations)
        self._update_label_progress(task, labeler)
        return new_label.dict()

    def ensure_status_for_reviewing(self, task: LabelTask, reviewer: User, label_id: str):
        """
        Reviewer cannot set review in any of these conditions:

        - the task is not in working status.
        - image is not labeled by all labelers.
        - the target label does not exist
        - reviewer has reviewed target label before.

        :return: True if the reviewer can set review.
        :raises LabelTaskError: if any of the conditions above is met.
        """

        if task.status != LabelTaskStatus.Working:
            raise LabelTaskError(
                ErrCode.ReviewImageRequireTaskStatusWorking,
                ErrCode.ReviewImageRequireTaskStatusWorkingMsg,
                400)

        labeler_status = [s for k, s in self.role_status.items() if k.startswith("labeler_")]
        if any(s in LabelTaskImageStatus.WaitForLabeling_ for s in labeler_status):
            raise LabelTaskError(
                ErrCode.ReviewImageRequireFinishedLabeling,
                ErrCode.ReviewImageRequireFinishedLabelingMsg,
                400)

        found_label = any(
            label_data.id == label_id
            for label_data_list in self.labels.values()
            for label_data in label_data_list
        )
        if not found_label:
            raise LabelTaskError(
                ErrCode.ReviewImageTargetLabelNotFound,
                ErrCode.ReviewImageTargetLabelNotFoundMsg,
                400)

        for review_data in self.reviews.get(reviewer.id, []):
            if review_data.label_id == label_id:
                raise LabelTaskError(
                    ErrCode.ReviewImageFoundExistedReview,
                    ErrCode.ReviewImageFoundExistedReviewMsg,
                    400)

        return True

    def _update_review(self, reviewer, label_id, action):
        """
        Update the review to label for target reviewer.

        - Add the new review
        - Remove the old review to target label

        :return: the new review.
        """

        new_review = UserReviewData(user_id=reviewer.id, user_name=reviewer.name, label_id=label_id, action=action)
        old_reviews = self.reviews.get(reviewer.id, [])

        # the new review goes first, followed by the reviews of this reviewer to other labels
        kept_reviews = [review for review in old_reviews if review.label_id != label_id]
        self.reviews[reviewer.id] = [new_review] + kept_reviews
        self.save()

        return new_review

    def _get_labeler_id_for_label(self, label_id):
        """
        Find the id of the labeler who owns target label.

        :return: the labeler id, or None if no labeler owns a label of this id.
        """

        for labeler_id, label_data_list in self.labels.items():
            for label_data in label_data_list:
                if label_data.id == label_id:
                    return labeler_id
        return None

    def _update_review_progress_for_accept(self, task: LabelTask, reviewer: User, label_id: str):
        """
        When a reviewer accepts a label:

        - 1. his status of this image should be accepted,
             if he accepted all labels of this image
        - 2. target labeler's status of this image should be accepted,
             if his label is accepted by all reviewer
        - 3. all leaders status of this image should be accepted,
             if all labelers' statuses of this image are accepted
        - 4. update progress for all roles
        """

        labeler_id = self._get_labeler_id_for_label(label_id)

        # 1. his status of this image should be accepted,
        #    if he accepted all labels of this image
        all_labels = {}
        for label_data_list in self.labels.values():
            for label_data in label_data_list:
                all_labels[label_data.id] = False
        for review_data in self.reviews.get(reviewer.id, []):
            all_labels[review_data.label_id] = review_data.action == LabelImageQAActions.Accept
        if all(all_labels.values()):
            self.role_status[f"reviewer_{reviewer.id}"] = LabelTaskImageStatus.Accepted

        # 2. target labeler's status of this image should be accepted,
        #    if his label is accepted by all reviewer
        all_reviewer_accepted = self._reviewer_verdicts(label_id)
        # no status is written when the label has no owner, to avoid creating a bogus status key
        if labeler_id is not None and all(all_reviewer_accepted.values()):
            self.role_status[f"labeler_{labeler_id}"] = LabelTaskImageStatus.Accepted

        # 3. all leaders status of this image should be accepted,
        #    if all labelers' statuses of this image are accepted
        all_labeler_status = [s for k, s in self.role_status.items() if k.startswith("labeler_")]
        if all(s == LabelTaskImageStatus.Accepted for s in all_labeler_status):
            self.role_status["label_leader"] = LabelTaskImageStatus.Accepted
            self.role_status["review_leader"] = LabelTaskImageStatus.Accepted

        self.save()

        # 4. update progress for all roles
        TaskRole.update_progress_for_all_roles(task.id)

    def _update_review_progress_for_reject(self, task: LabelTask, reviewer: User, label_id: str):
        """
        When a reviewer rejects a label:

        - 1. his status of this image should be rejected,
        - 2. the target labeler's status of this image should be rejected,
        - 3. all leaders status of this image should be rejected,
        - 4. update progress for all roles
        """

        labeler_id = self._get_labeler_id_for_label(label_id)

        # 1. his review status of this image should be rejected,
        self.role_status[f"reviewer_{reviewer.id}"] = LabelTaskImageStatus.Rejected

        # 2. the target labeler's status of this image should be rejected,
        if labeler_id is not None:  # no status is written when the label has no owner
            self.role_status[f"labeler_{labeler_id}"] = LabelTaskImageStatus.Rejected

        # 3. all leaders status of this image should be rejected,
        self.role_status["label_leader"] = LabelTaskImageStatus.Rejected
        self.role_status["review_leader"] = LabelTaskImageStatus.Rejected

        self.save()

        # 4. update progress for all roles by mongodb aggregation and batch updating
        TaskRole.update_progress_for_all_roles(task.id)

    def set_review(self, task: LabelTask, reviewer: User, label_id: str, action: str):
        """
        Update the review for a label.

        :param task: the task this image belongs to.
        :param reviewer: the reviewer who are reviewing target label.
        :param label_id: the target label id that reviewer is reviewing.
        :param action: the review action, any action other than LabelImageQAActions.Accept is handled as a rejection.
        :return: the review data dict.
        """

        new_review = self._update_review(reviewer, label_id, action)

        if action == LabelImageQAActions.Accept:
            self._update_review_progress_for_accept(task, reviewer, label_id)
        else:
            self._update_review_progress_for_reject(task, reviewer, label_id)

        return new_review.dict()
_image_task_models = {}  # dataset_id -> the LabelTaskImageModel subclass bound to that dataset


def LabelTaskImage(dataset_id: str) -> Type[LabelTaskImageModel]:
    """
    A shortcut to create the LabelTaskImageModel for target dataset.

    Every dataset gets its own subclass of LabelTaskImageModel, which is created on first use and cached.
    The subclass keeps the name of LabelTaskImageModel, so get_cls_id() still reads
    "LabelTaskImageModel.{dataset_id}".

    :param dataset_id: the id of the dataset the model class is bound to.
    :return: the model class whose ``belong_dataset`` is ``dataset_id``.
    """

    model = _image_task_models.get(dataset_id)
    if model is None:
        # copy.deepcopy() hands a class back unchanged, so it cannot be used to get a class per dataset:
        # all datasets would share LabelTaskImageModel itself and overwrite each other's belong_dataset.
        # A real subclass is created instead, with the metaclass of the base model.
        namespace = {"__module__": LabelTaskImageModel.__module__}
        model = type(LabelTaskImageModel)(LabelTaskImageModel.__name__, (LabelTaskImageModel,), namespace)
        model.belong_dataset = dataset_id
        _image_task_models[dataset_id] = model
    return model