Skip to content

unitorch.cli.models¤

Image¤

Processor for image-related operations.

Source code in src/unitorch/cli/models/image_utils.py
25
26
27
28
29
30
31
32
33
def __init__(
    self,
    image_type: Optional[str] = None,
    image_size: tuple = (256, 256),
    http_url: Optional[str] = None,
):
    """Store the defaults used by the image processing methods.

    Args:
        image_type: Default encoding of the inputs handed to ``_read``
            (``"base64"``, ``"hex"``, or ``None`` for a path / URL key).
        image_size: Size of the blank placeholder image that ``_read``
            returns when an input cannot be loaded.
        http_url: Optional format template; when set, ``_read`` fills it
            with the input value and downloads the image from the result.
    """
    self.image_type, self.image_size, self.http_url = (
        image_type,
        image_size,
        http_url,
    )

image_type instance-attribute ¤

image_type = image_type

image_size instance-attribute ¤

image_size = image_size

http_url instance-attribute ¤

http_url = http_url

from_config classmethod ¤

from_config(config, **kwargs)
Source code in src/unitorch/cli/models/image_utils.py
35
36
37
38
@classmethod
@config_defaults_init("core/process/image")
def from_config(cls, config, **kwargs):
    """Alternate constructor driven by a config object.

    The body is intentionally empty: whatever happens here is done by the
    ``config_defaults_init`` decorator.
    NOTE(review): presumably the decorator reads the "core/process/image"
    config section and uses it to fill the ``__init__`` arguments -- confirm
    against the decorator's implementation.
    """
    pass

_request_url ¤

_request_url(url)

Retry-loop GET request returning the response.

Source code in src/unitorch/cli/models/image_utils.py
40
41
42
43
44
45
46
47
def _request_url(self, url):
    """Retry-loop GET request returning the response.

    Transient failures (connection errors, timeouts, ...) are retried
    indefinitely with a random pause of up to two seconds between attempts.
    Malformed-URL errors are re-raised immediately: retrying them can never
    succeed and would otherwise block the caller forever.

    Args:
        url: The address to fetch.

    Returns:
        The ``requests`` response object; the status code is NOT checked
        here, callers must inspect it themselves.

    Raises:
        requests.exceptions.MissingSchema, requests.exceptions.InvalidSchema,
        requests.exceptions.InvalidURL: If ``url`` is not a usable URL.
    """
    # Errors that describe the URL itself rather than the network state.
    permanent_errors = (
        requests.exceptions.MissingSchema,
        requests.exceptions.InvalidSchema,
        requests.exceptions.InvalidURL,
    )
    while True:
        try:
            return requests.get(url, timeout=600)
        except permanent_errors:
            raise
        except Exception:
            # Random jitter so parallel workers do not retry in lock-step.
            time.sleep(random() * 2)

_read ¤

_read(image, image_type=None)

Read an image from a file path, base64/hex string, or HTTP URL.

Source code in src/unitorch/cli/models/image_utils.py
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
@register_process("core/process/image/read")
def _read(
    self,
    image,
    image_type=None,
):
    """Read an image from a file path, base64/hex string, or HTTP URL.

    Args:
        image: The image source. Interpreted as a base64 string when the
            effective type is ``"base64"``, as a hex string when it is
            ``"hex"``; otherwise it is passed to ``Image.open`` directly
            (``self.http_url`` unset) or substituted into the
            ``self.http_url`` template and downloaded.
        image_type: Overrides ``self.image_type`` for this call when not None.

    Returns:
        An RGB ``PIL.Image.Image``. Loading is best-effort: on ANY failure a
        white placeholder of size ``self.image_size`` is returned and the
        failure is only reported at debug log level.
    """
    image_type = image_type if image_type is not None else self.image_type

    def _load_rgb(source):
        # convert() decodes the pixel data into a new image, so the result
        # stays valid after the with-block has released the underlying file.
        with Image.open(source) as opened:
            return opened.convert("RGB")

    try:
        if image_type == "base64":
            image = io.BytesIO(base64.b64decode(image))
            return _load_rgb(image)

        if image_type == "hex":
            image = io.BytesIO(bytes.fromhex(image))
            return _load_rgb(image)

        if self.http_url is None:
            return _load_rgb(image)

        url = self.http_url.format(image)
        doc = self._request_url(url)
        if doc.status_code != 200 or doc.content == b"":
            raise ValueError(f"can't find the image {image}")

        return _load_rgb(io.BytesIO(doc.content))
    except Exception:
        # Deliberate fallback: a broken sample must not abort processing.
        logging.debug(f"core/process/image/read use fake image for {image}")
        return Image.new("RGB", self.image_size, (255, 255, 255))

