|
""" |
|
Quantum Physics Problem Generator |
|
Shlomo Kashani |
|
|
|
Description: |
|
------------ |
|
This module is part of the QuantumLLMInstruct system, designed to generate and solve quantum physics problems |
|
using advanced Large Language Models (LLMs). It utilizes a multi-stage pipeline for problem generation, |
|
solution generation, and database management. |
|
|
|
Core Functionalities: |
|
--------------------- |
|
1. **Problem Generation**: |
|
- Generates quantum physics problems in LaTeX format using LLMs. |
|
- Supports domain-specific problem generation across multiple quantum fields. |
|
|
|
2. **Solution Generation**: |
|
- Provides step-by-step LaTeX solutions for the generated problems using a second LLM. |
|
|
|
3. **Data Management**: |
|
- Stores generated problems and solutions in DuckDB and Parquet files. |
|
- Enables exporting data in Parquet format for scalability and compatibility. |
|
|
|
4. **Gradio Interface**: |
|
- A user-friendly interface to interact with the system, including problem generation, |
|
solution generation, and database exploration. |
|
|
|
5. **Hugging Face Integration**: |
|
- Supports visualization and interaction with the dataset on the Hugging Face platform. |
|
|
|
Main Components: |
|
---------------- |
|
- **initialize_duckdb() / initialize_parquet()**: Initializes the database schema. |
|
- **generate_multiple_problems()**: Generates multiple problems for the selected quantum domains. |
|
- **generate_solutions()**: Solves unsolved problems in the database. |
|
- **export_parquet()**: Exports the database to a Parquet file for external use. |
|
|
|
Dependencies: |
|
------------- |
|
- Python 3.7+ |
|
- Transformers: `transformers` |
|
- DuckDB: `duckdb` |
|
- Gradio: `gradio` |
|
- Pandas: `pandas` |
|
""" |
|
|
|
import numpy as np |
|
import random |
|
import io |
|
import duckdb |
|
import math |
|
from datetime import datetime |
|
import PIL |
|
from PIL import Image |
|
import pennylane as qml |
|
import base64 |
|
import platform |
|
from math import pi |
|
import pandas as pd |
|
import os |
|
from transformers import AutoModelForCausalLM, AutoTokenizer |
|
import tqdm |
|
import duckdb |
|
from tqdm import tqdm |
|
import uuid |
|
import random |
|
import sympy |
|
from datetime import datetime |
|
|
|
from Q_llm_prompts import * |
|
|
|
|
|
|
|
|
|
# Hugging Face model identifiers offered for problem generation.
model_options = [
    "Qwen/Qwen2.5-Coder-1.5B-Instruct",
    "Qwen/Qwen2.5-Coder-3B-Instruct",
    "Qwen/Qwen2.5-Coder-7B-Instruct",
    "Qwen/Qwen2.5-Math-7B-Instruct",
    "Qwen/Qwen2.5-Coder-32B-Instruct",
    "meta-llama/Llama-3.2-3B-Instruct",
]

# The solution stage offers the very same list object (an alias, not a copy).
solutions_model_options = model_options
|
|
|
|
|
# Load the first listed model and its tokenizer as soon as the module is imported.
selected_model = model_options[0]
model = AutoModelForCausalLM.from_pretrained(
    selected_model, torch_dtype="auto", device_map="auto"
)
tokenizer = AutoTokenizer.from_pretrained(selected_model)

