Migrate studies from MySQL Orthanc to Postgres with Advanced Storage, one folder per year

Hi Sebastien / Alain,

I plan to move the server from MySQL to Postgres, mainly because I find the Advanced Storage plugin wonderful.

I would like to migrate the studies by year, in order to have one folder per year:

  • /extra/orthancpg2026
  • /extra/orthancpg2025

I understand from previous posts that, to migrate studies, I can use the Orthanc cloner tool or ImportDicomFiles.py.

But it seems I cannot filter by date range.

Is there a script that can migrate studies by date range?

The other option is to migrate the whole instance first, and then move the studies (one by one) to the “new location”.

Also, I understood that Orthanc cloner would be the best choice to migrate from one instance to another. But since I already have the files on disk (from a backup on a NAS), would the Advanced Storage Indexer be (really) faster than Orthanc cloner?

Many thanks in advance for your help and advice.

Best regards,

Cedric.

ChatGPT made me a script that seems to work pretty well…

copy2026.py

import requests
from requests.auth import HTTPBasicAuth

SOURCE = "http://192.168.1.250:8042"
DEST = "http://192.168.1.250:4042"

source_auth = HTTPBasicAuth("orthanc","orthanc")
dest_auth = HTTPBasicAuth("orthanc","orthanc")

# Find the studies dated 2026
query = {
    "Level": "Study",
    "Query": {
        "StudyDate": "20260101-20261231"
    }
}

r = requests.post(
    f"{SOURCE}/tools/find",
    json=query,
    auth=source_auth
)

studies = r.json()

print(f"Found {len(studies)} studies")

for study_id in studies:
    print("Copying:", study_id)

    # Export the study from the source as a ZIP archive
    study = requests.get(
        f"{SOURCE}/studies/{study_id}/archive",
        auth=source_auth
    )

    # Import the archive into the destination
    upload = requests.post(
        f"{DEST}/instances",
        files={"file": ("study.zip", study.content)},
        auth=dest_auth
    )

    print(upload.status_code)

python3 copy2026.py
Found 1157 studies
Copying: 35888a95-d262e384-1125243c-65f63792-c90f5899
200
Copying: 109077cc-73622866-8a7ab75b-8c286605-9bb9d0ad
200
Copying: 384a360a-0a430774-4a77b1ff-b6142ef3-32ef806e
200
Copying: 3b06b559-52169cf8-ac2d679f-cefc92dd-bbf42683
200
Copying: 55cf793d-e96f7884-9bdf8c3f-74801ba2-17276ea8
...

This is for a small client. With the big client, I will have to select smaller ranges.
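
One way to select smaller ranges is to split a year into one StudyDate range per month and run the copy once per range. The sketch below only builds the ranges and the matching `/tools/find` queries; `monthly_ranges` and `build_query` are hypothetical helper names, not part of Orthanc or of the script above.

```python
import calendar


def monthly_ranges(year):
    """Return twelve DICOM date ranges ("YYYYMMDD-YYYYMMDD"), one per month."""
    ranges = []
    for month in range(1, 13):
        # monthrange() returns (weekday of the 1st, number of days in the month)
        last_day = calendar.monthrange(year, month)[1]
        ranges.append(
            f"{year:04d}{month:02d}01-{year:04d}{month:02d}{last_day:02d}"
        )
    return ranges


def build_query(date_range):
    """Build the /tools/find body used in the scripts for one date range."""
    return {
        "Level": "Study",
        "Expand": True,
        "Query": {"StudyDate": date_range},
    }


if __name__ == "__main__":
    for date_range in monthly_ranges(2026):
        print(build_query(date_range))
```

Each query can then be posted to `{SOURCE}/tools/find` in place of the fixed range, so a failure in one month does not force a restart of the whole year.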

Enhanced script: before copying, it checks whether the study is already on the target server.

Actually, it works for ANY type of migration from one Orthanc instance to another.

import requests
from requests.auth import HTTPBasicAuth

SOURCE = "http://127.0.0.1:8042"
DEST = "http://127.0.0.1:4042"

source_auth = HTTPBasicAuth("orthanc","orthanc")
dest_auth = HTTPBasicAuth("orthanc","orthanc")

query = {
    "Level": "Study",
    "Expand": True,
    "Query": {
        "StudyDate": "20250101-20250630"
    }
}

r = requests.post(
    f"{SOURCE}/tools/find",
    json=query,
    auth=source_auth
)

print("Find status:", r.status_code, flush=True)
print("Find response:", r.text[:500], flush=True)

if r.status_code != 200:
    raise Exception(f"Error searching for studies: HTTP {r.status_code}")

studies = r.json()

total = len(studies)
copied = 0
skipped = 0
errors = 0

print(f"Found {total} studies", flush=True)

def exists_in_dest(study_instance_uid):
    """Return True if the destination already holds a study with this UID."""
    q = {
        "Level": "Study",
        "Query": {
            "StudyInstanceUID": study_instance_uid
        }
    }

    resp = requests.post(
        f"{DEST}/tools/find",
        json=q,
        auth=dest_auth
    )

    if resp.status_code != 200:
        raise Exception(f"Error checking destination HTTP {resp.status_code}: {resp.text[:300]}")

    # /tools/find returns a list of matches; any match means the study exists
    return len(resp.json()) > 0


for idx, study in enumerate(studies, start=1):

    study_id = study["ID"]
    tags = study.get("MainDicomTags", {})
    study_uid = tags.get("StudyInstanceUID", "")

    print(f"[{idx}/{total}] Study {study_id}", flush=True)

    if study_uid == "":
        errors += 1
        print("ERROR: StudyInstanceUID empty or not found", flush=True)
        continue

    try:
        # Skip studies that are already present on the destination
        if exists_in_dest(study_uid):
            skipped += 1
            print(f"ALREADY EXISTS in destination: {study_uid} ({skipped} skipped)", flush=True)
            continue

        print(f"Copying study {study_uid}", flush=True)

        # Download the whole study from the source as a ZIP archive
        study_zip = requests.get(
            f"{SOURCE}/studies/{study_id}/archive",
            auth=source_auth
        )

        if study_zip.status_code != 200:
            errors += 1
            print(f"ERROR downloading ZIP HTTP {study_zip.status_code}", flush=True)
            continue

        # Upload the archive to the destination
        upload = requests.post(
            f"{DEST}/instances",
            files={"file": ("study.zip", study_zip.content)},
            auth=dest_auth
        )

        if upload.status_code in [200, 202]:
            copied += 1
            print(f"OK ({copied} copied)", flush=True)
        else:
            errors += 1
            print(f"ERROR uploading HTTP {upload.status_code}: {upload.text[:300]}", flush=True)

    except Exception as e:
        # Count the failure and move on to the next study
        errors += 1
        print(f"ERROR: {e}", flush=True)

print("", flush=True)
print("=================================", flush=True)
print(f"Total studies  : {total}", flush=True)
print(f"Copied         : {copied}", flush=True)
print(f"Skipped        : {skipped}", flush=True)
print(f"Errors         : {errors}", flush=True)
print("=================================", flush=True)
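
The script asks the destination once per study. For large date ranges, a possible alternative is to run the same expanded `/tools/find` query on both servers and compare the StudyInstanceUIDs locally. This is only a sketch: `studies_to_copy` is a hypothetical helper, and it assumes both lists come from `/tools/find` with `"Expand": True`, as in the script above. Like the original check, it only tests that a study is present, not that it is complete.

```python
def study_uid(study):
    """Extract the StudyInstanceUID from one expanded /tools/find result."""
    return study.get("MainDicomTags", {}).get("StudyInstanceUID", "")


def studies_to_copy(source_studies, dest_studies):
    """Return the source studies whose StudyInstanceUID is absent from dest."""
    dest_uids = {study_uid(s) for s in dest_studies}
    dest_uids.discard("")  # ignore destination entries without a UID

    pending = []
    for study in source_studies:
        uid = study_uid(study)
        # Studies without a UID are left out here; the main loop reports them
        if uid and uid not in dest_uids:
            pending.append(study)
    return pending


if __name__ == "__main__":
    # Made-up sample data, shaped like expanded /tools/find results
    source = [
        {"ID": "a", "MainDicomTags": {"StudyInstanceUID": "1.2.3"}},
        {"ID": "b", "MainDicomTags": {"StudyInstanceUID": "1.2.4"}},
    ]
    dest = [{"ID": "a", "MainDicomTags": {"StudyInstanceUID": "1.2.3"}}]
    print([s["ID"] for s in studies_to_copy(source, dest)])
```

The trade-off is two larger responses instead of many small requests, which is one more reason to keep each date range small.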