Nonlinear Polyrhythm: A Multiverse Fork and Merge Logic
############################################################
## 0. LEXICON AND INTENT
############################################################
# Goal:
# Simulate multiple musical universes that fork over time, each running
# independent polyrhythmic timelines with nonlinear tempo fields, then
# reconcile them by a merge that respects causality, rhythmic consonance,
# and information integrity.
# Convention:
# Time is tracked in both physical ticks and beat space.
# Each universe carries a causal DAG of events and vector clocks.
# Merge produces a canonical score tape with embedded lineage.
############################################################
## 1. CORE TYPES
############################################################
# Identifier aliases: every ID kind below is declared as a UID.
TYPE UID = 128-bit identifier
TYPE UniverseID = UID
TYPE TrackID = UID
TYPE EventID = UID
TYPE PathID = UID # lineage of forks
# Scalar and container aliases used by clocks, tracks and events.
TYPE Rational = {num: int, den: int} # reduced positive rational
TYPE VecClock = map<TrackID, int> # vector clock, one counter per track
TYPE Phase = real in [0, 1) # phase modulo 1
TYPE BeatIndex = real # fractional beats
TYPE Tick = integer # discrete scheduler tick
TYPE Timestamp = real # seconds
# Logical clock carried by every track and copied onto every event.
STRUCT Clock:
lamport: int # incremented by one on each advance_clock call
vector: VecClock # a track's own entry is bumped in fork_universe
beat: BeatIndex # advance_clock adds its tbeats argument here
tick: Tick # initialised to 0 by init_universe
# One emitted or merged event; also the value type stored in DAG.nodes.
STRUCT Event:
id: EventID
track: TrackID # track that produced the event
world: UniverseID # owning universe; overwritten by insert_merged_event
path: PathID # fork path of the owning universe
t_physical: Timestamp # Universe.now at emission time
t_beat: BeatIndex # track clock beat read before the clock is advanced
payload: any # note-on, marker, control, meta
clock: Clock # copy of the track clock taken after it was advanced
ancestry: set<EventID> # immediate parents in DAG
# Directed link between two events; insert_merged_event adds one per known parent (src = parent, dst = child).
STRUCT Edge:
src: EventID
dst: EventID
# Event graph of a universe: events indexed by id plus the set of parent -> child edges.
STRUCT DAG:
nodes: map<EventID, Event>
edges: set<Edge>
# Mutable scheduling state of one track inside a universe.
STRUCT TrackState:
id: TrackID
tempo_ratio: Rational # beats of this track per base beat
phase: Phase # local phase in [0,1)
clock: Clock
queue: priority_queue<Event> # time-ordered by t_beat then lamport
accumulator: real # sub-beat integration; tick emits an event each time it reaches 1.0
jitter_seed: int # init_universe sets this to hash(track id) mod 65521
# Pair of callbacks that bend a universe's tempo over time.
STRUCT NonlinearField:
# Nonlinear tempo modulation field theta(t, beat, track).
# theta returns a scalar multiplier applied to tempo_ratio (tick multiplies it into the local beats-per-second).
fn_theta: function(Timestamp, BeatIndex, TrackID) -> real
# optional chaotic map for microtiming drift; tick adds its result to the per-tick beat advance
fn_microslip: function(real) -> real
# One timeline: its tracks, event DAG, tempo field, merge policy and scheduler settings.
STRUCT Universe:
id: UniverseID
parent: UniverseID | null # null from init_universe; set to the source id by fork_universe
path: PathID
tracks: map<TrackID, TrackState>
dag: DAG
field: NonlinearField
policy: MergePolicy
now: Timestamp # advanced by 1 / tick_hz on every tick
base_bpm: real # reference tempo in beats per minute
tick_hz: real # scheduler frequency
halted: bool # when true, tick returns without doing anything
# Settings consulted while two universes are merged.
STRUCT MergePolicy:
window_beats: real # size of rhythmic reconciliation window
prefer_older_lamport: bool # select_policy combines the two inputs with AND
tie_breaker: function(Event, Event) -> Event
crdt: function(Event, Event) -> Event # associative, commutative, idempotent
consonance_metric: function(Event, Event) -> real # lower is better
lineage_keep: bool # when true, insert_merged_event annotates merged events with their origin and ancestry
############################################################
## 2. UTILITY FUNCTIONS
############################################################
FUNCTION gcd(a: int, b: int) -> int:
    # Greatest common divisor by Euclid's algorithm.
    # Each round replaces (a, b) with (b, a mod b); the loop stops when the
    # remainder is zero. The result is returned as a non-negative value.
    WHILE b != 0:
        remainder := a mod b
        a := b
        b := remainder
    RETURN abs(a)