_translate ¤

_translate(
    image: Image,
    up: Optional[int] = 0,
    left: Optional[int] = 0,
)

Translate the image by (left, up) pixels using an affine transform.

Source code in src/unitorch/cli/models/image_utils.py
79
80
81
82
83
84
85
86
87
@register_process("core/process/image/translate")
def _translate(
    self,
    image: Image.Image,
    up: Optional[int] = 0,
    left: Optional[int] = 0,
):
    """Shift the image content with an affine transform, keeping its size.

    Args:
        image: Image to translate.
        up: Vertical offset placed in the affine matrix (last row).
        left: Horizontal offset placed in the affine matrix (first row).

    Returns:
        A new image with the same size as ``image``.
    """
    # Identity matrix plus offsets; per the Pillow docs an output pixel
    # (x, y) is sampled from the input position (x + left, y + up).
    affine_coefficients = (1, 0, left, 0, 1, up)
    output_size = image.size
    return image.transform(output_size, Image.AFFINE, affine_coefficients)

_scale ¤

_scale(image: Image, scale: Optional[float] = 0.0)

Scale the image by the given factor.

Source code in src/unitorch/cli/models/image_utils.py
89
90
91
92
93
94
95
96
@register_process("core/process/image/scale")
def _scale(
    self,
    image: Image.Image,
    scale: Optional[float] = 0.0,
):
    """Return ``image`` resized by the multiplicative factor ``scale``.

    Args:
        image: Image to rescale.
        scale: Factor forwarded unchanged to ``ImageOps.scale``.

    Returns:
        The rescaled image produced by ``ImageOps.scale``.
    """
    # NOTE(review): Pillow documents that the factor must be greater than 0,
    # so the 0.0 default looks unusable as-is -- confirm whether callers
    # always pass an explicit value.
    rescaled = ImageOps.scale(image, scale)
    return rescaled

_rotate ¤

_rotate(image: Image, degree: Optional[int] = 0.0)

Rotate the image by the given degree with expansion.

Source code in src/unitorch/cli/models/image_utils.py
 98
 99
100
101
102
103
104
105
@register_process("core/process/image/rotate")
def _rotate(
    self,
    image: Image.Image,
    degree: Optional[int] = 0.0,
):
    """Rotate ``image`` by ``degree`` and grow the canvas to fit the result.

    Args:
        image: Image to rotate.
        degree: Rotation angle passed to ``Image.rotate``.

    Returns:
        The rotated image, resampled with nearest-neighbour interpolation
        and expanded so the whole rotated content is retained.
    """
    return image.rotate(degree, resample=Image.NEAREST, expand=True)

_flip ¤

_flip(
    image: Image,
    horizontal: Optional[bool] = False,
    vertical: Optional[bool] = False,
)

Flip the image horizontally and/or vertically.

Source code in src/unitorch/cli/models/image_utils.py
107
108
109
110
111
112
113
114
115
116
117
118
119
@register_process("core/process/image/flip")
def _flip(
    self,
    image: Image.Image,
    horizontal: Optional[bool] = False,
    vertical: Optional[bool] = False,
):
    """Mirror the image along the requested axes.

    Args:
        image: Image to flip.
        horizontal: When truthy, apply a left-right flip.
        vertical: When truthy, apply a top-bottom flip.

    Returns:
        The flipped image, or ``image`` itself when neither flag is set.
    """
    # Horizontal first, then vertical, matching the order of the flags.
    requested_flips = (
        (horizontal, Image.FLIP_LEFT_RIGHT),
        (vertical, Image.FLIP_TOP_BOTTOM),
    )
    for enabled, method in requested_flips:
        if enabled:
            image = image.transpose(method)
    return image

