Source code for reRoute_Dynamics.Trip_Simulator

'''
Trip_Simulator.py
S. Peck

Trip_Simulator.py contains the method and scripting used to simulate a vehicle trip by passing a vehicle and its driving
conditions through logic and then into a Longitudinal Dynamics Model in Physics_Engine.py.
This can also be run as a standalone script. 

Methods:
simulate_trip.py - method to run a vehicle with a given ESS on a given trip over a given route.


'''
import Instance_Tools as it
import pandas as pd
import Object_Params as op
import Physics_Engine as pe
import Geography_Tools as gt
from alive_progress import alive_bar

[docs] def simulate_trip(route, trip=op.Trip(), bus=op.Bus(), ESS=op.ESS()): '''simulate_trip() takes a route, and returns a modeled power consumption and velocity for a vehicle on that given trip. :param route: a route object as exported by Geography_Tools.py that the vehicle will traverse :param trip: a trip object from Object_Params that determines the external conditions of the trip Default: Object_Parameters.Trip() :param bus: a bus object from Object_Params that determines the vehicle design Default: Object_Parameters.Bus() :param ESS: an ess object from Object_Params that determines the Energy Storage System design Default: Object_Parameters.ESS() :return: a geodataframe that provides all the relevant driving conditions, positional velocity, and modeled power required. ''' # make copies so there isnt accidental overwriting of params. trip = trip.copy() bus = bus.copy() ESS = ESS.copy() # take the stops and convert such that -1 is 0. rider_series = pd.Series(route.stops) rider_series = rider_series + 1 # get total number of stops by filtering out the 0's n_stops = len(rider_series[rider_series != 0]) # set the positions with stops to ridership changes instead using generate_riders rider_series[rider_series != 0] = it.generate_riders(n_stops, mean_ridership = trip.m_riders, seed=trip.seed) # convert the ridership data to a list. rider_changes = list(rider_series) # take the signal data and convert invalid points to 0 signal_series = pd.Series(route.signals) signal_series[signal_series == -1] = 0 # multiply the signal values by a boolean of if the stoplight has been hit or not. signal_series = signal_series.apply(lambda x: x*it.check_hit_signal(stoplight_chance = trip.chance_sig, seed=trip.seed)) # convert the data to a list. signals_hit = list(signal_series) # get the stop type arrays for each point stop_types = it.determine_stop_type(rider_changes, signals_hit, pd.Series(route.signs)+1) # use the stop type arrays and running distance to get the distance from each point to its next stop. stop_distances = it.get_distances_to_stops(stop_types, route.cum_d_X) # Convert stop distances to meters stop_distances = list(pd.Series(stop_distances)*1000) # Get the grades grades = route.grades # get the grade forces grade_forces = list(pd.Series(grades).apply(lambda x: pe.calculate_grade_force(x,bus.mass, bus.Cf))) # Get the changes in travel distance, in meters dxs = list(pd.Series(route.d_X)*1000) # get the speed limits, in m/s, and affect it by the trip's traffic parameter limits = list(pd.Series(route.limits)*1000*((1-(trip.traffic/2)))*(1+bus.f_a)) # get the bearing angle from headings/ bearings = list(pd.Series(route.bearings).apply(lambda x: gt.heading_to_angle(x))) # get the geodesic distances in meters geodes_dists =list(pd.Series(route.dx)*1000) # get the elevations, in km elevations = route.elevation geometry = route.geometry wind_angle = gt.heading_to_angle(trip.wind_bearing) # create an empty dict to store the result structure initialize_result = {'type':'initialize', 'v_f':0, 'dt':0, 'P':0, 'BP':None, 'v':None, 'dx':0, 'grade':0, 'limit':0, 'stop_clf':[True, True, True, True], 'gdx':0, 'elevation':0, 'dx_to_next':0, 'b_dx':0, 'geometry':0, 'c_rate':0, 'SOH_loss':0, 'n_dec':0, 'dQ':0} # Initialize the running data with the initialize dict. running_data = [initialize_result] # Loop through each point. #with alive_bar(len(stop_distances)-1) as bar: for i in range(len(stop_distances)-1): #print(running_data[-1]) v = running_data[-1]['v_f'] # Query point data dx_to_next_stop = stop_distances[i] grade = grades[i] grade_force = grade_forces[i] dx = dxs[i] limit = limits[i] bearing = bearings[i] stop_classes = stop_types[i+1] rider_change = rider_changes[i] geodes_dist = geodes_dists[i] elevation = elevations[i] position = geometry[i] # Calculate the wind force wind_force = pe.calculate_wind_force(bearing, v, wind_angle, trip.v_wind, trip.p_air, bus.Cd, bus.area) # Calculate the braking distance braking_distance_data = pe.get_braking_distance(v, bus.mass, grade_force, wind_force, bus.a_br, bus.f_br, bus.f_i, bus.dmax) # Parse braking information braking_distance = braking_distance_data['dx'] adjusted_braking_factor = braking_distance_data['bf'] # Conditional booleans above_limit = (v > (limit + trip.MOE*limit)) below_limit = (v < (limit - trip.MOE*limit)) stopped = (v < trip.stop_margin) stop_upcoming = (braking_distance+limit*bus.dt_max >= dx_to_next_stop ) # check if the distance to the next stop is not 0 if dx_to_next_stop != 0: braking_distance_data = pe.get_braking_distance(v, bus.mass, grade_force, wind_force, bus.a_br, bus.f_br, bus.f_i, dx_to_next_stop) # Parse braking information braking_distance = braking_distance_data['dx'] adjusted_braking_factor = braking_distance_data['bf'] # Set up a dict for the true result. true_result = {'type':'NULL', 'v_f':None, 'dt':None, 'P':None, 'BP':None, 'v':None, 'dx':dx, 'grade':grade, 'limit':limit, 'stop_clf':stop_classes, 'gdx':geodes_dist, 'elevation':elevation, 'dx_to_next':dx_to_next_stop, 'b_dx':(braking_distance, adjusted_braking_factor), 'geometry':position, 'c_rate':None, 'SOH_loss':None, 'n_dec':None, 'dQ':None} # Check if there's a stop upcoming and if the bus is still moving if stop_upcoming and not stopped: #brake to stop. result = pe.brake(v, bus.mass, dx, grade_force, wind_force, bus.a_br, adjusted_braking_factor, bus.f_i, bus.dmax) # insert the results. true_result['type'] = 'stp_brk' true_result['v_f'] = result['v_f'] true_result['dt'] = result['dt'] true_result['P'] = result['P'] true_result['BP'] = ESS.calc_instance_power(true_result['P']) true_result['v'] = ESS.calc_voltage_simple(true_result['BP']) true_result['c_rate'] = (true_result['BP']/true_result['v']/ESS.module_S_P[0]/ESS.bus_S_P[0])/(ESS.Q_cell*ESS.module_S_P[1]*ESS.bus_S_P[1]) true_result['n_dec'] = ESS.decay_by_c_rate(true_result['c_rate']) true_result['dQ'] = (true_result['BP']*true_result['dt']/3600/true_result['v']/ESS.module_S_P[0]/ESS.bus_S_P[0]) true_result['SOH_loss'] = ESS.cell_SOH_loss_by_DB(true_result['dQ']/ESS.module_S_P[1]/ESS.bus_S_P[1], true_result['c_rate'])/ (ESS.Q_cell)*100 running_data.append(true_result.copy()) #<-- This has to use the copy, otherwise it will change prev. values # if the speed is within the stop margin, stop the bus. if (result['v_f'] < trip.stop_margin): result['v_f'] = 0 # Calculate the stop time based on the class. stop_time = stop_classes[0]*trip.t_stop + stop_classes[1]*trip.t_sig + stop_classes[2]*trip.t_sign + stop_classes[3]*trip.t_end # ridership, signal, sign, end # Adjust the mass of the bus bus.mass += rider_change*trip.m_pass # Add the results. true_result['type'] = 'rest' true_result['v_f'] = 0 true_result['dt'] = stop_time true_result['P'] = 0 true_result['BP'] = ESS.calc_instance_power(true_result['P']) true_result['v'] = ESS.calc_voltage_simple(true_result['BP']) true_result['c_rate'] = (true_result['BP']/true_result['v']/ESS.module_S_P[0]/ESS.bus_S_P[0])/(ESS.Q_cell*ESS.module_S_P[1]*ESS.bus_S_P[1]) true_result['n_dec'] = ESS.decay_by_c_rate(true_result['c_rate']) true_result['dQ'] = (true_result['BP']*true_result['dt']/3600/true_result['v']/ESS.module_S_P[0]/ESS.bus_S_P[0]) #true_result['SOH_loss'] = true_result['dQ']*true_result['n_dec']/ (ESS.Q_cell*ESS.module_S_P[1]*ESS.bus_S_P[1])*100 true_result['SOH_loss'] = ESS.cell_SOH_loss_by_DB(true_result['dQ']/ESS.module_S_P[1]/ESS.bus_S_P[1], true_result['c_rate'])/ (ESS.Q_cell)*100 true_result['dx'] = 0 tmp_storage = true_result['stop_clf'] running_data.append(true_result.copy()) # otherwise, if the bus is not coming up to a stop: else: if stopped: #accelerate # Get a list of the results for accelerate (done like this as accelerate *may* # give a number of dicts depending on how development goes results = [pe.accelerate(v, bus.mass, dx, grade_force, wind_force, bus.a_prof, bus.a_br, bus.f_br, bus.f_i, bus.P_max, bus.a_max, bus.dt_max)] # Append the results for result in results: true_result['type'] = 'ac_from_0' true_result['v_f'] = result['v_f'] true_result['dt'] = result['dt'] true_result['P'] = result['P'] true_result['BP'] = ESS.calc_instance_power(true_result['P']) true_result['v'] = ESS.calc_voltage_simple(true_result['BP']) true_result['c_rate'] = (true_result['BP']/true_result['v']/ESS.module_S_P[0]/ESS.bus_S_P[0])/(ESS.Q_cell*ESS.module_S_P[1]*ESS.bus_S_P[1]) true_result['n_dec'] = ESS.decay_by_c_rate(true_result['c_rate']) true_result['dQ'] = (true_result['BP']*true_result['dt']/3600/true_result['v']/ESS.module_S_P[0]/ESS.bus_S_P[0]) #print(true_result) true_result['SOH_loss'] = ESS.cell_SOH_loss_by_DB(true_result['dQ']/ESS.module_S_P[1]/ESS.bus_S_P[1], true_result['c_rate'])/ (ESS.Q_cell)*100 #print('ac_from_0:',result) running_data.append(true_result.copy()) #print("v, dx, grade_force, wind_force = {}, {}, {}, {}".format(v, dx, grade_force, wind_force)) elif below_limit and not stopped: #accelerate results = [pe.accelerate(v, bus.mass, dx, grade_force, wind_force, bus.a_prof, bus.a_br, bus.f_br, bus.f_i, bus.P_max, bus.a_max, bus.dt_max)] for result in results: true_result['type'] = 'ac_below' true_result['v_f'] = result['v_f'] true_result['dt'] = result['dt'] true_result['P'] = result['P'] true_result['BP'] = ESS.calc_instance_power(true_result['P']) true_result['v'] = ESS.calc_voltage_simple(true_result['BP']) true_result['c_rate'] = (true_result['BP']/true_result['v']/ESS.module_S_P[0]/ESS.bus_S_P[0])/(ESS.Q_cell*ESS.module_S_P[1]*ESS.bus_S_P[1]) true_result['n_dec'] = ESS.decay_by_c_rate(true_result['c_rate']) true_result['dQ'] = (true_result['BP']*true_result['dt']/3600/true_result['v']/ESS.module_S_P[0]/ESS.bus_S_P[0]) true_result['SOH_loss'] = ESS.cell_SOH_loss_by_DB(true_result['dQ']/ESS.module_S_P[1]/ESS.bus_S_P[1], true_result['c_rate'])/ (ESS.Q_cell)*100 #print('ac_below:',result) running_data.append(true_result.copy()) elif above_limit and not stopped: #brake result = pe.brake(v, bus.mass, dx, grade_force, wind_force, bus.a_br, adjusted_braking_factor, bus.f_i, bus.dmax) true_result['type'] = 'br_above' true_result['v_f'] = result['v_f'] true_result['dt'] = result['dt'] true_result['P'] = result['P'] true_result['BP'] = ESS.calc_instance_power(true_result['P']) true_result['v'] = ESS.calc_voltage_simple(true_result['BP']) true_result['c_rate'] = (true_result['BP']/true_result['v']/ESS.module_S_P[0]/ESS.bus_S_P[0])/(ESS.Q_cell*ESS.module_S_P[1]*ESS.bus_S_P[1]) true_result['n_dec'] = ESS.decay_by_c_rate(true_result['c_rate']) true_result['dQ'] = (true_result['BP']*true_result['dt']/3600/true_result['v']/ESS.module_S_P[0]/ESS.bus_S_P[0]) true_result['SOH_loss'] = ESS.cell_SOH_loss_by_DB(true_result['dQ']/ESS.module_S_P[1]/ESS.bus_S_P[1], true_result['c_rate'])/ (ESS.Q_cell)*100 #print('br_above:',result) running_data.append(true_result.copy()) else: #maintain result = pe.maintain(v, bus.mass, dx, grade_force, wind_force, bus.a_br, bus.f_br, bus.f_i, bus.P_max) true_result['type'] = 'main' true_result['v_f'] = result['v_f'] true_result['dt'] = result['dt'] true_result['P'] = result['P'] true_result['BP'] = ESS.calc_instance_power(true_result['P']) true_result['v'] = ESS.calc_voltage_simple(true_result['BP']) true_result['c_rate'] = (true_result['BP']/true_result['v']/ESS.module_S_P[0]/ESS.bus_S_P[0])/(ESS.Q_cell*ESS.module_S_P[1]*ESS.bus_S_P[1]) true_result['n_dec'] = ESS.decay_by_c_rate(true_result['c_rate']) true_result['dQ'] = (true_result['BP']*true_result['dt']/3600/true_result['v']/ESS.module_S_P[0]/ESS.bus_S_P[0]) true_result['SOH_loss'] = ESS.cell_SOH_loss_by_DB(true_result['dQ']/ESS.module_S_P[1]/ESS.bus_S_P[1], true_result['c_rate'])/ (ESS.Q_cell)*100 #print('main:',result) running_data.append(true_result.copy()) #print(running_data[-1]) #bar() # Not converted to ESS yet return running_data.copy()
if __name__ == '__main__': # load the arguments parser = argparse.ArgumentParser(description='Simulate a bus trip.') parser.add_argument('-r',"--route", help="Path to a Route export json from Geography_Tools.") parser.add_argument('-t',"--trip", help="Path to a trip parameter export txt from Object_Params.") parser.add_argument('-b',"--bus", help="Path to a bus parameter export txt from Object_Params.") parser.add_argument('-e',"--ESS", help="Path to a ESS parameter export txt from Object_Params.") parser.add_argument('-o', "-output", help="Output filepath.") parser.add_argument('-n', "-name", help='Output filename.') # parse the arguments args = parser.parse_args() # Check for the optional params if type(args.trip) == type(None): trip_obj = OP.Trip() else: trip_obj = op.load_trip_params(args.trip) if type(args.bus) == type(None): bus_obj = OP.Bus() else: bus_obj = op.load_bus_params(args.bus) if type(args.ESS) == type(None): ESS_obj = OP.ESS() else: ESS_obj = op.load_ESS_params(args.ESS) # get the path params output_path = args.output output_name = args.name # Check the required params if type(args.route) == type(None): route_obj = gt.load_from_json(input("[ Trip_Simulator.py ] Route filepath: ")) if type(output_path) == type(None): output_path = input("[ Trip_Simulator.py ] Output path: ") if type(output_name) == type(None): output_name = input("[ Trip_Simulator.py ] Output name: ") # run the simulation results = simulate_trip(route_obj, trip_obj, bus_obj, ESS_obj) # Save the data as a feather. pd.DataFrame(results).to_feather('{}/{}.feather'.format(output_path, output_name))