Analysing network traffic for your ALB, using Elasticsearch and Lambda
At my job, I used my 10% hack time to build a system to analyse network traffic.
I know there are tools out there like beats, which can do something similar, but I wanted something that I could run serverless, and invoke using a cron job or something similar.
At the time we had no form of logging in place, and a lot of outages, so it made sense to have some way to visualise traffic with something like Elasticsearch.
Here is the code I used to process the network logs, which are stored in S3 automatically by the ALB. I’ll go into how this is triggered further down.
/// <summary>
/// Lambda entry point. Lists gzipped ALB access-log objects from the
/// "prod-network-logging" bucket, decompresses and parses each one, pushes the
/// parsed entries to Elasticsearch, then deletes the processed S3 object.
/// </summary>
public class Function
{
    private readonly IAmazonS3 _s3Client;
    private readonly IElasticSearchAdapter _elasticSearchAdapter;
    private readonly IParser _parser;

    /// <summary>Default constructor used by the Lambda runtime.</summary>
    public Function() : this(new AmazonS3Client())
    {
    }

    /// <summary>Test constructor allowing the S3 client to be injected.</summary>
    public Function(IAmazonS3 s3Client)
    {
        _s3Client = s3Client;
        _elasticSearchAdapter = new ElasticSearchAdapter();
        _parser = new Parser();
    }

    /// <summary>
    /// Processes every log object currently in the bucket. Async all the way:
    /// the previous version blocked on every call with GetAwaiter().GetResult(),
    /// which risks thread-pool starvation for no benefit — Lambda supports
    /// async Task handlers with the same handler string.
    /// </summary>
    public async Task FunctionHandler(ILambdaContext context)
    {
        // NOTE(review): ListObjectsAsync returns at most 1000 keys per call;
        // any remainder is picked up by the next scheduled invocation.
        var logS3Objects = await _s3Client.ListObjectsAsync("prod-network-logging");
        foreach (var s3Object in logS3Objects.S3Objects)
        {
            try
            {
                using (var response = await _s3Client.GetObjectStreamAsync(s3Object.BucketName, s3Object.Key, null))
                {
                    var dataBuffer = new byte[4096];
                    using (var gzipStream = new GZipInputStream(response))
                    using (var decodedStream = new MemoryStream())
                    {
                        StreamUtils.Copy(gzipStream, decodedStream, dataBuffer);
                        decodedStream.Position = 0;
                        using (var reader = new StreamReader(decodedStream))
                        {
                            var lines = new List<string>();
                            while (!reader.EndOfStream)
                                lines.Add(await reader.ReadLineAsync());
                            Console.WriteLine("There looks like " + lines.Count + " log entries to be processed.");
                            // Materialise before indexing so the count reported below is real.
                            var data = lines.Select(x => _parser.Parse(x)).ToList();
                            _elasticSearchAdapter.CreateElasticRecord(data);
                            // BUG FIX: previously logged a hard-coded count of 1
                            // ("var count = 1") regardless of how many records were written.
                            Console.WriteLine("Wrote " + data.Count + " records to elasticsearch.");
                            // Delete only after the batch has been handed to Elasticsearch,
                            // so a failure above leaves the object for the next run.
                            await _s3Client.DeleteObjectAsync(s3Object.BucketName, s3Object.Key);
                        }
                    }
                }
            }
            catch (Exception e)
            {
                // Log and continue: one bad object must not block the rest of the bucket.
                context.Logger.LogLine(e.Message);
                context.Logger.LogLine(e.StackTrace);
            }
        }
    }
}
And the implementation of the ElasticSearch adapter:
/// <summary>
/// Writes parsed log entries to an Elasticsearch domain (endpoint taken from the
/// ElasticSearchEndpoint environment variable) via the _bulk API, in batches of 100.
/// </summary>
public class ElasticSearchAdapter : IElasticSearchAdapter
{
    // Single long-lived HttpClient: the adapter itself is constructed once per
    // Lambda container, so this avoids socket exhaustion from per-request clients.
    private readonly HttpClient _esClient;

    public ElasticSearchAdapter()
    {
        var endpoint = Environment.GetEnvironmentVariable("ElasticSearchEndpoint");
        _esClient = new HttpClient {BaseAddress = new Uri("https://" + endpoint)};
    }

