Source code for acutils.pathology

# Functions prefixed with "tmnt" are treatments and must have those parameters:
#  - src: absolute path to the file that will be processed (str)
#  - dstdir: absolute path to the directory that should contain new files (str)
# Also, nothing should be returned.

import math
import numpy as np
import os

try:
    import cv2
except ImportError:
    print("|WRN| You must install 'opencv-python' (cv2) to use pathology module. "
          "Leaving.")
    exit()

try:
    from openslide import OpenSlide
except ImportError:
    print("|WRN| You must install 'openslide-python' to use pathology module. "
          "Be careful, it also requires Openslide installation on your computer"
          ", the 'openslide-python' Python library is just a mapping. Leaving.")
    exit()

try:
    from PIL import Image
except ImportError:
    print("|WRN| You must install 'Pillow' to use pathology module. "
          "Leaving.")
    exit()
    
try:
    from skimage.io import imread, imsave
    from skimage.data import skin as skimgskin
except ImportError:
    print("|WRN| You must install 'scikit-image' to use pathology module. "
          "You might need a skimage optinal dependencie: 'pooch'. Leaving.")
    exit()

from . import gpu

Image.MAX_IMAGE_PIXELS = None
import numpy as aunp
import skimage as auski
gpu.set_gpu_computation() # if True import cucim.skimage as auski and cupy as aunp