# The solution stage starts out sharing the generation model and tokenizer objects.
solution_model = selected_model
solution_tokenizer = tokenizer
solution_model_instance = model
|
|
|
|
|
def reload_model(model_name):
    """Load ``model_name`` and make it the active problem-generation model.

    Rebinds the module-level ``model``, ``tokenizer`` and ``selected_model``.
    ``selected_model`` is updated as well because ``generate_multiple_problems``
    reads it to label stored problems; leaving it stale would attribute new
    problems to the previously loaded model.  The ``solution_*`` globals are
    not touched by this function.

    Args:
        model_name: Hugging Face model identifier passed to ``from_pretrained``.

    Returns:
        A short status string naming the loaded model.
    """
    global model, tokenizer, selected_model

    new_model = AutoModelForCausalLM.from_pretrained(
        model_name,
        torch_dtype="auto",
        device_map="auto",
    )
    new_tokenizer = AutoTokenizer.from_pretrained(model_name)

    # Swap the globals only after both loads succeeded, so a failure never
    # leaves a model paired with another model's tokenizer.
    model, tokenizer, selected_model = new_model, new_tokenizer, model_name
    return f"Model loaded: {model_name}"
|
|
|
|
|
|
|
|
|
# Module-wide PennyLane simulator device with ten wires.
dev = qml.device("default.qubit", wires=10)
|
|
|
|
|
def is_mac_os():
    """Return True when ``platform.system()`` reports ``'Darwin'`` (macOS)."""
    system_name = platform.system()
    return system_name == "Darwin"
|
|
|
# Torch-style device string: 'cpu' on macOS, 'cuda' everywhere else.
device = "cuda" if not is_mac_os() else "cpu"
|
|
|
# System message used for every solution-generation request.
RESPONSE_SOLUTION_LLM_SYS_PROMPT = (
    "You are an expert in quantum physics and provide detailed solutions in plain text. "
    "All mathematical equations and symbols must strictly be in LaTeX."
)

# User message template; ``{problem}`` is filled in with str.format().
RESPONSE_SOLUTION_LLM_USR_PROMPT = (
    "\n"
    "Provide a complete solution to the following quantum physics problem in plain text format:\n"
    "{problem}\n"
)

# Path of the Parquet store used by the *_pqt helpers.
PARQUET_FILE = "quantum_problems.parquet"
|
|
|
def initialize_parquet():
    """Create ``PARQUET_FILE`` with the expected (empty) columns if it is missing.

    Does nothing when the file already exists.
    """
    if os.path.exists(PARQUET_FILE):
        return

    column_names = (
        "uuid",
        "timestamp",
        "problem",
        "sub_domain",
        "main_domain",
        "model_name",
        "solution",
        "solution_model_name",
    )
    empty_frame = pd.DataFrame({name: [] for name in column_names})
    empty_frame.to_parquet(PARQUET_FILE, index=False)
    print("Initialized Parquet file with schema.")
|
|
|
def load_parquet():
    """Return the contents of ``PARQUET_FILE`` as a DataFrame.

    The file is first created via ``initialize_parquet()`` when it does not
    exist yet.
    """
    if not os.path.exists(PARQUET_FILE):
        initialize_parquet()
    return pd.read_parquet(PARQUET_FILE)
|
|
|
def save_parquet(df):
    """Write ``df`` to ``PARQUET_FILE`` (without the index), replacing its contents."""
    destination = PARQUET_FILE
    df.to_parquet(destination, index=False)
|
|
|
def insert_problem_pqt(uuid, timestamp, problem, main_domain, sub_domain, model_name, solution=None, solution_model_name=None):
    """Append one problem record to the Parquet store.

    The whole file is loaded, the record is appended as a new last row, and
    the file is written back.  ``solution`` and ``solution_model_name`` may be
    left as None for a problem that has not been solved yet.
    """
    existing = load_parquet()
    record = pd.DataFrame([
        dict(
            uuid=uuid,
            timestamp=timestamp,
            problem=problem,
            sub_domain=sub_domain,
            main_domain=main_domain,
            model_name=model_name,
            solution=solution,
            solution_model_name=solution_model_name,
        )
    ])
    combined = pd.concat([existing, record], ignore_index=True)
    save_parquet(combined)
|
|
|
def update_solution_pqt(uuid, solution, solution_model_name):
    """Store ``solution`` and ``solution_model_name`` on every row whose uuid matches.

    The Parquet file is loaded, updated in memory and written back in full.
    """
    frame = load_parquet()
    is_target = frame["uuid"] == uuid
    target_columns = ["solution", "solution_model_name"]
    frame.loc[is_target, target_columns] = solution, solution_model_name
    save_parquet(frame)
