summaryrefslogtreecommitdiffstats
blob: 3a981a6427be01ebbe0aa525cbb8b65ebed91184 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
/**
 * Copyright (C) ARM Limited 2010-2014. All rights reserved.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 2 as
 * published by the Free Software Foundation.
 */

#include "Sender.h"

#include <stdlib.h>
#include <string.h>
#include <unistd.h>

#include "Buffer.h"
#include "Logging.h"
#include "OlySocket.h"
#include "SessionData.h"

Sender::Sender(OlySocket* socket) {
	mDataFile = NULL;
	mDataSocket = NULL;

	// Set up the socket connection
	if (socket) {
		char streamline[64] = {0};
		mDataSocket = socket;

		// Receive magic sequence - can wait forever
		// Streamline will send data prior to the magic sequence for legacy support, which should be ignored for v4+
		while (strcmp("STREAMLINE", streamline) != 0) {
			if (mDataSocket->receiveString(streamline, sizeof(streamline)) == -1) {
				logg->logError(__FILE__, __LINE__, "Socket disconnected");
				handleException();
			}
		}

		// Send magic sequence - must be done first, after which error messages can be sent
		char magic[32];
		snprintf(magic, 32, "GATOR %i\n", PROTOCOL_VERSION);
		mDataSocket->send(magic, strlen(magic));

		gSessionData->mWaitingOnCommand = true;
		logg->logMessage("Completed magic sequence");
	}

	pthread_mutex_init(&mSendMutex, NULL);
}

Sender::~Sender() {
	// Just close it as the client socket is on the stack
	if (mDataSocket != NULL) {
		mDataSocket->closeSocket();
		mDataSocket = NULL;
	}
	if (mDataFile != NULL) {
		fclose(mDataFile);
	}
}

void Sender::createDataFile(char* apcDir) {
	if (apcDir == NULL) {
		return;
	}

	mDataFileName = (char*)malloc(strlen(apcDir) + 12);
	sprintf(mDataFileName, "%s/0000000000", apcDir);
	mDataFile = fopen(mDataFileName, "wb");
	if (!mDataFile) {
		logg->logError(__FILE__, __LINE__, "Failed to open binary file: %s", mDataFileName);
		handleException();
	}
}

template<typename T>
inline T min(const T a, const T b) {
	return (a < b ? a : b);
}

void Sender::writeData(const char* data, int length, int type) {
	if (length < 0 || (data == NULL && length > 0)) {
		return;
	}

	// Multiple threads call writeData()
	pthread_mutex_lock(&mSendMutex);

	// Send data over the socket connection
	if (mDataSocket) {
		// Start alarm
		const int alarmDuration = 8;
		alarm(alarmDuration);

		// Send data over the socket, sending the type and size first
		logg->logMessage("Sending data with length %d", length);
		if (type != RESPONSE_APC_DATA) {
			// type and length already added by the Collector for apc data
			unsigned char header[5];
			header[0] = type;
			Buffer::writeLEInt(header + 1, length);
			mDataSocket->send((char*)&header, sizeof(header));
		}

		// 100Kbits/sec * alarmDuration sec / 8 bits/byte
		const int chunkSize = 100*1000 * alarmDuration / 8;
		int pos = 0;
		while (true) {
			mDataSocket->send((const char*)data + pos, min(length - pos, chunkSize));
			pos += chunkSize;
			if (pos >= length) {
				break;
			}

			// Reset the alarm
			alarm(alarmDuration);
			logg->logMessage("Resetting the alarm");
		}

		// Stop alarm
		alarm(0);
	}

	// Write data to disk as long as it is not meta data
	if (mDataFile && type == RESPONSE_APC_DATA) {
		logg->logMessage("Writing data with length %d", length);
		// Send data to the data file
		if (fwrite(data, 1, length, mDataFile) != (unsigned int)length) {
			logg->logError(__FILE__, __LINE__, "Failed writing binary file %s", mDataFileName);
			handleException();
		}
	}

	pthread_mutex_unlock(&mSendMutex);
}