DetectionDataset

Source code in detection_datasets/detection_dataset.py
class DetectionDataset:

    COLUMNS = [
        "image_id",
        "image_path",
        "width",
        "height",
        "split",
        "bbox_id",
        "category_id",
        "category",
        "bbox",
        "area",
    ]

    _data = pd.DataFrame(columns=COLUMNS).set_index(["image_id", "bbox_id"])

    def __init__(self, data: pd.DataFrame = None) -> None:
        """Initialize the dataset.

        Don't call the constructor directly; use the `from_hub()` or `from_disk()` methods instead.

        Args:
            data: The data used to initialize the dataset.
                Defaults to None.
        """

        self._format = "init"

        if data is not None:
            self._concat(data)

    @property
    def data(self) -> pd.DataFrame:
        """Getter for the data, with annotations grouped by images.

        Returns:
            The data contained in the dataset as a Pandas DataFrame.
        """

        return self.get_data()

    def get_data(self, index: str = "image") -> pd.DataFrame:
        """Getter for the data, with the possibility to specify the format.

        Args:
            index: The desired format of the data.
                Can be either "image" or "bbox".
                Defaults to "image".

        Returns:
            The data contained in the dataset as a Pandas DataFrame in the specified format.
        """

        data = self.set_format(index=index)

        return data

    @property
    def format(self) -> str:
        """Getter for the current format of the data, which can either be "image" or "bbox".

        Returns:
            The current format of the data.
        """

        return self._format

    @property
    def temp_dir_instance(self) -> str:
        """Name of the temporary directory used by the DetectionDataset instance.

        When downloading images from the Hub, images are first downloaded as parquet files in the Hugging Face
        cache directory, before being extracted as jpeg files in the detection_dataset cache.
        Each instance creates its own sub-directory in the detection_dataset cache.
        The subdirectory for an instance is named after its id.

        Returns:
            Path to the subdirectory for the instance in the detection_dataset cache.
        """

        lib_temp_dir = get_temp_dir()
        temp_dir = os.path.join(lib_temp_dir.as_posix(), str(id(self)))
        os.makedirs(temp_dir, exist_ok=True)

        return temp_dir

    def _concat(self, other_data: pd.DataFrame, other_data_format: str = "bbox") -> None:
        """Concatenate the existing data with new data.

        This allows loading multiple datasets, potentially from different sources (disk & hub), into one larger dataset.

        Args:
            other_data: The data being added to the dataset.
            other_data_format: The format of the new data.
                Defaults to "bbox".
        """

        self.set_format(index=other_data_format)
        self._data = pd.concat([self._data.reset_index()[self.COLUMNS], other_data[self.COLUMNS]])
        self.set_format(index="image")

    def from_hub(self, dataset_name: str, repo_name: str = ORGANISATION) -> DetectionDataset:
        """Load a dataset from the Hugging Face Hub.

        Args:
            dataset_name: name of the dataset, without the organisation's prefix.
            repo_name: name of the Hugging Face profile or organisation where the dataset is stored.
                Defaults to "detection-datasets".

        Returns:
            The DetectionDataset instance. This allows for method cascading.
        """

        if dataset_name not in available_in_hub(repo_name=repo_name):
            raise ValueError(
                f"""{dataset_name} is not available on the Hub.
            Use `DetectionDataset.available_in_hub()` to get the list of available datasets."""
            )

        path = "/".join([repo_name, dataset_name])
        ds = load_dataset(path=path)
        categories = ds[list(ds.keys())[0]].features["objects"].feature["category"]

        def download_images(row):
            file_path = "".join([self.temp_dir_instance, "/", str(row["image_id"]), ".jpg"])
            row["image"].save(file_path)
            row["image_path"] = file_path
            return row

        ds = ds.map(
            download_images,
            remove_columns="image",
            load_from_cache_file=False,
            desc="Extracting images from parquet",
        )

        df_splits = []
        for key in ds.keys():
            df_split = ds[key].to_pandas()
            df_split["split"] = key

            df_splits.append(df_split)

        df = pd.concat(df_splits)
        df = df.reset_index(drop=True)
        objects = pd.json_normalize(df["objects"])
        data = df.join(objects)

        if "image_path" not in data.columns:
            data["image_path"] = [x["bytes"] for x in data.loc[:, "image"]]

        data = data.drop(columns=["objects", "image"], errors="ignore")
        data["category_id"] = data.loc[:, "category"]
        data["category"] = [[categories.int2str(int(x)) for x in row["category"]] for _, row in data.iterrows()]

        data = data.explode(["bbox_id", "category_id", "category", "bbox", "area"])
        data["bbox"] = [Bbox.from_voc(row.bbox, row.width, row.height, row.bbox_id) for _, row in data.iterrows()]

        self._concat(other_data=data)

        return self

    def from_disk(self, dataset_format: str, path: str, **kwargs) -> DetectionDataset:
        """Load a dataset from disk.

        This is a factory method that can read the dataset from different formats,
        when the dataset is already in a local directory.

        Args:
            dataset_format: Format of the dataset.
                Currently supported values and formats:
                - "coco": COCO format
            path: Path to the dataset on the local filesystem.
            **kwargs: Keyword arguments specific to the dataset_format.

        Returns:
            The DetectionDataset instance. This allows for method cascading.

        Example:
            ```Python
            config = {
                "dataset_format": "coco",
                "path": "PATH/TO/DATASET",
                "splits": {
                    "train": (train_annotations.json, 'train'),
                    "val": (test_annotations.json, 'test'),
                },
            }
            dd = DetectionDataset().from_disk(**config)
            ```
        """

        reader = reader_factory.get(dataset_format=dataset_format.lower(), path=path, **kwargs)
        data = reader.read()

        self._concat(other_data=data)

        return self

    def to_hub(self, dataset_name: str, repo_name: str, **kwargs) -> DetectionDataset:
        """Push the dataset to the hub as a Parquet dataset.

        This method wraps Hugging Face's DatasetDict.push_to_hub() method.

        The dataset is pushed as a DatasetDict, meaning that each split (train, val, test), if present,
        will be a separate Dataset instance inside this DatasetDict.

        Args:
            dataset_name: name of the dataset inside the user/organisation's repository.
            repo_name: user or organisation to push the dataset to.

        Returns:
            The DetectionDataset instance. This allows for method cascading.
        """

        repo_id = "/".join([repo_name, dataset_name])

        hf_dataset_dict = self._get_hf_dataset()
        hf_dataset_dict.push_to_hub(repo_id=repo_id, **kwargs)
        print(f"The dataset was uploaded to https://huggingface.co/datasets/{repo_id}")

        return self

    def _get_hf_dataset(self) -> DatasetDict:
        """Get the data formatted as an Hugging Face DatasetDict instance.

        The DatasetDict contains a Dataset for each split present in the data.
        All methods and properties of the DatasetDict can then be used.

        Returns:
            Data formatted as a Hugging Face DatasetDict instance
        """

        data = self.set_format(index="image").copy().reset_index()
        data["image_id"] = data.loc[:, "image_id"].astype(int)
        data["bbox_id"] = [[int(bbox_id) for bbox_id in bbox_ids] for bbox_ids in data.bbox_id]
        data["bbox"] = [[bbox.to_voc() for bbox in bboxes] for bboxes in data.bbox]

        hf_dataset_dict = DatasetDict()

        for split in self.splits:
            split_data = data[data.split == split]
            images_data = []

            for _, row in split_data.iterrows():
                objects = {}
                objects["bbox_id"] = row["bbox_id"]
                objects["category"] = row["category"]
                objects["bbox"] = row["bbox"]
                objects["area"] = row["area"]

                image = {}
                image["image_id"] = row["image_id"]
                image["image"] = row["image_path"]
                image["width"] = row["width"]
                image["height"] = row["height"]
                image["objects"] = objects

                images_data.append(image)

            df = pd.DataFrame.from_dict(images_data)

            features = self._get_hf_features()

            ds = Dataset.from_pandas(df=df, features=features, split=split)
            hf_dataset_dict[split] = ds

        return hf_dataset_dict

    def to_disk(self, dataset_format: str, name: str, absolute_path: str) -> DetectionDataset:
        """Write the dataset to disk.

        This is a factory method that can write the dataset to disk in the selected format (e.g. COCO, MMDET, YOLO)

        Args:
            dataset_format: Format of the dataset.
                Currently supported formats:
                - "yolo": YOLO format
                - "mmdet": MMDET internal format, see:
                    https://mmdetection.readthedocs.io/en/latest/tutorials/customize_dataset.html#reorganize-new-data-format-to-middle-format
                - "coco": COCO format
            name: Name of the dataset to be created in the "path" directory.
            absolute_path: Absolute path to the directory where the dataset will be created.

        Returns:
            The DetectionDataset instance. This allows for method cascading.
        """

        writer = writer_factory.get(dataset_format=dataset_format.lower(), dataset=self, name=name, path=absolute_path)
        writer.write()

        return self

    def _get_hf_features(self) -> Features:
        """Get the feature types for the Hugging Face dataset.

        Returns:
            Features for the Hugging Face dataset.
        """

        return Features(
            {
                "image_id": Value(dtype="int64"),
                "image": Image(decode=True),
                "width": Value(dtype="int64"),
                "height": Value(dtype="int64"),
                "objects": Sequence(
                    {
                        "bbox_id": Value(dtype="int64"),
                        "category": ClassLabel(names=self.category_names),
                        "bbox": Sequence(feature=Value(dtype="float64"), length=4),
                        "area": Value(dtype="float64"),
                    }
                ),
            }
        )

    def set_format(self, index: str) -> pd.DataFrame:
        """Set the format of the data.

        The data contained in the dataset can either have:
        - One row per image, with the annotations grouped as a list
        - One row per annotation, with each image appearing on multiple rows

        Args:
            index: How to organise the data, can be "image" or "bbox".

        Raises:
            ValueError: If the specified format is unknown.

        Returns:
            Data contained in the dataset.
        """

        if index == self._format:
            pass
        elif index == "image":
            self._data_by_image()
        elif index == "bbox":
            self._data_by_bbox()
        else:
            raise ValueError(f"The index must be either 'image' or 'bbox', not '{index}'.")

        return self._data.copy()

    def _data_by_image(self) -> pd.DataFrame:
        """Returns the data grouped by image.

        Returns:
            A DataFrame grouped by image, meaning that each row may contain data related to multiple bboxes.
        """

        data = self._data.reset_index().groupby("image_id")
        self._data = pd.DataFrame(
            {
                "image_path": data["image_path"].first(),
                "width": data["width"].first(),
                "height": data["height"].first(),
                "split": data["split"].first(),
                "bbox_id": data["bbox_id"].apply(list),
                "bbox": data["bbox"].apply(list),
                "category_id": data["category_id"].apply(list),
                "category": data["category"].apply(list),
                "area": data["area"].apply(list),
            }
        )

        self._format = "image"

    def _data_by_bbox(self) -> pd.DataFrame:
        """Converts a DataFrame arranged by image to a DataFrame arranged by bbox.

        This method reverses the effect of calling self._data_by_image().

        Returns:
            A DataFrame arranged by bbox instead of images.
        """

        self._data = (
            self._data.reset_index()
            .explode(["bbox_id", "category_id", "category", "bbox", "area"])
            .set_index(["image_id", "bbox_id"])
        )

        self._format = "bbox"

    def select(self, n_images: int, seed: int = 42) -> DetectionDataset:
        """Limits the number of images to n_images.

        Args:
            n_images: Number of images to include in the dataset.
                The original proportion of images between splits will be respected.
            seed: Random seed.

        Returns:
            The DetectionDataset instance. This allows for method cascading.
        """

        data_by_image = self.set_format(index="image")

        if n_images > len(data_by_image):
            raise ValueError(
                "The number of images to include in the dataset is greater than the number of existing images."
            )

        split_data = []

        for split in self.splits:
            sample_size = int(n_images * self.split_proportions[split])
            split_data.append(
                data_by_image.loc[data_by_image.split == split, :].sample(n=sample_size, random_state=seed)
            )

        self._data = pd.concat(split_data)

        return self

    def shuffle(self, seed: int = 42) -> DetectionDataset:
        """Shuffles the dataset.

        Args:
            seed: Random seed.

        Returns:
            The DetectionDataset instance. This allows for method cascading.
        """

        data_by_image = self.set_format(index="image")

        split_data = []

        for split in self.splits:
            split_data.append(data_by_image.loc[data_by_image.split == split, :].sample(frac=1, random_state=seed))

        self._data = pd.concat(split_data)

        return self

    def split(self, splits: Iterable[float]) -> DetectionDataset:
        """Splits the dataset into train, val and test.

        Args:
            splits: Iterable containing the proportion of images to include in the train, val and test splits.
                The sum of the values in the iterable must be equal to 1.
                The original splits will be overwritten.

        Returns:
            The DetectionDataset instance. This allows for method cascading.
        """

        if len(splits) != 3:
            raise ValueError("The splits must contain 3 elements.")

        if sum(splits) != 1:
            raise ValueError(f"The sum of the proportion for each split must be equal to 1, here it is: {sum(splits)}.")

        data_by_image = self.set_format(index="image")

        n_train = int(splits[0] * len(data_by_image))
        n_val = int(n_train + splits[1] * len(data_by_image))
        n_test = int(n_val + splits[2] * len(data_by_image))

        data_by_image = data_by_image.sample(frac=1, random_state=42)
        data_train, data_val, data_test, _ = np.split(data_by_image, [n_train, n_val, n_test])
        data_train["split"] = TRAIN
        data_val["split"] = VALIDATION
        data_test["split"] = TEST

        self._data = pd.concat([data_train, data_val, data_test])

        return self

    def map_categories(self, mapping: dict[str, str]) -> DetectionDataset:
        """Maps the categories to the new categories.

        The new category names replace the existing ones.
        Annotations with categories not present in the mapping are dropped.
        The new category_ids correspond to the rank of the new categories in alphabetical order.

        Args:
            mapping: A dictionary mapping original categories to new categories.

        Returns:
            The DetectionDataset instance. This allows for method cascading.
        """

        data = self.set_format(index="bbox").reset_index()
        data["category"] = data.loc[:, "category"].map(mapping)
        data = data[~data.category.isna()]

        categories = sorted(data.category.unique())
        data["category_id"] = data.loc[:, "category"].apply(lambda cat: categories.index(cat))

        self._data = data.set_index(["image_id", "bbox_id"])

        return self

    def show(self, image_id: int = None) -> PILImage:
        """Show the image with bounding boxes and labels.

        Args:
            image_id: Id of the image.
                If not provided, a random image is selected.
                Defaults to None.

        Returns:
            Image with bounding boxes and labels.
        """

        data = self.set_format(index="bbox")

        if image_id is None:
            index = np.random.randint(0, len(data))
            image_id = data.reset_index().iloc[index]["image_id"]

        rows = data.loc[image_id]

        image = show_image_bbox(rows=rows)

        print(f"Showing image id {image_id}.")

        return image

    @property
    def n_images(self) -> int:
        """Returns the number of images in the dataset.

        Returns:
            The number of images in the dataset.
        """

        data = self.set_format(index="image")

        return len(data)

    @property
    def n_bbox(self) -> int:
        """Returns the number of images in the dataset.

        Returns:
            The number of images in the dataset.
        """

        data = self.set_format(index="bbox")

        return len(data)

    @property
    def splits(self) -> list[str]:
        """Returns the splits of the dataset.

        Returns:
            The splits present in the dataset.
        """

        return self._data.split.unique().tolist()

    @property
    def split_proportions(self) -> pd.DataFrame:
        """Returns the proportion of images in the train, val and test splits.

        Returns:
            The proportion of images in the train, val and test splits.
        """

        data = self.set_format(index="image")

        return pd.DataFrame({s: [len(data[data.split == s]) / len(data)] for s in self.splits})

    @property
    def categories(self) -> pd.DataFrame:
        """Creates a DataFrame containing the categories found in the data with their id.

        Returns:
            A dataframe containing the categories with the category_id as index.
        """

        data = self.set_format(index="bbox")

        return (
            data.loc[:, ["category_id", "category"]]
            .drop_duplicates()
            .astype({"category_id": int, "category": "object"})
            .sort_values("category_id")
            .set_index("category_id")
        )

    @property
    def category_names(self) -> list[str]:
        """Returns the categories names.

        Returns:
            The categories names.
        """

        return list(self.categories["category"].unique())

    @property
    def n_categories(self) -> int:
        """Returns the number of categories.

        Returns:
            The number of categories.
        """

        return self.categories["category"].nunique()

    def __del__(self) -> None:
        self.delete()

    def delete(self) -> None:
        """Delete the instance and the temporary directory it may use.

        The temporary directory is created by a DetectionDataset instance when calling the `from_hub()` method, and is
        used to store image files.
        """

        shutil.rmtree(self.temp_dir_instance)
        print("The instance and its temporary directory have been deleted.")

__init__(data=None) #

Initialize the dataset.

Don't call the constructor directly; use the from_hub() or from_disk() methods instead.

Parameters:

Name Type Description Default
data pd.DataFrame

The data used to initialize the dataset. Defaults to None.

None
Source code in detection_datasets/detection_dataset.py
def __init__(self, data: pd.DataFrame = None) -> None:
    """Initialize the dataset.

    Don't call the constructor directly; use the `from_hub()` or `from_disk()` methods instead.

    Args:
        data: The data used to initialize the dataset.
            Defaults to None.
    """

    self._format = "init"

    if data is not None:
        self._concat(data)

categories() property #

Creates a DataFrame containing the categories found in the data with their id.

Returns:

Type Description
pd.DataFrame

A dataframe containing the categories with the category_id as index.

Source code in detection_datasets/detection_dataset.py
@property
def categories(self) -> pd.DataFrame:
    """Creates a DataFrame containing the categories found in the data with their id.

    Returns:
        A dataframe containing the categories with the category_id as index.
    """

    data = self.set_format(index="bbox")

    return (
        data.loc[:, ["category_id", "category"]]
        .drop_duplicates()
        .astype({"category_id": int, "category": "object"})
        .sort_values("category_id")
        .set_index("category_id")
    )

category_names() property #

Returns the categories names.

Returns:

Type Description
list[str]

The categories names.

Source code in detection_datasets/detection_dataset.py
@property
def category_names(self) -> list[str]:
    """Returns the categories names.

    Returns:
        The categories names.
    """

    return list(self.categories["category"].unique())

data() property #

Getter for the data, with annotations grouped by images.

Returns:

Type Description
pd.DataFrame

The data contained in the dataset as a Pandas DataFrame.

Source code in detection_datasets/detection_dataset.py
@property
def data(self) -> pd.DataFrame:
    """Getter for the data, with annotations grouped by images.

    Returns:
        The data contained in the dataset as a Pandas DataFrame.
    """

    return self.get_data()

delete() #

Delete the instance and the temporary directory it may use.

The temporary directory is created by a DetectionDataset instance when calling the from_hub() method, and is used to store image files.

Source code in detection_datasets/detection_dataset.py
def delete(self) -> None:
    """Delete the instance and the temporary directory it may use.

    The temporary directory is created by a DetectionDataset instance when calling the `from_hub()` method, and is
    used to store image files.
    """

    shutil.rmtree(self.temp_dir_instance)
    print("The instance and its temporary directory have been deleted.")

format() property #

Getter for the current format of the data, which can either be "image" or "bbox".

Returns:

Type Description
str

The current format of the data.

Source code in detection_datasets/detection_dataset.py
@property
def format(self) -> str:
    """Getter for the current format of the data, which can either be "image" or "bbox".

    Returns:
        The current format of the data.
    """

    return self._format

from_disk(dataset_format, path, **kwargs) #

Load a dataset from disk.

This is a factory method that can read the dataset from different formats, when the dataset is already in a local directory.

Parameters:

Name Type Description Default
dataset_format str

Format of the dataset. Currently supported values and formats:

- "coco": COCO format

required
path str

Path to the dataset on the local filesystem.

required
**kwargs

Keyword arguments specific to the dataset_format.

{}

Returns:

Type Description
DetectionDataset

The DetectionDataset instance. This allows for method cascading.

Example
config = {
    "dataset_format": "coco",
    "path": "PATH/TO/DATASET",
    "splits": {
        "train": (train_annotations.json, 'train'),
        "val": (test_annotations.json, 'test'),
    },
}
dd = DetectionDataset().from_disk(**config)
Source code in detection_datasets/detection_dataset.py
def from_disk(self, dataset_format: str, path: str, **kwargs) -> DetectionDataset:
    """Load a dataset from disk.

    This is a factory method that can read the dataset from different formats,
    when the dataset is already in a local directory.

    Args:
        dataset_format: Format of the dataset.
            Currently supported values and formats:
            - "coco": COCO format
        path: Path to the dataset on the local filesystem.
        **kwargs: Keyword arguments specific to the dataset_format.

    Returns:
        The DetectionDataset instance. This allows for method cascading.

    Example:
        ```Python
        config = {
            "dataset_format": "coco",
            "path": "PATH/TO/DATASET",
            "splits": {
                "train": (train_annotations.json, 'train'),
                "val": (test_annotations.json, 'test'),
            },
        }
        dd = DetectionDataset().from_disk(**config)
        ```
    """

    reader = reader_factory.get(dataset_format=dataset_format.lower(), path=path, **kwargs)
    data = reader.read()

    self._concat(other_data=data)

    return self

from_hub(dataset_name, repo_name=ORGANISATION) #

Load a dataset from the Hugging Face Hub.

Parameters:

Name Type Description Default
dataset_name str

name of the dataset, without the organisation's prefix.

required
repo_name str

name of the Hugging Face profile or organisation where the dataset is stored. Defaults to "detection-datasets".

ORGANISATION

Returns:

Type Description
DetectionDataset

The DetectionDataset instance. This allows for method cascading.

Source code in detection_datasets/detection_dataset.py
def from_hub(self, dataset_name: str, repo_name: str = ORGANISATION) -> DetectionDataset:
    """Load a dataset from the Hugging Face Hub.

    Args:
        dataset_name: name of the dataset, without the organisation's prefix.
        repo_name: name of the Hugging Face profile or organisation where the dataset is stored.
            Defaults to "detection-datasets".

    Returns:
        The DetectionDataset instance. This allows for method cascading.
    """

    if dataset_name not in available_in_hub(repo_name=repo_name):
        raise ValueError(
            f"""{dataset_name} is not available on the Hub.
        Use `DetectionDataset.available_in_hub()` to get the list of available datasets."""
        )

    path = "/".join([repo_name, dataset_name])
    ds = load_dataset(path=path)
    categories = ds[list(ds.keys())[0]].features["objects"].feature["category"]

    def download_images(row):
        file_path = "".join([self.temp_dir_instance, "/", str(row["image_id"]), ".jpg"])
        row["image"].save(file_path)
        row["image_path"] = file_path
        return row

    ds = ds.map(
        download_images,
        remove_columns="image",
        load_from_cache_file=False,
        desc="Extracting images from parquet",
    )

    df_splits = []
    for key in ds.keys():
        df_split = ds[key].to_pandas()
        df_split["split"] = key

        df_splits.append(df_split)

    df = pd.concat(df_splits)
    df = df.reset_index(drop=True)
    objects = pd.json_normalize(df["objects"])
    data = df.join(objects)

    if "image_path" not in data.columns:
        data["image_path"] = [x["bytes"] for x in data.loc[:, "image"]]

    data = data.drop(columns=["objects", "image"], errors="ignore")
    data["category_id"] = data.loc[:, "category"]
    data["category"] = [[categories.int2str(int(x)) for x in row["category"]] for _, row in data.iterrows()]

    data = data.explode(["bbox_id", "category_id", "category", "bbox", "area"])
    data["bbox"] = [Bbox.from_voc(row.bbox, row.width, row.height, row.bbox_id) for _, row in data.iterrows()]

    self._concat(other_data=data)

    return self
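
For example, loading a dataset hosted under the default "detection-datasets" organisation, or under another profile (the dataset and profile names are placeholders):

```Python
dd = DetectionDataset().from_hub(dataset_name="fashionpedia")

# From a specific user or organisation:
dd = DetectionDataset().from_hub(dataset_name="my_dataset", repo_name="my-user")
```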

get_data(index='image') #

Getter for the data, with the possibility to specify the format.

Parameters:

Name Type Description Default
index str

The desired format of the data. Can be either "image" or "bbox". Defaults to "image".

'image'

Returns:

Type Description
pd.DataFrame

The data contained in the dataset as a Pandas DataFrame in the specified format.

Source code in detection_datasets/detection_dataset.py
def get_data(self, index: str = "image") -> pd.DataFrame:
    """Getter for the data, with the possibility to specify the format.

    Args:
        index: The desired format of the data.
            Can be either "image" or "bbox".
            Defaults to "image".

    Returns:
        The data contained in the dataset as a Pandas DataFrame in the specified format.
    """

    data = self.set_format(index=index)

    return data
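
A short sketch contrasting the two formats, assuming a dataset has already been loaded into `dd`:

```Python
df_images = dd.get_data()              # one row per image, indexed by image_id
df_bboxes = dd.get_data(index="bbox")  # one row per annotation, indexed by (image_id, bbox_id)

assert len(df_images) == dd.n_images
assert len(df_bboxes) == dd.n_bbox
```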

map_categories(mapping) #

Maps the categories to the new categories.

The new category names replace the existing ones. Annotations with categories not present in the mapping are dropped. The new category_ids correspond to the rank of the new categories in alphabetical order.

Parameters:

Name Type Description Default
mapping dict[str, str]

A dictionary mapping original categories to new categories.

required

Returns:

Type Description
DetectionDataset

The DetectionDataset instance. This allows for method cascading.

Source code in detection_datasets/detection_dataset.py
def map_categories(self, mapping: dict[str, str]) -> DetectionDataset:
    """Maps the categories to the new categories.

    The new category names replace the existing ones.
    Annotations with categories not present in the mapping are dropped.
    The new category_ids correspond to the rank of the new categories in alphabetical order.

    Args:
        mapping: A dictionary mapping original categories to new categories.

    Returns:
        The DetectionDataset instance. This allows for method cascading.
    """

    data = self.set_format(index="bbox").reset_index()
    data["category"] = data.loc[:, "category"].map(mapping)
    data = data[~data.category.isna()]

    categories = sorted(data.category.unique())
    data["category_id"] = data.loc[:, "category"].apply(lambda cat: categories.index(cat))

    self._data = data.set_index(["image_id", "bbox_id"])

    return self
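
For instance, merging several original categories into coarser ones (the original category names are placeholders and must match categories present in the dataset):

```Python
mapping = {
    "shirt, blouse": "top",
    "top, t-shirt, sweatshirt": "top",
    "pants": "bottom",
    "skirt": "bottom",
}

dd = dd.map_categories(mapping=mapping)
print(dd.categories)  # new category_ids follow the alphabetical order of the new names
```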

n_bbox() property #

Returns the number of bounding boxes in the dataset.

Returns:

Type Description
int

The number of bounding boxes in the dataset.

Source code in detection_datasets/detection_dataset.py
@property
def n_bbox(self) -> int:
    """Returns the number of images in the dataset.

    Returns:
        The number of images in the dataset.
    """

    data = self.set_format(index="bbox")

    return len(data)

n_categories() property #

Returns the number of categories.

Returns:

Type Description
int

The number of categories.

Source code in detection_datasets/detection_dataset.py
@property
def n_categories(self) -> int:
    """Returns the number of categories.

    Returns:
        The number of categories.
    """

    return self.categories["category"].nunique()

n_images() property #

Returns the number of images in the dataset.

Returns:

Type Description
int

The number of images in the dataset.

Source code in detection_datasets/detection_dataset.py
@property
def n_images(self) -> int:
    """Returns the number of images in the dataset.

    Returns:
        The number of images in the dataset.
    """

    data = self.set_format(index="image")

    return len(data)

select(n_images, seed=42) #

Limits the number of images to n_images.

Parameters:

Name Type Description Default
n_images int

Number of images to include in the dataset. The original proportion of images between splits will be respected.

required
seed int

Random seed.

42

Returns:

Type Description
DetectionDataset

The DetectionDataset instance. This allows for method cascading.

Source code in detection_datasets/detection_dataset.py
def select(self, n_images: int, seed: int = 42) -> DetectionDataset:
    """Limits the number of images to n_images.

    Args:
        n_images: Number of images to include in the dataset.
            The original proportion of images between splits will be respected.
        seed: Random seed.

    Returns:
        The DetectionDataset instance. This allows for method cascading.
    """

    data_by_image = self.set_format(index="image")

    if n_images > len(data_by_image):
        raise ValueError(
            "The number of images to include in the dataset is greater than the number of existing images."
        )

    split_data = []

    for split in self.splits:
        sample_size = int(n_images * self.split_proportions[split])
        split_data.append(
            data_by_image.loc[data_by_image.split == split, :].sample(n=sample_size, random_state=seed)
        )

    self._data = pd.concat(split_data)

    return self
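
For example, keeping a 1,000-image subset of a previously loaded dataset:

```Python
dd = dd.select(n_images=1000, seed=0)
print(dd.n_images)           # at most 1000, per-split sizes are rounded down
print(dd.split_proportions)  # close to the original proportions
```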

set_format(index) #

Set the format of the data.

The data contained in the dataset can either have:

- One row per image, with the annotations grouped as a list
- One row per annotation, with each image appearing on multiple rows

Parameters:

Name Type Description Default
index str

How to organise the data, can be "image" or "bbox".

required

Raises:

Type Description
ValueError

If the specified format is unknown.

Returns:

Type Description
pd.DataFrame

Data contained in the dataset.

Source code in detection_datasets/detection_dataset.py
def set_format(self, index: str) -> pd.DataFrame:
    """Set the format of the data.

    The data contained in the dataset can either have:
    - One row per image, with the annotations grouped as a list
    - One row per annotation, with each image appearing on multiple rows

    Args:
        index: How to organise the data, can be "image" or "bbox".

    Raises:
        ValueError: If the specified format is unknown.

    Returns:
        Data contained in the dataset.
    """

    if index == self._format:
        pass
    elif index == "image":
        self._data_by_image()
    elif index == "bbox":
        self._data_by_bbox()
    else:
        raise ValueError(f"The index must be either 'image' or 'bbox', not '{index}'.")

    return self._data.copy()
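
Since `set_format()` both switches the internal representation and returns a copy of the data, it can be used as follows on a loaded dataset:

```Python
df_bbox = dd.set_format(index="bbox")    # one row per annotation
print(dd.format)                         # "bbox"

df_image = dd.set_format(index="image")  # back to one row per image
print(dd.format)                         # "image"
```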

show(image_id=None) #

Show the image with bounding boxes and labels.

Parameters:

Name Type Description Default
image_id int

Id of the image. If not provided, a random image is selected. Defaults to None.

None

Returns:

Type Description
PILImage

Image with bounding boxes and labels.

Source code in detection_datasets/detection_dataset.py
def show(self, image_id: int = None) -> PILImage:
    """Show the image with bounding boxes and labels.

    Args:
        image_id: Id of the image.
            If not provided, a random image is selected.
            Defaults to None.

    Returns:
        Image with bounding boxes and labels.
    """

    data = self.set_format(index="bbox")

    if image_id is None:
        index = np.random.randint(0, len(data))
        image_id = data.reset_index().iloc[index]["image_id"]

    rows = data.loc[image_id]

    image = show_image_bbox(rows=rows)

    print(f"Showing image id {image_id}.")

    return image
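
For example (the image id is a placeholder and must exist in the dataset):

```Python
image = dd.show()             # a random image with its bounding boxes and labels
image = dd.show(image_id=42)  # a specific image
```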

shuffle(seed=42) #

Shuffles the dataset.

Parameters:

Name Type Description Default
seed int

Random seed.

42

Returns:

Type Description
DetectionDataset

The DetectionDataset instance. This allows for method cascading.

Source code in detection_datasets/detection_dataset.py
def shuffle(self, seed: int = 42) -> DetectionDataset:
    """Shuffles the dataset.

    Args:
        seed: Random seed.

    Returns:
        The DetectionDataset instance. This allows for method cascading.
    """

    data_by_image = self.set_format(index="image")

    split_data = []

    for split in self.splits:
        split_data.append(data_by_image.loc[data_by_image.split == split, :].sample(frac=1, random_state=seed))

    self._data = pd.concat(split_data)

    return self
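
Shuffling is done independently within each split, so every image keeps its split assignment:

```Python
dd = dd.shuffle(seed=0)
print(dd.split_proportions)  # unchanged by the shuffle
```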

split(splits) #

Splits the dataset into train, val and test.

Parameters:

Name Type Description Default
splits Iterable[float]

Iterable containing the proportion of images to include in the train, val and test splits. The sum of the values in the iterable must be equal to 1. The original splits will be overwritten.

required

Returns:

Type Description
DetectionDataset

The DetectionDataset instance. This allows for method cascading.

Source code in detection_datasets/detection_dataset.py
def split(self, splits: Iterable[float]) -> DetectionDataset:
    """Splits the dataset into train, val and test.

    Args:
        splits: Iterable containing the proportion of images to include in the train, val and test splits.
            The sum of the values in the iterable must be equal to 1.
            The original splits will be overwritten.

    Returns:
        The DetectionDataset instance. This allows for method cascading.
    """

    if len(splits) != 3:
        raise ValueError("The splits must contain 3 elements.")

    if sum(splits) != 1:
        raise ValueError(f"The sum of the proportion for each split must be equal to 1, here it is: {sum(splits)}.")

    data_by_image = self.set_format(index="image")

    n_train = int(splits[0] * len(data_by_image))
    n_val = int(n_train + splits[1] * len(data_by_image))
    n_test = int(n_val + splits[2] * len(data_by_image))

    data_by_image = data_by_image.sample(frac=1, random_state=42)
    data_train, data_val, data_test, _ = np.split(data_by_image, [n_train, n_val, n_test])
    data_train["split"] = TRAIN
    data_val["split"] = VALIDATION
    data_test["split"] = TEST

    self._data = pd.concat([data_train, data_val, data_test])

    return self
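
For example, re-splitting the whole dataset 80/10/10, discarding the existing split assignment:

```Python
dd = dd.split(splits=[0.8, 0.1, 0.1])
print(dd.split_proportions)
```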

split_proportions() property #

Returns the proportion of images in the train, val and test splits.

Returns:

Type Description
pd.DataFrame

The proportion of images in the train, val and test splits.

Source code in detection_datasets/detection_dataset.py
@property
def split_proportions(self) -> pd.DataFrame:
    """Returns the proportion of images in the train, val and test splits.

    Returns:
        The proportion of images in the train, val and test splits.
    """

    data = self.set_format(index="image")

    return pd.DataFrame({s: [len(data[data.split == s]) / len(data)] for s in self.splits})

splits() property #

Returns the splits of the dataset.

Returns:

Type Description
list[str]

The splits present in the dataset.

Source code in detection_datasets/detection_dataset.py
@property
def splits(self) -> list[str]:
    """Returns the splits of the dataset.

    Returns:
        The splits present in the dataset.
    """

    return self._data.split.unique().tolist()

temp_dir_instance() property #

Name of the temporary directory used by the DetectionDataset instance.

When downloading images from the Hub, images are first downloaded as parquet files in the Hugging Face cache directory, before being extracted as jpeg files in the detection_dataset cache. Each instance creates its own sub-directory in the detection_dataset cache. The subdirectory for an instance is named after its id.

Returns:

Type Description
str

Path to the subdirectory for the instance in the detection_dataset cache.

Source code in detection_datasets/detection_dataset.py
@property
def temp_dir_instance(self) -> str:
    """Name of the temporary directory used by the DetectionDataset instance.

    When downloading images from the Hub, images are first downloaded as parquet files in the Hugging Face
    cache directory, before being extracted as jpeg files in the detection_dataset cache.
    Each instance creates its own sub-directory in the detection_dataset cache.
    The subdirectory for an instance is named after its id.

    Returns:
        Path to the subdirectory for the instance in the detection_dataset cache.
    """

    lib_temp_dir = get_temp_dir()
    temp_dir = os.path.join(lib_temp_dir.as_posix(), str(id(self)))
    os.makedirs(temp_dir, exist_ok=True)

    return temp_dir

to_disk(dataset_format, name, absolute_path) #

Write the dataset to disk.

This is a factory method that can write the dataset to disk in the selected format (e.g. COCO, MMDET, YOLO)

Parameters:

Name Type Description Default
dataset_format str

Format of the dataset. Currently supported formats:

- "yolo": YOLO format
- "mmdet": MMDET internal format, see: https://mmdetection.readthedocs.io/en/latest/tutorials/customize_dataset.html#reorganize-new-data-format-to-middle-format
- "coco": COCO format

required
name str

Name of the dataset to be created in the "path" directory.

required
absolute_path str

Absolute path to the directory where the dataset will be created.

required

Returns:

Type Description
DetectionDataset

The DetectionDataset instance. This allows for method cascading.

Source code in detection_datasets/detection_dataset.py
def to_disk(self, dataset_format: str, name: str, absolute_path: str) -> DetectionDataset:
    """Write the dataset to disk.

    This is a factory method that can write the dataset to disk in the selected format (e.g. COCO, MMDET, YOLO)

    Args:
        dataset_format: Format of the dataset.
            Currently supported formats:
            - "yolo": YOLO format
            - "mmdet": MMDET internal format, see:
                https://mmdetection.readthedocs.io/en/latest/tutorials/customize_dataset.html#reorganize-new-data-format-to-middle-format
            - "coco": COCO format
        name: Name of the dataset to be created in the "path" directory.
        absolute_path: Absolute path to the directory where the dataset will be created.

    Returns:
        The DetectionDataset instance. This allows for method cascading.
    """

    writer = writer_factory.get(dataset_format=dataset_format.lower(), dataset=self, name=name, path=absolute_path)
    writer.write()

    return self
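
For example, writing the dataset in YOLO format (the dataset name and output directory are placeholders):

```Python
dd.to_disk(
    dataset_format="yolo",
    name="my_dataset",
    absolute_path="/absolute/path/to/output",
)
```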

to_hub(dataset_name, repo_name, **kwargs) #

Push the dataset to the hub as a Parquet dataset.

This method wraps Hugging Face's DatasetDict.push_to_hub() method.

The dataset is pushed as a DatasetDict, meaning that each split (train, val, test), if present, will be a separate Dataset instance inside this DatasetDict.

Parameters:

Name Type Description Default
dataset_name str

name of the dataset inside the user/organisation's repository.

required
repo_name str

user or organisation to push the dataset to.

required

Returns:

Type Description
DetectionDataset

The DetectionDataset instance. This allows for method cascading.

Source code in detection_datasets/detection_dataset.py
def to_hub(self, dataset_name: str, repo_name: str, **kwargs) -> DetectionDataset:
    """Push the dataset to the hub as a Parquet dataset.

    This method wraps Hugging Face's DatasetDict.push_to_hub() method.

    The dataset is pushed as a DatasetDict, meaning that each split (train, val, test), if present,
    will be a separate Dataset instance inside this DatasetDict.

    Args:
        dataset_name: name of the dataset inside the user/organisation's repository.
        repo_name: user or organisation to push the dataset to.

    Returns:
        The DetectionDataset instance. This allows for method cascading.
    """

    repo_id = "/".join([repo_name, dataset_name])

    hf_dataset_dict = self._get_hf_dataset()
    hf_dataset_dict.push_to_hub(repo_id=repo_id, **kwargs)
    print(f"The dataset was uploaded to https://huggingface.co/datasets/{repo_id}")

    return self
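
For example, pushing the dataset to your own profile (the dataset and user names are placeholders; this requires being authenticated with the Hugging Face Hub, e.g. via `huggingface-cli login`):

```Python
dd.to_hub(dataset_name="my_dataset", repo_name="my-user")
```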