FUNCTION lcm(a: int, b: int) -> int:
    # Least common multiple of a and b, returned as a non-negative integer.
    # lcm with a zero operand is 0; answering that up front avoids the
    # 0 / gcd(0, 0) = 0 / 0 division the general formula would perform.
    if a == 0 OR b == 0:
        RETURN 0
    # Divide first: a / gcd(a, b) is exact, and the intermediate value stays
    # smaller than the full product a * b.
    RETURN abs((a / gcd(a, b)) * b)
FUNCTION reduce(r: Rational) -> Rational:
    # Returns r in lowest terms with a positive denominator.
    # Raises when the denominator is zero, since no valid Rational exists then.
    if r.den == 0:
        raise "Rational with zero denominator"
    g := gcd(r.num, r.den)      # g > 0 because r.den != 0
    # Dividing by a negative g flips both signs, so the sign ends up on the
    # numerator and the denominator is always positive.
    if r.den < 0:
        g := -g
    RETURN {num: r.num / g, den: r.den / g}
FUNCTION lcm_of_tempi(R: list<Rational>) -> int:
    # Folds the denominators of every ratio in R into their common multiple.
    # Starts from 1, so an empty list yields 1.
    grid := 1
    FOR ratio IN R:
        grid := lcm(grid, ratio.den)
    RETURN grid
FUNCTION advance_clock(c: Clock, tbeats: real) -> Clock:
    # Records one logical step on c: bumps the Lamport counter by one and
    # moves the beat position forward by tbeats. The fields are written on c
    # itself, and that same c is what gets returned.
    c.lamport := c.lamport + 1
    c.beat := c.beat + tbeats
    RETURN c
FUNCTION next_phase(phase: Phase, incr: real) -> Phase:
    # Adds incr to phase and wraps the sum back into [0, 1).
    x := phase + incr
    wrapped := x - floor(x)
    # For a tiny negative x, floor(x) is -1 and x + 1 rounds to exactly 1.0 in
    # floating point, which is outside the Phase range; fold that case to 0.
    if wrapped >= 1.0:
        RETURN 0.0
    RETURN wrapped
FUNCTION rational_mul(r: Rational, scalar: real) -> real:
    # Returns the real value of r multiplied by scalar.
    # Raises when r has a zero denominator.
    if r.den == 0:
        raise "Rational with zero denominator"
    # num and den are ints; convert before dividing so the ratio is not
    # truncated the way the integer divisions in reduce and lcm are.
    RETURN (real(r.num) / real(r.den)) * scalar
FUNCTION logistic(x: real, r: real) -> real:
    # One step of the logistic map r * x * (1 - x), used as the chaotic
    # microtiming source. The result is clamped to the closed range [0, 1].
    RETURN clamp(r * x * (1 - x), 0.0, 1.0)
############################################################
## 3. FIELD DEFINITIONS
############################################################
FUNCTION default_theta(t: Timestamp, b: BeatIndex, tr: TrackID) -> real:
    # Default tempo multiplier: the product of two factors close to 1.
    #   - a sine of physical time t (0.07 cycles per unit of t, +/-5%) whose
    #     phase is offset per track by hash(tr) * 0.001
    #   - a cubic curve over the fractional part of b / 8 (+/-3%)
    track_offset := hash(tr) * 0.001
    sine_factor := 1.0 + 0.05 * sin(2 * PI * 0.07 * t + track_offset)
    ramp := 2 * frac(b * 0.125) - 1          # spans [-1, 1) across 8 beats
    cubic_factor := 1.0 + 0.03 * (ramp^3)
    RETURN sine_factor * cubic_factor
