diff --git a/analyze.py b/analyze.py new file mode 100644 index 0000000..f62c35b --- /dev/null +++ b/analyze.py @@ -0,0 +1,597 @@ +import cv2 +import numpy as np + + +class CompositionAnalyzer: + def __init__(self): + self.previous_gray = None + self.previous_saliency = None + + self.cached_result = { + "score": 1.0, + "state": "green", + "motion": (0.0, 0.0), + "subject_center": (0.5, 0.5), + "direction": None, + "metrics": {}, + } + + self.motion_ema = np.array( + [0.0, 0.0], + dtype=np.float32, + ) + + self.centroid_ema = np.array( + [0.5, 0.5], + dtype=np.float32, + ) + + self.last_histogram = None + + def analyze( + self, + frame, + ): + gray = cv2.cvtColor( + frame, + cv2.COLOR_BGR2GRAY, + ) + + frame_delta = self.compute_frame_delta(gray) + + histogram_delta = self.compute_histogram_delta(gray) + + if frame_delta < 1.25 and histogram_delta < 0.015: + return self.cached_result + + saliency = self.compute_saliency(gray) + + centroid = np.array( + self.compute_subject_centroid(saliency), + dtype=np.float32, + ) + + self.centroid_ema = self.centroid_ema * 0.84 + centroid * 0.16 + + motion_vector = np.array( + self.compute_optical_flow(gray), + dtype=np.float32, + ) + + self.motion_ema = self.motion_ema * 0.82 + motion_vector * 0.18 + + thirds_score = self.compute_thirds_score(self.centroid_ema) + + edge_score = self.compute_edge_tension(self.centroid_ema) + + balance_score = self.compute_balance(saliency) + + negative_space = self.compute_negative_space(saliency) + + clutter_score = self.compute_clutter(gray) + + margin_score = self.compute_safe_margin(self.centroid_ema) + + motion_score = self.compute_motion_stability(self.motion_ema) + + drift_score = self.compute_predictive_drift( + self.centroid_ema, + self.motion_ema, + ) + + edge_density = self.compute_edge_density_balance(gray) + + subject_isolation = self.compute_subject_isolation(saliency) + + score = ( + thirds_score * 0.18 + + edge_score * 0.12 + + balance_score * 0.11 + + negative_space * 0.10 + + clutter_score * 0.10 + + margin_score * 0.12 + + drift_score * 0.10 + + motion_score * 0.07 + + edge_density * 0.05 + + subject_isolation * 0.05 + ) + + score = float( + np.clip( + score, + 0.0, + 1.0, + ) + ) + + if score >= 0.72: + state = "green" + + elif score >= 0.48: + state = "yellow" + + else: + state = "red" + + direction = self.compute_directional_feedback( + self.centroid_ema, + self.motion_ema, + ) + + result = { + "score": score, + "state": state, + "motion": ( + float(self.motion_ema[0]), + float(self.motion_ema[1]), + ), + "subject_center": ( + float(self.centroid_ema[0]), + float(self.centroid_ema[1]), + ), + "direction": direction, + "metrics": { + "thirds": thirds_score, + "edge": edge_score, + "balance": balance_score, + "negative_space": negative_space, + "clutter": clutter_score, + "margin": margin_score, + "drift": drift_score, + "motion": motion_score, + "edge_density": edge_density, + "subject_isolation": subject_isolation, + }, + } + + self.previous_gray = gray + self.previous_saliency = saliency + + self.cached_result = result + + return result + + def compute_frame_delta( + self, + gray, + ): + if self.previous_gray is None: + return 255.0 + + delta = cv2.absdiff( + self.previous_gray, + gray, + ) + + return float(np.mean(delta)) + + def compute_histogram_delta( + self, + gray, + ): + histogram = cv2.calcHist( + [gray], + [0], + None, + [64], + [0, 256], + ) + + histogram = cv2.normalize( + histogram, + histogram, + ).flatten() + + if self.last_histogram is None: + self.last_histogram = histogram + return 1.0 + + delta = cv2.compareHist( + self.last_histogram.astype(np.float32), + histogram.astype(np.float32), + cv2.HISTCMP_BHATTACHARYYA, + ) + + self.last_histogram = histogram + + return float(delta) + + def compute_saliency( + self, + gray, + ): + blur = cv2.GaussianBlur( + gray, + (3, 3), + 0, + ) + + spectrum = np.fft.fft2(blur) + + log_amplitude = np.log(np.abs(spectrum) + 1e-6) + + phase = np.angle(spectrum) + + avg = cv2.blur( + log_amplitude, + (3, 3), + ) + + residual = log_amplitude - avg + + spectral = np.exp(residual + 1j * phase) + + saliency = np.abs(np.fft.ifft2(spectral)) ** 2 + + saliency = cv2.GaussianBlur( + saliency, + (9, 9), + 2.5, + ) + + saliency = cv2.normalize( + saliency, + None, + 0, + 255, + cv2.NORM_MINMAX, + ) + + return saliency.astype(np.uint8) + + def compute_subject_centroid( + self, + saliency, + ): + moments = cv2.moments(saliency) + + if moments["m00"] <= 1: + return ( + 0.5, + 0.5, + ) + + cx = moments["m10"] / moments["m00"] + + cy = moments["m01"] / moments["m00"] + + h, w = saliency.shape + + return ( + cx / w, + cy / h, + ) + + def compute_thirds_score( + self, + centroid, + ): + x, y = centroid + + targets = ( + (1 / 3, 1 / 3), + (2 / 3, 1 / 3), + (1 / 3, 2 / 3), + (2 / 3, 2 / 3), + ) + + distance = min( + np.hypot( + x - tx, + y - ty, + ) + for tx, ty in targets + ) + + return float( + max( + 0.0, + 1.0 - distance * 2.4, + ) + ) + + def compute_edge_tension( + self, + centroid, + ): + x, y = centroid + + edge_distance = min( + x, + y, + 1 - x, + 1 - y, + ) + + return float( + min( + 1.0, + edge_distance * 4, + ) + ) + + # v4.5 + def compute_balance( + self, + saliency, + ): + h, w = saliency.shape + + left = np.sum( + saliency[:, : w // 2], + dtype=np.float64, + ) + + right = np.sum( + saliency[:, w // 2 :], + dtype=np.float64, + ) + + diff = abs(float(left) - float(right)) + + total = max( + 1.0, + float(left + right), + ) + + return float( + np.clip( + 1.0 - (diff / total), + 0.0, + 1.0, + ) + ) + + def compute_negative_space( + self, + saliency, + ): + threshold = saliency > 32 + + active = np.count_nonzero(threshold) + + total = saliency.size + + occupied = active / total + + return float( + max( + 0.0, + 1.0 - abs(occupied - 0.38), + ) + ) + + def compute_clutter( + self, + gray, + ): + edges = cv2.Canny( + gray, + 80, + 160, + ) + + density = np.count_nonzero(edges) / edges.size + + return float( + max( + 0.0, + 1.0 - density * 4, + ) + ) + + def compute_safe_margin( + self, + centroid, + ): + x, y = centroid + + margin = 0.08 + + if x < margin or y < margin or x > 1 - margin or y > 1 - margin: + return 0.1 + + return 1.0 + + def compute_optical_flow( + self, + gray, + ): + if self.previous_gray is None: + return ( + 0.0, + 0.0, + ) + + points = cv2.goodFeaturesToTrack( + self.previous_gray, + maxCorners=64, + qualityLevel=0.01, + minDistance=12, + ) + + if points is None: + return ( + 0.0, + 0.0, + ) + + next_points, status, _ = cv2.calcOpticalFlowPyrLK( + self.previous_gray, + gray, + points, + None, + ) + + if next_points is None or status is None: + return ( + 0.0, + 0.0, + ) + + status = status.reshape(-1) + + valid_old = points[status == 1] + + valid_new = next_points[status == 1] + + if len(valid_old) == 0 or len(valid_new) == 0: + return ( + 0.0, + 0.0, + ) + + motion = (valid_new - valid_old).reshape(-1, 2) + + if motion.size == 0: + return ( + 0.0, + 0.0, + ) + + vector = np.mean( + motion, + axis=0, + dtype=np.float32, + ) + + return ( + float(vector[0]), + float(vector[1]), + ) + + def compute_predictive_drift( + self, + centroid, + motion, + ): + x, y = centroid + + mx, my = motion + + future_x = x + (mx * 0.0008) + + future_y = y + (my * 0.0008) + + edge_distance = min( + future_x, + future_y, + 1 - future_x, + 1 - future_y, + ) + + return float( + max( + 0.0, + min( + 1.0, + edge_distance * 4, + ), + ) + ) + + def compute_motion_stability( + self, + motion, + ): + magnitude = np.linalg.norm(motion) + + return float( + max( + 0.0, + 1.0 - (magnitude / 42.0), + ) + ) + + def compute_edge_density_balance( + self, + gray, + ): + edges = cv2.Canny( + gray, + 100, + 180, + ) + + h, w = edges.shape + + quadrants = [ + edges[: h // 2, : w // 2], + edges[: h // 2, w // 2 :], + edges[h // 2 :, : w // 2], + edges[h // 2 :, w // 2 :], + ] + + densities = [np.count_nonzero(q) / q.size for q in quadrants] + + std = np.std(densities) + + return float( + max( + 0.0, + 1.0 - (std * 12.0), + ) + ) + + def compute_subject_isolation( + self, + saliency, + ): + blurred = cv2.GaussianBlur( + saliency, + (31, 31), + 0, + ) + + center_energy = np.max(blurred) + + surrounding = np.mean(blurred) + + if surrounding <= 0: + return 1.0 + + ratio = center_energy / surrounding + + return float( + np.clip( + ratio / 4.0, + 0.0, + 1.0, + ) + ) + + def compute_directional_feedback( + self, + centroid, + motion, + ): + x = float(centroid[0]) + + y = float(centroid[1]) + + mx = float(motion[0]) + + my = float(motion[1]) + + if x < 0.28: + return "move_left" + + if x > 0.72: + return "move_right" + + if y < 0.20: + return "reduce_headroom" + + if y > 0.80: + return "recenter_subject" + + if mx > 14: + return "stabilize_right_motion" + + if mx < -14: + return "stabilize_left_motion" + + if my > 14: + return "reduce_vertical_motion" + + return None