자율주행

Optical Flow in OpenCV

coding art 2022. 8. 5. 12:38
728x90

참조: Optical Flow in OpenCV

https://learnopencv.com/optical-flow-in-opencv/

 

비디오 또는 프레임 시퀀스에서 광학 흐름(optical flow)을 계산하기 위한 다양한 알고리즘에 대해 알아보자.

광학 흐름은 하나의 비디오에서 두 개의 연속 프레임 사이의 픽셀당 모션 추정 작업이다. 기본적으로 optical flow 작업은 픽셀에 대한 이동 벡터를 두 개의 인접한 이미지 간의 물체 변위 차이로 계산하는 것을 의미한다. optical flow의 중요 아이디어는 모션 또는 카메라 움직임으로 인한 객체의 변위 벡터를 추정하는 것이다.

 

픽셀 강도 행렬인 회색조의 이미지를 가정해 보자. 픽셀의 강도 함수 I(x,y,t)을 정의한다. 여기서 x,y 는 픽셀 좌표이고 t 는 프레임 번호입니다. I(x,y,t) 함수는 프레임 t 에서 정확한 픽셀 강도이다. 

 

객체가 변위하더라도 그 객체에 속하는 픽셀 강도가 변하지 않는다 가정하자. 즉, I(x,y,t) = I(x+x, y+ y, t + t).  편의상 t = 1 로 둔다. 중요 관심사는 모션 벡터 (x, y) 를 찾는 것이다. 

 

Taylor 급수 전개를 사용하여

I(x,y,t) - I(x+△x, y+ △y, t + △t) = 0 을 

I'x u + I'y v = - I't 로 다시 쓸 수 있다.

여기서 u = {dx}/{dt}, v = {dy}/{dt} 는 각각 속도를,

I'_x, I'_y는 이미지 기울기를 나타낸다.

여기에서 고차 Taylor 급수의 일부는 무시하고 1차 Taylor 확장만 사용하는 함수 근사이다.

 

두 프레임 I_1과 I_2 사이의 픽셀 모션 차이는 I_1 - I_2 I'_x u + I'_y v + I'_t 로 쓸 수 있다. 이제 두 개의 속도변수 u, v 와 방정식 하나만 있으므로 당장은 방정식을 풀 수 없지만 아래의 알고리즘에서 공개할 몇 가지 트릭을 사용하여 풀도록 하자.

 

optical flow는 물체의 움직임 정보가 중요한 많은 영역에서 사용될 수 있다. optical flow는 압축, 안정화, 슬로우 모션 등을 위한 비디오 편집기에서 일반적으로 발견되기도 한다. 또한 optical flow는 action recognition 작업 및 실시간 추적 시스템에서 응용을 찾아 볼 수 있다.

 

optical flow의 두 가지 유형 중 첫 번째는 sparse optical flow이다.  sparse 하다는 의미는 매트릭스나 또는 리스트 데이터 구조에서 대부분의 데이터 값이 0 이고 조금만 0 이아닌 데이터를 포함할 경우를 의미한다. 특정 객체 세트(예: 이미지에서 감지된 모서리)에 대한 모션 벡터를 계산한다. 따라서 이미지에서 특징을 추출하기 위해 약간의 전처리가 필요하며, 이는 optical flow 계산의 기초가 된다. OpenCV는 sparse optical flow 작업을 해결하기 위한 몇 가지 알고리즘 구현을 제공한다.

희소한 특징 세트만 사용한다는 것은 여기에 포함되지 않은 픽셀에 대한 모션 정보가 없다는 것을 의미한다. 이 제한은 이미지의 모든 픽셀에 대한 모션 벡터를 계산해야 하는 dense optical flow 알고리즘을 사용하여 해제할 수 있다. 일부 dense optical flow 알고리즘은 이미 OpenCV에 구현되어 있다.

 

Lukas Kanade 알고리듬과 dense optical flow

인접한 픽셀이 동일한 모션 벡터(x, y)를 갖는다고 가정하자. 고정 크기 창을 사용하여 연립방정식을 만들어 보자.

p_i = (x_i, y_i) 를 n개 요소를 픽셀 좌표라 두자. 다음과 같이 방정식 시스템을정의할 수 있다.

매트릭스 형으로 방정식을 다시 써 보기로 하자.

결과적으로 행렬 방정식 A γ = b가 얻어진다. 최소 제곱법을 사용하여 해답 벡터 γ를 계산할 수 있다.

 

이 Lukas Knade 알고리듬은 오브젝트가 돌발적으로 움직일 경우 제대로 작동하지 못한다는 점에 유의하자.

# dense optical flow

import numpy as np
import cv2
import time

def draw_flow(img, flow, step=16):

    h, w = img.shape[:2]
    y, x = np.mgrid[step/2:h:step, step/2:w:step].reshape(2,-1).astype(int)
    fx, fy = flow[y,x].T

    lines = np.vstack([x, y, x-fx, y-fy]).T.reshape(-1, 2, 2)
    lines = np.int32(lines + 0.5)

    img_bgr = cv2.cvtColor(img, cv2.COLOR_GRAY2BGR)
    cv2.polylines(img_bgr, lines, 0, (0, 255, 0))

    for (x1, y1), (_x2, _y2) in lines:
        cv2.circle(img_bgr, (x1, y1), 1, (0, 255, 0), -1)

    return img_bgr

