When I first heard about Amazon Lambda, I was skeptical. It was explained to me as basically "code in the cloud" and my immediate take was "why even bother putting code 'in the cloud'? Why would I take my code over to some flimsy little endpoint on a server somewhere, when I could have a strong, big rig to do my work right here in front of me?"
This sort of immediate thinking blinded me to why Lambda is so useful - if you are in an academic environment, more often than not, there are dedicated, available resources to do most computing without any direct cost to the researcher. If you're outside of academia, that machine just simply isn't free. You could build the machine yourself for potentially thousands of dollars, and host it yourself, but more often than not, it simply is just easier to spin up a large-ish EC2 instance for when you need it, run the analysis, and turn it off when you don't need it.
Being so far from the cost of machines usually, I don't think about who's paying for the machines. But I've come to realize exactly why Lambda is so powerful: Lambda is like if you automated almost all of the annoying parts of using an EC2 instance for, literally, only the milliseconds that you are actively computing, and you pay nothing in between those uses. In other words, Lambda is basically the automated framework for spinning up a ton of EC2 instances, running a script, and then spinning them down again, without ever having to think about the instances, anything about their setup, or anything about the specs for those machines. Of course, there's still annoying things about it, but I'm going to try my best to go over all of those issues, and show you how I'm using Lambda to analyze random walks, from requirements in the script all the way to actually running the walks.
First, Lambda only supports a few languages currently - Specifically Java 8, NodeJS, Node.js 4.3, and Python 2.7. Obviously, Python 2.7 gives us access to a wide world of statistical tools beyond the rest of these languages, so we'll focus on building Lambda functions with Python.
Setting up a Random Walk Script
I've been experimenting with random walks and random walk dynamics a whole bunch in the last few months. As part of that work, I wanted to create a generalized random walk Python script that leverages Lambda and Amazon S3. At the end, when I detail my particular implementation and use of the script, it will become clear why I'd want that. As it stands though, minimal changes to this script would convert it from a cloud-ready script to something that instead ingests local files of networks, and works completely independently of any Amazon infrastructure (which is another reason why Lambda is so exciting - one could have the same script that either works locally for testing or out in the cloud for general use). I'm not going to go too deeply into the implementation of the random walk, but we'll do a survey of the code. A full dump of the code with some documentation will be provided below, but for now let's look at a few design considerations:
Walk types
The most basic random walks typically allow walkers to just transit through nodes for a set number of t-steps. Other walks involve random restarts, and there are many ways to engineer walkers to model the phenomenon that the random walkers are attempting to measure. For my purposes, the dynamic I wanted to measure was as follows:
- Some subset of nodes are given a number of walkers at T0, proportional to the observed starting position of workers in some system I'm approximating.
- Every node has an associated Risk rate \(R_i\), which is bounded between 0 and 1, and is the average percent of nodes that will "die", or disappear in the next time step.
- Surviving nodes move along weighted, directed edges to other nodes every time-step - self loops are allowed.
- The simulation continues until all nodes have died.
Of course, this is a particular type of random walk, so I provided the ability to run a more default random walk, where walkers are allotted according to and attribute \(W_i\) and the total number of T-steps under which the simulation will run is directly provided. As I explain in the documentation for the code, there are many other optional parameters that can be provided, but for people interested in the actual meat of the random walk code, I'll show in a bit of detail how that particular mechanism works - in practice most of the code is setting up all the options, but we can just look at the relevant walk code:
mat_graph = np.zeros([len(nodes),len(nodes)])
node_indices = {}
for i,node in enumerate(nodes):
node_indices[node[directions['node_identifier']]] = i
Here, we are creating the footprint for the adjacency matrix - recording which nodes stand in which indices, and creating the adjacency matrix itself. One of the problems with Lambda is that it requires you to package all of the code dependencies into a single zip file - also, memory is at a premium. So, instead of using a network library, we are going to do our calculations directly in numpy
(obviously, there's a big problem here - we are not using a sparse matrix. My networks are very small currently, so I haven't dealt with that issue - I leave that for other people to update this code (or myself when it becomes a problem)).
for edge in edges:
mat_graph[node_indices[edge[directions['edge_source_identifier']]]][node_indices[edge[directions['edge_target_identifier']]]] = edge[directions['edge_weight_attribute']]
Here we are using all of the dereferenced attribute names we've allowed for, which allows for very flexible network definitions, to populate the adjacency matrix with the weights of the edges between the nodes, which will control the flow rates of walkers between nodes.
mat_graph_normalized = np.zeros([len(nodes),len(nodes)])
for i,row in enumerate(mat_graph):
sum_row = sum(row)
if sum_row == 0:
mat_graph_normalized[i] = row
else:
mat_graph_normalized[i] = [float(el)/sum_row for el in row]
Trust, but verify - normalize the weights of all outbound edges for every node's outbound edges.
node_risks = np.zeros(len(nodes))
first_interactions = np.zeros(len(nodes))
for node in nodes:
first_interactions[node_indices[node[directions['node_identifier']]]] = float(node[directions['node_initial_position_attribute']])
if directions['network_dynamic'] == 'node_risk':
node_risks[node_indices[node['id']]] = float(node[directions['network_stopping_condition']])
Setup the initial weights for how many walkers to apporition to each node when the walk is at T0 - if people are using nodal risks, plug those values in.
The actual random walk function code is long, so I will just annotate it inline:
def run_random_walk(mat_graph, node_indices, node_risks, first_interactions, directions):
#conversion_nodes allows us to track the number of transits that occur for every node,
#which can help users focus in on specific network effects on particular nodes
conversion_nodes = range(len(mat_graph[0]))
#select indexes of nodes where more than zero walkers will start
initial_indices = np.where(first_interactions > 0)
#collect the weight of each node so that we can give each starting node an appropriate
#proportion of starting nodes
counts = np.extract(first_interactions > 0, first_interactions)
#calculate sum of those values
sum_count = float(sum(counts))
#convert weights to probabilities just in case they didn't add up to 1 yet.
probabilities = np.array(counts)/float(sum(counts))
#assign walkers according to probabilities
walkers_alotted = np.random.multinomial(directions['walker_count'], probabilities).tolist()
#get number of nodes
node_count = len(node_indices)
#create a matrix to store number of walkers per node
current_walkers = np.zeros([node_count])
#for each node, assign the walkers into their position
for i,index in enumerate(np.matrix(initial_indices).tolist()[0]):
current_walkers[index] = walkers_alotted[i]
#add a small epsilon of risk in case there were zero risk nodes, which may make
#the code run forever.
if directions['network_dynamic'] == 'node_risk':
node_risks[node_risks <= 0.001] = (np.random.rand(len(node_risks[node_risks == 0]))/100)+0.01
#get the number of live walkers
live_walkers = directions['walker_count']
#set up a list to remember the total number of live walkers per t-step
walker_counts_t_steps = [live_walkers]
#set up a list of lists to remember the total number of live walkers per t-step per node
conversion_node_counts = [[current_walkers[el] for el in conversion_nodes]]
#instantiate t-step
t_step = 0
#completeness is either all walkers are dead or we have reached the final t-step -
#which is dependent on the directions that were supplied to the lambda function.
while complete(directions, t_step, live_walkers) == False:
#increment the t-step
t_step += 1
#lose some walkers - if node risk is a valid effect in this network - otherwise, no
#walkers are lost.
walker_losses = [np.random.binomial(el[0], el[1]) for el in zip(current_walkers, node_risks)]
#those that survive are recorded
surviving_walkers = [el[0]-el[1] for el in zip(current_walkers, walker_losses)]
#remove the dead walkers from the total current live walker count for measuring completeness
live_walkers -= sum(walker_losses)
#append that current count to the historical record.
walker_counts_t_steps.append(live_walkers)
#instantiate the next walker counts for each node
next_walkers = np.zeros([node_count])
#for each node index along with the number of surviving nodes for each
for i,walker_count_node in enumerate(surviving_walkers):
#if sum(mat_graph[i]) == 0, then that means that the total outbound weight for
#edges outbound from this node is zero - no one leaves!
if sum(mat_graph[i]) == 0:
#increment this node's walkers exactly by this nodes walkers in that case
next_walkers[i] += walker_count_node
else:
#else, we give out walkers to all other nodes proportional to the weights
#to those nodes, including self-loops
next_walkers += np.random.multinomial(walker_count_node, mat_graph[i])
#the current walkers are flipped out
current_walkers = next_walkers
#add in the total number of walkers per node to the history list.
conversion_node_counts.append([current_walkers[el] for el in conversion_nodes])
#build a handy inverted index to zip together results quickly
inverted_node_indices = {v: k for k, v in node_indices.items()}
#return the total number of live walkers per t-step and the walkers per node per t-step back out to the main lambda function
return walker_counts_t_steps, zip([inverted_node_indices[cn] for cn in conversion_nodes], np.matrix(conversion_node_counts).transpose().sum(axis=1))
There's also one more interesting thing to cover in this code, and should be something to consider when you're building your own lambda functions: The way that this system ingests the network is via S3, which allows for particularly quick access to the network given it's on high-throughput infrastructure already within the AWS environment. Also, it has a callback_url
variable that can be passed into the code - if a callback_url
is provided within the event
main variable, which again, has tons of different potential configurations, allows this code to fire off a response to some machine out in the world. This is crucially important. The maximum amount of time a Lambda function can run is 5 minutes. After that, presumably undefined things can happen, including the function getting killed, probably. The limit is even worse on the API Gateway, though - it's 30 seconds. If we don't get our function finished in 30 seconds, we're going to lose the calculation. So, providing this callback_url
parameter allows for the Lambda function to still phone home even if the API Gateway request that started it off in the first place is long dead. I don't have any argument against it, and it's crucial in my use case so far - I would suggest using this type of feature somewhere in your own work. If there's some interest for some reason, I'm happy to elaborate or revise the random walk code, which again, is provided in full at the bottom, but at this point, let's move on to deploying this thing.
Packaging the script
First off, I want to highlight a few huge helps I found online for getting this done: one shows yet another description for setting up complex Python scripts on lambda, as well as another page that shows how complicated libraries with lots of C requirements like scipy get built for Lambda, which I'm going to glide over because they've done a great job on those fronts.
Basically, the main detriment of deploying on Lambda is you have to do a whole bunch of up-front work in order to make it so the code can automatically run on the machines that Amazon are using to run the functions you're trying to run. That means that, instead of having a computer where you install all the libraries, then simply require them when you're running a script, you have to install all the libraries in a virtual environment, then package them in a particular way so that all the code - literally everything beyond basic standard Python - comes alongside the script you're running, so the script is self-contained.
To get that set up, we have to first turn on a machine exactly like the ones that run Lambda code, and build everything in that machine.
The machine that we will be developing on is the one matching the right configuration on this list. Specifically, we want to choose one in our region (Right now, Virginia) That is the "PV-EBS-Backed 64-bit" machine, which resolves us to the AMI with the ID of "ami-2a69aa47":
After searching on the page, we've found our box:
Not to be one wanting to waste money, we will choose the Micro Instance. A brief note here: When installing SciPy, you may run into memory issues when installing it on a machine with only 1 Gig of RAM. If you're going to ultimately be writing a script that uses SciPy, and as such you'll need to install SciPy, then you're better off spinning up a paid Instance while developing, because solving that issue was a big pain.
Just skip through the process with defaults (obviously, because we're just going to work with an Amazon preferred box) - and launch the instance:
During this process you'll be asked about a PEM key - for me, I chose to create a new key, which was automatically downloaded to my Downloads directory. I'm not planning on keeping this machine after explaining this work, so I'm leaving it there. But if you're planning on developing your script long term, it may be better to save this in a more secure folder, and perhaps aliasing the command to log into the machine. For me, it looks like this:
ssh -i /Users/dgaff/Downloads/new_machine.pem ec2-user@ec2-54-86-251-195.compute-1.amazonaws.com
Where -i
allows for using the PEM as the key to log into the computer (this particular key path is /Users/dgaff/Downloads/new_machine.pem
), ec2-user
is the default user on AMI Linux instances, and ec2-54-86-251-195.compute-1.amazonaws.com
is the address of the machine listed on the EC2 instances page.
At this point, we're now logged into our EC2 development box. We now need to set up our virtual environment, which will allow us to package together our script. First, we'll set up all our dependencies.
The beginning of our script gives us a clear outline of the Python packages we will need to provide with our Lambda script:
from __future__ import print_function
import sys
import os
reload(sys)
sys.setdefaultencoding('utf8')
import json
import numpy as np
import boto3
__future__
, sys
, os
, and json
are standard libraries - we don't need to do anything. We do need to install the others. In order to do this, we're going to update the machine, install python and it's dependencies directly from source, instantiate a virtual environment for us to grab our packages from, and then activate that environment (note that these are identical steps outlined in perrygeo's post:
sudo yum -y update
sudo yum -y upgrade
sudo yum install python27-devel python27-pip gcc libjpeg-devel zlib-devel gcc-c++
virtualenv lambda_env
source lambda_env/bin/activate
pip install numpy
pip install boto3
After that, we just have to start constructing the zip file for containing all of our code, and then we can move on to uploading it to Lambda. We are first going to package up all our dependencies into a new zip file - dependencies in our virtual environment are stored in $VIRTUAL_ENV/lib/python2.7/site-packages
and $VIRTUAL_ENV/lib64/python2.7/site-packages
:
cd $VIRTUAL_ENV/lib/python2.7/site-packages
zip -r9 ~/random_walk.zip *
cd $VIRTUAL_ENV/lib64/python2.7/site-packages
zip -r9 ~/random_walk.zip *
Next, we'll open up some text editor, and dump our code for our script onto this computer:
cd #cd back to user dir
nano random_walk.py
zip ~/random_walk.zip random_walk.py
random_walk.zip
is now your packaged zip of your Lambda function. Choose your favorite way of getting this file off of the EC2 machine via rsync
, S3 upload, or what not, and then carry that into our next step.
Setting up the Lambda Function
Now, for all of the pipework for Lambda - because if you don't get through this bureaucratic hurdle, you may never get the code deployed.
You're provided with a list of "blueprints" - none of them matter, click "skip".
Next, you're given the option to set up a trigger - we have already set up our script to work with a trigger inherently, so we don't need to do this and we can skip - we will manually set up the trigger with the API Gateway endpoint we will establish.
At this point, we can define our function - the Name doesn't matter, so we'll call ours "Random-Walk", and the description doesn't matter, so we'll cal it "A random walk script". The runtime is obviously Python 2.7, as the code above shows. With the code entry type, we choose to upload as a zip file, which we created earlier.
The next few steps get a bit annoying. The "Handler" field is the field that represents a syntax for calling the function. For Python functions, it's [FILENAME].[FUNCTION_NAME]. In our example, the file we have is called random_walk.py
, and the function name is run_random_walk(event, context)
, so our handler is random_walk.run_random_walk
. I haven't used the role field, and I don't know how important it is, so without more information I've just created a new policy called random_walk. I leave the policy templates section empty without a good reason to choose any of these. And, while the memory constraints will be less and the function's natural running time will likely be far under 5 minutes, I'll just set both of these to their current maximum values - I'll put off making concessions until I'm forced to, in other words. I don't need VPC, because I'm not building this in a huge secure system, so I can skip that as well.
Click on to the next page to confirm the settings, and then create the function!
At this point, we now have a Lambda function. Now, we have to get this code set up so that we can call a URL to hit the code.
Setting up the API gateway
Alright, this is the real kafkaesque nightmare.
Click "Get started".
We'll create a vanilla API, and give it some simple name.
We'll add in a new API Resource. Resources are endpoints or new URLs, methods are the HTTP methods to act on those endpoints or URLs. Give it a simple and declarative name
Now, we can create a Method on the Resource. Add a POST method.
Click back on the URL and you'll be met with this page:
Now, click on the Green POST tile's headline. We can now start building out the functionality.
Now, we map our Gateway Method on our Gateway Resource to our Lambda function. A warning dialog will come up, and just click through - you'll now be met with this nightmare interface:
The integration request tile is first up - it's the top right one but lord knows what it looks like for you when you're viewing this walkthrough.
Add content type of application/json
. Clicking the check box will now bring up a warning:
Click "No, use current settings"
Set up a response template, for request message passthrough. This will structure the request sent to the API such that the event
object in our Lambda function gets the correct {'body-json': {'params': {}}}
structure.
Now, we'll allow CORS - this allows us to call the URL for the resource in Javascript, which you may want to do.
A warning will come up - click "Yes, replace existing values".
Finally, we'll deploy the API - it'll ask for a deploy stage - call it prod
, and then deploy:
And that's it! It's not a very explanatory process to get this all stitched together, but once it's together, you should be able to make curl
requests that look like the URL provided in the invoke URL parameter (of course, assuming you also are sending in options in the same way I have outlined in my script):
Working on your code
Now, if you don't want to use my code, but you're following this just to get some numpy
type script working, you'll want to do the following:
- Setup
random_walk.py
to look as such (or name it something else, but you'll have to update that name in the lambda configuration as was discussed above):
# encoding=utf8
from __future__ import print_function
import sys
import os
reload(sys)
sys.setdefaultencoding('utf8')
import json
import numpy as np
import boto3
def run_random_walk(event, context={}):
#do anything you want here. Event will always look like {'body-json': {'params': (anything you sent in to the URL request as params)}}
- Happy coding! Note if you want to change the function name, much like the file name, you'll have to update that in the lambda function code. To get this deployed, again, on the development box
cd
nano [YOUR_FILENAME].py #do some edits
zip ~/random_walk.zip [YOUR_FILENAME].py
And, as you add new dependencies,
source lambda_env/bin/activate
pip install [something]
And when you're done editing,
cd $VIRTUAL_ENV/lib/python2.7/site-packages
zip -r9 ~/random_walk.zip *
cd $VIRTUAL_ENV/lib64/python2.7/site-packages
zip -r9 ~/random_walk.zip *
My Particular Use
I've drawn a picture that messily captures why I'm so above and beyond excited about this type of programming paradigm after my initial skepticism:
What this is illustrating is this: I have a network graph that I'm showing to someone on a website. As soon as the user requests the page, the network is generated and sent to S3 - a javascript request hovers around, waiting until the network is present. Then, the statistical layer is called - the front end requests that a series of random walks are conducted on this network, subject to some rules outline in javascript. The statistical layer runs one random walk on a control network, and then runs dozens of random walks on test networks, which small changes made that were according to that data in the front end. The statistical Lambda function hovers until all the results are in for the random walks, then fires off it's callback URL, which tells my computer to store the results. Then, the results are returned from the statistical layer, back to the front end, where any statistically significant results are surfaced. All in just the bare minimum amount of time to do all this analysis, for orders of magnitude less direct cost in terms of computing services, and much less time than and overhead than setting up the infrastructure to do this myself. I may not be convinced in terms of using Lambda for my own academic work, but it's clearly, clearly the way forward for many different computational processes in applied programs.
Code
# encoding=utf8
##Random Walker Function for use in AWS Lambda environment
##Usage: lambda_handler is the main function in this script. Event variable looks like the following:
# {
# 'body-json': { #wrapped in body-json due to this being required when sending a request through API Gateway
# 'params': { #wrapped in params due to this being required when sending a request through API Gateway
# 'aws_access_key_id': 'AWS_ACCESS_KEY_ID', #specify the S3 credentials for the network object
# 'aws_secret_access_key': 'AWS_SECRET_ACCESS_KEY', #specify the S3 credentials for the network object
# 'aws_bucket': 'random-stuff-dgaffney', #specify the S3 bucket
# 'aws_object_key': 'toy_network.json', #specify the S3 object ID
# 'edge_weight_attribute': 'weight', #edges can optionally have a weight - specify what attribute on the network's edges represents the weight of each edge. Edge weights can be any non-negative real number. Defaults to 'weight' if not present.
# 'walker_count': 1000, #specify the amount of walkers to simulate on the network. Can be any positive integer, though higher numbers will likely increase the runtime.
# 'edge_source_identifier': 'source', #edges go between two nodes - this key specifies which key represents the source node for the edge (currently, this code assumes networks are directed). Defaults to 'source' if not present. Must be a string representation of the node ID.
# 'edge_target_identifier': 'target', #edges go between two nodes - this key specifies which key represents the target node for the edge (currently, this code assumes networks are directed). Defaults to 'target' if not present. Must be a string representation of the node ID.
# 'node_identifier': 'id', #nodes have unique identifiers - this key specifies which key represents the ids for each node. Defaults to 'id'. Must be a string representation of the ID, and must be unique to each node.
# 'node_initial_position_attribute': 'start_weight', #nodes can have start weights, which respresents the points at which nodes start in their random walks. This can be any non-negative real number. Defaults to 'weight' if not present.
# 'node_size_attribute': 'total_weight', #nodes must have weights for all nodes. Weights can be any non-negative number, but the sum of weights across all nodes must be a positive sum. This key specifies which node key represents the weight for each node. Defaults to 'weight'.
# 'network_dynamic': 't_step_limit', #Random walks can have a number of ways in which they can run. Currently, this script allows two modes: 't_step_limit' and 'node_risk'. 't_step_limit' simply runs a set number of iterations before completing and returning results. The number of iterations is controlled by 'network_stopping_condition', and must be a positive integer. 'node_risk' starts out with 'walker_count' number of walkers, and then burns out the walkers at every transit according to the "risk", or burn rate, of each node in the network. The burn rate is a node property, which must be a fraction between 0 and 1, and the key specifying this burn rate is provided in 'network_stopping_condition'. 'node_risk' will return the graph when all walkers have finished their walks. Defaults to 't_step_limit'
# 'network_stopping_condition': 100, #This key represents what constitutes the end of the walk. If the walk dynamic is 't_step_limit', this must be a positive integer, which represents the number steps each walker will take in total before the results are returned. If the walk dynamic is 'node_risk', the value must be a string that specifies which key represents the burn rate for each node in the network. The value of each of these keys must be a number between 0 and 1. In the event that risk is extremely minimal for nodes, a small risk will be randomly placed on those nodes.
# 'callback_url': 'http://website.com/callback_for_random_walk.json', #This script will automatically respond with the results if called through the API gateway, or if called directly, but a callback url can also be specified. Upon completion, the script will send a request to the URL, POSTing the results to that url. Not a required parameter.
# 'network_alterations': { #This specifies any alternative changes to the S3 stored network provided - use this to test alternative hypotheses for what would happen to the network if it were different in some way.
# 'nodes': [ #Nodes and edges must have the exact same attribute keys present as in the rest of the S3 object.
# {
# 'start_weight': 0.01,
# 'id': '13',
# 'total_weight': 0.01
# }
# ],
# 'edges': [
# {
# 'other_attribute': 0.02,
# 'source': '0',
# 'target': '1',
# 'weight': 0.97
# }
# ]
# }
# }
# }
# }
###### The network S3 Object corresponding to the above event could look something like the below:
#{
# 'nodes': [
# {'start_weight': 0.41, 'id': '0', 'total_weight': 0.85},
# {'start_weight': 0.34, 'id': '1', 'total_weight': 0.57},
# {'start_weight': 0.91, 'id': '2', 'total_weight': 0.00},
# {'start_weight': 0.81, 'id': '3', 'total_weight': 0.40},
# {'start_weight': 0.39, 'id': '4', 'total_weight': 0.06},
# {'start_weight': 0.67, 'id': '5', 'total_weight': 0.55},
# {'start_weight': 0.02, 'id': '6', 'total_weight': 0.99},
# {'start_weight': 0.47, 'id': '7', 'total_weight': 0.67},
# {'start_weight': 0.44, 'id': '8', 'total_weight': 0.08},
# {'start_weight': 0.89, 'id': '9', 'total_weight': 0.89},
# {'start_weight': 0.6, 'id': '10', 'total_weight': 0.65},
# {'start_weight': 0.35, 'id': '11', 'total_weight': 0.71},
# {'start_weight': 0.77, 'id': '12', 'total_weight': 0.37},
# {'start_weight': 0.36, 'id': '13', 'total_weight': 0.71},
# {'start_weight': 0.67, 'id': '14', 'total_weight': 0.37}
# ],
# 'edges': [
# {'other_attribute': 0.73, 'source': '0', 'target': '1', 'weight': 0.15},
# {'other_attribute': 0.2, 'source': '0', 'target': '2', 'weight': 0.41},
# {'other_attribute': 0.14, 'source': '1', 'target': '2', 'weight': 0.94},
# {'other_attribute': 0.08, 'source': '0', 'target': '3', 'weight': 0.83},
# {'other_attribute': 0.33, 'source': '1', 'target': '3', 'weight': 0.28},
# {'other_attribute': 0.95, 'source': '2', 'target': '3', 'weight': 0.34},
# {'other_attribute': 0.1, 'source': '1', 'target': '4', 'weight': 0.98},
# {'other_attribute': 0.82, 'source': '0', 'target': '4', 'weight': 0.95},
# {'other_attribute': 0.39, 'source': '3', 'target': '4', 'weight': 0.12},
# {'other_attribute': 0.63, 'source': '0', 'target': '5', 'weight': 0.26},
# {'other_attribute': 0.42, 'source': '1', 'target': '5', 'weight': 0.57},
# {'other_attribute': 0.09, 'source': '4', 'target': '5', 'weight': 0.57},
# {'other_attribute': 0.69, 'source': '4', 'target': '6', 'weight': 0.62},
# {'other_attribute': 0.17, 'source': '3', 'target': '6', 'weight': 0.75},
# {'other_attribute': 0.14, 'source': '1', 'target': '6', 'weight': 0.14},
# {'other_attribute': 0.21, 'source': '1', 'target': '7', 'weight': 0.96},
# {'other_attribute': 0.15, 'source': '0', 'target': '7', 'weight': 0.23},
# {'other_attribute': 0.78, 'source': '3', 'target': '7', 'weight': 0.82},
# {'other_attribute': 0.58, 'source': '0', 'target': '8', 'weight': 0.49},
# {'other_attribute': 0.5, 'source': '1', 'target': '8', 'weight': 0.28},
# {'other_attribute': 0.68, 'source': '3', 'target': '8', 'weight': 0.64},
# {'other_attribute': 0.71, 'source': '0', 'target': '9', 'weight': 0.04},
# {'other_attribute': 0.79, 'source': '2', 'target': '9', 'weight': 0.18},
# {'other_attribute': 0.31, 'source': '6', 'target': '9', 'weight': 0.96},
# {'other_attribute': 0.06, 'source': '2', 'target': '10', 'weight': 0.31},
# {'other_attribute': 0.25, 'source': '0', 'target': '10', 'weight': 0.36},
# {'other_attribute': 0.24, 'source': '3', 'target': '10', 'weight': 0.76},
# {'other_attribute': 0.43, 'source': '3', 'target': '11', 'weight': 0.71},
# {'other_attribute': 0.47, 'source': '1', 'target': '11', 'weight': 0.15},
# {'other_attribute': 0.61, 'source': '6', 'target': '11', 'weight': 0.61},
# {'other_attribute': 0.64, 'source': '0', 'target': '12', 'weight': 0.27},
# {'other_attribute': 0.07, 'source': '2', 'target': '12', 'weight': 0.87},
# {'other_attribute': 0.13, 'source': '9', 'target': '12', 'weight': 0.09},
# {'other_attribute': 0.07, 'source': '3', 'target': '13', 'weight': 0.79},
# {'other_attribute': 0.41, 'source': '12', 'target': '13', 'weight': 0.78},
# {'other_attribute': 0.12, 'source': '0', 'target': '13', 'weight': 0.05},
# {'other_attribute': 0.88, 'source': '3', 'target': '14', 'weight': 0.98},
# {'other_attribute': 0.79, 'source': '7', 'target': '14', 'weight': 0.06},
# {'other_attribute': 0.74, 'source': '9', 'target': '14', 'weight': 0.1}
# ]
#}
from __future__ import print_function
import sys
import os
reload(sys)
sys.setdefaultencoding('utf8')
import json
import numpy as np
import boto3
def complete(directions, t_step, live_walkers):
if directions['network_dynamic'] == 't_step_limit' and t_step > 100:
return True
elif directions['network_dynamic'] == 'node_risk' and live_walkers == 0:
return True
else:
return False
def run_random_walk(mat_graph, node_indices, node_risks, first_interactions, directions):
conversion_nodes = range(len(mat_graph[0]))
initial_indices = np.where(first_interactions > 0)
counts = np.extract(first_interactions > 0, first_interactions)
sum_count = float(sum(counts))
probabilities = np.array(counts)/float(sum(counts))
walkers_alotted = np.random.multinomial(directions['walker_count'], probabilities).tolist()
node_count = len(node_indices)
current_walkers = np.zeros([node_count])
for i,index in enumerate(np.matrix(initial_indices).tolist()[0]):
current_walkers[index] = walkers_alotted[i]
if directions['network_dynamic'] == 'node_risk':
node_risks[node_risks <= 0.001] = (np.random.rand(len(node_risks[node_risks == 0]))/100)+0.01
live_walkers = directions['walker_count']
walker_counts_t_steps = [live_walkers]
conversion_node_counts = [[current_walkers[el] for el in conversion_nodes]]
t_step = 0
while complete(directions, t_step, live_walkers) == False:
t_step += 1
walker_losses = [np.random.binomial(el[0], el[1]) for el in zip(current_walkers, node_risks)]
surviving_walkers = [el[0]-el[1] for el in zip(current_walkers, walker_losses)]
live_walkers -= sum(walker_losses)
walker_counts_t_steps.append(live_walkers)
next_walkers = np.zeros([node_count])
for i,walker_count_node in enumerate(surviving_walkers):
if sum(mat_graph[i]) == 0:
next_walkers[i] += walker_count_node
else:
next_walkers += np.random.multinomial(walker_count_node, mat_graph[i])
current_walkers = next_walkers
conversion_node_counts.append([current_walkers[el] for el in conversion_nodes])
inverted_node_indices = {v: k for k, v in node_indices.items()}
return walker_counts_t_steps, zip([inverted_node_indices[cn] for cn in conversion_nodes], np.matrix(conversion_node_counts).transpose().sum(axis=1))
def standardize_options(event):
check_nodes_against_network_dynamic = False
parsed_val = event['body-json']['params']
if 'callback_url' not in parsed_val.keys():
parsed_val['callback_url'] = None
if len(list(set(['aws_access_key_id', 'aws_secret_access_key', 'aws_bucket', 'aws_object_key']) & set(parsed_val.keys()))) != 4:
return [], [], {'callback_url': parsed_val['callback_url'], 'error': "Did not provide access keys for Data Object - must pass along aws_access_key_id,aws_secret_access_key,aws_bucket,aws_object_key'"}
if 'network_dynamic' not in parsed_val.keys():
parsed_val['network_dynamic'] = 't_step_limit'
if 'network_stopping_condition' not in parsed_val.keys() and parsed_val['network_dynamic'] == 't_step_limit':
parsed_val['network_stopping_condition'] = 100
if 'network_dynamic' in parsed_val.keys():
if parsed_val['network_dynamic'] not in ['t_step_limit', 'node_risk']:
return [], [], {'callback_url': parsed_val['callback_url'], 'error': "Did not provide a valid network dynamic - simulation must end by either a fixed number of iterations ({'network_dynamic': 't_step_limit', 'network_stopping_condition': 100} by default), or a death induced at all nodes ({'network_dynamic': 'node_risk', 'network_stopping_condition': 'node_risk_attribute'} where 'network_stopping_condition' refers to a nodal attribute where each value is 0-1 which represents the likelihood of death for a walker at that point)."}
if parsed_val['network_dynamic'] == 'node_risk' and 'network_stopping_condition' in parsed_val.keys():
check_nodes_against_network_dynamic = True
elif parsed_val['network_dynamic'] == 'node_risk' and 'network_stopping_condition' not in parsed_val.keys():
return [], [], {'callback_url': parsed_val['callback_url'], 'error': "Did not specify the node key for a network dynamic based on node risk likelihoods. Please specify one as follows: {'network_dynamic': 'node_risk', 'network_stopping_condition': 'node_risk_attribute'} where 'network_stopping_condition' refers to a nodal attribute where each value is 0-1 which represents the likelihood of death for a walker at that point)."}
if 'walker_count' not in parsed_val.keys():
parsed_val['walker_count'] = 10000
if 'node_identifier' not in parsed_val.keys():
parsed_val['node_identifier'] = 'id'
if 'edge_source_identifier' not in parsed_val.keys():
parsed_val['edge_source_identifier'] = 'source'
if 'edge_target_identifier' not in parsed_val.keys():
parsed_val['edge_target_identifier'] = 'target'
if 'node_initial_position_attribute' not in parsed_val.keys():
parsed_val['node_initial_position_attribute'] = 'weight'
if 'node_size_attribute' not in parsed_val.keys():
parsed_val['node_size_attribute'] = 'weight'
if 'edge_weight_attribute' not in parsed_val.keys():
parsed_val['edge_weight_attribute'] = 'weight'
if 'network_alterations' not in parsed_val.keys():
parsed_val['network_alterations'] = {'nodes': [], 'edges': []}
if 'nodes' not in parsed_val['network_alterations'].keys():
parsed_val['nodes'] = []
if 'edges' not in parsed_val['network_alterations'].keys():
parsed_val['edges'] = []
client = boto3.client('s3',aws_access_key_id=parsed_val['aws_access_key_id'],aws_secret_access_key=parsed_val['aws_secret_access_key'])
data_object = json.loads(client.get_object(Bucket=parsed_val['aws_bucket'], Key=parsed_val['aws_object_key'])['Body'].read())
nodes = {}
for node in data_object['nodes']:
nodes[node[parsed_val['node_identifier']]] = node
for node in parsed_val['network_alterations']['nodes']:
nodes[node[parsed_val['node_identifier']]] = node
nodes = nodes.values()
if check_nodes_against_network_dynamic == True:
for node in nodes:
if parsed_val['network_stopping_condition'] not in node.keys():
return [], [], {'callback_url': parsed_val['callback_url'], 'error': "Node has invalid risk property! Must exist and be between 0 and 1. Node was: "+json.dumps(node)}
elif parsed_val['network_stopping_condition'] in node.keys() and (node[parsed_val['network_stopping_condition']] < 0.0 or node[parsed_val['network_stopping_condition']] > 1.0):
return [], [], {'callback_url': parsed_val['callback_url'], 'error': "Node has invalid risk property! Must be between 0 and 1. Node was: "+json.dumps(node)}
edges = {}
for edge in data_object['edges']:
edges[edge[parsed_val['edge_source_identifier']]+"<--|^o^|-->"+edge[parsed_val['edge_target_identifier']]] = edge
for edge in parsed_val['network_alterations']['edges']:
edges[edge[parsed_val['edge_source_identifier']]+"<--|^o^|-->"+edge[parsed_val['edge_target_identifier']]] = edge
edges = edges.values()
return nodes, edges, parsed_val
def confirm_all_attributes(nodes, edges, directions):
failed_node_attributes = []
for attr in ['node_identifier', 'node_initial_position_attribute', 'node_size_attribute']:
for node in nodes:
if directions[attr] not in node.keys() and attr not in failed_node_attributes:
failed_node_attributes.append(attr)
failed_edge_attributes = []
for attr in ['edge_source_identifier', 'edge_target_identifier', 'edge_weight_attribute']:
for edge in edges:
if directions[attr] not in edge.keys() and attr not in failed_edge_attributes:
failed_edge_attributes.append(attr)
return failed_node_attributes, failed_edge_attributes
def request_headers():
return {'Content-Type': "application/json"}
def run_random_walk(event, context={}):
nodes, edges, directions = standardize_options(event)
if 'error' in directions.keys():
if directions['callback_url'] is not None:
grequests.post(directions['callback_url'], headers=request_headers(), data=json.dumps({'original_event': event, 'error': directions['error']})).send()
return {'original_event': event, 'error': directions['error']}
failed_node_attributes, failed_edge_attributes = confirm_all_attributes(nodes, edges, directions)
if len(failed_node_attributes) > 0 or len(failed_edge_attributes) > 0:
if directions['callback_url'] is not None:
grequests.post(directions['callback_url'], headers=request_headers(), data=json.dumps({'original_event': event, 'error': "Not all nodes or edges provided (either in the S3 Object or the altered nodes optionally passed through)- edge attributes missing were "+str.join(",", failed_edge_attributes)+", and node attributes missing were "+str.join(",", failed_node_attributes)})).send()
return {'original_event': event, 'error': "Not all nodes or edges provided (either in the S3 Object or the altered nodes optionally passed through)- edge attributes missing were "+str.join(",", failed_edge_attributes)+", and node attributes missing were "+str.join(",", failed_node_attributes)}
mat_graph = np.zeros([len(nodes),len(nodes)])
node_indices = {}
for i,node in enumerate(nodes):
node_indices[node[directions['node_identifier']]] = i
for edge in edges:
mat_graph[node_indices[edge[directions['edge_source_identifier']]]][node_indices[edge[directions['edge_target_identifier']]]] = edge[directions['edge_weight_attribute']]
mat_graph_normalized = np.zeros([len(nodes),len(nodes)])
for i,row in enumerate(mat_graph):
sum_row = sum(row)
if sum_row == 0:
mat_graph_normalized[i] = row
else:
mat_graph_normalized[i] = [float(el)/sum_row for el in row]
node_risks = np.zeros(len(nodes))
first_interactions = np.zeros(len(nodes))
for node in nodes:
first_interactions[node_indices[node[directions['node_identifier']]]] = float(node[directions['node_initial_position_attribute']])
if directions['network_dynamic'] == 'node_risk':
node_risks[node_indices[node['id']]] = float(node[directions['network_stopping_condition']])
walker_counts_t_steps, node_counts = run_random_walk(mat_graph_normalized, node_indices, node_risks, first_interactions, directions)
results = {"original_event": event, "walker_counts": walker_counts_t_steps, "node_counts": node_counts}
if directions['callback_url'] is not None:
grequests.post(directions['callback_url'], headers=request_headers(), data=json.dumps(results)).send()
return results