Add analyze.py

This commit is contained in:
admin
2026-05-25 23:38:09 +00:00
parent 465ef459fc
commit 5a3cea830c

597
analyze.py Normal file
View File

@@ -0,0 +1,597 @@
import cv2
import numpy as np
class CompositionAnalyzer:
def __init__(self):
self.previous_gray = None
self.previous_saliency = None
self.cached_result = {
"score": 1.0,
"state": "green",
"motion": (0.0, 0.0),
"subject_center": (0.5, 0.5),
"direction": None,
"metrics": {},
}
self.motion_ema = np.array(
[0.0, 0.0],
dtype=np.float32,
)
self.centroid_ema = np.array(
[0.5, 0.5],
dtype=np.float32,
)
self.last_histogram = None
def analyze(
self,
frame,
):
gray = cv2.cvtColor(
frame,
cv2.COLOR_BGR2GRAY,
)
frame_delta = self.compute_frame_delta(gray)
histogram_delta = self.compute_histogram_delta(gray)
if frame_delta < 1.25 and histogram_delta < 0.015:
return self.cached_result
saliency = self.compute_saliency(gray)
centroid = np.array(
self.compute_subject_centroid(saliency),
dtype=np.float32,
)
self.centroid_ema = self.centroid_ema * 0.84 + centroid * 0.16
motion_vector = np.array(
self.compute_optical_flow(gray),
dtype=np.float32,
)
self.motion_ema = self.motion_ema * 0.82 + motion_vector * 0.18
thirds_score = self.compute_thirds_score(self.centroid_ema)
edge_score = self.compute_edge_tension(self.centroid_ema)
balance_score = self.compute_balance(saliency)
negative_space = self.compute_negative_space(saliency)
clutter_score = self.compute_clutter(gray)
margin_score = self.compute_safe_margin(self.centroid_ema)
motion_score = self.compute_motion_stability(self.motion_ema)
drift_score = self.compute_predictive_drift(
self.centroid_ema,
self.motion_ema,
)
edge_density = self.compute_edge_density_balance(gray)
subject_isolation = self.compute_subject_isolation(saliency)
score = (
thirds_score * 0.18
+ edge_score * 0.12
+ balance_score * 0.11
+ negative_space * 0.10
+ clutter_score * 0.10
+ margin_score * 0.12
+ drift_score * 0.10
+ motion_score * 0.07
+ edge_density * 0.05
+ subject_isolation * 0.05
)
score = float(
np.clip(
score,
0.0,
1.0,
)
)
if score >= 0.72:
state = "green"
elif score >= 0.48:
state = "yellow"
else:
state = "red"
direction = self.compute_directional_feedback(
self.centroid_ema,
self.motion_ema,
)
result = {
"score": score,
"state": state,
"motion": (
float(self.motion_ema[0]),
float(self.motion_ema[1]),
),
"subject_center": (
float(self.centroid_ema[0]),
float(self.centroid_ema[1]),
),
"direction": direction,
"metrics": {
"thirds": thirds_score,
"edge": edge_score,
"balance": balance_score,
"negative_space": negative_space,
"clutter": clutter_score,
"margin": margin_score,
"drift": drift_score,
"motion": motion_score,
"edge_density": edge_density,
"subject_isolation": subject_isolation,
},
}
self.previous_gray = gray
self.previous_saliency = saliency
self.cached_result = result
return result
def compute_frame_delta(
self,
gray,
):
if self.previous_gray is None:
return 255.0
delta = cv2.absdiff(
self.previous_gray,
gray,
)
return float(np.mean(delta))
def compute_histogram_delta(
self,
gray,
):
histogram = cv2.calcHist(
[gray],
[0],
None,
[64],
[0, 256],
)
histogram = cv2.normalize(
histogram,
histogram,
).flatten()
if self.last_histogram is None:
self.last_histogram = histogram
return 1.0
delta = cv2.compareHist(
self.last_histogram.astype(np.float32),
histogram.astype(np.float32),
cv2.HISTCMP_BHATTACHARYYA,
)
self.last_histogram = histogram
return float(delta)
def compute_saliency(
self,
gray,
):
blur = cv2.GaussianBlur(
gray,
(3, 3),
0,
)
spectrum = np.fft.fft2(blur)
log_amplitude = np.log(np.abs(spectrum) + 1e-6)
phase = np.angle(spectrum)
avg = cv2.blur(
log_amplitude,
(3, 3),
)
residual = log_amplitude - avg
spectral = np.exp(residual + 1j * phase)
saliency = np.abs(np.fft.ifft2(spectral)) ** 2
saliency = cv2.GaussianBlur(
saliency,
(9, 9),
2.5,
)
saliency = cv2.normalize(
saliency,
None,
0,
255,
cv2.NORM_MINMAX,
)
return saliency.astype(np.uint8)
def compute_subject_centroid(
self,
saliency,
):
moments = cv2.moments(saliency)
if moments["m00"] <= 1:
return (
0.5,
0.5,
)
cx = moments["m10"] / moments["m00"]
cy = moments["m01"] / moments["m00"]
h, w = saliency.shape
return (
cx / w,
cy / h,
)
def compute_thirds_score(
self,
centroid,
):
x, y = centroid
targets = (
(1 / 3, 1 / 3),
(2 / 3, 1 / 3),
(1 / 3, 2 / 3),
(2 / 3, 2 / 3),
)
distance = min(
np.hypot(
x - tx,
y - ty,
)
for tx, ty in targets
)
return float(
max(
0.0,
1.0 - distance * 2.4,
)
)
def compute_edge_tension(
self,
centroid,
):
x, y = centroid
edge_distance = min(
x,
y,
1 - x,
1 - y,
)
return float(
min(
1.0,
edge_distance * 4,
)
)
# v4.5
def compute_balance(
self,
saliency,
):
h, w = saliency.shape
left = np.sum(
saliency[:, : w // 2],
dtype=np.float64,
)
right = np.sum(
saliency[:, w // 2 :],
dtype=np.float64,
)
diff = abs(float(left) - float(right))
total = max(
1.0,
float(left + right),
)
return float(
np.clip(
1.0 - (diff / total),
0.0,
1.0,
)
)
def compute_negative_space(
self,
saliency,
):
threshold = saliency > 32
active = np.count_nonzero(threshold)
total = saliency.size
occupied = active / total
return float(
max(
0.0,
1.0 - abs(occupied - 0.38),
)
)
def compute_clutter(
self,
gray,
):
edges = cv2.Canny(
gray,
80,
160,
)
density = np.count_nonzero(edges) / edges.size
return float(
max(
0.0,
1.0 - density * 4,
)
)
def compute_safe_margin(
self,
centroid,
):
x, y = centroid
margin = 0.08
if x < margin or y < margin or x > 1 - margin or y > 1 - margin:
return 0.1
return 1.0
def compute_optical_flow(
self,
gray,
):
if self.previous_gray is None:
return (
0.0,
0.0,
)
points = cv2.goodFeaturesToTrack(
self.previous_gray,
maxCorners=64,
qualityLevel=0.01,
minDistance=12,
)
if points is None:
return (
0.0,
0.0,
)
next_points, status, _ = cv2.calcOpticalFlowPyrLK(
self.previous_gray,
gray,
points,
None,
)
if next_points is None or status is None:
return (
0.0,
0.0,
)
status = status.reshape(-1)
valid_old = points[status == 1]
valid_new = next_points[status == 1]
if len(valid_old) == 0 or len(valid_new) == 0:
return (
0.0,
0.0,
)
motion = (valid_new - valid_old).reshape(-1, 2)
if motion.size == 0:
return (
0.0,
0.0,
)
vector = np.mean(
motion,
axis=0,
dtype=np.float32,
)
return (
float(vector[0]),
float(vector[1]),
)
def compute_predictive_drift(
self,
centroid,
motion,
):
x, y = centroid
mx, my = motion
future_x = x + (mx * 0.0008)
future_y = y + (my * 0.0008)
edge_distance = min(
future_x,
future_y,
1 - future_x,
1 - future_y,
)
return float(
max(
0.0,
min(
1.0,
edge_distance * 4,
),
)
)
def compute_motion_stability(
self,
motion,
):
magnitude = np.linalg.norm(motion)
return float(
max(
0.0,
1.0 - (magnitude / 42.0),
)
)
def compute_edge_density_balance(
self,
gray,
):
edges = cv2.Canny(
gray,
100,
180,
)
h, w = edges.shape
quadrants = [
edges[: h // 2, : w // 2],
edges[: h // 2, w // 2 :],
edges[h // 2 :, : w // 2],
edges[h // 2 :, w // 2 :],
]
densities = [np.count_nonzero(q) / q.size for q in quadrants]
std = np.std(densities)
return float(
max(
0.0,
1.0 - (std * 12.0),
)
)
def compute_subject_isolation(
self,
saliency,
):
blurred = cv2.GaussianBlur(
saliency,
(31, 31),
0,
)
center_energy = np.max(blurred)
surrounding = np.mean(blurred)
if surrounding <= 0:
return 1.0
ratio = center_energy / surrounding
return float(
np.clip(
ratio / 4.0,
0.0,
1.0,
)
)
def compute_directional_feedback(
self,
centroid,
motion,
):
x = float(centroid[0])
y = float(centroid[1])
mx = float(motion[0])
my = float(motion[1])
if x < 0.28:
return "move_left"
if x > 0.72:
return "move_right"
if y < 0.20:
return "reduce_headroom"
if y > 0.80:
return "recenter_subject"
if mx > 14:
return "stabilize_right_motion"
if mx < -14:
return "stabilize_left_motion"
if my > 14:
return "reduce_vertical_motion"
return None