|
|
|
|
|
|
|
# Path of the DuckDB database file that holds the ``problems`` table.
DB_FILE = "quantum_problems.duckdb"
|
|
|
def initialize_duckdb():
    """Create the ``problems`` table in ``DB_FILE`` if it does not exist yet.

    The connection is closed even when the statement fails, so an error during
    schema creation does not leave an open handle behind.
    """
    conn = duckdb.connect(database=DB_FILE)
    try:
        conn.execute("""
CREATE TABLE IF NOT EXISTS problems (
uuid TEXT UNIQUE NOT NULL,
timestamp TEXT,
problem TEXT,
sub_domain TEXT,
main_domain TEXT,
model_name TEXT,
solution TEXT,
solution_model_name TEXT
)
""")
    finally:
        conn.close()
|
|
|
|
|
def buffer_plot_and_get(fig):
    """Render ``fig`` to an in-memory PNG and return it opened as a PIL image.

    ``fig`` only needs a ``savefig(buffer, format=...)`` method.
    """
    png_stream = io.BytesIO()
    fig.savefig(png_stream, format="png")
    # Rewind so PIL reads the PNG from its first byte.
    png_stream.seek(0)
    picture = PIL.Image.open(png_stream)
    return picture
|
|
|
|
|
def pil_image_to_bytes(image):
    """Return ``image`` serialized as PNG bytes.

    ``image`` must provide ``save(file_object, format=...)``.
    """
    with io.BytesIO() as sink:
        image.save(sink, format="PNG")
        return sink.getvalue()
|
|
|
|
|
def encode_image_from_blob(blob):
    """Return an HTML ``<img>`` tag embedding ``blob`` as a base64 PNG data URI.

    The raw bytes of ``blob`` are what gets encoded; the image is not
    re-encoded.
    """
    stream = io.BytesIO(blob)
    # The opened image is not used afterwards; the call is kept because the
    # blob has always been opened with PIL before encoding (presumably as a
    # validity check -- confirm before removing).
    Image.open(stream)
    payload = base64.b64encode(stream.getvalue()).decode("utf-8")
    return f'<img src="data:image/png;base64,{payload}" style="max-width:500px;"/>'
|
|
|
|
|
def generate_random_hamiltonian(num_qubits):
    """Return a random Hamiltonian string such as ``"0.42 * X I Z + -0.1 * Y Y I"``.

    Between one and five terms are produced.  Each term is a coefficient drawn
    uniformly from [-1, 1] (rounded to two decimals), followed by `` * `` and
    ``num_qubits`` space-separated letters picked from I, X, Y, Z.  Terms are
    joined with `` + ``.
    """
    alphabet = ['I', 'X', 'Y', 'Z']
    term_count = random.randint(1, 5)

    pieces = []
    for _ in range(term_count):
        weight = round(random.uniform(-1, 1), 2)
        letters = " ".join(random.choice(alphabet) for _ in range(num_qubits))
        pieces.append(f"{weight} * {letters}")
    return " + ".join(pieces)
