使用pyqpanda2量子机器学习模块 ################################# .. warning:: 以下接口的量子计算部分使用pyqpanda2 https://pyqpanda-toturial.readthedocs.io/zh/latest/。 您需要自行安装pyqpanda2, `pip install pyqpanda` 量子计算层 *********************************** .. _QuantumLayer: QuantumLayer ============================ QuantumLayer是一个支持量子含参线路作为参数的自动求导模块的封装类。用户定义一个函数作为参数 ``qprog_with_measure`` ,该函数需要包含pyqpanda2定义的量子线路:一般包含量子线路的编码线路,演化线路和测量操作。 该类可以嵌入量子经典混合机器学习模型,通过经典的梯度下降法,使得量子经典混合模型的目标函数或损失函数最小。 用户可通过参数 ``diff_method`` 指定 ``QuantumLayer`` 层中量子线路参数的梯度计算方式, ``QuantumLayer`` 当前支持有限差分法 ``finite_diff`` 以及 ``parameter-shift`` 方法。 有限差分法是估算函数梯度最传统和最常用的数值方法之一。主要思想是用差分代替偏导数: .. math:: f^{\prime}(x)=\lim _{h \rightarrow 0} \frac{f(x+h)-f(x)}{h} 若使用 ``parameter-shift`` 方法,我们使用如下目标函数: .. math:: O(\theta)=\left\langle 0\left|U^{\dagger}(\theta) H U(\theta)\right| 0\right\rangle 理论上可以通过 ``parameter-shift`` 这一更精确的方法计算量子线路中参数对哈密顿量的梯度: .. math:: \nabla O(\theta)= \frac{1}{2}\left[O\left(\theta+\frac{\pi}{2}\right)-O\left(\theta-\frac{\pi}{2}\right)\right] .. py:class:: pyvqnet.qnn.quantumlayer.QuantumLayer(qprog_with_measure,para_num,machine_type_or_cloud_token,num_of_qubits:int,num_of_cbits:int = 1,diff_method:str = "parameter_shift",delta:float = 0.01,dtype=None,name="") 变分量子层的抽象计算模块。对一个参数化的量子线路进行仿真,得到测量结果。该变分量子层继承了VQNet框架的梯度计算模块,可以计算线路参数的梯度,训练变分量子线路模型或将变分量子线路嵌入混合量子和经典模型。 :param qprog_with_measure: 用pyqpanda2构建的量子线路运行和测量函数。 :param para_num: `int` - 参数个数。 :param machine_type_or_cloud_token: qpanda量子虚拟机类型 "CPU"。 :param num_of_qubits: 量子比特数。 :param num_of_cbits: 经典比特数,默认为1。 :param diff_method: 求解量子线路参数梯度的方法,“参数位移”或“有限差分”,默认参数偏移。 :param delta: 有限差分计算梯度时的 \delta。 :param dtype: 参数的数据类型,defaults:None,使用默认数据类型:kfloat32,代表32位浮点数。 :param name: 这个模块的名字, 默认为""。 :return: 一个可以计算量子线路的模块。 .. note:: qprog_with_measure是pyqpanda2中定义的量子线路函数 :https://pyqpanda-toturial.readthedocs.io/zh/latest/QCircuit.html。 此函数必须包含以下参数作为函数入参(即使某个参数未实际使用),否则无法在QuantumLayer中正常运行。 qprog_with_measure (input,param,qubits,cbits,machine) `input`: 输入一维经典数据。如果没有输入可以输入 None。 `param`: 输入一维的变分量子线路的待训练参数。 `qubits`: 该QuantumLayer分配的量子比特,类型为pyQpanda.Qubits。 `cbits`: 由QuantumLayer分配的经典比特,用来辅助测量函数,类型为 pyQpanda.ClassicalCondition。如果线路不使用cbits,也应保留此参数。 `machine`: 由QuantumLayer创建的模拟器,例如CPUQVM,GPUQVM,QCloud等。 使用QuantumLayer的 `m_para` 属性获取变分量子线路的训练参数。该参数为QTensor类,可使用to_numpy()接口转化为numpy数组。 .. note:: 该类具有别名 `QpandaQCircuitVQCLayer` 。 .. note:: 该类计算梯度需要额外使用pyqpanda进行线路计算个数与参数个数、数据个数、数据维度的乘积线性相关。 Example:: import pyqpanda as pq from pyvqnet.qnn.measure import ProbsMeasure from pyvqnet.qnn.quantumlayer import QuantumLayer import numpy as np from pyvqnet.tensor import QTensor def pqctest (input,param,qubits,cbits,machine): circuit = pq.QCircuit() circuit.insert(pq.H(qubits[0])) circuit.insert(pq.H(qubits[1])) circuit.insert(pq.H(qubits[2])) circuit.insert(pq.H(qubits[3])) circuit.insert(pq.RZ(qubits[0],input[0])) circuit.insert(pq.RZ(qubits[1],input[1])) circuit.insert(pq.RZ(qubits[2],input[2])) circuit.insert(pq.RZ(qubits[3],input[3])) circuit.insert(pq.CNOT(qubits[0],qubits[1])) circuit.insert(pq.RZ(qubits[1],param[0])) circuit.insert(pq.CNOT(qubits[0],qubits[1])) circuit.insert(pq.CNOT(qubits[1],qubits[2])) circuit.insert(pq.RZ(qubits[2],param[1])) circuit.insert(pq.CNOT(qubits[1],qubits[2])) circuit.insert(pq.CNOT(qubits[2],qubits[3])) circuit.insert(pq.RZ(qubits[3],param[2])) circuit.insert(pq.CNOT(qubits[2],qubits[3])) prog = pq.QProg() prog.insert(circuit) # pauli_dict = {'Z0 X1':10,'Y2':-0.543} rlt_prob = ProbsMeasure([0,2],prog,machine,qubits) return rlt_prob pqc = QuantumLayer(pqctest,3,"CPU",4,1) #classic data as input input = QTensor([[1,2,3,4],[40,22,2,3],[33,3,25,2.0]] ) #forward circuits rlt = pqc(input) grad = QTensor(np.ones(rlt.data.shape)*1000) #backward circuits rlt.backward(grad) print(rlt) # [ # [0.2500000, 0.2500000, 0.2500000, 0.2500000], # [0.2500000, 0.2500000, 0.2500000, 0.2500000], # [0.2500000, 0.2500000, 0.2500000, 0.2500000] # ] 如果使用GPU,参考下面的例子: Example:: import pyqpanda as pq from pyvqnet.qnn.measure import ProbsMeasure from pyvqnet.qnn.quantumlayer import QuantumLayer import numpy as np from pyvqnet.tensor import QTensor,DEV_GPU_0 def pqctest (input,param,qubits,cbits,machine): circuit = pq.QCircuit() circuit.insert(pq.H(qubits[0])) circuit.insert(pq.H(qubits[1])) circuit.insert(pq.H(qubits[2])) circuit.insert(pq.H(qubits[3])) circuit.insert(pq.RZ(qubits[0],input[0])) circuit.insert(pq.RZ(qubits[1],input[1])) circuit.insert(pq.RZ(qubits[2],input[2])) circuit.insert(pq.RZ(qubits[3],input[3])) circuit.insert(pq.CNOT(qubits[0],qubits[1])) circuit.insert(pq.RZ(qubits[1],param[0])) circuit.insert(pq.CNOT(qubits[0],qubits[1])) circuit.insert(pq.CNOT(qubits[1],qubits[2])) circuit.insert(pq.RZ(qubits[2],param[1])) circuit.insert(pq.CNOT(qubits[1],qubits[2])) circuit.insert(pq.CNOT(qubits[2],qubits[3])) circuit.insert(pq.RZ(qubits[3],param[2])) circuit.insert(pq.CNOT(qubits[2],qubits[3])) prog = pq.QProg() prog.insert(circuit) # pauli_dict = {'Z0 X1':10,'Y2':-0.543} rlt_prob = ProbsMeasure([0,2],prog,machine,qubits) return rlt_prob #这里的"CPU" 指的是qpanda量子计算模拟器使用CPU,跟pyvqnet是否使用GPU无关。 pqc = QuantumLayer(pqctest,3,"CPU",4,1) #这里使用toGPU将QuantumLayer 移动到GPU上 pqc.toGPU() #classic data as input input = QTensor([[1,2,3,4],[40,22,2,3],[33,3,25,2.0]] ) input.toGPU() #forward circuits rlt = pqc(input) grad = QTensor(np.ones(rlt.data.shape)*1000,device=DEV_GPU_0) #backward circuits rlt.backward(grad) print(rlt) QpandaQCircuitVQCLayerLite ============================ 如您更加熟悉pyqpanda2语法,可以使用该接口QpandaQCircuitVQCLayerLite,自定义量子比特 ``qubits`` ,经典比特 ``cbits`` ,后端模拟器 ``machine`` 加入QpandaQCircuitVQCLayerLite的参数 ``qprog_with_measure`` 函数中。 .. py:class:: pyvqnet.qnn.quantumlayer.QpandaQCircuitVQCLayerLite(qprog_with_measure,para_num,diff_method:str = "parameter_shift",delta:float = 0.01,dtype=None,name="") 变分量子层的抽象计算模块。对一个参数化的量子线路使用pyqpanda2进行仿真,得到测量结果。该变分量子层继承了VQNet框架的梯度计算模块,可以使用参数漂移法等计算线路参数的梯度,训练变分量子线路模型或将变分量子线路嵌入混合量子和经典模型。 :param qprog_with_measure: 用pyQPand构建的量子线路运行和测量函数。 :param para_num: `int` - 参数个数。 :param diff_method: 求解量子线路参数梯度的方法,“参数位移”或“有限差分”,默认参数偏移。 :param delta: 有限差分计算梯度时的 \delta。 :param dtype: 参数的数据类型,defaults:None,使用默认数据类型:kfloat32,代表32位浮点数。 :param name: 这个模块的名字, 默认为""。 :return: 一个可以计算量子线路的模块。 .. note:: qprog_with_measure是pyqpanda2中定义的量子线路函数 :https://pyqpanda-toturial.readthedocs.io/zh/latest/QCircuit.html。 此函数必须包含以下参数作为函数入参(即使某个参数未实际使用),否则无法在QpandaQCircuitVQCLayerLite中正常运行。 与QuantumLayer相比。该接口传入的变分线路运行函数中,用户应该手动创建量子比特和模拟器: https://pyqpanda-toturial.readthedocs.io/zh/latest/QuantumMachine.html, 如果qprog_with_measure需要quantum measure,用户还需要手动创建需要分配cbits: https://pyqpanda-toturial.readthedocs.io/zh/latest/Measure.html qprog_with_measure的使用可参考下面的例子。 qprog_with_measure(input,param,nqubits,ncubits) `input`: 输入一维经典数据。如果没有,输入 None。 `param`: 输入一维的变分量子线路的待训练参数。 .. note:: 该类具有别名 `QuantumLayerV2` 。 .. note:: 该类计算梯度需要额外使用pyqpanda进行线路计算个数与参数个数、数据个数、数据维度的乘积线性相关。 Example:: import pyqpanda as pq from pyvqnet.qnn.measure import ProbsMeasure from pyvqnet.qnn.quantumlayer import QpandaQCircuitVQCLayerLite import numpy as np from pyvqnet.tensor import QTensor def pqctest (input,param): num_of_qubits = 4 machine = pq.CPUQVM() machine.init_qvm() qubits = machine.qAlloc_many(num_of_qubits) circuit = pq.QCircuit() circuit.insert(pq.H(qubits[0])) circuit.insert(pq.H(qubits[1])) circuit.insert(pq.H(qubits[2])) circuit.insert(pq.H(qubits[3])) circuit.insert(pq.RZ(qubits[0],input[0])) circuit.insert(pq.RZ(qubits[1],input[1])) circuit.insert(pq.RZ(qubits[2],input[2])) circuit.insert(pq.RZ(qubits[3],input[3])) circuit.insert(pq.CNOT(qubits[0],qubits[1])) circuit.insert(pq.RZ(qubits[1],param[0])) circuit.insert(pq.CNOT(qubits[0],qubits[1])) circuit.insert(pq.CNOT(qubits[1],qubits[2])) circuit.insert(pq.RZ(qubits[2],param[1])) circuit.insert(pq.CNOT(qubits[1],qubits[2])) circuit.insert(pq.CNOT(qubits[2],qubits[3])) circuit.insert(pq.RZ(qubits[3],param[2])) circuit.insert(pq.CNOT(qubits[2],qubits[3])) prog = pq.QProg() prog.insert(circuit) rlt_prob = ProbsMeasure([0,2],prog,machine,qubits) return rlt_prob pqc = QpandaQCircuitVQCLayerLite(pqctest,3) #classic data as input input = QTensor([[1.0,2,3,4],[4,2,2,3],[3,3,2,2]] ) #forward circuits rlt = pqc(input) grad = QTensor(np.ones(rlt.data.shape)*1000) #backward circuits rlt.backward(grad) print(rlt) # [ # [0.2500000, 0.2500000, 0.2500000, 0.2500000], # [0.2500000, 0.2500000, 0.2500000, 0.2500000], # [0.2500000, 0.2500000, 0.2500000, 0.2500000] # ] 如果使用GPU,参考下面的例子: Example:: import pyqpanda as pq from pyvqnet.qnn.measure import ProbsMeasure from pyvqnet.qnn.quantumlayer import QpandaQCircuitVQCLayerLite import numpy as np from pyvqnet.tensor import QTensor,DEV_GPU_0 def pqctest (input,param): num_of_qubits = 4 machine = pq.CPUQVM() machine.init_qvm() qubits = machine.qAlloc_many(num_of_qubits) circuit = pq.QCircuit() circuit.insert(pq.H(qubits[0])) circuit.insert(pq.H(qubits[1])) circuit.insert(pq.H(qubits[2])) circuit.insert(pq.H(qubits[3])) circuit.insert(pq.RZ(qubits[0],input[0])) circuit.insert(pq.RZ(qubits[1],input[1])) circuit.insert(pq.RZ(qubits[2],input[2])) circuit.insert(pq.RZ(qubits[3],input[3])) circuit.insert(pq.CNOT(qubits[0],qubits[1])) circuit.insert(pq.RZ(qubits[1],param[0])) circuit.insert(pq.CNOT(qubits[0],qubits[1])) circuit.insert(pq.CNOT(qubits[1],qubits[2])) circuit.insert(pq.RZ(qubits[2],param[1])) circuit.insert(pq.CNOT(qubits[1],qubits[2])) circuit.insert(pq.CNOT(qubits[2],qubits[3])) circuit.insert(pq.RZ(qubits[3],param[2])) circuit.insert(pq.CNOT(qubits[2],qubits[3])) prog = pq.QProg() prog.insert(circuit) rlt_prob = ProbsMeasure([0,2],prog,machine,qubits) return rlt_prob pqc = QpandaQCircuitVQCLayerLite(pqctest,3) #layer move to gpu pqc.toGPU() #classic data as input input = QTensor([[1.0,2,3,4],[4,2,2,3],[3,3,2,2]] ) #data move to gpu input.toGPU(DEV_GPU_0) #forward circuits rlt = pqc(input) grad = QTensor(np.ones(rlt.data.shape)*1000,device= DEV_GPU_0) #backward circuits rlt.backward(grad) print(rlt) NoiseQuantumLayer ========================= 在真实的量子计算机中,受制于量子比特自身的物理特性,常常存在不可避免的计算误差。为了能在量子虚拟机中更好的模拟这种误差,VQNet同样支持含噪声量子虚拟机。含噪声量子虚拟机的模拟更贴近真实的量子计算机,我们可以自定义支持的逻辑门类型,自定义逻辑门支持的噪声模型。 现有可支持的量子噪声模型依据QPanda中定义,具体参考链接 `QPANDA2 `_ 中的介绍。 使用 NoiseQuantumLayer 定义一个量子线路自动微分类,该类支持QPanda噪声虚拟机。用户定义一个函数作为参数 ``qprog_with_measure`` ,该函数需要包含pyqpanda2定义的量子线路,同样需要传入一个参数 ``noise_set_config``,使用pyqpanda2接口,设置噪声模型。 .. py:class:: pyvqnet.qnn.quantumlayer.NoiseQuantumLayer(qprog_with_measure,para_num,machine_type,num_of_qubits:int,num_of_cbits:int=1,diff_method:str= "parameter_shift",delta:float=0.01,noise_set_config = None, dtype=None,name="") 变分量子层的抽象计算模块。对一个参数化的量子线路进行仿真,得到测量结果。该变分量子层继承了VQNet框架的梯度计算模块,可以计算线路参数的梯度,训练变分量子线路模型或将变分量子线路嵌入混合量子和经典模型。 这一层可以在量子线路中使用噪声模型。 :param qprog_with_measure: 用pyqpanda2构建的量子线路运行和测量函数。 :param para_num: `int` - 参数个数。 :param machine_type: qpanda机器类型。 :param num_of_qubits: 量子比特数。 :param num_of_cbits: 经典比特数,默认为1。 :param diff_method: 求解量子线路参数梯度的方法,“参数位移”或“有限差分”,默认参数偏移。 :param delta: 有限差分计算梯度时的 \delta。 :param noise_set_config: 噪声设置函数。 :param dtype: 参数的数据类型,defaults:None,使用默认数据类型:kfloat32,代表32位浮点数。 :param name: 这个模块的名字, 默认为""。 :return: 一个可以计算含噪声量子线路的模块。 .. note:: qprog_with_measure是pyqpanda2中定义的量子线路函数 :https://pyqpanda-toturial.readthedocs.io/zh/latest/QCircuit.html。 此函数必须包含以下参数作为函数入参(即使某个参数未实际使用),否则无法在NoiseQuantumLayer中正常运行。 qprog_with_measure (input,param,qubits,cbits,machine) `input`: 输入一维经典数据。如果没有输入可以输入 None。 `param`: 输入一维的变分量子线路的待训练参数。 `qubits`: 该NoiseQuantumLayer分配的量子比特,类型为pyQpanda.Qubits。 `cbits`: cbits由NoiseQuantumLayer分配的经典比特,用来辅助测量函数,类型为 pyQpanda.ClassicalCondition。如果线路不使用cbits,也应保留此参数。 `machine`: 由NoiseQuantumLayer创建的模拟器。 .. note:: 该类计算梯度需要额外使用pyqpanda进行线路计算个数与参数个数、数据个数、数据维度的乘积线性相关。 Example:: import pyqpanda as pq from pyvqnet.qnn.measure import ProbsMeasure from pyvqnet.qnn.quantumlayer import NoiseQuantumLayer import numpy as np from pyqpanda import * from pyvqnet.tensor import QTensor def circuit(weights,param,qubits,cbits,machine): circuit = pq.QCircuit() circuit.insert(pq.H(qubits[0])) circuit.insert(pq.RY(qubits[0], weights[0])) circuit.insert(pq.RY(qubits[0], param[0])) prog = pq.QProg() prog.insert(circuit) prog << measure_all(qubits, cbits) result = machine.run_with_configuration(prog, cbits, 100) counts = np.array(list(result.values())) states = np.array(list(result.keys())).astype(float) # Compute probabilities for each state probabilities = counts / 100 # Get state expectation expectation = np.sum(states * probabilities) return expectation def default_noise_config(qvm,q): p = 0.01 qvm.set_noise_model(NoiseModel.BITFLIP_KRAUS_OPERATOR, GateType.PAULI_X_GATE, p) qvm.set_noise_model(NoiseModel.BITFLIP_KRAUS_OPERATOR, GateType.PAULI_Y_GATE, p) qvm.set_noise_model(NoiseModel.BITFLIP_KRAUS_OPERATOR, GateType.PAULI_Z_GATE, p) qvm.set_noise_model(NoiseModel.BITFLIP_KRAUS_OPERATOR, GateType.RX_GATE, p) qvm.set_noise_model(NoiseModel.BITFLIP_KRAUS_OPERATOR, GateType.RY_GATE, p) qvm.set_noise_model(NoiseModel.BITFLIP_KRAUS_OPERATOR, GateType.RZ_GATE, p) qvm.set_noise_model(NoiseModel.BITFLIP_KRAUS_OPERATOR, GateType.RY_GATE, p) qvm.set_noise_model(NoiseModel.BITFLIP_KRAUS_OPERATOR, GateType.HADAMARD_GATE, p) qves =[] for i in range(len(q)-1): qves.append([q[i],q[i+1]])# qves.append([q[len(q)-1],q[0]]) qvm.set_noise_model(NoiseModel.DAMPING_KRAUS_OPERATOR, GateType.CNOT_GATE, p, qves) return qvm qvc = NoiseQuantumLayer(circuit,24,"noise",1,1,diff_method= "parameter_shift", delta=0.01,noise_set_config = default_noise_config) input = QTensor([ [0., 1., 1., 1.], [0., 0., 1., 1.], [1., 0., 1., 1.] ] ) rlt = qvc(input) grad = QTensor(np.ones(rlt.data.shape)*1000) rlt.backward(grad) print(qvc.m_para.grad) #[1195., 105., 70., 0., # 45., -45., 50., 15., # -80., 50., 10., -30., # 10., 60., 75., -110., # 55., 45., 25., 5., # 5., 50., -25., -15.] 下面给出一个 ``noise_set_config`` 的例子,这里使得 ``RX`` , ``RY`` , ``RZ`` , ``X`` , ``Y`` , ``Z`` , ``H`` 等逻辑门加入了 p = 0.01 的 BITFLIP_KRAUS_OPERATOR噪声模型。 .. code-block:: def noise_set_config(qvm,q): p = 0.01 qvm.set_noise_model(NoiseModel.BITFLIP_KRAUS_OPERATOR, GateType.PAULI_X_GATE, p) qvm.set_noise_model(NoiseModel.BITFLIP_KRAUS_OPERATOR, GateType.PAULI_Y_GATE, p) qvm.set_noise_model(NoiseModel.BITFLIP_KRAUS_OPERATOR, GateType.PAULI_Z_GATE, p) qvm.set_noise_model(NoiseModel.BITFLIP_KRAUS_OPERATOR, GateType.RX_GATE, p) qvm.set_noise_model(NoiseModel.BITFLIP_KRAUS_OPERATOR, GateType.RY_GATE, p) qvm.set_noise_model(NoiseModel.BITFLIP_KRAUS_OPERATOR, GateType.RZ_GATE, p) qvm.set_noise_model(NoiseModel.BITFLIP_KRAUS_OPERATOR, GateType.RY_GATE, p) qvm.set_noise_model(NoiseModel.BITFLIP_KRAUS_OPERATOR, GateType.HADAMARD_GATE, p) qves =[] for i in range(len(q)-1): qves.append([q[i],q[i+1]])# qves.append([q[len(q)-1],q[0]]) qvm.set_noise_model(NoiseModel.DAMPING_KRAUS_OPERATOR, GateType.CNOT_GATE, p, qves) return qvm QiskitLayer ================================= .. py:class:: pyvqnet.qnn.QiskitLayer(qiskit_circuits,para_num) 一个用于在 vqnet 中实现前向和反向传播的 qiskit 电路封装层。QISKIT_VQC 是一个定义 qiskit 量子电路及其 `run` 函数的类。 以下示例展示了它的工作原理。此层仅支持电路的输入和权重作为参数。 :param cirq_vqc: 定义 qiskit 电路的定义、后端和运行函数的类。 :param para_num: `int` - para_num 的数量。 :return: 一个可以运行 qiskit 量子电路模型的类。 Example:: """ 调用qiskit训练 qiskit 2.1.1 qiskit-aer 0.17.2 opencv-python """ import sys sys.path.insert(0,"../") import os import os.path import urllib import gzip import numpy as np import random import sys sys.path.insert(0,"../../") random.seed(42) np.random.seed(42) from pyvqnet.nn.module import Module from pyvqnet.optim import Adam import pyvqnet pyvqnet.utils.set_random_seed(42) from pyvqnet.nn.loss import MeanSquaredError from pyvqnet.qnn.utils import QiskitLayer import qiskit from qiskit.quantum_info import Statevector from qiskit import QuantumRegister, ClassicalRegister from qiskit.quantum_info.operators import Pauli max_parallel_threads = 24 gpu = False method = "statevector" backend_options = { "method": method, "precision": "double", "max_parallel_threads": max_parallel_threads, "fusion_enable": True, "fusion_threshold": 14, "fusion_max_qubit": 5, } from qiskit_aer import StatevectorSimulator simulator = StatevectorSimulator() simulator.set_options(**backend_options) url_base = 'https://ossci-datasets.s3.amazonaws.com/mnist/' key_file = { 'train_img':'train-images-idx3-ubyte.gz', 'train_label':'train-labels-idx1-ubyte.gz', 'test_img':'t10k-images-idx3-ubyte.gz', 'test_label':'t10k-labels-idx1-ubyte.gz' } def _download(dataset_dir,file_name): file_path = dataset_dir + "/" + file_name if os.path.exists(file_path): with gzip.GzipFile(file_path) as f: file_path_ungz = file_path[:-3].replace('\\', '/') if not os.path.exists(file_path_ungz): open(file_path_ungz,"wb").write(f.read()) return print("Downloading " + file_name + " ... ") urllib.request.urlretrieve(url_base + file_name, file_path) if os.path.exists(file_path): with gzip.GzipFile(file_path) as f: file_path_ungz = file_path[:-3].replace('\\', '/') file_path_ungz = file_path_ungz.replace('-idx', '.idx') if not os.path.exists(file_path_ungz): open(file_path_ungz,"wb").write(f.read()) print("Done") def download_mnist(dataset_dir): for v in key_file.values(): _download(dataset_dir,v) def dataloader(data,label,batch_size, shuffle = True)->np: if shuffle: for _ in range(len(data)//batch_size): random_index = np.random.randint(0, len(data), (batch_size, 1)) yield data[random_index].reshape(batch_size,-1),label[random_index].reshape(batch_size,-1) else: for i in range(0,len(data)-batch_size+1,batch_size): yield data[i:i+batch_size], label[i:i+batch_size] def get_accuary(result,label): result,label = np.array(result.data), np.array(label.data) is_correct = (np.abs(result - label) < 0.5) is_correct = np.count_nonzero(is_correct) acc = is_correct return acc def load_mnist_4_4(dataset="training_data", digits=np.arange(10), path="."): import os, struct from array import array as pyarray download_mnist(path) if dataset == "training_data": fname_image = os.path.join(path, 'train-images.idx3-ubyte').replace('\\', '/') fname_label = os.path.join(path, 'train-labels.idx1-ubyte').replace('\\', '/') elif dataset == "testing_data": fname_image = os.path.join(path, 't10k-images.idx3-ubyte').replace('\\', '/') fname_label = os.path.join(path, 't10k-labels.idx1-ubyte').replace('\\', '/') else: raise ValueError("dataset must be 'training_data' or 'testing_data'") flbl = open(fname_label, 'rb') magic_nr, size = struct.unpack(">II", flbl.read(8)) lbl = pyarray("b", flbl.read()) flbl.close() fimg = open(fname_image, 'rb') magic_nr, size, rows, cols = struct.unpack(">IIII", fimg.read(16)) img = pyarray("B", fimg.read()) fimg.close() ind = [k for k in range(size) if lbl[k] in digits] N = len(ind) images = np.zeros((N, rows, cols)) images_new = []# = np.zeros((N, 4, 4)) labels = np.zeros((N, 1), dtype=int) import cv2 for i in range(len(ind)): tmp1 = np.array(img[ind[i] * rows * cols: (ind[i] + 1) * rows * cols]).reshape((rows, cols)) tmp1 = tmp1[4:24,4:24] tmp = cv2.resize(tmp1,(4,4)) if np.max(tmp) ==0: continue images_new.append(tmp) if lbl[ind[i]] ==digits[1]: labels[i] = 1 else: labels[i] = 0 return np.array(images_new), labels class QISKIT_VQC: def __init__(self, n_qubits, backend, shots): # --- Circuit definition --- qc = ClassicalRegister(1) self.qc = qc self.n_qubits = n_qubits all_qubits = [i for i in range(n_qubits)] self.all_qubits= all_qubits self.backend = backend self.shots = shots def run(self,**kwargs): x = kwargs['x'] weights = kwargs['w'] weights = weights.astype(np.float64) x = x.astype(np.float64) sum_feature = np.power(np.sum([t**2 for t in x]),0.5) normalize_feat = x/sum_feature self._circuit = qiskit.QuantumCircuit(QuantumRegister(4)) self.theta = weights.reshape([4,6]) self._circuit.initialize(normalize_feat, [0,1,2,3]) # 先搭建广义的旋转层 for i in range(self.n_qubits): self._circuit.rz(self.theta[i,0], i) self._circuit.ry(self.theta[i,1], i) self._circuit.rz(self.theta[i,2], i) for d in range(3, 6): # 搭建纠缠层 for i in range(self.n_qubits-1): self._circuit.cx(i, i + 1) self._circuit.cx(self.n_qubits-1, 0) # 对每一个量子比特搭建Ry for i in range(self.n_qubits): self._circuit.ry(self.theta[i,d], i) statevec = Statevector(self._circuit) Expectation = np.real(statevec.expectation_value(Pauli('ZIII'))) return Expectation #define qiskit circuits class circuit = QISKIT_VQC(4, simulator, 1000) class Model_qiskit(Module): def __init__(self): super(Model_qiskit, self).__init__() self.qvc = QiskitLayer(circuit,24) def forward(self, x): return self.qvc(x)*0.5 + 0.5 def Run_qiskit(): x_train, y_train = load_mnist_4_4("training_data",digits=[3,6]) y_train = y_train.reshape(-1, 1) x_test, y_test = load_mnist_4_4("testing_data",digits=[3,6]) x_train = x_train.astype(np.float32) x_test = x_test.astype(np.float32) y_train = y_train.astype(np.float32) y_test = y_test.astype(np.float32) x_train = x_train *np.pi / 255 x_test = x_test *np.pi / 255 x_train = x_train[:100] y_train = y_train[:100] x_test = x_test[:50] y_test = y_test[:50] model = Model_qiskit() optimizer = Adam(model.parameters(),lr =0.01) batch_size = 10 epoch = 2 loss = MeanSquaredError() print("start training..............") model.train() TL=[] TA=[] for i in range(epoch): count=0 sum_loss = 0 accuary = 0 t = 0 model.train() for data,label in dataloader(x_train,y_train,batch_size,True): optimizer.zero_grad() result = model(data) loss_b = loss(label,result) loss_b.backward() optimizer._step() sum_loss += loss_b.item() count+=batch_size accuary += get_accuary(result,label) t = t + 1 print(f"epoch:{i}, iter{t} #### loss:{sum_loss*batch_size/count} #####accuray:{accuary/count}") TL.append(sum_loss*batch_size/count) TA.append(accuary/count) print(f"qiskit epoch {epoch},最终准确率 {TA[-1]}") if __name__=="__main__": Run_qiskit() CirqLayer ================================= .. py:class:: pyvqnet.qnn.CirqLayer(cirq_vqc,para_num) 一个用于在 vqnet 中实现前向和反向传播的 cirq 电路封装层。CIRQ_VQC是需要用户定义 cirq 量子电路及其 `run` 函数的类。以下示例展示了它的工作原理。 此层仅支持电路的输入和权重作为参数。 :param cirq_vqc: 定义 cirq 电路的定义、后端和运行函数的类。 :param para_num: `int` - para_num 的数量。 :return: 一个可以运行 cirq 量子电路模型的类。 .. note:: 以下示例代码需要 ``cirq==1.5.0, numpy <2``. Example:: import numpy as np import random random.seed(42) np.random.seed(42) from pyvqnet.nn.module import Module import pyvqnet pyvqnet.utils.set_random_seed(42) from pyvqnet.optim import Adam from pyvqnet.nn.loss import MeanSquaredError from pyvqnet.qnn.utils import CirqLayer import cirq import sympy from pyvqnet.utils.utils import get_circuit_symbols def dataloader(data,label,batch_size, shuffle = True)->np: if shuffle: for _ in range(len(data)//batch_size): random_index = np.random.randint(0, len(data), (batch_size, 1)) yield data[random_index].reshape(batch_size,-1),label[random_index].reshape(batch_size,-1) else: for i in range(0,len(data)-batch_size+1,batch_size): yield data[i:i+batch_size].reshape(batch_size,-1), label[i:i+batch_size].reshape(batch_size,-1) def get_accuary(result,label): result,label = np.array(result.data), np.array(label.data) is_correct = (np.abs(result - label) < 0.5) is_correct = np.count_nonzero(is_correct) acc = is_correct return acc def load_mnist_4_4(dataset="training_data", digits=np.arange(10), path=".",encoding = "raw" ): import os, struct from array import array as pyarray if dataset == "training_data": fname_image = os.path.join(path, 'train-images.idx3-ubyte').replace('\\', '/') fname_label = os.path.join(path, 'train-labels.idx1-ubyte').replace('\\', '/') elif dataset == "testing_data": fname_image = os.path.join(path, 't10k-images.idx3-ubyte').replace('\\', '/') fname_label = os.path.join(path, 't10k-labels.idx1-ubyte').replace('\\', '/') else: raise ValueError("dataset must be 'training_data' or 'testing_data'") flbl = open(fname_label, 'rb') magic_nr, size = struct.unpack(">II", flbl.read(8)) lbl = pyarray("b", flbl.read()) flbl.close() fimg = open(fname_image, 'rb') magic_nr, size, rows, cols = struct.unpack(">IIII", fimg.read(16)) img = pyarray("B", fimg.read()) fimg.close() ind = [k for k in range(size) if lbl[k] in digits] N = len(ind) images_new = []# = np.zeros((N, 4, 4)) labels = np.zeros((N, 1), dtype=int) import cv2 for i in range(len(ind)): tmp1 = np.array(img[ind[i] * rows * cols: (ind[i] + 1) * rows * cols]).reshape((rows, cols)) tmp1 = tmp1[4:24,4:24] tmp = cv2.resize(tmp1,(4,4)) if np.max(tmp) ==0: continue if encoding == "normalized": sum_feature = np.power(np.sum([t**2 for t in tmp.flatten()]),0.5) normalize_feat = tmp/sum_feature images_new.append(normalize_feat) if lbl[ind[i]] ==digits[1]: labels[i] = 1 else: labels[i] = 0 return np.array(images_new), labels class CIRQ_VQC: def __init__(self,simulator = cirq.Simulator ()): self._circuit = cirq.Circuit() n_qubits =4 ###define qubits q0 = cirq.NamedQubit ('q0') q1 = cirq.NamedQubit ('q1') q2 = cirq.NamedQubit ('q2') q3 = cirq.NamedQubit ('q3') qubits = [q0,q1,q2,q3] self.qubits = [q0,q1,q2,q3] ###define varational parameters param = sympy.symbols(f'theta(0:24)') self.theta = np.asarray(param).reshape((4,6)) ###define circuits circuit = cirq.Circuit() for i ,q in enumerate(qubits): circuit.append(cirq.rz(self.theta[i][0])(q)) circuit.append(cirq.ry(self.theta[i][1])(q)) circuit.append(cirq.rz(self.theta[i][2])(q)) for d in range(3, 6): for i in range(n_qubits-1): circuit.append(cirq.CNOT(qubits[i], qubits[i + 1])) circuit.append(cirq.CNOT(qubits[n_qubits-1], qubits[0])) for i ,q in enumerate(qubits): circuit.append(cirq.ry(self.theta[i][d])(q)) self._circuit = circuit ###define backend self._backend = simulator self._param_symbols_list,self._input_symbols_list = get_circuit_symbols(self._circuit) def run(self,resolver,init_state): rlt = self._backend.simulate(self._circuit,resolver,initial_state=init_state).final_state_vector z0 = cirq.Z(self.qubits[0]) qubit_map={self.qubits[0]: 0} expectation = z0.expectation_from_state_vector(rlt, qubit_map).real return expectation #define cirq circuits class circuit = CIRQ_VQC() class Model_cirq(Module): def __init__(self): super(Model_cirq, self).__init__() self.qvc = CirqLayer(circuit,24) def forward(self, x): y = self.qvc(x)*0.5 + 0.5 return y.astype(x.dtype) def run_cirq(): x_train, y_train = load_mnist_4_4("training_data",digits=[3,6],encoding="normalized") y_train = y_train.reshape(-1, 1) x_test, y_test = load_mnist_4_4("testing_data",digits=[3,6],encoding="normalized") x_train = x_train.astype(np.float32) x_test = x_test.astype(np.float32) y_test = y_test.astype(np.float32) y_train = y_train.astype(np.float32) x_train = x_train[:100] y_train = y_train[:100] x_test = x_test[:50] y_test = y_test[:50] model = Model_cirq() optimizer = Adam(model.parameters(),lr =0.01) batch_size = 10 epoch = 5 loss = MeanSquaredError() print("start training..............") model.train() TL=[] TA=[] for i in range(epoch): count=0 sum_loss = 0 accuary = 0 t = 0 for data,label in dataloader(x_train,y_train,batch_size,False): optimizer.zero_grad() result = model(data) loss_b = loss(label,result) loss_b.backward() optimizer._step() sum_loss += loss_b.item() count+=batch_size accuary += get_accuary(result,label) t = t + 1 print(f"epoch:{i}, #### loss:{sum_loss*batch_size/count} #####accuray:{accuary/count}") TL.append(sum_loss*batch_size/count) TA.append(accuary/count) print(f"cirq epoch {epoch},最终准确率 {TA[-1]}") if __name__=="__main__": run_cirq() 量子逻辑门 *********************************** 处理量子比特的方式就是量子逻辑门。 使用量子逻辑门,我们有意识的使量子态发生演化。量子逻辑门是构成量子算法的基础。 基本量子逻辑门 ============================ 在VQNet中,我们使用本源量子自研的 `pyqpanda2 `_ 的各个逻辑门搭建量子线路,进行量子模拟。 当前pyqpanda2支持的逻辑门可参考pyqpanda2 `量子逻辑门 `_ 部分的定义。 基于pyqpanda2量子机器学习算法封装接口 **************************************** 以下使用pyqpanda2构建线路,并在pyvqnet中实现了一些经典量子机器学习算法。 QGAN制备任意分布初态 ======================= 基于2019年 Christa Zoufal 的论文 `Quantum Generative Adversarial Networks for learning and loading random distributions `_ , VQNet提供了一个QGAN制备任意分布初态的例子。该算法使用纯量子变分线路制备特定随机分布的生成量子态,可以减少原先生成特定量子态所需的逻辑门,降低量子线路复杂度。 QGAN使用经典的GAN模型结构,分为Generator生成器与Discriminator鉴别器两个子模型,Generator为量子线路产生特定分布,而Generator生成的分布generated data samples 以及真实的随机分布training data samples 输入Discriminator模型进行鉴别真伪。 .. image:: ./images/qgan-arch.PNG :width: 600 px :align: center | 构建VQNet的量子生成对抗网络接口 ``QGANAPI`` 类,我们可以对真实分布的数据 real_data 使用量子生成器进行初态制备。这里使用量子比特数为3,量子生成器内部含参线路模块重复次数为1。 使用的评价指标为KL散度。 .. code-block:: import pickle import os import pyqpanda as pq from pyvqnet.qnn.qgan.qgan_utils import QGANAPI import numpy as np ################################## num_of_qubits = 3 # paper config rep = 1 number_of_data = 10000 # Load data samples from different distributions mu = 1 sigma = 1 real_data = np.random.lognormal(mean=mu, sigma=sigma, size=number_of_data) # intial save_dir = None qgan_model = QGANAPI( real_data, # numpy generated data distribution, 1 - dim. num_of_qubits, batch_size=2000, num_epochs=2000, q_g_cir=None, bounds = [0.0,2**num_of_qubits -1], reps=rep, metric="kl", tol_rel_ent=0.01, if_save_param_dir=save_dir ) 接下来使用其训练接口 ``train`` 训练。 .. code-block:: # train qgan_model.train() # train qgan ``eval`` 画出其与真实分布之间的概率分布函数对比: .. code-block:: # show probability distribution function of generated distribution and real distribution qgan_model.eval(real_data) #draw pdf ``get_trained_quantum_parameters`` 获取训练参数并输出为一个numpy数组形式。如果 ``save_dir`` 不为空,则该类将保存参数到文件中。可以通过 ``load_param_and_eval`` 函数载入参数,并可以通过 ``get_circuits_with_trained_param`` 获取训练完参数的量子生成器pyqpanda2线路。 .. code-block:: # get trained quantum parameters param = qgan_model.get_trained_quantum_parameters() print(f" trained param {param}") #load saved parameters files if save_dir is not None: path = os.path.join( save_dir, qgan_model._start_time + "trained_qgan_param.pickle") with open(path, "rb") as file: t3 = pickle.load(file) param = t3["quantum_parameters"] print(f" trained param {param}") #show probability distribution function of generated distribution and real distribution qgan_model.load_param_and_eval(param) #calculate metric print(qgan_model.eval_metric(param, "kl")) #get generator quantum circuit machine = pq.CPUQVM() machine.init_qvm() qubits = machine.qAlloc_many(num_of_qubits) qpanda_cir = qgan_model.get_circuits_with_trained_param(qubits) print(qpanda_cir) 生成lognormal分布的损失函数以及概率分布函数图,一般来说需要使用不同的随机种子多次训练该模型可得到较好结果: .. image:: ./images/qgan-loss.png :width: 600 px :align: center | .. image:: ./images/qgan-pdf.png :width: 600 px :align: center | 量子核SVM算法 ========================= 在机器学习任务中,数据通常不能被原始空间中的超平面分隔。寻找此类超平面的一种常见技术是对数据应用非线性变换函数。 此函数称为特征映射,通过特征映射,我们可以在这个新的特征空间中计算数据点之间的距离有多近,从而进行机器学习的分类任务。 本例参照 `Supervised learning with quantum enhanced feature spaces `_ 论文的第一个方法构建变分线路进行数据分类任务。 ``gen_vqc_qsvm_data`` 为生成该例子所需的数据。 ``vqc_qsvm`` 为变分量子线路类,用来对输入数据进行分类。 ``vqc_qsvm.plot()`` 函数可视化了数据的分布情况。 .. image:: ./images/VQC-SVM.png :width: 600 px :align: center | .. code-block:: from pyvqnet.qnn.svm import vqc_qsvm, gen_vqc_qsvm_data import matplotlib.pyplot as plt import numpy as np batch_size = 40 maxiter = 40 training_size = 20 test_size = 10 gap = 0.3 #线路模块重复次数 rep = 3 #定义接口类 VQC_QSVM = vqc_qsvm(batch_size, maxiter, rep) #随机生成数据 train_features, test_features, train_labels, test_labels, samples = \ gen_vqc_qsvm_data(training_size=training_size, test_size=test_size, gap=gap) VQC_QSVM.plot(train_features, test_features, train_labels, test_labels, samples) #训练 VQC_QSVM.train(train_features, train_labels) #测试数据测试 rlt, acc_1 = VQC_QSVM.predict(test_features, test_labels) print(f"testing_accuracy {acc_1}") 除了上述直接用变分量子线路将经典数据特征映射到量子特征空间,在论文 `Supervised learning with quantum enhanced feature spaces `_ 中还介绍了使用量子线路直接估计核函数,并使用经典支持向量机进行分类的方法。类比经典SVM中的各种核函数 :math:`K(i,j)` , 使用量子核函数定义经典数据在量子特征空间 :math:`\phi(\mathbf{x}_i)` 的内积 : .. math:: |\langle \phi(\mathbf{x}_j) | \phi(\mathbf{x}_i) \rangle |^2 = |\langle 0 | U^\dagger(\mathbf{x}_j) U(\mathbf{x}_i) | 0 \rangle |^2 使用VQNet和pyqpanda2,我们定义一个 ``QuantumKernel_VQNet`` 产生量子核函数,并使用 ``sklearn`` 的 ``SVC`` 进行分类: .. image:: ./images/qsvm-kernel.png :width: 600 px :align: center | .. code-block:: import numpy as np import pyqpanda as pq from sklearn.svm import SVC from pyqpanda import * from pyqpanda.Visualization.circuit_draw import * from pyvqnet.qnn.svm import QuantumKernel_VQNet, gen_vqc_qsvm_data import matplotlib try: matplotlib.use('TkAgg') except: pass train_features, test_features,train_labels, test_labels, samples = gen_vqc_qsvm_data(20,5,0.3) quantum_kernel = QuantumKernel_VQNet(n_qbits=2) quantum_svc = SVC(kernel=quantum_kernel.evaluate) quantum_svc.fit(train_features, train_labels) score = quantum_svc.score(test_features, test_labels) print(f"quantum kernel classification test score: {score}") 同时扰动随机近似优化器 ============================ .. py:class:: pyvqnet.qnn.SPSA(maxiter: int = 1000, last_avg: int = 1, c0: float = _C0, c1: float = 0.2, c2: float = 0.602, c3: float = 0.101, c4: float = 0, init_para=None, model=None, calibrate_flag=False) 同时扰动随机近似 (SPSA) 优化器。 SPSA 提供了一种用于逼近多元可微成本函数梯度的随机方法。 为实现这一点,使用扰动参数向量对成本函数进行两次评估:原始参数向量的每个分量同时随随机生成的值移动。 `SPSA 网站 `__ 上提供了进一步的介绍。 :param maxiter: 要执行的最大迭代次数。默认值:1000。 :param last_avg: last_avg 迭代的平均参数。 如果 last_avg = 1,则只考虑最后一次迭代。默认值:1。 :param c0: 初始a。更新参数的步长。默认值:0.2*pi :param c1: 初始的c。用于近似梯度的步长。默认值:0.1。 :param c2: 论文中的alpha,用于在每次迭代时调整a(c0)。默认值:0.602。 :param c3: 论文中的gamma,每次迭代时用来调整c(c1)。默认值:0.101。 :param c4: 同样用来控制a的参数。默认值:0。 :param init_para: 初始化参数。默认值:无。 :param model: 参数模型:模型。默认值:无。 :param calibrate_flag: 是否校准超参数 a 和 c,默认值:False。 :return: 一个SPSA优化器实例 .. warning:: SPSA只支持一维的输入参数。 Example:: from pyvqnet.qnn import AngleEmbeddingCircuit, expval, QpandaQCircuitVQCLayerLite, SPSA from pyvqnet.qnn.template import BasicEntanglerTemplate import pyqpanda as pq from pyvqnet.nn.module import Module #定义一个量子变分线路模型 class Model_spsa(Module): def __init__(self): super(Model_spsa, self).__init__() self.qvc = QpandaQCircuitVQCLayerLite(layer_fn_spsa_pq, 3) def forward(self, x): y = self.qvc(x) return y #本例线路是最小化该VQC的期望值 def layer_fn_spsa_pq(input, weights): num_of_qubits = 1 machine = pq.CPUQVM() machine.init_qvm() qubits = machine.qAlloc_many(num_of_qubits) c1 = AngleEmbeddingCircuit(input, qubits) weights =weights.reshape([4,1]) bc_class = BasicEntanglerTemplate(weights, 1) c2 = bc_class.create_circuit(qubits) m_prog = pq.QProg() m_prog.insert(c1) m_prog.insert(c2) pauli_dict = {'Z0': 1} exp2 = expval(machine, m_prog, pauli_dict, qubits) return exp2 model = Model_spsa() #定义一个SPSA优化器 optimizer = SPSA(maxiter=20, init_para=model.parameters(), model=model, ) .. py:method:: pyvqnet.qnn.SPSA._step(input_data) 优化 sapa 优化器 :param input_data: 输入训练数据QTensor :return: train_para:最终参数。 theta_best:最后 `last_avg` 次优化后的平均参数。 Example:: import numpy as np import pyqpanda as pq import sys sys.path.insert(0, "../") import pyvqnet from pyvqnet.nn.module import Module from pyvqnet.qnn import SPSA from pyvqnet.tensor.tensor import QTensor from pyvqnet.qnn import AngleEmbeddingCircuit, expval, QpandaQCircuitVQCLayerLite, expval from pyvqnet.qnn.template import BasicEntanglerTemplate #定义一个量子变分线路模型 class Model_spsa(Module): def __init__(self): super(Model_spsa, self).__init__() self.qvc = QpandaQCircuitVQCLayerLite(layer_fn_spsa_pq, 3) def forward(self, x): y = self.qvc(x) return y #本例线路是最小化该VQC的期望值 def layer_fn_spsa_pq(input, weights): num_of_qubits = 1 machine = pq.CPUQVM() machine.init_qvm() qubits = machine.qAlloc_many(num_of_qubits) c1 = AngleEmbeddingCircuit(input, qubits) weights =weights.reshape([4,1]) bc_class = BasicEntanglerTemplate(weights, 1) c2 = bc_class.create_circuit(qubits) m_prog = pq.QProg() m_prog.insert(c1) m_prog.insert(c2) pauli_dict = {'Z0': 1} exp2 = expval(machine, m_prog, pauli_dict, qubits) return exp2 model = Model_spsa() #定义一个SPSA优化器 optimizer = SPSA(maxiter=20, init_para=model.parameters(), model=model, ) #初始化参数 data = QTensor(np.array([[0.27507603]])) p = model.parameters() p[0].data = pyvqnet._core.Tensor( np.array([3.97507603, 3.12950603, 1.00854038, 1.25907603])) #调用SPSA进行迭代优化 optimizer._step(input_data=data) #计算优化后的VQC期望值 y = model(data) print(y)