def draw_hsv(flow):

    h, w = flow.shape[:2]
    fx, fy = flow[:,:,0], flow[:,:,1]

    ang = np.arctan2(fy, fx) + np.pi
    v = np.sqrt(fx*fx+fy*fy)

    hsv = np.zeros((h, w, 3), np.uint8)
    hsv[...,0] = ang*(180/np.pi/2)
    hsv[...,1] = 255
    hsv[...,2] = np.minimum(v*4, 255)
    bgr = cv2.cvtColor(hsv, cv2.COLOR_HSV2BGR)

    return bgr

#cap = cv2.VideoCapture('drive.mp4')
cap = cv2.VideoCapture(0)
suc, prev = cap.read()
prevgray = cv2.cvtColor(prev, cv2.COLOR_BGR2GRAY)

while True:

    suc, img = cap.read()
    gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)

    # start time to calculate FPS
    start = time.time()

    flow = cv2.calcOpticalFlowFarneback(prevgray, gray, None, 0.5, 3, 15, 3, 5, 1.2, 0)
    
    prevgray = gray

    # End time
    end = time.time()
    # calculate the FPS for current frame detection
    fps = 1 / (end-start)

    print(f"{fps:.2f} FPS")

    cv2.imshow('flow', draw_flow(gray, flow))
    cv2.imshow('flow HSV', draw_hsv(flow))

    key = cv2.waitKey(5)
    if key == ord('q'):
        break

cap.release()
cv2.destroyAllWindows()

 

 

 

실제로 보다 일반적인 접근 방식은 멀티 스케일링 트릭을 사용하는 것이다. 즉 이미지 피라미드를 생성하도록 한다. 여기서 모든 다음 스텝의 이미지는 이전 스텝의 이미지보다 일부 크기 조정 요소(예: 크기 요소는 2임)만큼 커지게 된다.

창의 크기가 고정되어 있다면, 큰 이미지보다 작은 이미지의 급격한 움직임이 더 눈에 띌 것이다. 작은 이미지에서 찾아낸 변위 벡터는 더 나은 결과를 얻을 수 있도록 다음 스텝의 더 큰 피라미드 단계에서 사용된다.