|
|
|
|
|
def hamiltonian_to_qasm(hamiltonian, num_qubits):
    """Translate a Hamiltonian string into OpenQASM 2.0 text.

    ``hamiltonian`` has the form ``"<coeff> * P0 P1 ... + <coeff> * ..."`` with
    each ``P`` one of I, X, Y, Z.  For every term, an ``X`` at position ``i``
    emits ``x q[i];`` and a ``Y`` emits ``ry(pi/2) q[i];`` (the coefficient is
    not used for those gates).  ``Z`` coefficients are summed per qubit and
    emitted once at the end as ``rz(<angle>) q[i];``.

    The ``rz`` angle is written in radians, which is the unit OpenQASM uses and
    the unit ``qasm_to_pennylane`` hands to ``qml.RZ``; earlier revisions
    converted it to degrees, inflating every rotation by 180/pi.

    Args:
        hamiltonian: Hamiltonian string; blank terms (e.g. an empty string)
            are skipped.
        num_qubits: Size of the declared ``qreg``.

    Returns:
        The QASM program as a newline-terminated string.

    Raises:
        ValueError: If a term is not ``"<float> * <paulis>"``.
        KeyError: If a ``Z`` appears at a position >= ``num_qubits``.
    """
    lines = [
        "OPENQASM 2.0;",
        'include "qelib1.inc";',
        f"qreg q[{num_qubits}];",
    ]
    rotations = {i: 0.0 for i in range(num_qubits)}

    for term in hamiltonian.split(" + "):
        if not term.strip():
            continue
        coeff_text, paulis = term.split(" * ")
        coeff = float(coeff_text)

        for i, pauli in enumerate(paulis.split()):
            if pauli == "X":
                lines.append(f"x q[{i}];")
            elif pauli == "Y":
                lines.append(f"ry(pi/2) q[{i}];")
            elif pauli == "Z":
                rotations[i] += coeff

    for i, angle in rotations.items():
        # Round before the zero test so float noise from summing coefficients
        # (e.g. 0.1 + 0.2 - 0.3) neither prints as 0.30000000000000004 nor
        # produces a spurious rz(0.0).
        angle = round(angle, 6)
        if angle != 0:
            lines.append(f"rz({angle}) q[{i}];")

    return "\n".join(lines) + "\n"
|
|
|
|
|
def qasm_to_pennylane(qasm_code):
    """Build a PennyLane QNode (on the module device ``dev``) from QASM text.

    Each line is matched by substring, in this order: a line containing ``x``
    applies ``PauliX``; one containing ``rz`` applies ``RZ`` with the angle
    parsed from the parentheses; one containing ``ry`` applies ``RY(pi/2)``
    (the angle written in the QASM is ignored).  The wire index is taken from
    the first ``q[<n>]`` on the line.  The QNode returns ``qml.state()``.
    """
    qasm_lines = qasm_code.split("\n")

    def wire_of(text):
        # Integer between the first 'q[' and the following ']'.
        return int(text.split('q[')[1].split(']')[0])

    # The register size from the third line is parsed but not used further;
    # the parse is kept so a malformed header still fails here.
    num_qubits = int(qasm_lines[2].split('[')[1].split(']')[0])

    @qml.qnode(dev)
    def circuit():
        for text in qasm_lines:
            if "x" in text:
                qml.PauliX(wire_of(text))
            elif "rz" in text:
                theta = float(text.split('(')[1].split(')')[0])
                qml.RZ(theta, wire_of(text))
            elif "ry" in text:
                qml.RY(pi / 2, wire_of(text))
        return qml.state()

    return circuit
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def load_from_duckdb(db_file='quantum_hamiltonians.duckdb'):
    """Render every row of the ``hamiltonians`` table as an HTML table.

    Reads the whole table from ``db_file``, then emits one HTML fragment per
    row containing the circuit image (from the ``plot`` blob) and the
    ``hamiltonian``, ``qasm_code``, ``trotter_code``, ``num_qubits``,
    ``trotter_order`` and ``timestamp`` columns.  Returns the fragments
    concatenated into one string.
    """
    connection = duckdb.connect(database=db_file)
    frame = connection.execute("SELECT * FROM hamiltonians").df()
    connection.close()

    def render(index, row):
        # One self-contained HTML table per stored circuit.
        encoded_img = encode_image_from_blob(row['plot'])
        return f"""
<table style='width: 100%; border-collapse: collapse; margin: 10px;'>
<tr>
<td style='width: 30%; text-align: center;'>
<h3>Circuit {index + 1}</h3>
{encoded_img} <!-- Display the image -->
</td>
<td style='padding: 10px;'>
<table style='width: 100%; border-collapse: collapse;'>
<tr>
<td><strong>Hamiltonian:</strong></td><td>{row['hamiltonian']}</td>
</tr>
<tr>
<td><strong>QASM Representation:</strong></td><td>{row['qasm_code']}</td>
</tr>
<tr>
<td><strong>Trotter Decomposition:</strong></td><td>{row['trotter_code']}</td>
</tr>
<tr>
<td><strong>Number of Qubits:</strong></td><td>{row['num_qubits']}</td>
</tr>
<tr>
<td><strong>Trotter Order:</strong></td><td>{row['trotter_order']}</td>
</tr>
<tr>
<td><strong>Timestamp:</strong></td><td>{row['timestamp']}</td>
</tr>
</table>
</td>
</tr>
</table>
"""

    return "".join(render(index, row) for index, row in frame.iterrows())
