diff --git a/scripts/default_settings.txt b/scripts/default_settings.txt
index 32e8eb914..a6c346dc0 100644
--- a/scripts/default_settings.txt
+++ b/scripts/default_settings.txt
@@ -143,7 +143,8 @@
"hybrid_comp_save_extra_frames": false,
"parseq_manifest": "",
"parseq_use_deltas": true,
- "parseq_non_schedule_overrides": true,
+ "parseq_non_schedule_overrides": true,
+ "parseq_key_frame_redistribution": "off",
"use_looper": false,
"init_images": "{\n \"0\": \"https://deforum.github.io/a1/Gi1.png\",\n \"max_f/4-5\": \"https://deforum.github.io/a1/Gi2.png\",\n \"max_f/2-10\": \"https://deforum.github.io/a1/Gi3.png\",\n \"3*max_f/4-15\": \"https://deforum.github.io/a1/Gi4.jpg\",\n \"max_f-20\": \"https://deforum.github.io/a1/Gi1.png\"\n}",
"image_strength_schedule": "0:(0.75)",
diff --git a/scripts/deforum_helpers/args.py b/scripts/deforum_helpers/args.py
index 5be0b3eac..2b881e7a2 100644
--- a/scripts/deforum_helpers/args.py
+++ b/scripts/deforum_helpers/args.py
@@ -22,7 +22,8 @@
import modules.paths as ph
import modules.shared as sh
from modules.processing import get_fixed_seed
-from .defaults import get_guided_imgs_default_json, mask_fill_choices, get_samplers_list, get_schedulers_list
+from .defaults import (get_guided_imgs_default_json, get_parseq_keyframe_redistributions_list,
+ get_samplers_list, get_schedulers_list)
from .deforum_controlnet import controlnet_component_names
from .general_utils import get_os, substitute_placeholders
@@ -928,7 +929,7 @@ def DeforumArgs():
"type": "checkbox",
"value": False,
"info": "Preview motion only. Uses a static picture for init, and draw motion reference rectangle."
- },
+ },
}
def LoopArgs():
@@ -990,7 +991,14 @@ def ParseqArgs():
"type": "checkbox",
"value": True,
"info": "Recommended. If you uncheck this, the FPS, max_frames and cadence in the Parseq doc are ignored, and the values in the A1111 UI are used instead."
- }
+ },
+ "parseq_key_frame_redistribution": {
+ "label": "Parseq key frame redistribution.",
+ "type": "dropdown",
+ "choices": get_parseq_keyframe_redistributions_list().values(),
+ "value": "None",
+ "info": "Gain Parseq precision at the cost of cadence regularity. Allows for fast generations at high cadence."
+ },
}
def DeforumOutputArgs():
@@ -1119,8 +1127,7 @@ def DeforumOutputArgs():
"value": False,
"info": "Interpolate upscaled images, if available",
"visible": False
- },
-
+ },
}
def get_component_names():
diff --git a/scripts/deforum_helpers/defaults.py b/scripts/deforum_helpers/defaults.py
index ac5f17199..007808732 100644
--- a/scripts/deforum_helpers/defaults.py
+++ b/scripts/deforum_helpers/defaults.py
@@ -47,6 +47,13 @@ def get_schedulers_list():
'sgm uniform': 'SGM Uniform'
}
+def get_parseq_keyframe_redistributions_list():
+ return {
+ 'off': 'Off',
+ 'parseq_only': 'Parseq Only (no cadence)',
+ 'uniform_with_parseq': 'Uniform with Parseq (pseudo-cadence)'
+ }
+
def DeforumAnimPrompts():
return r"""{
"0": "tiny cute bunny, vibrant diffraction, highly detailed, intricate, ultra hd, sharp photo, crepuscular rays, in focus",
diff --git a/scripts/deforum_helpers/render.py b/scripts/deforum_helpers/render.py
index 964498c3a..f6696f23d 100644
--- a/scripts/deforum_helpers/render.py
+++ b/scripts/deforum_helpers/render.py
@@ -48,11 +48,18 @@
from .prompt import prepare_prompt
from modules.shared import opts, cmd_opts, state, sd_model
from modules import lowvram, devices, sd_hijack
+from .rendering import experimental_core
from .RAFT import RAFT
from deforum_api import JobStatusTracker
def render_animation(args, anim_args, video_args, parseq_args, loop_args, controlnet_args, root):
+ is_use_parseq = parseq_args.parseq_manifest and parseq_args.parseq_manifest.strip()
+ is_use_key_frame_redistribution = parseq_args.parseq_key_frame_redistribution != "Off"
+ is_use_new_render_core = is_use_parseq and is_use_key_frame_redistribution
+ if is_use_new_render_core:
+ experimental_core.render_animation(args, anim_args, video_args, parseq_args, loop_args, controlnet_args, root)
+ return
# initialise Parseq adapter
parseq_adapter = ParseqAdapter(parseq_args, anim_args, video_args, controlnet_args, loop_args)
@@ -538,7 +545,7 @@ def render_animation(args, anim_args, video_args, parseq_args, loop_args, contro
devices.torch_gc()
lowvram.setup_for_low_vram(sd_model, cmd_opts.medvram)
sd_hijack.model_hijack.hijack(sd_model)
-
+
optical_flow_redo_generation = anim_args.optical_flow_redo_generation if not args.motion_preview_mode else 'None'
# optical flow redo before generation
@@ -637,7 +644,7 @@ def render_animation(args, anim_args, video_args, parseq_args, loop_args, contro
args.seed = next_seed(args, root)
- last_preview_frame = render_preview(args, anim_args, video_args, root, frame_idx, last_preview_frame)
+ last_preview_frame = render_preview(args, anim_args, video_args, root, frame_idx, last_preview_frame)
JobStatusTracker().update_phase(root.job_id, phase="GENERATING", progress=frame_idx/anim_args.max_frames)
@@ -647,4 +654,3 @@ def render_animation(args, anim_args, video_args, parseq_args, loop_args, contro
if load_raft:
raft_model.delete_model()
-
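Editor's note: a standalone illustration of the dispatch condition added above. The experimental core is only entered when a non-empty Parseq manifest is present and the redistribution dropdown is set to anything other than "Off"; SimpleNamespace stands in here for the real ParseqArgs object.

from types import SimpleNamespace

parseq_args = SimpleNamespace(parseq_manifest='{"keyframes": []}',
                              parseq_key_frame_redistribution="Parseq Only (no cadence)")
is_use_parseq = bool(parseq_args.parseq_manifest and parseq_args.parseq_manifest.strip())
is_use_key_frame_redistribution = parseq_args.parseq_key_frame_redistribution != "Off"
print(is_use_parseq and is_use_key_frame_redistribution)  # True -> experimental_core.render_animation(...)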
diff --git a/scripts/deforum_helpers/rendering/__init__.py b/scripts/deforum_helpers/rendering/__init__.py
new file mode 100644
index 000000000..e69de29bb
diff --git a/scripts/deforum_helpers/rendering/data/__init__.py b/scripts/deforum_helpers/rendering/data/__init__.py
new file mode 100644
index 000000000..77adfb648
--- /dev/null
+++ b/scripts/deforum_helpers/rendering/data/__init__.py
@@ -0,0 +1,6 @@
+from .images import Images
+from .indexes import Indexes
+from .mask import Mask
+from .schedule import Schedule
+from .render_data import RenderData
+from .turbo import Turbo
diff --git a/scripts/deforum_helpers/rendering/data/anim/__init__.py b/scripts/deforum_helpers/rendering/data/anim/__init__.py
new file mode 100644
index 000000000..24b69f2a0
--- /dev/null
+++ b/scripts/deforum_helpers/rendering/data/anim/__init__.py
@@ -0,0 +1,2 @@
+from .animation_keys import AnimationKeys
+from .animation_mode import AnimationMode
diff --git a/scripts/deforum_helpers/rendering/data/anim/animation_keys.py b/scripts/deforum_helpers/rendering/data/anim/animation_keys.py
new file mode 100644
index 000000000..efce69056
--- /dev/null
+++ b/scripts/deforum_helpers/rendering/data/anim/animation_keys.py
@@ -0,0 +1,24 @@
+from dataclasses import dataclass
+
+from ....animation_key_frames import DeformAnimKeys, LooperAnimKeys
+
+
+@dataclass(init=True, frozen=True, repr=False, eq=False)
+class AnimationKeys:
+ deform_keys: DeformAnimKeys
+ looper_keys: LooperAnimKeys
+
+ @staticmethod
+ def _choose_default_or_parseq_keys(default_keys, parseq_keys, parseq_adapter):
+ return default_keys if not parseq_adapter.use_parseq else parseq_keys
+
+ @staticmethod
+ def from_args(step_args, parseq_adapter, seed):
+ ada = parseq_adapter
+
+ def _choose(default_keys):
+ return AnimationKeys._choose_default_or_parseq_keys(default_keys, ada.anim_keys, ada)
+
+ # Parseq keys are decorated, see ParseqAnimKeysDecorator and ParseqLooperKeysDecorator
+ return AnimationKeys(_choose(DeformAnimKeys(step_args.anim_args, seed)),
+ _choose(LooperAnimKeys(step_args.loop_args, step_args.anim_args, seed)))
diff --git a/scripts/deforum_helpers/rendering/data/anim/animation_mode.py b/scripts/deforum_helpers/rendering/data/anim/animation_mode.py
new file mode 100644
index 000000000..c0206bf7b
--- /dev/null
+++ b/scripts/deforum_helpers/rendering/data/anim/animation_mode.py
@@ -0,0 +1,95 @@
+import os
+from dataclasses import dataclass
+from pathlib import Path
+from typing import Any
+
+from ...util import opt_utils
+from ....RAFT import RAFT
+from ....hybrid_video import hybrid_generation
+
+
+@dataclass(init=True, frozen=False, repr=False, eq=False)
+class AnimationMode:
+ has_video_input: bool = False
+ hybrid_input_files: Any = None
+ hybrid_frame_path: str = ""
+ prev_flow: Any | None = None
+ is_keep_in_vram: bool = False
+ depth_model: Any = None
+ raft_model: Any = None
+
+ def is_predicting_depths(self) -> bool:
+ return self.depth_model is not None
+
+ def is_raft_active(self) -> bool:
+ return self.raft_model is not None
+
+ def unload_raft_and_depth_model(self):
+ if self.is_predicting_depths() and not self.is_keep_in_vram:
+ self.depth_model.delete_model() # handles adabins too
+ if self.is_raft_active():
+ self.raft_model.delete_model()
+
+ @staticmethod
+ def _has_video_input(anim_args) -> bool:
+ return AnimationMode._is_2d_or_3d_mode(anim_args) and AnimationMode._is_using_hybrid_frames(anim_args)
+
+ @staticmethod
+ def _is_2d_or_3d_mode(anim_args):
+ return anim_args.animation_mode in ['2D', '3D']
+
+ @staticmethod
+ def _is_using_hybrid_frames(anim_args):
+ return (anim_args.hybrid_composite != 'None'
+ or anim_args.hybrid_motion in ['Affine', 'Perspective', 'Optical Flow'])
+
+ @staticmethod
+ def _is_requiring_hybrid_frames(anim_args):
+ return AnimationMode._is_2d_or_3d_mode(anim_args) and AnimationMode._is_using_hybrid_frames(anim_args)
+
+ @staticmethod
+ def _is_load_depth_model_for_3d(args, anim_args):
+ is_depth_warped_3d = anim_args.animation_mode == '3D' and anim_args.use_depth_warping
+ has_depth_or_depth_video_mask = anim_args.hybrid_comp_mask_type in ['Depth', 'Video Depth']
+ is_composite_with_depth_mask = anim_args.hybrid_composite and has_depth_or_depth_video_mask
+ is_depth_used = is_depth_warped_3d or anim_args.save_depth_maps or is_composite_with_depth_mask
+ return is_depth_used and not args.motion_preview_mode
+
+ @staticmethod
+ def load_raft_if_active(anim_args, args):
+ is_cadenced_raft = anim_args.optical_flow_cadence == "RAFT" and int(anim_args.diffusion_cadence) > 1
+ is_optical_flow_raft = anim_args.hybrid_motion == "Optical Flow" and anim_args.hybrid_flow_method == "RAFT"
+ is_raft_redo = anim_args.optical_flow_redo_generation == "RAFT"
+ is_load_raft = (is_cadenced_raft or is_optical_flow_raft or is_raft_redo) and not args.motion_preview_mode
+ if is_load_raft:
+ print("Loading RAFT model...")
+ return RAFT() if is_load_raft else None
+
+ @staticmethod
+ def load_depth_model_if_active(args, anim_args):
+ return AnimationMode._is_load_depth_model_for_3d(args, anim_args) \
+ if opt_utils.keep_3d_models_in_vram() else None
+
+ @staticmethod
+ def initial_hybrid_files(sa) -> list[Path]:
+ """Returns a list of initial hybrid input files if required, otherwise an empty list."""
+ if AnimationMode._is_requiring_hybrid_frames(sa.anim_args):
+ # may cause side effects on args and anim_args.
+ _, __, init_hybrid_input_files = hybrid_generation(sa.args, sa.anim_args, sa.root)
+ return init_hybrid_input_files
+ return []
+
+ @staticmethod
+ def from_args(step_args):
+ sa = step_args # RenderInitArgs
+ # path required by hybrid functions, even if hybrid_comp_save_extra_frames is False
+ hybrid_input_files: Any = os.path.join(sa.args.outdir, 'hybridframes')
+ previous_flow = None
+ return AnimationMode(
+ AnimationMode._has_video_input(sa.anim_args),
+ AnimationMode.initial_hybrid_files(sa),
+ hybrid_input_files,
+ previous_flow,
+ opt_utils.keep_3d_models_in_vram(),
+ AnimationMode.load_depth_model_if_active(sa.args, sa.anim_args),
+ AnimationMode.load_raft_if_active(sa.anim_args, sa.args))
diff --git a/scripts/deforum_helpers/rendering/data/frame/__init__.py b/scripts/deforum_helpers/rendering/data/frame/__init__.py
new file mode 100644
index 000000000..f2ec90d1b
--- /dev/null
+++ b/scripts/deforum_helpers/rendering/data/frame/__init__.py
@@ -0,0 +1,4 @@
+from .key_frame_distribution import KeyFrameDistribution
+from .key_frame_data import KeyFrameData
+from .key_frame import KeyFrame
+from .tween_frame import Tween
diff --git a/scripts/deforum_helpers/rendering/data/frame/key_frame.py b/scripts/deforum_helpers/rendering/data/frame/key_frame.py
new file mode 100644
index 000000000..2b52fea73
--- /dev/null
+++ b/scripts/deforum_helpers/rendering/data/frame/key_frame.py
@@ -0,0 +1,318 @@
+import os
+from dataclasses import dataclass
+from typing import Any, List
+
+import PIL
+import cv2
+import numpy as np
+from PIL import Image
+
+from . import KeyFrameData, KeyFrameDistribution
+from .tween_frame import Tween
+from .. import RenderData, Schedule
+from ... import img_2_img_tubes
+from ...util import depth_utils, filename_utils, log_utils, opt_utils, utils
+from ...util.call.anim import call_anim_frame_warp
+from ...util.call.gen import call_generate
+from ...util.call.hybrid import (
+ call_get_flow_for_hybrid_motion, call_get_flow_for_hybrid_motion_prev, call_get_matrix_for_hybrid_motion,
+ call_get_matrix_for_hybrid_motion_prev, call_hybrid_composite)
+from ...util.call.images import call_add_noise
+from ...util.call.mask import call_compose_mask_with_check, call_unsharp_mask
+from ...util.call.subtitle import call_format_animation_params, call_write_frame_subtitle
+from ...util.call.video_and_audio import call_render_preview
+from ....colors import maintain_colors
+from ....hybrid_video import image_transform_ransac, image_transform_optical_flow
+from ....save_images import save_image
+from ....seed import next_seed
+
+
+@dataclass(init=True, frozen=False, repr=False, eq=False)
+class KeyFrame:
+ """Key steps are the steps for frames that actually get diffused (as opposed to tween frame steps)."""
+ i: int
+ step_data: KeyFrameData
+ render_data: RenderData
+ schedule: Schedule
+ depth: Any # assigned during generation
+ subtitle_params_to_print: Any
+ subtitle_params_string: str
+ last_preview_frame: int
+ tweens: List[Tween]
+ tween_values: List[float]
+
+ def has_tween_frames(self):
+ return len(self.tweens) > 0
+
+ def is_optical_flow_redo_before_generation(self, optical_flow_redo_generation, images):
+ has_flow_redo = optical_flow_redo_generation != 'None'
+ return has_flow_redo and images.has_previous() and self.step_data.has_strength()
+
+ def maybe_write_frame_subtitle(self):
+ data = self.render_data
+ if data.turbo.is_first_step_with_subtitles():
+ params_string = opt_utils.generation_info_for_subtitles()
+ self.subtitle_params_to_print = params_string
+ self.subtitle_params_string = call_format_animation_params(data, data.indexes.frame.i, params_string)
+ call_write_frame_subtitle(data, data.indexes.frame.i, params_string)
+
+ def apply_frame_warp_transform(self, data: RenderData, image):
+ is_not_last_frame = self.i < data.args.anim_args.max_frames
+ if is_not_last_frame:
+ previous, self.depth = call_anim_frame_warp(data, self.i, image, None)
+ return previous
+
+ def _do_hybrid_compositing_on_cond(self, data: RenderData, image, condition):
+ i = data.indexes.frame.i
+ schedules = self.step_data.hybrid_comp_schedules
+ if condition:
+ _, composed = call_hybrid_composite(data, i, image, schedules)
+ return composed
+ return image
+
+ def do_hybrid_compositing_before_motion(self, data: RenderData, image):
+ condition = data.is_hybrid_composite_before_motion()
+ return self._do_hybrid_compositing_on_cond(data, image, condition)
+
+ def do_normal_hybrid_compositing_after_motion(self, data: RenderData, image):
+ condition = data.is_normal_hybrid_composite()
+ return self._do_hybrid_compositing_on_cond(data, image, condition)
+
+ def apply_scaling(self, image):
+ return (image * self.step_data.contrast).round().astype(np.uint8)
+
+ def apply_anti_blur(self, data: RenderData, image):
+ if self.step_data.amount > 0:
+ return call_unsharp_mask(data, self, image, data.mask)
+ return image
+
+ def apply_frame_noising(self, data: RenderData, mask, image):
+ is_use_any_mask = data.args.args.use_mask or data.args.anim_args.use_noise_mask
+ if is_use_any_mask:
+ seq = self.schedule.noise_mask
+ vals = mask.noise_vals
+ contrast_image = image
+ data.args.root.noise_mask = call_compose_mask_with_check(data, seq, vals, contrast_image)
+ return call_add_noise(data, self, image)
+
+ def create_color_match_for_video(self):
+ data = self.render_data
+ if data.args.anim_args.color_coherence == 'Video Input' and data.is_hybrid_available():
+ if int(data.indexes.frame.i) % int(data.args.anim_args.color_coherence_video_every_N_frames) == 0:
+ prev_vid_img = Image.open(filename_utils.preview_video_image_path(data, data.indexes))
+ prev_vid_img = prev_vid_img.resize(data.dimensions(), PIL.Image.LANCZOS)
+ data.images.color_match = np.asarray(prev_vid_img)
+ return cv2.cvtColor(data.images.color_match, cv2.COLOR_RGB2BGR)
+ return None
+
+ def _generate_and_update_noise(self, data, image, contrasted_noise_tube):
+ noised_image = contrasted_noise_tube(data, self)(image)
+ data.update_sample_and_args_for_current_progression_step(self, noised_image)
+ return image # return original as passed.
+
+ def transform_and_update_noised_sample(self, frame_tube, contrasted_noise_tube):
+ data = self.render_data
+ if data.images.has_previous(): # skipping 1st iteration
+ transformed_image = frame_tube(data, self)(data.images.previous)
+ if transformed_image is None:
+ log_utils.warn("Image transformation failed, using fallback.")
+ transformed_image = data.images.previous
+ return self._generate_and_update_noise(data, transformed_image, contrasted_noise_tube)
+ return None
+
+ def prepare_generation(self, frame_tube, contrasted_noise_tube):
+ self.render_data.images.color_match = self.create_color_match_for_video()
+ self.render_data.images.previous = self.transform_and_update_noised_sample(frame_tube, contrasted_noise_tube)
+ self.render_data.prepare_generation(self.render_data, self, self.i)
+ self.maybe_redo_optical_flow()
+ self.maybe_redo_diffusion()
+
+ # Conditional Redoes
+ def maybe_redo_optical_flow(self):
+ data = self.render_data
+ optical_flow_redo_generation = data.optical_flow_redo_generation_if_not_in_preview_mode()
+ is_redo_optical_flow = self.is_optical_flow_redo_before_generation(optical_flow_redo_generation, data.images)
+ if is_redo_optical_flow:
+ data.args.root.init_sample = self.do_optical_flow_redo_before_generation()
+
+ def maybe_redo_diffusion(self):
+ data = self.render_data
+ is_pos_redo = data.has_positive_diffusion_redo
+ is_diffusion_redo = is_pos_redo and data.images.has_previous() and self.step_data.has_strength()
+ is_not_preview = data.is_not_in_motion_preview_mode()
+ if is_diffusion_redo and is_not_preview:
+ self.do_diffusion_redo()
+
+ def generate(self):
+ return call_generate(self.render_data, self)
+
+ def after_diffusion(self, image):
+ self.render_data.images.color_match = img_2_img_tubes.conditional_color_match_tube(self)(image)
+ self.progress_and_save(image)
+ self.render_data.args.args.seed = self.next_seed()
+ self.update_render_preview()
+
+ def progress_and_save(self, image):
+ next_index = self._progress_save_and_get_next_index(image)
+ self.render_data.indexes.update_frame(next_index)
+
+ def _progress_save_and_get_next_index(self, image):
+ """Will progress frame or turbo-frame step, save the image, update `self.depth` and return next index."""
+ data = self.render_data
+ opencv_image = cv2.cvtColor(np.array(image), cv2.COLOR_RGB2BGR)
+ if not data.animation_mode.has_video_input:
+ data.images.previous = opencv_image
+ filename = filename_utils.frame_filename(data, self.i)
+
+ is_overwrite = True # replace the processed tween frame with the original one? (probably not)
+ if is_overwrite or not os.path.exists(os.path.join(data.args.args.outdir, filename)):
+ # In many cases, the original images may look more detailed or 'better' than the processed ones,
+ # but we only save the frames that were processed through the flows to keep the output consistent.
+ # However, it may be preferable to use them for the 1st and for the last frame, or as thumbnails.
+ # TODO? add option to save original frames in a different sub dir.
+ save_image(image, 'PIL', filename, data.args.args, data.args.video_args, data.args.root)
+
+ self.depth = depth_utils.generate_and_save_depth_map_if_active(data, opencv_image)
+ if data.turbo.has_steps():
+ return data.indexes.frame.i + data.turbo.progress_step(data.indexes, opencv_image)
+ return data.indexes.frame.i + 1 # normal (i.e. 'non-turbo') step always increments by 1.
+
+ def next_seed(self):
+ return next_seed(self.render_data.args.args, self.render_data.args.root)
+
+ def update_render_preview(self):
+ self.last_preview_frame = call_render_preview(self.render_data, self.last_preview_frame)
+
+ def do_optical_flow_redo_before_generation(self):
+ data = self.render_data
+ redo = data.args.anim_args.optical_flow_redo_generation
+ stored_seed = data.args.args.seed # keep original to reset it after executing the optical flow
+ data.args.args.seed = utils.generate_random_seed() # create and set a new random seed
+ log_utils.print_optical_flow_info(data, redo)
+
+ sample_image = call_generate(data, self)
+ optical_tube = img_2_img_tubes.optical_flow_redo_tube(data, self, redo)
+ transformed_sample_image = optical_tube(sample_image)
+
+ data.args.args.seed = stored_seed # restore stored seed
+ return Image.fromarray(transformed_sample_image)
+
+ def do_diffusion_redo(self):
+ data = self.render_data
+ stored_seed = data.args.args.seed
+ last_diffusion_redo_index = int(data.args.anim_args.diffusion_redo)
+ for n in range(0, last_diffusion_redo_index):
+ log_utils.print_redo_generation_info(data, n)
+ data.args.args.seed = utils.generate_random_seed()
+ diffusion_redo_image = call_generate(data, self)
+ diffusion_redo_image = cv2.cvtColor(np.array(diffusion_redo_image), cv2.COLOR_RGB2BGR)
+ # color match on last one only
+ is_last_iteration = n == last_diffusion_redo_index - 1
+ if is_last_iteration:
+ mode = data.args.anim_args.color_coherence
+ diffusion_redo_image = maintain_colors(data.images.previous, data.images.color_match, mode)
+ data.args.args.seed = stored_seed
+ data.args.root.init_sample = Image.fromarray(cv2.cvtColor(diffusion_redo_image, cv2.COLOR_BGR2RGB))
+
+ @staticmethod
+ def create(data: RenderData):
+ step_data = KeyFrameData.create(data)
+ schedule = Schedule.create(data)
+ return KeyFrame(0, step_data, data, schedule, None, None, "", 0, list(), list())
+
+ @staticmethod
+ def apply_color_matching(data: RenderData, image):
+ return KeyFrame.apply_color_coherence(image, data) if data.has_color_coherence() else image
+
+ @staticmethod
+ def apply_color_coherence(image, data: RenderData):
+ if data.images.color_match is None:
+ # Initialize color_match for next iteration with current image, but don't do anything yet.
+ if image is not None:
+ data.images.color_match = image.copy()
+ return image
+ return maintain_colors(image, data.images.color_match, data.args.anim_args.color_coherence)
+
+ @staticmethod
+ def transform_to_grayscale_if_active(data: RenderData, image):
+ if data.args.anim_args.color_force_grayscale:
+ grayscale = cv2.cvtColor(data.images.previous, cv2.COLOR_BGR2GRAY)
+ return cv2.cvtColor(grayscale, cv2.COLOR_GRAY2BGR)
+ return image
+
+ @staticmethod
+ def apply_hybrid_motion_ransac_transform(data: RenderData, image):
+ """hybrid video motion - warps `images.previous` to match motion, usually to prepare for compositing"""
+ motion = data.args.anim_args.hybrid_motion
+ if motion in ['Affine', 'Perspective']:
+ last_i = data.indexes.frame.i - 1
+ reference_images = data.images
+ matrix = call_get_matrix_for_hybrid_motion_prev(data, last_i, reference_images.previous) \
+ if data.args.anim_args.hybrid_motion_use_prev_img \
+ else call_get_matrix_for_hybrid_motion(data, last_i)
+ return image_transform_ransac(image, matrix, data.args.anim_args.hybrid_motion)
+ return image
+
+ @staticmethod
+ def apply_hybrid_motion_optical_flow(data: RenderData, key_frame, image):
+ motion = data.args.anim_args.hybrid_motion
+ if motion in ['Optical Flow']:
+ last_i = data.indexes.frame.i - 1
+ reference_images = data.images
+ flow = call_get_flow_for_hybrid_motion_prev(data, last_i, reference_images.previous) \
+ if data.args.anim_args.hybrid_motion_use_prev_img \
+ else call_get_flow_for_hybrid_motion(data, last_i)
+ transformed = image_transform_optical_flow(
+ reference_images.previous, flow, key_frame.step_data.flow_factor())
+ data.animation_mode.prev_flow = flow
+ return transformed
+ return image
+
+ @staticmethod
+ def create_all_frames(data, index_dist: KeyFrameDistribution = KeyFrameDistribution.default()):
+ """Creates a list of key steps for the entire animation."""
+ start_index = data.turbo.find_start(data)
+ num_key_steps = 1 + int((data.args.anim_args.max_frames - start_index) / data.cadence())
+ if data.parseq_adapter.use_parseq and index_dist is KeyFrameDistribution.PARSEQ_ONLY:
+ num_key_steps = len(data.parseq_adapter.parseq_json["keyframes"])
+
+ key_steps = [KeyFrame.create(data) for _ in range(0, num_key_steps)]
+ actual_num_key_steps = len(key_steps)
+
+ recalculated_key_steps = KeyFrame._recalculate_and_check_tweens(
+ data, key_steps, start_index, actual_num_key_steps, data.parseq_adapter, index_dist)
+ log_utils.print_tween_step_creation_info(key_steps, index_dist)
+
+ return recalculated_key_steps
+
+ @staticmethod
+ def _recalculate_and_check_tweens(data, key_frames, start_index, num_key_steps,
+ parseq_adapter, index_distribution):
+ max_frames = data.args.anim_args.max_frames
+ key_frames = index_distribution.calculate(key_frames, start_index, max_frames, num_key_steps, parseq_adapter)
+ key_frames = KeyFrame._add_tweens_to_key_steps(key_frames)
+ log_utils.print_key_step_debug_info_if_verbose(key_frames)
+
+ # The number of generated tweens depends on the distance to the previous key frame. The last tween has the
+ # same index as the key frame it belongs to and is meant to replace the unprocessed original key frame.
+ assert len(key_frames) == num_key_steps
+ assert key_frames[0].i == 1 # 1st key frame is at index 1
+ assert key_frames[0].tweens == [] # 1st key frame has no tweens
+ if index_distribution != KeyFrameDistribution.PARSEQ_ONLY: # just using however many key frames Parseq defines.
+ assert key_frames[-1].i == max_frames # last index is same as max frames
+
+ return key_frames
+
+ @staticmethod
+ def _add_tweens_to_key_steps(key_steps):
+ log_utils.info(f"Adding tweens to {len(key_steps)} keyframes...")
+ for i in range(1, len(key_steps)): # skipping 1st key frame
+ data = key_steps[i].render_data
+ from_i = key_steps[i - 1].i
+ to_i = key_steps[i].i
+ tweens, values = Tween.create_in_between_steps(key_steps, i, data, from_i, to_i)
+ log_utils.debug(f"Creating {len(tweens)} tweens ({from_i}->{to_i}) for key frame at {key_steps[i].i}")
+ key_steps[i].tweens = tweens
+ key_steps[i].tween_values = values
+ key_steps[i].render_data.indexes.update_tween_start(data.turbo)
+ return key_steps
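Editor's note: a worked example of the key-step count used in create_all_frames above, with assumed values (max_frames 60, cadence 6, start index 0); every frame that is not a key frame becomes a tween of the following key frame.

max_frames, start_index, cadence = 60, 0, 6
num_key_steps = 1 + int((max_frames - start_index) / cadence)
print(num_key_steps)  # 11 diffused key frames; the remaining 49 frames are tweens
# _recalculate_and_check_tweens then asserts (unless PARSEQ_ONLY is used): the first
# key frame sits at index 1 and has no tweens, the last one at index max_frames.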
diff --git a/scripts/deforum_helpers/rendering/data/frame/key_frame_data.py b/scripts/deforum_helpers/rendering/data/frame/key_frame_data.py
new file mode 100644
index 000000000..a5eba3ec7
--- /dev/null
+++ b/scripts/deforum_helpers/rendering/data/frame/key_frame_data.py
@@ -0,0 +1,55 @@
+from dataclasses import dataclass
+from typing import Any
+
+from ....animation_key_frames import DeformAnimKeys
+
+
+@dataclass(init=True, frozen=True, repr=False, eq=False)
+class KeyFrameData:
+ noise: Any = None
+ strength: Any = None
+ scale: Any = None
+ contrast: Any = None
+ kernel: int = 0
+ sigma: Any = None
+ amount: Any = None
+ threshold: Any = None
+ cadence_flow_factor: Any = None
+ redo_flow_factor: Any = None
+ hybrid_comp_schedules: Any = None
+
+ def kernel_size(self) -> tuple[int, int]:
+ return self.kernel, self.kernel
+
+ def flow_factor(self):
+ return self.hybrid_comp_schedules['flow_factor']
+
+ def has_strength(self):
+ return self.strength > 0
+
+ @staticmethod
+ def create(data):
+ i = data.indexes.frame.i
+ keys: DeformAnimKeys = data.animation_keys.deform_keys
+ return KeyFrameData(
+ keys.noise_schedule_series[i],
+ keys.strength_schedule_series[i],
+ keys.cfg_scale_schedule_series[i],
+ keys.contrast_schedule_series[i],
+ int(keys.kernel_schedule_series[i]),
+ keys.sigma_schedule_series[i],
+ keys.amount_schedule_series[i],
+ keys.threshold_schedule_series[i],
+ keys.cadence_flow_factor_schedule_series[i],
+ keys.redo_flow_factor_schedule_series[i],
+ KeyFrameData._hybrid_comp_args(keys, i))
+
+ @staticmethod
+ def _hybrid_comp_args(keys, i):
+ return {
+ "alpha": keys.hybrid_comp_alpha_schedule_series[i],
+ "mask_blend_alpha": keys.hybrid_comp_mask_blend_alpha_schedule_series[i],
+ "mask_contrast": keys.hybrid_comp_mask_contrast_schedule_series[i],
+ "mask_auto_contrast_cutoff_low": int(keys.hybrid_comp_mask_auto_contrast_cutoff_low_schedule_series[i]),
+ "mask_auto_contrast_cutoff_high": int(keys.hybrid_comp_mask_auto_contrast_cutoff_high_schedule_series[i]),
+ "flow_factor": keys.hybrid_flow_factor_schedule_series[i]}
diff --git a/scripts/deforum_helpers/rendering/data/frame/key_frame_distribution.py b/scripts/deforum_helpers/rendering/data/frame/key_frame_distribution.py
new file mode 100644
index 000000000..dd48c3845
--- /dev/null
+++ b/scripts/deforum_helpers/rendering/data/frame/key_frame_distribution.py
@@ -0,0 +1,87 @@
+import random
+from enum import Enum
+from typing import List
+
+from ...util import log_utils
+
+
+class KeyFrameDistribution(Enum):
+ OFF = "Off"
+ PARSEQ_ONLY = "Parseq Only" # cadence is ignored. all frames not present in the Parseq table are handled as tweens.
+ UNIFORM_WITH_PARSEQ = "Uniform with Parseq" # similar to uniform, but parseq key frame diffusion is enforced.
+
+ @staticmethod
+ def from_UI_tab(data):
+ redistribution = data.args.parseq_args.parseq_key_frame_redistribution
+ match redistribution:
+ case "Off":
+ return KeyFrameDistribution.OFF
+ case "Parseq Only (no cadence)":
+ return KeyFrameDistribution.PARSEQ_ONLY
+ case "Uniform with Parseq (pseudo-cadence)":
+ return KeyFrameDistribution.UNIFORM_WITH_PARSEQ
+ case _:
+ raise ValueError(f"Invalid parseq_key_frame_redistribution from UI: {redistribution}")
+
+ @staticmethod
+ def default():
+ return KeyFrameDistribution.OFF
+
+ def calculate(self, key_frames, start_index, max_frames, num_key_steps, parseq_adapter):
+ key_indices: List[int] = self._calculate(start_index, max_frames, num_key_steps, parseq_adapter)
+ for i, key_index in enumerate(key_indices):
+ key_frames[i].i = key_index
+ return key_frames
+
+ def _calculate(self, start_index, max_frames, num_key_steps, parseq_adapter) -> List[int]:
+ match self:
+ case KeyFrameDistribution.PARSEQ_ONLY: # same as UNIFORM_SPACING, if no Parseq keys are present.
+ return self._parseq_only_indexes(start_index, max_frames, num_key_steps, parseq_adapter)
+ case KeyFrameDistribution.UNIFORM_WITH_PARSEQ:
+ return self._uniform_with_parseq_indexes(start_index, max_frames, num_key_steps, parseq_adapter)
+ case KeyFrameDistribution.OFF:
+ log_utils.warn("Called new core without key frame redistribution. Using 'PARSEQ_ONLY'.")
+ return self._parseq_only_indexes(start_index, max_frames, num_key_steps, parseq_adapter)
+ case _:
+ raise ValueError(f"Invalid KeyFrameDistribution: {self}")
+
+ @staticmethod
+ def _uniform_indexes(start_index, max_frames, num_key_steps):
+ return [1 + start_index + int(n * (max_frames - 1 - start_index) / (num_key_steps - 1))
+ for n in range(num_key_steps)]
+
+ @staticmethod
+ def _parseq_only_indexes(start_index, max_frames, num_key_steps, parseq_adapter):
+ """Only Parseq key frames are used. Cadence settings are ignored."""
+ if not parseq_adapter.use_parseq:
+ log_utils.warn("PARSEQ_ONLY, but Parseq is not active, using UNIFORM_SPACING instead.")
+ return KeyFrameDistribution._uniform_indexes(start_index, max_frames, num_key_steps)
+
+ parseq_key_frames = [keyframe["frame"] for keyframe in parseq_adapter.parseq_json["keyframes"]]
+ shifted_parseq_frames = [frame + 1 for frame in parseq_key_frames]
+ return shifted_parseq_frames
+
+ @staticmethod
+ def _uniform_with_parseq_indexes(start_index, max_frames, num_key_steps, parseq_adapter):
+ """Calculates uniform indices according to cadence, but parseq key frames replace the closest deforum key."""
+ uniform_indices = KeyFrameDistribution._uniform_indexes(start_index, max_frames, num_key_steps)
+ if not parseq_adapter.use_parseq:
+ log_utils.warn("UNIFORM_WITH_PARSEQ, but Parseq is not active, using UNIFORM_SPACING instead.")
+ return uniform_indices
+
+ parseq_key_frames = [keyframe["frame"] for keyframe in parseq_adapter.parseq_json["keyframes"]]
+ shifted_parseq_frames = [frame + 1 for frame in parseq_key_frames]
+ key_frames_set = set(uniform_indices) # set for faster membership checks
+
+ # Insert parseq keyframes while maintaining keyframe count
+ for current_frame in shifted_parseq_frames:
+ if current_frame not in key_frames_set:
+ # Find the closest index in the set to replace (1st and last frame excluded)
+ closest_index = min(sorted(key_frames_set)[1:-1], key=lambda x: abs(x - current_frame))
+ key_frames_set.remove(closest_index)
+ key_frames_set.add(current_frame)
+
+ key_frames = list(key_frames_set)
+ key_frames.sort()
+ assert len(key_frames) == num_key_steps
+ return key_frames
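Editor's note: a worked example of the two redistribution modes above, using assumed values (60 frames, 11 key steps, two hypothetical Parseq keyframes at 17 and 40). Uniform spacing is computed first, then the replacement pass snaps the nearest uniform index onto each shifted Parseq frame while keeping the key-frame count constant.

max_frames, start_index, num_key_steps = 60, 0, 11
uniform = [1 + start_index + int(n * (max_frames - 1 - start_index) / (num_key_steps - 1))
           for n in range(num_key_steps)]
print(uniform)  # [1, 6, 12, 18, 24, 30, 36, 42, 48, 54, 60]

parseq_frames = [17, 40]                           # hypothetical Parseq keyframes (0-based)
shifted = [frame + 1 for frame in parseq_frames]   # -> [18, 41]
key_set = set(uniform)
for frame in shifted:
    if frame not in key_set:
        closest = min(sorted(key_set)[1:-1], key=lambda x: abs(x - frame))  # keep 1st and last
        key_set.remove(closest)
        key_set.add(frame)
print(sorted(key_set))  # [1, 6, 12, 18, 24, 30, 36, 41, 48, 54, 60] -> 42 replaced by 41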
diff --git a/scripts/deforum_helpers/rendering/data/frame/tween_frame.py b/scripts/deforum_helpers/rendering/data/frame/tween_frame.py
new file mode 100644
index 000000000..c15a9de97
--- /dev/null
+++ b/scripts/deforum_helpers/rendering/data/frame/tween_frame.py
@@ -0,0 +1,127 @@
+from dataclasses import dataclass
+from itertools import chain
+from typing import Any, Iterable, Tuple, List
+
+from ..turbo import Turbo
+from ...data.indexes import Indexes, IndexWithStart
+from ...data.render_data import RenderData
+from ...util import image_utils, log_utils, opt_utils, web_ui_utils
+from ...util.call.subtitle import call_format_animation_params, call_write_frame_subtitle
+
+
+@dataclass(init=True, frozen=False, repr=False, eq=False)
+class Tween:
+ """cadence vars"""
+ indexes: Indexes
+ value: float
+ cadence_flow: Any # late init
+ cadence_flow_inc: Any # late init
+ depth: Any
+ depth_prediction: Any # reassigned
+
+ def i(self):
+ return self.indexes.tween.i
+
+ def from_key_step_i(self):
+ return self.indexes.frame.start
+
+ def to_key_step_i(self):
+ return self.indexes.frame.i
+
+ def emit_frame(self, last_frame, grayscale_tube, overlay_mask_tube):
+ """Emits this tween frame."""
+ max_frames = last_frame.render_data.args.anim_args.max_frames
+ if self.i() >= max_frames:
+ return # skipping tween emission on the last frame
+
+ data = last_frame.render_data
+ # data.turbo.steps = len(last_step.tweens)
+ self.handle_synchronous_status_concerns(data)
+ self.process(last_frame, data)
+
+ new_image = self.generate_tween_image(data, grayscale_tube, overlay_mask_tube)
+ new_image = image_utils.save_and_return_frame(data, self, self.i(), new_image)
+
+ # updating reference images to calculate hybrid motions in next iteration
+ data.images.previous = new_image
+
+ def generate_tween_image(self, data, grayscale_tube, overlay_mask_tube):
+ warped = data.turbo.do_optical_flow_cadence_after_animation_warping(data, self.indexes, self)
+ recolored = grayscale_tube(data)(warped)
+ is_tween = True
+ masked = overlay_mask_tube(data, is_tween)(recolored)
+ return masked
+
+ def process(self, last_frame, data):
+ data.turbo.advance_optical_flow_cadence_before_animation_warping(data, last_frame, self)
+ self.depth_prediction = Tween.calculate_depth_prediction(data, data.turbo)
+ data.turbo.advance(data, self.indexes.tween.i, self.depth)
+ data.turbo.do_hybrid_video_motion(data, last_frame, self.indexes, data.images)
+
+ def handle_synchronous_status_concerns(self, data):
+ self.write_tween_frame_subtitle_if_active(data) # TODO? decouple from execution and calc all in advance.
+ log_utils.print_tween_frame_info(data, self.indexes, self.cadence_flow, self.value)
+ web_ui_utils.update_progress_during_cadence(data, self.indexes)
+
+ def write_tween_frame_subtitle_if_active(self, data: RenderData):
+ if opt_utils.is_generate_subtitles():
+ params_to_print = opt_utils.generation_info_for_subtitles()
+ params_string = call_format_animation_params(data, self.indexes.tween.i, params_to_print)
+ is_cadence = self.value < 1.0
+ call_write_frame_subtitle(data, self.indexes.tween.i, params_string, is_cadence)
+
+ def has_cadence(self):
+ return self.cadence_flow is not None
+
+ @staticmethod
+ def create_in_between_steps(key_frames, i, data, from_i, to_i):
+ tween_range = range(from_i, to_i)
+ tween_indexes_list: List[Indexes] = Tween.create_indexes(data.indexes, tween_range)
+ last_step = key_frames[i]
+ tween_steps_and_values = Tween.create_steps(last_step, tween_indexes_list)
+ for tween in tween_steps_and_values[0]:
+ tween.indexes.update_tween_index(tween.i() + key_frames[i].i)
+ return tween_steps_and_values
+
+ @staticmethod
+ def _calculate_expected_tween_frames(num_entries):
+ if num_entries <= 0:
+ raise ValueError("Number of entries must be positive")
+ offset = 1.0 / num_entries
+ positions = [offset + (i / num_entries) for i in range(num_entries)]
+ return positions
+
+ @staticmethod
+ def _increment(original_indexes, tween_count, from_start):
+ inc = original_indexes.frame.i - tween_count - original_indexes.tween.start + from_start
+ original_indexes.tween = IndexWithStart(original_indexes.tween.start, original_indexes.tween.start + inc)
+ return original_indexes
+
+ @staticmethod
+ def create_steps_from_values(last_frame, values):
+ count = len(values)
+ r = range(count)
+ indexes_list = [Tween._increment(last_frame.render_data.indexes.copy(), count, i + 1) for i in r]
+ return list((Tween(indexes_list[i], values[i], None, None, last_frame.depth, None) for i in r))
+
+ @staticmethod
+ def create_indexes(base_indexes: Indexes, frame_range: Iterable[int]) -> list[Indexes]:
+ return list(chain.from_iterable([Indexes.create_from_last(base_indexes, i)] for i in frame_range))
+
+ @staticmethod
+ def create_steps(last_frame, tween_indexes_list: list[Indexes]) -> Tuple[list['Tween'], list[float]]:
+ if len(tween_indexes_list) > 0:
+ expected_tween_frames = Tween._calculate_expected_tween_frames(len(tween_indexes_list))
+ return Tween.create_steps_from_values(last_frame, expected_tween_frames), expected_tween_frames
+ return list(), list()
+
+ @staticmethod
+ def calculate_depth_prediction(data, turbo: Turbo):
+ has_depth = data.depth_model is not None
+ has_next = turbo.next.image is not None
+ if has_depth and has_next:
+ image = turbo.next.image
+ weight = data.args.anim_args.midas_weight
+ precision = data.args.root.half_precision
+ # log_utils.info(f"weight {weight} precision {precision}")
+ return data.depth_model.predict(image, weight, precision)
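Editor's note: a standalone illustration of _calculate_expected_tween_frames above. For a gap of 6 frames between two key frames the tween values climb in equal steps and end at exactly 1.0, which is why the last tween replaces the key frame it belongs to.

num_entries = 6  # e.g. tweens covering range(6, 12) between key frames 6 and 12
offset = 1.0 / num_entries
values = [offset + (i / num_entries) for i in range(num_entries)]
print([round(v, 3) for v in values])  # [0.167, 0.333, 0.5, 0.667, 0.833, 1.0]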
diff --git a/scripts/deforum_helpers/rendering/data/images.py b/scripts/deforum_helpers/rendering/data/images.py
new file mode 100644
index 000000000..e9a444c40
--- /dev/null
+++ b/scripts/deforum_helpers/rendering/data/images.py
@@ -0,0 +1,31 @@
+from dataclasses import dataclass
+
+import PIL
+import cv2
+import numpy as np
+from cv2.typing import MatLike
+
+from ...load_images import load_image
+
+
+@dataclass(init=True, frozen=False, repr=False, eq=True)
+class Images:
+ color_match: MatLike = None
+ previous: MatLike | None = None
+
+ def has_previous(self):
+ return self.previous is not None
+
+ @staticmethod
+ def _load_color_match_sample(init) -> MatLike:
+ """get color match for 'Image' color coherence only once, before loop"""
+ if init.args.anim_args.color_coherence == 'Image':
+ image_box: PIL.Image.Image = None
+ # noinspection PyTypeChecker
+ raw_image = load_image(init.args.anim_args.color_coherence_image_path, image_box)
+ resized = raw_image.resize(init.dimensions(), PIL.Image.LANCZOS)
+ return cv2.cvtColor(np.array(resized), cv2.COLOR_RGB2BGR)
+
+ @staticmethod
+ def create(data):
+ return Images(Images._load_color_match_sample(data))
diff --git a/scripts/deforum_helpers/rendering/data/indexes.py b/scripts/deforum_helpers/rendering/data/indexes.py
new file mode 100644
index 000000000..9478c9123
--- /dev/null
+++ b/scripts/deforum_helpers/rendering/data/indexes.py
@@ -0,0 +1,54 @@
+from dataclasses import dataclass
+
+
+@dataclass(init=True, frozen=True, repr=True, eq=True)
+class IndexWithStart:
+ start: int = 0
+ i: int = 0
+
+ def copy(self):
+ return IndexWithStart(start=self.start, i=self.i)
+
+
+@dataclass(init=True, frozen=False, repr=False, eq=False)
+class Indexes:
+ frame: IndexWithStart = None
+ tween: IndexWithStart = None
+
+ @staticmethod
+ def create(init, turbo):
+ frame_start = turbo.find_start(init)
+ tween_start = 0
+ return Indexes(IndexWithStart(frame_start), IndexWithStart(tween_start))
+
+ @staticmethod
+ def create_from_last(last_indexes, i: int):
+ """Creates a new `Indexes` object based on the last one, but updates the tween start index."""
+ return Indexes(last_indexes.frame, IndexWithStart(last_indexes.tween.start, i))
+
+ def create_next_tween(self):
+ return Indexes(self.frame, IndexWithStart(self.tween.start, self.tween.i + 1))
+
+ def update_tween_start(self, turbo):
+ tween_start = max(self.frame.start, self.frame.i - turbo.cadence)
+ self.tween = IndexWithStart(tween_start, self.tween.i)
+
+ def update_tween_index(self, i):
+ self.tween = IndexWithStart(self.tween.start, i)
+
+ def update_tween_start_index(self, i):
+ self.tween = IndexWithStart(i, self.tween.start)
+
+ def update_frame(self, i: int):
+ self.frame = IndexWithStart(self.frame.start, i)
+
+ def is_not_first_frame(self):
+ return self.frame.i > 0
+
+ def is_first_frame(self):
+ return self.frame.i == 0
+
+ def copy(self):
+ return Indexes(
+ frame=self.frame.copy() if self.frame else None,
+ tween=self.tween.copy() if self.tween else None)
diff --git a/scripts/deforum_helpers/rendering/data/mask.py b/scripts/deforum_helpers/rendering/data/mask.py
new file mode 100644
index 000000000..68790e111
--- /dev/null
+++ b/scripts/deforum_helpers/rendering/data/mask.py
@@ -0,0 +1,59 @@
+from dataclasses import dataclass
+from typing import Any
+
+from PIL import Image
+
+from ..util import put_all
+from ..util.utils import create_img, call_or_use_on_cond
+from ...load_images import get_mask, load_img
+from ...rendering.util.call.images import call_get_mask_from_file
+
+
+@dataclass(init=True, frozen=True, repr=False, eq=False)
+class Mask:
+ image: Image
+ vals: Any
+ noise_vals: Any
+
+ def has_mask_image(self):
+ return self.image is not None
+
+ @staticmethod
+ def _create_vals(count, dimensions):
+ return list(map(lambda _: {'everywhere': create_img(dimensions)}, range(count)))
+
+ @staticmethod
+ def _assign(init, i, is_mask_image, dicts):
+ # Grab the first frame masks since they won't be provided until the next frame
+ # Video mask overrides the init image mask, also, won't be searching for init_mask if use_mask_video is set
+ # Made to solve https://github.com/deforum-art/deforum-for-automatic1111-webui/issues/386
+ key = 'video_mask'
+ if init.args.anim_args.use_mask_video:
+ mask = call_get_mask_from_file(init, i, True)
+ init.args.args.mask_file = mask
+ init.args.root.noise_mask = mask
+ put_all(dicts, key, mask)
+ elif is_mask_image is None and init.is_use_mask:
+ put_all(dicts, key, get_mask(init.args.args))
+
+ @staticmethod
+ def _load_mask(init, args):
+ return load_img(args.init_image, args.init_image_box, shape=init.dimensions(),
+ use_alpha_as_mask=args.use_alpha_as_mask)[1]
+
+ @staticmethod
+ def _create_mask_image(init):
+ args = init.args.args
+ return call_or_use_on_cond(init.is_using_init_image_or_box(), lambda: Mask._load_mask(init, args))
+
+ @staticmethod
+ def _create(init, i, mask_image):
+ mask_and_noise_mask = Mask._create_vals(2, init.dimensions())
+ put_all(mask_and_noise_mask, 'video_mask', mask_image)
+ Mask._assign(init, i, mask_image, mask_and_noise_mask)
+ return Mask(mask_image, mask_and_noise_mask[0], mask_and_noise_mask[1])
+
+ @staticmethod
+ def create(init, i):
+ mask_image = Mask._create_mask_image(init)
+ return call_or_use_on_cond(mask_image is not None, Mask._create(init, i, mask_image))
diff --git a/scripts/deforum_helpers/rendering/data/render_data.py b/scripts/deforum_helpers/rendering/data/render_data.py
new file mode 100644
index 000000000..1cb27f536
--- /dev/null
+++ b/scripts/deforum_helpers/rendering/data/render_data.py
@@ -0,0 +1,349 @@
+import os
+from dataclasses import dataclass
+from typing import Any
+
+import cv2
+import numexpr
+import numpy as np
+import pandas as pd
+from PIL import Image
+
+from .anim import AnimationKeys, AnimationMode
+from .images import Images
+from .indexes import Indexes
+from .mask import Mask
+from .subtitle import Srt
+from .turbo import Turbo
+from ..util import depth_utils, log_utils, memory_utils, opt_utils
+from ..util.call.images import call_get_mask_from_file_with_frame
+from ..util.call.mask import call_compose_mask_with_check
+from ..util.call.video_and_audio import call_get_next_frame
+from ...args import DeforumArgs, DeforumAnimArgs, LoopArgs, ParseqArgs, RootArgs
+from ...deforum_controlnet import unpack_controlnet_vids, is_controlnet_enabled
+from ...generate import (isJson)
+from ...parseq_adapter import ParseqAdapter
+from ...prompt import prepare_prompt
+from ...settings import save_settings_from_animation_run
+
+
+@dataclass(init=True, frozen=True, repr=False, eq=False)
+class RenderInitArgs:
+ args: DeforumArgs = None
+ parseq_args: ParseqArgs = None
+ anim_args: DeforumAnimArgs = None
+ video_args: Any = None
+ controlnet_args: Any = None
+ loop_args: LoopArgs = None
+ root: RootArgs = None
+
+
+@dataclass(init=True, frozen=False, repr=False, eq=False)
+class RenderData:
+ """The purpose of this class is to group and control all data used in render_animation"""
+ images: Images | None
+ turbo: Turbo | None
+ indexes: Indexes | None
+ mask: Mask | None
+ seed: int
+ args: RenderInitArgs
+ parseq_adapter: ParseqAdapter
+ srt: Any
+ animation_keys: AnimationKeys
+ animation_mode: AnimationMode
+ prompt_series: Any
+ depth_model: Any
+ output_directory: str
+ is_use_mask: bool
+
+ @staticmethod
+ def create(args, parseq_args, anim_args, video_args, controlnet_args, loop_args, root) -> 'RenderData':
+ ri_args = RenderInitArgs(args, parseq_args, anim_args, video_args, controlnet_args, loop_args, root)
+
+ output_directory = args.outdir
+ is_use_mask = args.use_mask
+ parseq_adapter = RenderData.create_parseq_adapter(ri_args)
+ srt = Srt.create_if_active(output_directory, root.timestring, video_args.fps)
+ animation_keys = AnimationKeys.from_args(ri_args, parseq_adapter, args.seed)
+ animation_mode = AnimationMode.from_args(ri_args)
+ prompt_series = RenderData.select_prompts(parseq_adapter, anim_args, animation_keys, root)
+ depth_model = depth_utils.create_depth_model_and_enable_depth_map_saving_if_active(
+ animation_mode, root, anim_args, args)
+
+ # The temporary instance only exists to make it easy to create the other objects required by the actual instance.
+ # Feels slightly awkward, but it's probably not worth optimizing since only one is created and gc takes care of it.
+ incomplete_init = RenderData(None, None, None, None, args.seed, ri_args, parseq_adapter, srt, animation_keys,
+ animation_mode, prompt_series, depth_model, output_directory, is_use_mask)
+ images = Images.create(incomplete_init)
+ turbo = Turbo.create(incomplete_init)
+ indexes = Indexes.create(incomplete_init, turbo)
+ mask = Mask.create(incomplete_init, indexes.frame.i)
+
+ instance = RenderData(images, turbo, indexes, mask, args.seed, ri_args, parseq_adapter, srt, animation_keys,
+ animation_mode, prompt_series, depth_model, output_directory, is_use_mask)
+ RenderData.init_looper_if_active(args, loop_args)
+ RenderData.handle_controlnet_video_input_frames_generation(controlnet_args, args, anim_args)
+ RenderData.create_output_directory_for_the_batch(args.outdir)
+ RenderData.save_settings_txt(args, anim_args, parseq_args, loop_args, controlnet_args, video_args, root)
+ RenderData.maybe_resume_from_timestring(anim_args, root)
+ return instance
+
+ # The following methods are meant to provide easy and centralized access to the most important
+ # arguments and settings relevant for rendering. All bools use naming with 'is_' or 'has_'.
+ def is_3d(self) -> bool:
+ return self.args.anim_args.animation_mode == '3D'
+
+ def is_3d_or_2d(self) -> bool:
+ return self.args.anim_args.animation_mode in ['2D', '3D']
+
+ def has_parseq_keyframe_redistribution(self) -> bool:
+ return self.args.parseq_args.parseq_key_frame_redistribution != "Off"
+
+ def has_optical_flow_cadence(self) -> bool:
+ return self.args.anim_args.optical_flow_cadence != 'None'
+
+ def has_optical_flow_redo(self) -> bool:
+ return self.args.anim_args.optical_flow_redo_generation != 'None'
+
+ def is_3d_or_2d_with_optical_flow(self) -> bool:
+ return self.is_3d_or_2d() and self.has_optical_flow_cadence()
+
+ def is_3d_with_med_or_low_vram(self) -> bool:
+ return self.is_3d() and memory_utils.is_low_or_med_vram()
+
+ def has_keyframe_redistribution(self) -> bool:
+ return self.has_parseq_keyframe_redistribution()
+
+ def width(self) -> int:
+ return self.args.args.W
+
+ def height(self) -> int:
+ return self.args.args.H
+
+ def dimensions(self) -> tuple[int, int]:
+ return self.width(), self.height()
+
+ # hybrid stuff
+ def is_hybrid_composite(self) -> bool:
+ return self.args.anim_args.hybrid_composite != 'None'
+
+ def is_normal_hybrid_composite(self) -> bool:
+ return self.args.anim_args.hybrid_composite == 'Normal'
+
+ def has_hybrid_motion(self) -> bool:
+ return self.args.anim_args.hybrid_motion in ['Optical Flow', 'Affine', 'Perspective']
+
+ def is_hybrid_available(self) -> bool:
+ return self.is_hybrid_composite() or self.has_hybrid_motion()
+
+ def is_hybrid_composite_before_motion(self) -> bool:
+ return self.args.anim_args.hybrid_composite == 'Before Motion'
+
+ def is_hybrid_composite_after_generation(self) -> bool:
+ return self.args.anim_args.hybrid_composite == 'After Generation'
+ # end hybrid stuff
+
+ def is_initialize_color_match(self, color_match_sample) -> bool:
+ """Determines whether to initialize color matching based on the given conditions."""
+ has_video_input = self.args.anim_args.color_coherence == 'Video Input' and self.is_hybrid_available()
+ has_image_color_coherence = self.args.anim_args.color_coherence == 'Image'
+ has_coherent_non_legacy_color_match = (self.args.anim_args.color_coherence != 'None'
+ and not self.args.anim_args.legacy_colormatch)
+ has_any_color_sample = color_match_sample is not None
+ has_sample_and_match = has_any_color_sample and has_coherent_non_legacy_color_match
+ return has_video_input or has_image_color_coherence or has_sample_and_match
+
+ def has_color_coherence(self) -> bool:
+ return self.args.anim_args.color_coherence != 'None'
+
+ def has_non_video_or_image_color_coherence(self) -> bool:
+ return self.args.anim_args.color_coherence not in ['Image', 'Video Input']
+
+ def is_resuming_from_timestring(self) -> bool:
+ return self.args.anim_args.resume_from_timestring
+
+ def has_video_input(self) -> bool:
+ return self.animation_mode.has_video_input
+
+ def cadence(self) -> int:
+ return int(self.args.anim_args.diffusion_cadence)
+
+ def _has_init_image(self) -> bool:
+ return self.args.args.init_image is not None and self.args.args.init_image != ''
+
+ def _has_init_box(self) -> bool:
+ return self.args.args.init_image_box is not None
+
+ def _has_init_image_or_box(self) -> bool:
+ return self._has_init_image() or self._has_init_box()
+
+ def is_using_init_image_or_box(self) -> bool:
+ return self.args.args.use_init and self._has_init_image_or_box()
+
+ def is_not_in_motion_preview_mode(self) -> bool:
+ return not self.args.args.motion_preview_mode
+
+ def color_coherence_mode(self):
+ return self.args.anim_args.color_coherence
+
+ def diffusion_redo(self):
+ return self.args.anim_args.diffusion_redo
+
+ def diffusion_redo_as_int(self):
+ return int(self.diffusion_redo())
+
+ def has_positive_diffusion_redo(self) -> bool:
+ return self.diffusion_redo_as_int() > 0
+
+ def optical_flow_redo_generation_if_not_in_preview_mode(self):
+ is_not_preview = self.is_not_in_motion_preview_mode()
+ return self.args.anim_args.optical_flow_redo_generation if is_not_preview else 'None'
+
+ def is_do_color_match_conversion(self, step) -> bool:
+ is_legacy_cm = self.args.anim_args.legacy_colormatch
+ is_use_init = self.args.args.use_init
+ is_not_legacy_with_use_init = not is_legacy_cm and not is_use_init
+ is_legacy_cm_without_strength = is_legacy_cm and step.step_data.strength == 0
+ is_maybe_special_legacy = is_not_legacy_with_use_init or is_legacy_cm_without_strength
+ return is_maybe_special_legacy and self.has_non_video_or_image_color_coherence()
+
+ def update_sample_and_args_for_current_progression_step(self, step, noised_image):
+ # use transformed previous frame as init for current
+ self.args.args.use_init = True
+ self.args.root.init_sample = Image.fromarray(cv2.cvtColor(noised_image, cv2.COLOR_BGR2RGB))
+ self.args.args.strength = max(0.0, min(1.0, step.step_data.strength))
+
+ def update_some_args_for_current_step(self, step, i):
+ keys = self.animation_keys.deform_keys
+ # Pix2Pix Image CFG Scale - does *nothing* with non pix2pix checkpoints
+ self.args.args.pix2pix_img_cfg_scale = float(keys.pix2pix_img_cfg_scale_series[i])
+ self.args.args.prompt = self.prompt_series[i] # grab prompt for current frame
+ self.args.args.scale = step.step_data.scale
+
+ def update_seed_and_checkpoint_for_current_step(self, i):
+ keys = self.animation_keys.deform_keys
+ is_seed_scheduled = self.args.args.seed_behavior == 'schedule'
+ is_seed_managed = self.parseq_adapter.manages_seed()
+ is_seed_scheduled_or_managed = is_seed_scheduled or is_seed_managed
+ if is_seed_scheduled_or_managed:
+ self.args.args.seed = int(keys.seed_schedule_series[i])
+ self.args.args.checkpoint = keys.checkpoint_schedule_series[i] \
+ if self.args.anim_args.enable_checkpoint_scheduling else None
+
+ def update_sub_seed_schedule_for_current_step(self, i):
+ keys = self.animation_keys.deform_keys
+ is_subseed_scheduling_enabled = self.args.anim_args.enable_subseed_scheduling
+ is_seed_managed_by_parseq = self.parseq_adapter.manages_seed()
+ if is_subseed_scheduling_enabled or is_seed_managed_by_parseq:
+ self.args.root.subseed = int(keys.subseed_schedule_series[i])
+ if is_subseed_scheduling_enabled and not is_seed_managed_by_parseq:
+ self.args.root.subseed_strength = float(keys.subseed_strength_schedule_series[i])
+ if is_seed_managed_by_parseq:
+ self.args.root.subseed_strength = keys.subseed_strength_schedule_series[i]
+ self.args.anim_args.enable_subseed_scheduling = True # TODO? move to init.
+
+ def prompt_for_current_step(self, i):
+ """returns value to be set back into the prompt"""
+ prompt = self.args.args.prompt
+ max_frames = self.args.anim_args.max_frames
+ seed = self.args.args.seed
+ return prepare_prompt(prompt, max_frames, seed, i)
+
+ def _update_video_input_for_current_frame(self, i, step):
+ video_init_path = self.args.anim_args.video_init_path
+ init_frame = call_get_next_frame(self, i, video_init_path)
+ log_utils.print_init_frame_info(init_frame)
+ self.args.args.init_image = init_frame
+ self.args.args.init_image_box = None # init_image_box not used in this case
+ self.args.args.strength = max(0.0, min(1.0, step.step_data.strength))
+
+ def _update_video_mask_for_current_frame(self, i):
+ video_mask_path = self.args.anim_args.video_mask_path
+ is_mask = True
+ mask_init_frame = call_get_next_frame(self, i, video_mask_path, is_mask)
+ new_mask = call_get_mask_from_file_with_frame(self, mask_init_frame)
+ self.args.args.mask_file = new_mask
+ self.args.root.noise_mask = new_mask
+ self.mask.vals['video_mask'] = new_mask
+
+ def update_video_data_for_current_frame(self, i, step):
+ if self.animation_mode.has_video_input:
+ self._update_video_input_for_current_frame(i, step)
+ if self.args.anim_args.use_mask_video:
+ self._update_video_mask_for_current_frame(i)
+
+ def update_mask_image(self, step, mask):
+ is_use_mask = self.args.args.use_mask
+ if is_use_mask:
+ has_sample = self.args.root.init_sample is not None
+ if has_sample:
+ mask_seq = step.schedule.mask_seq
+ sample = self.args.root.init_sample
+ self.args.args.mask_image = call_compose_mask_with_check(self, mask_seq, mask.vals, sample)
+ else:
+ self.args.args.mask_image = None # we need it only after the first frame anyway
+
+ def prepare_generation(self, data, step, i):
+ if i > self.args.anim_args.max_frames - 1:
+ return
+ self.update_some_args_for_current_step(step, i)
+ self.update_seed_and_checkpoint_for_current_step(i)
+ self.update_sub_seed_schedule_for_current_step(i)
+ self.prompt_for_current_step(i)
+ self.update_video_data_for_current_frame(i, step)
+ self.update_mask_image(step, data.mask)
+ self.animation_keys = AnimationKeys.from_args(self.args, self.parseq_adapter, self.seed)
+ opt_utils.setup(step.schedule)
+ memory_utils.handle_vram_if_depth_is_predicted(data)
+
+ @staticmethod
+ def create_output_directory_for_the_batch(directory):
+ os.makedirs(directory, exist_ok=True)
+ print(f"Saving animation frames to:\n{directory}")
+
+ @staticmethod
+ def create_parseq_adapter(args):
+ adapter = ParseqAdapter(args.parseq_args, args.anim_args, args.video_args, args.controlnet_args, args.loop_args)
+ # Always enable pseudo-3d with parseq. No need for an extra toggle:
+ # Whether it's used or not in practice is defined by the schedules
+ if adapter.use_parseq:
+ args.anim_args.flip_2d_perspective = True
+ return adapter
+
+ @staticmethod
+ def init_looper_if_active(args, loop_args):
+ if loop_args.use_looper:
+ print("Using Guided Images mode: seed_behavior will be set to 'schedule' and 'strength_0_no_init' to False")
+ if args.strength == 0:
+ raise RuntimeError("Strength needs to be greater than 0 in Init tab")
+ args.strength_0_no_init = False
+ args.seed_behavior = "schedule"
+ if not isJson(loop_args.init_images):
+ raise RuntimeError("The images set for use with keyframe-guidance are not in a proper JSON format")
+
+ @staticmethod
+ def select_prompts(parseq_adapter, anim_args, animation_keys, root):
+ return animation_keys.deform_keys.prompts if parseq_adapter.manages_prompts() \
+ else RenderData.expand_prompts_out_to_per_frame(anim_args, root)
+
+ @staticmethod
+ def expand_prompts_out_to_per_frame(anim_args, root):
+ prompt_series = pd.Series([np.nan for _ in range(anim_args.max_frames)])
+ for i, prompt in root.animation_prompts.items():
+ if str(i).isdigit():
+ prompt_series[int(i)] = prompt
+ else:
+ prompt_series[int(numexpr.evaluate(i))] = prompt
+ return prompt_series.ffill().bfill()
+
+ @staticmethod
+ def handle_controlnet_video_input_frames_generation(controlnet_args, args, anim_args):
+ if is_controlnet_enabled(controlnet_args):
+ unpack_controlnet_vids(args, anim_args, controlnet_args)
+
+ @staticmethod
+ def save_settings_txt(args, anim_args, parseq_args, loop_args, controlnet_args, video_args, root):
+ save_settings_from_animation_run(args, anim_args, parseq_args, loop_args, controlnet_args, video_args, root)
+
+ @staticmethod
+ def maybe_resume_from_timestring(anim_args, root):
+ root.timestring = anim_args.resume_timestring if anim_args.resume_from_timestring else root.timestring
diff --git a/scripts/deforum_helpers/rendering/data/schedule.py b/scripts/deforum_helpers/rendering/data/schedule.py
new file mode 100644
index 000000000..6cc5d3c56
--- /dev/null
+++ b/scripts/deforum_helpers/rendering/data/schedule.py
@@ -0,0 +1,92 @@
+from dataclasses import dataclass
+from typing import Optional, Any
+
+from .render_data import RenderData
+from ...animation_key_frames import DeformAnimKeys
+from ...args import DeforumAnimArgs, DeforumArgs
+
+
+@dataclass(init=True, frozen=True, repr=False, eq=False)
+class Schedule:
+ steps: int
+ sampler_name: str
+ clipskip: int
+ noise_multiplier: float
+ eta_ddim: float
+ eta_ancestral: float
+ mask: Optional[Any]
+ noise_mask: Optional[Any]
+
+ @staticmethod
+ def create(data: RenderData):
+ """Create a new Schedule instance based on the provided parameters."""
+ i = data.indexes.frame.i
+ args: DeforumArgs = data.args.args
+ anim_args: DeforumAnimArgs = data.args.anim_args
+ keys: DeformAnimKeys = data.animation_keys.deform_keys
+ steps = Schedule.schedule_steps(keys, i, anim_args)
+ sampler_name = Schedule.schedule_sampler(keys, i, anim_args)
+ clipskip = Schedule.schedule_clipskip(keys, i, anim_args)
+ noise_multiplier = Schedule.schedule_noise_multiplier(keys, i, anim_args)
+ eta_ddim = Schedule.schedule_ddim_eta(keys, i, anim_args)
+ eta_ancestral = Schedule.schedule_ancestral_eta(keys, i, anim_args)
+ mask = Schedule.schedule_mask(keys, i, args)
+ is_use_mask_without_noise = data.is_use_mask and not data.args.anim_args.use_noise_mask
+ noise_mask = mask if is_use_mask_without_noise else Schedule.schedule_noise_mask(keys, i, anim_args)
+ return Schedule(steps, sampler_name, clipskip, noise_multiplier, eta_ddim, eta_ancestral, mask, noise_mask)
+
+ @staticmethod
+ def _has_schedule(keys, i):
+ return keys.steps_schedule_series[i] is not None
+
+ @staticmethod
+ def _has_mask_schedule(keys, i):
+ return keys.mask_schedule_series[i] is not None
+
+ @staticmethod
+ def _has_noise_mask_schedule(keys, i):
+ return keys.noise_mask_schedule_series[i] is not None
+
+ @staticmethod
+ def _use_on_cond_if_scheduled(keys, i, value, cond):
+ return value if cond and Schedule._has_schedule(keys, i) else None
+
+ @staticmethod
+ def schedule_steps(keys, i, anim_args):
+ return Schedule._use_on_cond_if_scheduled(keys, i, int(keys.steps_schedule_series[i]),
+ anim_args.enable_steps_scheduling)
+
+ @staticmethod
+ def schedule_sampler(keys, i, anim_args):
+ return Schedule._use_on_cond_if_scheduled(keys, i, keys.sampler_schedule_series[i].casefold(),
+ anim_args.enable_sampler_scheduling)
+
+ @staticmethod
+ def schedule_clipskip(keys, i, anim_args):
+ return Schedule._use_on_cond_if_scheduled(keys, i, int(keys.clipskip_schedule_series[i]),
+ anim_args.enable_clipskip_scheduling)
+
+ @staticmethod
+ def schedule_noise_multiplier(keys, i, anim_args):
+ return Schedule._use_on_cond_if_scheduled(keys, i, float(keys.noise_multiplier_schedule_series[i]),
+ anim_args.enable_noise_multiplier_scheduling)
+
+ @staticmethod
+ def schedule_ddim_eta(keys, i, anim_args):
+ return Schedule._use_on_cond_if_scheduled(keys, i, float(keys.ddim_eta_schedule_series[i]),
+ anim_args.enable_ddim_eta_scheduling)
+
+ @staticmethod
+ def schedule_ancestral_eta(keys, i, anim_args):
+ return Schedule._use_on_cond_if_scheduled(keys, i, float(keys.ancestral_eta_schedule_series[i]),
+ anim_args.enable_ancestral_eta_scheduling)
+
+ @staticmethod
+ def schedule_mask(keys, i, args):
+ return keys.mask_schedule_series[i] \
+ if args.use_mask and Schedule._has_mask_schedule(keys, i) else None
+
+ @staticmethod
+ def schedule_noise_mask(keys, i, anim_args):
+ return keys.noise_mask_schedule_series[i] \
+ if anim_args.use_noise_mask and Schedule._has_noise_mask_schedule(keys, i) else None
diff --git a/scripts/deforum_helpers/rendering/data/subtitle/__init__.py b/scripts/deforum_helpers/rendering/data/subtitle/__init__.py
new file mode 100644
index 000000000..236ac2b47
--- /dev/null
+++ b/scripts/deforum_helpers/rendering/data/subtitle/__init__.py
@@ -0,0 +1 @@
+from .srt import Srt
diff --git a/scripts/deforum_helpers/rendering/data/subtitle/srt.py b/scripts/deforum_helpers/rendering/data/subtitle/srt.py
new file mode 100644
index 000000000..1e3594faf
--- /dev/null
+++ b/scripts/deforum_helpers/rendering/data/subtitle/srt.py
@@ -0,0 +1,23 @@
+import os
+
+from dataclasses import dataclass
+from decimal import Decimal
+
+from ...util import opt_utils
+from ....subtitle_handler import init_srt_file
+
+
+@dataclass(init=True, frozen=True, repr=False, eq=False)
+class Srt:
+ filename: str
+ frame_duration: Decimal
+
+ @staticmethod
+ def create_if_active(out_dir: str, timestring: str, fps: float) -> 'Srt | None':
+ if not opt_utils.is_subtitle_generation_active():
+ return None
+ else:
+ # create .srt file and set timeframe mechanism using FPS
+ filename = os.path.join(out_dir, f"{timestring}.srt")
+ frame_duration = init_srt_file(filename, fps)
+ return Srt(filename, frame_duration)
diff --git a/scripts/deforum_helpers/rendering/data/turbo.py b/scripts/deforum_helpers/rendering/data/turbo.py
new file mode 100644
index 000000000..b1270bb1b
--- /dev/null
+++ b/scripts/deforum_helpers/rendering/data/turbo.py
@@ -0,0 +1,178 @@
+from dataclasses import dataclass
+
+from cv2.typing import MatLike
+
+from ..util import opt_utils
+from ..util.call.anim import call_anim_frame_warp
+from ..util.call.hybrid import (call_get_flow_for_hybrid_motion_prev, call_get_flow_for_hybrid_motion,
+ call_get_matrix_for_hybrid_motion, call_get_matrix_for_hybrid_motion_prev)
+from ..util.call.resume import call_get_resume_vars
+from ...hybrid_video import (get_flow_from_images, image_transform_ransac,
+ image_transform_optical_flow, rel_flow_to_abs_flow)
+
+
+@dataclass(init=True, frozen=False, repr=False, eq=True)
+class ImageFrame:
+ image: MatLike | None
+ index: int
+
+
+# Disabling transformations of previous frames may not be suited for all scenarios,
+# but depending on setup can speed up generations significantly and without changing
+# the visual output in a noticeable way. Leaving it off should be fine for current use cases.
+IS_TRANSFORM_PREV = False # TODO? benchmark and visually compare results. make configurable from UI or remove?
+
+
+@dataclass(frozen=False)
+class Turbo:
+ cadence: int
+ prev: ImageFrame
+ next: ImageFrame
+
+ @staticmethod
+ def create(data):
+ steps = 1 if data.has_video_input() else data.cadence()
+ return Turbo(steps, ImageFrame(None, 0), ImageFrame(None, 0))
+
+ def advance(self, data, i: int, depth):
+ if self._has_prev_image() and IS_TRANSFORM_PREV:
+ self.prev.image, _ = call_anim_frame_warp(data, i, self.prev.image, depth)
+ if self._has_next_image():
+ self.next.image, _ = call_anim_frame_warp(data, i, self.next.image, depth)
+
+ def do_hybrid_video_motion(self, data, last_frame, indexes, reference_images):
+ """Warps the previous and/or the next to match the motion of the provided reference images."""
+ motion = data.args.anim_args.hybrid_motion
+
+ def _is_do_motion(motions):
+ return indexes.tween.i > 0 and motion in motions
+
+ if _is_do_motion(['Affine', 'Perspective']):
+ self.advance_hybrid_motion_ransac_transform(data, indexes, reference_images)
+ if _is_do_motion(['Optical Flow']):
+ self.advance_hybrid_motion_optical_tween_flow(data, indexes, reference_images, last_frame)
+
+ def advance_optical_flow(self, tween_step, flow_factor: int = 1):
+ flow = tween_step.cadence_flow * -1
+ self.next.image = image_transform_optical_flow(self.next.image, flow, flow_factor)
+
+ def advance_optical_tween_flow(self, indexes, last_frame, flow):
+ flow_factor = last_frame.step_data.flow_factor()
+ i = indexes.tween.i
+ if self.is_advance_prev(i):
+ self.prev.image = image_transform_optical_flow(self.prev.image, flow, flow_factor)
+ if self.is_advance_next(i):
+ self.next.image = image_transform_optical_flow(self.next.image, flow, flow_factor)
+
+ def advance_hybrid_motion_optical_tween_flow(self, data, indexes, reference_images, last_frame):
+ last_i = indexes.tween.i - 1
+ flow = (call_get_flow_for_hybrid_motion(data, last_i)
+ if not data.args.anim_args.hybrid_motion_use_prev_img
+ else call_get_flow_for_hybrid_motion_prev(data, last_i, reference_images.previous))
+ self.advance_optical_tween_flow(indexes, last_frame, flow)
+ data.animation_mode.prev_flow = flow
+
+ def advance_cadence_flow(self, data, tween_frame):
+ ff_string = data.args.anim_args.cadence_flow_factor_schedule
+ flow_factor = float(ff_string.split(": ")[1][1:-1])
+ i = tween_frame.i()
+ flow = tween_frame.cadence_flow_inc
+ if self.is_advance_prev(i):
+ self.prev.image = image_transform_optical_flow(self.prev.image, flow, flow_factor)
+ if self.is_advance_next(i):
+ self.next.image = image_transform_optical_flow(self.next.image, flow, flow_factor)
+
+ def advance_ransac_transform(self, data, matrix):
+ i = data.indexes.tween.i
+ motion = data.args.anim_args.hybrid_motion
+ if self.is_advance_prev(i):
+ self.prev.image = image_transform_ransac(self.prev.image, matrix, motion)
+ if self.is_advance_next(i):
+ self.next.image = image_transform_ransac(self.next.image, matrix, motion)
+
+ def advance_hybrid_motion_ransac_transform(self, data, indexes, reference_images):
+ last_i = indexes.tween.i - 1
+ matrix = (call_get_matrix_for_hybrid_motion(data, last_i)
+ if not data.args.anim_args.hybrid_motion_use_prev_img
+ else call_get_matrix_for_hybrid_motion_prev(data, last_i, reference_images.previous))
+ self.advance_ransac_transform(data, matrix)
+
+ def advance_optical_flow_cadence_before_animation_warping(self, data, last_frame, tween_frame):
+ if data.is_3d_or_2d_with_optical_flow():
+ if self._is_do_flow(data, tween_frame):
+ method = data.args.anim_args.optical_flow_cadence # string containing the flow method (e.g. "RAFT").
+ flow = get_flow_from_images(self.prev.image, self.next.image, method, data.animation_mode.raft_model)
+ tween_frame.cadence_flow = flow / len(last_frame.tweens)
+ if tween_frame.has_cadence():
+ self.advance_optical_flow(tween_frame)
+ flow_factor = 1.0
+ self.next.image = image_transform_optical_flow(self.next.image, -tween_frame.cadence_flow, flow_factor)
+
+ def _is_do_flow(self, data, tween_frame):
+ i = data.indexes.tween.start
+ has_tween_schedule = data.animation_keys.deform_keys.strength_schedule_series[i] > 0
+ has_images = self.prev.image is not None and self.next.image is not None
+ has_step_and_images = tween_frame.cadence_flow is None and has_images
+ return has_tween_schedule and has_step_and_images and data.animation_mode.is_raft_active()
+
+ def do_optical_flow_cadence_after_animation_warping(self, data, indexes, tween_frame):
+ if not data.animation_mode.is_raft_active():
+ return self.next.image
+ if tween_frame.cadence_flow is not None:
+ # TODO Calculate all increments before running the generation (and try to avoid abs->rel->abs conversions).
+ # temp_flow = abs_flow_to_rel_flow(tween_step.cadence_flow, data.width(), data.height())
+ # new_flow, _ = call_anim_frame_warp(data, indexes.tween.i, temp_flow, None)
+ new_flow, _ = call_anim_frame_warp(data, indexes.tween.i, self.prev.image, None)
+ tween_frame.cadence_flow = new_flow
+ abs_flow = rel_flow_to_abs_flow(tween_frame.cadence_flow, data.width(), data.height())
+ tween_frame.cadence_flow_inc = abs_flow * tween_frame.value
+ self.advance_cadence_flow(data, tween_frame)
+        self.prev.index = self.next.index = indexes.tween.i if indexes is not None else 0
+ if self.prev.image is not None and tween_frame.value < 1.0:
+ return self.prev.image * (1.0 - tween_frame.value) + self.next.image * tween_frame.value
+ return self.next.image
+
+ def progress_step(self, indexes, opencv_image):
+ self.prev.image, self.prev.index = self.next.image, self.next.index
+ self.next.image, self.next.index = opencv_image, indexes.frame.i
+ return self.cadence
+
+ def _set_up_step_vars(self, data):
+ # determine last frame and frame to start on
+ prev_frame, next_frame, prev_img, next_img = call_get_resume_vars(data, self)
+ if self.cadence > 1:
+ self.prev.image, self.prev.index = prev_img, prev_frame if prev_frame >= 0 else 0
+ self.next.image, self.next.index = next_img, next_frame if next_frame >= 0 else 0
+
+    def find_start(self, data) -> int:
+        """Maybe resume animation (requires at least two frames - see function)."""
+        if data.is_resuming_from_timestring():
+            # set up prev/next image and index from the resume vars
+            self._set_up_step_vars(data)
+        # instead of "self.next.index + 1" we always return 0, to print a message
+        # for every frame that is skipped because it already exists.
+        return 0
+
+ def has_steps(self):
+ return self.cadence > 1
+
+ def _has_prev_image(self):
+ return self.prev.image is not None
+
+ def is_advance_prev(self, i: int) -> bool:
+ return IS_TRANSFORM_PREV and self._has_prev_image() and i > self.prev.index
+
+ def _has_next_image(self):
+ return self.next.image is not None
+
+ def is_advance_next(self, i: int) -> bool:
+ return i > self.next.index
+
+ def is_first_step(self) -> bool:
+ return self.cadence == 1
+
+ def is_first_step_with_subtitles(self) -> bool:
+ return self.is_first_step() and opt_utils.is_subtitle_generation_active()
+
+ def is_emit_in_between_frames(self) -> bool:
+ return self.cadence > 1
diff --git a/scripts/deforum_helpers/rendering/experimental_core.py b/scripts/deforum_helpers/rendering/experimental_core.py
new file mode 100644
index 000000000..6c74788d6
--- /dev/null
+++ b/scripts/deforum_helpers/rendering/experimental_core.py
@@ -0,0 +1,103 @@
+import os
+from pathlib import Path
+from typing import List
+
+# noinspection PyUnresolvedReferences
+from modules.shared import cmd_opts, progress_print_out, state
+from tqdm import tqdm
+
+from . import img_2_img_tubes
+from .data.frame import KeyFrameDistribution, KeyFrame
+from .data.render_data import RenderData
+from .util import filename_utils, image_utils, log_utils, opt_utils, memory_utils, web_ui_utils
+
+
+def render_animation(args, anim_args, video_args, parseq_args, loop_args, controlnet_args, root):
+ log_utils.info("Using experimental render core.")
+ data = RenderData.create(args, parseq_args, anim_args, video_args, controlnet_args, loop_args, root)
+ _check_experimental_render_conditions(data)
+ web_ui_utils.init_job(data)
+ key_frames = KeyFrame.create_all_frames(data, KeyFrameDistribution.from_UI_tab(data))
+ run_render_animation(data, key_frames)
+ data.animation_mode.unload_raft_and_depth_model()
+
+
+def run_render_animation(data: RenderData, key_frames: List[KeyFrame]):
+ for key_frame in key_frames:
+ if is_resume(data, key_frame):
+ continue
+ pre_process_key_frame_and_emit_tweens(data, key_frame)
+ image = key_frame.generate()
+ if image is None:
+ log_utils.print_warning_generate_returned_no_image()
+ break
+ post_process_key_frame(key_frame, image)
+
+
+def pre_process_key_frame_and_emit_tweens(data, key_frame):
+ memory_utils.handle_med_or_low_vram_before_step(data)
+ web_ui_utils.update_job(data)
+ if key_frame.has_tween_frames():
+ emit_tweens(data, key_frame)
+ log_utils.print_animation_frame_info(key_frame.i, data.args.anim_args.max_frames)
+ key_frame.maybe_write_frame_subtitle()
+ frame_tube = img_2_img_tubes.frame_transformation_tube
+ contrasted_noise_tube = img_2_img_tubes.contrasted_noise_transformation_tube
+ key_frame.prepare_generation(frame_tube, contrasted_noise_tube)
+
+
+def post_process_key_frame(key_frame, image):
+ if not image_utils.is_PIL(image): # check is required when resuming from timestring
+ image = img_2_img_tubes.conditional_frame_transformation_tube(key_frame)(image)
+ state.assign_current_image(image)
+ key_frame.after_diffusion(image)
+ web_ui_utils.update_status_tracker(key_frame.render_data)
+
+
+def is_resume(data, key_step):
+ filename = filename_utils.frame_filename(data, key_step.i)
+ full_path = Path(data.output_directory) / filename
+ is_file_existing = os.path.exists(full_path)
+ if is_file_existing:
+ log_utils.warn(f"Frame {filename} exists, skipping to next key frame.")
+ key_step.render_data.args.args.seed = key_step.next_seed()
+ return is_file_existing
+
+
+def emit_tweens(data, key_step):
+ _update_pseudo_cadence(data, len(key_step.tweens) - 1)
+ log_utils.print_tween_frame_from_to_info(key_step)
+ grayscale_tube = img_2_img_tubes.conditional_force_tween_to_grayscale_tube
+ overlay_mask_tube = img_2_img_tubes.conditional_add_overlay_mask_tube
+ tweens = _tweens_with_progress(key_step)
+ [tween.emit_frame(key_step, grayscale_tube, overlay_mask_tube) for tween in tweens]
+
+
+def _check_experimental_render_conditions(data):
+ if data.has_parseq_keyframe_redistribution():
+ msg = "Experimental conditions: Using 'Parseq keyframe redistribution' together with '{method}'. {results}. \
+ In case of problems, consider deactivating either one."
+ dark_or_dist = "Resulting images may quickly end up looking dark or distorted."
+ if data.has_optical_flow_cadence():
+ log_utils.warn(msg.format(method="optical flow cadence", results=dark_or_dist))
+ if data.has_optical_flow_redo():
+ log_utils.warn(msg.format(method="optical flow generation", results=dark_or_dist))
+ if data.is_hybrid_available():
+ log_utils.warn(msg.format(method="hybrid video", results="Render process may not run stable."))
+
+
+def _update_pseudo_cadence(data, value):
+ data.turbo.cadence = value
+ data.parseq_adapter.cadence = value
+ data.parseq_adapter.a1111_cadence = value
+ data.args.anim_args.diffusion_cadence = value
+ data.args.anim_args.cadence_flow_factor_schedule = f"0: ({value})"
+
+
+def _tweens_with_progress(key_step):
+ # only use tween progress bar when extra console output (aka "dev mode") is disabled.
+ if not opt_utils.is_verbose():
+ log_utils.clear_previous_line()
+ return tqdm(key_step.tweens, desc="Tweens progress", file=progress_print_out,
+ disable=cmd_opts.disable_console_progressbars, colour='#FFA468')
+ return key_step.tweens
diff --git a/scripts/deforum_helpers/rendering/img_2_img_tubes.py b/scripts/deforum_helpers/rendering/img_2_img_tubes.py
new file mode 100644
index 000000000..efa24fd43
--- /dev/null
+++ b/scripts/deforum_helpers/rendering/img_2_img_tubes.py
@@ -0,0 +1,123 @@
+from typing import Callable
+
+import cv2
+import numpy as np
+from PIL import ImageOps, Image
+from cv2.typing import MatLike
+
+from .data.frame.key_frame import KeyFrame
+from .data.render_data import RenderData
+from .util import image_utils
+from .util.call.hybrid import call_hybrid_composite
+from .util.fun_utils import tube
+from ..colors import maintain_colors
+from ..hybrid_video import get_flow_from_images, image_transform_optical_flow
+from ..masks import do_overlay_mask
+
+"""
+This module provides functions for conditionally processing images through various transformations.
+The `tube` function allows chaining these transformations together to create flexible image processing pipelines.
+Easily experiment by changing functions, or the order of function calls, in the tube without having to worry
+about the larger context and without having to invent unnecessary names for intermediary processing results.
+
+All functions within the tube take and return an image (`img` argument). They must pass through
+the original image unchanged if a specific transformation is disabled or not required.
+
+Example:
+transformed_image = my_tube(arguments)(original_image)
+"""
+
+# ImageTubes are functions that take a MatLike image and return a newly processed (or the same unchanged) MatLike image.
+ImageTube = Callable[[MatLike], MatLike]
+PilImageTube = Callable[[Image.Image], Image.Image]
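+
+# Usage sketch: a tube is built from the current render data and key frame, then applied to an image, e.g.
+#   processed = frame_transformation_tube(data, key_frame)(original_image)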
+
+
+def frame_transformation_tube(data: RenderData, key_frame: KeyFrame) -> ImageTube:
+ # make sure `img` stays the last argument in each call.
+ return tube(lambda img: key_frame.apply_frame_warp_transform(data, img),
+ lambda img: key_frame.do_hybrid_compositing_before_motion(data, img),
+ lambda img: KeyFrame.apply_hybrid_motion_ransac_transform(data, img),
+ lambda img: KeyFrame.apply_hybrid_motion_optical_flow(data, key_frame, img),
+ lambda img: key_frame.do_normal_hybrid_compositing_after_motion(data, img),
+ lambda img: KeyFrame.apply_color_matching(data, img),
+ lambda img: KeyFrame.transform_to_grayscale_if_active(data, img))
+
+
+def contrast_transformation_tube(data: RenderData, key_frame: KeyFrame) -> ImageTube:
+ return tube(lambda img: key_frame.apply_scaling(img),
+ lambda img: key_frame.apply_anti_blur(data, img))
+
+
+def noise_transformation_tube(data: RenderData, key_frame: KeyFrame) -> ImageTube:
+ return tube(lambda img: key_frame.apply_frame_noising(data, key_frame, img))
+
+
+def optical_flow_redo_tube(data: RenderData, key_frame: KeyFrame, optical_flow) -> ImageTube:
+ return tube(lambda img: image_utils.pil_to_numpy(img),
+ lambda img: image_utils.bgr_to_rgb(img),
+ lambda img: image_transform_optical_flow(
+ img, get_flow_from_images(data.images.previous, img, optical_flow, data.animation_mode.raft_model),
+ key_frame.step_data.redo_flow_factor))
+
+
+# Conditional Tubes (can be switched on or off by providing a Callable[Boolean] `is_do_process` predicate).
+def conditional_hybrid_video_after_generation_tube(key_frame: KeyFrame) -> PilImageTube:
+ data = key_frame.render_data
+ step_data = key_frame.step_data
+ return tube(lambda img: call_hybrid_composite(data, data.indexes.frame.i, img, step_data.hybrid_comp_schedules),
+ lambda img: image_utils.numpy_to_pil(img),
+ is_do_process=lambda: data.indexes.is_not_first_frame() and data.is_hybrid_composite_after_generation())
+
+
+def conditional_extra_color_match_tube(data: RenderData) -> PilImageTube:
+ # color matching on first frame is after generation, color match was collected earlier,
+ # so we do an extra generation to avoid the corruption introduced by the color match of first output
+ return tube(lambda img: maintain_colors(img, data.images.color_match, data.args.anim_args.color_coherence),
+ lambda img: maintain_colors(img, data.images.color_match, data.args.anim_args.color_coherence),
+ lambda img: image_utils.numpy_to_pil(img),
+ is_do_process=lambda: data.indexes.is_first_frame() and data.is_initialize_color_match(
+ data.images.color_match))
+
+
+def conditional_color_match_tube(key_frame: KeyFrame) -> ImageTube:
+ # on strength 0, set color match to generation
+ return tube(lambda img: image_utils.bgr_to_rgb(np.asarray(img)),
+ is_do_process=lambda: key_frame.render_data.is_do_color_match_conversion(key_frame))
+
+
+def conditional_force_to_grayscale_tube(data: RenderData) -> PilImageTube:
+ return tube(lambda img: ImageOps.grayscale(img),
+ lambda img: ImageOps.colorize(img, black="black", white="white"),
+ is_do_process=lambda: data.args.anim_args.color_force_grayscale)
+
+
+def conditional_add_overlay_mask_tube(data: RenderData, is_tween) -> PilImageTube:
+ is_use_overlay = data.args.args.overlay_mask
+ is_use_mask = data.args.anim_args.use_mask_video or data.args.args.use_mask
+ index = data.indexes.tween.i if is_tween else data.indexes.frame.i
+ is_bgr_array = True
+ return tube(lambda img: ImageOps.grayscale(img),
+ lambda img: do_overlay_mask(data.args.args, data.args.anim_args, img, index, is_bgr_array),
+ is_do_process=lambda: is_use_overlay and is_use_mask)
+
+
+def conditional_force_tween_to_grayscale_tube(data: RenderData) -> ImageTube:
+ return tube(lambda img: cv2.cvtColor(img.astype(np.uint8), cv2.COLOR_BGR2GRAY),
+ lambda img: cv2.cvtColor(img, cv2.COLOR_GRAY2BGR),
+ is_do_process=lambda: data.args.anim_args.color_force_grayscale)
+
+
+# Composite Tubes, made from other Tubes.
+def contrasted_noise_transformation_tube(data: RenderData, key_frame: KeyFrame) -> ImageTube:
+ """Combines contrast and noise transformation tubes."""
+ contrast_tube: ImageTube = contrast_transformation_tube(data, key_frame)
+ noise_tube: ImageTube = noise_transformation_tube(data, key_frame)
+ return tube(lambda img: noise_tube(contrast_tube(img)))
+
+
+def conditional_frame_transformation_tube(key_frame: KeyFrame, is_tween: bool = False) -> PilImageTube:
+ hybrid_tube: PilImageTube = conditional_hybrid_video_after_generation_tube(key_frame)
+ extra_tube: PilImageTube = conditional_extra_color_match_tube(key_frame.render_data)
+ gray_tube: PilImageTube = conditional_force_to_grayscale_tube(key_frame.render_data)
+ mask_tube: PilImageTube = conditional_add_overlay_mask_tube(key_frame.render_data, is_tween)
+ return tube(lambda img: mask_tube(gray_tube(extra_tube(hybrid_tube(img)))))
diff --git a/scripts/deforum_helpers/rendering/util/__init__.py b/scripts/deforum_helpers/rendering/util/__init__.py
new file mode 100644
index 000000000..ea37bcc42
--- /dev/null
+++ b/scripts/deforum_helpers/rendering/util/__init__.py
@@ -0,0 +1,2 @@
+# All modules in this package are intended to not hold any state.
+from .utils import call_or_use_on_cond, generate_random_seed, put_all, put_if_present
diff --git a/scripts/deforum_helpers/rendering/util/call/__init__.py b/scripts/deforum_helpers/rendering/util/call/__init__.py
new file mode 100644
index 000000000..0e1fc9734
--- /dev/null
+++ b/scripts/deforum_helpers/rendering/util/call/__init__.py
@@ -0,0 +1,24 @@
+"""
+This module provides utility functions for simplifying calls to other modules within the `experimental_core.py` module.
+
+**Purpose:**
+- **Reduce Argument Complexity:** Provides a way to call functions in other modules without directly handling
+ a large number of complex arguments. This simplifies code within the core by encapsulating argument management.
+- **Minimize Namespace Pollution:** Provides an alternative to overloading methods in the original modules,
+ which would introduce the `RenderInit` class into namespaces where it's not inherently needed.
+
+**Structure:**
+- **Simple Call Forwarding:** Functions in this module primarily act as wrappers. They perform minimal logic,
+ often just formatting or passing arguments, and directly call the corresponding method.
+- **Naming Convention:**
+ - Function names begin with "call_", followed by the name of the actual method to call.
+ - The `data` object is always passed as the first argument.
+  - Frame indices (e.g. `frame_idx`, `tween_frame_idx`) are passed as the second argument "i", when relevant.
+
+**Example:**
+```python
+# Example function in this module
+def call_some_function(data, i, ...):
+ return some_module.some_function(data.arg77, data.arg.arg.whatever, i, ...)
+```
+"""
diff --git a/scripts/deforum_helpers/rendering/util/call/anim.py b/scripts/deforum_helpers/rendering/util/call/anim.py
new file mode 100644
index 000000000..9452ea8d7
--- /dev/null
+++ b/scripts/deforum_helpers/rendering/util/call/anim.py
@@ -0,0 +1,7 @@
+from ....animation import anim_frame_warp
+
+
+def call_anim_frame_warp(data, i, image, depth):
+ ia = data.args
+ return anim_frame_warp(image, ia.args, ia.anim_args, data.animation_keys.deform_keys, i, data.depth_model,
+ depth=depth, device=ia.root.device, half_precision=ia.root.half_precision)
diff --git a/scripts/deforum_helpers/rendering/util/call/gen.py b/scripts/deforum_helpers/rendering/util/call/gen.py
new file mode 100644
index 000000000..049a6ac6a
--- /dev/null
+++ b/scripts/deforum_helpers/rendering/util/call/gen.py
@@ -0,0 +1,7 @@
+from ....generate import generate
+
+
+def call_generate(data, step):
+ ia = data.args
+ return generate(ia.args, data.animation_keys.deform_keys, ia.anim_args, ia.loop_args, ia.controlnet_args,
+ ia.root, data.parseq_adapter, data.indexes.frame.i, sampler_name=step.schedule.sampler_name)
diff --git a/scripts/deforum_helpers/rendering/util/call/hybrid.py b/scripts/deforum_helpers/rendering/util/call/hybrid.py
new file mode 100644
index 000000000..73fbb6cce
--- /dev/null
+++ b/scripts/deforum_helpers/rendering/util/call/hybrid.py
@@ -0,0 +1,55 @@
+from ....hybrid_video import (
+ # Functions related to flow calculation
+ get_flow_for_hybrid_motion,
+ get_flow_for_hybrid_motion_prev,
+
+ # Functions related to matrix calculation
+ get_matrix_for_hybrid_motion,
+ get_matrix_for_hybrid_motion_prev,
+
+ # Other hybrid functions
+ hybrid_composite)
+
+
+def call_get_flow_for_hybrid_motion_prev(init, i, image):
+ mode = init.animation_mode
+ aa = init.args.anim_args
+ return get_flow_for_hybrid_motion_prev(
+ i, init.dimensions(),
+ mode.hybrid_input_files,
+ mode.hybrid_frame_path,
+ mode.prev_flow,
+ image,
+ aa.hybrid_flow_method,
+ mode.raft_model,
+ aa.hybrid_flow_consistency,
+ aa.hybrid_consistency_blur,
+ aa.hybrid_comp_save_extra_frames)
+
+
+def call_get_flow_for_hybrid_motion(init, i):
+ mode = init.animation_mode
+ args = init.args.anim_args
+ return get_flow_for_hybrid_motion(
+ i, init.dimensions(), mode.hybrid_input_files, mode.hybrid_frame_path,
+ mode.prev_flow, args.hybrid_flow_method, mode.raft_model,
+        args.hybrid_flow_consistency, args.hybrid_consistency_blur, args.hybrid_comp_save_extra_frames)
+
+
+def call_get_matrix_for_hybrid_motion_prev(init, i, image):
+ return get_matrix_for_hybrid_motion_prev(
+ i, init.dimensions(), init.animation_mode.hybrid_input_files,
+ image, init.args.anim_args.hybrid_motion)
+
+
+def call_get_matrix_for_hybrid_motion(init, i):
+ return get_matrix_for_hybrid_motion(
+ i, init.dimensions(), init.animation_mode.hybrid_input_files,
+ init.args.anim_args.hybrid_motion)
+
+
+def call_hybrid_composite(init, i, image, hybrid_comp_schedules):
+ ia = init.args
+ return hybrid_composite(
+ ia.args, ia.anim_args, i, image,
+ init.depth_model, hybrid_comp_schedules, init.args.root)
diff --git a/scripts/deforum_helpers/rendering/util/call/images.py b/scripts/deforum_helpers/rendering/util/call/images.py
new file mode 100644
index 000000000..4b3f225c1
--- /dev/null
+++ b/scripts/deforum_helpers/rendering/util/call/images.py
@@ -0,0 +1,22 @@
+from ....load_images import get_mask_from_file
+from ....noise import add_noise
+from ....video_audio_utilities import get_next_frame
+
+
+def call_add_noise(init, step, image):
+ aa = init.args.anim_args
+ amount: float = step.step_data.noise
+ seed: int = init.args.args.seed
+ n_type: str = aa.noise_type
+ perlin_arguments = (aa.perlin_w, aa.perlin_h, aa.perlin_octaves, aa.perlin_persistence)
+ mask = init.args.root.noise_mask
+    is_do_mask_invert = init.args.args.invert_mask
+    return add_noise(image, amount, seed, n_type, perlin_arguments, mask, is_do_mask_invert)
+
+
+def call_get_mask_from_file(init, i, is_mask: bool = False):
+ next_frame = get_next_frame(init.output_directory, init.args.anim_args.video_mask_path, i, is_mask)
+ return get_mask_from_file(next_frame, init.args.args)
+
+
+def call_get_mask_from_file_with_frame(init, frame):
+ return get_mask_from_file(frame, init.args.args)
diff --git a/scripts/deforum_helpers/rendering/util/call/mask.py b/scripts/deforum_helpers/rendering/util/call/mask.py
new file mode 100644
index 000000000..856fcb964
--- /dev/null
+++ b/scripts/deforum_helpers/rendering/util/call/mask.py
@@ -0,0 +1,14 @@
+import cv2
+from PIL import Image
+
+from ....composable_masks import compose_mask_with_check
+from ....image_sharpening import unsharp_mask
+
+
+def call_compose_mask_with_check(init, mask_seq, val_masks, image):
+ return compose_mask_with_check(init.args.root, init.args.args, mask_seq, val_masks,
+ Image.fromarray(cv2.cvtColor(image, cv2.COLOR_BGR2RGB)))
+
+
+def call_unsharp_mask(init, step, image, mask):
+ kernel_size = (step.step_data.kernel, step.step_data.kernel)
+ mask_image = mask.image if init.args.args.use_mask else None
+ return unsharp_mask(image, kernel_size, step.step_data.sigma, step.step_data.amount,
+ step.step_data.threshold, mask_image)
diff --git a/scripts/deforum_helpers/rendering/util/call/resume.py b/scripts/deforum_helpers/rendering/util/call/resume.py
new file mode 100644
index 000000000..c9d6e3115
--- /dev/null
+++ b/scripts/deforum_helpers/rendering/util/call/resume.py
@@ -0,0 +1,7 @@
+from ....resume import get_resume_vars
+
+
+def call_get_resume_vars(data, turbo):
+ return get_resume_vars(folder=data.args.args.outdir,
+ timestring=data.args.anim_args.resume_timestring,
+ cadence=turbo.cadence)
diff --git a/scripts/deforum_helpers/rendering/util/call/subtitle.py b/scripts/deforum_helpers/rendering/util/call/subtitle.py
new file mode 100644
index 000000000..a694bbdfc
--- /dev/null
+++ b/scripts/deforum_helpers/rendering/util/call/subtitle.py
@@ -0,0 +1,11 @@
+from ....subtitle_handler import format_animation_params, write_frame_subtitle
+
+
+def call_format_animation_params(render_data, i, params_to_print):
+ return format_animation_params(render_data.animation_keys.deform_keys,
+ render_data.prompt_series, i, params_to_print)
+
+
+def call_write_frame_subtitle(render_data, i, params_string, is_cadence: bool = False) -> None:
+ text = f"F#: {i}; Cadence: {is_cadence}; Seed: {render_data.args.args.seed}; {params_string}"
+ write_frame_subtitle(render_data.srt.filename, i, render_data.srt.frame_duration, text)
diff --git a/scripts/deforum_helpers/rendering/util/call/video_and_audio.py b/scripts/deforum_helpers/rendering/util/call/video_and_audio.py
new file mode 100644
index 000000000..6a8a6812f
--- /dev/null
+++ b/scripts/deforum_helpers/rendering/util/call/video_and_audio.py
@@ -0,0 +1,11 @@
+from ....video_audio_utilities import get_next_frame, render_preview
+
+
+def call_render_preview(init, last_preview_frame):
+ ia = init.args
+ i = init.indexes.frame.i
+ return render_preview(ia.args, ia.anim_args, ia.video_args, ia.root, i, last_preview_frame)
+
+
+def call_get_next_frame(init, i, video_path, is_mask: bool = False):
+ return get_next_frame(init.output_directory, video_path, i, is_mask)
diff --git a/scripts/deforum_helpers/rendering/util/depth_utils.py b/scripts/deforum_helpers/rendering/util/depth_utils.py
new file mode 100644
index 000000000..2a5c97fa1
--- /dev/null
+++ b/scripts/deforum_helpers/rendering/util/depth_utils.py
@@ -0,0 +1,32 @@
+import os
+
+from . import filename_utils, memory_utils
+from ...depth import DepthModel
+
+
+def generate_and_save_depth_map_if_active(data, opencv_image):
+ if data.args.anim_args.save_depth_maps:
+ memory_utils.handle_vram_before_depth_map_generation(data)
+ depth = data.depth_model.predict(opencv_image, data.args.anim_args.midas_weight,
+ data.args.root.half_precision)
+ depth_filename = filename_utils.depth_frame(data, data.indexes)
+ data.depth_model.save(os.path.join(data.output_directory, depth_filename), depth)
+ memory_utils.handle_vram_after_depth_map_generation(data)
+ return depth
+
+
+def is_composite_with_depth_mask(anim_args):
+ return anim_args.hybrid_composite != 'None' and anim_args.hybrid_comp_mask_type == 'Depth'
+
+
+def create_depth_model_and_enable_depth_map_saving_if_active(anim_mode, root, anim_args, args):
+ # depth-based hybrid composite mask requires saved depth maps
+ anim_args.save_depth_maps = anim_mode.is_predicting_depths and is_composite_with_depth_mask(anim_args)
+ return DepthModel(root.models_path,
+ memory_utils.select_depth_device(root),
+ root.half_precision,
+ keep_in_vram=anim_mode.is_keep_in_vram,
+ depth_algorithm=anim_args.depth_algorithm,
+ Width=args.W, Height=args.H,
+ midas_weight=anim_args.midas_weight) \
+ if anim_mode.is_predicting_depths else None
diff --git a/scripts/deforum_helpers/rendering/util/filename_utils.py b/scripts/deforum_helpers/rendering/util/filename_utils.py
new file mode 100644
index 000000000..5a5fc7847
--- /dev/null
+++ b/scripts/deforum_helpers/rendering/util/filename_utils.py
@@ -0,0 +1,49 @@
+from enum import Enum
+from pathlib import Path
+
+from ..data import Indexes
+from ...video_audio_utilities import get_frame_name
+
+
+class FileFormat(Enum):
+ JPG = "jpg"
+ PNG = "png"
+
+ @staticmethod
+ def frame_format():
+ return FileFormat.PNG
+
+ @staticmethod
+ def video_frame_format():
+ return FileFormat.JPG
+
+
+def _frame_filename_index(i: int, file_format: FileFormat) -> str:
+ return f"{i:09}.{file_format.value}"
+
+
+def frame_filename(data, i: int, is_depth=False, file_format=FileFormat.frame_format()) -> str:
+ infix = "_depth_" if is_depth else "_"
+ return f"{data.args.root.timestring}{infix}{_frame_filename_index(i, file_format)}"
+
+
+def frame(data, indexes: Indexes) -> str:
+ return frame_filename(data, indexes.frame.i)
+
+
+def depth_frame(data, indexes: Indexes) -> str:
+ return frame_filename(data, indexes.frame.i, True)
+
+
+def tween_frame_name(data, indexes: Indexes) -> str:
+ return frame_filename(data, indexes.tween.i)
+
+
+def tween_depth_frame(data, indexes: Indexes) -> str:
+ return frame_filename(data, indexes.tween.i, True)
+
+
+def preview_video_image_path(data, indexes: Indexes) -> Path:
+ frame_name = get_frame_name(data.args.anim_args.video_init_path)
+ index = _frame_filename_index(indexes.frame.i, FileFormat.video_frame_format())
+ return Path(data.output_directory) / "inputframes" / (frame_name + index)
diff --git a/scripts/deforum_helpers/rendering/util/fun_utils.py b/scripts/deforum_helpers/rendering/util/fun_utils.py
new file mode 100644
index 000000000..48899341d
--- /dev/null
+++ b/scripts/deforum_helpers/rendering/util/fun_utils.py
@@ -0,0 +1,21 @@
+import collections.abc
+from functools import reduce
+from itertools import chain
+from typing import Callable, TypeVar
+
+
+def flat_map(func, iterable):
+    """Applies a function to each element in an iterable and flattens the results."""
+    mapped = list(map(func, iterable))  # materialize, so the check below doesn't exhaust the iterator
+    if any(isinstance(item, collections.abc.Iterable) for item in mapped):
+        return chain.from_iterable(mapped)
+    else:
+        return iter(mapped)
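+
+# Minimal behaviour sketch:
+#   list(flat_map(lambda x: [x, x], [1, 2]))  -> [1, 1, 2, 2]   (nested iterables are flattened)
+#   list(flat_map(lambda x: x * 10, [1, 2]))  -> [10, 20]       (plain values pass through)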
+
+
+T = TypeVar('T')
+
+
+def tube(*funcs: Callable[[T], T], is_do_process=lambda: True) -> Callable[[T], T]:
+ """Tubes a value through a sequence of functions with a predicate `is_do_process` for skipping."""
+ return lambda value: reduce(lambda x, f: f(x) if is_do_process() else x, funcs, value)
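+
+
+# Minimal usage sketch of `tube` (hypothetical functions, not taken from the render core):
+#   double_then_inc = tube(lambda x: x * 2, lambda x: x + 1)
+#   double_then_inc(3)  # -> 7
+#   disabled = tube(lambda x: x * 2, is_do_process=lambda: False)
+#   disabled(3)  # -> 3 (the predicate is False, so the value passes through unchanged)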
diff --git a/scripts/deforum_helpers/rendering/util/image_utils.py b/scripts/deforum_helpers/rendering/util/image_utils.py
new file mode 100644
index 000000000..2dc9183b0
--- /dev/null
+++ b/scripts/deforum_helpers/rendering/util/image_utils.py
@@ -0,0 +1,45 @@
+import os
+
+import PIL
+import cv2
+import numpy as np
+from PIL import Image
+from cv2.typing import MatLike
+
+from . import filename_utils
+from ..data.render_data import RenderData
+
+
+def bgr_to_rgb(bgr_img):
+ return cv2.cvtColor(bgr_img, cv2.COLOR_BGR2RGB)
+
+
+def numpy_to_pil(np_image: MatLike) -> Image.Image:
+ return Image.fromarray(bgr_to_rgb(np_image))
+
+
+def pil_to_numpy(pil_image: Image.Image) -> MatLike:
+ return np.array(pil_image)
+
+
+def save_cadence_frame(data: RenderData, i: int, image: MatLike, is_overwrite: bool = True):
+ filename = filename_utils.frame_filename(data, i)
+ save_path: str = os.path.join(data.args.args.outdir, filename)
+ if is_overwrite or not os.path.exists(save_path):
+ cv2.imwrite(save_path, image)
+
+
+def save_cadence_frame_and_depth_map_if_active(data: RenderData, frame, i, image):
+ save_cadence_frame(data, i, image)
+ if data.args.anim_args.save_depth_maps:
+ dm_save_path = os.path.join(data.output_directory, filename_utils.frame_filename(data, i, True))
+ data.depth_model.save(dm_save_path, frame.depth)
+
+
+def save_and_return_frame(data: RenderData, frame, i, image):
+ save_cadence_frame_and_depth_map_if_active(data, frame, i, image)
+ return image
+
+
+def is_PIL(image):
+ return type(image) is PIL.Image.Image
diff --git a/scripts/deforum_helpers/rendering/util/log_utils.py b/scripts/deforum_helpers/rendering/util/log_utils.py
new file mode 100644
index 000000000..a3c488919
--- /dev/null
+++ b/scripts/deforum_helpers/rendering/util/log_utils.py
@@ -0,0 +1,105 @@
+from . import opt_utils
+
+ESC = "\033[" # ANSI escape character, same as "\x1b["
+TERM = "m" # ANSI terminator
+
+EIGHT_BIT = "38;5;"
+TEXT = "38;2;"
+BACKGROUND = "48;2;"
+
+COLOUR_RGB = f"{ESC}{TEXT}%d;%d;%d{TERM}"
+BG_COLOUR_RGB = f"{ESC}{BACKGROUND}%d;%d;%d{TERM}"
+RESET_COLOR = f"{ESC}0{TERM}"
+
+RED = f"{ESC}31{TERM}"
+ORANGE = f"{ESC}{EIGHT_BIT}208{TERM}"
+YELLOW = f"{ESC}33{TERM}"
+GREEN = f"{ESC}32{TERM}"
+CYAN = f"{ESC}36{TERM}"
+BLUE = f"{ESC}34{TERM}"
+INDIGO = f"{ESC}{EIGHT_BIT}66{TERM}"
+VIOLET = f"{ESC}{EIGHT_BIT}130{TERM}"
+BLACK = f"{ESC}30{TERM}"
+WHITE = f"{ESC}37{TERM}"
+
+BOLD = f"{ESC}1{TERM}"
+UNDERLINE = f"{ESC}4{TERM}"
+
+
+def clear_previous_line():
+ print(f"{ESC}F{ESC}K", end="") # "F" is cursor up, "K" is clear line.
+
+
+def print_tween_frame_from_to_info(key_step, is_disabled=True):
+ if not is_disabled: # replaced with prog bar, but value info print may be useful
+ tween_values = key_step.tween_values
+ start_i = key_step.tweens[0].i()
+ end_i = key_step.tweens[-1].i()
+ if end_i > 0:
+ formatted_values = [f"{val:.2f}" for val in tween_values]
+ count = end_i - start_i + 1
+ print(f"{ORANGE}Creating in-between: {RESET_COLOR}{count} frames ({start_i}-->{end_i}){formatted_values}")
+
+
+def print_animation_frame_info(i, max_frames):
+ print("")
+ print(f"{CYAN}Animation frame: {RESET_COLOR}{i}/{max_frames}")
+
+
+def print_tween_frame_info(data, indexes, cadence_flow, tween, is_disabled=True):
+ if not is_disabled: # disabled because it's spamming the cli on high cadence settings.
+ msg_flow_name = '' if cadence_flow is None else data.args.anim_args.optical_flow_cadence + ' optical flow '
+ msg_frame_info = f"cadence frame: {indexes.tween.i}; tween: {tween:0.2f};"
+ print(f"Creating in-between {msg_flow_name}{msg_frame_info}")
+
+
+def print_init_frame_info(init_frame):
+ print(f"Using video init frame {init_frame}")
+
+
+def print_optical_flow_info(data, optical_flow_redo_generation):
+ msg_start = "Optical flow redo is diffusing and warping using"
+ msg_end = "optical flow before generation."
+ print(f"{msg_start} {optical_flow_redo_generation} and seed {data.args.args.seed} {msg_end}")
+
+
+def print_redo_generation_info(data, n):
+ print(f"Redo generation {n + 1} of {int(data.args.anim_args.diffusion_redo)} before final generation")
+
+
+def print_tween_step_creation_info(key_steps, index_dist):
+ tween_count = sum(len(ks.tweens) for ks in key_steps)
+ msg_start = f"Created {len(key_steps)} key frames with {tween_count} tweens."
+ msg_end = f"Key frame index distribution: '{index_dist.name}'."
+ info(f"{msg_start} {msg_end}")
+
+
+def print_key_step_debug_info_if_verbose(key_steps):
+ for i, ks in enumerate(key_steps):
+ tween_indices = [t.i() for t in ks.tweens]
+ debug(f"Key frame {ks.i} has {len(tween_indices)} tweens: {tween_indices}")
+
+
+def print_warning_generate_returned_no_image():
+ print(f"{YELLOW}Warning: {RESET_COLOR}Generate returned no image. Skipping to next iteration.")
+
+
+def print_cuda_memory_state(cuda):
+ for i in range(cuda.device_count()):
+ print(f"CUDA memory allocated on device {i}: {cuda.memory_allocated(i)} of {cuda.max_memory_allocated(i)}")
+ print(f"CUDA memory reserved on device {i}: {cuda.memory_reserved(i)} of {cuda.max_memory_reserved(i)}")
+
+
+def info(s: str):
+ print(f"Info: {s}")
+
+
+def warn(s: str):
+ eye_catcher = "###"
+ print(f"{ORANGE}{BOLD}{eye_catcher} Warning: {RESET_COLOR}{s}")
+
+
+def debug(s: str):
+ if opt_utils.is_verbose():
+ eye_catcher = "###"
+ print(f"{YELLOW}{BOLD}{eye_catcher} Debug: {RESET_COLOR}{s}")
diff --git a/scripts/deforum_helpers/rendering/util/memory_utils.py b/scripts/deforum_helpers/rendering/util/memory_utils.py
new file mode 100644
index 000000000..5daf09535
--- /dev/null
+++ b/scripts/deforum_helpers/rendering/util/memory_utils.py
@@ -0,0 +1,47 @@
+# noinspection PyUnresolvedReferences
+from modules import lowvram, devices, sd_hijack
+# noinspection PyUnresolvedReferences
+from modules.shared import cmd_opts, sd_model
+
+
+def is_low_or_med_vram():
+ return cmd_opts.lowvram or cmd_opts.medvram # cmd_opts are imported from elsewhere. keep readonly
+
+
+def handle_med_or_low_vram_before_step(data):
+ if data.is_3d_with_med_or_low_vram():
+ # Unload the main checkpoint and load the depth model
+ lowvram.send_everything_to_cpu()
+ sd_hijack.model_hijack.undo_hijack(sd_model)
+ devices.torch_gc()
+ if data.animation_mode.is_predicting_depths:
+ data.depth_model.to(data.args.root.device)
+
+
+def handle_vram_if_depth_is_predicted(data):
+ if data.animation_mode.is_predicting_depths:
+ if data.is_3d_with_med_or_low_vram():
+ data.depth_model.to('cpu')
+ devices.torch_gc()
+ lowvram.setup_for_low_vram(sd_model, cmd_opts.medvram)
+ sd_hijack.model_hijack.hijack(sd_model)
+
+
+def handle_vram_before_depth_map_generation(data):
+ if is_low_or_med_vram():
+ lowvram.send_everything_to_cpu()
+ sd_hijack.model_hijack.undo_hijack(sd_model)
+ devices.torch_gc()
+ data.depth_model.to(data.args.root.device)
+
+
+def handle_vram_after_depth_map_generation(data):
+ if is_low_or_med_vram():
+ data.depth_model.to('cpu')
+ devices.torch_gc()
+ lowvram.setup_for_low_vram(sd_model, cmd_opts.medvram)
+ sd_hijack.model_hijack.hijack(sd_model)
+
+
+def select_depth_device(root):
+ return 'cpu' if is_low_or_med_vram() else root.device
diff --git a/scripts/deforum_helpers/rendering/util/opt_utils.py b/scripts/deforum_helpers/rendering/util/opt_utils.py
new file mode 100644
index 000000000..995a0ba1f
--- /dev/null
+++ b/scripts/deforum_helpers/rendering/util/opt_utils.py
@@ -0,0 +1,39 @@
+from .utils import put_if_present
+
+# noinspection PyUnresolvedReferences
+from modules.shared import opts
+
+
+def is_subtitle_generation_active():
+ return opts.data.get("deforum_save_gen_info_as_srt", False)
+
+
+def is_verbose():
+ """Checks if extra console output is enabled in deforum settings."""
+ return opts.data.get("deforum_debug_mode_enabled", False)
+
+
+def has_img2img_fix_steps():
+ return 'img2img_fix_steps' in opts.data and opts.data["img2img_fix_steps"]
+
+
+def keep_3d_models_in_vram():
+ return opts.data.get("deforum_keep_3d_models_in_vram", False)
+
+
+def setup(schedule):
+ if has_img2img_fix_steps():
+ # disable "with img2img do exactly x steps" from general setting, as it *ruins* deforum animations
+ opts.data["img2img_fix_steps"] = False
+ put_if_present(opts.data, "CLIP_stop_at_last_layers", schedule.clipskip)
+ put_if_present(opts.data, "initial_noise_multiplier", schedule.noise_multiplier)
+ put_if_present(opts.data, "eta_ddim", schedule.eta_ddim)
+ put_if_present(opts.data, "eta_ancestral", schedule.eta_ancestral)
+
+
+def generation_info_for_subtitles():
+ return opts.data.get("deforum_save_gen_info_as_srt_params", ['Seed'])
+
+
+def is_generate_subtitles():
+ return opts.data.get("deforum_save_gen_info_as_srt")
diff --git a/scripts/deforum_helpers/rendering/util/utils.py b/scripts/deforum_helpers/rendering/util/utils.py
new file mode 100644
index 000000000..80fa1be59
--- /dev/null
+++ b/scripts/deforum_helpers/rendering/util/utils.py
@@ -0,0 +1,28 @@
+import random
+
+from PIL import Image
+
+
+def put_all(dictionaries, key, value):
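+    # For example: put_all([{"a": 1}, {"b": 2}], "k", 0) -> [{"a": 1, "k": 0}, {"b": 2, "k": 0}].
+    # Returns new dicts; the inputs are not mutated.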
+ return list(map(lambda d: {**d, key: value}, dictionaries))
+
+
+def put_if_present(dictionary, key, value):
+ if value is not None:
+ dictionary[key] = value
+
+
+def _call_or_use(callable_or_value):
+ return callable_or_value() if callable(callable_or_value) else callable_or_value
+
+
+def call_or_use_on_cond(condition, callable_or_value):
+ return _call_or_use(callable_or_value) if condition else None
+
+
+def create_img(dimensions):
+ return Image.new('1', dimensions, 1)
+
+
+def generate_random_seed():
+ return random.randint(0, 2 ** 32 - 1)
diff --git a/scripts/deforum_helpers/rendering/util/web_ui_utils.py b/scripts/deforum_helpers/rendering/util/web_ui_utils.py
new file mode 100644
index 000000000..882b0d612
--- /dev/null
+++ b/scripts/deforum_helpers/rendering/util/web_ui_utils.py
@@ -0,0 +1,33 @@
+import time
+
+# noinspection PyUnresolvedReferences
+from deforum_api import JobStatusTracker
+# noinspection PyUnresolvedReferences
+from modules.shared import state
+
+WEB_UI_SLEEP_DELAY = 0.1
+
+
+def init_job(data):
+ state.job_count = data.args.anim_args.max_frames
+
+
+def update_job(data):
+ frame = data.indexes.frame.i + 1
+ max_frames = data.args.anim_args.max_frames
+ state.job = f"frame {frame}/{max_frames}"
+    state.job_no = frame  # 'frame' is already 1-based (frame index + 1)
+ if state.skipped:
+ print("\n** PAUSED **")
+ state.skipped = False
+ while not state.skipped:
+ time.sleep(WEB_UI_SLEEP_DELAY)
+ print("** RESUMING **")
+
+
+def update_status_tracker(data):
+ progress = data.indexes.frame.i / data.args.anim_args.max_frames
+ JobStatusTracker().update_phase(data.args.root.job_id, phase="GENERATING", progress=progress)
+
+
+def update_progress_during_cadence(data, indexes):
+ state.job = f"frame {indexes.tween.i + 1}/{data.args.anim_args.max_frames}"
+ state.job_no = indexes.tween.i + 1
diff --git a/scripts/deforum_helpers/ui_elements.py b/scripts/deforum_helpers/ui_elements.py
index f9467e7c5..11aa0705e 100644
--- a/scripts/deforum_helpers/ui_elements.py
+++ b/scripts/deforum_helpers/ui_elements.py
@@ -386,6 +386,10 @@ def get_tab_init(d, da, dp):
parseq_non_schedule_overrides = create_gr_elem(dp.parseq_non_schedule_overrides)
with FormRow():
parseq_use_deltas = create_gr_elem(dp.parseq_use_deltas)
+ gr.HTML(value=f"""
""")
+ with FormRow():
+ parseq_key_frame_redistribution = create_gr_elem(dp.parseq_key_frame_redistribution)
+ create_keyframe_redistribution_info()
return {k: v for k, v in {**locals(), **vars()}.items()}
def get_tab_hybrid(da):
@@ -579,4 +583,32 @@ def get_tab_output(da, dv):
ffmpeg_stitch_imgs_but = gr.Button(value="*Stitch frames to video*")
ffmpeg_stitch_imgs_but.click(fn=direct_stitch_vid_from_frames, inputs=[image_path, fps, add_soundtrack, soundtrack_path])
- return {k: v for k, v in {**locals(), **vars()}.items()}
\ No newline at end of file
+ return {k: v for k, v in {**locals(), **vars()}.items()}
+
+
+def create_keyframe_redistribution_info():
+ bars_mark = "📊"
+ warn_mark = "⚠️"
+ gr.HTML(value=f"""
+ Parseq keyframe redistribution ensures that every frame in the Parseq table is diffused.
+ It may easily be used at high FPS with just a fixed value for 'strength' in Parseq \
+ (e.g. '0.33' for all frames with no logic to detect dips).
+ Since keyframe redistribution allows for Parseq synchronization at high or no cadence, \
+ the generation can be performed much faster compared to a traditional low cadence setup.
+ Resulting videos tend to be less jittery at high or no cadence, \
+ but may introduce 'depth smear' when combined with fast movement.
+ Optical Flow related settings may not behave as expected and are recommended to be turned off \
+ when keyframe redistribution is used (see tab "Keyframes", sub-tab "Coherence").