FUNCTION default_microslip(x: real) -> real:
    # Default microtiming slip: the logistic map at r = 3.71, shifted down by
    # 0.5 and scaled by 0.002. Because logistic is clamped to [0, 1], the
    # magnitude of the result never exceeds 0.001.
    centred := logistic(x, 3.71) - 0.5
    RETURN centred * 0.002
############################################################
## 4. INITIALIZATION
############################################################
FUNCTION init_universe(base_bpm: real, tick_hz: real, tempos: map<TrackID, Rational>) -> Universe:
    # Builds a root universe (no parent) at time 0 with the default tempo
    # field and merge policy, an empty DAG, and one fresh TrackState per
    # entry of tempos.
    #   base_bpm : reference tempo in beats per minute
    #   tick_hz  : scheduler frequency
    #   tempos   : tempo ratio for each track; stored in reduced form
    U := new Universe
    U.id := new_uid()
    U.parent := null
    U.path := new_uid()
    # scheduler settings
    U.base_bpm := base_bpm
    U.tick_hz := tick_hz
    U.now := 0.0
    U.halted := false
    # default tempo field and merge behaviour
    U.field.fn_theta := default_theta
    U.field.fn_microslip := default_microslip
    U.policy := default_merge_policy()
    U.dag := DAG{nodes: {}, edges: {}}
    FOR tr, rat IN tempos:
        # every track starts at beat 0 with a zeroed clock and an empty queue
        U.tracks[tr] := TrackState{
            id: tr,
            tempo_ratio: reduce(rat),
            phase: 0.0,
            clock: Clock{lamport: 0, vector: {}, beat: 0.0, tick: 0},
            queue: empty_priority_queue(),
            accumulator: 0.0,
            jitter_seed: hash(tr) mod 65521
        }
    RETURN U
FUNCTION default_merge_policy() -> MergePolicy:
    # Policy installed by init_universe: 8-beat windows, older-Lamport
    # preference on, lineage annotation on, last-writer-wins CRDT and
    # interval_tension as the consonance metric.
    policy := new MergePolicy
    policy.window_beats := 8.0
    policy.prefer_older_lamport := true
    policy.lineage_keep := true
    policy.crdt := crdt_last_writer_wins_with_lamport
    policy.consonance_metric := interval_tension
    policy.tie_breaker := function(a, b):
        # a wins only with a strictly lower consonant_rank; otherwise b wins
        if consonant_rank(a) < consonant_rank(b): return a
        return b
    RETURN policy
FUNCTION consonant_rank(e: Event) -> int:
    # Ranks e by how close the fractional part of its beat lies to the nearest
    # consonant position (1:1, 2:3, 3:4, 4:5, 5:8, 7:8). Lower is more consonant.
    # Returns the smallest distance in thousandths of a beat, rounded down.
    # Written as reals so the fractions cannot be read as integer division.
    target := [1.0, 2.0 / 3.0, 3.0 / 4.0, 4.0 / 5.0, 5.0 / 8.0, 7.0 / 8.0]
    pos := frac(e.t_beat)
    dmin := +INF
    FOR r IN target:
        d := abs(pos - r)
        # Beat phase is circular: 0.0 and 1.0 are the same position. Without
        # the wrap-around an event exactly on the beat (pos = 0) would be a
        # full beat away from the 1:1 target and would rank worst, not best.
        d := min(d, 1.0 - d)
        dmin := min(dmin, d)
    RETURN floor(dmin * 1000)
FUNCTION interval_tension(a: Event, b: Event) -> real:
    # Phase distance between the two events inside the beat; lower is better.
    # The result lies in [0, 0.5].
    d := abs(frac(a.t_beat) - frac(b.t_beat))
    # Phases wrap at 1, so 0.99 and 0.01 are 0.02 apart rather than 0.98.
    RETURN min(d, 1.0 - d)
