MerlinProcessor User Guide

Overview

MerlinProcessor is a lightweight RPC-style bridge between your PyTorch models and Quandela Cloud via Perceval’s RemoteProcessor. It lets you:

  • Offload quantum leaves (e.g. QuantumLayer) to the cloud while keeping classical layers local.

  • Submit batched inputs; when batches are large, Merlin will chunk them and (optionally) run chunks in parallel.

  • Drive execution synchronously (forward) or asynchronously (forward_async returning a torch.futures.Future).

  • Monitor status, collect job IDs, cancel jobs, and enforce timeouts.

  • Estimate required shot counts per input ahead of time.

Merlin deliberately avoids hidden “auto-shots”: you control sampling. The optional estimator is provided to help you choose appropriate values.

Prerequisites

  • perceval-quandela configured with a valid cloud token (via pcvl.RemoteConfig cache or environment).

  • A Perceval RemoteProcessor instance (e.g. a simulator like "sim:slos" or a QPU-backed platform).

  • A Merlin quantum layer that provides export_config() (e.g. merlin.algorithms.QuantumLayer).
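
If you have not configured a token yet, a one-time setup along these lines usually suffices (a minimal sketch: the exact RemoteConfig methods can vary across Perceval versions, so treat set_token/save as assumptions and check your version's docs):

import perceval as pcvl

# Cache the token on disk so later RemoteProcessor(...) calls can authenticate
# without passing the token explicitly. (set_token/save assumed; API may vary.)
remote_config = pcvl.RemoteConfig()
remote_config.set_token("YOUR_QUANDELA_CLOUD_TOKEN")
remote_config.save()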

Quick Start

import perceval as pcvl
import torch
import torch.nn as nn

from merlin.algorithms import QuantumLayer
from merlin.builder.circuit_builder import CircuitBuilder
from merlin.core.merlin_processor import MerlinProcessor
from merlin.measurement.strategies import MeasurementStrategy

# 1) Create the Perceval RemoteProcessor (token must already be configured)
rp = pcvl.RemoteProcessor("sim:slos")

# 2) Wrap it with MerlinProcessor
proc = MerlinProcessor(
    rp,
    max_batch_size=32,        # batch chunk size per cloud call (<= 32)
    timeout=3600.0,           # default wall-time per forward (seconds)
    max_shots_per_call=None,  # optional cap per cloud call (see below)
    chunk_concurrency=1,      # parallel chunk jobs within a quantum leaf
)

# 3) Build a QuantumLayer and a small model
b = CircuitBuilder(n_modes=6)
b.add_rotations(trainable=True, name="theta")
b.add_angle_encoding(modes=[0, 1], name="px")
b.add_entangling_layer()

q = QuantumLayer(
    input_size=2,
    builder=b,
    n_photons=2,
    no_bunching=True,
    measurement_strategy=MeasurementStrategy.PROBABILITIES,  # raw probability vector
).eval()

model = nn.Sequential(
    nn.Linear(3, 2, bias=False),
    q,
    nn.Linear(15, 4, bias=False),   # 15 = C(6,2) from the chosen circuit
    nn.Softmax(dim=-1)
).eval()

# 4) Run remotely with sampling (nsample) or exact probs if available
X = torch.rand(8, 3)
y = proc.forward(model, X, nsample=5000)   # synchronous
print(y.shape)

Instantiation & Options

MerlinProcessor(remote_processor, *, max_batch_size=32, timeout=3600.0, max_shots_per_call=None, chunk_concurrency=1)

  • remote_processor (pcvl.RemoteProcessor): your authenticated platform. Merlin clones it internally per quantum leaf so multiple jobs can run safely in parallel without altering your original instance.

  • max_batch_size (int): maximum number of input rows per cloud job. If your input batch B is larger, it is split into chunks of size <= max_batch_size. Hard-capped by Merlin at 32.

  • timeout (float): default wall-clock limit (in seconds) for each forward/forward_async call. It can be overridden per call (see below) and must be a real number (not None).

  • max_shots_per_call (int | None): cap for each cloud call. If None, Merlin passes a safe default internally for Perceval. If you want a stricter cap, set this explicitly. (This is not an auto-shot chooser.)

  • chunk_concurrency (int): maximum number of chunks submitted in parallel per quantum leaf. Default 1 (serial). Increase for higher throughput when the backend allows it.
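
For example, a throughput-oriented configuration with an explicit shot cap might look like this (values are illustrative, not recommendations):

proc = MerlinProcessor(
    pcvl.RemoteProcessor("sim:slos"),
    max_batch_size=16,           # at most 16 rows per cloud job
    timeout=600.0,               # 10-minute default wall clock per forward
    max_shots_per_call=100_000,  # hard cap per cloud call
    chunk_concurrency=4,         # up to 4 chunk-jobs in flight per leaf
)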

Execution API

Synchronous

y = proc.forward(layer_or_model, X, nsample=20000, timeout=15.0)
  • nsample (int | None):

    • If the backend exposes "probs" in remote_processor.available_commands, Merlin uses exact probabilities and ignores nsample.

    • Otherwise, Merlin samples; nsample controls the shots per input (subject to your platform limits and max_shots_per_call).

  • timeout (float | None): overrides the constructor default for this call.

    • None, 0, or any falsy value –> no time limit for this call.

    • Otherwise –> seconds until a global timeout cancels all in-flight jobs launched for this call and raises TimeoutError. See the sketch below.
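
The three timeout modes side by side (a sketch reusing the Quick Start model and X; the 15-second budget is arbitrary):

y1 = proc.forward(model, X, nsample=20_000)                # constructor default applies
y2 = proc.forward(model, X, nsample=20_000, timeout=None)  # unlimited for this call
y3 = proc.forward(model, X, nsample=20_000, timeout=15.0)  # 15 s, then TimeoutError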

Asynchronous

fut = proc.forward_async(layer_or_model, X, nsample=3000, timeout=None)
# Helpers injected on the Future:
fut.job_ids         # list[str]: job ids across all chunks/leaves
fut.status()        # dict: {state, progress, message, chunks_*}
fut.cancel_remote() # request cancellation; .wait() -> CancelledError
y = fut.wait()
  • Cancellation:

    • fut.cancel_remote() signals the worker to cancel and issues remote job cancellation (best effort). fut.wait() then raises concurrent.futures.CancelledError.

    • proc.cancel_all() cancels all active jobs across all futures.

  • Context manager: exiting a with MerlinProcessor(...) as proc: block triggers cancel_all(), ensuring stray jobs are stopped; see the sketch below.
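
A minimal context-manager sketch, reusing the Quick Start model and X:

with MerlinProcessor(pcvl.RemoteProcessor("sim:slos")) as proc:
    fut = proc.forward_async(model, X, nsample=3000)
    print(fut.status())   # {state, progress, message, chunks_*}
    y = fut.wait()
# Leaving the block triggers cancel_all(), so no stray jobs survive an exception.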

Batching & Chunking

  • If len(X) > max_batch_size, Merlin splits into chunks of size <= max_batch_size and submits up to chunk_concurrency chunk-jobs in parallel for that quantum leaf.

  • The Future aggregates all job IDs across leaves in future.job_ids. It also exposes chunk counters via future.status():

    {"state": "...", "progress": ..., "message": "...",
     "chunks_total": N, "chunks_done": k, "active_chunks": c}

Device & dtype round-trip

Inputs are moved to CPU for remote execution when needed, and the result is returned on the original device and dtype of your input (a CUDA input yields a CUDA output, so downstream ops stay on GPU).
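
For instance, a CUDA input comes back as a CUDA tensor (sketch; requires a CUDA-enabled PyTorch build and the Quick Start model):

if torch.cuda.is_available():
    X_gpu = torch.rand(8, 3, device="cuda")
    y_gpu = proc.forward(model, X_gpu, nsample=5000)
    # Merlin staged X_gpu to CPU for the cloud call, then returned the
    # result on the input's original device and dtype.
    assert y_gpu.device == X_gpu.device and y_gpu.dtype == X_gpu.dtype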

Offload Policy & Local Overrides

  • By default, modules that provide export_config() are treated as quantum leaves and offloaded.

  • Set layer.force_local = True to force local execution (useful for debugging and A/B comparisons).

  • Many Merlin tests use a context helper, with layer.as_simulation():, to temporarily force local mode (if your layer provides it); see the sketch below.
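
Both overrides in one sketch (as_simulation() is optional and layer-dependent; q is the Quick Start layer):

X = torch.rand(4, q.input_size)

# Attribute-based override: run the quantum layer locally.
q.force_local = True
y_local = proc.forward(q, X, nsample=5000)
q.force_local = False

# Context-based override, if your layer provides it:
with q.as_simulation():
    y_local = proc.forward(q, X, nsample=5000)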

Estimating Required Shots (No Auto-Execute)

Merlin includes a helper that proxies Perceval’s built-in estimator and does not submit jobs:

estimates = proc.estimate_required_shots_per_input(
    layer=q,
    input=X,                          # shape [B, D] or [D]
    desired_samples_per_input=2_000
)
# -> list[int] of length B (or 1 for a single vector).
#    0 means "not viable" under the current platform/performance/filter settings.

Behavior:

  • For each input row, Merlin maps your feature vector to the circuit parameter values (same mapping used during remote execution), then calls remote_processor.estimate_required_shots(...).

  • It mirrors the layer’s exported circuit and input state (including detected-photon filters) so the estimate aligns with actual execution.

  • This is a planner only; it doesn’t modify processor/job history.
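
One way to turn the estimates into a per-call nsample (a sketch; taking the max over viable rows is just one reasonable policy):

estimates = proc.estimate_required_shots_per_input(
    layer=q, input=X, desired_samples_per_input=2_000
)
viable = [e for e in estimates if e > 0]   # 0 marks "not viable" rows
if viable:
    y = proc.forward(q, X, nsample=max(viable))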

Timeouts & Errors

  • Timeout: if a per-call or default timeout elapses, Merlin issues remote cancellation and raises TimeoutError.

  • Cancellation: fut.cancel_remote() or proc.cancel_all() –> pending chunk workers raise CancelledError; completed chunks are discarded for the call.

  • Remote failures: if the backend marks a job as failed, Merlin raises a RuntimeError with the platform message. If the message indicates an explicit remote cancel, Merlin maps it to CancelledError. All three cases are sketched below.
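
The full exception surface in one place (a sketch of the mappings described above, reusing the Quick Start model and X):

import concurrent.futures

try:
    y = proc.forward(model, X, nsample=5000, timeout=30.0)
except TimeoutError:
    ...   # wall clock elapsed; Merlin cancelled the in-flight jobs
except concurrent.futures.CancelledError:
    ...   # explicit remote cancel, cancel_remote(), or cancel_all()
except RuntimeError as e:
    ...   # backend reported a failure; e carries the platform message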

Multiple Quantum Layers

Sequential models with multiple quantum leaves are supported:

  • Each quantum leaf is processed in order; each may chunk and run those chunks with its own intra-leaf concurrency (chunk_concurrency).

  • future.job_ids will include all job IDs across all leaves, as in the sketch below.
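
A two-leaf sketch, assuming q1 and q2 are QuantumLayers built as in the Quick Start and d1/d2 are their probed output sizes:

model2 = nn.Sequential(
    nn.Linear(3, q1.input_size, bias=False),
    q1,                                    # first quantum leaf (offloaded)
    nn.Linear(d1, q2.input_size, bias=False),
    q2,                                    # second quantum leaf (offloaded)
    nn.Linear(d2, 4, bias=False),
).eval()

fut = proc.forward_async(model2, torch.rand(8, 3), nsample=2_000)
_ = fut.wait()
print(len(fut.job_ids))   # aggregates job IDs from both leaves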

Controlling Shots Explicitly

  • Sampling backends respect the per-call nsample you pass to forward/forward_async; Merlin does not auto-derive or override it.

  • Use estimate_required_shots_per_input ahead of time to pick good values.

  • max_shots_per_call lets you enforce a hard cap for each cloud job.

Workflow Recipes (End-to-End Examples)

The following examples mirror tested workflows (see tests/core/cloud/test_userguide_examples.py).

Mixed classical –> quantum –> classical

# Build the quantum layer and probe its output size
q = QuantumLayer(...).eval()  # see Quick Start builder pattern
dist = q(torch.rand(2, q.input_size)).shape[1]

model = nn.Sequential(
    nn.Linear(3, q.input_size, bias=False),
    q,                          # offloaded by Merlin
    nn.Linear(dist, 4, bias=False),
    nn.Softmax(dim=-1),
).eval()

rp = pcvl.RemoteProcessor("sim:slos")
proc = MerlinProcessor(rp)
# Prefer exact probabilities if supported; else sample.
use_probs = "probs" in rp.available_commands
nsamp = None if use_probs else 20_000

X = torch.rand(6, 3)
fut = proc.forward_async(model, X, nsample=nsamp)
Y = fut.wait()
print("shape:", Y.shape, "job_ids:", len(fut.job_ids))  # expect >= 1

Gradient-free fine-tuning with COBYLA (no autograd on quantum layer)

# Optional dependency: SciPy
from scipy.optimize import minimize

# Small model: Linear -> Quantum -> Linear(scalar)
q = QuantumLayer(...).eval()
dist = q(torch.rand(2, q.input_size)).shape[1]
readout = nn.Linear(dist, 1, bias=False).eval()
pre = nn.Linear(3, q.input_size, bias=False).eval()
model = nn.Sequential(pre, q, readout).eval()

# Flatten quantum params we will tune (keep classical layers fixed)
q_params = [(n, p) for n, p in q.named_parameters() if p.requires_grad]
shapes = [p.shape for _, p in q_params]
sizes = [p.numel() for _, p in q_params]

def get_flat():
    return torch.cat([p.detach().flatten().cpu() for _, p in q_params], dim=0)

def set_from_flat(vec):
    off = 0
    with torch.no_grad():
        for (_, p), sz, shp in zip(q_params, sizes, shapes, strict=False):
            chunk = vec[off:off + sz].view(shp).to(p.dtype)
            p.data.copy_(chunk.to(p.device))
            off += sz

x0 = get_flat().double().numpy()
rp = pcvl.RemoteProcessor("sim:slos")
proc = MerlinProcessor(rp)
nsamp = None if "probs" in rp.available_commands else 20_000
X = torch.rand(8, 3)

# Objective: maximize mean scalar output -> minimize negative
def objective(v_np):
    v = torch.from_numpy(v_np).to(torch.float64)
    set_from_flat(v.to(torch.float32))
    with torch.no_grad():
        y = proc.forward(model, X, nsample=nsamp)
        return -float(y.mean().item())

res = minimize(objective, x0, method="COBYLA",
               options={"maxiter": 12, "rhobeg": 0.5})
print("final objective:", res.fun)

Local vs remote A/B (force simulation)

q = QuantumLayer(...).eval()
X = torch.rand(4, q.input_size)
proc = MerlinProcessor(pcvl.RemoteProcessor("sim:slos"))

# Remote path (offloaded)
y_remote = proc.forward(q, X, nsample=5000)

# Local path (force simulation)
q.force_local = True
y_local = proc.forward(q, X, nsample=5000)

# Compare distributions (allowing some sampling noise)
print((y_local - y_remote).abs().mean())

Monitoring status & safe cancellation

fut = proc.forward_async(q, torch.rand(16, q.input_size), nsample=40000, timeout=None)
# Poll status (state/progress/message + chunk counters)
print(fut.status())
# If needed, cancel cooperatively
fut.cancel_remote()
try:
    _ = fut.wait()
except Exception as e:
    print("Cancelled:", type(e).__name__)

High-throughput batching with chunking

proc = MerlinProcessor(
    pcvl.RemoteProcessor("sim:slos"),
    max_batch_size=8,        # split big batches into <= 8 rows per job
    chunk_concurrency=2,     # up to 2 chunk-jobs in flight per quantum leaf
)
X = torch.rand(64, q.input_size)  # big batch
fut = proc.forward_async(q, X, nsample=3000)
Y = fut.wait()
print("chunks_total/done/active:", fut.status())

Troubleshooting

  • No job IDs appear: your backend may be very fast, or your layer ran locally (e.g., force_local=True).

  • Perceval requires max_shots_per_call: Merlin passes a safe default when you leave it None. If your org policy requires explicit bounds, set it at construction.

  • Timeouts in CI: backends vary. Make tests resilient to fast or slow responses by polling future.done() before asserting on timeout exceptions; see the sketch below.
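
A CI-friendly polling sketch (reuses q and X from the recipes above; the poll cadence is arbitrary):

import time

fut = proc.forward_async(q, X, nsample=2_000, timeout=30.0)
while not fut.done():
    time.sleep(0.5)   # poll instead of assuming a fixed completion time
y = fut.wait()        # surfaces the result, or TimeoutError if the 30 s budget elapsed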

API Reference (Summary)

  • forward(module, input, *, nsample=None, timeout=None) -> torch.Tensor

  • forward_async(module, input, *, nsample=None, timeout=None) -> Future, with helpers:

    • future.job_ids: list[str]

    • future.status() -> dict

    • future.cancel_remote() -> None

  • cancel_all() -> None

  • estimate_required_shots_per_input(layer, input, desired_samples_per_input) -> list[int]

  • get_job_history() -> list[RemoteJob]

  • clear_job_history() -> None

Version Notes

  • Default chunk_concurrency is 1 (serial intra-leaf). Opt in to parallelism by setting it > 1.

  • Constructor timeout must be a float. Use per-call timeout=None for an unlimited call.

  • Estimation helper added to keep shot selection user-driven without auto-submitting jobs.