Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 38 additions & 33 deletions auditTrail/index.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
let e = {};
const _ = require('lodash')
const mongoose = require("mongoose");
const mongoose = require("@datanimbus/mongoose");

function isValue(a) {
return a == null || !(typeof a == 'object');
Expand Down Expand Up @@ -46,39 +46,39 @@ function getDiff(a, b, oldData, newData) {
}
}

e.getAuditPreSaveHook = (collectionName)=> {
return function(next, req){
if(req){
e.getAuditPreSaveHook = (collectionName) => {
return function (next, req) {
if (req) {
let data = {};
data.user = null
data.txnId = null
if (req && req.headers ) {
data.txnId = req.headers.TxnId || req.headers.txnId;
data.user = req.headers.User || req.headers.user;
}
if (req && req.headers) {
data.txnId = req.headers.TxnId || req.headers.txnId;
data.user = req.headers.User || req.headers.user;
}
data.timestamp = new Date();
data.data = {};
if(this._id){
mongoose.connection.db.collection(collectionName).findOne({_id: this._id})
.then(doc => {
data.data.old = doc ? doc : null;
this._auditData = data;
next();
});
}else{
if (this._id) {
mongoose.connection.db.collection(collectionName).findOne({ _id: this._id })
.then(doc => {
data.data.old = doc ? doc : null;
this._auditData = data;
next();
});
} else {
this._auditData = data;
next();
}
}
else{
else {
next();
}
};
};

e.getAuditPostSaveHook = (collectionName,client,queueName)=>{
return function(doc){
if(doc._auditData){
e.getAuditPostSaveHook = (collectionName, client, queueName) => {
return function (doc) {
if (doc._auditData) {
let oldData = doc._auditData.data.old;
let newData = doc.toJSON();
delete doc._auditData.data;
Expand All @@ -100,22 +100,24 @@ e.getAuditPostSaveHook = (collectionName,client,queueName)=>{
Object.assign(auditData.data.new, newData);
Object.assign(auditData.data.old, oldData);
}
if(!_.isEqual(auditData.data.old, auditData.data.new))
client.publish(queueName, JSON.stringify(auditData));
if (!_.isEqual(auditData.data.old, auditData.data.new))
client.getQueue(queueName).add(queueName, auditData, {
removeOnComplete: true
});
}
};
};

e.getAuditPreRemoveHook = ()=>{
return function(next, req){
if(req){
e.getAuditPreRemoveHook = () => {
return function (next, req) {
if (req) {
let data = {};
data.user = null
data.txnId = null
if (req && req.headers ) {
data.txnId = req.headers.TxnId || req.headers.txnId;
data.user = req.headers.User || req.headers.user;
}
if (req && req.headers) {
data.txnId = req.headers.TxnId || req.headers.txnId;
data.user = req.headers.User || req.headers.user;
}
data.timestamp = new Date();
data.data = {};
data._metadata = {};
Expand All @@ -132,11 +134,14 @@ e.getAuditPreRemoveHook = ()=>{
};
};

e.getAuditPostRemoveHook = (collectionName,client,queueName)=>{
return function(doc){
if(doc._auditData){
e.getAuditPostRemoveHook = (collectionName, client, queueName) => {
return function (doc) {
if (doc._auditData) {
doc._auditData.colName = collectionName;
client.publish(queueName, JSON.stringify(doc._auditData));}
client.getQueue(queueName).add(queueName, doc._auditData, {
removeOnComplete: true
});
}
};
};

Expand Down
2 changes: 1 addition & 1 deletion database/index.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
const mongoose = require('mongoose');
const mongoose = require('@datanimbus/mongoose');
const log4js = require('log4js');

const logLevel = process.env.LOG_LEVEL ? process.env.LOG_LEVEL : 'info';
Expand Down
106 changes: 59 additions & 47 deletions eventsUtil/index.js
Original file line number Diff line number Diff line change
@@ -1,21 +1,30 @@

var helperUtil = require('../eventsUtil/constants');
var eventPriorityMap = helperUtil.eventPriorityMap;
var client = null;

let log4js = require('log4js')
let log4js = require('log4js');
const { getQueue } = require('../queue');

const logLevel = process.env.LOG_LEVEL ? process.env.LOG_LEVEL : 'info';
log4js.configure({
appenders: { out: { type: 'stdout', layout: { type: 'basic' } } },
categories: { default: { appenders: ['out'], level: logLevel.toUpperCase() } }
});
});

let version = require('../package.json').version;
let loggerName = process.env.HOSTNAME ? `[${process.env.DATA_STACK_NAMESPACE}] [${process.env.HOSTNAME}] [data.stack-eventUtil ${version}]` : `[data.stack-eventUtil ${version}]`;
let logger = log4js.getLogger(loggerName);

function setNatsClient(natsClient) {
client = natsClient;
const EVENTS_QUEUE = 'events';
const priorityValues = {
"High": 1,
"Medium": 5,
"Low": 10
};

function getJobPriority(eventId) {
const priorityString = eventPriorityMap[eventId] || "High"; // default
return priorityValues[priorityString];
}
/**
*
Expand All @@ -25,48 +34,51 @@ function setNatsClient(natsClient) {
* @param {*} doc The Document Object of the Source Module
* @param {*} [partner] The Partner Document Object. (Only if origin is flow)
*/
function publishEvent(eventId, source, req, doc, partner) {
let txnId = ""
let user = ""
if (req && req.headers ) {
txnId = `[${req.headers.TxnId || req.headers.txnId}]`;
user = `[${req.headers.User || req.headers.user}]`;
}
try {
let payload = {
"eventId": eventId,
"source": source,
"documentId": doc._id,
"documentName": doc.name,
"app": doc.app,
"timestamp": new Date().toISOString(),
"priority": eventPriorityMap[eventId],
}
if (req) {
payload.triggerType = 'user';
payload.triggerId = user;
payload.txnId = txnId;
} else {
payload.triggerType = 'cron';
payload.triggerId = 'AGENT_HB_MISS';
}
if (partner) {
payload.partnerId = partner._id;
payload.partnerName = partner.name;
}
if (client) {
client.publish('events', JSON.stringify(payload));
logger.debug(`${txnId}Event published`);
logger.debug(`${txnId}Event payload - JSON.stringify(payload)`);
} else {
logger.error(`${txnId}Client not initialised to publish events`);
}
} catch (e) {
logger.error(`${txnId}Publish event : ${e.message}`);
}
async function publishEvent(eventId, source, req, doc, partner) {
let txnId = "";
let user = "";
if (req && req.headers) {
txnId = [req.headers.TxnId || req.headers.txnId];
user = [req.headers.User || req.headers.user];
}
try {
let payload = {
"eventId": eventId,
"source": source,
"documentId": doc._id,
"documentName": doc.name,
"app": doc.app,
"timestamp": new Date().toISOString(),
"priority": eventPriorityMap[eventId],
};
if (req) {
payload.triggerType = 'user';
payload.triggerId = user;
payload.txnId = txnId;
} else {
payload.triggerType = 'cron';
payload.triggerId = 'AGENT_HB_MISS';
}
if (partner) {
payload.partnerId = partner._id;
payload.partnerName = partner.name;
}

const queue = getQueue(EVENTS_QUEUE);
await queue.add(eventId, payload, {
priority: getJobPriority(eventId) || 1,
removeOnComplete: true,
});

logger.debug(`${txnId}Event published to queue`);
logger.debug(`${txnId}Event payload - ${JSON.stringify(payload)}`);
} catch (e) {
logger.error(`${txnId}Publish event error: ${e.message}`);
}
}

module.exports = {
setNatsClient: setNatsClient,
publishEvent: publishEvent
}
publishEvent: publishEvent,
EVENTS_QUEUE: EVENTS_QUEUE
};

1 change: 1 addition & 0 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,6 @@ e.streaming = require("./streaming");
e.eventsUtil = require("./eventsUtil");
e.storageEngine = require("./storageEngine");
e.database = require("./database");
e.queue = require("./queue");

module.exports = e;
2 changes: 1 addition & 1 deletion logToMongo/index.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
const mongoose = require('mongoose');
const mongoose = require('@datanimbus/mongoose');
const pathNotToLog = ["/audit", "/audit/count", "/webHookStatus", "/webHookStatus/count", "/logs", "/logs/count", "/health"];
function logToMongo(name) {
return function (req, res, next) {
Expand Down
6 changes: 4 additions & 2 deletions logToQueue/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -174,14 +174,16 @@ function logToQueue(name, client, queueName, collectionName, masking, serviceId)
delete (body.data.reqBody);
}
}
let bodyStr = JSON.stringify(body);
let bodyStr = body;
if (req.originalUrl == '/rbac/validate' || req.originalUrl == '/rbac/usr/hb') {
next();
}
else {
try {
if (supportedHTTPMethods.indexOf(req.method) > -1) {
client.publish(queueName, bodyStr);
client.getQueue(queueName).add(queueName, bodyStr, {
removeOnComplete: true
});
}
} catch (e) {
logger.error(`[${req.headers.TxnId}] Publish error : ${e.message}`)
Expand Down
Loading