Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
8e1176c
BCDA-9287: create struct to wrap db connections
michaeljvaldes Jul 22, 2025
bd4a313
Pass db connections explicitly from cli to service
michaeljvaldes Jul 23, 2025
ca1b0f1
Remove unintended change to middleware
michaeljvaldes Jul 23, 2025
bb1fd60
Separate connection and pool
michaeljvaldes Jul 23, 2025
6aa2fa2
Pass connection through routers and services instead of combined struct
michaeljvaldes Jul 23, 2025
cab0a71
Add connection as dependency of data router
michaeljvaldes Jul 23, 2025
0d37ba1
Pass connection as argument to token job middleware
michaeljvaldes Jul 23, 2025
ed1783c
Remove connection global from cli
michaeljvaldes Jul 24, 2025
8272973
Define db pool as dependency for apis and workers
michaeljvaldes Jul 24, 2025
e30cd6e
Remove connection globals from several tests
michaeljvaldes Jul 30, 2025
82c98f0
Pass connection to health.go
michaeljvaldes Jul 30, 2025
134d8fb
Refactor connection globals in cclf
michaeljvaldes Jul 30, 2025
607ce18
Refactor connection global in admin create group lambda
michaeljvaldes Jul 30, 2025
2749820
Refactor db globals in optout lambda
michaeljvaldes Jul 30, 2025
db96700
Refactor globals in ratelimit middleware
michaeljvaldes Jul 30, 2025
3256641
Refactor db connection in worker and river
michaeljvaldes Jul 30, 2025
ae8dc98
Refactor db connections in cleanup worker
michaeljvaldes Jul 30, 2025
486f0c4
Inject provider as dependency
michaeljvaldes Jul 31, 2025
c896f87
Refactor provider-related tests
michaeljvaldes Aug 4, 2025
7376ff3
Merge branch 'main' into mvaldes/BCDA-9287-Connection
michaeljvaldes Aug 4, 2025
29ba3cc
Remove database connection globals
michaeljvaldes Aug 5, 2025
b00818b
Rename connection to db
michaeljvaldes Aug 5, 2025
1237279
Remove comments
michaeljvaldes Aug 5, 2025
beb66ca
Pass repo as argument in cli function
michaeljvaldes Aug 5, 2025
a54157f
Pass globals as arguments in cli functions
michaeljvaldes Aug 5, 2025
49b753b
Run test scenarios as separate tests
michaeljvaldes Aug 6, 2025
16dd65f
Rename blacklist to denylist in a few places
michaeljvaldes Aug 6, 2025
cfb33ef
Add comment for refactoring todo
michaeljvaldes Aug 6, 2025
01bd874
Merge branch 'main' into mvaldes/BCDA-9287-Connection
michaeljvaldes Aug 6, 2025
bd29e55
Add unit test for pool connection
michaeljvaldes Aug 6, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 6 additions & 7 deletions bcda/api/requests.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,7 @@ type Handler struct {
// Needed to have access to the repository/db for lookup needed in the bulkRequest.
// TODO (BCDA-3412): Remove this reference once we've captured all of the necessary
// logic into a service method.
r models.Repository
db *sql.DB
r models.Repository
}