_resize ¤

_resize(
    image: Image,
    size: Optional[Tuple[int, int]] = (256, 256),
)

Resize the image using Lanczos resampling.

Source code in src/unitorch/cli/models/image_utils.py
121
122
123
124
125
126
127
128
@register_process("core/process/image/resize")
def _resize(
    self,
    image: Image.Image,
    size: Optional[Tuple[int, int]] = (256, 256),
):
    """Resample ``image`` to exactly ``size`` with the Lanczos filter.

    Args:
        image: Image to resize.
        size: Target size forwarded to ``Image.resize``; the aspect ratio
            is not preserved.

    Returns:
        The resized image.
    """
    resample_filter = Image.LANCZOS
    return image.resize(size, resample_filter)

_canny ¤

_canny(image: Image)

Detect edges using Canny (OpenCV if available, otherwise PIL FIND_EDGES).

Source code in src/unitorch/cli/models/image_utils.py
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
@register_process("core/process/image/canny")
def _canny(
    self,
    image: Image.Image,
):
    """Detect edges using Canny (OpenCV if available, otherwise PIL FIND_EDGES).

    Args:
        image: Input image in any mode PIL can convert to RGB / L.

    Returns:
        A single-channel edge image: the ``cv2.Canny`` output (thresholds
        100 and 200) when OpenCV is installed, otherwise the result of
        PIL's ``FIND_EDGES`` filter on the grayscale image.
    """
    if not is_opencv_available():
        return image.convert("L").filter(ImageFilter.FIND_EDGES)

    import cv2

    # Normalise to RGB first: COLOR_RGB2GRAY rejects single-channel or
    # palette arrays, which the PIL fallback above handles without error.
    rgb = np.array(image.convert("RGB"), np.uint8)
    gray = cv2.cvtColor(rgb, cv2.COLOR_RGB2GRAY)
    edges = cv2.Canny(gray, 100, 200)
    return Image.fromarray(edges)

_mask ¤

_mask(image: Image, mask: Image)
Source code in src/unitorch/cli/models/image_utils.py
148
149
150
151
152
153
154
155
156
157
@register_process("core/process/image/mask")
def _mask(
    self,
    image: Image.Image,
    mask: Image.Image,
):
    """Paste ``image`` onto a transparent canvas through ``mask``.

    Args:
        image: Source image; its size defines the size of the result.
        mask: Mask image; converted to mode "L" and resized (Lanczos) to
            the size of ``image`` before use.

    Returns:
        A new RGBA image that starts fully transparent and receives
        ``image`` pasted at the origin with the prepared mask.
    """
    canvas_size = image.size
    alpha_mask = mask.convert("L")
    alpha_mask = alpha_mask.resize(canvas_size, resample=Image.LANCZOS)
    canvas = Image.new("RGBA", canvas_size, (0, 0, 0, 0))
    canvas.paste(image, (0, 0), alpha_mask)
    return canvas

_dilate ¤

_dilate(
    image: Image,
    kernel_size: Optional[int] = 3,
    iterations: Optional[int] = 1,
)

Dilate the image using an ellipse kernel (requires OpenCV).

Source code in src/unitorch/cli/models/image_utils.py
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
@register_process("core/process/image/dilate")
def _dilate(
    self,
    image: Image.Image,
    kernel_size: Optional[int] = 3,
    iterations: Optional[int] = 1,
):
    """Apply morphological dilation with an elliptical kernel via OpenCV.

    Args:
        image: Image to dilate.
        kernel_size: Width and height of the square bounding the ellipse
            structuring element.
        iterations: Number of times the dilation is applied.

    Returns:
        The dilated image.

    Raises:
        NotImplementedError: If OpenCV is not available.
    """
    # There is no PIL fallback for this operation, so bail out early.
    if not is_opencv_available():
        raise NotImplementedError("Dilate operation requires OpenCV.")

    import cv2

    pixels = np.array(image, np.uint8)
    structuring_element = cv2.getStructuringElement(
        cv2.MORPH_ELLIPSE, (kernel_size, kernel_size)
    )
    dilated = cv2.dilate(pixels, structuring_element, iterations=iterations)
    return Image.fromarray(dilated)

