Windows execmd: basic execm (question/response) test ok

This commit is contained in:
Jean-Francois Dockes 2015-09-09 15:39:14 +02:00
parent 592c3919d1
commit 91984f5513
2 changed files with 83 additions and 44 deletions

View file

@ -357,10 +357,10 @@ bool ExecCmd::Internal::preparePipes(bool has_input,HANDLE *hChildInput,
// now same procedure for input pipe // now same procedure for input pipe
sa.bInheritHandle = FALSE; sa.bInheritHandle = FALSE;
HANDLE m_hInputWrite = CreateNamedPipe( m_hInputWrite = CreateNamedPipe(
TEXT("\\\\.\\pipe\\instreamPipe"), TEXT("\\\\.\\pipe\\instreamPipe"),
PIPE_ACCESS_OUTBOUND | FILE_FLAG_OVERLAPPED, PIPE_ACCESS_OUTBOUND | FILE_FLAG_OVERLAPPED,
PIPE_TYPE_MESSAGE | PIPE_READMODE_MESSAGE | PIPE_WAIT, PIPE_WAIT,
1, 4096, 4096, 0, &sa); 1, 4096, 4096, 0, &sa);
if (m_hInputWrite == INVALID_HANDLE_VALUE) { if (m_hInputWrite == INVALID_HANDLE_VALUE) {
printError("preparePipes: CreateNamedPipe(inputW)"); printError("preparePipes: CreateNamedPipe(inputW)");
@ -584,10 +584,10 @@ static WaitResult Wait(HANDLE hdl, int timeout)
{ {
//HANDLE hdls[2] = { hdl, eQuit }; //HANDLE hdls[2] = { hdl, eQuit };
HANDLE hdls[1] = { hdl}; HANDLE hdls[1] = { hdl};
LOGDEB0(("ExecCmd::Wait()\n")); LOGDEB1(("ExecCmd::Wait()\n"));
DWORD res = WaitForMultipleObjects(1, hdls, FALSE, timeout); DWORD res = WaitForMultipleObjects(1, hdls, FALSE, timeout);
if (res == WAIT_OBJECT_0) { if (res == WAIT_OBJECT_0) {
LOGDEB0(("ExecCmd::Wait: returning Ok\n")); LOGDEB1(("ExecCmd::Wait: returning Ok\n"));
return Ok; return Ok;
} else if (res == (WAIT_OBJECT_0 + 1)) { } else if (res == (WAIT_OBJECT_0 + 1)) {
LOGDEB0(("ExecCmd::Wait: returning Quit\n")); LOGDEB0(("ExecCmd::Wait: returning Quit\n"));
@ -604,11 +604,9 @@ static WaitResult Wait(HANDLE hdl, int timeout)
// Send data to the child. // Send data to the child.
int ExecCmd::send(const string& data) int ExecCmd::send(const string& data)
{ {
DWORD dwWritten; LOGDEB2(("ExecCmd::send: cnt %d\n", int(data.size())));
BOOL bSuccess = false; BOOL bSuccess = WriteFile(m->m_hInputWrite, data.c_str(),
(DWORD)data.size(), NULL, &m->m_oInputWrite);
bSuccess = WriteFile(m->m_hInputWrite, data.c_str(), (DWORD)data.size(),
NULL, &m->m_oInputWrite);
DWORD err = GetLastError(); DWORD err = GetLastError();
// TODO: some more decision, either the operation completes immediately // TODO: some more decision, either the operation completes immediately
@ -621,10 +619,12 @@ int ExecCmd::send(const string& data)
} }
WaitResult waitRes = Wait(m->m_oInputWrite.hEvent, m->m_timeoutMs); WaitResult waitRes = Wait(m->m_oInputWrite.hEvent, m->m_timeoutMs);
DWORD dwWritten;
if (waitRes == Ok) { if (waitRes == Ok) {
if (!GetOverlappedResult(m->m_hInputWrite, if (!GetOverlappedResult(m->m_hInputWrite,
&m->m_oInputWrite, &dwWritten, TRUE)) { &m->m_oInputWrite, &dwWritten, TRUE)) {
err = GetLastError(); err = GetLastError();
LOGERR(("ExecCmd::send: GetOverLappedResult: err %d\n", err));
return -1; return -1;
} }
} else if (waitRes == Quit) { } else if (waitRes == Quit) {
@ -640,6 +640,7 @@ int ExecCmd::send(const string& data)
} }
return -1; return -1;
} }
LOGDEB2(("ExecCmd::send: returning %d\n", int(dwWritten)));
return dwWritten; return dwWritten;
} }
@ -651,10 +652,12 @@ int ExecCmd::send(const string& data)
// and write to cout in this programme // and write to cout in this programme
// Stop when there is no more data. // Stop when there is no more data.
// @arg cnt count to read, -1 means read to end of data. // @arg cnt count to read, -1 means read to end of data.
// 0 means read whatever comes back on the first read;
int ExecCmd::receive(string& data, int cnt) int ExecCmd::receive(string& data, int cnt)
{ {
LOGDEB1(("ExecCmd::receive: cnt %d\n", cnt));
int totread = 0; int totread = 0;
LOGDEB(("ExecCmd::receive: cnt %d\n", cnt));
// If there is buffered data, use it (remains from a previous getline()) // If there is buffered data, use it (remains from a previous getline())
if (m->m_bufoffs < m->m_buf.size()) { if (m->m_bufoffs < m->m_buf.size()) {
@ -663,7 +666,7 @@ int ExecCmd::receive(string& data, int cnt)
data.append(m->m_buf, m->m_bufoffs, toread); data.append(m->m_buf, m->m_bufoffs, toread);
m->m_bufoffs += toread; m->m_bufoffs += toread;
totread += toread; totread += toread;
if (cnt > 0 && totread == cnt) { if (cnt == 0 || (cnt > 0 && totread == cnt)) {
return cnt; return cnt;
} }
} }
@ -699,7 +702,7 @@ int ExecCmd::receive(string& data, int cnt)
data.append(chBuf, dwRead); data.append(chBuf, dwRead);
if (m->m_advise) if (m->m_advise)
m->m_advise->newData(dwRead); m->m_advise->newData(dwRead);
LOGDEB(("ExecCmd::receive: got %d bytes\n", int(dwRead))); LOGDEB1(("ExecCmd::recv: ReadFile: %d bytes\n", int(dwRead)));
} }
} else if (waitRes == Quit) { } else if (waitRes == Quit) {
if (!CancelIo(m->m_hOutputRead)) { if (!CancelIo(m->m_hOutputRead)) {
@ -707,8 +710,9 @@ int ExecCmd::receive(string& data, int cnt)
} }
break; break;
} else if (waitRes == Timeout) { } else if (waitRes == Timeout) {
// We only want to cancel if m_advise says so here. Is the io still // We only want to cancel if m_advise says so here. Is the
// valid at this point ? Should we catch a possible exception to CancelIo? // io still valid at this point ? Should we catch a
// possible exception to CancelIo?
if (m->m_advise) if (m->m_advise)
m->m_advise->newData(0); m->m_advise->newData(0);
if (m->m_killRequest) { if (m->m_killRequest) {
@ -719,7 +723,10 @@ int ExecCmd::receive(string& data, int cnt)
break; break;
} }
} }
if (cnt == 0)
break;
} }
LOGDEB1(("ExecCmd::receive: returning %d bytes\n", totread));
return totread; return totread;
} }
@ -728,7 +735,7 @@ int ExecCmd::getline(string& data)
LOGDEB2(("ExecCmd::getline: cnt %d, timeo %d\n", cnt, timeo)); LOGDEB2(("ExecCmd::getline: cnt %d, timeo %d\n", cnt, timeo));
data.erase(); data.erase();
if (m->m_buf.empty()) { if (m->m_buf.empty()) {
m->m_buf.resize(4096); m->m_buf.reserve(4096);
m->m_bufoffs = 0; m->m_bufoffs = 0;
} }
@ -740,21 +747,27 @@ int ExecCmd::getline(string& data)
for (; nn > 0;) { for (; nn > 0;) {
nn--; nn--;
char c = m->m_buf[m->m_bufoffs++]; char c = m->m_buf[m->m_bufoffs++];
if (c == '\r')
continue;
data += c; data += c;
if (c == '\n') { if (c == '\n') {
foundnl = true; foundnl = true;
break; break;
} }
} }
if (foundnl)
if (foundnl) {
LOGDEB2(("ExecCmd::getline: ret: [%s]\n", data.c_str()));
return int(data.size()); return int(data.size());
}
// Read more // Read more
m->m_buf.erase(); m->m_buf.erase();
if (receive(m->m_buf) < 0) { if (receive(m->m_buf, 0) < 0) {
return -1; return -1;
} }
if (m->m_buf.empty()) { if (m->m_buf.empty()) {
LOGDEB(("ExecCmd::getline: eof? ret: [%s]\n", data.c_str()));
return int(data.size()); return int(data.size());
} }
m->m_bufoffs = 0; m->m_bufoffs = 0;
@ -789,7 +802,30 @@ int ExecCmd::doexec(const string &cmd, const vector<string>& args,
return -1; return -1;
} }
if (input) { if (input) {
send(*input); if (!input->empty()) {
if (send(*input) != input->size()) {
LOGERR(("ExecCmd::doexec: send failed\n"));
CloseHandle(m->m_hInputWrite);
m->m_hInputWrite = NULL;
return wait();
}
}
if (m->m_provide) {
for (;;) {
m->m_provide->newData();
if (input->empty()) {
CloseHandle(m->m_hInputWrite);
m->m_hInputWrite = NULL;
break;
}
if (send(*input) != input->size()) {
LOGERR(("ExecCmd::doexec: send failed\n"));
CloseHandle(m->m_hInputWrite);
m->m_hInputWrite = NULL;
break;
}
}
}
} else if (output) { } else if (output) {
receive(*output); receive(*output);
} }