|
|
|
|
|
def generate_hamiltonians(num_hamiltonians, selected_qubits, selected_order):
    """Generate random Hamiltonians together with their derived artifacts.

    For each of ``num_hamiltonians`` iterations a qubit count is picked at
    random from ``selected_qubits``, a random Hamiltonian is generated, and
    its QASM text, Trotter string and a PNG drawing of the PennyLane circuit
    are produced.

    Args:
        num_hamiltonians: Number of records to generate.
        selected_qubits: Non-empty sequence of candidate qubit counts.
        selected_order: Trotter order passed to ``trotter_decomposition``.

    Returns:
        A list of tuples ``(index, plot_png_bytes, hamiltonian, qasm_code,
        trotter_code, num_qubits, order, timestamp)`` with a 1-based index and
        one shared timestamp string.  Previously the list was built and then
        dropped, so callers received None and all work was lost.
    """
    results_table = []
    timestamp = str(datetime.now())
    order = selected_order

    for i in range(num_hamiltonians):
        num_qubits = random.choice(selected_qubits)
        hamiltonian = generate_random_hamiltonian(num_qubits)
        qasm_code = hamiltonian_to_qasm(hamiltonian, num_qubits)
        trotter_code = trotter_decomposition(hamiltonian, order)

        circuit = qasm_to_pennylane(qasm_code)

        # NOTE(review): the figure returned by draw_mpl is never closed here;
        # consider closing it if many circuits are generated in one call.
        fig, ax = qml.draw_mpl(circuit)()
        circuit_plot_image = buffer_plot_and_get(fig)
        circuit_plot_bytes = pil_image_to_bytes(circuit_plot_image)

        results_table.append(
            (i + 1, circuit_plot_bytes, hamiltonian, qasm_code, trotter_code, num_qubits, order, timestamp)
        )

    return results_table
|
|
|
|
|
|
|
def trotter_decomposition(hamiltonian, order):
    """Return a textual Trotter-style expansion of ``hamiltonian``.

    The input is split on `` + `` into terms of the form ``"<coeff> * <ops>"``.
    For each term, ``order`` copies of ``exp(coeff/order) * (<ops>)`` are
    emitted, followed by ``order`` copies with the negated coefficient.  All
    steps are joined with `` + ``.
    """
    steps = []
    for term in hamiltonian.split(" + "):
        head, *operators = term.split(" * ")
        weight = float(head)
        operator_text = ' * '.join(operators)

        # The division stays inside the comprehensions so nothing is computed
        # when ``order`` yields an empty range.
        steps.extend(f"exp({weight / order}) * ({operator_text})" for _ in range(order))
        steps.extend(f"exp({-weight / order}) * ({operator_text})" for _ in range(order))

    return " + ".join(steps)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def export_parquet(db_file):
    """Export the ``problems`` table of ``db_file`` to a timestamped Parquet file.

    The file is written with DuckDB's ``COPY ... (FORMAT PARQUET)`` into the
    current working directory as ``quantum_problems_<YYYYmmdd_HHMMSS>.parquet``
    and then re-saved through pandas with ``timestamp`` as a string column.

    Args:
        db_file: Path of the DuckDB database to export.

    Returns:
        A status message; errors are reported in the returned string rather
        than raised.
    """
    try:
        stamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        parquet_file = f"quantum_problems_{stamp}.parquet"

        conn = duckdb.connect(database=db_file)
        try:
            conn.execute(f"""
COPY (
SELECT
uuid,
CAST(timestamp AS VARCHAR) AS timestamp,
problem,
sub_domain,
main_domain,
model_name,
solution,
solution_model_name
FROM problems
) TO '{parquet_file}' (FORMAT PARQUET);
""")
        finally:
            # Close the connection even when COPY fails; previously a failed
            # export left it open.
            conn.close()

        df = pd.read_parquet(parquet_file)
        df['timestamp'] = df['timestamp'].astype(str)
        df.to_parquet(parquet_file, index=False)

        return f"Data successfully exported to Parquet file: {parquet_file}"
    except Exception as e:
        return f"Error exporting to Parquet: {e}"
