Merge pull request #392 from ARGOeu/devel
Version 2.1.2
themiszamani authored May 10, 2023
2 parents 7474e46 + a48df2a commit c6c413e
Showing 323 changed files with 44,052 additions and 118 deletions.
30 changes: 23 additions & 7 deletions Jenkinsfile
@@ -32,7 +32,7 @@ pipeline {
}
}
}
stage('Flink Jobs Testing & Packaging') {
stage('Flink Jobs v2 Testing & Packaging') {
agent {
docker {
image 'argo.registry:5000/epel-7-java18'
@@ -42,12 +42,6 @@
steps {
echo 'Packaging & Testing Flink Jobs'
sh """
mvn clean package cobertura:cobertura -Dcobertura.report.format=xml -f ${PROJECT_DIR}/flink_jobs/stream_status/pom.xml
mvn clean package cobertura:cobertura -Dcobertura.report.format=xml -f ${PROJECT_DIR}/flink_jobs/batch_ar/pom.xml
mvn clean package cobertura:cobertura -Dcobertura.report.format=xml -f ${PROJECT_DIR}/flink_jobs/batch_status/pom.xml
mvn clean package cobertura:cobertura -Dcobertura.report.format=xml -f ${PROJECT_DIR}/flink_jobs/ams_ingest_metric/pom.xml
mvn clean package cobertura:cobertura -Dcobertura.report.format=xml -f ${PROJECT_DIR}/flink_jobs/ams_ingest_sync/pom.xml
mvn clean package cobertura:cobertura -Dcobertura.report.format=xml -f ${PROJECT_DIR}/flink_jobs/status_trends/pom.xml
mvn clean package cobertura:cobertura -Dcobertura.report.format=xml -f ${PROJECT_DIR}/flink_jobs_v2/pom.xml
"""
junit '**/target/surefire-reports/*.xml'
@@ -60,6 +54,28 @@
}
}
}
stage('Flink Jobs v3 Testing & Packaging') {
agent {
docker {
image 'argo.registry:5000/epel-7-java11-mvn384'
args '-u jenkins:jenkins -v $HOME/.m2:/root/.m2 -v /var/run/docker.sock:/var/run/docker.sock -u root:root'
}
}
steps {
sh """
cd ${PROJECT_DIR}/flink_jobs_v3/
mvn clean package
"""
junit '**/target/surefire-reports/*.xml'
archiveArtifacts artifacts: '**/target/*.jar'
step( [ $class: 'JacocoPublisher' ] )
}
post {
always {
cleanWs()
}
}
}
}
post {
success {
29 changes: 28 additions & 1 deletion bin/multi_job_submit.py
@@ -47,6 +47,11 @@ def compose_hdfs_commands(year, month, day, args, config):
hdfs_metric = hdfs_metric.fill(
namenode=namenode.geturl(), hdfs_user=hdfs_user, tenant=tenant).geturl()

hdfs_tenants = config.get("HDFS", "path_tenants")
hdfs_tenants = hdfs_tenants.fill(
namenode=namenode.geturl(), hdfs_user=hdfs_user
).geturl()

# dictionary holding all the commands with their respective arguments' name
hdfs_commands = dict()

@@ -57,6 +62,13 @@ def compose_hdfs_commands(year, month, day, args, config):
# file location of target day's metric data (local or hdfs)
hdfs_commands["--mdata"] = hdfs_check_path(
hdfs_metric+"/"+args.date, client)

# If the job runs in combined data mode, pass the hdfs tenants path as --basispath to the jar.
# The tenant runs in combined data mode when --source-data is provided as an argument
# and has a value other than "tenant".

if ("source_data" in args and args.source_data != "tenant"):
hdfs_commands["--basispath"] = hdfs_check_path(hdfs_tenants, client)

return hdfs_commands
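
As a standalone illustration of the combined-data check above, a minimal Python sketch (the namespace value and the hdfs URL are examples, not repository defaults):

from argparse import Namespace

# Sketch of the combined-data check: --basispath is added only when
# --source-data is present and set to something other than "tenant".
def needs_basispath(args):
    return "source_data" in args and args.source_data != "tenant"

args = Namespace(source_data="feeds")
if needs_basispath(args):
    print("--basispath", "hdfs://hdfs_test_host:hdfs_test_port/user/hdfs_test_user/argo/tenants")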

@@ -157,6 +169,16 @@ def compose_command(config, args, hdfs_commands, dry_run=False):
if "trends" not in args.calculate:
cmd_command.append("--calcTrends")
cmd_command.append("OFF")

# check if sources are defined
if args.source_data:
cmd_command.append("--source-data")
cmd_command.append(args.source_data)

if args.source_topo:
cmd_command.append("--source-topo")
cmd_command.append(args.source_topo)

return cmd_command


@@ -211,6 +233,11 @@ def main(args=None):
action="store_true", dest="clear_results")
parser.add_argument("--dry-run", help="Runs in test mode without actually submitting the job",
action="store_true", dest="dry_run")

parser.add_argument(
"-s", "--source-data", metavar="STRING", help="Define source of data to be used (possible values: tenant, feeds, all)",
dest="source_data", required=False)
parser.add_argument(
"-S", "--source-topo", metavar="STRING", help="Define source of topology to be used (possible values: tenant, all)",
dest="source_topo", required=False)
# Pass the arguments to main method
sys.exit(main(parser.parse_args()))
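
For reference, a minimal sketch of how the two new options translate into extra job arguments (the helper name and sample values are illustrative, not part of the script):

def compose_source_args(source_data, source_topo):
    # Extra CLI tokens contributed by the new options; empty when neither is set
    extra = []
    if source_data:   # possible values: tenant, feeds, all
        extra += ["--source-data", source_data]
    if source_topo:   # possible values: tenant, all
        extra += ["--source-topo", source_topo]
    return extra

print(compose_source_args("feeds", "all"))
# ['--source-data', 'feeds', '--source-topo', 'all']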
8 changes: 6 additions & 2 deletions bin/test_multi_job_submit.py
@@ -19,7 +19,7 @@
expected_result2 = """sudo flink_path run -c multi_class multi.jar --run.date 2021-01-01 --mongo.uri mongodb://localhost:21017/argo_TENANTA \
--mongo.method insert --pdata hdfs://hdfs_test_host:hdfs_test_port/user/hdfs_test_user/argo/tenants/TENANTA/mdata/2020-12-31 \
--mdata hdfs://hdfs_test_host:hdfs_test_port/user/hdfs_test_user/argo/tenants/TENANTA/mdata/2021-01-01 \
--api.endpoint api.foo --api.token key1 --report.id report_uuid --clearMongo true --calcStatus OFF"""
--api.endpoint api.foo --api.token key1 --report.id report_uuid --clearMongo true --calcStatus OFF --source-data feeds --source-topo all"""


class TestClass(unittest.TestCase):
@@ -36,6 +36,8 @@ def test_compose_command(self):
parser.add_argument('--method')
parser.add_argument('--clear-prev-results',dest='clear_results',action='store_true')
parser.add_argument('--calculate')
parser.add_argument('--source-data')
parser.add_argument('--source-topo')
args = parser.parse_args(
['--tenant', 'TENANTA', '--date', '2018-02-11', '--report', 'report_name', '--method', 'upsert', '--sudo', '--clear-prev-results', '--calculate', 'trends'])

@@ -61,8 +63,10 @@ def test_compose_command2(self):
parser.add_argument('--method', default="insert")
parser.add_argument('--clear-prev-results',dest='clear_results',action='store_true')
parser.add_argument('--calculate')
parser.add_argument('--source-data')
parser.add_argument('--source-topo')
args = parser.parse_args(
['--tenant', 'TENANTA', '--report', 'report_name', '--sudo','--clear-prev-results', '--calculate', 'trends, ar'])
['--tenant', 'TENANTA', '--report', 'report_name', '--sudo','--clear-prev-results', '--calculate', 'trends, ar', '--source-data', 'feeds', '--source-topo', 'all'])

hdfs_metric = "hdfs://hdfs_test_host:hdfs_test_port/user/hdfs_test_user/argo/tenants/TENANTA/mdata"

1 change: 1 addition & 0 deletions conf/argo-streaming.conf
@@ -4,6 +4,7 @@ user= foo
rollback_days= 3
path_metric= hdfs://{{namenode}}/user/{{hdfs_user}}/argo/tenants/{{tenant}}/mdata
path_sync= hdfs://{{namenode}}/user/{{hdfs_user}}/argo/tenants/{{tenant}}/sync
path_tenants= hdfs://{{namenode}}/user/{{hdfs_user}}/argo/tenants
writer_bin= /home/root/hdfs

[STREAMING]
2 changes: 2 additions & 0 deletions conf/conf.template
@@ -9,6 +9,8 @@ rollback_days: 3
path_metric: {{namenode}}/user/{{hdfs_user}}/argo/tenants/{{tenant}}/mdata
# hdfs path for sync data
path_sync: {{namenode}}/user/{{hdfs_user}}/argo/tenants/{{tenant}}/sync
# hdfs path for the root folder where argo tenants reside
path_tenants: {{namenode}}/user/{{hdfs_user}}/argo/tenants

# hdfs writer executable
writer_bin: /path/to/binary
5 changes: 5 additions & 0 deletions conf/config.schema.json
@@ -14,6 +14,11 @@
"type": "template,uri",
"default": "hdfs://{{namenode}}/user/{{hdfs_user}}/argo/tenants/{{tenant}}/sync"
},
"path_tenants": {
"desc": "template for constructing the hdfs root path to where all tenants reside",
"type": "template,uri",
"default": "hdfs://{{namenode}}/user/{{hdfs_user}}/argo/tenants"
},
"user": {
"desc": "name of the hdfs user",
"type": "string",
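
To see how the new path_tenants template resolves, a quick sketch using the placeholder values from the test suite (the real fill is handled by the project's config templating, so the plain string replace below is only illustrative):

template = "hdfs://{{namenode}}/user/{{hdfs_user}}/argo/tenants"
filled = (template
          .replace("{{namenode}}", "hdfs_test_host:hdfs_test_port")
          .replace("{{hdfs_user}}", "hdfs_test_user"))
print(filled)
# hdfs://hdfs_test_host:hdfs_test_port/user/hdfs_test_user/argo/tenants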
10 changes: 8 additions & 2 deletions flink_jobs_v2/ApiResourceManager/pom.xml
@@ -4,9 +4,9 @@
<parent>
<groupId>flink.jobs.v2</groupId>
<artifactId>flink_jobs_v2</artifactId>
<version>2.1.1</version>
<version>2.1.2</version>
</parent>
<version>2.1.1</version>
<version>2.1.2</version>
<groupId>api.resource.manager</groupId>
<artifactId>ApiResourceManager</artifactId>
<packaging>jar</packaging>
@@ -21,6 +21,12 @@


<dependencies>

<dependency>
<groupId>joda-time</groupId>
<artifactId>joda-time</artifactId>
<version>1.6</version>
</dependency>
<!-- Apache Flink dependencies -->

<dependency>
ApiResourceManager.java
@@ -5,9 +5,20 @@
import argo.avro.GroupGroup;
import argo.avro.MetricProfile;
import argo.avro.Weight;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.EnumMap;
import java.util.List;
import java.util.TimeZone;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;

/**
* APIResourceManager class fetches remote argo-web-api resources such as report
@@ -33,6 +44,7 @@ public class ApiResourceManager {
private String weightsID;
private RequestManager requestManager;
private ApiResponseParser apiResponseParser;
private boolean isSourceTopoAll;
private boolean isCombined;
//private boolean verify;
//private int timeoutSec;
@@ -210,6 +222,38 @@ public void getRemoteMetric() {

}

/**
* Compares the metric profile content against the previous day's version and
* returns the newly introduced entries
*/
public MetricProfile[] getNewEntriesMetrics() throws ParseException {

if (this.data.get(ApiResource.METRIC) == null) {
getRemoteMetric();
}
String content = this.data.get(ApiResource.METRIC);

JsonParser jsonParser = new JsonParser();
JsonElement jElement = jsonParser.parse(content);
JsonObject jRoot = jElement.getAsJsonObject();
String mpDate = jRoot.get("date").getAsString();
String yesterdayContent = null;
if (mpDate.equals(date)) {
DateTime yesterday = convertStringtoDate("yyyy-MM-dd", mpDate).minusDays(1);
String yesterdaystr = convertDateToString("yyyy-MM-dd", yesterday);

String path = "https://%s/api/v2/metric_profiles/%s?date=%s";
String fullURL = String.format(path, this.endpoint, this.metricID, yesterdaystr);
yesterdayContent = this.apiResponseParser.getJsonData(this.requestManager.getResource(fullURL), false);

}
List<MetricProfile> newentries = this.apiResponseParser.getListNewMetrics(content, yesterdayContent);

MetricProfile[] rArr = new MetricProfile[newentries.size()];
rArr = newentries.toArray(rArr);
return rArr;
}
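
In outline, the method above fetches yesterday's profile only when the profile carries the requested date itself, then diffs the two lists. A Python sketch of that control flow (the function names and the fetch callable are stand-ins, not repository code):

from datetime import datetime, timedelta

def day_before(day):
    # previous calendar day in yyyy-MM-dd form
    return (datetime.strptime(day, "%Y-%m-%d") - timedelta(days=1)).strftime("%Y-%m-%d")

def new_entries(profile, run_date, fetch):
    # fetch(date) stands in for the metric_profiles API call; returns a dict or None
    yesterday = fetch(day_before(run_date)) if profile["date"] == run_date else None
    if yesterday is None:
        return []
    return [m for m in profile["data"] if m not in yesterday["data"]]

print(new_entries({"date": "2023-05-10", "data": ["m1", "m2"]},
                  "2023-05-10", lambda d: {"date": d, "data": ["m1"]}))
# ['m2']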

/**
* Retrieves the aggregation profile content based on the aggregation_id
* attribute and stores it to the enum map
@@ -257,8 +301,12 @@ public void getRemoteThresholds() {
* Retrieves the topology endpoint content and stores it to the enum map
*/
public void getRemoteTopoEndpoints() {
String combinedparam="";
if(isSourceTopoAll){
combinedparam="&mode=combined";
}
String path = "https://%s/api/v2/topology/endpoints/by_report/%s?date=%s";
String fullURL = String.format(path, this.endpoint, this.reportName, this.date);
String fullURL = String.format(path, this.endpoint, this.reportName, this.date+combinedparam);
String content = this.requestManager.getResource(fullURL);

this.data.put(ApiResource.TOPOENDPOINTS, this.apiResponseParser.getJsonData(content, true));
Expand All @@ -269,8 +317,13 @@ public void getRemoteTopoEndpoints() {
* Retrieves the topology groups content and stores it to the enum map
*/
public void getRemoteTopoGroups() {
String combinedparam="";
if(isSourceTopoAll){
combinedparam="&mode=combined";
}

String path = "https://%s/api/v2/topology/groups/by_report/%s?date=%s";
String fullURL = String.format(path, this.endpoint, this.reportName, this.date);
String fullURL = String.format(path, this.endpoint, this.reportName, this.date+combinedparam);
String content = this.requestManager.getResource(fullURL);

this.data.put(ApiResource.TOPOGROUPS, this.apiResponseParser.getJsonData(content, true));
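
Both topology fetches gain the same query-string tweak; a minimal sketch of the resulting URL (endpoint taken from the test fixtures, report name illustrative):

endpoint, report, date = "api.foo", "report_name", "2023-05-10"
is_source_topo_all = True
combined = "&mode=combined" if is_source_topo_all else ""
url = "https://%s/api/v2/topology/groups/by_report/%s?date=%s" % (
    endpoint, report, date + combined)
print(url)
# https://api.foo/api/v2/topology/groups/by_report/report_name?date=2023-05-10&mode=combined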
@@ -505,6 +558,7 @@ public void getRemoteAll() {
this.getRemoteThresholds();
}
// Go to topology

this.getRemoteTopoEndpoints();
this.getRemoteTopoGroups();
// get weights
@@ -526,4 +580,26 @@ public void setIsCombined(boolean isCombined) {
this.isCombined = isCombined;
}

public void setIsSourceTopoAll(boolean isSourceTopoAll) {
this.isSourceTopoAll = isSourceTopoAll;
}



public static DateTime convertStringtoDate(String format, String dateStr) throws ParseException {

SimpleDateFormat sdf = new SimpleDateFormat(format);
sdf.setTimeZone(TimeZone.getTimeZone("UTC"));
Date date = sdf.parse(dateStr);
return new DateTime(date.getTime(), DateTimeZone.UTC);

}

public static String convertDateToString(String format, DateTime date) throws ParseException {

//String format = "yyyy-MM-dd'T'HH:mm:ss'Z'";
DateTimeFormatter dtf = DateTimeFormat.forPattern(format);
String dateString = date.toString(dtf);
return dateString;
}
}
ApiResponseParser.java
@@ -14,11 +14,15 @@
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
import java.text.ParseException;
import java.text.SimpleDateFormat;

import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TimeZone;

/**
* Parses a given request's response
@@ -110,19 +114,21 @@ public void setEgroup(String egroup) {
* @return First available item in data array as JSON string representation
*/
public String getJsonData(String content, boolean asArray) {

JsonParser jsonParser = new JsonParser();
// Grab the first - and only line of json from ops data
JsonElement jElement = jsonParser.parse(content);
JsonObject jRoot = jElement.getAsJsonObject();

// Get the data array and the first item
if(jRoot.get("data")==null) {
return null;
}

if (asArray) {
if (jRoot.get("data") == null) {
return null;
}
return jRoot.get("data").toString();
}

JsonArray jData = jRoot.get("data").getAsJsonArray();
if (!jData.iterator().hasNext()) {
return null;
@@ -314,5 +320,31 @@ public List<MetricProfile> getListMetrics(String content) {
}
return results;
}




/**
* Compares the metric profiles between two dates and returns the entries that are newly introduced
* @param content today's metric profile content
* @param yesterdayContent the previous day's metric profile content
* @return the list of newly introduced metric profile entries
*/
public List<MetricProfile> getListNewMetrics(String content, String yesterdayContent) {

List<MetricProfile> results = new ArrayList<MetricProfile>();

if (yesterdayContent == null) {
return results;
}
results = getListMetrics(content);

List<MetricProfile> yesterdayResults = new ArrayList<MetricProfile>();
yesterdayResults = getListMetrics(yesterdayContent);

results.removeAll(yesterdayResults);
return results;
}

}
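
Note that removeAll compares entries with equals(), so the diff assumes MetricProfile provides value equality (as Avro-generated records do). A Python sketch of the same semantics, with tuples standing in for profile entries:

today = [("profile1", "svc-a", "metric-x"), ("profile1", "svc-a", "metric-y")]
yesterday = [("profile1", "svc-a", "metric-x")]
# entries present today but absent yesterday are the "new" ones
new = [m for m in today if m not in yesterday]
print(new)
# [('profile1', 'svc-a', 'metric-y')]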