FUNCTION crdt_last_writer_wins_with_lamport(a: Event, b: Event) -> Event:
    # Last-writer-wins join of two events: the one with the larger Lamport
    # value is returned. When the Lamport values are equal the event with the
    # lower id is returned, so the outcome does not depend on argument order.
    if a.clock.lamport != b.clock.lamport:
        return (a.clock.lamport > b.clock.lamport ? a : b)
    return (a.id < b.id ? a : b)
############################################################
## 5. SCHEDULER AND EMISSION
############################################################
FUNCTION tick(U: Universe):
    # Advances U by one scheduler step of 1 / tick_hz seconds. For every track
    # it integrates the warped tempo into the accumulator and emits one event
    # each time the accumulator crosses a whole beat. Does nothing when U is
    # halted. Raises when tick_hz is not positive.
    if U.halted: return
    if U.tick_hz <= 0:
        raise "tick_hz must be positive"
    dt := 1.0 / U.tick_hz
    U.now := U.now + dt
    base_bps := U.base_bpm / 60.0            # same for every track, so computed once
    FOR T IN U.tracks.values():
        T.clock.tick := T.clock.tick + 1     # count scheduler steps on the track clock
        theta := U.field.fn_theta(U.now, T.clock.beat, T.id)
        local_bps := base_bps * rational_mul(T.tempo_ratio, 1.0) * theta
        # jitter_seed is an integer, so adding it directly and taking mod 1.0
        # would cancel it out. Map it to a fraction first so each track feeds
        # the microslip map at its own offset.
        seed_offset := frac(T.jitter_seed * 0.61803398875)
        microslip := U.field.fn_microslip((seed_offset + U.now) mod 1.0)
        advance := local_bps * dt + microslip
        T.accumulator := T.accumulator + advance
        # Track the sub-beat position; adding a whole beat per emission, as
        # before, would leave the phase unchanged forever.
        T.phase := next_phase(T.phase, advance)
        WHILE T.accumulator >= 1.0:
            T.accumulator := T.accumulator - 1.0
            e := emit_event(U, T)
            push(T.queue, e)
FUNCTION emit_event(U: Universe, T: TrackState) -> Event:
    # Creates the next event of track T in universe U, registers it in U's
    # DAG and returns it. The event's t_beat is the track beat before this
    # emission; the track clock is then advanced by one beat and a copy of the
    # advanced clock is stored on the event.
    ev := new Event
    ev.id := new_uid()
    # provenance
    ev.world := U.id
    ev.path := U.path
    ev.track := T.id
    # timing, read before the clock moves
    ev.t_physical := U.now
    ev.t_beat := T.clock.beat
    ev.payload := synth_payload(T, ev)
    T.clock := advance_clock(T.clock, 1.0)
    ev.clock := copy(T.clock)
    ev.ancestry := {} # filled during merges
    U.dag.nodes[ev.id] := ev
    RETURN ev
FUNCTION synth_payload(T: TrackState, E: Event) -> any:
    # Example note generator driven by the event's beat position.
    #   pitch    : 48 plus 0..11, from the fraction of t_beat * (ratio numerator + 1)
    #   velocity : 60 plus 0..40, from |sin| of the position inside the beat
    #   duration : always half a beat
    pitch_phase := frac(E.t_beat * (T.tempo_ratio.num + 1))
    beat_phase := frac(E.t_beat)
    note_pitch := 48 + floor(12 * pitch_phase)
    note_velocity := 60 + floor(40 * abs(sin(2 * PI * beat_phase)))
    return {type: "note", pitch: note_pitch, velocity: note_velocity, dur_beats: 0.5}
