diff options
Diffstat (limited to 'daemon/ExternalSource.cpp')
-rw-r--r-- | daemon/ExternalSource.cpp | 177 |
1 files changed, 168 insertions, 9 deletions
diff --git a/daemon/ExternalSource.cpp b/daemon/ExternalSource.cpp index fe5824b..b6ec301 100644 --- a/daemon/ExternalSource.cpp +++ b/daemon/ExternalSource.cpp | |||
@@ -8,41 +8,195 @@ | |||
8 | 8 | ||
9 | #include "ExternalSource.h" | 9 | #include "ExternalSource.h" |
10 | 10 | ||
11 | #include <fcntl.h> | ||
11 | #include <sys/prctl.h> | 12 | #include <sys/prctl.h> |
13 | #include <unistd.h> | ||
12 | 14 | ||
13 | #include "Logging.h" | 15 | #include "Logging.h" |
14 | #include "OlySocket.h" | 16 | #include "OlySocket.h" |
15 | #include "SessionData.h" | 17 | #include "SessionData.h" |
16 | 18 | ||
17 | ExternalSource::ExternalSource(sem_t *senderSem) : mBuffer(0, FRAME_EXTERNAL, 1024, senderSem), mSock("/tmp/gator") { | 19 | static const char MALI_VIDEO[] = "\0mali-video"; |
20 | static const char MALI_VIDEO_STARTUP[] = "\0mali-video-startup"; | ||
21 | static const char MALI_VIDEO_V1[] = "MALI_VIDEO 1\n"; | ||
22 | |||
23 | static bool setNonblock(const int fd) { | ||
24 | int flags; | ||
25 | |||
26 | flags = fcntl(fd, F_GETFL); | ||
27 | if (flags < 0) { | ||
28 | logg->logMessage("fcntl getfl failed"); | ||
29 | return false; | ||
30 | } | ||
31 | |||
32 | if (fcntl(fd, F_SETFL, flags | O_NONBLOCK) != 0) { | ||
33 | logg->logMessage("fcntl setfl failed"); | ||
34 | return false; | ||
35 | } | ||
36 | |||
37 | return true; | ||
38 | } | ||
39 | |||
40 | ExternalSource::ExternalSource(sem_t *senderSem) : mBuffer(0, FRAME_EXTERNAL, 128*1024, senderSem), mMonitor(), mMveStartupUds(MALI_VIDEO_STARTUP, sizeof(MALI_VIDEO_STARTUP)), mInterruptFd(-1), mMveUds(-1) { | ||
41 | sem_init(&mBufferSem, 0, 0); | ||
18 | } | 42 | } |
19 | 43 | ||
20 | ExternalSource::~ExternalSource() { | 44 | ExternalSource::~ExternalSource() { |
21 | } | 45 | } |
22 | 46 | ||
47 | void ExternalSource::waitFor(const uint64_t currTime, const int bytes) { | ||
48 | while (mBuffer.bytesAvailable() <= bytes) { | ||
49 | mBuffer.check(currTime); | ||
50 | sem_wait(&mBufferSem); | ||
51 | } | ||
52 | } | ||
53 | |||
54 | void ExternalSource::configureConnection(const int fd, const char *const handshake, size_t size) { | ||
55 | if (!setNonblock(fd)) { | ||
56 | logg->logError(__FILE__, __LINE__, "Unable to set nonblock on fh"); | ||
57 | handleException(); | ||
58 | } | ||
59 | |||
60 | if (!mMonitor.add(fd)) { | ||
61 | logg->logError(__FILE__, __LINE__, "Unable to add fh to monitor"); | ||
62 | handleException(); | ||
63 | } | ||
64 | |||
65 | // Write the handshake to the circular buffer | ||
66 | waitFor(1, Buffer::MAXSIZE_PACK32 + 4 + size - 1); | ||
67 | mBuffer.packInt(fd); | ||
68 | mBuffer.writeLEInt((unsigned char *)mBuffer.getWritePos(), size - 1); | ||
69 | mBuffer.advanceWrite(4); | ||
70 | mBuffer.writeBytes(handshake, size - 1); | ||
71 | } | ||
72 | |||
73 | bool ExternalSource::connectMve() { | ||
74 | if (!gSessionData->maliVideo.countersEnabled()) { | ||
75 | return true; | ||
76 | } | ||
77 | |||
78 | mMveUds = OlySocket::connect(MALI_VIDEO, sizeof(MALI_VIDEO)); | ||
79 | if (mMveUds < 0) { | ||
80 | return false; | ||
81 | } | ||
82 | |||
83 | if (!gSessionData->maliVideo.start(mMveUds)) { | ||
84 | return false; | ||
85 | } | ||
86 | |||
87 | configureConnection(mMveUds, MALI_VIDEO_V1, sizeof(MALI_VIDEO_V1)); | ||
88 | |||
89 | return true; | ||
90 | } | ||
91 | |||
23 | bool ExternalSource::prepare() { | 92 | bool ExternalSource::prepare() { |
93 | if (!mMonitor.init() || !setNonblock(mMveStartupUds.getFd()) || !mMonitor.add(mMveStartupUds.getFd())) { | ||
94 | return false; | ||
95 | } | ||
96 | |||
97 | connectMve(); | ||
98 | |||
24 | return true; | 99 | return true; |
25 | } | 100 | } |
26 | 101 | ||
27 | void ExternalSource::run() { | 102 | void ExternalSource::run() { |
28 | prctl(PR_SET_NAME, (unsigned long)&"gatord-uds", 0, 0, 0); | 103 | int pipefd[2]; |
104 | |||
105 | prctl(PR_SET_NAME, (unsigned long)&"gatord-external", 0, 0, 0); | ||
106 | |||
107 | if (pipe(pipefd) != 0) { | ||
108 | logg->logError(__FILE__, __LINE__, "pipe failed"); | ||
109 | handleException(); | ||
110 | } | ||
111 | mInterruptFd = pipefd[1]; | ||
112 | |||
113 | if (!mMonitor.add(pipefd[0])) { | ||
114 | logg->logError(__FILE__, __LINE__, "Monitor::add failed"); | ||
115 | handleException(); | ||
116 | } | ||
29 | 117 | ||
30 | while (gSessionData->mSessionIsActive) { | 118 | while (gSessionData->mSessionIsActive) { |
31 | // Will be aborted when the socket is closed at the end of the capture | 119 | struct epoll_event events[16]; |
32 | int length = mSock.receive(mBuffer.getWritePos(), mBuffer.contiguousSpaceAvailable()); | 120 | // Clear any pending sem posts |
33 | if (length <= 0) { | 121 | while (sem_trywait(&mBufferSem) == 0); |
34 | break; | 122 | int ready = mMonitor.wait(events, ARRAY_LENGTH(events), -1); |
123 | if (ready < 0) { | ||
124 | logg->logError(__FILE__, __LINE__, "Monitor::wait failed"); | ||
125 | handleException(); | ||
35 | } | 126 | } |
36 | 127 | ||
37 | mBuffer.advanceWrite(length); | 128 | const uint64_t currTime = getTime(); |
38 | mBuffer.check(0); | 129 | |
130 | for (int i = 0; i < ready; ++i) { | ||
131 | const int fd = events[i].data.fd; | ||
132 | if (fd == mMveStartupUds.getFd()) { | ||
133 | // Mali Video Engine says it's alive | ||
134 | int client = mMveStartupUds.acceptConnection(); | ||
135 | // Don't read from this connection, establish a new connection to Mali-V500 | ||
136 | close(client); | ||
137 | if (!connectMve()) { | ||
138 | logg->logError(__FILE__, __LINE__, "Unable to configure incoming Mali video connection"); | ||
139 | handleException(); | ||
140 | } | ||
141 | } else if (fd == pipefd[0]) { | ||
142 | // Means interrupt has been called and mSessionIsActive should be reread | ||
143 | } else { | ||
144 | while (true) { | ||
145 | waitFor(currTime, Buffer::MAXSIZE_PACK32 + 4); | ||
146 | |||
147 | mBuffer.packInt(fd); | ||
148 | char *const bytesPos = mBuffer.getWritePos(); | ||
149 | mBuffer.advanceWrite(4); | ||
150 | const int contiguous = mBuffer.contiguousSpaceAvailable(); | ||
151 | const int bytes = read(fd, mBuffer.getWritePos(), contiguous); | ||
152 | if (bytes < 0) { | ||
153 | if (errno == EAGAIN) { | ||
154 | // Nothing left to read, and Buffer convention dictates that writePos can't go backwards | ||
155 | mBuffer.writeLEInt((unsigned char *)bytesPos, 0); | ||
156 | break; | ||
157 | } | ||
158 | // Something else failed, close the socket | ||
159 | mBuffer.writeLEInt((unsigned char *)bytesPos, -1); | ||
160 | close(fd); | ||
161 | break; | ||
162 | } else if (bytes == 0) { | ||
163 | // The other side is closed | ||
164 | mBuffer.writeLEInt((unsigned char *)bytesPos, -1); | ||
165 | close(fd); | ||
166 | break; | ||
167 | } | ||
168 | |||
169 | mBuffer.writeLEInt((unsigned char *)bytesPos, bytes); | ||
170 | mBuffer.advanceWrite(bytes); | ||
171 | |||
172 | // Short reads also mean nothing is left to read | ||
173 | if (bytes < contiguous) { | ||
174 | break; | ||
175 | } | ||
176 | } | ||
177 | } | ||
178 | } | ||
179 | |||
180 | // Only call mBufferCheck once per iteration | ||
181 | mBuffer.check(currTime); | ||
39 | } | 182 | } |
40 | 183 | ||
41 | mBuffer.setDone(); | 184 | mBuffer.setDone(); |
185 | |||
186 | mInterruptFd = -1; | ||
187 | close(pipefd[0]); | ||
188 | close(pipefd[1]); | ||
42 | } | 189 | } |
43 | 190 | ||
44 | void ExternalSource::interrupt() { | 191 | void ExternalSource::interrupt() { |
45 | // Do nothing | 192 | if (mInterruptFd >= 0) { |
193 | int8_t c = 0; | ||
194 | // Write to the pipe to wake the monitor which will cause mSessionIsActive to be reread | ||
195 | if (::write(mInterruptFd, &c, sizeof(c)) != sizeof(c)) { | ||
196 | logg->logError(__FILE__, __LINE__, "write failed"); | ||
197 | handleException(); | ||
198 | } | ||
199 | } | ||
46 | } | 200 | } |
47 | 201 | ||
48 | bool ExternalSource::isDone() { | 202 | bool ExternalSource::isDone() { |
@@ -50,7 +204,12 @@ bool ExternalSource::isDone() { | |||
50 | } | 204 | } |
51 | 205 | ||
52 | void ExternalSource::write(Sender *sender) { | 206 | void ExternalSource::write(Sender *sender) { |
207 | // Don't send external data until the summary packet is sent so that monotonic delta is available | ||
208 | if (!gSessionData->mSentSummary) { | ||
209 | return; | ||
210 | } | ||
53 | if (!mBuffer.isDone()) { | 211 | if (!mBuffer.isDone()) { |
54 | mBuffer.write(sender); | 212 | mBuffer.write(sender); |
213 | sem_post(&mBufferSem); | ||
55 | } | 214 | } |
56 | } | 215 | } |