Source code for pyspd.model

#!/usr/bin/env python
# -*- coding: utf-8 -*-

"""
Overall Model for the linear program.

"""

import pulp
import time


[docs]class SPDModel(object): """SPDModel Container for setting up, solving, and organising the results of a Simulation. Contains three primary API methods. Creation of the Linear Program, solving the LP and parsing the results. Usage: ------ solver = SPDModel(SystemOperator) solver.create_lp() solver.solve_lp() solver.parse_result() """ def __init__(self, ISO): super(SPDModel, self).__init__() self.ISO = ISO
[docs] def create_lp(self): """ Publically exposed API Creates the Linear program including applying the objective function and adding all of the necessary constraints. This exists as a wrapper around a number of hidden functions. """ self._setup_lp() self._create_variables() self._obj_function() self._nodal_demand() self._energy_offers() self._reserve_offers() self._transmission_offer() self._reserve_proportion() self._reserve_combined() self._generator_risk() self._transmission_risk() self._reserve_dispatch()
[docs] def write_lp(self, fName=None): """ Write the Linear Program to a file """ self.lp.writeLP(fName)
[docs] def solve_lp(self): """ Solve the Linear Program including the time taken to solve it """ begin = time.time() self.lp.solve() self.solution_time = time.time() - begin
[docs] def parse_result(self): """ Publically exposed API Parse the Results of the solved Linear Program. Must be called after solving it. """ self._parse_risk() self._parse_energy_prices() self._parse_reserve_prices() self._parse_branch_flow() self._parse_reserve_dispatch() self._parse_energy_dispatch()
def _setup_lp(self): """ Setup a Linear Program from a defined ISO instance Contains several convenience mappings to shorten line lengths """ # Set up a Linear Program self.lp = pulp.LpProblem("SPD Dispatch", pulp.LpMinimize) self.addC = self.lp.addConstraint self.SUM = pulp.lpSum self.lpDict = pulp.LpVariable.dicts return self def _create_variables(self): """ Create all of the variables necessary to solve the Linear Program This maps the variables from the ISO to the requisite Linear Program Variables Returns: -------- energy_total_offers reserve_total_offers branch_flow nodal_injection reserve_zone_risk """ self.energy_offers = self.lpDict("Energy_Total", self.ISO.energy_station_names, 0) self.reserve_offers = self.lpDict("Reserve_Total", self.ISO.reserve_station_names, 0) self.branch_flow = self.lpDict("Transmission_Total", self.ISO.branch_names) self.nodal_injection = self.lpDict("Nodal_Injection", self.ISO.node_names) self.reserve_zone_risk = self.lpDict("Reserve_Risk", self.ISO.reserve_zone_names, 0) def _obj_function(self): """ Objective Function min \sum_i p_{g,i}g_{i} + \sum_j p_{r,j}r_{j} """ # Unpack the necessary variables eoffers = self.energy_offers eprices = self.ISO.energy_station_price roffers = self.reserve_offers rprices = self.ISO.reserve_station_price enames = self.ISO.energy_station_names rnames = self.ISO.reserve_station_names # Set the objective function self.lp.setObjective(self.SUM( [eoffers[i] * eprices[i] for i in enames]) + self.SUM([roffers[j] * rprices[j] for j in rnames])) def _nodal_demand(self): """ Nodal Demand constraints Injection_{n} = \sum_{j} g_{j(n)} - d_{n} Injection_{n} = \sum_{t} f_{t(n)} * d_{t(n)} """ # Unpack variables node_inj = self.nodal_injection nodal_demand = self.ISO.nodal_demand nodal_stations = self.ISO.nodal_stations node_names = self.ISO.node_names flow_map = self.ISO.node_flow_map flow_dir = self.ISO.node_flow_direction energy_offer = self.energy_offers branch_flow = self.branch_flow for node in node_names: n1 = '_'.join([node, 'Energy_Price']) n2 = '_'.join([node, 'Nodal_Transmission']) # Net Injections from Energy and Demand self.addC(node_inj[node] == self.SUM([energy_offer[i] for i in nodal_stations[node]] ) - nodal_demand[node], n1) # Net Injection from transmission self.addC(node_inj[node] == self.SUM([branch_flow[t] * flow_dir[t] for t in flow_map[node]]), n2) def _energy_offers(self): """Energy offer constraints g_{i} \le g_{max, i} """ # Unpack variables eoffers = self.energy_offers enames = self.ISO.energy_station_names ecapacity = self.ISO.energy_station_capacity for i in enames: name = '_'.join([i, 'Total_Energy']) self.addC(eoffers[i] <= ecapacity[i], name) def _reserve_offers(self): """ Reserve Offer constraints r_{j} \le r_{max, j} """ # Unpack variables roffers = self.reserve_offers rnames = self.ISO.reserve_station_names rcapacity = self.ISO.reserve_station_capacity for i in rnames: name = '_'.join([i, "Total_Reserve"]) self.addC(roffers[i] <= rcapacity[i], name) def _transmission_offer(self): """ Transmission Offer constraints f_{t} \le f_{max, t} f_{t} \ge -f_{max, t} """ bflows = self.branch_flow bnames = self.ISO.branch_names bcapacity = self.ISO.branch_capacity for i in bnames: n1 = '_'.join([i, 'Pos_flow']) n2 = '_'.join([i, 'Neg_flow']) self.addC(bflows[i] <= bcapacity[i], n1) self.addC(bflows[i] >= bcapacity[i] * -1, n2) def _reserve_proportion(self): """ Reserve Proportion Constraints r_{i} \le k_{i}g_{i} """ # Unpack Variables spin_stations = self.ISO.reserve_spinning_stations roffers = self.reserve_offers eoffers = self.energy_offers rprop = self.ISO.reserve_station_proportion for i in spin_stations: name = '_'.join([i, 'Reserve_Proportion']) self.addC(roffers[i] <= rprop[i] * eoffers[i], name) def _reserve_combined(self): """ Reserve total capacity constraints r_{i} + g_{i} \le g_{capacity, i} """ spin_stations = self.ISO.reserve_spinning_stations roffers = self.reserve_offers eoffers = self.energy_offers tot_capacity = self.ISO.total_station_capacity for i in spin_stations: name = '_'.join([i, 'Total_Capacity']) self.addC(roffers[i] + eoffers[i] <= tot_capacity[i], name) def _generator_risk(self): """ Risk for generators Risk_{r} \ge g_{i(r)} """ rzones = self.ISO.reserve_zone_names rzone_risk = self.reserve_zone_risk rzone_stations = self.ISO.reserve_zone_generators eoffers = self.energy_offers for i in rzones: for j in rzone_stations[i]: name = '_'.join([i, j, 'Generator_Risk']) self.addC(rzone_risk[i] >= eoffers[j], name) def _transmission_risk(self): """ Risk for a Transmission line Risk_{r} \ge f_{t(r)} * d_{t(r)} """ rzones = self.ISO.reserve_zone_names rzone_risk = self.reserve_zone_risk bflow = self.branch_flow bflow_dir = self.ISO.reserve_zone_flow_direction bflow_map = self.ISO.reserve_zone_flow_map for i in rzones: for j in bflow_map[i]: name = '_'.join([i, j, "Transmission_Risk"]) self.addC(rzone_risk[i] >= bflow[j] * bflow_dir[j], name) def _reserve_dispatch(self): """ Total Reserve Dispatch \sum_{j(r)} r_{j} \ge Risk_{r} """ rzones = self.ISO.reserve_zone_names rzone_risk = self.reserve_zone_risk rzone_stations = self.ISO.reserve_zone_reserve roffer = self.reserve_offers for i in rzones: name = '_'.join([i, 'Reserve_Price']) self.addC(self.SUM([roffer[j] for j in rzone_stations[i]] ) >= rzone_risk[i], name) def _parse_energy_prices(self): """ Parse The Energy Prices """ self.final_energy_prices = self._condict("Energy_Price") def _parse_reserve_prices(self): """ Parse the Reserve Prices """ self.final_reserve_prices = self._condict("Reserve_Price") def _parse_energy_dispatch(self): """ Parse the Energy Dispatch """ self.final_energy_dispatch = self._vardict("Energy_Total") def _parse_reserve_dispatch(self): """ Parse the Reserve Dispatch """ self.final_reserve_dispatch = self._vardict('Reserve_Total') def _parse_branch_flow(self): """ Parse the Branch Flows """ self.final_branch_flow = self._vardict('Transmission_Total') def _parse_risk(self): """ Parse the Risk parameters """ self.final_risk_requirements = self._vardict("Reserve_Risk") def _vardict(self, condition): """ Generic method for extracting values from variables """ return {n: n.varValue for n in self.lp.variables() if condition in n.name} def _condict(self, condition): """ Generic method for extracting values from constraints """ return {n: self.lp.constraints[n].pi for n in self.lp.constraints if condition in n}
if __name__ == '__main__': pass

This Page