파이선 코드 사례는 유튜브의 Optical Flow for Motion Detection in Python ( https://www.youtube.com/watch?v=WrlH5hHv0gE&t=788s ) 을 참조하자.

코드는 Github  ( https://github.com/niconielsen32/ComputerVision )에서 다운로드 가능하다.

이 코드를 실행하기 위해서는 데스크 톱 PC에 웹캠이 설치되어 있어야 한다.

 

sparse optical flow

사진에서 처럼 각이 진 상자를 움직여 보면 어떻게 코너를 탐지하여 궤적을 보여 주는지 쉽게 이해할 수 있을 것이다.

 

 

 

 

 

 

 

#sparse_optical_flow.py

import numpy as np
import cv2
import time

lk_params = dict(winSize  = (15, 15),maxLevel = 2,
                criteria = (cv2.TERM_CRITERIA_EPS | cv2.TERM_CRITERIA_COUNT, 10, 0.03))

feature_params = dict(maxCorners = 20,qualityLevel = 0.3,minDistance = 10,blockSize = 7 )

trajectory_len = 40; detect_interval = 5; trajectories = []; frame_idx = 0

cap = cv2.VideoCapture(0)
#cap = cv2.VideoCapture("video1.avi")

suc, prev = cap.read()
prev_gray = cv2.cvtColor(prev, cv2.COLOR_BGR2GRAY)

while True:
    # start time to calculate FPS
    start = time.time()

    suc, frame = cap.read()
    frame_gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
    img = frame.copy()

    # Calculate optical flow for a sparse feature set using the iterative Lucas-Kanade Method
    if len(trajectories) > 0:
        img0, img1 = prev_gray, frame_gray
        p0 = np.float32([trajectory[-1] for trajectory in trajectories]).reshape(-1, 1, 2)
        p1, _st, _err = cv2.calcOpticalFlowPyrLK(img0, img1, p0, None, **lk_params)
        p0r, _st, _err = cv2.calcOpticalFlowPyrLK(img1, img0, p1, None, **lk_params)
        d = abs(p0-p0r).reshape(-1, 2).max(-1)
        good = d < 1

        new_trajectories = []

        # Get all the trajectories
        for trajectory, (x, y), good_flag in zip(trajectories, p1.reshape(-1, 2), good):
            if not good_flag:
                continue
            trajectory.append((x, y))
            if len(trajectory) > trajectory_len:
                del trajectory[0]
            new_trajectories.append(trajectory)
            # Newest detected point
            cv2.circle(img, (int(x), int(y)), 2, (0, 0, 255), -1)

        trajectories = new_trajectories

        # Draw all the trajectories
        cv2.polylines(img, [np.int32(trajectory) for trajectory in trajectories], False, (0, 255, 0))
        cv2.putText(img, 'track count: %d' % len(trajectories), (20, 50), cv2.FONT_HERSHEY_PLAIN, 1, (0,255,0), 2)

    # Update interval - When to update and detect new features
    if frame_idx % detect_interval == 0:
        mask = np.zeros_like(frame_gray)
        mask[:] = 255

        # Lastest point in latest trajectory
        for x, y in [np.int32(trajectory[-1]) for trajectory in trajectories]:
            cv2.circle(mask, (x, y), 5, 0, -1)

        # Detect the good features to track
        p = cv2.goodFeaturesToTrack(frame_gray, mask = mask, **feature_params)
        if p is not None:
            # If good features can be tracked - add that to the trajectories
            for x, y in np.float32(p).reshape(-1, 2):
                trajectories.append([(x, y)])

    frame_idx += 1
    prev_gray = frame_gray

    # End time
    end = time.time()
    
    # calculate the FPS for current frame detection
    fps = 1 / (end - start)
    
    # Show Results
    cv2.putText(img, f"{fps:.2f} FPS", (20, 30), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)
    cv2.imshow('Sparse Optical Flow', img)
    cv2.imshow('Mask', mask)

    if cv2.waitKey(10) & 0xFF == ord('q'):
        break

cap.release()
cv2.destroyAllWindows()

 

한편 cap = cv2.VideoCapture(0) 에서 cap = cv2.VideoCapture("video1.avi")으로 변경하고 video1.avi 파일을 실행코드와 동일한 폴더에 저장하여 실행하면 아래와 같이 자동차의 움직임을 관찰할 수 있다. 

단 While True: 가 무한 루프이기때문에 비데오 파일이 짧은 시간안에 소진되면 마지막 단계에 에러가 발생할 수도 있으므로 유의하자. 아래의 video1.avi  동영상 파일을 다운하여 실행해 보기 바란다.

video1.avi
2.41MB

 

앞서 언급했듯이 Dense(고밀도) Optical Flow 알고리즘은 희소 특징 세트에 대한 모션 벡터를 계산하므로 여기에서 일반적인 접근 방식은 Shi-Tomasi 의 코너 감지기(corner detector)를 사용하는 것이다. 이미지에서 코너를 찾은 다음 두 개의 연속 프레임 사이의 코너 모션 벡터를 계산하는 데 사용한다. Sparse한 경우 탐지되는 코너가 주로 오브젝트의 경계에서의 코너임에 반해 Dense 에서는 면 전체를 균일하게 그리드 한 후 궤적을 탐지한다.

 

이와같이 원리적으로 동영상의 연속된 frame 들을 사용해 특징점들을 추출해 시간에 따른 흐름을 찾아 자율주행 차량 주변 환경의 개체들을 대상으로 속도 연산의 가능성을 제공한다.

 

#dense_optical_flow_detection.py

import numpy as np
import cv2
import time

def draw_flow(img, flow, step=16):

    h, w = img.shape[:2]
    y, x = np.mgrid[step/2:h:step, step/2:w:step].reshape(2,-1).astype(int)
    fx, fy = flow[y,x].T

    lines = np.vstack([x, y, x-fx, y-fy]).T.reshape(-1, 2, 2)
    lines = np.int32(lines + 0.5)

    img_bgr = cv2.cvtColor(img, cv2.COLOR_GRAY2BGR)
    cv2.polylines(img_bgr, lines, 0, (0, 255, 0))

    for (x1, y1), (_x2, _y2) in lines:
        cv2.circle(img_bgr, (x1, y1), 1, (0, 255, 0), -1)

    return img_bgr

def draw_hsv(flow):

    h, w = flow.shape[:2]
    fx, fy = flow[:,:,0], flow[:,:,1]

    ang = np.arctan2(fy, fx) + np.pi
    v = np.sqrt(fx*fx+fy*fy)

    hsv = np.zeros((h, w, 3), np.uint8)
    hsv[...,0] = ang*(180/np.pi/2)
    hsv[...,1] = 255
    hsv[...,2] = np.minimum(v*4, 255)
    bgr = cv2.cvtColor(hsv, cv2.COLOR_HSV2BGR)

    return bgr

cap = cv2.VideoCapture(0)

suc, prev = cap.read()
prevgray = cv2.cvtColor(prev, cv2.COLOR_BGR2GRAY)

while True:

    suc, img = cap.read()
    gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)

    # start time to calculate FPS
    start = time.time()

    flow = cv2.calcOpticalFlowFarneback(prevgray, gray, None, 0.5, 3, 15, 3, 5, 1.2, 0)
    
    prevgray = gray

    # End time
    end = time.time()
    # calculate the FPS for current frame detection
    fps = 1 / (end-start)

    print(f"{fps:.2f} FPS")

    cv2.imshow('flow', draw_flow(gray, flow))
    cv2.imshow('flow HSV', draw_hsv(flow))

    key = cv2.waitKey(5)
    if key == ord('q'):
        break

cap.release()
cv2.destroyAllWindows()