Writing a Python plugin to find visually similar DICOM images by indexing the image pHash of the middle frame

Hi all!
I’m a cardiologist working in an insurance company, and my job is to review interventional cardiology studies. I have set up an Orthanc server on my work PC (running Windows) to easily look up the DICOM studies. I recently noticed some hospitals changing DICOM tags such as patient name and ID and resubmitting old studies. I cooked up a Python script with ChatGPT to find visually similar DICOM files by generating the pHash of the middle frame of an instance. I now want to index the pHash value along with each instance so that I can pick out instances with similar pHash values. Please understand that I have no prior coding knowledge and I’m only able to stitch together scripts using ChatGPT. I’m not sure whether this will work, since I haven’t been able to figure out how to run it inside Orthanc.

Any help is appreciated.

Script: phasher.py

import orthanc
import os
import pydicom
import numpy as np
from skimage.transform import resize
from PIL import Image
import imagehash

def generate_image_hash(file_path):
    """Compute a perceptual hash (pHash) of one representative frame of a DICOM file.

    For files with more than 5 frames the middle frame is hashed; for other
    multi-frame files the first frame is hashed; for single-frame files the
    whole image is hashed.

    Args:
        file_path: Path of the DICOM file, passed to ``pydicom.dcmread``.

    Returns:
        The hash as a string (``str()`` of ``imagehash.phash`` with
        ``hash_size=30``), or ``None`` when the dataset has no ``Rows``
        attribute.
    """
    dcm = pydicom.dcmread(file_path)

    if not hasattr(dcm, 'Rows'):
        print(f"Warning: 'Rows' attribute not found in {file_path}. Skipping.")
        return None

    # NumberOfFrames may be absent (or empty) on single-frame images.
    frame_count = int(getattr(dcm, 'NumberOfFrames', 1) or 1)
    pixels = dcm.pixel_array

    if frame_count > 5:
        midpoint = round(frame_count / 2)
        pixel_data = pixels[midpoint]
    elif frame_count > 1:
        pixel_data = pixels[0]
    else:
        # A single-frame pixel_array has no leading frame axis, so indexing it
        # with [0] would select the first pixel row instead of the image.
        pixel_data = pixels

    common_size = (512, 512)
    resized_pixel_data = resize(pixel_data, common_size)

    # Colour frames keep a trailing channel axis; average it to get grayscale.
    if resized_pixel_data.ndim == 3:
        resized_pixel_data = np.mean(resized_pixel_data, axis=-1)

    # NOTE(review): assumes resize() returned floats in [0, 1] — confirm for
    # every pixel dtype that occurs in practice.
    resized_pixel_data = (resized_pixel_data * 255).astype(np.uint8)
    return str(imagehash.phash(Image.fromarray(resized_pixel_data), hash_size=30))

def compute_phash(file_path):
    """Return the pHash of the DICOM file at ``file_path``.

    Thin wrapper around ``generate_image_hash`` that reports files pydicom
    rejects as invalid DICOM instead of raising.

    Args:
        file_path: Path of the file to hash.

    Returns:
        The hash string, or ``None`` when the file is not valid DICOM (or when
        ``generate_image_hash`` itself returns ``None``).
    """
    try:
        return generate_image_hash(file_path)
    except pydicom.errors.InvalidDicomError as e:
        print(f"Error processing DICOM file {file_path}: {e}")
        return None

def index_existing_instances():
    """Compute a pHash for every instance and attach it to the instance's tags.

    NOTE(review): orthanc.GetInstances, orthanc.GetInstanceFile,
    orthanc.GetInstanceTags and orthanc.SetInstanceTags do not appear to be
    part of the Orthanc Python plugin API — confirm against the plugin
    documentation before relying on this function.
    """
    instances = orthanc.GetInstances()

    for instance_id in instances:
        file_path = orthanc.GetInstanceFile(instance_id)

        phash_value = compute_phash(file_path)
        if phash_value is None:
            # Not hashable (invalid DICOM or no image); leave the instance as is.
            continue

        tags = orthanc.GetInstanceTags(instance_id)
        tags['phash'] = phash_value
        orthanc.SetInstanceTags(instance_id, tags)

        print(f"Indexed instance {instance_id} with phash: {phash_value}")

# Run the indexing pass only when this file is executed as a script.
if __name__ == "__main__":
    index_existing_instances()

Update:
The script above wouldn’t have worked.