_crop ¤

_crop(image: Image, box: Tuple[int, int, int, int])

Crop the image to the given (left, upper, right, lower) box.

Source code in src/unitorch/cli/models/image_utils.py
179
180
181
182
183
184
185
186
@register_process("core/process/image/crop")
def _crop(
    self,
    image: Image.Image,
    box: Tuple[int, int, int, int],
):
    """Cut the rectangle ``box`` out of ``image``.

    Args:
        image: Image to crop.
        box: ``(left, upper, right, lower)`` pixel coordinates, forwarded
            unchanged to ``Image.crop``.

    Returns:
        The cropped region as a new image.
    """
    cropped = image.crop(box)
    return cropped

_center_crop ¤

_center_crop(
    image: Image,
    size: Optional[Tuple[int, int]] = (224, 224),
)

Crop the image to the given size from the center.

Source code in src/unitorch/cli/models/image_utils.py
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
@register_process("core/process/image/center_crop")
def _center_crop(
    self,
    image: Image.Image,
    size: Optional[Tuple[int, int]] = (224, 224),
):
    """Crop a ``size``-sized window from the middle of ``image``.

    Args:
        image: Image to crop.
        size: Requested ``(width, height)`` of the crop window.

    Returns:
        The centered crop. The window is clamped to the image bounds, so
        the result is smaller than ``size`` along any axis where the image
        itself is smaller than requested.
    """
    img_w, img_h = image.size
    crop_w, crop_h = size[0], size[1]

    # Clamp the origin at 0 so an oversized request starts at the edge.
    x0 = max(0, (img_w - crop_w) // 2)
    y0 = max(0, (img_h - crop_h) // 2)

    # Clamp the far corner so the window never leaves the image.
    x1 = min(img_w, x0 + crop_w)
    y1 = min(img_h, y0 + crop_h)

    return image.crop((x0, y0, x1, y1))

Labels¤

Processor for label-related operations.

Source code in src/unitorch/cli/models/label_utils.py
18
19
20
21
22
23
24
25
26
27
28
def __init__(
    self,
    num_classes: Optional[int] = None,
    sep: Optional[str] = ",",
    max_seq_length: Optional[int] = 128,
    map_dict: Optional[Dict] = None,
):
    """Store the defaults used by the label processing methods.

    Args:
        num_classes: Length of the vector built by ``_binary_label``.
        sep: Default separator used to split string inputs into labels.
        max_seq_length: Stored on the instance as given.
        map_dict: Optional mapping from raw label values to replacement
            values; a fresh empty dict is used when omitted.
    """
    self.num_classes = num_classes
    self.sep = sep
    self.max_seq_length = max_seq_length
    # Build the fallback dict per instance so instances never share state.
    self.map_dict = {} if map_dict is None else map_dict

num_classes instance-attribute ¤

num_classes = num_classes

sep instance-attribute ¤

sep = sep

max_seq_length instance-attribute ¤

max_seq_length = max_seq_length

map_dict instance-attribute ¤

map_dict = map_dict if map_dict is not None else {}

from_config classmethod ¤

from_config(config, **kwargs)
Source code in src/unitorch/cli/models/label_utils.py
30
31
32
33
@classmethod
@config_defaults_init("core/process/label")
def from_config(cls, config, **kwargs):
    """Alternate constructor driven by a config object.

    The body is intentionally empty: whatever happens here is done by the
    ``config_defaults_init`` decorator.
    NOTE(review): presumably the decorator reads the "core/process/label"
    config section and uses it to fill the ``__init__`` arguments -- confirm
    against the decorator's implementation.
    """
    pass

_label ¤

_label(
    text: Union[int, float, str],
    dtype: Optional[str] = "int",
)

Convert a scalar label to a ClassificationTargets tensor.

Source code in src/unitorch/cli/models/label_utils.py
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
@register_process("core/process/label")
def _label(
    self,
    text: Union[int, float, str],
    dtype: Optional[str] = "int",
):
    """Turn a single label value into a scalar ``ClassificationTargets``.

    Args:
        text: Raw label; replaced by ``self.map_dict[text]`` when it is a
            key of ``self.map_dict``.
        dtype: ``"int"`` casts the label with ``int``; any other value
            casts it with ``float``.

    Returns:
        ``ClassificationTargets`` whose ``targets`` is a 0-dim tensor.
    """
    value = self.map_dict.get(text, text)
    cast = int if dtype == "int" else float
    return ClassificationTargets(targets=torch.tensor(cast(value)))

_sequence_label ¤

_sequence_label(
    text: Union[List, str],
    sep: Optional[str] = None,
    dtype: Optional[str] = "int",
)

Convert a sequence of labels to a ClassificationTargets tensor.

Source code in src/unitorch/cli/models/label_utils.py
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
@register_process("core/process/label/sequence")
def _sequence_label(
    self,
    text: Union[List, str],
    sep: Optional[str] = None,
    dtype: Optional[str] = "int",
):
    """Turn a sequence of labels into a 1-D ``ClassificationTargets``.

    Args:
        text: Either a string of labels joined by a separator, or an
            iterable of numeric-like labels. Only string input is passed
            through ``self.map_dict``.
        sep: Separator for string input, resolved together with
            ``self.sep`` by ``pop_value`` (presumably the first non-None
            value wins -- confirm against ``pop_value``).
        dtype: ``"int"`` converts the tensor with ``.int()``; any other
            value keeps the float tensor.

    Returns:
        ``ClassificationTargets`` wrapping the label tensor.
    """
    if isinstance(text, str):
        sep = pop_value(sep, self.sep)
        raw_labels = [self.map_dict.get(token, token) for token in text.split(sep)]
    else:
        raw_labels = text

    # Every label goes through float first, also on the "int" path.
    outputs = torch.tensor([float(label) for label in raw_labels])
    if dtype == "int":
        outputs = outputs.int()
    return ClassificationTargets(targets=outputs)

_binary_label ¤

_binary_label(
    text: Union[List, str], sep: Optional[str] = None
)

Build a multi-hot binary ClassificationTargets tensor of length num_classes.

Source code in src/unitorch/cli/models/label_utils.py
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
@register_process("core/process/label/binary")
def _binary_label(
    self,
    text: Union[List, str],
    sep: Optional[str] = None,
):
    """Build a multi-hot binary ClassificationTargets tensor of length num_classes.

    Args:
        text: Either a string of class labels joined by a separator, or an
            iterable of integer-like class indexes. Only string input is
            passed through ``self.map_dict``.
        sep: Separator for string input, resolved together with
            ``self.sep`` by ``pop_value`` (presumably the first non-None
            value wins -- confirm against ``pop_value``).

    Returns:
        ``ClassificationTargets`` whose ``targets`` is a float vector of
        length ``self.num_classes`` with 1 at every listed index. An empty
        label string (or list) yields the all-zero vector.
    """
    outputs = torch.zeros(self.num_classes)
    if isinstance(text, str):
        sep = pop_value(sep, self.sep)
        # Blank tokens come from an empty string or a trailing separator
        # and would only crash int(); skip them unless they are explicitly
        # mapped, so "no positive class" can be expressed as "".
        indexes = [
            int(self.map_dict.get(token, token))
            for token in text.split(sep)
            if token in self.map_dict or token.strip()
        ]
    else:
        indexes = [int(item) for item in text]

    if indexes:
        outputs[indexes] = 1
    return ClassificationTargets(targets=outputs)