-
Notifications
You must be signed in to change notification settings - Fork 37
/
Copy pathmain.bal
138 lines (113 loc) · 5.33 KB
/
main.bal
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
import ballerina/log;
import ballerina/sql;
import ballerina/time;
import ballerinax/h2.driver as _;
import ballerinax/java.jdbc;
# Module-level JDBC client for the file-based H2 database at `./database/loandatabase`.
# The constructor call is `check`ed, so a failure to create the client is propagated
# as an error instead of leaving `dbClient` unusable.
final jdbc:Client dbClient = check new jdbc:Client(
    url = "jdbc:h2:file:./database/loandatabase",
    user = "test",
    password = "test"
);
# Program entry point: calls `initDB`, then runs the extract, transform and load
# stages in that order.
#
# + return - the first error returned by `initDB`, `extract` or `load`; otherwise `()`
public function main() returns error? {
    check initDB();

    // Unpack the extracted tuple directly into its two datasets.
    [LoanRequest[], LoanApproval[]] [loanRequests, loanApprovals] = check extract();

    // Transform cannot fail, so its result is handed straight to the load stage.
    check load(transform(loanRequests, loanApprovals));
}
# Extract stage: returns the loan requests and the loan approvals that the
# transform stage consumes, logging a BEGIN and an END message around the work.
#
# NOTE(review): this is an unfinished skeleton. `loanRequests` and `loanApprovals`
# are declared without initializers and the two file-name variables are never read,
# so the CSV reading still has to be implemented before the function compiles.
# The log messages mention an SFTP server while the hint mentions local CSV reading
# via the `io` library - confirm which source is intended.
#
# + return - a `[LoanRequest[], LoanApproval[]]` tuple, or an error
function extract() returns [LoanRequest[], LoanApproval[]]|error {
log:printInfo("BEGIN: extract data from the sftp server");
// Hint: use the Ballerina `io` library to read the CSV files named below.
string loanRequestFile = "loan_request_2024_03_22.csv";
// TODO: populate from `loanRequestFile` (currently uninitialized).
LoanRequest[] loanRequests;
string loanApprovalsFile = "approved_loans_2024_03_22.csv";
// TODO: populate from `loanApprovalsFile` (currently uninitialized).
LoanApproval[] loanApprovals;
log:printInfo("END: extract data from the sftp server");
return [loanRequests, loanApprovals];
}
# Transform stage: produces the approved `Loan` records together with their
# per-branch and per-region aggregates.
#
# NOTE(review): `approvedLoans` is still declared without an initializer. The join
# of `loanRequests` with `loanApprovals` (see the hint below) must be supplied
# before this function compiles; the join key on `LoanApproval` is not visible here.
#
# + loanRequests - the extracted loan requests
# + loanApprovals - the extracted loan approvals
# + return - the tuple `[approvedLoans, branchPerformance, regionPerformance]`
function transform(LoanRequest[] loanRequests, LoanApproval[] loanApprovals)
        returns [Loan[], BranchPerformance[], RegionPerformance[]] {
    log:printInfo("START: transform data");

    // Get the unique approved loan requests by joining the two CSV datasets
    // and create an array of Loan records.
    // Hint: Use Ballerina integrated queries and the transformLoanRequest function.
    Loan[] approvedLoans;

    // One row per (branch, loanType). After `group by`, the non-key fields are
    // sequences, which `sum` folds into a single total.
    BranchPerformance[] branchPerformance = from var {branch, loanType, grantedAmount, interest}
            in approvedLoans
        group by branch, loanType
        select {
            id: generateId(),
            branch,
            loanType,
            totalGrants: sum(grantedAmount),
            totalInterest: sum(interest),
            date: todayString()
        };

    // One row per (region, loanType, date, dayOfWeek), aggregated the same way as
    // the branch query. The selected fields match the columns written by
    // `loadRegionPerformance`.
    RegionPerformance[] regionPerformance = from var {region, loanType, date, dayOfWeek, grantedAmount, interest}
            in approvedLoans
        group by region, loanType, date, dayOfWeek
        select {
            id: generateId(),
            region,
            loanType,
            date,
            dayOfWeek,
            totalGrants: sum(grantedAmount),
            totalInterest: sum(interest)
        };

    log:printInfo("END: transform data");
    return [approvedLoans, branchPerformance, regionPerformance];
}
# Builds a single `Loan` record from a loan request and a loan approval.
#
# NOTE(review): unfinished skeleton. `loanCatergoryByAmount`, `totalInterest`,
# `loanStatus` and `'type` are still declared without initializers and must be
# assigned (see the hints) before this function compiles.
#
# + loanRequest - the loan request supplying id, amount, type, datetime, period, branch and status
# + loanApproval - the approval supplying granted amount, interest and approved period
# + return - the combined `Loan` record
function transformLoanRequest(LoanRequest loanRequest, LoanApproval loanApproval) returns Loan {
    log:printInfo(string `START: transform loan request: ${loanRequest.loanRequestId}`);
    var {loanRequestId, amount, loanType, datetime, period, branch, status} = loanRequest;
    var {grantedAmount, interest, period: approvedPeriod} = loanApproval;

    // Date-time related operations
    time:Date date = fromUtcStringToDate(datetime, USA_UTC_OFFSET_IN_SECONDS);
    string dateString = fromDateToString(date);
    DayOfWeek dayOfWeek = getDayOfWeek(date);

    // Categorize the branch by region using the lookup helper defined in this file;
    // a branch without a mapping gets an empty region.
    string region = getRegion(branch);

    // Hint: Categorization of loans by amount and type
    LoanCatergotyByAmount loanCatergoryByAmount;

    // Hint: Calculate total interest
    decimal totalInterest;

    // Hint: Get the loan status (the raw value is available as `status`)
    LoanStatus loanStatus;

    // Hint: Get the loan type (the raw value is available as `loanType`)
    LoanType 'type;

    log:printInfo(string `END: transform loan request: ${loanRequest.loanRequestId}`);
    return {
        loanRequestId,
        amount,
        loanType: 'type,
        datetime,
        period,
        branch,
        status: loanStatus,
        dayOfWeek,
        region,
        date: dateString,
        grantedAmount,
        interest: totalInterest,
        approvedPeriod,
        loanCatergoryByAmount
    };
}
# Load stage: writes each of the three transformed datasets to the database,
# logging a START and an END message around the work.
#
# + transformResult - the `[Loan[], BranchPerformance[], RegionPerformance[]]` tuple to persist
# + return - the first error returned by one of the loaders; otherwise `()`
function load([Loan[], BranchPerformance[], RegionPerformance[]] transformResult) returns error? {
    log:printInfo("START: loading data");

    // Name the tuple members once instead of indexing into the tuple at each call.
    var [loans, branchPerformance, regionPerformance] = transformResult;
    check loadLoan(loans);
    check loadBranchPerformance(branchPerformance);
    check loadRegionPerformance(regionPerformance);

    log:printInfo("END: loading data");
}
# Inserts the given rows into the `RegionPerformance` table as a single batch.
#
# + data - the rows to insert; when empty, no statement is executed
# + return - an error if the batch insert fails; otherwise `()`
function loadRegionPerformance(RegionPerformance[] data) returns error? {
    // The SQL client's `batchExecute` reports an error for an empty query array,
    // so an empty dataset is treated as "nothing to load" rather than a failure.
    if data.length() == 0 {
        return;
    }

    // Values are bound as parameters via the template, not concatenated into SQL.
    sql:ParameterizedQuery[] insertQueries = from RegionPerformance rp in data
        select `INSERT INTO RegionPerformance
            (id, region, loanType, date, dayOfWeek, totalGrants, totalInterest)
            VALUES (${rp.id}, ${rp.region}, ${rp.loanType},
            ${rp.date}, ${rp.dayOfWeek}, ${rp.totalGrants}, ${rp.totalInterest})`;
    _ = check dbClient->batchExecute(insertQueries);
}
# Inserts the given rows into the `BranchPerformance` table as a single batch.
#
# + data - the rows to insert; when empty, no statement is executed
# + return - an error if the batch insert fails; otherwise `()`
function loadBranchPerformance(BranchPerformance[] data) returns error? {
    // The SQL client's `batchExecute` reports an error for an empty query array,
    // so an empty dataset is treated as "nothing to load" rather than a failure.
    if data.length() == 0 {
        return;
    }

    // Values are bound as parameters via the template, not concatenated into SQL.
    sql:ParameterizedQuery[] insertQueries = from BranchPerformance bp in data
        select `INSERT INTO BranchPerformance (id, branch, loanType, totalGrants, totalInterest, date)
            VALUES (${bp.id}, ${bp.branch}, ${bp.loanType}, ${bp.totalGrants}, ${bp.totalInterest}, ${bp.date})`;
    _ = check dbClient->batchExecute(insertQueries);
}
# Inserts the given loans into the `Loan` table as a single batch.
#
# + data - the loans to insert; when empty, no statement is executed
# + return - an error if the batch insert fails; otherwise `()`
function loadLoan(Loan[] data) returns error? {
    // The SQL client's `batchExecute` reports an error for an empty query array,
    // so an empty dataset is treated as "nothing to load" rather than a failure.
    if data.length() == 0 {
        return;
    }

    // Values are bound as parameters via the template, not concatenated into SQL.
    sql:ParameterizedQuery[] insertQueries = from Loan loan in data
        select `INSERT INTO Loan (loanRequestId, amount, period, branch, status, loanType,
            datetime, dayOfWeek, region, date, interest, grantedAmount, approvedPeriod, loanCatergoryByAmount)
            VALUES (${loan.loanRequestId}, ${loan.amount}, ${loan.period}, ${loan.branch}, ${loan.status}, ${loan.loanType}, ${loan.datetime}, ${loan.dayOfWeek}, ${loan.region}, ${loan.date}, ${loan.interest}, ${loan.grantedAmount}, ${loan.approvedPeriod}, ${loan.loanCatergoryByAmount})`;
    _ = check dbClient->batchExecute(insertQueries);
}
# Looks up the region for a branch in `branchToRegionMap`.
#
# + branch - the branch name used as the lookup key
# + return - the mapped region, or an empty string when the branch has no entry
function getRegion(string branch) returns string {
    var mappedRegion = branchToRegionMap[branch];
    // A missing key yields nil; callers get "" instead.
    if mappedRegion is () {
        return "";
    }
    return mappedRegion;
}