Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ type AcceptInvitationPayload {

input RevokeInvitationInput {
invitationId: ID!
orgId: ID!
}

type RevokeInvitationPayload {
Expand Down
77 changes: 29 additions & 48 deletions apps/frontend/app/api/v1/osograph/schema/resolvers/index.ts
Original file line number Diff line number Diff line change
@@ -1,71 +1,52 @@
import { DateTimeISOResolver, GraphQLJSON } from "graphql-scalars";
import type { GraphQLResolverMap } from "@apollo/subgraph/dist/schema-helper/resolverMap";
import type { GraphQLContext } from "@/app/api/v1/osograph/types/context";
import { viewerResolvers } from "@/app/api/v1/osograph/schema/resolvers/viewer";
import { userResolvers } from "@/app/api/v1/osograph/schema/resolvers/user";
import { organizationResolvers } from "@/app/api/v1/osograph/schema/resolvers/organization";
import { invitationResolvers } from "@/app/api/v1/osograph/schema/resolvers/invitation";
import { notebookResolvers } from "@/app/api/v1/osograph/schema/resolvers/notebook";
import { datasetResolvers } from "@/app/api/v1/osograph/schema/resolvers/dataset";
import { dataModelResolvers } from "@/app/api/v1/osograph/schema/resolvers/data-model";
import { schedulerResolvers } from "@/app/api/v1/osograph/schema/resolvers/scheduler";
import { systemResolvers } from "@/app/api/v1/osograph/schema/resolvers/system";
import { staticModelResolvers } from "@/app/api/v1/osograph/schema/resolvers/static-model";
import { dataIngestionResolvers } from "@/app/api/v1/osograph/schema/resolvers/data-ingestion";
import { modelContextResolvers } from "@/app/api/v1/osograph/schema/resolvers/model-context";
import { dataConnectionResolvers } from "@/app/api/v1/osograph/schema/resolvers/data-connection";

import { systemResolvers } from "@/app/api/v1/osograph/schema/resolvers/system/index";
import { organizationResolvers } from "@/app/api/v1/osograph/schema/resolvers/organization/index";
import { resourceResolvers } from "@/app/api/v1/osograph/schema/resolvers/resource/index";
import { userResolvers } from "@/app/api/v1/osograph/schema/resolvers/user/index";

export const resolvers: GraphQLResolverMap<GraphQLContext> = {
DateTime: DateTimeISOResolver,
JSON: GraphQLJSON,

Query: {
...viewerResolvers.Query,
...userResolvers.Query,
...organizationResolvers.Query,
...invitationResolvers.Query,
...notebookResolvers.Query,
...datasetResolvers.Query,
...dataModelResolvers.Query,
...staticModelResolvers.Query,
...schedulerResolvers.Query,
...resourceResolvers.Query,
...systemResolvers.Query,
...dataConnectionResolvers.Query,
},

Mutation: {
...userResolvers.Mutation,
...organizationResolvers.Mutation,
...invitationResolvers.Mutation,
...notebookResolvers.Mutation,
...datasetResolvers.Mutation,
...dataModelResolvers.Mutation,
...staticModelResolvers.Mutation,
...schedulerResolvers.Mutation,
...resourceResolvers.Mutation,
...systemResolvers.Mutation,
...dataIngestionResolvers.Mutation,
...modelContextResolvers.Mutation,
...dataConnectionResolvers.Mutation,
},

Viewer: viewerResolvers.Viewer,
Viewer: userResolvers.Viewer,
User: userResolvers.User,
Organization: organizationResolvers.Organization,
OrganizationMember: organizationResolvers.OrganizationMember,
Invitation: invitationResolvers.Invitation,
Notebook: notebookResolvers.Notebook,
Dataset: datasetResolvers.Dataset,
DataModelDefinition: datasetResolvers.DataModelDefinition,
StaticModelDefinition: datasetResolvers.StaticModelDefinition,
DataIngestionDefinition: datasetResolvers.DataIngestionDefinition,
DataIngestion: dataIngestionResolvers.DataIngestion,
DataModel: dataModelResolvers.DataModel,
DataModelRevision: dataModelResolvers.DataModelRevision,
DataModelRelease: dataModelResolvers.DataModelRelease,
StaticModel: staticModelResolvers.StaticModel,
DataConnection: dataConnectionResolvers.DataConnection,
DataConnectionAlias: dataConnectionResolvers.DataConnectionAlias,
Run: schedulerResolvers.Run,
ModelContext: modelContextResolvers.ModelContext,
Step: schedulerResolvers.Step,
Materialization: schedulerResolvers.Materialization,
Invitation: userResolvers.Invitation,

Notebook: resourceResolvers.Notebook,
Dataset: resourceResolvers.Dataset,
DataModelDefinition: resourceResolvers.DataModelDefinition,
StaticModelDefinition: resourceResolvers.StaticModelDefinition,
DataIngestionDefinition: resourceResolvers.DataIngestionDefinition,
DataIngestion: resourceResolvers.DataIngestion,
DataModel: resourceResolvers.DataModel,
DataModelRevision: resourceResolvers.DataModelRevision,
DataModelRelease: resourceResolvers.DataModelRelease,
StaticModel: resourceResolvers.StaticModel,
DataConnection: resourceResolvers.DataConnection,
DataConnectionAlias: resourceResolvers.DataConnectionAlias,
ModelContext: resourceResolvers.ModelContext,

Run: systemResolvers.Run,
Step: systemResolvers.Step,
Materialization: systemResolvers.Materialization,
System: systemResolvers.System,
};
Empty file.
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
import { SupabaseAdminClient } from "@/lib/supabase/admin";
import { logger } from "@/lib/logger";
import type { GraphQLContext } from "@/app/api/v1/osograph/types/context";
import { requireAuthentication } from "@/app/api/v1/osograph/utils/auth";
import {
OrganizationErrors,
ServerErrors,
} from "@/app/api/v1/osograph/utils/errors";
import {
CreateDataConnectionSchema,
validateInput,
} from "@/app/api/v1/osograph/utils/validation";
import { getTrinoAdminClient } from "@/lib/clients/trino";
import {
createTrinoCatalog,
validateDynamicConnector,
} from "@/lib/dynamic-connectors";
import { DynamicConnectorsRow } from "@/lib/types/schema-types";
import { createQueueService } from "@/lib/services/queue";
import { SyncConnectionRunRequest } from "@opensource-observer/osoprotobufs/sync-connection";
import { GraphQLResolverModule } from "@/app/api/v1/osograph/types/utils";
import { getOrgScopedClient } from "@/app/api/v1/osograph/utils/access-control";
import { MutationCreateDataConnectionArgs } from "@/lib/graphql/generated/graphql";

async function syncDataConnection(
client: SupabaseAdminClient,
userId: string,
dataConnection: Pick<DynamicConnectorsRow, "id" | "org_id">,
) {
const { data: queuedRun, error: queuedRunError } = await client
.from("run")
.insert({
org_id: dataConnection.org_id,
run_type: "manual",
requested_by: userId,
metadata: {
dataConnectionId: dataConnection.id,
},
})
.select()
.single();
if (queuedRunError || !queuedRun) {
logger.error(
`Error creating run for data connection ${dataConnection.id}: ${queuedRunError?.message}`,
);
throw ServerErrors.database("Failed to create run request");
}

const queueService = createQueueService();

const runIdBuffer = Buffer.from(queuedRun.id.replace(/-/g, ""), "hex");
const publishMessage: SyncConnectionRunRequest = {
runId: new Uint8Array(runIdBuffer),
connectionId: dataConnection.id,
};

const result = await queueService.queueMessage({
queueName: "sync_connection_run_requests",
message: publishMessage,
encoder: SyncConnectionRunRequest,
});
if (!result.success) {
logger.error(
`Failed to publish message to queue: ${result.error?.message}`,
);
throw ServerErrors.queueError(
result.error?.message || "Failed to publish to queue",
);
}
return queuedRun;
}

/**
* Data connection mutations that operate at organization scope.
* These resolvers use getOrgScopedClient because they don't have a resourceId yet.
*/
export const dataConnectionMutations: GraphQLResolverModule<GraphQLContext>["Mutation"] =
{
createDataConnection: async (
_: unknown,
{ input }: MutationCreateDataConnectionArgs,
context: GraphQLContext,
) => {
const authenticatedUser = requireAuthentication(context.user);
const { orgId, name, type, config, credentials } = validateInput(
CreateDataConnectionSchema,
input,
);

const { client } = await getOrgScopedClient(context, orgId);

const { data: org, error: orgError } = await client
.from("organizations")
.select()
.eq("id", orgId)
.single();
if (orgError || !org) {
throw OrganizationErrors.notFound();
}
validateDynamicConnector(name, type, org.org_name);

const { data, error } = await client
.from("dynamic_connectors")
.insert({
org_id: orgId,
connector_name: name,
connector_type: type,
config: config,
created_by: authenticatedUser.userId,
})
.select()
.single();

if (error) {
logger.error("Failed to create data connection:", error);
throw ServerErrors.database("Failed to create data connection");
}

const trinoClient = getTrinoAdminClient();
const { error: trinoError } = await createTrinoCatalog(
trinoClient,
data,
credentials,
);
if (trinoError) {
// Best effort try to cleanup the connector from supabase
await client.from("dynamic_connectors").delete().eq("id", data.id);
throw ServerErrors.externalService(
`Error creating catalog: ${trinoError}`,
);
}

await syncDataConnection(client, authenticatedUser.userId, data);

return {
success: true,
message: "Data connection created successfully",
dataConnection: data,
};
},
};
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
import { GraphQLContext } from "@/app/api/v1/osograph/types/context";
import {
CreateDataIngestionSchema,
validateInput,
} from "@/app/api/v1/osograph/utils/validation";
import { getOrgScopedClient } from "@/app/api/v1/osograph/utils/access-control";
import {
ResourceErrors,
ServerErrors,
} from "@/app/api/v1/osograph/utils/errors";
import { logger } from "@/lib/logger";
import { DataIngestionsRow } from "@/lib/types/schema-types";
import { GraphQLResolverModule } from "@/app/api/v1/osograph/types/utils";
import { MutationCreateDataIngestionConfigArgs } from "@/lib/graphql/generated/graphql";

/**
* Data ingestion mutations that operate at organization scope.
* These resolvers use getOrgScopedClient because they don't have a resourceId yet.
*/
export const dataIngestionMutations: GraphQLResolverModule<GraphQLContext>["Mutation"] =
{
async createDataIngestionConfig(
_: unknown,
args: MutationCreateDataIngestionConfigArgs,
context: GraphQLContext,
) {
const input = validateInput(CreateDataIngestionSchema, args.input);

// TODO(jabolo): This is incorrect, dataset id is not an org id
const { client } = await getOrgScopedClient(context, input.datasetId);

const { data: dataset, error: datasetError } = await client
.from("datasets")
.select("*")
.eq("id", input.datasetId)
.single();

if (datasetError || !dataset) {
logger.error(
`Error fetching dataset with id ${input.datasetId}: ${datasetError?.message}`,
);
throw ResourceErrors.notFound("Dataset not found");
}

const { client: orgClient } = await getOrgScopedClient(
context,
dataset.org_id,
);

const { data: existingConfig } = await orgClient
.from("data_ingestions")
.select("id")
.eq("dataset_id", input.datasetId)
.is("deleted_at", null)
.maybeSingle();

const { data: config, error: configError } = existingConfig
? await orgClient
.from("data_ingestions")
.update({
factory_type: input.factoryType,
config: input.config,
})
.eq("id", existingConfig.id)
.select()
.single()
: await orgClient
.from("data_ingestions")
.insert({
dataset_id: input.datasetId,
factory_type: input.factoryType,
config: input.config,
org_id: dataset.org_id,
name: dataset.name,
})
.select()
.single();

if (configError || !config) {
logger.error(
`Error creating data ingestion config: ${configError?.message}`,
);
throw ServerErrors.database("Failed to create data ingestion config");
}

// Return the raw row so the `DataIngestion` field resolvers can
// access snake_case database columns and resolve GraphQL fields.
return config as DataIngestionsRow;
},
};
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
import { logger } from "@/lib/logger";
import type { GraphQLContext } from "@/app/api/v1/osograph/types/context";
import { ServerErrors } from "@/app/api/v1/osograph/utils/errors";
import {
CreateDataModelSchema,
validateInput,
} from "@/app/api/v1/osograph/utils/validation";
import { GraphQLResolverModule } from "@/app/api/v1/osograph/types/utils";
import { getOrgScopedClient } from "@/app/api/v1/osograph/utils/access-control";
import { MutationCreateDataModelArgs } from "@/lib/graphql/generated/graphql";

/**
* Data model mutations that operate at organization scope.
* These resolvers use getOrgScopedClient because they don't have a resourceId yet.
*/
export const dataModelMutations: GraphQLResolverModule<GraphQLContext>["Mutation"] =
{
createDataModel: async (
_: unknown,
{ input }: MutationCreateDataModelArgs,
context: GraphQLContext,
) => {
const validatedInput = validateInput(CreateDataModelSchema, input);

const { client } = await getOrgScopedClient(
context,
validatedInput.orgId,
);

const { data, error } = await client
.from("model")
.insert({
org_id: validatedInput.orgId,
dataset_id: validatedInput.datasetId,
name: validatedInput.name,
is_enabled: validatedInput.isEnabled,
})
.select()
.single();

if (error) {
logger.error("Failed to create dataModel:", error);
throw ServerErrors.database("Failed to create dataModel");
}

return {
success: true,
message: "DataModel created successfully",
dataModel: data,
};
},
};
Loading