I’ve modified a sample script to generate the pHash of frame 0 of the instance:

import orthanc
from PIL import Image
import io
import pydicom
import imagehash

def DecodeInstance(output, uri, **request):
    """REST callback that answers a GET request with the pHash of an instance.

    The first frame is hashed for multi-frame instances; the whole image is
    hashed for single-frame instances.

    Args:
        output: Orthanc REST output object used to send the answer.
        uri: URI of the request (unused).
        **request: Request details; ``request['method']`` is the HTTP method
            and ``request['groups'][0]`` is the instance ID.
    """
    if request['method'] != 'GET':
        output.SendMethodNotAllowed('GET')
        return

    # Instance ID captured by the regular-expression group of the registered URI
    instanceId = request['groups'][0]
    # Get the content of the DICOM file
    f = orthanc.GetDicomForInstance(instanceId)
    # Parse it using pydicom
    dicom = pydicom.dcmread(io.BytesIO(f))

    pixels = dicom.pixel_array
    if int(getattr(dicom, 'NumberOfFrames', 1) or 1) > 1:
        pixel_data = pixels[0]
    else:
        # A single-frame pixel_array has no frame axis; [0] would select one
        # pixel row, which cannot be turned into an image.
        pixel_data = pixels

    hash_value = str(imagehash.phash(Image.fromarray(pixel_data), hash_size=20))
    # Return the hash to the caller as plain text
    output.AnswerBuffer(hash_value, 'text/plain')

# The capture group (.*) receives everything after /pydicom/ — the instance ID
# that DecodeInstance reads from request['groups'][0]. A bare (.) would only
# match a single character, so real instance IDs would never reach the callback.
orthanc.RegisterRestCallback('/pydicom/(.*)', DecodeInstance)

This callback can be called as follows:

$ curl http://localhost:8042/pydicom/19816330-cb02e1cf-df3a8fe8-bf510623-ccefe9f5

I still need to figure out a way to iterate through all instances and add the pHash tag.

Update: Adding the pHash as a private DICOM tag posed challenges, so I’ve decided to store the pHash values for each instance ID, along with PatientName, PatientID, and InstitutionName, in an SQLite database. The current plugin setup fulfills my requirements, and this may be the last update in this thread.

Phasher.py

import sys
import orthanc
from PIL import Image
import requests
import os
import io
import pydicom
import imagehash
import sqlite3

# Location of the SQLite database that stores one row per Orthanc instance.
# Backslashes are escaped so that none of them starts an escape sequence.
DB_PATH = "D:\\CLIP\\Phasher.db"

def get_instances(timeout=60):
    """
    Fetches the list of instances from the Orthanc server.

    Args:
        timeout (float): Seconds to wait for the server before giving up, so
            that an unresponsive server cannot block the caller forever.

    Returns:
        list: List of instances (the decoded JSON body of ``/instances/``),
        or an empty list when the request fails.
    """
    # NOTE(review): URL and credentials are hard-coded; they must match the
    # Orthanc configuration of the machine this runs on.
    api_url = "http://localhost:8042/instances/"
    orthanc_auth = ("admin", "admin")
    try:
        response = requests.get(api_url, auth=orthanc_auth, timeout=timeout)
        response.raise_for_status()
        return response.json()
    except (requests.RequestException, ValueError) as e:
        # ValueError covers a body that is not valid JSON.
        print(f"Error fetching instances: {e}")
        return []

def check_instances_in_db(conn, instance_uuids, batch_size=500):
    """
    Checks which instance UUIDs are already in the database.

    The lookup is done in batches: SQLite limits the number of "?" parameters
    a single statement may bind, and an archive can easily hold more instances
    than that limit.

    Args:
        conn (sqlite3.Connection): SQLite database connection.
        instance_uuids (list): List of instance UUIDs to check.
        batch_size (int): Maximum number of UUIDs bound per query.

    Returns:
        list: List of existing instance UUIDs in the database.
    """
    # Drop repeated UUIDs (keeping order) so none is reported twice.
    uuids = list(dict.fromkeys(instance_uuids))

    cursor = conn.cursor()
    existing_instances = []
    for start in range(0, len(uuids), batch_size):
        batch = uuids[start:start + batch_size]
        # Only "?" placeholders are interpolated; the values stay bound parameters.
        placeholders = ','.join('?' * len(batch))
        cursor.execute(
            f"SELECT InstanceID FROM Table1 WHERE InstanceID IN ({placeholders})",
            batch,
        )
        existing_instances.extend(row[0] for row in cursor.fetchall())
    return existing_instances

