Skip to content

Commit

Permalink
interpolate arrival times from previous delay
Browse files Browse the repository at this point in the history
  • Loading branch information
kylerchin committed Oct 12, 2024
1 parent c288578 commit e8323e7
Show file tree
Hide file tree
Showing 2 changed files with 153 additions and 82 deletions.
181 changes: 118 additions & 63 deletions src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
//! # amtrak-gtfs-rt
//!Decrypts Amtrak's GTFS-RT
//!
//!
//!This software package decrypts the Amtrak track-a-train json data and performs lookups of trip information in the GTFS schedule to match each vehicle with it's route_id and trip_id.
//!Pull requests are welcome!
//!
//!A valid Amtrak GTFS structure must be passed into the function to work.
//!
//!Here's an example of some working code!
//!Here's an example of some working code!
//!Note that `prost` version `0.11` should be used, as `gtfs-rt` does not use `0.12` yet.
//!```rust
//!```rust
//!extern crate amtrak_gtfs_rt;
//!
//!use prost::Message;
Expand All @@ -28,22 +28,20 @@
//! let trip_data = amtrak_gtfs_rt.trip_updates.encode_to_vec();
//!}
//!```
//!
//!
//! Note that the Metropolitan Transportation Commission also publishes Capital Corridor in their own feed.
//! https://511.org/open-data/transit provides Capital Corridor as "CC". This data refreshes more often (and is closer in location & time), and shows locomotive numbers.
//! For this reason, you may wish to remove Capital Corridor from this feed.
//! Thus, we've included a function `filter_capital_corridor()` which takes in any `FeedMessage` and removes CC vehicles and trips.

use chrono::{DateTime, Datelike, NaiveDate, NaiveDateTime, TimeZone, Weekday};
use chrono::{Datelike, NaiveDate, NaiveDateTime, TimeZone, Weekday};
use geojson::FeatureCollection;
use gtfs_structures::Gtfs;
use std::collections::HashMap;
use std::time::SystemTime;
pub mod stop_times;
use crate::stop_times::RootTripData;
use gtfs_realtime::FeedMessage;
use gtfs_realtime::FeedEntity;
use gtfs_realtime::FeedMessage;