############################################################
## 6. FORK LOGIC
############################################################
FUNCTION fork_universe(U: Universe, mutations: function(Universe) -> void) -> Universe:
    # Produces an independent child of U. The child is a deep copy with a new
    # id and a new path, and its parent set to U.id. The caller's mutations
    # callback then edits the child (tempo fields, policies, track sets).
    # Finally every track of the child bumps its own vector-clock entry to
    # mark the fork.
    child := deep_copy(U)
    child.id := new_uid()
    child.parent := U.id
    child.path := new_uid()
    mutations(child)
    FOR T IN child.tracks.values():
        own_count := T.clock.vector.get(T.id, 0)
        T.clock.vector[T.id] := own_count + 1
    RETURN child
# Example mutation: change one track to a new ratio and nonlinear map
FUNCTION mutate_to_polymeter(V: Universe, target_track: TrackID, new_ratio: Rational):
    # Switches target_track to new_ratio (stored reduced) and replaces V's
    # tempo field with default_theta multiplied by an extra slow sine warp.
    # The warp's phase is shifted by 1.3 for target_track and by 0 for every
    # other track.
    V.tracks[target_track].tempo_ratio := reduce(new_ratio)
    V.field.fn_theta := function(t, b, tr):
        shift := 0.0
        if tr == target_track:
            shift := 1.3
        extra_warp := 1.0 + 0.07 * sin(2 * PI * 0.031 * t + shift)
        return default_theta(t, b, tr) * extra_warp
############################################################
## 7. MERGE PREPARATION
############################################################
FUNCTION compute_alignment_grid(A: Universe, B: Universe) -> real:
    # Alignment step in beats for merging A and B: one beat divided by the
    # LCM of the tempo-ratio denominators of every track in both universes.
    ratios := []
    FOR T IN A.tracks.values():
        ratios.append(T.tempo_ratio)
    FOR T IN B.tracks.values():
        ratios.append(T.tempo_ratio)
    return 1.0 / lcm_of_tempi(ratios)
FUNCTION window_events(U: Universe, center: BeatIndex, span: real) -> list<Event>:
    # Collects every event of U whose beat lies within span / 2 of center
    # (both ends included), ordered by beat and then by Lamport value.
    selected := [e for e IN U.dag.nodes.values() if abs(e.t_beat - center) <= span / 2]
    sort(selected by (e.t_beat, e.clock.lamport))
    RETURN selected
############################################################
## 8. MERGE ENGINE
############################################################
FUNCTION merge_universes(A: Universe, B: Universe) -> Universe:
    # Reconciles A and B into a new, halted universe C.
    # C averages the base tempo, takes the faster scheduler rate and the union
    # of both track sets. The combined beat range is walked in consecutive
    # windows of policy.window_beats; inside each window events are bucketed
    # on the alignment grid, merged per bucket and inserted into C.
    # Raises when the selected policy has a non-positive window.
    C := init_universe(
        base_bpm = (A.base_bpm + B.base_bpm) / 2,
        tick_hz = max(A.tick_hz, B.tick_hz),
        tempos = union_tempos(A.tracks, B.tracks)
    )
    C.parent := null
    C.path := new_uid()
    C.policy := select_policy(A.policy, B.policy)
    window := C.policy.window_beats
    if window <= 0:
        raise "window_beats must be positive"   # the loop below would never advance
    step := compute_alignment_grid(A, B)
    beat_min := min(min_beat(A), min_beat(B))
    beat_max := max(max_beat(A), max_beat(B))
    # Windows are the half-open ranges [lo, lo + window), starting at beat_min.
    # Half-open bounds put each event in exactly one window, and looping while
    # lo <= beat_max guarantees the last events are covered. Windows centred
    # on beat_min with inclusive ends processed boundary events twice and
    # could stop before beat_max.
    k := 0
    lo := beat_min
    WHILE lo <= beat_max:
        hi := lo + window
        center := lo + window / 2
        # Ask window_events for a generous superset (twice the span), then
        # apply the exact bounds here so rounding of center cannot drop an
        # event that sits right on lo. Filtering keeps the sorted order.
        Ea := [e for e IN window_events(A, center, 2 * window) if e.t_beat >= lo AND e.t_beat < hi]
        Eb := [e for e IN window_events(B, center, 2 * window) if e.t_beat >= lo AND e.t_beat < hi]
        bucket := cluster_by_quantum(Ea + Eb, step)
        FOR q IN bucket.keys():
            merged := merge_bucket(bucket[q], C.policy)
            FOR e IN merged:
                insert_merged_event(C, e)
        # Recompute lo from the window index so repeated addition cannot
        # accumulate rounding error.
        k := k + 1
        lo := beat_min + k * window
    C.halted := true
    RETURN C
