summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
Diffstat (limited to 'daemon/ExternalSource.cpp')
-rw-r--r--daemon/ExternalSource.cpp177
1 files changed, 168 insertions, 9 deletions
diff --git a/daemon/ExternalSource.cpp b/daemon/ExternalSource.cpp
index fe5824b..b6ec301 100644
--- a/daemon/ExternalSource.cpp
+++ b/daemon/ExternalSource.cpp
@@ -8,41 +8,195 @@
8 8
9#include "ExternalSource.h" 9#include "ExternalSource.h"
10 10
11#include <fcntl.h>
11#include <sys/prctl.h> 12#include <sys/prctl.h>
13#include <unistd.h>
12 14
13#include "Logging.h" 15#include "Logging.h"
14#include "OlySocket.h" 16#include "OlySocket.h"
15#include "SessionData.h" 17#include "SessionData.h"
16 18
17ExternalSource::ExternalSource(sem_t *senderSem) : mBuffer(0, FRAME_EXTERNAL, 1024, senderSem), mSock("/tmp/gator") { 19static const char MALI_VIDEO[] = "\0mali-video";
20static const char MALI_VIDEO_STARTUP[] = "\0mali-video-startup";
21static const char MALI_VIDEO_V1[] = "MALI_VIDEO 1\n";
22
23static bool setNonblock(const int fd) {
24 int flags;
25
26 flags = fcntl(fd, F_GETFL);
27 if (flags < 0) {
28 logg->logMessage("fcntl getfl failed");
29 return false;
30 }
31
32 if (fcntl(fd, F_SETFL, flags | O_NONBLOCK) != 0) {
33 logg->logMessage("fcntl setfl failed");
34 return false;
35 }
36
37 return true;
38}
39
40ExternalSource::ExternalSource(sem_t *senderSem) : mBuffer(0, FRAME_EXTERNAL, 128*1024, senderSem), mMonitor(), mMveStartupUds(MALI_VIDEO_STARTUP, sizeof(MALI_VIDEO_STARTUP)), mInterruptFd(-1), mMveUds(-1) {
41 sem_init(&mBufferSem, 0, 0);
18} 42}
19 43
20ExternalSource::~ExternalSource() { 44ExternalSource::~ExternalSource() {
21} 45}
22 46
47void ExternalSource::waitFor(const uint64_t currTime, const int bytes) {
48 while (mBuffer.bytesAvailable() <= bytes) {
49 mBuffer.check(currTime);
50 sem_wait(&mBufferSem);
51 }
52}
53
54void ExternalSource::configureConnection(const int fd, const char *const handshake, size_t size) {
55 if (!setNonblock(fd)) {
56 logg->logError(__FILE__, __LINE__, "Unable to set nonblock on fh");
57 handleException();
58 }
59
60 if (!mMonitor.add(fd)) {
61 logg->logError(__FILE__, __LINE__, "Unable to add fh to monitor");
62 handleException();
63 }
64
65 // Write the handshake to the circular buffer
66 waitFor(1, Buffer::MAXSIZE_PACK32 + 4 + size - 1);
67 mBuffer.packInt(fd);
68 mBuffer.writeLEInt((unsigned char *)mBuffer.getWritePos(), size - 1);
69 mBuffer.advanceWrite(4);
70 mBuffer.writeBytes(handshake, size - 1);
71}
72
73bool ExternalSource::connectMve() {
74 if (!gSessionData->maliVideo.countersEnabled()) {
75 return true;
76 }
77
78 mMveUds = OlySocket::connect(MALI_VIDEO, sizeof(MALI_VIDEO));
79 if (mMveUds < 0) {
80 return false;
81 }
82
83 if (!gSessionData->maliVideo.start(mMveUds)) {
84 return false;
85 }
86
87 configureConnection(mMveUds, MALI_VIDEO_V1, sizeof(MALI_VIDEO_V1));
88
89 return true;
90}
91
23bool ExternalSource::prepare() { 92bool ExternalSource::prepare() {
93 if (!mMonitor.init() || !setNonblock(mMveStartupUds.getFd()) || !mMonitor.add(mMveStartupUds.getFd())) {
94 return false;
95 }
96
97 connectMve();
98
24 return true; 99 return true;
25} 100}
26 101
27void ExternalSource::run() { 102void ExternalSource::run() {
28 prctl(PR_SET_NAME, (unsigned long)&"gatord-uds", 0, 0, 0); 103 int pipefd[2];
104
105 prctl(PR_SET_NAME, (unsigned long)&"gatord-external", 0, 0, 0);
106
107 if (pipe(pipefd) != 0) {
108 logg->logError(__FILE__, __LINE__, "pipe failed");
109 handleException();
110 }
111 mInterruptFd = pipefd[1];
112
113 if (!mMonitor.add(pipefd[0])) {
114 logg->logError(__FILE__, __LINE__, "Monitor::add failed");
115 handleException();
116 }
29 117
30 while (gSessionData->mSessionIsActive) { 118 while (gSessionData->mSessionIsActive) {
31 // Will be aborted when the socket is closed at the end of the capture 119 struct epoll_event events[16];
32 int length = mSock.receive(mBuffer.getWritePos(), mBuffer.contiguousSpaceAvailable()); 120 // Clear any pending sem posts
33 if (length <= 0) { 121 while (sem_trywait(&mBufferSem) == 0);
34 break; 122 int ready = mMonitor.wait(events, ARRAY_LENGTH(events), -1);
123 if (ready < 0) {
124 logg->logError(__FILE__, __LINE__, "Monitor::wait failed");
125 handleException();
35 } 126 }
36 127
37 mBuffer.advanceWrite(length); 128 const uint64_t currTime = getTime();
38 mBuffer.check(0); 129
130 for (int i = 0; i < ready; ++i) {
131 const int fd = events[i].data.fd;
132 if (fd == mMveStartupUds.getFd()) {
133 // Mali Video Engine says it's alive
134 int client = mMveStartupUds.acceptConnection();
135 // Don't read from this connection, establish a new connection to Mali-V500
136 close(client);
137 if (!connectMve()) {
138 logg->logError(__FILE__, __LINE__, "Unable to configure incoming Mali video connection");
139 handleException();
140 }
141 } else if (fd == pipefd[0]) {
142 // Means interrupt has been called and mSessionIsActive should be reread
143 } else {
144 while (true) {
145 waitFor(currTime, Buffer::MAXSIZE_PACK32 + 4);
146
147 mBuffer.packInt(fd);
148 char *const bytesPos = mBuffer.getWritePos();
149 mBuffer.advanceWrite(4);
150 const int contiguous = mBuffer.contiguousSpaceAvailable();
151 const int bytes = read(fd, mBuffer.getWritePos(), contiguous);
152 if (bytes < 0) {
153 if (errno == EAGAIN) {
154 // Nothing left to read, and Buffer convention dictates that writePos can't go backwards
155 mBuffer.writeLEInt((unsigned char *)bytesPos, 0);
156 break;
157 }
158 // Something else failed, close the socket
159 mBuffer.writeLEInt((unsigned char *)bytesPos, -1);
160 close(fd);
161 break;
162 } else if (bytes == 0) {
163 // The other side is closed
164 mBuffer.writeLEInt((unsigned char *)bytesPos, -1);
165 close(fd);
166 break;
167 }
168
169 mBuffer.writeLEInt((unsigned char *)bytesPos, bytes);
170 mBuffer.advanceWrite(bytes);
171
172 // Short reads also mean nothing is left to read
173 if (bytes < contiguous) {
174 break;
175 }
176 }
177 }
178 }
179
180 // Only call mBufferCheck once per iteration
181 mBuffer.check(currTime);
39 } 182 }
40 183
41 mBuffer.setDone(); 184 mBuffer.setDone();
185
186 mInterruptFd = -1;
187 close(pipefd[0]);
188 close(pipefd[1]);
42} 189}
43 190
44void ExternalSource::interrupt() { 191void ExternalSource::interrupt() {
45 // Do nothing 192 if (mInterruptFd >= 0) {
193 int8_t c = 0;
194 // Write to the pipe to wake the monitor which will cause mSessionIsActive to be reread
195 if (::write(mInterruptFd, &c, sizeof(c)) != sizeof(c)) {
196 logg->logError(__FILE__, __LINE__, "write failed");
197 handleException();
198 }
199 }
46} 200}
47 201
48bool ExternalSource::isDone() { 202bool ExternalSource::isDone() {
@@ -50,7 +204,12 @@ bool ExternalSource::isDone() {
50} 204}
51 205
52void ExternalSource::write(Sender *sender) { 206void ExternalSource::write(Sender *sender) {
207 // Don't send external data until the summary packet is sent so that monotonic delta is available
208 if (!gSessionData->mSentSummary) {
209 return;
210 }
53 if (!mBuffer.isDone()) { 211 if (!mBuffer.isDone()) {
54 mBuffer.write(sender); 212 mBuffer.write(sender);
213 sem_post(&mBufferSem);
55 } 214 }
56} 215}