"""
Geography_Tools.py
S. Peck
Geography_Tools.py is a module that contains the methods primarily used in the interpretation
of geospatial data for the ultimate creation of Route .json files from position and elevation data.
Methods:
geodesic_formula() - method to calculate the geodesic distance between two (lat, lon) points.
compass_heading() - method to determine cardinal compass heading from an angle off of North.
heading_to_angle() - method to determine the compass degree angle from a cardinal direction.
point_bearing() - method to determine the travel bearing between two points.
get_bounding_box() - method to create the smallest possible box that encompasses all the points in a shape.
interpolate_points() - method to interpolate additional points between two initial geospatial points.
repeat_id_remover() - method to swap any repeated value in an iterable with -1.
verbose_line_updater() - method to provide line updates for verbosity
query_elevation_changes() - method to determine the elevation changes in an iterable of elevation.
query_stops() - method to assign stops to their closest corresponding point in a geometry series.
query_signals() - method to assign signals to their closest corresponding point in a geometry series.
query_distance_traveled() - method to calculate the distance traveled from point to point in a geometry series.
query_speed_limits() - method to assign speed limits to each point in a geometry series.
query_bearings() - method to query the bearing to the next point for each point in a geometric series.
get_rasterfiles() - method to return all standard .tif files in a directory.
reproject_rasterfiles() - method to reproject and save raster files to a particular geographic projection.
query_elevation_series() - method to query the elevation from each point in a geometry series using rasterfiles.
smooth_elevation() - method to use a savisky-golay filter to smooth out elevation changes (or otherwise)
calculate_grades() - method to calculate the slope grade at each point in a geometry series with elevations
interpolate_geometry_series() - method to take a whole series of points and interolate extra points if there are significant gaps.
load_from_json() - method to load a Route object from a saved .json file.
Classes:
Route - a class used to store information on a route, like geometry, elevation, signals, limits, and stops.
save_to_json() - class method used to encode and store a Route class as a .json file
query_point() - class method used to query the information of a point at an index in the route
to_gdf() - class method used to convert a Route class's stored information to a GeoDataFrame.
"""
import pandas as pd
import geopandas as gpd
import shapely
import numpy as np
import datetime as dt
from alive_progress import alive_bar
import inspect
from rasterio.warp import calculate_default_transform, reproject, Resampling
import rasterio
import os
import scipy
import pyogrio as pio
import json
import random
import geopy.distance as gdist
def _haversine_formula(x1, y1, x2, y2):
"""
haversine_formula takex the latitude and longitude of two separate points,
and calculates the distance in kilometers between those two points as a crow flies.
Params:
x1 - latitude pt 1 in degrees
y1 - longitude pt 1 in degrees
x2 - latitude pt 2 in degrees
y2 - longitude pt 2 in degrees
Returns:
distance between the points in kilometers.
Notes:
11/20/2024 - Efficiency can be improved by having radian conversion be done externally.
12/21/2025 - Deprecated. Succeeded by geodesic_formula.
"""
# convert the degree coordinates to radians
lat1=np.radians(x1)
lon1=np.radians(y1)
lat2=np.radians(x2)
lon2=np.radians(y2)
# Get the radius of earth in kilometers - using the average of the radii
R = 6371.009 #@Great_Circle_Distance
# calculate the difference in radians
dlon=lon2-lon1
dlat=lat2-lat1
# calculate the distance using the haversine formula.
a = np.sin(dlat / 2)**2 + np.cos(lat1) * np.cos(lat2) * np.sin(dlon / 2)**2
c = 2 * np.arctan2(np.sqrt(a), np.sqrt(1 - a))
distance = R * c
print(distance, geodesic_formula(x1, y1, x2, y2))
# return the distance in km.
if distance < .0001:
distance=.0001
return distance
[docs]
def compass_heading(bearing):
"""compass_heading converts a bearing in degrees to a
compass value like North, South, or East.
:param bearing: The bearing value, in degrees, as float.
:return: String representation of compass heading.
Notes:
11/18/2024 - There is a more mathematically elegant solution to this, but I'm not bothering with it right now.
"""
# set up a list of possible compass directions
possible_dirs = ["N", "NE", "E", "SE", "S", "SW", "W", "NW", "N"]
# get the index of the 45 quadrants the angle bearing is in
bearing_index = int(np.round(bearing/45))
# make sure the index isn't negative
if bearing_index < 0:
bearing_index=bearing_index+8
elif bearing_index > 8:
while bearing_index > 8:
bearing_index -= 8
# return the corresponding compass direction
return possible_dirs[bearing_index]
[docs]
def heading_to_angle(heading):
"""heading_to_angle() takes a compass heading of 8 directions,
and converts it to an angle in degrees.
:param heading: A string representing heading.
:return: A compass bearing angle as float.
"""
options = ["N", "NE", "E", "SE", "S", "SW", "W", "NW"]
idx = options.index(heading)
return (idx/len(options)*360)
[docs]
def point_bearing(x1, y1, x2, y2, bearing_type = "Angle"):
"""point_bearing takex in the latitude and longitude of two separate ponts,
and calculates the bearing when travelling from point 1 and point 2.
:param x1: latitude of point 1 in degrees
:param y1: longitude of point 1 in degrees
:param x2: latitude of point 2 in degrees
:param y2: longitude of point 2 in degrees
:param bearing_type: Determine the bearing output.
'Angle' - return the angle in degrees.
'Compass' - return the angle as a directional bearing (eg: N, E, S, W)
:return: Bearing of the vector between the two points.
"""
# convert to radians
lat1=np.radians(x1)
lon1=np.radians(y1)
lat2=np.radians(x2)
lon2=np.radians(y2)
# run the bearing calculation
x = np.cos(lat2) * np.sin(lon2-lon1)
y = np.cos(lat1) * np.sin(lat2) - np.sin(lat1) * np.cos(lat2) * np.cos(lon2-lon1)
bearing = np.arctan2(x, y)
bearing = np.degrees(bearing)
while bearing<0:
bearing+=360
while bearing>360:
bearing-=360
# check the return type
if (bearing_type == "Angle"):
return bearing
elif (bearing_type == "Compass"):
return compass_heading(bearing)
[docs]
def get_bounding_box(shape, how='basic'):
"""get_bounding_box() takes in a shapely shape,
and then generates a polygon that is a rectangular bounding box.
:param shape: Any shapely shape
:return: shapely.polygon that fits around the provided shape
"""
if how=='basic':
# get the bound coordinates
bbox_cords = shape.bounds
# create a rectangle of those coordinates
boundary_coords = ((bbox_cords[0], bbox_cords[1]),
(bbox_cords[0], bbox_cords[3]),
(bbox_cords[2], bbox_cords[3]),
(bbox_cords[2], bbox_cords[1]))
# return a shapely polygon of those coordinates
return shapely.Polygon(boundary_coords)
elif how=='swapax':
# get the bound coordinates
bbox_cords = shape.bounds
# create a rectangle of those coordinates
boundary_coords = ((bbox_cords[1], bbox_cords[0]),
(bbox_cords[3], bbox_cords[0]),
(bbox_cords[3], bbox_cords[2]),
(bbox_cords[1], bbox_cords[2]))
# return a shapely polygon of those coordinates
return shapely.Polygon(boundary_coords)
[docs]
def interpolate_points(point_1, point_2, max_dist=1):
"""interpolate_points takes two shapely points and a maximum distance,
and then interpolates between the points if the maximum distance is exceeded.
:param point_1: starting point as a shapely point of lat, lon
:param point_2: final point as a shapely point of lat, lon
:param max_dist: maximum distance the points can be without interpolation.
Default int of 1.
:return: An iterable of (lon,lat) shapely points.
"""
# get distance in meters
distance = geodesic_formula(point_1.x, point_1.y, point_2.x, point_2.y)*1000
if distance < max_dist:
return [point_1]
else:
# calculate the number of points between to be interpolated
num_interp = int(np.ceil(distance/max_dist))
# calculate the distance between each interpolated point
dx = distance/num_interp
# generate a linestring
line = shapely.LineString([point_1, point_2])
# create the points list with the initial point
points = [point_1]
#print(dx,distance,num_interp)
# loop through each distance from the origin
for dis in np.arange(dx/distance, 1, dx/distance):
# interpolate a point
i_point = line.interpolate(dis, normalized=True).coords[:][0]
# convert it to a shapely point
i_point = shapely.Point(i_point[0], i_point[1])
# append the point
points.append(i_point)
# return the points
return points
[docs]
def repeat_id_remover(sequence):
"""repeat_id_remover takes an iterable of IDs where -1 is invalid,
and swaps out any repeat index with an invalid.
:param sequence: A sequence of id values in an iterable.
:return: Sequence with repeated values swapped for a -1.
"""
# start the sequence with invalid
sequence_value = -1
# build a list for the new sequence
new_sequence = sequence
# creaate a history
his = set([])
# loop through each element in the sequence
for idx, ele in enumerate(new_sequence):
# if the element is in the history,
if ele in his:
# swap the index with the sequence value
new_sequence[idx] = sequence_value
# then, add the element to the history.
his.add(ele)
# return the new sequence
return new_sequence
### ---- SERIES QUERY METHODS ---- ###
def _flip(x, y):
"""Flips the x and y coordinate values"""
return y, x
[docs]
def verbose_line_updater(message, reset=False):
"""verbose_line_updater generates a verbose message with timestamps and
method calls based on a passed message. Can reset printline if specified.
:param message: message, as str, to be printed with the line.
:param reset: boolean to reset carrige or not.
Default false.
:return: The string to be printed.
"""
# generate the string.
data_string = "[{} -- {}.{}()] {} ".format(dt.datetime.now().time(),
"Geography_Tools",
str(inspect.stack()[1][3]),
message)
# check if the carrage should be reset.
if reset:
print(data_string, end='\r')
else:
print(data_string)
# return the carrage.
return data_string
[docs]
def query_elevation_changes(elev):
"""query_elevation_change takes an iterable of elevation,
and returns the elevation change between the point at the
current index, and the next poinnt.
:param elev: iterable of elevation data
:return: list of elevation changes to reach the next point.
"""
# convert to series
elev_ser = pd.Series(elev)
# shift the series by one
shifted = elev_ser.shift(-1)
# get the original final point
final_point = elev[-1]
# set the final point to replace the nan
shifted.iloc[-1] = final_point
# calculate the shift between the elevations and return.
return list(elev_ser-shifted)
[docs]
def query_stops(geometry, stop_table_path, key='stop_id', epsg_from = 4326, epsg_to=4326, margin=10, verbose=False):
"""query_stops takes in a geometry point series for a given route, and a path to
all stop ids and geometry, and then returns a list of stop id's in the order the
bus will arrive.
:param geometry: a series of shapely points representing longitude and latitude
:param stop_table_path: a path to the dataset containing stop geodata csv, as str
:param key: column identifier for the stop id (or whatever value you want) as str
:param epsg_from: the epsg projection the csv is in as an int.
Default is 4326.
:param epsg_to: the epsg projection the data should be reporojected to as an int
Default is 4326.
:param margin: distance, in meters, that the stop is allowed to be from a given point to qualify, as int.
:param verbose: boolean to enable verbosity.
Default false.
:return: list of stop ids (or other queried values from the data), with invalid points being labeled -1.
"""
# Check the shape boundaries for the geometric series
shape_bounds = shapely.LineString(list(geometry)).buffer(3e-4)
# read the data.
if verbose: verbose_line_updater("Loading CSV: {}".format(stop_table_path))
stop_table = pd.read_csv(stop_table_path)
if verbose: verbose_line_updater("Loaded CSV.")
# convert the data to a shapely geometry pandas series
if verbose: verbose_line_updater("Loading stop geometry...")
stop_table['geometry'] = stop_table.apply(lambda x: shapely.Point(x.stop_lat, x.stop_lon), axis=1)
if verbose: verbose_line_updater("Loaded stop geometry.")
#print(stop_table['geometry'])
# convert the data to a geodataframe, and re-set the crs to the specified
if epsg_from != epsg_to:
if verbose: verbose_line_updater("Setting geodata: epsg{}-->epsg:{}".format(epsg_from, epsg_to))
stop_table = gpd.GeoDataFrame(stop_table).set_crs(epsg = epsg_from).to_crs(epsg = epsg_to)
else:
stop_table = gpd.GeoDataFrame(stop_table).set_crs(epsg = epsg_from)
if verbose: verbose_line_updater("Geodata Set.")
# filter the stop table to only be the stops within the boundary
if verbose: verbose_line_updater("Filtering stops...")
stop_table = stop_table[stop_table['geometry'].apply(lambda x: shape_bounds.buffer(1e-10).contains(x)) == True]
if verbose: verbose_line_updater("Stops filtered.")
# check for stops within a set distance of each geometry point
stop_id_list = []
if verbose: verbose_line_updater("Processing stops...")
# loop through the geimetry
#with alive_bar(len(geometry)) as bar:
for point in list(geometry):
# re-set the pont name because i dont want to re-change them all
route_pt = point
# get the distances using the geodesic formula, converted to meters
stop_table['dist'] = stop_table.geometry.apply(lambda x: geodesic_formula(x.x, x.y, route_pt.x, route_pt.y)*1000)
# filter the data to yeild the closest stop
closest_stop = stop_table[stop_table['dist'] == stop_table['dist'].min()].reset_index().iloc[0]
# if the closest stop is within the margin, append it to the list. Otherwise, append -1.
stop_id_list.append(int(closest_stop['dist']<margin)*closest_stop[key] - int(closest_stop['dist']>margin))
#bar()
if verbose: verbose_line_updater("Stops processed. Returning values.")
# return the stop id list with repeats removed, and the last index guaranteed to be a stop.
return repeat_id_remover(stop_id_list)
[docs]
def query_distance_traveled(geometry_series, verbose=False):
"""query_distance_traveled takes in a series of points,
and then calculates the distance traveled from point to point, in kilometers.
:param geometry_series: a series of shapely points of lat and lon.
:param verbose: boolean to enable verbosity.
Default False.
:return: an iterable representing the change in distance between each point and the next.
"""
# get the current geometry series
if verbose: verbose_line_updater("Loading geometry.")
currents = geometry_series
# shift the geometry series up by one, which will be the next ponts.
if verbose: verbose_line_updater("Shifting geometry.")
nexts = geometry_series.shift(-1)
# combine the two into a dataframe.
if verbose: verbose_line_updater("Combining geometry.")
data = pd.concat([currents, nexts], axis=1, keys=['current', 'next']).reset_index(drop=True)
# get the final point and make sure it's the last value.
final_point = list(data['current'])[-1]
# set the nan value in the beginning of the 'nexts' to the initial, representing no distance traveled.
data.iloc[-1, data.columns.get_loc("next")] = final_point
# apply the geodesic formula to the data, which will yeild a series of changes in distance.
if verbose: verbose_line_updater("Calculating distances.")
dXs = data.apply(lambda x: geodesic_formula(x.current.x, x.current.y, x.next.x, x.next.y), axis=1)
if verbose: verbose_line_updater("Point distances processed. Returning values.")
# return the series of the changes in distance, in km, make sure it never doesn't travel.
return dXs.values
[docs]
def query_signals(geometry, signal_data_path, key="SIGNAL_ID", epsg = 4326, margin = 10, verbose=False):
"""query_signals takes in a geometric series of shapely points,
a path to stoplight signal data, and returns a list of the signal id at
each point in that series.
:param geometry: a series of shapely points of lat and lon.
:param signal_data_path: path, as str, to a shapefile containing stop signal data.
:param key: the column identifier of signal id (or other target value) to be queried
:param epsg: the epsg the data will be re-set as.
Default is 4326.
:param margin: the distance, in meters, that the signal is allowed to be from a given point to qualify.
:param verbose: boolean to enable verbosity.
Default False.
:return: list of signal IDs, or other info about the signal, as specified.
-1 if none found.
"""
# get the bounding box of the geometry
shape_bounds = shapely.LineString(list(geometry)).buffer(3e-3)
# Get the signal data.
if verbose: verbose_line_updater("Loading shapefile & setting EPSG: {}".format(signal_data_path))
signals = pio.read_dataframe(signal_data_path).to_crs(epsg = epsg)
signals.geometry = signals.geometry.apply(lambda x: shapely.ops.transform(_flip, x))
if verbose: verbose_line_updater("Shapefile laded with EPSG:{}.".format(epsg))
# get the signals that are within the bounds of the data.
if verbose: verbose_line_updater("Filtering sinals...")
signals = signals[signals.apply(lambda x: shape_bounds.contains(x.geometry), axis=1).reset_index(drop=True) == True]
if verbose: verbose_line_updater("Signals filtered.")
# Generate empty list for signal ids
signal_id_list = []
# loop through the geometry
if verbose: verbose_line_updater("Processing signals...")
#with alive_bar(len(geometry)) as bar:
for point in list(geometry):
try:
# use the geodesic formula to get the distances between the current point and each point in the series, in meters
signals['dist'] = signals.geometry.apply(lambda x: geodesic_formula(x.x, x.y, point.x, point.y)*1000)
# get the closest signal through the minimum distance
closest_signal = signals[signals['dist'] == signals['dist'].min()].reset_index().iloc[0]
# check if the signal is within the margin, and if so, append it to the list. If not, append -1.
signal_id_list.append(int((int(closest_signal['dist'])<margin)*closest_signal[key] - int(closest_signal['dist']>margin)))
except:
signal_id_list.append(int(-1))
#verbose_line_updater("No signal data in bounds.")
#bar()
if verbose: verbose_line_updater("Signals processed. Returning values.")
# return the list.
return repeat_id_remover(signal_id_list)
[docs]
def query_speed_limits(geometry, limit_data_path, key="SPEED_LIM", epsg = 4326, margin = 3e-4, last_known_limit= 20*0.00044704, verbose=False):
"""query_speed_limits takes a geometric iterable of shapely points,
a path to speed limit geodata, and returns the corresponding speed limits at
each point.
:param geometry: a series of shapely points of lat and lon
:param limit_data_path: path to shapefile of speed limit data as str
:param key: column identifier of a speed limit or other value as str, assumed to be an int in units of mph
:param margin: distance a point can be from the street to qualify.
Default 5e-5.
:param last_known_limit: the speed limit int value that the bus will assumedly start at or above.
Default 20 mph.
:param verbose: boolean to enable verbosity.
Default False.
:return: list of speed limits corresponding to the geometry.
Notes:
11/13/2024 - This can be adjusted such that instead of the first value, it's the true closest. Also can probably be combined with the other queries in such a way that it reduces individual methods. FIXED -- 11/19/2024
11/18/2024 - Currently Broken. FIXED -- 11/19/2024
"""
# DEPENDING ON THE FILE, THIS CAN TAKE A WHILE.
geometry = gpd.GeoSeries(geometry).apply(lambda x: shapely.ops.transform(_flip, x))
# get the bounding box of the route.
#shape_bounds = get_bounding_box(shapely.LineString(list(geometry)))
shape_bounds = shapely.LineString(list(geometry)).buffer(3e-4)
# get the speed limit data from the shapefile
if verbose: verbose_line_updater("Loading shapefile & setting EPSG: {}".format(limit_data_path))
limits = pio.read_dataframe(limit_data_path).to_crs(epsg=epsg)
if verbose: verbose_line_updater("Shapefile loaded with EPSG:{}.".format(epsg))
# get the streets that are within the bounding box
if verbose: verbose_line_updater("Filtering streets...")
bound_streets = limits[limits.apply(lambda x: shape_bounds.intersects(x.geometry), axis=1).reset_index(drop=True) == True][['geometry', key]]
if verbose: verbose_line_updater("Streets filtered.")
# set up list.
limit_list = []
bound_streets['dist'] = 0
# loop through the geometry:
if verbose: verbose_line_updater("Processing speed limits...")
#with alive_bar(len(geometry)) as bar:
for i in range(len(list(geometry))):
point = list(geometry)[i]
# get the speed limit at the point through checking that the distance is within the margin. Use first value that appears.
#bound_streets['range'] = bound_streets.geometry.apply(lambda x: x.buffer(margin))
bound_streets['contains_point'] = bound_streets['geometry'].apply(lambda x: x.buffer(margin).contains(point))
matches_street = bound_streets[bound_streets['contains_point'] == True]
limit = 0
if len(matches_street) > 0:
limit=matches_street[key].max()
# append the limit to the list.
limit_list.append(limit*0.00044704)
#bar()
# while there is a speed limit of zero, swap them out for the next nearest speed limit that isn't zero
while (0 in limit_list):
remaining = limit_list.count(0)
for i in range(len(limit_list)):
if verbose: verbose_line_updater("Filling gaps: {}".format(remaining), reset=True)
if limit_list[i] <= 0:
limit_list[i] = last_known_limit
else:
last_known_limit = limit_list[i]
if verbose: verbose_line_updater("Speed limits processed. Returning values.")
# return the limit list, converted to km/s
return limit_list
[docs]
def query_bearings(geometry, bearing_type = "Angle", verbose=False):
"""query_bearings takes a geometric iterable of shapely points,
and returns the corresponding bearing when travelling form one point to the next.
:param geometry: a series of shapely points of lat and lon
:param bearing_type: determine the bearing output.
Angle - return the angle in degrees.
Compass - return the angle as a directional bearing (eg: N, E, S, W)
:param verbose: boolean to enable verbosity.
Default False.
:return: list of bearings when travelling to subsequent point.
"""
# get the current geometry series
if verbose: verbose_line_updater("Loading geometry.")
currents = geometry
# shift the geometry series up by one, which will be the next ponts.
if verbose: verbose_line_updater("Shifting geometry.")
nexts = geometry.shift(-1)
# combine the two into a dataframe.
if verbose: verbose_line_updater("Combining geometry.")
data = pd.concat([currents, nexts], axis=1, keys=['current', 'next']).reset_index(drop=True)
# get the initial point and make sure it's the first value.
final_point = list(data['current'])[-1]
# set the nan value in the beginning of the 'nexts' to the initial, representing no distance traveled.
data.iloc[-1, data.columns.get_loc("next")] = final_point
# apply the geodesic formula to the data, which will yeild a series of changes in distance.
if verbose: verbose_line_updater("Calculating bearings.")
bearings = data.apply(lambda x: point_bearing(x.current.y, x.current.x, x.next.y, x.next.x, bearing_type), axis=1)
if verbose: verbose_line_updater("Bearings processed. Returning values.")
return bearings
### ---- ELEVATION TOOLS ---- ###
[docs]
def get_rasterfiles(dir_path):
"""get_rasterfiles takes a path to a directory and returns a series of paths to any .tif files
contained therin.
:param dir_path: path to directory, as str.
:return: pandas series of path strings to each .tif file.
"""
# create a series using the list of items in the specified directory
rasterfiles_raw = pd.Series(os.listdir(dir_path))
# filter the list to only be those that contain tif.
rasterfiles_raw = rasterfiles_raw[rasterfiles_raw.apply(lambda x: ('tif' in (x.split("."))[-1]))].reset_index(drop='true')
rasterfiles = rasterfiles_raw[rasterfiles_raw.apply(lambda x: ('.tif_' not in x))].reset_index(drop='true')
# convert the paths to have the full path rather than just the filenames.
rasterfiles = rasterfiles.apply(lambda x: "{}{}".format(dir_path, x))
# return the series.
return rasterfiles
[docs]
def reproject_rasterfiles(filepath_sequence, target_crs='EPSG:4326', verbose=False):
"""reproject_rasterfiles takes an iterable of geotiff files, and
then reprojects them as a target crs and saves them in the same
directory as the original.
:param filepath_sequence: an iterable of filepaths to geotiff files.
:param target_crs: the projection system the rasterfiles are to be re-projected to.
:param verbose: specify if the output is verbose.
Default False.
:return: iterable sequence of the newly reprojected files.
Notes:
11/14/2024 - This should be made to have the ability to override if requested. For now, leaving it as-is is fine.
"""
# adjust the reprojection so it can be saved properly:
crs_split = target_crs.split(":")
crs_string = crs_split[0] + crs_split[1]
if verbose: verbose_line_updater("Reprojecting raster files to: {}".format(target_crs))
#with alive_bar(len(filepath_sequence)) as bar:
# Loop through each file in the iterable
for path in list(filepath_sequence):
#Check if the reprojection is already there.
if (str(path.split(".tif")[1]) != '_{}'.format(crs_string) and (os.path.isfile(path + '_{}.tif'.format(crs_string)) == False)):
# open the current rasterfile.
with rasterio.open(path) as src:
# use rasterIO to calculate the transform of the file to the desired crs
transform, width, height = calculate_default_transform(src.crs,
target_crs,
src.width,
src.height,
*src.bounds)
# copy the kwargs from the geotiff
kwargs = src.meta.copy()
# update the kwargs with the target CRS
kwargs.update({
'crs': target_crs,
'transform': transform,
'width': width,
'height': height
})
# Create a reprojection file using the target crs
with rasterio.open(path+'_{}.tif'.format(crs_string), 'w', **kwargs) as dst:
# loop through the projections in src
for i in range(1, src.count + 1):
# reproject the values in the src.
reproject(source=rasterio.band(src, i),
destination=rasterio.band(dst, i),
src_transform=src.transform,
src_crs=src.crs,
dst_transform=transform,
dst_crs=target_crs,
resampling=Resampling.nearest)
#bar()
if verbose: verbose_line_updater("Raster files succesfully reprojected.")
return pd.Series(filepath_sequence).apply(lambda x: x + "_{}.tif".format(crs_string)).astype(str)
[docs]
def query_elevation_series(geometry, gtiff_dir_ser, verbose=False):
"""query_elevation_series takes a series of geometric shapely points, and an iterable of
geotiff filepaths, and generates a pandas series of elevations from each point in
the geometry.
Also for some reason tifs store lat and long in (lon, lat), so i swap em.
:param geometry: a series of shapely points of lat and lon
:param gtiff_dir_ser: an iterable of geometric shapely points. Must have an identical projection to the raster files.
:param verbose: boolean parameter to specify verbosity.
Defualt false.
:return: pandas series of elevations corresponding to the geometry iterable, in km.
"""
# Generate the empty dictionary
data_dict = {}
# make sure the geometry is a series.
geometry = pd.Series(geometry)
if verbose: verbose_line_updater("Converting geometry...")
# Convert the geometry to individual lat/lon tuples
route_ll = (geometry.apply(lambda x: (x.y, x.x))).reset_index(drop=True)
if verbose: verbose_line_updater("Geometry converted.")
if verbose: verbose_line_updater("Querying elevation data...")
# loop through each geotiff file in the series passed
for path in list(gtiff_dir_ser):
# Use rasterio to open the file
with rasterio.open(path) as img:
# sample the elevations using rasterio and the list of lats and lons.
route = pd.Series(list(rasterio.sample.sample_gen(img, list(route_ll), masked=True)))
# remove any masked values
filtered = route[route.apply(lambda x: str(type(x[0]))) != "<class 'numpy.ma.core.MaskedConstant'>"]
# loop through the index of the filtered values
for index in filtered.index:
# add the filtered value to the data dictionary at the index corresponding to
# its respective point.
data_dict[index] = filtered[index][0]
if verbose: verbose_line_updater("Elevation succesfully queried. Returning values.")
# convert to series
data_series = pd.Series(list(data_dict.values()), index=list(data_dict.keys()))
# sort the data by index
data_sorted = data_series.sort_index()
# convert to km
data_valued = data_sorted.apply(float).values/1000
# return
return data_valued
[docs]
def smooth_elevation(elev_series, lg:int = 43, deg:int = 3):
# Apply the savgol filter to the points
y_new = scipy.signal.savgol_filter(elev_series,
lg,
deg,
axis = 0)
return y_new
### ---- Processing Tools ---- ###
def _combine_lidar_data(dsm, dx, max_grade=7.5):
"""
combine_lidar_data takes dsm elevation series, as well as the change in distance
between the points, and generates a filtered elevation that provides cleaner road elevation.
Params:
:param dsm: digital surface model elevation iterable.
:param dtm: digital terrain model elevation iterable.
:param dx: iterable change in distance, in kilometers.
:param max_grade: maximum possible grade the road can achieve. Default int=7.5.
Returns:
iterable of filtered elevations, in km.
Note:
1/16/2025 - Made private, as it is not strictly nessecary for functionality right now. Also, writing a test for this sounds like a headache.
"""
# combine the dsm and dx into a dataframe
data = pd.concat([pd.Series(dsm), pd.Series(dx)], axis=1, keys=['dsm', 'dx']).reset_index(drop=True)
# calculate the rolling dsm median
data['rolling_dsm'] = (data['dsm'].rolling(7).median()).shift(-3)
# calculate the difference in elevation
data['dsm_dy'] = data['dsm'].diff()
# get the filtered elevation.
filtered = data.apply(lambda x: (int(abs(x.dsm_dy/x.dx*100)>max_grade) * x.rolling_dsm) + (int(abs(x.dsm_dy/x.dx*100)<=max_grade)*x.dsm), axis=1)
# return the values.
return filtered.values
[docs]
def calculate_grades(dx, elevations, clip = True , max_grade=7.5):
"""
calculate_grades takes the distance between points,
as well as the elevations at each point, and returns the grade at each point.
:param dx: an iterable of distances between each point
:param elevations: an iterable of elevations corresponding to each point.
:param clip: boolean to determine to clip the grades or not.
Default True.
:param max_grade: an int representing the maximum grade a point can be without being clipped to.
Default 7.5
:return: iterable of the slope grade at each point.
"""
# calculate the grade percentage using the elevation difference and distance difference,
grades = (pd.Series(elevations).diff()/pd.Series(dx)*100)
# ensure only finite values
gradeindex = np.where(~np.isfinite(grades)==True)
grades[gradeindex[0][1:]] = max_grade
grades = grades[np.isfinite(grades)]
# check to clip the values
if clip:
grades = grades.clip(max_grade, -max_grade)
else:
grades=grades
grades = list(grades)
grades.insert(0,grades[0])
# return the grade.
return grades
[docs]
def interpolate_geometry_series(geometry, max_distance=1):
"""interpolate_geometry_series takes an iterable of geospatial shapely points,
and then generates an interpolated series in the event that
the maximum distance (in meters) is exceeded.
:param geometry: a series of shapely points of lat and lon.
:param max_distance: int representing maximum distance the points can be without interpolation.
:return: iterable of lists of shapely points that have been interpolated.
"""
currents = pd.Series(geometry)
# shift the geometry series up by one, which will be the next ponts.
nexts = geometry.shift(-1)
# combine the two into a dataframe.
data = pd.concat([currents, nexts], axis=1, keys=['current', 'next']).reset_index(drop=True)
# get the initial point and make sure it's the final value.
final_point = list(data['current'])[-1]
# set the nan value in the beginning of the 'nexts' to the initial, representing no distance traveled.
data.iloc[-1, data.columns.get_loc("next")] = final_point
# interpolate the points
grace = data.apply(lambda x: interpolate_points(x.current, x.next, max_dist=max_distance), axis=1)
# explode the data so that its a cohesive series
sploded = grace.explode().reset_index(drop=True)
# return an iterable of the interpolated points at each index
return gpd.GeoSeries(sploded)
def _interpolate_distance_traveled(dx, interpolated_iterable):
"""
Notes:
1/16/2025 - Not strictly nessecary. Privated.
"""
data = pd.concat([pd.Series(dx), interpolated_iterable.apply(len)], axis=1, keys=['dx', 'i_i']).reset_index(drop=True)
interp = data.apply(lambda x: [x.dx/x.i_i]*int(x.i_i), axis=1)
return interp
### ---- Class, Saving, and Inter-operability tools ---- ###
[docs]
class Route:
"""
Route class is is used to store route information.
Params:
:param geometry: iterable of shapely geometry points
:param elevation: iterable of elevation data
:param limits: iterable of speed limit at each point. (optional, defaults to 25mph in km/s)
:param stops: iterable of stop flags at each point. (optional, defaults to 10 evenly spaced stops.)
:param signals: iterable of signal flags at each point. (optional, defaults to 8 evenly spaced signals.)
:param signs: iterable of sign flags at each point. (optional, defaults to 3 evenly spaced stop signs.)
:param intersperse_empty: boolean flag to intersperse any empty parameters. Default False.
Methods:
save_to_json() - saves the stored data to a json file. can be loaded again with Geography Tools' load_from_json.
"""
def __init__(self,
geometry,
elevation,
limits = None,
stops = None,
signals = None,
signs = None,
intersperse_empty = False,
smooth_grades = False):
# set up the params list
self._params = [len(geometry),
not (limits is None),
not (stops is None),
not (signals is None),
not (signs is None),
intersperse_empty,
smooth_grades]
# initialize geometry and elevation
self.geometry = list(geometry)
self.elevation = list(elevation)
# get length of passed geometry
geolen = len(geometry)
# if limits is empty, use 25mph (in km/s) as default.
if limits == None:
self.limits = self._intersperse_list(25/2236.936, geolen)
else: self.limits = list(limits)
# if stops is empty, populate with -1, then intersperse with 10 stops (number of timepoints per route)
if stops == None:
if intersperse_empty:
self.stops = self._intersperse_list(-1, geolen, 0, 10)
self.stops[-1] = 0
else:
self.stops = self._intersperse_list(-1, geolen, 0, 0)
self.stops[-1] = 0
else: self.stops = list(stops)
# if signals is empty, intersperse with 10 signals.
if signals == None:
if intersperse_empty:
self.signals = self._intersperse_list(-1, geolen, 0, 8)
else:
self.signals = self._intersperse_list(-1, geolen, 0, 0)
else: self.signals = list(signals)
# if signs is empty, intersperse with 3 signs.
if signs == None:
if intersperse_empty:
self.signs = self._intersperse_list(-1, geolen, 0, 3)
else:
self.signs = self._intersperse_list(-1, geolen, 0, 0)
else: self.signs = list(signs)
# calculate the bearings, distance changes, and grades.
self.bearings = query_bearings(pd.Series(self.geometry), bearing_type="Compass")
self.dx = list(query_distance_traveled(pd.Series(self.geometry)))
self.dz = query_elevation_changes(self.elevation)
self.d_X = list(np.sqrt(np.asarray(self.dz)**2 + np.asarray(self.dx)**2))
self.cum_d_X = list(pd.Series(self.d_X).cumsum())
self.grades = calculate_grades(self.dx, elevation, max_grade=7.5)
if smooth_grades:
smooth_grades=True
self.grades = smooth_elevation(self.grades, 25, 3)
# return None
return None
def __str__(self):
"""
Strng method.
"""
return "Route(l:{},{},{},{},{})".format(self._params[0],
self._params[1],
self._params[2],
self._params[3],
self._params[4])
def _intersperse_list(self, default_value, list_size, intersperse_val=-1, intersperse_num=0):
"""
intersperse_list() is used to generate a list of point-based flags of a given value,
with the option to evenly intersperse those values with an alternate flag.
Params:
:param default_value: the value that the list will be populated with initially.
:param list_size: size of the list to be made, as an int.
:param intersperse_val: value to be interspersed, default of -1.
:param intersperse_num: number of ocurrences of the interspersed value, as int. Default 0.
Returns:
the new list.
"""
# Generate a new list of target size filled with target value.
new_list = [default_value]*list_size
# loop through and intersperse the desired value.
for i in range(intersperse_num):
new_list[int(i*(list_size/intersperse_num))] = intersperse_val
# return the new list.
return new_list
[docs]
def save_to_json(self, path):
"""save_to_json() is used to encode and save the Route data to a json file.
:param path: path and filename to be saved to, as str. Should end with '.json'.
:return: path to the saved json file.
"""
# generate data dictionary and compress the data where possible.
data_dict = {'ge':_encode_geometry(self.geometry),
'el':self.elevation,
'li':_encode_series(self.limits),
'st':_encode_series(self.stops),
'si':_encode_series(self.signals),
'sn':_encode_series(self.signs)}
# Open and save to the json. encode in utf-8 for funsies.
with open('{}'.format(path), 'w', encoding='utf-8') as f:
# save the data.
json.dump(data_dict, f, ensure_ascii=False, indent=4)
# return the path.
return path
[docs]
def query_point(self, index):
"""
query_point() takes an index of a point and returns the data for the corresponding point.
:param index: index of the route which data you would like to query.
:return: Data at that given point, as a dict.
"""
data = {'elevation':self.elevation[index],
'limit':self.limits[index],
'stop':self.stops[index],
'signal':self.signals[index],
'sign':self.signs[index],
'bearing':self.bearings[index],
'grade':self.grades[index],
'dx_to_next':self.dx[index], # geodesic distance
'dz_to_next':self.dz[index],
'travel_dx':self.d_X[index],
'sum_travel_dx':self.cum_d_X[index]}
return data
[docs]
def to_gdf(self):
"""to_gdf() takes the constituent information in geography_tools and converts it to a geodataframe.
:return: geodataframe of all information contained by a Route object.
"""
data = {'geometry':self.geometry,
'elevation':self.elevation,
'limit':self.limits,
'stop':self.stops,
'signal':self.signals,
'sign':self.signs,
'bearing':self.bearings,
'grade':self.grades,
'dx_to_next':self.dx, # geodesic distance
'dz_to_next':self.dz,
'travel_dx':self.d_X,
'sum_travel_dx':self.cum_d_X}
return gpd.GeoDataFrame(data)
def _encode_series(iterable):
"""encode_series() takes an iterable and compresses it down so that
repeating values are stored as the value and number of occurrences.
Warning: Value typings will be lost.
:param iterable: an iterable list of values.
:return: a string representation of the compressed information.
Notes:
1/16/2025 - Privated since it isn't needed externally.
"""
# get the first item of the iterable
current_item = iterable[0]
# set the current item count to negative to avoid indexing errors.
current_item_count = -1
# create an empty string.
data_stream = ""
# loop through each position in the iterable
for i in range(len(iterable)+1):
# if the index is at the end of the iterable,
if i > len(iterable)-1:
# add by 1 and subsequently add the current item and count to the string.
current_item_count += 1
data_stream += ('{},{}'.format(current_item_count, current_item))
# otherwise,
else:
# the selected item is the current position in the iterable
item = iterable[i]
# and if the items are the same,
if item == current_item:
# increment the count by 1.
current_item_count += 1
# otherwise,
else:
# increment the count and add it to the stream.
current_item_count += 1
data_stream += ('{},{},'.format(current_item_count, current_item))
# reset the count and set the new item.
current_item_count = 0
current_item = item
# return the encoded representation.
return data_stream
def _decode_series(encoded_string):
"""
decode_series() takes in a string that was encoded using encode_series(),
and returns it to a decoded iterable.
Params:
encoded_string - string that was encoded using encode_series()
Returns:
an iterable of the decompressed information.
Notes:
1/16/2025 - Privated since it isn't needed externally.
"""
# split the string by comma.
spliterable = encoded_string.split(",")
# create an empty list to contain the data.
joint_iterable = []
# for every two items, join them into a tuple and append it to the list.
for item1, item2 in zip(spliterable[::2], spliterable[1::2]):
joint_iterable.append((item1, item2))
# create an empty list for the decoded information.
decoded_iterable = []
# loop through each item in the joint iterable
for item in joint_iterable:
# repeatedly add the encoded item for the number of occurences it has.
for i in range(int(item[0])):
decoded_iterable.append(item[1])
# return the list.
return decoded_iterable
def _encode_geometry(geo):
"""
encode_geometry() takes in an iterable of shapely points,
and then converts it to a list of alternating x and y values.
Params:
geo - iterable of shapely points.
Returns:
list of alternating X and Y values in the same order as geo.
Notes:
1/16/2025 - Privated since it isn't needed externally.
"""
# create an empty list
clist = []
# loop through the geometry and get the X and Y values,
# and extend the list with them.
for item in pd.Series(geo).apply(lambda x: [x.x, x.y]):
clist.extend(item)
# return the list.
return clist
def _decode_geometry(enc_geo):
"""
decode_geometry() takes in compressed geometry from encode_geometry(),
and converts it to an iterable of shapely points.
Params:
enc_geo - encoded geometry list.
Returns:
list of shapely geometry.
Notes:
1/16/2025 - Privated since it isn't needed externally.
"""
# empty list to store data
point_list = []
# loop through every 2
for i in range(0, len(enc_geo), 2):
# if even index, it's x, odd is y.
# append the point.
point_list.append(shapely.Point([enc_geo[i], enc_geo[i+1]]))
# return the list.
return point_list
[docs]
def load_from_json(path):
"""load_from_json() takes in a path to a json file from an exported Route object,
and returns a path object as close as possible to what was saved.
Waring: Some data types may be altered or adjusted. Speed limits are expected to be floats, stops, signals, signs, are expected to be ints.
:param path: path to json file.
:return: a Route object.
"""
# create empty data dictionary.
data = {}
# open the file.
with open('{}'.format(path), 'r') as file:
# load the data.
data = json.load(file)
# read the geometry data
geo = _decode_geometry(data['ge'])
# read the elevation data
el = data['el']
# read the limit, stop, signal, and sign data
lim = list(pd.Series(_decode_series(data['li'])).apply(float))
stp = list(pd.Series(_decode_series(data['st'])).apply(int))
signals = list(pd.Series(_decode_series(data['si'])).apply(int))
signs = list(pd.Series(_decode_series(data['sn'])).apply(int))
# generate a new Route object from the provided data.
loaded_route = Route(geo, el, lim, stp, signals, signs)
return loaded_route