//Written by Kyler Chin - Catenary Transit Initiatives.
pub fn filter_capital_corridor(input: FeedMessage) -> FeedMessage {
Expand All @@ -58,10 +56,10 @@ pub fn filter_capital_corridor(input: FeedMessage) -> FeedMessage {
let vehicle = item.vehicle.as_ref().unwrap();
if vehicle.trip.is_some() {
let trip = vehicle.trip.as_ref().unwrap();
if trip.route_id.is_some() {
if trip.route_id.as_ref().unwrap().as_str() == cc_route_id {
return false;
}
if trip.route_id.is_some()
&& trip.route_id.as_ref().unwrap().as_str() == cc_route_id
{
return false;
}
}
}
Expand Down Expand Up @@ -135,21 +133,21 @@ fn feature_to_amtrak_arrival_structs(feature: &geojson::Feature) -> Vec<AmtrakAr
let mut key = String::from("Station");
key.push_str(&i.to_string());

match feature.properties.as_ref().unwrap().get(key.as_str()) {
Some(station_text) => match station_text {
serde_json::value::Value::String(station_text) => {
let amtrak_arrival: Result<AmtrakArrivalJson, serde_json::Error> =
serde_json::from_str(&station_text);

if amtrak_arrival.is_ok() {
amtrak_arrival_jsons.push(amtrak_arrival.unwrap());
} else {
println!("Error parsing amtrak arrival json, {}\n{}", station_text, amtrak_arrival.unwrap_err());
}
if let Some(station_text) = feature.properties.as_ref().unwrap().get(key.as_str()) {
if let serde_json::value::Value::String(station_text) = station_text {
let amtrak_arrival: Result<AmtrakArrivalJson, serde_json::Error> =
serde_json::from_str(station_text);

if amtrak_arrival.is_ok() {
amtrak_arrival_jsons.push(amtrak_arrival.unwrap());
} else {
println!(
"Error parsing amtrak arrival json, {}\n{}",
station_text,
amtrak_arrival.unwrap_err()
);
}
_ => {}
},
_ => {}
}
};
}

Expand Down Expand Up @@ -223,7 +221,7 @@ fn feature_to_gtfs_unified(gtfs: &Gtfs, feature: &geojson::Feature) -> FeedEntit
let timestamp: Option<u64> = match feature.properties.as_ref().unwrap().get("updated_at") {
Some(timestamp_text) => match timestamp_text {
serde_json::value::Value::String(timestamp_text) => {
Some(process_timestamp_text(&timestamp_text))
Some(process_timestamp_text(timestamp_text))
}
_ => None,
},
Expand Down Expand Up @@ -256,10 +254,13 @@ fn feature_to_gtfs_unified(gtfs: &Gtfs, feature: &geojson::Feature) -> FeedEntit
}
.unwrap();

let features_list = feature_to_amtrak_arrival_structs(feature);

let arrivals: Vec<gtfs_realtime::trip_update::StopTimeUpdate> =
feature_to_amtrak_arrival_structs(feature)
features_list
.iter()
.map(|feature| gtfs_realtime::trip_update::StopTimeUpdate {
.enumerate()
.map(|(i, feature)| gtfs_realtime::trip_update::StopTimeUpdate {
stop_sequence: None,
stop_id: Some(feature.code.clone()),
arrival: match &feature.postarr {
Expand All @@ -271,26 +272,65 @@ fn feature_to_gtfs_unified(gtfs: &Gtfs, feature: &geojson::Feature) -> FeedEntit
None => match &feature.estarr {
Some(estarr) => Some(gtfs_realtime::trip_update::StopTimeEvent {
delay: None,
time: Some(time_and_tz_to_unix(&estarr, feature.tz)),
time: Some(time_and_tz_to_unix(estarr, feature.tz)),
uncertainty: None,
}),
None => None,
//There is no provided arrival time, interpolate it from the previous stop
None => match i {
0 => None,
_ => {
let previous = features_list.get(i - 1);

match previous {
Some(previous) => {
let previous_departure_time = match &previous.postdep {
Some(postdep) => Some(time_and_tz_to_unix(postdep, feature.tz)),
None => previous.estdep.as_ref().map(|estdep| time_and_tz_to_unix(estdep, feature.tz)),
};

match previous_departure_time {
None => None,
Some(previous_departure_time) => {
match &previous.schdep {
Some(previous_schdep) => {
let delay = previous_departure_time - time_and_tz_to_unix(previous_schdep, feature.tz);

match &feature.scharr {
Some(scharr) => {
let arrival_time = time_and_tz_to_unix(scharr, feature.tz);

let arrival_time = arrival_time + delay;

Some(gtfs_realtime::trip_update::StopTimeEvent {
delay: Some(delay.try_into().unwrap()),
time: Some(arrival_time),
uncertainty: None,
})
},
None => None,
}
},
None => None
}
}
}
},
None => None,
}
},
}
},
}},
departure: match &feature.postdep {
Some(postdep) => Some(gtfs_realtime::trip_update::StopTimeEvent {
delay: None,
time: Some(time_and_tz_to_unix(postdep, feature.tz)),
uncertainty: None,
}),
None => match &feature.estdep {
Some(estdep) => Some(gtfs_realtime::trip_update::StopTimeEvent {
None => feature.estdep.as_ref().map(|estdep| gtfs_realtime::trip_update::StopTimeEvent {
delay: None,
time: Some(time_and_tz_to_unix(&estdep, feature.tz)),
time: Some(time_and_tz_to_unix(estdep, feature.tz)),
uncertainty: None,
}),
None => None,
}},
})},
departure_occupancy_status: None,
schedule_relationship: None,
stop_time_properties: None,
Expand All @@ -299,7 +339,10 @@ fn feature_to_gtfs_unified(gtfs: &Gtfs, feature: &geojson::Feature) -> FeedEntit

let origin_local_time = origin_departure(&origin_time_string, origin_tz);

let starting_yyyy_mm_dd_in_new_york = origin_local_time.with_timezone(&chrono_tz::America::New_York).format("%Y%m%d").to_string();
let starting_yyyy_mm_dd_in_new_york = origin_local_time
.with_timezone(&chrono_tz::America::New_York)
.format("%Y%m%d")
.to_string();

let origin_weekday = origin_local_time.weekday();

Expand Down Expand Up @@ -462,12 +505,10 @@ fn time_and_tz_to_unix(timestamp_text: &String, tz: char) -> i64 {
pub fn origin_departure(timestamp_text: &str, tz: char) -> chrono::DateTime<chrono_tz::Tz> {
let naive_dt = NaiveDateTime::parse_from_str(timestamp_text, "%m/%d/%Y %l:%M:%S %p").unwrap();

let local_time_representation = tz_char_to_tz(tz)
tz_char_to_tz(tz)
.unwrap()
.from_local_datetime(&naive_dt)
.unwrap();

local_time_representation
.unwrap()
}

//time is formatted 11/18/2023 4:58:09 PM
Expand Down Expand Up @@ -528,48 +569,49 @@ pub async fn fetch_amtrak_gtfs_rt_joined(
let decrypted_string = amtk::decrypt(raw_data.text().await.unwrap().as_str())?;

let geojson: geojson::GeoJson = decrypted_string.parse::<geojson::GeoJson>()?;
let features_collection: FeatureCollection =
FeatureCollection::try_from(geojson)?;
let features_collection: FeatureCollection = FeatureCollection::try_from(geojson)?;

let list_of_train_ids = features_collection
.features
.iter()
.map(|feature| {
{ let train_num = match feature.properties.as_ref().unwrap().get("TrainNum") {
.filter_map(|feature| {
let train_num = match feature.properties.as_ref().unwrap().get("TrainNum") {
Some(a) => match a {
serde_json::value::Value::String(x) => Some(x.clone()),
_ => None,
},
_ => None,
};
let starting_date = match feature.properties.as_ref().unwrap().get("OrigSchDep") {
let starting_date = match feature.properties.as_ref().unwrap().get("OrigSchDep")
{
Some(a) => match a {
serde_json::value::Value::String(x) => {
let first_half = NaiveDate::parse_from_str(x.split(" ").nth(0).unwrap(), "%m/%d/%Y");

match first_half {
Ok(first_half) => {
Some(first_half)
},
Err(_) => None,
}
},
let first_half = NaiveDate::parse_from_str(
x.split(" ").next().unwrap(),
"%m/%d/%Y",
);

match first_half {
Ok(first_half) => Some(first_half),
Err(_) => None,
}
}
_ => None,
},
_ => None,
};

match (train_num, starting_date) {
(Some(train_num), Some(starting_date)) => Some((train_num, starting_date)),
_ => None
_ => None,
}
}
}).flatten().collect::<Vec<(String, NaiveDate)>>();
})
.collect::<Vec<(String, NaiveDate)>>();

//query the stop times all simultaniously and put into hashmap
//query_all_trips_simultaniously

// let stop_times = stop_times::query_all_trips_simultaniously(&list_of_train_ids).await;
// let stop_times = stop_times::query_all_trips_simultaniously(&list_of_train_ids).await;

//println!("Successfully decrypted");
//println!("{}", decrypted_string);
Expand All @@ -578,7 +620,7 @@ pub async fn fetch_amtrak_gtfs_rt_joined(
entity: features_collection
.features
.iter()
.map(|feature: &geojson::Feature| feature_to_gtfs_unified(&gtfs, feature))
.map(|feature: &geojson::Feature| feature_to_gtfs_unified(gtfs, feature))
.collect::<Vec<FeedEntity>>(),
header: make_gtfs_header(),
},
Expand Down Expand Up @@ -617,6 +659,19 @@ mod tests {
//println!("{:?}", entity.trip_update);
}

// println!("{:?}", amtrak_results.unwrap());
// println!("{:?}", amtrak_results.unwrap());
}

#[test]
fn read_last_stop() {
let test_str = r#"{"code":"LAX","tz":"P","bus":false,"scharr":"10/11/2024 16:57:00","schcmnt":"","autoarr":false,"autodep":false}"#;

let amtrak_arrival = serde_json::from_str::<AmtrakArrivalJson>(test_str);

assert!(amtrak_arrival.is_ok());

let amtrak_arrival = amtrak_arrival.unwrap();

assert_eq!(amtrak_arrival.code, "LAX");
}
}
Loading

0 comments on commit e8323e7

Please sign in to comment.