FUNCTION union_tempos(TA: map<TrackID, TrackState>, TB: map<TrackID, TrackState>) -> map<TrackID, Rational>:
    # Tempo ratio per track across both maps. A track present in only one map
    # keeps that map's ratio. For a track present in both, the ratio with the
    # smaller denominator wins (simpler grid); TA's ratio wins on equal
    # denominators.
    out := {}
    FOR t, state IN TA:
        out[t] := state.tempo_ratio
    FOR t, state IN TB:
        candidate := state.tempo_ratio
        if t not in out:
            out[t] := candidate
        elif out[t].den > candidate.den:
            out[t] := candidate
    RETURN out
FUNCTION select_policy(Pa: MergePolicy, Pb: MergePolicy) -> MergePolicy:
    # Conservative combination of two policies: Pa's callbacks and flags, the
    # smaller of the two windows, and older-Lamport preference only when both
    # inputs ask for it.
    # Work on a copy: assigning Pa directly and then editing it would rewrite
    # the policy of the universe that Pa belongs to.
    M := copy(Pa)
    M.window_beats := min(Pa.window_beats, Pb.window_beats)
    M.prefer_older_lamport := Pa.prefer_older_lamport AND Pb.prefer_older_lamport
    RETURN M
FUNCTION min_beat(U: Universe) -> BeatIndex:
    # Earliest t_beat among U's events, or 0.0 when U has no events.
    if U.dag.nodes.is_empty(): return 0.0
    earliest := +INF
    FOR e IN U.dag.nodes.values():
        earliest := min(earliest, e.t_beat)
    return earliest
FUNCTION max_beat(U: Universe) -> BeatIndex:
    # Latest t_beat among U's events, or 0.0 when U has no events.
    if U.dag.nodes.is_empty(): return 0.0
    latest := -INF
    FOR e IN U.dag.nodes.values():
        latest := max(latest, e.t_beat)
    return latest
FUNCTION cluster_by_quantum(E: list<Event>, q: real) -> map<int, list<Event>>:
    # Groups events by the grid cell of width q that their t_beat falls into.
    # Keys are cell indices; each value keeps its events in input order.
    # Raises when q is not positive.
    if q <= 0:
        raise "quantum must be positive"
    buckets := {}
    FOR e IN E:
        # A beat that sits exactly on a grid line can divide to just under the
        # integer (0.3 / 0.1 = 2.9999999999999996), which a bare floor would
        # push into the previous cell. The small nudge absorbs that round-off.
        k := floor(e.t_beat / q + 1e-9)
        if k not in buckets:
            buckets[k] := []
        buckets[k].append(e)      # append in place; no list rebuild per event
    RETURN buckets
FUNCTION merge_bucket(L: list<Event>, policy: MergePolicy) -> list<Event>:
    # Merges the events of one grid bucket.
    #   1) split the bucket by track
    #   2) collapse each track's events into one with the policy's CRDT
    #   3) pass the per-track survivors through cross-track alignment
    survivors := []
    FOR tr, group IN group_by(L, key = e.track):
        survivors.append(fold_crdt(group, policy.crdt))
    RETURN align_cross_track(survivors, policy)
FUNCTION fold_crdt(group: list<Event>, crdt_fn) -> Event:
    # Left fold of crdt_fn over group, seeded with the first element.
    # A one-element group returns that element unchanged.
    result := group[0]
    idx := 1
    WHILE idx < len(group):
        result := crdt_fn(result, group[idx])
        idx := idx + 1
    RETURN result