    /// <summary>
    /// Indexes <paramref name="data"/> into a daily index ("logs-yyyy-MM-dd"),
    /// one _bulk request per batch of 100 documents.
    /// </summary>
    public void CreateElasticRecord(IEnumerable<NetworkLog> data)
    {
        // BUG FIX: the old code formatted DateTime.Now.Year with "00", which only
        // produced "2019" by accident ("00" means *minimum* two digits). Use an
        // explicit invariant yyyy-MM-dd format, and UtcNow rather than the
        // container's local time for a machine-readable index name. Hoisted out
        // of the loop: all batches of one invocation share the same index.
        var index = $"logs-{DateTime.UtcNow:yyyy-MM-dd}";
        var uri = new Uri("_bulk", UriKind.Relative);
        foreach (var batch in data.Batch(100))
        {
            // StringBuilder instead of string += in a loop (quadratic allocation).
            var messageBody = new StringBuilder();
            foreach (var item in batch)
            {
                messageBody.Append("{\"create\":{\"_index\":\"" + index + "\",\"_type\":\"network_log\",\"_id\":\"" + Guid.NewGuid() + "\"}}\r\n");
                messageBody.Append(JsonConvert.SerializeObject(item));
                messageBody.Append("\r\n");
            }
            var messageRequest = new HttpRequestMessage(HttpMethod.Post, uri)
            {
                Content = new StringContent(messageBody.ToString(), Encoding.UTF8, "application/json")
            };
            var result = _esClient.SendAsync(messageRequest).GetAwaiter().GetResult();
            // Any non-2xx status is suspicious, not just non-200.
            // NOTE(review): a 200 _bulk response can still contain per-item
            // errors in its body — consider inspecting "errors" in the payload.
            if (!result.IsSuccessStatusCode)
                Console.WriteLine("Something didn't look quite right: " + result.StatusCode);
        }
    }
}
Class representation of the log entry
/// <summary>
/// One parsed ALB access-log entry. Property names are deliberately lower
/// camelCase: JsonConvert serialises them verbatim into the Elasticsearch
/// documents, so renaming them would change the index field names.
/// </summary>
public class NetworkLog
{
    // Request identity
    public string type { get; set; }
    public DateTime requestTime { get; set; }
    public string resourceId { get; set; }

    // Endpoints ("-" / 0 when the target side is absent)
    public string internalIp { get; set; }
    public int internalPort { get; set; }
    public string externalIp { get; set; }
    public int externalPort { get; set; }

    // Timings (seconds) and outcome
    public decimal processingTime { get; set; }
    public decimal targetProcessingTime { get; set; }
    public decimal responseProcessingTime { get; set; }
    public int statusCode { get; set; }
    public int targetStatusCode { get; set; }

    // Payload sizes and request detail
    public long receivedBytes { get; set; }
    public long sendBytes { get; set; }
    public string request { get; set; }
    public string userAgent { get; set; }

    // TLS, tracing and routing
    public string sslCipher { get; set; }
    public string sslProtocol { get; set; }
    public string traceId { get; set; }
    public string domainName { get; set; }
    public int ruleId { get; set; }
}
And finally the parser.
public class Parser : IParser
{
    private readonly IParsingIntelligenceLayer _intelligentStats;

    public Parser()
    {
        _intelligentStats = new ParsingIntelligenceLayer();
    }