[docs] def update_ref_image(img_path=None): ''' Change reference image using when calling harmonize function. If you called `gpu.set_gpu_computation(activate=True)`, calling will load the image on the GPU (using cupy instead of numpy). PARAMETERS ---------- (str) img_path=None: Absolute path to the image that will be the reference to harmonize. RETURNS ------- None ''' global IHC_REF_IMAGE, IHC_H_REF, IHC_E_REF, IHC_D_REF if img_path is None: IHC_REF_IMAGE = aunp.array(IHC_REF_IMAGE) # for switching from CPU to GPU else: IHC_REF_IMAGE = aunp.array(imread(img_path)) ihc_hed = (auski.color.rgb2hed(IHC_REF_IMAGE)) null = aunp.zeros_like(ihc_hed[:, :, 0]) IHC_H_REF = auski.color.hed2rgb(aunp.stack((ihc_hed[:, :, 0], null, null), axis=-1)) IHC_E_REF = auski.color.hed2rgb(aunp.stack((null, ihc_hed[:, :, 1], null), axis=-1)) IHC_D_REF = auski.color.hed2rgb(aunp.stack((null, null, ihc_hed[:, :, 2]), axis=-1))
IHC_REF_IMAGE = skimgskin() update_ref_image() # default ref image is skimage.data.skin()
[docs] def harmonize(img): ''' Harmonize the image through hed conversion, erase the color and keep the stains. Then match the histogram with a reference image. You can change the reference image calling update_ref_image function. PARAMETERS ---------- (numpy.array or cupy.array of int) img: RGB image to harmonize. RETURNS ------- None REFERENCES ---------- [1] A. C. Ruifrok and D. A. Johnston: "Quantification of histochemical staining by color deconvolution.," Analytical and quantitative cytology and histology / the International Academy of Cytology [and] American Society of Cytology, vol. 23, no. 4, pp. 291-9, Aug. 2001. ''' ihc_hed = (auski.color.rgb2hed(img)) null = aunp.zeros_like(ihc_hed[:, :, 0]) ihc_h = auski.color.hed2rgb( aunp.stack((ihc_hed[:, :, 0], null, null), axis=-1)) hist_h = auski.exposure.match_histograms(ihc_h, IHC_H_REF) del ihc_h ihc_e = auski.color.hed2rgb( aunp.stack((null, ihc_hed[:, :, 1], null), axis=-1)) hist_e = auski.exposure.match_histograms(ihc_e, IHC_E_REF) del ihc_e ihc_d = auski.color.hed2rgb( aunp.stack((null, null, ihc_hed[:, :, 2]), axis=-1)) hist_d = auski.exposure.match_histograms(ihc_d, IHC_D_REF) del ihc_d, ihc_hed, null return (aunp.dstack((hist_h[:, :, 0], hist_e[:, :, 1], hist_d[:, :, 2]) )*255).astype(aunp.uint8)
[docs] def tmnt_harmonize(src, dstdir): ''' Read a segment, harmonize it calling harmonize function, then save it. PARAMETERS ---------- (str) src: Absolute path to the file that will be processed. (str) dstdir: Absolute path to the directory that should contain the new file. RETURNS ------- None ''' img = harmonize(aunp.array(imread(src))) imsave(os.path.join(dstdir, os.path.basename(src)), gpu.cupy_to_numpy(img), check_contrast=False)
[docs] def get_preview(slide, lvl=-1, divider=None): ''' Read the slide and returns it with low resolution. PARAMETERS ---------- (openslide.OpenSlide) slide: The slide. (int) lvl=-1: Level taken. (float) divider=None: Scale the preview by dividing the slide. The new width and height will be the old ones divided by the divider. It is also the size of the area used to load the slide, to ensure no memory overload. RETURNS ------- (numpy.array or cupy.array of int) preview: Scaled loaded slide. ''' if divider is None: preview = aunp.array(np.asarray(slide.get_thumbnail( slide.level_dimensions[lvl]))) else: width, height = slide.level_dimensions[lvl] pw, ph = int(width/divider), int(height/divider) # size of the preview tw, th = int(pw/divider), int(ph/divider) # tile size from the preview preview = aunp.zeros((pw, ph, 3), dtype=aunp.uint8) + 255 # Update preview area per area for i in range(divider): for j in range(divider): preview[j*th:(j+1)*th, i*tw:(i+1)*tw] = aunp.array( cv2.resize( np.array(slide.read_region((i*pw,j*ph), 0, (pw, ph)), dtype=np.uint8)[:,:,:3], dsize=(tw, th), interpolation=cv2.INTER_NEAREST ) ) return preview
[docs] def get_cleaned_binary(preview, fp_val=16, sigma=2): ''' Split foreground (1) and background (0) using mathematic morphology. PARAMETERS ---------- (numpy.array of cupy.array of int) preview: Fully loaded slide with low resolution. (int) fpval=16: Value that is used to creates footprints for segmentation. (float) sigma=2: Value for gaussian filter (applied on the slide before segmentation). RETURNS ------- (numpy.array of cupy.array of bool) bw: Binary image from the preview. ''' bw = auski.color.rgb2gray(preview) bw = auski.filters.threshold_otsu(bw) > auski.filters.gaussian(bw, sigma) bw = auski.morphology.opening(bw, auski.morphology.square(fp_val)) bw += auski.segmentation.clear_border(~bw) bw = auski.morphology.dilation(bw, auski.morphology.disk(int(fp_val*0.25))) return bw
[docs] def mask_rgb(rgb, mask): ''' Mask rgb image with a binary image. PARAMETERS ---------- (numpy.array or cupy.array of int) rgb: RGB image. (numpy.array or cupy.array of bool) mask: Binary image. RETURNS ------- (numpy.array or cupy.array of uint8) masked_rgb: RGB image masked with the binary image. ''' mask_rgb = aunp.repeat(mask[...,None],3,axis=2) return mask_rgb*rgb + (~mask_rgb*255).astype(aunp.uint8)
[docs] def browse_segments(slide, bw, lvl=0, required_area=10000, do_harmonize=False): ''' Find slices inside the preview and load it from de slide. PARAMETERS ---------- (openslide.OpenSlide) slide: The slide. (numpy.array of cupy.array of bool) bw: Binary image from the preview. (int) lvl=0: level taken. (int) required_area=10000: Minimum area (pixels) to keep the slice. (bool) do_harmonize=False: Harmonize slices using harmonize function. RETURNS ------- (generator of numpy.array of uint8) Segments: Found slices. ''' width, height = slide.level_dimensions[lvl] new_width, new_height = bw.shape[1], bw.shape[0] # for preview xresolution = width / new_width yresolution = height / new_height for region in auski.measure.regionprops(auski.measure.label(bw)): x, y = region.bbox[:2] w, h = region.bbox[2] - x, region.bbox[3] - y if w*h > required_area: x = int(x*xresolution) y = int(y*yresolution) w = int(w*xresolution) h = int(h*yresolution) segment_mask = auski.transform.resize(region.image, (w,h)) if do_harmonize: segment = harmonize(aunp.array(np.array(slide.read_region( (y,x), 0, (h, w)), dtype=aunp.uint8)[:,:,:3])) else: segment = aunp.array(np.array(slide.read_region( (y,x), 0, (h, w)), dtype=aunp.uint8)[:,:,:3]) yield gpu.cupy_to_numpy(mask_rgb(rgb=segment, mask=segment_mask))
[docs] def load_slice_preview_and_ext(src, lvl, divider=None, device=None): ''' Load slide file and extract a preview of it using get_preview function. PARAMETERS ---------- (str) src: Absolute path to the slide. (int) lvl: Slide level taken. (float) divider=None: Scale the preview by dividing the slide. (int) device=None: Taken GPU. RETURNS ------- (openslide.OpenSlide) slide: The slide. (numpy.array of cupy.array of int) preview: Fully loaded slide with low resolution. (str) slide_ext: Slide extension. ''' gpu.select_device(device) slide = OpenSlide(src) preview = get_preview(slide, lvl, divider) _, slide_ext = os.path.splitext(src) return slide, preview, slide_ext
[docs] def tmnt_save_preview_from_slide(src, dstdir, lvl, divider=None, ext="png", device=None): ''' Load slide preview and save it. PARAMETERS ---------- (str) src: Absolute path to the slide. (str) dstdir: Absolute path to the directory where to save the preview. (int) lvl: Slide level taken. (float) divider=None: Scale the preview by dividing the slide. (str) ext="png": Saved preview extension. (int) device=None: Taken GPU. RETURNS ------- None ''' _, preview, slide_ext = load_slice_preview_and_ext(src, lvl, divider, device) imsave(os.path.join(dstdir, f"{os.path.basename(src)[:-len(slide_ext)]}.{ext}"), gpu.cupy_to_numpy(preview), check_contrast=False)
[docs] def tmnt_save_segments_from_slide(src, dstdir, lvlpreview, lvlsegment, fpval, sigma, do_harmonize=False, divider=None, ext="png", device=None): ''' Find slices inside the slide and save them. PARAMETERS ---------- (str) src: Absolute path to the slide. (str) dstdir: Absolute path to the directory where to save the preview. (int) lvlpreview: Slide level taken for the preview. (int) lvlsegment: Slide level taken for the segment. (int) fpval: Value that is used to creates footprints for segmentation. (float) sigma: Value for gaussian filter (applied on the slide before segmentation). (bool) do_harmonize=False: Harmonize slices using harmonize function. (float) divider=None: Scale the preview by dividing the slide. (str) ext="png": Saved segments extension. (int) device=None: Taken GPU. RETURNS ------- None ''' slide, preview, slide_ext = load_slice_preview_and_ext(src, lvlpreview, divider, device) bw = get_cleaned_binary(preview, fpval, sigma) for i, segment in enumerate(browse_segments(slide, bw, lvlsegment, do_harmonize=do_harmonize)): imsave(os.path.join(dstdir, f"{os.path.basename(src)[:-len(slide_ext)]}_{i}.{ext}"), segment, check_contrast=False)
[docs] def browse_tiles(segment, size=512, blank_tol=0.35): ''' Regulary crop a segment into multiple tiles of same size. PARAMETERS ---------- (numpy.array of int) segment: Slice to tile. (int) size=512: Size of each tile. (float) blank_tol=0.35: Percentage of white pixels tolerated for a tile. RETURNS ------- (generator of tuple of int) coordinates: Tiles coordinates (x,y). - tiles generator of numpy.array of int): Keeped tiles. ''' width, height = segment.shape[1], segment.shape[0] blank_size = size*size*blank_tol # number of pixels that can be white # Center the tiles (if needed) extra_width = width - int(width/size)*size left_nudge = int(extra_width/2) extra_height = height - int(height/size)*size top_nudge = int(extra_height/2) # Tile the segment, yield if not too many white pixels for x in range(left_nudge, width-size, size): for y in range(top_nudge, height-size, size): if np.where(segment[y:y+size, x:x+size].sum(axis=2) == 765)[0].shape[0] <= blank_size: yield (x,y), segment[y:y+size, x:x+size]
[docs] def tmnt_save_tiles_from_segment(src, dstdir, size=512, blank_tol=0.35, ext="png"): ''' Find slices inside the slide and save them. PARAMETERS ---------- (str) src: Absolute path to the slide. (str) dstdir: Absolute path to the directory where to save the preview. (int) size=512: Size of each tile. (float) blank_tol=0.35: Percentage of white pixels tolerated for a tile. (str) ext="png": Saved tiles extension. RETURNS ------- None ''' segment = np.array(imread(src)) _, segment_ext = os.path.splitext(src) for i, (coords,tile) in enumerate( browse_tiles(segment, size=size, blank_tol=blank_tol)): imsave(os.path.join(dstdir, f"{os.path.basename(src)[:-len(segment_ext)]}" f"_{i}_x{coords[0]}_y{coords[1]}.{ext}"), tile, check_contrast=False)