FUNCTION align_cross_track(events: list<Event>, policy: MergePolicy) -> list<Event>:
    # Returns copies of events whose beat is snapped to the nearest consonant
    # lattice point when it lies within eps of it; other beats are unchanged.
    # The input events are not modified. They are the same objects that the
    # source universes hold in their DAGs, so editing them in place would
    # alter those universes as a side effect of merging.
    # policy is accepted for interface compatibility and is not consulted.
    eps := 1e-3
    # Fractions are written as reals so none can be read as integer division.
    # 1.0 is included so a beat just below the next downbeat snaps forward
    # onto it, not only beats just above a downbeat snapping back.
    lattice := [0.0, 1.0 / 8.0, 1.0 / 6.0, 1.0 / 5.0, 1.0 / 4.0, 1.0 / 3.0, 3.0 / 8.0,
                1.0 / 2.0, 5.0 / 8.0, 2.0 / 3.0, 3.0 / 4.0, 4.0 / 5.0, 5.0 / 6.0, 7.0 / 8.0, 1.0]
    out := []
    FOR e IN events:
        snapped := deep_copy(e)
        fracb := frac(snapped.t_beat)
        closest := argmin(lattice, lambda x: abs(x - fracb))
        if abs(closest - fracb) < eps:
            snapped.t_beat := floor(snapped.t_beat) + closest
        out.append(snapped)
    RETURN out
FUNCTION insert_merged_event(C: Universe, e_in: Event):
    # Adds a copy of e_in to the merged universe C.
    # The copy keeps e_in's id, is re-homed to C's id and path, receives the
    # next clock value of its track in C, and is linked by an edge from every
    # parent that is already present in C's DAG. When the policy keeps
    # lineage, the source world and path are recorded on the payload.
    E := deep_copy(e_in)
    if C.policy.lineage_keep:
        E.payload.lineage := {world: e_in.world, path: e_in.path}
        E.ancestry := E.ancestry union parents_of(e_in)
    E.world := C.id
    E.path := C.path
    # Advance the track clock, store it back on the track, and give the event
    # its own copy (the same pattern emit_event uses). Assigning the result of
    # advance_clock straight to E.clock would make every merged event of a
    # track share one Clock object, or leave the track clock unadvanced.
    C.tracks[E.track].clock := advance_clock(C.tracks[E.track].clock, 0.0)
    E.clock := copy(C.tracks[E.track].clock)
    C.dag.nodes[E.id] := E
    # add edges from known parents if present
    FOR p IN E.ancestry:
        if p in C.dag.nodes:
            C.dag.edges.add(Edge{src: p, dst: E.id})
FUNCTION parents_of(e: Event) -> set<EventID>:
    # Immediate DAG parents of e, exactly as stored in its ancestry field.
    parents := e.ancestry
    return parents
############################################################
## 9. CONSISTENCY AND VALIDATION
############################################################
FUNCTION validate_dag(U: Universe) -> bool:
    # Returns true when U's event graph has no directed cycle, false otherwise.
    # Depth-first search from every unvisited node; meeting a node that is
    # still on the current search path means a cycle.
    # Index outgoing edges once, so the search does not rescan the whole edge
    # set for every node it visits.
    successors := {}
    FOR edge IN U.dag.edges:
        if edge.src not in successors:
            successors[edge.src] := []
        successors[edge.src].append(edge.dst)
    visited := {}
    stack := {}      # true while a node is on the current search path
    FUNCTION dfs(v: EventID) -> bool:
        visited[v] := true
        stack[v] := true
        FOR w IN successors.get(v, []):
            if not visited.get(w, false):
                if not dfs(w): return false
            elif stack.get(w, false):
                return false # cycle found
        stack[v] := false
        return true
    FOR v IN U.dag.nodes.keys():
        if not visited.get(v, false):
            if not dfs(v): return false
    return true