View file

@ -19,10 +19,11 @@
using namespace std; using namespace std;
// Testing the rclexecm protocol outside of recoll. Here we use the // Testing the rclexecm protocol outside of recoll. Here we use the
// rcldoc.py filter, you can try with rclaudio too, adjust the file arg // rcldoc.py filter, you can try with rclaudio too, adjust the file
// accordingly // arg accordingly. This simplified driver only really works with
// single-doc files (else it extracts only the first doc, usually the
// empty self-doc).
bool exercise_mhexecm(const string& cmdstr, const string& mimetype, bool exercise_mhexecm(const string& cmdstr, const string& mimetype,
vector<string>& files) vector<string>& files)
{ {
@ -171,25 +172,26 @@ public:
// Data provider, used if the -i flag is set // Data provider, used if the -i flag is set
class MEPv : public ExecCmdProvide { class MEPv : public ExecCmdProvide {
public: public:
FILE *m_fp;
string *m_input; string *m_input;
int m_cnt;
MEPv(string *i) MEPv(string *i)
: m_input(i) : m_input(i), m_cnt(0) {
{
m_fp = fopen("/etc/group", "r");
} }
~MEPv() { ~MEPv() {
if (m_fp)
fclose(m_fp);
} }
void newData() { void newData() {
char line[1024]; if (m_cnt++ < 10) {
if (m_fp && fgets(line, 1024, m_fp)) { char num[30];
m_input->assign((const char *)line); sprintf(num, "%d", m_cnt);
*m_input = string("This is an input chunk ") + string(num) +
string("\n");
} else { } else {
m_input->erase(); m_input->erase();
} }
} }
void reset() {
m_cnt = 0;
}
}; };
@ -249,7 +251,7 @@ int main(int argc, char *argv[])
l.push_back(*argv++); argc--; l.push_back(*argv++); argc--;
} }
DebugLog::getdbl()->setloglevel(DEBINFO); DebugLog::getdbl()->setloglevel(DEBDEB1);
DebugLog::setfilename("stderr"); DebugLog::setfilename("stderr");
#if 0 #if 0
signal(SIGPIPE, SIG_IGN); signal(SIGPIPE, SIG_IGN);
@ -302,8 +304,9 @@ int main(int argc, char *argv[])
} }
int status = -1; int status = -1;
for (int i=0;i < 10000; i++) { for (int i = 0; i < 1; i++) {
output.clear(); output.clear();
pv.reset();
try { try {
status = mexec.doexec(arg1, l, ip, op); status = mexec.doexec(arg1, l, ip, op);
} catch (CancelExcept) { } catch (CancelExcept) {
@ -312,8 +315,8 @@ int main(int argc, char *argv[])
//fprintf(stderr, "Status: 0x%x\n", status); //fprintf(stderr, "Status: 0x%x\n", status);
if (op_flags & OPT_o) { if (op_flags & OPT_o) {
//cout << "data received: [" << output << "]\n"; //cout << "data received: [" << output << "]\n";
cerr << "status " << status << " bytes received " << cerr << "iter " << i << " status " <<
output.size() << endl; status << " bytes received " << output.size() << endl;
} }
if (status) if (status)
break; break;