Source code for acutils.video
# Functions prefixed with "tmnt" are treatments and must have those parameters:
# - src: absolute path to the file that will be processed (str)
# - dstdir: absolute path to the directory that should contain new files (str)
# Also, nothing should be returned.
import os
try:
import cv2
except ImportError:
print("|WRN| You must install 'opencv-python' (cv2) to use video module. "
"Leaving.")
exit()
[docs]
def browse_frames_for_binary_classification(src, bound, extra_down, extra_up,
start=0, end=None):
'''
Browse each frame of a video, from "start" to "end".
Skip from bound-extra_down to bound+extra_up.
PARAMETERS
------
(str) src:
absolute path to the video
(int) bound:
frame number that switch the status from "before" to "after"
(int) extra_down:
how many frames to skip before the bound
(int) extra_up:
how many frames to skip after the bound
(int) start=0:
frame number to start browsing frames
(int) end=None:
frame number to end browsing frames (if None, go for the full duration)
RETURNS
------
(generator of bool) passed:
True from entering inside the skipped range to the end
(generator of int) count:
frame numbers
(generator of numpy.array of uint8) frame:
BGR frames of the video
'''
srst = int(bound-extra_down) # skipped range start time
srse = int(bound+extra_up) # skipped range end time
cap = cv2.VideoCapture(src)
duration = int(cap.get(cv2.CAP_PROP_FRAME_COUNT)) # as frame amount
duration = end if end is not None and duration < end else duration
count = 0 # use it to name frames
success, frame = cap.read() # load first frame
while success: # read it frame per frame
passed = count > srst # True if not before the skipped range
if passed: # skip frames between srst and srse
if srse >= duration: break # exit now if no more data
if count < srse: # go to the end of the skipped range
count += 1
success, frame = cap.read() # load next frame
continue
elif count <= start: # save nothing before start
count += 1
success, frame = cap.read() # load next frame
continue
yield passed, count, frame
count += 1
success, frame = cap.read()
[docs]
def tmnt_extract_frames_for_binary_classification(src, dstdir, bound,
extra_down, extra_up, start=0, end=None, fext="png", before_name="before",
after_name="after"):
'''
Extract frames of a video, from "start" to "end".
Skip from bound-extra_down to bound+extra_up.
Split them into 2 states "before" or "after" the skipped range.
PARAMETERS
------
(str) src:
absolute path to the video
(str) dstdir:
absolute path to the directory that should contain the frames
(int) bound:
frame number that switch the status from "before" to "after"
(int) extra_down:
how many frames to skip before the bound
(int) extra_up:
how many frames to skip after the bound
(int) start=0:
frame number to start browsing frames
(int) end=None:
frame number to end browsing frames (if None, go for the full duration)
(str) fext="png":
frame extension
(str) before_name="before":
name of the subdir inside dstdir to save frames before skip
(str) after_name="after":
name of the subdir inside dstdir to save frames after skip
RETURNS
-------
None
'''
_, vext = os.path.splitext(src)
vname = os.path.basename(src)[:-len(vext)]
for passed, count, frame in browse_frames_for_binary_classification(
src, bound, extra_down, extra_up, start, end):
cv2.imwrite(os.path.join(dstdir, after_name if passed else before_name,
f"{vname}_{count}.{fext}"), frame)
[docs]
def tmnt_extract_sequences_for_binary_classification(src, dstdir, bound,
extra_down, extra_up, seqlen=6, start=0, end=None, sext='mp4',
codec='mp4v', fps=12, before_name="before", after_name="after"):
'''
Extract sequences of a video, from "start" to "end".
Skip from bound-extra_down to bound+extra_up.
Split them into 2 states "before" or "after" the skipped range.
PARAMETERS
----------
(str) src:
absolute path to the video
(str) dstdir:
Absolute path to the directory that should contain the frames.
(int) bound:
Frame number that switch the status from "before" to "after".
(int) extra_down:
How many frames to skip before the bound.
(int) extra_up:
How many frames to skip after the bound.
(int) seqlen=6:
Amount of frames per sequence.
(int) start=0:
Frame number to start browsing frames.
(int) end=None:
Frame number to end browsing frames (if None, go for the full duration).
(str) sext="mp4":
Sequence extension.
(str) codec="mp4v":
Sequence codec, used for: fourcc = cv2.VideoWriter_fourcc(*codec).
(int) fps=12:
saved Sequence frames per second.
(str) before_name="before":
Name of the subdir inside dstdir to save frames before skip.
(str) after_name="after":
Name of the subdir inside dstdir to save frames after skip.
'''
_, vext = os.path.splitext(src)
vname = os.path.basename(src)[:-len(vext)]
fourcc = cv2.VideoWriter_fourcc(*codec)
out = None # for VideoWriter
seq = []
scount = 0 # sequence counter
old_passed = False
for passed, _, frame in browse_frames_for_binary_classification(src, bound,
extra_down, extra_up, start, end):
if old_passed != passed:
seq = [] # a sequence should be entirely before or after the skip
seq.append(frame) # add the frame to the sequence
if len(seq) == seqlen: # write a sequence file if enough frames
out = cv2.VideoWriter(
os.path.join(dstdir, after_name if passed else before_name,
f'{vname}_{scount}.{sext}'),
fourcc,
fps,
frame.shape[:2][::-1]
)
for img in seq:
out.write(img)
out.release()
seq = []
scount += 1
old_passed = passed