FUNCTION verify_monotonic_clocks(U: Universe) -> bool:
    # Returns true when no edge runs from a higher Lamport value to a lower
    # one, i.e. Lamport values never decrease from parent to child.
    # A single pass over the edges replaces a scan of the whole edge set for
    # every node.
    FOR ed IN U.dag.edges:
        # An edge with an endpoint missing from nodes has no clocks to
        # compare; skip it rather than fault on the lookup.
        if ed.src not in U.dag.nodes OR ed.dst not in U.dag.nodes:
            continue
        parent := U.dag.nodes[ed.src]
        child := U.dag.nodes[ed.dst]
        if parent.clock.lamport > child.clock.lamport:
            return false
    return true
############################################################
## 10. DRIVER EXAMPLE
############################################################
PROCEDURE main():
    # End-to-end example: run a base universe, fork it, let both copies
    # diverge, merge them, validate the result and export it.
    # Track identifiers have to exist before they can key the tempo map.
    T1 := new_uid()
    T2 := new_uid()
    T3 := new_uid()
    # Base universe with three tracks running at 5/4, 4/3 and 7/6 of the base tempo
    tempos := {
        T1: Rational{5,4},
        T2: Rational{4,3},
        T3: Rational{7,6}
    }
    base_bpm := 96
    tick_hz := 480
    A := init_universe(base_bpm = base_bpm, tick_hz = tick_hz, tempos = tempos)
    # Run base for four minutes. Each tick is 1 / tick_hz seconds, so the
    # tick count comes from the scheduler rate rather than from the BPM.
    FOR i IN 1..(tick_hz * 4 * 60):
        tick(A)
    # Fork to B with polymeter mutation on T3
    B := fork_universe(A, function(V):
        mutate_to_polymeter(V, target_track = T3, new_ratio = Rational{11,8})
    )
    # Advance both for two more minutes under divergent fields
    FOR i IN 1..(tick_hz * 2 * 60):
        tick(A)
        tick(B)
    # Merge into canonical C
    C := merge_universes(A, B)
    # Validate integrity
    if not validate_dag(C): raise "Merge produced a cyclic DAG"
    if not verify_monotonic_clocks(C): raise "Lamport violation in merged graph"
    # Export C as a score tape or event log
    export_score(C, filename = "multiverse_polyrhythm.score")
############################################################
## 11. EXPORT FORMAT (ABRIDGED)
############################################################
FUNCTION export_score(U: Universe, filename: string):
    # Writes U's note events to filename, one CSV line per note:
    #   t_beat (6 decimals), track, pitch, velocity, dur_beats, lineage
    # Lines are ordered by beat, then track, then Lamport value. Events whose
    # payload type is not "note" are skipped.
    # Sort before opening, so a failure while ordering leaves no half-created file.
    E := list(U.dag.nodes.values())
    sort(E by (e.t_beat, e.track, e.clock.lamport))
    file := open(filename, "w")
    TRY:
        FOR e IN E:
            if e.payload.type == "note":
                line := join_csv([
                    round(e.t_beat, 6),
                    e.track,
                    e.payload.pitch,
                    e.payload.velocity,
                    e.payload.dur_beats,
                    encode_lineage(e.payload.lineage)
                ])
                write(file, line)
    FINALLY:
        # release the handle even when a write or an encode step fails
        close(file)
FUNCTION encode_lineage(L: any) -> string:
    # Text form of a lineage record for the score file: "{world=...;path=...}".
    # A missing (null) lineage is written as "{}".
    if L != null:
        return "{world=" + to_string(L.world) + ";path=" + to_string(L.path) + "}"
    return "{}"
############################################################
## 12. NOTES ON PARAMETER CHOICES
############################################################
# window_beats controls how far the merge looks for consonant alignment.
# A larger window smooths across divergent phases but risks smearing accents.
# step from compute_alignment_grid sets the quantization lattice: one beat divided
# by the LCM of the tempo-ratio denominators. This is a musically meaningful grid for polyrhythms.
# fn_theta can be replaced with any smooth field. For sharp metric modulations,
# swap the sine for piecewise linear ramps. Keep microslip small to avoid
# event reordering that breaks vector clock expectations.
END.

No comments:
Post a Comment