|
|
|
def generate_dynamic_prompt(selected_domains):
    """Build the problem-generation prompt for one randomly chosen domain.

    One entry of ``selected_domains`` is picked at random and looked up in
    ``quantum_problem_domains``; its ``"description"`` and ``"template"``
    values are inserted into the instruction text.

    Returns:
        ``(prompt, selected_domain)``.

    Raises:
        ValueError: If ``selected_domains`` is empty.
    """
    if not selected_domains:
        raise ValueError("No domains selected. Please select at least one domain.")

    selected_domain = random.choice(selected_domains)
    details = quantum_problem_domains[selected_domain]
    description, example = details["description"], details["template"]

    prompt = f"""
Generate a single detailed quantum physics problem for an exam in LaTeX format. Do not solve the problem.
Do not include additional explanations or comments outside of LaTeX, and avoid unnecessary LaTeX imports (e.g., \\documentclass{{}}, \\usepackage{{}}, or \\begin{{document}}).
All mathematical equations and symbols must strictly be in LaTeX.
Your response must strictly follow this provided format:
1) {{Problem:}} Clearly define the quantum physics problem here, using mathematical precision and LaTeX formatting. Provide any equations or detailed descriptions necessary for students to understand and solve the problem.
2) {{Domain:}} Provide a concise two-word domain description in CAPS such as "ISING HAMILTONIAN".
Do not solve the problem!. The problem must strictly adhere to one and only one of the following domain types:
{description}
Example Response Output:
{example}
"""
    return prompt, selected_domain
|
|
|
|
|
def generate_problem(pair_id, model_name, selected_domains):
    """Generate one problem with the active model and store it in DuckDB.

    Generation uses the module-level ``model`` and ``tokenizer``;
    ``model_name`` is only recorded (last path segment) alongside the problem,
    and ``pair_id`` is only used in the error message.

    Returns:
        ``(response, selected_domain)`` on success, where ``response`` is the
        full decoded model output; ``(None, None)`` if anything fails (the
        error is printed).
    """
    try:
        prompt, selected_domain = generate_dynamic_prompt(selected_domains)

        chat = [
            {"role": "system", "content": "You are a quantum physics professor and an expert in quantum computing."},
            {"role": "user", "content": prompt},
        ]
        rendered = tokenizer.apply_chat_template(chat, tokenize=False, add_generation_prompt=True)
        model_inputs = tokenizer([rendered], return_tensors="pt").to(model.device)

        # NOTE(review): 10024 is an unusual token budget -- confirm it is not a typo.
        raw_outputs = model.generate(**model_inputs, max_new_tokens=10024)

        # Drop the echoed prompt tokens so only the newly generated part is decoded.
        completions = [
            full_ids[len(prompt_ids):]
            for prompt_ids, full_ids in zip(model_inputs.input_ids, raw_outputs)
        ]
        response = tokenizer.batch_decode(completions, skip_special_tokens=True)[0]

        has_both_markers = "{Problem:}" in response and "{Domain:}" in response
        if not has_both_markers:
            raise ValueError(f"Generated problem does not match the expected format. Response:\n{response}")

        after_problem_marker = response.split("{Problem:}")[1]
        problem = after_problem_marker.split("{Domain:}")[0].strip()
        sub_domain = response.split("{Domain:}")[1].strip()

        record = (
            str(uuid.uuid4()),
            datetime.now().isoformat(),
            problem,
            selected_domain,
            sub_domain,
            model_name.split("/")[-1],
        )
        conn = duckdb.connect(database=DB_FILE)
        conn.execute("""
INSERT INTO problems (uuid, timestamp, problem, main_domain, sub_domain, model_name)
VALUES (?, ?, ?, ?, ?, ?)
""", record)
        conn.close()

        return response, selected_domain
    except Exception as e:
        print(f"Error generating problem {pair_id}: {e}")
        return None, None
