import cv2
import numpy as np


class CompositionAnalyzer:
    """Score the composition of successive video frames.

    Each call to :meth:`analyze` converts the frame to grayscale, decides
    whether it differs enough from earlier frames to be worth re-scoring,
    and if so combines ten heuristic metrics (each in ``[0, 1]``) into a
    single weighted score plus a coarse ``"green"``/``"yellow"``/``"red"``
    state and an optional textual hint.

    The analyzer is stateful: it keeps the last fully analyzed grayscale
    frame and saliency map, the last histogram, exponentially smoothed
    subject-centre and motion estimates, and the last result.
    """

    # A frame is skipped (cached result returned) only when BOTH deltas are
    # below these limits.
    FRAME_DELTA_SKIP = 1.25
    HISTOGRAM_DELTA_SKIP = 0.015

    # Minimum scores for the "green" and "yellow" states; anything lower is
    # "red".
    GREEN_MIN_SCORE = 0.72
    YELLOW_MIN_SCORE = 0.48

    # Smoothed motion magnitude (per axis) above which a motion hint is
    # emitted by compute_directional_feedback.
    MOTION_HINT_LIMIT = 14

    # (metric name, weight) pairs, in the order they are summed.
    METRIC_WEIGHTS = (
        ("thirds", 0.18),
        ("edge", 0.12),
        ("balance", 0.11),
        ("negative_space", 0.10),
        ("clutter", 0.10),
        ("margin", 0.12),
        ("drift", 0.10),
        ("motion", 0.07),
        ("edge_density", 0.05),
        ("subject_isolation", 0.05),
    )

    def __init__(self):
        """Initialise empty frame history and a neutral cached result."""
        self.previous_gray = None
        self.previous_saliency = None
        self.cached_result = {
            "score": 1.0,
            "state": "green",
            "motion": (0.0, 0.0),
            "subject_center": (0.5, 0.5),
            "direction": None,
            "metrics": {},
        }
        self.motion_ema = np.array([0.0, 0.0], dtype=np.float32)
        self.centroid_ema = np.array([0.5, 0.5], dtype=np.float32)
        self.last_histogram = None

    @staticmethod
    def _to_gray(frame):
        """Return a grayscale image for *frame*.

        A 2-D array is treated as already grayscale and copied, so that the
        frame later stored as ``previous_gray`` never aliases a buffer the
        caller may overwrite. Anything else is handed to ``cv2.cvtColor``
        with ``COLOR_BGR2GRAY``, exactly as before.
        """
        if getattr(frame, "ndim", None) == 2:
            return frame.copy()
        return cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)

    def analyze(self, frame):
        """Analyse one frame and return the composition result.

        Args:
            frame: Image array. Color frames are converted with
                ``COLOR_BGR2GRAY``; 2-D arrays are used as grayscale.
                The downstream OpenCV calls presumably require 8-bit
                data -- confirm against the capture source.

        Returns:
            dict with keys ``"score"`` (float in ``[0, 1]``), ``"state"``
            (``"green"``, ``"yellow"`` or ``"red"``), ``"motion"`` and
            ``"subject_center"`` (2-tuples of floats), ``"direction"``
            (hint string or ``None``) and ``"metrics"`` (per-metric
            scores). When the frame is judged unchanged, the previously
            returned dict object itself is returned again.
        """
        gray = self._to_gray(frame)

        # Both deltas are always evaluated (no short-circuit) because
        # compute_histogram_delta refreshes the stored histogram each call.
        frame_delta = self.compute_frame_delta(gray)
        histogram_delta = self.compute_histogram_delta(gray)
        if (
            frame_delta < self.FRAME_DELTA_SKIP
            and histogram_delta < self.HISTOGRAM_DELTA_SKIP
        ):
            return self.cached_result

        saliency = self.compute_saliency(gray)

        centroid = np.array(
            self.compute_subject_centroid(saliency),
            dtype=np.float32,
        )
        self.centroid_ema = self.centroid_ema * 0.84 + centroid * 0.16

        motion_vector = np.array(
            self.compute_optical_flow(gray),
            dtype=np.float32,
        )
        self.motion_ema = self.motion_ema * 0.82 + motion_vector * 0.18

        metrics = {
            "thirds": self.compute_thirds_score(self.centroid_ema),
            "edge": self.compute_edge_tension(self.centroid_ema),
            "balance": self.compute_balance(saliency),
            "negative_space": self.compute_negative_space(saliency),
            "clutter": self.compute_clutter(gray),
            "margin": self.compute_safe_margin(self.centroid_ema),
            "drift": self.compute_predictive_drift(
                self.centroid_ema,
                self.motion_ema,
            ),
            "motion": self.compute_motion_stability(self.motion_ema),
            "edge_density": self.compute_edge_density_balance(gray),
            "subject_isolation": self.compute_subject_isolation(saliency),
        }

        # Plain left-to-right accumulation so the result matches a written
        # out ``a * wa + b * wb + ...`` expression exactly.
        score = 0.0
        for name, weight in self.METRIC_WEIGHTS:
            score += metrics[name] * weight
        score = float(np.clip(score, 0.0, 1.0))

        if score >= self.GREEN_MIN_SCORE:
            state = "green"
        elif score >= self.YELLOW_MIN_SCORE:
            state = "yellow"
        else:
            state = "red"

        direction = self.compute_directional_feedback(
            self.centroid_ema,
            self.motion_ema,
        )

        result = {
            "score": score,
            "state": state,
            "motion": (
                float(self.motion_ema[0]),
                float(self.motion_ema[1]),
            ),
            "subject_center": (
                float(self.centroid_ema[0]),
                float(self.centroid_ema[1]),
            ),
            "direction": direction,
            "metrics": metrics,
        }

        # History is only advanced for frames that were fully analysed, so
        # the next frame delta is measured against this frame.
        self.previous_gray = gray
        self.previous_saliency = saliency
        self.cached_result = result
        return result

    def compute_frame_delta(self, gray):
        """Return the mean absolute pixel difference to the stored frame.

        Returns ``255.0`` when there is no stored frame, or when its shape
        differs from *gray* (e.g. the stream changed resolution), so that
        such frames are always re-analysed instead of making
        ``cv2.absdiff`` fail on mismatched sizes.
        """
        previous = self.previous_gray
        if previous is None or previous.shape != gray.shape:
            return 255.0
        delta = cv2.absdiff(previous, gray)
        return float(np.mean(delta))

    def compute_histogram_delta(self, gray):
        """Return the Bhattacharyya distance to the previous histogram.

        A 64-bin intensity histogram is computed for *gray* and stored as
        the new reference on every call. The first call has nothing to
        compare against and returns ``1.0``.
        """
        histogram = cv2.calcHist([gray], [0], None, [64], [0, 256])
        histogram = cv2.normalize(histogram, histogram).flatten()

        if self.last_histogram is None:
            self.last_histogram = histogram
            return 1.0

        delta = cv2.compareHist(
            self.last_histogram.astype(np.float32),
            histogram.astype(np.float32),
            cv2.HISTCMP_BHATTACHARYYA,
        )
        self.last_histogram = histogram
        return float(delta)

    def compute_saliency(self, gray):
        """Return a ``uint8`` saliency map with the same shape as *gray*.

        The map is built in the frequency domain: the log-amplitude
        spectrum minus its 3x3 local average is recombined with the
        original phase, transformed back, squared, blurred and min-max
        scaled to ``0..255``.
        """
        blur = cv2.GaussianBlur(gray, (3, 3), 0)
        spectrum = np.fft.fft2(blur)

        # Small epsilon keeps log() finite for zero-amplitude bins.
        log_amplitude = np.log(np.abs(spectrum) + 1e-6)
        phase = np.angle(spectrum)

        avg = cv2.blur(log_amplitude, (3, 3))
        residual = log_amplitude - avg

        spectral = np.exp(residual + 1j * phase)
        saliency = np.abs(np.fft.ifft2(spectral)) ** 2
        saliency = cv2.GaussianBlur(saliency, (9, 9), 2.5)
        saliency = cv2.normalize(
            saliency,
            None,
            0,
            255,
            cv2.NORM_MINMAX,
        )
        return saliency.astype(np.uint8)

    def compute_subject_centroid(self, saliency):
        """Return the saliency-weighted centroid as normalised ``(x, y)``.

        Coordinates are divided by the map width and height. If the total
        saliency mass (``m00``) is at most 1, the frame centre
        ``(0.5, 0.5)`` is returned instead.
        """
        moments = cv2.moments(saliency)
        if moments["m00"] <= 1:
            return (0.5, 0.5)

        cx = moments["m10"] / moments["m00"]
        cy = moments["m01"] / moments["m00"]
        h, w = saliency.shape
        return (cx / w, cy / h)

    def compute_thirds_score(self, centroid):
        """Score proximity of *centroid* to the nearest rule-of-thirds point.

        Returns ``1.0`` on an intersection, falling linearly with distance
        (factor 2.4) and floored at ``0.0``.
        """
        x, y = centroid
        targets = (
            (1 / 3, 1 / 3),
            (2 / 3, 1 / 3),
            (1 / 3, 2 / 3),
            (2 / 3, 2 / 3),
        )
        distance = min(np.hypot(x - tx, y - ty) for tx, ty in targets)
        return float(max(0.0, 1.0 - distance * 2.4))

    def compute_edge_tension(self, centroid):
        """Score the distance from *centroid* to the nearest frame edge.

        Four times the smallest normalised edge distance, capped at
        ``1.0`` (so anything at least 0.25 from every edge scores 1.0).
        """
        x, y = centroid
        edge_distance = min(x, y, 1 - x, 1 - y)
        return float(min(1.0, edge_distance * 4))

    # v4.5
    def compute_balance(self, saliency):
        """Score left/right saliency balance in ``[0, 1]``.

        ``1.0`` means both halves carry equal total saliency; the score
        drops by the absolute difference relative to the total.
        """
        h, w = saliency.shape
        left = np.sum(saliency[:, : w // 2], dtype=np.float64)
        right = np.sum(saliency[:, w // 2 :], dtype=np.float64)

        diff = abs(float(left) - float(right))
        # Floor of 1.0 avoids dividing by zero on an all-black map.
        total = max(1.0, float(left + right))
        return float(np.clip(1.0 - (diff / total), 0.0, 1.0))

    def compute_negative_space(self, saliency):
        """Score how close the salient-pixel fraction is to 0.38.

        Pixels with saliency above 32 count as occupied; the score is
        ``1 - |occupied - 0.38|``, floored at ``0.0``.
        """
        threshold = saliency > 32
        active = np.count_nonzero(threshold)
        total = saliency.size
        occupied = active / total
        return float(max(0.0, 1.0 - abs(occupied - 0.38)))

    def compute_clutter(self, gray):
        """Score visual clutter from Canny (80/160) edge density.

        Returns ``1.0`` for an edge-free frame, reaching ``0.0`` once a
        quarter of the pixels are edges.
        """
        edges = cv2.Canny(gray, 80, 160)
        density = np.count_nonzero(edges) / edges.size
        return float(max(0.0, 1.0 - density * 4))

    def compute_safe_margin(self, centroid):
        """Return ``0.1`` if *centroid* lies within 8% of any edge, else ``1.0``."""
        x, y = centroid
        margin = 0.08
        if x < margin or y < margin or x > 1 - margin or y > 1 - margin:
            return 0.1
        return 1.0

    def compute_optical_flow(self, gray):
        """Return the mean ``(dx, dy)`` feature displacement to *gray*.

        Up to 64 corners are detected in the stored previous frame and
        tracked into *gray* with pyramidal Lucas-Kanade. ``(0.0, 0.0)`` is
        returned when there is no previous frame, when its shape differs
        from *gray* (the two frames cannot be meaningfully paired), when
        no corners are found, or when no point is tracked successfully.
        """
        no_motion = (0.0, 0.0)

        previous = self.previous_gray
        if previous is None or previous.shape != gray.shape:
            return no_motion

        points = cv2.goodFeaturesToTrack(
            previous,
            maxCorners=64,
            qualityLevel=0.01,
            minDistance=12,
        )
        if points is None:
            return no_motion

        next_points, status, _ = cv2.calcOpticalFlowPyrLK(
            previous,
            gray,
            points,
            None,
        )
        if next_points is None or status is None:
            return no_motion

        # Keep only the points the tracker reports as found.
        tracked = status.reshape(-1) == 1
        valid_old = points[tracked]
        valid_new = next_points[tracked]
        if len(valid_old) == 0 or len(valid_new) == 0:
            return no_motion

        motion = (valid_new - valid_old).reshape(-1, 2)
        if motion.size == 0:
            return no_motion

        vector = np.mean(motion, axis=0, dtype=np.float32)
        return (float(vector[0]), float(vector[1]))

    def compute_predictive_drift(self, centroid, motion):
        """Score the edge distance of the centroid projected along *motion*.

        The centroid is shifted by ``motion * 0.0008`` and the result is
        scored like :meth:`compute_edge_tension`, clamped to ``[0, 1]``.
        """
        x, y = centroid
        mx, my = motion
        future_x = x + (mx * 0.0008)
        future_y = y + (my * 0.0008)
        edge_distance = min(
            future_x,
            future_y,
            1 - future_x,
            1 - future_y,
        )
        return float(max(0.0, min(1.0, edge_distance * 4)))

    def compute_motion_stability(self, motion):
        """Score stillness: ``1.0`` for no motion, ``0.0`` at magnitude >= 42."""
        magnitude = np.linalg.norm(motion)
        return float(max(0.0, 1.0 - (magnitude / 42.0)))

    def compute_edge_density_balance(self, gray):
        """Score how evenly Canny (100/180) edges spread over the quadrants.

        The standard deviation of the four quadrant edge densities is
        scaled by 12 and subtracted from ``1.0``, floored at ``0.0``.
        """
        edges = cv2.Canny(gray, 100, 180)
        h, w = edges.shape
        quadrants = [
            edges[: h // 2, : w // 2],
            edges[: h // 2, w // 2 :],
            edges[h // 2 :, : w // 2],
            edges[h // 2 :, w // 2 :],
        ]
        densities = [np.count_nonzero(q) / q.size for q in quadrants]
        std = np.std(densities)
        return float(max(0.0, 1.0 - (std * 12.0)))

    def compute_subject_isolation(self, saliency):
        """Score how much the saliency peak stands out from the average.

        After a 31x31 Gaussian blur, the ratio of the maximum to the mean
        is divided by 4 and clipped to ``[0, 1]``. A non-positive mean
        yields ``1.0``.
        """
        blurred = cv2.GaussianBlur(saliency, (31, 31), 0)
        center_energy = np.max(blurred)
        surrounding = np.mean(blurred)
        if surrounding <= 0:
            return 1.0
        ratio = center_energy / surrounding
        return float(np.clip(ratio / 4.0, 0.0, 1.0))

    def compute_directional_feedback(self, centroid, motion):
        """Return a hint string for the first triggered condition, or ``None``.

        Checks run in this order: horizontal centroid position
        (``< 0.28`` / ``> 0.72``), vertical centroid position
        (``< 0.20`` / ``> 0.80``), horizontal motion beyond
        ``MOTION_HINT_LIMIT`` in either direction, then vertical motion
        beyond the limit in either direction.
        """
        x = float(centroid[0])
        y = float(centroid[1])
        mx = float(motion[0])
        my = float(motion[1])
        limit = self.MOTION_HINT_LIMIT

        if x < 0.28:
            return "move_left"
        if x > 0.72:
            return "move_right"
        if y < 0.20:
            return "reduce_headroom"
        if y > 0.80:
            return "recenter_subject"
        if mx > limit:
            return "stabilize_right_motion"
        if mx < -limit:
            return "stabilize_left_motion"
        # The vertical hint is direction-agnostic, so strong motion with
        # either sign must trigger it (previously only positive did).
        if abs(my) > limit:
            return "reduce_vertical_motion"
        return None