mirror of
https://github.com/openstf/stf
synced 2025-10-05 19:42:01 +02:00
add groups feature
This commit is contained in:
parent
6fd750dad5
commit
7f5dc4c152
119 changed files with 12416 additions and 402 deletions
115
lib/units/groups-engine/index.js
Normal file
115
lib/units/groups-engine/index.js
Normal file
|
@ -0,0 +1,115 @@
|
|||
/**
|
||||
* Copyright © 2019 code initially contributed by Orange SA, authors: Denis Barbaron - Licensed under the Apache license 2.0
|
||||
**/
|
||||
|
||||
const events = require('events')
|
||||
const Promise = require('bluebird')
|
||||
const logger = require('../../util/logger')
|
||||
const zmqutil = require('../../util/zmqutil')
|
||||
const srv = require('../../util/srv')
|
||||
const lifecycle = require('../../util/lifecycle')
|
||||
const wireutil = require('../../wire/util')
|
||||
|
||||
const groupsScheduler = require('./scheduler')
|
||||
const groupsWatcher = require('./watchers/groups')
|
||||
const devicesWatcher = require('./watchers/devices')
|
||||
const usersWatcher = require('./watchers/users')
|
||||
|
||||
module.exports = function(options) {
|
||||
const log = logger.createLogger('groups-engine')
|
||||
const channelRouter = new events.EventEmitter()
|
||||
|
||||
const push = zmqutil.socket('push')
|
||||
Promise.map(options.endpoints.push, function(endpoint) {
|
||||
return srv.resolve(endpoint).then(function(records) {
|
||||
return srv.attempt(records, function(record) {
|
||||
log.info('Sending output to "%s"', record.url)
|
||||
push.connect(record.url)
|
||||
return Promise.resolve(true)
|
||||
})
|
||||
})
|
||||
})
|
||||
.catch(function(err) {
|
||||
log.fatal('Unable to connect to push endpoint', err)
|
||||
lifecycle.fatal()
|
||||
})
|
||||
|
||||
// Input
|
||||
const sub = zmqutil.socket('sub')
|
||||
Promise.map(options.endpoints.sub, function(endpoint) {
|
||||
return srv.resolve(endpoint).then(function(records) {
|
||||
return srv.attempt(records, function(record) {
|
||||
log.info('Receiving input from "%s"', record.url)
|
||||
sub.connect(record.url)
|
||||
return Promise.resolve(true)
|
||||
})
|
||||
})
|
||||
})
|
||||
.catch(function(err) {
|
||||
log.fatal('Unable to connect to sub endpoint', err)
|
||||
lifecycle.fatal()
|
||||
})
|
||||
|
||||
const pushdev = zmqutil.socket('push')
|
||||
Promise.map(options.endpoints.pushdev, function(endpoint) {
|
||||
return srv.resolve(endpoint).then(function(records) {
|
||||
return srv.attempt(records, function(record) {
|
||||
log.info('Sending output to "%s"', record.url)
|
||||
pushdev.connect(record.url)
|
||||
return Promise.resolve(true)
|
||||
})
|
||||
})
|
||||
})
|
||||
.catch(function(err) {
|
||||
log.fatal('Unable to connect to pushdev endpoint', err)
|
||||
lifecycle.fatal()
|
||||
})
|
||||
|
||||
const subdev = zmqutil.socket('sub')
|
||||
Promise.map(options.endpoints.subdev, function(endpoint) {
|
||||
return srv.resolve(endpoint).then(function(records) {
|
||||
return srv.attempt(records, function(record) {
|
||||
log.info('Receiving input from "%s"', record.url)
|
||||
subdev.connect(record.url)
|
||||
return Promise.resolve(true)
|
||||
})
|
||||
})
|
||||
})
|
||||
.catch(function(err) {
|
||||
log.fatal('Unable to connect to subdev endpoint', err)
|
||||
lifecycle.fatal()
|
||||
})
|
||||
|
||||
// Establish always-on channels
|
||||
;[wireutil.global].forEach(function(channel) {
|
||||
log.info('Subscribing to permanent channel "%s"', channel)
|
||||
sub.subscribe(channel)
|
||||
subdev.subscribe(channel)
|
||||
})
|
||||
|
||||
sub.on('message', function(channel, data) {
|
||||
channelRouter.emit(channel.toString(), channel, data)
|
||||
})
|
||||
|
||||
subdev.on('message', function(channel, data) {
|
||||
channelRouter.emit(channel.toString(), channel, data)
|
||||
})
|
||||
|
||||
groupsScheduler()
|
||||
groupsWatcher(push, pushdev, channelRouter)
|
||||
devicesWatcher(push, pushdev, channelRouter)
|
||||
usersWatcher(pushdev)
|
||||
|
||||
lifecycle.observe(function() {
|
||||
[push, sub, pushdev, subdev].forEach(function(sock) {
|
||||
try {
|
||||
sock.close()
|
||||
}
|
||||
catch (err) {
|
||||
// No-op
|
||||
}
|
||||
})
|
||||
})
|
||||
|
||||
log.info('Groups engine started')
|
||||
}
|
156
lib/units/groups-engine/scheduler/index.js
Normal file
156
lib/units/groups-engine/scheduler/index.js
Normal file
|
@ -0,0 +1,156 @@
|
|||
/**
|
||||
* Copyright © 2019 code initially contributed by Orange SA, authors: Denis Barbaron - Licensed under the Apache license 2.0
|
||||
**/
|
||||
|
||||
const Promise = require('bluebird')
|
||||
const logger = require('../../../util/logger')
|
||||
const apiutil = require('../../../util/apiutil')
|
||||
const db = require('../../../db')
|
||||
const dbapi = require('../../../db/api')
|
||||
const r = require('rethinkdb')
|
||||
|
||||
module.exports = function() {
|
||||
const log = logger.createLogger('groups-scheduler')
|
||||
|
||||
function updateOriginGroupLifetime(group) {
|
||||
const lock = {}
|
||||
|
||||
return dbapi.adminLockGroup(group.id, lock).then(function(lockingSuccessed) {
|
||||
if (lockingSuccessed) {
|
||||
const now = Date.now()
|
||||
|
||||
return db.run(r.table('groups').get(group.id).update({
|
||||
dates: [{
|
||||
start: new Date(now)
|
||||
, stop: new Date(now + (group.dates[0].stop - group.dates[0].start))
|
||||
}]
|
||||
}))
|
||||
}
|
||||
return false
|
||||
})
|
||||
.finally(function() {
|
||||
return dbapi.adminUnlockGroup(lock)
|
||||
})
|
||||
}
|
||||
|
||||
function deleteUserGroup(group) {
|
||||
const lock = {}
|
||||
|
||||
return dbapi.adminLockGroup(group.id, lock).then(function(lockingSuccessed) {
|
||||
if (lockingSuccessed) {
|
||||
return dbapi.deleteUserGroup(group.id)
|
||||
}
|
||||
else {
|
||||
return db.run(r.table('groups').get(group.id).update({
|
||||
isActive: false
|
||||
, state: apiutil.WAITING
|
||||
}))
|
||||
}
|
||||
})
|
||||
.finally(function() {
|
||||
return dbapi.adminUnlockGroup(lock)
|
||||
})
|
||||
}
|
||||
|
||||
function updateGroupDates(group, incr, isActive) {
|
||||
const repetitions = group.repetitions - incr
|
||||
const dates = group.dates.slice(incr)
|
||||
const duration = group.devices.length * (dates[0].stop - dates[0].start) * (repetitions + 1)
|
||||
|
||||
return db.run(r.table('groups').get(group.id).update({
|
||||
dates: dates
|
||||
, repetitions: repetitions
|
||||
, duration: duration
|
||||
, isActive: isActive
|
||||
, state: apiutil.READY
|
||||
}))
|
||||
.then(function() {
|
||||
return dbapi.updateUserGroupDuration(group.owner.email, group.duration, duration)
|
||||
})
|
||||
}
|
||||
|
||||
function doBecomeUnactiveGroup(group) {
|
||||
const lock = {}
|
||||
|
||||
return dbapi.adminLockGroup(group.id, lock).then(function(lockingSuccessed) {
|
||||
if (lockingSuccessed) {
|
||||
return updateGroupDates(group, 1, false)
|
||||
}
|
||||
else {
|
||||
return db.run(r.table('groups').get(group.id).update({
|
||||
isActive: false
|
||||
, state: apiutil.WAITING
|
||||
}))
|
||||
}
|
||||
})
|
||||
.finally(function() {
|
||||
return dbapi.adminUnlockGroup(lock)
|
||||
})
|
||||
}
|
||||
|
||||
function doCleanElapsedGroupDates(group, incr) {
|
||||
const lock = {}
|
||||
|
||||
return dbapi.adminLockGroup(group.id, lock).then(function(lockingSuccessed) {
|
||||
return lockingSuccessed ? updateGroupDates(group, incr, false) : false
|
||||
})
|
||||
.finally(function() {
|
||||
return dbapi.adminUnlockGroup(lock)
|
||||
})
|
||||
}
|
||||
|
||||
function doBecomeActiveGroup(group, incr) {
|
||||
const lock = {}
|
||||
|
||||
return dbapi.adminLockGroup(group.id, lock).then(function(lockingSuccessed) {
|
||||
return lockingSuccessed ? updateGroupDates(group, incr, true) : false
|
||||
})
|
||||
.finally(function() {
|
||||
return dbapi.adminUnlockGroup(lock)
|
||||
})
|
||||
}
|
||||
|
||||
dbapi.unlockBookingObjects().then(function() {
|
||||
setInterval(function() {
|
||||
const now = Date.now()
|
||||
|
||||
dbapi.getReadyGroupsOrderByIndex('startTime').then(function(groups) {
|
||||
Promise.each(groups, (function(group) {
|
||||
if (apiutil.isOriginGroup(group.class)) {
|
||||
if (now >= group.dates[0].stop.getTime()) {
|
||||
return updateOriginGroupLifetime(group)
|
||||
}
|
||||
}
|
||||
else if ((group.isActive || group.state === apiutil.WAITING) &&
|
||||
now >= group.dates[0].stop.getTime()) {
|
||||
if (group.dates.length === 1) {
|
||||
return deleteUserGroup(group)
|
||||
}
|
||||
else {
|
||||
return doBecomeUnactiveGroup(group)
|
||||
}
|
||||
}
|
||||
else if (!group.isActive) {
|
||||
for(const i in group.dates) {
|
||||
if (now >= group.dates[i].stop.getTime()) {
|
||||
if (group.dates[i].stop === group.dates[group.dates.length - 1].stop) {
|
||||
return deleteUserGroup(group)
|
||||
}
|
||||
}
|
||||
else if (now < group.dates[i].start.getTime()) {
|
||||
return i > 0 ? doCleanElapsedGroupDates(group, i) : false
|
||||
}
|
||||
else {
|
||||
return doBecomeActiveGroup(group, i)
|
||||
}
|
||||
}
|
||||
}
|
||||
return false
|
||||
}))
|
||||
})
|
||||
.catch(function(err) {
|
||||
log.error('An error occured during groups scheduling', err.stack)
|
||||
})
|
||||
}, 1000)
|
||||
})
|
||||
}
|
254
lib/units/groups-engine/watchers/devices.js
Normal file
254
lib/units/groups-engine/watchers/devices.js
Normal file
|
@ -0,0 +1,254 @@
|
|||
/**
|
||||
* Copyright © 2019 code initially contributed by Orange SA, authors: Denis Barbaron - Licensed under the Apache license 2.0
|
||||
**/
|
||||
|
||||
const wirerouter = require('../../../wire/router')
|
||||
const _ = require('lodash')
|
||||
const r = require('rethinkdb')
|
||||
const util = require('util')
|
||||
const uuid = require('uuid')
|
||||
const logger = require('../../../util/logger')
|
||||
const timeutil = require('../../../util/timeutil')
|
||||
const wireutil = require('../../../wire/util')
|
||||
const wire = require('../../../wire')
|
||||
const dbapi = require('../../../db/api')
|
||||
const db = require('../../../db')
|
||||
|
||||
module.exports = function(push, pushdev, channelRouter) {
|
||||
const log = logger.createLogger('watcher-devices')
|
||||
|
||||
function sendReleaseDeviceControl(serial, channel) {
|
||||
push.send([
|
||||
channel
|
||||
, wireutil.envelope(
|
||||
new wire.UngroupMessage(
|
||||
wireutil.toDeviceRequirements({
|
||||
serial: {
|
||||
value: serial
|
||||
, match: 'exact'
|
||||
}
|
||||
})
|
||||
)
|
||||
)
|
||||
])
|
||||
}
|
||||
|
||||
function sendDeviceGroupChange(id, group, serial, originName) {
|
||||
pushdev.send([
|
||||
wireutil.global
|
||||
, wireutil.envelope(
|
||||
new wire.DeviceGroupChangeMessage(
|
||||
id
|
||||
, new wire.DeviceGroupMessage(
|
||||
group.id
|
||||
, group.name
|
||||
, new wire.DeviceGroupOwnerMessage(
|
||||
group.owner.email
|
||||
, group.owner.name
|
||||
)
|
||||
, new wire.DeviceGroupLifetimeMessage(
|
||||
group.dates[0].start.getTime()
|
||||
, group.dates[0].stop.getTime()
|
||||
)
|
||||
, group.class
|
||||
, group.repetitions
|
||||
, originName
|
||||
)
|
||||
, serial
|
||||
)
|
||||
)
|
||||
])
|
||||
}
|
||||
|
||||
function sendDeviceChange(device1, device2, action) {
|
||||
function publishDevice() {
|
||||
const device = _.cloneDeep(device1)
|
||||
|
||||
delete device.channel
|
||||
delete device.owner
|
||||
delete device.group.id
|
||||
delete device.group.lifeTime
|
||||
return device
|
||||
}
|
||||
|
||||
pushdev.send([
|
||||
wireutil.global
|
||||
, wireutil.envelope(
|
||||
new wire.DeviceChangeMessage(
|
||||
publishDevice()
|
||||
, action
|
||||
, device2.group.origin
|
||||
, timeutil.now('nano')
|
||||
)
|
||||
)
|
||||
])
|
||||
}
|
||||
|
||||
function sendReleaseDeviceControlAndDeviceGroupChange(
|
||||
device
|
||||
, sendDeviceGroupChangeWrapper) {
|
||||
let messageListener
|
||||
const responseTimer = setTimeout(function() {
|
||||
channelRouter.removeListener(wireutil.global, messageListener)
|
||||
sendDeviceGroupChangeWrapper()
|
||||
}, 5000)
|
||||
|
||||
messageListener = wirerouter()
|
||||
.on(wire.LeaveGroupMessage, function(channel, message) {
|
||||
if (message.serial === device.serial &&
|
||||
message.owner.email === device.owner.email) {
|
||||
clearTimeout(responseTimer)
|
||||
channelRouter.removeListener(wireutil.global, messageListener)
|
||||
sendDeviceGroupChangeWrapper()
|
||||
}
|
||||
})
|
||||
.handler()
|
||||
|
||||
channelRouter.on(wireutil.global, messageListener)
|
||||
sendReleaseDeviceControl(device.serial, device.channel)
|
||||
}
|
||||
|
||||
db.run(r
|
||||
.table('devices')
|
||||
.pluck(
|
||||
'serial'
|
||||
, 'channel'
|
||||
, 'owner'
|
||||
, 'model'
|
||||
, 'operator'
|
||||
, 'manufacturer'
|
||||
, {group: ['id', 'origin', 'originName', 'lifeTime']}
|
||||
, {provider: ['name']}
|
||||
, {network: ['type', 'subtype']}
|
||||
, {display: ['height', 'width']}
|
||||
, 'version'
|
||||
, 'sdk'
|
||||
, 'abi'
|
||||
, 'cpuPlatform'
|
||||
, 'openGLESVersion'
|
||||
, {phone: ['imei']}
|
||||
)
|
||||
.changes(), function(err, cursor) {
|
||||
if (err) {
|
||||
throw err
|
||||
}
|
||||
return cursor
|
||||
})
|
||||
.then(function(cursor) {
|
||||
cursor.each(function(err, data) {
|
||||
if (err) {
|
||||
throw err
|
||||
}
|
||||
if (data.old_val === null) {
|
||||
return sendDeviceChange(data.new_val, data.new_val, 'created')
|
||||
}
|
||||
else if (data.new_val === null) {
|
||||
sendDeviceChange(data.old_val, data.old_val, 'deleted')
|
||||
}
|
||||
else if (data.new_val.model !== data.old_val.model ||
|
||||
data.new_val.group.origin !== data.old_val.group.origin ||
|
||||
data.new_val.operator !== data.old_val.operator ||
|
||||
data.new_val.hasOwnProperty('network') &&
|
||||
(!data.old_val.hasOwnProperty('network') ||
|
||||
data.new_val.network.type !== data.old_val.network.type ||
|
||||
data.new_val.network.subtype !== data.old_val.network.subtype
|
||||
) ||
|
||||
data.new_val.provider.name !== data.old_val.provider.name) {
|
||||
sendDeviceChange(data.new_val, data.old_val, 'updated')
|
||||
}
|
||||
|
||||
const isDeleted = data.new_val === null
|
||||
const id = isDeleted ? data.old_val.group.id : data.new_val.group.id
|
||||
|
||||
return dbapi.getGroup(id).then(function(group) {
|
||||
function sendDeviceGroupChangeOnDeviceDeletion() {
|
||||
const fakeGroup = Object.assign({}, group)
|
||||
|
||||
fakeGroup.id = util.format('%s', uuid.v4()).replace(/-/g, '')
|
||||
fakeGroup.name = 'none'
|
||||
sendDeviceGroupChange(
|
||||
group.id
|
||||
, fakeGroup
|
||||
, data.old_val.serial
|
||||
, data.old_val.group.originName
|
||||
)
|
||||
}
|
||||
|
||||
function sendDeviceGroupChangeOnDeviceCurrentGroupUpdating() {
|
||||
sendDeviceGroupChange(
|
||||
data.old_val.group.id
|
||||
, group
|
||||
, data.new_val.serial
|
||||
, data.new_val.group.originName
|
||||
)
|
||||
}
|
||||
|
||||
if (group) {
|
||||
if (isDeleted) {
|
||||
if (data.old_val.owner) {
|
||||
sendReleaseDeviceControlAndDeviceGroupChange(
|
||||
data.old_val
|
||||
, sendDeviceGroupChangeOnDeviceDeletion
|
||||
)
|
||||
return
|
||||
}
|
||||
sendDeviceGroupChangeOnDeviceDeletion()
|
||||
return
|
||||
}
|
||||
|
||||
const isChangeCurrentGroup = data.new_val.group.id !== data.old_val.group.id
|
||||
const isChangeOriginGroup = data.new_val.group.origin !== data.old_val.group.origin
|
||||
const isChangeLifeTime =
|
||||
data.new_val.group.lifeTime.start.getTime() !==
|
||||
data.old_val.group.lifeTime.start.getTime()
|
||||
|
||||
if (isChangeLifeTime && !isChangeCurrentGroup && !isChangeOriginGroup) {
|
||||
sendDeviceGroupChange(
|
||||
data.old_val.group.id
|
||||
, group
|
||||
, data.new_val.serial
|
||||
, data.new_val.group.originName
|
||||
)
|
||||
return
|
||||
}
|
||||
|
||||
if (isChangeCurrentGroup) {
|
||||
if (data.new_val.owner && group.users.indexOf(data.new_val.owner.email) < 0) {
|
||||
sendReleaseDeviceControlAndDeviceGroupChange(
|
||||
data.new_val
|
||||
, sendDeviceGroupChangeOnDeviceCurrentGroupUpdating
|
||||
)
|
||||
}
|
||||
else {
|
||||
sendDeviceGroupChangeOnDeviceCurrentGroupUpdating()
|
||||
}
|
||||
}
|
||||
|
||||
if (isChangeOriginGroup) {
|
||||
dbapi.getGroup(data.old_val.group.origin).then(function(originGroup) {
|
||||
if (originGroup) {
|
||||
dbapi.removeOriginGroupDevice(originGroup, data.new_val.serial)
|
||||
}
|
||||
})
|
||||
dbapi.getGroup(data.new_val.group.origin).then(function(originGroup) {
|
||||
if (originGroup) {
|
||||
dbapi.addOriginGroupDevice(originGroup, data.new_val.serial)
|
||||
}
|
||||
})
|
||||
if (!isChangeCurrentGroup) {
|
||||
sendDeviceGroupChange(
|
||||
data.new_val.group.id
|
||||
, group
|
||||
, data.new_val.serial
|
||||
, data.new_val.group.originName
|
||||
)
|
||||
}
|
||||
}
|
||||
}
|
||||
})
|
||||
})
|
||||
})
|
||||
.catch(function(err) {
|
||||
log.error('An error occured during DEVICES table watching', err.stack)
|
||||
})
|
||||
}
|
346
lib/units/groups-engine/watchers/groups.js
Normal file
346
lib/units/groups-engine/watchers/groups.js
Normal file
|
@ -0,0 +1,346 @@
|
|||
/**
|
||||
* Copyright © 2019 code initially contributed by Orange SA, authors: Denis Barbaron - Licensed under the Apache license 2.0
|
||||
**/
|
||||
|
||||
const wirerouter = require('../../../wire/router')
|
||||
const Promise = require('bluebird')
|
||||
const _ = require('lodash')
|
||||
const r = require('rethinkdb')
|
||||
const logger = require('../../../util/logger')
|
||||
const timeutil = require('../../../util/timeutil')
|
||||
const apiutil = require('../../../util/apiutil')
|
||||
const wireutil = require('../../../wire/util')
|
||||
const wire = require('../../../wire')
|
||||
const dbapi = require('../../../db/api')
|
||||
const db = require('../../../db')
|
||||
|
||||
module.exports = function(push, pushdev, channelRouter) {
|
||||
const log = logger.createLogger('watcher-groups')
|
||||
|
||||
function sendReleaseDeviceControl(serial, channel) {
|
||||
push.send([
|
||||
channel
|
||||
, wireutil.envelope(
|
||||
new wire.UngroupMessage(
|
||||
wireutil.toDeviceRequirements({
|
||||
serial: {
|
||||
value: serial
|
||||
, match: 'exact'
|
||||
}
|
||||
})
|
||||
)
|
||||
)
|
||||
])
|
||||
}
|
||||
|
||||
function sendGroupChange(
|
||||
group
|
||||
, subscribers
|
||||
, isChangedDates
|
||||
, isChangedClass
|
||||
, isAddedUser
|
||||
, users
|
||||
, isAddedDevice
|
||||
, devices
|
||||
, action) {
|
||||
function dates2String(dates) {
|
||||
return dates.map(function(date) {
|
||||
return {
|
||||
start: date.start.toJSON()
|
||||
, stop: date.stop.toJSON()
|
||||
}
|
||||
})
|
||||
}
|
||||
pushdev.send([
|
||||
wireutil.global
|
||||
, wireutil.envelope(
|
||||
new wire.GroupChangeMessage(
|
||||
new wire.GroupField(
|
||||
group.id
|
||||
, group.name
|
||||
, group.class
|
||||
, group.privilege
|
||||
, group.owner
|
||||
, dates2String(group.dates)
|
||||
, group.duration
|
||||
, group.repetitions
|
||||
, group.devices
|
||||
, group.users
|
||||
, group.state
|
||||
, group.isActive
|
||||
)
|
||||
, action
|
||||
, subscribers
|
||||
, isChangedDates
|
||||
, isChangedClass
|
||||
, isAddedUser
|
||||
, users
|
||||
, isAddedDevice
|
||||
, devices
|
||||
, timeutil.now('nano')
|
||||
)
|
||||
)
|
||||
])
|
||||
}
|
||||
|
||||
function sendGroupUsersChange(group, users, devices, isAdded, action) {
|
||||
const isDeletedLater = action === 'GroupDeletedLater'
|
||||
|
||||
pushdev.send([
|
||||
wireutil.global
|
||||
, wireutil.envelope(
|
||||
new wire.GroupUserChangeMessage(users, isAdded, group.id, isDeletedLater, devices))
|
||||
])
|
||||
}
|
||||
|
||||
function doUpdateDeviceOriginGroup(group) {
|
||||
return dbapi.updateDeviceOriginGroup(group.ticket.serial, group).then(function() {
|
||||
push.send([
|
||||
wireutil.global
|
||||
, wireutil.envelope(
|
||||
new wire.DeviceOriginGroupMessage(group.ticket.signature)
|
||||
)
|
||||
])
|
||||
})
|
||||
}
|
||||
|
||||
function doUpdateDevicesCurrentGroup(group, devices) {
|
||||
return Promise.map(devices, function(serial) {
|
||||
return dbapi.updateDeviceCurrentGroup(serial, group)
|
||||
})
|
||||
}
|
||||
|
||||
function doUpdateDevicesCurrentGroupFromOrigin(devices) {
|
||||
return Promise.map(devices, function(serial) {
|
||||
return dbapi.updateDeviceCurrentGroupFromOrigin(serial)
|
||||
})
|
||||
}
|
||||
|
||||
function doUpdateDevicesCurrentGroupDates(group) {
|
||||
if (apiutil.isOriginGroup(group.class)) {
|
||||
return Promise.map(group.devices, function(serial) {
|
||||
return dbapi.loadDeviceBySerial(serial).then(function(device) {
|
||||
return device.group.id === group.id ?
|
||||
doUpdateDevicesCurrentGroup(group, [serial]) :
|
||||
false
|
||||
})
|
||||
})
|
||||
}
|
||||
else {
|
||||
return Promise.map(group.devices, function(serial) {
|
||||
return doUpdateDevicesCurrentGroup(group, [serial])
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
function treatGroupUsersChange(group, users, isActive, isAddedUser) {
|
||||
if (isActive) {
|
||||
return Promise.map(users, function(email) {
|
||||
return Promise.map(group.devices, function(serial) {
|
||||
return dbapi.loadDeviceBySerial(serial).then(function(device) {
|
||||
if (device && device.group.id === group.id) {
|
||||
if (!isAddedUser && device.owner && device.owner.email === email) {
|
||||
return new Promise(function(resolve) {
|
||||
let messageListener
|
||||
const responseTimer = setTimeout(function() {
|
||||
channelRouter.removeListener(wireutil.global, messageListener)
|
||||
resolve(serial)
|
||||
}, 5000)
|
||||
|
||||
messageListener = wirerouter()
|
||||
.on(wire.LeaveGroupMessage, function(channel, message) {
|
||||
if (message.serial === serial &&
|
||||
message.owner.email === email) {
|
||||
clearTimeout(responseTimer)
|
||||
channelRouter.removeListener(wireutil.global, messageListener)
|
||||
resolve(serial)
|
||||
}
|
||||
})
|
||||
.handler()
|
||||
|
||||
channelRouter.on(wireutil.global, messageListener)
|
||||
sendReleaseDeviceControl(serial, device.channel)
|
||||
})
|
||||
}
|
||||
return serial
|
||||
}
|
||||
return false
|
||||
})
|
||||
})
|
||||
.then(function(devices) {
|
||||
sendGroupUsersChange(
|
||||
group, [email], _.without(devices, false), isAddedUser, 'GroupUser(s)Updated')
|
||||
})
|
||||
})
|
||||
}
|
||||
else {
|
||||
return sendGroupUsersChange(group, users, [], isAddedUser, 'GroupUser(s)Updated')
|
||||
}
|
||||
}
|
||||
|
||||
function treatGroupDevicesChange(oldGroup, group, devices, isAddedDevice) {
|
||||
if (isAddedDevice) {
|
||||
return doUpdateDevicesCurrentGroup(group, devices)
|
||||
}
|
||||
else {
|
||||
return doUpdateDevicesCurrentGroupFromOrigin(devices)
|
||||
.then(function() {
|
||||
if (group === null) {
|
||||
sendGroupUsersChange(oldGroup, oldGroup.users, [], false, 'GroupDeletedLater')
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
function treatGroupDeletion(group) {
|
||||
if (apiutil.isOriginGroup(group.class)) {
|
||||
return dbapi.getRootGroup().then(function(rootGroup) {
|
||||
return Promise.map(group.devices, function(serial) {
|
||||
return dbapi.updateDeviceOriginGroup(serial, rootGroup)
|
||||
})
|
||||
.then(function() {
|
||||
sendGroupUsersChange(group, group.users, [], false, 'GroupDeletedLater')
|
||||
})
|
||||
})
|
||||
}
|
||||
else {
|
||||
return sendGroupUsersChange(group, group.users, [], false, 'GroupDeleted')
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
db.run(r
|
||||
.table('groups')
|
||||
.pluck(
|
||||
'id'
|
||||
, 'name'
|
||||
, 'class'
|
||||
, 'privilege'
|
||||
, 'owner'
|
||||
, 'dates'
|
||||
, 'duration'
|
||||
, 'repetitions'
|
||||
, 'devices'
|
||||
, 'users'
|
||||
, 'state'
|
||||
, 'isActive'
|
||||
, 'ticket'
|
||||
)
|
||||
.changes(), function(err, cursor) {
|
||||
if (err) {
|
||||
throw err
|
||||
}
|
||||
return cursor
|
||||
})
|
||||
.then(function(cursor) {
|
||||
cursor.each(function(err, data) {
|
||||
let users, devices, isBecomeActive, isBecomeUnactive, isActive
|
||||
, isAddedUser, isAddedDevice, isUpdatedDeviceOriginGroup, isChangedDates
|
||||
|
||||
if (err) {
|
||||
throw err
|
||||
}
|
||||
if (data.old_val === null) {
|
||||
sendGroupChange(
|
||||
data.new_val
|
||||
, data.new_val.users
|
||||
, false
|
||||
, false
|
||||
, false
|
||||
, []
|
||||
, false
|
||||
, []
|
||||
, 'created'
|
||||
)
|
||||
return sendGroupUsersChange(
|
||||
data.new_val
|
||||
, data.new_val.users
|
||||
, data.new_val.devices
|
||||
, true
|
||||
, 'GroupCreated'
|
||||
)
|
||||
}
|
||||
|
||||
if (data.new_val === null) {
|
||||
sendGroupChange(
|
||||
data.old_val
|
||||
, data.old_val.users
|
||||
, false
|
||||
, false
|
||||
, false
|
||||
, []
|
||||
, false
|
||||
, []
|
||||
, 'deleted'
|
||||
)
|
||||
|
||||
users = data.old_val.users
|
||||
devices = data.old_val.devices
|
||||
isChangedDates = false
|
||||
isActive = data.old_val.isActive
|
||||
isBecomeActive = isBecomeUnactive = false
|
||||
isAddedUser = isAddedDevice = false
|
||||
isUpdatedDeviceOriginGroup = false
|
||||
}
|
||||
else {
|
||||
users = _.xor(data.new_val.users, data.old_val.users)
|
||||
devices = _.xor(data.new_val.devices, data.old_val.devices)
|
||||
isChangedDates =
|
||||
data.old_val.dates.length !== data.new_val.dates.length ||
|
||||
data.old_val.dates[0].start.getTime() !==
|
||||
data.new_val.dates[0].start.getTime() ||
|
||||
data.old_val.dates[0].stop.getTime() !==
|
||||
data.new_val.dates[0].stop.getTime()
|
||||
isActive = data.new_val.isActive
|
||||
isBecomeActive = !data.old_val.isActive && data.new_val.isActive
|
||||
isBecomeUnactive = data.old_val.isActive && !data.new_val.isActive
|
||||
isAddedUser = data.new_val.users.length > data.old_val.users.length
|
||||
isAddedDevice = data.new_val.devices.length > data.old_val.devices.length
|
||||
isUpdatedDeviceOriginGroup =
|
||||
data.new_val.ticket !== null &&
|
||||
(data.old_val.ticket === null ||
|
||||
data.new_val.ticket.signature !== data.old_val.ticket.signature)
|
||||
|
||||
if (!isUpdatedDeviceOriginGroup) {
|
||||
sendGroupChange(
|
||||
data.new_val
|
||||
, _.union(data.old_val.users, data.new_val.users)
|
||||
, isChangedDates
|
||||
, data.old_val.class !== data.new_val.class
|
||||
, isAddedUser
|
||||
, users
|
||||
, isAddedDevice
|
||||
, devices
|
||||
, 'updated'
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
if (isUpdatedDeviceOriginGroup) {
|
||||
return doUpdateDeviceOriginGroup(data.new_val)
|
||||
}
|
||||
else if (isBecomeActive && data.new_val.devices.length) {
|
||||
return doUpdateDevicesCurrentGroup(data.new_val, data.new_val.devices)
|
||||
}
|
||||
else if (isBecomeUnactive && data.new_val.devices.length) {
|
||||
return doUpdateDevicesCurrentGroupFromOrigin(data.new_val.devices)
|
||||
}
|
||||
else if (devices.length && isActive && !apiutil.isOriginGroup(data.old_val.class)) {
|
||||
return treatGroupDevicesChange(data.old_val, data.new_val, devices, isAddedDevice)
|
||||
}
|
||||
else if (data.new_val === null) {
|
||||
return treatGroupDeletion(data.old_val)
|
||||
}
|
||||
else if (isChangedDates && isActive) {
|
||||
return doUpdateDevicesCurrentGroupDates(data.new_val)
|
||||
}
|
||||
else if (users.length) {
|
||||
return treatGroupUsersChange(data.old_val, users, isActive, isAddedUser)
|
||||
}
|
||||
return true
|
||||
})
|
||||
})
|
||||
.catch(function(err) {
|
||||
log.error('An error occured during GROUPS table watching', err.stack)
|
||||
})
|
||||
}
|
94
lib/units/groups-engine/watchers/users.js
Normal file
94
lib/units/groups-engine/watchers/users.js
Normal file
|
@ -0,0 +1,94 @@
|
|||
/**
|
||||
* Copyright © 2019 code initially contributed by Orange SA, authors: Denis Barbaron - Licensed under the Apache license 2.0
|
||||
**/
|
||||
|
||||
const timeutil = require('../../../util/timeutil')
|
||||
const r = require('rethinkdb')
|
||||
const _ = require('lodash')
|
||||
const logger = require('../../../util/logger')
|
||||
const wireutil = require('../../../wire/util')
|
||||
const wire = require('../../../wire')
|
||||
const db = require('../../../db')
|
||||
|
||||
module.exports = function(pushdev) {
|
||||
const log = logger.createLogger('watcher-users')
|
||||
|
||||
function sendUserChange(user, isAddedGroup, groups, action, targets) {
|
||||
pushdev.send([
|
||||
wireutil.global
|
||||
, wireutil.envelope(
|
||||
new wire.UserChangeMessage(
|
||||
user
|
||||
, isAddedGroup
|
||||
, groups
|
||||
, action
|
||||
, targets
|
||||
, timeutil.now('nano')))
|
||||
])
|
||||
}
|
||||
|
||||
db.run(r
|
||||
.table('users')
|
||||
.pluck(
|
||||
'email'
|
||||
, 'name'
|
||||
, 'privilege'
|
||||
, {groups: ['quotas', 'subscribed']
|
||||
})
|
||||
.changes(), function(err, cursor) {
|
||||
if (err) {
|
||||
throw err
|
||||
}
|
||||
return cursor
|
||||
})
|
||||
.then(function(cursor) {
|
||||
cursor.each(function(err, data) {
|
||||
if (err) {
|
||||
throw err
|
||||
}
|
||||
if (data.old_val === null) {
|
||||
sendUserChange(data.new_val, false, [], 'created', ['settings'])
|
||||
}
|
||||
else if (data.new_val === null) {
|
||||
sendUserChange(data.old_val, false, [], 'deleted', ['settings'])
|
||||
}
|
||||
else {
|
||||
const targets = []
|
||||
|
||||
if (!_.isEqual(
|
||||
data.new_val.groups.quotas.allocated
|
||||
, data.old_val.groups.quotas.allocated)) {
|
||||
targets.push('settings')
|
||||
targets.push('view')
|
||||
}
|
||||
else if (!_.isEqual(
|
||||
data.new_val.groups.quotas.consumed
|
||||
, data.old_val.groups.quotas.consumed)) {
|
||||
targets.push('view')
|
||||
}
|
||||
else if (data.new_val.groups.quotas.defaultGroupsNumber !==
|
||||
data.old_val.groups.quotas.defaultGroupsNumber ||
|
||||
data.new_val.groups.quotas.defaultGroupsDuration !==
|
||||
data.old_val.groups.quotas.defaultGroupsDuration ||
|
||||
data.new_val.groups.quotas.defaultGroupsRepetitions !==
|
||||
data.old_val.groups.quotas.defaultGroupsRepetitions ||
|
||||
data.new_val.groups.quotas.repetitions !==
|
||||
data.old_val.groups.quotas.repetitions ||
|
||||
!_.isEqual(data.new_val.groups.subscribed, data.old_val.groups.subscribed)) {
|
||||
targets.push('settings')
|
||||
}
|
||||
if (targets.length) {
|
||||
sendUserChange(
|
||||
data.new_val
|
||||
, data.new_val.groups.subscribed.length > data.old_val.groups.subscribed.length
|
||||
, _.xor(data.new_val.groups.subscribed, data.old_val.groups.subscribed)
|
||||
, 'updated'
|
||||
, targets)
|
||||
}
|
||||
}
|
||||
})
|
||||
})
|
||||
.catch(function(err) {
|
||||
log.error('An error occured during USERS table watching', err.stack)
|
||||
})
|
||||
}
|
Loading…
Add table
Add a link
Reference in a new issue