type fhirResponseWriter interface {
Expand All @@ -63,11 +62,11 @@ type fhirResponseWriter interface {
JobsBundle(context.Context, http.ResponseWriter, []*models.Job, string)
}

func NewHandler(dataTypes map[string]service.DataType, basePath string, apiVersion string) *Handler {
return newHandler(dataTypes, basePath, apiVersion, database.Connection)
func NewHandler(dataTypes map[string]service.DataType, basePath string, apiVersion string, connections *database.Connections) *Handler {
return newHandler(dataTypes, basePath, apiVersion, connections)
}

func newHandler(dataTypes map[string]service.DataType, basePath string, apiVersion string, db *sql.DB) *Handler {
func newHandler(dataTypes map[string]service.DataType, basePath string, apiVersion string, connections *database.Connections) *Handler {
h := &Handler{JobTimeout: time.Hour * time.Duration(utils.GetEnvInt("ARCHIVE_THRESHOLD_HR", 24))}

h.Enq = queueing.NewEnqueuer()
Expand All @@ -80,8 +79,8 @@ func newHandler(dataTypes map[string]service.DataType, basePath string, apiVersi
log.API.Fatalf("no ACO configs found, these are required for processing logic")
}

repository := postgres.NewRepository(db)
h.db, h.r = db, repository
repository := postgres.NewRepository(connections.Connection)
h.r = repository
h.Svc = service.NewService(repository, cfg, basePath)

h.supportedDataTypes = dataTypes
Expand Down
61 changes: 29 additions & 32 deletions bcda/api/requests_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ type RequestsTestSuite struct {

runoutEnabledEnvVar string

db *sql.DB
connections *database.Connections

acoID uuid.UUID

Expand All @@ -79,9 +79,10 @@ func TestRequestsTestSuite(t *testing.T) {
func (s *RequestsTestSuite) SetupSuite() {
// See testdata/acos.yml
s.acoID = uuid.Parse("ba21d24d-cd96-4d7d-a691-b0e8c88e67a5")
s.db, _ = databasetest.CreateDatabase(s.T(), "../../db/migrations/bcda/", true)
db, _ := databasetest.CreateDatabase(s.T(), "../../db/migrations/bcda/", true)
s.connections = &database.Connections{Connection: db}
tf, err := testfixtures.New(
testfixtures.Database(s.db),
testfixtures.Database(db),
testfixtures.Dialect("postgres"),
testfixtures.Directory("testdata/"),
)
Expand Down Expand Up @@ -137,7 +138,7 @@ func (s *RequestsTestSuite) TestRunoutEnabled() {
mockSvc := &service.MockService{}
mockAco := service.ACOConfig{Data: []string{"adjudicated"}}
mockSvc.On("GetACOConfigForID", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(&mockAco, true)
h := newHandler(resourceMap, fmt.Sprintf("/%s/fhir", tt.apiVersion), tt.apiVersion, s.db)
h := newHandler(resourceMap, fmt.Sprintf("/%s/fhir", tt.apiVersion), tt.apiVersion, s.connections)
h.Svc = mockSvc
enqueuer := queueing.NewMockEnqueuer(s.T())
h.Enq = enqueuer
Expand Down Expand Up @@ -239,7 +240,7 @@ func (s *RequestsTestSuite) TestJobsStatusV1() {
"Patient": {},
"Coverage": {},
"ExplanationOfBenefit": {},
}, fhirPath, apiVersion, s.db)
}, fhirPath, apiVersion, s.connections)
h.Svc = mockSvc

rr := httptest.NewRecorder()
Expand Down Expand Up @@ -353,7 +354,7 @@ func (s *RequestsTestSuite) TestJobsStatusV2() {
"Patient": {},
"Coverage": {},
"ExplanationOfBenefit": {},
}, v2BasePath, apiVersionTwo, s.db)
}, v2BasePath, apiVersionTwo, s.connections)
if tt.useMock {
h.Svc = mockSvc
}
Expand Down Expand Up @@ -472,7 +473,7 @@ func (s *RequestsTestSuite) TestAttributionStatus() {
fhirPath := "/" + apiVersion + "/fhir"

resourceMap := s.resourceType
h := newHandler(resourceMap, fhirPath, apiVersion, s.db)
h := newHandler(resourceMap, fhirPath, apiVersion, s.connections)
h.Svc = mockSvc

rr := httptest.NewRecorder()
Expand Down Expand Up @@ -563,7 +564,11 @@ func (s *RequestsTestSuite) TestDataTypeAuthorization() {
"ClaimResponse": {Adjudicated: false, PartiallyAdjudicated: true},
}

h := NewHandler(dataTypeMap, v2BasePath, apiVersionTwo)
h := NewHandler(dataTypeMap, v2BasePath, apiVersionTwo, s.connections)
r := models.NewMockRepository(s.T())
r.On("CreateJob", mock.Anything, mock.Anything).Return(uint(4), nil)
h.r = r

h.supportedDataTypes = dataTypeMap
client.SetLogger(log.API) // Set logger so we don't get errors later
jsonBytes, _ := json.Marshal("{}")
Expand Down Expand Up @@ -647,7 +652,7 @@ func (s *RequestsTestSuite) TestRequests() {
fhirPath := "/" + apiVersion + "/fhir"
resourceMap := s.resourceType

h := newHandler(resourceMap, fhirPath, apiVersion, s.db)
h := newHandler(resourceMap, fhirPath, apiVersion, s.connections)

// Test Group and Patient
// Patient, Coverage, and ExplanationOfBenefit
Expand Down Expand Up @@ -777,7 +782,7 @@ func (s *RequestsTestSuite) TestJobStatusErrorHandling() {

for _, tt := range tests {
s.T().Run(tt.testName, func(t *testing.T) {
h := newHandler(resourceMap, basePath, apiVersion, s.db)
h := newHandler(resourceMap, basePath, apiVersion, s.connections)
if tt.useMockService {
mockSrv := service.MockService{}
timestp := time.Now()
Expand Down Expand Up @@ -851,7 +856,7 @@ func (s *RequestsTestSuite) TestJobStatusProgress() {
apiVersion := apiVersionTwo
requestUrl := v2JobRequestUrl
resourceMap := s.resourceType
h := newHandler(resourceMap, basePath, apiVersion, s.db)
h := newHandler(resourceMap, basePath, apiVersion, s.connections)

req := httptest.NewRequest("GET", requestUrl, nil)
rctx := chi.NewRouteContext()
Expand Down Expand Up @@ -900,7 +905,7 @@ func (s *RequestsTestSuite) TestDeleteJob() {

for _, tt := range tests {
s.T().Run(tt.name, func(t *testing.T) {
handler := newHandler(s.resourceType, basePath, apiVersion, s.db)
handler := newHandler(s.resourceType, basePath, apiVersion, s.connections)

if tt.useMockService {
mockSrv := service.MockService{}
Expand Down Expand Up @@ -960,7 +965,7 @@ func (s *RequestsTestSuite) TestJobFailedStatus() {

for _, tt := range tests {
s.T().Run(tt.name, func(t *testing.T) {
h := newHandler(resourceMap, tt.basePath, tt.version, s.db)
h := newHandler(resourceMap, tt.basePath, tt.version, s.connections)
mockSrv := service.MockService{}
timestp := time.Now()
mockSrv.On("GetJobAndKeys", testUtils.CtxMatcher, uint(1)).Return(
Expand Down Expand Up @@ -1018,7 +1023,7 @@ func (s *RequestsTestSuite) TestGetResourceTypes() {
{"CT000000", "v2", []string{"Patient", "ExplanationOfBenefit", "Coverage", "Claim", "ClaimResponse"}},
}
for _, test := range testCases {
h := newHandler(s.resourceType, "/"+test.apiVersion+"/fhir", test.apiVersion, s.db)
h := newHandler(s.resourceType, "/"+test.apiVersion+"/fhir", test.apiVersion, s.connections)
rp := middleware.RequestParameters{
Version: test.apiVersion,
ResourceTypes: []string{},
Expand Down Expand Up @@ -1051,23 +1056,15 @@ func TestBulkRequest_Integration(t *testing.T) {

client.SetLogger(log.API) // Set logger so we don't get errors later

h := NewHandler(dataTypeMap, v2BasePath, apiVersionTwo)

cfg, err := database.LoadConfig()
if err != nil {
t.FailNow()
}
d, err := database.CreatePgxv5DB(cfg)
if err != nil {
t.FailNow()
}
driver := riverpgxv5.New(d)
connections := database.Connect()
h := NewHandler(dataTypeMap, v2BasePath, apiVersionTwo, connections)
driver := riverpgxv5.New(connections.Pgxv5Pool)
// start from clean river_job slate
_, err = driver.GetExecutor().Exec(context.Background(), `delete from river_job`)
_, err := driver.GetExecutor().Exec(context.Background(), `delete from river_job`)
assert.Nil(t, err)

acoID := "A0002"
repo := postgres.NewRepository(h.db)
repo := postgres.NewRepository(connections.Connection)

// our DB is not always cleaned up properly so sometimes this record exists when this test runs and sometimes it doesnt
repo.CreateACO(context.Background(), models.ACO{CMSID: &acoID, UUID: uuid.NewUUID()}) // nolint:errcheck
Expand Down Expand Up @@ -1130,7 +1127,7 @@ func (s *RequestsTestSuite) genGroupRequest(groupID string, rp middleware.Reques
rctx := chi.NewRouteContext()
rctx.URLParams.Add("groupId", groupID)

aco := postgrestest.GetACOByUUID(s.T(), s.db, s.acoID)
aco := postgrestest.GetACOByUUID(s.T(), s.connections.Connection, s.acoID)
ad := auth.AuthData{ACOID: s.acoID.String(), CMSID: *aco.CMSID, TokenID: uuid.NewRandom().String()}

ctx := context.WithValue(req.Context(), chi.RouteCtxKey, rctx)
Expand All @@ -1145,7 +1142,7 @@ func (s *RequestsTestSuite) genGroupRequest(groupID string, rp middleware.Reques

func (s *RequestsTestSuite) genPatientRequest(rp middleware.RequestParameters) *http.Request {
req := httptest.NewRequest("GET", "http://bcda.cms.gov/api/v1/Patient/$export", nil)
aco := postgrestest.GetACOByUUID(s.T(), s.db, s.acoID)
aco := postgrestest.GetACOByUUID(s.T(), s.connections.Connection, s.acoID)
ad := auth.AuthData{ACOID: s.acoID.String(), CMSID: *aco.CMSID, TokenID: uuid.NewRandom().String()}
ctx := context.WithValue(req.Context(), auth.AuthDataContextKey, ad)
ctx = middleware.SetRequestParamsCtx(ctx, rp)
Expand All @@ -1156,7 +1153,7 @@ func (s *RequestsTestSuite) genPatientRequest(rp middleware.RequestParameters) *

func (s *RequestsTestSuite) genASRequest() *http.Request {
req := httptest.NewRequest("GET", "http://bcda.cms.gov/api/v1/attribution_status", nil)
aco := postgrestest.GetACOByUUID(s.T(), s.db, s.acoID)
aco := postgrestest.GetACOByUUID(s.T(), s.connections.Connection, s.acoID)
ad := auth.AuthData{ACOID: s.acoID.String(), CMSID: *aco.CMSID, TokenID: uuid.NewRandom().String()}
ctx := context.WithValue(req.Context(), auth.AuthDataContextKey, ad)
newLogEntry := MakeTestStructuredLoggerEntry(logrus.Fields{"cms_id": "A9999", "request_id": uuid.NewRandom().String()})
Expand Down Expand Up @@ -1184,7 +1181,7 @@ func (s *RequestsTestSuite) genGetJobsRequest(version string, statuses []models.

req := httptest.NewRequest("GET", target, nil)

aco := postgrestest.GetACOByUUID(s.T(), s.db, s.acoID)
aco := postgrestest.GetACOByUUID(s.T(), s.connections.Connection, s.acoID)
ad := auth.AuthData{ACOID: s.acoID.String(), CMSID: *aco.CMSID, TokenID: uuid.NewRandom().String()}

ctx := context.WithValue(req.Context(), auth.AuthDataContextKey, ad)
Expand All @@ -1205,7 +1202,7 @@ func (s *RequestsTestSuite) TestValidateResources() {
"Patient": {},
"Coverage": {},
"ExplanationOfBenefit": {},
}, fhirPath, apiVersion, s.db)
}, fhirPath, apiVersion, s.connections)
err := h.validateResources([]string{"Vegetable"}, "1234")
assert.Contains(s.T(), err.Error(), "invalid resource type")
}
Expand Down
39 changes: 22 additions & 17 deletions bcda/api/v1/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/CMSgov/bcda-app/bcda/api"
"github.com/CMSgov/bcda-app/bcda/auth"
"github.com/CMSgov/bcda-app/bcda/constants"
"github.com/CMSgov/bcda-app/bcda/database"
"github.com/CMSgov/bcda-app/bcda/health"
"github.com/CMSgov/bcda-app/bcda/responseutils"
"github.com/CMSgov/bcda-app/bcda/service"
Expand All @@ -25,20 +26,24 @@ import (
"github.com/CMSgov/bcda-app/log"
)

var h *api.Handler
type ApiV1 struct {
handler *api.Handler
connections *database.Connections
}

func init() {
func NewApiV1(connections *database.Connections) *ApiV1 {
resources, ok := service.GetDataTypes([]string{
"Patient",
"Coverage",
"ExplanationOfBenefit",
"Observation",
}...)

if ok {
h = api.NewHandler(resources, "/v1/fhir", "v1")
} else {
if !ok {
panic("Failed to configure resource DataTypes")
} else {
h := api.NewHandler(resources, "/v1/fhir", "v1", connections)
return &ApiV1{handler: h, connections: connections}
}
}

Expand All @@ -64,8 +69,8 @@ Responses:
429: tooManyRequestsResponse
500: errorResponse
*/
func BulkPatientRequest(w http.ResponseWriter, r *http.Request) {
h.BulkPatientRequest(w, r)
func (a ApiV1) BulkPatientRequest(w http.ResponseWriter, r *http.Request) {
a.handler.BulkPatientRequest(w, r)
}

/*
Expand All @@ -92,8 +97,8 @@ func BulkPatientRequest(w http.ResponseWriter, r *http.Request) {
429: tooManyRequestsResponse
500: errorResponse
*/
func BulkGroupRequest(w http.ResponseWriter, r *http.Request) {
h.BulkGroupRequest(w, r)
func (a ApiV1) BulkGroupRequest(w http.ResponseWriter, r *http.Request) {
a.handler.BulkGroupRequest(w, r)
}

/*
Expand Down Expand Up @@ -122,8 +127,8 @@ Responses:
410: goneResponse
500: errorResponse
*/
func JobStatus(w http.ResponseWriter, r *http.Request) {
h.JobStatus(w, r)
func (a ApiV1) JobStatus(w http.ResponseWriter, r *http.Request) {
a.handler.JobStatus(w, r)
}

/*
Expand Down Expand Up @@ -162,8 +167,8 @@ Responses:
410: goneResponse
500: errorResponse
*/
func JobsStatus(w http.ResponseWriter, r *http.Request) {
h.JobsStatus(w, r)
func (a ApiV1) JobsStatus(w http.ResponseWriter, r *http.Request) {
a.handler.JobsStatus(w, r)
}

type gzipResponseWriter struct {
Expand Down Expand Up @@ -204,8 +209,8 @@ Responses:
410: goneResponse
500: errorResponse
*/
func DeleteJob(w http.ResponseWriter, r *http.Request) {
h.DeleteJob(w, r)
func (a ApiV1) DeleteJob(w http.ResponseWriter, r *http.Request) {
a.handler.DeleteJob(w, r)
}

/*
Expand All @@ -229,8 +234,8 @@ Responses:
200: AttributionFileStatusResponse
404: notFoundResponse
*/
func AttributionStatus(w http.ResponseWriter, r *http.Request) {
h.AttributionStatus(w, r)
func (a ApiV1) AttributionStatus(w http.ResponseWriter, r *http.Request) {
a.handler.AttributionStatus(w, r)
}

/*
Expand Down
Loading