Skip to content
This repository was archived by the owner on Mar 13, 2025. It is now read-only.
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions config/default.js
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ module.exports = {
KAFKA_MESSAGE_ORIGINATOR: process.env.KAFKA_MESSAGE_ORIGINATOR || 'u-bahn-api',

// topics
UBAHN_CREATE_USER_TOPIC: process.env.UBAHN_CREATE_TOPIC || 'user.action.topic.create',
UBAHN_CREATE_TOPIC: process.env.UBAHN_CREATE_TOPIC || 'u-bahn.action.create',
UBAHN_UPDATE_TOPIC: process.env.UBAHN_UPDATE_TOPIC || 'u-bahn.action.update',
UBAHN_DELETE_TOPIC: process.env.UBAHN_DELETE_TOPIC || 'u-bahn.action.delete',
Expand Down
16 changes: 10 additions & 6 deletions src/common/db-helper.js
Original file line number Diff line number Diff line change
Expand Up @@ -105,24 +105,27 @@ async function get (model, pk, params) {
* @param model the sequelize model object
* @param entity entity to create
* @param auth the user auth object
* @param transaction the transaction object
* @returns {Promise<void>}
*/
async function create (model, entity, auth) {
async function create (model, entity, auth, transaction) {
if (auth) {
entity.createdBy = helper.getAuthUser(auth)
}
return model.create(entity)
return model.create(entity, { transaction })
}

/**
* delete object by pk
* @param model the sequelize model object
* @param pk the primary key
* @param transaction the transaction object
* @returns {Promise<void>}
*/
async function remove (model, pk, params) {
async function remove (model, pk, params, transaction) {
const instance = await get(model, pk, params)
return instance.destroy()
const result = await instance.destroy({ transaction })
return result
}

/**
Expand All @@ -132,13 +135,14 @@ async function remove (model, pk, params) {
* @param entity entity to create
* @param auth the auth object
* @param auth the path params
* @param transaction the transaction object
* @returns {Promise<void>}
*/
async function update (model, pk, entity, auth, params) {
async function update (model, pk, entity, auth, params, transaction) {
// insure that object exists
const instance = await get(model, pk, params)
entity.updatedBy = helper.getAuthUser(auth)
return instance.update(entity)
return instance.update(entity, { transaction })
}

/**
Expand Down
35 changes: 35 additions & 0 deletions src/common/es-helper.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ const config = require('config')
const _ = require('lodash')
const querystring = require('querystring')
const logger = require('../common/logger')
const helper = require('../common/helper')
const appConst = require('../consts')
const esClient = require('./es-client').getESClient()

Expand Down Expand Up @@ -282,6 +283,37 @@ function escapeRegex (str) {
/* eslint-enable no-useless-escape */
}

/**
* Process create entity
* @param {String} resource resource name
* @param {Object} entity entity object
*/
async function processCreate (resource, entity) {
helper.validProperties(entity, ['id'])
await esClient.index({
index: DOCUMENTS[resource].index,
type: DOCUMENTS[resource].type,
id: entity.id,
body: entity,
refresh: 'wait_for'
})
}

/**
* Process delete entity
* @param {String} resource resource name
* @param {Object} entity entity object
*/
async function processDelete (resource, entity) {
helper.validProperties(entity, ['id'])
await esClient.delete({
index: DOCUMENTS[resource].index,
type: DOCUMENTS[resource].type,
id: entity.id,
refresh: 'wait_for'
})
}

async function getOrganizationId (handle) {
const dBHelper = require('../common/db-helper')
const sequelize = require('../models/index')
Expand Down Expand Up @@ -1453,6 +1485,9 @@ async function searchAchievementValues ({ organizationId, keyword }) {
}

module.exports = {
processCreate,
processUpdate: processCreate,
processDelete,
searchElasticSearch,
getFromElasticSearch,
searchUsers,
Expand Down
16 changes: 16 additions & 0 deletions src/common/helper.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
const config = require('config')
const Joi = require('@hapi/joi')
const querystring = require('querystring')
const errors = require('./errors')
const appConst = require('../consts')
Expand All @@ -9,6 +10,20 @@ const busApi = require('tc-bus-api-wrapper')
const busApiClient = busApi(_.pick(config, ['AUTH0_URL', 'AUTH0_AUDIENCE', 'TOKEN_CACHE_TIME', 'AUTH0_CLIENT_ID',
'AUTH0_CLIENT_SECRET', 'BUSAPI_URL', 'KAFKA_ERROR_TOPIC', 'AUTH0_PROXY_SERVER_URL']))

/**
* Function to valid require keys
* @param {Object} payload validated object
* @param {Array} keys required keys
* @throws {Error} if required key absent
*/
function validProperties (payload, keys) {
const schema = Joi.object(_.fromPairs(_.map(keys, key => [key, Joi.string().uuid().required()]))).unknown(true)
const error = schema.validate(payload).error
if (error) {
throw error
}
}

/**
* get auth user handle or id
* @param authUser the user
Expand Down Expand Up @@ -146,6 +161,7 @@ async function postEvent (topic, payload) {
}

module.exports = {
validProperties,
getAuthUser,
permissionCheck,
checkIfExists,
Expand Down
43 changes: 36 additions & 7 deletions src/common/service-helper.js
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,17 @@ const MODEL_TO_RESOURCE = {
* Create record in es
* @param resource the resource to create
* @param result the resource fields
* @param toEs is to es directly
*/
async function createRecordInEs (resource, entity) {
async function createRecordInEs (resource, entity, toEs) {
try {
if (toEs) {
await esHelper.processCreate(resource, entity)
}
} catch (err) {
logger.logFullError(err)
throw err
}
try {
await publishMessage('create', resource, entity)
} catch (err) {
Expand All @@ -51,8 +60,17 @@ async function createRecordInEs (resource, entity) {
* Patch record in es
* @param resource the resource to create
* @param result the resource fields
* @param toEs is to es directly
*/
async function patchRecordInEs (resource, entity) {
async function patchRecordInEs (resource, entity, toEs) {
try {
if (toEs) {
await esHelper.processUpdate(resource, entity)
}
} catch (err) {
logger.logFullError(err)
throw err
}
try {
await publishMessage('patch', resource, entity)
} catch (err) {
Expand All @@ -65,8 +83,9 @@ async function patchRecordInEs (resource, entity) {
* @param id the id of record
* @param params the params of record (like nested ids)
* @param resource the resource to delete
* @param toEs is to es directly
*/
async function deleteRecordFromEs (id, params, resource) {
async function deleteRecordFromEs (id, params, resource, toEs) {
let payload
if (SUB_USER_DOCUMENTS[resource] || SUB_ORG_DOCUMENTS[resource]) {
payload = _.assign({}, params)
Expand All @@ -75,6 +94,15 @@ async function deleteRecordFromEs (id, params, resource) {
id
}
}
try {
if (toEs) {
await esHelper.processDelete(resource, payload)
}
} catch (err) {
logger.logFullError(err)
throw err
}

try {
await publishMessage('remove', resource, payload)
} catch (err) {
Expand Down Expand Up @@ -174,13 +202,14 @@ function sleep (ms) {
}

/**
* delete child of record with delay between each item deleted
* delete child of record with delay between each item deleted and with transaction
* @param model the child model to delete
* @param id the user id to delete
* @param params the params for child
* @param resourceName the es recource name
* @param transaction the transaction object
*/
async function deleteChild (model, id, params, resourceName) {
async function deleteChild (model, id, params, resourceName, transaction) {
const query = {}
query[params[0]] = id
const result = await dbHelper.find(model, query)
Expand All @@ -194,8 +223,8 @@ async function deleteChild (model, id, params, resourceName) {
params.forEach(attr => { esParams[attr] = record[attr] })

// remove from db
dbHelper.remove(model, record.id)
deleteRecordFromEs(record.id, esParams, resourceName)
await dbHelper.remove(model, record.id, transaction)
await deleteRecordFromEs(record.id, esParams, resourceName, !!transaction)

// sleep for configured time
await sleep(config.CASCADE_PAUSE_MS)
Expand Down
40 changes: 27 additions & 13 deletions src/modules/user/service.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,10 @@

const joi = require('@hapi/joi')
const _ = require('lodash')
const config = require('config')

const errors = require('../../common/errors')
const logger = require('../../common/logger')
const helper = require('../../common/helper')
const dbHelper = require('../../common/db-helper')
const serviceHelper = require('../../common/service-helper')
Expand All @@ -30,8 +32,15 @@ const uniqueFields = [['handle']]
async function create (entity, auth) {
await dbHelper.makeSureUnique(User, entity, uniqueFields)

const result = await dbHelper.create(User, entity, auth)
await serviceHelper.createRecordInEs(resource, result.dataValues)
const result = await sequelize.transaction(async (t) => {
const userEntity = await dbHelper.create(User, entity, auth, t)
await serviceHelper.createRecordInEs(resource, userEntity.dataValues, true)
try {
await helper.postEvent(config.UBAHN_CREATE_USER_TOPIC, userEntity.dataValues)
} catch (err) {
logger.logFullError(err)
}
})

return result
}
Expand All @@ -56,10 +65,13 @@ create.schema = {
async function patch (id, entity, auth, params) {
await dbHelper.makeSureUnique(User, entity, uniqueFields)

const newEntity = await dbHelper.update(User, id, entity, auth)
await serviceHelper.patchRecordInEs(resource, newEntity.dataValues)
const result = await sequelize.transaction(async (t) => {
const newEntity = await dbHelper.update(User, id, entity, auth, null, t)
await serviceHelper.patchRecordInEs(resource, newEntity.dataValues, true)
return newEntity
})

return newEntity
return result
}

patch.schema = {
Expand Down Expand Up @@ -153,7 +165,7 @@ search.schema = {
* @return {Promise<void>} no data returned
*/
async function remove (id, auth, params) {
beginCascadeDelete(id, params)
await beginCascadeDelete(id, params)
}

/**
Expand All @@ -162,13 +174,15 @@ async function remove (id, auth, params) {
* @param params the path params
*/
async function beginCascadeDelete (id, params) {
await serviceHelper.deleteChild(Achievement, id, ['userId', 'achievementsProviderId'], 'Achievement')
await serviceHelper.deleteChild(ExternalProfile, id, ['userId', 'organizationId'], 'ExternalProfile')
await serviceHelper.deleteChild(UserAttribute, id, ['userId', 'attributeId'], 'UserAttribute')
await serviceHelper.deleteChild(UsersRole, id, ['userId', 'roleId'], 'UsersRole')
await serviceHelper.deleteChild(UsersSkill, id, ['userId', 'skillId'], 'UsersSkill')
await dbHelper.remove(User, id)
await serviceHelper.deleteRecordFromEs(id, params, resource)
await sequelize.transaction(async (t) => {
await serviceHelper.deleteChild(Achievement, id, ['userId', 'achievementsProviderId'], 'Achievement', t)
await serviceHelper.deleteChild(ExternalProfile, id, ['userId', 'organizationId'], 'ExternalProfile', t)
await serviceHelper.deleteChild(UserAttribute, id, ['userId', 'attributeId'], 'UserAttribute', t)
await serviceHelper.deleteChild(UsersRole, id, ['userId', 'roleId'], 'UsersRole', t)
await serviceHelper.deleteChild(UsersSkill, id, ['userId', 'skillId'], 'UsersSkill', t)
await dbHelper.remove(User, id, null, t)
await serviceHelper.deleteRecordFromEs(id, params, resource, true)
})
}

module.exports = {
Expand Down