    /// <summary>
    /// Parses one ALB access-log line into a <see cref="NetworkLog"/> by
    /// consuming fields left to right. Returns null for blank or
    /// leading-space lines; logs the offending line and rethrows on any
    /// malformed input so the caller can decide what to do.
    /// </summary>
    public NetworkLog Parse(string logEntry)
    {
        var originalLog = logEntry;

        // Consumes the next delimiter-terminated token from the front of the
        // line (token plus the delimiter itself) and returns the token.
        // BUG FIX: the old code advanced the cursor by the *parsed value's*
        // ToString().Length, which desyncs on tokens with leading zeros
        // (e.g. port "080" parses to 80, length 2 instead of 3). Always
        // advance by the raw token length instead.
        string TakeToken(char delimiter)
        {
            var token = logEntry.Split(delimiter).First();
            logEntry = logEntry.Remove(0, token.Length + 1);
            return token;
        }

        // Consumes a quoted field: drops the opening quote, reads up to the
        // closing separator, then drops the closing quote and trailing space.
        string TakeQuoted(string closingSeparator)
        {
            logEntry = logEntry.Remove(0, 1);
            var value = logEntry.Split(closingSeparator).First();
            logEntry = logEntry.Remove(0, value.Length + 2);
            return value;
        }

        try
        {
            // IsNullOrWhiteSpace already covers null and all-whitespace, so the
            // old null / Trim() == "" checks were redundant.
            if (string.IsNullOrWhiteSpace(logEntry) || logEntry.StartsWith(" "))
                return null;

            var log = new NetworkLog();

            //type
            log.type = TakeToken(' ');

            //timestamp - ISO-8601 with a trailing 'Z', hence the IndexOf('Z') skip
            DateTime.TryParse(logEntry.Split(' ').First(), CultureInfo.InvariantCulture, DateTimeStyles.RoundtripKind, out var timestamp);
            log.requestTime = timestamp;
            logEntry = logEntry.Remove(0, logEntry.IndexOf('Z') + 2);

            //resourceId
            log.resourceId = TakeToken(' ');

            //externalIp:port
            // BUG FIX: all numeric parses now use InvariantCulture — ALB logs are
            // machine-generated, so the container's locale (e.g. comma decimal
            // separators) must not change how they are read.
            log.externalIp = TakeToken(':');
            log.externalPort = int.Parse(TakeToken(' '), CultureInfo.InvariantCulture);

            //internalIp - might come through as -
            if (logEntry.StartsWith("-"))
            {
                log.internalIp = "-";
                log.internalPort = 0;
                logEntry = logEntry.Remove(0, 2);
            }
            else
            {
                log.internalIp = TakeToken(':');
                log.internalPort = int.Parse(TakeToken(' '), CultureInfo.InvariantCulture);
            }

            //processing times (seconds)
            log.processingTime = decimal.Parse(TakeToken(' '), CultureInfo.InvariantCulture);
            log.targetProcessingTime = decimal.Parse(TakeToken(' '), CultureInfo.InvariantCulture);
            log.responseProcessingTime = decimal.Parse(TakeToken(' '), CultureInfo.InvariantCulture);

            //statusCode
            log.statusCode = int.Parse(TakeToken(' '), CultureInfo.InvariantCulture);

            //targetStatusCode - might come through as -
            if (logEntry.StartsWith("-"))
            {
                log.targetStatusCode = 0;
                logEntry = logEntry.Remove(0, 2);
            }
            else
            {
                log.targetStatusCode = int.Parse(TakeToken(' '), CultureInfo.InvariantCulture);
            }

            //byte counts
            log.receivedBytes = long.Parse(TakeToken(' '), CultureInfo.InvariantCulture);
            log.sendBytes = long.Parse(TakeToken(' '), CultureInfo.InvariantCulture);

            //quoted fields
            log.request = TakeQuoted("\" \"");
            log.userAgent = TakeQuoted("\" ");

            //TLS details
            log.sslCipher = TakeToken(' ');
            log.sslProtocol = TakeToken(' ');

            //targetGroupArn - consumed but not persisted
            TakeToken(' ');

            //traceId and domainName
            log.traceId = TakeQuoted("\" \"");
            log.domainName = TakeQuoted("\" \"");

            //certArn - consumed but not persisted
            TakeQuoted("\" ");

            //ruleId - might come through as -
            if (logEntry.StartsWith("-"))
            {
                log.ruleId = -1;
                logEntry = logEntry.Remove(0, 2);
            }
            else
            {
                log.ruleId = int.Parse(TakeToken(' '), CultureInfo.InvariantCulture);
            }

            return _intelligentStats.Improve(log);
        }
        catch (Exception ex)
        {
            Console.WriteLine("Unable to parse " + originalLog);
            Console.WriteLine(ex);
            throw;
        }
    }
}
Then the Terraform to deploy it (some parts have been removed / modified for security reasons).
Elasticsearch cluster
# Elasticsearch 6.3 domain that receives the parsed ALB log documents.
resource "aws_elasticsearch_domain" "network-logging" {
domain_name = "network-logging"
elasticsearch_version = "6.3"
# Single small node, no dedicated master, single AZ: cheap, but no HA —
# losing this one instance loses the cluster.
cluster_config {
instance_type = "t2.small.elasticsearch"
instance_count = 1
dedicated_master_enabled = false
zone_awareness_enabled = false
}
ebs_options {
ebs_enabled = true
volume_type = "gp2"
volume_size = 35
}
# NOTE(review): encryption at rest is disabled — confirm that is acceptable
# for the log data stored here (and whether this instance type supports it).
encrypt_at_rest {
enabled = false
}
# Domain lives inside the VPC (single subnet), reachable only through the
# security group below.
vpc_options {
subnet_ids = ["${data.aws_subnet_ids.data.ids[0]}"]
security_group_ids = [
"${aws_security_group.network-logging.id}",
]
}
# Required so _bulk requests may name their index explicitly in the body.
advanced_options {
"rest.action.multi.allow_explicit_index" = "true"
}
# Automated snapshots start at hour 0.
snapshot_options {
automated_snapshot_start_hour = 00
}
tags {
Domain = "network-logging"
}
# Principal "*" looks wide open, but the domain is VPC-scoped, so access is
# limited by network reachability; the Resource ARN also scopes the policy
# to this one domain.
access_policies = <<CONFIG
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Principal": {
"AWS": "*"
},
"Action": "es:*",
"Resource": "arn:aws:es:eu-west-2:${data.aws_caller_identity.current.account_id}:domain/network-logging/*"
}
]
}
CONFIG
}
# Security group referenced by the Elasticsearch domain above.
# Rules redacted by the author for security reasons.
resource "aws_security_group" "network-logging" {
*REMOVED*
}
IAM definition
# Extra IAM permissions for the network-logging Lambda (beyond what the
# Lambda module grants).
#
# BUG FIX: s3:GetObject / s3:PutObject are *object-level* actions, so their
# Resource must be the object ARN ("<bucket-arn>/*"), not the bucket ARN —
# with the bucket ARN alone those statements never match. The function also
# calls DeleteObjectAsync after indexing, so s3:DeleteObject is required.
# The es:* statement is scoped to this domain instead of Resource "*".
resource "aws_iam_policy" "other-permissions" {
  name = "network-logging-policy"
  path = "/"
  policy = <<EOF
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "LambdaLogCreation",
      "Effect": "Allow",
      "Action": [
        "logs:CreateLogGroup",
        "logs:CreateLogStream",
        "logs:PutLogEvents"
      ],
      "Resource": "arn:aws:logs:*:*:*"
    },
    {
      "Effect": "Allow",
      "Action": [
        "ec2:CreateNetworkInterface",
        "ec2:DescribeNetworkInterfaces",
        "ec2:DeleteNetworkInterface"
      ],
      "Resource": "*"
    },
    {
      "Sid": "ESPermission",
      "Effect": "Allow",
      "Action": [
        "es:*"
      ],
      "Resource": "${aws_elasticsearch_domain.network-logging.arn}/*"
    },
    {
      "Effect": "Allow",
      "Action": ["s3:ListBucket"],
      "Resource": ["${data.aws_s3_bucket.network-logging.arn}", "${module.aws-lambda-dotnet.lambda-repository-arn}"]
    },
    {
      "Effect": "Allow",
      "Action": [
        "s3:PutObject",
        "s3:GetObject",
        "s3:DeleteObject"
      ],
      "Resource": ["${data.aws_s3_bucket.network-logging.arn}/*", "${module.aws-lambda-dotnet.lambda-repository-arn}/*"]
    }
  ]
}
EOF
}
# Attach the extra permissions to the execution role created by the Lambda module.
resource "aws_iam_role_policy_attachment" "other-permissions-policy-attachment" {
role = "${module.aws-lambda-dotnet.iam_role_name}"
policy_arn = "${aws_iam_policy.other-permissions.arn}"
}
The lambda definition (we use a module for this, and unfortunately can’t post the code, but it basically wraps up the creation of a lambda, and permissions surrounding that)
# The Lambda itself, created by an internal module. Runs inside the VPC
# ("function_type = vpc") so it can reach the VPC-scoped Elasticsearch domain.
module "aws-lambda-dotnet" "lambda" {
source = "git::git@github.com:******/terraform-modules.git//aws-lambda-dotnet?ref=v1.0.36"
function_type = "vpc"
name = "network-logging"
application_version = "${var.application_version}"
environment_variables = {
Environment = "${terraform.workspace}"
AwsRegion = "eu-west-2"
# Read by ElasticSearchAdapter's constructor to build the base URL.
ElasticSearchEndpoint = "${aws_elasticsearch_domain.network-logging.endpoint}"
}
handler = "network-logging::NetworkLogging.Function::FunctionHandler"
runtime = "dotnetcore2.1"
# 300s timeout matches the 5-minute schedule, so at most one backlog's worth
# of processing per invocation.
timeout = 300
memory_size = 1024
}
And finally the trigger, which calls the lambda function, every 5 minutes.
# Schedule that drives the whole pipeline: one invocation every five minutes.
resource "aws_cloudwatch_event_rule" "every_five_minutes" {
name = "every-five-minutes"
description = "Fires every five minutes"
schedule_expression = "rate(5 minutes)"
}
# Points the schedule at the Lambda function.
resource "aws_cloudwatch_event_target" "check_foo_every_five_minutes" {
rule = "${aws_cloudwatch_event_rule.every_five_minutes.name}"
target_id = "check_foo"
arn = "${module.aws-lambda-dotnet.function_arn}"
}
# Allows CloudWatch Events (and only this specific rule, via source_arn)
# to invoke the function.
resource "aws_lambda_permission" "allow_cloudwatch_to_call_check_logger" {
statement_id = "AllowExecutionFromCloudWatch"
action = "lambda:InvokeFunction"
function_name = "${module.aws-lambda-dotnet.function_name}"
principal = "events.amazonaws.com"
source_arn = "${aws_cloudwatch_event_rule.every_five_minutes.arn}"
}
Written on March 15, 2019.