Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion cmd/api/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,10 @@ func Run(ctx context.Context, srv *http.Server, coreApp *app.Application, api *r

// Then shutdown GTFS manager (stops data fetching - the lowest-level dependency)
if coreApp.GtfsManager != nil {
coreApp.GtfsManager.Shutdown()
err := coreApp.GtfsManager.Shutdown(shutdownCtx)
if err != nil {
logger.Error("Error occurred while shutting down GTFS manager", "error", err)
}
}

logger.Info("server exited")
Expand Down
6 changes: 5 additions & 1 deletion internal/gtfs/advanced_direction_calculator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -544,8 +544,12 @@ func TestMain(m *testing.M) {

// Global Teardown
// If sharedManager was initialized during tests, shut it down now.
ctx := context.Background()
if sharedManager != nil {
sharedManager.Shutdown()
err := sharedManager.Shutdown(ctx)
if err != nil {
_, _ = os.Stderr.WriteString("Error occurred while shutting down shared GTFS manager: " + err.Error() + "\n")
}
}

// Exit with the test result code
Expand Down
21 changes: 19 additions & 2 deletions internal/gtfs/gtfs_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ type Manager struct {
shutdownChan chan struct{}
wg sync.WaitGroup
shutdownOnce sync.Once
closeDBOnce sync.Once
stopSpatialIndex *rtree.RTree
blockLayoverIndices map[string][]*BlockLayoverIndex
regionBounds *RegionBounds
Expand Down Expand Up @@ -379,17 +380,33 @@ func (manager *Manager) SetGtfsURL(url string) {
}

// Shutdown gracefully shuts down the manager and its background goroutines
func (manager *Manager) Shutdown() {
func (manager *Manager) Shutdown(ctx context.Context) error {
manager.shutdownOnce.Do(func() {
close(manager.shutdownChan)
manager.wg.Wait()
})

// Always close DB exactly once, regardless of path
defer manager.closeDBOnce.Do(func() {
if manager.GtfsDB != nil {
if err := manager.GtfsDB.Close(); err != nil {
logger := slog.Default().With(slog.String("component", "gtfs_manager"))
logging.LogError(logger, "failed to close GTFS database", err)
}
}
})

done := make(chan struct{})
go func() {
manager.wg.Wait()
close(done)
}()

select {
case <-done:
return nil
case <-ctx.Done():
return fmt.Errorf("shutdown timeout exceeded: %w", ctx.Err())
}
}

// RLock acquires the static data read lock.
Expand Down
70 changes: 55 additions & 15 deletions internal/gtfs/gtfs_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -305,7 +305,12 @@ func TestManager_GetVehicleForTrip(t *testing.T) {
// We use isolated GTFSManager here instead of shared test components because we want to control the real-time vehicles for this test.
manager, err := InitGTFSManager(ctx, gtfsConfig)
assert.Nil(t, err)
defer manager.Shutdown()

defer func() {
if err := manager.Shutdown(context.Background()); err != nil {
t.Errorf("Error occurred while shutting down GTFS manager: %v", err)
}
}()

trip := &gtfs.Trip{
ID: gtfs.TripID{ID: "5735633"},
Expand Down Expand Up @@ -407,7 +412,12 @@ func TestRoutesForAgencyID_MapOptimization(t *testing.T) {
}
manager, err := InitGTFSManager(ctx, gtfsConfig)
require.NoError(t, err, "Failed to initialize manager")
defer manager.Shutdown()
defer func() {
err := manager.Shutdown(ctx)
if err != nil {
t.Errorf("Error occurred while shutting down GTFS manager: %v", err)
}
}()

targetAgencyID := "25"
expectedRouteCount := 13
Expand Down Expand Up @@ -435,18 +445,24 @@ func TestRoutesForAgencyID_MapOptimization(t *testing.T) {
}

func TestRoutesForAgencyID_ConcurrentAccess(t *testing.T) {
ctx := context.Background()
setupCtx := context.Background()

gtfsConfig := Config{
GtfsURL: models.GetFixturePath(t, "raba.zip"),
GTFSDataPath: ":memory:",
Env: appconf.Test,
}
manager, err := InitGTFSManager(ctx, gtfsConfig)
manager, err := InitGTFSManager(setupCtx, gtfsConfig)
require.NoError(t, err)
defer manager.Shutdown()

ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer func() {
err := manager.Shutdown(context.Background())
if err != nil {
t.Errorf("Error occurred while shutting down GTFS manager: %v", err)
}
}()

runCtx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()

var wg sync.WaitGroup
Expand All @@ -459,7 +475,7 @@ func TestRoutesForAgencyID_ConcurrentAccess(t *testing.T) {
defer wg.Done()
for {
select {
case <-ctx.Done():
case <-runCtx.Done():
return
default:
manager.RLock()
Expand Down Expand Up @@ -488,7 +504,7 @@ func TestRoutesForAgencyID_ConcurrentAccess(t *testing.T) {

for {
select {
case <-ctx.Done():
case <-runCtx.Done():
return
default:
manager.setStaticGTFS(staticData)
Expand Down Expand Up @@ -517,7 +533,11 @@ func BenchmarkRoutesForAgencyID_MapLookup(b *testing.B) {
if err != nil {
b.Fatalf("Failed to initialize: %v", err)
}
defer manager.Shutdown()
defer func() {
if err := manager.Shutdown(context.Background()); err != nil {
b.Errorf("Error occurred while shutting down GTFS manager: %v", err)
}
}()

b.ReportAllocs()

Expand Down Expand Up @@ -661,7 +681,11 @@ func TestActiveServiceIDsCacheInvalidation(t *testing.T) {

manager, err := InitGTFSManager(ctx, gtfsConfig)
require.NoError(t, err)
defer manager.Shutdown()
defer func() {
if err := manager.Shutdown(ctx); err != nil {
t.Errorf("Error occurred while shutting down GTFS manager: %v", err)
}
}()

// Use a fixed date that has known calendar data in the RABA fixture.
// The RABA feed covers weekdays; pick a Monday.
Expand Down Expand Up @@ -713,9 +737,13 @@ func TestActiveServiceIDsCache_ErrorPathLeavesNothingCached(t *testing.T) {
}
manager, err := InitGTFSManager(context.Background(), gtfsConfig)
require.NoError(t, err)
defer manager.Shutdown()

cancelledCtx, cancel := context.WithCancel(context.Background())
defer func() {
if err := manager.Shutdown(context.Background()); err != nil {
t.Errorf("Error occurred while shutting down GTFS manager: %v", err)
}
}()
cancel()

_, queryErr := manager.GetActiveServiceIDsForDateCached(cancelledCtx, "20240101")
Expand Down Expand Up @@ -766,6 +794,7 @@ func TestActiveServiceIDsCacheNilDB(t *testing.T) {
}

func TestActiveServiceIDsCacheMutationSafety(t *testing.T) {
ctx := context.Background()
// Use an isolated manager so the cache is cold, guaranteeing the first call is a
// genuine cache miss and that we exercise both the miss-path and hit-path copies.
gtfsConfig := Config{
Expand All @@ -775,9 +804,12 @@ func TestActiveServiceIDsCacheMutationSafety(t *testing.T) {
}
manager, err := InitGTFSManager(context.Background(), gtfsConfig)
require.NoError(t, err)
defer manager.Shutdown()
defer func() {
if err := manager.Shutdown(ctx); err != nil {
t.Errorf("Error occurred while shutting down GTFS manager: %v", err)
}
}()

ctx := context.Background()
date := "20240101"

// First call: cache miss path — result must be a defensive copy.
Expand Down Expand Up @@ -832,7 +864,11 @@ func TestActiveServiceIDsCacheConcurrentForceUpdate(t *testing.T) {

manager, err := InitGTFSManager(ctx, gtfsConfig)
require.NoError(t, err)
defer manager.Shutdown()
defer func() {
if err := manager.Shutdown(context.Background()); err != nil {
t.Errorf("Error occurred while shutting down GTFS manager: %v", err)
}
}()

date := "20240101"

Expand Down Expand Up @@ -936,7 +972,11 @@ func BenchmarkGetActiveServiceIDsForDate(b *testing.B) {
if err != nil {
b.Fatalf("Failed to initialize: %v", err)
}
defer manager.Shutdown()
defer func() {
if err := manager.Shutdown(context.Background()); err != nil {
b.Errorf("Error occurred while shutting down GTFS manager: %v", err)
}
}()

date := "20240101"

Expand Down
31 changes: 26 additions & 5 deletions internal/gtfs/hot_swap_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,12 @@ func TestHotSwap_QueriesCompleteDuringSwap(t *testing.T) {
if err != nil {
t.Fatalf("Failed to init manager: %v", err)
}
defer manager.Shutdown()

defer func() {
if err := manager.Shutdown(context.Background()); err != nil {
t.Errorf("Failed to shutdown manager: %v", err)
}
}()

agencies := manager.GetAgencies()
assert.Equal(t, 1, len(agencies))
Expand Down Expand Up @@ -122,7 +127,11 @@ func TestHotSwap_FailureRecovery(t *testing.T) {
if err != nil {
t.Fatalf("Failed to init manager: %v", err)
}
defer manager.Shutdown()
defer func() {
if err := manager.Shutdown(context.Background()); err != nil {
t.Errorf("Error occurred while shutting down GTFS manager: %v", err)
}
}()

agencies, err := manager.GtfsDB.Queries.ListAgencies(context.Background())
if err != nil {
Expand Down Expand Up @@ -175,7 +184,11 @@ func TestHotSwap_OldDatabaseCleanup(t *testing.T) {
if err != nil {
t.Fatalf("Failed to init manager: %v", err)
}
defer manager.Shutdown()
defer func() {
if err := manager.Shutdown(context.Background()); err != nil {
t.Errorf("Error occurred while shutting down GTFS manager: %v", err)
}
}()

manager.SetGtfsURL(gtfsNew)
err = manager.ForceUpdate(context.Background())
Expand Down Expand Up @@ -217,7 +230,11 @@ func TestHotSwap_MutexProtectedSwap(t *testing.T) {
if err != nil {
t.Fatalf("Failed to init manager: %v", err)
}
defer manager.Shutdown()
defer func() {
if err := manager.Shutdown(context.Background()); err != nil {
t.Errorf("Error occurred while shutting down GTFS manager: %v", err)
}
}()

// Verify initial state
manager.RLock()
Expand Down Expand Up @@ -271,7 +288,11 @@ func TestHotSwap_ConcurrentForceUpdate(t *testing.T) {

manager, err := InitGTFSManager(ctx, gtfsConfig)
require.NoError(t, err)
defer manager.Shutdown()
defer func() {
if err := manager.Shutdown(context.Background()); err != nil {
t.Errorf("Error occurred while shutting down GTFS manager: %v", err)
}
}()

// Verify initial state
manager.RLock()
Expand Down
19 changes: 13 additions & 6 deletions internal/gtfs/shutdown_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,9 @@ func TestManagerShutdown(t *testing.T) {

// Test shutdown
done := make(chan struct{})
errCh := make(chan error, 1)
go func() {
manager.Shutdown()
close(done)
errCh <- manager.Shutdown(context.Background())
}()

// Shutdown should complete within a reasonable time
Expand All @@ -45,6 +45,8 @@ func TestManagerShutdown(t *testing.T) {
// Success
case <-time.After(5 * time.Second):
t.Fatal("Shutdown took too long")
case err := <-errCh:
require.NoError(t, err, "Failed to shutdown GTFS manager")
}
}

Expand Down Expand Up @@ -79,9 +81,9 @@ func TestManagerShutdownWithRealtime(t *testing.T) {

// Test shutdown
done := make(chan struct{})
errCh := make(chan error, 1)
go func() {
manager.Shutdown()
close(done)
errCh <- manager.Shutdown(context.Background())
}()

// Shutdown should complete within a reasonable time even with real-time goroutine
Expand All @@ -90,6 +92,8 @@ func TestManagerShutdownWithRealtime(t *testing.T) {
// Success
case <-time.After(5 * time.Second):
t.Fatal("Shutdown took too long")
case err := <-errCh:
require.NoError(t, err, "Failed to shutdown GTFS manager")
}
}

Expand All @@ -110,6 +114,9 @@ func TestManagerShutdownIdempotent(t *testing.T) {
require.NoError(t, err, "Failed to initialize GTFS manager")

// Call shutdown multiple times - should not panic or hang
manager.Shutdown()
manager.Shutdown() // Second call should be safe
ctx := context.Background()
err = manager.Shutdown(ctx)
require.NoError(t, err, "Failed to shutdown GTFS manager")
err = manager.Shutdown(ctx) // Second call should be safe
require.NoError(t, err, "Failed to shutdown GTFS manager on second call")
}
12 changes: 6 additions & 6 deletions internal/restapi/arrival_and_departure_for_stop_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -678,8 +678,8 @@ func TestArrivalsAndDeparturesForStopHandlerInvalidTime(t *testing.T) {
}

func TestArrivalAndDepartureForStopHandler_MultiAgency_Regression(t *testing.T) {
api := createTestApi(t)
defer api.Shutdown()
api, cleanup := createIsolatedTestApi(t)
defer cleanup()

ctx := context.Background()
queries := api.GtfsManager.GtfsDB.Queries
Expand Down Expand Up @@ -879,8 +879,8 @@ func TestGetPredictedTimes_TripLevelDelayFallback(t *testing.T) {
}

func TestArrivalAndDepartureForStop_PositiveUTCOffset_ServiceDateRegression(t *testing.T) {
api := createTestApi(t)
defer api.Shutdown()
api, cleanup := createIsolatedTestApi(t)
defer cleanup()

ctx := context.Background()
queries := api.GtfsManager.GtfsDB.Queries
Expand Down Expand Up @@ -972,8 +972,8 @@ func TestArrivalAndDepartureForStop_PositiveUTCOffset_ServiceDateRegression(t *t
// Regression test for loop routes where the same stop appears multiple times in a trip.
// Ensures that stopSequence correctly selects among multiple occurrences of the same stop.
func TestArrivalAndDepartureForStopHandler_LoopRouteStopSequence(t *testing.T) {
api := createTestApi(t)
defer api.Shutdown()
api, cleanup := createIsolatedTestApi(t)
defer cleanup()

ctx := context.Background()
queries := api.GtfsManager.GtfsDB.Queries
Expand Down
Loading
Loading