1
0
Fork 0
mirror of https://github.com/openstf/stf synced 2025-10-05 10:39:25 +02:00

Allow all pending transaction results to be cancelled.

This commit is contained in:
Simo Kinnunen 2014-04-15 17:33:22 +09:00
parent f46540e51e
commit b0400130ff
2 changed files with 32 additions and 7 deletions

View file

@ -478,13 +478,12 @@ module.exports = function(options) {
} }
}) })
.catch(function(err) { .catch(function(err) {
var reply = wireutil.reply('storage')
log.error('Storage upload had an error', err.stack) log.error('Storage upload had an error', err.stack)
leaveChannel(responseChannel) leaveChannel(responseChannel)
push.send([ socket.emit('tx.cancel', responseChannel, {
channel success: false
, reply.fail('fail') , data: 'fail_upload'
]) })
}) })
}) })
.on('forward.test', function(channel, responseChannel, data) { .on('forward.test', function(channel, responseChannel, data) {

View file

@ -25,8 +25,17 @@ module.exports = function TransactionServiceFactory(socket) {
} }
} }
function cancelListener(someChannel, data) {
if (someChannel === channel) {
Object.keys(pending).forEach(function(source) {
pending[source].cancel(data)
})
}
}
socket.on('tx.done', doneListener) socket.on('tx.done', doneListener)
socket.on('tx.progress', progressListener) socket.on('tx.progress', progressListener)
socket.on('tx.cancel', cancelListener)
this.channel = channel this.channel = channel
this.results = results this.results = results
@ -40,6 +49,7 @@ module.exports = function TransactionServiceFactory(socket) {
.finally(function() { .finally(function() {
socket.removeListener('tx.done', doneListener) socket.removeListener('tx.done', doneListener)
socket.removeListener('tx.progress', progressListener) socket.removeListener('tx.progress', progressListener)
socket.removeListener('tx.cancel', cancelListener)
socket.emit('tx.cleanup', channel) socket.emit('tx.cleanup', channel)
}) })
.progressed(function() { .progressed(function() {
@ -67,8 +77,15 @@ module.exports = function TransactionServiceFactory(socket) {
} }
} }
function cancelListener(someChannel, data) {
if (someChannel === channel) {
pending.cancel(data)
}
}
socket.on('tx.done', doneListener) socket.on('tx.done', doneListener)
socket.on('tx.progress', progressListener) socket.on('tx.progress', progressListener)
socket.on('tx.cancel', cancelListener)
this.channel = channel this.channel = channel
this.result = result this.result = result
@ -77,6 +94,7 @@ module.exports = function TransactionServiceFactory(socket) {
.finally(function() { .finally(function() {
socket.removeListener('tx.done', doneListener) socket.removeListener('tx.done', doneListener)
socket.removeListener('tx.progress', progressListener) socket.removeListener('tx.progress', progressListener)
socket.removeListener('tx.cancel', cancelListener)
socket.emit('tx.cleanup', channel) socket.emit('tx.cleanup', channel)
}) })
.progressed(function() { .progressed(function() {
@ -90,7 +108,7 @@ module.exports = function TransactionServiceFactory(socket) {
function PendingTransactionResult(result) { function PendingTransactionResult(result) {
var resolver = Promise.defer() var resolver = Promise.defer()
, seq = 0 , seq = 0
, last = null , last = Infinity
, unplaced = [] , unplaced = []
resolver.promise.finally(function() { resolver.promise.finally(function() {
@ -102,7 +120,7 @@ module.exports = function TransactionServiceFactory(socket) {
var message var message
, foundAny = false , foundAny = false
while ((message = unplaced[seq])) { while (seq <= last && (message = unplaced[seq])) {
unplaced[seq] = void 0 unplaced[seq] = void 0
if (seq === last) { if (seq === last) {
@ -149,6 +167,14 @@ module.exports = function TransactionServiceFactory(socket) {
readQueue() readQueue()
} }
this.cancel = function(message) {
if (!result.settled) {
last = message.seq = seq
unplaced[message.seq] = message
readQueue()
}
}
this.result = result this.result = result
this.promise = resolver.promise this.promise = resolver.promise
} }