def create_database(db_path=None):
    """
    Creates the database and table if they don't exist.

    Args:
        db_path (str | None): Database file to open. Defaults to the
            module-level DB_PATH when omitted.

    Returns:
        sqlite3.Connection: SQLite database connection. The caller is
        responsible for closing it.
    """
    conn = sqlite3.connect(DB_PATH if db_path is None else db_path)
    conn.execute('''
        CREATE TABLE IF NOT EXISTS Table1 (
            InstanceID TEXT PRIMARY KEY,
            Phash TEXT,
            PatientName TEXT,
            PatientID TEXT,
            InstitutionName TEXT
        )
    ''')
    conn.commit()
    return conn

def update_phash_value(conn, instance_id, new_hash_value, PatientName, PatientID, InstitutionName):
    """
    Updates the Phash value along with Patient Name, ID and Institution Name in the database.

    Only an existing row is updated; no row is inserted. Database errors are
    reported on standard output and not raised.

    Args:
        conn (sqlite3.Connection): SQLite database connection.
        instance_id (str): Instance UUID.
        new_hash_value (str): Image Phash value.
        PatientName (str): Patient name.
        PatientID (str): Patient ID.
        InstitutionName (str): Institution name.
    """
    try:
        cursor = conn.cursor()
        cursor.execute(
            "UPDATE Table1 SET Phash = ?, PatientName = ?, PatientID = ?, InstitutionName = ? WHERE InstanceID = ?",
            (new_hash_value, PatientName, PatientID, InstitutionName, instance_id),
        )
        conn.commit()
    except sqlite3.Error as e:
        print(f"Error updating Phash value: {e}")
        return

    # An UPDATE that matches no row succeeds silently; do not report it as a success.
    if cursor.rowcount:
        print(f"Phash value updated successfully for InstanceID {instance_id}")
    else:
        print(f"No row found for InstanceID {instance_id}; Phash value not stored")

def DecodeInstance(instance):
    """
    Decodes the instance and updates the Phash value in the database.

    Instances that already have a non-empty Phash in the database are skipped.
    Otherwise the DICOM file is fetched from Orthanc, one frame is hashed (the
    middle frame when there are more than 5 frames, the first frame of other
    multi-frame files, the whole image for single-frame files) and the hash is
    stored together with PatientName, PatientID and InstitutionName.

    Args:
        instance (str): Instance UUID.
    """
    instance_id = instance

    conn = sqlite3.connect(DB_PATH)
    try:
        cursor = conn.cursor()
        cursor.execute("SELECT Phash FROM Table1 WHERE InstanceID = ?", (instance_id,))
        existing_phash = cursor.fetchone()
        if existing_phash is not None and existing_phash[0]:
            print(f"Phash value already exists for InstanceID {instance_id}. Skipping processing.")
            return

        f = orthanc.GetDicomForInstance(instance_id)
        dicom = pydicom.dcmread(io.BytesIO(f))
        if not hasattr(dicom, 'Rows'):
            # No image dimensions, hence nothing to hash.
            return

        # NumberOfFrames may be absent (or empty) on single-frame images.
        frame_count = int(getattr(dicom, 'NumberOfFrames', 1) or 1)
        pixels = dicom.pixel_array
        if frame_count > 5:
            midpoint = round(frame_count / 2)
            pixel_data = pixels[midpoint]
        elif frame_count > 1:
            pixel_data = pixels[0]
        else:
            # A single-frame pixel_array has no frame axis; [0] would select
            # one pixel row instead of the image.
            pixel_data = pixels

        # These tags are optional in practice; a missing one must not abort hashing.
        PatientName = str(getattr(dicom, 'PatientName', ''))
        PatientID = str(getattr(dicom, 'PatientID', ''))
        InstitutionName = str(getattr(dicom, 'InstitutionName', ''))

        new_hash_value = str(imagehash.phash(Image.fromarray(pixel_data), hash_size=30))
        update_phash_value(conn, instance_id, new_hash_value, PatientName, PatientID, InstitutionName)
    finally:
        # Close the connection on every path, including when decoding raises.
        conn.close()