|
|
|
def generate_multiple_problems(num_pairs, selected_domains):
    """Generate ``num_pairs`` problems and return their raw responses.

    Each problem is produced by ``generate_problem`` with the module-level
    ``selected_model``; failed generations are skipped.  A progress bar shows
    the model name and the domain of the last successful problem.

    Args:
        num_pairs: Number of generation attempts.
        selected_domains: Candidate domains; one is chosen per problem.

    Returns:
        The successful responses joined by blank lines, or a prompt to select
        a domain when ``selected_domains`` is empty.
    """
    if not selected_domains:
        return "Please select at least one domain type."

    conn = duckdb.connect(database=DB_FILE)
    try:
        current_count = conn.execute("SELECT COUNT(*) FROM problems").fetchone()[0]
    finally:
        # Release the connection even if the count query fails.
        conn.close()

    model_name = selected_model.split("/")[-1]
    tqdm_desc = f"Generating Instructions - Model: {model_name} | Total: {num_pairs}"

    responses = []
    with tqdm(total=num_pairs, desc=tqdm_desc, unit="problem") as pbar:
        for i in range(num_pairs):
            # The running count is only used to number problems in error messages.
            response, selected_domain = generate_problem(current_count + i + 1, selected_model, selected_domains)
            if response:
                responses.append(response)
                pbar.set_postfix_str(f"Last Domain: {selected_domain}")
            pbar.update(1)

    return "\n\n".join(responses)
|
|
|
|
|
def generate_solutions_pqt(solution_model_name):
    """Solve every Parquet-stored problem whose ``solution`` is missing.

    Solutions are generated with the module-level ``solution_tokenizer`` and
    ``solution_model_instance``; ``solution_model_name`` is only used (last
    path segment) to label the stored solution.  Each solution is written back
    immediately via ``update_solution_pqt``.  A failure on one problem is
    printed and the loop continues.

    Returns:
        A status message.
    """
    pending = load_parquet()
    pending = pending[pending["solution"].isna()]

    if pending.empty:
        return "No unsolved problems found in the database."

    with tqdm(total=len(pending), desc="Generating Solutions", unit="solution") as pbar:
        for _, row in pending.iterrows():
            try:
                chat = [
                    {"role": "system", "content": RESPONSE_SOLUTION_LLM_SYS_PROMPT},
                    {"role": "user", "content": RESPONSE_SOLUTION_LLM_USR_PROMPT.format(problem=row["problem"])},
                ]
                rendered = solution_tokenizer.apply_chat_template(
                    chat, tokenize=False, add_generation_prompt=True
                )
                model_inputs = solution_tokenizer([rendered], return_tensors="pt").to(solution_model_instance.device)

                raw_outputs = solution_model_instance.generate(**model_inputs, max_new_tokens=10024)

                # Keep only the tokens generated after the prompt.
                completions = [
                    full_ids[len(prompt_ids):]
                    for prompt_ids, full_ids in zip(model_inputs.input_ids, raw_outputs)
                ]
                solution = solution_tokenizer.batch_decode(completions, skip_special_tokens=True)[0]

                update_solution_pqt(row["uuid"], solution, solution_model_name.split("/")[-1])
            except Exception as e:
                print(f"Error generating solution for problem {row['uuid']}: {e}")
            pbar.update(1)
    return "Solutions generated successfully!"