def manageDatabase():
    """
    Manages the database by updating existing records and adding new instances. Only updates at start of Orthanc Server.

    Every instance reported by get_instances() that has no row in Table1 yet
    gets a row holding only its InstanceID; the remaining columns are filled
    in later by DecodeInstance.
    """
    instances = get_instances()
    if not instances:
        print("No instances found.")
        return

    conn = create_database()
    try:
        # A set keeps the membership test O(1) per instance.
        known_instances = set(check_instances_in_db(conn, instances))
        new_rows = [(instance,) for instance in instances if instance not in known_instances]
        # OR IGNORE keeps a repeated ID from aborting the whole batch.
        conn.executemany("INSERT OR IGNORE INTO Table1 (InstanceID) VALUES (?)", new_rows)
        conn.commit()
    finally:
        conn.close()

def OnChange(changeType, level, resource):
    """
    Callback function to handle Orthanc server changes.

    Only the ORTHANC_STARTED event is acted upon: the database is synchronised
    with the instance list, then every instance listed in the database is
    passed to DecodeInstance.

    Args:
        changeType: Kind of change reported by Orthanc.
        level: Resource level of the change (unused).
        resource: Identifier of the changed resource (unused).
    """
    if changeType != orthanc.ChangeType.ORTHANC_STARTED:
        return

    manageDatabase()

    # manageDatabase() creates no database when no instance was found.
    if not os.path.exists(DB_PATH):
        return

    conn = sqlite3.connect(DB_PATH)
    try:
        instance_ids = [row[0] for row in conn.execute("SELECT InstanceID FROM Table1")]
    finally:
        conn.close()

    for instance_id in instance_ids:
        try:
            DecodeInstance(instance_id)
        except Exception as e:
            # One unreadable instance must not stop the remaining ones from being hashed.
            print(f"Error computing Phash for InstanceID {instance_id}: {e}")

# Register OnChange as the change callback; OnChange itself only acts on
# the ORTHANC_STARTED event.
orthanc.RegisterOnChangeCallback(OnChange)

Adding an API endpoint to retrieve the list of visually similar (duplicate) DICOM images would be ideal, but for now I’m just using the following script to read the database directly and save the results to a text file.

import sqlite3
import sys

def find_duplicates(db_path, output_file):
    """
    Finds instances with duplicate Phash values in the specified SQLite database
    and writes them to a text file.
    (Part of FindDuplicateInstances.py.)

    For every Phash shared by more than one row, the output contains the
    number of rows followed by one line per row with patient name, patient ID
    and institution name. Rows without a Phash are ignored. Rows are grouped
    in Python, so values that contain commas or are missing cannot be split
    wrongly or shift the columns against each other.

    Parameters:
    - db_path (str): The path to the SQLite database file.
    - output_file (str): The path to the file where the results will be saved.
    """
    conn = None
    try:
        conn = sqlite3.connect(db_path)
        rows = conn.execute(
            """
            SELECT Phash, PatientName, PatientID, InstitutionName
            FROM Table1
            WHERE Phash IS NOT NULL AND Phash != ''
            ORDER BY Phash, InstanceID
            """
        ).fetchall()
    except sqlite3.Error as e:
        print(f"Error accessing the database: {e}")
        return
    finally:
        # conn stays None when connect() itself failed.
        if conn is not None:
            conn.close()

    # Group the rows by hash; dicts keep insertion order, so groups follow the query order.
    groups = {}
    for phash, patient_name, patient_id, institution_name in rows:
        groups.setdefault(phash, []).append((patient_name, patient_id, institution_name))

    # Explicit UTF-8 so that non-ASCII names cannot fail on the platform default encoding.
    with open(output_file, 'w', encoding='utf-8') as result_file:
        print("Duplicates:", file=result_file)
        for members in groups.values():
            if len(members) < 2:
                continue
            print(f"{len(members)}", file=result_file)
            for patient_name, patient_id, institution_name in members:
                print(
                    f"Patient Name: {patient_name or ''}, "
                    f"Patient ID: {patient_id or ''}, "
                    f"Institution Name: {institution_name or ''}",
                    file=result_file,
                )

    print(f"Results saved in {output_file}")

if __name__ == "__main__":
    # NOTE(review): the plugin's DB_PATH is spelled "Phasher.db"; this resolves
    # to the same file only on a case-insensitive file system — confirm.
    db_path = "D:\\CLIP\\phasher.db"
    output_file = "D:\\CLIP\\result.txt"
    find_duplicates(db_path, output_file)