|
|
|
def generate_solutions(solution_model_name):
    """Solve every DuckDB-stored problem whose ``solution`` is NULL.

    Solutions are generated with the module-level ``solution_tokenizer`` and
    ``solution_model_instance``; ``solution_model_name`` is only used (last
    path segment) for the progress bar and to label the stored solution.  A
    failure on one problem is printed and the loop continues.

    The database connection is closed on every exit path.  Previously the
    early "nothing to solve" return, and any error in the initial query, left
    it open.

    Returns:
        A status message.
    """
    conn = duckdb.connect(database=DB_FILE)
    try:
        problems = conn.execute("SELECT uuid, problem FROM problems WHERE solution IS NULL").fetchall()

        if not problems:
            return "No unsolved problems found in the database."

        model_name = solution_model_name.split("/")[-1]
        total_problems = len(problems)
        tqdm_desc = f"Solution Model: {model_name} | Total Problems: {total_problems}"

        with tqdm(total=total_problems, desc=tqdm_desc, unit="solution") as pbar:
            for problem_id, problem_text in problems:
                try:
                    solution_prompt = RESPONSE_SOLUTION_LLM_USR_PROMPT.format(problem=problem_text)

                    messages = [
                        {"role": "system", "content": RESPONSE_SOLUTION_LLM_SYS_PROMPT},
                        {"role": "user", "content": solution_prompt},
                    ]
                    text = solution_tokenizer.apply_chat_template(
                        messages,
                        tokenize=False,
                        add_generation_prompt=True,
                    )
                    model_inputs = solution_tokenizer([text], return_tensors="pt").to(solution_model_instance.device)

                    generated_ids = solution_model_instance.generate(
                        **model_inputs,
                        max_new_tokens=10024,
                    )
                    # Keep only the tokens generated after the prompt.
                    generated_ids = [
                        output_ids[len(input_ids):]
                        for input_ids, output_ids in zip(model_inputs.input_ids, generated_ids)
                    ]
                    solution = solution_tokenizer.batch_decode(generated_ids, skip_special_tokens=True)[0]

                    conn.execute("""
UPDATE problems
SET solution = ?, solution_model_name = ?
WHERE uuid = ?
""", (solution, model_name, problem_id))

                    pbar.set_postfix_str(f"Last Problem UUID: {problem_id}")
                except Exception as e:
                    print(f"Error generating solution for problem {problem_id}: {e}")
                pbar.update(1)
    finally:
        conn.close()
    return "Solutions generated successfully!"
|
|
|
|
|
|
|
def load_problems_from_duckdb():
    """Return the entire ``problems`` table of ``DB_FILE`` as a DataFrame."""
    connection = duckdb.connect(database=DB_FILE)
    all_rows = connection.execute("SELECT * FROM problems").df()
    connection.close()
    return all_rows
|
|
|
|
|
def load_summary_from_duckdb():
    """Return an HTML summary of the ``problems`` table.

    The summary lists the total number of problems, the number of distinct
    ``main_domain`` values, and a bullet list of problem counts per
    ``model_name``.
    """
    connection = duckdb.connect(database=DB_FILE)
    total_problems = connection.execute("SELECT COUNT(*) FROM problems").fetchone()[0]
    distinct_domains_count = connection.execute(
        "SELECT COUNT(DISTINCT main_domain) FROM problems"
    ).fetchone()[0]
    per_model_counts = connection.execute(
        "SELECT model_name, COUNT(*) as count FROM problems GROUP BY model_name"
    ).fetchall()
    connection.close()

    bullet_items = "".join(f"<li>{name}: {count}</li>" for name, count in per_model_counts)
    return (
        f"<h3>Total Problems: {total_problems}</h3>"
        f"<h4>Distinct Domains: {distinct_domains_count}</h4>"
        "<h4>Problems by Model:</h4><ul>"
        f"{bullet_items}"
        "</ul>"
    )
|
|